5 – 模型骨架组合

经过前面几节的介绍,我们已经梳理了大语言模型的嵌入层和Transformer块。回忆一下LLMs架构图,transformer 的输出将作为下一个归一化层的输入。

前文在transformer 模块中,我们提到了层归一化。层归一化的核心思想是为了调整神经网络层的激活值(输出),使其具有零均值和单位方差(即方差为1)。

其结构如代码段5-1

import torch
import torch.nn as nn

class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        super(LayerNorm, self).__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift

数据经过transformer 之后,进行层归一化操作,进一步的提高网络内部数值稳定性,加速模型的收敛。

经过层归一化之后,还需要经过线性层处理,将隐藏状态转换成词汇表大小的向量,以便进行softmax操作,从而得到各个词汇的概率分布。

至此,我们已经了解了构建大语言模型训练框架的所有核心要素,接下来给出完整的实现代码。

代码段5-2,主体框架

import torch
import torch.nn as nn

GPT_CONFIG = {
    "vocab_size": 50257,
    "context_length": 1024,
    "emb_dim": 768,
    "n_heads": 12,
    "n_layers": 12,
    "drop_rate": 0.1,
    "bias": False
}
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class GPTModel(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])
        self.trf_blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )
        self.layer_norm = LayerNorm(cfg["emb_dim"])
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)
    
    def forward(self, in_idx):
        batch_size, seq_len = in_idx.shape
        tok_embeds = self.tok_emb(in_idx)
        pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))
        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        x = self.trf_blocks(x)
        x = self.layer_norm(x)
        logits = self.out_head(x)
        return logits

class TransformerBlock(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.attention = MultiHeadAttention(
            input_dim=cfg["emb_dim"],
            output_dim=cfg["emb_dim"],
            num_heads=cfg["n_heads"],
            drop_out=cfg["drop_rate"],
            bias=cfg["bias"]
        )
        self.ff = FeedForward(cfg)
        self.norm = LayerNorm(cfg["emb_dim"])
        self.dropout = nn.Dropout(cfg["drop_rate"])
    
    def forward(self, x):
        # 自注意力部分
        shortcut = x
        x = self.norm(x)
        x = self.attention(x)
        x = self.dropout(x)
        x = x + shortcut
        
        # 前馈网络部分
        shortcut = x
        x = self.norm(x)
        x = self.ff(x)
        x = self.dropout(x)
        x = x + shortcut
        return x

class MultiHeadAttention(nn.Module):
    def __init__(self, input_dim, output_dim, num_heads, drop_out, bias):
        super().__init__()
        assert input_dim % num_heads == 0, "input_dim必须能被num_heads整除"
        self.input_dim = input_dim
        self.num_heads = num_heads
        self.head_dim = input_dim // num_heads
        self.W_query = nn.Linear(input_dim, output_dim, bias=False)
        self.W_key = nn.Linear(input_dim, output_dim, bias=False)
        self.W_value = nn.Linear(input_dim, output_dim, bias=bias)
        self.dropout = nn.Dropout(drop_out)
    
    def forward(self, inputs):
        batch_size, seq_len, _ = inputs.size()
        queries = self.W_query(inputs)
        keys = self.W_key(inputs)
        values = self.W_value(inputs)
        
        # 多头切分
        queries = queries.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        keys = keys.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        values = values.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        
        # 注意力计算
        attn_scores = torch.matmul(queries, keys.transpose(-2, -1)) / (self.head_dim ** 0.5)
        mask = torch.tril(torch.ones(seq_len, seq_len)).view(1, 1, seq_len, seq_len).to(inputs.device)
        attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))
        attn_weights = torch.softmax(attn_scores, dim=-1)
        attn_weights = self.dropout(attn_weights)
        
        # 合并多头
        context_vec = torch.matmul(attn_weights, values)
        context_vec = context_vec.transpose(1, 2).contiguous().view(batch_size, seq_len, self.input_dim)
        return context_vec

class FeedForward(nn.Module):
    def __init__(self, cfg):
        super().__init__()
        self.linear_layer_1 = nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"])
        self.linear_layer_2 = nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"])
        self.dropout = nn.Dropout(cfg["drop_rate"])
        self.relu = nn.ReLU()
    
    def forward(self, x):  # 修正:添加输入参数x
        x = self.linear_layer_1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear_layer_2(x)
        return x

class LayerNorm(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        self.eps = 1e-5
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))
    
    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)  # 有偏方差估计
        norm_x = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * norm_x + self.shift

# 测试模型
model = GPTModel(GPT_CONFIG)
total_params = sum(p.numel() for p in model.parameters())
print(f"总参数量:{total_params}")

上面结果显示,总参数量约为155M。

要计算模型的总参数数量,我们需要遍历模型的所有参数,并对它们的元素数量进行求和。在PyTorch中,模型的参数可以通过调用model.parameters()来获取,然后使用.numel()方法来获取每个张量中的元素数量。

具体的:

  1. tok_emb: nn.Embedding(cfg[“vocab_size”], cfg[“emb_dim”]) 有一个权重矩阵,大小为 (vocab_size, emb_dim)。
  2. pos_emb: nn.Embedding(cfg[“context_length”], cfg[“emb_dim”]) 有一个权重矩阵,大小为 (context_length, emb_dim)。
  3. trf_blocks: 包含12个TransformerBlock,每个TransformerBlock包含:
  • MultiHeadAttention,有三个线性层 (W_query, W_key, W_value),每个大小为 (emb_dim, emb_dim)。
  • FeedForward,有两个线性层 (linear_layer_1, linear_layer_2),第一个大小为 (emb_dim, 4 * emb_dim),第二个大小为 (4 * emb_dim, emb_dim)。
  • LayerNorm,有两个可学习参数 scale 和 shift,每个大小为 (emb_dim,)。
  1. layer_norm: LayerNorm(cfg[“emb_dim”]) 有两个可学习参数 scale 和 shift,每个大小为 (emb_dim,)。
  2. out_head: nn.Linear(cfg[“emb_dim”], cfg[“vocab_size”], bias=False) 有一个权重矩阵,大小为 (emb_dim, vocab_size)。

现在我们根据给定的配置来计算每个部分的参数数量:

  • tok_emb: vocab_size * emb_dim = 50257 * 768
  • pos_emb: context_length * emb_dim = 1024 * 768
  • MultiHeadAttention (每个TransformerBlock): 3 * emb_dim * emb_dim = 3 * 768 * 768
  • FeedForward (每个TransformerBlock): 4 * emb_dim * emb_dim + 4 * emb_dim * emb_dim = 2 * 4 * 768 * 768
  • LayerNorm (每个TransformerBlock): 2 * emb_dim = 2 * 768
  • layer_norm: 2 * emb_dim = 2 * 768
  • out_head: emb_dim * vocab_size = 768 * 50257

import torch

# 给定的配置
GPT_CONFIG = {
    "vocab_size": 50257,
    "context_length": 1024,
    "emb_dim": 768,
    "n_heads": 12,
    "n_layers": 12,
    "drop_rate": 0.1,
    "bias": False
}

# 计算参数数量
total_params = (
    GPT_CONFIG["vocab_size"] * GPT_CONFIG["emb_dim"]  # tok_emb
    + GPT_CONFIG["context_length"] * GPT_CONFIG["emb_dim"]  # pos_emb
    + GPT_CONFIG["n_layers"] * (3 * GPT_CONFIG["emb_dim"] * GPT_CONFIG["emb_dim"])  # MultiHeadAttention
    + GPT_CONFIG["n_layers"] * (4 * GPT_CONFIG["emb_dim"] * GPT_CONFIG["emb_dim"] + 4 * GPT_CONFIG["emb_dim"] * GPT_CONFIG["emb_dim"])  # FeedForward
    + GPT_CONFIG["n_layers"] * 2 * GPT_CONFIG["emb_dim"]  # LayerNorm in TransformerBlock
    + 2 * GPT_CONFIG["emb_dim"]  # layer_norm
    + GPT_CONFIG["emb_dim"] * GPT_CONFIG["vocab_size"]  # out_head
)
print(total_params)

>>> 155857920

接下来我们将使用这个代码进行单词预测。

在GPT(Generative Pre-trained Transformer)模型中,最终输出的矩阵通常是模型对下一个词的概率分布的预测。这个输出矩阵的每一行对应输入序列中的每一个位置,每一列对应词汇表中的每一个词。

具体来说,GPT模型的输出经过一系列变换后,会通过一个线性层(out_head),该线性层将嵌入维度(emb_dim)的向量转换为词汇表大小(vocab_size)的向量。这个转换后的向量可以视为对于当前位置下一个词的概率分布,即对于每个位置,模型输出一个形状为 (vocab_size,) 的向量,表示每个词汇出现的概率。

这个输出可以直接用于采样下一个词,也可以用于计算损失函数(通常使用交叉熵损失)。采样时,通常会选择概率最高的词作为下一个词;而在训练阶段,则会根据真实标签计算损失,并反向传播更新模型参数。

因此,可以说GPT模型的最终输出是一个矩阵,其形状为 (batch_size, sequence_length, vocab_size),代表了对于输入序列中每个位置下一个词的概率分布预测。这是模型对于下一个词的预测值,通常需要通过softmax函数将其转换为概率分布,以便进行后续的采样或计算损失。

在生成文本的过程中,GPT模型的任务是根据已有的上下文预测下一个词。具体来说,在生成过程中,模型每次只预测当前序列末尾位置后面的下一个词。

生成过程的具体步骤

  1. 初始化序列:假设我们初始化一个空序列或给定一个起始序列。
  2. 逐个预测词:每一步,模型都基于当前序列预测下一个词。
  3. 添加新词:将预测的新词添加到序列末尾,形成新的序列。
  4. 重复步骤:重复上述过程,直到生成所需长度的文本或达到终止条件。

其框架参见代码段 5-3

import torch
import torch.nn.functional as F
import tiktoken


tokenizer = tiktoken.get_encoding("gpt2")
# 假设模型已经定义好
model = GPTModel(GPT_CONFIG)
# 初始输入序列
input_sequence = "Hello, I like python "
encoded = tokenizer.encode(input_sequence)
print("encoded:", encoded)
encoded_tensor = torch.tensor(encoded).unsqueeze(0)
print("encoded_tensor.shape:", encoded_tensor.shape)

# 生成的文本长度
generation_length = 10
# 生成文本
generated_text = encoded_tensor
for _ in range(generation_length - generated_text.shape[1]):
    with torch.no_grad():
        output_logits = model(generated_text)
    # 获取最后一个位置的概率分布
    last_position_probabilities = F.softmax(output_logits[:, -1, :], dim=-1)
    # 从概率分布中采样下一个词
    next_word = torch.multinomial(last_position_probabilities, 1)
    # 将新词添加到序列末尾
    generated_text = torch.cat([generated_text, next_word], dim=1) 
   
print("生成的文本:", generated_text)

decoded_text = tokenizer.decode(generated_text.squeeze(0).tolist())
print(decoded_text)

对于上述输出,其结果很难理解,因为截至目前我们仅仅实现了模型的骨架,并没有训练模型,其参数权重都是随机值,因此其输出也是不知所云。下一章我们继续讨论模型的训练。