核心概念
| 概念 | 说明 |
|---|---|
thread_id | 一次对话/执行的唯一标识 |
checkpoint_id | 图执行到某一步时的状态快照 ID,在 checkpoints 表内递增 |
checkpoint_ns | 命名空间。'' = 根图(root graph),非空 = 子图(subgraph) |
channel | State 中的一个字段名(如 messages、files),每个 channel 独立版本化 |
version | blob 的唯一标识,格式为 {032d计数器}.{016f随机数},per-channel 单调递增 |
channel_versions | checkpoint 内部的 JSON 元数据,记录该 checkpoint 时刻每个 channel 对应的 version |
概览
LangGraph 的 State 在内存与数据库之间流转,形成完整的生命周期。核心逻辑链条:
① 写入 → ② 更新 → ③ 回滚 → ④ 读取
三张核心表
| 表名 | 职责 | 粒度 | 写入时机 |
|---|---|---|---|
checkpoints | 元数据索引(快照链条) | 每个 checkpoint 1 条 | checkpoint 完成时 |
checkpoint_writes | 写入操作历史(每个 node 的部分更新) | 每个 task 1+ 条 | 每个 node 执行后立即写入 |
checkpoint_blobs | 最终完整状态(合并后的序列化数据) | 每个 channel 1 条 | checkpoint 完成时 |
三表通过 thread_id + checkpoint_ns_hash 关联。
两表关联方式(checkpoints ↔ checkpoint_blobs)
checkpoints 和 blobs 之间没有外键,通过 checkpoint 内部的 channel_versions JSON 间接关联:
checkpoint 记录:
{
"channel_versions": {
"messages": "00000000000000000000000000000003.0384719283746192",
"files": "00000000000000000000000000000002.0886748088723920",
"__start__":"00000000000000000000000000000001.0867480887239203"
}
}
│
│ 按 (channel, version) 精确查找
▼
blob 表: (thread_id, channel="messages", version="...003.xxx") → blob_data
(thread_id, channel="files", version="...002.xxx") → blob_data
LangGraph 官方加载状态的 SQL:
SELECT ... FROM checkpoints
INNER JOIN checkpoint_blobs bl
ON bl.thread_id = checkpoints.thread_id
AND bl.checkpoint_ns = checkpoints.checkpoint_ns
AND bl.channel = jsonb_each_text.key -- channel 名
AND bl.version = jsonb_each_text.value -- 从 channel_versions 取这种设计的优势:
- 存储去重:同一个 blob 可被多个 checkpoint 引用(类似 Git)
- 写入效率:只写实际变化的 channel,未变的复用已有 blob
- 分支共享:分叉后未变的 channel 自然共享同一份 blob
三张表的关系
┌─────────────────────────────────────────────┐
│ LangGraph 执行流程 │
└─────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
↓ ↓ ↓
Node A 写入 Node B 写入 Node C 写入
{"user_context": {"messages": {"messages":
{...}} [msg1]} [msg2]}
│ │ │
↓ ↓ ↓
┌─────────────────────────────────────────────┐
│ checkpoint_writes 表 │
│ 记录每个 task 的写入操作(细粒度) │
└─────────────────────────────────────────────┘
│
│ 合并所有 task 的写入
↓
┌─────────────────────────────────────────────┐
│ checkpoint_blobs 表 │
│ 存储最终合并后的完整状态(粗粒度) │
└─────────────────────────────────────────────┘
│
│ 元数据索引
↓
┌─────────────────────────────────────────────┐
│ checkpoints 表 │
│ 存储 checkpoint 元数据(链条/分支) │
└─────────────────────────────────────────────┘
完整数据流图
┌───────────────────────────────────────────────────────┐
│ 内存中的 State │
│ { │
│ messages: [msg1, msg2], │
│ files: {"file_001": {...}}, │
│ user_context: {"age": 30, ...} │
│ } │
└───────────────────────────────────────────────────────┘
│ ① 写入 (node 级) ④ 读取 │
│ ② 更新 (新 checkpoint) │
↓ ↑
┌───────────────────────────────────────────────────────┐
│ 数据库存储 │
│ │
│ checkpoint_writes ──合并──→ checkpoint_blobs │
│ (每个 node 的写入) (最终完整状态) │
│ │ │
│ 元数据索引 │
│ ↓ │
│ checkpoints │
│ (链条 / 分支) │
│ │
│ ③ 回滚 = 新建分支 │
└───────────────────────────────────────────────────────┘
version 字段详解
version 生成逻辑
# LangGraph 源码 get_next_version()
def get_next_version(self, current, channel):
current_v = 0 if current is None else int(current.split(".")[0])
next_v = current_v + 1
next_h = random.random()
return f"{next_v:032d}.{next_h:016f}"version 格式解读
00000000000000000000000000000003.0384719283746192
|_______________________________| |_______________|
per-channel 递增计数器 随机数(防冲突)
- 计数器:该 channel 第 N 次被写入(不是 checkpoint 序号)
- 随机后缀:防止分支/重跑时 version 碰撞
- 零填充 32 位:保证字典序 = 时间序,
ORDER BY version有效
易混淆点
version 前缀不是 checkpoint_id。messages channel 的第 2 次写入可能发生在 checkpoint 3,也可能在 checkpoint 5,取决于哪些节点修改了 messages。
① 写入:State → Database
触发时机
- Agent 执行完一个 node
- 用户发送新消息
- 调用 tool 完成后
实际场景
假设 Agent 执行一个包含 3 个 node 的流程:
graph = StateGraph(AgentState)
graph.add_node("retrieve_context", retrieve_node) # task_id = "retrieve_context"
graph.add_node("search_products", search_node) # task_id = "search_products"
graph.add_node("generate_response", generate_node) # task_id = "generate_response"用户输入 "推荐医疗险",执行过程:
→ Node 1: retrieve_context
写入: {"user_context": {"age": 30, ...}}
→ Node 2: search_products
写入: {"messages": [AIMessage(...), ToolMessage(...)]}
→ Node 3: generate_response
写入: {"messages": [AIMessage("推荐您...")]}
→ Checkpoint 完成
合并所有写入 → checkpoint_blobs
步骤 1:每个 node 执行后 → 写入 checkpoint_writes 表
每个 node 执行完毕立即写入,记录该 task 对 State 的部分更新:
-- Node 1: retrieve_context 的写入
INSERT INTO checkpoint_writes VALUES (
'thread_001', 'ckpt_005', 'd41d8cd...', 'retrieve_context', 0, 'user_context',
'msgpack', <binary: {"age": 30, ...}>, '()', NOW()
);
-- Node 2: search_products 的写入
INSERT INTO checkpoint_writes VALUES (
'thread_001', 'ckpt_005', 'd41d8cd...', 'search_products', 0, 'messages',
'msgpack', <binary: [AIMessage, ToolMessage]>, '()', NOW()
);
-- Node 3: generate_response 的写入
INSERT INTO checkpoint_writes VALUES (
'thread_001', 'ckpt_005', 'd41d8cd...', 'generate_response', 0, 'messages',
'msgpack', <binary: [AIMessage("推荐您...")]>, '()', NOW()
);注意:Node 2 和 Node 3 都写入了
messageschannel,后续会被合并。
步骤 2:所有 node 完成后 → 合并写入 checkpoint_blobs 表
将 checkpoint_writes 中同一 channel 的多条记录合并,写入最终状态:
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
serializer = JsonPlusSerializer()
for field_name, field_value in merged_state.items():
serialized_data = serializer.dumps_typed(field_value)
# serialized_data = ('msgpack', binary_data)
INSERT INTO checkpoint_blobs VALUES (
thread_id = 'thread_001',
checkpoint_ns_hash = 'd41d8cd98f00b204e9800998ecf8427e',
channel = field_name,
version = 0,
type = 'msgpack',
blob_data = binary_data
)步骤 3:写入 checkpoints 表(元数据)
INSERT INTO checkpoints (
thread_id, checkpoint_id, parent_checkpoint_id,
checkpoint_ns, checkpoint_ns_hash, gmt_create, gmt_modified
) VALUES (
'thread_001', -- 会话ID
'ckpt_005', -- 新生成的快照ID
'ckpt_004', -- 指向上一个 checkpoint(构建链条)
'', -- 命名空间(主图用空字符串)
'd41d8cd98f00b204e9800998ecf8427e', -- MD5('')
'2026-04-06 10:00:00',
'2026-04-06 10:00:00'
);写入结果
checkpoint_writes 表 — 3 条(每个 task 一条):
| thread_id | checkpoint_id | task_id | idx | channel | blob_data |
|---|---|---|---|---|---|
| thread_001 | ckpt_005 | retrieve_context | 0 | user_context | <binary> |
| thread_001 | ckpt_005 | search_products | 0 | messages | <binary> |
| thread_001 | ckpt_005 | generate_response | 0 | messages | <binary> |
checkpoint_blobs 表 — 2 条(合并后只剩 2 个 channel):
| thread_id | checkpoint_ns_hash | channel | version | blob_data |
|---|---|---|---|---|
| thread_001 | d41d8cd… | user_context | 0 | <binary> |
| thread_001 | d41d8cd… | messages | 0 | <binary: 合并后的完整列表> |
checkpoints 表 — 1 条:
| thread_id | checkpoint_id | parent_checkpoint_id | checkpoint_ns_hash | gmt_modified |
|---|---|---|---|---|
| thread_001 | ckpt_005 | ckpt_004 | d41d8cd… | 2026-04-06 10:00:00 |
② 更新:创建新 Checkpoint
场景:用户继续对话,State 发生变化。
# 旧 State (ckpt_005)
old_state = {
"messages": [msg1, msg2],
"files": {"file_001": {...}},
"user_context": {"age": 30, ...}
}
# 用户发送新消息,Agent 处理后——
# 新 State (ckpt_006)
new_state = {
"messages": [msg1, msg2, msg3, msg4], # ⬅️ 新增了 2 条消息
"files": {"file_001": {...}}, # ⬅️ 未变化
"user_context": {"age": 30, ...} # ⬅️ 未变化
}写入流程
与 ① 相同,只是创建了一个新的 checkpoint:
- checkpoint_writes:记录本次执行中每个 node 的写入
- checkpoint_blobs:合并后的最终状态
存储策略
方式 A:完整存储(简单但浪费空间)
INSERT INTO checkpoints VALUES (
'thread_001', 'ckpt_006', 'ckpt_005', '', <新hash>, NOW(), NOW()
);
-- 存储所有 channel(即使未变化也重复存储)
INSERT INTO checkpoint_blobs VALUES
('thread_001', <hash>, 'messages', 0, 'msgpack', <新的messages>),
('thread_001', <hash>, 'files', 0, 'msgpack', <files重复存储>),
('thread_001', <hash>, 'user_context', 0, 'msgpack', <user_context重复存储>);方式 B:增量存储(优化,读取时从 parent 继承未变 channel)
-- 只存储变化的 channel
INSERT INTO checkpoint_blobs VALUES
('thread_001', <hash>, 'messages', 0, 'msgpack', <只存新增的 msg3, msg4>);
-- files 和 user_context 未变化,不存储(读取时从 parent checkpoint 继承)| 对比 | checkpoints 表 | checkpoint_blobs 表 |
|---|---|---|
| 方式 A | 1 条 | N 条(所有 channel) |
| 方式 B | 1 条 | 仅变化的 channel |
无论哪种方式,
checkpoint_writes始终记录每个 node 的实际写入操作。
③ 回滚:从 ckpt_006 回到 ckpt_005
核心原则
- ❌ 不删除旧 checkpoint
- ✅ 创建新 checkpoint(分支),
parent指向回滚目标
用户操作
graph.update_state(
thread_id="thread_001",
checkpoint_id="ckpt_005" # 指定回滚目标
)数据库操作
-- 1. 创建新 checkpoint,parent 指向回滚目标 ckpt_005
INSERT INTO checkpoints VALUES (
'thread_001', 'ckpt_007', 'ckpt_005', '', <新hash>, NOW(), NOW()
);
-- 2. 复制 ckpt_005 的所有 blob 数据到 ckpt_007
INSERT INTO checkpoint_blobs
SELECT
thread_id,
<ckpt_007的hash>, -- 新的 checkpoint_ns_hash
channel,
version,
type,
blob_data -- 直接复制旧数据
FROM checkpoint_blobs
WHERE thread_id = 'thread_001'
AND checkpoint_ns_hash = <ckpt_005的hash>;结果:形成分支树
ckpt_004 → ckpt_005 ┬→ ckpt_006 (旧分支,保留不删)
└→ ckpt_007 (新分支,从此继续)
对应的 checkpoints 表:
| checkpoint_id | parent_checkpoint_id | gmt_modified |
|---|---|---|
| ckpt_004 | ckpt_003 | 10:00:00 |
| ckpt_005 | ckpt_004 | 10:00:30 ← 回滚目标 |
| ckpt_006 | ckpt_005 | 10:01:00 ← 旧分支(保留) |
| ckpt_007 | ckpt_005 | 10:02:00 ← 新分支 |
④ 读取:Database → State
场景:加载某个 checkpoint(如 ckpt_007)到内存。
常规读取(通过 checkpoint_blobs)
步骤 1:查询 checkpoint 元数据
SELECT checkpoint_id, checkpoint_ns_hash, parent_checkpoint_id
FROM checkpoints
WHERE thread_id = 'thread_001'
AND checkpoint_id = 'ckpt_007'
AND checkpoint_ns = '';步骤 2:查询所有 blob 数据
SELECT channel, version, type, blob_data
FROM checkpoint_blobs
WHERE thread_id = 'thread_001'
AND checkpoint_ns_hash = 'abc123...' -- ckpt_007 的 hash
ORDER BY channel, version;步骤 3:反序列化并合并
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
serializer = JsonPlusSerializer()
state = {}
for row in blob_rows:
channel = row['channel']
value = serializer.loads_typed((row['type'], row['blob_data']))
# 同一 channel 有多个 version 时需要合并
if channel in state:
state[channel] = merge(state[channel], value)
else:
state[channel] = value步骤 4:多 version 合并逻辑
当同一 channel 存在多个 version 时,按类型合并:
# dict → 合并
state["files"] = {**files_v0, **files_v1}
# list → 拼接
state["messages"] = messages_v0 + messages_v1最终恢复的 State
state = {
"messages": [msg1, msg2], # 从 blob 反序列化
"files": {"file_001": {...}}, # 从 blob 反序列化
"user_context": {"age": 30, ...} # 从 blob 反序列化
}调试/审计读取(通过 checkpoint_writes)
当需要分析执行过程时,查询 checkpoint_writes:
SELECT
task_id, idx, channel, task_path,
LENGTH(blob_data) as data_size, gmt_create
FROM checkpoint_writes
WHERE thread_id = 'thread_001'
AND checkpoint_id = 'ckpt_005'
AND checkpoint_ns = ''
ORDER BY gmt_create, idx;输出示例:
| task_id | idx | channel | task_path | data_size | gmt_create |
|---|---|---|---|---|---|
| retrieve_context | 0 | user_context | () | 512 | 10:00:00.123 |
| search_products | 0 | messages | () | 2048 | 10:00:01.456 |
| generate_response | 0 | messages | () | 1024 | 10:00:02.789 |
可以清晰看到:执行顺序、每个 node 写了什么、数据量多大。
查询实践:取最新 checkpoint 的特定 channel
场景
给定一批 thread_id,取每个 thread 根图最新 checkpoint 中 files channel 的值。
关键点
由于 version 是 per-channel 单调递增的,最新 checkpoint 引用的 files version 一定是该 thread 中 files channel 的最大 version。因此无需查 checkpoints 表,直接在 blob 表中取 MAX(version) 即可。
SQL(MaxCompute / ODPS)
SELECT thread_id, version, blob_data
FROM (
SELECT thread_id, version, blob_data,
ROW_NUMBER() OVER (PARTITION BY thread_id ORDER BY version DESC) AS rn
FROM test_ods.ods_checkpoint_blobs_db_delta
WHERE thread_id IN ({thread_id_list})
AND checkpoint_ns = ''
AND channel = 'files'
AND dt >= '{dt_start}'
AND dt <= '{dt_end}'
) t
WHERE rn = 1
ORDER BY thread_id;条件说明
| 条件 | 作用 |
|---|---|
checkpoint_ns = '' | 只取根图,排除子图 |
channel = 'files' | 只取 files channel |
ROW_NUMBER() ... ORDER BY version DESC | 每个 thread 取最新 version |
dt >= / dt <= | blob 增量表的分区过滤 |
为什么这样设计?
| 方案 | blob 行数 | 说明 |
|---|---|---|
| 以 checkpoint_id 为外键(每步存全量) | 5 channels × 4 steps = 20 行 | |
| 以 version 寻址(只存变化的) | 8 行 | ✅ channel 越多、步骤越长,节省越显著 |
存储设计原理
1. 存储去重(Git-like)
类比 Git:同一个 blob 可被多个 commit 引用,同一个 channel blob 也可被多个 checkpoint 引用。
2. 写入效率
blob 不可变(ON CONFLICT DO NOTHING)。每步只写实际变化的 channel,未变的 channel 复用已有 blob。
3. 分支共享
LangGraph 支持从历史 checkpoint 分叉执行。分叉后未变的 channel 自然共享同一份 blob,无需复制。
checkpoint_writes 的进阶用途
1. 故障恢复:从 writes 重建 blobs
如果 checkpoint_blobs 写入失败,可以从 checkpoint_writes 重放:
writes = get_checkpoint_writes(thread_id, checkpoint_id)
state = {}
for write in writes:
channel = write['channel']
data = deserialize(write['blob_data'])
if channel in state:
state[channel] = merge(state[channel], data)
else:
state[channel] = data
# 重新创建 checkpoint_blobs
save_checkpoint_blobs(thread_id, checkpoint_id, state)2. Agent 行为分析
SELECT
task_id,
COUNT(*) as write_count,
SUM(LENGTH(blob_data)) / 1024 / 1024 as total_mb,
AVG(LENGTH(blob_data)) as avg_bytes
FROM checkpoint_writes
WHERE thread_id LIKE 'thread_%'
AND gmt_create >= '2026-04-01'
AND checkpoint_ns = ''
GROUP BY task_id
ORDER BY write_count DESC;3. 对比 writes vs blobs 数据量
-- writes 总量(所有 task 的写入)
SELECT SUM(LENGTH(blob_data)) / 1024 as writes_kb
FROM checkpoint_writes
WHERE thread_id = 'thread_001' AND checkpoint_id = 'ckpt_005';
-- blobs 总量(合并后的最终状态)
SELECT SUM(LENGTH(blob_data)) / 1024 as blobs_kb
FROM checkpoint_blobs
WHERE thread_id = 'thread_001'
AND checkpoint_ns_hash = (
SELECT checkpoint_ns_hash FROM checkpoints WHERE checkpoint_id = 'ckpt_005'
);如果 writes_kb > blobs_kb:说明有多个 task 写同一个 channel(合并后体积缩小)。
task_id 与 task_path
# task_id = node 名称
graph.add_node("retrieve_context", retrieve_func)
# → task_id = "retrieve_context"
# task_path 用于嵌套子图
task_path = "()" # 主图中的 node
task_path = "('subgraph_name',)" # 子图中的 node
task_path = "('subgraph_name', 'nested',)" # 嵌套子图速查总结
四阶段操作对照
| 操作 | checkpoint_writes | checkpoint_blobs | checkpoints | State 内存 |
|---|---|---|---|---|
| ① 写入 | INSERT(每 node 一条) | INSERT(每 channel 一条) | INSERT 1 条 | 序列化 |
| ② 更新 | INSERT(新的 node 写入) | INSERT(完整或增量) | INSERT 1 条 | 新 State |
| ③ 回滚 | — | 复制旧 blob | INSERT 1 条(parent → 目标) | — |
| ④ 读取 | SELECT(调试/审计) | SELECT(常规读取) | SELECT(查 hash) | 反序列化 + 合并 |
三张表速查
checkpoints 表
thread_id ← 会话ID
checkpoint_id ← 快照ID(唯一标识)
parent_checkpoint_id ← 父快照ID(构建链条/分支)
checkpoint_ns ← 命名空间(主图='',子图='task_name')
checkpoint_ns_hash ← MD5(checkpoint_ns),关联其他两表 ⭐
gmt_create / gmt_modified ← 时间戳
checkpoint_writes 表
thread_id ← 会话ID
checkpoint_id ← 所属 checkpoint
checkpoint_ns_hash ← 关联 checkpoints 表 ⭐
task_id ← node 名称(如 "retrieve_context")
idx ← 同一 task 内的写入序号
channel ← State 字段名(messages / files / ...)
type ← 序列化类型(msgpack)
blob_data ← 序列化后的部分更新数据
task_path ← 子图路径(主图="()")
checkpoint_blobs 表
thread_id ← 会话ID
checkpoint_ns_hash ← 关联 checkpoints 表 ⭐
channel ← State 字段名(messages / files / user_context)
version ← 同一 checkpoint、同一 channel 的版本号
type ← 序列化类型(msgpack / json / ...)
blob_data ← 序列化后的二进制数据(合并后的完整状态)
序列化与反序列化
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
serializer = JsonPlusSerializer()
# 写入时:State 字段 → blob_data
serialized = serializer.dumps_typed(state_field)
# → ('msgpack', b'\x82\xa3age\x1e...')
# 读取时:blob_data → State 字段
state_field = serializer.loads_typed(('msgpack', binary_data))
# → {"age": 30, "name": "张三"}
完整字段级数据流图示例
以保险咨询会话为例,展示三张表的实际字段数据流转。
场景设定
thread_id = "thread_abc123" (用户咨询医疗险的完整会话)
会话流程:
checkpoint_1 (用户说"你好") → checkpoint_2 (Agent回复) → checkpoint_3 (用户说"推荐医疗险") → checkpoint_4 (Agent推荐产品)
Step 1: 用户说”你好”
执行节点:handle_greeting → generate_response
checkpoints 表
| thread_id | checkpoint_id | parent_checkpoint_id | checkpoint_ns | checkpoint_ns_hash | gmt_create |
|---|---|---|---|---|---|
| thread_abc123 | ckpt_001 | NULL | d41d8cd98f00b204e9800998ecf8427e | 10:00:00 | |
| thread_abc123 | ckpt_002 | ckpt_001 | d41d8cd98f00b204e9800998ecf8427e | 10:00:02 |
checkpoint_writes 表(ckpt_002 的写入)
| thread_id | checkpoint_id | checkpoint_ns_hash | task_id | idx | channel | type | blob_data (反序列化后) |
|---|---|---|---|---|---|---|---|
| thread_abc123 | ckpt_002 | d41d8cd… | handle_greeting | 0 | messages | msgpack | [{"role":"human","content":"你好"}] |
| thread_abc123 | ckpt_002 | d41d8cd… | generate_response | 0 | messages | msgpack | [{"role":"ai","content":"您好!我是您的保险顾问"}] |
checkpoint_blobs 表(ckpt_002 最终状态)
| thread_id | checkpoint_ns_hash | channel | version | type | blob_data (反序列化后) |
|---|---|---|---|---|---|
| thread_abc123 | d41d8cd… | messages | 0 | msgpack | [HumanMessage("你好"), AIMessage("您好!我是您的保险顾问")] |
| thread_abc123 | d41d8cd… | user_context | 0 | msgpack | {"session_start":"2026-04-07"} |
Step 2: 用户说”推荐医疗险”
执行节点:retrieve_context → search_products → generate_response
业务逻辑:
retrieve_context: 解析用户意图(医疗险)、提取年龄信息search_products: 搜索数据库中的医疗险产品generate_response: 生成产品推荐回复
checkpoints 表新增记录
| thread_id | checkpoint_id | parent_checkpoint_id | checkpoint_ns_hash | gmt_create |
|---|---|---|---|---|
| thread_abc123 | ckpt_003 | ckpt_002 | d41d8cd… | 10:00:30 |
checkpoint_writes 表(ckpt_003 的写入 - 3 个 node 执行)
| thread_id | checkpoint_id | task_id | idx | channel | blob_data (反序列化后) |
|---|---|---|---|---|---|
| thread_abc123 | ckpt_003 | retrieve_context | 0 | user_context | {"intent":"medical_insurance","age":28,"budget":5000} |
| thread_abc123 | ckpt_003 | retrieve_context | 1 | extracted_entities | ["医疗险","28岁"] |
| thread_abc123 | ckpt_003 | search_products | 0 | search_results | [{"product_id":"M001","name":"百万医疗","price":399}] |
| thread_abc123 | ckpt_003 | search_products | 1 | tool_calls | [{"name":"search_db","args":{"category":"medical"}}] |
| thread_abc123 | ckpt_003 | generate_response | 0 | messages | [AIMessage("为您推荐百万医疗险...")] |
注意:
retrieve_context写入了 2 条(idx 0 和 1),search_products也写了 2 条。
checkpoint_blobs 表(ckpt_003 合并后状态)
| thread_id | checkpoint_ns_hash | channel | version | blob_data (反序列化后) |
|---|---|---|---|---|
| thread_abc123 | d41d8cd… | messages | 0 | 合并后的 3 条消息列表 |
| thread_abc123 | d41d8cd… | user_context | 0 | {"intent":"medical_insurance","age":28,"budget":5000} |
| thread_abc123 | d41d8cd… | search_results | 0 | [{"product_id":"M001","name":"百万医疗"}] |
Step 3: 数据流图可视化
flowchart TD thread["thread_abc123"] thread --> ckpt001["ckpt_001<br/>初始空状态"] ckpt001 --> writes001["writes<br/>无"] writes001 --> blobs001["blobs<br/>空"] blobs001 --> ckpt002["ckpt_002<br/>问候完成"] ckpt002 --> writes002["checkpoint_writes"] writes002 --> greeting["handle_greeting<br/>messages: 你好"] writes002 --> response002["generate_response<br/>messages: 您好!..."] greeting --> merge002["按 channel 合并"] response002 --> merge002 merge002 --> blobs002["checkpoint_blobs<br/>messages: 合并后 2 条消息<br/>user_context: 基础会话信息"] blobs002 --> ckpt003["ckpt_003<br/>推荐请求处理中"] ckpt003 --> writes003["checkpoint_writes"] writes003 --> context["retrieve_context<br/>user_context: 意图 + 年龄"] writes003 --> entities["retrieve_context<br/>extracted_entities: 实体列表"] writes003 --> products["search_products<br/>search_results: 产品数据"] writes003 --> toolCalls["search_products<br/>tool_calls: 工具调用记录"] writes003 --> response003["generate_response<br/>messages: 推荐回复"] context --> merge003["按 channel 合并"] entities --> merge003 products --> merge003 toolCalls --> merge003 response003 --> merge003 merge003 --> blobs003["checkpoint_blobs<br/>messages: 合并后 4 条消息<br/>user_context: 更新后的用户信息<br/>search_results: 产品列表"] blobs003 --> ckpt004["ckpt_004<br/>最终回复完成"] ckpt004 --> writes004["checkpoint_writes"] writes004 --> finalResponse["finalize_response<br/>messages: 格式化回复"] finalResponse --> merge004["按 channel 合并"] merge004 --> blobs004["checkpoint_blobs<br/>messages: 完整对话历史<br/>user_context: 完整用户画像<br/>search_results: 推荐产品快照"]
Step 4: 从 writes 到 blobs 的合并过程详解
以 ckpt_003 为例,展示 messages channel 是如何合并的:
flowchart LR subgraph writes["checkpoint_writes 原始写入(按时间序)"] human["ckpt_002<br/>handle_greeting<br/>[HumanMessage: 你好]"] greeting["ckpt_002<br/>generate_response<br/>[AIMessage: 您好!]"] context["ckpt_003<br/>retrieve_context<br/>未写 messages"] products["ckpt_003<br/>search_products<br/>未写 messages"] recommendation["ckpt_003<br/>generate_response<br/>[AIMessage: 为您推荐...]"] end human --> reducer["messages reducer<br/>列表类型 = 拼接"] greeting --> reducer context -. 跳过 .-> reducer products -. 跳过 .-> reducer recommendation --> reducer reducer --> merged["合并结果<br/>[Human: 你好]<br/>[AI: 您好!]<br/>[AI: 为您推荐...]"] merged --> blobs["checkpoint_blobs<br/>channel: messages<br/>value: 3 条消息的完整列表"]
Step 5: 回滚场景示例
用户操作:“我刚才说错了,我不想看病险了”
# 回滚到 ckpt_002(问候完成状态,还没推荐产品)
graph.update_state(
thread_id="thread_abc123",
checkpoint_id="ckpt_002"
)回滚后的 checkpoints 表(形成分支)
| checkpoint_id | parent_checkpoint_id | 说明 |
|---|---|---|
| ckpt_001 | NULL | 初始 |
| ckpt_002 | ckpt_001 | 问候完成 ← 回滚目标 |
| ckpt_003 | ckpt_002 | 旧分支(含推荐) |
| ckpt_004 | ckpt_002 | 新分支(从此继续) |
┌→ ckpt_003 → (废弃分支,保留)
ckpt_001 → ckpt_002 ┤
└→ ckpt_004 → (新对话从此开始)
ckpt_004 的 checkpoint_blobs
从 ckpt_002 复制所有 blob 数据:
| thread_id | checkpoint_id | channel | blob_data 来源 |
|---|---|---|---|
| thread_abc123 | ckpt_004 | messages | 复制自 ckpt_002 |
| thread_abc123 | ckpt_004 | user_context | 复制自 ckpt_002 |
ckpt_003 的
search_results和推荐相关的messages不会出现在 ckpt_004 中。
Step 6: SQL 查询示例
查询某 checkpoint 的完整执行轨迹:
-- 查看 ckpt_003 中每个 node 的贡献
SELECT
task_id,
channel,
LENGTH(blob_data) as bytes,
gmt_create
FROM checkpoint_writes
WHERE thread_id = 'thread_abc123'
AND checkpoint_id = 'ckpt_003'
ORDER BY gmt_create;
-- 输出:
-- task_id | channel | bytes | gmt_create
-- retrieve_context | user_context | 256 | 10:00:30.100
-- retrieve_context | extracted_entities | 128 | 10:00:30.200
-- search_products | search_results | 2048 | 10:00:31.500
-- search_products | tool_calls | 512 | 10:00:31.600
-- generate_response | messages | 1024 | 10:00:32.000对比 writes 和 blobs 的数据量:
-- ckpt_003 的 writes 总量
SELECT SUM(LENGTH(blob_data)) as writes_total_bytes
FROM checkpoint_writes
WHERE thread_id = 'thread_abc123' AND checkpoint_id = 'ckpt_003';
-- 结果: 3968 bytes (5条写入)
-- ckpt_003 的 blobs 总量
SELECT SUM(LENGTH(blob_data)) as blobs_total_bytes
FROM checkpoint_blobs
WHERE thread_id = 'thread_abc123'
AND checkpoint_ns_hash = 'd41d8cd...';
-- 结果: 3072 bytes (3条合并后)
-- 压缩率: (3968-3072)/3968 = 22.6% 的冗余被合并消除字段级速查卡片
checkpoints 表 —— 时间线/分支索引
┌─────────────────┬─────────────────────────────────────────┐
│ thread_id │ "thread_abc123" │
│ checkpoint_id │ "ckpt_003" │
│ parent_id │ "ckpt_002" ← 指向前一个 │
│ checkpoint_ns │ "" (主图) / "subgraph" (子图) │
│ ns_hash │ "d41d8cd..." ← 关联 writes 和 blobs │
│ gmt_create │ 2026-04-07 10:00:30 │
└─────────────────┴─────────────────────────────────────────┘
checkpoint_writes 表 —— 执行轨迹(调试/审计)
┌─────────────────┬─────────────────────────────────────────┐
│ thread_id │ "thread_abc123" │
│ checkpoint_id │ "ckpt_003" │
│ task_id │ "search_products" ← node 名称 │
│ idx │ 0 / 1 / 2... ← 同一 task 的第 N 次写入 │
│ channel │ "search_results" ← State 字段名 │
│ blob_data │ <二进制: 产品列表数据> │
│ task_path │ "()" / "('subgraph',)" ← 子图路径 │
└─────────────────┴─────────────────────────────────────────┘
checkpoint_blobs 表 —— 最终状态(常规读取)
┌─────────────────┬─────────────────────────────────────────┐
│ thread_id │ "thread_abc123" │
│ ns_hash │ "d41d8cd..." ← 来自 checkpoints 表 │
│ channel │ "messages" ← State 字段名 │
│ version │ 0 / 1 / 2... ← 同一 checkpoint 的版本 │
│ blob_data │ <二进制: 合并后的完整消息列表> │
└─────────────────┴─────────────────────────────────────────┘