vLLM SimpleCPUOffloadConnector Process

一、环境设定

模型:       LLaMA-3-8B (head_size=128, num_kv_heads=32, num_layers=32)
block_size: 16 tokens
GPU 容量:   100 blocks (每 block 约 2MB × 32 layers ≈ 64MB)
CPU 容量:   50 blocks  (约 3.2GB)
hash粒度:   16 tokens (每 16 个 token 计算一个 block hash)
offload模式: eager (每个新计算/加载的 block 立即 offload 到 CPU)
通信方式:   进程内直接调用 (非网络通信)

二、服务启动 — Connector 创建

用户命令:
vllm serve meta-llama/Llama-3-8b \
    --enable-prefix-caching \
    --kv-connector SimpleCPUOffloadConnector \
    --kv-connector-extra-config '{"cpu_bytes_to_use_per_rank": 3221225472, "lazy_offload": false}'

┌─────────────────────────────────────────────────────────────────────┐
│ 1. 解析配置 → VllmConfig                                            │
│    ├── cache_config.enable_prefix_caching = True                    │
│    ├── kv_transfer_config.kv_connector = "SimpleCPUOffloadConnector"│
│    └── kv_transfer_config.kv_connector_extra_config = {...}         │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────────┐
│ 2. Scheduler 进程初始化                                             │
│    Scheduler.__init__()                                             │
│    └── self.connector = KVConnectorFactory.create_connector(        │
│          config=vllm_config,                                        │
│          role=KVConnectorRole.SCHEDULER,                            │
│          kv_cache_config=kv_cache_config,                           │
│        )                                                            │
│        │                                                            │
│        ├── 查找注册表:                                              │
│        │   KVConnectorFactory._registry["SimpleCPUOffloadConnector"] │
│        │   → 动态导入 simple_cpu_offload_connector.SimpleCPUOffl... │
│        │                                                            │
│        └── return SimpleCPUOffloadConnector(                        │
│              vllm_config,                                           │
│              role=KVConnectorRole.SCHEDULER,  ← Scheduler 角色      │
│              kv_cache_config,                                       │
│            )                                                        │
│            │                                                        │
│            └── __init__:                                            │
│                ├── cpu_capacity_per_rank = 3221225472 (3GB)         │
│                ├── lazy_offload = False                             │
│                ├── self.scheduler_manager = SimpleCPUOffloadSch..   │ ✅
│                └── self.worker_handler = None                        │ ❌
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────────┐
│ 3. Worker 进程初始化 (每张 GPU 一个)                                 │
│    Worker.__init__()                                                │
│    └── self.kv_connector = KVConnectorFactory.create_connector(     │
│          config=vllm_config,                                        │
│          role=KVConnectorRole.WORKER,                               │
│          kv_cache_config=kv_cache_config,                           │
│        )                                                            │
│        │                                                            │
│        └── return SimpleCPUOffloadConnector(                        │
│              vllm_config,                                           │
│              role=KVConnectorRole.WORKER,  ← Worker 角色            │
│              kv_cache_config,                                       │
│            )                                                        │
│            │                                                        │
│            └── __init__:                                            │
│                ├── self.scheduler_manager = None                     │ ❌
│                └── self.worker_handler = SimpleCPUOffloadWorker      │ ✅
└─────────────────────────────────────────────────────────────────────┘

最终状态:
┌─────────────────────────────────────────────────┐
│ Scheduler 进程                                   │
│ ├── connector.scheduler_manager ✅              │
│ └── connector.worker_handler  ❌                │
├─────────────────────────────────────────────────┤
│ Worker 进程                                      │
│ ├── connector.scheduler_manager ❌              │
│ └── connector.worker_handler  ✅                │
└─────────────────────────────────────────────────┘

三、请求 A: “Hello, world!” (首次,3 tokens)

Phase 1: Scheduler 调度

┌─ Scheduler.schedule() ─────────────────────────────────────────┐
│                                                                 │
│ Step 1: 检查 GPU prefix cache                                   │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ kv_cache_manager.get_computed_blocks(req_A)             │    │
│ │ → 查询 GPU 上前缀缓存 (请求 A 是首次出现)                │    │
│ │ → num_computed_tokens = 0                                │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 2: 检查 CPU prefix cache                                   │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ connector.get_num_new_matched_tokens(req_A, 0)          │    │
│ │ → SimpleCPUOffloadScheduler.get_num_new_matched_tokens()│    │
│ │   │                                                      │    │
│ │   ├── 计算 block hashes:                                 │    │
│ │   │   block_hashes = hash("Hello, world!"[:16])         │    │
│ │   │              = [0xABCD1234]                         │    │
│ │   │                                                      │    │
│ │   ├── cpu_coordinator.find_longest_cache_hit()          │    │
│ │   │   → 查询 CPU cache map:                             │    │
│ │   │     cached_block_hash_to_block[0xABCD1234] = ?      │    │
│ │   │   → 无命中 (CPU cache 为空)                          │    │
│ │   │   → return ([], 0)                                  │    │
│ │   │                                                      │    │
│ │   └── return (0, False)  ← 无外部缓存命中                │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 3: 分配 GPU blocks                                         │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ kv_cache_manager.allocate_slots(req_A)                  │    │
│ │ → 需要 tokens: 3                                         │    │
│ │ → 需要 blocks: ceil(3 / 16) = 1                         │    │
│ │ → 从 free_block_queue 取出 1 个 block                    │    │
│ │ → 分配 GPU block 101                                     │    │
│ │ → 返回: blocks = KVCacheBlocks(block_ids=[[101]])        │    │
│ │                                                          │    │
│ │ 此时 GPU block 状态:                                     │    │
│ │   block 101: allocated to req_A                          │    │
│ │   free_blocks: [0,1,...,100,102,...,99]                  │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 4: 通知 connector 分配结果                                 │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ connector.update_state_after_alloc(req_A, blocks, 0)    │    │
│ │ → SimpleCPUOffloadScheduler.update_state_after_alloc()  │    │
│ │   │                                                      │    │
│ │   ├── num_external_tokens = 0                            │    │
│ │   │   → 无需从 CPU 加载 (没有 pending CPU hit)           │    │
│ │   │                                                      │    │
│ │   └── eager 模式: 注册 req_A 到 _reqs_to_store           │    │
│ │       → 标记: block 101 计算完成后需要 offload 到 CPU    │    │
│ │       → _req_to_store_states[req_A] = StoreRequestState  │    │
│ │           (block_ids=[101], pending_blocks={101})        │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 5: 构建传输元数据                                          │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ connector.build_connector_meta(scheduler_output)        │    │
│ │ → SimpleCPUOffloadScheduler.build_connector_meta()      │    │
│ │   │                                                      │    │
│ │   ├── _prepare_lazy_store_specs() / _prepare_eager_s..  │    │
│ │   │   → eager 模式: 扫描 _req_to_store_states            │    │
│ │   │   → 发现 req_A 的 blocks: [101]                      │    │
│ │   │   → 检查 CPU cache 是否已有 hash(101)               │    │
│ │   │   → 无,需要分配新的 CPU block                       │    │
│ │   │   → cpu_block_pool.allocate() → 分配 CPU block 1    │    │
│ │   │   → 建立映射:                                       │    │
│ │   │       _per_req_store_states[req_A] = (              │    │
│ │   │         gpu_block_ids = [101],                      │    │
│ │   │         cpu_block_ids = [1],                        │    │
│ │   │         hashes = [0xABCD1234],                      │    │
│ │   │       )                                              │    │
│ │   │                                                      │    │
│ │   └── 收集 pending loads (无)                            │    │
│ │   │                                                      │    │
│ │   └── return SimpleCPUOffloadMetadata:                   │    │
│ │       store_event      = 0                               │    │
│ │       store_gpu_blocks  = [101]  ← 源                    │    │
│ │       store_cpu_blocks  = [1]    ← 目标                  │    │
│ │       load_event       = -1      (无加载)                │    │
│ │       load_gpu_blocks   = []                             │    │
│ │       load_cpu_blocks   = []                             │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 6: 发送 SchedulerOutput → Worker                           │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ scheduler_output = SchedulerOutput(                     │    │
│ │     scheduled_reqs=[req_A],                             │    │
│ │     blocks_to_swap_in=[],                               │    │
│ │     blocks_to_swap_out=[],                              │    │
│ │     kv_connector_metadata=SimpleCPUOffloadMetadata(...),│    │
│ │ )                                                        │    │
│ │                                                          │    │
│ │ → 通过进程内函数调用传递给 Worker                          │    │
│ └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Phase 2: Worker 执行

┌─ Worker.execute_model(scheduler_output) ────────────────────────┐
│                                                                  │
│ Step 1: 绑定传输元数据                                            │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ connector.bind_connector_metadata(metadata)              │     │
│ │ → SimpleCPUOffloadWorker.bind_connector_metadata()       │     │
│ │   │                                                       │     │
│ │   ├── self.pending_metadata = metadata                   │     │
│ │   ├── 解析 store_event=0: GPU[101] → CPU[1]             │     │
│ │   └── 准备 GPU KV cache tensor 引用:                     │     │
│ │       kv_cache_tensors[layer_idx].block(101)             │     │
│ └──────────────────────────────────────────────────────────┘     │
│                                                                  │
│ Step 2: 模型推理                                                  │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ model_runner.forward()                                    │     │
│ │ → 输入: "Hello, world!" (3 tokens)                        │     │
│ │ → 逐层前向传播 (32 layers):                               │     │
│ │     for layer in model.layers:                            │     │
│ │         K, V = attention.forward(...)                     │     │
│ │         kv_caches[layer][block=101] = K, V                │     │
│ │                                                           │     │
│ │ → kv_caches[layer][101] 数据布局:                         │     │
│ │     shape: (1, 16, 32, 128)  ← 1 block, 16 slots         │     │
│ │     dtype: float16                                        │     │
│ │     size: 1 × 16 × 32 × 128 × 2 bytes ≈ 128KB / layer    │     │
│ │     total: 128KB × 32 layers ≈ 4MB                        │     │
│ │                                                           │     │
│ │ → 输出: 生成下一个 token                                   │     │
│ └──────────────────────────────────────────────────────────┘     │
│                                                                  │
│ Step 3: 执行异步 DMA 传输 (GPU → CPU)                            │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ connector.get_finished(finished_req_ids)                 │     │
│ │ → SimpleCPUOffloadWorker.get_finished()                  │     │
│ │   │                                                       │     │
│ │   ├── _process_store_event(event_idx=0):                 │     │
│ │   │   │                                                   │     │
│ │   │   ├── 获取 GPU KV cache tensor:                      │     │
│ │   │   │   src = kv_caches[block=101]  ← 4MB              │     │
│ │   │   │                                                   │     │
│ │   │   ├── 获取 CPU pinned memory:                        │     │
│ │   │   │   dst = cpu_pinned_memory[block=1]  ← 4MB        │     │
│ │   │   │                                                   │     │
│ │   │   ├── 启动异步 DMA 拷贝:                              │     │
│ │   │   │   for layer in range(num_layers):                │     │
│ │   │   │       cudaMemcpyAsync(                           │     │
│ │   │   │           dst=cpu_pinned[layer][block=1],        │     │
│ │   │   │           src=gpu_kv_cache[layer][block=101],    │     │
│ │   │   │           size=128KB,                            │     │
│ │   │   │           stream=copy_stream,                    │     │
│ │   │   │       )                                           │     │
│ │   │   │                                                   │     │
│ │   │   └── 记录事件:                                       │     │
│ │   │       _pending_store_events[0] = {                   │     │
│ │   │           gpu_block_ids: [101],                      │     │
│ │   │           cpu_block_ids: [1],                        │     │
│ │   │           req_ids: {req_A},                          │     │
│ │   │       }                                               │     │
│ │   │                                                       │     │
│ │   └── return KVConnectorOutput:                          │     │
│ │       kv_connector_worker_meta = {                       │     │
│ │         completed_store_events: {0: {                    │     │
│ │           gpu_block_ids: [101],                          │     │
│ │           cpu_block_ids: [1],                            │     │
│ │         }}                                                │     │
│ │       }                                                   │     │
│ └──────────────────────────────────────────────────────────┘     │
│                                                                  │
│ Step 4: 返回结果 → Scheduler                                     │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ ModelRunnerOutput(                                       │     │
│ │     req_ids=[req_A],                                     │     │
│ │     sampled_token_ids=[...],                             │     │
│ │     kv_connector_output=KVConnectorOutput(...),          │     │
│ │ )                                                         │     │
│ └──────────────────────────────────────────────────────────┘     │
└──────────────────────────────────────────────────────────────────┘

Phase 3: Scheduler 清理

┌─ Scheduler 接收 KVConnectorOutput ───────────────────────────────┐
│                                                                  │
│ connector.update_connector_output(kv_connector_output)           │
│ → SimpleCPUOffloadScheduler.update_connector_output()            │
│   │                                                               │
│   ├── 检测到 completed_store_events: {0}                          │
│   │                                                               │
│   ├── _process_store_event(event_idx=0):                         │
│   │   │                                                           │
│   │   ├── _process_store_completion(                             │
│   │   │       gpu_block_ids=[101],                               │
│   │   │       cpu_block_ids=[1],                                 │
│   │   │       hashes=[0xABCD1234],                               │
│   │   │   )                                                       │
│   │   │   │                                                       │
│   │   │   ├── 注册 CPU cache:                                    │
│   │   │   │   cpu_coordinator.add_block(                         │
│   │   │   │       block_id=1,                                    │
│   │   │   │       prefix_hash=0xABCD1234,                        │
│   │   │   │   )                                                   │
│   │   │   │   → cached_block_hash_to_block[0xABCD1234] = CPU[1]  │
│   │   │   │                                                       │
│   │   │   ├── 释放传输保护 ref:                                  │
│   │   │   │   gpu_block_pool.free_blocks([101])  ← ref_cnt -= 1  │
│   │   │   │   cpu_block_pool.free_blocks([1])   ← ref_cnt -= 1   │
│   │   │   │                                                       │
│   │   │   └── 清理请求状态:                                      │
│   │   │       del _req_to_store_states[req_A]                    │
│   │   │                                                           │
│   │   └── 清理事件状态:                                          │
│   │       del _per_req_store_states[req_A]                       │
│   │                                                               │
│   └── 此时 CPU cache 状态:                                       │
│       ┌─────┬────────────┬──────────────────┬────────┐           │
│       │ ID  │ hash       │ 内容             │ 状态   │           │
│       ├─────┼────────────┼──────────────────┼────────┤           │
│       │  1  │ 0xABCD1234 │ "Hello, world!"  │ 已注册 │           │
│       │ ... │ ...        │ ...              │        │           │
│       └─────┴────────────┴──────────────────┴────────┘           │
│         已用: 1/50 blocks                                        │
└──────────────────────────────────────────────────────────────────┘

四、请求 B: “Hello, world! How are you?” (7 tokens,命中缓存)

Phase 1: Scheduler 调度

┌─ Scheduler.schedule() ─────────────────────────────────────────┐
│                                                                 │
│ Step 1: 检查 GPU prefix cache                                   │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ kv_cache_manager.get_computed_blocks(req_B)             │    │
│ │ → GPU prefix cache 按请求隔离,B 是新的                  │    │
│ │ → num_computed_tokens = 0                                │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 2: 检查 CPU prefix cache                                   │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ connector.get_num_new_matched_tokens(req_B, 0)          │    │
│ │ → SimpleCPUOffloadScheduler.get_num_new_matched_tokens()│    │
│ │   │                                                      │    │
│ │   ├── 计算 block hashes:                                 │    │
│ │   │   text = "Hello, world! How are you?"[:16]          │    │
│ │   │            = "Hello, world! How"                     │    │
│ │   │   block_hashes = [hash(text)] = [0xABCD1234]        │    │
│ │   │   ← 注意: 前 16 字符包含 "Hello, world!" 的前缀      │    │
│ │   │                                                      │    │
│ │   ├── cpu_coordinator.find_longest_cache_hit()          │    │
│ │   │   → 查询 CPU cache map:                             │    │
│ │   │     cached_block_hash_to_block[0xABCD1234] = ?      │    │
│ │   │   → ✅ 命中! CPU block 1                             │    │
│ │   │   → 返回:                                           │    │
│ │   │     cpu_hit_blocks = [1]                            │    │
│ │   │     hit_length = 3 tokens ("Hello, world!" 实际长度) │    │
│ │   │                                                      │    │
│ │   ├── touch CPU block 1 (防止 LRU 驱逐):                 │    │
│ │   │   cpu_block_pool.touch([1])                         │    │
│ │   │                                                      │    │
│ │   ├── 记录 pending hit:                                  │    │
│ │   │   _pending_cpu_hits[req_B] = (                      │    │
│ │   │     cpu_block_ids=[1],                              │    │
│ │   │     hit_length=3,                                   │    │
│ │   │   )                                                  │    │
│ │   │                                                      │    │
│ │   └── return (3, True)  ← 可从 CPU 加载 3 tokens         │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 3: 分配 GPU blocks                                         │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ kv_cache_manager.allocate_slots(req_B, num_external=3)  │    │
│ │ → 总共需要: 7 tokens                                     │    │
│ │ → 从 CPU 加载: 3 tokens (不需要 GPU 计算)                │    │
│ │ → 需要 GPU 计算: 7 - 3 = 4 tokens                        │    │
│ │ → 需要 blocks: ceil(7 / 16) = 1                         │    │
│ │ → 分配 GPU block 102 (用于加载前缀 + 新计算)             │    │
│ │                                                          │    │
│ │ 注意: 不能复用 block 101 的原因:                         │    │
│ │   - 101 可能还在被请求 A 引用 (如果 A 还在生成)           │    │
│ │   - 每个请求必须有自己专属的 blocks (物理隔离)            │    │
│ │   - 即使内容相同,也不能共享同一个物理 block              │    │
│ │                                                          │    │
│ │ → 返回: blocks = KVCacheBlocks(block_ids=[[102]])        │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 4: 通知 connector 分配结果                                 │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ connector.update_state_after_alloc(req_B, blocks, 3)    │    │
│ │ → SimpleCPUOffloadScheduler.update_state_after_alloc()  │    │
│ │   │                                                      │    │
│ │   ├── num_external_tokens = 3 > 0                        │    │
│ │   │                                                      │    │
│ │   ├── 检查 pending CPU hit:                              │    │
│ │   │   _pending_cpu_hits[req_B] = ([1], 3)  ✅ 存在       │    │
│ │   │                                                      │    │
│ │   ├── 构建 GPU↔CPU block 映射:                          │    │
│ │   │   pending = _pending_cpu_hits.pop(req_B)             │    │
│ │   │   cpu_block_ids = [1]  ← 从 CPU block 1 读取        │    │
│ │   │   gpu_block_ids = [102] ← 加载到 GPU block 102      │    │
│ │   │                                                      │    │
│ │   ├── 创建传输状态:                                      │    │
│ │   │   _reqs_to_load[req_B] = LoadRequestState(          │    │
│ │   │     request=req_B,                                  │    │
│ │   │     transfer_meta=TransferMeta(                     │    │
│ │   │       gpu_block_ids=[102],  ← 目标 GPU block         │    │
│ │   │       cpu_block_ids=[1],    ← 源 CPU block           │    │
│ │   │     ),                                               │    │
│ │   │     hit_length=3,                                   │    │
│ │   │   )                                                  │    │
│ │   │                                                      │    │
│ │   └── eager 模式: 注册 req_B 到 _reqs_to_store           │    │
│ │       → 标记: block 102 计算完成后需要 offload           │    │
│ │       → _req_to_store_states[req_B] = StoreRequestState  │    │
│ │           (block_ids=[102], pending_blocks={102})        │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 5: 构建传输元数据                                          │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ connector.build_connector_meta(scheduler_output)        │    │
│ │ → SimpleCPUOffloadScheduler.build_connector_meta()      │    │
│ │   │                                                      │    │
│ │   ├── 收集 pending loads:                                │    │
│ │   │   遍历 _reqs_to_load:                                │    │
│ │   │   → req_B: GPU[102] ← CPU[1]                        │    │
│ │   │   → 分配 load_event = 0                              │    │
│ │   │   → load_event_to_req_ids[0] = [req_B]              │    │
│ │   │                                                      │    │
│ │   ├── prepare_store_specs():                             │    │
│ │   │   → eager 模式: 扫描 _req_to_store_states            │    │
│ │   │   → 发现 req_B 的 blocks: [102]                      │    │
│ │   │   → 分配 CPU block 2                                 │    │
│ │   │   → _per_req_store_states[req_B] = (                │    │
│ │   │       gpu_block_ids=[102],                          │    │
│ │   │       cpu_block_ids=[2],                            │    │
│ │   │     )                                                │    │
│ │   │   → store_event = 1                                  │    │
│ │   │                                                      │    │
│ │   └── return SimpleCPUOffloadMetadata:                   │    │
│ │       load_event       = 0                               │    │
│ │       load_gpu_blocks  = [102]  ← 目标                   │    │
│ │       load_cpu_blocks  = [1]    ← 源                     │    │
│ │       store_event      = 1                               │    │
│ │       store_gpu_blocks  = [102]  ← 源                    │    │
│ │       store_cpu_blocks  = [2]    ← 目标                  │    │
│ └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│ Step 6: 发送 SchedulerOutput → Worker                           │
│ ┌─────────────────────────────────────────────────────────┐    │
│ │ scheduler_output.kv_connector_metadata =                │    │
│ │   SimpleCPUOffloadMetadata(                             │    │
│ │     load:  CPU[1] → GPU[102],                           │    │
│ │     store: GPU[102] → CPU[2],                           │    │
│ │   )                                                      │    │
│ │                                                          │    │
│ │ → 通过进程内函数调用传递给 Worker                          │    │
│ └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Phase 2: Worker 执行

┌─ Worker.execute_model(scheduler_output) ────────────────────────┐
│                                                                  │
│ Step 1: 绑定传输元数据                                            │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ connector.bind_connector_metadata(metadata)              │     │
│ │ → SimpleCPUOffloadWorker.bind_connector_metadata()       │     │
│ │   │                                                       │     │
│ │   ├── self.pending_metadata = metadata                   │     │
│ │   ├── 解析 load_event=0:  CPU[1] → GPU[102]             │     │
│ │   └── 解析 store_event=1: GPU[102] → CPU[2]             │     │
│ └──────────────────────────────────────────────────────────┘     │
│                                                                  │
│ Step 2: 执行 CPU → GPU 加载 (DMA)                                │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ connector.get_finished(finished_req_ids)                 │     │
│ │ → SimpleCPUOffloadWorker.get_finished()                  │     │
│ │   │                                                       │     │
│ │   ├── _process_load_event(event_idx=0):                  │     │
│ │   │   │                                                   │     │
│ │   │   ├── 获取 CPU pinned memory:                        │     │
│ │   │   │   src = cpu_pinned_memory[block=1]  ← 4MB        │     │
│ │   │   │                                                   │     │
│ │   │   ├── 获取 GPU KV cache tensor:                      │     │
│ │   │   │   dst = kv_caches[block=102]  ← 4MB              │     │
│ │   │   │                                                   │     │
│ │   │   ├── 启动异步 DMA 拷贝:                              │     │
│ │   │   │   for layer in range(num_layers):                │     │
│ │   │   │       cudaMemcpyAsync(                           │     │
│ │   │   │           dst=gpu_kv_cache[layer][block=102],    │     │
│ │   │   │           src=cpu_pinned[layer][block=1],        │     │
│ │   │   │           size=128KB,                            │     │
│ │   │   │           stream=copy_stream,                    │     │
│ │   │   │       )                                           │     │
│ │   │   │                                                   │     │
│ │   │   └── 记录加载完成:                                  │     │
│ │   │       _finished_recving_req_ids.add(req_B)           │     │
│ │   │                                                       │     │
│ │   └── 现在 GPU[102] 包含 "Hello, world!" 的 KV 数据       │     │
│ └──────────────────────────────────────────────────────────┘     │
│                                                                  │
│ Step 3: 模型推理                                                  │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ model_runner.forward()                                    │     │
│ │ → 输入: "How are you?" (4 tokens)                         │     │
│ │   (跳过已加载的 "Hello, world!" 3 tokens)                 │     │
│ │                                                           │     │
│ │ → 读取已有的 KV:                                          │     │
│ │     kv_caches[block=102]  ← 前 3 tokens 的 K/V           │     │
│ │                                                           │     │
│ │ → 计算新 tokens:                                          │     │
│ │     for layer in model.layers:                            │     │
│ │         K, V = attention.forward(                         │     │
│ │             input_ids="How are you?",                     │     │
│ │             start_slot=3,  ← 从第 4 个位置开始            │     │
│ │         )                                                 │     │
│ │         kv_caches[layer][block=102][slots 3:6] = K, V     │     │
│ │                                                           │     │
│ │ → GPU[102] 最终内容:                                      │     │
│ │     slots 0-2: "Hello, world!" (从 CPU 加载)              │     │
│ │     slots 3-6: "How are you?"     (新计算)                │     │
│ │                                                           │     │
│ │ → 节省了 3 tokens 的计算!                                  │     │
│ └──────────────────────────────────────────────────────────┘     │
│                                                                  │
│ Step 4: 执行异步 DMA 传输 (GPU → CPU)                            │
│ ┌──────────────────────────────────────────────────────────┐     │
│ │ connector.get_finished() (再次调用, 处理 store)           │     │
│ │ → _process_store_event(event_idx=1):                     │     │
│ │   │                                                       │     │
│ │   ├── 获取 GPU KV cache tensor:                          │     │
│ │   │   src = kv_caches[block=102]  ← 4MB                  │     │
│ │   │   (现在包含完整的 "Hello, world! How are you?")       │     │
│ │   │                                                       │     │
│ │   ├── 获取 CPU pinned memory:                            │     │
│ │   │   dst = cpu_pinned_memory[block=2]  ← 4MB            │     │
│ │   │                                                       │     │
│ │   ├── 启动异步 DMA 拷贝:                                  │     │
│ │   │   cudaMemcpyAsync(dst=cpu[2], src=gpu[102], ...)     │     │
│ │   │                                                       │     │
│ │   └── return KVConnectorOutput:                          │     │
│ │       finished_recving = {req_B}          ← load 完成     │     │
│ │       completed_store_events = {1: {                     │     │
│ │         gpu_block_ids: [102],                            │     │
│ │         cpu_block_ids: [2],                              │     │
│ │       }}                                                  │     │
│ └──────────────────────────────────────────────────────────┘     │
│                                                                  │
│ Step 5: 返回结果 → Scheduler                                     │
└──────────────────────────────────────────────────────────────────┘

Phase 3: Scheduler 清理

┌─ Scheduler 接收 KVConnectorOutput ───────────────────────────────┐
│                                                                  │
│ connector.update_connector_output(kv_connector_output)           │
│ → SimpleCPUOffloadScheduler.update_connector_output()            │
│   │                                                               │
│   ├── 处理 load 完成 (finished_recving={req_B}):                 │
│   │   _cleanup_load_request(req_B):                              │
│   │   → del _reqs_to_load[req_B]                                 │
│   │   → 清理 load_event=0 的映射                                 │
│   │                                                               │
│   ├── 处理 store 完成 (completed_store_events={1}):               │
│   │   _process_store_event(event_idx=1):                         │
│   │   → _process_store_completion(                               │
│   │         gpu_block_ids=[102],                                 │
│   │         cpu_block_ids=[2],                                   │
│   │         hashes=[0xEFGH5678],  ← "Hello...How are you?"       │
│   │       )                                                       │
│   │   → 注册 CPU cache:                                          │
│   │       cached_block_hash_to_block[0xEFGH5678] = CPU[2]        │
│   │   → 释放传输保护 ref:                                        │
│   │       gpu_block_pool.free_blocks([102])                      │
│   │       cpu_block_pool.free_blocks([2])                        │
│   │   → 清理 _req_to_store_states[req_B]                         │
│   │                                                               │
│   └── 最终 CPU cache 状态:                                       │
│       ┌─────┬────────────┬──────────────────────────────┐        │
│       │ ID  │ hash       │ 内容                          │        │
│       ├─────┼────────────┼──────────────────────────────┤        │
│       │  1  │ 0xABCD1234 │ "Hello, world!"              │        │
│       │  2  │ 0xEFGH5678 │ "Hello, world! How are you?" │        │
│       └─────┴────────────┴──────────────────────────────┘        │
│         已用: 2/50 blocks                                        │
└──────────────────────────────────────────────────────────────────┘

五、完整数据流总结

请求 A: "Hello, world!" (3 tokens)
│
├─ Scheduler:
│   ├── GPU prefix cache: 无命中 (A 是新的)
│   ├── CPU prefix cache: 无命中 (首次)
│   ├── kv_cache_manager.allocate_slots() → GPU block 101
│   ├── connector 记录: block 101 需 offload
│   └── build_connector_meta → store: GPU[101] → CPU[1]
│
├─ Worker:
│   ├── model.forward() → KV 写入 GPU[101] (4MB)
│   ├── DMA: GPU[101] → CPU[1] (异步, ~2ms PCIe)
│   └── 返回 completed_store_events={0}
│
└─ Scheduler:
    └── 注册 CPU cache: hash("Hello...")=0xABCD → CPU[1]

─────────────────────────────────────────────────────────────────

请求 B: "Hello, world! How are you?" (7 tokens)
│
├─ Scheduler:
│   ├── GPU prefix cache: 无命中 (B 是新的)
│   ├── CPU prefix cache: 命中! hash=0xABCD → CPU[1]
│   ├── kv_cache_manager.allocate_slots() → GPU block 102
│   │   (不能复用 101,因为不同请求需要物理隔离)
│   ├── connector 建立映射: CPU[1] → GPU[102]
│   └── build_connector_meta
│       → load:  CPU[1] → GPU[102]
│       → store: GPU[102] → CPU[2]
│
├─ Worker:
│   ├── DMA: CPU[1] → GPU[102] (加载 "Hello, world!", 4MB)
│   ├── model.forward() → 只计算 "How are you?" (节省 3 tokens)
│   │   └─ GPU[102] 现在包含完整内容
│   ├── DMA: GPU[102] → CPU[2] (offload 完整内容, 4MB)
│   └── 返回 finished_recving={req_B}, completed_store_events={1}
│
└─ Scheduler:
    └── 注册 CPU cache: hash("Hello...How...")=0xEFGH → CPU[2]
    └── 清理 _reqs_to_load[req_B], _req_to_store_states[req_B]

─────────────────────────────────────────────────────────────────

最终状态:
┌─ GPU KV Cache ────────────────────────────────┐
│ block 102: "Hello, world! How are you?"       │  ← B 的活跃数据
│ free_blocks: 98/100                           │
└───────────────────────────────────────────────┘

┌─ CPU KV Cache ────────────────────────────────┐
│ block 1: "Hello, world!"          (hash=ABCD) │  ← 可被复用
│ block 2: "Hello, world! How are you?" (EFGH)  │  ← 可被复用
│ free_blocks: 48/50                            │
└───────────────────────────────────────────────┘

性能收益:
- 请求 B 节省了 3 tokens 的 forward 计算
- 32 layers × 3 tokens 的 attention 计算被跳过
- 代价: 2 次 DMA 传输 (load + store, 各 ~2ms)