Fence 机制
核心概念
Fence 是 KV cache offloading 中的数据一致性保护机制。
问题场景:一个请求结束了,但它触发的 GPU→CPU 数据搬运(store job)还没完成。此时请求占用的 GPU block 会被释放,可能被下一个请求复用。如果新请求覆写了这些 block,而旧的 store job 还在搬运旧数据,就会把脏数据写到 CPU 端。
解决方案:在 request_finished 时,把所有还在飞的 store job 的 block ID 注册到 _block_id_to_pending_jobs 字典里。后续如果有新请求分配到相同的 block ID,fence 会检测到冲突,强制 flush(立即完成)那些还没完成的 store job。
Mock 架构(4 层)
测试系统从上到下有 4 层 mock:
第 1 层:MockOffloadingHandler(传输引擎)
模拟真实的 GPU→CPU 数据传输引擎。真实系统用 DMA 传输,测试用内存标记:
| 方法 | 真实系统 | 测试 mock |
|---|---|---|
transfer_async(job_id, spec) |
发起异步 DMA 传输 | 只记录 waiting_jobs.add(job_id) |
complete_jobs(job_ids) |
等待 DMA 完成 | 立即标记完成 |
wait(job_ids) |
强制 flush(中断传输) | 标记 flushed + 立即完成 |
第 2 层:MockOffloadingSpec.manager(策略层 MagicMock)
OffloadingManager 是生产代码中的策略层,决定”存什么/不存什么”。真实系统有 4 层策略:
- 频率过滤:只存被 lookup 命中 ≥ N 次的 block(跳过一次性 decode block)
- 去重:已在 CPU 缓存的 block 不重复存
- LRU/ARC 淘汰:CPU 内存满时踢掉旧 block
- 保护集:正在存的 block 不能被自己淘汰
测试中 manager 是 MagicMock,通过 side_effect 接管所有策略。
第 3 层:runner.manager(测试遥控器)
# 设置策略:给什么 key 就存什么 key
runner.manager.prepare_store.side_effect = lambda keys, req_context: (
generate_store_output(keys)
)
注意:每次 runner.run() 内部会调用 manager.reset_mock(),所以每次 run 后需要重新设置 side_effect。
第 4 层:RequestRunner(模拟 EngineCore.step 循环)
模拟真实引擎的调度-推理-更新循环:
真实引擎 (EngineCore.step): 测试 (RequestRunner._run):
────────────────────── ─────────────────────────
scheduler.schedule() scheduler.schedule()
→ 分配 block → 分配 block
→ build_connector_meta() → build_connector_meta()
model_executor.execute_model() worker_connector.bind_connector_metadata()
→ pre_forward: handle_preemptions → start_load_kv()
→ model forward → if complete_transfers: complete_jobs()
→ post_forward: get_finished() → worker_connector.get_finished()
scheduler.update_from_output() scheduler.update_from_output()
哪些是真实的,哪些是 mock 的
| 组件 | 真实/mock | 原因 |
|---|---|---|
Scheduler |
真实 | 测试核心就是验证调度器与 connector 的交互 |
OffloadingConnectorScheduler |
真实 | 被测对象 |
OffloadingManager |
MagicMock |
策略层,测试要控制”存什么/不存什么” |
OffloadingHandler |
MockOffloadingHandler |
传输引擎,测试要控制”何时完成” |
| 模型推理 | create_model_runner_output |
不需要真实 GPU,模拟 token 生成即可 |
_update_gpu_blocks() |
测试辅助 | 真实系统不需要,测试用来翻译物理 block_id → 逻辑偏移 |
TransferJobStatus 数据结构
调度器侧的 job 状态跟踪(scheduler.py:48-64):
@dataclass(slots=True)
class TransferJobStatus:
req_id: ReqId # 请求 ID
pending_count: int # 还有几个 worker 没完成(到 0 表示完成)
keys: set[OffloadKey] # 这个 job 覆盖的 offload key
is_store: bool # True=GPU→CPU, False=CPU→GPU
non_sliding_window_block_ids: list[int] | None # 非滑动窗口 block ID
sliding_window_block_ids: list[int] | None # 滑动窗口 block ID
调度器侧 vs Worker 侧的分工:
TransferJobStatus(调度器侧) |
TransferJob(发给 worker) |
|
|---|---|---|
| 包含 | req_id, is_store, keys, pending_count, block_ids |
GPULoadStoreSpec(GPU block ID), CPULoadStoreSpec(CPU slot) |
| 不包含 | GPU 地址、CPU 地址、tensor 数据 | 请求 ID、fence 信息 |
| 用途 | 跟踪决策、管理 fence 和生命周期 | 告诉 worker “具体搬什么、搬到哪” |
调度器不碰数据,只知道”让 worker 去存哪些 block”。具体 GPU 显存地址、CPU 内存地址是 worker 侧的事。
RequestRunner.run() 详解
本质
run() = 模拟真实引擎跑 N 步推理,但跳过真实 GPU 推理,用假 token 代替。
runner.run(
decoded_tokens=[0, 0, 0, 0], # 假装模型生成了 4 个 token
complete_transfers=False, # 不主动完成传输
expected_stored=(), # 期望完成的 store
expected_flushed=(), # 期望被 flush 的 store
)
decoded_tokens的值不重要(都是 0),重要的是数量——4 个 token 意味着 4 步 decoderun()只是”执行 N 步推理”,不是”完成所有任务”
complete_transfers 的含义
| 值 | 行为 | 模拟场景 |
|---|---|---|
True |
每步都调 complete_jobs() → job 立即完成 |
传输极快 |
False |
不调 → job 一直在飞,直到被 flush 强制完成 | 传输慢,job 堆积 |
expected_stored / expected_flushed 语义
expected_stored=(0, 1, 2):3 个 block 偏移的 store 完成了expected_flushed=(0, 1, 2):3 个 block 偏移被 flush(强制提前完成)了
即使 complete_transfers=False,expected_stored 仍然可以有值——因为 flush 内部调用了 complete_jobs(),flush 完成也算 store 完成。
三种 Flush 触发机制
| 触发方式 | 条件 | 场景 |
|---|---|---|
| fence 冲突 | 新请求复用了有 pending job 的 block | 请求 A 结束,请求 B 抢了 A 的 block |
| preemption | 请求被抢占 | 高优先级请求来了,低优先级被踢 |
| all finished | 所有请求都结束了 | 最后一个请求完成,没有后续请求(兜底机制) |
all finished 的必要性:如果请求 A 是最后一个请求,没有请求 B 来触发 fence 冲突,job 就永远不会被 flush。所以必须有兜底:所有请求结束时主动 flush 所有 job。
测试用例
Case 1: fence 注册验证
验证什么:request_finished 是否正确地把 non-SW block ID 注册到 fence。
为什么不用 run():需要停在 request_finished 之后检查 fence 中间状态。如果用 run(),循环会继续跑直到 job 完成,fence 最终被清空,看不到中间状态。
实现步骤:
- 创建
request_runner(block_size=4,num_gpu_blocks=100) - 添加请求(4 token → 1 block)
- 设置 store 策略:
prepare_store.side_effect = lambda keys, ...: generate_store_output(keys) - 手动
schedule()+_update_gpu_blocks() - 检查:
- 确认有 job 创建(
len(_jobs) > 0) - 确认是 store 类型(
is_store=True) - 确认有 non-SW block ID
- 确认 fence 为空(FullAttention 有 ref_cnt 保护,请求运行中不需要 fence)
- 确认有 job 创建(
- 手动
finish_requests()→ 触发request_finished - 检查:
- 每个 non-SW block ID 都在 fence 里
- 每个 block ID 都指向正确的 job_id
req_status仍然存在(job 还在飞,不能清理)
Case 2: 多 job 全部 flush
验证什么:request_finished 的 for 循环是否遍历了所有 in-flight job,不遗漏任何一个。
多 job 怎么产生:_build_store_jobs 内部有游标 next_stored_block_idx,每次只看”游标之后”的新 block。每次 run 填满新 block → 游标前进 → 创建新 job。
实现步骤:
- 创建请求(4 prompt token → 1 block)
- 第一次
run(decoded_tokens=[0]*4, complete_transfers=False):- 4 个 decode token → block 1 填满
_build_store_jobs发现 block 0-1 都满了 → 创建 job_0complete_transfers=False→ job_0 留在_jobs里expected_stored=(),expected_flushed=()→ 没有完成,没有 flush
- 检查
len(_jobs) == 1(job_0 在飞) - 重新设置
prepare_store.side_effect(因为run()调了reset_mock()) - 第二次
run(decoded_tokens=[0]*4 + [EOS], complete_transfers=False):- 4 个 decode token → block 2 填满 → 创建 job_1
- EOS → 请求结束 →
request_finished→ fence 注册 job_0 + job_1 - “all finished” → flush job_0 + job_1
expected_stored=(0, 1, 2)→ 3 个 block 都存完了expected_flushed=(0, 1, 2)→ 3 个 block 都被 flush 了
- 检查
fence == {},len(_jobs) == 0
如果 for 循环只处理了 job_0:block 2 不会被 flush,expected_flushed=(0, 1, 2) 失败(只有 (0, 1))。
Case 3: 完整生命周期清理
验证什么:整个流程跑完后,所有状态都清理干净。
与 Case 1 的分工:Case 1 停在中间检查 fence 注册;Case 3 跑完整个流程检查最终清理。
实现步骤:
- 创建请求(4 prompt token → 1 block)
run(decoded_tokens=[EOS_TOKEN_ID], complete_transfers=False):- 一次 run 跑完整个生命周期
expected_stored=(0,)→ block 0 存完了expected_flushed=(0,)→ block 0 被 flush 了
- 检查:
fence == {}→ fence 清空了len(_jobs) == 0→ job 移除了req_id not in _req_status→ req_status 移除了
Case 4: 混合架构 SW vs non-SW
验证什么:同时有 FullAttention 和 SlidingWindow 两种 KV cache 组时,fence 注册是否正确区分 SW 和 non-SW block。
为什么 SW 在 request_finished 之前就在 fence 里:
FullAttention block:
请求运行期间 ref_cnt > 0 → 不会被释放 → request_finished 时注册 fence 就行
SlidingWindow block:
请求还在运行时,窗口滑动就可能释放旧 block → 被新请求复用
→ 必须在创建 store job 时立即注册 fence
→ 否则 request_finished 还没调用,block 就已经被覆写了
实现步骤:
- 声明两个 KV cache group:FullAttention + SlidingWindow(
sliding_window=8, 2 block) - 创建请求(12 token → 3 block)
- 手动
schedule()+_update_gpu_blocks() - 检查:
- 有 1 个 store job
- job 同时有
non_sliding_window_block_ids和sliding_window_block_ids - SW block 已经在 fence 里(创建时立即注册)
- 记录 SW fence 快照
- 手动
finish_requests()→ 触发request_finished - 检查:
- non-SW block 现在在 fence 里(
request_finished注册的) - SW fence 条目没变(
request_finished只处理 non-SW,不碰 SW)
- non-SW block 现在在 fence 里(