本部分将介绍 vLLM 的高级特性,包括模型量化、投机解码和分布式推理等进阶优化技术。
1 - 投机解码
投机解码(Speculative Decoding)
概述
投机解码(Speculative Decoding)是一种加速大语言模型推理的技术。其核心思想是:使用一个小型的 Draft 模型快速生成多个候选 token,然后让大型的 Target 模型并行验证这些候选 token。由于验证比逐个生成更高效,这种方法可以显著加速推理过程。
本文将深入分析 vLLM 中投机解码的实现原理和代码细节。
为什么需要投机解码
Decode 阶段的瓶颈
在LLM 生成过程中,我们了解到 Decode 阶段是内存带宽密集型的:
传统自回归生成:
Token 1 → Model Forward → Token 2 → Model Forward → Token 3 → ...
| | | | |
GPU 利用率低 等待 GPU 利用率低 等待 GPU 利用率低
每次 Decode 只处理一个 token,无法充分利用 GPU 的并行计算能力。
投机解码的核心思想
投机解码受到 CPU 分支预测的启发:
flowchart LR
subgraph 传统方法
A1[Token 1] --> B1[Model]
B1 --> C1[Token 2]
C1 --> D1[Model]
D1 --> E1[Token 3]
E1 --> F1[Model]
F1 --> G1[Token 4]
end
subgraph 投机解码
A2[Token 1] --> B2[Draft Model]
B2 --> C2[Draft: 2,3,4]
C2 --> D2[Target Model<br/>并行验证]
D2 --> E2[接受: 2,3<br/>拒绝: 4]
E2 --> F2[输出: 2,3,4']
end关键洞察:
- Draft 模型生成 K 个候选 token
- Target 模型一次性验证所有候选
- 接受正确的前缀,从第一个错误位置重新生成
- 验证的计算量 ≈ 生成一个 token,但产出多个 token
投机解码的数学原理
接受率分析
设:
p(x)= Target 模型在位置 i 的概率分布q(x)= Draft 模型在位置 i 的概率分布x_d= Draft 模型采样的 token
接受概率计算:
接受 x_d 的概率 = min(1, p(x_d) / q(x_d))
这种接受准则保证了最终输出的分布与只使用 Target 模型完全一致(无损加速)。
加速比估算
假设:
- Draft 模型生成 K 个候选 token
- 每个 token 的平均接受率为 α
- Draft 模型的前向时间为 t_d
- Target 模型的前向时间为 t_t
预期接受的 token 数:
E[accepted] = α + α² + α³ + ... + α^K ≈ α/(1-α) (当 α < 1)
加速比:
Speedup = E[accepted] × t_t / (K × t_d + t_t)
例如,当 α = 0.8,K = 5,t_d = t_t/10 时:
E[accepted] ≈ 3.2 tokens
Speedup ≈ 3.2 × t_t / (0.5 × t_t + t_t) = 2.1x
vLLM 中的投机解码实现
支持的投机解码方法
vLLM V1 支持多种投机解码方法:
graph TD
A[Speculative Decoding] --> B[Draft Model]
A --> C[EAGLE]
A --> D[EAGLE3]
A --> E[Medusa]
A --> F[MTP]
A --> G[N-gram]
B --> B1[独立小模型<br/>如 TinyLlama]
C --> C1[EAGLE Head<br/>利用隐藏状态]
D --> D1[EAGLE3 Head<br/>多层隐藏状态]
E --> E1[多头预测<br/>并行采样]
F --> F1[Multi-Token Prediction<br/>多 token 预测]
G --> G1[基于历史<br/>N-gram 匹配]核心代码结构
投机解码的核心代码位于 vllm/v1/spec_decode/ 目录:
vllm/v1/spec_decode/
├── __init__.py
├── eagle.py # EAGLE 基类和 Proposer
├── draft_model.py # Draft Model Proposer
├── medusa.py # Medusa Head
├── metadata.py # 投机解码元数据
├── metrics.py # 性能指标
├── ngram_proposer.py # N-gram 方法
├── suffix_decoding.py # 后缀解码
└── utils.py # 工具函数
vllm/v1/worker/gpu/spec_decode/
├── eagle.py # EAGLE CUDA Graph 支持
├── eagle_cudagraph.py # CUDA Graph 实现
└── rejection_sample.py # 拒绝采样内核
SpecDecodeBaseProposer 基类
类定义与初始化
SpecDecodeBaseProposer 是所有投机解码方法的基类:
# vllm/v1/spec_decode/eagle.py
class SpecDecodeBaseProposer:
def __init__(
self,
vllm_config: VllmConfig,
device: torch.device,
pass_hidden_states_to_model: bool, # 是否传递隐藏状态
runner=None,
):
self.vllm_config = vllm_config
self.speculative_config = vllm_config.speculative_config
self.draft_model_config = self.speculative_config.draft_model_config
self.method = self.speculative_config.method
# 配置参数
self.num_speculative_tokens = self.speculative_config.num_speculative_tokens
self.hidden_size = self.draft_model_config.get_hidden_size()
# 持久化缓冲区(用于 CUDA Graph)
self.input_ids = torch.zeros(
self.max_num_tokens, dtype=torch.int32, device=device
)
self.positions = torch.zeros(
self.max_num_tokens, dtype=torch.int64, device=device
)
self.hidden_states = torch.zeros(
(self.max_num_tokens, self.hidden_size),
dtype=self.dtype, device=device
)
Proposer 工作流程
sequenceDiagram
participant Runner as ModelRunner
participant Proposer as SpecDecodeProposer
participant Draft as Draft Model
participant Cache as KV Cache
Runner->>Proposer: propose(target_hidden_states, next_token_ids)
Note over Proposer: 第一次前向传播
Proposer->>Proposer: set_inputs_first_pass()
Proposer->>Draft: model(input_ids, positions, hidden_states)
Draft-->>Proposer: logits, hidden_states
Proposer->>Proposer: argmax(logits) → token_1
loop 剩余 K-1 个 token
Note over Proposer: 增量前向传播
Proposer->>Proposer: 更新 positions, seq_lens
Proposer->>Proposer: 计算 slot_mapping
Proposer->>Draft: model(token_i, position, hidden_state)
Draft-->>Proposer: logits, hidden_states
Proposer->>Proposer: argmax(logits) → token_{i+1}
end
Proposer-->>Runner: draft_token_ids [batch, K]propose() 方法详解
def propose(
self,
target_token_ids: torch.Tensor, # [num_tokens]
target_positions: torch.Tensor, # [num_tokens]
target_hidden_states: torch.Tensor, # [num_tokens, hidden_size]
next_token_ids: torch.Tensor, # [batch_size]
last_token_indices: torch.Tensor | None,
common_attn_metadata: CommonAttentionMetadata,
sampling_metadata: SamplingMetadata,
...
) -> torch.Tensor:
batch_size = common_attn_metadata.batch_size()
# 1. 设置第一次前向传播的输入
num_tokens, last_token_indices, common_attn_metadata = (
self.set_inputs_first_pass(
target_token_ids=target_token_ids,
next_token_ids=next_token_ids,
target_positions=target_positions,
last_token_indices=last_token_indices,
cad=common_attn_metadata,
num_rejected_tokens_gpu=num_rejected_tokens_gpu,
)
)
# 2. 构建 Attention Metadata
attn_metadata = attn_metadata_builder.build_for_drafting(
common_attn_metadata=common_attn_metadata, draft_index=0
)
# 3. 第一次前向传播
with set_forward_context(per_layer_attn_metadata, ...):
ret_hidden_states = self.model(
input_ids=self.input_ids[:num_input_tokens],
positions=self._get_positions(num_input_tokens),
hidden_states=self.hidden_states[:num_input_tokens]
if self.pass_hidden_states_to_model else None,
)
# 4. 采样第一个 draft token
logits = self.model.compute_logits(last_hidden_states[last_token_indices])
draft_token_ids = logits.argmax(dim=-1)
draft_token_ids_list = [draft_token_ids]
# 5. 生成剩余的 draft tokens
for token_index in range(self.num_speculative_tokens - 1):
# 更新输入
input_ids = draft_token_ids_list[-1].int()
positions += 1
# 处理超出最大长度的情况
exceeds_max_model_len = positions >= self.max_model_len
clamped_positions = torch.where(exceeds_max_model_len, 0, positions)
# 更新序列长度
common_attn_metadata.seq_lens += 1
# 计算新的 slot mapping
block_numbers = clamped_positions // block_size
block_ids = common_attn_metadata.block_table_tensor.gather(
dim=1, index=block_numbers.view(-1, 1)
)
common_attn_metadata.slot_mapping = (
block_ids * block_size + clamped_positions % block_size
)
# 屏蔽超长位置
common_attn_metadata.slot_mapping.masked_fill_(
exceeds_max_model_len, PADDING_SLOT_ID
)
# 执行模型
with set_forward_context(...):
ret_hidden_states = self.model(
input_ids=self.input_ids[:input_batch_size],
positions=self._get_positions(input_batch_size),
hidden_states=self.hidden_states[:input_batch_size],
)
# 采样下一个 token
logits = self.model.compute_logits(last_hidden_states[:batch_size])
draft_token_ids = logits.argmax(dim=-1)
draft_token_ids_list.append(draft_token_ids)
# 返回所有 draft tokens
return torch.stack(draft_token_ids_list, dim=1)
DraftModelProposer 详解
独立 Draft 模型
DraftModelProposer 使用独立的小模型(如 TinyLlama)作为 Draft:
# vllm/v1/spec_decode/draft_model.py
class DraftModelProposer(SpecDecodeBaseProposer):
def __init__(
self,
vllm_config: VllmConfig,
device: torch.device,
runner=None,
):
super().__init__(
vllm_config=vllm_config,
device=device,
pass_hidden_states_to_model=False, # 不需要 Target 的隐藏状态
runner=runner,
)
# 验证约束
self._raise_if_multimodal() # 暂不支持多模态
self._raise_if_mrope() # 暂不支持 M-RoPE
self._raise_if_vocab_size_mismatch() # 词表大小必须一致
self._raise_if_draft_tp_mismatch() # TP 大小必须一致
输入处理
Draft Model 的输入处理需要合并 Target 的输出:
def set_inputs_first_pass(
self,
target_token_ids: torch.Tensor,
next_token_ids: torch.Tensor,
target_positions: torch.Tensor,
last_token_indices: torch.Tensor | None,
cad: CommonAttentionMetadata,
num_rejected_tokens_gpu: torch.Tensor | None,
) -> tuple[int, torch.Tensor, CommonAttentionMetadata]:
batch_size = cad.batch_size()
# 使用 Triton kernel 合并 tokens
# target_toks: [a1, b1, b2, c1, c2, c3]
# next_toks: [a2, b3, c4]
# 结果: [a1, a2, b1, b2, b3, c1, c2, c3, c4]
merge_toks_kernel[grid](
target_toks_ptr=target_token_ids,
next_toks_ptr=next_token_ids,
query_start_locs_ptr=start_locs,
query_end_locs_ptr=end_locs,
out_ptr_merged_toks=self.input_ids,
out_ptr_is_rejected_tok=is_rejected_tok,
...
)
# 重新计算 slot mapping
new_slot_mapping = compute_new_slot_mapping(
cad=cad,
new_positions=self.positions[:num_tokens],
is_rejected_token_mask=is_rejected_tok,
block_size=self._block_size(),
max_model_len=self.max_model_len,
)
return num_tokens, new_last_token_indices, new_cad
加载 Draft 模型
def load_model(self, target_model: Any) -> None:
# 创建 Draft 模型专用的 VllmConfig
draft_vllm_config = create_vllm_config_for_draft_model(
target_model_vllm_config=self.vllm_config
)
logger.info(
"Starting to load draft model %s. TP=%d, rank=%d",
draft_vllm_config.model_config.model,
draft_vllm_config.parallel_config.tensor_parallel_size,
draft_vllm_config.parallel_config.rank,
)
# 加载模型并设置编译标签
with set_model_tag("draft_model"):
self.model = get_model(vllm_config=draft_vllm_config, prefix="draft_model")
EAGLE Proposer 详解
EAGLE 方法原理
EAGLE (Extrapolation Algorithm for Greater Language-model Efficiency) 利用 Target 模型的隐藏状态来预测 draft tokens:
graph TD
subgraph Target Model
T1[Input Token] --> T2[Transformer Layers]
T2 --> T3[Hidden States]
T3 --> T4[LM Head]
T4 --> T5[Output Token]
end
subgraph EAGLE Head
T3 --> E1[Feature Projection]
T5 --> E2[Token Embedding]
E1 --> E3[+]
E2 --> E3
E3 --> E4[Lightweight Transformer]
E4 --> E5[LM Head]
E5 --> E6[Draft Tokens]
end
style E1 fill:#f9f,stroke:#333
style E4 fill:#f9f,stroke:#333EAGLE 的关键创新:
- 复用 Target 模型的隐藏状态,避免独立编码
- 只需要很少的额外参数(通常 < 1% 的 Target 模型)
- 共享 Token Embedding 和 LM Head 权重
EagleProposer 实现
# vllm/v1/spec_decode/eagle.py
class EagleProposer(SpecDecodeBaseProposer):
def __init__(
self,
vllm_config: VllmConfig,
device: torch.device,
runner=None,
):
super().__init__(
vllm_config,
device,
pass_hidden_states_to_model=True, # 需要传递隐藏状态
runner=runner,
)
权重共享机制
EAGLE 模型与 Target 模型共享权重:
def load_model(self, target_model: nn.Module) -> None:
# 加载 EAGLE head
with set_model_tag("eagle_head"):
self.model = get_model(
vllm_config=self.vllm_config,
model_config=draft_model_config
)
# 检查是否需要共享 embedding
if hasattr(self.model, "has_own_embed_tokens"):
if not self.model.has_own_embed_tokens:
share_embeddings = True
logger.info("Sharing target model embedding weights with draft model")
if share_embeddings:
# 共享 embed_tokens
del self.model.model.embed_tokens
self.model.model.embed_tokens = target_embed_tokens
# 共享 lm_head
if share_lm_head:
del self.model.lm_head
self.model.lm_head = target_language_model.lm_head
拒绝采样(Rejection Sampling)
核心算法
拒绝采样确保输出分布与 Target 模型一致:
# vllm/v1/worker/gpu/spec_decode/rejection_sample.py
@triton.jit
def _rejection_sample_kernel(
sampled_ptr, # [num_reqs, num_speculative_steps + 1]
sampled_stride,
num_sampled_ptr, # [num_reqs]
target_sampled_ptr, # [num_draft_tokens + num_reqs]
input_ids_ptr, # [num_draft_tokens + num_reqs](draft tokens)
cu_num_logits_ptr, # [num_reqs + 1]
):
req_idx = tl.program_id(0)
start_idx = tl.load(cu_num_logits_ptr + req_idx)
end_idx = tl.load(cu_num_logits_ptr + req_idx + 1)
num_tokens = end_idx - start_idx
num_sampled = 0
rejected = False
# 逐个比较 draft token 和 target token
for i in range(num_tokens - 1):
if not rejected:
target_sampled = tl.load(target_sampled_ptr + start_idx + i)
draft_sampled = tl.load(input_ids_ptr + start_idx + i + 1)
# 存储 target 的采样结果
tl.store(sampled_ptr + req_idx * sampled_stride + i, target_sampled)
num_sampled += 1
# 检查是否匹配
if target_sampled != draft_sampled:
rejected = True # 一旦不匹配,后续全部拒绝
# 处理最后一个 token(bonus token)
if not rejected:
target_sampled = tl.load(target_sampled_ptr + start_idx + num_tokens - 1)
tl.store(
sampled_ptr + req_idx * sampled_stride + num_tokens - 1,
target_sampled
)
num_sampled += 1
tl.store(num_sampled_ptr + req_idx, num_sampled)
工作流程图解
flowchart TD
A[Draft: t1, t2, t3, t4, t5] --> B[Target 验证]
B --> C{t1 匹配?}
C -->|是| D{t2 匹配?}
C -->|否| E[拒绝 t1, t2, t3, t4, t5<br/>输出 target_t1]
D -->|是| F{t3 匹配?}
D -->|否| G[拒绝 t2, t3, t4, t5<br/>输出 t1, target_t2]
F -->|是| H{t4 匹配?}
F -->|否| I[拒绝 t3, t4, t5<br/>输出 t1, t2, target_t3]
H -->|是| J{t5 匹配?}
H -->|否| K[拒绝 t4, t5<br/>输出 t1, t2, t3, target_t4]
J -->|是| L[全部接受<br/>输出 t1-t5 + bonus token]
J -->|否| M[拒绝 t5<br/>输出 t1-t4, target_t5]Tree-based 投机解码
树结构 Draft
vLLM 支持树结构的投机解码,可以同时探索多个候选路径:
graph TD
R[Root Token] --> A[Candidate A]
R --> B[Candidate B]
R --> C[Candidate C]
A --> A1[A-1]
A --> A2[A-2]
B --> B1[B-1]
B --> B2[B-2]
C --> C1[C-1]
C --> C2[C-2]
A1 --> A11[A-1-1]
A2 --> A21[A-2-1]
B1 --> B11[B-1-1]
B2 --> B21[B-2-1]
style A fill:#9f9
style A1 fill:#9f9
style A11 fill:#9f9Tree Attention
树结构需要特殊的注意力计算:
def propose_tree(
self,
batch_size: int,
logits: torch.Tensor,
positions: torch.Tensor,
hidden_states: torch.Tensor,
common_attn_metadata: CommonAttentionMetadata,
...
) -> list[torch.Tensor]:
# 解析投机 token 树
tree_depth = len(self.cu_drafts_per_level)
# 第一层:从 root 采样多个候选
num_children = self.child_drafts_per_level[0]
if num_children == 1:
draft_token_ids = logits.argmax(dim=-1).view(batch_size, -1)
else:
# Top-K 采样
draft_token_ids = torch.topk(
logits, num_children, dim=-1
).indices.view(batch_size, -1)
draft_token_ids_list = [draft_token_ids]
# 逐层生成
for level in range(tree_depth - 1):
# 构建树注意力元数据
attn_metadata = tree_attn_metadata_builder.build_for_drafting(
common_attn_metadata=common_attn_metadata,
draft_index=level + 1
)
# 执行前向传播
with set_forward_context(...):
last_hidden_states, hidden_states = self.model(
input_ids=self.input_ids[:num_input_tokens],
positions=self.positions[:num_input_tokens],
hidden_states=self.hidden_states[:num_input_tokens],
)
# 为下一层采样
logits = self.model.compute_logits(draft_last_hidden_states)
num_children = self.child_drafts_per_level[level + 1]
if num_children == 1:
draft_token_ids = logits.argmax(dim=-1).view(batch_size, -1)
else:
draft_token_ids = torch.topk(
logits, num_children, dim=-1
).indices.view(batch_size, -1)
draft_token_ids_list.append(draft_token_ids)
return draft_token_ids_list
投机解码配置与使用
配置参数
from vllm import LLM, SamplingParams
# 使用独立 Draft 模型
llm = LLM(
model="meta-llama/Llama-3.1-70B-Instruct",
speculative_model="meta-llama/Llama-3.2-1B-Instruct",
num_speculative_tokens=5, # 每次投机生成的 token 数
)
# 使用 EAGLE
llm = LLM(
model="meta-llama/Llama-3.1-70B-Instruct",
speculative_model="path/to/eagle-head",
speculative_method="eagle",
num_speculative_tokens=5,
)
命令行使用
# 使用 Draft 模型
vllm serve meta-llama/Llama-3.1-70B-Instruct \
--speculative-model meta-llama/Llama-3.2-1B-Instruct \
--num-speculative-tokens 5
# 使用 EAGLE
vllm serve meta-llama/Llama-3.1-70B-Instruct \
--speculative-model path/to/eagle-head \
--speculative-method eagle \
--num-speculative-tokens 5
性能优化
CUDA Graph 支持
投机解码支持 CUDA Graph 以减少 kernel launch 开销:
class SpecDecodeBaseProposer:
def initialize_cudagraph_keys(self, cudagraph_mode: CUDAGraphMode) -> None:
"""初始化 CUDA Graph dispatcher"""
if (
not self.speculative_config.enforce_eager
and cudagraph_mode.mixed_mode() in [CUDAGraphMode.PIECEWISE, CUDAGraphMode.FULL]
):
eagle_cudagraph_mode = CUDAGraphMode.PIECEWISE
else:
eagle_cudagraph_mode = CUDAGraphMode.NONE
self.cudagraph_dispatcher.initialize_cudagraph_keys(eagle_cudagraph_mode)
批量处理优化
Padded Drafter Batch 避免动态形状:
def prepare_inputs_padded(
self,
common_attn_metadata: CommonAttentionMetadata,
spec_decode_metadata: SpecDecodeMetadata,
valid_sampled_tokens_count: torch.Tensor,
) -> tuple[CommonAttentionMetadata, torch.Tensor, torch.Tensor]:
"""
准备投机解码的输入,使用 padding 保持固定形状。
被拒绝的 token 作为 padding,后续通过 token_indices_to_sample 过滤。
"""
num_reqs = common_attn_metadata.num_reqs
device = valid_sampled_tokens_count.device
token_indices_to_sample = torch.empty(
(num_reqs,), dtype=torch.int32, device=device
)
num_rejected_tokens_gpu = torch.empty(
(num_reqs,), dtype=torch.int32, device=device
)
# 使用 Triton kernel 计算
eagle_prepare_inputs_padded_kernel[grid](
spec_decode_metadata.cu_num_draft_tokens,
valid_sampled_tokens_count,
common_attn_metadata.query_start_loc,
token_indices_to_sample,
num_rejected_tokens_gpu,
num_reqs,
)
return spec_common_attn_metadata, token_indices_to_sample, num_rejected_tokens_gpu
投机解码的限制与注意事项
当前限制
- 多模态不完全支持:某些投机解码方法不支持多模态模型
- M-RoPE 限制:Draft Model 方法不支持 M-RoPE 位置编码
- 词表大小:Draft 和 Target 模型必须有相同的词表
- 张量并行:Draft 和 Target 的 TP 大小必须一致
最佳实践
选择合适的 K 值:
- 较大的 K 增加预测深度,但降低平均接受率
- 通常 K=3-5 是较好的平衡点
Draft 模型选择:
- 选择与 Target 模型同系列的小模型
- 确保词表完全一致
- EAGLE 通常比独立 Draft 模型效率更高
监控接受率:
# 检查投机解码统计 # vLLM 会在日志中输出平均接受率等指标
总结
投机解码是加速 LLM 推理的重要技术:
graph TD
A[投机解码核心思想] --> B[Draft Model 预测]
A --> C[Target Model 验证]
A --> D[拒绝采样保证正确性]
E[vLLM 实现特点] --> F[多种方法支持]
E --> G[CUDA Graph 优化]
E --> H[Tree Attention]
E --> I[权重共享]
J[最佳实践] --> K[选择合适的 K 值]
J --> L[监控接受率]
J --> M[选择匹配的 Draft 模型]关键要点:
- 核心原理:用小模型快速预测,大模型并行验证
- 无损加速:拒绝采样保证输出分布不变
- vLLM 优化:CUDA Graph、权重共享、批量处理
- 实际效果:通常可获得 1.5x-3x 的加速
参考资料
导航
2 - 量化技术
量化技术(Quantization)
概述
量化(Quantization)是一种将模型权重和激活值从高精度(如 FP32、FP16)转换为低精度(如 INT8、INT4、FP8)的技术。通过量化,可以显著减少模型的显存占用和计算量,从而提高推理效率。
本文将介绍量化的基本原理以及 vLLM 中支持的各种量化方法。
为什么需要量化
显存压力
以 LLaMA-70B 为例:
| 精度 | 每个参数占用 | 模型权重大小 |
|---|---|---|
| FP32 | 4 字节 | 280 GB |
| FP16/BF16 | 2 字节 | 140 GB |
| INT8 | 1 字节 | 70 GB |
| INT4 | 0.5 字节 | 35 GB |
加上 KV Cache 和激活值,FP16 推理需要多张高端 GPU;而 INT4 量化可以在单张 80GB GPU 上运行。
计算加速
graph LR
subgraph FP16 计算
A1[权重 FP16] --> B1[矩阵乘法]
B1 --> C1[输出 FP16]
end
subgraph INT8 计算
A2[权重 INT8] --> B2[整数矩阵乘法]
B2 --> C2[反量化]
C2 --> D2[输出 FP16]
end现代 GPU 的 INT8/INT4 计算单元比 FP16 更快:
- A100: INT8 Tensor Core 是 FP16 的 2 倍吞吐
- H100: FP8 Tensor Core 是 FP16 的 2 倍吞吐
量化的基本原理
线性量化
最常见的量化方法是线性量化(Linear Quantization):
量化: q = round((x - zero_point) / scale)
反量化: x ≈ q × scale + zero_point
其中:
x是原始浮点值q是量化后的整数值scale是缩放因子zero_point是零点偏移
对称量化 vs 非对称量化
graph TD
subgraph 对称量化
A1["-127 到 127"] --> B1["zero_point = 0"]
B1 --> C1["x = q × scale"]
end
subgraph 非对称量化
A2["-128 到 127"] --> B2["zero_point ≠ 0"]
B2 --> C2["x = q × scale + zero_point"]
end对称量化(Symmetric):
- 零点固定为 0
- 计算更简单
- 适合权重分布对称的情况
非对称量化(Asymmetric):
- 零点可变
- 更灵活,精度可能更高
- 计算稍复杂
量化粒度
graph TD
A[量化粒度] --> B[Per-Tensor<br/>整个张量一个 scale]
A --> C[Per-Channel<br/>每个通道一个 scale]
A --> D[Per-Group<br/>每 G 个元素一个 scale]
A --> E[Per-Block<br/>每个块一个 scale]
B --> B1[内存效率最高<br/>精度最低]
C --> C1[权重常用]
D --> D1[精度与效率平衡]
E --> E1[块大小如 128]vLLM 支持的量化方法
量化方法概览
vLLM 支持丰富的量化方法:
# vllm/model_executor/layers/quantization/__init__.py
QuantizationMethods = Literal[
"awq", # Activation-aware Weight Quantization
"fp8", # FP8 量化
"gptq", # Post-Training Quantization for GPT
"gptq_marlin", # GPTQ with Marlin kernel
"awq_marlin", # AWQ with Marlin kernel
"bitsandbytes", # BitsAndBytes 量化
"gguf", # GGUF 格式量化
"compressed-tensors", # 压缩张量
"torchao", # PyTorch AO 量化
"modelopt", # NVIDIA ModelOpt FP8
"mxfp4", # MXFP4 格式
...
]
量化配置基类
# vllm/model_executor/layers/quantization/base_config.py
class QuantizationConfig(ABC):
"""量化配置基类"""
@abstractmethod
def get_name(self) -> QuantizationMethods:
"""量化方法名称"""
raise NotImplementedError
@abstractmethod
def get_supported_act_dtypes(self) -> list[torch.dtype]:
"""支持的激活数据类型"""
raise NotImplementedError
@classmethod
@abstractmethod
def get_min_capability(cls) -> int:
"""最低 GPU 计算能力要求
70 = Volta, 75 = Turing, 80 = Ampere
"""
raise NotImplementedError
@abstractmethod
def get_quant_method(
self, layer: torch.nn.Module, prefix: str
) -> QuantizeMethodBase | None:
"""获取量化方法实现"""
raise NotImplementedError
class QuantizeMethodBase(ABC):
"""量化方法基类"""
@abstractmethod
def create_weights(
self, layer: torch.nn.Module, *weight_args, **extra_weight_attrs
):
"""创建量化权重"""
raise NotImplementedError
@abstractmethod
def apply(self, layer: torch.nn.Module, *args, **kwargs) -> torch.Tensor:
"""应用量化计算"""
raise NotImplementedError
FP8 量化详解
FP8 格式介绍
FP8 (8-bit Floating Point) 有两种主要格式:
| 格式 | 符号位 | 指数位 | 尾数位 | 动态范围 | 精度 |
|---|---|---|---|---|---|
| E4M3 | 1 | 4 | 3 | 较小 | 较高 |
| E5M2 | 1 | 5 | 2 | 较大 | 较低 |
E4M3 更适合权重,E5M2 更适合梯度。vLLM 主要使用 E4M3。
Fp8Config 实现
# vllm/model_executor/layers/quantization/fp8.py
class Fp8Config(QuantizationConfig):
"""FP8 量化配置"""
def __init__(
self,
is_checkpoint_fp8_serialized: bool = False, # 是否为 FP8 序列化的检查点
activation_scheme: str = "dynamic", # 激活量化方案
ignored_layers: list[str] | None = None, # 忽略的层
weight_block_size: list[int] | None = None, # 块大小
) -> None:
super().__init__()
self.is_checkpoint_fp8_serialized = is_checkpoint_fp8_serialized
# 支持的激活方案: static, dynamic
if activation_scheme not in ACTIVATION_SCHEMES:
raise ValueError(f"Unsupported activation scheme {activation_scheme}")
self.activation_scheme = activation_scheme
self.ignored_layers = ignored_layers or []
self.weight_block_size = weight_block_size
@classmethod
def get_min_capability(cls) -> int:
return 75 # 最低支持 Turing 架构
@classmethod
def get_supported_act_dtypes(cls) -> list[torch.dtype]:
return [torch.bfloat16, torch.half]
动态 vs 静态激活量化
flowchart TD
subgraph 动态量化
A1[输入激活] --> B1[计算 min/max]
B1 --> C1[动态计算 scale]
C1 --> D1[量化为 FP8]
D1 --> E1[计算]
end
subgraph 静态量化
A2[校准数据集] --> B2[收集激活统计]
B2 --> C2[预计算 scale]
C2 --> D2[存储 scale]
A3[输入激活] --> E2[使用预存 scale]
D2 --> E2
E2 --> F2[量化为 FP8]
F2 --> G2[计算]
end动态量化:
- 运行时计算 scale
- 精度更高
- 开销稍大
静态量化:
- 使用预计算的 scale
- 计算更快
- 需要校准数据
AWQ 量化详解
AWQ 原理
AWQ (Activation-aware Weight Quantization) 的核心思想是:保护重要的权重通道。
graph TD
A[原始权重 W] --> B[计算激活幅度]
B --> C[识别重要通道]
C --> D[对重要通道缩放]
D --> E[均匀量化]
E --> F[反缩放恢复]关键洞察:
- 不同权重通道对输出的贡献不同
- 通过激活值的幅度识别重要通道
- 对重要通道进行缩放保护,减少量化误差
AWQ 在 vLLM 中的使用
# 加载 AWQ 量化模型
from vllm import LLM
llm = LLM(
model="TheBloke/Llama-2-7B-AWQ",
quantization="awq",
)
# 或者使用 Marlin 加速
llm = LLM(
model="TheBloke/Llama-2-7B-AWQ",
quantization="awq_marlin", # 使用 Marlin 内核
)
GPTQ 量化详解
GPTQ 原理
GPTQ (Post-Training Quantization for GPT) 使用二阶信息(Hessian)来最小化量化误差:
目标: min ||W - Q(W)||_H
其中 H 是 Hessian 矩阵
GPTQ 逐列量化,并使用 Hessian 信息来补偿量化误差:
flowchart LR
A[权重矩阵 W] --> B[计算 Hessian H]
B --> C[逐列量化]
C --> D[误差补偿]
D --> E[量化后权重 Q]GPTQ 配置
# 加载 GPTQ 量化模型
llm = LLM(
model="TheBloke/Llama-2-7B-GPTQ",
quantization="gptq",
)
# 使用 Marlin 内核加速
llm = LLM(
model="TheBloke/Llama-2-7B-GPTQ",
quantization="gptq_marlin",
)
Marlin 内核
Marlin 是什么
Marlin 是一套高度优化的 CUDA 内核,专门用于 INT4/INT8 矩阵乘法:
graph TD
A[量化权重 INT4/INT8] --> B[Marlin 内核]
C[激活值 FP16] --> B
B --> D[输出 FP16]
E[特点] --> F[高效的内存访问模式]
E --> G[优化的 Tensor Core 使用]
E --> H[支持异步预取]Marlin 相比普通内核可以提供 2-4 倍的加速。
支持的量化格式
awq_marlin: AWQ 格式 + Marlin 内核gptq_marlin: GPTQ 格式 + Marlin 内核gptq_marlin_24: 2:4 稀疏 GPTQ + Marlin 内核
BitsAndBytes 量化
4-bit 量化
BitsAndBytes 提供简单的 4-bit 量化:
llm = LLM(
model="meta-llama/Llama-2-7B",
quantization="bitsandbytes",
load_format="bitsandbytes",
)
NF4 格式
BitsAndBytes 使用 NF4 (Normal Float 4) 格式,专门优化正态分布的权重:
NF4 量化级别:
[-1.0, -0.6962, -0.5251, -0.3949, -0.2844, -0.1848, -0.0911, 0.0,
0.0796, 0.1609, 0.2461, 0.3379, 0.4407, 0.5626, 0.7230, 1.0]
这些级别根据正态分布的分位数选择,使量化误差最小化。
GGUF 格式
GGUF 简介
GGUF (GGML Universal File) 是一种通用的量化模型格式,支持多种量化级别:
| 量化类型 | 位数 | 说明 |
|---|---|---|
| Q4_0 | 4 | 基础 4-bit 量化 |
| Q4_K_M | 4 | K-means 聚类量化 |
| Q5_0 | 5 | 5-bit 量化 |
| Q5_K_M | 5 | K-means 5-bit |
| Q8_0 | 8 | 8-bit 量化 |
在 vLLM 中使用
llm = LLM(
model="TheBloke/Llama-2-7B-GGUF",
quantization="gguf",
)
KV Cache 量化
为什么量化 KV Cache
KV Cache 在长序列场景下占用大量显存。量化 KV Cache 可以:
- 减少显存占用 50-75%
- 支持更长的上下文
- 支持更大的批量大小
vLLM 中的 KV Cache 量化
# vllm/model_executor/layers/quantization/kv_cache.py
class BaseKVCacheMethod:
"""KV Cache 量化基类"""
def quant_kv_tensor(
self,
key: torch.Tensor,
value: torch.Tensor,
scale: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
"""量化 KV 张量"""
raise NotImplementedError
启用 KV Cache 量化:
llm = LLM(
model="meta-llama/Llama-2-7B",
kv_cache_dtype="fp8", # 或 "fp8_e4m3", "fp8_e5m2"
)
在线量化
什么是在线量化
在线量化是指在加载模型时动态进行量化,而不需要预先量化好的检查点:
# vllm/model_executor/model_loader/online_quantization.py
# 动态 FP8 量化
llm = LLM(
model="meta-llama/Llama-2-7B",
quantization="fp8",
# 将 FP16 权重动态转换为 FP8
)
优势与限制
优势:
- 无需预量化的检查点
- 灵活选择量化方法
- 适合实验和快速部署
限制:
- 加载时间稍长
- 某些量化方法不支持在线量化
- 精度可能略低于离线量化
量化方法对比
精度 vs 压缩率
graph LR
subgraph 高精度
A[FP16] --> B[2x 压缩]
end
subgraph 中等精度
C[FP8] --> D[4x 压缩]
E[INT8] --> F[4x 压缩]
end
subgraph 低精度
G[INT4] --> H[8x 压缩]
I[INT3] --> J[10.7x 压缩]
K[INT2] --> L[16x 压缩]
end性能对比表
| 量化方法 | 精度损失 | 速度提升 | 显存节省 | 推荐场景 |
|---|---|---|---|---|
| FP8 | 极低 | 1.5-2x | 50% | 生产环境 |
| AWQ | 低 | 2-3x | 75% | 长文本推理 |
| GPTQ | 低-中 | 2-3x | 75% | 资源受限 |
| BitsAndBytes | 中 | 1.5x | 75% | 快速实验 |
| GGUF | 可变 | 可变 | 可变 | CPU 推理 |
最佳实践
选择合适的量化方法
flowchart TD
A[选择量化方法] --> B{GPU 类型?}
B -->|H100/A100| C{精度要求?}
B -->|消费级| D{显存大小?}
C -->|高| E[FP8]
C -->|中| F[AWQ/GPTQ + Marlin]
D -->|24GB+| G[AWQ/GPTQ]
D -->|<24GB| H[GPTQ 4-bit]
I[无 GPU] --> J[GGUF]配置建议
# 高精度生产环境
llm = LLM(
model="meta-llama/Llama-3.1-70B-Instruct",
quantization="fp8",
tensor_parallel_size=4,
)
# 资源受限环境
llm = LLM(
model="TheBloke/Llama-2-70B-GPTQ",
quantization="gptq_marlin",
tensor_parallel_size=2,
)
# 单 GPU 部署
llm = LLM(
model="TheBloke/Llama-2-7B-AWQ",
quantization="awq_marlin",
max_model_len=8192,
)
注意事项
- 验证精度:量化后务必在实际任务上验证精度
- 选择 Marlin:有 Marlin 版本时优先使用
- KV Cache 量化:长序列场景考虑启用
- 监控性能:关注吞吐量和延迟指标
总结
graph TD
A[量化技术核心] --> B[减少显存]
A --> C[加速计算]
A --> D[保持精度]
E[vLLM 支持] --> F[FP8 - 高精度]
E --> G[AWQ/GPTQ - 高压缩]
E --> H[Marlin - 高性能]
E --> I[KV Cache 量化]
J[选择建议] --> K[生产环境用 FP8]
J --> L[资源受限用 4-bit]
J --> M[使用 Marlin 加速]关键要点:
- 量化原理:用低精度表示权重,平衡精度与效率
- 多种方法:FP8、AWQ、GPTQ、BitsAndBytes 等
- Marlin 加速:INT4/INT8 专用高效内核
- 实际选择:根据硬件和精度需求选择合适方法
参考资料
导航
3 - 分布式推理
分布式推理(Distributed Inference)
概述
当模型规模超过单个 GPU 的显存容量时,需要使用分布式推理。vLLM 支持多种并行策略,可以将大模型分布到多个 GPU 上运行。
本文将介绍分布式推理的基本概念、并行策略以及 vLLM 中的实现细节。
为什么需要分布式推理
单卡显存限制
以 LLaMA-70B 为例(FP16 精度):
| 组件 | 显存占用 |
|---|---|
| 模型权重 | ~140 GB |
| KV Cache (4K 上下文, batch=32) | ~20 GB |
| 激活值 | ~5 GB |
| 总计 | ~165 GB |
即使是 H100 80GB,单卡也无法容纳完整模型。
分布式推理的目标
graph TD
A[分布式推理] --> B[支持更大模型]
A --> C[提高吞吐量]
A --> D[降低延迟]
B --> B1[70B/405B 模型]
C --> C1[更大批量]
D --> D1[并行计算]并行策略概述
主要并行方式
graph TD
A[并行策略] --> B[张量并行 TP]
A --> C[流水线并行 PP]
A --> D[数据并行 DP]
B --> B1[切分权重矩阵]
C --> C1[切分模型层]
D --> D1[复制完整模型]各策略对比
| 策略 | 通信模式 | 显存效率 | 计算效率 | 适用场景 |
|---|---|---|---|---|
| 张量并行 | AllReduce | 高 | 高 | 单节点多卡 |
| 流水线并行 | P2P | 中 | 中 | 多节点 |
| 数据并行 | AllGather | 低 | 最高 | 高吞吐量 |
张量并行(Tensor Parallelism)
基本原理
张量并行将模型的权重矩阵切分到多个 GPU:
graph LR
subgraph 单卡计算
A1[输入 X] --> B1[完整权重 W]
B1 --> C1[输出 Y = XW]
end
subgraph 2卡张量并行
A2[输入 X] --> B2[权重 W1<br/>GPU 0]
A2 --> B3[权重 W2<br/>GPU 1]
B2 --> C2[Y1 = XW1]
B3 --> C3[Y2 = XW2]
C2 --> D2[AllReduce]
C3 --> D2
D2 --> E2[输出 Y]
end列并行与行并行
graph TD
subgraph 列并行 Column Parallel
W1["W = [W1 | W2]"] --> C1["输出 = [XW1 | XW2]"]
C1 --> R1["需要 AllGather"]
end
subgraph 行并行 Row Parallel
W2["W = [W1]<br/> [W2]"] --> C2["输出 = XW1 + XW2"]
C2 --> R2["需要 AllReduce"]
endLinear 层的张量并行:
- 第一个 Linear:列并行,输出分片
- 第二个 Linear:行并行,输入分片
vLLM 中的实现
# vllm/distributed/parallel_state.py
class GroupCoordinator:
"""分布式组协调器"""
def __init__(
self,
group_ranks: list[list[int]],
local_rank: int,
torch_distributed_backend: str,
use_pynccl: bool,
...
):
self.rank = torch.distributed.get_rank()
self.ranks = group_ranks[local_rank]
self.world_size = len(self.ranks)
self.local_rank = local_rank
# 创建通信组
self.device_group = torch.distributed.new_group(
self.ranks, backend=torch_distributed_backend
)
def all_reduce(self, input_: torch.Tensor) -> torch.Tensor:
"""AllReduce 操作"""
if self.world_size == 1:
return input_
if self.use_custom_op_call:
return torch.ops.vllm.all_reduce(input_, group_name=self.unique_name)
else:
return self._all_reduce_out_place(input_)
def all_gather(self, input_: torch.Tensor, dim: int = -1) -> torch.Tensor:
"""AllGather 操作"""
if self.world_size == 1:
return input_
if self.use_custom_op_call:
return torch.ops.vllm.all_gather(
input_, dim, self.world_size, group_name=self.unique_name
)
else:
return self._all_gather_out_place(input_, dim)
def reduce_scatter(self, input_: torch.Tensor, dim: int = -1) -> torch.Tensor:
"""ReduceScatter 操作"""
if self.world_size == 1:
return input_
return self._reduce_scatter_out_place(input_, dim)
通信原语
sequenceDiagram
participant G0 as GPU 0
participant G1 as GPU 1
participant G2 as GPU 2
participant G3 as GPU 3
Note over G0,G3: AllReduce (求和)
G0->>G0: [1]
G1->>G1: [2]
G2->>G2: [3]
G3->>G3: [4]
G0-->>G3: 通信
G0->>G0: [10]
G1->>G1: [10]
G2->>G2: [10]
G3->>G3: [10]
Note over G0,G3: AllGather (收集)
G0->>G0: [A]
G1->>G1: [B]
G2->>G2: [C]
G3->>G3: [D]
G0-->>G3: 通信
G0->>G0: [A,B,C,D]
G1->>G1: [A,B,C,D]
G2->>G2: [A,B,C,D]
G3->>G3: [A,B,C,D]流水线并行(Pipeline Parallelism)
基本原理
流水线并行将模型的层分配到不同 GPU:
graph LR
subgraph GPU 0
L1[Layer 0-15]
end
subgraph GPU 1
L2[Layer 16-31]
end
subgraph GPU 2
L3[Layer 32-47]
end
subgraph GPU 3
L4[Layer 48-63]
end
L1 --> L2 --> L3 --> L4流水线调度
为了减少 GPU 空闲时间,使用微批次(micro-batch)流水线:
gantt
title 流水线并行调度
dateFormat X
axisFormat %s
section GPU 0
Micro 1 Forward: 0, 1
Micro 2 Forward: 1, 2
Micro 3 Forward: 2, 3
Micro 4 Forward: 3, 4
section GPU 1
Idle: 0, 1
Micro 1 Forward: 1, 2
Micro 2 Forward: 2, 3
Micro 3 Forward: 3, 4
Micro 4 Forward: 4, 5
section GPU 2
Idle: 0, 2
Micro 1 Forward: 2, 3
Micro 2 Forward: 3, 4
Micro 3 Forward: 4, 5
Micro 4 Forward: 5, 6vLLM 中的配置
from vllm import LLM
# 配置流水线并行
llm = LLM(
model="meta-llama/Llama-3.1-70B-Instruct",
tensor_parallel_size=2, # 每个流水线阶段 2 卡张量并行
pipeline_parallel_size=2, # 2 个流水线阶段
# 总共需要 2 × 2 = 4 张 GPU
)
数据并行(Data Parallelism)
基本原理
数据并行复制完整模型到每个 GPU,各 GPU 处理不同的请求:
graph TD
subgraph 请求分发
R[请求队列] --> R1[请求 1,2,3]
R --> R2[请求 4,5,6]
end
subgraph GPU 0
M1[完整模型副本]
R1 --> M1
end
subgraph GPU 1
M2[完整模型副本]
R2 --> M2
endvLLM 中的数据并行
vLLM 支持通过多实例实现数据并行:
# 启动多个 vLLM 实例
# 实例 1:使用 GPU 0-1
CUDA_VISIBLE_DEVICES=0,1 vllm serve model --tensor-parallel-size 2 --port 8000
# 实例 2:使用 GPU 2-3
CUDA_VISIBLE_DEVICES=2,3 vllm serve model --tensor-parallel-size 2 --port 8001
然后使用负载均衡器分发请求。
通信后端
NCCL 通信
vLLM 使用 NCCL (NVIDIA Collective Communications Library) 进行 GPU 间通信:
# vllm/distributed/device_communicators/pynccl.py
class PyNcclCommunicator:
"""PyNccl 通信器"""
def __init__(self, group: ProcessGroup, device: torch.device):
self.group = group
self.device = device
# 初始化 NCCL 通信
self.nccl_comm = self._init_nccl()
def all_reduce(self, tensor: torch.Tensor) -> torch.Tensor:
"""使用 NCCL 执行 AllReduce"""
# 调用 NCCL AllReduce
return self._nccl_all_reduce(tensor)
自定义 AllReduce
vLLM 提供了优化的自定义 AllReduce 实现:
# vllm/distributed/device_communicators/custom_all_reduce.py
class CustomAllReduce:
"""
自定义 AllReduce,针对小张量优化。
使用共享内存和 CUDA 内核实现低延迟通信。
"""
def __init__(self, group: ProcessGroup):
self.group = group
# 分配共享内存用于通信
self._init_shared_memory()
def all_reduce(self, tensor: torch.Tensor) -> torch.Tensor:
# 对于小张量,使用自定义内核
if tensor.numel() < self.threshold:
return self._custom_all_reduce(tensor)
# 对于大张量,使用 NCCL
return self._nccl_all_reduce(tensor)
分布式初始化
初始化流程
sequenceDiagram
participant Main as 主进程
participant W0 as Worker 0
participant W1 as Worker 1
participant W2 as Worker 2
participant W3 as Worker 3
Main->>Main: 解析配置
Main->>W0: 启动 Worker
Main->>W1: 启动 Worker
Main->>W2: 启动 Worker
Main->>W3: 启动 Worker
Note over W0,W3: 初始化分布式环境
W0->>W0: init_distributed_environment()
W1->>W1: init_distributed_environment()
W2->>W2: init_distributed_environment()
W3->>W3: init_distributed_environment()
Note over W0,W3: 初始化模型并行组
W0->>W0: initialize_model_parallel()
W1->>W1: initialize_model_parallel()
W2->>W2: initialize_model_parallel()
W3->>W3: initialize_model_parallel()
Note over W0,W3: 加载模型(分片)
W0->>W0: load_model()
W1->>W1: load_model()
W2->>W2: load_model()
W3->>W3: load_model()并行组配置
# 并行组划分示例
# 假设 4 GPU,TP=2,PP=2
# 张量并行组:
# [GPU 0, GPU 1] # 第一阶段
# [GPU 2, GPU 3] # 第二阶段
# 流水线并行组:
# [GPU 0, GPU 2] # 第一个数据并行副本
# [GPU 1, GPU 3] # 第二个数据并行副本
分布式执行器
Executor 类型
vLLM 提供多种分布式执行器:
graph TD
A[Executor 类型] --> B[UniProcExecutor<br/>单进程]
A --> C[MultiprocExecutor<br/>多进程]
A --> D[RayDistributedExecutor<br/>Ray 分布式]
B --> B1[单 GPU 调试]
C --> C1[单节点多 GPU]
D --> D1[多节点集群]MultiprocExecutor
# vllm/v1/executor/multiproc_executor.py
class MultiprocExecutor:
"""多进程执行器,用于单节点多 GPU"""
def __init__(self, vllm_config: VllmConfig):
self.vllm_config = vllm_config
parallel_config = vllm_config.parallel_config
# 计算总 Worker 数
self.world_size = (
parallel_config.tensor_parallel_size *
parallel_config.pipeline_parallel_size
)
# 启动 Worker 进程
self.workers = self._start_workers()
def _start_workers(self):
"""使用 multiprocessing 启动 Worker"""
workers = []
for rank in range(self.world_size):
worker = multiprocessing.Process(
target=self._worker_main,
args=(rank,)
)
worker.start()
workers.append(worker)
return workers
Ray 分布式执行器
# 使用 Ray 进行多节点分布式推理
from vllm import LLM
# Ray 会自动检测集群中的 GPU
llm = LLM(
model="meta-llama/Llama-3.1-70B-Instruct",
tensor_parallel_size=4, # 使用 4 张 GPU
distributed_executor_backend="ray",
)
KV Cache 分布式传输
Prefill-Decode 分离架构
vLLM 支持将 Prefill 和 Decode 阶段分离到不同节点:
graph LR
subgraph Prefill 节点
P1[GPU 0-3<br/>计算密集]
end
subgraph Decode 节点
D1[GPU 0-3<br/>内存密集]
end
subgraph KV Transfer
T[KV Cache 传输]
end
P1 --> T --> D1KV Connector 实现
# vllm/distributed/kv_transfer/kv_connector/v1/base.py
class KVConnectorBase:
"""KV Cache 传输连接器基类"""
def send_kv_cache(
self,
request_id: str,
kv_cache: torch.Tensor,
) -> None:
"""发送 KV Cache 到远程节点"""
raise NotImplementedError
def recv_kv_cache(
self,
request_id: str,
) -> torch.Tensor:
"""从远程节点接收 KV Cache"""
raise NotImplementedError
配置示例
单节点 4 GPU
# 单节点 4 GPU 张量并行
llm = LLM(
model="meta-llama/Llama-3.1-70B-Instruct",
tensor_parallel_size=4,
)
# 命令行方式
vllm serve meta-llama/Llama-3.1-70B-Instruct --tensor-parallel-size 4
单节点 8 GPU
# 4 路张量并行 + 2 路流水线并行
llm = LLM(
model="meta-llama/Llama-3.1-405B-Instruct",
tensor_parallel_size=4,
pipeline_parallel_size=2,
)
多节点集群
# 节点 1(主节点)
vllm serve meta-llama/Llama-3.1-405B-Instruct \
--tensor-parallel-size 8 \
--pipeline-parallel-size 2 \
--distributed-executor-backend ray
# 确保 Ray 集群已启动并包含所有节点
性能优化
通信优化
- 重叠计算与通信:
# 使用异步通信
with torch.cuda.stream(comm_stream):
all_reduce(tensor)
# 同时在计算流上进行其他操作
with torch.cuda.stream(compute_stream):
other_computation()
- 使用 Custom AllReduce:
# 对于小张量,使用自定义 AllReduce
# vLLM 会自动选择最优策略
负载均衡
对于流水线并行,确保每个阶段的层数均衡:
# 手动指定层分配(如果需要)
# 默认情况下 vLLM 会均匀分配
调试技巧
检查分布式状态
from vllm.distributed import (
get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size,
get_pipeline_model_parallel_rank,
get_pipeline_model_parallel_world_size,
)
# 打印当前进程的并行信息
print(f"TP Rank: {get_tensor_model_parallel_rank()}")
print(f"TP World Size: {get_tensor_model_parallel_world_size()}")
print(f"PP Rank: {get_pipeline_model_parallel_rank()}")
print(f"PP World Size: {get_pipeline_model_parallel_world_size()}")
环境变量
# 设置 NCCL 调试级别
export NCCL_DEBUG=INFO
# 设置 NCCL 超时
export NCCL_TIMEOUT=1800
# 禁用 P2P 通信(调试用)
export NCCL_P2P_DISABLE=1
总结
graph TD
A[分布式推理] --> B[张量并行 TP]
A --> C[流水线并行 PP]
A --> D[数据并行 DP]
B --> B1[切分权重矩阵<br/>AllReduce 通信]
C --> C1[切分模型层<br/>P2P 通信]
D --> D1[复制完整模型<br/>请求分发]
E[vLLM 支持] --> F[NCCL 通信]
E --> G[Custom AllReduce]
E --> H[Ray 分布式]
E --> I[KV Cache 传输]
J[配置建议] --> K[单节点用 TP]
J --> L[多节点用 PP+TP]
J --> M[高吞吐用 DP]关键要点:
- 张量并行:单节点多 GPU 首选,低延迟
- 流水线并行:跨节点扩展,需要权衡
- 数据并行:吞吐量最高,但显存效率低
- 组合使用:大模型通常需要 TP+PP 组合
参考资料
导航