本文记录了我在学习 llama-factory过程中对代码运行过程的梳理
代码入口——src/train.py
from llamafactory.train.tuner import run_exp
def main():
run_exp()
def _mp_fn(index):
# For xla_spawn (TPUs)
run_exp()
if __name__ == "__main__":
main()
run_exp()
该函数位于src/llamafactory/train/tuner.py
def run_exp(args: Optional[Dict[str, Any]] = None, callbacks: List["TrainerCallback"] = []) -> None:
callbacks.append(LogCallback())
model_args, data_args, training_args, finetuning_args, generating_args = get_train_args(args)
if finetuning_args.stage == "pt":
run_pt(model_args, data_args, training_args, finetuning_args, callbacks)
elif finetuning_args.stage == "sft":
run_sft(model_args, data_args, training_args, finetuning_args, generating_args, callbacks)
elif finetuning_args.stage == "rm":
run_rm(model_args, data_args, training_args, finetuning_args, callbacks)
elif finetuning_args.stage == "ppo":
run_ppo(model_args, data_args, training_args, finetuning_args, generating_args, callbacks)
elif finetuning_args.stage == "dpo":
run_dpo(model_args, data_args, training_args, finetuning_args, callbacks)
elif finetuning_args.stage == "kto":
run_kto(model_args, data_args, training_args, finetuning_args, callbacks)
else:
raise ValueError("Unknown task: {}.".format(finetuning_args.stage))
这段代码首先获取训练参数,也就是 get_train_args函数,位置在src/llamafactory/hparams/parser.py,hparams这个包里还有llama-factory的参数体系。
这个函数是用来解析和验证训练参数的。它接受一个可选的字典参数args
,如果没有提供,则使用默认值。函数的主要任务是:
model_args
, data_args
, training_args
, finetuning_args
, 和 generating_args
。
设置日志记录,以便在训练过程中记录信息。
验证输入参数的合法性,包括检查是否有冲突的参数设置、是否有必要的参数未提供等。
根据需要进行参数的后处理,例如在分布式训练中调整某些参数的值。
如果输出目录已经存在并且不允许覆盖,函数会检查是否可以从之前的检查点恢复训练。
根据是否使用混合精度训练,设置模型的计算精度。
在每个进程中记录一些基本的训练信息,例如进程的排名、设备、GPU数量、分布式训练的状态和计算精度。
使用提供的种子设置随机数生成器的种子。
最后,函数返回五个参数对象,分别代表模型、数据、训练、微调和生成的相关参数。这些参数将被用来配置和执行训练过程。
run_dpo()
以dpo为例,run_dpo()函数在src/llamafactory/train/dpo/workflow.py。
导入模块
from typing import TYPE_CHECKING, List, Optional
from ...data import PairwiseDataCollatorWithPadding, get_dataset
from ...extras.constants import IGNORE_INDEX
from ...extras.ploting import plot_loss
from ...hparams import ModelArguments
from ...model import load_model, load_tokenizer
from ..trainer_utils import create_modelcard_and_push, create_ref_model
from .trainer import CustomDPOTrainer
这些导入语句引入了所需的模块和函数:
typing
模块用于类型检查。
PairwiseDataCollatorWithPadding
和 get_dataset
用于数据处理。
IGNORE_INDEX
是一个常量,用于忽略填充值。
plot_loss
用于绘制损失图。
ModelArguments
是模型参数的定义。
load_model
和 load_tokenizer
用于加载模型和分词器。
create_modelcard_and_push
和 create_ref_model
是一些实用函数。
CustomDPOTrainer
是自定义的训练器类。
类型检查
if TYPE_CHECKING:
from transformers import Seq2SeqTrainingArguments, TrainerCallback
from ...hparams import DataArguments, FinetuningArguments
这些导入语句仅在类型检查时使用,用于定义类型提示。
run_dpo
函数
def run_dpo(
model_args: "ModelArguments",
data_args: "DataArguments",
training_args: "Seq2SeqTrainingArguments",
finetuning_args: "FinetuningArguments",
callbacks: Optional[List["TrainerCallback"]] = None,
):
这个函数 run_dpo
接受五个参数:
model_args
: 模型相关的参数。
data_args
: 数据相关的参数。
training_args
: 训练相关的参数。
finetuning_args
: 微调相关的参数。
callbacks
: 可选的回调函数列表。
加载分词器和数据集
tokenizer_module = load_tokenizer(model_args)
tokenizer = tokenizer_module["tokenizer"]
dataset_module = get_dataset(model_args, data_args, training_args, stage="rm", **tokenizer_module)
model = load_model(tokenizer, model_args, finetuning_args, training_args.do_train)
这几行代码执行以下操作:
加载分词器模块,并从中提取出实际的分词器对象。 调用get_dataset
函数,获取数据集模块。参数包括模型参数、数据参数、训练参数、阶段(这里是 "rm"),以及分词器模块中的其他参数。
加载模型,使用分词器、模型参数、微调参数和训练标志(do_train
)作为输入。
创建数据整理器
data_collator = PairwiseDataCollatorWithPadding(
tokenizer=tokenizer,
pad_to_multiple_of=8,
label_pad_token_id=IGNORE_INDEX if data_args.ignore_pad_token_for_loss else tokenizer.pad_token_id,
)
这段代码创建了一个 PairwiseDataCollatorWithPadding
对象,用于数据整理。它使用分词器,并设置填充到8的倍数。如果 data_args.ignore_pad_token_for_loss
为真,则使用 IGNORE_INDEX
作为标签填充标记,否则使用分词器的填充标记。
创建参考模型
if finetuning_args.use_ref_model:
if finetuning_args.ref_model is None and (not training_args.do_train): # use the model itself
ref_model = model
else:
ref_model = create_ref_model(model_args, finetuning_args)
else:
ref_model = None
这段代码创建一个参考模型:
如果finetuning_args.use_ref_model
为真:
如果 finetuning_args.ref_model
为 None
且不进行训练,则使用当前模型作为参考模型。
否则,调用 create_ref_model
函数创建参考模型。
如果 finetuning_args.use_ref_model
为假,则参考模型为 None
。
更新训练参数
training_args.remove_unused_columns = False # important for pairwise dataset
这行代码更新训练参数,设置 remove_unused_columns
为 False
,这对于成对数据集非常重要。
初始化训练器
trainer = CustomDPOTrainer(
model=model,
ref_model=ref_model,
args=training_args,
finetuning_args=finetuning_args,
data_collator=data_collator,
callbacks=callbacks,
**dataset_module,
**tokenizer_module,
)
这段代码初始化自定义的训练器 CustomDPOTrainer
,传入模型、参考模型、训练参数、微调参数、数据整理器、回调函数以及数据集和分词器模块中的其他参数。
训练模型
if training_args.do_train:
train_result = trainer.train(resume_from_checkpoint=training_args.resume_from_checkpoint)
trainer.save_model()
trainer.log_metrics("train", train_result.metrics)
trainer.save_metrics("train", train_result.metrics)
trainer.save_state()
if trainer.is_world_process_zero() and finetuning_args.plot_loss:
plot_loss(training_args.output_dir, keys=["loss", "eval_loss", "rewards/accuracies"])
如果 do_train
为真,则执行以下操作:
trainer.train
方法进行训练,支持从检查点恢复训练。
保存模型。
记录和保存训练指标。
保存训练状态。
如果当前进程是主进程且启用了绘制损失图,则调用 plot_loss
绘制损失图。
评估模型
if training_args.do_eval:
metrics = trainer.evaluate(metric_key_prefix="eval")
if id(model) == id(ref_model): # unable to compute rewards if reference model is the model itself
remove_keys = [key for key in metrics.keys() if "rewards" in key]
for key in remove_keys:
metrics.pop(key)
trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)
如果 do_eval
为真,则执行以下操作:
trainer.evaluate
方法进行评估,使用 "eval" 作为指标前缀。
如果模型和参考模型是同一个对象,则无法计算奖励,移除包含 "rewards" 的指标。
记录和保存评估指标。
创建模型卡并推送
create_modelcard_and_push(trainer, model_args, data_args, training_args, finetuning_args)
最后,调用 create_modelcard_and_push
函数,创建模型卡并推送到指定位置。
总结
这个代码片段定义了一个 run_dpo
函数,用于加载和准备模型、数据集和相关的配置参数,初始化自定义训练器 CustomDPOTrainer
,并根据需要进行训练和评估。它还包括创建模型卡并推送的步骤。
CustomDPOTrainer类
class CustomDPOTrainer(DPOTrainer):
这个类 CustomDPOTrainer
继承自 DPOTrainer
,它是一个自定义的训练器类。
def __init__(
self,
model: Union["PreTrainedModel", torch.nn.Module],
ref_model: Optional[Union["PreTrainedModel", torch.nn.Module]],
finetuning_args: "FinetuningArguments",
processor: Optional["ProcessorMixin"],
disable_dropout: bool = True,
**kwargs,
):
初始化方法,接受以下参数:
model
: 预训练模型或 PyTorch 模型。
ref_model
: 可选的参考模型。
finetuning_args
: 微调参数。
processor
: 可选的处理器。
disable_dropout
: 是否禁用 dropout,默认为 True
。
**kwargs
: 其他关键字参数。
if disable_dropout:
disable_dropout_in_model(model)
if ref_model is not None:
disable_dropout_in_model(ref_model)
如果 disable_dropout
为真,则禁用模型和参考模型中的 dropout。
self.finetuning_args = finetuning_args
self.f_divergence_type = "reverse_kl"
self.reference_free = False
self.use_dpo_data_collator = True # hack to avoid warning
self.generate_during_eval = False # disable at evaluation
self.label_pad_token_id = IGNORE_INDEX
self.padding_value = 0
self.is_encoder_decoder = model.config.is_encoder_decoder
self.precompute_ref_log_probs = False
self._precomputed_train_ref_log_probs = False
self._precomputed_eval_ref_log_probs = False
self._peft_has_been_casted_to_bf16 = False
初始化一些实例变量,包括微调参数、散度类型、是否使用参考模型、数据整理器、评估期间是否生成、标签填充标记、填充值、是否是编码器-解码器模型等。
self.ref_model = ref_model
self._stored_metrics = defaultdict(lambda: defaultdict(list))
设置参考模型,并初始化一个存储指标的字典。
# dpo hyperparams
self.beta = finetuning_args.pref_beta
self.loss_type = finetuning_args.pref_loss
self.ftx_gamma = finetuning_args.pref_ftx
self.label_smoothing = finetuning_args.dpo_label_smoothing
self.simpo_gamma = finetuning_args.simpo_gamma
初始化一些 DPO(偏好优化)超参数。
Trainer.__init__(self, model=model, **kwargs)
if not hasattr(self, "accelerator"):
raise AttributeError("Please update `transformers`.")
调用父类 Trainer
的初始化方法,并检查是否存在 accelerator
属性。
warnings.simplefilter("ignore") # remove gc warnings on ref model
忽略一些警告信息。
if ref_model is not None:
if self.is_deepspeed_enabled:
if not (
getattr(ref_model, "is_loaded_in_8bit", False) or getattr(ref_model, "is_loaded_in_4bit", False)
): # quantized models are already set on the correct device
self.ref_model = self._prepare_deepspeed(self.ref_model)
else:
self.ref_model = self.accelerator.prepare_model(self.ref_model, evaluation_mode=True)
self.ref_model.eval()
如果参考模型不为空,且启用了 DeepSpeed,则准备 DeepSpeed 模型;否则,使用加速器准备参考模型,并将其设置为评估模式。
if processor is not None:
self.add_callback(SaveProcessorCallback(processor))
如果处理器不为空,则添加 SaveProcessorCallback
回调。
if finetuning_args.pissa_convert:
self.callback_handler.add_callback(PissaConvertCallback)
如果启用了 pissa_convert
,则添加 PissaConvertCallback
回调。
if finetuning_args.use_badam:
from badam import BAdamCallback, clip_grad_norm_old_version
self.accelerator.clip_grad_norm_ = MethodType(clip_grad_norm_old_version, self.accelerator)
self.add_callback(BAdamCallback)
如果启用了 use_badam
,则导入 BAdamCallback
和 clip_grad_norm_old_version
,并添加 BAdamCallback
回调。
def create_optimizer(self) -> "torch.optim.Optimizer":
if self.optimizer is None:
self.optimizer = create_custom_optimzer(self.model, self.args, self.finetuning_args)
return super().create_optimizer()
创建优化器,如果优化器为空,则调用 create_custom_optimzer
创建自定义优化器。
def create_scheduler(
self, num_training_steps: int, optimizer: Optional["torch.optim.Optimizer"] = None
) -> "torch.optim.lr_scheduler.LRScheduler":
create_custom_scheduler(self.args, num_training_steps, optimizer)
return super().create_scheduler(num_training_steps, optimizer)
创建学习率调度器,调用 create_custom_scheduler
创建自定义调度器。
def odds_ratio_loss(self, chosen_logps: "torch.Tensor", rejected_logps: "torch.Tensor") -> "torch.Tensor":
r"""
Computes ORPO's odds ratio (OR) loss for batched log probabilities of the policy model.
"""
log_odds = (chosen_logps - rejected_logps) - (
torch.log1p(-torch.exp(chosen_logps)) - torch.log1p(-torch.exp(rejected_logps))
)
sft_loss = -chosen_logps
odds_ratio_loss = -F.logsigmoid(log_odds)
orpo_loss = sft_loss + self.beta * odds_ratio_loss
return orpo_loss
这是一个用于计算策略模型的批量对数概率的Odds Ratio Policy Optimization(ORPO)损失的PyTorch函数。log_odds
是选择和拒绝的对数概率之差,sft_loss
是负的选择对数概率,odds_ratio_loss
是负的 logsigmoid
,最终的 orpo_loss
是 sft_loss
和 odds_ratio_loss
的加权和。ORPO损失旨在鼓励策略选择具有更高期望回报的动作,同时惩罚它选择不太可能是最优的动作。通过平衡这两个目标,ORPO旨在改善强化学习代理在复杂环境中的性能。
def simpo_loss(self, chosen_logps: "torch.Tensor", rejected_logps: "torch.Tensor") -> "torch.Tensor":
r"""
Computes SimPO loss for batched log probabilities of the policy model.
"""
pi_logratios = chosen_logps - rejected_logps
gamma_logratios = self.simpo_gamma / self.beta
logits = pi_logratios - gamma_logratios
simpo_loss = -F.logsigmoid(self.beta * logits)
return simpo_loss
这个函数用于计算批量对数概率的策略模型的SimPO(简单政策优化)损失。pi_logratios
是选择和拒绝的对数概率之差,gamma_logratios
是 simpo_gamma
和 beta
的比值,logits
是 pi_logratios
和 gamma_logratios
之差,最终的 simpo_loss
是负的 logsigmoid
。
def compute_preference_loss(
self,
policy_chosen_logps: "torch.Tensor",
policy_rejected_logps: "torch.Tensor",
reference_chosen_logps: Optional["torch.Tensor"],
reference_rejected_logps: Optional["torch.Tensor"],
) -> Tuple["torch.Tensor", "torch.Tensor", "torch.Tensor"]:
r"""
Computes loss for preference learning.
"""
if not self.finetuning_args.use_ref_model:
if self.loss_type == "orpo":
losses = self.odds_ratio_loss(policy_chosen_logps, policy_rejected_logps)
elif self.loss_type == "simpo":
losses = self.simpo_loss(policy_chosen_logps, policy_rejected_logps)
else:
raise NotImplementedError("Unknown loss type: {}.".format(self.loss_type))
chosen_rewards = self.beta * policy_chosen_logps.to(self.accelerator.device).detach()
rejected_rewards = self.beta * policy_rejected_logps.to(self.accelerator.device).detach()
else:
losses, chosen_rewards, rejected_rewards = self.dpo_loss(
policy_chosen_logps, policy_rejected_logps, reference_chosen_logps, reference_rejected_logps
)
return losses, chosen_rewards, rejected_rewards
计算偏好学习的损失。如果不使用参考模型,根据参数中的损失类型计算 ORPO 或 SimPO 损失,并计算选择和拒绝的奖励。如果使用参考模型,调用 dpo_loss
计算损失和奖励。对于是否使用参考模型,也就是use_ref_model参数,可以到src/llamafactory/hparams/finetuning_args.py中查看它的默认值:
self.use_ref_model = self.stage == "dpo" and self.pref_loss not in ["orpo", "simpo"]
pref_loss: Literal["sigmoid", "hinge", "ipo", "kto_pair", "orpo", "simpo"] = field(
default="sigmoid",
metadata={"help": "The type of DPO loss to use."},
)
可以看到pref_loss的默认值是sigmod,也就是use_ref_model在dpo阶段默认是True。
def concatenated_forward(
self, model: "PreTrainedModel", batch: Dict[str, "torch.Tensor"]
) -> Tuple["torch.Tensor", "torch.Tensor", "torch.Tensor", "torch.Tensor", "torch.Tensor"]:
r"""
Computes the sum log probabilities of the labels under given logits if loss_type is not IPO, ORPO or SimPO.
Otherwise the average log probabilities.
"""
if self.finetuning_args.use_ref_model:
batch = {k: v.detach().clone() for k, v in batch.items()} # avoid error
all_logits: "torch.Tensor" = model(**batch, return_dict=True, use_cache=False).logits.to(torch.float32)
all_logps, valid_length = get_batch_logps(logits=all_logits, labels=batch["labels"])
if self.loss_type in ["ipo", "orpo", "simpo"]:
all_logps = all_logps / valid_length
batch_size = batch["input_ids"].size(0) // 2
chosen_logps, rejected_logps = all_logps.split(batch_size, dim=0)
chosen_logits, rejected_logits = all_logits.split(batch_size, dim=0)
chosen_length, _ = valid_length.split(batch_size, dim=0)
return chosen_logps, rejected_logps, chosen_logits, rejected_logits, chosen_logps / chosen_length
计算给定 logits 下标签的对数概率之和(如果损失类型不是 IPO、ORPO 或 SimPO),否则计算平均对数概率。返回选择和拒绝的对数概率、logits 和选择的对数概率的平均值。
def compute_reference_log_probs(
self, model: "PreTrainedModel", batch: Dict[str, "torch.Tensor"]
) -> Tuple[Optional["torch.Tensor"], Optional["torch.Tensor"]]:
r"""
Computes log probabilities of the reference model.
"""
if not self.finetuning_args.use_ref_model:
return None, None
if self.ref_model is None:
ref_model = model
ref_context = self.accelerator.unwrap_model(model).disable_adapter()
else:
ref_model = self.ref_model
ref_context = nullcontext()
with torch.no_grad(), ref_context:
reference_chosen_logps, reference_rejected_logps, *_ = self.concatenated_forward(ref_model, batch)
return reference_chosen_logps, reference_rejected_logps
计算参考模型的对数概率。如果不使用参考模型,返回 None
。否则,计算参考模型的选择和拒绝对数概率。
def get_batch_loss_metrics(
self,
model: "PreTrainedModel",
batch: Dict[str, "torch.Tensor"],
train_eval: Literal["train", "eval"] = "train",
) -> Tuple["torch.Tensor", Dict[str, "torch.Tensor"]]:
r"""
Computes the DPO loss and other metrics for the given batch of inputs for train or test.
"""
metrics = {}
(
policy_chosen_logps,
policy_rejected_logps,
policy_chosen_logits,
policy_rejected_logits,
policy_chosen_logps_avg,
) = self.concatenated_forward(model, batch)
reference_chosen_logps, reference_rejected_logps = self.compute_reference_log_probs(model, batch)
losses, chosen_rewards, rejected_rewards = self.compute_preference_loss(
policy_chosen_logps,
policy_rejected_logps,
reference_chosen_logps,
reference_rejected_logps,
)
sft_loss = -policy_chosen_logps_avg
if self.ftx_gamma > 1e-6:
losses += self.ftx_gamma * sft_loss
reward_accuracies = (chosen_rewards > rejected_rewards).float()
prefix = "eval_" if train_eval == "eval" else ""
metrics["{}rewards/chosen".format(prefix)] = chosen_rewards.mean().cpu()
metrics["{}rewards/rejected".format(prefix)] = rejected_rewards.mean().cpu()
metrics["{}rewards/accuracies".format(prefix)] = reward_accuracies.mean().cpu()
metrics["{}rewards/margins".format(prefix)] = (chosen_rewards - rejected_rewards).mean().cpu()
metrics["{}logps/rejected".format(prefix)] = policy_rejected_logps.detach().mean().cpu()
metrics["{}logps/chosen".format(prefix)] = policy_chosen_logps.detach().mean().cpu()
metrics["{}logits/rejected".format(prefix)] = policy_rejected_logits.detach().mean().cpu()
metrics["{}logits/chosen".format(prefix)] = policy_chosen_logits.detach().mean().cpu()
if self.loss_type == "orpo":
metrics["{}sft_loss".format(prefix)] = sft_loss.detach().mean().cpu()
metrics["{}odds_ratio_loss".format(prefix)] = ((losses - sft_loss) / self.beta).detach().mean().cpu()
return losses.mean(), metrics
这个方法计算给定输入批次的 DPO 损失和其他指标,用于训练或测试。具体步骤如下:
初始化一个空的metrics
字典。
调用 concatenated_forward
方法,获取策略模型的选择和拒绝对数概率、logits 和对数概率的平均值。
调用 compute_reference_log_probs
方法,计算参考模型的选择和拒绝对数概率。
调用 compute_preference_loss
方法,计算偏好学习的损失和奖励。
计算 sft_loss
,如果 ftx_gamma
大于 1e-6
,则将其加到损失中,这里的sft_loss可能起到一个正则项的作用。
计算奖励准确率。
根据 train_eval
参数设置前缀,更新 metrics
字典。
返回损失的平均值和 metrics
字典。
总结: CustomDPOTrainer
类扩展了 DPOTrainer
,添加了自定义的初始化方法、优化器和调度器创建方法,以及计算偏好学习损失和其他指标的方法。它还包括计算 ORPO 和 SimPO 损失的方法,以及计算参考模型对数概率的方法。这个类主要用于在训练过程中处理偏好优化相关的任务。
总结