vLLM SimpleCPUOffloadConnector Impletation

环境设定

num_gpu_blocks = 100     → GPU blocks: [0(null), 1, 2, ..., 99]
num_cpu_blocks = 50      → CPU blocks: [0(null), 1, 2, ..., 49]
block_size = 16 tokens
hash_block_size = 16 tokens

请求 A: “Hello, world!” (3 tokens)

Step 1: 创建请求
┌─────────────────────────────────────────────────┐
│ Request("A")                                     │
│   tokens: "Hello, world!" (3 tokens)             │
│   block_hashes: [0xABCD]                         │
│     ← hash("Hello, world!"[:16]) = 0xABCD        │
└─────────────────────────────────────────────────┘

Step 2: 检查 GPU prefix cache
┌─────────────────────────────────────────────────┐
│ kv_cache_manager.get_computed_blocks(A)          │
│   → GPU cache 查询 (A 是新的)                     │
│   → num_computed_tokens = 0                      │
└─────────────────────────────────────────────────┘

Step 3: 检查 CPU prefix cache
┌─────────────────────────────────────────────────┐
│ connector.get_num_new_matched_tokens(A, 0)       │
│   → CPU cache 查询:                              │
│     cached_block_hash_to_block[0xABCD] = ?       │
│   → 未命中 (cache 为空)                           │
│   → return (0, False)                            │
└─────────────────────────────────────────────────┘

Step 4: 分配 GPU block
┌─────────────────────────────────────────────────┐
│ kv_cache_manager.allocate_slots(A)               │
│   → 需要 blocks: ceil(3/16) = 1                  │
│   → 分配 GPU block 1                             │
│   → GPU[1].ref_cnt = 1                           │
│                                                  │
│ GPU 状态:                                        │
│   blocks[1]: ref_cnt=1, hash=None, 内容=空       │
│   free_queue: [2, 3, ..., 99]                    │
└─────────────────────────────────────────────────┘

Step 5: 注册 eager store
┌─────────────────────────────────────────────────┐
│ connector.update_state_after_alloc(A, [[1]], 0)  │
│   → num_external_tokens=0, 无需 load             │
│   → 注册 _reqs_to_store["A"]:                    │
│     block_ids=([1],), num_stored_blocks=[0]      │
└─────────────────────────────────────────────────┘

Step 6: 构建传输元数据
┌─────────────────────────────────────────────────┐
│ connector.build_connector_meta()                 │
│   → prepare_store_specs():                       │
│     扫描 _reqs_to_store["A"]                     │
│     → GPU[1] 有 KV 数据,需要 offload             │
│     → 分配 CPU block 1                           │
│     → stamp: CPU[1]._block_hash = 0xABCD         │
│     → touch(GPU[1])  ← 传输保护                   │
│                                                  │
│   → return:                                      │
│     store_event=0                                │
│     store_gpu_blocks=[1]  ← 源                   │
│     store_cpu_blocks=[1]  ← 目标                 │
└─────────────────────────────────────────────────┘

Step 7: Worker 模型推理
┌─────────────────────────────────────────────────┐
│ model.forward("Hello, world!")                   │
│   → KV 数据写入 GPU[1]                           │
│   → GPU[1] 内容: K/V for "Hello, world!" (4MB)   │
└─────────────────────────────────────────────────┘

Step 8: Worker DMA (GPU → CPU)
┌─────────────────────────────────────────────────┐
│ get_finished():                                  │
│   → DMA: GPU[1] → CPU[1] (4MB, 异步)             │
│   → 返回 completed_store_events={0}              │
└─────────────────────────────────────────────────┘

Step 9: Scheduler 处理完成事件
┌─────────────────────────────────────────────────┐
│ update_connector_output():                       │
│   → _process_store_completion([1], [1]):         │
│     → 注册 CPU cache:                            │
│       cached_block_hash_to_block[0xABCD]=CPU[1]  │
│     → free_blocks(GPU[1])  → ref_cnt=0           │
│     → free_blocks(CPU[1])  → ref_cnt=0           │
│                                                  │
│ 最终状态:                                        │
│   GPU[1]: ref_cnt=0, hash=0xABCD, 在 free_queue  │
│   CPU[1]: ref_cnt=0, hash=0xABCD, 在 cache map   │
│           在 free_queue (最前, LRU 优先驱逐)       │
└─────────────────────────────────────────────────┘

请求 B: “Hello, world! How are you?” (7 tokens)

Step 1: 创建请求
┌─────────────────────────────────────────────────┐
│ Request("B")                                     │
│   tokens: "Hello, world! How are you?" (7)       │
│   block_hashes: [0xABCD]                         │
│     ← hash("Hello, world! How"[:16]) = 0xABCD    │
│     ← 和 A 相同!                                  │
└─────────────────────────────────────────────────┘

Step 2: 检查 GPU prefix cache
┌─────────────────────────────────────────────────┐
│ kv_cache_manager.get_computed_blocks(B)          │
│   → GPU cache 查询 (B 是新的)                     │
│   → num_computed_tokens = 0                      │
└─────────────────────────────────────────────────┘

Step 3: 检查 CPU prefix cache
┌─────────────────────────────────────────────────┐
│ connector.get_num_new_matched_tokens(B, 0)       │
│   → stale 检查: _pending_cpu_hits.pop("B")=None  │
│                                                  │
│   → CPU cache 查询:                              │
│     cached_block_hash_to_block[0xABCD] = CPU[1]  │
│   → ✅ 命中!                                     │
│   → cpu_hit_blocks = [[CPU[1]]]                  │
│   → hit_length = 3 tokens                        │
│   → max_hit_len = 7 - 1 - 0 = 6                  │
│   → 3 < 6, 全部命中                              │
│                                                  │
│   → touch(CPU[1])  ← ref_cnt=1 (pin, 防驱逐)     │
│   → CPU[1] 从 free_queue 移除                     │
│   → _pending_cpu_hits["B"] = ([[CPU[1]]], 3)     │
│                                                  │
│   → return (3, True)  ← 可从 CPU 加载 3 tokens   │
└─────────────────────────────────────────────────┘

Step 4: 分配 GPU block
┌─────────────────────────────────────────────────┐
│ kv_cache_manager.allocate_slots(B, num_external=3)│
│   → 需要 blocks: ceil(7/16) = 1                  │
│   → 分配 GPU block 2                             │
│   → GPU[2].ref_cnt = 1                           │
│                                                  │
│ GPU 状态:                                        │
│   blocks[1]: ref_cnt=0, hash=0xABCD, 在 free     │
│   blocks[2]: ref_cnt=1, hash=None, 内容=空       │
│   free_queue: [1, 3, 4, ..., 99]                 │
│            (1 回来了, 但 CPU[1] 还在 pin)          │
└─────────────────────────────────────────────────┘

Step 5: 建立传输映射
┌─────────────────────────────────────────────────┐
│ connector.update_state_after_alloc(B, [[2]], 3)  │
│                                                  │
│   → 消费 pending hit:                            │
│     pending = _pending_cpu_hits.pop("B")         │
│     → ([[CPU[1]]], 3)                            │
│                                                  │
│   → 建立映射:                                    │
│     cpu_hit_blocks = [[CPU[1]]]                  │
│     gpu_block_ids = [2]  ← 新分配的               │
│     cpu_block_ids = [1]  ← 之前命中的             │
│                                                  │
│   → _reqs_to_load["B"] = LoadRequestState(       │
│       transfer_meta=TransferMeta([2], [1])       │
│     )                                            │
│                                                  │
│   → free_blocks(CPU[1])  ← 释放之前的 pin         │
│   → touch(CPU[1])    ← 重新 touch (传输保护)      │
│   → touch(GPU[2])    ← 传输保护                   │
│                                                  │
│   → 注册 eager store:                            │
│     _reqs_to_store["B"] = StoreRequestState(...) │
└─────────────────────────────────────────────────┘

Step 6: 构建传输元数据
┌─────────────────────────────────────────────────┐
│ connector.build_connector_meta()                 │
│   → 收集 pending loads:                          │
│     _reqs_to_load["B"].load_event = None         │
│     → load_event = 0                             │
│     → _reqs_to_load["B"].load_event = 0          │
│                                                  │
│   → prepare_store_specs():                       │
│     扫描 _reqs_to_store["B"]                     │
│     → GPU[2] 有 KV 数据,需要 offload             │
│     → 分配 CPU block 2                           │
│     → stamp: CPU[2]._block_hash = 0xEFGH         │
│       (hash("Hello...How are you?") = 0xEFGH)    │
│     → touch(GPU[2])                              │
│                                                  │
│   → return:                                      │
│     load_event=0                                 │
│     load_gpu_blocks=[2]   ← 目标 (CPU→GPU)       │
│     load_cpu_blocks=[1]   ← 源                   │
│     store_event=0                                │
│     store_gpu_blocks=[2]  ← 源 (GPU→CPU)         │
│     store_cpu_blocks=[2]  ← 目标                 │
└─────────────────────────────────────────────────┘

Step 7: Worker DMA (CPU → GPU)
┌─────────────────────────────────────────────────┐
│ get_finished() - 处理 load:                      │
│   → DMA: CPU[1] → GPU[2] (4MB, 异步)             │
│   → GPU[2] 内容: K/V for "Hello, world!"         │
│   → 返回 finished_recving={"B"}                  │
└─────────────────────────────────────────────────┘

Step 8: Worker 模型推理
┌─────────────────────────────────────────────────┐
│ model.forward("How are you?")                    │
│   → 跳过已加载的 "Hello, world!" (3 tokens)      │
│   → 只计算 "How are you?" (4 tokens)             │
│   → KV 写入 GPU[2] slots 3:6                     │
│   → GPU[2] 内容: "Hello, world! How are you?"    │
└─────────────────────────────────────────────────┘

Step 9: Worker DMA (GPU → CPU)
┌─────────────────────────────────────────────────┐
│ get_finished() - 处理 store:                     │
│   → DMA: GPU[2] → CPU[2] (4MB, 异步)             │
│   → 返回 completed_store_events={0}              │
└─────────────────────────────────────────────────┘

Step 10: Scheduler 处理完成事件
┌─────────────────────────────────────────────────┐
│ update_connector_output():                       │
│   → 处理 load 完成 (finished_recving={"B"}):     │
│     _cleanup_load_request("B"):                  │
│       → del _reqs_to_load["B"]                   │
│       → free_blocks(CPU[1])  → ref_cnt -= 1      │
│       → free_blocks(GPU[2])  → ref_cnt -= 1      │
│                                                  │
│   → 处理 store 完成 (completed_store_events={0}):│
│     _process_store_completion([2], [2]):         │
│       → 注册 CPU cache:                          │
│         cached_block_hash_to_block[0xEFGH]=CPU[2]│
│       → free_blocks(GPU[2])  → ref_cnt=0         │
│       → free_blocks(CPU[2])  → ref_cnt=0         │
│                                                  │
│ 最终状态:                                        │
│   GPU[1]: ref_cnt=0, hash=0xABCD, 在 free_queue  │
│   GPU[2]: ref_cnt=0, hash=None*, 在 free_queue   │
│   CPU[1]: ref_cnt=0, hash=0xABCD, 在 cache map   │
│   CPU[2]: ref_cnt=0, hash=0xEFGH, 在 cache map   │
│   (* GPU[2] 的 hash 由 kv_cache_manager 管理)     │
└─────────────────────────────────────────────────┘

数据流动总结

请求 A 的数据流:
┌─────────────────────────────────────────────────────┐
│                                                      │
│  "Hello, world!" (3 tokens)                          │
│       │                                              │
│       ▼                                              │
│  GPU[1] ──forward()──▶ KV 写入 GPU[1]                │
│       │                                              │
│       ▼                                              │
│  GPU[1] ──DMA──▶ CPU[1] (hash=0xABCD)               │
│                    │                                 │
│                    ▼                                 │
│              注册到 CPU cache map                    │
│              cached_block_hash_to_block[0xABCD]      │
│                = CPU[1]                              │
│                                                      │
└─────────────────────────────────────────────────────┘

请求 B 的数据流:
┌─────────────────────────────────────────────────────┐
│                                                      │
│  "Hello, world! How are you?" (7 tokens)             │
│       │                                              │
│       ▼                                              │
│  CPU cache 查询: hash(0xABCD) → CPU[1] ✅ 命中       │
│       │                                              │
│       ▼                                              │
│  CPU[1] ──DMA──▶ GPU[2] (加载 "Hello, world!")       │
│       │                                              │
│       ▼                                              │
│  GPU[2] ──forward()──▶ 计算 "How are you?"           │
│       │                                              │
│       ▼                                              │
│  GPU[2] ──DMA──▶ CPU[2] (hash=0xEFGH)               │
│                    │                                 │
│                    ▼                                 │
│              注册到 CPU cache map                    │
│              cached_block_hash_to_block[0xEFGH]      │
│                = CPU[2]                              │
│                                                      │
└─────────────────────────────────────────────────────┘

最终 CPU cache 状态:
┌──────────────────────────────────────────────────────┐
│ cached_block_hash_to_block:                          │
│   {                                                  │
│     BlockHash(0xABCD, g=0): CPU[1]  ← "Hello, world!"│
│     BlockHash(0xEFGH, g=0): CPU[2]  ← "Hello...How.."│
│   }                                                  │
│                                                      │
│ free_queue (CPU): [1, 2, 3, ..., 49]                 │
│   (1 和 2 都在队列中,1 更老,优先驱逐)                 │
└──────────────────────────────────────────────────────┘