
LLaMA-Factory Source Code Analysis

Setting Up a Remote Container Debugging Environment

docker-compose Deployment

Build the image and start the service with the Dockerfile and docker-compose.yml under docker/docker-cuda in the LLaMA-Factory source tree. The Dockerfile is used as shipped upstream; only docker-compose.yml needs changes so that the ModelScope hub is used inside mainland China.

services:
  llamafactory:
    build:
      dockerfile: ./docker/docker-cuda/Dockerfile
      context: ../..
      args:
        INSTALL_BNB: false
        INSTALL_VLLM: false
        INSTALL_DEEPSPEED: false
        INSTALL_FLASHATTN: false
        INSTALL_MODELSCOPE: true
        PIP_INDEX: https://pypi.tuna.tsinghua.edu.cn/simple/
    container_name: llamafactory
    volumes:
#      - ../../hf_cache:/root/.cache/huggingface
      - ../../ms_cache:/root/.cache/modelscope
      - ../../data:/app/data
      - ../../output:/app/output
    ports:
      - "7860:7860"
      - "8000:8000"
      - "5008:22"
    ipc: host
    tty: true
    stdin_open: true
    command: bash
    environment:
      - USE_MODELSCOPE_HUB=1
    deploy:
      resources:
        reservations:
          devices:
          - driver: nvidia
            count: "all"
            capabilities: [ gpu ]
    restart: unless-stopped

Run the following commands

# Download the source code
git clone --depth 1 https://github.com/hiyouga/LLaMA-Factory.git
cd LLaMA-Factory/docker/docker-cuda/
# Build the image and start the service
docker compose up -d
# Enter the container
docker compose exec llamafactory bash

# Start the web UI service

llamafactory-cli webui

# Open in the browser

http://localhost:7860/

Setting Up SSH Access for Remote Debugging

# Open the LLaMA-Factory service port (run on the host)

firewall-cmd --zone=public --add-port=7860/tcp --permanent

firewall-cmd --reload

# Inside the container: set the root password, then install SSH

passwd root

apt-get -y update

apt-get install openssh-server

apt-get install openssh-client

# Allow root login over SSH

vim /etc/ssh/sshd_config

PermitRootLogin yes

# Restart the SSH service

/etc/init.d/ssh restart

# Verify (5008 is the host port mapped to the container's port 22)

ssh root@127.0.0.1 -p 5008

# Configure the environment variable for the domestic ModelScope hub

vim /etc/profile

export USE_MODELSCOPE_HUB=1

# Install components needed for fine-tuning, as prompted by error messages

pip install addict oss2 badam

pip install datasets==2.16.0

pip install protobuf==3.20

Source Analysis Approach

With the containerized runtime in place, connect PyCharm to the container over SSH and debug the source remotely. Port 5008 is the externally mapped port from the LLaMA-Factory service container's 5008:22 mapping.

Run configuration:

train.py --stage sft --do_train True --model_name_or_path /root/.cache/modelscope/hub/Qwen2-0.5B --preprocessing_num_workers 16 --finetuning_type full --template qwen --flash_attn auto --dataset_dir /app/data --dataset alpaca_zh --cutoff_len 1024 --learning_rate 5e-05 --num_train_epochs 3.0 --max_samples 1000 --per_device_train_batch_size 2 --gradient_accumulation_steps 8 --lr_scheduler_type cosine --max_grad_norm 1.0 --logging_steps 5 --save_steps 100 --warmup_steps 0 --optim adamw_torch --packing False --report_to none --use_badam True --output_dir /app/saves/Qwen2-0.5B/full/train_test --fp16 True --plot_loss True --ddp_timeout 180000000 --include_num_input_tokens_seen True --badam_mode layer --badam_switch_mode ascending --badam_switch_interval 50 --badam_update_ratio 0.05 --val_size 0.1 --eval_strategy steps --eval_steps 100 --per_device_eval_batch_size 2

Source Code Analysis

Overview

We take SFT (Supervised Fine-Tuning) as the entry point for analyzing how LLaMA-Factory runs.

The entry point into the LLaMA-Factory source is train.py; executing it lets us trace the SFT process.

Main runtime log output

The key part of the run_sft function in workflow.py:

    tokenizer_module = load_tokenizer(model_args)
    tokenizer = tokenizer_module["tokenizer"]
    dataset_module = get_dataset(model_args, data_args, training_args, stage="sft", **tokenizer_module)
    model = load_model(tokenizer, model_args, finetuning_args, training_args.do_train)

Main SFT steps

load_tokenizer, loading the tokenizer: based on the model arguments, download and load the tokenizer of the specified model, import the tokenizer's module, instantiate the tokenizer, and patch the relevant attributes. Large files such as the .safetensors weight files are not loaded at this stage.

get_dataset, loading the dataset: based on the data arguments and the loaded tokenizer, build the template for the specified model; the value '<|im_end|>' from the template's stop_words replaces the tokenizer's eos_token (default '<|endoftext|>'). With the template and tokenizer, the tokenizer's chat_template is built. The dataset module (json format for text datasets) is loaded from the dataset module library via import. Dataset features are updated and the dataset is transformed, converting text into token IDs.

load_model, loading the model: load the model from the tokenizer plus the model, training, and fine-tuning arguments. Load and supplement the relevant parameters, replace and patch model attributes, import the specified model's module class, and instantiate it.

CustomSeq2SeqTrainer, initializing the trainer: the trainer is initialized from everything built above.

train, training the model: mostly transformers machinery. Load train_dataset, wrap it in a torch DataLoader to build train_dataloader, and iterate to train the model. After training completes, save the model and print a summary.

Parsing Arguments

Parse and validate the arguments, yielding the model, data, training, and fine-tuning argument groups (a hedged parsing sketch follows the list below):

Data arguments

Model arguments

Fine-tuning arguments

Training arguments
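As a hedged sketch of this step: LLaMA-Factory builds on transformers' HfArgumentParser to parse these groups from the command line into dataclasses. The two stub dataclasses below are illustrative only, not the real ModelArguments/DataArguments/FinetuningArguments, which carry many more fields.

# Hedged sketch of multi-group argument parsing with HfArgumentParser.
from dataclasses import dataclass, field
from typing import Optional

from transformers import HfArgumentParser, Seq2SeqTrainingArguments


@dataclass
class ModelArguments:
    model_name_or_path: Optional[str] = field(default=None)


@dataclass
class DataArguments:
    dataset: Optional[str] = field(default=None)
    dataset_dir: str = field(default="data")


parser = HfArgumentParser((ModelArguments, DataArguments, Seq2SeqTrainingArguments))
# Reads sys.argv, e.g. the long train.py command line shown above.
model_args, data_args, training_args = parser.parse_args_into_dataclasses()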

Loading the Tokenizer Module

tokenizer_module = load_tokenizer(model_args)
tokenizer = tokenizer_module["tokenizer"]

This step loads only the tokenizer-related files from the repository; large files such as the .safetensors model weights are not loaded. It loads the tokenizer of the model named in the arguments, patches attributes such as eos and pad, instantiates the tokenizer, and imports the tokenizer module class.

Main flow

1. _get_init_kwargs: from the model arguments, call try_download_model_from_ms to check whether the model exists locally; if not, query the model snapshot through the ModelScope API for later use.

2. AutoTokenizer.from_pretrained calls get_tokenizer_config, which ultimately calls cached_file to fetch tokenizer_config.json from the model repository: if a local copy exists its path is returned, otherwise the file is first downloaded via the ModelScope API. The file is then opened and parsed, yielding the tokenizer_config dict.

3. From tokenizer_config, read the tokenizer class name Qwen2Tokenizer. If use_fast is true and the class name does not end with Fast, append it, giving Qwen2TokenizerFast, the class to fetch from the global tokenizer registry. The registry maps a module name to a tuple, e.g. key 'qwen2' to value ('Qwen2Tokenizer', 'Qwen2TokenizerFast'), so the module name cannot be looked up directly from the class name: the code loops over the registry until Qwen2TokenizerFast appears in a value, obtains the module name qwen2, imports that module, fetches the Qwen2TokenizerFast class from it, and assigns it to tokenizer_class.

4. tokenizer_class.from_pretrained adds vocab.json, tokenizer.json, tokenizer_config.json, merges.txt, special_tokens_map.json, and added_tokens.json to the vocabulary file dict vocab_files. It iterates over vocab_files, resolves each file name, and uses cached_file to check whether the file exists; missing files are downloaded from ModelScope or Hugging Face, existing ones are used directly, and the absolute paths are stored one by one into resolved_vocab_files.

5. cls._from_pretrained uses token = AddedToken(**token) to build the added-token decoder, and cls.convert_added_tokens places the AddedTokens into the corresponding fields of the init_kwargs dict.

6. tokenizer = cls(*init_inputs, **init_kwargs) instantiates the tokenizer class Qwen2TokenizerFast from the collected arguments. The instantiated tokenizer is returned, and with it the tokenizer_module.

In short: to build the class, first read the configuration files according to the arguments, resolve the concrete class from the model type, then instantiate that class with the file contents and the arguments.
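A minimal standalone demonstration of this lookup-then-instantiate pattern (the hub id Qwen/Qwen2-0.5B is illustrative; the article loads from a local ModelScope cache path instead):

from transformers import AutoTokenizer

# AutoTokenizer reads tokenizer_config.json, resolves Qwen2TokenizerFast from
# the registry, then instantiates it from the downloaded vocab/merges files.
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2-0.5B", use_fast=True, padding_side="right")
print(type(tokenizer).__name__)                # Qwen2TokenizerFast
print(tokenizer("Hello world")["input_ids"])   # [9707, 1879]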

Key source code and data

AutoTokenizer.from_pretrained instantiates the tokenizer class based on the model type.

def load_tokenizer(model_args: "ModelArguments") -> "TokenizerModule":
    r"""
    Loads pretrained tokenizer.

    Note: including inplace operation of model_args.
    """
    init_kwargs = _get_init_kwargs(model_args)
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            model_args.model_name_or_path,
            use_fast=model_args.use_fast_tokenizer,
            split_special_tokens=model_args.split_special_tokens,
            padding_side="right",
            **init_kwargs,
        )
    except ValueError:  # try the fast one
        tokenizer = AutoTokenizer.from_pretrained(
            model_args.model_name_or_path,
            use_fast=True,
            padding_side="right",
            **init_kwargs,
        )

    if model_args.new_special_tokens is not None:
        num_added_tokens = tokenizer.add_special_tokens(
            dict(additional_special_tokens=model_args.new_special_tokens),
            replace_additional_special_tokens=False,
        )
        logger.info("Add {} to special tokens.".format(",".join(model_args.new_special_tokens)))
        if num_added_tokens > 0 and not model_args.resize_vocab:
            model_args.resize_vocab = True
            logger.warning("New tokens have been added, changed `resize_vocab` to True.")

    patch_tokenizer(tokenizer)

    if model_args.visual_inputs:
        try:
            processor = AutoProcessor.from_pretrained(model_args.model_name_or_path, **init_kwargs)
            setattr(processor, "tokenizer", tokenizer)
        except Exception:
            raise ValueError(
                "This multimodal LLM is not supported.\n"
                "Download LLaVA-1.5 models from: https://huggingface.co/llava-hf\n"
                "Download Yi-VL models from: https://huggingface.co/BUAADreamer"
            )
    else:
        processor = None

    return {"tokenizer": tokenizer, "processor": processor}
tokenizer_config = get_tokenizer_config(pretrained_model_name_or_path, **kwargs): get_tokenizer_config loads the pretrained tokenizer configuration from the model. Its cached_file call checks whether the file exists locally (use it directly) or must be downloaded from ModelScope or Hugging Face, and returns the path of tokenizer_config.json, e.g. resolved_config_file='/root/.cache/modelscope/hub/Qwen2-0.5B/tokenizer_config.json'. The file is then opened, read, and its contents returned as the tokenizer_config dict.
Contents of Qwen2-0.5B/tokenizer_config.json:

{
  "add_prefix_space": false,
  "added_tokens_decoder": {
    "151643": {
      "content": "<|endoftext|>",
      "lstrip": false,
      "normalized": false,
      "rstrip": false,
      "single_word": false,
      "special": true
    },
    "151644": {
      "content": "<|im_start|>",
      "lstrip": false,
      "normalized": false,
      "rstrip": false,
      "single_word": false,
      "special": true
    },
    "151645": {
      "content": "<|im_end|>",
      "lstrip": false,
      "normalized": false,
      "rstrip": false,
      "single_word": false,
      "special": true
    }
  },
  "additional_special_tokens": ["<|im_start|>", "<|im_end|>"],
  "bos_token": null,
  "chat_template": "{% for message in messages %}{% if loop.first and messages[0]['role'] != 'system' %}{{ '<|im_start|>system\nYou are a helpful assistant<|im_end|>\n' }}{% endif %}{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}{% if add_generation_prompt %}{{ '<|im_start|>assistant\n' }}{% endif %}",
  "clean_up_tokenization_spaces": false,
  "eos_token": "<|endoftext|>",
  "errors": "replace",
  "model_max_length": 32768,
  "pad_token": "<|endoftext|>",
  "split_special_tokens": false,
  "tokenizer_class": "Qwen2Tokenizer",
  "unk_token": null
}
config_tokenizer_class = tokenizer_config.get("tokenizer_class") retrieves the tokenizer class name 'Qwen2Tokenizer'.
def tokenizer_class_from_name(class_name: str):
    if class_name == "PreTrainedTokenizerFast":
        return PreTrainedTokenizerFast

    for module_name, tokenizers in TOKENIZER_MAPPING_NAMES.items():
        if class_name in tokenizers:
            # find the module name that owns this tokenizer class
            module_name = model_type_to_module_name(module_name)
            # import that module by name
            module = importlib.import_module(f".{module_name}", "transformers.models")
            try:
                return getattr(module, class_name)
            except AttributeError:
                continue

    for config, tokenizers in TOKENIZER_MAPPING._extra_content.items():
        for tokenizer in tokenizers:
            if getattr(tokenizer, "__name__", None) == class_name:
                return tokenizer

    # We did not find the class, but maybe it's because a dep is missing. In that case, the class will be in the main
    # init and we return the proper dummy to get an appropriate error message.
    main_module = importlib.import_module("transformers")
    if hasattr(main_module, class_name):
        return getattr(main_module, class_name)

    return None
tokenizer_class_from_name(config_tokenizer_class) is not None
or tokenizer_class_from_name(config_tokenizer_class + "Fast") is not None

In the TOKENIZER_MAPPING_NAMES registry, the module name qwen2 is found from the class name Qwen2Tokenizer, and that module is imported. Based on use_fast, "Fast" is appended: 'Qwen2TokenizerFast' is the class to load from the registry.

            if use_fast and not config_tokenizer_class.endswith("Fast"):
                tokenizer_class_candidate = f"{config_tokenizer_class}Fast"
                tokenizer_class = tokenizer_class_from_name(tokenizer_class_candidate)
Instantiating the pretrained tokenizer:
tokenizer_class.from_pretrained(pretrained_model_name_or_path, *inputs, **kwargs)
vocab.json, tokenizer.json, tokenizer_config.json, merges.txt, special_tokens_map.json, and added_tokens.json are added to the vocabulary file dict vocab_files.
vocab_files = {**cls.vocab_files_names, **additional_files_names}

Iterate over vocab_files, resolving each file name, and use cached_file to check whether the file exists: missing files are downloaded from ModelScope or Hugging Face, existing ones are used directly. The file paths are returned.

Log output for the loaded files:

 for file_id, file_path in vocab_files.items():
            if file_id not in resolved_vocab_files:
                continue

            if is_local:
                logger.info(f"loading file {file_path}")
            else:
                logger.info(f"loading file {file_path} from cache at {resolved_vocab_files[file_id]}")

Based on the information gathered so far, tokenizer_config.json is read into init_kwargs, and added_tokens_decoder is added to it.

return cls._from_pretrained(
            resolved_vocab_files,
            pretrained_model_name_or_path,
            init_configuration,
            *init_inputs,
            token=token,
            cache_dir=cache_dir,
            local_files_only=local_files_only,
            _commit_hash=commit_hash,
            _is_local=is_local,
            trust_remote_code=trust_remote_code,
            **kwargs,
        )
token = AddedToken(**token) updates the token:
 if "added_tokens_decoder" in init_kwargs:
            for idx, token in init_kwargs["added_tokens_decoder"].items():
                if isinstance(token, dict):
                    token = AddedToken(**token)
                if isinstance(token, AddedToken):
                    added_tokens_decoder[int(idx)] = token
                    added_tokens_map[str(token)] = token
                else:
                    raise ValueError(
                        f"Found a {token.__class__} in the saved `added_tokens_decoder`, should be a dictionary or an AddedToken instance"
                    )

AddedToken instances, not strings, are passed to the class, preventing the strings from being coerced into different AddedTokens.

Iterate over the given token list and update the corresponding token values in init_kwargs:

     init_kwargs["added_tokens_decoder"] = added_tokens_decoder
        init_kwargs = cls.convert_added_tokens(init_kwargs, save=False)
        for key in cls.SPECIAL_TOKENS_ATTRIBUTES & init_kwargs.keys():
            if added_tokens_map != {} and init_kwargs[key] is not None:
                if key != "additional_special_tokens":
                    init_kwargs[key] = added_tokens_map.get(str(init_kwargs[key]), init_kwargs[key])
    SPECIAL_TOKENS_ATTRIBUTES = [
        "bos_token",
        "eos_token",
        "unk_token",
        "sep_token",
        "pad_token",
        "cls_token",
        "mask_token",
        "additional_special_tokens",
    ]
From the processed arguments, construct the Qwen2TokenizerFast tokenizer; it is returned and assigned into tokenizer_module:
tokenizer = cls(*init_inputs, **init_kwargs)
class Qwen2TokenizerFast(PreTrainedTokenizerFast):
    """
    Construct a "fast" Qwen2 tokenizer (backed by HuggingFace's *tokenizers* library). Based on byte-level
    Byte-Pair-Encoding.

    Same with GPT2Tokenizer, this tokenizer has been trained to treat spaces like parts of the tokens so a word will
    be encoded differently whether it is at the beginning of the sentence (without space) or not:

    ```python
    >>> from transformers import Qwen2TokenizerFast

    >>> tokenizer = Qwen2TokenizerFast.from_pretrained("Qwen/Qwen-tokenizer")
    >>> tokenizer("Hello world")["input_ids"]
    [9707, 1879]

    >>> tokenizer(" Hello world")["input_ids"]
    [21927, 1879]
    ```
    This is expected.

    This tokenizer inherits from [`PreTrainedTokenizerFast`] which contains most of the main methods. Users should
    refer to this superclass for more information regarding those methods.

    Args:
        vocab_file (`str`, *optional*):
            Path to the vocabulary file.
        merges_file (`str`, *optional*):
            Path to the merges file.
        tokenizer_file (`str`, *optional*):
            Path to [tokenizers](https://github.com/huggingface/tokenizers) file (generally has a .json extension) that
            contains everything needed to load the tokenizer.
        unk_token (`str`, *optional*, defaults to `"<|endoftext|>"`):
            The unknown token. A token that is not in the vocabulary cannot be converted to an ID and is set to be this
            token instead. Not applicable to this tokenizer.
        bos_token (`str`, *optional*):
            The beginning of sequence token. Not applicable for this tokenizer.
        eos_token (`str`, *optional*, defaults to `"<|endoftext|>"`):
            The end of sequence token.
        pad_token (`str`, *optional*, defaults to `"<|endoftext|>"`):
            The token used for padding, for example when batching sequences of different lengths.
    """

    vocab_files_names = VOCAB_FILES_NAMES
    model_input_names = ["input_ids", "attention_mask"]
    slow_tokenizer_class = Qwen2Tokenizer

    def __init__(
        self,
        vocab_file=None,
        merges_file=None,
        tokenizer_file=None,
        unk_token="<|endoftext|>",
        bos_token=None,
        eos_token="<|endoftext|>",
        pad_token="<|endoftext|>",
        **kwargs,
    ):
        # We need to at least pass vocab_file and merges_file to base class
        # in case a slow tokenizer needs to be initialized; other can be
        # configured through files.
        # following GPT2TokenizerFast, also adding unk_token, bos_token, and eos_token

        bos_token = (
            AddedToken(bos_token, lstrip=False, rstrip=False, special=True, normalized=False)
            if isinstance(bos_token, str)
            else bos_token
        )
        eos_token = (
            AddedToken(eos_token, lstrip=False, rstrip=False, special=True, normalized=False)
            if isinstance(eos_token, str)
            else eos_token
        )
        unk_token = (
            AddedToken(unk_token, lstrip=False, rstrip=False, special=True, normalized=False)
            if isinstance(unk_token, str)
            else unk_token
        )
        pad_token = (
            AddedToken(pad_token, lstrip=False, rstrip=False, special=True, normalized=False)
            if isinstance(pad_token, str)
            else pad_token
        )

        super().__init__(
            vocab_file=vocab_file,
            merges_file=merges_file,
            tokenizer_file=tokenizer_file,
            unk_token=unk_token,
            bos_token=bos_token,
            eos_token=eos_token,
            pad_token=pad_token,
            **kwargs,
        )

Loading the Dataset Module

dataset_module = get_dataset(model_args, data_args, training_args, stage="sft", **tokenizer_module)

Building the template

template = get_template_and_fix_tokenizer(tokenizer, data_args.template, data_args.tool_format)

The most important and involved piece is get_dataset: organizing the training data is the heart of training the model. From the template name argument, the corresponding template is fetched, here the qwen template. If no name is given, the "empty" placeholder template is used; if the given name does not exist in the registry, an error is raised.

If the template says the EOS (end-of-sequence) token should be replaced and also specifies stop words, the first stop word replaces the EOS token in the tokenizer, and the remaining stop words are kept. If the tokenizer has no EOS token defined, "<|endoftext|>" is added as the EOS token. If no PAD token is defined, the PAD token is set to the same value as the EOS token, and a log line is emitted.

If the template specifies stop words and some remain, they are added to the tokenizer's special tokens, with a log line; if new special tokens were actually added, a warning reminds the user to check whether the vocabulary size needs resizing. Finally, the template is converted into a Jinja template, which is attached to the tokenizer.

Main flow

1. get_template_and_fix_tokenizer takes the tokenizer and the data arguments, fetches the dataset template, and performs some preprocessing. From the template name qwen, the qwen template is fetched from the template registry; the value '<|im_end|>' from the template's stop_words replaces the tokenizer's eos_token.

2. tokenizer.chat_template = _get_jinja_template(template, tokenizer) builds the tokenizer's chat template from the loaded template and tokenizer. Template construction, in short, loads the template matched by name and uses it to patch the tokenizer's eos_token, pad_token, chat_template, and related attributes, further specializing the tokenizer.

3. With the template built, dataset handling starts. get_dataset_list(dataset_names, data_args.dataset_dir) reads the alpaca_zh dataset info from /app/data/dataset_info.json. If a dataset in dataset_names appears in dataset_info, its Hugging Face or ModelScope URL is fetched; use_modelscope reads the USE_MODELSCOPE_HUB=1 environment variable, so the domestic ModelScope source is used. The dataset_attr attributes for alpaca_zh are built at the same time.

4. _load_single_dataset(dataset_attr, model_args, data_args, training_args) resolves download info and paths, imports MsDataset from modelscope, and calls MsDataset.load. Via the ModelScope API, the repository id 18504 of alpaca_zh is fetched first; load_dataset_with_ctx ultimately calls the ModelScope API 'https://www.modelscope.cn/api/v1/datasets/18504/repo/tree' to download the dataset to the cache path (default /root/.cache/modelscope/hub/datasets) and returns the dataset.

5. dataset = dataset.select(range(max_samples)) truncates to the sampling budget; transmit_format re-wraps the original dataset into a new one, preserving its format.

6. align_dataset(dataset, dataset_attr, data_args, training_args) converts the dataset into prompt/response pairs and returns it.

7. After loading, _get_preprocessed_dataset preprocesses the dataset: dataset = dataset.map(preprocess_func, batched=True, remove_columns=column_names, **kwargs) updates the features and transforms the dataset, turning text into token IDs; the new, tokenized dataset is returned to the main flow.

8. split_dataset splits the dataset: dataset.train_test_split performs the split, a hash of the current dataset is computed and used to name the cache file, e.g. '.cache/modelscope/hub/datasets/llamafactory___alpaca_zh/default-74c98c6abb3f5e6e/0.0.0/master/cache-7abb96404bd03dc4.arrow'. Train and validation split files are produced and a DatasetDict is returned; from it, the dataset_module holding the train and validation sets is built and returned (a small split demo follows).
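A small demo of the split step, using the stock datasets API (LLaMA-Factory additionally renames the test split to validation; the numbers here are illustrative):

from datasets import Dataset

ds = Dataset.from_dict({"x": list(range(100))})
dd = ds.train_test_split(test_size=0.1, seed=42)   # matches val_size 0.1 in the run config
print(len(dd["train"]), len(dd["test"]))           # 90 10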

Key source code and data

From the template name argument, fetch the corresponding template, here the qwen template:

def get_template_and_fix_tokenizer(
    tokenizer: "PreTrainedTokenizer",
    name: Optional[str] = None,
    tool_format: Optional[str] = None,
) -> Template:
    if name is None:
        template = TEMPLATES["empty"]  # placeholder
    else:
        # fetch the template by its name
        template = TEMPLATES.get(name, None)
        if template is None:
            raise ValueError("Template {} does not exist.".format(name))

    if tool_format is not None:
        logger.info("Using tool format: {}.".format(tool_format))
        eos_slots = [] if template.efficient_eos else [{"eos_token"}]
        template.format_tools = ToolFormatter(tool_format=tool_format)
        template.format_function = FunctionFormatter(slots=eos_slots, tool_format=tool_format)

    stop_words = template.stop_words
    if template.replace_eos:
        if not stop_words:
            raise ValueError("Stop words are required to replace the EOS token.")
        # replace the tokenizer's eos_token with the template's first stop word ('<|im_end|>')
        _add_or_replace_eos_token(tokenizer, eos_token=stop_words[0])
        stop_words = stop_words[1:]

    if tokenizer.eos_token_id is None:
        _add_or_replace_eos_token(tokenizer, eos_token="<|endoftext|>")

    if tokenizer.pad_token_id is None:
        tokenizer.pad_token = tokenizer.eos_token
        logger.info("Add pad token: {}".format(tokenizer.pad_token))

    if stop_words:
        num_added_tokens = tokenizer.add_special_tokens(
            dict(additional_special_tokens=stop_words), replace_additional_special_tokens=False
        )
        logger.info("Add {} to stop words.".format(",".join(stop_words)))
        if num_added_tokens > 0:
            logger.warning("New tokens have been added, make sure `resize_vocab` is True.")

    try:
        tokenizer.chat_template = _get_jinja_template(template, tokenizer)
    except ValueError:
        logger.info("Cannot add this chat template to tokenizer.")

    return template

The value '<|im_end|>' from the template's stop_words replaces the tokenizer's eos_token (default '<|endoftext|>'); a short demo follows the two screenshots below.

eos_token before replacement

eos_token after replacement
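The same replacement can be reproduced with the stock transformers API (not LLaMA-Factory's internal helper; the hub id is illustrative, and the ids match the tokenizer_config.json shown earlier):

from transformers import AutoTokenizer

tok = AutoTokenizer.from_pretrained("Qwen/Qwen2-0.5B")
print(tok.eos_token)                            # '<|endoftext|>' before replacement
tok.add_special_tokens({"eos_token": "<|im_end|>"})
print(tok.eos_token, tok.eos_token_id)          # '<|im_end|>' 151645 after replacement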

With the loaded template and tokenizer, build the tokenizer's chat_template:

tokenizer.chat_template = _get_jinja_template(template, tokenizer)

In short, template construction loads the template matched by name and uses it to patch the tokenizer's eos_token, pad_token, chat_template, and related attributes, further specializing the tokenizer.
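A quick way to inspect the effect of a chat template is apply_chat_template (standard transformers API; here using the stock Qwen2-0.5B template from its tokenizer_config.json, which already follows the qwen format):

from transformers import AutoTokenizer

tok = AutoTokenizer.from_pretrained("Qwen/Qwen2-0.5B")
messages = [{"role": "user", "content": "你好"}]
print(tok.apply_chat_template(messages, tokenize=False, add_generation_prompt=True))
# <|im_start|>system
# You are a helpful assistant<|im_end|>
# <|im_start|>user
# 你好<|im_end|>
# <|im_start|>assistant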

get_dataset_list(dataset_names, data_args.dataset_dir)

Read the ['alpaca_zh'] dataset info from '/app/data/dataset_info.json'.

If a dataset in dataset_names appears in dataset_info, its Hugging Face or ModelScope URL is fetched; use_modelscope reads the USE_MODELSCOPE_HUB=1 environment variable, so the domestic ModelScope source is used. The dataset_attr attributes are built at the same time.

def get_dataset_list(dataset_names: Optional[Sequence[str]], dataset_dir: str) -> List["DatasetAttr"]:
    r"""
    Gets the attributes of the datasets.
    """
    if dataset_names is None:
        dataset_names = []

    if dataset_dir == "ONLINE":
        dataset_info = None
    else:
        if dataset_dir.startswith("REMOTE:"):
            config_path = cached_file(path_or_repo_id=dataset_dir[7:], filename=DATA_CONFIG, repo_type="dataset")
        else:
            config_path = os.path.join(dataset_dir, DATA_CONFIG)

        try:
            with open(config_path, "r") as f:
                dataset_info = json.load(f)
        except Exception as err:
            if len(dataset_names) != 0:
                raise ValueError("Cannot open {} due to {}.".format(config_path, str(err)))

            dataset_info = None

    dataset_list: List["DatasetAttr"] = []
    for name in dataset_names:
        if dataset_info is None:  # dataset_dir is ONLINE
            load_from = "ms_hub" if use_modelscope() else "hf_hub"
            dataset_attr = DatasetAttr(load_from, dataset_name=name)
            dataset_list.append(dataset_attr)
            continue

        if name not in dataset_info:
            raise ValueError("Undefined dataset {} in {}.".format(name, DATA_CONFIG))

        has_hf_url = "hf_hub_url" in dataset_info[name]
        has_ms_url = "ms_hub_url" in dataset_info[name]

        if has_hf_url or has_ms_url:
            if (use_modelscope() and has_ms_url) or (not has_hf_url):
                dataset_attr = DatasetAttr("ms_hub", dataset_name=dataset_info[name]["ms_hub_url"])
            else:
                dataset_attr = DatasetAttr("hf_hub", dataset_name=dataset_info[name]["hf_hub_url"])
        elif "script_url" in dataset_info[name]:
            dataset_attr = DatasetAttr("script", dataset_name=dataset_info[name]["script_url"])
        else:
            dataset_attr = DatasetAttr("file", dataset_name=dataset_info[name]["file_name"])

        dataset_attr.set_attr("formatting", dataset_info[name], default="alpaca")
        dataset_attr.set_attr("ranking", dataset_info[name], default=False)
        dataset_attr.set_attr("subset", dataset_info[name])
        dataset_attr.set_attr("split", dataset_info[name], default="train")
        dataset_attr.set_attr("folder", dataset_info[name])
        dataset_attr.set_attr("num_samples", dataset_info[name])

        if "columns" in dataset_info[name]:
            column_names = ["system", "tools", "images", "chosen", "rejected", "kto_tag"]
            if dataset_attr.formatting == "alpaca":
                column_names.extend(["prompt", "query", "response", "history"])
            else:
                column_names.extend(["messages"])

            for column_name in column_names:
                dataset_attr.set_attr(column_name, dataset_info[name]["columns"])

        if dataset_attr.formatting == "sharegpt" and "tags" in dataset_info[name]:
            tag_names = (
                "role_tag",
                "content_tag",
                "user_tag",
                "assistant_tag",
                "observation_tag",
                "function_tag",
                "system_tag",
            )
            for tag in tag_names:
                dataset_attr.set_attr(tag, dataset_info[name]["tags"])

        dataset_list.append(dataset_attr)

    return dataset_list
_load_single_dataset(dataset_attr, model_args, data_args, training_args) resolves the download info and path; MsDataset.load uses the ModelScope API to download the dataset to the cache path (default /root/.cache/modelscope/hub/datasets):
 elif hub == Hubs.modelscope:

            # Get dataset type from ModelScope Hub;  dataset_type->4: General Dataset
            from modelscope.hub.api import HubApi
            _api = HubApi()
            dataset_id_on_hub, dataset_type = _api.get_dataset_id_and_type(
                dataset_name=dataset_name, namespace=namespace)

            # Load from the ModelScope Hub for type=4 (general)
            if str(dataset_type) == str(DatasetFormations.general.value):

                with load_dataset_with_ctx(
                        path=namespace + '/' + dataset_name,
                        name=subset_name,
                        data_dir=data_dir,
                        data_files=data_files,
                        split=split,
                        cache_dir=cache_dir,
                        features=None,
                        download_config=None,
                        download_mode=download_mode.value,
                        revision=version,
                        token=token,
                        streaming=use_streaming,
                        dataset_info_only=dataset_info_only,
                        **config_kwargs) as dataset_res:

                    return dataset_res
dataset_module_factory can download either via the dataset script 'llamafactory/alpaca_zh/alpaca_zh.py' or via the API; this example uses the API.

The ModelScope API 'https://www.modelscope.cn/api/v1/datasets/18504/repo/tree' is ultimately called to download the dataset; 18504 is alpaca_zh's repository id on ModelScope.

    def get_dataset_infos(self,
                          dataset_hub_id: str,
                          revision: str,
                          files_metadata: bool = False,
                          timeout: float = 100,
                          recursive: str = 'True'):
        """
        Get dataset infos.
        """
        datahub_url = f'{self.endpoint}/api/v1/datasets/{dataset_hub_id}/repo/tree'
        params = {'Revision': revision, 'Root': None, 'Recursive': recursive}
        cookies = ModelScopeConfig.get_cookies()
        if files_metadata:
            params['blobs'] = True
        r = self.session.get(datahub_url, params=params, cookies=cookies, timeout=timeout)
        resp = r.json()
        datahub_raise_on_error(datahub_url, resp, r)

        return resp

Parse the 'Data' field of the response, then its 'Files' field, to obtain the dataset files.

Build and return the dataset info. At this point only the repository's file metadata is returned; the data files themselves have not been read.

def _dataset_info(
    self,
    repo_id: str,
    *,
    revision: Optional[str] = None,
    timeout: Optional[float] = None,
    files_metadata: bool = False,
    token: Optional[Union[bool, str]] = None,
) -> HfDatasetInfo:
    """
    Get info on one specific dataset on huggingface.co.

    Dataset can be private if you pass an acceptable token.

    Args:
        repo_id (`str`):
            A namespace (user or an organization) and a repo name separated
            by a `/`.
        revision (`str`, *optional*):
            The revision of the dataset repository from which to get the
            information.
        timeout (`float`, *optional*):
            Whether to set a timeout for the request to the Hub.
        files_metadata (`bool`, *optional*):
            Whether or not to retrieve metadata for files in the repository
            (size, LFS metadata, etc). Defaults to `False`.
        token (`bool` or `str`, *optional*):
            A valid authentication token (see https://huggingface.co/settings/token).
            If `None` or `True` and machine is logged in (through `huggingface-cli login`
            or [`~huggingface_hub.login`]), token will be retrieved from the cache.
            If `False`, token is not sent in the request header.

    Returns:
        [`hf_api.DatasetInfo`]: The dataset repository information.

    <Tip>

    Raises the following errors:

        - [`~utils.RepositoryNotFoundError`]
          If the repository to download from cannot be found. This may be because it doesn't exist,
          or because it is set to `private` and you do not have access.
        - [`~utils.RevisionNotFoundError`]
          If the revision to download from cannot be found.

    </Tip>
    """
    _api = HubApi()
    _namespace, _dataset_name = repo_id.split('/')
    dataset_hub_id, dataset_type = _api.get_dataset_id_and_type(
        dataset_name=_dataset_name, namespace=_namespace)

    revision: str = revision or 'master'
    data = _api.get_dataset_infos(dataset_hub_id=dataset_hub_id,
                                  revision=revision,
                                  files_metadata=files_metadata,
                                  timeout=timeout)

    # Parse data
    data_d: dict = data['Data']
    data_file_list: list = data_d['Files']
    # commit_info: dict = data_d['LatestCommitter']

    # Update data   # TODO: columns align with HfDatasetInfo
    data['id'] = repo_id
    data['private'] = False
    data['author'] = repo_id.split('/')[0] if repo_id else None
    data['sha'] = revision
    data['lastModified'] = None
    data['gated'] = False
    data['disabled'] = False
    data['downloads'] = 0
    data['likes'] = 0
    data['tags'] = []
    data['cardData'] = []
    data['createdAt'] = None

    # e.g. {'rfilename': 'xxx', 'blobId': 'xxx', 'size': 0, 'lfs': {'size': 0, 'sha256': 'xxx', 'pointerSize': 0}}
    data['siblings'] = []
    for file_info_d in data_file_list:
        file_info = {
            'rfilename': file_info_d['Path'],
            'blobId': file_info_d['Id'],
            'size': file_info_d['Size'],
            'type': 'directory' if file_info_d['Type'] == 'tree' else 'file',
            'lfs': {
                'size': file_info_d['Size'],
                'sha256': file_info_d['Sha256'],
                'pointerSize': 0
            }
        }
        data['siblings'].append(file_info)

    return HfDatasetInfo(**data)

First, the README.md file is downloaded locally.

get_from_cache_ms performs the actual download:
return HubDatasetModuleFactoryWithoutScript(
    path,
    revision=revision,
    data_dir=data_dir,
    data_files=data_files,
    download_config=download_config,
    download_mode=download_mode,
).get_module()

From the dataset module library, load the dataset module (json for text datasets) and import its class library via import:

 else:
        builder_configs: List[BuilderConfig] = [
            import_main_class(module_path).BUILDER_CONFIG_CLASS(
                data_files=data_files,
                **default_builder_kwargs,
            )
        ]
        default_config_name = None

Downloaded locally: [{'filename': '/root/.cache/modelscope/hub/datasets/llamafactory___alpaca_zh/default-74c98c6abb3f5e6e/0.0.0/master/alpaca_zh-train.arrow'}]

builder_name is json; the returned data is built in json format, with the data assembled according to column_names.

Truncate the dataset according to max_samples (a small demo follows the excerpt below).

In the _load_single_dataset function:
 if data_args.max_samples is not None:  # truncate dataset
        max_samples = min(data_args.max_samples, len(dataset))
        dataset = dataset.select(range(max_samples))
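A self-contained demo of the truncation (datasets API; toy data):

from datasets import Dataset

ds = Dataset.from_dict({"instruction": [f"q{i}" for i in range(5)],
                        "output": [f"a{i}" for i in range(5)]})
max_samples = min(3, len(ds))          # data_args.max_samples in the real code
ds = ds.select(range(max_samples))     # goes through the transmit_format wrapper below
print(len(ds))                         # 3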
def transmit_format(func):
    """Wrapper for dataset transforms that recreate a new Dataset to transmit the format of the original dataset to the new dataset"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if args:
            self: "Dataset" = args[0]
            args = args[1:]
        else:
            self: "Dataset" = kwargs.pop("self")
        # don't use self.format since it returns a list of columns for 'columns' even if self_format_columns is None
        unformatted_columns = set(self.column_names) - set(self._format_columns or [])
        self_format = {
            "type": self._format_type,
            "format_kwargs": self._format_kwargs,
            "columns": self._format_columns,
            "output_all_columns": self._output_all_columns,
        }
        # apply actual function
        out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
        datasets: List["Dataset"] = list(out.values()) if isinstance(out, dict) else [out]
        # re-apply format to the output
        for dataset in datasets:
            new_format = self_format.copy()
            if new_format["columns"] is not None:  # new formatted columns = (columns - previously unformatted columns)
                # sort the columns to have a deterministic list of columns that we can compare with `out_format`
                new_format["columns"] = sorted(set(dataset.column_names) - unformatted_columns)
            out_format = {
                "type": dataset._format_type,
                "format_kwargs": dataset._format_kwargs,
                "columns": sorted(dataset._format_columns) if dataset._format_columns is not None else None,
                "output_all_columns": dataset._output_all_columns,
            }
            if out_format != new_format:
                fingerprint = dataset._fingerprint
                dataset.set_format(**new_format)
                dataset._fingerprint = fingerprint
        return out

    wrapper._decorator_name_ = "transmit_format"
    return wrapper
align_dataset(dataset, dataset_attr, data_args, training_args) converts the dataset into prompt/response pairs; the following dataset.map call applies the transform:

dataset.map(
    convert_func,
    batched=True,
    remove_columns=column_names,
    features=features,
    **kwargs,
)
def align_dataset(
    dataset: Union["Dataset", "IterableDataset"],
    dataset_attr: "DatasetAttr",
    data_args: "DataArguments",
    training_args: "Seq2SeqTrainingArguments",
) -> Union["Dataset", "IterableDataset"]:
    r"""
    Aligned dataset:
        prompt: [{"role": "user", "content": "..."}] * (2T - 1)
        response: [{"role": "assistant", "content": "..."}] * N (N > 1 for ranking dataset)
        system: "..."
        tools: "...",
        images: [],
    """
    if dataset_attr.formatting == "alpaca":
        convert_func = partial(convert_alpaca, dataset_attr=dataset_attr, data_args=data_args)
    else:
        convert_func = partial(convert_sharegpt, dataset_attr=dataset_attr, data_args=data_args)

    column_names = list(next(iter(dataset)).keys())
    features = Features.from_dict(
        {
            "prompt": [
                {"role": {"dtype": "string", "_type": "Value"}, "content": {"dtype": "string", "_type": "Value"}}
            ],
            "response": [
                {"role": {"dtype": "string", "_type": "Value"}, "content": {"dtype": "string", "_type": "Value"}}
            ],
            "system": {"dtype": "string", "_type": "Value"},
            "tools": {"dtype": "string", "_type": "Value"},
            "images": [{"_type": "Image"}],
        }
    )
    kwargs = {}
    if not data_args.streaming:
        kwargs = dict(
            num_proc=data_args.preprocessing_num_workers,
            load_from_cache_file=(not data_args.overwrite_cache) or (training_args.local_process_index != 0),
            desc="Converting format of dataset",
        )

    return dataset.map(
        convert_func,
        batched=True,
        remove_columns=column_names,
        features=features,
        **kwargs,
    )
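For intuition, a simplified, hedged sketch of what an alpaca-style convert_func produces (the real convert_alpaca in LLaMA-Factory also handles history turns, system prompts, ranking data, and images):

def convert_alpaca_simple(examples: dict) -> dict:
    """Batched map function: alpaca columns -> the aligned prompt/response schema above."""
    outputs = {"prompt": [], "response": [], "system": [], "tools": [], "images": []}
    for i in range(len(examples["instruction"])):
        content = examples["instruction"][i]
        if examples.get("input") and examples["input"][i]:
            content += "\n" + examples["input"][i]
        outputs["prompt"].append([{"role": "user", "content": content}])
        outputs["response"].append([{"role": "assistant", "content": examples["output"][i]}])
        outputs["system"].append("")
        outputs["tools"].append("")
        outputs["images"].append([])
    return outputs


batch = {"instruction": ["计算1+1"], "input": [""], "output": ["2"]}
print(convert_alpaca_simple(batch)["prompt"])  # [[{'role': 'user', 'content': '计算1+1'}]]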

The dataset is re-transformed and re-wrapped. A hash of the current dataset is computed and used to name the data file '/root/.cache/modelscope/hub/datasets/llamafactory___alpaca_zh/default-74c98c6abb3f5e6e/0.0.0/master/cache-7abb96404bd03dc4.arrow'.

def transmit_tasks(func):
    """Wrapper for dataset transforms that recreate a new Dataset to transmit the task templates of the original dataset to the new dataset"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        if args:
            self: "Dataset" = args[0]
            args = args[1:]
        else:
            self: "Dataset" = kwargs.pop("self")
        # apply actual function
        out: Union["Dataset", "DatasetDict"] = func(self, *args, **kwargs)
        datasets: List["Dataset"] = list(out.values()) if isinstance(out, dict) else [out]
        for dataset in datasets:
            # Remove task templates if a column mapping of the template is no longer valid
            if self.info.task_templates is not None:
                dataset.info.task_templates = [
                    template
                    for template in self.info.task_templates
                    if all(
                        dataset._info.features.get(k) == self._info.features.get(k)
                        for k in template.column_mapping.keys()
                    )
                ]
        return out

    wrapper._decorator_name_ = "transmit_tasks"
    return wrapper

According to the num_proc argument (16 in this example), the dataset is sharded into 16 shards, each of size 7*5.

 
shards = [
    self.shard(num_shards=num_proc, index=rank, contiguous=True, keep_in_memory=keep_in_memory)
    for rank in range(num_proc)
]
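Contiguous sharding can be reproduced directly with the datasets API (toy sizes; the 16 x 7*5 figure above comes from the article's run):

from datasets import Dataset

ds = Dataset.from_dict({"x": list(range(80))})
num_proc = 16
shards = [ds.shard(num_shards=num_proc, index=rank, contiguous=True) for rank in range(num_proc)]
print(len(shards), len(shards[0]))  # 16 5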

According to the number of jobs, build the shard info each job processes:

kwargs_per_job = [
    {
        **dataset_kwargs,
        "shard": shards[rank],
        "cache_file_name": format_cache_file_name(cache_file_name, rank),
        "rank": rank,
        "offset": sum(len(s) for s in shards[:rank]),
        "new_fingerprint": format_new_fingerprint(new_fingerprint, rank),
    }
    for rank in range(num_shards)
]

Update the dataset features and transform the dataset, converting text into token IDs:

transformed_shards = [None] * num_shards
for rank in range(num_shards):
    try:
        transformed_shards[rank] = load_processed_shard_from_cache(kwargs_per_job[rank])
        kwargs_per_job[rank] = None
    except NonExistentDatasetError:
        pass

Preprocess the dataset into the training dataset:


    # Load and preprocess dataset
    with training_args.main_process_first(desc="load dataset"):
        dataset = _get_merged_dataset(data_args.dataset, model_args, data_args, training_args, stage)
        eval_dataset = _get_merged_dataset(data_args.eval_dataset, model_args, data_args, training_args, stage)

    with training_args.main_process_first(desc="pre-process dataset"):
        dataset = _get_preprocessed_dataset(
            dataset, data_args, training_args, stage, template, tokenizer, processor, is_eval=False
        )
        eval_dataset = _get_preprocessed_dataset(
            eval_dataset, data_args, training_args, stage, template, tokenizer, processor, is_eval=True
        )
def _get_preprocessed_dataset(
    dataset: Optional[Union["Dataset", "IterableDataset"]],
    data_args: "DataArguments",
    training_args: "Seq2SeqTrainingArguments",
    stage: Literal["pt", "sft", "rm", "ppo", "kto"],
    template: "Template",
    tokenizer: "PreTrainedTokenizer",
    processor: Optional["ProcessorMixin"] = None,
    is_eval: bool = False,
) -> Optional[Union["Dataset", "IterableDataset"]]:
    if dataset is None:
        return None

    preprocess_func, print_function = get_preprocess_and_print_func(
        data_args, stage, template, tokenizer, processor, do_generate=(training_args.predict_with_generate and is_eval)
    )
    column_names = list(next(iter(dataset)).keys())
    kwargs = {}
    if not data_args.streaming:
        kwargs = dict(
            num_proc=data_args.preprocessing_num_workers,
            load_from_cache_file=(not data_args.overwrite_cache) or (training_args.local_process_index != 0),
            desc="Running tokenizer on dataset",
        )

    dataset = dataset.map(preprocess_func, batched=True, remove_columns=column_names, **kwargs)

    if training_args.should_log:
        try:
            print("eval example:" if is_eval else "training example:")
            print_function(next(iter(dataset)))
        except StopIteration:
            if stage == "pt":
                raise RuntimeError("Cannot find sufficient samples, consider increasing dataset size.")
            else:
                raise RuntimeError("Cannot find valid samples, check `data/README.md` for the data format.")

    return dataset

After processing, the dataset is returned as dataset_module.

Loading the Model Module

model = load_model(tokenizer, model_args, finetuning_args, training_args.do_train)

Main flow

1. load_model loads the pretrained model. _get_init_kwargs again calls try_download_model_from_ms to check whether the model exists locally; if not, it queries the model snapshot through the ModelScope API for later use.

2. load_config loads the model's config. It also goes through _get_init_kwargs to check for a local copy; then inside AutoConfig.from_pretrained, the cached_file call eventually fetches config.json, and cls._dict_from_json_file reads and parses it into a config dict. The model_type key of that dict selects the config class from the CONFIG_MAPPING registry, here <class 'transformers.models.qwen2.configuration_qwen2.Qwen2Config'>; config_class.from_dict(config_dict, **unused_kwargs) then actually instantiates the config class from the dict.

3. patch_config patches the config from the model arguments and other related parameters (usually defaults), e.g. the torch dtype is set to the specified float16 and device_map is set to {'': device(type='cpu')}.

4. AutoModelForCausalLM.from_pretrained instantiates the model class. _get_model_class resolves the model class from the config type via the model_mapping registry, here <class 'transformers.models.qwen2.modeling_qwen2.Qwen2ForCausalLM'>. As with the config class, model_class.from_pretrained then configures the model from the arguments and config, resolves the model's .safetensors file path, and load_state_dict inspects the file metadata to determine the framework; safe_load_file reads the file contents and builds the state_dict of weight key/value pairs (looping over all weights and loading them into memory). All weight keys are collected into a list. _check_and_enable_sdpa sets config._attn_implementation = "sdpa", the default attention implementation.

5. model = cls(config, *model_args, **model_kwargs): a generation config (GenerationConfig) is instantiated from the pretrained config (PretrainedConfig); from_dict calls GenerationConfig to initialize the generation class. generate supports text-decoder, text-to-text, speech-to-text, and vision-to-text generation. config = cls(**{**config_dict, **kwargs}) returns the generation config. After further processing, the Qwen2ForCausalLM model class is fully instantiated.

6. After the model is instantiated, tie_weights ties the weights between the input and output embeddings; find_tied_parameters returns the list of tied parameters, which are checked to be on the same device (they must be).

7. GenerationConfig.from_pretrained loads the generation config from generation_config.json and instantiates the generation config class with it.

8. dispatch_model dispatches the model to the target device (model.to(device)). Once the model-side work is done, control returns to load_model, where patch_model patches model parameters (usually defaults).

9. init_adapter initializes the selected adapter, BAdam. model.train() puts the model into training mode (the default is eval()). Finally, control returns to run_sft and the finished model class is assigned to model. (A minimal end-to-end sketch follows.)
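A minimal end-to-end sketch of the config -> class -> instantiate chain, using only the public transformers API (hub id and dtype illustrative; the article resolves a local ModelScope path and patches the config first):

import torch
from transformers import AutoConfig, AutoModelForCausalLM

config = AutoConfig.from_pretrained("Qwen/Qwen2-0.5B")
print(type(config).__name__)   # Qwen2Config, resolved via CONFIG_MAPPING

model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2-0.5B", torch_dtype=torch.float16)
print(type(model).__name__)    # Qwen2ForCausalLM, resolved via _get_model_class
model.train()                  # training mode, as load_model does when is_trainable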

Key source code and data

Load and supplement the relevant parameters; replace and patch model attributes; import the specified model's module class and instantiate it; set the module to training mode.

def load_model(
    tokenizer: "PreTrainedTokenizer",
    model_args: "ModelArguments",
    finetuning_args: "FinetuningArguments",
    is_trainable: bool = False,
    add_valuehead: bool = False,
) -> "PreTrainedModel":
    r"""
    Loads pretrained model.
    """
    init_kwargs = _get_init_kwargs(model_args)
    config = load_config(model_args)
    patch_config(config, tokenizer, model_args, init_kwargs, is_trainable)

    model = None
    lazy_load = False
    if model_args.use_unsloth:
        if model_args.adapter_name_or_path is not None:
            lazy_load = True
        elif is_trainable:
            model = load_unsloth_pretrained_model(config, model_args)

    if model is None and not lazy_load:
        init_kwargs["config"] = config
        init_kwargs["pretrained_model_name_or_path"] = model_args.model_name_or_path

        if model_args.mixture_of_depths == "load":
            model = load_mod_pretrained_model(**init_kwargs)
        elif model_args.visual_inputs:
            model = AutoModelForVision2Seq.from_pretrained(**init_kwargs)
        elif model_args.train_from_scratch:
            model = AutoModelForCausalLM.from_config(config)
        else:
            model = AutoModelForCausalLM.from_pretrained(**init_kwargs)

        if model_args.mixture_of_depths == "convert":
            model = convert_pretrained_model_to_mod(model, config, model_args)

    if not lazy_load:
        patch_model(model, tokenizer, model_args, is_trainable, add_valuehead)
        register_autoclass(config, model, tokenizer)

    model = init_adapter(config, model, model_args, finetuning_args, is_trainable)

    if add_valuehead:
        model = AutoModelForCausalLMWithValueHead.from_pretrained(model)
        patch_valuehead_model(model)

        if model_args.adapter_name_or_path is not None:
            vhead_path = model_args.adapter_name_or_path[-1]
        else:
            vhead_path = model_args.model_name_or_path

        vhead_params = load_valuehead_params(vhead_path, model_args)
        if vhead_params is not None:
            model.load_state_dict(vhead_params, strict=False)
            logger.info("Loaded valuehead from checkpoint: {}".format(vhead_path))

    if not is_trainable:
        model.requires_grad_(False)
        for param in model.parameters():
            if param.data.dtype == torch.float32 and model_args.compute_dtype != torch.float32:
                param.data = param.data.to(model_args.compute_dtype)

        model.eval()
    else:
        model.train()

    trainable_params, all_param = count_parameters(model)
    if is_trainable:
        param_stats = "trainable params: {:,} || all params: {:,} || trainable%: {:.4f}".format(
            trainable_params, all_param, 100 * trainable_params / all_param
        )
    else:
        param_stats = "all params: {:,}".format(all_param)

    logger.info(param_stats)

    if model_args.print_param_status:
        for name, param in model.named_parameters():
            print(
                "name: {}, dtype: {}, device: {}, trainable: {}".format(
                    name, param.dtype, param.device, param.requires_grad
                )
            )

    return model

Loading the model class:

def _get_model_class(config, model_mapping):
    supported_models = model_mapping[type(config)]
    if not isinstance(supported_models, (list, tuple)):
        return supported_models

    name_to_model = {model.__name__: model for model in supported_models}
    architectures = getattr(config, "architectures", [])
    for arch in architectures:
        if arch in name_to_model:
            return name_to_model[arch]
        elif f"TF{arch}" in name_to_model:
            return name_to_model[f"TF{arch}"]
        elif f"Flax{arch}" in name_to_model:
            return name_to_model[f"Flax{arch}"]

    # If no architecture is set in the config or none matches the supported models, the first element of the
    # tuple is the default.
    return supported_models[0]

Instantiate the pretrained PyTorch model, configure the model parameters, and set the model device.

Get the safetensors path; if absent locally, download it from the hub:

   elif use_safetensors is not False and os.path.isfile(
                    os.path.join(pretrained_model_name_or_path, subfolder, _add_variant(SAFE_WEIGHTS_NAME, variant))
                ):
                    # Load from a safetensors checkpoint
                    archive_file = os.path.join(
                        pretrained_model_name_or_path, subfolder, _add_variant(SAFE_WEIGHTS_NAME, variant)
                    )
load_state_dict(checkpoint_file: Union[str, os.PathLike], is_quantized: bool = False) opens the model's safetensors file and loads the weight data.
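A hedged sketch of reading a safetensors checkpoint directly (safetensors library; the path is illustrative):

from safetensors import safe_open

path = "/root/.cache/modelscope/hub/Qwen2-0.5B/model.safetensors"  # illustrative path
with safe_open(path, framework="pt") as f:
    print(f.metadata())              # e.g. {'format': 'pt'} -> framework detection
    keys = list(f.keys())            # all weight names, as collected into a list
    first = f.get_tensor(keys[0])    # loads one tensor into memory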

Training Preparation

Main flow

Initialize the trainer, namely CustomSeq2SeqTrainer, from the previously loaded train_dataset, model, tokenizer, and optimizers. Its ultimate base class is Trainer; the BAdamCallback is imported from badam and appended to the callback list.
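The callback mechanism is plain transformers machinery; a hedged mini-example of registering a custom callback the same way BAdamCallback is registered (the callback class here is made up for illustration):

from transformers import TrainerCallback


class PrintOnTrainBegin(TrainerCallback):
    """Illustrative stand-in for BAdamCallback."""

    def on_train_begin(self, args, state, control, **kwargs):
        print("training started")


# Passed in at trainer construction time, e.g.:
# trainer = CustomSeq2SeqTrainer(..., callbacks=[PrintOnTrainBegin()])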

Key data

Data collator

Initializing the trainer CustomSeq2SeqTrainer

Starting Training

train_result = trainer.train(resume_from_checkpoint=training_args.resume_from_checkpoint)

Main flow

1. Check whether training should resume from a checkpoint; if so, load the checkpoint data and get train_batch_size. Check whether the model should be reinitialized; if so, reload the model onto the device.

2. find_executable_batch_size uses functools.partial(function, batch_size=starting_batch_size) to pin the batch_size argument of self._inner_training_loop (other arguments stay free), producing a new function inner_training_loop. It still calls _inner_training_loop, just with batch_size fixed, so that argument no longer has to be passed. (A tiny isolated demo of this trick appears after this list.)

3. inner_training_loop calls get_train_dataloader, which loads the training data and the data collator and strips the columns the forward pass does not need. The DataLoader parameters are built with a RandomSampler; self.accelerator.prepare(DataLoader(train_dataset, **dataloader_params)) first constructs a torch DataLoader (the same DataLoader as in the Cachefs pre-research analysis). Once the DataLoader is built, self.accelerator.prepare takes it and handles the distributed-training and mixed-precision objects; the supported objects are a PyTorch DataLoader, torch.nn.Module, Optimizer, and PyTorch LR scheduler, i.e. the distributed type plus data, model, optimizer, and learning-rate scheduler.

4. _prepare_one calls prepare_data_loader to ready the dataloader for distributed training: for a distributed job, the model's RandomSampler is swapped for a BatchSampler, otherwise for a SeedableRandomSampler. After these checks, dataloader = DataLoaderShard(...) builds the per-device dataloader for the distributed job, returns the rebuilt dataloader, and appends it to the global self._dataloaders list.

5. With train_dataloader built, control returns to the body of _inner_training_loop, which builds the remaining training state: the specified BAdam optimizer and the learning-rate scheduler are created.

6. The model is set to training mode. self.model, self.optimizer = self.accelerator.prepare(self.model, self.optimizer): as with the dataloader, _prepare_one's self.prepare_model readies the PyTorch model for distributed training.
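Step 2's functools.partial trick, demonstrated in isolation (function name illustrative):

import functools


def _inner_training_loop(batch_size=None, args=None):
    print("training with batch_size =", batch_size)


# Pin batch_size; the resulting callable no longer needs that argument.
inner_training_loop = functools.partial(_inner_training_loop, batch_size=8)
inner_training_loop(args=None)  # -> training with batch_size = 8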

Key source code and data

Distributed types:

class DistributedType(str, enum.Enum):
    # Subclassing str as well as Enum allows the `DistributedType` to be JSON-serializable out of the box.
    NO = "NO"
    MULTI_CPU = "MULTI_CPU"
    MULTI_GPU = "MULTI_GPU"
    MULTI_NPU = "MULTI_NPU"
    MULTI_MLU = "MULTI_MLU"
    MULTI_XPU = "MULTI_XPU"
    DEEPSPEED = "DEEPSPEED"
    FSDP = "FSDP"
    XLA = "XLA"
    MEGATRON_LM = "MEGATRON_LM"
    TPU = DeprecatedFieldDescriptor("TPU", "XLA")

Load train_dataset and wrap it in a torch DataLoader to build train_dataloader:

    def get_train_dataloader(self) -> DataLoader:
        """
        Returns the training [`~torch.utils.data.DataLoader`].

        Will use no sampler if `train_dataset` does not implement `__len__`, a random sampler (adapted to distributed
        training if necessary) otherwise.

        Subclass and override this method if you want to inject some custom behavior.
        """
        if self.train_dataset is None:
            raise ValueError("Trainer: training requires a train_dataset.")

        train_dataset = self.train_dataset
        data_collator = self.data_collator
        if is_datasets_available() and isinstance(train_dataset, datasets.Dataset):
            train_dataset = self._remove_unused_columns(train_dataset, description="training")
        else:
            data_collator = self._get_collator_with_removed_columns(data_collator, description="training")

        dataloader_params = {
            "batch_size": self._train_batch_size,
            "collate_fn": data_collator,
            "num_workers": self.args.dataloader_num_workers,
            "pin_memory": self.args.dataloader_pin_memory,
            "persistent_workers": self.args.dataloader_persistent_workers,
        }

        if not isinstance(train_dataset, torch.utils.data.IterableDataset):
            dataloader_params["sampler"] = self._get_train_sampler()
            dataloader_params["drop_last"] = self.args.dataloader_drop_last
            dataloader_params["worker_init_fn"] = seed_worker
            dataloader_params["prefetch_factor"] = self.args.dataloader_prefetch_factor

        return self.accelerator.prepare(DataLoader(train_dataset, **dataloader_params))
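What accelerator.prepare does to a plain DataLoader can be seen in a few lines (accelerate API; runs on a single CPU device):

import torch
from accelerate import Accelerator
from torch.utils.data import DataLoader, TensorDataset

accelerator = Accelerator()
ds = TensorDataset(torch.arange(10).float())
dl = accelerator.prepare(DataLoader(ds, batch_size=2))
print(type(dl).__name__)  # DataLoaderShard on a single device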

Training then runs through the transformers Trainer method:

_inner_training_loop(
    self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None
)

When training completes, the model is saved.

Tuning Results

Initial tuning

Experience-based tuning

Summary

Updated: 2024-09-25