2023年的深度学习入门指南(19) - LLaMA 2源码解析
上一节我们学习了LLaMA 2的补全和聊天两种API的使用方法。本节我们来看看LLaMA 2的源码。
补全函数text_completion源码解析
上一节我们讲了LLaMA 2的编程方法。我们来复习一下:
generator = Llama.build(
ckpt_dir=ckpt_dir,
tokenizer_path=tokenizer_path,
max_seq_len=max_seq_len,
max_batch_size=max_batch_size,
)
prompts = [
"上下五千年,英雄万万千。黄沙百战穿金甲,不破楼兰终不还",
]
results = generator.text_completion(
prompts,
max_gen_len=max_gen_len,
temperature=temperature,
top_p=top_p,
)
我们先来看看text_completion函数的参数是什么意思,该函数的原型为:
def text_completion(
self,
prompts: List[str],
temperature: float = 0.6,
top_p: float = 0.9,
max_gen_len: Optional[int] = None,
logprobs: bool = False,
echo: bool = False,
) -> List[CompletionPrediction]:
我们来看下这些参数的含义:
prompts:这是一个字符串列表,每个字符串都是一个用于生成文本的提示。 temperature(默认值为0.6):这是一个控制生成文本随机性的参数。温度值越高,生成的文本就越随机;温度值越低,生成的文本就越倾向于最可能的输出。 top_p(默认值为0.9):这是一个控制生成文本多样性的参数,它设定了从最高概率的词开始,累计到总概率超过top_p的词为止,然后从这些词中随机选择一个词作为生成的词。这种方法也被称为nucleus sampling或top-p sampling。 max_gen_len:可选参数,表示生成的文本的最大长度。如果未指定,那么将使用模型参数中的最大序列长度减1。 logprobs(默认值为False):如果为True,那么在返回的结果中会包含生成的每个词的对数概率。 echo(默认值为False):这是一个控制是否在生成的文本中包含输入提示的参数。参数明白了之后我们看text_completion完整实现:
def text_completion(
self,
prompts: List[str],
temperature: float = 0.6,
top_p: float = 0.9,
max_gen_len: Optional[int] = None,
logprobs: bool = False,
echo: bool = False,
) -> List[CompletionPrediction]:
if max_gen_len is None:
max_gen_len = self.model.params.max_seq_len - 1
prompt_tokens = [self.tokenizer.encode(x, bos=True, eos=False) for x in prompts]
generation_tokens, generation_logprobs = self.generate(
prompt_tokens=prompt_tokens,
max_gen_len=max_gen_len,
temperature=temperature,
top_p=top_p,
logprobs=logprobs,
echo=echo,
)
if logprobs:
return [
{
"generation": self.tokenizer.decode(t),
"tokens": [self.tokenizer.decode(x) for x in t],
"logprobs": logprobs_i,
}
for t, logprobs_i in zip(generation_tokens, generation_logprobs)
]
return [{"generation": self.tokenizer.decode(t)} for t in generation_tokens]
总结起来就三步,这个text_completion其实就是generate的包装函数:
编码:调用tokenizer.encode 生成:调用generate 解码:调用tokenizer.decode分词
import os
from logging import getLogger
from typing import List
from sentencepiece import SentencePieceProcessor
logger = getLogger()
class Tokenizer:
def __init__(self, model_path: str):
# reload tokenizer
assert os.path.isfile(model_path), model_path
self.sp_model = SentencePieceProcessor(model_file=model_path)
logger.info(f"Reloaded SentencePiece model from {model_path}")
# BOS / EOS token IDs
self.n_words: int = self.sp_model.vocab_size()
self.bos_id: int = self.sp_model.bos_id()
self.eos_id: int = self.sp_model.eos_id()
self.pad_id: int = self.sp_model.pad_id()
logger.info(
f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}"
)
assert self.sp_model.vocab_size() == self.sp_model.get_piece_size()
def encode(self, s: str, bos: bool, eos: bool) -> List[int]:
assert type(s) is str
t = self.sp_model.encode(s)
if bos:
t = [self.bos_id] + t
if eos:
t = t + [self.eos_id]
return t
def decode(self, t: List[int]) -> str:
return self.sp_model.decode(t)
首先是用到了分词组件SentencePieceProcessor。SentencePieceProcessor是SentencePiece库中的一个组件,它实现了子词(subword)tokenize和detokenize的功能。
其主要作用包括:
将文本tokenize成子词(subword)。SentencePiece 使用的数据驱动方法,可以学习文本的词汇表并将文本tokenize成子词单元。 将子词detokenize合并成原始文本。可以将tokenize后的子词序列重新合并为原始文本。 提供vocab管理。可以获得tokenize的子词词汇表等信息。 支持多种语言文本的tokenize和detokenize。 提供高效的实现。底层使用C++实现,可以快速处理大规模文本。 提供多种模型选择,如BPE、unigram等。 支持自定义训练子词化模型。好,下面我们回到这段代码本身。这段代码实现了一个基于SentencePiece的Tokenizer类,可以进行文本的tokenize和detokenize。
主要逻辑:
在初始化时加载SentencePiece模型文件model_path。 获取模型的词汇表大小n_words,以及特殊token的id(bos_id,eos_id,pad_id)。 encode方法可以将字符串文本s tokenize成id列表。可以选择在开始加入bos_id,结尾加入eos_id。 decode方法可以将id列表解码还原为字符串文本。这样就构建了一个封装SentencePiece tokenize/detokenize的Tokenizer类。可以加载自定义的SentencePiece模型,然后就可以方便地对文本进行子词化处理。
这种方式可以重复使用已训练好的SentencePiece模型,为下游NLP任务提供可靠的tokenize和detokenize功能。
最后我们再讲一讲几个特殊的符号bos_id、eos_id和pad_id:
bos_id: 开始符(Beginning of Sentence)的id。用于表示一个序列的开始。 eos_id: 结束符(End of Sentence)的id。用于表示一个序列的结束。 pad_id: 填充符(Padding)的id。当需要将多个序列长度对齐时,可以使用pad_id在较短序列后面填充。聊天函数chat_completion
在进入generate函数之前,我们再看看chat_completion是如何实现的。
def chat_completion(
self,
dialogs: List[Dialog],
temperature: float = 0.6,
top_p: float = 0.9,
max_gen_len: Optional[int] = None,
logprobs: bool = False,
) -> List[ChatPrediction]:
if max_gen_len is None:
max_gen_len = self.model.params.max_seq_len - 1
prompt_tokens = []
for dialog in dialogs:
if dialog[0]["role"] != "system":
dialog = [
{
"role": "system",
"content": DEFAULT_SYSTEM_PROMPT,
}
] + dialog
dialog = [
{
"role": dialog[1]["role"],
"content": B_SYS
+ dialog[0]["content"]
+ E_SYS
+ dialog[1]["content"],
}
] + dialog[2:]
assert all([msg["role"] == "user" for msg in dialog[::2]]) and all(
[msg["role"] == "assistant" for msg in dialog[1::2]]
), (
"model only supports 'system', 'user' and 'assistant' roles, "
"starting with 'system', then 'user' and alternating (u/a/u/a/u...)"
)
dialog_tokens: List[int] = sum(
[
self.tokenizer.encode(
f"{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()} ",
bos=True,
eos=True,
)
for prompt, answer in zip(
dialog[::2],
dialog[1::2],
)
],
[],
)
assert (
dialog[-1]["role"] == "user"
), f"Last message must be from user, got {dialog[-1]['role']}"
dialog_tokens += self.tokenizer.encode(
f"{B_INST} {(dialog[-1]['content']).strip()} {E_INST}",
bos=True,
eos=False,
)
prompt_tokens.append(dialog_tokens)
generation_tokens, generation_logprobs = self.generate(
prompt_tokens=prompt_tokens,
max_gen_len=max_gen_len,
temperature=temperature,
top_p=top_p,
logprobs=logprobs,
)
if logprobs:
return [
{
"generation": {
"role": "assistant",
"content": self.tokenizer.decode(t),
},
"tokens": [self.tokenizer.decode(x) for x in t],
"logprobs": logprobs_i,
}
for t, logprobs_i in zip(generation_tokens, generation_logprobs)
]
return [
{"generation": {"role": "assistant", "content": self.tokenizer.decode(t)}}
for t in generation_tokens
]
我们先看一下参数:
dialogs:一个对话列表,其中每个对话都是一个字典列表,表示一段对话。 temperature:一个浮点数,表示生成文本时使用的温度。默认值为 0.6。 top_p:一个浮点数,表示生成文本时使用的 top-p 采样。默认值为 0.9。 max_gen_len:一个可选的整数,表示生成文本的最大长度。如果未指定,则使用模型参数中的最大序列长度减一。 logprobs:一个布尔值,表示是否返回生成文本的对数概率。默认值为 False。函数返回一个 ChatPrediction 列表,其中每个元素都是一个字典,包含生成的回复和相关信息。
函数首先检查 max_gen_len 是否为 None,如果是,则将其设置为模型参数中的最大序列长度减一。然后,对于每个对话,函数执行以下操作:
如果第一条消息的角色不是 “system”,则在对话的开头添加一条默认的系统提示。 将第一条和第二条消息合并为一条消息,并更新对话。 检查对话中消息的角色是否符合要求(即以 “system” 开始,然后交替出现 “user” 和 “assistant”)。 对于每一组相邻的提示和回答(即每两条消息),使用 tokenizer 对其进行编码,并将编码后的 token 连接起来。 检查最后一条消息是否来自用户。 对最后一条消息进行编码,并将编码后的 token 添加到 token 列表中。接下来,函数调用 generate 方法生成回复,并根据 logprobs 参数的值返回相应的结果。如果 logprobs 为 True,则返回包含生成回复、token 和对数概率的字典列表;否则,返回仅包含生成回复的字典列表。这些生成回复都具有 “assistant” 角色,并使用 tokenizer 进行解码。
总体来说,只是增加了对于对话角色的业务逻辑处理,核心还是调用generate函数。
温度与top p采样
在进入讲解generate函数之前,我们先讲一个小知识点,就是温度temperature的作用。我们看下面的代码:
if temperature > 0:
probs = torch.softmax(logits[:, -1] / temperature, dim=-1)
next_token = sample_top_p(probs, top_p)
else:
next_token = torch.argmax(logits[:, -1], dim=-1)
temperature 是一个超参数,用于控制生成文本的多样性。当 temperature 较高时,概率分布更加平坦,因此采样出的标记更具多样性。当 temperature 较低时,概率分布更加尖锐,因此采样出的标记更倾向于概率最大的那个。当 temperature 等于 0 时,直接选择概率最大的标记。
那么,sample_top_p是如何实现的呢?我把解说写在代码注释里面了:
def sample_top_p(probs, p):
# 这行代码将输入的概率 probs 按照降序排序。probs_sort 是排序后的概率,probs_idx 是对应的索引。
probs_sort, probs_idx = torch.sort(probs, dim=-1, descending=True)
# 这行代码计算 probs_sort 的累积和。累积和是从第一个元素开始,依次将序列中的每个元素与前面所有元素的和相加得到的。
probs_sum = torch.cumsum(probs_sort, dim=-1)
# 这行代码生成一个布尔掩码,用于指示哪些累积和减去当前概率的值大于 p。这用于确定哪些概率应该被设为0,以保证被抽样的概率和不超过 p。
mask = probs_sum - probs_sort > p
# 这行代码使用上述生成的掩码,将那些使累积和减去当前概率的值大于 p 的 probs_sort 中的元素设为0。
probs_sort[mask] = 0.0
# 这行代码将 probs_sort 中的每个元素除以它们的和,以便重新归一化概率分布。
probs_sort.div_(probs_sort.sum(dim=-1, keepdim=True))
# 这行代码从归一化的 probs_sort 中抽取一个样本。torch.multinomial 是PyTorch中的多项式分布抽样函数,它根据每个元素的权重抽取样本。
next_token = torch.multinomial(probs_sort, num_samples=1)
# 这行代码使用 torch.gather 函数从 probs_idx 中收集对应 next_token 的索引,这样就能得到原始概率 probs 中对应的索引。
next_token = torch.gather(probs_idx, -1, next_token)
return next_token
总的来说,sample_top_p保留了按概率高低排序的大致分布,但过滤了长尾部分的低概率噪声。然后从重归一化的分布中采样,既保证了质量,又增加了适当的随机性。
generate函数
好,我们终于开始探索最核心的生成函数上了:
@torch.inference_mode()
def generate(
self,
prompt_tokens: List[List[int]],
max_gen_len: int,
temperature: float = 0.6,
top_p: float = 0.9,
logprobs: bool = False,
echo: bool = False,
) -> Tuple[List[List[int]], Optional[List[List[float]]]]:
首先是这个函数的参数,其实我们已经比较熟悉了。包括输入的提示 tokens(prompt_tokens),最大生成长度(max_gen_len),温度参数(temperature,影响生成文本的随机性), top_p(用于决定采样过程中保留的 token 集合的概率阈值,也被称为 “nucleus sampling”),是否返回每个 token 的对数概率(logprobs),以及是否将输入的提示返回(echo)。
params = self.model.params
bsz = len(prompt_tokens)
assert bsz <= params.max_batch_size, (bsz, params.max_batch_size)
min_prompt_len = min(len(t) for t in prompt_tokens)
max_prompt_len = max(len(t) for t in prompt_tokens)
assert max_prompt_len <= params.max_seq_len
total_len = min(params.max_seq_len, max_gen_len + max_prompt_len)
pad_id = self.tokenizer.pad_id
tokens = torch.full((bsz, total_len), pad_id, dtype=torch.long, device="cuda")
for k, t in enumerate(prompt_tokens):
tokens[k, : len(t)] = torch.tensor(t, dtype=torch.long, device="cuda")
if logprobs:
token_logprobs = torch.zeros_like(tokens, dtype=torch.float)
prev_pos = 0
eos_reached = torch.tensor([False] * bsz, device="cuda")
input_text_mask = tokens != pad_id
接着,根据提供的 prompt_tokens 初始化一个 tokens 张量,长度为 total_len,并填充模型的 pad_id。然后,将 prompt_tokens 的内容复制到 tokens 张量的对应位置。
for cur_pos in range(min_prompt_len, total_len):
logits = self.model.forward(tokens[:, prev_pos:cur_pos], prev_pos)
if logprobs:
token_logprobs[:, prev_pos + 1 : cur_pos + 1] = -F.cross_entropy(
input=logits.transpose(1, 2),
target=tokens[:, prev_pos + 1 : cur_pos + 1],
reduction="none",
ignore_index=pad_id,
)
if temperature > 0:
probs = torch.softmax(logits[:, -1] / temperature, dim=-1)
next_token = sample_top_p(probs, top_p)
else:
next_token = torch.argmax(logits[:, -1], dim=-1)
next_token = next_token.reshape(-1)
# only replace token if prompt has already been generated
next_token = torch.where(
input_text_mask[:, cur_pos], tokens[:, cur_pos], next_token
)
tokens[:, cur_pos] = next_token
eos_reached |= (~input_text_mask[:, cur_pos]) & (
next_token == self.tokenizer.eos_id
)
prev_pos = cur_pos
if all(eos_reached):
break
然后,对于 tokens 张量中的每一个位置,计算下一个 token 的 logits,并基于这些 logits 生成下一个 token。如果 logprobs 参数为真,则计算每个 token 的对数概率。如果温度大于 0,则使用 softmax 函数和温度参数对 logits 进行缩放,然后使用 top-p 采样生成下一个 token。否则,直接选择 logits 最大的 token。新生成的 token 会替换 tokens 张量中的对应位置。
如果生成的 token 是结束标记(eos_id),则更新 eos_reached 标记。如果所有的序列都已经生成了结束标记,则停止生成。
if logprobs:
token_logprobs = token_logprobs.tolist()
out_tokens, out_logprobs = [], []
for i, toks in enumerate(tokens.tolist()):
# cut to max gen len
start = 0 if echo else len(prompt_tokens[i])
toks = toks[start : len(prompt_tokens[i]) + max_gen_len]
probs = None
if logprobs:
probs = token_logprobs[i][start : len(prompt_tokens[i]) + max_gen_len]
# cut to eos tok if any
if self.tokenizer.eos_id in toks:
eos_idx = toks.index(self.tokenizer.eos_id)
toks = toks[:eos_idx]
probs = probs[:eos_idx] if logprobs else None
out_tokens.append(toks)
out_logprobs.append(probs)
return (out_tokens, out_logprobs if logprobs else None)
最后,如果 logprobs 参数为真,则将 token_logprobs 转换为列表。然后,对于 tokens 张量中的每一行(即每一个生成的序列),如果 echo 参数为假,则去掉提示部分。然后,如果存在结束标记,则去掉结束标记之后的部分。最后,返回生成的 tokens 和对数概率(如果 logprobs 参数为真)。
这个函数返回的是一个元组,第一个元素是一个列表,包含每一个生成的 token 序列。第二个元素是一个列表,包含每一个生成的对数概率序列(如果 logprobs 参数为真)。
build构造函数
最后我们再说下构造Llama的部分:
@staticmethod
def build(
ckpt_dir: str,
tokenizer_path: str,
max_seq_len: int,
max_batch_size: int,
model_parallel_size: Optional[int] = None,
) -> "Llama":
if not torch.distributed.is_initialized():
torch.distributed.init_process_group("nccl")
if not model_parallel_is_initialized():
if model_parallel_size is None:
model_parallel_size = int(os.environ.get("WORLD_SIZE", 1))
initialize_model_parallel(model_parallel_size)
local_rank = int(os.environ.get("LOCAL_RANK", 0))
torch.cuda.set_device(local_rank)
# seed must be the same in all processes
torch.manual_seed(1)
if local_rank > 0:
sys.stdout = open(os.devnull, "w")
start_time = time.time()
checkpoints = sorted(Path(ckpt_dir).glob("*.pth"))
assert len(checkpoints) > 0, f"no checkpoint files found in {ckpt_dir}"
assert model_parallel_size == len(
checkpoints
), f"Loading a checkpoint for MP={len(checkpoints)} but world size is {model_parallel_size}"
ckpt_path = checkpoints[get_model_parallel_rank()]
checkpoint = torch.load(ckpt_path, map_location="cpu")
with open(Path(ckpt_dir) / "params.json", "r") as f:
params = json.loads(f.read())
model_args: ModelArgs = ModelArgs(
max_seq_len=max_seq_len,
max_batch_size=max_batch_size,
**params,
)
tokenizer = Tokenizer(model_path=tokenizer_path)
model_args.vocab_size = tokenizer.n_words
torch.set_default_tensor_type(torch.cuda.HalfTensor)
model = Transformer(model_args)
model.load_state_dict(checkpoint, strict=False)
print(f"Loaded in {time.time() - start_time:.2f} seconds")
return Llama(model, tokenizer)
虽然这么一大段,但其实都是一些初始化的工作。
分布式设置:首先,这段代码检查是否已经初始化了 PyTorch 的分布式环境,如果没有则进行初始化。然后,检查是否已经初始化了模型并行环境,如果没有,则获取环境变量 WORLD_SIZE 的值作为模型并行的大小,并进行初始化。
设备设置:获取环境变量 LOCAL_RANK 的值作为本地排名,并设置当前设备为该排名对应的 GPU。
随机种子设置:为了确保所有进程生成的随机数相同,设置随机种子为 1。
标准输出设置:如果本地排名大于 0,则将标准输出重定向到空设备,即不显示任何输出。
加载模型检查点:找到检查点目录中的所有检查点文件,并按照文件名排序。然后,根据模型并行的排名选择一个检查点文件,并加载该检查点。然后,加载模型参数。
构建模型和分词器:使用加载的模型参数和提供的 max_seq_len 和 max_batch_size 构建模型参数对象。然后,加载分词器,并设置模型参数的词汇表大小为分词器的词汇表大小。然后,设置默认的张量类型为半精度浮点型(以节省内存和计算资源)。然后,构建 Transformer 模型,并加载模型检查点。
最后,构建一个 Llama 对象,包含加载的模型和分词器,并返回该对象。
小结
本节我们学习了LLaMA 2的源码,包括补全函数text_completion和聊天函数chat_completion的实现,以及它们的真正实现generate函数的原理。我们还学习了温度temperature和top p采样的原理。
对于没有搞到深度学习生成的同学,可能有一点难度。
LLaMA的代码还差一部分就是模型的部分,我们放到下一节来讲。要不然知识点太多大家容易大脑缺氧:)