经过前面几节的介绍,我们已经梳理了大语言模型的嵌入层和Transformer块。回忆一下LLMs架构图,transformer 的输出将作为下一个归一化层的输入。

前文在transformer 模块中,我们提到了层归一化。层归一化的核心思想是为了调整神经网络层的激活值(输出),使其具有零均值和单位方差(即方差为1)。
其结构如代码段5-1
import torch
import torch.nn as nn
class LayerNorm(nn.Module):
def __init__(self, emb_dim):
super(LayerNorm, self).__init__()
self.eps = 1e-5
self.scale = nn.Parameter(torch.ones(emb_dim))
self.shift = nn.Parameter(torch.zeros(emb_dim))
def forward(self, x):
mean = x.mean(dim=-1, keepdim=True)
var = x.var(dim=-1, keepdim=True, unbiased=False)
norm_x = (x - mean) / torch.sqrt(var + self.eps)
return self.scale * norm_x + self.shift
数据经过transformer 之后,进行层归一化操作,进一步的提高网络内部数值稳定性,加速模型的收敛。
经过层归一化之后,还需要经过线性层处理,将隐藏状态转换成词汇表大小的向量,以便进行softmax操作,从而得到各个词汇的概率分布。
至此,我们已经了解了构建大语言模型训练框架的所有核心要素,接下来给出完整的实现代码。
代码段5-2,主体框架
import torch
import torch.nn as nn
GPT_CONFIG = {
"vocab_size": 50257,
"context_length": 1024,
"emb_dim": 768,
"n_heads": 12,
"n_layers": 12,
"drop_rate": 0.1,
"bias": False
}
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class GPTModel(nn.Module):
def __init__(self, cfg):
super().__init__()
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
self.drop_emb = nn.Dropout(cfg["drop_rate"])
self.trf_blocks = nn.Sequential(
*[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
)
self.layer_norm = LayerNorm(cfg["emb_dim"])
self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)
def forward(self, in_idx):
batch_size, seq_len = in_idx.shape
tok_embeds = self.tok_emb(in_idx)
pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))
x = tok_embeds + pos_embeds
x = self.drop_emb(x)
x = self.trf_blocks(x)
x = self.layer_norm(x)
logits = self.out_head(x)
return logits
class TransformerBlock(nn.Module):
def __init__(self, cfg):
super().__init__()
self.attention = MultiHeadAttention(
input_dim=cfg["emb_dim"],
output_dim=cfg["emb_dim"],
num_heads=cfg["n_heads"],
drop_out=cfg["drop_rate"],
bias=cfg["bias"]
)
self.ff = FeedForward(cfg)
self.norm = LayerNorm(cfg["emb_dim"])
self.dropout = nn.Dropout(cfg["drop_rate"])
def forward(self, x):
# 自注意力部分
shortcut = x
x = self.norm(x)
x = self.attention(x)
x = self.dropout(x)
x = x + shortcut
# 前馈网络部分
shortcut = x
x = self.norm(x)
x = self.ff(x)
x = self.dropout(x)
x = x + shortcut
return x
class MultiHeadAttention(nn.Module):
def __init__(self, input_dim, output_dim, num_heads, drop_out, bias):
super().__init__()
assert input_dim % num_heads == 0, "input_dim必须能被num_heads整除"
self.input_dim = input_dim
self.num_heads = num_heads
self.head_dim = input_dim // num_heads
self.W_query = nn.Linear(input_dim, output_dim, bias=False)
self.W_key = nn.Linear(input_dim, output_dim, bias=False)
self.W_value = nn.Linear(input_dim, output_dim, bias=bias)
self.dropout = nn.Dropout(drop_out)
def forward(self, inputs):
batch_size, seq_len, _ = inputs.size()
queries = self.W_query(inputs)
keys = self.W_key(inputs)
values = self.W_value(inputs)
# 多头切分
queries = queries.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
keys = keys.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
values = values.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
# 注意力计算
attn_scores = torch.matmul(queries, keys.transpose(-2, -1)) / (self.head_dim ** 0.5)
mask = torch.tril(torch.ones(seq_len, seq_len)).view(1, 1, seq_len, seq_len).to(inputs.device)
attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))
attn_weights = torch.softmax(attn_scores, dim=-1)
attn_weights = self.dropout(attn_weights)
# 合并多头
context_vec = torch.matmul(attn_weights, values)
context_vec = context_vec.transpose(1, 2).contiguous().view(batch_size, seq_len, self.input_dim)
return context_vec
class FeedForward(nn.Module):
def __init__(self, cfg):
super().__init__()
self.linear_layer_1 = nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"])
self.linear_layer_2 = nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"])
self.dropout = nn.Dropout(cfg["drop_rate"])
self.relu = nn.ReLU()
def forward(self, x): # 修正:添加输入参数x
x = self.linear_layer_1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.linear_layer_2(x)
return x
class LayerNorm(nn.Module):
def __init__(self, emb_dim):
super().__init__()
self.eps = 1e-5
self.scale = nn.Parameter(torch.ones(emb_dim))
self.shift = nn.Parameter(torch.zeros(emb_dim))
def forward(self, x):
mean = x.mean(dim=-1, keepdim=True)
var = x.var(dim=-1, keepdim=True, unbiased=False) # 有偏方差估计
norm_x = (x - mean) / torch.sqrt(var + self.eps)
return self.scale * norm_x + self.shift
# 测试模型
model = GPTModel(GPT_CONFIG)
total_params = sum(p.numel() for p in model.parameters())
print(f"总参数量:{total_params}")
上面结果显示,总参数量约为155M。
要计算模型的总参数数量,我们需要遍历模型的所有参数,并对它们的元素数量进行求和。在PyTorch中,模型的参数可以通过调用model.parameters()
来获取,然后使用.numel()
方法来获取每个张量中的元素数量。
具体的:
- tok_emb: nn.Embedding(cfg[“vocab_size”], cfg[“emb_dim”]) 有一个权重矩阵,大小为 (vocab_size, emb_dim)。
- pos_emb: nn.Embedding(cfg[“context_length”], cfg[“emb_dim”]) 有一个权重矩阵,大小为 (context_length, emb_dim)。
- trf_blocks: 包含12个TransformerBlock,每个TransformerBlock包含:
- MultiHeadAttention,有三个线性层 (W_query, W_key, W_value),每个大小为 (emb_dim, emb_dim)。
- FeedForward,有两个线性层 (linear_layer_1, linear_layer_2),第一个大小为 (emb_dim, 4 * emb_dim),第二个大小为 (4 * emb_dim, emb_dim)。
- LayerNorm,有两个可学习参数 scale 和 shift,每个大小为 (emb_dim,)。
- layer_norm: LayerNorm(cfg[“emb_dim”]) 有两个可学习参数 scale 和 shift,每个大小为 (emb_dim,)。
- out_head: nn.Linear(cfg[“emb_dim”], cfg[“vocab_size”], bias=False) 有一个权重矩阵,大小为 (emb_dim, vocab_size)。
现在我们根据给定的配置来计算每个部分的参数数量:
- tok_emb: vocab_size * emb_dim = 50257 * 768
- pos_emb: context_length * emb_dim = 1024 * 768
- MultiHeadAttention (每个TransformerBlock): 3 * emb_dim * emb_dim = 3 * 768 * 768
- FeedForward (每个TransformerBlock): 4 * emb_dim * emb_dim + 4 * emb_dim * emb_dim = 2 * 4 * 768 * 768
- LayerNorm (每个TransformerBlock): 2 * emb_dim = 2 * 768
- layer_norm: 2 * emb_dim = 2 * 768
- out_head: emb_dim * vocab_size = 768 * 50257
import torch
# 给定的配置
GPT_CONFIG = {
"vocab_size": 50257,
"context_length": 1024,
"emb_dim": 768,
"n_heads": 12,
"n_layers": 12,
"drop_rate": 0.1,
"bias": False
}
# 计算参数数量
total_params = (
GPT_CONFIG["vocab_size"] * GPT_CONFIG["emb_dim"] # tok_emb
+ GPT_CONFIG["context_length"] * GPT_CONFIG["emb_dim"] # pos_emb
+ GPT_CONFIG["n_layers"] * (3 * GPT_CONFIG["emb_dim"] * GPT_CONFIG["emb_dim"]) # MultiHeadAttention
+ GPT_CONFIG["n_layers"] * (4 * GPT_CONFIG["emb_dim"] * GPT_CONFIG["emb_dim"] + 4 * GPT_CONFIG["emb_dim"] * GPT_CONFIG["emb_dim"]) # FeedForward
+ GPT_CONFIG["n_layers"] * 2 * GPT_CONFIG["emb_dim"] # LayerNorm in TransformerBlock
+ 2 * GPT_CONFIG["emb_dim"] # layer_norm
+ GPT_CONFIG["emb_dim"] * GPT_CONFIG["vocab_size"] # out_head
)
print(total_params)
>>> 155857920
接下来我们将使用这个代码进行单词预测。
在GPT(Generative Pre-trained Transformer)模型中,最终输出的矩阵通常是模型对下一个词的概率分布的预测。这个输出矩阵的每一行对应输入序列中的每一个位置,每一列对应词汇表中的每一个词。
具体来说,GPT模型的输出经过一系列变换后,会通过一个线性层(out_head
),该线性层将嵌入维度(emb_dim
)的向量转换为词汇表大小(vocab_size
)的向量。这个转换后的向量可以视为对于当前位置下一个词的概率分布,即对于每个位置,模型输出一个形状为 (vocab_size,)
的向量,表示每个词汇出现的概率。
这个输出可以直接用于采样下一个词,也可以用于计算损失函数(通常使用交叉熵损失)。采样时,通常会选择概率最高的词作为下一个词;而在训练阶段,则会根据真实标签计算损失,并反向传播更新模型参数。
因此,可以说GPT模型的最终输出是一个矩阵,其形状为 (batch_size, sequence_length, vocab_size)
,代表了对于输入序列中每个位置下一个词的概率分布预测。这是模型对于下一个词的预测值,通常需要通过softmax函数将其转换为概率分布,以便进行后续的采样或计算损失。
在生成文本的过程中,GPT模型的任务是根据已有的上下文预测下一个词。具体来说,在生成过程中,模型每次只预测当前序列末尾位置后面的下一个词。
生成过程的具体步骤
- 初始化序列:假设我们初始化一个空序列或给定一个起始序列。
- 逐个预测词:每一步,模型都基于当前序列预测下一个词。
- 添加新词:将预测的新词添加到序列末尾,形成新的序列。
- 重复步骤:重复上述过程,直到生成所需长度的文本或达到终止条件。
其框架参见代码段 5-3
import torch
import torch.nn.functional as F
import tiktoken
tokenizer = tiktoken.get_encoding("gpt2")
# 假设模型已经定义好
model = GPTModel(GPT_CONFIG)
# 初始输入序列
input_sequence = "Hello, I like python "
encoded = tokenizer.encode(input_sequence)
print("encoded:", encoded)
encoded_tensor = torch.tensor(encoded).unsqueeze(0)
print("encoded_tensor.shape:", encoded_tensor.shape)
# 生成的文本长度
generation_length = 10
# 生成文本
generated_text = encoded_tensor
for _ in range(generation_length - generated_text.shape[1]):
with torch.no_grad():
output_logits = model(generated_text)
# 获取最后一个位置的概率分布
last_position_probabilities = F.softmax(output_logits[:, -1, :], dim=-1)
# 从概率分布中采样下一个词
next_word = torch.multinomial(last_position_probabilities, 1)
# 将新词添加到序列末尾
generated_text = torch.cat([generated_text, next_word], dim=1)
print("生成的文本:", generated_text)
decoded_text = tokenizer.decode(generated_text.squeeze(0).tolist())
print(decoded_text)
对于上述输出,其结果很难理解,因为截至目前我们仅仅实现了模型的骨架,并没有训练模型,其参数权重都是随机值,因此其输出也是不知所云。下一章我们继续讨论模型的训练。