源碼解讀系列將和大家一起來讀Megatron的pretrain部分代碼。
在源碼解讀第一篇中,我們講解了如何做「分布式環(huán)境初始化」,即按照DP/TP/PP對進(jìn)程進(jìn)行分組,并為每個進(jìn)程指定GPU。在這一章中,我們將一起讀「模型并行部分」:如何切分模型,并搬入分布式環(huán)境定義好的DP/TP/PP組中。
「本文將提供:」
- 詳細(xì)的圖解。畫圖說明代碼的設(shè)計架構(gòu),講清代碼想做一件什么事。
- 詳細(xì)的代碼注釋。在圖解的基礎(chǔ)上,提取核心代碼部分,并附上注釋。
「如何利用本文提高源碼閱讀效率:」
- 先看一~三部分。了解模型并行的設(shè)計思想、整體框架及入口函數(shù)。
- 打開Megatron源碼,找到入口函數(shù),開始閱讀。
- 閱讀中的每一塊細(xì)節(jié),可參考四~八部分。
「閱讀本文前置知識:」
- 圖解大模型訓(xùn)練之:張量模型并行,Megatron-LM
- 圖解大模型訓(xùn)練之:Megatron源碼解讀1,分布式環(huán)境初始化
「本文目錄:」
一、模型概述
二、模型切割在做一件什么事
- 2.1 模型切割設(shè)計思想
- 2.2 隨機(jī)種子
三、模型并行框架
- 3.1 模型并行入口函數(shù)
- 3.2 定義并搬運模型
- 3.3 分布式模型:CodeGeeX
四、MegatronModule
五、Emebdding
六、VocabParallelEmebdding
七、ParallelSelfAttention:分布式block的一般套路
- 7.1 列切割:ColumnParallelLinear
- 7.2 行切割:RowParallelLinear
- 7.3 ParallelSelfAttention
八、CrossEntropy
- 8.1 計算logit
- 8.2 計算交叉熵
九、筋疲力盡的總結(jié)
十、參考(本文相關(guān)源碼與論文)
一、模型概述
前文說過,用Megatron做分布式訓(xùn)練的開源大模型有很多,我們選用的是THUDM開源的CodeGeeX(代碼生成式大模型,類比于openAI Codex)。選用它的原因是“完全開源”與“清晰的模型架構(gòu)和預(yù)訓(xùn)練配置圖”,能幫助我們高效閱讀源碼。我們再來回顧下這兩張圖。
「模型架構(gòu)」
「預(yù)訓(xùn)練配置」
由圖可知,CodeGeeX在預(yù)訓(xùn)練中采用的是8頭TP(同一個node內(nèi)的8張卡做TP,8張卡組成一個完整的模型),192頭DP(192個node間做DP),一共1536塊GPU進(jìn)行訓(xùn)練。
「【閱讀提醒】:如果你對GPT模型比較熟悉,則不需要花時間細(xì)看CodeGeeX架構(gòu)圖也能無障礙閱讀本文。架構(gòu)圖只是在涉及模型細(xì)節(jié)時,可以對照著看?!?/strong>
二、模型切割在做一件什么事
2.1 模型切割設(shè)計思想
回顧一下,在初始化分布式環(huán)境中,我們根據(jù)DP/TP/PP組設(shè)置并劃分了進(jìn)程,確定了模型的切割方法,如下圖:(注意:這并不是CodeGeeX的劃分框架,而是一個更廣義的例子,細(xì)節(jié)可閱讀上篇講解)
接下來,我們就可以根據(jù)這個框架來切割模型了。pytorch默認(rèn)將模型(nn.Module)定義在CPU上,因此,我們在CPU上定義并初始化模型,然后將其搬運到當(dāng)前進(jìn)程所對應(yīng)的GPU上,整個過程如下圖:首先,我們是面向進(jìn)程編程的,也就是整份腳本處理的是發(fā)生在1個進(jìn)程上的事情。這樣做的好處是,我們只需要維護(hù)1份腳本,然后將其發(fā)去不同機(jī)器的各張卡上執(zhí)行,就能實現(xiàn)全局的并行。
但是,1個進(jìn)程處理的是模型的不同部分,比如GPT模型,它的pre層涉及到Embedding計算,post層涉及到softmax和loss的計算,這樣每個進(jìn)程上處理的模型是不一樣的,這時怎么辦呢?別忘了,我們能夠取到進(jìn)程id(全局或DP/TP/PP組內(nèi)的),這樣我們就能通過進(jìn)程id,寫if...else...
來解決模型差異化問題了。
明確了這個思想,現(xiàn)在我們可以開始寫代碼了,我們有兩種方式對模型進(jìn)行切割:
- 「方案一:」先定義出完整的模型,并對模型參數(shù)做初始化,然后根據(jù)進(jìn)程id取出相應(yīng)子模型,搬運到GPU上
- 「方案二:」直接根據(jù)進(jìn)程id,設(shè)計好當(dāng)前子模型,做參數(shù)初始化,搬運到GPU上
這兩者的核心差別,在于“隨機(jī)種子”
的設(shè)定。
2.2 隨機(jī)種子
在分布式訓(xùn)練中,「隨機(jī)種子是非常重要的,它關(guān)系到模型是否能夠復(fù)現(xiàn)」。例如我們采取activation checkpoint的技術(shù)來節(jié)省顯存時,在backward過程中我們需要重算forward得到activation,這時候就需要我們完整復(fù)現(xiàn)之前forward的過程,各類參數(shù)的初始化結(jié)果也要和之前完全一致。
我們來看幾個例子:
例1: Word Embedding
WE1和WE2間需要采用不同的隨機(jī)種子。因為若采用相同的隨機(jī)種子,則WE1和WE2的結(jié)果完全一樣,這不等價于先隨機(jī)初始化WE,再將它進(jìn)行切割。
例2: dropout
左側(cè)方框中的2個dropout,在初始化時需要用不同的隨機(jī)種子。因為這樣才等價于對完整的dropout做初始化,然后再切割。右側(cè)方框中的dropout,需要用相同的隨機(jī)種子(雖然右邊只畫了1個dropout,但其實是2個dropout,每塊GPU上各一個,因為此時兩塊GPU上的輸出已經(jīng)AllReduce,是完全一致的。做完AllReduce后,兩塊GPU繼續(xù)獨立計算,因此實際上有兩個dropout)。
關(guān)于隨機(jī)種子設(shè)定的一般結(jié)論
從例子中,我們可以得出一個結(jié)論:「一般在TP/PP組內(nèi),設(shè)定不同的隨機(jī)種子。而在DP組內(nèi),設(shè)定相同的隨機(jī)種子?!?/strong> 這只是一個一般結(jié)論,我們可以根據(jù)實際情況去調(diào)整。
最后,回到模型切割上,方案1(先做整體初始化再切割)在代碼里被稱為“CPU上的初始化”(_initialize_affine_weight_cpu
),方案2(直接對局部初始化)被稱為“在GPU上的初始化”(_initialize_affine_weight_gpu
)。我們會在切割部分的代碼里經(jīng)??匆娝鼈?。
三、模型并行框架
現(xiàn)在,我們可以來看具體的代碼了
3.1 模型并行入口函數(shù)
模型并行部分的代碼入口依然在megatron/training.py
的pretrain
函數(shù)下,代碼如下:
defpretrain(
train_valid_test_dataset_provider,
model_provider,
forward_step_func,
valid_forward_step_func=None,
extra_args_provider=None,
args_defaults={},
):
#1.初始化分布式環(huán)境(源碼解讀1內(nèi)容)
initialize_megatron(
extra_args_provider=extra_args_provider,args_defaults=args_defaults
)
...
# 2、模型并行:定義模型架構(gòu),并切割模型(本文重點)
model,optimizer,lr_scheduler=setup_model_and_optimizer(model_provider)
...
#3、構(gòu)造train/val/test數(shù)據(jù)集(下一篇將講述)
...(
train_data_iterator,
valid_data_iterator,
test_data_iterator,
)=build_train_valid_test_data_iterators(train_valid_test_dataset_provider)
...
#4、訓(xùn)練(下下一篇將講述)
iteration=train(
forward_step_func,
valid_forward_step_func,
model,
optimizer,
lr_scheduler,
train_data_iterator,
valid_data_iterator,
)
...
由代碼可知,setup_model_and_optimizer
是整個模型并行的入口函數(shù),如下圖,它主要由”「定義模型架構(gòu)并切割模型」“,“「設(shè)置optimizer」”和“「設(shè)置學(xué)習(xí)率」”三部分組成。我們關(guān)注的重點在第一部分上(get_model
)。
3.2 定義并搬運模型
get_model
的內(nèi)容可簡化成下圖:
get_model
函數(shù)主要做了兩件事:
-
在CPU上定義模型。pytorch默認(rèn)在CPU上定義模型(nn.Module)。
model_provider
是一個函數(shù),調(diào)用它即可返回CPU版的模型,也就是一個CodeGeeX類,這個將是下文要介紹的重點。 -
把模型從CPU搬運至GPU上。這里有兩種方法可供選擇:
- 「顯式搬運?!?/strong>即手動將模型搬運到當(dāng)前進(jìn)程所對應(yīng)的GPU上
- 「權(quán)重精度設(shè)定?!?/strong>由ZeRO的思想可知,在模型訓(xùn)練中,把權(quán)重精度從fp32降至fp16,是一種節(jié)省顯存的好辦法。如果確定使用這種優(yōu)化辦法,將模型搬運到GPU上后,我們需要修改精度。
-
「初始化DP組」。這里指的是
定義DP組間forward、backward和梯度計算與通訊等方法
。在Megatron中,TP和PP組的這些方法是人為定義的
(在定義CPU模型時已設(shè)置好,我們將在下文講CodeGeeX細(xì)節(jié)時看到),而DP組則是可以用現(xiàn)成的
(torch的DistributedDataParallel)。在具體使用時,我們可以:(1)直接調(diào)用DistributedDataParallel?;颍?)在DistributedDataParallel這個類的基礎(chǔ)上做一些改進(jìn),例如增加對碎片化內(nèi)存的管理,對計算梯度時的精度控制等。
- 「方案一:借助deepspeed進(jìn)行管理」。在源碼解讀1中我們提過,秉持著萬物皆可wrap的原則,按照deepspeed官網(wǎng)教程,只需要在Megatron的某些文件中插入相應(yīng)代碼,就可以讓deepspeed來管理模型的分布式、DP組間的顯存優(yōu)化等,這里同理。
- 「方案二:手動搬運管理?!?/strong>這里需要我們以下事情:
get_model函數(shù)的核心代碼如下(一切盡在注釋中):
defget_model(model_provider_func):
"""Buildthemodel."""
args=get_args()
#1、定義并構(gòu)建CPU版模型
if(#1.1、當(dāng)分布式進(jìn)行框架采用virtualpipeline(是NVDIA后續(xù)提出的對Megatron的優(yōu)化方法,可先忽略不看)
mpu.get_pipeline_model_parallel_world_size()>1
andargs.virtual_pipeline_model_parallel_sizeisnotNone
):
model=[]
foriinrange(args.virtual_pipeline_model_parallel_size):
mpu.set_virtual_pipeline_model_parallel_rank(i)
#Setpre_processandpost_processonlyaftervirtualrankisset.
pre_process=mpu.is_pipeline_first_stage()
post_process=mpu.is_pipeline_last_stage()
this_model=model_provider_func(
pre_process=pre_process,post_process=post_process
)
model.append(this_model)
else:#1.2其余情況
#判斷當(dāng)前進(jìn)程是否是PP組的第一個進(jìn)程(例如第一部分圖例中PP組的g0)
pre_process=mpu.is_pipeline_first_stage()
#判斷當(dāng)前進(jìn)程是否是PP組的最后一個進(jìn)程(例如第一部分圖例中PP組的g12)
post_process=mpu.is_pipeline_last_stage()
#構(gòu)建CPU版CodeGeeX模型
model=model_provider_func(pre_process=pre_process,post_process=post_process)
...
#2、將模型從CPU搬運到GPU上
#2.1如果采用Megatron-DeepSpeed的方式,則直接返回模型,后面的搬運,數(shù)據(jù)并行等工作將由deepspeed來完成
# ref:https://www.deepspeed.ai/tutorials/megatron/
ifargs.deepspeed:
returnmodel
#將當(dāng)前進(jìn)程所維護(hù)的模型,從CPU搬運到GPU上(GPU即為在初始化時為當(dāng)前進(jìn)程分配的那塊GPU)
print(f">movingmodeltoGPU...",flush=True)
formodel_moduleinmodel:
model_module.cuda(torch.cuda.current_device())
print(f">movingtoGPUdone",flush=True)
#fp16轉(zhuǎn)換(pytorch默認(rèn)模型參數(shù)精度為fp32,依需決定計算過程中是否要轉(zhuǎn)成fp16,節(jié)省顯存)
ifargs.fp16orargs.bf16:
print(f">convertingmodeltofp16...",flush=True)
model=[Float16Module(model_module,args)formodel_moduleinmodel]
print(f">convertingtofp16done",flush=True)
#采用pytorch定義的DistributedDataParallel管理數(shù)據(jù)并行
ifargs.DDP_impl=="torch":
i=torch.cuda.current_device()
model=[
torchDDP(
model_module,
device_ids=[i],
output_device=i,
process_group=mpu.get_data_parallel_group(),#數(shù)據(jù)并行的組
)
formodel_moduleinmodel
]
returnmodel
#采用自定義的DistributedDataParallel管理數(shù)據(jù)并行
#即在pytorch的DistributedDataParallel的基礎(chǔ)上,自己再定義內(nèi)存管理、梯度精度等計算方式,更有效利用顯存
ifargs.DDP_impl=="local":#自定義的數(shù)據(jù)并行類在megatron/model/distributed.py下
print(f">creatingDDPmodel...",flush=True)
model=[
LocalDDP(
model_module,
args.accumulate_allreduce_grads_in_fp32,
args.use_contiguous_buffers_in_ddp,
)
formodel_moduleinmodel
]
print(f">creatingDDPmodeldone",flush=True)
returnmodel
raiseNotImplementedError(
"UnknownDDPimplementationspecified:{}.""Exiting.".format(args.DDP_impl)
)
特別說明的是,前文提過模型的首尾兩層和中間層的架構(gòu)可能不一樣,因此我們通過pre_process 和post_process來做區(qū)分。(當(dāng)然你也能選擇用進(jìn)程序id,只是首尾兩層經(jīng)常被Q到,所以這里單獨明確了下)。對CodeGeeX來說,由它預(yù)訓(xùn)練配置可知,它的PP并行度為1,也就是1塊GPU上涵蓋了模型的第一層至最后一層,所以pre_process和post_process實際上沒有用到。感興趣的朋友可以閱讀NVIDIA Megatron源碼下關(guān)于bert、gpt2的預(yù)訓(xùn)練代碼,具體了解pre_process和post_process在定義模型時起的作用。
3.3 分布式模型:CodeGeeX
現(xiàn)在,我們來看最核心的分布式模型:CodeGeeX類。
前文說過,1個腳本處理的是1個進(jìn)程上發(fā)生的事情,而1個進(jìn)程對應(yīng)的是模型的一部分。單進(jìn)程的架構(gòu)如下:
圖中每個方框都表示源碼里定義的一個nn.Module 類(除了最上的方框外)具體定義為:
-
CodeGeeX
: 定義一塊GPU上的模型。它由TransformerLanguageModel 和_VocabParallelCrossEntropy這兩個核心類組成。 -
TransformerLanguageModel
:定義每塊GPU上輸入層embedding和中間block層的結(jié)構(gòu) -
Embedding
: 定義每塊GPU上輸入層embedding結(jié)構(gòu)及相關(guān)計算,輸出結(jié)果已AllReduce(TP組間) -
ParallelTransformer
:定義每塊GPU上所有中間blocks的結(jié)構(gòu)及相關(guān)計算,輸出結(jié)果已AllReduce(TP組間) -
ParallelTransformerLayer
: 定義每塊GPU上單個block的結(jié)構(gòu)及相關(guān)計算,輸出結(jié)果已AllReduce(TP組間) -
ParallelSelfAttention
: 定義每塊GPU上單個block中,attention的結(jié)構(gòu)及相關(guān)計算,輸出結(jié)果已AllReduce(TP組間) -
ParallelMLP
: 定義每塊GPU上單個block中,mlp層的結(jié)構(gòu)及相關(guān)計算,輸出結(jié)果已AllReduce(TP組間)。 -
_VocabParallelCrossEntropy
: torch.autograd.Function,定義每塊GPU上,輸出層embedding、softmax和loss等結(jié)構(gòu)及相關(guān)計算。
「為什么需要對輸出做AllReduce?」回顧Megtron理論部分的講解,在縱向切割模型時,Megatron是在輸入X完整的情況下,設(shè)計模型切割的方式的。因此,對于模型的每一層輸出,我們都要在TP組間做AllReduce,來保證下一層拿到的輸入也是完整的。類名字中的"Parallel",也是指在TP組中做并行
,如下圖所示:
到這一步,我們終于把模型切割部分的整體流程講完了。「雖然我們是以CodeGeeX為例,但這個流程圖可以看作是通用的?!?/strong>不同模型間只有模型具體結(jié)構(gòu)、DP/TP/PP組設(shè)置這些方面的差別,整個并行框架是通用的。下面,我們來探究圖中所繪的各個類的細(xì)節(jié)。
四、MegatronModule
上面所繪制的幾類,并不是直接繼承自nn.Module ,而是皆繼承于自定義的class MegatronModule(torch.nn.Module)
。我們說過,gpt類模型,輸入和輸出層共用一個word embedding。因此,這個類的主要作用,就是令PP組的第一個進(jìn)程和最后一個進(jìn)程滿足這個條件(不過我不懂為什么要把這個限制放在一個大母類中去做,設(shè)計上感覺有點奇怪)。MegatronModule類的整體架構(gòu)如下:
特別說明,「initialize_word_embedding 并不是某一具體的初始化WE方法,它只是起到如圖所說的強(qiáng)制作用?!?/strong>
MegatronModule的代碼如下(一切盡在注釋中):
classMegatronModule(torch.nn.Module):
"""MegatronspecificextensionsoftorchModulewithsupport
forpipelining."""
def__init__(self,share_word_embeddings=True):
super(MegatronModule,self).__init__()
#input和output是否要共享一套WE
self.share_word_embeddings=share_word_embeddings
defstate_dict_for_save_checkpoint(
self,destination=None,prefix="",keep_vars=False
):
"""Usethisfunctiontooverridethestatedictfor
savingcheckpoints."""
#模型訓(xùn)練中,及時將參數(shù)保存到指定位置(設(shè)置checkpoint),
#這樣在訓(xùn)練出問題時,可以從checkpoint點重新load參數(shù),繼續(xù)訓(xùn)練
returnself.state_dict(destination,prefix,keep_vars)
defword_embeddings_weight(self):
"""獲取word_embedding"""
ifmpu.is_pipeline_first_stage(ignore_virtual=True):
returnself.language_model.embedding.word_embeddings.weight
ifmpu.is_pipeline_last_stage(ignore_virtual=True):
ifnotself.share_word_embeddings:
raiseException(#強(qiáng)制要求共享一套embedding
"word_embeddings_weight()calledforlast"
"stage,butshare_word_embeddingsisfalse"
)
returnself.word_embeddings.weight#參見initialize_word_embeddings中WE的定義
raiseException(#如果當(dāng)前進(jìn)程是PP組的中間進(jìn)程,則其上未維護(hù)WE,因此當(dāng)然獲取不到
"word_embeddings_weight()shouldbe""calledforfirstandlaststageonly"
)
definitialize_word_embeddings(self,init_method_normal):
"""強(qiáng)制PP組最后一個進(jìn)程初始化WE時,直接使用PP組第一個進(jìn)程的WE"""
args=get_args()
ifnotself.share_word_embeddings:#強(qiáng)制shareembeddingg
raiseException(
"initialize_word_embeddings()wascalledbut"
"share_word_embeddingsisfalse"
)
#PP組并行度為1時,第一層和最后一層都在一塊GPU上,天然共享WE,無需做強(qiáng)制
ifargs.pipeline_model_parallel_size==1:
return
#---------------------------------------------------
#如果流水線并行的度不為1時,依次做三件事:
#【初始化時】:
#1、在PP組最后一個進(jìn)程上初始化一個WE,令其取值全為0
#2、在PP組第一個進(jìn)程與最后一個進(jìn)程間做一次AllReduce,保證兩者的WE完全一致
#【訓(xùn)練時】:
#3、每次想在PP組第一個/最后一個進(jìn)程上使用WE時,要做一次通信,保證兩者用的WE完全一致
ifmpu.is_pipeline_last_stage():#若當(dāng)前進(jìn)程是PP組最后一個進(jìn)程
assertnotmpu.is_pipeline_first_stage()
self._word_embeddings_for_head_key="word_embeddings_for_head"
#初始化一個WE(已按vocab_size維度切割,可參見Megatron原理篇對WE的講解)
#VocabParallelEmbedding將在下文詳細(xì)講解
self.word_embeddings=mpu.VocabParallelEmbedding(
args.padded_vocab_size,#vocab_size
args.hidden_size,#embed_dim
init_method=init_method_normal(args.init_method_std),#初始化方法(在model/utils.py下)
)
#用0填充WE(等待下面做AllReduce后取得第一個進(jìn)程上的WE)
self.word_embeddings.weight.data.fill_(0)
self.word_embeddings.weight.shared=True
iftorch.distributed.is_initialized():
ifmpu.is_pipeline_first_stage()ormpu.is_pipeline_last_stage():#若當(dāng)前進(jìn)程是PP組第一個或最后一個進(jìn)程
#在兩進(jìn)程間做AllReduce,保證它們使用的WE完全一致
# mpu.get_embedding_group:在源碼解讀1中講過,是除DP/TP/PP之外設(shè)置的又一進(jìn)程組,
#主要就是用來做關(guān)于WE的通訊
torch.distributed.all_reduce(
self.word_embeddings_weight().data,group=mpu.get_embedding_group()
)
else:
print(
"WARNING!Distributedprocessesaren'tinitialized,so"
"wordembeddingsinthelastlayerarenotinitialized."
"Ifyouarejustmanipulatingamodelthisisfine,but"
"thisneedstobehandledmanually.Ifyouaretraining"
"somethingisdefinitelywrong."
)
五、Embedding
Emebdding類定義了word/position/segment embedding
,并定義輸入X過embedding層的計算方法。關(guān)鍵屬性和方法如下圖:
-
self.word_embeddings
:來自自定義的VocabParallelEmbedding (下面會詳述) 。「含“Parallel”則意味著參數(shù)在TP組間做了切割」。因此self.word_embeddings 是切割好的WE。每個進(jìn)程上維護(hù)根據(jù)自己進(jìn)程序號所取下的那塊WE(例如下圖中的WE1,WE2,圖片來自Megatron原理篇):
-
self.position_embeddings
和self.tokentype_embeddings
這兩者都和輸入X相關(guān),而輸入X是不做切割的,因此這兩者也無需切割。 -
state_dict_for_save_checkpoint
和load_state_dict
。在源碼注解里,這兩個函數(shù)分別給出了"easy load" 和"customize load"的注釋,這個注釋不是很貼切。實際上,前者用于在模型訓(xùn)練過程中及時讀取當(dāng)前參數(shù),及時保存(做checkpoint);后者則一般用于模型的重載,例如訓(xùn)到一半掛掉了,我們就重新初始化一個新模型,重載上個checkpoint保存下的權(quán)重。
Embedding層代碼如下(一切盡在注釋中):
classEmbedding(MegatronModule):
"""Languagemodelembeddings.
Arguments:
hidden_size:hiddensize
vocab_size:vocabularysize
max_sequence_length:maximumsizeofsequence.This
isusedforpositionalembedding
embedding_dropout_prob:dropoutprobabilityforembeddings
init_method:weightinitializationmethod
num_tokentypes:sizeofthetoken-typeembeddings.0value
willignorethisembedding
"""
def__init__(
self,
hidden_size,#每個token的向量維度
vocab_size,#詞表大小
max_sequence_length,#最長序列長度
embedding_dropout_prob,#dropoutprobabilityforembeddings
init_method,#初始化權(quán)重的方法
num_tokentypes=0,#類似于Bert中的segmenttype
):
super(Embedding,self).__init__()
args=get_args()
self.hidden_size=hidden_size
self.init_method=init_method
self.num_tokentypes=num_tokentypes
self.max_sequence_length=max_sequence_length
#WEsize:(vocab_size//TP_N,hidden_size)
#TP_N表示TP組模型并行度
self.word_embeddings=mpu.VocabParallelEmbedding(
vocab_size,self.hidden_size,init_method=self.init_method)
self._word_embeddings_key='word_embeddings'
self.vocab_size=vocab_size
#PEsize:(max_seq_len,hidden_size)
self.position_embeddings=torch.nn.Embedding(
max_sequence_length,self.hidden_size)
self.position_embeddings=self.position_embeddings.half()
self._position_embeddings_key='position_embeddings'
#Initializethepositionembeddings.
self.init_method(self.position_embeddings.weight)
#TE_size:(num_tokentypes,hidden_size)
#TE類似于Bert中的segmentembedding
self._tokentype_embeddings_key='tokentype_embeddings'
ifself.num_tokentypes>0:
self.tokentype_embeddings=torch.nn.Embedding(self.num_tokentypes,
self.hidden_size)
#Initializethetoken-typeembeddings.
self.init_method(self.tokentype_embeddings.weight)
else:
self.tokentype_embeddings=None
#Embeddingsdropout
self.embedding_dropout=torch.nn.Dropout(embedding_dropout_prob)
defadd_tokentype_embeddings(self,num_tokentypes):
"""如果在pretrain階段未定義TE,而在fine-tune階段TE,則可通過此函數(shù)添加
"""
ifself.tokentype_embeddingsisnotNone:
raiseException('tokentypeembeddingsisalreadyinitialized')
iftorch.distributed.get_rank()==0:
print('addingembeddingfor{}tokentypes'.format(num_tokentypes),
flush=True)
self.num_tokentypes=num_tokentypes
self.tokentype_embeddings=torch.nn.Embedding(num_tokentypes,
self.hidden_size)
#Initializethetoken-typeembeddings.
self.init_method(self.tokentype_embeddings.weight)
defforward(self,input_ids,position_ids,tokentype_ids=None):
"""定義輸入X過embedding層的計算方法
"""
#words_embeddingssize=(b,seq_len,hidden_size)
#再次注意:self.word_embeddings做forward時,最終的輸出結(jié)果時AllReduce的(見上圖)
words_embeddings=self.word_embeddings(input_ids)
#position_embeddingssize=(b,seq_len,hidden_size)
position_embeddings=self.position_embeddings(position_ids)
#embedding=WE+PE
#embeddingsize=(b,seq_len,hidden_size)
embeddings=words_embeddings+position_embeddings
#依需要決定是否增加TE
iftokentype_idsisnotNone:
assertself.tokentype_embeddingsisnotNone
embeddings=embeddings+self.tokentype_embeddings(tokentype_ids)
else:
assertself.tokentype_embeddingsisNone
#Dropout.
embeddings=self.embedding_dropout(embeddings)
returnembeddings
defstate_dict_for_save_checkpoint(
self,destination=None,prefix='',keep_vars=False,
):
"""Foreasyload.
在模型訓(xùn)練過程中及時讀取當(dāng)前參數(shù),方便及時保存(做checkpoint)
篇幅限制,這里不展示細(xì)節(jié)
"""
...
defload_state_dict(self,state_dict,strict=True):
"""Customizedload.
用于模型的重載。例如訓(xùn)到一半掛掉了,我們就重新初始化一個新模型,
重載上個checkpoint保存下的權(quán)重。
篇幅限制,這里不展示細(xì)節(jié)
"""
...
六、VocabParallelEmbedding
該類用于定義分布式的word embedding,整體架構(gòu)如下,同樣只列舉了核心屬性和方法:
具體代碼如下,可以特別關(guān)注「初始化和forward」部分,同時建議大家閱讀理論篇中關(guān)于這一過程的詳細(xì)講解(一切盡在注釋中)
classVocabParallelEmbedding(torch.nn.Module):
"""Embeddingparallelizedinthevocabularydimension.
Thisismainlyadaptedfromtorch.nn.Embeddingandallthedefault
valuesarekept.
Arguments:
num_embeddings:vocabularysize.
embedding_dim:sizeofhiddenstate.
init_method:methodtoinitializeweights.
"""
def__init__(self,num_embeddings,embedding_dim,init_method=init.xavier_normal_):
super(VocabParallelEmbedding,self).__init__()
#Keeptheinputdimensions.
self.num_embeddings=num_embeddings#vocab_size
self.embedding_dim=embedding_dim#hidden_state.
#Setthedetaulsforcompatibility.
self.padding_idx=None
self.max_norm=None
self.norm_type=2.0
self.scale_grad_by_freq=False
self.sparse=False
self._weight=None
#當(dāng)前進(jìn)程所在TP組進(jìn)程總數(shù)
self.tensor_model_parallel_size=get_tensor_model_parallel_world_size()
#根據(jù)當(dāng)前進(jìn)程在TP組中的序號,確定其所需維護(hù)的WE部分,沿著vocab維度對WE進(jìn)行切割
#例如,進(jìn)程id=0, 維護(hù)詞表序號[0,5)范圍內(nèi)的數(shù)據(jù);進(jìn)程id=1,維護(hù)[5,10)
(
self.vocab_start_index,
self.vocab_end_index,
)=VocabUtility.vocab_range_from_global_vocab_size(
self.num_embeddings,
get_tensor_model_parallel_rank(),
self.tensor_model_parallel_size,
)
#計算當(dāng)前進(jìn)程維護(hù)的詞表大小
self.num_embeddings_per_partition=(
self.vocab_end_index-self.vocab_start_index
)
#對WE做初始化
args=get_args()#讀取預(yù)訓(xùn)練參數(shù)配置
ifargs.use_cpu_initialization:#CPU上做初始化
self.weight=Parameter(#在CPU上先生成一個完整的WE
torch.empty(
self.num_embeddings_per_partition,
self.embedding_dim,
dtype=args.params_dtype,
#dtype=torch.float32,
)
)
#對CPU上的WE做切割(隨機(jī)種子在初始化分布式中已設(shè)定好,不用變)
_initialize_affine_weight_cpu(
self.weight,
self.num_embeddings,
self.embedding_dim,
self.num_embeddings_per_partition,
0,
init_method,#初始化權(quán)重的方法,例如xavier之類
)
else:#在GPU上做初始化
self.weight=Parameter(#生成一個切割好的WE
torch.empty(
self.num_embeddings_per_partition,
self.embedding_dim,
device=torch.cuda.current_device(),
dtype=args.params_dtype,
#dtype=torch.float32,
)
)
#在GPU上做初始化,注意TP組內(nèi)不同進(jìn)程采用不同的隨機(jī)種子
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=0,stride=1
)
defforward(self,input_):
"""定義輸入X過WE的計算方法,輸出結(jié)果已經(jīng)過AllReduce"""
ifself.tensor_model_parallel_size>1:#如果使用TP
#如果在當(dāng)前進(jìn)程維護(hù)的WE上,找不到對應(yīng)的單詞,那么對應(yīng)位置就賦0
#例如當(dāng)前的數(shù)據(jù)的tokenid是:[2,7,1,5],當(dāng)前維護(hù)的詞表是[0,1,2](start_index=0, end_index = 3),
#則mask之后的數(shù)據(jù)為[2,0,1,0]
#Buildthemask.
input_mask=(input_=self.vocab_end_index
)
#Masktheinput.
masked_input=input_.clone()-self.vocab_start_index
masked_input[input_mask]=0
else:
masked_input=input_
#輸入X,過當(dāng)前進(jìn)程維護(hù)的部分WE的結(jié)果
output_parallel=F.embedding(
masked_input,#tensorcontainingindicesintotheembeddingmatrix
self.weight,#切割好的wordembedding的權(quán)重
self.padding_idx,
self.max_norm,
self.norm_type,
self.scale_grad_by_freq,
self.sparse,
)
#當(dāng)前詞表不維護(hù)的部分,都設(shè)為0
ifself.tensor_model_parallel_size>1:
output_parallel[input_mask,:]=0.0#
#將TP組各GPU上的結(jié)果做AllReduce
output=reduce_from_tensor_model_parallel_region(output_parallel)
returnoutput
def_initialize_affine_weight_cpu(...):
"""CPU版權(quán)重初始化。這個不難,大家可以自己閱讀"""
...
def_initialize_affine_weight_gpu(...):
"""GPU版權(quán)重初始化。特別關(guān)注設(shè)置隨機(jī)種子部分"""
...
#借助deepspeed或自定義的get_cuda_rng_tracker方法,對隨機(jī)種子進(jìn)行操作
#get_cuda_rng_tracker細(xì)節(jié),大家可自行閱讀源碼
ifds_checkpointing.is_configured():
globalget_cuda_rng_tracker
get_cuda_rng_tracker=ds_checkpointing.get_cuda_rng_tracker
withget_cuda_rng_tracker().fork():
init_method(weight)
七、ParallelSelfAttention:分布式block的一般套路
【閱讀提示】:閱讀本節(jié)時可:
- 對照第一部分CodeGeeX框架圖
- 對照Megatron理論篇對矩陣切分的講解
首先來看切割A(yù)ttention的示意圖,由圖可知,「對QKV矩陣,采用“列切割”,對線性矩陣B,采用“行切割”」。這樣設(shè)計的好處是,在經(jīng)過QKV的計算后,各進(jìn)程在不用通訊的前提下,繼續(xù)做線性計算,直到最后一步才AllReduce,起到降低通訊成本的作用:
我們先單獨來看“列切割”與“行切割”的實現(xiàn)代碼。Megatron將它們定義成了兩個nn.Module類。
7.1 列切割:ColumnParallelLinear
列切割示意圖如下:
-
f和g
是兩個共軛算子,可理解為兩個torch.autograd.Function類。在這個類下,我們可以「根據(jù)需要重寫forward和backward方法」。 -
f
: 「forward中,直接copy輸入;backward中,對梯度做AllReduce」。在代碼里定義為class _CopyToModelParallelRegion(torch.autograd.Function) -
g
: 「forward中,all-gather輸出;backward中,對梯度做split」(每張卡經(jīng)過all-gather已有完整的Y了,因此以Y為起點計算梯度后,沿著列做split就可得到Y(jié)1和Y2的梯度)。在代碼里定義為class _GatherFromModelParallelRegion(torch.autograd.Function)
classColumnParallelLinear(torch.nn.Module):
"""Linearlayerwithcolumnparallelism.
ThelinearlayerisdefinedasY=XA+b.Aisparallelizedalong
itsseconddimensionasA=[A_1,...,A_p].
Arguments:
input_size:firstdimensionofmatrixA.
output_size:seconddimensionofmatrixA.
bias:Iftrue,addbias
gather_output:Iftrue,callall-getheronoutputandmakeYavaiable
toallGPUs,otherwise,everyGPUwillhaveitsoutput
whichisY_i=XA_i
init_method:methodtoinitializeweights.Notethatbiasisalwaysset
tozero.
stride:Forthestridedlinearlayers.
keep_master_weight_for_test:Thiswasaddedfortestingandshouldbe
settoFalse.Itreturnsthemasterweights
usedforinitialization.
skip_bias_add:Thiswasaddedtoenableperformanceoptimationswherebias
canbefusedwithotherelementwiseoperations.weskip
addingbiasbutinsteadreturnit.
"""
#該類定義了切割后的權(quán)重W,例如對上圖來說,W1和W2都可分別視為該類的一個實例
def__init__(
self,
input_size,#W的第一個維度
output_size,#W的第二個維度
bias=True,#是否需要引入bias
gather_output=True,#決定是否要將Y1和Y2做all-gather
init_method=init.xavier_normal_,
stride=1,
keep_master_weight_for_test=False,
skip_bias_add=False,
params_dtype=None,
skip_init=False,
device=None,
):
super(ColumnParallelLinear,self).__init__()
#Keepinputparameters
self.input_size=input_size
self.output_size=output_size
self.gather_output=gather_output
#Dividetheweightmatrixalongthelastdimension.
#當(dāng)前進(jìn)程所在TP組的總進(jìn)程數(shù)
world_size=get_tensor_model_parallel_world_size()
#每塊GPU上維護(hù)的hidden_size的大小,等于原h(huán)idden_zize//TP組總進(jìn)程數(shù)
self.output_size_per_partition=divide(output_size,world_size)
self.skip_bias_add=skip_bias_add
self.params_dtype=params_dtype
self.device=device
#Parameters.
#Note:torch.nn.functional.linearperformsXA^T+bandasaresult
#Initializeweight.
args=get_args()#取得命令行所有的參數(shù)
ifnotskip_init:
ifargs.use_cpu_initialization:#CPU上初始化
self.weight=Parameter(
torch.empty(
self.output_size_per_partition,
self.input_size,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
self.master_weight=_initialize_affine_weight_cpu(#
self.weight,
self.output_size,
self.input_size,
self.output_size_per_partition,
0,
init_method,
stride=stride,
return_master_weight=keep_master_weight_for_test,
)
else:#GPU上初始化
self.weight=Parameter(
torch.empty(
self.output_size_per_partition,
self.input_size,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=0,stride=stride
)
else:
self.register_parameter("weight",None)
#對bias做處理,道理同weight
ifbiasandnotskip_init:
ifargs.use_cpu_initialization:#CPU上初始化
self.bias=Parameter(
torch.empty(self.output_size_per_partition,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype)
)
else:
self.bias=Parameter(#GPU上初始化
torch.empty(
self.output_size_per_partition,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
set_tensor_model_parallel_attributes(self.bias,True,0,stride)
#Alwaysinitializebiastozero.
withtorch.no_grad():
self.bias.zero_()
else:
self.register_parameter("bias",None)
defforward(self,input_):
#定義列切割中的f算子
#調(diào)用copy_to_tensor_model_parallel_region則新建一個_CopyToModelParallelRegion實例(見下)
input_parallel=copy_to_tensor_model_parallel_region(input_)
bias=self.biasifnotself.skip_bias_addelseNone#定義bias
output_parallel=F.linear(input_parallel,self.weight,bias)#X*切割好的權(quán)重
#決定是否要對每個進(jìn)程上的輸出結(jié)果做All-Reduce
ifself.gather_output:
#定義列切割中的g算子
#調(diào)用gather_from_tensor_model_parallel_region則新建一個_GatherFromModelParallelRegion實例(見下)
output=gather_from_tensor_model_parallel_region(output_parallel)#把各GPU上的輸出按照列g(shù)ather起來后,作為最終輸出
else:
output=output_parallel#否則最終輸出還是自己算的那塊GPU
output_bias=self.biasifself.skip_bias_addelseNone
returnoutput,output_bias
#列切割中的f與g
class_CopyToModelParallelRegion(torch.autograd.Function):
"""Passtheinputtothemodelparallelregion."""
#列切割下的f算子
# forward:copy輸入
# backward:對梯度做AllReduce
@staticmethod
defsymbolic(graph,input_):
returninput_
@staticmethod
defforward(ctx,input_):
returninput_
@staticmethod
defbackward(ctx,grad_output):
return_reduce(grad_output)
class_GatherFromModelParallelRegion(torch.autograd.Function):
"""Gathertheinputfrommodelparallelregionandconcatinate."""
#列切割中的g算子
# forward:All-Gather輸出
# backward:對梯度,沿著列方向做split
@staticmethod
defsymbolic(graph,input_):
return_gather(input_)
@staticmethod
defforward(ctx,input_):
return_gather(input_)
@staticmethod
defbackward(ctx,grad_output):
return_split(grad_output)
7.2 行切割:RowParallelLinear
-
f
: forward中,按列split輸入;backward中,all-gather梯度 -
g
: forward中,AllReduce輸出;backward中,直接輸出梯度,無需做任何通訊(因為經(jīng)過g的foward,每塊GPU上已擁有了Yi和Y,則根據(jù)圖中g(shù)的backward公式可知,每塊GPU可獨立計算梯度)
代碼如下:
classRowParallelLinear(torch.nn.Module):
"""Linearlayerwithrowparallelism.
ThelinearlayerisdefinedasY=XA+b.Aisparallelizedalong
itsfirstdimensionandXalongitsseconddimensionas:
--
|A_1|
|.|
A=|.|X=[X_1,...,X_p]
|.|
|A_p|
--
Arguments:
input_size:firstdimensionofmatrixA.
output_size:seconddimensionofmatrixA.
bias:Iftrue,addbias.Notethatbiasisnotparallelized.
input_is_parallel:Iftrue,weassumethattheinputisalready
splitacrosstheGPUsandwedonotsplit
again.
init_method:methodtoinitializeweights.Notethatbiasisalwaysset
tozero.
stride:Forthestridedlinearlayers.
keep_master_weight_for_test:Thiswasaddedfortestingandshouldbe
settoFalse.Itreturnsthemasterweights
usedforinitialization.
skip_bias_add:Thiswasaddedtoenableperformanceoptimationswherebias
canbefusedwithotherelementwiseoperations.weskip
addingbiasbutinsteadreturnit.
"""
def__init__(
self,
input_size,
output_size,
bias=True,
input_is_parallel=False,
init_method=init.xavier_normal_,
stride=1,
keep_master_weight_for_test=False,
skip_bias_add=False,
params_dtype=None,
skip_init=False,
device=None,
):
super(RowParallelLinear,self).__init__()
#Keepinputparameters
self.input_size=input_size
self.output_size=output_size
self.input_is_parallel=input_is_parallel
#Dividetheweightmatrixalongthelastdimension.
world_size=get_tensor_model_parallel_world_size()
self.input_size_per_partition=divide(input_size,world_size)
self.skip_bias_add=skip_bias_add
self.params_dtype=params_dtype
self.device=device
#Parameters.
#Note:torch.nn.functional.linearperformsXA^T+bandasaresult
#weallocatethetranspose.
#Initializeweight.
args=get_args()
ifnotskip_init:
ifargs.use_cpu_initialization:
self.weight=Parameter(
torch.empty(
self.output_size,
self.input_size_per_partition,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
self.master_weight=_initialize_affine_weight_cpu(
self.weight,
self.output_size,
self.input_size,
self.input_size_per_partition,
1,
init_method,
stride=stride,
return_master_weight=keep_master_weight_for_test,
)
else:
self.weight=Parameter(
torch.empty(
self.output_size,
self.input_size_per_partition,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=1,stride=stride
)
else:
self.register_parameter("weight",None)
ifbiasandnotskip_init:
ifargs.use_cpu_initialization:
self.bias=Parameter(
torch.empty(self.output_size,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype)
)
else:
self.bias=Parameter(
torch.empty(
self.output_size,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
#Alwaysinitializebiastozero.
withtorch.no_grad():
self.bias.zero_()
else:
self.register_parameter("bias",None)
defforward(self,input_):
#Setupbackpropall-reduce.
ifself.input_is_parallel:
input_parallel=input_
else:
input_parallel=scatter_to_tensor_model_parallel_region(input_)
#Matrixmultiply.
output_parallel=F.linear(input_parallel,self.weight)
#All-reduceacrossallthepartitions.
output_=reduce_from_tensor_model_parallel_region(output_parallel)
ifnotself.skip_bias_add:
output=output_+self.biasifself.biasisnotNoneelseoutput_
output_bias=None
else:
output=output_
output_bias=self.bias
returnoutput,output_bias
#行切割中的f和g算子
class_ScatterToModelParallelRegion(torch.autograd.Function):
"""Splittheinputandkeeponlythecorrespondingchucktotherank."""
#行切割中的f算子
# forward:沿列split輸入
# backward:all-gather梯度
@staticmethod
defsymbolic(graph,input_):
return_split(input_)
@staticmethod
defforward(ctx,input_):
return_split(input_)
@staticmethod
defbackward(ctx,grad_output):
return_gather(grad_output)
class_ReduceFromModelParallelRegion(torch.autograd.Function):
"""All-reducetheinputfromthemodelparallelregion."""
#行切割中的g算子
# forward:AllReduce輸出
# backward:正常計算梯度,GPU間無需做任何通訊
@staticmethod
defsymbolic(graph,input_):
return_reduce(input_)
@staticmethod
defforward(ctx,input_):
return_reduce(input_)
@staticmethod
defbackward(ctx,grad_output):
returngrad_output
7.3 ParallelSelfAttention
該類的構(gòu)造如下圖:
這張圖中透露的核心含義是,「每個進(jìn)程上維護(hù)的都是按列切割完的QKV矩陣」,進(jìn)程間獨立計算,QKV矩陣的輸出結(jié)果一般不做AllReduce。同時,「每個進(jìn)程上維護(hù)的是按行切割完的dense(線型層)矩陣」,Attention輸出過線性層后的結(jié)果,做AllReduce。另外,在設(shè)置attention_dropout時,同樣調(diào)用了get_cuda_rng_tracker 方法,令TP組內(nèi)的進(jìn)程擁有不同的隨機(jī)種子。「最后,你可能想問,dense后的dropout去哪里了」?代碼里把它定義到了ParallelTransformerLayer 下(等于attention + mlp)。
相信有了上面的說明,看這塊代碼就不難了。篇幅限制,這里不展示代碼了。大家可以對照著CodeGeeX架構(gòu)圖,來看這里multi-head attention的計算方式。
ParallelMLP,ParallelTransformerLayer和ParallelTransformer都采用的是一樣的套路,也略過不言。
八、CrossEntropy
現(xiàn)在,終于可以來看模型的最后一層:交叉熵的平行計算。核心類為_VocabParallelCrossEntropy
我們在原理篇中講過交叉熵的并行計算,其優(yōu)化核心是將通訊量從b*s*v降至b*s。但是Megatron代碼中定義的交叉熵計算方式,稍微復(fù)雜一些,也和我們一般理解的交叉熵有些許差異。所以我們先用圖解,來看下代碼定義的交叉熵計算流程:
【注】:
- 對X和Y_i來說,(b, s, h)維度下應(yīng)該畫成一個立方體,為了表達(dá)簡練,這里將b拍平了。
- 對其余維度中含b的矩陣,b正常表示,即row=b
8.1 計算logit
首先,在使用_VocabParallelCrossEntropy 計算交叉熵前,我們需要計算logit。這時我們調(diào)用parallel_lm_logits 函數(shù),將模型最后一層的輸出X(復(fù)習(xí)一下,這個X已經(jīng)在TP組內(nèi)AllReduce了),乘上當(dāng)前進(jìn)程上維護(hù)的輸入層WE的轉(zhuǎn)置(復(fù)習(xí)一下,輸入層和輸出層共用一套embedding),得到當(dāng)前進(jìn)程的logit Y_i,「同時我們選擇不對輸出logit做AllReduce」。
你可能會有一個疑惑:「在Transformer中,輸出層會額外訓(xùn)練一個線性矩陣,來計算logit;為什么在gpt中,可以用輸入層WE的轉(zhuǎn)置來代替這個線性矩陣?」
這個問題的答案,對理解Megatron交叉熵計算也至關(guān)重要。我們可「將X*WE^T結(jié)果理解成“X與WE間的相似度”」,例如對Y1來說,它的第一行中的每個logit,表示第一個token與詞表里每個詞的相似度。
注意到每個進(jìn)程上只維護(hù)部分WE。例如,假設(shè)詞表共有5個單詞,WE1維護(hù)前5個單詞,WE2維護(hù)后5個單詞。因此再嚴(yán)格來說:「對Y1,它的第一行中的每個logit,表示第一個token與詞表中前5個詞的相似度;對Y2,它的第一行中的每個logit,表示第一個token與詞表中后5個詞的相似度。我們要記住這個含義?!?/strong>
8.2 計算交叉熵
知道了logit的含義,我們來看交叉熵計算。
首先做了一系列求max的計算,得到基于全局的max(logit),再將orig_logit - max(logit),得到處理后的結(jié)果。這步理解起來不難,主要目的是為了防止計算溢出。
「接下來,就是基于logit算loss了。」
-
每個進(jìn)程上都有一份(b, s)維度的真值,它表示每個token的真值是哪個詞(詞用id表示)。我們基于這份真值,在Y_i上找出真值位置的logit。例如:seq_length = 3,即我們需要對3個token去做預(yù)測,假設(shè)前兩個token的真值在第1個進(jìn)程所維護(hù)的WE1中,最后一個token的真值在第2個進(jìn)程所維護(hù)的WE2中。那么我們?nèi)1的前兩行里,取出真值位置的logit,這個logit表示“token與真值的相似度”,去Y2的最后一行里做同樣操作。
-
這樣,我們就能得到L1和L2,和真值位置不對應(yīng)的地方,統(tǒng)一填充0。隨后對L1和L2做AllReduce,得到L。「L中的每行表示“token與真值間的相似度"」
-
現(xiàn)在,我們回來對Y1和Y2的每一行求sum(e^logit),得到e1和e2。將e1和e2做AllReduce,得到e。「e中的每行表示“token和詞表中所有詞相似度的總和”」
-
我們希望「(token和詞表中所有詞相似度的總和-token與真值間的相似度) /token和詞表中所有詞相似度的總和」這個值最小,這個差值就是最終的loss。
8.3 代碼
理清了這點,現(xiàn)在可以來看代碼了(一切盡在注釋中),建議對這塊還有疑問的朋友,可以寫個test腳本把中間結(jié)果打印出來,方便理解:
class_VocabParallelCrossEntropy(torch.autograd.Function):
"""
分布式計算Loss
"""
@staticmethod
defforward(ctx,vocab_parallel_logits,target):
#1.logit-globalmax(logit)操作,主要目的是防溢出
logits_max=torch.max(vocab_parallel_logits,dim=-1)[0]#(b,s,1)
torch.distributed.all_reduce(#(b,s,1)
logits_max,
op=torch.distributed.ReduceOp.MAX,#找全局最大值
group=get_tensor_model_parallel_group(),
)
#Subtractthemaximumvalue.
vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1))#原始GPU上維護(hù)的logits減去每行最大值(防止溢出)
#2、根據(jù)當(dāng)前進(jìn)程id,取出當(dāng)前進(jìn)程所維護(hù)詞表序號等信息
#函數(shù),能夠獲取當(dāng)前進(jìn)程所維護(hù)詞表的start_index和end_index
get_vocab_range=VocabUtility.vocab_range_from_per_partition_vocab_size
#這塊GPU上logits最后一維的大小,等于所維護(hù)的詞表的大?。╲/N)
partition_vocab_size=vocab_parallel_logits.size()[-1]
#取得當(dāng)前進(jìn)程所在TP組中的序號
rank=get_tensor_model_parallel_rank()
#取得當(dāng)前進(jìn)程所在TP組的總進(jìn)程數(shù)
world_size=get_tensor_model_parallel_world_size()
#取得當(dāng)前進(jìn)程所維護(hù)的詞表的start_index和end_index
vocab_start_index,vocab_end_index=get_vocab_range(
partition_vocab_size,rank,world_size
)
#3.基于真值,取出每個token在真值位置上的logit(即和真值的相似度)
#Createamaskofvalidvocabids(1meansitneedstobemasked)
target_mask=(target=vocab_end_index)#target=(b,s)
masked_target=target.clone()-vocab_start_index
masked_target[target_mask]=0
#Getpredicted-logits=logits[target].
#ForSimplicity,weconvertlogitstoa2-Dtensorwithsize
#[*,partition-vocab-size]andtargettoa1-Dtensorofsize[*].
logits_2d=vocab_parallel_logits.view(-1,partition_vocab_size)#(b*s,v/N)
masked_target_1d=masked_target.view(-1)#(b*s)
arange_1d=torch.arange(#[b*s]
start=0,end=logits_2d.size()[0],device=logits_2d.device
)
#logits_2d[arange_1d,masked_target_1d]:
# tensor的切片操作。arange_1d:取出所有的行。masked_target_1d:取出logit
predicted_logits_1d=logits_2d[arange_1d,masked_target_1d]#(b*s)
predicted_logits_1d=predicted_logits_1d.clone().contiguous()
predicted_logits=predicted_logits_1d.view_as(target)#(b,s)
predicted_logits[target_mask]=0.0
#AllreduceisneededtogetthechunksfromotherGPUs.
torch.distributed.all_reduce(#allreduce之后得到的logit矩陣為(b,s),每一個位置表示對應(yīng)真值位置的預(yù)測logit
predicted_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_tensor_model_parallel_group(),
)
#SumofexponentialoflogitsalongvocabdimensionacrossallGPUs.
exp_logits=vocab_parallel_logits#(b,s,v/N)
torch.exp(vocab_parallel_logits,out=exp_logits)
sum_exp_logits=exp_logits.sum(dim=-1)#(b,s)
torch.distributed.all_reduce(
sum_exp_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_tensor_model_parallel_group(),
)
#4.計算Loss=log(sum(exp(logits)))-predicted-logit.
loss=torch.log(sum_exp_logits)-predicted_logits#(b,s)
#Storesoftmax,target-maskandmasked-targetforbackwardpass.
exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))
ctx.save_for_backward(exp_logits,target_mask,masked_target_1d)
returnloss
@staticmethod
defbackward(ctx,grad_output):
#Retreivetensorsfromtheforwardpath.
softmax,target_mask,masked_target_1d=ctx.saved_tensors
#Alltheinputshavesoftmaxastheirgradient.
grad_input=softmax
#Forsimplicity,workwiththe2Dgradient.
partition_vocab_size=softmax.size()[-1]
grad_2d=grad_input.view(-1,partition_vocab_size)
#Addthegradientfrommatchingclasses.
arange_1d=torch.arange(start=0,end=grad_2d.size()[0],device=grad_2d.device)
grad_2d[arange_1d,masked_target_1d]-=1.0-target_mask.view(-1).float()
#Finallyelementwisemultiplicationwiththeoutputgradients.
grad_input.mul_(grad_output.unsqueeze(dim=-1))
returngrad_input,None
九、總結(jié)
啊這總結(jié)怎么寫呢,嘔心瀝血終于寫完了。希望能給到大家?guī)椭?/p>
-
gpu
+關(guān)注
關(guān)注
28文章
4761瀏覽量
129141 -
源碼
+關(guān)注
關(guān)注
8文章
649瀏覽量
29317 -
大模型
+關(guān)注
關(guān)注
2文章
2514瀏覽量
2932
原文標(biāo)題:圖解大模型訓(xùn)練之:Megatron源碼解讀2,模型并行
文章出處:【微信號:GiantPandaCV,微信公眾號:GiantPandaCV】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論