SGLang 中的 TP + PP 流程浅析(以 Qwen2 为例)¶
约 2604 个字 45 行代码 3 张图片 预计阅读时间 14 分钟
这里我们以 Qwen2 模型为例,开启 PP + TP 分析一下 SGLang 是如何实现模型推理的并行的
不同节点的职责¶
| 节点 | 工作内容 |
|---|---|
| Rank 0 | tokenizer、detokenizer、HTTP 服务、调度器、模型 worker |
| Rank > 0 | 只运行调度器 + worker,不处理前端服务 |
Initiallize Server¶
launch_server.py中根据grpc_mode参数决定执行serve_grpc()或者launch_server()
后续以
launch_server()为例进行讲解
- 调用
engine.py中的_launch_subprocesses()
这里所有的子进程以 spawn 方式派生全新 Python 进程
- 按照 pp 和 tp 重新映射 GPU Id,然后为每个 GPU 创建一个
mp.Pipe() - 接着启动一个 Scheduler 子进程,传入 pipe 的写端(使用
run_scheduler_process()),然后父进程保留子进程引用和 pipe 的读端 - 多机场景下非 0 节点不参与前端服务,仅负责启动 scheduler 进程并保持节点健康,避免重复运行 tokenizer / detokenizer / 接入服务。
- 0 号节点启动 detokenizer 子进程 by
run_detokenizer_process(), - 启动 TokenizerManager,等待所有的 GPU 都加载了 model 并拥有相同的
scheduler_info(Bymp.Pipe())
- 从
_launch_subprocesses()中获得tokenizer_manager和scheduler_info,直接在主进程启动 HTTP 服务 TokenizerManager 进行请求的接收
😆 现在,Tokenizer 进程,Scheduler 进程,Detokenizer 进程可以通过事件循环不停驱动,实现用户请求的处理
Scheduler¶
先创建 Scheduler 对象,在 __init__ 中进行 TpModelWorker 初始化,DraftWorker 初始化,memory pool 和 memory cache 初始化
然后根据 server_args 不同,启动不同的事件循环
if disaggregation_mode == DisaggregationMode.NULL:
if scheduler.enable_pdmux:
scheduler.event_loop_pdmux()
elif server_args.pp_size > 1:
scheduler.event_loop_pp()
elif scheduler.enable_overlap:
scheduler.event_loop_overlap()
else:
scheduler.event_loop_normal()
TpModelWorker & ModelRunner¶
TpModelWoker 的 __init__() 中进行了 ModelRunner 的初始化
ModelRunner 的 __init__() 调用了 self.init_torch_distributed()
- 确认使用的通信后端,这里以 NCCL 为例
- 最终调用 parallel_state.py 中的
initialize_model_parallel()。 - 创建全局的
_TP(Tensor Parallelism) 进程组。假设 TP=4,GPU 0-3 会被划入同一个 NCCL 通信组。 - 创建全局的
_PP(Pipeline Parallelism) 进程组,为每个流水线 stage 创建 1 个独立的通信 group
| i | ranks = range(i, 8, 4) | PP Stage |
|---|---|---|
| 0 | 0,4 | [0,4] |
| 1 | 1,5 | [1,5] |
| 2 | 2,6 | [2,6] |
| 3 | 3,7 | [3,7] |
Detokenizer¶
实际上 detokenizer 会一直事件循环,从 Scheduler 得到 TODO,解码成 BatchTokenIDOutput 传递给 Tokenizer 子进程
def event_loop(self):
"""The event loop that handles requests"""
while True:
recv_obj = self.recv_from_scheduler.recv_pyobj()
output = self._request_dispatcher(recv_obj)
if output is not None:
self.send_to_tokenizer.send_pyobj(output)
Parallel Linear in SGLang¶
在 SGLang 中,Parallel Linear 是实现 Tensor Parallel (TP) 的核心组件。它通过将巨大的权重矩阵切分到多个 GPU 上,使得每个 GPU 只需计算一部分数据,从而减少显存占用并加速计算。
主要有两种并行方式:Column Parallel (列并行) 和 Row Parallel (行并行)。通常它们会成对出现(例如在 MLP 中:先 Column 后 Row),以最小化通信开销。
ColumnParallelLinear¶
数学原理:
输入 \(X\) 是完整的(复制在所有 GPU 上)。每个 GPU 计算 \(Y_i=XA_i\)。输出 \(Y\)被 切分为 \([Y_1,Y_2,...,Y_p]\),即每个 GPU 得到输出向量的一部分特征。
典型应用:
- Attention 的 QKV Projection (
QKVParallelLinear)。 - MLP 的 Gate / Up Projection (
MergedColumnParallelLinear)。
RowParallelLinear¶
数学原理:
为了匹配矩阵乘法规则,输入 \(X\) 也必须按列切分为 \([X_1,X_2,...,X_p]\)(这正好是 ColumnParallelLinear 的输出格式)。每个 GPU 计算 \(Y_i=XA_i\)。注意,\(Y_i\) 的形状与最终输出 \(Y\) 相同,但它只是部分和。最终输出 \(Y = \sum Y_i\) ,需要一次 All-Reduce (Sum) 操作
典型应用:
- Attention 的 Output Projection (
o_proj)。 - MLP 的 Down Projection (
down_proj)。
ColumnParallelLinear + RowParallelLinear¶
计算量减半,多了一次集群通信(allReduce),中间值的存储大小减半,Input, Weight 减半
Qwen2 Model¶
每个 GPU 进程都会实例化一个 Qwen2ForCausalLM 对象,但根据其所在的 PP Rank 和 TP Rank,加载的内容不同
Embedding 层¶
- 只有 PP Rank 0 的进程会初始化
VocabParallelEmbedding(Row Parallel)。 - TP 处理: 词表 (Vocab) 被切分到 TP 组的各个 GPU 上。每个 GPU 只持有
VocabSize / TP_Size大小的权重。 - 其他 PP Rank: 初始化为 PPMissingLayer (占位符,不占用显存)。
# perform weight tying for PP
if self.pp_group.world_size > 1 and config.tie_word_embeddings:
if self.pp_group.is_first_rank:
self.pp_group.send(self.model.embed_tokens.weight, dst=self.pp_group.last_rank)
elif self.pp_group.is_last_rank:
emb_token_weight = self.pp_group.recv(
size=(config.vocab_size, config.hidden_size),
dtype=next(self.model.parameters()).dtype,
src=self.pp_group.first_rank,
)
self.lm_head.weight.copy_(emb_token_weight)
Transformer Layers (所有 PP Rank)¶
- 使用
make_layers进行构建。所有层都会构建,只有本地层会加载权重(即占用显存) - PP 切分: 总层数(例如 32 层)会被均匀分配给 PP 组的各个 Rank。
- 例如 PP=4,Rank 0 负责 0-7 层,Rank 1 负责 8-15 层,以此类推。
- 本地层: 当前 Rank 只实际初始化它负责的那部分
Qwen2DecoderLayer。 - 缺失层: 不属于当前 Rank 的层被初始化为
PPMissingLayer。
modules = torch.nn.ModuleList(
[PPMissingLayer(return_tuple=return_tuple) for _ in range(start_layer)]
+ get_offloader().wrap_modules(
(
layer_fn(idx=idx, prefix=add_prefix(idx, prefix))
for idx in range(start_layer, end_layer)
),
**(offloader_kwargs or {}),
)
+ [
PPMissingLayer(return_tuple=return_tuple)
for _ in range(end_layer, num_hidden_layers)
]
)
Qwen2DecodeLayer 实际上主要是由 Qwen2Attention,Qwen2MLP,RMSNorm 组成的
- Qwen2Attention:
- 输入 -> 中间: 使用 QKVParallelLinear (继承自 ColumnParallelLinear)。将输入投影到 Q, K, V。
- 中间 -> 输出: 使用 RowParallelLinear。将 Attention 的输出投影回 hidden size。
- Qwen2MLP:
- 输入 -> 中间: 使用 MergedColumnParallelLinear (继承自 ColumnParallelLinear)。将输入投影到 Gate 和 Up 状态。
- 中间 -> 输出: 使用 RowParallelLinear。将激活后的状态投影回 hidden size。
LMHead 层¶
是一个 按词表维度切分 (Column Parallel) 的线性层
- 只有 最后一个 PP Rank 会初始化
RMSNorm和ParallelLMHead。 - 其他 PP Rank: 初始化为 PPMissingLayer。
Inference Process¶
当一个 Batch 的请求到来时,数据流会在 GPU 之间通过流水线传递。
这里以 PP3 + TP2 为例,梳理下整个推理的流程
拓扑¶
| GPU | 逻辑 rank | 负责层 |
|---|---|---|
| GPU1 | (pp0,tp0) | Layer1-2 的 TP shard 0 |
| GPU2 | (pp0,tp1) | Layer1-2 的 TP shard 1 |
| GPU3 | (pp0,tp2) | Layer1-2 的 TP shard 2 |
| GPU4 | (pp1,tp0) | Layer3-4 的 TP shard 0 |
| GPU5 | (pp1,tp1) | Layer3-4 的 TP shard 1 |
| GPU6 | (pp1,tp2) | Layer3-4 的 TP shard 2 |
Scheduler 持有的关键状态¶
- 单 stage 常驻状态是 waiting_queue / running_batch / cur_batch / last_batch
- PP 模式额外有 mbs / last_mbs / running_mbs / pp_outputs / last_rank_comm_queue / send_req_work / send_proxy_work / send_output_work。
- hidden_states 不是 scheduler 的长期成员;它作为
GenerationBatchResult.pp_hidden_states_proxy_tensors里的 PPProxyTensors 临时存在,然后通过 send_proxy_work 发到下一 stage。 - next_token_ids 先是
GenerationBatchResult.next_token_ids,随后写入 batch.output_ids,最后 append 到每个 req.output_ids。
消息传输¶
- req/control:CPU 侧控制消息,单 rank 收发 + CPU broadcast
- hidden_states/residual:tensor data 各 TP rank 点对点发送 + 接收侧可选 TP all-gather
Prefill 流程¶
- 外部请求只先进 pp0。(pp0,tp0) 从 ZMQ 收到 recv_reqs;pp0,tp1/tp2 不直接收外部请求
- pp0 内部先做 TP 控制广播:recv_reqs 经 broadcast_pyobj(..., tp_cpu_group) 扩给 GPU2/GPU3。这是 CPU gloo 通道,传的是 Python 对象,属于控制面。
-
pp0 选 batch 后,每张卡都本地执行 prepare_for_extend()。这一步得到:
Pythoninput_ids = fill_ids[len(prefix_indices):] seq_lens / prefix_lens / extend_lens out_cache_loc req_pool_indices- 这里没有 TP0 统一分配再广播。而是 GPU½/3 各自用自己的本地 req_to_token_pool 和 token_to_kv_pool_allocator 做同样的分配;因为控制流一致,分配结果在逻辑位置上保持一致,但物理 KV 存在各自 GPU 本地。
- 都只是存放的各自负责的 layer 和各自负责的 head 的 KV
- ForwardBatch.init_new() 把 input_ids / req_pool_indices / out_cache_loc / seq_lens 填进模型执行结构。
- pp0 的每张卡开始算本 stage 层。stage 内的 TP 通信发生在层内部,不在 scheduler 里:
- 首层 embedding 后会做 TP all_reduce。
- 线性层会按实现做 all_gather 或 all_reduce。
- 这些都是 GPU collectives,参与 rank 必须同步完成这个 collective 才能继续。
- 同时,本 stage 每个本地 layer shard 会把本 shard 的 K/V 写入本卡自己的 KV pool;写入位置就是
forward_batch.out_cache_loc。 - pp0 前向结束后,不是直接传 token,而是传
PPProxyTensors({"hidden_states","residual"})给 pp1。 - 这条消息在 scheduler 里走 send_proxy_work,消息体来自
result.pp_hidden_states_proxy_tensors.tensors。发送前会等 launch_event,确保 GPU 计算完成;发送是 async,但下一 stage 的接收是阻塞 - req/control 也从 (pp0,tp0) 通过
point_to_point_pyobj发到 (pp1,tp0),再在 pp1 内广播到 GPU5/GPU6;这条是 CPU 控制消息,和 proxy tensor 分离。 - pp1 收到 proxy 后继续算 Layer3-4;本 stage 也会做本地 TP collectives,并把本 stage 的 K/V 写进 GPU⅘/6 各自本地 KV pool。
- 最后一层后,lm_head logits 会先在 TP 内 all-gather 成完整 vocab logits,所以 GPU⅘/6 每张卡本地都有完整 next_token_logits。
- 然后 GPU⅘/6 都各自执行 sampler;默认不再额外同步 next_token_ids,只依赖算子确定性。只有 SYNC_TOKEN_IDS_ACROSS_TP=1 或 grammar 时才做一次 TP all_reduce(MIN) 对齐。
- pp1 把 next_token_ids 打成 output tensor dict,先放进 last_rank_comm_queue,再 async 发回 ring。
- 这条 output 不只回到 pp0,还会继续沿 ring 转一圈,让每个 PP stage 都跑一次 process_batch_result,从而每个 stage 的本地 Req 状态保持一致;pp_outputs 就是这个在环上中转的成员。每个 stage 收到 output 后:
_pp_prep_batch_result()先把batch.output_ids = pp_outputs["next_token_ids"]- 然后
process_batch_result_prefill()把每个 token append 到 req.output_ids若请求未结束,会 tree_cache.cache_unfinished_req(req),把刚算好的前缀缓存进 radix cache,并更新 req.prefix_indices
Decode 流程¶
- 进入 decode 前,每个 stage 的本地 Req 都已经有同样的 req.output_ids,因为上一轮 output 已经沿 ring 让所有 stage 都处理过了。
- 每个 stage 本地调用 alloc_for_decode() 给这 1 个新 token 分一个新的 out_cache_loc,并写到 req_to_token_pool[req_pool_idx, seq_lens]。
- 注意: 这里仍然没有跨 PP/TP 传 KV;每张卡只是给自己这张卡上、自己负责的层 shard 追加一格 KV。真正的历史上下文依赖本地 KV cache,不重算 prompt
- 第 k 轮 decode 的通信与 prefill 基本同型,只是 pp0 输入 token 数从整段 extend tokens变成每请求 1 token:
- CPU 控制消息:req 从 pp0 到 pp1
- TP 层内 collectives:每个 local layer 都有
- PP proxy:hidden_states/residual 从 pp0 到 pp1
- LM head logits all-gather:在 pp1 的 TP 内
- 可选 token-id TP sync:默认关
- PP output:next_token_ids 从 pp1 回 ring
- 所有 stage 本地 process_batch_result_decode():append 到 req.output_ids
层内并行细节 (Inside a Layer with TP)¶
在每一个 Transformer 层内部,TP 是这样工作的:
- Qwen2Attention:
- 输入: 完整的
hidden_states(所有 TP Rank 都有副本)。 - QKV Proj (Column Parallel): 每个 TP Rank 只计算一部分 Head 的 Q/K/V。
- Attention 计算: 每个 Rank 独立计算自己那部分 Head 的 Attention。
- Output Proj (Row Parallel): 每个 Rank 计算部分输出。
- AllReduce: 在 Output Proj 之后,执行一次 AllReduce (Sum),让所有 TP Rank 重新获得完整的 Attention 输出,并加到 Residual 上。
- Qwen2MLP:
