In-Depth Study of Offline Connector

vllm/v1/kv_offload/base.py

OffloadKey

一个 block 需要两个信息来唯一标识:

  1. block_hash: 内容的哈希 (比如 0xABCD1234…)
  2. group_idx: 属于哪个 KV cache 组 (比如 0, 1, 2)

解决方案:打包成 bytes

核心抽象

@dataclass
class ReqContext:
    req_id: str
    kv_transfer_params: dict[str, Any] | None = None
class LoadStoreSpec(ABC):
    """
    Abstract metadata that encapsulates information allowing a worker
    to load, and optionally also to store, blocks of KV data.
    """

    @staticmethod
    @abstractmethod
    def medium() -> str:
        """Returns a string representation of the medium type."""
        pass
@dataclass
class PrepareStoreOutput:
    keys_to_store: list[OffloadKey]
    store_spec: LoadStoreSpec
    evicted_keys: list[OffloadKey]
@dataclass
class OffloadingEvent:
    keys: list[OffloadKey]
    medium: str
    removed: bool  # True if blocks are removed, False if stored

PrepareStoreOutput 生成流程

Scheduler (调度器)
  → 发现 GPU 上有空闲 blocks 需要 offload
  → 调用 SimpleCPUOffloadScheduler.prepare_store(keys)

SimpleCPUOffloadScheduler
  → 输入: keys (GPU 上的 blocks)
  → 结合自己的数据:
     - CPU cache 当前状态 (哪些 blocks 已缓存)
     - CPU pool 容量 (是否满了)
     - LRU 顺序 (哪些 blocks 最久未使用)
  → 输出: PrepareStoreOutput
     - keys_to_store: 要 offload 的 blocks
     - store_spec: CPU 上的目标位置
     - evicted_keys: 被驱逐的旧 blocks

事件处理流程

Scheduler 在 update_from_output() 方法中:

  1. 调用 connector.take_events() 获取事件 → 转发到 SimpleCPUOffloadScheduler.take_events() → 转发到 CPUOffloadManager.take_events()

  2. 合并所有事件 (kv_cache_manager + connector)

  3. 通过 kv_event_publisher 发布给外部监听者

RequestOffloadState

主要包含信息:

  1. SchedulerOffloadConfig
  2. Request (请求 id、token 数据、block_hashes 等)
  3. group_states
    • offload_keys: block_hash + group_idx
    • block_ids: scheduler 分配的
    • next_stored_block_idx: 跟踪 offload 进度 (哪个 block 是下一个要保存的)
    • num_hit_blocks: 跟踪进行中的传输 jobs (load 或 store)

TransferJobStatus

字段 类型 说明
req_id ReqId 属于哪个请求
pending_count int 还有几个 worker 没完成
keys set[OffloadKey] 这个 job 涉及的 offload keys
is_store bool True = store (GPU→CPU), False = load (CPU→GPU)
non_sliding_window_block_ids list[int] \| None 非滑动窗口 block IDs
sliding_window_block_ids list[int] \| None 滑动窗口 block IDs

OffloadingManager

OffloadingManager 定义了 6 个原语,构成完整的、事务性的 block 生命周期管理:

阶段 方法 作用
查找 lookup() block 在介质中吗?
加载 prepare_load() 锁定 blocks,准备读取
加载 complete_load() 解锁 blocks
存储 prepare_store() 锁定 blocks,准备写入
存储 complete_store() 解锁 blocks,注册为可加载
辅助 touch() 更新 LRU
辅助 take_events() 获取事件
辅助 reset_cache() 重置
辅助 shutdown() 关闭

思考:通过控制引用计数来控制 LRU 链表?

OffloadingSpec

必须一次 offload context_parallel_factor 个 block 的原因:

  1. 语义完整性: 一个逻辑 block 的 tokens 是连续的
  2. Hash 一致性: prefix cache 依赖完整的 token 序列
  3. Attention 正确性: 需要访问所有历史 tokens 的 KV

如果不这样做:

KVConnectorBase_V1

角色

Scheduler 端接口

方法 调用时机 作用
get_num_new_matched_tokens() 请求调度时 返回可以从外部 KV cache 加载的 token 数量
update_state_after_alloc() block 分配后 更新 connector 内部状态,建立 GPU blocks 与外部 blocks 的映射
build_connector_meta() 每个 scheduler step 构建传输元数据,发送给 worker
request_finished() 请求完成时 决定是否可以立即释放 KV blocks,或需要异步等待传输完成
update_connector_output() 收到 worker 输出后 处理来自 worker 的完成通知
take_events() 定期调用 获取 KV cache 事件(用于监控/可观测性)

Worker 端接口

方法 调用时机 作用
register_kv_caches() 初始化时 注册 KV cache tensors
bind_connector_metadata() 模型执行前 接收 scheduler 传来的传输元数据
start_load_kv() forward 开始前 异步启动 KV cache 加载 (CPU→GPU 或 网络接收)
wait_for_layer_load() 每个 attention layer 内 等待特定 layer 的加载完成
save_kv_layer() 每个 attention layer 内 异步启动 KV cache 保存 (GPU→CPU 或 网络发送)
wait_for_save() forward 结束后 等待所有保存操作完成
get_finished() 每个 step 结束 报告已完成的异步传输请求
handle_preemptions() 发生 preemption 时 处理被抢占的 requests,防止数据损坏

OffloadingConnector Scheduler

初始化

通过 config 创建 OffloadingSpec,然后创建两个 role:connector_schedulerconnector_worker

on_new_request

创建 RequestOffloadState

get_num_new_matched_tokens

  1. 查找 group_states 或者创建空 group_states
  2. update_offload_keys: 增量补充 offload_key,根据 block_hashes
  3. lookup: 遍历内存,用链表计数器决定哪些值得 offload,LRU 作为实际的 block,避免低频数据被 offload
  4. touch: 更新链表

bind_gpu_block_pool

保存 VRAM 的页表 BlockPool

关键点:VRAM 和内存都基于 BlockPool,这是一个 Python 实现的页表管理

update_state_after_alloc

  1. 检查是否需要加载 (num_external_tokens > 0)
  2. 判断需要加载哪些 block
  3. PD 分离需要把整个都 offload
  4. 检查冲突,读写冲突的话需要让 worker 阻塞
  5. prepare_store 更新 LRU,返回 CPULoadStoreSpec (包含 block id 信息)
  6. 生成 TransferJob
  7. _blocks_being_loaded 降低并发

build_connector_meta

把抢占、已经完成的任务加入 _current_batch_jobs_to_flush

update_connector_output

  1. 解析 OffloadingWorkerMetadata
  2. 判断哪些 job 完成了
  3. 更新 _block_id_to_pending_jobs
  4. 更新 LRU

take_events

返回平时记录的列表,每个 step 调用一次

OffloadingConnector Worker

初始化

被包装在 vllm/v1/worker/gpu/kv_connector.py 中:

方法 说明
init 初始化
pre_forward(scheduler_output) forward 前准备 KV cache 操作
post_forward(finished_req_ids, wait_for_save) forward 后处理后续操作
no_forward(scheduler_output) 空调度时无操作
set_disabled(disabled) dummy run 时禁用 connector

在此之前 GPU worker 已经分配好了 VRAM 中的 KV cache tensor

不同 attention 的处理是在 EngineCore 初始化时转化为标准 layout,并加到 forward context 中

register_kv_caches

不复制数据,只在原始 tensor 上建”索引表”,让 offloading worker 能通过 layer name 找到对应的内存位置。

示例:

layer.0: (1000, 32768)
→ 只关心: 1000 blocks, 每个 32768 bytes

MambaSpecAttentionSpec 仍要分别处理。

set_host_xfer_buffer_ops

设置主机传输缓冲区操作。

pre_forward

在模型 forward 之前准备 KV cache 操作 (如异步加载):

  1. handle_preemptions: 处理 _unsubmitted_store_jobs,进行异步传输 transfer_async
  2. start_load_kv: 处理加载任务

post_forward

模型执行完成后处理 KV cache 后续操作 (如 offload):

  1. 等待传输,标记错误
  2. 提交 store job,但不执行
  3. 加载监控等信息
  4. 返回清空 meta

no_forward

没有 token 任务时更新。

set_disabled

在 dummy run 时禁用 kv_connector,避免不必要的操作。

Job 的生命周期

class OffloadingConnectorScheduler:
    # 本 batch 新创建的 load jobs (待发送给 worker)
    _current_batch_load_jobs: dict[int, TransferJob] = {}
    
    # 本 batch 需要立即 flush 的 job IDs
    _current_batch_jobs_to_flush: set[int] = set()
    
    # 所有活跃 jobs 的状态跟踪
    _jobs: dict[int, TransferJobStatus] = {}
    
    # block_id → 正在使用该 block 的 store job IDs
    _block_id_to_pending_jobs: dict[int, set[int]] = {}
    
    # 每个请求的 transfer job IDs
    # (在 RequestOffloadState.transfer_jobs 中)

Store Job

初始为空。

1. _build_store_jobs()

manager.prepare_store([K0])  # 分配 CPU block

store_jobs[0] = TransferJob(
    req_id="req_A", 
    transfer_spec=(GPUSpec([101..104]), CPUSpec([5]))
)

TransferJobStatus(
    req_id="req_A",
    pending_count=1,        # 1 个 worker
    keys={K0},
    is_store=True,
    non_sliding_window_block_ids=[101, 102, 103, 104],
    sliding_window_block_ids=None
)

_current_batch_load_jobs = {}           # 没有 load jobs
_current_batch_jobs_to_flush = {}       # 没有 flush jobs
_jobs = {0: TransferJobStatus(...)}     # 新 job
store_jobs = {0: TransferJob(...)}      # 返回值

2. build_connector_meta()

OffloadingConnectorMetadata(
    load_jobs={},                   # ← _current_batch_load_jobs (空)
    store_jobs={0: TransferJob},    # ← _build_store_jobs() 返回值
)

_current_batch_load_jobs = {}           # 清空了
_current_batch_jobs_to_flush = {}       # 清空了
_jobs = {0: TransferJobStatus(...)}     # 还在跟踪

3. Worker 收到 meta

store_jobs = {0: TransferJob(GPU[101..104]  CPU[5])}
jobs_to_flush = {}

4. Worker 执行

memcpy(GPU 101..104  CPU 5)
# 完成后返回: {completed_jobs: {0: 1}}

5. update_connector_output()

job_status.pending_count -= 1  # → 0
manager.complete_store({K0})
_remove_pending_job(0, None)
del _jobs[0]
req_A.transfer_jobs.remove(0)

_jobs = {}                          # job 0 删除了
_block_id_to_pending_jobs = {}      # 清理了

Load Job

1. update_state_after_alloc()

TransferJob(
    req_id="req_B",
    transfer_spec=(CPUSpec([5]), GPUSpec([201..204]))
)

TransferJobStatus(
    req_id="req_B",
    pending_count=1,
    keys={K0},
    is_store=False
)

_blocks_being_loaded = {K0}

2. build_connector_meta()

OffloadingConnectorMetadata(
    _current_batch_load_jobs,
    store_jobs={},
    jobs_to_flush={},
)

_current_batch_load_jobs = {}          # 清空!
_current_batch_jobs_to_flush = {}
_jobs = {1: TransferJobStatus(...)}    # 还在跟踪

3. Worker 执行 — 类似 Store Job

4. update_connector_output() — 类似 Store Job

被抢占的请求

初始结构:

_jobs = {0: TransferJobStatus(req_id="req_A", is_store=True)}
_req_status["req_A"].transfer_jobs = {0}

1. build_connector_meta()

_current_batch_jobs_to_flush.update({0})

抢占的本质: 这些 jobs 对应的请求状态正在被清理 (因为被抢占或完成),worker 必须尽快回报完成状态,这样 scheduler 才能安全清理。

SingleDirectionOffloadingHandler

数据结构

包含 CPU 和 GPU 上的 tensor,传输时只需给出 ID 即可自动传输。

传输流程

解析输入

参数 含义
block_ids 传哪些 blocks
group_sizes 每组多少个 blocks
block_indices 从哪里开始写 (对齐 CPU sub-block)
num_copy_ops 要分配多大的数组

迭代 group 计算地址

  1. 计算这组从请求的哪个位置开始 (block_idx)
  2. 计算要跳过的 sub-blocks (对齐 CPU 边界)
  3. 切片出这组对应的 GPU/CPU block IDs
  4. 对每个 tensor: 计算字节地址 → 写入 all_src/all_dst
  5. 更新偏移,处理下一组

CUDA 同步

  1. 等待 forward 完成
  2. 等待上一个 transform 完成

总体来说无法并行

F1 → S1 → L2 → F2