如果你能够从零开始自己构建一个能使用你的母语进行交流的LLM,会怎么样呢?正好,在本教程中我们就将实践这一目标。要想真正理解LLM的工作原理,最好的方法就是亲自动手去构建一个。
我们将一步步指导你如何用特定语言(本次是乌尔都语)创建自己的LLM,这样你就能了解LLM内部究竟是如何运作的。
现代的LLM其实起源于一篇改变了整个领域的研究论文:“注意力就是你所需要的一切”。不过,我们不会陷入那些复杂的数学公式中(遗憾的是,我数学不太好),而是通过实际构建LLM来学习相关知识。
本手册适合谁阅读?
适合软件工程师、产品负责人,或者任何对LLM工作原理感兴趣的人。如果你有一点机器学习的基础知识会更好,但即使没有也没关系。我编写这份教程的目的就是让你无需再去其他地方寻找学习资源。
完成本教程后,你将能够拥有一个可以正常运行的乌尔都语LLM聊天机器人。按照下面的步骤操作,你也可以为自己母语创建这样的工具。
关于预期效果的一些说明:
我们的目标是通过实际操作来了解LLM的工作原理。
需要强调的是,你的LLM并不一定需要具备ChatGPT那样的功能。ChatGPT需要庞大的数据集、数月的训练时间,以及人类反馈所驱动的强化学习机制,而通过学习本教程,你会对这些内容有更深入的理解。
关于代码的一些说明:
本教程中使用的代码大部分是使用Claude Opus 4生成的。这一点值得特别提及,因为它说明了LLM并不仅仅是帮助你更快实现功能开发的编码工具,它们也可以成为非常强大的学习工具。
通过让Claude生成、解释并优化各个组件,我对自己理解LLM训练的原理有了比单纯阅读文档更深入的认识。
如果你也在跟随本教程进行学习,我建议你也尝试利用LLM来辅助自己的学习过程。
我们将涵盖哪些内容?
大语言模型训练的组成部分
在本教程中,我们将逐一介绍以下各个组成部分,并提供代码示例以便大家更好地理解:
-
数据准备
-
分词处理
-
预训练
-
有监督微调
-
部署
所需技术栈
在开始操作之前,你需要准备以下技术工具:
-
Python 3.9+
-
PyTorch
-
分词工具 / SentencePiece
-
Hugging Face 数据集及 Hub
-
regex、BeautifulSoup4、requests(用于数据清洗)
-
tqdm、matplotlib(用于训练辅助工具)
-
Gradio(用于聊天界面的部署)
-
Google Colab(免费提供 T4 GPU 用于训练)
注意:在开始操作之前,请确保安装了仓库中 requirements.txt 文件中列出的所有依赖项。
1. 数据准备
在数据准备阶段,第一步就是数据收集。大语言模型需要通过大量文本数据进行训练,而这些数据的来源并不单一。根据你想要构建的模型类型,可以从多种途径获取文本数据:
-
数字图书馆和档案馆: 互联网档案馆 或维基百科的数据导出文件
-
代码仓库: GitHub、GitLab(如果你的模型需要理解代码,这些资源会非常有用)
-
网络爬取:使用自动化脚本浏览网站、博客和论坛以收集数据
-
学术数据集:研究论文、开放获取期刊
-
现成的数据集:像 Hugging Face 数据集 和 Kaggle 这样的平台提供了大量可直接使用的数据集
在实际应用中,像 GPT 和 LLaMA 这样的大规模大语言模型会依赖自动化流程从多种来源进行网络爬取。但有一条重要的原则需要遵守:只使用公开可用的开源数据。切勿抓取私人或用户个人信息,只选择那些明确允许公众使用的数据集或遵循宽松许可协议的数据。
另外,请记住:输入有误,输出亦错。仅仅获取到数据是不够的,这些数据必须是准确、干净且无杂质的。
在实际操作中,你可以从不同的来源收集数据。就我而言,我从 Hugging Face 获取了足够用的数据。Hugging Face 提供了 CulturaX 这个多语言数据集。由于这个数据集的规模非常大,我并没有下载全部内容,而只下载了一小部分。
在本次教程中,我选择了Hugging Face作为数据来源。选择它有以下几个原因。
首先,由于我们的目标是了解大语言模型的工作原理,因此我希望把时间花在模型本身上,而不是编写网页爬虫程序。Hugging Face已经收集了大量经过清洗和整理的数据集,这大大节省了前期准备工作所需的时间。
其次,Hugging Face提供了针对特定语言的数据集。由于我正在构建一个乌尔都语大语言模型,因此需要专门用于乌尔都语的语料数据。而Hugging Face拥有CulturaX这个多语言数据集,其中包含了乌尔都语以及其他多种语言的文本数据。由于这个数据集的规模非常大,所以我并没有下载全部内容,而只下载了其中的一小部分。
重要提示:在开始从Hugging Face下载数据集之前,你需要先创建一个账户。之后登录CLI界面,才能进行数据下载操作。
在下面的代码示例中,我们从Hugging Face加载数据集,并将流式下载模式设置为True。这样我们就无需下载全部数据,而只需下载NUM_samples所指定的样本数量即可。
# =================================================-----------
# 选项A:从CulturaX下载数据集(推荐,质量较高)
# =================-----------
# CulturaX是mC4 + OSCAR数据集的清洗版本
# 我们采用流式下载方式来避免下载全部数据
NUM_samples = 100_000 # 先下载10万个样本(约50-100MB文本)
print("正在从CulturaX下载乌尔都语数据集……")
dataset = load_dataset(
"uonlp/CulturaX",
"ur", # 乌尔都语代码
split="train",
streaming=True, # 不下载全部数据
trust_remote_code=True
)
# 收集样本数据
raw_texts = []
for i, sample in enumerate(tqdm(dataset, total=NUM_samples, desc="正在下载数据…"):
if i > NUM_samples:
break
rawtexts.append(sample["text"])
print(f"已下载{len(raw_texts)}个样本")
print(f>总字符数:{sum(len(t) for t in raw_texts):}")
print(f>示例文本(前500个字符):")
print(raw_texts[0][:500])
数据清洗
仅仅拥有数据是远远不够的,要开始训练模型,下一步才是最重要的步骤:数据清洗。我们的目标是将数据尽可能地净化起来。
由于我正在构建一个专门针对乌尔都语的语言模型,因此必须编写相应的清洗程序,以去除非乌尔都语文本、HTML链接、特殊字符、重复内容以及多余的空白字符。所有这些因素都会污染训练数据,从而在训练过程中引发问题。
根据数据集的类型不同,可能还需要进行一些特定于某种语言或使用场景的数据清洗操作。
其中有一点你可能会觉得比较新奇,那就是NFKC Unicode规范化步骤。这一过程会将那些外观相同但实际存在于不同Unicode编码形式中的文本统一成一种标准格式。
您还会看到一些正则表达式模式,这些模式被用来仅保留乌尔都语文本。由于乌尔都语的书写系统基于阿拉伯语,因此我们使用了阿拉伯语对应的Unicode编码范围。同时,我也去掉了原始数据中存在的//、--这类代码,以及多余的空白字符。
这个清洗过程进行了多次迭代。每次我都手动检查了处理结果,发现了诸如间距不一致、长破折号以及杂乱标点符号等问题。所有这些问题都可能对后续处理阶段产生负面影响,因此彻底进行数据清洗是非常重要的。
这也让我们清楚地认识到:数据在当前的工作中仍然占据着极其重要的地位,而大型语言模型在很大程度上也是依赖数据来运行的。
def clean_urdu_text(text: str) -> str:
"""
清理单个乌尔都语文本文件。
处理步骤:
1. 删除URL地址
2. 删除HTML标签及相关实体
3. 删除电子邮件地址
4. 对Unicode字符进行规范化处理(NFKC标准化)
5. 仅保留乌尔都语字符、基本标点符号、数字以及空白字符
6. 规范化重复出现的标点符号
7. 规范化空白字符的排列方式
$$import unicodedata$$
# 步骤1:删除URL地址
text = re.sub(r'https?://\S+|www\.\S+', '', text)
# 步骤2:删除HTML标签
text = re.sub(r'<[^>>]+>>', '', text)
# 删除HTML实体
text = re.sub(r'&[a-zA-Z]+;', ' ', text)
text = re.sub(r'&#\d+}', ' ', text)
# 步骤3:删除电子邮件地址
text = re.sub(r'\S+@\S+', '', text)
# 步骤4:Unicode规范化处理(NFKC)
# 这一步用于将同一字符的不同表示形式统一为标准形式
text = unicodedata.normalize('NFKC', text)
# 步骤5:仅保留乌尔都语字符、基本标点符号、数字以及空白字符
# 乌尔都语对应的Unicode编码范围 + 阿拉伯语标点符号 + 西方数字 + 基本标点符号
urdu_pattern = regex.compile(
r'[^'
r'\u0600-\u06FF' # 阿拉伯语字符(包括乌尔都语)
r'\u0750-\u077F' # 阿拉伯语补充字符
r'\u08A0-\u08FF' # 阿拉伯语扩展-A字符
r'\uFB50-\uFDFF' # 阿拉伯语表现形式-A字符
r'\uFE70-\uFEFF' # 阿拉伯语表现形式-B字符
r'0-9۰-۹' # 西方数字及阿拉伯数字-印度数字
r'\s' # 白空字符
r'۔،؟!٪' # 乌尔都语标点符号
r'.,:%!?\-\(\)"\']' # 常见拉丁文标点符号
)
text = urdu_pattern.sub(' ', text)
# 步骤6:规范化重复出现的标点符号
text = re.sub(r'۔{2,}', '۔', text)
text = re.sub(r'\.{2,}', '.', text)
text = re.sub(r'-\s*-+', '-', text)
text = re.sub(r'-{2,}', '-', text)
text = re.sub(r',{2,}', '،', text)
text = re.sub(r'{2,}', ',', text)
text = re.sub(r'\s+[۔\.\-,،]\s+', ' ', text)
# 步骤7:规范化空白字符的排列方式
text = re.sub(r'\n{3,}', '\n\n', text) # 最多保留2个换行符
text = re.sub(r'[^\S\n]+', ' ', text) # 合并多余的空白字符,但保留换行符
text = text.strip()
return text
def is_mostly_urdu(text: str, threshold: float = 0.5) -> bool:
"""
检查文本是否主要由乌尔都语字符组成。
这个函数可以过滤掉那些主要包含英语或其他语言内容的文档。
threshold:文本中乌尔都语字符所占的最小比例
"""
if len(text) == 0:
return False
urdu_chars = len(regex.findall(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]', text))
return (urdu_chars / len(text)) > threshold
# 测试清洗函数
sample = raw_texts[0]
print("=== 清洗前的文本 ===")
print(sample[:300])
print("\n=== 清洗后的文本 ===")
cleaned = clean_urdu_text(sample)
print(cleaned[:300])
print(f"\n该文本是否主要由乌尔都语字符组成:{is_mostly.urdu(cleaned)}")
清洗数据后,我将其保存为两种格式:一种是文本文件(用于分词器训练),另一种是JSONL文件(用于预训练)。在后续步骤中,这两种格式各自发挥着不同的作用。
2. 分词处理
清洗数据之后的下一步就是分词处理。分词处理的目的是将文本转换成数字形式,同时也提供了将这些数字重新转换回文本的方法。
这样做是必要的,因为神经网络无法理解文本——它们只能识别数字。因此,分词处理实际上起到了人类语言与模型能够处理的数据之间的桥梁作用。
举个例子:
"hello world" → ["hel", "lo", " world"] → [1245, 532, 995]
"اردو زبان" ← ["ار", "دو", "زب", "ان"] ← [412, 87, 953, 201]
分词处理方法
目前主要有三种分词处理方法:
方法1:字符级别
采用这种方法时,会将文本拆分成一个个单独的字符:
-
hello->['h', 'e', 'l', 'l', 'o'] -
اردو->['ا', 'ر', 'د', 'و']
不过这种方法的缺点在于,处理后的序列会变得非常长。例如,一篇1000词的文档可能会被分解成5000多个分词。模型需要学会将这些字符重新组合成词语,而这显然是一件相当困难的任务。
方法2:词语级别
在这种方法中,会根据词语之间的空格来进行分割:
-
hello how are you->['hello', 'how', 'are', 'you'] -
اردو بہت اچھی زبان ہے->>['اردو', 'بہت', 'اچھی', 'زبان', 'ہے']
然而,这种方法的不足之处在于,许多语言的词汇量都非常庞大(例如乌尔都语有超过10万个独特的词语,英语则有17万个以上)。因此,模型可能无法处理那些新出现的或较为罕见的词语。
方法3:基于BPE的分词方法
采用这种方法时,模型会从数据中学习常见的字符序列。
-
unhappiness可能会被分解为['un', 'happi', 'ness'] -
مکمل可能会被分解为['مکم', 'ل'],如果这个序列比较常见的话,也可能保持原样不变。
这种方法的优点在于所需的词汇量较小(我们仅使用了32K个分词),而且它能够处理任何词语,包括新出现的词语。常见的词语在分割后仍然会保持为一个单独的分词。
BPE是目前业界广泛采用的分词编码标准,GPT、LLaMA以及大多数现代大语言模型都在使用这种方法。其具体工作原理如下:
-
从字符开始:将所有单词拆分成单独的字符。
-
统计相邻字符对:找出出现频率最高的相邻字符对。
-
合并这些字符对:将它们组合成一个新的分词。
-
重复这个过程:直到词汇量的大小达到预期值为止。
这里有一个例子:
开始状态:ار د و ز ب ا ن
合并操作1:'ا ر' -> 'ار'(最常见的组合)
最终结果:ار د و ز ب ا ن
合并操作2:'ز ب' -> 'زب'(第二常见的组合)
最终结果:ار د و زب ا ن
……如此重复进行32,000次合并操作
这就是我们用于构建乌尔都语大语言模型的方法。我使用清洗过的乌尔都语语料库,训练了一个词汇量为32,000个词的BPE分词器。
特殊标记
除了BPE分词器之外,我们还需要添加一些特殊标记。这些标记能为模型在训练和推理过程中提供必要的结构信息。
| 标记 | 作用 | 为何需要这些标记 |
|---|---|---|
<pad> |
用于使序列长度保持一致 | 在批量处理数据时,所有序列的长度必须相同。较短的序列会用<pad>标记来补充长度。 |
<unk> |
用于表示未知词汇 | 如果模型遇到词汇表中不存在的词,它会将其替换为<unk>,而不会导致训练失败。 |
<bos> |
标记序列的开始位置 | 这有助于模型准确判断输入内容的起始点,从而生成更稳定的输出结果。 |
<eos> |
标记序列的结束位置 | 它告诉模型何时停止生成内容。如果没有这个标记,输出结果可能会无限延续下去,或者随机终止。 |
<sep> |
用于分隔不同的数据段 | 在聊天格式中,这种标记可以区分系统提示、用户输入和助手回复,从而帮助模型正确识别各部分的内容。 |
| `< | 用户输入 | >` |
| `< | 助手回复 | >` |
| `< | 系统提示 | >` |
BPE分词器配置参数
我将词汇量设置为32,000个词。这意味着模型的词汇表中将会包含32,000个标记。
这种设置在语言覆盖范围和模型规模之间取得了良好的平衡。如果增加词汇量,嵌入层和输出层的参数数量也会相应增加,从而增加训练的难度。对于一个学习项目来说,32,000这个数值使得模型规模保持在可管理的范围内。
MIN_frequency被设置为2,这意味着某个标记必须在该语料库中出现至少两次才能被纳入词汇表中。这样就可以过滤掉那些只会出现一次的无关标记,避免浪费词汇表空间。
作为参考:GPT-2使用50,000个词的词汇表,而LLaMA则使用32,000个词。我们选择32,000这个数值,是与现有的生产级模型相一致的。
VOCAB_SIZE = 32_000 # 我们词汇表中的标记数量 MIN_frequency = 2 # 一个标记必须出现至少两次才能被收录# 特殊标记——这些标记被赋予了固定的ID SPECIAL_TOKENS = [ "", # ID 0:填充字符 " ", # ID 1:未知词汇 " ", # ID 2:序列开始标志 " ", # ID 3:序列结束标志 " ", # ID 4:分隔符(用于聊天格式) "<|user|>", # ID 5:用户发言标记 "<|assistant|>", # ID 6:助手发言标记 "<|system|>", # ID 7:系统提示标记 ]
构建分词器
接下来,我们将使用之前创建的清洗过的文本文件来生成分词器。首先,我们需要导入所需的库并设置文件路径:
import os
from pathlib import Path
from tokenizers import (
Tokenizer,
models,
trainers,
pre_tokenizers,
decoders,
processors,
normalizers,
)
PROJECT_ROOT = Path(".").resolve().parent
CLEANED_DIR = PROJECT_ROOT / "data" / "cleaned"
TOKENIZER_DIR = PROJECT_ROOT / "tokenizer" / "urdu_tokenizer"
TOKENIZER_DIR.mkdir(parents=True, exist_ok=True)
CORPUS_FILE = str(CLEANED_DIR / "urdu_corpus.txt")
print(f"语料库文件:{CORPUS_FILE}")
print(f"分词器输出目录:{TOKENIZER_DIR}")
# 验证语料库是否存在
assert os.path.exists(CORPUS_FILE), f"在 {CORPUS_FILE} 处未找到语料库文件。请先运行笔记本01!"
file_size_mb = os.path.getsize(CORPUS_FILE) / 1024 / 1024
print(f"语料库大小:{file_size_mb:.1f} MB")
现在我们来配置分词器的各个组件:
# =================================================-----------
# 构建分词器
# ============================================================
# 第一步:创建BPE模型(核心算法)
tokenizer = Tokenizer(models.BPE(unk_token=""))
# 第二步:添加规范化处理步骤(在分词之前对文本进行清洗)
# NFKC用于规范化Unicode字符(例如,同一个阿拉伯字母的不同形式)
tokenizer.normalizer = normalizers.NFKC()
# 第三步:预分词步骤(在应用BPE算法之前如何分割文本)
# 我们使用Metaspace插件,它会将空格替换为□符号,并在这些位置进行分割
# 这样可以保留原始文本中的空格信息,从而便于后续重新构造原文
tokenizer.pre_tokenizer = pre_tokenizers.Metaspace()
# 第四步:解码步骤(如何将分词后的结果转换回文本)
# Metaspace解码器会将□符号还原为空格
tokenizerdecoder = decoders.Metaspace()
# 第五步:配置训练器
trainer = trainers.BpeTrainer(
vocab_size=VOCAB_SIZE,
min_frequency=MIN_frequency,
special_tokens=SPECIAL_TOKENS,
show_progress=True,
initial_alphabet=[] # 从数据中学习字母表
)
print("分词器配置完成,准备开始训练!")
训练分词器
一旦分词器的配置完成,下一步就是运行它。根据你的设备性能,这个过程大约需要5到10分钟的时间。
print("正在训练分词器...(这个过程可能需要几分钟时间)")
tokenizer.train([CORPUS_FILE], trainer)
print(f"\n 分词器训练完成!")
print(f" 词汇表大小:{tokenizer.getvocab_size():,}")
配置后处理步骤(自动添加BOS/EOS标记)
接下来,我们将配置后处理机制,使得分词器能够自动在每个文本序列的开头添加标记,在结尾添加标记。这样一来,我们在每次对文本进行编码时就不需要手动添加这些标记了:
bos_id = tokenizer.token_to_id("<bos>"
eos_id = tokenizer.token_to_id("<eos>“)
tokenizer.post_processor = processors.TemplateProcessing(
single=f"<bos>:0 $A:0 <eos>:0",
pair=f"<bos>:0 \(A:0 <sep>:0 \)B:1 <eos>:1",
special_tokens=[
("<bos>", bos_id),
("<eos>", eos_id),
("<sep>", tokenizer.token_to_id("<sep>“),
],
)
print("后处理程序已配置完成(会自动添加和标记)")
注意:你可能会疑惑:既然我们已经在SPECIAL_TOKENS中定义了<bos>和SPECIAL_TOKENS列表只是为这些标记预留了词汇位置(即为它们分配了ID)。后处理程序的作用是让分词器在编码过程中自动插入这些标记。
如果没有这个步骤,虽然这些标记存在于词汇表中,但除非你每次都手动将它们添加到数据中,否则它们根本不会出现在你的数据中。
测试分词器
分词过程的最后一步就是对其进行测试。测试会将乌尔都语句子编码成标记ID,然后再将这些ID解码回文本形式。如果解码后的文本与原始输入内容一致,那就说明分词器工作正常。这种来回测试可以确保在编码和解码过程中没有信息丢失:
test_sentences = [
"乌尔都语是一种非常优美的语言", # "Urdu is a very beautiful language"
"巴基斯坦的首都是伊斯兰堡", # "The capital of Pakistan is Islamabad"
"今天的天气非常好", # "The weather is very nice today"
"人工智能是未来的技术", # "AI is the technology of the future"
"愿平安与你同在!你最近怎么样?", # "Peace be upon you! How are you?"
]
print("=" * 70)
print("分词器测试结果")
print "=" * 70)
for sentence in test_sentences:
encoded = tokenizer.encode(sentence)
decoded = tokenizer.decode(encoded.ids)
print(f"\n 输入内容: {sentence}")
print(f" 标记ID: {encoded.ids}")
print(f" 分词结果: {encoded.tokens}")
print(f" 解码后的文本: {decoded}")
print(f" 分词数量: {len(encoded ids)}")
print(f" 测试通过与否: {sentence in decoded}")
print("-" * 70)
测试的输出结果如下:
======================================================================
分词器测试结果
======================================================================
输入内容: 乌尔都语是一种非常优美的语言
标记ID: [2, 1418, 324, 431, 2965, 1430, 276, 3]
分词结果: ['
测试通过与否: True
----------------------------------------------------------------------
输入内容: 巴基斯坦的首都是伊斯兰堡
标记ID: [2, 474, 289, 3699, 616, 1004, 276, 3]
分词结果: ['
测试通过与否: True
请注意,和这些标记会自动被添加进来(这要归功于我们的后处理步骤);像پاکستان这样的常见乌尔都语单词则会被保持为单独的标记。此外,"前缀用于标识词界,这一功能源自Metaspace预分词器。最重要的是,每次处理都能成功完成,也就是说解码后的文本与原始输入完全一致。
生育率得分
“生育率”指的是每个单词平均包含多少个标记。
-
当生育率为1时,意味着每个单词只对应一个标记(这是理想情况,但在现代的子词分词器中很难实现)。
-
在现代的大型语言模型中,生育率通常在1.3到2.5之间,具体数值会因语言的不同而有所差异。
-
较高的生育率意味着需要进行更多的标记分割操作,这会增加处理成本并降低效率;不过这一指标也受到语言复杂性的影响,而不仅仅是分词器的质量。
# =================================================-----------
# 计算训练语料库的生育率得分
# =================================================-----------
import json
jsonl_file = CLEANED_DIR / "urdu_corpus.jsonl"
corpus_words = 0
corpus_tokens = 0
sample_size = 10000 # 为提高效率,抽取1万份文档进行测试
print(f"正在计算语料库中{sample_size:,}份文档的生育率得分...")
with open(jsonl_file, "r", encoding="utf-8") as f:
for i, line in enumerate(f):
if i >= sample_size:
break
doc = json.loads(line)
text = doc["text"]
words = text.split()
tokens = tokenizer.encode(text).tokens
n_tokens = len(tokens) - 2 # 移除和标记
corpus_words += len(words)
corpus_tokens += n_tokens
corpus_fertility = corpus_tokens / corpus_words
print(f"\n📊 生育率得分(整个语料库):{corpus_fertility:.2f}个标记/单词")
print(f" (总共有{corpus_words:,}个单词,因此总标记数为{corpus_tokens:,}个)")
print(f" 被抽取用于测试的文档数量为:{min(i+1, sample_size):,}")
if corpus_fertility < 2.0:
print(" ✅ 非常好!这个分词器针对乌尔都语进行了很好的优化。")
elif corpus_fertility < 3.0:
print(" ⚠️ 还不错,但还有提升空间。可以考虑扩大词汇量。")
else:
print(" ❌ 生育率过高,说明这个分词器还需要进一步改进。")
我们得到的生育率得分是1.04,这个数值已经相当不错了。不过需要注意的是,由于这个分词器是在与其评估所用相同的小规模语料库上训练出来的,因此这个得分实际上被人为地压低了。如果使用规模更大或未曾见过的数据集进行测试,生育率很可能会更高(接近通常在1.3到2.5之间的范围)。
保存分词器
最后一步是将分词器保存为JSON格式,并验证其能否正确被加载:
# =================================================-----------
# 保存分词器
# =================================================-----------
tokenizer_path = str(TOKENIZER_DIR / "urdu_bpe_tokenizer.json")
tokenizer.save(tokenizer_path)
print(f"分词器已保存至:{tokenizer_path}")
print(f" 文件大小为:{os.path.getsize(tokenizer_path) / 1024:.0f} KB")
# 验证分词器能否成功被加载
loaded_tokenizer = Tokenizer.from_file(tokenizer_path)
test = loaded_tokenizer.encode("اردو ایک خوبصورت زبان ہے")
print(f"\n 验证结果:{test.tokens}")
print(f" 分词器已成功加载!")
一旦保存完成,我们就得到了一个查询表。利用这个查询表以及我们的数据集,就可以进行下一步重要的操作:预训练。
3. 预训练
在预训练阶段,模型会学习语言、语法、模式以及词汇。当预训练完成之后,模型就能够预测序列中接下来的单词是什么,而也正是通过这个过程,原始数据才逐渐转化成了大语言模型。
实际上,大语言模型就是用于预测下一个单词的模型。给定一个单词序列,它们会预测出最有可能出现的下一个单词。
在训练的过程中,模型会学习到以下内容:
-
该语言的语法结构
-
语义信息,即词语在特定上下文中的含义
-
常用的表达方式
-
训练数据集中包含的各种事实性信息
对于模型的训练来说,你有多种选择。由于模型体积较小,你也可以在本地机器上进行训练。虽然训练速度会较慢,但最终还是能够完成训练任务的。
另一种选择是使用Google Colab。我自己就是使用的这个工具——对于我所需的训练任务而言,免费版本就已经足够了,而且使用T4 GPU进行训练也非常方便。
预训练的步骤
-
将数据集的JSONL文件以及分词器上传到Google Drive中。
-
设置模型配置参数(词汇表大小、层数、注意力头数等)。
-
定义模型的架构结构,包括注意力机制、前馈层等组成部分。
-
将数据集加载进来并进行分词处理,然后将其分为训练集和验证集。
-
运行训练循环,设置优化器、学习率调度策略以及检查点保存机制。
模型配置参数
from dataclasses import dataclass
@dataclass
class UrduLLMConfig:
# 词汇表相关参数
vocab_size: int = 32_000
pad_token_id: int = 0
bos_token_id: int = 2
eos_token_id: int = 3
# 模型架构相关参数
d_model: int = 384
n_layers: int = 6
n_heads: int = 6
d_ff: int = 1536 # 4 * d_model
dropout: float = 0.1
max_seq_len: int = 256
# 训练相关参数
batch_size: int = 32
learning_rate: float = 3e-4
weight_decay: float = 0.1
max_epochs: int = 10
warmup_steps: int = 500
grad_clip: float = 1.0
配置参数说明:
词汇表相关的参数(vocab_size、pad_token_id、bos_token_id、eos_token_id)主要是为了与我们之前构建的分词器相匹配。其中,vocab_size被设置为32K,这对应于我们使用的BPE词汇表;而那些特殊的标记ID(0、2、3)则分别代表了我们在分词器训练过程中为各个单词分配的位置。
模型架构相关参数:
| 参数名称 | 含义 | 示例值 | 数值变化带来的影响 |
|---|---|---|---|
d_model |
每个标记对应的嵌入向量大小 | 384 | 数值越大,模型对语言的理解能力越强,但训练速度会变慢,所需内存也会增加;数值较小则相反。 |
n_layers |
模型的层数 | 6 | 层数越多,模型对语言的深层理解能力越强,但计算延迟也会增加;层数较少则训练速度较快,但模型表现可能不够强大。 |
n_heads |
每层中的注意力头数 | 6 | 注意力头数越多,模型对上下文的捕捉能力越强;头数过少则效果会减弱。 |
d_ff |
前馈层的规模 | 1536 | 数值越大,模型的计算能力越强;数值较小则计算速度较快,但模型处理信息的效率会降低。 |
dropout |
训练过程中被丢弃的神经元所占的比例 | 0.1 | 比例越高,有助于防止模型过拟合,但可能会导致欠拟合;比例过低则容易发生过拟合现象。 |
max_seq_len |
输入序列中允许的最大单词数量 | 256 | 数值越大,模型能够处理的上下文信息就越丰富,但训练速度会变慢,计算资源消耗也会增加;数值较小则相反。 |
训练超参数:
| 变量 | 含义 | 示例值 | 数值变化的影响 |
|---|---|---|---|
batch_size |
每步训练所处理的样本数量 | 32 | 数值越大:训练速度越快,但需要更多内存;数值越小:模型运行更稳定,但训练速度较慢 |
learning_rate |
参数更新的步长 | 0.0003 | 数值过高:训练过程不稳定;数值过低:学习进度非常缓慢 |
weight_decay |
正则化强度 | 0.1 | 数值越大:有助于减少过拟合现象;数值过低:容易发生过拟合 |
max_epochs |
完整数据集被处理的次数 | 10 | 次数越多:学习效果越好,但过拟合风险也越大;次数太少:模型训练不充分 |
warmup_steps |
逐步增加学习率的具体步骤数 | 500 | 步骤数越多:训练开始时模型表现更平稳,训练过程更安全;步骤数太少:存在早期出现不稳定现象的风险 |
grad_clip |
梯度值的最大上限 | 1.0 | 数值越小:学习过程越稳定,但速度较慢;数值越大:存在梯度爆炸的风险 |
Transformer架构
接下来是训练的核心部分:编写Transformer架构代码。在开始编写代码之前,了解什么是Transformer架构是非常重要的。
如果你想深入了解Transformer的本质以及它与RNN、CNN的区别,我推荐你阅读这篇文章:AWS:人工智能中的Transformer是什么
简而言之:
"Transformer是一种神经网络架构,它能够将输入序列转换为输出序列。"
最初的Transformer论文中同时介绍了编码器和解码器两部分;但像我们这样的GPT风格模型只使用了解码器部分,这种架构被称为仅解码器架构。
解码器会接收一系列符号,然后运用自注意力机制来理解这些符号之间的关系,并预测下一个应该出现的符号是什么。
正是自注意力机制使得Transformer如此强大:与RNN逐个处理符号不同,Transformer可以同时考虑所有之前的符号,从而确定哪些符号对当前预测最为关键。
以下是完整的Transformer代码,其中每个组件的功能都会被详细解释:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
class MultiHeadSelfAttention(nn.Module):
def __init__(self, config):
super().__init__()
self.n_heads = config.n_heads
self.d_model = config.d_model
self.head_dim = config.d_model // config.n_heads
self.qkv_proj = nn.Linear(config.d_model, 3 * config.d_model)
self.out_proj = nn.Linear(config.d_model, config.d_model)
selfdropout = nn.Dropout(config.dropout)
def forward(self, x, mask=None):
B, T, C = x.shape
qkv = self.qkv_proj(x)
qkv = qkv.reshape(B, T, 3, self.n_heads, self.head_dim)
qkv = qkv.permute(2, 0, 3, 1, 4)
q, k, v = qkv[0], qkv[1], qkv[2]
attn = (q @ k.transpose(-2, -1)) * (self.head_dim ** -0.5)
if mask is not None:
attn = attn.masked_fill(mask == 0, float('-inf'))
attn = Fsoftmax(attn, dim=-1)
attn = selfdropout(attn)
out = attn @ v
out = out.transpose(1, 2).reshape(B, T, C)
out = self.out_proj(out)
return out
class FeedForward(nn.Module):
def __init__(self, config):
super().__init__()
self.fc1 = nn.Linear(config.d_model, config.d_ff)
self.fc2 = nn.Linear(config.d_ff, config.d_model)
selfdropout = nn.Dropout(config.dropout)
def forward(self, x):
x = F.gelu(self.fc1(x))
x = selfdropout(x)
x = self.fc2(x)
return x
class TransformerBlock(nn.Module):
def __init__(self, config):
super().__init__()
self.ln1 = nn.LayerNorm(config.d_model)
self.attn = MultiHeadSelfAttention(config)
self.ln2 = nnLayerNorm(config.d_model)
self.ff = FeedForward(config)
selfdropout = nn.Dropout(config.dropout)
def forward(self, x, mask=None):
x = x + selfdropout(self.attn(self.ln1(x), mask))
x = x + selfdropout(self(ff(self.ln2(x)))
return x
class UrduGPT(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.token_emb = nn.Embedding(configvocab_size, config.d_model)
self.posEmb = nn.Embedding(config.max_seq_len, config.d_model)
selfdropout = nn.Dropout(config.dropout)
self.blocks = nn.ModuleList([
TransformerBlock(config) for _ in range(config.n_layers)
])
self.ln_f = nn.LayerNorm(config.d_model)
self.head = nn.Linear(config.d_model, configvocab_size, bias=False)
# Weight tying
self.head.weight = self.token_emb.weight
self.apply(self._init_weights)
def _init_weights(self, module):
if isinstance(module, nn.Linear):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
def forward(self, input_ids, targets=None):
B, T = input_ids.shape
device = input_ids.device
tok_emb = self.token_emb(input_ids)
pos = torch.arange(0, T, dtype=torch.long, device=device)
pos_emb = self.pos_emb(pos)
x = selfdropout(tok_emb + pos_emb)
# Causal mask
mask = torch.tril(torch.ones(T, T, device=device)).unsqueeze(0).unsqueeze(0)
for block in self.blocks:
x = block(x, mask)
x = self.ln_f(x)
logits = self.head(x)
loss = None
if targets is not None:
loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
return {'logits': logits, 'loss': loss}
@torch.no_grad()
def generate(self, input_ids, max_new_tokens=100, temperature=0.8,
top_k=50, top_p=0.9, eos_token_id=None):
"""
Generate text autoregressively.
Sampling strategies:
- temperature: Controls randomness (low = deterministic, high = creative)
- top_k: Only consider the top K most likely tokens
- top_p (nucleus): Only consider tokens whose cumulative probability <= p
- eos_token_id: Stop generating when this token is produced
"""
self.eval()
eos_token_id = eos_token_id or getattr(self.config, 'eos_token_id', None)
for _ in range(max_new_tokens):
idx_cond = input_ids if input_ids.size(1) <= self.config.max_seq_len \
else input_ids[:, -self.config.max_seq_len:]
outputs = self.forward(idx_cond)
logits = outputs["logits"][:, -1, :] / temperature
# Top-K filtering
if top_k > 0:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = float('-inf')
# Top-P (nucleus) filtering
if top_p < 1.0:
sorted_logits, sorted_indices = torch.sort(logits, descending=True)
cumulative_probs = torch.cumsum(Fsoftmax(sorted_logits, dim=-1), dim=-1)
sorted_indices_to_remove = cumulative_probs > top_p
sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].clone()
sorted_indices_to_remove[:, 0] = 0
indices_to_remove = sorted_indices_to_remove.scatter(
1, sorted_indices, sorted_indices_to_remove
)
logits[indices_to_remove] = float('-inf')
probs = Fsoftmax(logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
input_ids = torch.cat([input_ids, next_token], dim=1)
if eos_token_id is not None and next_token.item() == eos_token_id:
break
return input_ids
这段代码构建了一个文本预测模型。你向它提供一些乌尔都语单词,它会不断猜测下一个单词,直到形成一句完整的句子。ChatGPT的工作原理也是如此,只不过它的规模要大得多。
Transformer代码构成解析
1. MultiHeadSelfAttention:“回溯机制”
想象一下你在阅读一句话时,当你看到“اس”这个词时,你的大脑会回溯之前的内容来确定“这个”指的是什么。这就是所谓的“注意力机制”。
Q、K、V:可以把它们想象成一座图书馆:
-
查询向量(Q): “我在寻找关于X的信息”
-
键向量(K): 每一个前面的单词都相当于在说:“我有关于Y的信息”
-
值向量(V): 这个单词所承载的实际信息
6个“阅读器”:意味着有6个不同的机制同时在分析这句话。其中一个可能关注语法,另一个关注含义,还有一个会分析附近的单词,以此类推。
因果屏蔽规则:规定“你只能查看出现在你之前的单词,而不能查看之后的单词。”(因为在生成新单词时,那些未来的单词还并不存在!)
计算过程: 首先将Q与K相乘,从而得出每个单词的“重要性得分”;然后利用这些得分从V中提取最有用的信息。
2. 前馈层:“思考阶段”
在确定了哪些单词重要之后,模型就会开始思考这些单词的含义。
这一过程只包含两层结构:
-
信息扩展(384 → 1536): 为模型提供更多的“思考空间”
-
信息压缩(1536 → 384): 将处理后的结果重新压缩成适合后续处理的格式
-
GELU激活函数: 一种用于决定“保留这些信息”还是“舍弃它们”的机制(处理过程较为平滑,不会产生突变)
3. TransformerBlock:“一轮阅读过程”
模型会进行一次阅读并思考这句话的含义。
-
步骤1: 分析其他单词(运用注意力机制)
-
步骤2: 思考所分析的内容(执行前馈处理)
-
LayerNorm层: 用于在各个处理阶段之间保持数值的稳定性,防止数值过大或过小
-
残差连接机制(
x + ……): 模型会保留原有的思考结果,并结合新的分析结果进行进一步处理。这就像做笔记一样:不会擦掉旧笔记,而是添加新内容。
模型会重复这个过程6次(即通过6个TransformerBlock),每一轮都会让对文本的理解更加深入一些。
4. UrduGPT:“完整系统”
初始化设置(__init__):
-
词元嵌入: 一个庞大的查找表。32,000个乌尔都语单词或子词每个都会对应一组384个数字,这些数字代表了它们的“含义”。
-
位置嵌入: 另一个查找表,用于告诉模型“这个单词位于第1位,那个单词位于第2位……”(否则模型就无法理解单词的顺序)
-
6个TransformerBlock: 上述描述的那6轮阅读过程
-
语言模型头部: 在最后,这个部分会将模型内部的“思考结果”(即384个数字)转换成32,000个可能的下一个单词对应的分数。
-
权重共享机制: 输入查找表和输出评分表使用相同的数据。这种设计可以节省内存,而且实际上效果更好!
处理过程(forward):
-
查找每个单词的含义(将其转化为嵌入向量)
-
添加位置信息
-
进行6轮注意力机制计算及逻辑分析
-
为每一个可能的后续单词计算得分
-
如果我们知道正确答案,就能计算出自己的错误程度(即损失值)
生成文本过程(generate): 这个过程只需要一个简单的循环即可完成:
-
将目前已有的单词输入到系统中
-
为下一个要生成的单词计算得分
-
“温度”参数: 用于控制文本生成的创造性。数值较低时,生成的内容较为保守、可预测;数值较高时,生成的内容则更加大胆、富有创意
-
“Top-K”机制: 只考虑K个最佳选项,忽略其余31,950个不太可能的单词
-
“Top-P”机制(核心部分): 动态地选择那些累积概率达到设定阈值的最小数量的单词来生成文本
-
从剩余的选项中随机选取一个单词
-
将其添加到句子中,然后重新开始上述流程
-
当生成到
<eos>标记时,或当累计生成的单词数量达到max_new_tokens限制时,停止这个过程
加载数据集并开始训练
首先,我们加载JSONL格式的数据集,并将其中的每篇文档都转换成由单词ID组成的长序列。然后,我们将这些数据集按90/10的比例分为训练集和验证集,并使用PyTorch的Dataset类将这些数据集组织成适合进行后续预测处理的结构:
import json
from tokenizers import Tokenizer
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
# 确定计算设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用计算设备:{device}")
# 加载分词器
tokenizer = Tokenizer.from_file(TOKENIZER_PATH)
print(f"分词器加载完成。词汇表大小为:{tokenizer.getvocab_size():,}")
# 加载并处理数据集
print("正在加载数据集...")
all_token_ids = []
with open(DATA_PATH, "r", encoding="utf-8") as f:
for line in tqdm(f, desc="分词中..."):
doc = json.loads(line)
encoded = tokenizer.encode(doc["text"])
all_token_ids.extend(encoded.ids)
all_token_ids = torch.tensor(all_token_ids, dtype=torch.long)
print(f>总单词数量为:{len(all_token_ids):,}")
class UrduTextDataset(Dataset):
def __init__(self, token_ids, seq_len):
self.token_ids = token_ids
self.seq_len = seq_len
self.n_chunks = (len(token_ids) - 1) // seq_len
def __len__(self):
return self.n_chunks
def __getitem__(self, idx):
start = idx * self(seq_len
chunk = self.token_ids[start:start + self.seq_len + 1]
return chunk[:-1], chunk[1:] # 输入数据与目标数据(相差1个位置)
config = UrduLLMConfig()
# 分割数据集为训练集和验证集
split_idx = int(len(all_token_ids) * 0.9)
train_dataset = UrduTextDataset(all_token_ids[:split_idx], config.max_seq_len)
val_dataset = UrduTextDataset(all_token_ids[split_idx:], config.max_seq_len)
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size/config.batch_size)
print(f>训练集包含{len(train_dataset):,}个数据块")
print(f>验证集包含{len(val_dataset):,}个数据块)
每个数据块的长度为256个标记。`__getitem__`方法会返回一个`(input, target)`元组,其中`target`表示输入数据向右移动一位后的结果,而这正是进行下一个标记预测所需要的数据。
对我来说,训练过程耗时大约3个小时,共完成了3个训练周期。实际上应该进行10个周期的训练,但在进行了3个周期后,我就达到了Google Colab提供的免费资源限制。由于训练的目的是为了学习,所以我使用了生成出来的模型,并将其保存在了Drive中。
以下是完整的训练代码:
# 优化器设置
optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=config.weightdecay)
# 学习率调整策略
total_steps = len(train_loader) * config.max_epochs
def get_lr(step):
if step < config.warmup_steps:
return configlearning_rate * step / config.warmup_steps
progress = (step - config.warmup_steps) / (total_steps - config.warmup_steps)
return config.learning_rate * 0.5 * (1 + math.cos(math.pi * progress))
# 开始训练
history = {'train_loss': [], 'val_loss': []}
global_step = 0
best_val_loss = float('inf')
for epoch in range(config.max_epochs):
model.train()
epoch_loss = 0
pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}")
for input_ids, targets in pbar:
input_ids, targets = input_ids.to(device), targets.to(device)
lr = get_lr(global_step)
for g in optimizer.param_groups:
g['lr'] = lr
outputs = model(input_ids, targets)
loss = outputs['loss']
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), config.grad_clip)
optimizer.step()
epoch_loss += loss.item()
global_step += 1
pbar.set_postfix({'loss': f'{loss.item():.4f}'))
# 验证模型性能
model.eval()
val_loss = 0
with torch.no_grad():
for input_ids, targets in val_loader:
input_ids, targets = input_ids.to(device), targets.to(device)
val_loss += model(input_ids, targets)['loss'].item()
val_loss /= len(val_loader)
train_loss = epoch_loss / len(train_loader)
history['train_loss'].append(train_loss)
history['val_loss'].append(val_loss)
print(f"Epoch {epoch+1}: Train={train_loss:.4f}, Val={val_loss:.4f}")
# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), f"{DRIVE_PATH}/best_model.pt")
print(f"Best model saved!”
print(f"\nDone! Best val loss: {best_val_loss:.4f}")
现在让我们来详细分析一下这段训练代码的每一部分功能。
逐行解释训练代码
1. 优化器设置
optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=config.weightdecay)
`AdamW`优化器为每个模型参数维护两个运行中的统计数值,因此会在内存中占用额外的空间(2300万个参数 × 2 = 4600万个额外数据)。
-
第一矩(动量):梯度的指数移动平均值。这种计算方法能够平滑掉那些包含噪声的更新值,从而避免优化器在训练过程中出现剧烈波动。
-
第二矩:梯度平方的指数移动平均值。这种方法能为每个参数设定不同的自适应学习率:更新频率较高的参数会使用较小的步长进行训练,而更新频率较低的参数则会使用较大的步长。
-
权重衰减(0.1):在每次迭代过程中,权重都会被乘以
(1 - lr × 0.1),这样权重值就会逐渐减小。这种机制属于L2正则化,它能够防止某个权重值变得过大,从而有效减少过拟合现象。在AdamW算法中,“W”这个符号表示这种权重衰减机制是与梯度更新过程分离开的——权重衰减是直接应用于权重值的,而不是像普通的Adam算法那样与梯度混合后再进行应用。
2. 学习率调度策略
total_steps = len(train_loader) * config.max_epochs # 例如:500个训练批次 × 10个训练周期 = 5000步
def get_lr(step):
if step < config.warmup_steps: # 第一阶段:0到499步
return configlearning_rate * step / config.warmup_steps # 线性递增:从0逐渐增加到3e-4
progress = (step - config.warmup_steps) / (total_steps - config.warmup_steps) # 进度值,范围为0.0到1.0
return config.learning_rate * 0.5 * (1 + math.cos(math.pi * progress)) # 最终学习率值,范围为3e-4到接近0
-
热身阶段(前500步):在训练开始时,权重值是随机设定的,梯度方向也是随机的;如果使用过高的学习率,就会导致参数值发生剧烈变化,从而影响训练效果。因此,我们通过让学习率从0线性递增到3e-4,让损失函数的值“稳定”下来,然后再进行更大幅度的参数调整。
-
余下步骤中的衰减机制:
公式
0.5 × (1 + cos(π × progress))可以生成一条从1.0逐渐减小到0的平滑S形曲线。当这个值乘以初始学习率时,就会得到如下效果: -
训练初期:
学习率较高,参数变化幅度较大,因此损失值会迅速下降。
-
训练后期:
学习率较低,参数调整幅度较小,这样就可以进行精细调优,同时避免陷入局部最小值。
学习率变化过程:0 ── 线性递增──▶ 最高值 ── 平滑下降曲线──▶ 接近0
| | |
| 热身阶段 | 副角函数衰减 |
3. 跟踪变量
history = {'train_loss': [], 'val_loss': []} # 用于后续绘制曲线
global_step = 0 # 计录所有训练周期中的总训练批次数(用于学习率调度)
best_val_loss = float('inf') # 记录最佳验证损失值;初始值为无穷大,因此任何实际的验证损失值都会比它小
4. 训练循环
外层循环:训练周期
for epoch in range(config.max_epochs):
model.train() # 启用dropout机制(随机将10%的激活值设为0,以起到正则化作用)
每个训练周期意味着完整地处理一遍所有训练数据。我们会重复进行max_epochs轮这样的操作。
内层循环:批量处理
1. 将数据传输到GPU:
input_ids, targets = input_ids.to(device), targets.to(device)
将张量数据从CPU内存传输到GPU显存。在变换器模型中,矩阵乘法运算(如注意力机制和FFN层)在GPU上可以快50到100倍,这是因为GPU具有强大的并行处理能力。
2. 手动更新学习率:
lr = get_lr(global_step)
for g in optimizer.param_groups:
g['lr'] = lr
PyTorch的AdamW优化器并不支持自定义学习率调整策略,因此我们必须手动在每一步中更新学习率。param_groups是一个列表(在这里只包含一个参数组),每个参数组都可以拥有自己的学习率和权重衰减系数。
3>前向传播:
outputs = model(input_ids, targets)
loss = outputs['loss']
输入的标记序列会依次经过以下处理流程:嵌入层 → 6个变换器模块 → 长短时记忆头层 → 最终得到logits值。损失函数计算的是logits值(其数据结构为[batch, seq_len, 32000])与目标标记ID之间的交叉熵损失。这个损失值反映了模型认为正确下一个标记出现的概率,它是通过对所有位置和批量数据进行平均计算得出的。
4>反向传播 + 更新参数:
optimizer.zero_grad() # 将所有参数的梯度重置为0(默认情况下这些梯度会不断累积)
loss.backward() # 进行反向传播计算,利用链式法则求出2300万个参数的∂loss/∂θ值
torch.nn.utils.clip_grad_norm_(model.parameters(), config.grad_clip) # 如果某个梯度的L2范数大于1.0,就将其缩放为1.0/norm的大小
optimizer.step() # 根据更新规则计算新的参数值:θ_new = θ_old - lr × adam_adjusted_gradient - lr × weight_decay × θ_old
-
zero_grad(): PyTorch默认会累积梯度值(这种机制在处理小批量数据时非常有用)。但在每次进行反向传播计算之前,我们必须手动清除这些累积的梯度。 -
loss.backward(): 反向传播过程会沿着计算图逆向遍历,利用链式法则计算出每个参数的∂loss/∂θ值。这一步骤的计算量与前向传播过程相当大,属于计算成本最高的环节。 -
梯度裁剪: 将所有参数的梯度值连接成一个向量,然后计算这个向量的L2范数。如果某个梯度的范数超过了1.0,就会将其缩放为1.0/norm的大小,这样既能保持梯度的方向,又能限制其大小,从而避免某些特殊情况下导致的异常大的参数更新幅度,这些情况可能会破坏训练过程的稳定性。
-
optimizer.step(): AdamW优化器会结合动量机制、自适应的学习率调整方式以及独立的权重衰减策略来更新参数值。
5>记录训练进度:
epoch_loss += loss.item() # .item()方法用于将CUDA张量中的数值转换为Python浮点数类型,这样可以避免GPU内存泄漏的问题
global_step += 1 # 更新全局步数,以便后续根据步数来调整学习率
pbar.set_postfix({'loss': ...}) # 更新tqdm进度条的显示内容
6. 验证
model.eval() # 关闭dropout机制,以便使用模型的全部计算能力进行准确评估
val_loss = 0
with torch.no_grad(): # 不进行梯度计算,这样可以节省约50%的内存并提高运行速度
for input_ids, targets in val_loader:
input_ids, targets = input_ids.to(device), targets.to(device)
val_loss += model(input_ids, targets)['loss'].item()
val_loss /= len(val_loader) # 计算每批数据的平均损失值
这一过程会在模型从未训练过的数据集上进行测试。通过比较训练损失与验证损失,我们可以得出以下结论:
| 模式 | 含义 |
|---|---|
| 两者都在下降 | 模型正在学习具有普遍适用性的规律 |
| 训练损失下降,验证损失停滞/上升 | 过拟合:模型只是机械地记忆数据,而没有真正进行学习 |
| 两者都处于较高水平且变化不大 | 欠拟合:模型需要更多的计算能力或更多数据才能达到最佳效果 |
model.eval()这一命令会关闭dropout机制,这样我们就可以使用模型的全部功能来进行评估了。torch.no_grad()则用于跳过梯度计算,因为我们现在只是在进行测试,而不是进行学习。
7. 检点保存
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), f"{DRIVE_PATH}/best_model.pt")
model.state_dict()会返回一个OrderedDict对象,该对象将参数名称与对应的张量关联起来。torch.save则使用Python的pickle和zip函数将这些数据序列化到磁盘上。只有当验证损失有所改善时,我们才会保存这些检查点。
这种做法本质上属于“提前停止训练”策略:无论后续训练过程中发生什么,我们都会保留那个能够产生最佳泛化效果的检查点。
总结:六步完成一个训练批次
-
将32个乌尔都语序列输入模型 → 获取预测概率
-
计算预测概率与实际下一个字符之间的交叉熵损失 → 从而了解预测的准确程度
-
通过2300万个参数进行反向传播计算 → 确定每个参数需要调整的方向
-
将梯度范数限制在1.0以内 → 防止模型出现不稳定现象
-
使用AdamW算法结合动量和衰减机制更新参数 → 这才是真正的学习过程
-
重复这个过程约5000次,然后保存最佳检查点 → 训练完成
关键指标
交叉熵损失用于衡量预测概率分布与实际下一个字符之间的差距。对于一个包含32000个单词的模型来说,其交叉熵损失大约为ln(32000)≈10.4。
困惑度 = e^损失值,这个指标可以理解为“模型当前是在N个可能性相同的选项中做出选择”。
-
当困惑度为32,000时,意味着模型完全是随机猜测的
-
当困惑度为100时,说明模型已经将候选选项范围缩小到了大约100个
-
当困惑度为10时,表示模型的预测结果具有较高的可靠性
一旦训练完成并将模型保存到云端,下一步就是将其下载到本地计算机上,以便继续后续步骤。
现在,我们已经得到了一个准备好的模型,但随之而来的是一个问题:这个模型是否已经发展到我们可以像使用ChatGPT、Claude或Copilot这样的AI工具一样与它进行对话的程度呢?答案是没有,它还远远没有达到这个阶段。为什么呢?
模型的训练部分已经完成,但它还不知道该如何以对话的形式来组织语言或撰写回答内容。这就是我们所说的有监督微调(SFT)步骤。
4. 有监督微调(SFT)
从宏观的角度来看,在有监督微调的过程中,我们教会模型如何回应用户的查询。这就像是给它提供一些示例,让它从中学习如何给出正确的回答。示例越多,模型的响应质量就会越高。因此,有监督微调的本质就是将模型转变为一个能够进行对话的智能体。
为了实现这一目标,我们需要创建一组包含以下关键信息及相应格式的示例数据集:
{
"conversations": [
{"role": "system", "content": "您是一位辅助工具,能使用乌尔都语为您提供帮助。"},
{"role": "user", "content": "请问……"},
{"role": "assistant", "content": "回答是……"}
]
}
大约79个示例会被输入到系统中,并以JSONL格式保存下来。在实际应用中,所需的示例数量会更多。正如我之前所说,更多的示例确实能够带来更好的效果。
为训练而格式化对话内容
下一步就是将上面保存的对话内容进行格式化,以便用于训练。这个步骤属于有监督微调流程中的对话格式化环节。它的作用是将原始的对话JSON数据转换成包含损失掩码的标记序列,这样模型就能只学习如何生成辅助角色的回答内容。
所谓“损失掩码”,就是我们有意地隐藏输入数据中的某些部分,不让它们被用于计算训练损失。在这种情况下,我们会屏蔽系统提示语和用户的输入信息,这样模型就不会被训练去记忆或复制这些内容。训练信号仅来自辅助角色的回答,而这个回答正是教会模型该生成什么内容以及何时停止生成的关键依据。
第1部分:禁用自动格式化并获取特殊标记ID
tokenizer.no_padding()
BOS_ID = tokenizer.token_to_id("<bos>") # 2
EOS_ID = tokenizer.token_to_id("<eos>") # 3
SEP_ID = tokenizer.token_to_id("<sep>") # 4
PAD_ID = tokenizer.token_to_id("<pad>") # 0
USER_ID = tokenizer.token_to_id("<|user|>") # 5
ASSISTANT_ID = tokenizer.token_to_id("<|assistant|>") # 6
SYSTEM_ID = tokenizer.token_to_id("<|system|>") # 7
IGNORE_INDEX = -100
-
no_padding(): 这个指令告诉分词器“不要自动添加填充字符,这些我会自己处理”。我们需要完全控制标记序列的格式。 -
我们获取每个特殊标记对应的整数ID,这样就可以在训练过程中手动将它们插入到正确的位置。
-
IGNORE_INDEX = -100: PyTorch的cross_entropy函数具有一个内置功能:任何被设置为-100的标签都会在损失计算中被忽略。我们正是利用这一机制来实现损失掩码功能的。
第2部分:`format_conversation()`函数:核心功能
该函数接收一段对话内容,然后生成两个平行数组:
input_ids: [BOS, SYSTEM, 您, 一个, 助手, ..., SEP, 用户, 巴基斯坦, 的, ..., SEP, 助理, 伊斯兰堡, 是, EOS, PAD, PAD, ...]
labels: [-100, -100, -100, -100, -100, ..., -100, -100, -100, -100,..., -100, -100, 伊斯兰堡, 是, EOS, -100, -100, ...]
函数内部的处理步骤:
1. 从BOS开始:
input_ids = [BOS_ID]
labels = [IGNORE_INDEX] # 不需要让模型学习如何预测BOS
2. 对于对话中的每一轮内容,首先对其进行编码处理,同时去除自动添加的BOS/EOS标记:
content_ids = tokenizer.encode(content).ids
if content_ids[0] == BOS_ID: content_ids = content_ids[1:] # 如果tokenizer自动添加了BOS,则将其删除
if content_ids[-1] == EOS_ID: content_ids = content_ids[:-1]
我们去除这些标记是因为我们是手动在特定位置插入特殊符号的,因此不希望出现重复。
3. 为每种角色生成对应的标记序列:
| 角色 | 标记序列 | 标签 |
|---|---|---|
| 系统 | [SYSTEM_ID] + 内容 + [SEP_ID] |
全部为-100(被屏蔽) |
| 用户 | [USER_ID] + 内容 + [SEP_ID] |
全部为-100(被屏蔽) |
| 助手 | [ASSISTANT_ID] + 内容 + [EOS_ID] |
[-100] + 内容 + [EOS_ID] |
助手角色的标记 `<|assistant|>` 也被屏蔽了,因为我们不希望模型学习如何预测这个角色。不过助手的实际回答内容以及 `` 标记是有标签的,因此模型可以从中学习到:
-
该说些什么(即回答内容)
-
何时应该停止发言(通过识别 `
` 来判断)
4. 对序列进行截断和填充:
input_ids = input_ids[:max_len] # 将序列长度限制在256个标记以内
pad_len = max_len - len(input_ids)
input_ids = input_ids + [PAD_ID] * pad_len
labels = labels + [IGNORE_INDEX] * pad_len # 填充后的标签也被视为无效数据
在进行批量训练时,所有序列的长度必须相同。填充用的标签值为-100,因此在计算损失时会忽略这些标签。
以下是完整的 `format_conversation()` 函数代码:
def format_conversation(conversation: dict, max_len: int = 256) -> dict:
"""
将对话数据转换为标记ID和标签,以便用于SFT训练。
格式:<bos><|系统|>>...<sep><|用户|>>>...<sep><|助手|>>>...<eos>
标签说明:系统/用户角色的标记为-100(被屏蔽),助手角色的标记为其实际ID。
"""
input_ids = [BOS_ID]
labels = [IGNORE_INDEX]
for turn in conversation["conversations"]:
role = turn["role"]
content = turn["content"]
content_ids = tokenizer.encode(content).ids
if content_ids and content_ids[0] == BOS_ID:
content_ids = content_ids[1:]
if content_ids and content_ids[-1] == EOS_ID:
content_ids = content_ids[:-1]
if role == "system":
role_ids = [SYSTEM_ID] + content_ids + [SEP_ID]
role_labels = [IGNORE_INDEX] * len(role_ids)
elif role == "user":
role_ids = [USER_ID] + content_ids + [SEP_ID]
role_labels = [IGNORE_INDEX] * lenrole_ids)
elif role == "assistant":
role_ids = [ASSISTANT_ID] + content_ids + [EOS_ID]
role_labels = [IGNORE_INDEX] + content_ids + [EOS_ID]
input_ids.extend(role_ids)
labels.extend(role_labels)
# 对序列进行截断和填充
input_ids = input_ids[:max_len]
labels = labels[:max_len]
pad_len = max_len - len(input_ids)
input_ids = input_ids + [PAD_ID] * pad_len
labels = labels + [IGNORE_INDEX] * pad_len
return {"input_ids": input_ids, "labels": labels}
第三部分:验证
n_loss_tokens = sum(1 for l in test_formatted['labels'] if l != IGNORE_INDEX)
print(f" 产生损失的标记数量:{n_loss_tokens} / 256")
这表明,只有极少一部分标记(助手生成的文字以及表示对话结束的标记)会影响到损失值。以一个典型的例子来说,你可能会看到类似“产生损失的标记数量:18 / 256”这样的结果,这意味着序列中仅有约7%的部分会用于驱动梯度更新。其余部分(系统提示、用户输入的问题、特殊标记以及填充字符)都被赋予了-100这个值。
正是这种机制使得SFT训练方法效率极高:100%的学习信号都来自于预测助手的实际回答内容,以及判断何时应该停止训练(即识别出表示对话结束的标记
格式化说明
| 组件 | 作用 |
|---|---|
no_padding() |
允许手动控制标记的排列位置 |
| 特殊标记ID | 用于在指定位置插入对话结构标记 |
IGNORE_INDEX = -100 |
PyTorch内置的机制,用于跳过某些位置以避免影响损失计算 |
| 系统/用户输入的标签 → -100 | 这些标签不会被用于训练模型(仅作为上下文信息使用) |
| 助手生成的标签 → 其实际ID | 模型会通过学习这些标签来生成正确的回答内容并判断对话何时结束 |
| 将所有标记长度截断为256个字符 | 使输入数据符合模型的处理要求 |
| 使用-100标签进行填充 | 保证批量数据处理时的对齐性,同时不会影响损失计算结果 |
SFT数据集与数据加载器
class SFTDataset(Dataset):
def __init__(self, conversations: list, max_len: int = 256):
selfexamples = []
for conv in conversations:
formatted = format_conversation(conv, max_len)
self/examples.append(formatted)
def __len__(self):
return len(self.examples)
def __getitem__(self, idx):
return (
torch.tensor(self.examples[idx]['input_ids'], dtype=torch.long),
torch.tensor(selfexamples[idx]['labels'], dtype=torch.long),
)
这个类将全部79个格式化后的对话数据封装成了一个PyTorch数据集。在初始化时,它会使用format_conversation()方法对每个对话进行预处理,并将处理结果保存起来。当数据加载器请求索引为idx的训练数据时,它就会返回以张量形式存在的(input_ids, labels)数据。
数据加载器:
sft_loader = DataLoader(sft_dataset, batch_size=4, shuffle=True)
-
batch_size=4:由于我们只有79个训练样本,因此使用较小的批量大小。如果批量规模过大,每个训练周期内进行的梯度更新次数就会减少。 -
shuffle=True:在每个训练周期内随机打乱数据顺序,这样模型就不会记住固定的训练数据序列。
加载预训练模型
model = UrduGPT(config).to(device)
checkpoint = torch.load("best_model.pt", map_location=device)
state_dict = checkpoint['model_state_dict']
# 名称映射(Google Colab与本地代码之间的对应关系)
name_mapping = {
'token_emb.weight': 'token_embedding.weight',
'pos_emb.weight': 'positionembedding.weight',
'ln_f.weight': 'ln_final(weight)',
'ln_f.bias': 'ln_final.bias',
'head.weight': 'lm_head.weight',
}
这段代码会创建一个新的UrduGPT模型,并从第三阶段训练中加载预训练得到的权重。
你可能会问:为什么需要这种名称映射呢?因为该模型是在Google Colab上训练的,其中的变量名称与本地代码中的名称有所不同(例如,token_emb与token_embedding就是不同的)。这种映射的作用是将Colab上的命名规范转换为本地代码的命名规范。在load_state_dict方法中设置strict=False,就可以允许某些键的名称不完全匹配也能成功加载数据。
另外,为什么要从预训练模型开始训练呢?因为SFT训练其实是建立在预训练模型的基础之上的。预训练模型已经掌握了Urdu语言的语法、词汇以及相关知识,SFT训练所做的只是教会它如何进行对话。如果从随机权重开始训练,那么就需要消耗更多的数据和进行更长时间的训练。
SFT训练循环
以下是完整的SFT训练循环代码:
SFT_LR = 2e-5
SFT_EPOCHS = 50
optimizer = torch.optim.AdamW(model.parameters(), lr=SFT_LR, weight_decay=0.01)
sft_history = {'loss': []}
best_loss = float('inf')
for epoch in range(SFT_EPOCHS):
model.train()
epoch_loss = 0
n_batches = 0
for input_ids, labels in sft_loader:
input_ids = input_ids.to(device)
labels = labels.to(device)
outputs = model(input_ids)
logits = outputs['logits']
shift_logits = logits[:, :-1, :].contiguous()
shift_labels = labels[:, 1:].contiguous()
loss = F.cross_entropy(
shift_logits.view(-1, shift_logits.size(-1)),
shift_labels.view(-1),
ignore_index=IGNORE_INDEX,
)
optimizer.zero_grad(set_to_none=True)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
epoch_loss += loss.item()
n_batches += 1
avg_loss = epoch_loss / n_batches
sft_history['loss'].append(avg_loss)
if avg_loss < best_loss:
best_loss = avg_loss
torch.save({
'model_state_dict': model.state_dict(),
'config': config.__dict__,
'epoch': epoch + 1,
'loss': avg_loss,
}, "sft_model.pt")
if (epoch + 1) % 10 == 0 or epoch == 0:
print(f"第{epoch+1}轮训练/总共{SFT_EPOCHS}轮 | 损失值:{avg_loss:.4f}")
print(f"SFT训练已完成!最佳损失值:{best_loss:.4f}")
为什么这些超参数与预训练阶段使用的参数不同呢?……
| 参数 | 预训练阶段 | SFT方法 | 为何存在差异 |
|---|---|---|---|
| 学习率 | 3e-4 | 2e-5 | 较低的学习率能够有效防止“灾难性遗忘”现象;过高的学习率会抹去在预训练阶段学到的知识。 |
| 训练周期数 | 3 | 50 | 仅使用79个示例进行训练,而普通方法需要使用数百万条数据;因此该模型需要经过多次训练才能掌握对话模式。 |
| 权重衰减系数 | 0.1 | 0.01 | 由于我们希望模型能够紧密地适应这些特定示例,因此所需的正则化强度较低。 |
| 学习率调整策略 | 余弦函数衰减法 | 恒定值 | 这种调整方法对于小规模数据的微调来说简单且有效。 |
以下是每个训练批次中的具体步骤:
# 正向传播过程,不使用目标标签;损失值由我们手动计算
outputs = model(input_ids)
logits = outputs['logits']
# 为下一个词元的预测做数据调整
shift_logits = logos[:, :-1, :].contiguous() # 用于预测位置0到254的词元
shift_labels = labels[:, 1:].contiguous() # 用于对应位置1到255的目标标签
# 使用掩码计算损失值
loss = F.cross_entropy(
shift_logits.view(-1, shift logits.size(-1)),
shift_labels.view(-1),
ignore_index=IGNORE_INDEX, # 跳过索引-100的位置
)
与预训练过程有一个关键区别:在预训练中,我们会直接将目标标签传递给`model(input_ids, targets)`,这样模型就会对所有词元自动计算损失值。而在这里,我们需要手动计算损失值,因此才能使用`ignore_index=-100`来忽略那些不需要被考虑的词元。
数据调整的原理: `logits[:, :-1]`和`labels[:, 1:]`这两个数组分别包含了用于预测下一个词元的信息。模型对位置`i`的预测结果会与实际位置`i+1`上的词元进行比较。
反向传播与参数更新:
optimizer.zero_grad(set_to_none=True)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
这一过程与预训练时相同:首先清除梯度值,然后进行反向传播计算,接着通过梯度裁剪来避免数值不稳定,最后更新模型参数。由于当前阶段是在对模型进行微调,而且在一些小规模数据集上某些梯度的幅度可能会很大,因此将梯度裁剪的范围设定为1.0尤为重要。
检查点保存:
if avg_loss < best_loss:
torch.save({'model_state_dict': model.state_dict(), ...}, "sft_model.pt")
只要训练损失值有所改善,就立即保存模型状态。与预训练不同,我们没有单独的验证集(79个样本数量太少,无法进行划分),因此我们是根据训练损失值来决定是否保存模型的。
聊天功能:推理过程
以下是完整的聊天功能实现代码:
def chat(model, tokenizer, user_message: str, system_prompt: str = None,
max_tokens: int = 100, temperature: float = 0.7) -> str:
"""生成聊天响应。"""
model.eval()
if system.prompt is None:
systemprompt = SYSTEM_PROMPT
# 构建输入提示序列
prompt_ids = [BOS_ID, SYSTEM_ID]
sys_ids = tokenizer.encode(system_prompt).ids
if sys_ids and sys_ids[0] == BOS_ID:
sys_ids = sys_ids[1:]
if sys_ids and sys_ids[-1] == EOS_ID:
sys_ids = sys_ids[:-1]
prompt_ids.extend(sys_ids)
prompt_ids.append(SEP_ID)
prompt_ids.append(USER_ID)
user_ids = tokenizer.encode(user_message).ids
if user_ids and user_ids[0] == BOS_ID:
user_ids = user_ids[1:]
if user_ids and user_ids[-1] == EOS_ID:
user_ids = user_ids[:-1]
prompt_ids.extend(user_ids)
prompt_ids.append(SEP_ID)
prompt_ids.append(ASSISTANT_ID)
# 生成聊天响应
input_tensor = torch.tensor([prompt_ids], dtype=torch.long).to(device)
with torch.no_grad():
output_ids = model.generate(
input_tensor,
max_new_tokens=max_tokens,
temperature=temperature,
top_k=50,
top_p=0.9,
eos_token_id=EOS_ID,
)
# 只解码生成的聊天内容
generated_ids = output_ids[0][len(prompt_ids):].tolist()
if EOS_ID in generated_ids:
generated_ids = generated_ids[:generated_ids.index(EOS_ID)]
return tokenizer.decode(generated_ids)
以下是具体的步骤说明:
1. 构建提示信息:
prompt_ids = [BOS_ID, SYSTEM_ID]
prompt_ids.extend(sys_ids) # 系统提示内容
prompt_ids.append(SEP_ID)
prompt_ids.append(USER_ID)
prompt_ids.extend(user_ids) # 用户输入内容
prompt_ids.append(SEP_ID)
prompt_ids.append(ASSISTANT_ID) # “现在开始回答…”
这样构建出的格式与模型在SFT训练过程中看到的格式完全一致:
<bos><|system|>>你是一个助手……<|user|>>巴基斯坦的首都是哪里?<|assistant|>
模型会识别到<|assistant|>>这个标记,从而知道“现在应该开始生成回答了”,因为在SFT训练中,模型已经学会了在遇到这个标记之后应该输出什么内容。
2. 采用自回归方式生成答案:
with torch.no_grad():
output_ids = model.generate(
input_tensor,
max_new_tokens=max_tokens,
temperature=temperature,
top_k=50,
top_p=0.9,
eos_token_id=EOS_ID,
)
-
torch.no_grad():推理过程中不需要计算梯度,这样可以节省内存并提高运行速度。 -
temperature=0.7:适当调整分布概率,使生成的答案更加自然,而不会显得过于机械。 -
top_k=50:只选取前50个最有可能出现的词来生成答案,从而避免生成低概率的随机内容。 -
top_p=0.9:采用核心采样算法,仅选择累积概率大于或等于0.9的词来生成答案。 -
eos_token_id:当生成到标记时,停止生成答案。
3. 提取并解码生成的文本:
generated_ids = output_ids[0][len(prompt_ids):].tolist() # 只保留新生成的词
if EOS_ID in generated_ids:
generated_ids = generated_ids[:generated_ids.index(EOS_ID)] # 去掉标记
return tokenizer.decode(generated_ids)
我们去掉提示信息部分,去除标记,然后将生成的词序列解码回乌尔都语文本。
5. 部署模型
至此,你已经成功构建了自己的大语言模型。这是一个非常重要的里程碑。但还有一个问题需要解决:虽然这个模型在你的机器上可以正常运行,但它仍然只能在你自己的设备上使用。
为了让其他人也能使用这个模型,我们需要将其部署到外部环境中,并提供相应的交互界面。
在探索部署方案的过程中,我发现了Gradio这个工具。它提供了一个简单直观的接口,可以帮助我们轻松地部署机器学习模型和应用程序。Gradio还能直接与Hugging Face Spaces集成,让我们几乎不需要进行任何额外的配置就能免费托管模型。
Gradio网页界面(app.py)
app.py文件将所有内容整合到了一起:它加载了分词器和模型,定义了chat()函数,并启动了Gradio用户界面。模型的加载过程以及chat()>函数的实现方式与我们在SFT章节中介绍的内容完全相同,因此在这里我们只展示与Gradio相关的部分:
import gradio as gr
def respond(message, history):
if not message.strip():
return "请输入一些内容。"
return chat(message)
demo = gr.ChatInterface(
fn=respond,
title="🇵🇰 阿尔杜语大语言模型聊天机器人",
description="""
### 这是一个从零开始构建的阿尔杜语模型
**一个小型阿尔杜语模型,包含约2300万个参数**
""",
examples=[
"愿安宁降临于您。",
"巴基斯坦的首都是哪里?",
"请介绍一下拉合尔。",
"布里安尼是如何制作的?",
"板球是怎么玩的?",
"月亮为什么會发光?",
="斋月是什么?",
"艾勒·加扎利是谁?",
"如何才能保持快乐?",
"您是谁?",
],
theme=gr.themes.Soft(),
)
if __name__ == "__main__":
demo.launch()
-
respond()函数为chat()添加了一个空输入检查机制,这样就能满足Gradio的ChatInterface接口所要求的格式。 -
gr.ChatInterface提供了一个功能完备的聊天界面,其中包含了消息历史记录、输入框和发送按钮。 -
examples中列出了一些预填充好的测试用例,用户可以点击这些示例来尝试与机器人进行对话。 -
theme=gr.themes.Soft()使得界面看起来更加简洁、现代。
注意: Hugging Face Spaces会将app.py作为独立的脚本来运行,因此仓库中的app.py文件将所有的代码整合到了一个文件中:包括模型配置、完整的Transformer架构、使用gc.collect()>进行内存优化的模型加载过程、chat()>函数,以及上面的Gradio界面代码。
由于这些内容在预训练和SFT章节中已经有过介绍,因此这里不再重复赘述。
本地运行方式:
python app.py
# 程序将在http://127.0.0.1:7860地址上启动
部署选项
选项A:Hugging Face Spaces(免费,推荐使用)
Hugging Face Spaces为Gradio应用程序提供了免费的CPU托管服务。
需要上传的文件:
urdu-llm-chat/
├── app.py # Gradio网页界面代码
├ ├── requirements.txt # 所需依赖库列表
├ ├── README.md # 项目元数据文件
├ ├── model/
│ ├── __init__.py
│ ├── config.py
│ ├── transformer.py
│ └── checkpoints/sft_model.pt # 训练好的模型权重文件(约90MB)
└── tokenizer/
└── urdu_tokenizer/
└── urdu_bpe_tokenizer.json
工作原理:
-
在huggingface.co上创建一个免费账户。
-
创建一个新的空间(SDK:Gradio,硬件:CPU Basic)。
-
通过git上传文件:`git clone https://huggingface.co/spaces/USERNAME/urdu-llm-chat`
-
将项目文件复制到克隆后的仓库中,然后进行推送。
-
Hugging Face会自动安装依赖项,并运行`app.py`。
-
你的模型已经可以在`https://huggingface.co/spaces/USERNAME/urdu-llm-chat`上使用了。
为什么使用CPU就可以了:我们的模型仅有2300万个参数(约90MB大小)。在CPU上进行推理所需时间不到1秒,因此不需要GPU来进行服务。
选项B:本地运行
cd your-project-directory
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python app.py
该程序会在`http://127.0.0.1:7860`上运行,任何安装了Python 3.9及以上版本的机器都可以使用它。
选项C:终端聊天模式(无用户界面)
这是一种轻量级的解决方案,不需要依赖Gradio,只需通过终端进行输入和输出操作。程序会加载模型并进入交互式循环:
"""
用于Urdu LLM的独立聊天推理脚本
使用方法:
python inference/chat.py
"""
import sys
import torch
from pathlib import Path
from tokenizers import Tokenizer
# 将项目根目录添加到路径中
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from model.config import UrduLLMConfig
from model.transformer import UrduGPT
def load_model(checkpoint_path: str, device: str = None):
"""加载经过微调的模型。"""
if device is None:
if torch.cuda.is_available():
device = "cuda"
elif torch.backends.mps.is_available():
device = "mps"
else:
device = "cpu"
device = torch.device(device)
config = UrduLLMConfig()
model = UrduGPT(config).to(device)
checkpoint = torch.load(checkpoint_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()
return model, config, device
def chat_response(model, tokenizer, config, device, user_message,
system_prompt="您是一位辅助性的乌尔都语助手。",
max_tokens=100, temperature=0.7):
"""生成聊天回复。"""
BOS_ID = tokenizer.token_to_id("<bos>")
EOS_ID = tokenizer.token_to_id("<eos>"
SEP_ID = tokenizer.token_to_id("<sep>")
USER_ID = tokenizer.token_to_id("<|user|>")
ASSISTANT_ID = tokenizer.token_to_id("<|assistant|>?")
SYSTEM_ID = tokenizer.token_to_id("<|system|>")
# 构建提示语
prompt_ids = [BOS_ID, SYSTEM_ID]
sys_ids = tokenizer.encode(system.prompt).ids
if sys_ids and sys_ids[0] == BOS_ID: sys_ids = sys_ids[1:]
if sys_ids and sys_ids[-1] == EOS_ID: sys_ids = sys_ids[:-1]
prompt_ids.extend(sys_ids)
prompt_ids.append(SEP_ID)
prompt_ids.append(USER_ID)
user_ids = tokenizer.encode(user_message).ids
if user_ids and user_ids[0] == BOS_ID: user_ids = user_ids[1:]
if user_ids and user_ids[-1] == EOS_ID: user_ids = user_ids[:-1]
prompt_ids.extend(user_ids)
prompt_ids.append(SEP_ID)
prompt_ids.append(ASSISTANT_ID)
# 生成回复
input_tensor = torch.tensor([prompt_ids], dtype=torch.long).to(device)
output_ids = model.generate(
input_tensor,
max_new_tokens=max_tokens,
temperature=temperature,
top_k=50,
top_p=0.9,
eos_token_id=EOS_ID,
)
generated_ids = output_ids[0][len(prompt_ids):].tolist()
if EOS_ID in generated_ids:
generated_ids = generated_ids[:generated_ids.index(EOS_ID)]
return tokenizer.decode(generated_ids)
def main():
print("=" * 60)
print("🇵🇰 乌尔都语LLM聊天机器人 🇵🇰")
print(" Urdu LLM ChatBot")
print "=" * 60)
# 加载模型
tokenizer_path = PROJECT_ROOT / "tokenizer" / "urdu_tokenizer" / "urdu_bpe_tokenizer.json"
# 先尝试使用SFT模型,如果不行则使用预训练模型
sft_path = PROJECT_ROOT / "model" / "checkpoints" / "sft_model.pt"
pretrained_path = PROJECT_ROOT / "model" / "checkpoints" / "best_model.pt"
if sft_path.exists():
checkpoint_path = sft_path
print("正在加载SFT模型...")
elif pretrained_pathexists():
checkpoint_path = pretrained_path
print("正在加载预训练模型(未针对聊天功能进行微调)...")
else:
print("❌ 未找到模型检查点!")
print(" 请先运行笔记簿03和04来训练模型。")
sys.exit(1)
model, config, device = load_model(str(checkpoint_path))
tokenizer = Tokenizer.from_file(str(tokenizer_path))
print(f"模型已加载到{device}上。")
print("\n请用乌尔都语输入您的消息。输入'quit'即可退出。\n")
print("-" * 60)
while True:
try:
user_input = input("\n👤 您:").strip()
except (EOFError, KeyboardInterrupt):
print("\n再见!👋")
break
if user_input.lower() in ['quit', 'exit', 'q']:
print("再见!👋")
break
if not user_input:
continue
response = chat_response(model, tokenizer, config, device, user_input)
print(f"🤖 机器人回复:{response}")
if __name__ == "__main__":
main()
运行方式如下:
python inference/chat.py
👤 用户:愿您平安
🤖 机器人:愿您平安!我能为您做些什么吗?
完整流程概述
| 阶段 | 描述 |
|---|---|
| 阶段1 | 原始乌尔都语文本 → 清洗后的语料库 |
| 阶段2 | 语料库 → 使用BPE分词器进行分词(词汇表规模为32K) |
| 阶段3 | 分词后的文本片段 → 对GPT模型进行预训练(参数数量为2300万,用于预测下一个单词) |
| 阶段4 | 对话数据 → 使用损失掩码进行监督微调 → 最终形成聊天模型 |
| 部署阶段 | app.py + 模型权重 → 部署到Hugging Face Spaces或本地环境中 |
您可以在这里查看已部署的模型:https://huggingface.co/spaces/Wisamul/urdu_llm
测试结果
测试结果相当有趣。由于我们的训练样本数量较少,模型出现“幻觉”现象的概率较高;尤其是对于那些懂乌尔都语的人来说,这种“幻觉”效果会更加明显。不过总体而言,模型的运行效果还是不错的。
👤 用户:愿您平安
🤖 机器人:愿您平安!我能为您做些什么吗?
👤 用户:巴基斯坦的首都是哪里?
🤖 机器人:巴基斯坦的首都是伊斯兰堡。这座城市建于20世纪60年代。
👤 用户:教育的重要性是什么?
🤖 机器人:教育是人类发展的基础。它不仅是获取知识的途径,还在社会建设中发挥着重要作用。
对于那些与训练数据相关的查询,模型能够准确且流畅地给出回答;而对于那些超出训练数据范围的查询,模型则容易出现“幻觉”现象或重复某些内容。考虑到我们的SFT数据集规模较小(仅包含79个样本),以及模型的参数数量为2300万,这种结果也在意料之中。
结论
我们进行这一切的目的,就是探讨并解释构建大型语言模型所涉及的所有步骤。希望这些内容能帮助大家理解为什么不是每个人都能自己开发大型语言模型——成本、训练过程、数据收集等因素都是制约因素。同时,通过这个学习过程,我们也更加认识到前人所付出的努力以及我们目前取得的成果。
我们的工作流程是从原始乌尔都语文本开始,依次经历了数据清洗、分词处理、对GPT风格的模型进行预训练、使用损失掩码进行监督微调,最后还开发了Gradio网页界面。
虽然这个模型的规模较小,数据集也有限,但其中涉及的每一个技术概念(注意力机制、下一个单词的预测、监督微调、聊天对话的格式化等),都是构建像GPT-4和Llama这样的大型语言模型的基础——只不过在实际应用中,这些技术的规模要大得多。
如果你们想要进一步改进这个模型,接下来可以尝试以下措施:
-
更多的SFT数据(数千个训练样本,而非原有的79个);
-
规模更大的模型(拥有1亿个以上的参数);
-
以及RLHF/DPO技术之间的协同作用。
但即便在这样的规模下,你也已经对整个大语言模型的开发流程有了清晰的认识。

![如何在 Flutter 中使用混合组件——[完整手册] 如何在 Flutter 中使用混合组件——[完整手册]](https://cdn.hashnode.com/uploads/covers/63a47b24490dd1c9cd9c32ff/26c1c13b-8a54-4b4c-8b46-c292be780b65.png)