Fence 机制

核心概念

Fence 是 KV cache offloading 中的数据一致性保护机制

问题场景:一个请求结束了,但它触发的 GPU→CPU 数据搬运(store job)还没完成。此时请求占用的 GPU block 会被释放,可能被下一个请求复用。如果新请求覆写了这些 block,而旧的 store job 还在搬运旧数据,就会把脏数据写到 CPU 端。

解决方案:在 request_finished 时,把所有还在飞的 store job 的 block ID 注册到 _block_id_to_pending_jobs 字典里。后续如果有新请求分配到相同的 block ID,fence 会检测到冲突,强制 flush(立即完成)那些还没完成的 store job。

Mock 架构(4 层)

测试系统从上到下有 4 层 mock:

第 1 层:MockOffloadingHandler(传输引擎)

模拟真实的 GPU→CPU 数据传输引擎。真实系统用 DMA 传输,测试用内存标记:

方法 真实系统 测试 mock
transfer_async(job_id, spec) 发起异步 DMA 传输 只记录 waiting_jobs.add(job_id)
complete_jobs(job_ids) 等待 DMA 完成 立即标记完成
wait(job_ids) 强制 flush(中断传输) 标记 flushed + 立即完成

第 2 层:MockOffloadingSpec.manager(策略层 MagicMock)

OffloadingManager 是生产代码中的策略层,决定”存什么/不存什么”。真实系统有 4 层策略:

  1. 频率过滤:只存被 lookup 命中 ≥ N 次的 block(跳过一次性 decode block)
  2. 去重:已在 CPU 缓存的 block 不重复存
  3. LRU/ARC 淘汰:CPU 内存满时踢掉旧 block
  4. 保护集:正在存的 block 不能被自己淘汰

测试中 managerMagicMock,通过 side_effect 接管所有策略。

第 3 层:runner.manager(测试遥控器)

# 设置策略:给什么 key 就存什么 key
runner.manager.prepare_store.side_effect = lambda keys, req_context: (
    generate_store_output(keys)
)

注意:每次 runner.run() 内部会调用 manager.reset_mock(),所以每次 run 后需要重新设置 side_effect

第 4 层:RequestRunner(模拟 EngineCore.step 循环)

模拟真实引擎的调度-推理-更新循环:

真实引擎 (EngineCore.step):              测试 (RequestRunner._run):
──────────────────────                   ─────────────────────────
scheduler.schedule()                     scheduler.schedule()
  → 分配 block                             → 分配 block
  → build_connector_meta()                 → build_connector_meta()
model_executor.execute_model()           worker_connector.bind_connector_metadata()
  → pre_forward: handle_preemptions        → start_load_kv()
  → model forward                          → if complete_transfers: complete_jobs()
  → post_forward: get_finished()           → worker_connector.get_finished()
scheduler.update_from_output()           scheduler.update_from_output()

哪些是真实的,哪些是 mock 的

组件 真实/mock 原因
Scheduler 真实 测试核心就是验证调度器与 connector 的交互
OffloadingConnectorScheduler 真实 被测对象
OffloadingManager MagicMock 策略层,测试要控制”存什么/不存什么”
OffloadingHandler MockOffloadingHandler 传输引擎,测试要控制”何时完成”
模型推理 create_model_runner_output 不需要真实 GPU,模拟 token 生成即可
_update_gpu_blocks() 测试辅助 真实系统不需要,测试用来翻译物理 block_id → 逻辑偏移

TransferJobStatus 数据结构

调度器侧的 job 状态跟踪(scheduler.py:48-64):

@dataclass(slots=True)
class TransferJobStatus:
    req_id: ReqId                                    # 请求 ID
    pending_count: int                               # 还有几个 worker 没完成(到 0 表示完成)
    keys: set[OffloadKey]                            # 这个 job 覆盖的 offload key
    is_store: bool                                   # True=GPU→CPU, False=CPU→GPU
    non_sliding_window_block_ids: list[int] | None   # 非滑动窗口 block ID
    sliding_window_block_ids: list[int] | None       # 滑动窗口 block ID

调度器侧 vs Worker 侧的分工

  TransferJobStatus(调度器侧) TransferJob(发给 worker)
包含 req_id, is_store, keys, pending_count, block_ids GPULoadStoreSpec(GPU block ID), CPULoadStoreSpec(CPU slot)
不包含 GPU 地址、CPU 地址、tensor 数据 请求 ID、fence 信息
用途 跟踪决策、管理 fence 和生命周期 告诉 worker “具体搬什么、搬到哪”

调度器不碰数据,只知道”让 worker 去存哪些 block”。具体 GPU 显存地址、CPU 内存地址是 worker 侧的事。

RequestRunner.run() 详解

本质

run() = 模拟真实引擎跑 N 步推理,但跳过真实 GPU 推理,用假 token 代替。

runner.run(
    decoded_tokens=[0, 0, 0, 0],   # 假装模型生成了 4 个 token
    complete_transfers=False,       # 不主动完成传输
    expected_stored=(),             # 期望完成的 store
    expected_flushed=(),            # 期望被 flush 的 store
)

complete_transfers 的含义

行为 模拟场景
True 每步都调 complete_jobs() → job 立即完成 传输极快
False 不调 → job 一直在飞,直到被 flush 强制完成 传输慢,job 堆积

expected_stored / expected_flushed 语义

即使 complete_transfers=Falseexpected_stored 仍然可以有值——因为 flush 内部调用了 complete_jobs(),flush 完成也算 store 完成。

三种 Flush 触发机制

触发方式 条件 场景
fence 冲突 新请求复用了有 pending job 的 block 请求 A 结束,请求 B 抢了 A 的 block
preemption 请求被抢占 高优先级请求来了,低优先级被踢
all finished 所有请求都结束了 最后一个请求完成,没有后续请求(兜底机制)

all finished 的必要性:如果请求 A 是最后一个请求,没有请求 B 来触发 fence 冲突,job 就永远不会被 flush。所以必须有兜底:所有请求结束时主动 flush 所有 job。

测试用例

Case 1: fence 注册验证

验证什么request_finished 是否正确地把 non-SW block ID 注册到 fence。

为什么不用 run():需要停在 request_finished 之后检查 fence 中间状态。如果用 run(),循环会继续跑直到 job 完成,fence 最终被清空,看不到中间状态。

实现步骤

  1. 创建 request_runnerblock_size=4, num_gpu_blocks=100
  2. 添加请求(4 token → 1 block)
  3. 设置 store 策略:prepare_store.side_effect = lambda keys, ...: generate_store_output(keys)
  4. 手动 schedule() + _update_gpu_blocks()
  5. 检查:
    • 确认有 job 创建(len(_jobs) > 0
    • 确认是 store 类型(is_store=True
    • 确认有 non-SW block ID
    • 确认 fence 为空(FullAttention 有 ref_cnt 保护,请求运行中不需要 fence)
  6. 手动 finish_requests() → 触发 request_finished
  7. 检查:
    • 每个 non-SW block ID 都在 fence 里
    • 每个 block ID 都指向正确的 job_id
    • req_status 仍然存在(job 还在飞,不能清理)

Case 2: 多 job 全部 flush

验证什么request_finished 的 for 循环是否遍历了所有 in-flight job,不遗漏任何一个。

多 job 怎么产生_build_store_jobs 内部有游标 next_stored_block_idx,每次只看”游标之后”的新 block。每次 run 填满新 block → 游标前进 → 创建新 job。

实现步骤

  1. 创建请求(4 prompt token → 1 block)
  2. 第一次 run(decoded_tokens=[0]*4, complete_transfers=False)
    • 4 个 decode token → block 1 填满
    • _build_store_jobs 发现 block 0-1 都满了 → 创建 job_0
    • complete_transfers=False → job_0 留在 _jobs
    • expected_stored=(), expected_flushed=() → 没有完成,没有 flush
  3. 检查 len(_jobs) == 1(job_0 在飞)
  4. 重新设置 prepare_store.side_effect(因为 run() 调了 reset_mock()
  5. 第二次 run(decoded_tokens=[0]*4 + [EOS], complete_transfers=False)
    • 4 个 decode token → block 2 填满 → 创建 job_1
    • EOS → 请求结束 → request_finished → fence 注册 job_0 + job_1
    • “all finished” → flush job_0 + job_1
    • expected_stored=(0, 1, 2) → 3 个 block 都存完了
    • expected_flushed=(0, 1, 2) → 3 个 block 都被 flush 了
  6. 检查 fence == {}len(_jobs) == 0

如果 for 循环只处理了 job_0:block 2 不会被 flush,expected_flushed=(0, 1, 2) 失败(只有 (0, 1))。

Case 3: 完整生命周期清理

验证什么:整个流程跑完后,所有状态都清理干净。

与 Case 1 的分工:Case 1 停在中间检查 fence 注册;Case 3 跑完整个流程检查最终清理。

实现步骤

  1. 创建请求(4 prompt token → 1 block)
  2. run(decoded_tokens=[EOS_TOKEN_ID], complete_transfers=False)
    • 一次 run 跑完整个生命周期
    • expected_stored=(0,) → block 0 存完了
    • expected_flushed=(0,) → block 0 被 flush 了
  3. 检查:
    • fence == {} → fence 清空了
    • len(_jobs) == 0 → job 移除了
    • req_id not in _req_status → req_status 移除了

Case 4: 混合架构 SW vs non-SW

验证什么:同时有 FullAttention 和 SlidingWindow 两种 KV cache 组时,fence 注册是否正确区分 SW 和 non-SW block。

为什么 SW 在 request_finished 之前就在 fence 里

FullAttention block:
  请求运行期间 ref_cnt > 0 → 不会被释放 → request_finished 时注册 fence 就行

SlidingWindow block:
  请求还在运行时,窗口滑动就可能释放旧 block → 被新请求复用
  → 必须在创建 store job 时立即注册 fence
  → 否则 request_finished 还没调用,block 就已经被覆写了

实现步骤

  1. 声明两个 KV cache group:FullAttention + SlidingWindow(sliding_window=8, 2 block)
  2. 创建请求(12 token → 3 block)
  3. 手动 schedule() + _update_gpu_blocks()
  4. 检查:
    • 有 1 个 store job
    • job 同时有 non_sliding_window_block_idssliding_window_block_ids
    • SW block 已经在 fence 里(创建时立即注册)
  5. 记录 SW fence 快照
  6. 手动 finish_requests() → 触发 request_finished
  7. 检查:
    • non-SW block 现在在 fence 里(request_finished 注册的)
    • SW fence 条目没变(request_finished 只处理 non-SW,不碰 SW)