本部分将带你深入 vLLM 的源代码,通过分析关键代码路径,理解请求从创建到完成的完整生命周期。
1 - 入口点分析
入口点分析
本章开始,我们将从代码层面深入分析 vLLM 的执行流程。首先从入口点开始,了解用户如何与 vLLM 交互,以及请求是如何被处理的。
1. vLLM 的两种使用方式
vLLM 提供两种主要的使用方式:
graph TD
subgraph 离线推理
LLM[LLM 类]
Gen[generate/encode/...]
end
subgraph 在线服务
Server[API Server]
Async[AsyncLLMEngine]
end
User1[用户代码] --> LLM
LLM --> Gen
User2[HTTP 请求] --> Server
Server --> Async
style LLM fill:#e1f5fe
style Server fill:#fff3e0
| 方式 | 入口类 | 适用场景 |
|---|---|---|
| 离线推理 | LLM | 批量处理、脚本调用、研究实验 |
| 在线服务 | AsyncLLMEngine | Web 服务、API 接口、实时应用 |
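下面给出两种方式各自的最小使用示例(仅为示意:模型名、端口均为假设值,可替换为任意本地或 HuggingFace 模型):
# 离线推理:在进程内构造 LLM,批量生成
from vllm import LLM, SamplingParams
llm = LLM(model="facebook/opt-125m")
params = SamplingParams(temperature=0.8, max_tokens=32)
outputs = llm.generate(["The capital of France is"], params)
print(outputs[0].outputs[0].text)
# 在线服务:先运行 `vllm serve facebook/opt-125m --port 8000`,再用 OpenAI 兼容客户端访问
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")
resp = client.completions.create(
    model="facebook/opt-125m",
    prompt="The capital of France is",
    max_tokens=32,
)
print(resp.choices[0].text)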
2. LLM 类 - 离线推理入口
2.1 类定义与初始化
# vllm/entrypoints/llm.py
class LLM:
"""用于从给定提示和采样参数生成文本的 LLM。
这个类包含一个分词器、一个语言模型(可能分布在多个 GPU 上),
以及为中间状态(即 KV Cache)分配的 GPU 内存空间。
"""
def __init__(
self,
model: str, # 模型路径或名称
*,
tokenizer: str | None = None, # 分词器路径
tokenizer_mode: str = "auto", # 分词器模式
trust_remote_code: bool = False, # 信任远程代码
tensor_parallel_size: int = 1, # 张量并行大小
dtype: str = "auto", # 数据类型
quantization: str | None = None, # 量化方法
gpu_memory_utilization: float = 0.9, # GPU 内存利用率
enable_prefix_caching: bool = False, # 启用前缀缓存
**kwargs, # 更多配置参数
):
# 1. 构建配置
engine_args = EngineArgs(
model=model,
tokenizer=tokenizer,
tensor_parallel_size=tensor_parallel_size,
dtype=dtype,
quantization=quantization,
gpu_memory_utilization=gpu_memory_utilization,
enable_prefix_caching=enable_prefix_caching,
**kwargs,
)
# 2. 创建 LLMEngine
self.llm_engine = LLMEngine.from_engine_args(engine_args)
# 3. 获取分词器
self.tokenizer = self.llm_engine.get_tokenizer()
# 4. 请求计数器
self.request_counter = Counter()
2.2 初始化流程
sequenceDiagram
participant User as 用户
participant LLM as LLM
participant Args as EngineArgs
participant Engine as LLMEngine
participant Core as EngineCore
participant Exec as Executor
User->>LLM: LLM(model, ...)
LLM->>Args: 构建 EngineArgs
LLM->>Engine: from_engine_args()
Engine->>Core: 创建 EngineCore
Core->>Exec: 创建 Executor
Exec->>Exec: 加载模型
Exec->>Exec: 分配 KV Cache
Core->>Core: 创建 Scheduler
Engine-->>LLM: engine 实例
LLM->>LLM: 获取 tokenizer
LLM-->>User: llm 实例
2.3 generate() - 文本生成
# vllm/entrypoints/llm.py
def generate(
self,
prompts: PromptType | list[PromptType] | None = None,
sampling_params: SamplingParams | list[SamplingParams] | None = None,
prompt_token_ids: list[list[int]] | None = None,
use_tqdm: bool = True,
lora_request: LoRARequest | list[LoRARequest] | None = None,
) -> list[RequestOutput]:
"""生成文本的主方法。
Args:
prompts: 输入提示(字符串或多模态数据)
sampling_params: 采样参数(温度、top_k、top_p 等)
prompt_token_ids: 直接提供 token IDs
use_tqdm: 是否显示进度条
lora_request: LoRA 适配器请求
Returns:
RequestOutput 列表,包含生成的文本
"""
# 1. 参数验证和标准化
if prompts is None and prompt_token_ids is None:
raise ValueError("prompts or prompt_token_ids must be provided")
# 2. 处理采样参数,并标准化为与 prompts 等长的列表
if sampling_params is None:
sampling_params = SamplingParams()
if isinstance(sampling_params, SamplingParams):
sampling_params_list = [sampling_params] * len(prompts)
else:
sampling_params_list = sampling_params
# 3. 添加请求到引擎
for prompt, params in zip(prompts, sampling_params_list):
request_id = str(next(self.request_counter))
self._add_request(
request_id=request_id,
prompt=prompt,
params=params,
lora_request=lora_request,
)
# 4. 运行引擎直到完成
return self._run_engine(use_tqdm=use_tqdm)
2.4 _run_engine() - 运行引擎
def _run_engine(self, use_tqdm: bool = True) -> list[RequestOutput]:
"""运行引擎直到所有请求完成"""
outputs: list[RequestOutput] = []
# 使用 tqdm 显示进度(可选)
num_requests = self.llm_engine.get_num_unfinished_requests()
pbar = tqdm(total=num_requests, disable=not use_tqdm)
while self.llm_engine.has_unfinished_requests():
# 执行一步
step_outputs = self.llm_engine.step()
for output in step_outputs:
if output.finished:
outputs.append(output)
pbar.update(1)
pbar.close()
# 按请求 ID 排序返回
return sorted(outputs, key=lambda x: int(x.request_id))
3. LLMEngine - 引擎核心
3.1 类结构
# vllm/v1/engine/llm_engine.py
class LLMEngine:
"""vLLM 的同步推理引擎"""
def __init__(self, vllm_config: VllmConfig, ...):
# 核心组件
self.engine_core: EngineCore # 内部循环
self.tokenizer: TokenizerLike # 分词器
self.input_processor # 输入处理器
@classmethod
def from_engine_args(cls, engine_args: EngineArgs) -> "LLMEngine":
"""从引擎参数创建实例(工厂方法)"""
vllm_config = engine_args.create_engine_config()
return cls(vllm_config, ...)
def add_request(
self,
request_id: str,
prompt: PromptType,
params: SamplingParams,
) -> None:
"""添加请求到引擎"""
...
def step(self) -> list[RequestOutput]:
"""执行一步推理"""
...
3.2 关键方法流程
graph TD
subgraph add_request
A1[接收请求] --> A2[处理输入]
A2 --> A3[tokenize]
A3 --> A4[创建 EngineCoreRequest]
A4 --> A5[发送到 EngineCore]
end
subgraph step
S1[调用 EngineCore.step] --> S2[获取输出]
S2 --> S3[detokenize]
S3 --> S4[构建 RequestOutput]
S4 --> S5[返回结果]
end
4. EngineCore - 内部循环
4.1 类定义
# vllm/v1/engine/core.py
class EngineCore:
"""vLLM 引擎的内部循环"""
def __init__(
self,
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
...
):
# 1. 加载插件
from vllm.plugins import load_general_plugins
load_general_plugins()
# 2. 创建模型执行器
self.model_executor = executor_class(vllm_config)
# 3. 初始化 KV Cache
num_gpu_blocks, num_cpu_blocks, kv_cache_config = (
self._initialize_kv_caches(vllm_config)
)
vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks
# 4. 初始化 Worker 端的 KV Cache
self.collective_rpc("initialize_cache", args=(num_gpu_blocks, num_cpu_blocks))
# 5. 创建调度器
Scheduler = vllm_config.scheduler_config.get_scheduler_cls()
self.scheduler = Scheduler(
vllm_config=vllm_config,
kv_cache_config=kv_cache_config,
...
)
4.2 step() - 核心执行循环
def step(self) -> EngineCoreOutputs:
"""执行一步推理"""
# 1. 调度:决定这一步处理哪些请求
scheduler_output = self.scheduler.schedule()
if scheduler_output.is_empty():
return EngineCoreOutputs([])
# 2. 执行模型
model_output = self.model_executor.execute_model(scheduler_output)
# 3. 采样
if not self.is_pooling_model:
sampled_tokens = self.model_executor.sample_tokens(model_output)
else:
sampled_tokens = None
# 4. 更新调度器状态
outputs = self.scheduler.update_from_output(
model_output,
sampled_tokens,
scheduler_output,
)
return outputs
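结合上面的 step() 实现,也可以直接用 LLMEngine 的 add_request() / step() 驱动这一循环。下面是一个基于本文所述接口的简化示意(模型名仅为示例):
from vllm import EngineArgs, LLMEngine, SamplingParams
engine = LLMEngine.from_engine_args(EngineArgs(model="facebook/opt-125m"))
engine.add_request("req-0", "Hello, my name is", SamplingParams(max_tokens=16))
# 反复调用 step(),每次内部都会经历 调度 → 执行 → 采样 → 更新
while engine.has_unfinished_requests():
    for output in engine.step():
        if output.finished:
            print(output.outputs[0].text)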
4.3 执行流程图
sequenceDiagram
participant Core as EngineCore
participant Sched as Scheduler
participant KVM as KVCacheManager
participant Exec as Executor
participant GPU as GPU Worker
loop 每个 step
Core->>Sched: schedule()
rect rgb(230, 245, 230)
Note over Sched,KVM: 调度决策
Sched->>KVM: get_computed_blocks()
Sched->>KVM: allocate_slots()
Sched-->>Core: SchedulerOutput
end
rect rgb(245, 230, 230)
Note over Core,GPU: 模型执行
Core->>Exec: execute_model()
Exec->>GPU: 前向传播
GPU-->>Exec: logits
Exec-->>Core: ModelOutput
end
rect rgb(230, 230, 245)
Note over Core: 采样和更新
Core->>Exec: sample_tokens()
Exec-->>Core: sampled_tokens
Core->>Sched: update_from_output()
Sched-->>Core: EngineCoreOutputs
end
end
5. API Server - 在线服务入口
5.1 启动命令
# 启动 OpenAI 兼容的 API 服务
vllm serve meta-llama/Llama-2-7b --port 8000
# 或使用 Python
python -m vllm.entrypoints.openai.api_server --model meta-llama/Llama-2-7b
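服务启动后,可以用任意 HTTP 客户端按 OpenAI 协议发起请求,例如(模型名需与启动时一致,端口对应上面的 8000):
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "meta-llama/Llama-2-7b",
    "messages": [{"role": "user", "content": "介绍一下 vLLM"}],
    "max_tokens": 64
  }'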
5.2 服务架构
graph TD
subgraph API Server
FastAPI[FastAPI 应用]
Router[路由器]
Middleware[中间件]
end
subgraph Endpoints
Chat[/v1/chat/completions]
Completions[/v1/completions]
Embeddings[/v1/embeddings]
Models[/v1/models]
end
subgraph Engine
AsyncEngine[AsyncLLMEngine]
end
Client[HTTP 客户端] --> FastAPI
FastAPI --> Router
Router --> Chat
Router --> Completions
Router --> Embeddings
Router --> Models
Chat --> AsyncEngine
Completions --> AsyncEngine
Embeddings --> AsyncEngine
5.3 请求处理流程
sequenceDiagram
participant Client as HTTP 客户端
participant API as API Server
participant Async as AsyncLLMEngine
participant Core as EngineCore
Client->>API: POST /v1/chat/completions
API->>API: 验证请求
API->>API: 解析聊天消息
API->>API: 应用聊天模板
API->>Async: add_request()
Async->>Core: 添加到调度队列
loop 异步生成
Core->>Core: step()
Core-->>Async: 部分输出
alt 流式响应
Async-->>API: yield 部分结果
API-->>Client: SSE 事件
end
end
Core-->>Async: 完成输出
Async-->>API: 最终结果
API-->>Client: HTTP 响应
6. 请求数据结构
6.1 EngineCoreRequest
# vllm/v1/engine/__init__.py
@dataclass
class EngineCoreRequest:
"""从 LLMEngine 发送到 EngineCore 的请求"""
request_id: str # 唯一标识
prompt_token_ids: list[int] # prompt 的 token IDs
mm_inputs: list | None # 多模态输入
mm_hashes: list | None # 多模态内容的 hash
mm_positions: list | None # 多模态位置信息
sampling_params: SamplingParams # 采样参数
eos_token_id: int | None # 结束 token ID
arrival_time: float # 到达时间
lora_request: LoRARequest | None # LoRA 请求
6.2 Request(调度器内部)
# vllm/v1/request.py
class Request:
"""调度器内部的请求表示"""
def __init__(self, ...):
self.request_id: str
self.prompt_token_ids: list[int]
self.sampling_params: SamplingParams
# 状态跟踪
self.status: RequestStatus
self.num_computed_tokens: int
self._output_token_ids: list[int]
# 内存管理相关
self.block_hashes: list[BlockHash]
@property
def num_tokens(self) -> int:
"""当前总 token 数"""
return len(self.prompt_token_ids) + len(self._output_token_ids)
@property
def num_output_tokens(self) -> int:
"""输出 token 数"""
return len(self._output_token_ids)
6.3 请求状态机
stateDiagram-v2
[*] --> WAITING: add_request()
WAITING --> RUNNING: 调度成功
WAITING --> WAITING_FOR_FSM: 需要 FSM 编译
WAITING --> WAITING_FOR_REMOTE_KVS: 等待远程 KV
WAITING_FOR_FSM --> WAITING: FSM 就绪
WAITING_FOR_REMOTE_KVS --> WAITING: KV 就绪
RUNNING --> FINISHED_STOPPED: 达到停止条件
RUNNING --> FINISHED_LENGTH: 达到最大长度
RUNNING --> FINISHED_ABORTED: 被中止
RUNNING --> PREEMPTED: 被抢占
PREEMPTED --> WAITING: 重新排队
FINISHED_STOPPED --> [*]
FINISHED_LENGTH --> [*]
FINISHED_ABORTED --> [*]
7. 配置系统
7.1 EngineArgs
# vllm/engine/arg_utils.py
@dataclass
class EngineArgs:
"""引擎配置参数"""
# 模型配置
model: str
tokenizer: str | None = None
revision: str | None = None
dtype: str = "auto"
quantization: str | None = None
# 并行配置
tensor_parallel_size: int = 1
pipeline_parallel_size: int = 1
# 内存配置
gpu_memory_utilization: float = 0.9
max_model_len: int | None = None
block_size: int = 16
# 调度配置
max_num_seqs: int = 256
max_num_batched_tokens: int = 2048
# 功能开关
enable_prefix_caching: bool = False
enable_chunked_prefill: bool = False
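这些字段与 vllm serve 的命令行参数一一对应(下划线换成连字符),下面两种写法大致等价(模型与数值仅为示例):
# Python 端:通过 LLM(内部构建 EngineArgs)传入
from vllm import LLM
llm = LLM(
    model="meta-llama/Llama-2-7b-hf",
    tensor_parallel_size=2,
    gpu_memory_utilization=0.9,
    max_model_len=4096,
    enable_prefix_caching=True,
)
# 命令行端:
# vllm serve meta-llama/Llama-2-7b-hf \
#   --tensor-parallel-size 2 --gpu-memory-utilization 0.9 \
#   --max-model-len 4096 --enable-prefix-caching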
7.2 VllmConfig
# vllm/config.py
@dataclass
class VllmConfig:
"""vLLM 的完整配置"""
model_config: ModelConfig # 模型配置
cache_config: CacheConfig # 缓存配置
parallel_config: ParallelConfig # 并行配置
scheduler_config: SchedulerConfig # 调度配置
device_config: DeviceConfig # 设备配置
load_config: LoadConfig # 加载配置
lora_config: LoRAConfig | None # LoRA 配置
speculative_config: SpeculativeConfig | None # 投机解码配置
8. 代码位置速查
| 组件 | 文件 | 关键类/函数 |
|---|---|---|
| LLM 入口 | vllm/entrypoints/llm.py | LLM 类 |
| LLMEngine | vllm/v1/engine/llm_engine.py | LLMEngine 类 |
| EngineCore | vllm/v1/engine/core.py | EngineCore 类 |
| API Server | vllm/entrypoints/openai/api_server.py | main() |
| 配置参数 | vllm/engine/arg_utils.py | EngineArgs |
| 请求类 | vllm/v1/request.py | Request 类 |
| 请求状态 | vllm/v1/request.py | RequestStatus 枚举 |
9. 小结
本章我们了解了 vLLM 的入口点和请求处理流程:
两种使用方式:
- LLM 类用于离线批量推理
- API Server 用于在线服务
核心组件层次:
- LLM → LLMEngine → EngineCore → Scheduler + Executor
请求生命周期:
- 用户提交 → tokenize → 调度 → 执行 → 采样 → 返回
配置系统:
- EngineArgs → VllmConfig → 各子配置
在下一章中,我们将深入 Executor 和 Worker 的实现,了解模型是如何在 GPU 上执行的。
导航
- 上一篇:连续批处理机制
- 下一篇:Executor 与 Worker
- 返回目录
2 - 执行器与 Worker
Executor 与 Worker 详解
在上一章中,我们了解了 vLLM 的入口点和请求处理流程。本章我们将深入 Executor 和 Worker 层,了解模型是如何在 GPU 上执行的。
1. Executor 与 Worker 的关系
graph TD
subgraph EngineCore
Core[EngineCore]
Sched[Scheduler]
end
subgraph Executor 层
Exec[Executor]
end
subgraph Worker 层
W0[Worker 0<br/>GPU 0]
W1[Worker 1<br/>GPU 1]
W2[Worker 2<br/>GPU 2]
WN[Worker N<br/>GPU N]
end
Core --> Exec
Core <--> Sched
Exec --> W0
Exec --> W1
Exec --> W2
Exec --> WN
style Exec fill:#e1f5fe
style W0 fill:#fff3e0
style W1 fill:#fff3e0
style W2 fill:#fff3e0
style WN fill:#fff3e0
职责划分:
| 组件 | 职责 |
|---|---|
| Executor | 管理多个 Worker,协调分布式执行 |
| Worker | 在单个 GPU 上执行模型推理 |
2. Executor 抽象基类
2.1 类定义
# vllm/v1/executor/abstract.py
class Executor(ABC):
"""vLLM Executor 的抽象基类
Executor 负责在一个或多个设备上执行模型。
"""
uses_ray: bool = False # 是否使用 Ray
supports_pp: bool = False # 是否支持流水线并行
def __init__(self, vllm_config: VllmConfig) -> None:
self.vllm_config = vllm_config
self.model_config = vllm_config.model_config
self.cache_config = vllm_config.cache_config
self.parallel_config = vllm_config.parallel_config
...
self._init_executor() # 子类实现
@abstractmethod
def _init_executor(self) -> None:
"""初始化 Executor(由子类实现)"""
raise NotImplementedError
@abstractmethod
def collective_rpc(
self,
method: str | Callable,
args: tuple = (),
kwargs: dict | None = None,
) -> list:
"""在所有 Worker 上执行 RPC 调用"""
raise NotImplementedError
def execute_model(
self,
scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput | None:
"""执行模型推理"""
return self.collective_rpc(
"execute_model",
args=(scheduler_output,),
)[0]
2.2 Executor 工厂方法
@staticmethod
def get_class(vllm_config: VllmConfig) -> type["Executor"]:
"""根据配置获取合适的 Executor 类"""
distributed_executor_backend = vllm_config.parallel_config.distributed_executor_backend
if distributed_executor_backend == "ray":
from vllm.v1.executor.ray_executor import RayDistributedExecutor
return RayDistributedExecutor
elif distributed_executor_backend == "mp":
from vllm.v1.executor.multiproc_executor import MultiprocExecutor
return MultiprocExecutor
elif distributed_executor_backend == "uni":
from vllm.v1.executor.uniproc_executor import UniProcExecutor
return UniProcExecutor
else:
raise ValueError(f"Unknown executor backend: {distributed_executor_backend}")
3. Executor 实现类型
3.1 类型对比
graph TD
Executor[Executor 抽象基类]
Executor --> Uni[UniProcExecutor<br/>单进程单 GPU]
Executor --> MP[MultiprocExecutor<br/>多进程多 GPU]
Executor --> Ray[RayDistributedExecutor<br/>Ray 分布式]
Uni -.-> Single[单 GPU 场景]
MP -.-> Multi[单机多卡场景]
Ray -.-> Distributed[多机分布式场景]
style Uni fill:#c8e6c9
style MP fill:#fff3e0
style Ray fill:#e1f5fe
3.2 UniProcExecutor - 单进程
# vllm/v1/executor/uniproc_executor.py
class UniProcExecutor(Executor):
"""单进程 Executor,用于单 GPU 场景"""
def _init_executor(self) -> None:
# 直接在当前进程创建 Worker
self.driver_worker = Worker(
vllm_config=self.vllm_config,
local_rank=0,
rank=0,
distributed_init_method="",
is_driver_worker=True,
)
self.driver_worker.init_device()
self.driver_worker.load_model()
def collective_rpc(self, method, args=(), kwargs=None):
"""直接调用 Worker 方法"""
if isinstance(method, str):
func = getattr(self.driver_worker, method)
else:
func = lambda *a, **kw: method(self.driver_worker, *a, **kw)
return [func(*args, **(kwargs or {}))]
3.3 MultiprocExecutor - 多进程
# vllm/v1/executor/multiproc_executor.py
class MultiprocExecutor(Executor):
"""多进程 Executor,用于单机多卡场景"""
supports_pp = True
def _init_executor(self) -> None:
# 创建多个 Worker 进程
self.workers = []
for rank in range(self.parallel_config.world_size):
worker = self._create_worker(rank)
self.workers.append(worker)
def collective_rpc(self, method, args=(), kwargs=None):
"""并行调用所有 Worker"""
futures = []
for worker in self.workers:
future = worker.execute_method(method, args, kwargs)
futures.append(future)
# 等待所有 Worker 完成
results = [f.result() for f in futures]
return results
3.4 RayDistributedExecutor - Ray 分布式
# vllm/v1/executor/ray_executor.py
class RayDistributedExecutor(Executor):
"""Ray 分布式 Executor,用于多机场景"""
uses_ray = True
supports_pp = True
def _init_executor(self) -> None:
import ray
# 创建 Ray Actor(远程 Worker)
self.workers = []
for rank in range(self.parallel_config.world_size):
worker_actor = ray.remote(Worker).remote(
vllm_config=self.vllm_config,
rank=rank,
...
)
self.workers.append(worker_actor)
def collective_rpc(self, method, args=(), kwargs=None):
"""通过 Ray 调用远程 Worker"""
import ray
refs = []
for worker in self.workers:
ref = getattr(worker, method).remote(*args, **(kwargs or {}))
refs.append(ref)
# 异步获取结果
results = ray.get(refs)
return results
4. Worker 详解
4.1 Worker 基类
# vllm/v1/worker/worker_base.py
class WorkerBase(ABC):
"""Worker 抽象基类"""
def __init__(
self,
vllm_config: VllmConfig,
local_rank: int, # 本地 GPU 序号
rank: int, # 全局 Worker 序号
distributed_init_method: str,
is_driver_worker: bool = False,
):
self.vllm_config = vllm_config
self.local_rank = local_rank
self.rank = rank
self.is_driver_worker = is_driver_worker
@abstractmethod
def init_device(self) -> None:
"""初始化设备(GPU)"""
raise NotImplementedError
@abstractmethod
def load_model(self) -> None:
"""加载模型"""
raise NotImplementedError
@abstractmethod
def execute_model(
self,
scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput:
"""执行模型推理"""
raise NotImplementedError
4.2 GPU Worker
# vllm/v1/worker/gpu_worker.py
class Worker(WorkerBase):
"""GPU Worker 实现"""
def __init__(
self,
vllm_config: VllmConfig,
local_rank: int,
rank: int,
distributed_init_method: str,
is_driver_worker: bool = False,
):
super().__init__(...)
# 配置 float32 精度
precision = envs.VLLM_FLOAT32_MATMUL_PRECISION
torch.set_float32_matmul_precision(precision)
# Profiler(可选)
self.profiler = self._setup_profiler()
def init_device(self):
"""初始化 GPU 设备"""
# 设置 CUDA 设备
torch.cuda.set_device(self.local_rank)
self.device = torch.device(f"cuda:{self.local_rank}")
# 初始化分布式环境
init_distributed_environment(
world_size=self.parallel_config.world_size,
rank=self.rank,
distributed_init_method=self.distributed_init_method,
backend="nccl",
)
# 初始化模型并行
ensure_model_parallel_initialized(
tensor_model_parallel_size=self.parallel_config.tensor_parallel_size,
pipeline_model_parallel_size=self.parallel_config.pipeline_parallel_size,
)
def load_model(self):
"""加载模型到 GPU"""
with self._maybe_get_memory_pool_context("weights"):
# 创建 ModelRunner
self.model_runner = GPUModelRunner(
vllm_config=self.vllm_config,
device=self.device,
)
# 加载模型权重
self.model_runner.load_model()
def execute_model(
self,
scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput:
"""执行模型推理"""
return self.model_runner.execute_model(scheduler_output)
4.3 Worker 初始化流程
sequenceDiagram
participant Exec as Executor
participant Worker as Worker
participant Device as CUDA Device
participant Model as ModelRunner
Exec->>Worker: 创建 Worker
Worker->>Worker: __init__()
Exec->>Worker: init_device()
Worker->>Device: torch.cuda.set_device()
Worker->>Device: init_distributed_environment()
Worker->>Device: ensure_model_parallel_initialized()
Note over Device: NCCL 初始化完成
Exec->>Worker: load_model()
Worker->>Model: 创建 GPUModelRunner
Model->>Model: load_model()
Note over Model: 模型权重加载到 GPU
Exec->>Worker: initialize_cache()
Worker->>Worker: 分配 KV Cache 内存
5. ModelRunner 详解
5.1 GPUModelRunner 类
# vllm/v1/worker/gpu_model_runner.py
class GPUModelRunner:
"""GPU 模型执行器"""
def __init__(
self,
vllm_config: VllmConfig,
device: torch.device,
):
self.vllm_config = vllm_config
self.device = device
self.model: nn.Module | None = None
# KV Cache 配置
self.kv_caches: list[torch.Tensor] = []
self.block_size = vllm_config.cache_config.block_size
# 输入处理
self.input_batch = GPUInputBatch(...)
def load_model(self):
"""加载模型"""
from vllm.model_executor.model_loader import get_model
self.model = get_model(
model_config=self.model_config,
load_config=self.load_config,
device_config=self.device_config,
parallel_config=self.parallel_config,
scheduler_config=self.scheduler_config,
)
def execute_model(
self,
scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput:
"""执行模型前向传播"""
# 1. 准备输入
model_input = self._prepare_inputs(scheduler_output)
# 2. 执行前向传播
with torch.inference_mode():
hidden_states = self.model(
input_ids=model_input.input_ids,
positions=model_input.positions,
kv_caches=self.kv_caches,
attn_metadata=model_input.attn_metadata,
)
# 3. 计算 logits
logits = self.model.compute_logits(hidden_states)
# 4. 返回输出
return ModelRunnerOutput(
logits=logits,
...
)
5.2 execute_model 流程
flowchart TD
A[execute_model 开始] --> B[_prepare_inputs]
subgraph 输入准备
B --> B1[处理 token IDs]
B1 --> B2[计算位置编码]
B2 --> B3[构建 attention metadata]
B3 --> B4[更新 block table]
end
B4 --> C[model forward]
subgraph 前向传播
C --> C1[Embedding]
C1 --> C2[Transformer Layers]
C2 --> C3[每层: Attention + FFN]
C3 --> C4[LM Head]
end
C4 --> D[compute_logits]
D --> E[返回 ModelRunnerOutput]
6. KV Cache 管理
6.1 Worker 端的 KV Cache
# vllm/v1/worker/gpu_worker.py
def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
"""初始化 KV Cache"""
self.cache_config.num_gpu_blocks = num_gpu_blocks
self.cache_config.num_cpu_blocks = num_cpu_blocks
def initialize_from_config(self, kv_cache_configs: list[KVCacheConfig]) -> None:
"""根据配置初始化 KV Cache"""
self.model_runner.initialize_kv_cache(kv_cache_configs)
6.2 KV Cache Tensor 布局
# KV Cache 的形状
# [num_blocks, 2, num_heads, block_size, head_dim]
# ↑ ↑ ↑ ↑ ↑
# 块数量 K和V 注意力头数 块大小 头维度
# 示例:16 块,8 头,16 token/块,128 维
kv_cache_shape = (16, 2, 8, 16, 128)
kv_cache = torch.empty(kv_cache_shape, dtype=torch.float16, device="cuda")
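按照这个布局,可以粗略估算 KV Cache 的显存占用(注意每个 Transformer 层各有一份)。下面以文中示例的形状做一个简单计算,层数等数值仅为假设:
# 单个 block 的 KV 大小 = 2(K/V) × num_heads × block_size × head_dim × dtype 字节数
num_blocks, num_heads, block_size, head_dim = 16, 8, 16, 128
dtype_bytes = 2            # float16
num_layers = 32            # 假设模型有 32 层
per_block = 2 * num_heads * block_size * head_dim * dtype_bytes   # 65,536 B = 64 KiB
per_layer = num_blocks * per_block                                # 1 MiB
total = per_layer * num_layers                                    # 32 MiB
print(per_block, per_layer, total)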
6.3 Block Table 使用
# vllm/v1/worker/block_table.py
class BlockTable:
"""管理每个序列的 block 映射"""
def __init__(self, max_num_seqs: int, max_num_blocks_per_seq: int):
# [max_num_seqs, max_num_blocks_per_seq]
self.block_table = torch.zeros(
(max_num_seqs, max_num_blocks_per_seq),
dtype=torch.int32,
device="cuda",
)
def update(self, req_index: int, block_ids: list[int]):
"""更新请求的 block 映射"""
num_blocks = len(block_ids)
self.block_table[req_index, :num_blocks] = torch.tensor(
block_ids, dtype=torch.int32
)
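配合 block_table,就能把某个请求中第 i 个 token 的逻辑位置换算成 KV Cache 中的物理 slot(写入 KV 时的 slot_mapping 正是按这个思路构建的)。下面是一个脱离 vLLM 的最小示意,block 编号为假设值:
block_size = 16
block_ids = [7, 3, 12]     # 该请求依次占用的物理 block 编号(示例值)
def slot_of(token_pos: int) -> int:
    """返回第 token_pos 个 token 对应的物理 slot 编号"""
    block_idx, offset = divmod(token_pos, block_size)
    return block_ids[block_idx] * block_size + offset
print(slot_of(0))    # 7 * 16 + 0 = 112
print(slot_of(20))   # 3 * 16 + 4 = 52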
7. 分布式执行
7.1 张量并行(Tensor Parallelism)
graph LR
subgraph GPU 0
A0[输入] --> L0[Linear 分片 0]
L0 --> R0[部分结果]
end
subgraph GPU 1
A1[输入] --> L1[Linear 分片 1]
L1 --> R1[部分结果]
end
R0 --> AllReduce
R1 --> AllReduce
AllReduce --> Output[完整输出]
7.2 流水线并行(Pipeline Parallelism)
graph LR
subgraph Stage 0 - GPU 0
L0_6[Layers 0-5]
end
subgraph Stage 1 - GPU 1
L6_12[Layers 6-11]
end
subgraph Stage 2 - GPU 2
L12_18[Layers 12-17]
end
subgraph Stage 3 - GPU 3
L18_24[Layers 18-23]
end
Input --> L0_6
L0_6 -->|激活值| L6_12
L6_12 -->|激活值| L12_18
L12_18 -->|激活值| L18_24
L18_24 --> Output
7.3 collective_rpc 通信
sequenceDiagram
participant Exec as Executor
participant W0 as Worker 0
participant W1 as Worker 1
participant W2 as Worker 2
Exec->>W0: execute_model(scheduler_output)
Exec->>W1: execute_model(scheduler_output)
Exec->>W2: execute_model(scheduler_output)
Note over W0,W2: 并行执行前向传播
par TP AllReduce
W0->>W1: 交换中间结果
W1->>W2: 交换中间结果
W2->>W0: 交换中间结果
end
W0-->>Exec: result_0
W1-->>Exec: result_1
W2-->>Exec: result_2
Note over Exec: 只使用 driver worker 的结果
8. 性能优化
8.1 CUDA Graph
# vllm/v1/worker/gpu_model_runner.py
def _capture_cuda_graph(self, ...):
"""捕获 CUDA Graph 以减少启动开销"""
# 预热
for _ in range(3):
self.model(...)
# 捕获
graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph):
output = self.model(...)
return graph, output
def execute_model_with_cuda_graph(self, ...):
"""使用 CUDA Graph 执行"""
# 重放预捕获的计算图
self.cuda_graph.replay()
return self.graph_output
8.2 内存优化
# Worker 的 sleep/wake_up 机制
def sleep(self, level: int = 1) -> None:
"""释放内存进入睡眠模式"""
from vllm.device_allocator.cumem import CuMemAllocator
allocator = CuMemAllocator.get_instance()
if level == 2:
# 保存 buffer 到 CPU
self._sleep_saved_buffers = {
name: buffer.cpu().clone()
for name, buffer in self.model.named_buffers()
}
# 释放 GPU 内存
allocator.sleep(offload_tags=("weights",) if level == 1 else tuple())
def wake_up(self, tags: list[str] | None = None) -> None:
"""从睡眠模式唤醒"""
allocator = CuMemAllocator.get_instance()
allocator.wake_up(tags)
# 恢复 buffer
if self._sleep_saved_buffers:
for name, buffer in self.model.named_buffers():
buffer.data.copy_(self._sleep_saved_buffers[name].data)
9. 代码位置速查
| 组件 | 文件 | 关键类/函数 |
|---|---|---|
| Executor 基类 | vllm/v1/executor/abstract.py | Executor |
| 单进程 Executor | vllm/v1/executor/uniproc_executor.py | UniProcExecutor |
| 多进程 Executor | vllm/v1/executor/multiproc_executor.py | MultiprocExecutor |
| Ray Executor | vllm/v1/executor/ray_executor.py | RayDistributedExecutor |
| Worker 基类 | vllm/v1/worker/worker_base.py | WorkerBase |
| GPU Worker | vllm/v1/worker/gpu_worker.py | Worker |
| GPU ModelRunner | vllm/v1/worker/gpu_model_runner.py | GPUModelRunner |
| Block Table | vllm/v1/worker/block_table.py | BlockTable |
10. 小结
本章我们深入了解了 Executor 和 Worker 层:
Executor 类型:
- UniProcExecutor:单进程单 GPU
- MultiprocExecutor:单机多卡
- RayDistributedExecutor:多机分布式
Worker 职责:
- 设备初始化
- 模型加载
- 模型执行
- KV Cache 管理
ModelRunner:
- 输入准备
- 前向传播
- logits 计算
分布式执行:
- 张量并行:切分权重矩阵
- 流水线并行:切分模型层
- collective_rpc:跨 Worker 通信
在下一章中,我们将深入模型前向传播的具体实现。
导航
3 - 模型前向传播
模型前向传播详解
在前面的章节中,我们了解了 vLLM 的架构和调度机制。本章将深入模型的前向传播过程,以 Llama 模型为例,详细分析从输入到输出的完整计算流程。
1. 模型架构概览
1.1 Llama 模型结构
graph TD
subgraph LlamaForCausalLM
Input[input_ids] --> Embed[Embedding]
Embed --> Layers[Transformer Layers × N]
Layers --> Norm[RMSNorm]
Norm --> LMHead[LM Head]
LMHead --> Logits[logits]
end
subgraph "Transformer Layer"
H[hidden_states] --> LN1[RMSNorm]
LN1 --> Attn[Self-Attention]
Attn --> Add1[+]
H --> Add1
Add1 --> LN2[RMSNorm]
LN2 --> MLP[MLP]
MLP --> Add2[+]
Add1 --> Add2
Add2 --> Out[output]
end
1.2 核心组件对应关系
| 组件 | vLLM 类 | 功能 |
|---|---|---|
| Embedding | VocabParallelEmbedding | token 到向量的映射 |
| Transformer Layer | LlamaDecoderLayer | 主要计算单元 |
| Self-Attention | LlamaAttention | 注意力计算 |
| MLP | LlamaMLP | 前馈网络 |
| LayerNorm | RMSNorm | 归一化 |
| LM Head | ParallelLMHead | 输出词表概率 |
2. vLLM 中的 Llama 实现
2.1 LlamaForCausalLM 类
# vllm/model_executor/models/llama.py
class LlamaForCausalLM(nn.Module, SupportsLoRA, SupportsPP, SupportsEagle):
"""用于因果语言建模的 Llama 模型"""
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
super().__init__()
config = vllm_config.model_config.hf_config
quant_config = vllm_config.quant_config
# 主模型(Transformer 层)
self.model = LlamaModel(vllm_config=vllm_config, prefix=maybe_prefix(prefix, "model"))
# 输出层(LM Head)
if config.tie_word_embeddings:
self.lm_head = self.model.embed_tokens # 权重共享
else:
self.lm_head = ParallelLMHead(
config.vocab_size,
config.hidden_size,
quant_config=quant_config,
)
# Logits 处理器
self.logits_processor = LogitsProcessor(config.vocab_size)
# 采样器
self.sampler = get_sampler()
def forward(
self,
input_ids: torch.Tensor,
positions: torch.Tensor,
kv_caches: list[torch.Tensor],
attn_metadata: AttentionMetadata,
intermediate_tensors: IntermediateTensors | None = None,
) -> torch.Tensor | IntermediateTensors:
"""模型前向传播"""
# 1. 通过 Transformer 层
hidden_states = self.model(input_ids, positions, intermediate_tensors)
return hidden_states
def compute_logits(
self,
hidden_states: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> torch.Tensor:
"""计算 logits"""
logits = self.logits_processor(
self.lm_head,
hidden_states,
sampling_metadata,
)
return logits
def sample(
self,
logits: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
"""采样下一个 token"""
next_tokens = self.sampler(logits, sampling_metadata)
return next_tokens
2.2 LlamaModel 类
# vllm/model_executor/models/llama.py
@support_torch_compile(shape_invariants=llama_model_invariants)
class LlamaModel(nn.Module):
"""Llama 的 Transformer 模型"""
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
super().__init__()
config = vllm_config.model_config.hf_config
# Embedding 层
self.embed_tokens = VocabParallelEmbedding(
config.vocab_size,
config.hidden_size,
)
# Transformer 层
self.start_layer, self.end_layer, self.layers = make_layers(
config.num_hidden_layers,
lambda prefix: LlamaDecoderLayer(vllm_config=vllm_config, prefix=prefix),
prefix=f"{prefix}.layers",
)
# 最终归一化
self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
def forward(
self,
input_ids: torch.Tensor | None,
positions: torch.Tensor,
intermediate_tensors: IntermediateTensors | None,
inputs_embeds: torch.Tensor | None = None,
) -> torch.Tensor | IntermediateTensors:
# 1. Embedding
if get_pp_group().is_first_rank:
if inputs_embeds is not None:
hidden_states = inputs_embeds
else:
hidden_states = self.embed_tokens(input_ids)
residual = None
else:
# 流水线并行:从前一阶段获取中间结果
hidden_states = intermediate_tensors["hidden_states"]
residual = intermediate_tensors["residual"]
# 2. Transformer 层
for layer in self.layers[self.start_layer:self.end_layer]:
hidden_states, residual = layer(
positions,
hidden_states,
residual,
)
# 3. 最终归一化(仅最后一个 PP 阶段)
if not get_pp_group().is_last_rank:
return IntermediateTensors({
"hidden_states": hidden_states,
"residual": residual
})
hidden_states, _ = self.norm(hidden_states, residual)
return hidden_states
3. Transformer 层详解
3.1 LlamaDecoderLayer
# vllm/model_executor/models/llama.py
class LlamaDecoderLayer(nn.Module):
"""单个 Transformer 解码器层"""
def __init__(self, vllm_config: VllmConfig, prefix: str = ""):
super().__init__()
config = vllm_config.model_config.hf_config
# Self-Attention
self.self_attn = LlamaAttention(
config=config,
hidden_size=config.hidden_size,
num_heads=config.num_attention_heads,
num_kv_heads=config.num_key_value_heads,
cache_config=vllm_config.cache_config,
prefix=f"{prefix}.self_attn",
)
# MLP
self.mlp = LlamaMLP(
hidden_size=config.hidden_size,
intermediate_size=config.intermediate_size,
hidden_act=config.hidden_act,
prefix=f"{prefix}.mlp",
)
# 归一化层
self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.post_attention_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
def forward(
self,
positions: torch.Tensor,
hidden_states: torch.Tensor,
residual: torch.Tensor | None,
) -> tuple[torch.Tensor, torch.Tensor]:
"""
层前向传播
使用 Pre-LN 结构(LayerNorm 在子层之前)
"""
# 1. 第一个归一化 + 残差处理
if residual is None:
residual = hidden_states
hidden_states = self.input_layernorm(hidden_states)
else:
hidden_states, residual = self.input_layernorm(hidden_states, residual)
# 2. Self-Attention
hidden_states = self.self_attn(
positions=positions,
hidden_states=hidden_states,
)
# 3. 第二个归一化 + 残差处理
hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)
# 4. MLP
hidden_states = self.mlp(hidden_states)
return hidden_states, residual
3.2 前向传播数据流
flowchart TD
subgraph "LlamaDecoderLayer Forward"
Input["输入<br/>(hidden_states, residual)"]
Input --> Check{residual is None?}
Check -->|是| Init["residual = hidden_states"]
Check -->|否| Fuse1["hidden_states, residual =<br/>input_layernorm(hidden_states, residual)"]
Init --> LN1["hidden_states = input_layernorm(hidden_states)"]
LN1 --> Attn["hidden_states = self_attn(positions, hidden_states)"]
Fuse1 --> Attn
Attn --> Fuse2["hidden_states, residual =<br/>post_attention_layernorm(hidden_states, residual)"]
Fuse2 --> MLP["hidden_states = mlp(hidden_states)"]
MLP --> Output["输出<br/>(hidden_states, residual)"]
end
style Attn fill:#e1f5fe
style MLP fill:#fff3e0
4. Self-Attention 详解
4.1 LlamaAttention 类
# vllm/model_executor/models/llama.py
class LlamaAttention(nn.Module):
"""Llama 的多头注意力层"""
def __init__(
self,
config: LlamaConfig,
hidden_size: int,
num_heads: int,
num_kv_heads: int,
cache_config: CacheConfig | None = None,
prefix: str = "",
):
super().__init__()
# 张量并行配置
tp_size = get_tensor_model_parallel_world_size()
self.num_heads = num_heads // tp_size
self.num_kv_heads = max(1, num_kv_heads // tp_size)
self.head_dim = hidden_size // num_heads
self.scaling = self.head_dim ** -0.5
# q_size / kv_size 是本 GPU 分片上 Q 和 KV 的总维度,forward 中用于切分 qkv
self.q_size = self.num_heads * self.head_dim
self.kv_size = self.num_kv_heads * self.head_dim
# Q、K、V 投影(合并为一个线性层)
self.qkv_proj = QKVParallelLinear(
hidden_size=hidden_size,
head_size=self.head_dim,
total_num_heads=num_heads,
total_num_kv_heads=num_kv_heads,
prefix=f"{prefix}.qkv_proj",
)
# 输出投影
self.o_proj = RowParallelLinear(
input_size=num_heads * self.head_dim,
output_size=hidden_size,
prefix=f"{prefix}.o_proj",
)
# 旋转位置编码
self.rotary_emb = get_rope(
self.head_dim,
max_position=config.max_position_embeddings,
)
# 注意力后端
self.attn = Attention(
self.num_heads,
self.head_dim,
self.scaling,
num_kv_heads=self.num_kv_heads,
cache_config=cache_config,
)
def forward(
self,
positions: torch.Tensor,
hidden_states: torch.Tensor,
) -> torch.Tensor:
"""
注意力前向传播
Args:
positions: 位置 IDs [num_tokens]
hidden_states: 隐藏状态 [num_tokens, hidden_size]
Returns:
输出 [num_tokens, hidden_size]
"""
# 1. QKV 投影
qkv, _ = self.qkv_proj(hidden_states)
q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)
# 2. 旋转位置编码
q, k = self.rotary_emb(positions, q, k)
# 3. 注意力计算(使用 vLLM 的优化后端)
attn_output = self.attn(q, k, v)
# 4. 输出投影
output, _ = self.o_proj(attn_output)
return output
4.2 注意力计算流程
flowchart LR
subgraph "Self-Attention"
H["hidden_states<br/>[tokens, hidden]"]
QKV["QKV Projection"]
Split["Split"]
Q["Q"]
K["K"]
V["V"]
RoPE["RoPE"]
Attn["Attention<br/>(with KV Cache)"]
O["O Projection"]
Out["output"]
H --> QKV
QKV --> Split
Split --> Q
Split --> K
Split --> V
Q --> RoPE
K --> RoPE
RoPE --> Attn
V --> Attn
Attn --> O
O --> Out
end
4.3 GQA (Grouped-Query Attention)
标准 MHA (Multi-Head Attention):
Q heads: [h1, h2, h3, h4, h5, h6, h7, h8]
K heads: [h1, h2, h3, h4, h5, h6, h7, h8]
V heads: [h1, h2, h3, h4, h5, h6, h7, h8]
GQA (num_kv_heads=2):
Q heads: [h1, h2, h3, h4, h5, h6, h7, h8]
K heads: [k1, k1, k1, k1, | k2, k2, k2, k2]
V heads: [v1, v1, v1, v1, | v2, v2, v2, v2]
每 4 个 Q head 共享 1 个 KV head,减少 KV Cache 内存
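用一个小的张量例子可以直观看出这种共享关系(与具体模型无关,仅为示意):
import torch
num_heads, num_kv_heads, head_dim = 8, 2, 4
num_queries_per_kv = num_heads // num_kv_heads   # 每个 KV head 被 4 个 Q head 共享
k = torch.randn(1, num_kv_heads, head_dim)       # [tokens, kv_heads, head_dim]
# 注意力计算前把 KV 按组复制,使 head 数与 Q 对齐
k_expanded = k.repeat_interleave(num_queries_per_kv, dim=1)
print(k_expanded.shape)                          # torch.Size([1, 8, 4])
# KV Cache 中只需存 2 个 head 而不是 8 个,这部分内存降为 1/4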
5. MLP 详解
5.1 LlamaMLP 类
# vllm/model_executor/models/llama.py
class LlamaMLP(nn.Module):
"""Llama 的 MLP(SwiGLU 激活)"""
def __init__(
self,
hidden_size: int,
intermediate_size: int,
hidden_act: str,
prefix: str = "",
):
super().__init__()
# gate 和 up 投影(合并)
self.gate_up_proj = MergedColumnParallelLinear(
input_size=hidden_size,
output_sizes=[intermediate_size] * 2, # gate 和 up 各一个
prefix=f"{prefix}.gate_up_proj",
)
# down 投影
self.down_proj = RowParallelLinear(
input_size=intermediate_size,
output_size=hidden_size,
prefix=f"{prefix}.down_proj",
)
# SiLU 激活 + 门控
self.act_fn = SiluAndMul()
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
MLP 前向传播
SwiGLU: down(SiLU(gate(x)) * up(x))
"""
# 1. gate 和 up 投影(合并执行)
gate_up, _ = self.gate_up_proj(x)
# 2. SiLU 激活 + 门控相乘
x = self.act_fn(gate_up)
# 3. down 投影
x, _ = self.down_proj(x)
return x
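SiluAndMul 的效果等价于把合并后的 gate_up 张量对半切开,对前一半做 SiLU 再与后一半逐元素相乘。下面给出一个不依赖 vLLM 的等价小实现,便于理解(仅为示意):
import torch
import torch.nn.functional as F
def swiglu_mlp(x, w_gate, w_up, w_down):
    """SwiGLU: down( SiLU(x @ W_gate) * (x @ W_up) )"""
    gate = F.silu(x @ w_gate)      # [tokens, intermediate]
    up = x @ w_up                  # [tokens, intermediate]
    return (gate * up) @ w_down    # [tokens, hidden]
hidden, inter = 16, 64
x = torch.randn(2, hidden)
out = swiglu_mlp(x,
                 torch.randn(hidden, inter),
                 torch.randn(hidden, inter),
                 torch.randn(inter, hidden))
print(out.shape)   # torch.Size([2, 16])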
5.2 SwiGLU 计算流程
flowchart LR
subgraph "MLP (SwiGLU)"
X["x<br/>[tokens, hidden]"]
GU["gate_up_proj"]
Gate["gate"]
Up["up"]
SiLU["SiLU(gate)"]
Mul["×"]
Down["down_proj"]
Out["output"]
X --> GU
GU --> Gate
GU --> Up
Gate --> SiLU
SiLU --> Mul
Up --> Mul
Mul --> Down
Down --> Out
end
6. KV Cache 集成
6.1 Attention 层的 KV Cache 使用
# vllm/attention/layer.py
class Attention(nn.Module):
"""vLLM 的注意力层,集成 KV Cache"""
def forward(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
) -> torch.Tensor:
"""
Args:
query: [num_tokens, num_heads * head_dim]
key: [num_tokens, num_kv_heads * head_dim]
value: [num_tokens, num_kv_heads * head_dim]
内部会:
1. 将新的 K、V 写入 KV Cache
2. 从 KV Cache 读取完整的 K、V
3. 计算注意力
"""
return self.impl.forward(
query,
key,
value,
kv_cache=self.kv_cache,
attn_metadata=self.attn_metadata,
)
6.2 PagedAttention 后端
sequenceDiagram
participant Attn as Attention Layer
participant PA as PagedAttention
participant KVC as KV Cache
Attn->>PA: forward(Q, K, V)
rect rgb(230, 245, 230)
Note over PA,KVC: 写入新的 K、V
PA->>KVC: 根据 slot_mapping 写入
Note over KVC: K、V 存储到对应的 block
end
rect rgb(245, 230, 230)
Note over PA,KVC: 读取完整的 K、V
PA->>KVC: 根据 block_table 读取
Note over KVC: 返回所有历史 K、V
end
rect rgb(230, 230, 245)
Note over PA: 计算注意力
PA->>PA: attention = softmax(Q @ K^T / sqrt(d)) @ V
end
PA-->>Attn: attention output
7. 张量并行实现
7.1 线性层的张量并行
# vllm/model_executor/layers/linear.py
class ColumnParallelLinear(nn.Module):
"""列并行线性层(输出切分)"""
def __init__(self, input_size: int, output_size: int, ...):
tp_size = get_tensor_model_parallel_world_size()
# 每个 GPU 只有 output_size / tp_size 的输出
self.output_size_per_partition = output_size // tp_size
self.weight = Parameter(torch.empty(
self.output_size_per_partition,
input_size,
))
class RowParallelLinear(nn.Module):
"""行并行线性层(输入切分)"""
def __init__(self, input_size: int, output_size: int, ...):
tp_size = get_tensor_model_parallel_world_size()
# 每个 GPU 只处理 input_size / tp_size 的输入
self.input_size_per_partition = input_size // tp_size
self.weight = Parameter(torch.empty(
output_size,
self.input_size_per_partition,
))
def forward(self, x: torch.Tensor) -> torch.Tensor:
output = F.linear(x, self.weight)
if self.reduce_results:
# AllReduce 收集所有 GPU 的结果
output = tensor_model_parallel_all_reduce(output)
return output
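以 Llama-2-7B(hidden_size=4096、intermediate_size=11008、32 个注意力头、head_dim=128)和 tensor_parallel_size=2 为例,可以按上述切分方式推算每张 GPU 上的分片维度(仅为手工估算的示意):
hidden, inter, heads, head_dim, tp = 4096, 11008, 32, 128, 2
# ColumnParallelLinear / QKVParallelLinear:输出维度被切分
q_out_per_gpu = heads // tp * head_dim      # 2048
gate_up_out_per_gpu = 2 * inter // tp       # 11008(gate、up 各 5504)
# RowParallelLinear:输入维度被切分,矩阵乘后需要 AllReduce
o_proj_in_per_gpu = heads // tp * head_dim  # 2048
down_in_per_gpu = inter // tp               # 5504
print(q_out_per_gpu, gate_up_out_per_gpu, o_proj_in_per_gpu, down_in_per_gpu)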
7.2 QKV 投影的张量并行
graph TD
subgraph GPU 0
Q0["Q[:, :dim/2]"]
K0["K[:, :dim/4]"]
V0["V[:, :dim/4]"]
end
subgraph GPU 1
Q1["Q[:, dim/2:]"]
K1["K[:, dim/4:]"]
V1["V[:, dim/4:]"]
end
Input["hidden_states"] --> GPU0
Input --> GPU1
Q0 --> Attn0["Attention 0"]
K0 --> Attn0
V0 --> Attn0
Q1 --> Attn1["Attention 1"]
K1 --> Attn1
V1 --> Attn1
Attn0 --> AllReduce
Attn1 --> AllReduce
AllReduce --> Output["output"]
8. 完整前向传播流程
flowchart TD
subgraph "execute_model"
Input["SchedulerOutput"]
Prep["_prepare_inputs()"]
FWD["model.forward()"]
Logits["compute_logits()"]
Sample["sample()"]
Output["ModelRunnerOutput"]
Input --> Prep
Prep --> FWD
FWD --> Logits
Logits --> Sample
Sample --> Output
end
subgraph "model.forward()"
Embed["embed_tokens(input_ids)"]
Layers["for layer in layers:<br/> hidden, residual = layer(...)"]
Norm["norm(hidden, residual)"]
Embed --> Layers
Layers --> Norm
end
subgraph "layer.forward()"
LN1["input_layernorm"]
Attn["self_attn(Q,K,V + KV Cache)"]
LN2["post_attention_layernorm"]
MLP["mlp(SwiGLU)"]
LN1 --> Attn
Attn --> LN2
LN2 --> MLP
end
FWD --> Embed
Norm --> Logits
9. 代码位置速查
| 组件 | 文件 | 关键类/函数 |
|---|---|---|
| Llama 模型 | vllm/model_executor/models/llama.py | LlamaForCausalLM |
| Transformer 层 | vllm/model_executor/models/llama.py | LlamaDecoderLayer |
| Self-Attention | vllm/model_executor/models/llama.py | LlamaAttention |
| MLP | vllm/model_executor/models/llama.py | LlamaMLP |
| Attention 后端 | vllm/attention/layer.py | Attention |
| 旋转位置编码 | vllm/model_executor/layers/rotary_embedding.py | get_rope() |
| 并行线性层 | vllm/model_executor/layers/linear.py | *ParallelLinear |
| RMSNorm | vllm/model_executor/layers/layernorm.py | RMSNorm |
10. 小结
本章我们深入了解了 vLLM 中模型前向传播的实现:
模型结构:
- LlamaForCausalLM → LlamaModel → LlamaDecoderLayer
- Pre-LN 结构,每层包含 Attention 和 MLP
Self-Attention:
- QKV 合并投影
- RoPE 旋转位置编码
- GQA 减少 KV Cache 内存
- PagedAttention 集成
MLP:
- SwiGLU 激活函数
- gate_up 合并投影
张量并行:
- ColumnParallelLinear:输出切分
- RowParallelLinear:输入切分 + AllReduce
KV Cache 集成:
- 自动写入和读取
- 通过 attn_metadata 控制
在下一章中,我们将深入分析采样过程,了解如何从 logits 生成下一个 token。
导航
- 上一篇:Executor 与 Worker
- 下一篇:采样过程分析
- 返回目录
4 - 采样过程
采样过程分析
在模型前向传播得到 logits 后,需要通过采样来生成下一个 token。采样过程不仅仅是简单地选择概率最高的 token,还涉及温度调节、top-k/top-p 过滤、惩罚机制等多种策略。本章将深入分析 vLLM 的采样实现。
1. 采样在推理流程中的位置
graph LR
subgraph 模型推理
Input[输入 tokens] --> Forward[前向传播]
Forward --> Logits[logits]
end
subgraph 采样
Logits --> Processor[Logits 处理]
Processor --> Sample[采样策略]
Sample --> Token[下一个 token]
end
Token --> |追加到序列| Input
采样的主要步骤:
- Logits 处理:应用各种处理器修改原始 logits
- 温度调节:控制随机性
- Top-k/Top-p 过滤:限制候选 token 范围
- 实际采样:从处理后的分布中选择 token
2. SamplingParams - 采样参数
# vllm/sampling_params.py
class SamplingParams:
"""控制采样行为的参数"""
def __init__(
self,
# 基本参数
n: int = 1, # 每个 prompt 生成的序列数
best_of: int | None = None, # 候选序列数
# 温度
temperature: float = 1.0, # 采样温度
# Top-k/Top-p
top_k: int = -1, # top-k 采样(-1 表示禁用)
top_p: float = 1.0, # top-p (nucleus) 采样
# 惩罚
repetition_penalty: float = 1.0, # 重复惩罚
frequency_penalty: float = 0.0, # 频率惩罚
presence_penalty: float = 0.0, # 存在惩罚
# 停止条件
max_tokens: int = 16, # 最大生成 token 数
stop: list[str] | None = None, # 停止字符串
stop_token_ids: list[int] | None = None, # 停止 token
# Logprobs
logprobs: int | None = None, # 返回的 logprobs 数量
# 其他
seed: int | None = None, # 随机种子
min_p: float = 0.0, # min-p 采样
...
):
...
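结合后文 8.1 的场景建议,一个典型的构造方式如下(具体取值与停止字符串仅为示例):
from vllm import SamplingParams
# 对话场景:中等随机性,限制最多生成 256 个 token
chat_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    top_k=40,
    max_tokens=256,
    stop=["</s>", "User:"],
    repetition_penalty=1.1,
)
# 代码生成:贪婪采样,保证确定性
code_params = SamplingParams(temperature=0.0, max_tokens=512)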
3. Sampler 类详解
3.1 类定义
# vllm/v1/sample/sampler.py
class Sampler(nn.Module):
"""
从模型输出中采样下一个 token 的层。
处理步骤:
1. 计算 logprobs(如果请求)
2. 转换为 float32
3. 应用 allowed token ids 白名单
4. 应用 bad words 排除
5. 应用非 argmax-invariant 的 logit 处理器
6. 应用惩罚(重复、频率、存在)
7. 采样(贪婪或随机)
8. 收集 top-k logprobs
9. 返回 SamplerOutput
"""
def __init__(self, logprobs_mode: LogprobsMode = "raw_logprobs"):
super().__init__()
self.topk_topp_sampler = TopKTopPSampler(logprobs_mode)
def forward(
self,
logits: torch.Tensor, # [num_tokens, vocab_size]
sampling_metadata: SamplingMetadata,
) -> SamplerOutput:
"""主采样方法"""
# 1. 计算原始 logprobs(用于返回给用户)
num_logprobs = sampling_metadata.max_num_logprobs
if num_logprobs is not None:
raw_logprobs = self.compute_logprobs(logits)
# 2. 转换为 float32
logits = logits.to(torch.float32)
# 3-6. 应用各种 logits 处理器
logits = self.apply_logits_processors(logits, sampling_metadata)
# 7. 采样
sampled, processed_logprobs = self.sample(logits, sampling_metadata)
# 8. 收集 logprobs
if num_logprobs is not None:
logprobs_tensors = self.gather_logprobs(
raw_logprobs, num_logprobs, token_ids=sampled
)
# 9. 返回结果
return SamplerOutput(
sampled_token_ids=sampled.unsqueeze(-1),
logprobs_tensors=logprobs_tensors,
)
3.2 采样流程图
flowchart TD
A[logits] --> B{需要 logprobs?}
B -->|是| C[compute_logprobs]
B -->|否| D[转换为 float32]
C --> D
D --> E[apply_logits_processors]
subgraph Logits 处理
E --> E1[Allowed Token IDs]
E1 --> E2[Bad Words]
E2 --> E3[Min Tokens]
E3 --> E4[Logit Bias]
E4 --> E5[Penalties]
end
E5 --> F[sample]
subgraph 采样
F --> F1{all_greedy?}
F1 -->|是| F2[argmax]
F1 -->|否| F3[apply_temperature]
F3 --> F4[min_p processor]
F4 --> F5[top_k / top_p]
F5 --> F6[multinomial]
end
F2 --> G[gather_logprobs]
F6 --> G
G --> H[SamplerOutput]
4. 采样策略详解
4.1 贪婪采样(Greedy Sampling)
@staticmethod
def greedy_sample(logits: torch.Tensor) -> torch.Tensor:
"""选择概率最高的 token"""
return logits.argmax(dim=-1).view(-1)
特点:
- 确定性输出(相同输入总是产生相同输出)
- 适合需要一致性的场景
- 可能导致重复和无聊的输出
4.2 温度采样(Temperature Sampling)
@staticmethod
def apply_temperature(
logits: torch.Tensor,
temp: torch.Tensor,
all_random: bool,
) -> torch.Tensor:
"""应用温度缩放"""
if not all_random:
# 避免除以零(贪婪请求的 temp 可能为 0)
temp = torch.where(temp < _SAMPLING_EPS, 1.0, temp)
return logits.div_(temp.unsqueeze(dim=1))
温度的作用:
logits = [2.0, 1.0, 0.5, 0.1]
temp = 0.5(更确定):
logits / 0.5 = [4.0, 2.0, 1.0, 0.2]
softmax → [0.83, 0.11, 0.04, 0.02] # 高概率更集中
temp = 1.0(原始):
softmax → [0.57, 0.21, 0.13, 0.09] # 原始分布
temp = 2.0(更随机):
logits / 2.0 = [1.0, 0.5, 0.25, 0.05]
softmax → [0.41, 0.25, 0.19, 0.16] # 分布更均匀
graph LR
subgraph "温度效果"
Low["temp < 1<br/>更确定"]
Normal["temp = 1<br/>原始分布"]
High["temp > 1<br/>更随机"]
end
Low -.-> |"集中于高概率"| Peak[尖锐分布]
Normal -.-> |"保持原状"| Orig[原始分布]
High -.-> |"均匀化"| Flat[平坦分布]
4.3 Top-k 采样
# vllm/v1/sample/ops/topk_topp_sampler.py
def apply_top_k(logits: torch.Tensor, top_k: int) -> torch.Tensor:
"""只保留 top-k 个最高概率的 token"""
if top_k < 0:
return logits
# 找到第 k 大的值
top_k_values, _ = torch.topk(logits, top_k, dim=-1)
min_top_k = top_k_values[:, -1].unsqueeze(-1)
# 将低于阈值的 logits 设为 -inf
return logits.masked_fill(logits < min_top_k, float('-inf'))
示例:
原始 logits: [3.5, 2.1, 1.8, 0.5, 0.1, -0.2, -1.0]
top_k = 3
处理后: [3.5, 2.1, 1.8, -inf, -inf, -inf, -inf]
softmax 后只有前 3 个有概率
4.4 Top-p (Nucleus) 采样
def apply_top_p(logits: torch.Tensor, top_p: float) -> torch.Tensor:
"""保留累积概率达到 top_p 的最小 token 集合"""
if top_p >= 1.0:
return logits
# 按概率排序
probs = torch.softmax(logits, dim=-1)
sorted_probs, sorted_indices = torch.sort(probs, descending=True)
# 计算累积概率
cumsum_probs = torch.cumsum(sorted_probs, dim=-1)
# 找到累积概率首次超过 top_p 的位置
mask = cumsum_probs - sorted_probs > top_p
# 将超出的 logits 设为 -inf
sorted_logits = logits.gather(-1, sorted_indices)
sorted_logits = sorted_logits.masked_fill(mask, float('-inf'))
# 还原顺序
return sorted_logits.scatter(-1, sorted_indices, sorted_logits)
示例:
原始 probs: [0.40, 0.25, 0.15, 0.10, 0.05, 0.03, 0.02]
top_p = 0.9
累积: [0.40, 0.65, 0.80, 0.90, 0.95, 0.98, 1.00]
↑ 首次 >= 0.9
保留前 4 个 token,其余设为 -inf
4.5 Min-p 采样
def apply_min_p(logits: torch.Tensor, min_p: float) -> torch.Tensor:
"""过滤掉概率低于 max_prob * min_p 的 token"""
if min_p <= 0.0:
return logits
probs = torch.softmax(logits, dim=-1)
max_probs = probs.max(dim=-1, keepdim=True).values
# 概率阈值 = 最大概率 * min_p
threshold = max_probs * min_p
# 过滤低概率 token
return logits.masked_fill(probs < threshold, float('-inf'))
5. 惩罚机制
5.1 重复惩罚(Repetition Penalty)
def apply_repetition_penalty(
logits: torch.Tensor,
token_ids: torch.Tensor,
penalty: float,
) -> torch.Tensor:
"""惩罚已出现的 token"""
if penalty == 1.0:
return logits
# 获取已出现 token 的 logits
score = logits.gather(-1, token_ids)
# 应用惩罚
# logits > 0: score = score / penalty
# logits < 0: score = score * penalty
score = torch.where(score > 0, score / penalty, score * penalty)
# 写回
return logits.scatter(-1, token_ids, score)
效果:
penalty = 1.2, token "the" 已出现
原始 logit: 2.5
惩罚后: 2.5 / 1.2 = 2.08 # 概率降低
原始 logit: -0.5
惩罚后: -0.5 * 1.2 = -0.6 # 概率进一步降低
5.2 频率惩罚(Frequency Penalty)
def apply_frequency_penalty(
logits: torch.Tensor,
token_counts: torch.Tensor,
penalty: float,
) -> torch.Tensor:
"""基于出现次数的惩罚"""
if penalty == 0.0:
return logits
# logits = logits - penalty * count
return logits - penalty * token_counts
效果:
penalty = 0.5, token "the" 出现了 3 次
原始 logit: 2.5
惩罚后: 2.5 - 0.5 * 3 = 1.0
5.3 存在惩罚(Presence Penalty)
def apply_presence_penalty(
logits: torch.Tensor,
token_presence: torch.Tensor, # 0 或 1
penalty: float,
) -> torch.Tensor:
"""基于是否出现过的惩罚(不考虑次数)"""
if penalty == 0.0:
return logits
# logits = logits - penalty * presence
return logits - penalty * token_presence
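把三种惩罚放在一起看一个数值例子:假设 token "the" 的原始 logit 为 2.5,已在输出中出现 3 次,这里按重复 → 频率 → 存在的顺序依次演示(顺序仅为演示方便,三者实际都直接修改同一个 logits 张量):
logit = 2.5
count, presence = 3, 1.0
repetition_penalty, frequency_penalty, presence_penalty = 1.2, 0.5, 0.3
logit = logit / repetition_penalty if logit > 0 else logit * repetition_penalty  # 2.083
logit = logit - frequency_penalty * count                                        # 0.583
logit = logit - presence_penalty * presence                                      # 0.283
print(round(logit, 3))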
6. Logprobs 计算
6.1 计算 Logprobs
@staticmethod
def compute_logprobs(logits: torch.Tensor) -> torch.Tensor:
"""计算 log 概率"""
return logits.log_softmax(dim=-1, dtype=torch.float32)
6.2 收集 Top-k Logprobs
@staticmethod
def gather_logprobs(
logprobs: torch.Tensor,
num_logprobs: int,
token_ids: torch.Tensor,
) -> LogprobsTensors:
"""收集 top-k logprobs 和采样 token 的 logprob"""
# 1. 找 top-k
topk_logprobs, topk_indices = torch.topk(logprobs, num_logprobs, dim=-1)
# 2. 获取采样 token 的 logprob
token_logprobs = logprobs.gather(-1, token_ids.unsqueeze(-1))
# 3. 计算采样 token 的排名
token_ranks = batched_count_greater_than(logprobs, token_logprobs)
# 4. 合并结果
indices = torch.cat((token_ids.unsqueeze(-1), topk_indices), dim=1)
logprobs = torch.cat((token_logprobs, topk_logprobs), dim=1)
return LogprobsTensors(indices, logprobs, token_ranks)
7. 批量采样优化
7.1 SamplingMetadata
# vllm/v1/sample/metadata.py
class SamplingMetadata:
"""批量采样的元数据"""
# 基本信息
all_greedy: bool # 所有请求都是贪婪采样
all_random: bool # 所有请求都是随机采样
# 参数(批量张量)
temperature: torch.Tensor | None # [batch_size]
top_k: torch.Tensor | None # [batch_size]
top_p: torch.Tensor | None # [batch_size]
# Logprobs
max_num_logprobs: int | None
# 惩罚相关
# ...
# 随机数生成器(每个请求一个)
generators: list[torch.Generator] | None
7.2 批量处理流程
sequenceDiagram
participant S as Scheduler
participant R as ModelRunner
participant Sampler as Sampler
S->>R: SchedulerOutput (多个请求)
R->>R: 构建 SamplingMetadata
Note over R: 将各请求的采样参数<br/>批量化为张量
R->>Sampler: forward(logits, metadata)
alt all_greedy
Sampler->>Sampler: argmax (快速路径)
else mixed
Sampler->>Sampler: 批量应用温度
Sampler->>Sampler: 批量 top-k/top-p
Sampler->>Sampler: multinomial 采样
Sampler->>Sampler: 合并贪婪和随机结果
end
Sampler-->>R: SamplerOutput
8. 采样参数组合建议
8.1 常见场景配置
| 场景 | temperature | top_k | top_p | 说明 |
|---|---|---|---|---|
| 代码生成 | 0.0 | - | - | 贪婪,确保正确性 |
| 技术写作 | 0.3 | - | 0.9 | 低随机性 |
| 创意写作 | 0.8 | 50 | 0.95 | 高随机性 |
| 对话 | 0.7 | 40 | 0.9 | 平衡 |
| 头脑风暴 | 1.2 | - | 0.95 | 非常随机 |
8.2 惩罚参数建议
| 参数 | 推荐范围 | 说明 |
|---|---|---|
| repetition_penalty | 1.0-1.2 | 轻微惩罚重复 |
| frequency_penalty | 0.0-0.5 | 减少高频词 |
| presence_penalty | 0.0-0.5 | 鼓励新话题 |
9. 代码位置速查
| 组件 | 文件 | 关键类/函数 |
|---|---|---|
| 采样参数 | vllm/sampling_params.py | SamplingParams |
| Sampler | vllm/v1/sample/sampler.py | Sampler |
| 采样元数据 | vllm/v1/sample/metadata.py | SamplingMetadata |
| Top-k/Top-p | vllm/v1/sample/ops/topk_topp_sampler.py | TopKTopPSampler |
| 惩罚 | vllm/v1/sample/ops/penalties.py | apply_*_penalty() |
| Logprobs | vllm/v1/sample/ops/logprobs.py | gather_logprobs() |
10. 小结
本章我们深入了解了 vLLM 的采样过程:
- 采样参数:temperature、top_k、top_p、min_p 等
- 采样策略:
- 贪婪采样:确定性,选择最高概率
- 温度采样:控制随机性
- Top-k:只考虑前 k 个 token
- Top-p:累积概率阈值过滤
- Min-p:相对于最高概率的阈值
- 惩罚机制:
- 重复惩罚:惩罚已出现 token
- 频率惩罚:基于出现次数
- 存在惩罚:基于是否出现
- Logprobs:返回 token 的对数概率
- 批量优化:通过 SamplingMetadata 批量处理
在下一章中,我们将分析输出处理过程,了解采样结果如何转换为用户可见的输出。
导航
5 - 输出处理
输出处理流程
采样完成后,生成的 token 需要经过一系列处理才能最终返回给用户。本章将详细分析从采样结果到用户输出的完整处理流程。
1. 输出处理在整体流程中的位置
graph LR
subgraph 模型执行
Sample[采样] --> SamplerOut[SamplerOutput]
end
subgraph 输出处理
SamplerOut --> Update[更新请求状态]
Update --> Check[检查停止条件]
Check --> Detok[Detokenize]
Detok --> Build[构建输出]
end
subgraph 返回用户
Build --> Stream[流式返回]
Build --> Final[最终返回]
end
2. 输出数据结构
2.1 SamplerOutput
# vllm/v1/outputs.py
@dataclass
class SamplerOutput:
"""采样器的输出"""
# 采样的 token IDs [num_requests, 1]
sampled_token_ids: torch.Tensor
# Logprobs 信息(可选)
logprobs_tensors: LogprobsTensors | None = None
@dataclass
class LogprobsTensors:
"""Logprobs 张量"""
# Top-k token indices [num_tokens, num_logprobs + 1]
indices: torch.Tensor
# Top-k logprobs [num_tokens, num_logprobs + 1]
logprobs: torch.Tensor
# 采样 token 的排名 [num_tokens]
ranks: torch.Tensor
2.2 ModelRunnerOutput
# vllm/v1/outputs.py
@dataclass
class ModelRunnerOutput:
"""ModelRunner 的输出"""
# 每个请求的采样结果
# Dict[request_id, SamplerOutput]
sampler_output: dict[str, SamplerOutput]
# 模型特定的输出(如 pooling embeddings)
model_output: Any | None = None
2.3 EngineCoreOutput
# vllm/v1/engine/__init__.py
@dataclass
class EngineCoreOutput:
"""EngineCore 的单个请求输出"""
request_id: str
# 新生成的 token IDs
new_token_ids: list[int]
# 完成原因(如果完成)
finish_reason: FinishReason | None = None
# 停止字符串(如果因停止字符串完成)
stop_str: str | None = None
# Logprobs 信息
new_logprobs: list[dict[int, Logprob]] | None = None
@dataclass
class EngineCoreOutputs:
"""EngineCore 的批量输出"""
outputs: list[EngineCoreOutput]
# 完成的请求 IDs
finished_req_ids: set[str] | None = None
2.4 RequestOutput(最终用户输出)
# vllm/outputs.py
@dataclass
class RequestOutput:
"""用户可见的最终输出"""
request_id: str
# 原始 prompt
prompt: str | None
prompt_token_ids: list[int]
# 生成结果(可能多个,如 beam search)
outputs: list[CompletionOutput]
# 是否完成
finished: bool
# 指标
metrics: RequestMetrics | None = None
@dataclass
class CompletionOutput:
"""单个生成序列的输出"""
index: int # 序列索引
text: str # 生成的文本
token_ids: list[int] # 生成的 token IDs
cumulative_logprob: float | None # 累积对数概率
logprobs: list[dict] | None # 每个 token 的 logprobs
finish_reason: str | None # 完成原因
stop_reason: str | int | None # 停止原因
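对用户来说,generate() 返回的就是这样的 RequestOutput 列表,典型的消费方式如下(模型名仅为示例,字段均来自上面的定义):
from vllm import LLM, SamplingParams
llm = LLM(model="facebook/opt-125m")
outputs = llm.generate(["Hello, my name is"], SamplingParams(max_tokens=16))
for req_out in outputs:                  # req_out: RequestOutput
    completion = req_out.outputs[0]      # n=1 时只有一个 CompletionOutput
    print(f"[{req_out.request_id}] {completion.text!r}")
    print("finish_reason:", completion.finish_reason,
          "tokens:", len(completion.token_ids))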
3. 输出处理流程详解
3.1 update_from_output() - 状态更新
# vllm/v1/core/sched/scheduler.py
def update_from_output(
self,
model_runner_output: ModelRunnerOutput,
sampler_output: SamplerOutput | None,
scheduler_output: SchedulerOutput,
) -> EngineCoreOutputs:
"""根据模型输出更新请求状态"""
outputs: list[EngineCoreOutput] = []
for req_id, req_output in model_runner_output.sampler_output.items():
request = self.requests[req_id]
# 1. 获取新生成的 token IDs
new_token_ids = req_output.sampled_token_ids.tolist()
# 2. 追加到请求
request.append_output_token_ids(new_token_ids)
# 3. 检查停止条件
finish_reason, stop_str = check_stop(request, self.max_model_len)
# 4. 处理完成的请求
if finish_reason is not None:
self._finish_request(request, finish_reason)
# 5. 构建输出
output = EngineCoreOutput(
request_id=req_id,
new_token_ids=new_token_ids,
finish_reason=finish_reason,
stop_str=stop_str,
new_logprobs=self._process_logprobs(req_output),
)
outputs.append(output)
return EngineCoreOutputs(outputs=outputs)
3.2 停止条件检查
# vllm/v1/core/sched/utils.py
def check_stop(
request: Request,
max_model_len: int,
) -> tuple[FinishReason | None, str | None]:
"""检查请求是否应该停止"""
# 1. 检查 EOS token
last_token_id = request.all_token_ids[-1]
if last_token_id == request.eos_token_id:
return FinishReason.STOP, None
# 2. 检查最大输出长度
if request.num_output_tokens >= request.max_tokens:
return FinishReason.LENGTH, None
# 3. 检查模型最大长度
if len(request.all_token_ids) >= max_model_len:
return FinishReason.LENGTH, None
# 4. 检查停止 token IDs
if request.stop_token_ids:
if last_token_id in request.stop_token_ids:
return FinishReason.STOP, None
# 5. 检查停止字符串
if request.stop_strings:
output_text = request.get_output_text()
for stop_str in request.stop_strings:
if stop_str in output_text:
return FinishReason.STOP, stop_str
return None, None # 继续生成
3.3 流程图
flowchart TD
A[SamplerOutput] --> B[遍历每个请求]
B --> C[获取 new_token_ids]
C --> D[append_output_token_ids]
D --> E[check_stop]
E --> F{停止?}
F -->|EOS| G[finish_reason = STOP]
F -->|max_tokens| H[finish_reason = LENGTH]
F -->|stop_string| I[finish_reason = STOP<br/>stop_str = ...]
F -->|否| J[继续]
G --> K[_finish_request]
H --> K
I --> K
K --> L[释放 KV Cache]
L --> M[从 running 移除]
J --> N[构建 EngineCoreOutput]
M --> N
N --> O[EngineCoreOutputs]
4. Detokenization - 反向分词
4.1 增量 Detokenize
# vllm/v1/engine/detokenizer.py
class IncrementalDetokenizer:
"""增量反向分词器"""
def __init__(self, tokenizer: TokenizerLike, request: Request):
self.tokenizer = tokenizer
self.request = request
# 已解码的文本
self.output_text = ""
# 待解码的 token 缓冲区
self.token_buffer: list[int] = []
# 上一次的偏移量(用于流式输出)
self.prev_output_len = 0
def decode(self, new_token_ids: list[int]) -> str:
"""解码新的 token,返回新增的文本"""
# 添加到缓冲区
self.token_buffer.extend(new_token_ids)
# 尝试解码
text = self.tokenizer.decode(
self.token_buffer,
skip_special_tokens=True,
)
# 检查是否有完整的字符
# (某些 token 可能是部分字符,需要等待后续 token)
if self._is_valid_utf8(text):
self.output_text = text
new_text = text[self.prev_output_len:]
self.prev_output_len = len(text)
return new_text
return ""
4.2 增量解码的必要性
Token IDs: [15496, 284, 262, 995, 0]
逐个解码:
15496 → "Hello" ✓ 完整单词
284 → " to" ✓ 完整
262 → " the" ✓ 完整
995 → " wor" ? 可能是 "world" 的一部分
0 → "ld" ✓ 完成 "world"
增量解码会等待 995+0 一起解码为 " world"
而不是输出 " wor" 然后 "ld"
5. 流式输出
5.1 流式输出架构
sequenceDiagram
participant User as 用户
participant API as API Server
participant Engine as LLMEngine
participant Core as EngineCore
User->>API: POST /v1/completions<br/>stream=true
API->>Engine: add_request()
loop 每个 step
Engine->>Core: step()
Core-->>Engine: EngineCoreOutputs
Engine->>Engine: detokenize()
Engine-->>API: yield partial_output
API-->>User: SSE: data: {...}
end
Engine-->>API: final_output
API-->>User: SSE: data: [DONE]
5.2 Server-Sent Events (SSE) 格式
# 流式响应示例
async def stream_response():
async for output in engine.generate_stream(prompt, params):
yield f"data: {json.dumps(output)}\n\n"
yield "data: [DONE]\n\n"
实际输出示例:
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"Hello"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":" there"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{},"finish_reason":"stop"}]}
data: [DONE]
6. Logprobs 处理
6.1 Logprobs 数据结构
@dataclass
class Logprob:
"""单个 token 的 logprob 信息"""
logprob: float # 对数概率
rank: int | None # 在词表中的排名
decoded_token: str # 解码后的文本
# 每个位置的 logprobs
# Dict[token_id, Logprob]
6.2 Logprobs 处理流程
def _process_logprobs(
self,
sampler_output: SamplerOutput,
) -> list[dict[int, Logprob]] | None:
"""处理 logprobs 输出"""
if sampler_output.logprobs_tensors is None:
return None
tensors = sampler_output.logprobs_tensors
result = []
for i in range(tensors.indices.shape[0]):
token_logprobs = {}
# 获取 top-k token IDs 和 logprobs
for j in range(tensors.indices.shape[1]):
token_id = tensors.indices[i, j].item()
logprob = tensors.logprobs[i, j].item()
# 解码 token
decoded = self.tokenizer.decode([token_id])
token_logprobs[token_id] = Logprob(
logprob=logprob,
rank=j if j > 0 else tensors.ranks[i].item(),
decoded_token=decoded,
)
result.append(token_logprobs)
return result
7. 输出格式化
7.1 OpenAI 兼容格式
# vllm/entrypoints/openai/protocol.py
class ChatCompletionResponse(BaseModel):
"""OpenAI Chat Completion 响应格式"""
id: str
object: str = "chat.completion"
created: int
model: str
choices: list[ChatCompletionResponseChoice]
usage: UsageInfo
class ChatCompletionResponseChoice(BaseModel):
index: int
message: ChatMessage
finish_reason: str | None
logprobs: ChoiceLogprobs | None
class ChatMessage(BaseModel):
role: str
content: str
7.2 格式转换
def create_chat_completion_response(
request_output: RequestOutput,
model_name: str,
) -> ChatCompletionResponse:
"""将 RequestOutput 转换为 OpenAI 格式"""
choices = []
for i, output in enumerate(request_output.outputs):
choice = ChatCompletionResponseChoice(
index=i,
message=ChatMessage(
role="assistant",
content=output.text,
),
finish_reason=output.finish_reason,
logprobs=_convert_logprobs(output.logprobs),
)
choices.append(choice)
return ChatCompletionResponse(
id=f"chatcmpl-{request_output.request_id}",
created=int(time.time()),
model=model_name,
choices=choices,
usage=UsageInfo(
prompt_tokens=len(request_output.prompt_token_ids),
completion_tokens=sum(len(o.token_ids) for o in request_output.outputs),
total_tokens=...,
),
)
8. 输出处理完整流程图
flowchart TD
subgraph EngineCore
SO[SamplerOutput] --> UO[update_from_output]
UO --> CS[check_stop]
CS --> ECO[EngineCoreOutput]
end
subgraph LLMEngine
ECO --> DT[Detokenize]
DT --> PL[Process Logprobs]
PL --> RO[RequestOutput]
end
subgraph API Server
RO --> FMT[格式化]
FMT --> |非流式| JSON[JSON Response]
FMT --> |流式| SSE[SSE Events]
end
subgraph 用户
JSON --> User1[完整响应]
SSE --> User2[增量响应]
end
9. 代码位置速查
| 组件 | 文件 | 关键类/函数 |
|---|---|---|
| 输出数据结构 | vllm/v1/outputs.py | SamplerOutput, ModelRunnerOutput |
| EngineCore 输出 | vllm/v1/engine/__init__.py | EngineCoreOutput |
| 用户输出 | vllm/outputs.py | RequestOutput, CompletionOutput |
| 状态更新 | vllm/v1/core/sched/scheduler.py | update_from_output() |
| 停止检查 | vllm/v1/core/sched/utils.py | check_stop() |
| Detokenizer | vllm/v1/engine/detokenizer.py | IncrementalDetokenizer |
| OpenAI 格式 | vllm/entrypoints/openai/protocol.py | ChatCompletionResponse |
10. 小结
本章我们详细分析了输出处理流程:
数据结构链路:
- SamplerOutput → ModelRunnerOutput → EngineCoreOutput → RequestOutput
状态更新:
- 追加 token 到请求
- 检查停止条件
- 处理完成的请求
停止条件:
- EOS token
- 最大长度
- 停止字符串/token
Detokenization:
- 增量解码
- 处理部分字符
流式输出:
- Server-Sent Events
- 增量返回
格式化:
- OpenAI 兼容格式
- Logprobs 处理
在下一章中,我们将完整跟踪一个请求从提交到返回的完整生命周期。
导航
6 - 请求生命周期
请求完整生命周期
本章将完整跟踪一个请求从用户提交到最终返回的全过程,将前面章节的知识串联起来,帮助读者建立完整的认知图景。
1. 生命周期概览
graph TD
subgraph 1. 提交阶段
A1[用户调用 generate]
A2[Tokenize]
A3[创建请求]
A4[加入 waiting 队列]
end
subgraph 2. 调度阶段
B1[查找前缀缓存]
B2[分配 KV Cache]
B3[加入 running 队列]
end
subgraph 3. 执行阶段
C1[准备输入]
C2[模型前向传播]
C3[采样]
end
subgraph 4. 更新阶段
D1[追加 token]
D2[检查停止条件]
D3[更新状态]
end
subgraph 5. 返回阶段
E1[Detokenize]
E2[构建输出]
E3[返回用户]
end
A1 --> A2 --> A3 --> A4
A4 --> B1 --> B2 --> B3
B3 --> C1 --> C2 --> C3
C3 --> D1 --> D2 --> D3
D3 -->|未完成| C1
D3 -->|完成| E1 --> E2 --> E3
2. 阶段 1:请求提交
2.1 用户调用
# 用户代码
from vllm import LLM, SamplingParams
llm = LLM(model="meta-llama/Llama-2-7b-hf")
prompts = ["The capital of France is"]
sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=50)
outputs = llm.generate(prompts, sampling_params)
2.2 Tokenize
# vllm/entrypoints/llm.py
def generate(self, prompts, sampling_params, ...):
# 1. 处理输入
for prompt in prompts:
# Tokenize prompt
prompt_token_ids = self.tokenizer.encode(prompt)
# 创建请求
request_id = str(next(self.request_counter))
self._add_request(
request_id=request_id,
prompt=prompt,
prompt_token_ids=prompt_token_ids,
params=sampling_params,
)
2.3 创建 EngineCoreRequest
# vllm/v1/engine/llm_engine.py
def add_request(self, request_id, prompt, params, ...):
# 构建 EngineCoreRequest
engine_request = EngineCoreRequest(
request_id=request_id,
prompt_token_ids=prompt_token_ids,
sampling_params=params,
arrival_time=time.time(),
eos_token_id=self.tokenizer.eos_token_id,
)
# 发送到 EngineCore
self.engine_core.add_request(engine_request)
2.4 加入 Waiting 队列
# vllm/v1/core/sched/scheduler.py
def add_request(self, request: EngineCoreRequest) -> None:
# 1. 创建内部 Request 对象
internal_request = Request(
request_id=request.request_id,
prompt_token_ids=request.prompt_token_ids,
sampling_params=request.sampling_params,
)
# 2. 计算 block hashes(用于前缀缓存)
if self.enable_caching:
internal_request.block_hashes = compute_block_hashes(
internal_request.prompt_token_ids,
self.block_size,
)
# 3. 加入 waiting 队列
internal_request.status = RequestStatus.WAITING
self.waiting.append_request(internal_request)
# 4. 记录到请求字典
self.requests[request.request_id] = internal_request
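其中第 2 步的 block hash 是前缀缓存命中的关键。下面是一个简化示意(compute_block_hashes 为假设的函数名,真实实现的哈希对象与算法不同):每个完整块的哈希由父块哈希与本块 token 链式计算,保证"相同前缀 → 相同哈希序列":
# 示意代码:链式计算 block hash(非 vLLM 实际实现)
import hashlib

def compute_block_hashes(token_ids: list[int], block_size: int) -> list[bytes]:
    """只对完整的块计算哈希;父块哈希参与子块哈希,保证前缀一致性。"""
    hashes: list[bytes] = []
    parent = b""
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = token_ids[start:start + block_size]
        h = hashlib.sha256(parent + repr(block).encode()).digest()
        hashes.append(h)
        parent = h
    return hashes

# 前两个块相同的两条 prompt,会得到相同的前两个 block hash
a = compute_block_hashes(list(range(40)), block_size=16)
b = compute_block_hashes(list(range(32)) + [99] * 8, block_size=16)
assert a[:2] == b[:2]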
2.5 提交阶段时序图
sequenceDiagram
participant User as 用户
participant LLM as LLM
participant Tokenizer as Tokenizer
participant Engine as LLMEngine
participant Core as EngineCore
participant Sched as Scheduler
User->>LLM: generate(prompts, params)
LLM->>Tokenizer: encode(prompt)
Tokenizer-->>LLM: token_ids
LLM->>Engine: add_request(id, tokens, params)
Engine->>Engine: 创建 EngineCoreRequest
Engine->>Core: add_request(request)
Core->>Sched: add_request(request)
Sched->>Sched: 创建 internal Request
Sched->>Sched: 计算 block_hashes
Sched->>Sched: waiting.append(request)
Note over Sched: 请求进入 WAITING 状态
3. 阶段 2:调度
3.1 查找前缀缓存
# vllm/v1/core/sched/scheduler.py :: schedule()
# 从 waiting 队列取出请求
request = self.waiting.peek_request()
# 查找前缀缓存
new_computed_blocks, num_cached_tokens = (
self.kv_cache_manager.get_computed_blocks(request)
)
# num_cached_tokens 表示可以跳过的 token 数
# 例如:prompt 有 100 tokens,前 64 个已缓存
# 则只需要计算后 36 个
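结合 2.4 中的 block hash,可以用一个最小的字典模型理解 get_computed_blocks 的匹配逻辑(示意实现,非 vLLM 实际代码):从第一个块开始逐块匹配,遇到第一个未命中就停止,命中块数 × block_size 即可跳过的 token 数。
# 示意代码:前缀缓存查找(非 vLLM 实际实现)
def find_cached_prefix(block_hashes: list[bytes],
                       cached_blocks: dict[bytes, int],
                       block_size: int):
    """cached_blocks 是 block_hash -> 物理块 id 的映射,返回命中的块和可跳过的 token 数。"""
    hit_blocks: list[int] = []
    for h in block_hashes:
        block_id = cached_blocks.get(h)
        if block_id is None:
            break               # 前缀匹配必须连续,一旦断开立即停止
        hit_blocks.append(block_id)
    num_cached_tokens = len(hit_blocks) * block_size
    # 例:block_size=16、前 4 个块命中 → num_cached_tokens=64,
    # 对应上文 "100 个 token 只需计算后 36 个" 的例子
    return hit_blocks, num_cached_tokens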
3.2 分配 KV Cache
# 计算需要处理的 token 数
num_new_tokens = request.num_tokens - num_cached_tokens
# 分配 KV Cache slots
new_blocks = self.kv_cache_manager.allocate_slots(
request,
num_new_tokens,
num_new_computed_tokens=num_cached_tokens,
new_computed_blocks=new_computed_blocks,
)
if new_blocks is None:
# 内存不足,请求继续等待
return
# 分配成功
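allocate_slots 的核心逻辑是"按需补齐块":先算出总共需要多少块,减去已持有的块数,再从空闲池中取。下面是一个假设的 SimpleBlockPool 示意(真实的 KVCacheManager 还要处理缓存块复用、引用计数与驱逐):
# 示意代码:极简的块分配器(非 vLLM 实际实现)
import math

class SimpleBlockPool:
    def __init__(self, num_blocks: int, block_size: int):
        self.block_size = block_size
        self.free_blocks = list(range(num_blocks))   # 空闲物理块 id

    def allocate_slots(self, num_total_tokens: int, num_allocated_blocks: int):
        """为请求补齐到能容纳 num_total_tokens 的块数;空闲块不足则返回 None。"""
        num_needed = math.ceil(num_total_tokens / self.block_size) - num_allocated_blocks
        if num_needed > len(self.free_blocks):
            return None                              # 内存不足,请求继续等待
        return [self.free_blocks.pop() for _ in range(num_needed)]

pool = SimpleBlockPool(num_blocks=4, block_size=16)
print(pool.allocate_slots(40, 0))    # 需要 3 个块,分配成功
print(pool.allocate_slots(100, 0))   # 需要 7 个块 > 剩余 1 个,返回 None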
3.3 移入 Running 队列
# 从 waiting 移除
request = self.waiting.pop_request()
# 加入 running
self.running.append(request)
# 更新状态
request.status = RequestStatus.RUNNING
request.num_computed_tokens = num_cached_tokens
3.4 调度阶段示意图
flowchart TD
subgraph Scheduler.schedule
W[waiting 队列] --> Peek[peek_request]
Peek --> Cache[get_computed_blocks]
Cache --> Alloc[allocate_slots]
Alloc --> Check{分配成功?}
Check -->|是| Move[移入 running]
Check -->|否| Wait[继续等待]
Move --> SO[构建 SchedulerOutput]
end
subgraph SchedulerOutput
SO --> Reqs[scheduled_new_reqs]
SO --> Blocks[req_to_new_blocks]
SO --> Tokens[num_scheduled_tokens]
end
4. 阶段 3:模型执行
4.1 准备输入
# vllm/v1/worker/gpu_model_runner.py
def execute_model(self, scheduler_output: SchedulerOutput):
# 1. 准备 input_ids
input_ids = self._prepare_input_ids(scheduler_output)
# 2. 准备 positions
positions = self._prepare_positions(scheduler_output)
# 3. 准备 attention metadata
attn_metadata = self._prepare_attention_metadata(scheduler_output)
# 4. 更新 block table
self._update_block_table(scheduler_output)
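连续批处理(continuous batching)的一个关键点是:不同请求、不同阶段(prefill / decode)的 token 会被拼接成一维输入一起送入模型。下面用虚构的数据演示 input_ids 与 positions 的组织方式(仅为示意):
# 示意代码:把一个 prefill 请求和一个 decode 请求拼成一维输入(数据为虚构)
import torch

prefill_tokens = [101, 2023, 2003, 1037, 3231]   # 请求 A:本步处理 5 个 prompt token
decode_tokens = [7592]                           # 请求 B:本步只解码 1 个新 token

input_ids = torch.tensor(prefill_tokens + decode_tokens)   # 形状 [6],无 batch 维
positions = torch.tensor([0, 1, 2, 3, 4] + [42])           # B 已生成到第 42 个位置(假设)
num_scheduled_tokens = {"req_A": 5, "req_B": 1}            # 与 SchedulerOutput 中的计数对应

print(input_ids.shape, positions.tolist(), num_scheduled_tokens)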
4.2 模型前向传播
# 5. 前向传播
with torch.inference_mode():
hidden_states = self.model(
input_ids=input_ids,
positions=positions,
kv_caches=self.kv_caches,
attn_metadata=attn_metadata,
)
# 6. 计算 logits
logits = self.model.compute_logits(hidden_states)
return ModelRunnerOutput(logits=logits, ...)
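compute_logits 本质上是把最后一层的 hidden_states 投影到词表维度(lm_head)。下面用缩小的维度做一个最小示意(真实模型中 hidden_size、vocab_size 要大得多,且通常只对每个请求的最后一个位置计算 logits):
# 示意代码:hidden_states -> logits 的线性投影(维度为缩小后的示例值)
import torch

hidden_size, vocab_size, num_tokens = 64, 1000, 6
lm_head = torch.nn.Linear(hidden_size, vocab_size, bias=False)

hidden_states = torch.randn(num_tokens, hidden_size)   # 模型前向的输出
logits = lm_head(hidden_states)                        # [num_tokens, vocab_size]
print(logits.shape)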
4.3 采样
# vllm/v1/executor/abstract.py
def sample_tokens(self, model_output: ModelRunnerOutput) -> SamplerOutput:
# 构建采样元数据
sampling_metadata = self._prepare_sampling_metadata()
# 采样
sampler_output = self.sampler(
model_output.logits,
sampling_metadata,
)
return sampler_output
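采样本身可以用一个独立的简化版本来理解:先用 temperature 缩放 logits,再做 top-p(nucleus)截断,最后按概率抽取。以下为示意实现,真实的 Sampler 是批量向量化的,还支持 top-k、重复惩罚等:
# 示意代码:单序列的 temperature + top-p 采样(非 vLLM 实际实现)
import torch

def sample_token(logits: torch.Tensor, temperature: float = 0.8, top_p: float = 0.95) -> int:
    if temperature == 0:
        return int(logits.argmax())                    # 贪心解码
    probs = torch.softmax(logits / temperature, dim=-1)
    sorted_probs, sorted_ids = probs.sort(descending=True)
    cumsum = sorted_probs.cumsum(dim=-1)
    keep = cumsum - sorted_probs < top_p               # 保留累积概率达到 top_p 的最小集合
    keep[0] = True                                     # 至少保留概率最高的 token
    filtered = sorted_probs * keep
    filtered /= filtered.sum()
    idx = torch.multinomial(filtered, num_samples=1)   # 按概率抽取
    return int(sorted_ids[idx])

torch.manual_seed(0)
print(sample_token(torch.randn(32000)))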
4.4 执行阶段时序图
sequenceDiagram
participant Core as EngineCore
participant Exec as Executor
participant Worker as Worker
participant Runner as ModelRunner
participant Model as Model
participant Sampler as Sampler
Core->>Exec: execute_model(scheduler_output)
Exec->>Worker: execute_model()
Worker->>Runner: execute_model()
Runner->>Runner: _prepare_inputs()
Runner->>Model: forward(input_ids, positions, kv_caches)
Note over Model: Embedding → Transformer Layers → Norm
Model-->>Runner: hidden_states
Runner->>Model: compute_logits(hidden_states)
Model-->>Runner: logits
Runner-->>Worker: ModelRunnerOutput
Worker-->>Exec: output
Core->>Exec: sample_tokens()
Exec->>Sampler: forward(logits, metadata)
Note over Sampler: Temperature → Top-k/p → Sample
Sampler-->>Exec: SamplerOutput
Exec-->>Core: sampled_tokens
5. 阶段 4:状态更新
5.1 追加 Token
# vllm/v1/core/sched/scheduler.py
def update_from_output(self, model_output, sampler_output, scheduler_output):
for req_id, output in sampler_output.items():
request = self.requests[req_id]
# 获取新生成的 token
new_token_ids = output.sampled_token_ids.tolist()
# 追加到请求
request.append_output_token_ids(new_token_ids)
# 更新 computed_tokens
        request.num_computed_tokens += len(new_token_ids)
5.2 检查停止条件
# 检查是否完成
finish_reason, stop_str = check_stop(request, self.max_model_len)
if finish_reason is not None:
# 请求完成
self._finish_request(request, finish_reason)
finished_outputs.append(...)
else:
# 继续生成
outputs.append(...)
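check_stop 的判断大致覆盖三类条件:EOS、最大生成长度、停止字符串。下面是一个简化示意(参数与真实函数签名不同):
# 示意代码:简化版停止条件检查(非 vLLM 实际实现)
def check_stop_demo(output_token_ids: list[int],
                    output_text: str,
                    eos_token_id: int,
                    max_tokens: int,
                    stop_strings: list[str],
                    ignore_eos: bool = False):
    """返回 (finish_reason, 命中的停止字符串);(None, None) 表示继续生成。"""
    if not ignore_eos and output_token_ids and output_token_ids[-1] == eos_token_id:
        return "stop", None                 # 命中 EOS
    if len(output_token_ids) >= max_tokens:
        return "length", None               # 达到 max_tokens
    for s in stop_strings:
        if s in output_text:
            return "stop", s                # 命中停止字符串
    return None, None

print(check_stop_demo([11, 13, 2], "Paris.", eos_token_id=2,
                      max_tokens=50, stop_strings=["\n\n"]))   # ('stop', None)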
5.3 完成请求处理
def _finish_request(self, request: Request, reason: FinishReason):
# 1. 释放 KV Cache
self.kv_cache_manager.free(request)
# 2. 从 running 移除
self.running.remove(request)
# 3. 更新状态
request.status = RequestStatus.FINISHED
# 4. 记录完成
self.finished_req_ids.add(request.request_id)
6. 阶段 5:返回结果
6.1 Detokenize
# vllm/v1/engine/llm_engine.py
def _process_outputs(self, engine_outputs: EngineCoreOutputs):
results = []
for output in engine_outputs.outputs:
request = self.requests[output.request_id]
# 增量解码
new_text = self.detokenizer.decode(
request,
output.new_token_ids,
)
# 更新请求的输出文本
request.output_text += new_text
results.append(...)
return results
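增量解码的难点在于多字节字符可能被拆在两个 token 中:如果直接逐 token decode,会输出残缺字符。下面是一个基于 Hugging Face 分词器的极简示意(类名为假设,真实的 IncrementalDetokenizer 还维护 prefix offset 等更多状态;运行需要安装 transformers 并能加载 gpt2 分词器):
# 示意代码:极简的增量 detokenizer(非 vLLM 实际实现)
from transformers import AutoTokenizer

class TinyIncrementalDetokenizer:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        self.token_ids: list[int] = []
        self.emitted_text = ""

    def decode(self, new_token_ids: list[int]) -> str:
        """追加新 token,只返回相对上次输出新增的、完整的文本。"""
        self.token_ids.extend(new_token_ids)
        text = self.tokenizer.decode(self.token_ids, skip_special_tokens=True)
        if text.endswith("\ufffd"):          # 末尾是残缺字符,等下一个 token 再输出
            return ""
        new_text = text[len(self.emitted_text):]
        self.emitted_text = text
        return new_text

tok = AutoTokenizer.from_pretrained("gpt2")
detok = TinyIncrementalDetokenizer(tok)
for tid in tok.encode("The capital of France is Paris."):
    print(repr(detok.decode([tid])))         # 每步打印新增文本片段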
6.2 构建 RequestOutput
def _make_request_output(self, request: Request, finished: bool):
return RequestOutput(
request_id=request.request_id,
prompt=request.prompt,
prompt_token_ids=request.prompt_token_ids,
outputs=[
CompletionOutput(
index=0,
text=request.output_text,
token_ids=request.output_token_ids,
finish_reason=request.finish_reason,
logprobs=request.logprobs,
)
],
finished=finished,
)
6.3 返回用户
# vllm/entrypoints/llm.py
def _run_engine(self, use_tqdm: bool):
outputs = []
while self.llm_engine.has_unfinished_requests():
step_outputs = self.llm_engine.step()
for output in step_outputs:
if output.finished:
outputs.append(output)
return sorted(outputs, key=lambda x: int(x.request_id))
7. 完整生命周期时序图
sequenceDiagram
participant User as 用户
participant LLM as LLM
participant Engine as LLMEngine
participant Core as EngineCore
participant Sched as Scheduler
participant KVM as KVCacheManager
participant Exec as Executor
participant Model as Model
rect rgb(230, 245, 230)
Note over User,Model: 1. 提交阶段
User->>LLM: generate(prompt, params)
LLM->>Engine: add_request()
Engine->>Core: add_request()
Core->>Sched: add_request()
Note over Sched: status = WAITING
end
loop 每个 step
rect rgb(255, 245, 230)
Note over User,Model: 2. 调度阶段
Core->>Sched: schedule()
Sched->>KVM: get_computed_blocks()
KVM-->>Sched: cached_blocks, num_cached
Sched->>KVM: allocate_slots()
KVM-->>Sched: new_blocks
Note over Sched: status = RUNNING
Sched-->>Core: SchedulerOutput
end
rect rgb(245, 230, 230)
Note over User,Model: 3. 执行阶段
Core->>Exec: execute_model()
Exec->>Model: forward()
Model-->>Exec: logits
Exec->>Exec: sample()
Exec-->>Core: SamplerOutput
end
rect rgb(230, 230, 245)
Note over User,Model: 4. 更新阶段
Core->>Sched: update_from_output()
Sched->>Sched: append_token()
Sched->>Sched: check_stop()
alt 完成
Sched->>KVM: free()
Note over Sched: status = FINISHED
end
end
end
rect rgb(245, 245, 230)
Note over User,Model: 5. 返回阶段
Core-->>Engine: EngineCoreOutputs
Engine->>Engine: detokenize()
Engine-->>LLM: RequestOutput
LLM-->>User: outputs
end
8. 状态转换汇总
stateDiagram-v2
[*] --> WAITING: add_request()
WAITING --> RUNNING: schedule() 成功
WAITING --> WAITING_FOR_FSM: 需要 FSM 编译
WAITING --> WAITING_FOR_REMOTE_KVS: 等待远程 KV
WAITING_FOR_FSM --> WAITING: FSM 就绪
WAITING_FOR_REMOTE_KVS --> WAITING: KV 就绪
RUNNING --> RUNNING: step() 继续生成
RUNNING --> PREEMPTED: 内存不足被抢占
RUNNING --> FINISHED_STOPPED: EOS 或停止字符串
RUNNING --> FINISHED_LENGTH: 达到 max_tokens
RUNNING --> FINISHED_ABORTED: 用户取消
PREEMPTED --> WAITING: 重新排队
FINISHED_STOPPED --> [*]: 释放资源
FINISHED_LENGTH --> [*]: 释放资源
FINISHED_ABORTED --> [*]: 释放资源
9. 关键数据结构流转
用户输入
↓
prompt: str
↓ Tokenize
prompt_token_ids: list[int]
↓ 创建请求
EngineCoreRequest
↓ 调度器内部
Request (internal)
↓ 调度
SchedulerOutput
↓ 执行
ModelRunnerOutput (logits)
↓ 采样
SamplerOutput (token_ids)
↓ 更新
EngineCoreOutput
↓ Detokenize
RequestOutput
↓
用户输出
10. 小结
本章我们完整跟踪了一个请求的生命周期:
提交阶段:
- Tokenize → 创建请求 → 加入 waiting 队列
调度阶段:
- 查找缓存 → 分配 KV Cache → 移入 running
执行阶段:
- 准备输入 → 前向传播 → 采样
更新阶段:
- 追加 token → 检查停止 → 更新状态
返回阶段:
- Detokenize → 构建输出 → 返回用户
通过这个完整的流程分析,我们可以看到 vLLM 的各个组件是如何协同工作的,以及为什么它能够实现高效的 LLM 推理。