模型: LLaMA-3-8B (head_size=128, num_kv_heads=32, num_layers=32)
block_size: 16 tokens
GPU 容量: 100 blocks (每 block 约 2MB × 32 layers ≈ 64MB)
CPU 容量: 50 blocks (约 3.2GB)
hash粒度: 16 tokens (每 16 个 token 计算一个 block hash)
offload模式: eager (每个新计算/加载的 block 立即 offload 到 CPU)
通信方式: 进程内直接调用 (非网络通信)
用户命令:
vllm serve meta-llama/Llama-3-8b \
--enable-prefix-caching \
--kv-connector SimpleCPUOffloadConnector \
--kv-connector-extra-config '{"cpu_bytes_to_use_per_rank": 3221225472, "lazy_offload": false}'
┌─────────────────────────────────────────────────────────────────────┐
│ 1. 解析配置 → VllmConfig │
│ ├── cache_config.enable_prefix_caching = True │
│ ├── kv_transfer_config.kv_connector = "SimpleCPUOffloadConnector"│
│ └── kv_transfer_config.kv_connector_extra_config = {...} │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 2. Scheduler 进程初始化 │
│ Scheduler.__init__() │
│ └── self.connector = KVConnectorFactory.create_connector( │
│ config=vllm_config, │
│ role=KVConnectorRole.SCHEDULER, │
│ kv_cache_config=kv_cache_config, │
│ ) │
│ │ │
│ ├── 查找注册表: │
│ │ KVConnectorFactory._registry["SimpleCPUOffloadConnector"] │
│ │ → 动态导入 simple_cpu_offload_connector.SimpleCPUOffl... │
│ │ │
│ └── return SimpleCPUOffloadConnector( │
│ vllm_config, │
│ role=KVConnectorRole.SCHEDULER, ← Scheduler 角色 │
│ kv_cache_config, │
│ ) │
│ │ │
│ └── __init__: │
│ ├── cpu_capacity_per_rank = 3221225472 (3GB) │
│ ├── lazy_offload = False │
│ ├── self.scheduler_manager = SimpleCPUOffloadSch.. │ ✅
│ └── self.worker_handler = None │ ❌
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ 3. Worker 进程初始化 (每张 GPU 一个) │
│ Worker.__init__() │
│ └── self.kv_connector = KVConnectorFactory.create_connector( │
│ config=vllm_config, │
│ role=KVConnectorRole.WORKER, │
│ kv_cache_config=kv_cache_config, │
│ ) │
│ │ │
│ └── return SimpleCPUOffloadConnector( │
│ vllm_config, │
│ role=KVConnectorRole.WORKER, ← Worker 角色 │
│ kv_cache_config, │
│ ) │
│ │ │
│ └── __init__: │
│ ├── self.scheduler_manager = None │ ❌
│ └── self.worker_handler = SimpleCPUOffloadWorker │ ✅
└─────────────────────────────────────────────────────────────────────┘
最终状态:
┌─────────────────────────────────────────────────┐
│ Scheduler 进程 │
│ ├── connector.scheduler_manager ✅ │
│ └── connector.worker_handler ❌ │
├─────────────────────────────────────────────────┤
│ Worker 进程 │
│ ├── connector.scheduler_manager ❌ │
│ └── connector.worker_handler ✅ │
└─────────────────────────────────────────────────┘
┌─ Scheduler.schedule() ─────────────────────────────────────────┐
│ │
│ Step 1: 检查 GPU prefix cache │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ kv_cache_manager.get_computed_blocks(req_A) │ │
│ │ → 查询 GPU 上前缀缓存 (请求 A 是首次出现) │ │
│ │ → num_computed_tokens = 0 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 2: 检查 CPU prefix cache │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ connector.get_num_new_matched_tokens(req_A, 0) │ │
│ │ → SimpleCPUOffloadScheduler.get_num_new_matched_tokens()│ │
│ │ │ │ │
│ │ ├── 计算 block hashes: │ │
│ │ │ block_hashes = hash("Hello, world!"[:16]) │ │
│ │ │ = [0xABCD1234] │ │
│ │ │ │ │
│ │ ├── cpu_coordinator.find_longest_cache_hit() │ │
│ │ │ → 查询 CPU cache map: │ │
│ │ │ cached_block_hash_to_block[0xABCD1234] = ? │ │
│ │ │ → 无命中 (CPU cache 为空) │ │
│ │ │ → return ([], 0) │ │
│ │ │ │ │
│ │ └── return (0, False) ← 无外部缓存命中 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 3: 分配 GPU blocks │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ kv_cache_manager.allocate_slots(req_A) │ │
│ │ → 需要 tokens: 3 │ │
│ │ → 需要 blocks: ceil(3 / 16) = 1 │ │
│ │ → 从 free_block_queue 取出 1 个 block │ │
│ │ → 分配 GPU block 101 │ │
│ │ → 返回: blocks = KVCacheBlocks(block_ids=[[101]]) │ │
│ │ │ │
│ │ 此时 GPU block 状态: │ │
│ │ block 101: allocated to req_A │ │
│ │ free_blocks: [0,1,...,100,102,...,99] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 4: 通知 connector 分配结果 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ connector.update_state_after_alloc(req_A, blocks, 0) │ │
│ │ → SimpleCPUOffloadScheduler.update_state_after_alloc() │ │
│ │ │ │ │
│ │ ├── num_external_tokens = 0 │ │
│ │ │ → 无需从 CPU 加载 (没有 pending CPU hit) │ │
│ │ │ │ │
│ │ └── eager 模式: 注册 req_A 到 _reqs_to_store │ │
│ │ → 标记: block 101 计算完成后需要 offload 到 CPU │ │
│ │ → _req_to_store_states[req_A] = StoreRequestState │ │
│ │ (block_ids=[101], pending_blocks={101}) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 5: 构建传输元数据 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ connector.build_connector_meta(scheduler_output) │ │
│ │ → SimpleCPUOffloadScheduler.build_connector_meta() │ │
│ │ │ │ │
│ │ ├── _prepare_lazy_store_specs() / _prepare_eager_s.. │ │
│ │ │ → eager 模式: 扫描 _req_to_store_states │ │
│ │ │ → 发现 req_A 的 blocks: [101] │ │
│ │ │ → 检查 CPU cache 是否已有 hash(101) │ │
│ │ │ → 无,需要分配新的 CPU block │ │
│ │ │ → cpu_block_pool.allocate() → 分配 CPU block 1 │ │
│ │ │ → 建立映射: │ │
│ │ │ _per_req_store_states[req_A] = ( │ │
│ │ │ gpu_block_ids = [101], │ │
│ │ │ cpu_block_ids = [1], │ │
│ │ │ hashes = [0xABCD1234], │ │
│ │ │ ) │ │
│ │ │ │ │
│ │ └── 收集 pending loads (无) │ │
│ │ │ │ │
│ │ └── return SimpleCPUOffloadMetadata: │ │
│ │ store_event = 0 │ │
│ │ store_gpu_blocks = [101] ← 源 │ │
│ │ store_cpu_blocks = [1] ← 目标 │ │
│ │ load_event = -1 (无加载) │ │
│ │ load_gpu_blocks = [] │ │
│ │ load_cpu_blocks = [] │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 6: 发送 SchedulerOutput → Worker │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ scheduler_output = SchedulerOutput( │ │
│ │ scheduled_reqs=[req_A], │ │
│ │ blocks_to_swap_in=[], │ │
│ │ blocks_to_swap_out=[], │ │
│ │ kv_connector_metadata=SimpleCPUOffloadMetadata(...),│ │
│ │ ) │ │
│ │ │ │
│ │ → 通过进程内函数调用传递给 Worker │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─ Worker.execute_model(scheduler_output) ────────────────────────┐
│ │
│ Step 1: 绑定传输元数据 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ connector.bind_connector_metadata(metadata) │ │
│ │ → SimpleCPUOffloadWorker.bind_connector_metadata() │ │
│ │ │ │ │
│ │ ├── self.pending_metadata = metadata │ │
│ │ ├── 解析 store_event=0: GPU[101] → CPU[1] │ │
│ │ └── 准备 GPU KV cache tensor 引用: │ │
│ │ kv_cache_tensors[layer_idx].block(101) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Step 2: 模型推理 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ model_runner.forward() │ │
│ │ → 输入: "Hello, world!" (3 tokens) │ │
│ │ → 逐层前向传播 (32 layers): │ │
│ │ for layer in model.layers: │ │
│ │ K, V = attention.forward(...) │ │
│ │ kv_caches[layer][block=101] = K, V │ │
│ │ │ │
│ │ → kv_caches[layer][101] 数据布局: │ │
│ │ shape: (1, 16, 32, 128) ← 1 block, 16 slots │ │
│ │ dtype: float16 │ │
│ │ size: 1 × 16 × 32 × 128 × 2 bytes ≈ 128KB / layer │ │
│ │ total: 128KB × 32 layers ≈ 4MB │ │
│ │ │ │
│ │ → 输出: 生成下一个 token │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Step 3: 执行异步 DMA 传输 (GPU → CPU) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ connector.get_finished(finished_req_ids) │ │
│ │ → SimpleCPUOffloadWorker.get_finished() │ │
│ │ │ │ │
│ │ ├── _process_store_event(event_idx=0): │ │
│ │ │ │ │ │
│ │ │ ├── 获取 GPU KV cache tensor: │ │
│ │ │ │ src = kv_caches[block=101] ← 4MB │ │
│ │ │ │ │ │
│ │ │ ├── 获取 CPU pinned memory: │ │
│ │ │ │ dst = cpu_pinned_memory[block=1] ← 4MB │ │
│ │ │ │ │ │
│ │ │ ├── 启动异步 DMA 拷贝: │ │
│ │ │ │ for layer in range(num_layers): │ │
│ │ │ │ cudaMemcpyAsync( │ │
│ │ │ │ dst=cpu_pinned[layer][block=1], │ │
│ │ │ │ src=gpu_kv_cache[layer][block=101], │ │
│ │ │ │ size=128KB, │ │
│ │ │ │ stream=copy_stream, │ │
│ │ │ │ ) │ │
│ │ │ │ │ │
│ │ │ └── 记录事件: │ │
│ │ │ _pending_store_events[0] = { │ │
│ │ │ gpu_block_ids: [101], │ │
│ │ │ cpu_block_ids: [1], │ │
│ │ │ req_ids: {req_A}, │ │
│ │ │ } │ │
│ │ │ │ │
│ │ └── return KVConnectorOutput: │ │
│ │ kv_connector_worker_meta = { │ │
│ │ completed_store_events: {0: { │ │
│ │ gpu_block_ids: [101], │ │
│ │ cpu_block_ids: [1], │ │
│ │ }} │ │
│ │ } │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Step 4: 返回结果 → Scheduler │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ModelRunnerOutput( │ │
│ │ req_ids=[req_A], │ │
│ │ sampled_token_ids=[...], │ │
│ │ kv_connector_output=KVConnectorOutput(...), │ │
│ │ ) │ │
│ └──────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
┌─ Scheduler 接收 KVConnectorOutput ───────────────────────────────┐
│ │
│ connector.update_connector_output(kv_connector_output) │
│ → SimpleCPUOffloadScheduler.update_connector_output() │
│ │ │
│ ├── 检测到 completed_store_events: {0} │
│ │ │
│ ├── _process_store_event(event_idx=0): │
│ │ │ │
│ │ ├── _process_store_completion( │
│ │ │ gpu_block_ids=[101], │
│ │ │ cpu_block_ids=[1], │
│ │ │ hashes=[0xABCD1234], │
│ │ │ ) │
│ │ │ │ │
│ │ │ ├── 注册 CPU cache: │
│ │ │ │ cpu_coordinator.add_block( │
│ │ │ │ block_id=1, │
│ │ │ │ prefix_hash=0xABCD1234, │
│ │ │ │ ) │
│ │ │ │ → cached_block_hash_to_block[0xABCD1234] = CPU[1] │
│ │ │ │ │
│ │ │ ├── 释放传输保护 ref: │
│ │ │ │ gpu_block_pool.free_blocks([101]) ← ref_cnt -= 1 │
│ │ │ │ cpu_block_pool.free_blocks([1]) ← ref_cnt -= 1 │
│ │ │ │ │
│ │ │ └── 清理请求状态: │
│ │ │ del _req_to_store_states[req_A] │
│ │ │ │
│ │ └── 清理事件状态: │
│ │ del _per_req_store_states[req_A] │
│ │ │
│ └── 此时 CPU cache 状态: │
│ ┌─────┬────────────┬──────────────────┬────────┐ │
│ │ ID │ hash │ 内容 │ 状态 │ │
│ ├─────┼────────────┼──────────────────┼────────┤ │
│ │ 1 │ 0xABCD1234 │ "Hello, world!" │ 已注册 │ │
│ │ ... │ ... │ ... │ │ │
│ └─────┴────────────┴──────────────────┴────────┘ │
│ 已用: 1/50 blocks │
└──────────────────────────────────────────────────────────────────┘
┌─ Scheduler.schedule() ─────────────────────────────────────────┐
│ │
│ Step 1: 检查 GPU prefix cache │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ kv_cache_manager.get_computed_blocks(req_B) │ │
│ │ → GPU prefix cache 按请求隔离,B 是新的 │ │
│ │ → num_computed_tokens = 0 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 2: 检查 CPU prefix cache │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ connector.get_num_new_matched_tokens(req_B, 0) │ │
│ │ → SimpleCPUOffloadScheduler.get_num_new_matched_tokens()│ │
│ │ │ │ │
│ │ ├── 计算 block hashes: │ │
│ │ │ text = "Hello, world! How are you?"[:16] │ │
│ │ │ = "Hello, world! How" │ │
│ │ │ block_hashes = [hash(text)] = [0xABCD1234] │ │
│ │ │ ← 注意: 前 16 字符包含 "Hello, world!" 的前缀 │ │
│ │ │ │ │
│ │ ├── cpu_coordinator.find_longest_cache_hit() │ │
│ │ │ → 查询 CPU cache map: │ │
│ │ │ cached_block_hash_to_block[0xABCD1234] = ? │ │
│ │ │ → ✅ 命中! CPU block 1 │ │
│ │ │ → 返回: │ │
│ │ │ cpu_hit_blocks = [1] │ │
│ │ │ hit_length = 3 tokens ("Hello, world!" 实际长度) │ │
│ │ │ │ │
│ │ ├── touch CPU block 1 (防止 LRU 驱逐): │ │
│ │ │ cpu_block_pool.touch([1]) │ │
│ │ │ │ │
│ │ ├── 记录 pending hit: │ │
│ │ │ _pending_cpu_hits[req_B] = ( │ │
│ │ │ cpu_block_ids=[1], │ │
│ │ │ hit_length=3, │ │
│ │ │ ) │ │
│ │ │ │ │
│ │ └── return (3, True) ← 可从 CPU 加载 3 tokens │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 3: 分配 GPU blocks │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ kv_cache_manager.allocate_slots(req_B, num_external=3) │ │
│ │ → 总共需要: 7 tokens │ │
│ │ → 从 CPU 加载: 3 tokens (不需要 GPU 计算) │ │
│ │ → 需要 GPU 计算: 7 - 3 = 4 tokens │ │
│ │ → 需要 blocks: ceil(7 / 16) = 1 │ │
│ │ → 分配 GPU block 102 (用于加载前缀 + 新计算) │ │
│ │ │ │
│ │ 注意: 不能复用 block 101 的原因: │ │
│ │ - 101 可能还在被请求 A 引用 (如果 A 还在生成) │ │
│ │ - 每个请求必须有自己专属的 blocks (物理隔离) │ │
│ │ - 即使内容相同,也不能共享同一个物理 block │ │
│ │ │ │
│ │ → 返回: blocks = KVCacheBlocks(block_ids=[[102]]) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 4: 通知 connector 分配结果 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ connector.update_state_after_alloc(req_B, blocks, 3) │ │
│ │ → SimpleCPUOffloadScheduler.update_state_after_alloc() │ │
│ │ │ │ │
│ │ ├── num_external_tokens = 3 > 0 │ │
│ │ │ │ │
│ │ ├── 检查 pending CPU hit: │ │
│ │ │ _pending_cpu_hits[req_B] = ([1], 3) ✅ 存在 │ │
│ │ │ │ │
│ │ ├── 构建 GPU↔CPU block 映射: │ │
│ │ │ pending = _pending_cpu_hits.pop(req_B) │ │
│ │ │ cpu_block_ids = [1] ← 从 CPU block 1 读取 │ │
│ │ │ gpu_block_ids = [102] ← 加载到 GPU block 102 │ │
│ │ │ │ │
│ │ ├── 创建传输状态: │ │
│ │ │ _reqs_to_load[req_B] = LoadRequestState( │ │
│ │ │ request=req_B, │ │
│ │ │ transfer_meta=TransferMeta( │ │
│ │ │ gpu_block_ids=[102], ← 目标 GPU block │ │
│ │ │ cpu_block_ids=[1], ← 源 CPU block │ │
│ │ │ ), │ │
│ │ │ hit_length=3, │ │
│ │ │ ) │ │
│ │ │ │ │
│ │ └── eager 模式: 注册 req_B 到 _reqs_to_store │ │
│ │ → 标记: block 102 计算完成后需要 offload │ │
│ │ → _req_to_store_states[req_B] = StoreRequestState │ │
│ │ (block_ids=[102], pending_blocks={102}) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 5: 构建传输元数据 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ connector.build_connector_meta(scheduler_output) │ │
│ │ → SimpleCPUOffloadScheduler.build_connector_meta() │ │
│ │ │ │ │
│ │ ├── 收集 pending loads: │ │
│ │ │ 遍历 _reqs_to_load: │ │
│ │ │ → req_B: GPU[102] ← CPU[1] │ │
│ │ │ → 分配 load_event = 0 │ │
│ │ │ → load_event_to_req_ids[0] = [req_B] │ │
│ │ │ │ │
│ │ ├── prepare_store_specs(): │ │
│ │ │ → eager 模式: 扫描 _req_to_store_states │ │
│ │ │ → 发现 req_B 的 blocks: [102] │ │
│ │ │ → 分配 CPU block 2 │ │
│ │ │ → _per_req_store_states[req_B] = ( │ │
│ │ │ gpu_block_ids=[102], │ │
│ │ │ cpu_block_ids=[2], │ │
│ │ │ ) │ │
│ │ │ → store_event = 1 │ │
│ │ │ │ │
│ │ └── return SimpleCPUOffloadMetadata: │ │
│ │ load_event = 0 │ │
│ │ load_gpu_blocks = [102] ← 目标 │ │
│ │ load_cpu_blocks = [1] ← 源 │ │
│ │ store_event = 1 │ │
│ │ store_gpu_blocks = [102] ← 源 │ │
│ │ store_cpu_blocks = [2] ← 目标 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Step 6: 发送 SchedulerOutput → Worker │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ scheduler_output.kv_connector_metadata = │ │
│ │ SimpleCPUOffloadMetadata( │ │
│ │ load: CPU[1] → GPU[102], │ │
│ │ store: GPU[102] → CPU[2], │ │
│ │ ) │ │
│ │ │ │
│ │ → 通过进程内函数调用传递给 Worker │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─ Worker.execute_model(scheduler_output) ────────────────────────┐
│ │
│ Step 1: 绑定传输元数据 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ connector.bind_connector_metadata(metadata) │ │
│ │ → SimpleCPUOffloadWorker.bind_connector_metadata() │ │
│ │ │ │ │
│ │ ├── self.pending_metadata = metadata │ │
│ │ ├── 解析 load_event=0: CPU[1] → GPU[102] │ │
│ │ └── 解析 store_event=1: GPU[102] → CPU[2] │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Step 2: 执行 CPU → GPU 加载 (DMA) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ connector.get_finished(finished_req_ids) │ │
│ │ → SimpleCPUOffloadWorker.get_finished() │ │
│ │ │ │ │
│ │ ├── _process_load_event(event_idx=0): │ │
│ │ │ │ │ │
│ │ │ ├── 获取 CPU pinned memory: │ │
│ │ │ │ src = cpu_pinned_memory[block=1] ← 4MB │ │
│ │ │ │ │ │
│ │ │ ├── 获取 GPU KV cache tensor: │ │
│ │ │ │ dst = kv_caches[block=102] ← 4MB │ │
│ │ │ │ │ │
│ │ │ ├── 启动异步 DMA 拷贝: │ │
│ │ │ │ for layer in range(num_layers): │ │
│ │ │ │ cudaMemcpyAsync( │ │
│ │ │ │ dst=gpu_kv_cache[layer][block=102], │ │
│ │ │ │ src=cpu_pinned[layer][block=1], │ │
│ │ │ │ size=128KB, │ │
│ │ │ │ stream=copy_stream, │ │
│ │ │ │ ) │ │
│ │ │ │ │ │
│ │ │ └── 记录加载完成: │ │
│ │ │ _finished_recving_req_ids.add(req_B) │ │
│ │ │ │ │
│ │ └── 现在 GPU[102] 包含 "Hello, world!" 的 KV 数据 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Step 3: 模型推理 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ model_runner.forward() │ │
│ │ → 输入: "How are you?" (4 tokens) │ │
│ │ (跳过已加载的 "Hello, world!" 3 tokens) │ │
│ │ │ │
│ │ → 读取已有的 KV: │ │
│ │ kv_caches[block=102] ← 前 3 tokens 的 K/V │ │
│ │ │ │
│ │ → 计算新 tokens: │ │
│ │ for layer in model.layers: │ │
│ │ K, V = attention.forward( │ │
│ │ input_ids="How are you?", │ │
│ │ start_slot=3, ← 从第 4 个位置开始 │ │
│ │ ) │ │
│ │ kv_caches[layer][block=102][slots 3:6] = K, V │ │
│ │ │ │
│ │ → GPU[102] 最终内容: │ │
│ │ slots 0-2: "Hello, world!" (从 CPU 加载) │ │
│ │ slots 3-6: "How are you?" (新计算) │ │
│ │ │ │
│ │ → 节省了 3 tokens 的计算! │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Step 4: 执行异步 DMA 传输 (GPU → CPU) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ connector.get_finished() (再次调用, 处理 store) │ │
│ │ → _process_store_event(event_idx=1): │ │
│ │ │ │ │
│ │ ├── 获取 GPU KV cache tensor: │ │
│ │ │ src = kv_caches[block=102] ← 4MB │ │
│ │ │ (现在包含完整的 "Hello, world! How are you?") │ │
│ │ │ │ │
│ │ ├── 获取 CPU pinned memory: │ │
│ │ │ dst = cpu_pinned_memory[block=2] ← 4MB │ │
│ │ │ │ │
│ │ ├── 启动异步 DMA 拷贝: │ │
│ │ │ cudaMemcpyAsync(dst=cpu[2], src=gpu[102], ...) │ │
│ │ │ │ │
│ │ └── return KVConnectorOutput: │ │
│ │ finished_recving = {req_B} ← load 完成 │ │
│ │ completed_store_events = {1: { │ │
│ │ gpu_block_ids: [102], │ │
│ │ cpu_block_ids: [2], │ │
│ │ }} │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Step 5: 返回结果 → Scheduler │
└──────────────────────────────────────────────────────────────────┘
┌─ Scheduler 接收 KVConnectorOutput ───────────────────────────────┐
│ │
│ connector.update_connector_output(kv_connector_output) │
│ → SimpleCPUOffloadScheduler.update_connector_output() │
│ │ │
│ ├── 处理 load 完成 (finished_recving={req_B}): │
│ │ _cleanup_load_request(req_B): │
│ │ → del _reqs_to_load[req_B] │
│ │ → 清理 load_event=0 的映射 │
│ │ │
│ ├── 处理 store 完成 (completed_store_events={1}): │
│ │ _process_store_event(event_idx=1): │
│ │ → _process_store_completion( │
│ │ gpu_block_ids=[102], │
│ │ cpu_block_ids=[2], │
│ │ hashes=[0xEFGH5678], ← "Hello...How are you?" │
│ │ ) │
│ │ → 注册 CPU cache: │
│ │ cached_block_hash_to_block[0xEFGH5678] = CPU[2] │
│ │ → 释放传输保护 ref: │
│ │ gpu_block_pool.free_blocks([102]) │
│ │ cpu_block_pool.free_blocks([2]) │
│ │ → 清理 _req_to_store_states[req_B] │
│ │ │
│ └── 最终 CPU cache 状态: │
│ ┌─────┬────────────┬──────────────────────────────┐ │
│ │ ID │ hash │ 内容 │ │
│ ├─────┼────────────┼──────────────────────────────┤ │
│ │ 1 │ 0xABCD1234 │ "Hello, world!" │ │
│ │ 2 │ 0xEFGH5678 │ "Hello, world! How are you?" │ │
│ └─────┴────────────┴──────────────────────────────┘ │
│ 已用: 2/50 blocks │
└──────────────────────────────────────────────────────────────────┘
请求 A: "Hello, world!" (3 tokens)
│
├─ Scheduler:
│ ├── GPU prefix cache: 无命中 (A 是新的)
│ ├── CPU prefix cache: 无命中 (首次)
│ ├── kv_cache_manager.allocate_slots() → GPU block 101
│ ├── connector 记录: block 101 需 offload
│ └── build_connector_meta → store: GPU[101] → CPU[1]
│
├─ Worker:
│ ├── model.forward() → KV 写入 GPU[101] (4MB)
│ ├── DMA: GPU[101] → CPU[1] (异步, ~2ms PCIe)
│ └── 返回 completed_store_events={0}
│
└─ Scheduler:
└── 注册 CPU cache: hash("Hello...")=0xABCD → CPU[1]
─────────────────────────────────────────────────────────────────
请求 B: "Hello, world! How are you?" (7 tokens)
│
├─ Scheduler:
│ ├── GPU prefix cache: 无命中 (B 是新的)
│ ├── CPU prefix cache: 命中! hash=0xABCD → CPU[1]
│ ├── kv_cache_manager.allocate_slots() → GPU block 102
│ │ (不能复用 101,因为不同请求需要物理隔离)
│ ├── connector 建立映射: CPU[1] → GPU[102]
│ └── build_connector_meta
│ → load: CPU[1] → GPU[102]
│ → store: GPU[102] → CPU[2]
│
├─ Worker:
│ ├── DMA: CPU[1] → GPU[102] (加载 "Hello, world!", 4MB)
│ ├── model.forward() → 只计算 "How are you?" (节省 3 tokens)
│ │ └─ GPU[102] 现在包含完整内容
│ ├── DMA: GPU[102] → CPU[2] (offload 完整内容, 4MB)
│ └── 返回 finished_recving={req_B}, completed_store_events={1}
│
└─ Scheduler:
└── 注册 CPU cache: hash("Hello...How...")=0xEFGH → CPU[2]
└── 清理 _reqs_to_load[req_B], _req_to_store_states[req_B]
─────────────────────────────────────────────────────────────────
最终状态:
┌─ GPU KV Cache ────────────────────────────────┐
│ block 102: "Hello, world! How are you?" │ ← B 的活跃数据
│ free_blocks: 98/100 │
└───────────────────────────────────────────────┘
┌─ CPU KV Cache ────────────────────────────────┐
│ block 1: "Hello, world!" (hash=ABCD) │ ← 可被复用
│ block 2: "Hello, world! How are you?" (EFGH) │ ← 可被复用
│ free_blocks: 48/50 │
└───────────────────────────────────────────────┘
性能收益:
- 请求 B 节省了 3 tokens 的 forward 计算
- 32 layers × 3 tokens 的 attention 计算被跳过
- 代价: 2 次 DMA 传输 (load + store, 各 ~2ms)