In-Depth Study of Offline Connector
vllm/v1/kv_offload/base.py
OffloadKey
一个 block 需要两个信息来唯一标识:
block_hash: 内容的哈希 (比如 0xABCD1234…)group_idx: 属于哪个 KV cache 组 (比如 0, 1, 2)
解决方案:打包成 bytes
核心抽象
@dataclass
class ReqContext:
req_id: str
kv_transfer_params: dict[str, Any] | None = None
class LoadStoreSpec(ABC):
"""
Abstract metadata that encapsulates information allowing a worker
to load, and optionally also to store, blocks of KV data.
"""
@staticmethod
@abstractmethod
def medium() -> str:
"""Returns a string representation of the medium type."""
pass
@dataclass
class PrepareStoreOutput:
keys_to_store: list[OffloadKey]
store_spec: LoadStoreSpec
evicted_keys: list[OffloadKey]
@dataclass
class OffloadingEvent:
keys: list[OffloadKey]
medium: str
removed: bool # True if blocks are removed, False if stored
PrepareStoreOutput 生成流程
Scheduler (调度器)
→ 发现 GPU 上有空闲 blocks 需要 offload
→ 调用 SimpleCPUOffloadScheduler.prepare_store(keys)
SimpleCPUOffloadScheduler
→ 输入: keys (GPU 上的 blocks)
→ 结合自己的数据:
- CPU cache 当前状态 (哪些 blocks 已缓存)
- CPU pool 容量 (是否满了)
- LRU 顺序 (哪些 blocks 最久未使用)
→ 输出: PrepareStoreOutput
- keys_to_store: 要 offload 的 blocks
- store_spec: CPU 上的目标位置
- evicted_keys: 被驱逐的旧 blocks
事件处理流程
Scheduler 在 update_from_output() 方法中:
-
调用
connector.take_events()获取事件 → 转发到SimpleCPUOffloadScheduler.take_events()→ 转发到CPUOffloadManager.take_events() -
合并所有事件 (
kv_cache_manager+connector) -
通过
kv_event_publisher发布给外部监听者
RequestOffloadState
主要包含信息:
SchedulerOffloadConfigRequest(请求 id、token 数据、block_hashes 等)group_statesoffload_keys:block_hash+group_idxblock_ids: scheduler 分配的next_stored_block_idx: 跟踪 offload 进度 (哪个 block 是下一个要保存的)num_hit_blocks: 跟踪进行中的传输 jobs (load 或 store)
TransferJobStatus
| 字段 | 类型 | 说明 |
|---|---|---|
req_id |
ReqId |
属于哪个请求 |
pending_count |
int |
还有几个 worker 没完成 |
keys |
set[OffloadKey] |
这个 job 涉及的 offload keys |
is_store |
bool |
True = store (GPU→CPU), False = load (CPU→GPU) |
non_sliding_window_block_ids |
list[int] \| None |
非滑动窗口 block IDs |
sliding_window_block_ids |
list[int] \| None |
滑动窗口 block IDs |
OffloadingManager
OffloadingManager 定义了 6 个原语,构成完整的、事务性的 block 生命周期管理:
| 阶段 | 方法 | 作用 |
|---|---|---|
| 查找 | lookup() |
block 在介质中吗? |
| 加载 | prepare_load() |
锁定 blocks,准备读取 |
| 加载 | complete_load() |
解锁 blocks |
| 存储 | prepare_store() |
锁定 blocks,准备写入 |
| 存储 | complete_store() |
解锁 blocks,注册为可加载 |
| 辅助 | touch() |
更新 LRU |
| 辅助 | take_events() |
获取事件 |
| 辅助 | reset_cache() |
重置 |
| 辅助 | shutdown() |
关闭 |
思考:通过控制引用计数来控制 LRU 链表?
OffloadingSpec
必须一次 offload context_parallel_factor 个 block 的原因:
- 语义完整性: 一个逻辑 block 的 tokens 是连续的
- Hash 一致性: prefix cache 依赖完整的 token 序列
- Attention 正确性: 需要访问所有历史 tokens 的 KV
如果不这样做:
- cache 命中率下降
- attention 计算错误
- 生成结果不正确
KVConnectorBase_V1
角色
SCHEDULER: 决定”做什么”WORKER: 执行”怎么做”
Scheduler 端接口
| 方法 | 调用时机 | 作用 |
|---|---|---|
get_num_new_matched_tokens() |
请求调度时 | 返回可以从外部 KV cache 加载的 token 数量 |
update_state_after_alloc() |
block 分配后 | 更新 connector 内部状态,建立 GPU blocks 与外部 blocks 的映射 |
build_connector_meta() |
每个 scheduler step | 构建传输元数据,发送给 worker |
request_finished() |
请求完成时 | 决定是否可以立即释放 KV blocks,或需要异步等待传输完成 |
update_connector_output() |
收到 worker 输出后 | 处理来自 worker 的完成通知 |
take_events() |
定期调用 | 获取 KV cache 事件(用于监控/可观测性) |
Worker 端接口
| 方法 | 调用时机 | 作用 |
|---|---|---|
register_kv_caches() |
初始化时 | 注册 KV cache tensors |
bind_connector_metadata() |
模型执行前 | 接收 scheduler 传来的传输元数据 |
start_load_kv() |
forward 开始前 | 异步启动 KV cache 加载 (CPU→GPU 或 网络接收) |
wait_for_layer_load() |
每个 attention layer 内 | 等待特定 layer 的加载完成 |
save_kv_layer() |
每个 attention layer 内 | 异步启动 KV cache 保存 (GPU→CPU 或 网络发送) |
wait_for_save() |
forward 结束后 | 等待所有保存操作完成 |
get_finished() |
每个 step 结束 | 报告已完成的异步传输请求 |
handle_preemptions() |
发生 preemption 时 | 处理被抢占的 requests,防止数据损坏 |
OffloadingConnector Scheduler
初始化
通过 config 创建 OffloadingSpec,然后创建两个 role:connector_scheduler、connector_worker
on_new_request
创建 RequestOffloadState
get_num_new_matched_tokens
- 查找
group_states或者创建空group_states update_offload_keys: 增量补充offload_key,根据block_hasheslookup: 遍历内存,用链表计数器决定哪些值得 offload,LRU 作为实际的 block,避免低频数据被 offloadtouch: 更新链表
bind_gpu_block_pool
保存 VRAM 的页表 BlockPool
关键点:VRAM 和内存都基于
BlockPool,这是一个 Python 实现的页表管理
update_state_after_alloc
- 检查是否需要加载 (
num_external_tokens > 0) - 判断需要加载哪些 block
- PD 分离需要把整个都 offload
- 检查冲突,读写冲突的话需要让 worker 阻塞
prepare_store更新 LRU,返回CPULoadStoreSpec(包含 block id 信息)- 生成
TransferJob _blocks_being_loaded降低并发
build_connector_meta
把抢占、已经完成的任务加入 _current_batch_jobs_to_flush
update_connector_output
- 解析
OffloadingWorkerMetadata - 判断哪些 job 完成了
- 更新
_block_id_to_pending_jobs - 更新 LRU
take_events
返回平时记录的列表,每个 step 调用一次
OffloadingConnector Worker
初始化
被包装在 vllm/v1/worker/gpu/kv_connector.py 中:
| 方法 | 说明 |
|---|---|
init |
初始化 |
pre_forward(scheduler_output) |
forward 前准备 KV cache 操作 |
post_forward(finished_req_ids, wait_for_save) |
forward 后处理后续操作 |
no_forward(scheduler_output) |
空调度时无操作 |
set_disabled(disabled) |
dummy run 时禁用 connector |
在此之前 GPU worker 已经分配好了 VRAM 中的 KV cache tensor
不同 attention 的处理是在
EngineCore初始化时转化为标准 layout,并加到 forward context 中
register_kv_caches
不复制数据,只在原始 tensor 上建”索引表”,让 offloading worker 能通过 layer name 找到对应的内存位置。
示例:
layer.0: (1000, 32768)
→ 只关心: 1000 blocks, 每个 32768 bytes
但 MambaSpec、AttentionSpec 仍要分别处理。
set_host_xfer_buffer_ops
设置主机传输缓冲区操作。
pre_forward
在模型 forward 之前准备 KV cache 操作 (如异步加载):
handle_preemptions: 处理_unsubmitted_store_jobs,进行异步传输transfer_asyncstart_load_kv: 处理加载任务
post_forward
模型执行完成后处理 KV cache 后续操作 (如 offload):
- 等待传输,标记错误
- 提交 store job,但不执行
- 加载监控等信息
- 返回清空 meta
no_forward
没有 token 任务时更新。
set_disabled
在 dummy run 时禁用 kv_connector,避免不必要的操作。
Job 的生命周期
class OffloadingConnectorScheduler:
# 本 batch 新创建的 load jobs (待发送给 worker)
_current_batch_load_jobs: dict[int, TransferJob] = {}
# 本 batch 需要立即 flush 的 job IDs
_current_batch_jobs_to_flush: set[int] = set()
# 所有活跃 jobs 的状态跟踪
_jobs: dict[int, TransferJobStatus] = {}
# block_id → 正在使用该 block 的 store job IDs
_block_id_to_pending_jobs: dict[int, set[int]] = {}
# 每个请求的 transfer job IDs
# (在 RequestOffloadState.transfer_jobs 中)
Store Job
初始为空。
1. _build_store_jobs()
manager.prepare_store([K0]) # 分配 CPU block
store_jobs[0] = TransferJob(
req_id="req_A",
transfer_spec=(GPUSpec([101..104]), CPUSpec([5]))
)
TransferJobStatus(
req_id="req_A",
pending_count=1, # 1 个 worker
keys={K0},
is_store=True,
non_sliding_window_block_ids=[101, 102, 103, 104],
sliding_window_block_ids=None
)
_current_batch_load_jobs = {} # 没有 load jobs
_current_batch_jobs_to_flush = {} # 没有 flush jobs
_jobs = {0: TransferJobStatus(...)} # 新 job
store_jobs = {0: TransferJob(...)} # 返回值
2. build_connector_meta()
OffloadingConnectorMetadata(
load_jobs={}, # ← _current_batch_load_jobs (空)
store_jobs={0: TransferJob}, # ← _build_store_jobs() 返回值
)
_current_batch_load_jobs = {} # 清空了
_current_batch_jobs_to_flush = {} # 清空了
_jobs = {0: TransferJobStatus(...)} # 还在跟踪
3. Worker 收到 meta
store_jobs = {0: TransferJob(GPU[101..104] → CPU[5])}
jobs_to_flush = {}
4. Worker 执行
memcpy(GPU 101..104 → CPU 5)
# 完成后返回: {completed_jobs: {0: 1}}
5. update_connector_output()
job_status.pending_count -= 1 # → 0
manager.complete_store({K0})
_remove_pending_job(0, None)
del _jobs[0]
req_A.transfer_jobs.remove(0)
_jobs = {} # job 0 删除了
_block_id_to_pending_jobs = {} # 清理了
Load Job
1. update_state_after_alloc()
TransferJob(
req_id="req_B",
transfer_spec=(CPUSpec([5]), GPUSpec([201..204]))
)
TransferJobStatus(
req_id="req_B",
pending_count=1,
keys={K0},
is_store=False
)
_blocks_being_loaded = {K0}
2. build_connector_meta()
OffloadingConnectorMetadata(
_current_batch_load_jobs,
store_jobs={},
jobs_to_flush={},
)
_current_batch_load_jobs = {} # 清空!
_current_batch_jobs_to_flush = {}
_jobs = {1: TransferJobStatus(...)} # 还在跟踪
3. Worker 执行 — 类似 Store Job
4. update_connector_output() — 类似 Store Job
被抢占的请求
初始结构:
_jobs = {0: TransferJobStatus(req_id="req_A", is_store=True)}
_req_status["req_A"].transfer_jobs = {0}
1. build_connector_meta()
_current_batch_jobs_to_flush.update({0})
抢占的本质: 这些 jobs 对应的请求状态正在被清理 (因为被抢占或完成),worker 必须尽快回报完成状态,这样 scheduler 才能安全清理。
SingleDirectionOffloadingHandler
数据结构
包含 CPU 和 GPU 上的 tensor,传输时只需给出 ID 即可自动传输。
传输流程
解析输入
| 参数 | 含义 |
|---|---|
block_ids |
传哪些 blocks |
group_sizes |
每组多少个 blocks |
block_indices |
从哪里开始写 (对齐 CPU sub-block) |
num_copy_ops |
要分配多大的数组 |
迭代 group 计算地址
- 计算这组从请求的哪个位置开始 (
block_idx) - 计算要跳过的 sub-blocks (对齐 CPU 边界)
- 切片出这组对应的 GPU/CPU block IDs
- 对每个 tensor: 计算字节地址 → 写入
all_src/all_dst - 更新偏移,处理下一组
CUDA 同步
- 等待 forward 完成
- 等待上一个 transform 完成
总体来说无法并行
F1 → S1 → L2 → F2