代码链路分析

跟踪代码执行路径,理解实现细节

本部分将带你深入 vLLM 的源代码,通过分析关键代码路径,理解请求从创建到完成的完整生命周期。

1 - 入口点分析

入口点分析

本章开始,我们将从代码层面深入分析 vLLM 的执行流程。首先从入口点开始,了解用户如何与 vLLM 交互,以及请求是如何被处理的。


1. vLLM 的两种使用方式

vLLM 提供两种主要的使用方式:

graph TD
    subgraph 离线推理
        LLM[LLM 类]
        Gen[generate/encode/...]
    end

    subgraph 在线服务
        Server[API Server]
        Async[AsyncLLMEngine]
    end

    User1[用户代码] --> LLM
    LLM --> Gen

    User2[HTTP 请求] --> Server
    Server --> Async

    style LLM fill:#e1f5fe
    style Server fill:#fff3e0
| 方式 | 入口类 | 适用场景 |
| --- | --- | --- |
| 离线推理 | `LLM` | 批量处理、脚本调用、研究实验 |
| 在线服务 | `AsyncLLMEngine` | Web 服务、API 接口、实时应用 |
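
下面给出离线推理方式的一个最小使用示例(示意代码;模型名称仅作演示,实际使用时替换为本地可用的模型):

from vllm import LLM, SamplingParams

# 加载模型(首次运行会下载权重)
llm = LLM(model="meta-llama/Llama-2-7b-hf")

# 采样参数:温度 0.8,top-p 0.95,最多生成 64 个 token
params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=64)

# 批量生成
outputs = llm.generate(["Hello, my name is", "The capital of France is"], params)
for output in outputs:
    print(output.outputs[0].text)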

2. LLM 类 - 离线推理入口

2.1 类定义与初始化

# vllm/entrypoints/llm.py

class LLM:
    """用于从给定提示和采样参数生成文本的 LLM。

    这个类包含一个分词器、一个语言模型(可能分布在多个 GPU 上),
    以及为中间状态(即 KV Cache)分配的 GPU 内存空间。
    """

    def __init__(
        self,
        model: str,                         # 模型路径或名称
        *,
        tokenizer: str | None = None,        # 分词器路径
        tokenizer_mode: str = "auto",        # 分词器模式
        trust_remote_code: bool = False,     # 信任远程代码
        tensor_parallel_size: int = 1,       # 张量并行大小
        dtype: str = "auto",                 # 数据类型
        quantization: str | None = None,     # 量化方法
        gpu_memory_utilization: float = 0.9, # GPU 内存利用率
        enable_prefix_caching: bool = False, # 启用前缀缓存
        **kwargs,                            # 更多配置参数
    ):
        # 1. 构建配置
        engine_args = EngineArgs(
            model=model,
            tokenizer=tokenizer,
            tensor_parallel_size=tensor_parallel_size,
            dtype=dtype,
            quantization=quantization,
            gpu_memory_utilization=gpu_memory_utilization,
            enable_prefix_caching=enable_prefix_caching,
            **kwargs,
        )

        # 2. 创建 LLMEngine
        self.llm_engine = LLMEngine.from_engine_args(engine_args)

        # 3. 获取分词器
        self.tokenizer = self.llm_engine.get_tokenizer()

        # 4. 请求计数器
        self.request_counter = Counter()

2.2 初始化流程

sequenceDiagram
    participant User as 用户
    participant LLM as LLM
    participant Args as EngineArgs
    participant Engine as LLMEngine
    participant Core as EngineCore
    participant Exec as Executor

    User->>LLM: LLM(model, ...)
    LLM->>Args: 构建 EngineArgs
    LLM->>Engine: from_engine_args()

    Engine->>Core: 创建 EngineCore
    Core->>Exec: 创建 Executor
    Exec->>Exec: 加载模型
    Exec->>Exec: 分配 KV Cache

    Core->>Core: 创建 Scheduler

    Engine-->>LLM: engine 实例
    LLM->>LLM: 获取 tokenizer
    LLM-->>User: llm 实例

2.3 generate() - 文本生成

# vllm/entrypoints/llm.py

def generate(
    self,
    prompts: PromptType | list[PromptType] | None = None,
    sampling_params: SamplingParams | list[SamplingParams] | None = None,
    prompt_token_ids: list[list[int]] | None = None,
    use_tqdm: bool = True,
    lora_request: LoRARequest | list[LoRARequest] | None = None,
) -> list[RequestOutput]:
    """生成文本的主方法。

    Args:
        prompts: 输入提示(字符串或多模态数据)
        sampling_params: 采样参数(温度、top_k、top_p 等)
        prompt_token_ids: 直接提供 token IDs
        use_tqdm: 是否显示进度条
        lora_request: LoRA 适配器请求

    Returns:
        RequestOutput 列表,包含生成的文本
    """
    # 1. 参数验证和标准化
    if prompts is None and prompt_token_ids is None:
        raise ValueError("prompts or prompt_token_ids must be provided")
    if prompts is not None and not isinstance(prompts, list):
        prompts = [prompts]

    # 2. 处理采样参数(单个参数对象会广播到所有 prompt)
    if sampling_params is None:
        sampling_params = SamplingParams()
    if not isinstance(sampling_params, list):
        sampling_params = [sampling_params] * len(prompts)

    # 3. 添加请求到引擎
    for prompt, params in zip(prompts, sampling_params):
        request_id = str(next(self.request_counter))
        self._add_request(
            request_id=request_id,
            prompt=prompt,
            params=params,
            lora_request=lora_request,
        )

    # 4. 运行引擎直到完成
    return self._run_engine(use_tqdm=use_tqdm)

2.4 _run_engine() - 运行引擎

def _run_engine(self, use_tqdm: bool = True) -> list[RequestOutput]:
    """运行引擎直到所有请求完成"""

    outputs: list[RequestOutput] = []
    num_requests = self.llm_engine.get_num_unfinished_requests()

    # 使用 tqdm 显示进度(可选)
    pbar = tqdm(total=num_requests, disable=not use_tqdm)

    while self.llm_engine.has_unfinished_requests():
        # 执行一步
        step_outputs = self.llm_engine.step()

        for output in step_outputs:
            if output.finished:
                outputs.append(output)
                pbar.update(1)

    pbar.close()

    # 按请求 ID 排序返回
    return sorted(outputs, key=lambda x: int(x.request_id))

3. LLMEngine - 引擎核心

3.1 类结构

# vllm/v1/engine/llm_engine.py

class LLMEngine:
    """vLLM 的同步推理引擎"""

    def __init__(self, vllm_config: VllmConfig, ...):
        # 核心组件
        self.engine_core: EngineCore  # 内部循环
        self.tokenizer: TokenizerLike  # 分词器
        self.input_processor  # 输入处理器

    @classmethod
    def from_engine_args(cls, engine_args: EngineArgs) -> "LLMEngine":
        """从引擎参数创建实例(工厂方法)"""
        vllm_config = engine_args.create_engine_config()
        return cls(vllm_config, ...)

    def add_request(
        self,
        request_id: str,
        prompt: PromptType,
        params: SamplingParams,
    ) -> None:
        """添加请求到引擎"""
        ...

    def step(self) -> list[RequestOutput]:
        """执行一步推理"""
        ...

3.2 关键方法流程

graph TD
    subgraph add_request
        A1[接收请求] --> A2[处理输入]
        A2 --> A3[tokenize]
        A3 --> A4[创建 EngineCoreRequest]
        A4 --> A5[发送到 EngineCore]
    end

    subgraph step
        S1[调用 EngineCore.step] --> S2[获取输出]
        S2 --> S3[detokenize]
        S3 --> S4[构建 RequestOutput]
        S4 --> S5[返回结果]
    end

4. EngineCore - 内部循环

4.1 类定义

# vllm/v1/engine/core.py

class EngineCore:
    """vLLM 引擎的内部循环"""

    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        ...
    ):
        # 1. 加载插件
        from vllm.plugins import load_general_plugins
        load_general_plugins()

        # 2. 创建模型执行器
        self.model_executor = executor_class(vllm_config)

        # 3. 初始化 KV Cache
        num_gpu_blocks, num_cpu_blocks, kv_cache_config = (
            self._initialize_kv_caches(vllm_config)
        )
        vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks

        # 4. 初始化 Worker 端的 KV Cache
        self.collective_rpc("initialize_cache", args=(num_gpu_blocks, num_cpu_blocks))

        # 5. 创建调度器
        Scheduler = vllm_config.scheduler_config.get_scheduler_cls()
        self.scheduler = Scheduler(
            vllm_config=vllm_config,
            kv_cache_config=kv_cache_config,
            ...
        )

4.2 step() - 核心执行循环

def step(self) -> EngineCoreOutputs:
    """执行一步推理"""

    # 1. 调度:决定这一步处理哪些请求
    scheduler_output = self.scheduler.schedule()

    if scheduler_output.is_empty():
        return EngineCoreOutputs([])

    # 2. 执行模型
    model_output = self.model_executor.execute_model(scheduler_output)

    # 3. 采样
    if not self.is_pooling_model:
        sampled_tokens = self.model_executor.sample_tokens(model_output)
    else:
        sampled_tokens = None

    # 4. 更新调度器状态
    outputs = self.scheduler.update_from_output(
        model_output,
        sampled_tokens,
        scheduler_output,
    )

    return outputs

4.3 执行流程图

sequenceDiagram
    participant Core as EngineCore
    participant Sched as Scheduler
    participant KVM as KVCacheManager
    participant Exec as Executor
    participant GPU as GPU Worker

    loop 每个 step
        Core->>Sched: schedule()

        rect rgb(230, 245, 230)
            Note over Sched,KVM: 调度决策
            Sched->>KVM: get_computed_blocks()
            Sched->>KVM: allocate_slots()
            Sched-->>Core: SchedulerOutput
        end

        rect rgb(245, 230, 230)
            Note over Core,GPU: 模型执行
            Core->>Exec: execute_model()
            Exec->>GPU: 前向传播
            GPU-->>Exec: logits
            Exec-->>Core: ModelOutput
        end

        rect rgb(230, 230, 245)
            Note over Core: 采样和更新
            Core->>Exec: sample_tokens()
            Exec-->>Core: sampled_tokens
            Core->>Sched: update_from_output()
            Sched-->>Core: EngineCoreOutputs
        end
    end

5. API Server - 在线服务入口

5.1 启动命令

# 启动 OpenAI 兼容的 API 服务
vllm serve meta-llama/Llama-2-7b --port 8000

# 或使用 Python
python -m vllm.entrypoints.openai.api_server --model meta-llama/Llama-2-7b
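
服务启动后,可以用任意 HTTP 客户端调用 OpenAI 兼容接口(示意代码,假设服务监听本机 8000 端口):

# 调用 /v1/chat/completions(非流式)
import requests

resp = requests.post(
    "http://localhost:8000/v1/chat/completions",
    json={
        "model": "meta-llama/Llama-2-7b",
        "messages": [{"role": "user", "content": "简单介绍一下 vLLM"}],
        "max_tokens": 64,
    },
)
print(resp.json()["choices"][0]["message"]["content"])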

5.2 服务架构

graph TD
    subgraph API Server
        FastAPI[FastAPI 应用]
        Router[路由器]
        Middleware[中间件]
    end

    subgraph Endpoints
        Chat[/v1/chat/completions]
        Completions[/v1/completions]
        Embeddings[/v1/embeddings]
        Models[/v1/models]
    end

    subgraph Engine
        AsyncEngine[AsyncLLMEngine]
    end

    Client[HTTP 客户端] --> FastAPI
    FastAPI --> Router
    Router --> Chat
    Router --> Completions
    Router --> Embeddings
    Router --> Models

    Chat --> AsyncEngine
    Completions --> AsyncEngine
    Embeddings --> AsyncEngine

5.3 请求处理流程

sequenceDiagram
    participant Client as HTTP 客户端
    participant API as API Server
    participant Async as AsyncLLMEngine
    participant Core as EngineCore

    Client->>API: POST /v1/chat/completions
    API->>API: 验证请求
    API->>API: 解析聊天消息
    API->>API: 应用聊天模板

    API->>Async: add_request()
    Async->>Core: 添加到调度队列

    loop 异步生成
        Core->>Core: step()
        Core-->>Async: 部分输出

        alt 流式响应
            Async-->>API: yield 部分结果
            API-->>Client: SSE 事件
        end
    end

    Core-->>Async: 完成输出
    Async-->>API: 最终结果
    API-->>Client: HTTP 响应

6. 请求数据结构

6.1 EngineCoreRequest

# vllm/v1/engine/__init__.py

@dataclass
class EngineCoreRequest:
    """从 LLMEngine 发送到 EngineCore 的请求"""

    request_id: str                    # 唯一标识
    prompt_token_ids: list[int]        # prompt 的 token IDs
    mm_inputs: list | None             # 多模态输入
    mm_hashes: list | None             # 多模态内容的 hash
    mm_positions: list | None          # 多模态位置信息
    sampling_params: SamplingParams    # 采样参数
    eos_token_id: int | None           # 结束 token ID
    arrival_time: float                # 到达时间
    lora_request: LoRARequest | None   # LoRA 请求

6.2 Request(调度器内部)

# vllm/v1/request.py

class Request:
    """调度器内部的请求表示"""

    def __init__(self, ...):
        self.request_id: str
        self.prompt_token_ids: list[int]
        self.sampling_params: SamplingParams

        # 状态跟踪
        self.status: RequestStatus
        self.num_computed_tokens: int
        self._output_token_ids: list[int]

        # 内存管理相关
        self.block_hashes: list[BlockHash]

    @property
    def num_tokens(self) -> int:
        """当前总 token 数"""
        return len(self.prompt_token_ids) + len(self._output_token_ids)

    @property
    def num_output_tokens(self) -> int:
        """输出 token 数"""
        return len(self._output_token_ids)

6.3 请求状态机

stateDiagram-v2
    [*] --> WAITING: add_request()

    WAITING --> RUNNING: 调度成功
    WAITING --> WAITING_FOR_FSM: 需要 FSM 编译
    WAITING --> WAITING_FOR_REMOTE_KVS: 等待远程 KV

    WAITING_FOR_FSM --> WAITING: FSM 就绪
    WAITING_FOR_REMOTE_KVS --> WAITING: KV 就绪

    RUNNING --> FINISHED_STOPPED: 达到停止条件
    RUNNING --> FINISHED_LENGTH: 达到最大长度
    RUNNING --> FINISHED_ABORTED: 被中止
    RUNNING --> PREEMPTED: 被抢占

    PREEMPTED --> WAITING: 重新排队

    FINISHED_STOPPED --> [*]
    FINISHED_LENGTH --> [*]
    FINISHED_ABORTED --> [*]

7. 配置系统

7.1 EngineArgs

# vllm/engine/arg_utils.py

@dataclass
class EngineArgs:
    """引擎配置参数"""

    # 模型配置
    model: str
    tokenizer: str | None = None
    revision: str | None = None
    dtype: str = "auto"
    quantization: str | None = None

    # 并行配置
    tensor_parallel_size: int = 1
    pipeline_parallel_size: int = 1

    # 内存配置
    gpu_memory_utilization: float = 0.9
    max_model_len: int | None = None
    block_size: int = 16

    # 调度配置
    max_num_seqs: int = 256
    max_num_batched_tokens: int = 2048

    # 功能开关
    enable_prefix_caching: bool = False
    enable_chunked_prefill: bool = False
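
EngineArgs 的字段与 vllm serve 的命令行参数基本一一对应,也可以在代码中直接构造(示意用法):

from vllm.engine.arg_utils import EngineArgs

engine_args = EngineArgs(
    model="meta-llama/Llama-2-7b",
    tensor_parallel_size=2,          # 2 卡张量并行
    gpu_memory_utilization=0.85,     # 预留 15% 显存给其他进程
    max_num_seqs=128,                # 单个 batch 最多 128 个序列
    enable_prefix_caching=True,
)

# 展开为完整的 VllmConfig(见 7.2)
vllm_config = engine_args.create_engine_config()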

7.2 VllmConfig

# vllm/config.py

@dataclass
class VllmConfig:
    """vLLM 的完整配置"""

    model_config: ModelConfig           # 模型配置
    cache_config: CacheConfig           # 缓存配置
    parallel_config: ParallelConfig     # 并行配置
    scheduler_config: SchedulerConfig   # 调度配置
    device_config: DeviceConfig         # 设备配置
    load_config: LoadConfig             # 加载配置
    lora_config: LoRAConfig | None      # LoRA 配置
    speculative_config: SpeculativeConfig | None  # 投机解码配置

8. 代码位置速查

| 组件 | 文件 | 关键类/函数 |
| --- | --- | --- |
| LLM 入口 | vllm/entrypoints/llm.py | `LLM` |
| LLMEngine | vllm/v1/engine/llm_engine.py | `LLMEngine` |
| EngineCore | vllm/v1/engine/core.py | `EngineCore` |
| API Server | vllm/entrypoints/openai/api_server.py | `main()` |
| 配置参数 | vllm/engine/arg_utils.py | `EngineArgs` |
| 请求类 | vllm/v1/request.py | `Request` |
| 请求状态 | vllm/v1/request.py | `RequestStatus` 枚举 |

9. 小结

本章我们了解了 vLLM 的入口点和请求处理流程:

  1. 两种使用方式

    • LLM 类用于离线批量推理
    • API Server 用于在线服务
  2. 核心组件层次

    • LLM → LLMEngine → EngineCore → Scheduler + Executor
  3. 请求生命周期

    • 用户提交 → tokenize → 调度 → 执行 → 采样 → 返回
  4. 配置系统

    • EngineArgs → VllmConfig → 各子配置

在下一章中,我们将深入 Executor 和 Worker 的实现,了解模型是如何在 GPU 上执行的。



2 - 执行器与 Worker

Executor 与 Worker 详解

在上一章中,我们了解了 vLLM 的入口点和请求处理流程。本章我们将深入 Executor 和 Worker 层,了解模型是如何在 GPU 上执行的。


1. Executor 与 Worker 的关系

graph TD
    subgraph EngineCore
        Core[EngineCore]
        Sched[Scheduler]
    end

    subgraph Executor 层
        Exec[Executor]
    end

    subgraph Worker 层
        W0[Worker 0<br/>GPU 0]
        W1[Worker 1<br/>GPU 1]
        W2[Worker 2<br/>GPU 2]
        WN[Worker N<br/>GPU N]
    end

    Core --> Exec
    Core <--> Sched
    Exec --> W0
    Exec --> W1
    Exec --> W2
    Exec --> WN

    style Exec fill:#e1f5fe
    style W0 fill:#fff3e0
    style W1 fill:#fff3e0
    style W2 fill:#fff3e0
    style WN fill:#fff3e0

职责划分:

| 组件 | 职责 |
| --- | --- |
| Executor | 管理多个 Worker,协调分布式执行 |
| Worker | 在单个 GPU 上执行模型推理 |

2. Executor 抽象基类

2.1 类定义

# vllm/v1/executor/abstract.py

class Executor(ABC):
    """vLLM Executor 的抽象基类

    Executor 负责在一个或多个设备上执行模型。
    """

    uses_ray: bool = False      # 是否使用 Ray
    supports_pp: bool = False   # 是否支持流水线并行

    def __init__(self, vllm_config: VllmConfig) -> None:
        self.vllm_config = vllm_config
        self.model_config = vllm_config.model_config
        self.cache_config = vllm_config.cache_config
        self.parallel_config = vllm_config.parallel_config
        ...
        self._init_executor()  # 子类实现

    @abstractmethod
    def _init_executor(self) -> None:
        """初始化 Executor(由子类实现)"""
        raise NotImplementedError

    @abstractmethod
    def collective_rpc(
        self,
        method: str | Callable,
        args: tuple = (),
        kwargs: dict | None = None,
    ) -> list:
        """在所有 Worker 上执行 RPC 调用"""
        raise NotImplementedError

    def execute_model(
        self,
        scheduler_output: SchedulerOutput,
    ) -> ModelRunnerOutput | None:
        """执行模型推理"""
        return self.collective_rpc(
            "execute_model",
            args=(scheduler_output,),
        )[0]

2.2 Executor 工厂方法

@staticmethod
def get_class(vllm_config: VllmConfig) -> type["Executor"]:
    """根据配置获取合适的 Executor 类"""

    distributed_executor_backend = vllm_config.parallel_config.distributed_executor_backend

    if distributed_executor_backend == "ray":
        from vllm.v1.executor.ray_executor import RayDistributedExecutor
        return RayDistributedExecutor

    elif distributed_executor_backend == "mp":
        from vllm.v1.executor.multiproc_executor import MultiprocExecutor
        return MultiprocExecutor

    elif distributed_executor_backend == "uni":
        from vllm.v1.executor.uniproc_executor import UniProcExecutor
        return UniProcExecutor

    else:
        raise ValueError(f"Unknown executor backend: {distributed_executor_backend}")

3. Executor 实现类型

3.1 类型对比

graph TD
    Executor[Executor 抽象基类]

    Executor --> Uni[UniProcExecutor<br/>单进程单 GPU]
    Executor --> MP[MultiprocExecutor<br/>多进程多 GPU]
    Executor --> Ray[RayDistributedExecutor<br/>Ray 分布式]

    Uni -.-> Single[单 GPU 场景]
    MP -.-> Multi[单机多卡场景]
    Ray -.-> Distributed[多机分布式场景]

    style Uni fill:#c8e6c9
    style MP fill:#fff3e0
    style Ray fill:#e1f5fe

3.2 UniProcExecutor - 单进程

# vllm/v1/executor/uniproc_executor.py

class UniProcExecutor(Executor):
    """单进程 Executor,用于单 GPU 场景"""

    def _init_executor(self) -> None:
        # 直接在当前进程创建 Worker
        self.driver_worker = Worker(
            vllm_config=self.vllm_config,
            local_rank=0,
            rank=0,
            distributed_init_method="",
            is_driver_worker=True,
        )
        self.driver_worker.init_device()
        self.driver_worker.load_model()

    def collective_rpc(self, method, args=(), kwargs=None):
        """直接调用 Worker 方法"""
        if isinstance(method, str):
            func = getattr(self.driver_worker, method)
        else:
            # 自定义函数:第一个参数传入 Worker 实例
            func = lambda *a, **kw: method(self.driver_worker, *a, **kw)
        return [func(*args, **(kwargs or {}))]

3.3 MultiprocExecutor - 多进程

# vllm/v1/executor/multiproc_executor.py

class MultiprocExecutor(Executor):
    """多进程 Executor,用于单机多卡场景"""

    supports_pp = True

    def _init_executor(self) -> None:
        # 创建多个 Worker 进程
        self.workers = []

        for rank in range(self.parallel_config.world_size):
            worker = self._create_worker(rank)
            self.workers.append(worker)

    def collective_rpc(self, method, args=(), kwargs=None):
        """并行调用所有 Worker"""
        futures = []
        for worker in self.workers:
            future = worker.execute_method(method, args, kwargs)
            futures.append(future)

        # 等待所有 Worker 完成
        results = [f.result() for f in futures]
        return results

3.4 RayDistributedExecutor - Ray 分布式

# vllm/v1/executor/ray_executor.py

class RayDistributedExecutor(Executor):
    """Ray 分布式 Executor,用于多机场景"""

    uses_ray = True
    supports_pp = True

    def _init_executor(self) -> None:
        import ray

        # 创建 Ray Actor(远程 Worker)
        self.workers = []

        for rank in range(self.parallel_config.world_size):
            worker_actor = ray.remote(Worker).remote(
                vllm_config=self.vllm_config,
                rank=rank,
                ...
            )
            self.workers.append(worker_actor)

    def collective_rpc(self, method, args=(), kwargs=None):
        """通过 Ray 调用远程 Worker"""
        import ray

        refs = []
        for worker in self.workers:
            ref = getattr(worker, method).remote(*args, **(kwargs or {}))
            refs.append(ref)

        # 异步获取结果
        results = ray.get(refs)
        return results

4. Worker 详解

4.1 Worker 基类

# vllm/v1/worker/worker_base.py

class WorkerBase(ABC):
    """Worker 抽象基类"""

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,        # 本地 GPU 序号
        rank: int,              # 全局 Worker 序号
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ):
        self.vllm_config = vllm_config
        self.local_rank = local_rank
        self.rank = rank
        self.is_driver_worker = is_driver_worker

    @abstractmethod
    def init_device(self) -> None:
        """初始化设备(GPU)"""
        raise NotImplementedError

    @abstractmethod
    def load_model(self) -> None:
        """加载模型"""
        raise NotImplementedError

    @abstractmethod
    def execute_model(
        self,
        scheduler_output: SchedulerOutput,
    ) -> ModelRunnerOutput:
        """执行模型推理"""
        raise NotImplementedError

4.2 GPU Worker

# vllm/v1/worker/gpu_worker.py

class Worker(WorkerBase):
    """GPU Worker 实现"""

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ):
        super().__init__(...)

        # 配置 float32 精度
        precision = envs.VLLM_FLOAT32_MATMUL_PRECISION
        torch.set_float32_matmul_precision(precision)

        # Profiler(可选)
        self.profiler = self._setup_profiler()

    def init_device(self):
        """初始化 GPU 设备"""
        # 设置 CUDA 设备
        torch.cuda.set_device(self.local_rank)
        self.device = torch.device(f"cuda:{self.local_rank}")

        # 初始化分布式环境
        init_distributed_environment(
            world_size=self.parallel_config.world_size,
            rank=self.rank,
            distributed_init_method=self.distributed_init_method,
            backend="nccl",
        )

        # 初始化模型并行
        ensure_model_parallel_initialized(
            tensor_model_parallel_size=self.parallel_config.tensor_parallel_size,
            pipeline_model_parallel_size=self.parallel_config.pipeline_parallel_size,
        )

    def load_model(self):
        """加载模型到 GPU"""
        with self._maybe_get_memory_pool_context("weights"):
            # 创建 ModelRunner
            self.model_runner = GPUModelRunner(
                vllm_config=self.vllm_config,
                device=self.device,
            )
            # 加载模型权重
            self.model_runner.load_model()

    def execute_model(
        self,
        scheduler_output: SchedulerOutput,
    ) -> ModelRunnerOutput:
        """执行模型推理"""
        return self.model_runner.execute_model(scheduler_output)

4.3 Worker 初始化流程

sequenceDiagram
    participant Exec as Executor
    participant Worker as Worker
    participant Device as CUDA Device
    participant Model as ModelRunner

    Exec->>Worker: 创建 Worker
    Worker->>Worker: __init__()

    Exec->>Worker: init_device()
    Worker->>Device: torch.cuda.set_device()
    Worker->>Device: init_distributed_environment()
    Worker->>Device: ensure_model_parallel_initialized()
    Note over Device: NCCL 初始化完成

    Exec->>Worker: load_model()
    Worker->>Model: 创建 GPUModelRunner
    Model->>Model: load_model()
    Note over Model: 模型权重加载到 GPU

    Exec->>Worker: initialize_cache()
    Worker->>Worker: 分配 KV Cache 内存

5. ModelRunner 详解

5.1 GPUModelRunner 类

# vllm/v1/worker/gpu_model_runner.py

class GPUModelRunner:
    """GPU 模型执行器"""

    def __init__(
        self,
        vllm_config: VllmConfig,
        device: torch.device,
    ):
        self.vllm_config = vllm_config
        self.device = device
        self.model: nn.Module | None = None

        # KV Cache 配置
        self.kv_caches: list[torch.Tensor] = []
        self.block_size = vllm_config.cache_config.block_size

        # 输入处理
        self.input_batch = GPUInputBatch(...)

    def load_model(self):
        """加载模型"""
        from vllm.model_executor.model_loader import get_model

        self.model = get_model(
            model_config=self.model_config,
            load_config=self.load_config,
            device_config=self.device_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
        )

    def execute_model(
        self,
        scheduler_output: SchedulerOutput,
    ) -> ModelRunnerOutput:
        """执行模型前向传播"""

        # 1. 准备输入
        model_input = self._prepare_inputs(scheduler_output)

        # 2. 执行前向传播
        with torch.inference_mode():
            hidden_states = self.model(
                input_ids=model_input.input_ids,
                positions=model_input.positions,
                kv_caches=self.kv_caches,
                attn_metadata=model_input.attn_metadata,
            )

        # 3. 计算 logits
        logits = self.model.compute_logits(hidden_states)

        # 4. 返回输出
        return ModelRunnerOutput(
            logits=logits,
            ...
        )

5.2 execute_model 流程

flowchart TD
    A[execute_model 开始] --> B[_prepare_inputs]

    subgraph 输入准备
        B --> B1[处理 token IDs]
        B1 --> B2[计算位置编码]
        B2 --> B3[构建 attention metadata]
        B3 --> B4[更新 block table]
    end

    B4 --> C[model forward]

    subgraph 前向传播
        C --> C1[Embedding]
        C1 --> C2[Transformer Layers]
        C2 --> C3[每层: Attention + FFN]
        C3 --> C4[LM Head]
    end

    C4 --> D[compute_logits]
    D --> E[返回 ModelRunnerOutput]

6. KV Cache 管理

6.1 Worker 端的 KV Cache

# vllm/v1/worker/gpu_worker.py

def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
    """初始化 KV Cache"""
    self.cache_config.num_gpu_blocks = num_gpu_blocks
    self.cache_config.num_cpu_blocks = num_cpu_blocks

def initialize_from_config(self, kv_cache_configs: list[KVCacheConfig]) -> None:
    """根据配置初始化 KV Cache"""
    self.model_runner.initialize_kv_cache(kv_cache_configs)

6.2 KV Cache Tensor 布局

# KV Cache 的形状
# [num_blocks, 2, num_heads, block_size, head_dim]
#     ↑        ↑      ↑          ↑         ↑
#   块数量   K和V   注意力头数  块大小   头维度

# 示例:16 块,8 头,16 token/块,128 维
kv_cache_shape = (16, 2, 8, 16, 128)
kv_cache = torch.empty(kv_cache_shape, dtype=torch.float16, device="cuda")
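
按这个布局可以粗略估算单层 KV Cache 的显存占用(示意计算,实际还需乘以层数):

num_blocks, num_kv_heads, block_size, head_dim = 16, 8, 16, 128
dtype_bytes = 2  # float16 每个元素 2 字节

# 2 表示 K 和 V 各存一份
bytes_per_layer = num_blocks * 2 * num_kv_heads * block_size * head_dim * dtype_bytes
print(bytes_per_layer / 1024 / 1024)  # 1.0 MiB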

6.3 Block Table 使用

# vllm/v1/worker/block_table.py

class BlockTable:
    """管理每个序列的 block 映射"""

    def __init__(self, max_num_seqs: int, max_num_blocks_per_seq: int):
        # [max_num_seqs, max_num_blocks_per_seq]
        self.block_table = torch.zeros(
            (max_num_seqs, max_num_blocks_per_seq),
            dtype=torch.int32,
            device="cuda",
        )

    def update(self, req_index: int, block_ids: list[int]):
        """更新请求的 block 映射"""
        num_blocks = len(block_ids)
        self.block_table[req_index, :num_blocks] = torch.tensor(
            block_ids, dtype=torch.int32
        )

7. 分布式执行

7.1 张量并行(Tensor Parallelism)

graph LR
    subgraph GPU 0
        A0[输入] --> L0[Linear 分片 0]
        L0 --> R0[部分结果]
    end

    subgraph GPU 1
        A1[输入] --> L1[Linear 分片 1]
        L1 --> R1[部分结果]
    end

    R0 --> AllReduce
    R1 --> AllReduce
    AllReduce --> Output[完整输出]

7.2 流水线并行(Pipeline Parallelism)

graph LR
    subgraph Stage 0 - GPU 0
        L0_6[Layers 0-5]
    end

    subgraph Stage 1 - GPU 1
        L6_12[Layers 6-11]
    end

    subgraph Stage 2 - GPU 2
        L12_18[Layers 12-17]
    end

    subgraph Stage 3 - GPU 3
        L18_24[Layers 18-23]
    end

    Input --> L0_6
    L0_6 -->|激活值| L6_12
    L6_12 -->|激活值| L12_18
    L12_18 -->|激活值| L18_24
    L18_24 --> Output

7.3 collective_rpc 通信

sequenceDiagram
    participant Exec as Executor
    participant W0 as Worker 0
    participant W1 as Worker 1
    participant W2 as Worker 2

    Exec->>W0: execute_model(scheduler_output)
    Exec->>W1: execute_model(scheduler_output)
    Exec->>W2: execute_model(scheduler_output)

    Note over W0,W2: 并行执行前向传播

    par TP AllReduce
        W0->>W1: 交换中间结果
        W1->>W2: 交换中间结果
        W2->>W0: 交换中间结果
    end

    W0-->>Exec: result_0
    W1-->>Exec: result_1
    W2-->>Exec: result_2

    Note over Exec: 只使用 driver worker 的结果

8. 性能优化

8.1 CUDA Graph

# vllm/v1/worker/gpu_model_runner.py

def _capture_cuda_graph(self, ...):
    """捕获 CUDA Graph 以减少启动开销"""

    # 预热
    for _ in range(3):
        self.model(...)

    # 捕获
    graph = torch.cuda.CUDAGraph()
    with torch.cuda.graph(graph):
        output = self.model(...)

    return graph, output

def execute_model_with_cuda_graph(self, ...):
    """使用 CUDA Graph 执行"""
    # 重放预捕获的计算图
    self.cuda_graph.replay()
    return self.graph_output

8.2 内存优化

# Worker 的 sleep/wake_up 机制
def sleep(self, level: int = 1) -> None:
    """释放内存进入睡眠模式"""
    from vllm.device_allocator.cumem import CuMemAllocator

    allocator = CuMemAllocator.get_instance()

    if level == 2:
        # 保存 buffer 到 CPU
        self._sleep_saved_buffers = {
            name: buffer.cpu().clone()
            for name, buffer in self.model.named_buffers()
        }

    # 释放 GPU 内存
    allocator.sleep(offload_tags=("weights",) if level == 1 else tuple())

def wake_up(self, tags: list[str] | None = None) -> None:
    """从睡眠模式唤醒"""
    allocator = CuMemAllocator.get_instance()
    allocator.wake_up(tags)

    # 恢复 buffer
    if self._sleep_saved_buffers:
        for name, buffer in self.model.named_buffers():
            buffer.data.copy_(self._sleep_saved_buffers[name].data)

9. 代码位置速查

| 组件 | 文件 | 关键类/函数 |
| --- | --- | --- |
| Executor 基类 | vllm/v1/executor/abstract.py | `Executor` |
| 单进程 Executor | vllm/v1/executor/uniproc_executor.py | `UniProcExecutor` |
| 多进程 Executor | vllm/v1/executor/multiproc_executor.py | `MultiprocExecutor` |
| Ray Executor | vllm/v1/executor/ray_executor.py | `RayDistributedExecutor` |
| Worker 基类 | vllm/v1/worker/worker_base.py | `WorkerBase` |
| GPU Worker | vllm/v1/worker/gpu_worker.py | `Worker` |
| GPU ModelRunner | vllm/v1/worker/gpu_model_runner.py | `GPUModelRunner` |
| Block Table | vllm/v1/worker/block_table.py | `BlockTable` |

10. 小结

本章我们深入了解了 Executor 和 Worker 层:

  1. Executor 类型

    • UniProcExecutor:单进程单 GPU
    • MultiprocExecutor:单机多卡
    • RayDistributedExecutor:多机分布式
  2. Worker 职责

    • 设备初始化
    • 模型加载
    • 模型执行
    • KV Cache 管理
  3. ModelRunner

    • 输入准备
    • 前向传播
    • logits 计算
  4. 分布式执行

    • 张量并行:切分权重矩阵
    • 流水线并行:切分模型层
    • collective_rpc:跨 Worker 通信

在下一章中,我们将深入模型前向传播的具体实现。



3 - 模型前向传播

模型前向传播详解

在前面的章节中,我们了解了 vLLM 的架构和调度机制。本章将深入模型的前向传播过程,以 Llama 模型为例,详细分析从输入到输出的完整计算流程。


1. 模型架构概览

1.1 Llama 模型结构

graph TD
    subgraph LlamaForCausalLM
        Input[input_ids] --> Embed[Embedding]
        Embed --> Layers[Transformer Layers × N]
        Layers --> Norm[RMSNorm]
        Norm --> LMHead[LM Head]
        LMHead --> Logits[logits]
    end

    subgraph "Transformer Layer"
        H[hidden_states] --> LN1[RMSNorm]
        LN1 --> Attn[Self-Attention]
        Attn --> Add1[+]
        H --> Add1
        Add1 --> LN2[RMSNorm]
        LN2 --> MLP[MLP]
        MLP --> Add2[+]
        Add1 --> Add2
        Add2 --> Out[output]
    end

1.2 核心组件对应关系

| 组件 | vLLM 类 | 功能 |
| --- | --- | --- |
| Embedding | `VocabParallelEmbedding` | token 到向量的映射 |
| Transformer Layer | `LlamaDecoderLayer` | 主要计算单元 |
| Self-Attention | `LlamaAttention` | 注意力计算 |
| MLP | `LlamaMLP` | 前馈网络 |
| LayerNorm | `RMSNorm` | 归一化 |
| LM Head | `ParallelLMHead` | 输出词表概率 |

2. vLLM 中的 Llama 实现

2.1 LlamaForCausalLM 类

# vllm/model_executor/models/llama.py

class LlamaForCausalLM(nn.Module, SupportsLoRA, SupportsPP, SupportsEagle):
    """用于因果语言建模的 Llama 模型"""

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        config = vllm_config.model_config.hf_config
        quant_config = vllm_config.quant_config

        # 主模型(Transformer 层)
        self.model = LlamaModel(vllm_config=vllm_config, prefix=maybe_prefix(prefix, "model"))

        # 输出层(LM Head)
        if config.tie_word_embeddings:
            self.lm_head = self.model.embed_tokens  # 权重共享
        else:
            self.lm_head = ParallelLMHead(
                config.vocab_size,
                config.hidden_size,
                quant_config=quant_config,
            )

        # Logits 处理器
        self.logits_processor = LogitsProcessor(config.vocab_size)

        # 采样器
        self.sampler = get_sampler()

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        kv_caches: list[torch.Tensor],
        attn_metadata: AttentionMetadata,
        intermediate_tensors: IntermediateTensors | None = None,
    ) -> torch.Tensor | IntermediateTensors:
        """模型前向传播"""

        # 1. 通过 Transformer 层
        hidden_states = self.model(input_ids, positions, intermediate_tensors)

        return hidden_states

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
        sampling_metadata: SamplingMetadata,
    ) -> torch.Tensor:
        """计算 logits"""
        logits = self.logits_processor(
            self.lm_head,
            hidden_states,
            sampling_metadata,
        )
        return logits

    def sample(
        self,
        logits: torch.Tensor,
        sampling_metadata: SamplingMetadata,
    ) -> SamplerOutput:
        """采样下一个 token"""
        next_tokens = self.sampler(logits, sampling_metadata)
        return next_tokens

2.2 LlamaModel 类

# vllm/model_executor/models/llama.py

@support_torch_compile(shape_invariants=llama_model_invariants)
class LlamaModel(nn.Module):
    """Llama 的 Transformer 模型"""

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        config = vllm_config.model_config.hf_config

        # Embedding 层
        self.embed_tokens = VocabParallelEmbedding(
            config.vocab_size,
            config.hidden_size,
        )

        # Transformer 层
        self.start_layer, self.end_layer, self.layers = make_layers(
            config.num_hidden_layers,
            lambda prefix: LlamaDecoderLayer(vllm_config=vllm_config, prefix=prefix),
            prefix=f"{prefix}.layers",
        )

        # 最终归一化
        self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)

    def forward(
        self,
        input_ids: torch.Tensor | None,
        positions: torch.Tensor,
        intermediate_tensors: IntermediateTensors | None,
        inputs_embeds: torch.Tensor | None = None,
    ) -> torch.Tensor | IntermediateTensors:

        # 1. Embedding
        if get_pp_group().is_first_rank:
            if inputs_embeds is not None:
                hidden_states = inputs_embeds
            else:
                hidden_states = self.embed_tokens(input_ids)
            residual = None
        else:
            # 流水线并行:从前一阶段获取中间结果
            hidden_states = intermediate_tensors["hidden_states"]
            residual = intermediate_tensors["residual"]

        # 2. Transformer 层
        for layer in self.layers[self.start_layer:self.end_layer]:
            hidden_states, residual = layer(
                positions,
                hidden_states,
                residual,
            )

        # 3. 最终归一化(仅最后一个 PP 阶段)
        if not get_pp_group().is_last_rank:
            return IntermediateTensors({
                "hidden_states": hidden_states,
                "residual": residual
            })

        hidden_states, _ = self.norm(hidden_states, residual)
        return hidden_states

3. Transformer 层详解

3.1 LlamaDecoderLayer

# vllm/model_executor/models/llama.py

class LlamaDecoderLayer(nn.Module):
    """单个 Transformer 解码器层"""

    def __init__(self, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        config = vllm_config.model_config.hf_config

        # Self-Attention
        self.self_attn = LlamaAttention(
            config=config,
            hidden_size=config.hidden_size,
            num_heads=config.num_attention_heads,
            num_kv_heads=config.num_key_value_heads,
            cache_config=vllm_config.cache_config,
            prefix=f"{prefix}.self_attn",
        )

        # MLP
        self.mlp = LlamaMLP(
            hidden_size=config.hidden_size,
            intermediate_size=config.intermediate_size,
            hidden_act=config.hidden_act,
            prefix=f"{prefix}.mlp",
        )

        # 归一化层
        self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.post_attention_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
        residual: torch.Tensor | None,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """
        层前向传播

        使用 Pre-LN 结构(LayerNorm 在子层之前)
        """
        # 1. 第一个归一化 + 残差处理
        if residual is None:
            residual = hidden_states
            hidden_states = self.input_layernorm(hidden_states)
        else:
            hidden_states, residual = self.input_layernorm(hidden_states, residual)

        # 2. Self-Attention
        hidden_states = self.self_attn(
            positions=positions,
            hidden_states=hidden_states,
        )

        # 3. 第二个归一化 + 残差处理
        hidden_states, residual = self.post_attention_layernorm(hidden_states, residual)

        # 4. MLP
        hidden_states = self.mlp(hidden_states)

        return hidden_states, residual

3.2 前向传播数据流

flowchart TD
    subgraph "LlamaDecoderLayer Forward"
        Input["输入<br/>(hidden_states, residual)"]

        Input --> Check{residual is None?}
        Check -->|是| Init["residual = hidden_states"]
        Check -->|否| Fuse1["hidden_states, residual =<br/>input_layernorm(hidden_states, residual)"]
        Init --> LN1["hidden_states = input_layernorm(hidden_states)"]

        LN1 --> Attn["hidden_states = self_attn(positions, hidden_states)"]
        Fuse1 --> Attn

        Attn --> Fuse2["hidden_states, residual =<br/>post_attention_layernorm(hidden_states, residual)"]

        Fuse2 --> MLP["hidden_states = mlp(hidden_states)"]

        MLP --> Output["输出<br/>(hidden_states, residual)"]
    end

    style Attn fill:#e1f5fe
    style MLP fill:#fff3e0

4. Self-Attention 详解

4.1 LlamaAttention 类

# vllm/model_executor/models/llama.py

class LlamaAttention(nn.Module):
    """Llama 的多头注意力层"""

    def __init__(
        self,
        config: LlamaConfig,
        hidden_size: int,
        num_heads: int,
        num_kv_heads: int,
        cache_config: CacheConfig | None = None,
        prefix: str = "",
    ):
        super().__init__()

        # 张量并行配置
        tp_size = get_tensor_model_parallel_world_size()
        self.num_heads = num_heads // tp_size
        self.num_kv_heads = max(1, num_kv_heads // tp_size)
        self.head_dim = hidden_size // num_heads
        self.q_size = self.num_heads * self.head_dim      # 本分片上 Q 的宽度
        self.kv_size = self.num_kv_heads * self.head_dim  # 本分片上 K/V 的宽度
        self.scaling = self.head_dim ** -0.5

        # Q、K、V 投影(合并为一个线性层)
        self.qkv_proj = QKVParallelLinear(
            hidden_size=hidden_size,
            head_size=self.head_dim,
            total_num_heads=num_heads,
            total_num_kv_heads=num_kv_heads,
            prefix=f"{prefix}.qkv_proj",
        )

        # 输出投影
        self.o_proj = RowParallelLinear(
            input_size=num_heads * self.head_dim,
            output_size=hidden_size,
            prefix=f"{prefix}.o_proj",
        )

        # 旋转位置编码
        self.rotary_emb = get_rope(
            self.head_dim,
            max_position=config.max_position_embeddings,
        )

        # 注意力后端
        self.attn = Attention(
            self.num_heads,
            self.head_dim,
            self.scaling,
            num_kv_heads=self.num_kv_heads,
            cache_config=cache_config,
        )

    def forward(
        self,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
    ) -> torch.Tensor:
        """
        注意力前向传播

        Args:
            positions: 位置 IDs [num_tokens]
            hidden_states: 隐藏状态 [num_tokens, hidden_size]

        Returns:
            输出 [num_tokens, hidden_size]
        """
        # 1. QKV 投影
        qkv, _ = self.qkv_proj(hidden_states)
        q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1)

        # 2. 旋转位置编码
        q, k = self.rotary_emb(positions, q, k)

        # 3. 注意力计算(使用 vLLM 的优化后端)
        attn_output = self.attn(q, k, v)

        # 4. 输出投影
        output, _ = self.o_proj(attn_output)

        return output

4.2 注意力计算流程

flowchart LR
    subgraph "Self-Attention"
        H["hidden_states<br/>[tokens, hidden]"]
        QKV["QKV Projection"]
        Split["Split"]
        Q["Q"]
        K["K"]
        V["V"]
        RoPE["RoPE"]
        Attn["Attention<br/>(with KV Cache)"]
        O["O Projection"]
        Out["output"]

        H --> QKV
        QKV --> Split
        Split --> Q
        Split --> K
        Split --> V

        Q --> RoPE
        K --> RoPE

        RoPE --> Attn
        V --> Attn

        Attn --> O
        O --> Out
    end

4.3 GQA (Grouped-Query Attention)

标准 MHA (Multi-Head Attention):
Q heads: [h1, h2, h3, h4, h5, h6, h7, h8]
K heads: [h1, h2, h3, h4, h5, h6, h7, h8]
V heads: [h1, h2, h3, h4, h5, h6, h7, h8]

GQA (num_kv_heads=2):
Q heads: [h1, h2, h3, h4, h5, h6, h7, h8]
K heads: [k1,   k1,   k1,   k1, | k2,   k2,   k2,   k2]
V heads: [v1,   v1,   v1,   v1, | v2,   v2,   v2,   v2]

每 4 个 Q head 共享 1 个 KV head,减少 KV Cache 内存
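
KV Cache 的节省比例可以直接由 KV head 数量算出(示意计算):

num_q_heads, num_kv_heads = 8, 2

# KV Cache 大小只取决于 KV head 的数量
kv_cache_ratio = num_kv_heads / num_q_heads
print(f"GQA 的 KV Cache 约为 MHA 的 {kv_cache_ratio:.0%}")  # 25%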

5. MLP 详解

5.1 LlamaMLP 类

# vllm/model_executor/models/llama.py

class LlamaMLP(nn.Module):
    """Llama 的 MLP(SwiGLU 激活)"""

    def __init__(
        self,
        hidden_size: int,
        intermediate_size: int,
        hidden_act: str,
        prefix: str = "",
    ):
        super().__init__()

        # gate 和 up 投影(合并)
        self.gate_up_proj = MergedColumnParallelLinear(
            input_size=hidden_size,
            output_sizes=[intermediate_size] * 2,  # gate 和 up 各一个
            prefix=f"{prefix}.gate_up_proj",
        )

        # down 投影
        self.down_proj = RowParallelLinear(
            input_size=intermediate_size,
            output_size=hidden_size,
            prefix=f"{prefix}.down_proj",
        )

        # SiLU 激活 + 门控
        self.act_fn = SiluAndMul()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        MLP 前向传播

        SwiGLU: down(SiLU(gate(x)) * up(x))
        """
        # 1. gate 和 up 投影(合并执行)
        gate_up, _ = self.gate_up_proj(x)

        # 2. SiLU 激活 + 门控相乘
        x = self.act_fn(gate_up)

        # 3. down 投影
        x, _ = self.down_proj(x)

        return x
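
SiluAndMul 做的事情可以用几行 PyTorch 表示(参考实现,仅作示意;vLLM 实际使用融合 kernel):

import torch
import torch.nn.functional as F

def silu_and_mul(gate_up: torch.Tensor) -> torch.Tensor:
    # gate_up: [..., 2 * intermediate_size],前半是 gate,后半是 up
    gate, up = gate_up.chunk(2, dim=-1)
    return F.silu(gate) * up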

5.2 SwiGLU 计算流程

flowchart LR
    subgraph "MLP (SwiGLU)"
        X["x<br/>[tokens, hidden]"]
        GU["gate_up_proj"]
        Gate["gate"]
        Up["up"]
        SiLU["SiLU(gate)"]
        Mul["×"]
        Down["down_proj"]
        Out["output"]

        X --> GU
        GU --> Gate
        GU --> Up
        Gate --> SiLU
        SiLU --> Mul
        Up --> Mul
        Mul --> Down
        Down --> Out
    end

6. KV Cache 集成

6.1 Attention 层的 KV Cache 使用

# vllm/attention/layer.py

class Attention(nn.Module):
    """vLLM 的注意力层,集成 KV Cache"""

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
    ) -> torch.Tensor:
        """
        Args:
            query: [num_tokens, num_heads * head_dim]
            key: [num_tokens, num_kv_heads * head_dim]
            value: [num_tokens, num_kv_heads * head_dim]

        内部会:
        1. 将新的 K、V 写入 KV Cache
        2. 从 KV Cache 读取完整的 K、V
        3. 计算注意力
        """
        return self.impl.forward(
            query,
            key,
            value,
            kv_cache=self.kv_cache,
            attn_metadata=self.attn_metadata,
        )

6.2 PagedAttention 后端

sequenceDiagram
    participant Attn as Attention Layer
    participant PA as PagedAttention
    participant KVC as KV Cache

    Attn->>PA: forward(Q, K, V)

    rect rgb(230, 245, 230)
        Note over PA,KVC: 写入新的 K、V
        PA->>KVC: 根据 slot_mapping 写入
        Note over KVC: K、V 存储到对应的 block
    end

    rect rgb(245, 230, 230)
        Note over PA,KVC: 读取完整的 K、V
        PA->>KVC: 根据 block_table 读取
        Note over KVC: 返回所有历史 K、V
    end

    rect rgb(230, 230, 245)
        Note over PA: 计算注意力
        PA->>PA: attention = softmax(Q @ K^T / sqrt(d)) @ V
    end

    PA-->>Attn: attention output

7. 张量并行实现

7.1 线性层的张量并行

# vllm/model_executor/layers/linear.py

class ColumnParallelLinear(nn.Module):
    """列并行线性层(输出切分)"""

    def __init__(self, input_size: int, output_size: int, ...):
        tp_size = get_tensor_model_parallel_world_size()
        # 每个 GPU 只有 output_size / tp_size 的输出
        self.output_size_per_partition = output_size // tp_size
        self.weight = Parameter(torch.empty(
            self.output_size_per_partition,
            input_size,
        ))

class RowParallelLinear(nn.Module):
    """行并行线性层(输入切分)"""

    def __init__(self, input_size: int, output_size: int, ...):
        tp_size = get_tensor_model_parallel_world_size()
        # 每个 GPU 只处理 input_size / tp_size 的输入
        self.input_size_per_partition = input_size // tp_size
        self.weight = Parameter(torch.empty(
            output_size,
            self.input_size_per_partition,
        ))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        output = F.linear(x, self.weight)
        if self.reduce_results:
            # AllReduce 收集所有 GPU 的结果
            output = tensor_model_parallel_all_reduce(output)
        return output
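
以 tp_size=2 为例,两种并行线性层的权重形状如下(示意计算,这里借用 Llama-7B 的维度):

hidden_size, intermediate_size, tp_size = 4096, 11008, 2

# ColumnParallelLinear(如 gate/up proj):按输出维度切分
col_weight_shape = (intermediate_size // tp_size, hidden_size)   # (5504, 4096)

# RowParallelLinear(如 down proj):按输入维度切分,结果需 AllReduce
row_weight_shape = (hidden_size, intermediate_size // tp_size)   # (4096, 5504)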

7.2 QKV 投影的张量并行

graph TD
    subgraph GPU 0
        Q0["Q[:, :dim/2]"]
        K0["K[:, :dim/4]"]
        V0["V[:, :dim/4]"]
    end

    subgraph GPU 1
        Q1["Q[:, dim/2:]"]
        K1["K[:, dim/4:]"]
        V1["V[:, dim/4:]"]
    end

    Input["hidden_states"] --> GPU0
    Input --> GPU1

    Q0 --> Attn0["Attention 0"]
    K0 --> Attn0
    V0 --> Attn0

    Q1 --> Attn1["Attention 1"]
    K1 --> Attn1
    V1 --> Attn1

    Attn0 --> AllReduce
    Attn1 --> AllReduce
    AllReduce --> Output["output"]

8. 完整前向传播流程

flowchart TD
    subgraph "execute_model"
        Input["SchedulerOutput"]
        Prep["_prepare_inputs()"]
        FWD["model.forward()"]
        Logits["compute_logits()"]
        Sample["sample()"]
        Output["ModelRunnerOutput"]

        Input --> Prep
        Prep --> FWD
        FWD --> Logits
        Logits --> Sample
        Sample --> Output
    end

    subgraph "model.forward()"
        Embed["embed_tokens(input_ids)"]
        Layers["for layer in layers:<br/>  hidden, residual = layer(...)"]
        Norm["norm(hidden, residual)"]

        Embed --> Layers
        Layers --> Norm
    end

    subgraph "layer.forward()"
        LN1["input_layernorm"]
        Attn["self_attn(Q,K,V + KV Cache)"]
        LN2["post_attention_layernorm"]
        MLP["mlp(SwiGLU)"]

        LN1 --> Attn
        Attn --> LN2
        LN2 --> MLP
    end

    FWD --> Embed
    Norm --> Logits

9. 代码位置速查

| 组件 | 文件 | 关键类/函数 |
| --- | --- | --- |
| Llama 模型 | vllm/model_executor/models/llama.py | `LlamaForCausalLM` |
| Transformer 层 | vllm/model_executor/models/llama.py | `LlamaDecoderLayer` |
| Self-Attention | vllm/model_executor/models/llama.py | `LlamaAttention` |
| MLP | vllm/model_executor/models/llama.py | `LlamaMLP` |
| Attention 后端 | vllm/attention/layer.py | `Attention` |
| 旋转位置编码 | vllm/model_executor/layers/rotary_embedding.py | `get_rope()` |
| 并行线性层 | vllm/model_executor/layers/linear.py | `*ParallelLinear` |
| RMSNorm | vllm/model_executor/layers/layernorm.py | `RMSNorm` |

10. 小结

本章我们深入了解了 vLLM 中模型前向传播的实现:

  1. 模型结构

    • LlamaForCausalLM → LlamaModel → LlamaDecoderLayer
    • Pre-LN 结构,每层包含 Attention 和 MLP
  2. Self-Attention

    • QKV 合并投影
    • RoPE 旋转位置编码
    • GQA 减少 KV Cache 内存
    • PagedAttention 集成
  3. MLP

    • SwiGLU 激活函数
    • gate_up 合并投影
  4. 张量并行

    • ColumnParallelLinear:输出切分
    • RowParallelLinear:输入切分 + AllReduce
  5. KV Cache 集成

    • 自动写入和读取
    • 通过 attn_metadata 控制

在下一章中,我们将深入分析采样过程,了解如何从 logits 生成下一个 token。



4 - 采样过程

采样过程分析

在模型前向传播得到 logits 后,需要通过采样来生成下一个 token。采样过程不仅仅是简单地选择概率最高的 token,还涉及温度调节、top-k/top-p 过滤、惩罚机制等多种策略。本章将深入分析 vLLM 的采样实现。


1. 采样在推理流程中的位置

graph LR
    subgraph 模型推理
        Input[输入 tokens] --> Forward[前向传播]
        Forward --> Logits[logits]
    end

    subgraph 采样
        Logits --> Processor[Logits 处理]
        Processor --> Sample[采样策略]
        Sample --> Token[下一个 token]
    end

    Token --> |追加到序列| Input

采样的主要步骤:

  1. Logits 处理:应用各种处理器修改原始 logits
  2. 温度调节:控制随机性
  3. Top-k/Top-p 过滤:限制候选 token 范围
  4. 实际采样:从处理后的分布中选择 token

2. SamplingParams - 采样参数

# vllm/sampling_params.py

class SamplingParams:
    """控制采样行为的参数"""

    def __init__(
        self,
        # 基本参数
        n: int = 1,                        # 每个 prompt 生成的序列数
        best_of: int | None = None,        # 候选序列数

        # 温度
        temperature: float = 1.0,          # 采样温度

        # Top-k/Top-p
        top_k: int = -1,                   # top-k 采样(-1 表示禁用)
        top_p: float = 1.0,                # top-p (nucleus) 采样

        # 惩罚
        repetition_penalty: float = 1.0,   # 重复惩罚
        frequency_penalty: float = 0.0,    # 频率惩罚
        presence_penalty: float = 0.0,     # 存在惩罚

        # 停止条件
        max_tokens: int = 16,              # 最大生成 token 数
        stop: list[str] | None = None,     # 停止字符串
        stop_token_ids: list[int] | None = None,  # 停止 token

        # Logprobs
        logprobs: int | None = None,       # 返回的 logprobs 数量

        # 其他
        seed: int | None = None,           # 随机种子
        min_p: float = 0.0,                # min-p 采样
        ...
    ):
        ...

3. Sampler 类详解

3.1 类定义

# vllm/v1/sample/sampler.py

class Sampler(nn.Module):
    """
    从模型输出中采样下一个 token 的层。

    处理步骤:
    1. 计算 logprobs(如果请求)
    2. 转换为 float32
    3. 应用 allowed token ids 白名单
    4. 应用 bad words 排除
    5. 应用非 argmax-invariant 的 logit 处理器
    6. 应用惩罚(重复、频率、存在)
    7. 采样(贪婪或随机)
    8. 收集 top-k logprobs
    9. 返回 SamplerOutput
    """

    def __init__(self, logprobs_mode: LogprobsMode = "raw_logprobs"):
        super().__init__()
        self.topk_topp_sampler = TopKTopPSampler(logprobs_mode)

    def forward(
        self,
        logits: torch.Tensor,              # [num_tokens, vocab_size]
        sampling_metadata: SamplingMetadata,
    ) -> SamplerOutput:
        """主采样方法"""

        # 1. 计算原始 logprobs(用于返回给用户)
        num_logprobs = sampling_metadata.max_num_logprobs
        if num_logprobs is not None:
            raw_logprobs = self.compute_logprobs(logits)

        # 2. 转换为 float32
        logits = logits.to(torch.float32)

        # 3-6. 应用各种 logits 处理器
        logits = self.apply_logits_processors(logits, sampling_metadata)

        # 7. 采样
        sampled, processed_logprobs = self.sample(logits, sampling_metadata)

        # 8. 收集 logprobs
        logprobs_tensors = None
        if num_logprobs is not None:
            logprobs_tensors = self.gather_logprobs(
                raw_logprobs, num_logprobs, token_ids=sampled
            )

        # 9. 返回结果
        return SamplerOutput(
            sampled_token_ids=sampled.unsqueeze(-1),
            logprobs_tensors=logprobs_tensors,
        )

3.2 采样流程图

flowchart TD
    A[logits] --> B{需要 logprobs?}
    B -->|是| C[compute_logprobs]
    B -->|否| D[转换为 float32]
    C --> D

    D --> E[apply_logits_processors]

    subgraph Logits 处理
        E --> E1[Allowed Token IDs]
        E1 --> E2[Bad Words]
        E2 --> E3[Min Tokens]
        E3 --> E4[Logit Bias]
        E4 --> E5[Penalties]
    end

    E5 --> F[sample]

    subgraph 采样
        F --> F1{all_greedy?}
        F1 -->|是| F2[argmax]
        F1 -->|否| F3[apply_temperature]
        F3 --> F4[min_p processor]
        F4 --> F5[top_k / top_p]
        F5 --> F6[multinomial]
    end

    F2 --> G[gather_logprobs]
    F6 --> G
    G --> H[SamplerOutput]

4. 采样策略详解

4.1 贪婪采样(Greedy Sampling)

@staticmethod
def greedy_sample(logits: torch.Tensor) -> torch.Tensor:
    """选择概率最高的 token"""
    return logits.argmax(dim=-1).view(-1)

特点:

  • 确定性输出(相同输入总是产生相同输出)
  • 适合需要一致性的场景
  • 可能导致重复和无聊的输出

4.2 温度采样(Temperature Sampling)

@staticmethod
def apply_temperature(
    logits: torch.Tensor,
    temp: torch.Tensor,
    all_random: bool,
) -> torch.Tensor:
    """应用温度缩放"""
    if not all_random:
        # 避免除以零(贪婪请求的 temp 可能为 0)
        temp = torch.where(temp < _SAMPLING_EPS, 1.0, temp)
    return logits.div_(temp.unsqueeze(dim=1))

温度的作用:

logits = [2.0, 1.0, 0.5, 0.1]

temp = 0.5(更确定):
logits / 0.5 = [4.0, 2.0, 1.0, 0.2]
softmax → [0.83, 0.11, 0.04, 0.02]  # 高概率更集中

temp = 1.0(原始):
softmax → [0.57, 0.21, 0.13, 0.09]  # 原始分布

temp = 2.0(更随机):
logits / 2.0 = [1.0, 0.5, 0.25, 0.05]
softmax → [0.41, 0.25, 0.19, 0.16]  # 分布更均匀
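
上面的数字可以用几行 PyTorch 验证(示意):

import torch

logits = torch.tensor([2.0, 1.0, 0.5, 0.1])
for temp in (0.5, 1.0, 2.0):
    probs = torch.softmax(logits / temp, dim=-1)
    print(temp, [round(p, 2) for p in probs.tolist()])
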
graph LR
    subgraph "温度效果"
        Low["temp < 1<br/>更确定"]
        Normal["temp = 1<br/>原始分布"]
        High["temp > 1<br/>更随机"]
    end

    Low -.-> |"集中于高概率"| Peak[尖锐分布]
    Normal -.-> |"保持原状"| Orig[原始分布]
    High -.-> |"均匀化"| Flat[平坦分布]

4.3 Top-k 采样

# vllm/v1/sample/ops/topk_topp_sampler.py

def apply_top_k(logits: torch.Tensor, top_k: int) -> torch.Tensor:
    """只保留 top-k 个最高概率的 token"""
    if top_k < 0:
        return logits

    # 找到第 k 大的值
    top_k_values, _ = torch.topk(logits, top_k, dim=-1)
    min_top_k = top_k_values[:, -1].unsqueeze(-1)

    # 将低于阈值的 logits 设为 -inf
    return logits.masked_fill(logits < min_top_k, float('-inf'))

示例:

原始 logits: [3.5, 2.1, 1.8, 0.5, 0.1, -0.2, -1.0]
top_k = 3

处理后: [3.5, 2.1, 1.8, -inf, -inf, -inf, -inf]

softmax 后只有前 3 个有概率

4.4 Top-p (Nucleus) 采样

def apply_top_p(logits: torch.Tensor, top_p: float) -> torch.Tensor:
    """保留累积概率达到 top_p 的最小 token 集合"""
    if top_p >= 1.0:
        return logits

    # 按概率排序
    probs = torch.softmax(logits, dim=-1)
    sorted_probs, sorted_indices = torch.sort(probs, descending=True)

    # 计算累积概率
    cumsum_probs = torch.cumsum(sorted_probs, dim=-1)

    # 找到累积概率首次超过 top_p 的位置
    mask = cumsum_probs - sorted_probs > top_p

    # 将超出的 logits 设为 -inf
    sorted_logits = logits.gather(-1, sorted_indices)
    sorted_logits = sorted_logits.masked_fill(mask, float('-inf'))

    # 还原顺序
    return sorted_logits.scatter(-1, sorted_indices, sorted_logits)

示例:

原始 probs: [0.40, 0.25, 0.15, 0.10, 0.05, 0.03, 0.02]
top_p = 0.9

累积: [0.40, 0.65, 0.80, 0.90, 0.95, 0.98, 1.00]
                              ↑ 首次 >= 0.9

保留前 4 个 token,其余设为 -inf

4.5 Min-p 采样

def apply_min_p(logits: torch.Tensor, min_p: float) -> torch.Tensor:
    """过滤掉概率低于 max_prob * min_p 的 token"""
    if min_p <= 0.0:
        return logits

    probs = torch.softmax(logits, dim=-1)
    max_probs = probs.max(dim=-1, keepdim=True).values

    # 概率阈值 = 最大概率 * min_p
    threshold = max_probs * min_p

    # 过滤低概率 token
    return logits.masked_fill(probs < threshold, float('-inf'))
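
示例(按上面的阈值公式计算):

最大概率 = 0.40,min_p = 0.1
阈值 = 0.40 × 0.1 = 0.04

概率低于 0.04 的 token 被设为 -inf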

5. 惩罚机制

5.1 重复惩罚(Repetition Penalty)

def apply_repetition_penalty(
    logits: torch.Tensor,
    token_ids: torch.Tensor,
    penalty: float,
) -> torch.Tensor:
    """惩罚已出现的 token"""
    if penalty == 1.0:
        return logits

    # 获取已出现 token 的 logits
    score = logits.gather(-1, token_ids)

    # 应用惩罚
    # logits > 0: score = score / penalty
    # logits < 0: score = score * penalty
    score = torch.where(score > 0, score / penalty, score * penalty)

    # 写回
    return logits.scatter(-1, token_ids, score)

效果:

penalty = 1.2, token "the" 已出现

原始 logit: 2.5
惩罚后: 2.5 / 1.2 = 2.08  # 概率降低

原始 logit: -0.5
惩罚后: -0.5 * 1.2 = -0.6  # 概率进一步降低

5.2 频率惩罚(Frequency Penalty)

def apply_frequency_penalty(
    logits: torch.Tensor,
    token_counts: torch.Tensor,
    penalty: float,
) -> torch.Tensor:
    """基于出现次数的惩罚"""
    if penalty == 0.0:
        return logits

    # logits = logits - penalty * count
    return logits - penalty * token_counts

效果:

penalty = 0.5, token "the" 出现了 3 次

原始 logit: 2.5
惩罚后: 2.5 - 0.5 * 3 = 1.0

5.3 存在惩罚(Presence Penalty)

def apply_presence_penalty(
    logits: torch.Tensor,
    token_presence: torch.Tensor,  # 0 或 1
    penalty: float,
) -> torch.Tensor:
    """基于是否出现过的惩罚(不考虑次数)"""
    if penalty == 0.0:
        return logits

    # logits = logits - penalty * presence
    return logits - penalty * token_presence
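
效果(与前两种惩罚对比):

penalty = 0.5, token "the" 已出现(presence = 1)

原始 logit: 2.5
惩罚后: 2.5 - 0.5 * 1 = 2.0  # 只看是否出现过,与出现次数无关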

6. Logprobs 计算

6.1 计算 Logprobs

@staticmethod
def compute_logprobs(logits: torch.Tensor) -> torch.Tensor:
    """计算 log 概率"""
    return logits.log_softmax(dim=-1, dtype=torch.float32)

6.2 收集 Top-k Logprobs

@staticmethod
def gather_logprobs(
    logprobs: torch.Tensor,
    num_logprobs: int,
    token_ids: torch.Tensor,
) -> LogprobsTensors:
    """收集 top-k logprobs 和采样 token 的 logprob"""

    # 1. 找 top-k
    topk_logprobs, topk_indices = torch.topk(logprobs, num_logprobs, dim=-1)

    # 2. 获取采样 token 的 logprob
    token_logprobs = logprobs.gather(-1, token_ids.unsqueeze(-1))

    # 3. 计算采样 token 的排名
    token_ranks = batched_count_greater_than(logprobs, token_logprobs)

    # 4. 合并结果
    indices = torch.cat((token_ids.unsqueeze(-1), topk_indices), dim=1)
    logprobs = torch.cat((token_logprobs, topk_logprobs), dim=1)

    return LogprobsTensors(indices, logprobs, token_ranks)

7. 批量采样优化

7.1 SamplingMetadata

# vllm/v1/sample/metadata.py

class SamplingMetadata:
    """批量采样的元数据"""

    # 基本信息
    all_greedy: bool          # 所有请求都是贪婪采样
    all_random: bool          # 所有请求都是随机采样

    # 参数(批量张量)
    temperature: torch.Tensor | None   # [batch_size]
    top_k: torch.Tensor | None         # [batch_size]
    top_p: torch.Tensor | None         # [batch_size]

    # Logprobs
    max_num_logprobs: int | None

    # 惩罚相关
    # ...

    # 随机数生成器(每个请求一个)
    generators: list[torch.Generator] | None

7.2 批量处理流程

sequenceDiagram
    participant S as Scheduler
    participant R as ModelRunner
    participant Sampler as Sampler

    S->>R: SchedulerOutput (多个请求)
    R->>R: 构建 SamplingMetadata
    Note over R: 将各请求的采样参数<br/>批量化为张量

    R->>Sampler: forward(logits, metadata)

    alt all_greedy
        Sampler->>Sampler: argmax (快速路径)
    else mixed
        Sampler->>Sampler: 批量应用温度
        Sampler->>Sampler: 批量 top-k/top-p
        Sampler->>Sampler: multinomial 采样
        Sampler->>Sampler: 合并贪婪和随机结果
    end

    Sampler-->>R: SamplerOutput

8. 采样参数组合建议

8.1 常见场景配置

| 场景 | temperature | top_k | top_p | 说明 |
| --- | --- | --- | --- | --- |
| 代码生成 | 0.0 | - | - | 贪婪,确保正确性 |
| 技术写作 | 0.3 | - | 0.9 | 低随机性 |
| 创意写作 | 0.8 | 50 | 0.95 | 高随机性 |
| 对话 | 0.7 | 40 | 0.9 | 平衡 |
| 头脑风暴 | 1.2 | - | 0.95 | 非常随机 |

8.2 惩罚参数建议

| 参数 | 推荐范围 | 说明 |
| --- | --- | --- |
| repetition_penalty | 1.0-1.2 | 轻微惩罚重复 |
| frequency_penalty | 0.0-0.5 | 减少高频词 |
| presence_penalty | 0.0-0.5 | 鼓励新话题 |
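
结合上面两个表,对话场景的一组采样参数大致如下(示意配置,具体取值需按模型和业务调整):

from vllm import SamplingParams

chat_params = SamplingParams(
    temperature=0.7,
    top_k=40,
    top_p=0.9,
    repetition_penalty=1.1,
    presence_penalty=0.2,
    max_tokens=256,
)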

9. 代码位置速查

| 组件 | 文件 | 关键类/函数 |
| --- | --- | --- |
| 采样参数 | vllm/sampling_params.py | `SamplingParams` |
| Sampler | vllm/v1/sample/sampler.py | `Sampler` |
| 采样元数据 | vllm/v1/sample/metadata.py | `SamplingMetadata` |
| Top-k/Top-p | vllm/v1/sample/ops/topk_topp_sampler.py | `TopKTopPSampler` |
| 惩罚 | vllm/v1/sample/ops/penalties.py | `apply_*_penalty()` |
| Logprobs | vllm/v1/sample/ops/logprobs.py | `gather_logprobs()` |

10. 小结

本章我们深入了解了 vLLM 的采样过程:

  1. 采样参数:temperature、top_k、top_p、min_p 等
  2. 采样策略
    • 贪婪采样:确定性,选择最高概率
    • 温度采样:控制随机性
    • Top-k:只考虑前 k 个 token
    • Top-p:累积概率阈值过滤
    • Min-p:相对于最高概率的阈值
  3. 惩罚机制
    • 重复惩罚:惩罚已出现 token
    • 频率惩罚:基于出现次数
    • 存在惩罚:基于是否出现
  4. Logprobs:返回 token 的对数概率
  5. 批量优化:通过 SamplingMetadata 批量处理

在下一章中,我们将分析输出处理过程,了解采样结果如何转换为用户可见的输出。



5 - 输出处理

输出处理流程

采样完成后,生成的 token 需要经过一系列处理才能最终返回给用户。本章将详细分析从采样结果到用户输出的完整处理流程。


1. 输出处理在整体流程中的位置

graph LR
    subgraph 模型执行
        Sample[采样] --> SamplerOut[SamplerOutput]
    end

    subgraph 输出处理
        SamplerOut --> Update[更新请求状态]
        Update --> Check[检查停止条件]
        Check --> Detok[Detokenize]
        Detok --> Build[构建输出]
    end

    subgraph 返回用户
        Build --> Stream[流式返回]
        Build --> Final[最终返回]
    end

2. 输出数据结构

2.1 SamplerOutput

# vllm/v1/outputs.py

@dataclass
class SamplerOutput:
    """采样器的输出"""

    # 采样的 token IDs [num_requests, 1]
    sampled_token_ids: torch.Tensor

    # Logprobs 信息(可选)
    logprobs_tensors: LogprobsTensors | None = None

@dataclass
class LogprobsTensors:
    """Logprobs 张量"""

    # Top-k token indices [num_tokens, num_logprobs + 1]
    indices: torch.Tensor

    # Top-k logprobs [num_tokens, num_logprobs + 1]
    logprobs: torch.Tensor

    # 采样 token 的排名 [num_tokens]
    ranks: torch.Tensor

2.2 ModelRunnerOutput

# vllm/v1/outputs.py

@dataclass
class ModelRunnerOutput:
    """ModelRunner 的输出"""

    # 每个请求的采样结果
    # Dict[request_id, SamplerOutput]
    sampler_output: dict[str, SamplerOutput]

    # 模型特定的输出(如 pooling embeddings)
    model_output: Any | None = None

2.3 EngineCoreOutput

# vllm/v1/engine/__init__.py

@dataclass
class EngineCoreOutput:
    """EngineCore 的单个请求输出"""

    request_id: str

    # 新生成的 token IDs
    new_token_ids: list[int]

    # 完成原因(如果完成)
    finish_reason: FinishReason | None = None

    # 停止字符串(如果因停止字符串完成)
    stop_str: str | None = None

    # Logprobs 信息
    new_logprobs: list[dict[int, Logprob]] | None = None

@dataclass
class EngineCoreOutputs:
    """EngineCore 的批量输出"""

    outputs: list[EngineCoreOutput]

    # 完成的请求 IDs
    finished_req_ids: set[str] | None = None

2.4 RequestOutput(最终用户输出)

# vllm/outputs.py

@dataclass
class RequestOutput:
    """用户可见的最终输出"""

    request_id: str

    # 原始 prompt
    prompt: str | None
    prompt_token_ids: list[int]

    # 生成结果(可能多个,如 beam search)
    outputs: list[CompletionOutput]

    # 是否完成
    finished: bool

    # 指标
    metrics: RequestMetrics | None = None

@dataclass
class CompletionOutput:
    """单个生成序列的输出"""

    index: int                    # 序列索引
    text: str                     # 生成的文本
    token_ids: list[int]          # 生成的 token IDs
    cumulative_logprob: float | None  # 累积对数概率
    logprobs: list[dict] | None   # 每个 token 的 logprobs
    finish_reason: str | None     # 完成原因
    stop_reason: str | int | None # 停止原因
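
在用户侧,这些字段可以直接从 generate() 的返回值中读取,例如(模型名仅为示例):

from vllm import LLM, SamplingParams

llm = LLM(model="meta-llama/Llama-2-7b-hf")
outputs = llm.generate(["Hello, my name is"], SamplingParams(max_tokens=16))

for request_output in outputs:                  # RequestOutput
    for completion in request_output.outputs:   # CompletionOutput
        print(completion.text, completion.finish_reason)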

3. 输出处理流程详解

3.1 update_from_output() - 状态更新

# vllm/v1/core/sched/scheduler.py

def update_from_output(
    self,
    model_runner_output: ModelRunnerOutput,
    sampler_output: SamplerOutput | None,
    scheduler_output: SchedulerOutput,
) -> EngineCoreOutputs:
    """根据模型输出更新请求状态"""

    outputs: list[EngineCoreOutput] = []

    # 按请求遍历 ModelRunnerOutput 中的采样结果(见上文 sampler_output 字段)
    for req_id, req_output in model_runner_output.sampler_output.items():
        request = self.requests[req_id]

        # 1. 获取新生成的 token IDs
        new_token_ids = req_output.sampled_token_ids.tolist()

        # 2. 追加到请求
        request.append_output_token_ids(new_token_ids)

        # 3. 检查停止条件
        finish_reason, stop_str = check_stop(request, self.max_model_len)

        # 4. 处理完成的请求
        if finish_reason is not None:
            self._finish_request(request, finish_reason)

        # 5. 构建输出
        output = EngineCoreOutput(
            request_id=req_id,
            new_token_ids=new_token_ids,
            finish_reason=finish_reason,
            stop_str=stop_str,
            new_logprobs=self._process_logprobs(req_output),
        )
        outputs.append(output)

    return EngineCoreOutputs(outputs=outputs)

3.2 停止条件检查

# vllm/v1/core/sched/utils.py

def check_stop(
    request: Request,
    max_model_len: int,
) -> tuple[FinishReason | None, str | None]:
    """检查请求是否应该停止"""

    # 1. 检查 EOS token
    last_token_id = request.all_token_ids[-1]
    if last_token_id == request.eos_token_id:
        return FinishReason.STOP, None

    # 2. 检查最大输出长度
    if request.num_output_tokens >= request.max_tokens:
        return FinishReason.LENGTH, None

    # 3. 检查模型最大长度
    if len(request.all_token_ids) >= max_model_len:
        return FinishReason.LENGTH, None

    # 4. 检查停止 token IDs
    if request.stop_token_ids:
        if last_token_id in request.stop_token_ids:
            return FinishReason.STOP, None

    # 5. 检查停止字符串
    if request.stop_strings:
        output_text = request.get_output_text()
        for stop_str in request.stop_strings:
            if stop_str in output_text:
                return FinishReason.STOP, stop_str

    return None, None  # 继续生成
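
这些停止条件大多来源于用户传入的 SamplingParams,例如(stop_token_ids 的取值仅为示意):

from vllm import SamplingParams

params = SamplingParams(
    max_tokens=128,           # 超过即触发 FinishReason.LENGTH
    stop=["\n\n", "###"],     # 命中即触发停止字符串检查
    stop_token_ids=[2],       # 命中即触发停止 token 检查(2 仅为示意)
)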

3.3 流程图

flowchart TD
    A[SamplerOutput] --> B[遍历每个请求]
    B --> C[获取 new_token_ids]
    C --> D[append_output_token_ids]
    D --> E[check_stop]

    E --> F{停止?}
    F -->|EOS| G[finish_reason = STOP]
    F -->|max_tokens| H[finish_reason = LENGTH]
    F -->|stop_string| I[finish_reason = STOP<br/>stop_str = ...]
    F -->|否| J[继续]

    G --> K[_finish_request]
    H --> K
    I --> K

    K --> L[释放 KV Cache]
    L --> M[从 running 移除]

    J --> N[构建 EngineCoreOutput]
    M --> N

    N --> O[EngineCoreOutputs]

4. Detokenization - 反向分词

4.1 增量 Detokenize

# vllm/v1/engine/detokenizer.py

class IncrementalDetokenizer:
    """增量反向分词器"""

    def __init__(self, tokenizer: TokenizerLike, request: Request):
        self.tokenizer = tokenizer
        self.request = request

        # 已解码的文本
        self.output_text = ""

        # 待解码的 token 缓冲区
        self.token_buffer: list[int] = []

        # 上一次的偏移量(用于流式输出)
        self.prev_output_len = 0

    def decode(self, new_token_ids: list[int]) -> str:
        """解码新的 token,返回新增的文本"""

        # 添加到缓冲区
        self.token_buffer.extend(new_token_ids)

        # 尝试解码
        text = self.tokenizer.decode(
            self.token_buffer,
            skip_special_tokens=True,
        )

        # 检查是否有完整的字符
        # (某些 token 可能是部分字符,需要等待后续 token)
        # _is_valid_utf8 的典型做法是检查解码结果是否以替换字符 U+FFFD 结尾
        if self._is_valid_utf8(text):
            self.output_text = text
            new_text = text[self.prev_output_len:]
            self.prev_output_len = len(text)
            return new_text

        return ""

4.2 增量解码的必要性

Token IDs: [15496, 284, 262, 995, 0]

逐个解码:
15496 → "Hello"     ✓ 完整单词
284   → " to"       ✓ 完整
262   → " the"      ✓ 完整
995   → " wor"      ? 可能是 "world" 的一部分
0     → "ld"        ✓ 完成 "world"

增量解码会等待 995+0 一起解码为 " world"
而不是输出 " wor" 然后 "ld"
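
下面用一段不依赖具体分词器的简化代码说明这种缓冲逻辑:当解码结果以替换字符 U+FFFD 结尾时,通常说明末尾 token 只解码出了半个字符,应继续等待后续 token(真实实现还要处理更多边界情况,这里仅作示意):

def emit_incremental(decode, token_buffer, new_token_ids, prev_len):
    """decode: 形如 tokenizer.decode 的函数;返回 (新增文本, 新的 prev_len)。"""
    token_buffer.extend(new_token_ids)
    text = decode(token_buffer)

    # 以 U+FFFD 结尾说明末尾 token 只解码出半个字符,本轮不输出任何文本
    if text.endswith("\ufffd"):
        return "", prev_len

    new_text = text[prev_len:]
    return new_text, len(text)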

5. 流式输出

5.1 流式输出架构

sequenceDiagram
    participant User as 用户
    participant API as API Server
    participant Engine as LLMEngine
    participant Core as EngineCore

    User->>API: POST /v1/completions<br/>stream=true

    API->>Engine: add_request()

    loop 每个 step
        Engine->>Core: step()
        Core-->>Engine: EngineCoreOutputs

        Engine->>Engine: detokenize()
        Engine-->>API: yield partial_output

        API-->>User: SSE: data: {...}
    end

    Engine-->>API: final_output
    API-->>User: SSE: data: [DONE]

5.2 Server-Sent Events (SSE) 格式

# 流式响应示例
async def stream_response():
    async for output in engine.generate_stream(prompt, params):
        yield f"data: {json.dumps(output)}\n\n"

    yield "data: [DONE]\n\n"

实际输出示例:

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":" there"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{},"finish_reason":"stop"}]}

data: [DONE]
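
客户端一般无需手工解析这些 SSE 行,可以直接用 OpenAI 官方客户端指向 vLLM 服务(假设服务监听在本机 8000 端口,模型名仅为示例):

from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

stream = client.chat.completions.create(
    model="meta-llama/Llama-2-7b-hf",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)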

6. Logprobs 处理

6.1 Logprobs 数据结构

@dataclass
class Logprob:
    """单个 token 的 logprob 信息"""

    logprob: float           # 对数概率
    rank: int | None         # 在词表中的排名
    decoded_token: str       # 解码后的文本

# 每个位置的 logprobs
# Dict[token_id, Logprob]

6.2 Logprobs 处理流程

def _process_logprobs(
    self,
    sampler_output: SamplerOutput,
) -> list[dict[int, Logprob]] | None:
    """处理 logprobs 输出"""

    if sampler_output.logprobs_tensors is None:
        return None

    tensors = sampler_output.logprobs_tensors
    result = []

    for i in range(tensors.indices.shape[0]):
        token_logprobs = {}

        # 获取 top-k token IDs 和 logprobs
        for j in range(tensors.indices.shape[1]):
            token_id = tensors.indices[i, j].item()
            logprob = tensors.logprobs[i, j].item()

            # 解码 token
            decoded = self.tokenizer.decode([token_id])

            token_logprobs[token_id] = Logprob(
                logprob=logprob,
                rank=j if j > 0 else tensors.ranks[i].item(),
                decoded_token=decoded,
            )

        result.append(token_logprobs)

    return result
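
从用户角度,只需在 SamplingParams 中设置 logprobs,就能在 CompletionOutput.logprobs 中拿到上述结构(模型名仅为示例):

from vllm import LLM, SamplingParams

llm = LLM(model="meta-llama/Llama-2-7b-hf")
outputs = llm.generate(
    ["Paris is the capital of"],
    SamplingParams(max_tokens=4, logprobs=5),
)

# 每个生成位置对应一个 dict[token_id, Logprob]
for position_logprobs in outputs[0].outputs[0].logprobs:
    for token_id, lp in position_logprobs.items():
        print(token_id, lp.logprob, lp.decoded_token)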

7. 输出格式化

7.1 OpenAI 兼容格式

# vllm/entrypoints/openai/protocol.py

class ChatCompletionResponse(BaseModel):
    """OpenAI Chat Completion 响应格式"""

    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: list[ChatCompletionResponseChoice]
    usage: UsageInfo

class ChatCompletionResponseChoice(BaseModel):
    index: int
    message: ChatMessage
    finish_reason: str | None
    logprobs: ChoiceLogprobs | None

class ChatMessage(BaseModel):
    role: str
    content: str

7.2 格式转换

def create_chat_completion_response(
    request_output: RequestOutput,
    model_name: str,
) -> ChatCompletionResponse:
    """将 RequestOutput 转换为 OpenAI 格式"""

    choices = []
    for i, output in enumerate(request_output.outputs):
        choice = ChatCompletionResponseChoice(
            index=i,
            message=ChatMessage(
                role="assistant",
                content=output.text,
            ),
            finish_reason=output.finish_reason,
            logprobs=_convert_logprobs(output.logprobs),
        )
        choices.append(choice)

    return ChatCompletionResponse(
        id=f"chatcmpl-{request_output.request_id}",
        created=int(time.time()),
        model=model_name,
        choices=choices,
        usage=UsageInfo(
            prompt_tokens=len(request_output.prompt_token_ids),
            completion_tokens=sum(len(o.token_ids) for o in request_output.outputs),
            total_tokens=...,
        ),
    )

8. 输出处理完整流程图

flowchart TD
    subgraph EngineCore
        SO[SamplerOutput] --> UO[update_from_output]
        UO --> CS[check_stop]
        CS --> ECO[EngineCoreOutput]
    end

    subgraph LLMEngine
        ECO --> DT[Detokenize]
        DT --> PL[Process Logprobs]
        PL --> RO[RequestOutput]
    end

    subgraph API Server
        RO --> FMT[格式化]
        FMT --> |非流式| JSON[JSON Response]
        FMT --> |流式| SSE[SSE Events]
    end

    subgraph 用户
        JSON --> User1[完整响应]
        SSE --> User2[增量响应]
    end

9. 代码位置速查

| 组件 | 文件 | 关键类/函数 |
|------|------|-------------|
| 输出数据结构 | vllm/v1/outputs.py | SamplerOutput, ModelRunnerOutput |
| EngineCore 输出 | vllm/v1/engine/__init__.py | EngineCoreOutput |
| 用户输出 | vllm/outputs.py | RequestOutput, CompletionOutput |
| 状态更新 | vllm/v1/core/sched/scheduler.py | update_from_output() |
| 停止检查 | vllm/v1/core/sched/utils.py | check_stop() |
| Detokenizer | vllm/v1/engine/detokenizer.py | IncrementalDetokenizer |
| OpenAI 格式 | vllm/entrypoints/openai/protocol.py | ChatCompletionResponse |

10. 小结

本章我们详细分析了输出处理流程:

  1. 数据结构链路

    • SamplerOutput → ModelRunnerOutput → EngineCoreOutput → RequestOutput
  2. 状态更新

    • 追加 token 到请求
    • 检查停止条件
    • 处理完成的请求
  3. 停止条件

    • EOS token
    • 最大长度
    • 停止字符串/token
  4. Detokenization

    • 增量解码
    • 处理部分字符
  5. 流式输出

    • Server-Sent Events
    • 增量返回
  6. 格式化

    • OpenAI 兼容格式
    • Logprobs 处理

在下一章中,我们将完整跟踪一个请求从提交到返回的完整生命周期。



6 - 请求生命周期

请求完整生命周期

本章将完整跟踪一个请求从用户提交到最终返回的全过程,将前面章节的知识串联起来,帮助读者建立完整的认知图景。


1. 生命周期概览

graph TD
    subgraph 1. 提交阶段
        A1[用户调用 generate]
        A2[Tokenize]
        A3[创建请求]
        A4[加入 waiting 队列]
    end

    subgraph 2. 调度阶段
        B1[查找前缀缓存]
        B2[分配 KV Cache]
        B3[加入 running 队列]
    end

    subgraph 3. 执行阶段
        C1[准备输入]
        C2[模型前向传播]
        C3[采样]
    end

    subgraph 4. 更新阶段
        D1[追加 token]
        D2[检查停止条件]
        D3[更新状态]
    end

    subgraph 5. 返回阶段
        E1[Detokenize]
        E2[构建输出]
        E3[返回用户]
    end

    A1 --> A2 --> A3 --> A4
    A4 --> B1 --> B2 --> B3
    B3 --> C1 --> C2 --> C3
    C3 --> D1 --> D2 --> D3
    D3 -->|未完成| C1
    D3 -->|完成| E1 --> E2 --> E3

2. 阶段 1:请求提交

2.1 用户调用

# 用户代码
from vllm import LLM, SamplingParams

llm = LLM(model="meta-llama/Llama-2-7b-hf")

prompts = ["The capital of France is"]
sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=50)

outputs = llm.generate(prompts, sampling_params)

2.2 Tokenize

# vllm/entrypoints/llm.py

def generate(self, prompts, sampling_params, ...):
    # 1. 处理输入
    for prompt in prompts:
        # Tokenize prompt
        prompt_token_ids = self.tokenizer.encode(prompt)

        # 创建请求
        request_id = str(next(self.request_counter))

        self._add_request(
            request_id=request_id,
            prompt=prompt,
            prompt_token_ids=prompt_token_ids,
            params=sampling_params,
        )

2.3 创建 EngineCoreRequest

# vllm/v1/engine/llm_engine.py

def add_request(self, request_id, prompt, params, ...):
    # 构建 EngineCoreRequest
    engine_request = EngineCoreRequest(
        request_id=request_id,
        prompt_token_ids=prompt_token_ids,
        sampling_params=params,
        arrival_time=time.time(),
        eos_token_id=self.tokenizer.eos_token_id,
    )

    # 发送到 EngineCore
    self.engine_core.add_request(engine_request)

2.4 加入 Waiting 队列

# vllm/v1/core/sched/scheduler.py

def add_request(self, request: EngineCoreRequest) -> None:
    # 1. 创建内部 Request 对象
    internal_request = Request(
        request_id=request.request_id,
        prompt_token_ids=request.prompt_token_ids,
        sampling_params=request.sampling_params,
    )

    # 2. 计算 block hashes(用于前缀缓存)
    if self.enable_caching:
        internal_request.block_hashes = compute_block_hashes(
            internal_request.prompt_token_ids,
            self.block_size,
        )

    # 3. 加入 waiting 队列
    internal_request.status = RequestStatus.WAITING
    self.waiting.append_request(internal_request)

    # 4. 记录到请求字典
    self.requests[request.request_id] = internal_request

2.5 提交阶段时序图

sequenceDiagram
    participant User as 用户
    participant LLM as LLM
    participant Tokenizer as Tokenizer
    participant Engine as LLMEngine
    participant Core as EngineCore
    participant Sched as Scheduler

    User->>LLM: generate(prompts, params)
    LLM->>Tokenizer: encode(prompt)
    Tokenizer-->>LLM: token_ids

    LLM->>Engine: add_request(id, tokens, params)
    Engine->>Engine: 创建 EngineCoreRequest
    Engine->>Core: add_request(request)

    Core->>Sched: add_request(request)
    Sched->>Sched: 创建 internal Request
    Sched->>Sched: 计算 block_hashes
    Sched->>Sched: waiting.append(request)

    Note over Sched: 请求进入 WAITING 状态

3. 阶段 2:调度

3.1 查找前缀缓存

# vllm/v1/core/sched/scheduler.py :: schedule()

# 从 waiting 队列取出请求
request = self.waiting.peek_request()

# 查找前缀缓存
new_computed_blocks, num_cached_tokens = (
    self.kv_cache_manager.get_computed_blocks(request)
)

# num_cached_tokens 表示可以跳过的 token 数
# 例如:prompt 有 100 tokens,前 64 个已缓存
# 则只需要计算后 36 个
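
num_cached_tokens 是按块命中计算出来的,下面用一段小算术说明它与待计算 token 数的关系(假设 block_size=16,命中块数仅为示意):

block_size = 16
num_prompt_tokens = 100

num_full_blocks = num_prompt_tokens // block_size       # 6 个完整块,覆盖 96 个 token
num_hit_blocks = 4                                       # 假设前 4 块命中前缀缓存
num_cached_tokens = num_hit_blocks * block_size          # 64
num_new_tokens = num_prompt_tokens - num_cached_tokens   # 只需再计算 36 个
print(num_full_blocks, num_cached_tokens, num_new_tokens)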

3.2 分配 KV Cache

# 计算需要处理的 token 数
num_new_tokens = request.num_tokens - num_cached_tokens

# 分配 KV Cache slots
new_blocks = self.kv_cache_manager.allocate_slots(
    request,
    num_new_tokens,
    num_new_computed_tokens=num_cached_tokens,
    new_computed_blocks=new_computed_blocks,
)

if new_blocks is None:
    # 内存不足,请求继续等待
    return

# 分配成功

3.3 移入 Running 队列

# 从 waiting 移除
request = self.waiting.pop_request()

# 加入 running
self.running.append(request)

# 更新状态
request.status = RequestStatus.RUNNING
request.num_computed_tokens = num_cached_tokens

3.4 调度阶段示意图

flowchart TD
    subgraph Scheduler.schedule
        W[waiting 队列] --> Peek[peek_request]
        Peek --> Cache[get_computed_blocks]
        Cache --> Alloc[allocate_slots]

        Alloc --> Check{分配成功?}
        Check -->|是| Move[移入 running]
        Check -->|否| Wait[继续等待]

        Move --> SO[构建 SchedulerOutput]
    end

    subgraph SchedulerOutput
        SO --> Reqs[scheduled_new_reqs]
        SO --> Blocks[req_to_new_blocks]
        SO --> Tokens[num_scheduled_tokens]
    end

4. 阶段 3:模型执行

4.1 准备输入

# vllm/v1/worker/gpu_model_runner.py

def execute_model(self, scheduler_output: SchedulerOutput):
    # 1. 准备 input_ids
    input_ids = self._prepare_input_ids(scheduler_output)

    # 2. 准备 positions
    positions = self._prepare_positions(scheduler_output)

    # 3. 准备 attention metadata
    attn_metadata = self._prepare_attention_metadata(scheduler_output)

    # 4. 更新 block table
    self._update_block_table(scheduler_output)

4.2 模型前向传播

    # 5. 前向传播
    with torch.inference_mode():
        hidden_states = self.model(
            input_ids=input_ids,
            positions=positions,
            kv_caches=self.kv_caches,
            attn_metadata=attn_metadata,
        )

    # 6. 计算 logits
    logits = self.model.compute_logits(hidden_states)

    return ModelRunnerOutput(logits=logits, ...)

4.3 采样

# vllm/v1/executor/abstract.py

def sample_tokens(self, model_output: ModelRunnerOutput) -> SamplerOutput:
    # 构建采样元数据
    sampling_metadata = self._prepare_sampling_metadata()

    # 采样
    sampler_output = self.sampler(
        model_output.logits,
        sampling_metadata,
    )

    return sampler_output

4.4 执行阶段时序图

sequenceDiagram
    participant Core as EngineCore
    participant Exec as Executor
    participant Worker as Worker
    participant Runner as ModelRunner
    participant Model as Model
    participant Sampler as Sampler

    Core->>Exec: execute_model(scheduler_output)
    Exec->>Worker: execute_model()
    Worker->>Runner: execute_model()

    Runner->>Runner: _prepare_inputs()
    Runner->>Model: forward(input_ids, positions, kv_caches)

    Note over Model: Embedding → Transformer Layers → Norm

    Model-->>Runner: hidden_states
    Runner->>Model: compute_logits(hidden_states)
    Model-->>Runner: logits

    Runner-->>Worker: ModelRunnerOutput
    Worker-->>Exec: output

    Core->>Exec: sample_tokens()
    Exec->>Sampler: forward(logits, metadata)

    Note over Sampler: Temperature → Top-k/p → Sample

    Sampler-->>Exec: SamplerOutput
    Exec-->>Core: sampled_tokens

5. 阶段 4:状态更新

5.1 追加 Token

# vllm/v1/core/sched/scheduler.py

def update_from_output(self, model_output, sampler_output, scheduler_output):
    # 与上一章一致,按请求遍历模型输出中的采样结果
    for req_id, output in model_output.sampler_output.items():
        request = self.requests[req_id]

        # 获取新生成的 token
        new_token_ids = output.sampled_token_ids.tolist()

        # 追加到请求
        request.append_output_token_ids(new_token_ids)

        # 更新 computed_tokens
        request.num_computed_tokens += 1

5.2 检查停止条件

        # 检查是否完成
        finish_reason, stop_str = check_stop(request, self.max_model_len)

        if finish_reason is not None:
            # 请求完成
            self._finish_request(request, finish_reason)
            finished_outputs.append(...)
        else:
            # 继续生成
            outputs.append(...)

5.3 完成请求处理

def _finish_request(self, request: Request, reason: FinishReason):
    # 1. 释放 KV Cache
    self.kv_cache_manager.free(request)

    # 2. 从 running 移除
    self.running.remove(request)

    # 3. 更新状态
    request.status = RequestStatus.FINISHED

    # 4. 记录完成
    self.finished_req_ids.add(request.request_id)

6. 阶段 5:返回结果

6.1 Detokenize

# vllm/v1/engine/llm_engine.py

def _process_outputs(self, engine_outputs: EngineCoreOutputs):
    results = []

    for output in engine_outputs.outputs:
        request = self.requests[output.request_id]

        # 增量解码
        new_text = self.detokenizer.decode(
            request,
            output.new_token_ids,
        )

        # 更新请求的输出文本
        request.output_text += new_text

        results.append(...)

    return results

6.2 构建 RequestOutput

def _make_request_output(self, request: Request, finished: bool):
    return RequestOutput(
        request_id=request.request_id,
        prompt=request.prompt,
        prompt_token_ids=request.prompt_token_ids,
        outputs=[
            CompletionOutput(
                index=0,
                text=request.output_text,
                token_ids=request.output_token_ids,
                finish_reason=request.finish_reason,
                logprobs=request.logprobs,
            )
        ],
        finished=finished,
    )

6.3 返回用户

# vllm/entrypoints/llm.py

def _run_engine(self, use_tqdm: bool):
    outputs = []

    while self.llm_engine.has_unfinished_requests():
        step_outputs = self.llm_engine.step()

        for output in step_outputs:
            if output.finished:
                outputs.append(output)

    return sorted(outputs, key=lambda x: int(x.request_id))

7. 完整生命周期时序图

sequenceDiagram
    participant User as 用户
    participant LLM as LLM
    participant Engine as LLMEngine
    participant Core as EngineCore
    participant Sched as Scheduler
    participant KVM as KVCacheManager
    participant Exec as Executor
    participant Model as Model

    rect rgb(230, 245, 230)
        Note over User,Model: 1. 提交阶段
        User->>LLM: generate(prompt, params)
        LLM->>Engine: add_request()
        Engine->>Core: add_request()
        Core->>Sched: add_request()
        Note over Sched: status = WAITING
    end

    loop 每个 step
        rect rgb(255, 245, 230)
            Note over User,Model: 2. 调度阶段
            Core->>Sched: schedule()
            Sched->>KVM: get_computed_blocks()
            KVM-->>Sched: cached_blocks, num_cached

            Sched->>KVM: allocate_slots()
            KVM-->>Sched: new_blocks

            Note over Sched: status = RUNNING
            Sched-->>Core: SchedulerOutput
        end

        rect rgb(245, 230, 230)
            Note over User,Model: 3. 执行阶段
            Core->>Exec: execute_model()
            Exec->>Model: forward()
            Model-->>Exec: logits

            Exec->>Exec: sample()
            Exec-->>Core: SamplerOutput
        end

        rect rgb(230, 230, 245)
            Note over User,Model: 4. 更新阶段
            Core->>Sched: update_from_output()
            Sched->>Sched: append_token()
            Sched->>Sched: check_stop()

            alt 完成
                Sched->>KVM: free()
                Note over Sched: status = FINISHED
            end
        end
    end

    rect rgb(245, 245, 230)
        Note over User,Model: 5. 返回阶段
        Core-->>Engine: EngineCoreOutputs
        Engine->>Engine: detokenize()
        Engine-->>LLM: RequestOutput
        LLM-->>User: outputs
    end

8. 状态转换汇总

stateDiagram-v2
    [*] --> WAITING: add_request()

    WAITING --> RUNNING: schedule() 成功
    WAITING --> WAITING_FOR_FSM: 需要 FSM 编译
    WAITING --> WAITING_FOR_REMOTE_KVS: 等待远程 KV

    WAITING_FOR_FSM --> WAITING: FSM 就绪
    WAITING_FOR_REMOTE_KVS --> WAITING: KV 就绪

    RUNNING --> RUNNING: step() 继续生成
    RUNNING --> PREEMPTED: 内存不足被抢占
    RUNNING --> FINISHED_STOPPED: EOS 或停止字符串
    RUNNING --> FINISHED_LENGTH: 达到 max_tokens
    RUNNING --> FINISHED_ABORTED: 用户取消

    PREEMPTED --> WAITING: 重新排队

    FINISHED_STOPPED --> [*]: 释放资源
    FINISHED_LENGTH --> [*]: 释放资源
    FINISHED_ABORTED --> [*]: 释放资源

9. 关键数据结构流转

用户输入
    ↓
prompt: str
    ↓ Tokenize
prompt_token_ids: list[int]
    ↓ 创建请求
EngineCoreRequest
    ↓ 调度器内部
Request (internal)
    ↓ 调度
SchedulerOutput
    ↓ 执行
ModelRunnerOutput (logits)
    ↓ 采样
SamplerOutput (token_ids)
    ↓ 更新
EngineCoreOutput
    ↓ Detokenize
RequestOutput
    ↓
用户输出

10. 小结

本章我们完整跟踪了一个请求的生命周期:

  1. 提交阶段

    • Tokenize → 创建请求 → 加入 waiting 队列
  2. 调度阶段

    • 查找缓存 → 分配 KV Cache → 移入 running
  3. 执行阶段

    • 准备输入 → 前向传播 → 采样
  4. 更新阶段

    • 追加 token → 检查停止 → 更新状态
  5. 返回阶段

    • Detokenize → 构建输出 → 返回用户

通过这个完整的流程分析,我们可以看到 vLLM 的各个组件是如何协同工作的,以及为什么它能够实现高效的 LLM 推理。

