核心概念

概念说明
thread_id一次对话/执行的唯一标识
checkpoint_id图执行到某一步时的状态快照 ID,在 checkpoints 表内递增
checkpoint_ns命名空间。'' = 根图(root graph),非空 = 子图(subgraph)
channelState 中的一个字段名(如 messagesfiles),每个 channel 独立版本化
versionblob 的唯一标识,格式为 {032d计数器}.{016f随机数},per-channel 单调递增
channel_versionscheckpoint 内部的 JSON 元数据,记录该 checkpoint 时刻每个 channel 对应的 version

概览

LangGraph 的 State 在内存与数据库之间流转,形成完整的生命周期。核心逻辑链条:

① 写入 → ② 更新 → ③ 回滚 → ④ 读取

三张核心表

表名职责粒度写入时机
checkpoints元数据索引(快照链条)每个 checkpoint 1 条checkpoint 完成时
checkpoint_writes写入操作历史(每个 node 的部分更新)每个 task 1+ 条每个 node 执行后立即写入
checkpoint_blobs最终完整状态(合并后的序列化数据)每个 channel 1 条checkpoint 完成时

三表通过 thread_id + checkpoint_ns_hash 关联。

两表关联方式(checkpoints ↔ checkpoint_blobs)

checkpoints 和 blobs 之间没有外键,通过 checkpoint 内部的 channel_versions JSON 间接关联:

checkpoint 记录:
{
  "channel_versions": {
    "messages": "00000000000000000000000000000003.0384719283746192",
    "files": "00000000000000000000000000000002.0886748088723920",
    "__start__":"00000000000000000000000000000001.0867480887239203"
  }
}
         │
         │ 按 (channel, version) 精确查找
         ▼
blob 表: (thread_id, channel="messages", version="...003.xxx") → blob_data
        (thread_id, channel="files", version="...002.xxx") → blob_data

LangGraph 官方加载状态的 SQL:

SELECT ... FROM checkpoints
INNER JOIN checkpoint_blobs bl
  ON bl.thread_id = checkpoints.thread_id
  AND bl.checkpoint_ns = checkpoints.checkpoint_ns
  AND bl.channel = jsonb_each_text.key     -- channel 名
  AND bl.version = jsonb_each_text.value   -- 从 channel_versions 取

这种设计的优势:

  • 存储去重:同一个 blob 可被多个 checkpoint 引用(类似 Git)
  • 写入效率:只写实际变化的 channel,未变的复用已有 blob
  • 分支共享:分叉后未变的 channel 自然共享同一份 blob

三张表的关系

┌─────────────────────────────────────────────┐
│             LangGraph 执行流程                │
└─────────────────────────────────────────────┘
                      │
      ┌───────────────┼───────────────┐
      │               │               │
      ↓               ↓               ↓
 Node A 写入      Node B 写入     Node C 写入
 {"user_context":  {"messages":    {"messages":
  {...}}            [msg1]}         [msg2]}
      │               │               │
      ↓               ↓               ↓
┌─────────────────────────────────────────────┐
│  checkpoint_writes 表                       │
│  记录每个 task 的写入操作(细粒度)              │
└─────────────────────────────────────────────┘
                      │
                      │ 合并所有 task 的写入
                      ↓
┌─────────────────────────────────────────────┐
│  checkpoint_blobs 表                         │
│  存储最终合并后的完整状态(粗粒度)               │
└─────────────────────────────────────────────┘
                      │
                      │ 元数据索引
                      ↓
┌─────────────────────────────────────────────┐
│  checkpoints 表                             │
│  存储 checkpoint 元数据(链条/分支)            │
└─────────────────────────────────────────────┘

完整数据流图

┌───────────────────────────────────────────────────────┐
│                   内存中的 State                        │
│  {                                                     │
│    messages: [msg1, msg2],                             │
│    files: {"file_001": {...}},                         │
│    user_context: {"age": 30, ...}                      │
│  }                                                     │
└───────────────────────────────────────────────────────┘
       │ ① 写入 (node 级)         ④ 读取 │
       │ ② 更新 (新 checkpoint)          │
       ↓                                 ↑
┌───────────────────────────────────────────────────────┐
│                   数据库存储                            │
│                                                        │
│  checkpoint_writes  ──合并──→  checkpoint_blobs         │
│  (每个 node 的写入)           (最终完整状态)              │
│                                    │                   │
│                               元数据索引                │
│                                    ↓                   │
│                             checkpoints                │
│                           (链条 / 分支)                 │
│                                                        │
│                   ③ 回滚 = 新建分支                     │
└───────────────────────────────────────────────────────┘

version 字段详解

version 生成逻辑

# LangGraph 源码 get_next_version()
def get_next_version(self, current, channel):
    current_v = 0 if current is None else int(current.split(".")[0])
    next_v = current_v + 1
    next_h = random.random()
    return f"{next_v:032d}.{next_h:016f}"

version 格式解读

00000000000000000000000000000003.0384719283746192
|_______________________________| |_______________|
      per-channel 递增计数器         随机数(防冲突)
  • 计数器:该 channel 第 N 次被写入(不是 checkpoint 序号)
  • 随机后缀:防止分支/重跑时 version 碰撞
  • 零填充 32 位:保证字典序 = 时间序,ORDER BY version 有效

易混淆点

version 前缀不是 checkpoint_idmessages channel 的第 2 次写入可能发生在 checkpoint 3,也可能在 checkpoint 5,取决于哪些节点修改了 messages


① 写入:State → Database

触发时机

  • Agent 执行完一个 node
  • 用户发送新消息
  • 调用 tool 完成后

实际场景

假设 Agent 执行一个包含 3 个 node 的流程:

graph = StateGraph(AgentState)
graph.add_node("retrieve_context", retrieve_node)   # task_id = "retrieve_context"
graph.add_node("search_products", search_node)      # task_id = "search_products"
graph.add_node("generate_response", generate_node)  # task_id = "generate_response"

用户输入 "推荐医疗险",执行过程:

→ Node 1: retrieve_context
    写入: {"user_context": {"age": 30, ...}}

→ Node 2: search_products
    写入: {"messages": [AIMessage(...), ToolMessage(...)]}

→ Node 3: generate_response
    写入: {"messages": [AIMessage("推荐您...")]}

→ Checkpoint 完成
    合并所有写入 → checkpoint_blobs

步骤 1:每个 node 执行后 → 写入 checkpoint_writes 表

每个 node 执行完毕立即写入,记录该 task 对 State 的部分更新:

-- Node 1: retrieve_context 的写入
INSERT INTO checkpoint_writes VALUES (
    'thread_001', 'ckpt_005', 'd41d8cd...', 'retrieve_context', 0, 'user_context',
    'msgpack', <binary: {"age": 30, ...}>, '()', NOW()
);
 
-- Node 2: search_products 的写入
INSERT INTO checkpoint_writes VALUES (
    'thread_001', 'ckpt_005', 'd41d8cd...', 'search_products', 0, 'messages',
    'msgpack', <binary: [AIMessage, ToolMessage]>, '()', NOW()
);
 
-- Node 3: generate_response 的写入
INSERT INTO checkpoint_writes VALUES (
    'thread_001', 'ckpt_005', 'd41d8cd...', 'generate_response', 0, 'messages',
    'msgpack', <binary: [AIMessage("推荐您...")]>, '()', NOW()
);

注意:Node 2 和 Node 3 都写入了 messages channel,后续会被合并。

步骤 2:所有 node 完成后 → 合并写入 checkpoint_blobs 表

checkpoint_writes 中同一 channel 的多条记录合并,写入最终状态:

from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
 
serializer = JsonPlusSerializer()
 
for field_name, field_value in merged_state.items():
    serialized_data = serializer.dumps_typed(field_value)
    # serialized_data = ('msgpack', binary_data)
 
    INSERT INTO checkpoint_blobs VALUES (
        thread_id          = 'thread_001',
        checkpoint_ns_hash = 'd41d8cd98f00b204e9800998ecf8427e',
        channel            = field_name,
        version            = 0,
        type               = 'msgpack',
        blob_data          = binary_data
    )

步骤 3:写入 checkpoints 表(元数据)

INSERT INTO checkpoints (
    thread_id, checkpoint_id, parent_checkpoint_id,
    checkpoint_ns, checkpoint_ns_hash, gmt_create, gmt_modified
) VALUES (
    'thread_001',                              -- 会话ID
    'ckpt_005',                                -- 新生成的快照ID
    'ckpt_004',                                -- 指向上一个 checkpoint(构建链条)
    '',                                        -- 命名空间(主图用空字符串)
    'd41d8cd98f00b204e9800998ecf8427e',        -- MD5('')
    '2026-04-06 10:00:00',
    '2026-04-06 10:00:00'
);

写入结果

checkpoint_writes 表 — 3 条(每个 task 一条):

thread_idcheckpoint_idtask_ididxchannelblob_data
thread_001ckpt_005retrieve_context0user_context<binary>
thread_001ckpt_005search_products0messages<binary>
thread_001ckpt_005generate_response0messages<binary>

checkpoint_blobs 表 — 2 条(合并后只剩 2 个 channel):

thread_idcheckpoint_ns_hashchannelversionblob_data
thread_001d41d8cd…user_context0<binary>
thread_001d41d8cd…messages0<binary: 合并后的完整列表>

checkpoints 表 — 1 条:

thread_idcheckpoint_idparent_checkpoint_idcheckpoint_ns_hashgmt_modified
thread_001ckpt_005ckpt_004d41d8cd…2026-04-06 10:00:00

② 更新:创建新 Checkpoint

场景:用户继续对话,State 发生变化。

# 旧 State (ckpt_005)
old_state = {
    "messages": [msg1, msg2],
    "files": {"file_001": {...}},
    "user_context": {"age": 30, ...}
}
 
# 用户发送新消息,Agent 处理后——
 
# 新 State (ckpt_006)
new_state = {
    "messages": [msg1, msg2, msg3, msg4],  # ⬅️ 新增了 2 条消息
    "files": {"file_001": {...}},          # ⬅️ 未变化
    "user_context": {"age": 30, ...}       # ⬅️ 未变化
}

写入流程

与 ① 相同,只是创建了一个新的 checkpoint:

  1. checkpoint_writes:记录本次执行中每个 node 的写入
  2. checkpoint_blobs:合并后的最终状态

存储策略

方式 A:完整存储(简单但浪费空间)

INSERT INTO checkpoints VALUES (
    'thread_001', 'ckpt_006', 'ckpt_005', '', <新hash>, NOW(), NOW()
);
 
-- 存储所有 channel(即使未变化也重复存储)
INSERT INTO checkpoint_blobs VALUES
    ('thread_001', <hash>, 'messages',     0, 'msgpack', <新的messages>),
    ('thread_001', <hash>, 'files',        0, 'msgpack', <files重复存储>),
    ('thread_001', <hash>, 'user_context', 0, 'msgpack', <user_context重复存储>);

方式 B:增量存储(优化,读取时从 parent 继承未变 channel)

-- 只存储变化的 channel
INSERT INTO checkpoint_blobs VALUES
    ('thread_001', <hash>, 'messages', 0, 'msgpack', <只存新增的 msg3, msg4>);
 
-- files 和 user_context 未变化,不存储(读取时从 parent checkpoint 继承)
对比checkpoints 表checkpoint_blobs 表
方式 A1 条N 条(所有 channel)
方式 B1 条仅变化的 channel

无论哪种方式,checkpoint_writes 始终记录每个 node 的实际写入操作。


③ 回滚:从 ckpt_006 回到 ckpt_005

核心原则

  • 不删除旧 checkpoint
  • 创建新 checkpoint(分支),parent 指向回滚目标

用户操作

graph.update_state(
    thread_id="thread_001",
    checkpoint_id="ckpt_005"   # 指定回滚目标
)

数据库操作

-- 1. 创建新 checkpoint,parent 指向回滚目标 ckpt_005
INSERT INTO checkpoints VALUES (
    'thread_001', 'ckpt_007', 'ckpt_005', '', <新hash>, NOW(), NOW()
);
 
-- 2. 复制 ckpt_005 的所有 blob 数据到 ckpt_007
INSERT INTO checkpoint_blobs
SELECT
    thread_id,
    <ckpt_007的hash>,   -- 新的 checkpoint_ns_hash
    channel,
    version,
    type,
    blob_data            -- 直接复制旧数据
FROM checkpoint_blobs
WHERE thread_id = 'thread_001'
    AND checkpoint_ns_hash = <ckpt_005的hash>;

结果:形成分支树

ckpt_004 → ckpt_005 ┬→ ckpt_006  (旧分支,保留不删)
                     └→ ckpt_007  (新分支,从此继续)

对应的 checkpoints 表:

checkpoint_idparent_checkpoint_idgmt_modified
ckpt_004ckpt_00310:00:00
ckpt_005ckpt_00410:00:30 ← 回滚目标
ckpt_006ckpt_00510:01:00 ← 旧分支(保留)
ckpt_007ckpt_00510:02:00 ← 新分支

④ 读取:Database → State

场景:加载某个 checkpoint(如 ckpt_007)到内存。

常规读取(通过 checkpoint_blobs)

步骤 1:查询 checkpoint 元数据

SELECT checkpoint_id, checkpoint_ns_hash, parent_checkpoint_id
FROM checkpoints
WHERE thread_id = 'thread_001'
    AND checkpoint_id = 'ckpt_007'
    AND checkpoint_ns = '';

步骤 2:查询所有 blob 数据

SELECT channel, version, type, blob_data
FROM checkpoint_blobs
WHERE thread_id = 'thread_001'
    AND checkpoint_ns_hash = 'abc123...'   -- ckpt_007 的 hash
ORDER BY channel, version;

步骤 3:反序列化并合并

from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
 
serializer = JsonPlusSerializer()
state = {}
 
for row in blob_rows:
    channel = row['channel']
    value = serializer.loads_typed((row['type'], row['blob_data']))
 
    # 同一 channel 有多个 version 时需要合并
    if channel in state:
        state[channel] = merge(state[channel], value)
    else:
        state[channel] = value

步骤 4:多 version 合并逻辑

当同一 channel 存在多个 version 时,按类型合并:

# dict → 合并
state["files"] = {**files_v0, **files_v1}
 
# list → 拼接
state["messages"] = messages_v0 + messages_v1

最终恢复的 State

state = {
    "messages": [msg1, msg2],           # 从 blob 反序列化
    "files": {"file_001": {...}},       # 从 blob 反序列化
    "user_context": {"age": 30, ...}    # 从 blob 反序列化
}

调试/审计读取(通过 checkpoint_writes)

当需要分析执行过程时,查询 checkpoint_writes

SELECT
    task_id, idx, channel, task_path,
    LENGTH(blob_data) as data_size, gmt_create
FROM checkpoint_writes
WHERE thread_id = 'thread_001'
    AND checkpoint_id = 'ckpt_005'
    AND checkpoint_ns = ''
ORDER BY gmt_create, idx;

输出示例:

task_ididxchanneltask_pathdata_sizegmt_create
retrieve_context0user_context()51210:00:00.123
search_products0messages()204810:00:01.456
generate_response0messages()102410:00:02.789

可以清晰看到:执行顺序、每个 node 写了什么、数据量多大。


查询实践:取最新 checkpoint 的特定 channel

场景

给定一批 thread_id,取每个 thread 根图最新 checkpoint 中 files channel 的值。

关键点

由于 version 是 per-channel 单调递增的,最新 checkpoint 引用的 files version 一定是该 thread 中 files channel 的最大 version。因此无需查 checkpoints 表,直接在 blob 表中取 MAX(version) 即可。

SQL(MaxCompute / ODPS)

SELECT thread_id, version, blob_data
FROM (
  SELECT thread_id, version, blob_data,
         ROW_NUMBER() OVER (PARTITION BY thread_id ORDER BY version DESC) AS rn
  FROM test_ods.ods_checkpoint_blobs_db_delta
  WHERE thread_id IN ({thread_id_list})
    AND checkpoint_ns = ''
    AND channel = 'files'
    AND dt >= '{dt_start}'
    AND dt <= '{dt_end}'
) t
WHERE rn = 1
ORDER BY thread_id;

条件说明

条件作用
checkpoint_ns = ''只取根图,排除子图
channel = 'files'只取 files channel
ROW_NUMBER() ... ORDER BY version DESC每个 thread 取最新 version
dt >= / dt <=blob 增量表的分区过滤

为什么这样设计?

方案blob 行数说明
以 checkpoint_id 为外键(每步存全量)5 channels × 4 steps = 20 行
以 version 寻址(只存变化的)8 行✅ channel 越多、步骤越长,节省越显著

存储设计原理

1. 存储去重(Git-like)

类比 Git:同一个 blob 可被多个 commit 引用,同一个 channel blob 也可被多个 checkpoint 引用。

2. 写入效率

blob 不可变(ON CONFLICT DO NOTHING)。每步只写实际变化的 channel,未变的 channel 复用已有 blob。

3. 分支共享

LangGraph 支持从历史 checkpoint 分叉执行。分叉后未变的 channel 自然共享同一份 blob,无需复制。


checkpoint_writes 的进阶用途

1. 故障恢复:从 writes 重建 blobs

如果 checkpoint_blobs 写入失败,可以从 checkpoint_writes 重放:

writes = get_checkpoint_writes(thread_id, checkpoint_id)
 
state = {}
for write in writes:
    channel = write['channel']
    data = deserialize(write['blob_data'])
 
    if channel in state:
        state[channel] = merge(state[channel], data)
    else:
        state[channel] = data
 
# 重新创建 checkpoint_blobs
save_checkpoint_blobs(thread_id, checkpoint_id, state)

2. Agent 行为分析

SELECT
    task_id,
    COUNT(*) as write_count,
    SUM(LENGTH(blob_data)) / 1024 / 1024 as total_mb,
    AVG(LENGTH(blob_data)) as avg_bytes
FROM checkpoint_writes
WHERE thread_id LIKE 'thread_%'
    AND gmt_create >= '2026-04-01'
    AND checkpoint_ns = ''
GROUP BY task_id
ORDER BY write_count DESC;

3. 对比 writes vs blobs 数据量

-- writes 总量(所有 task 的写入)
SELECT SUM(LENGTH(blob_data)) / 1024 as writes_kb
FROM checkpoint_writes
WHERE thread_id = 'thread_001' AND checkpoint_id = 'ckpt_005';
 
-- blobs 总量(合并后的最终状态)
SELECT SUM(LENGTH(blob_data)) / 1024 as blobs_kb
FROM checkpoint_blobs
WHERE thread_id = 'thread_001'
    AND checkpoint_ns_hash = (
        SELECT checkpoint_ns_hash FROM checkpoints WHERE checkpoint_id = 'ckpt_005'
    );

如果 writes_kb > blobs_kb:说明有多个 task 写同一个 channel(合并后体积缩小)。

task_id 与 task_path

# task_id = node 名称
graph.add_node("retrieve_context", retrieve_func)
# → task_id = "retrieve_context"
 
# task_path 用于嵌套子图
task_path = "()"                              # 主图中的 node
task_path = "('subgraph_name',)"              # 子图中的 node
task_path = "('subgraph_name', 'nested',)"    # 嵌套子图

速查总结

四阶段操作对照

操作checkpoint_writescheckpoint_blobscheckpointsState 内存
① 写入INSERT(每 node 一条)INSERT(每 channel 一条)INSERT 1 条序列化
② 更新INSERT(新的 node 写入)INSERT(完整或增量)INSERT 1 条新 State
③ 回滚复制旧 blobINSERT 1 条(parent → 目标)
④ 读取SELECT(调试/审计)SELECT(常规读取)SELECT(查 hash)反序列化 + 合并

三张表速查

checkpoints 表

thread_id               ← 会话ID
checkpoint_id           ← 快照ID(唯一标识)
parent_checkpoint_id    ← 父快照ID(构建链条/分支)
checkpoint_ns           ← 命名空间(主图='',子图='task_name')
checkpoint_ns_hash      ← MD5(checkpoint_ns),关联其他两表 ⭐
gmt_create / gmt_modified ← 时间戳

checkpoint_writes 表

thread_id               ← 会话ID
checkpoint_id           ← 所属 checkpoint
checkpoint_ns_hash      ← 关联 checkpoints 表 ⭐
task_id                 ← node 名称(如 "retrieve_context")
idx                     ← 同一 task 内的写入序号
channel                 ← State 字段名(messages / files / ...)
type                    ← 序列化类型(msgpack)
blob_data               ← 序列化后的部分更新数据
task_path               ← 子图路径(主图="()")

checkpoint_blobs 表

thread_id               ← 会话ID
checkpoint_ns_hash      ← 关联 checkpoints 表 ⭐
channel                 ← State 字段名(messages / files / user_context)
version                 ← 同一 checkpoint、同一 channel 的版本号
type                    ← 序列化类型(msgpack / json / ...)
blob_data               ← 序列化后的二进制数据(合并后的完整状态)

序列化与反序列化

from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
serializer = JsonPlusSerializer()
 
# 写入时:State 字段 → blob_data
serialized = serializer.dumps_typed(state_field)
# → ('msgpack', b'\x82\xa3age\x1e...')
 
# 读取时:blob_data → State 字段
state_field = serializer.loads_typed(('msgpack', binary_data))
# → {"age": 30, "name": "张三"}
 

完整字段级数据流图示例

保险咨询会话为例,展示三张表的实际字段数据流转。

场景设定


thread_id = "thread_abc123" (用户咨询医疗险的完整会话)

会话流程


checkpoint_1 (用户说"你好") → checkpoint_2 (Agent回复) → checkpoint_3 (用户说"推荐医疗险") → checkpoint_4 (Agent推荐产品)


Step 1: 用户说”你好”

执行节点handle_greetinggenerate_response

checkpoints 表

thread_idcheckpoint_idparent_checkpoint_idcheckpoint_nscheckpoint_ns_hashgmt_create
thread_abc123ckpt_001NULLd41d8cd98f00b204e9800998ecf8427e10:00:00
thread_abc123ckpt_002ckpt_001d41d8cd98f00b204e9800998ecf8427e10:00:02

checkpoint_writes 表(ckpt_002 的写入)

thread_idcheckpoint_idcheckpoint_ns_hashtask_ididxchanneltypeblob_data (反序列化后)
thread_abc123ckpt_002d41d8cd…handle_greeting0messagesmsgpack[{"role":"human","content":"你好"}]
thread_abc123ckpt_002d41d8cd…generate_response0messagesmsgpack[{"role":"ai","content":"您好!我是您的保险顾问"}]

checkpoint_blobs 表(ckpt_002 最终状态)

thread_idcheckpoint_ns_hashchannelversiontypeblob_data (反序列化后)
thread_abc123d41d8cd…messages0msgpack[HumanMessage("你好"), AIMessage("您好!我是您的保险顾问")]
thread_abc123d41d8cd…user_context0msgpack{"session_start":"2026-04-07"}

Step 2: 用户说”推荐医疗险”

执行节点retrieve_contextsearch_productsgenerate_response

业务逻辑

  1. retrieve_context: 解析用户意图(医疗险)、提取年龄信息
  2. search_products: 搜索数据库中的医疗险产品
  3. generate_response: 生成产品推荐回复

checkpoints 表新增记录

thread_idcheckpoint_idparent_checkpoint_idcheckpoint_ns_hashgmt_create
thread_abc123ckpt_003ckpt_002d41d8cd…10:00:30

checkpoint_writes 表(ckpt_003 的写入 - 3 个 node 执行)

thread_idcheckpoint_idtask_ididxchannelblob_data (反序列化后)
thread_abc123ckpt_003retrieve_context0user_context{"intent":"medical_insurance","age":28,"budget":5000}
thread_abc123ckpt_003retrieve_context1extracted_entities["医疗险","28岁"]
thread_abc123ckpt_003search_products0search_results[{"product_id":"M001","name":"百万医疗","price":399}]
thread_abc123ckpt_003search_products1tool_calls[{"name":"search_db","args":{"category":"medical"}}]
thread_abc123ckpt_003generate_response0messages[AIMessage("为您推荐百万医疗险...")]

注意retrieve_context 写入了 2 条(idx 0 和 1),search_products 也写了 2 条。

checkpoint_blobs 表(ckpt_003 合并后状态)

thread_idcheckpoint_ns_hashchannelversionblob_data (反序列化后)
thread_abc123d41d8cd…messages0合并后的 3 条消息列表
thread_abc123d41d8cd…user_context0{"intent":"medical_insurance","age":28,"budget":5000}
thread_abc123d41d8cd…search_results0[{"product_id":"M001","name":"百万医疗"}]

Step 3: 数据流图可视化

flowchart TD
  thread["thread_abc123"]

  thread --> ckpt001["ckpt_001<br/>初始空状态"]
  ckpt001 --> writes001["writes<br/>无"]
  writes001 --> blobs001["blobs<br/>空"]

  blobs001 --> ckpt002["ckpt_002<br/>问候完成"]
  ckpt002 --> writes002["checkpoint_writes"]
  writes002 --> greeting["handle_greeting<br/>messages: 你好"]
  writes002 --> response002["generate_response<br/>messages: 您好!..."]
  greeting --> merge002["按 channel 合并"]
  response002 --> merge002
  merge002 --> blobs002["checkpoint_blobs<br/>messages: 合并后 2 条消息<br/>user_context: 基础会话信息"]

  blobs002 --> ckpt003["ckpt_003<br/>推荐请求处理中"]
  ckpt003 --> writes003["checkpoint_writes"]
  writes003 --> context["retrieve_context<br/>user_context: 意图 + 年龄"]
  writes003 --> entities["retrieve_context<br/>extracted_entities: 实体列表"]
  writes003 --> products["search_products<br/>search_results: 产品数据"]
  writes003 --> toolCalls["search_products<br/>tool_calls: 工具调用记录"]
  writes003 --> response003["generate_response<br/>messages: 推荐回复"]
  context --> merge003["按 channel 合并"]
  entities --> merge003
  products --> merge003
  toolCalls --> merge003
  response003 --> merge003
  merge003 --> blobs003["checkpoint_blobs<br/>messages: 合并后 4 条消息<br/>user_context: 更新后的用户信息<br/>search_results: 产品列表"]

  blobs003 --> ckpt004["ckpt_004<br/>最终回复完成"]
  ckpt004 --> writes004["checkpoint_writes"]
  writes004 --> finalResponse["finalize_response<br/>messages: 格式化回复"]
  finalResponse --> merge004["按 channel 合并"]
  merge004 --> blobs004["checkpoint_blobs<br/>messages: 完整对话历史<br/>user_context: 完整用户画像<br/>search_results: 推荐产品快照"]

Step 4: 从 writes 到 blobs 的合并过程详解

ckpt_003 为例,展示 messages channel 是如何合并的:

flowchart LR
  subgraph writes["checkpoint_writes 原始写入(按时间序)"]
    human["ckpt_002<br/>handle_greeting<br/>[HumanMessage: 你好]"]
    greeting["ckpt_002<br/>generate_response<br/>[AIMessage: 您好!]"]
    context["ckpt_003<br/>retrieve_context<br/>未写 messages"]
    products["ckpt_003<br/>search_products<br/>未写 messages"]
    recommendation["ckpt_003<br/>generate_response<br/>[AIMessage: 为您推荐...]"]
  end

  human --> reducer["messages reducer<br/>列表类型 = 拼接"]
  greeting --> reducer
  context -. 跳过 .-> reducer
  products -. 跳过 .-> reducer
  recommendation --> reducer

  reducer --> merged["合并结果<br/>[Human: 你好]<br/>[AI: 您好!]<br/>[AI: 为您推荐...]"]
  merged --> blobs["checkpoint_blobs<br/>channel: messages<br/>value: 3 条消息的完整列表"]

Step 5: 回滚场景示例

用户操作:“我刚才说错了,我不想看病险了”

# 回滚到 ckpt_002(问候完成状态,还没推荐产品)
graph.update_state(
    thread_id="thread_abc123",
    checkpoint_id="ckpt_002"
)

回滚后的 checkpoints 表(形成分支)

checkpoint_idparent_checkpoint_id说明
ckpt_001NULL初始
ckpt_002ckpt_001问候完成 ← 回滚目标
ckpt_003ckpt_002旧分支(含推荐)
ckpt_004ckpt_002新分支(从此继续)
                    ┌→ ckpt_003 → (废弃分支,保留)
ckpt_001 → ckpt_002 ┤
                    └→ ckpt_004 → (新对话从此开始)

ckpt_004 的 checkpoint_blobs

从 ckpt_002 复制所有 blob 数据:

thread_idcheckpoint_idchannelblob_data 来源
thread_abc123ckpt_004messages复制自 ckpt_002
thread_abc123ckpt_004user_context复制自 ckpt_002

ckpt_003 的 search_results 和推荐相关的 messages 不会出现在 ckpt_004 中。


Step 6: SQL 查询示例

查询某 checkpoint 的完整执行轨迹

-- 查看 ckpt_003 中每个 node 的贡献
SELECT
    task_id,
    channel,
    LENGTH(blob_data) as bytes,
    gmt_create
FROM checkpoint_writes
WHERE thread_id = 'thread_abc123'
    AND checkpoint_id = 'ckpt_003'
ORDER BY gmt_create;
 
-- 输出:
-- task_id           | channel            | bytes | gmt_create
-- retrieve_context  | user_context       | 256   | 10:00:30.100
-- retrieve_context  | extracted_entities | 128   | 10:00:30.200
-- search_products   | search_results     | 2048  | 10:00:31.500
-- search_products   | tool_calls         | 512   | 10:00:31.600
-- generate_response | messages           | 1024  | 10:00:32.000

对比 writes 和 blobs 的数据量

-- ckpt_003 的 writes 总量
SELECT SUM(LENGTH(blob_data)) as writes_total_bytes
FROM checkpoint_writes
WHERE thread_id = 'thread_abc123' AND checkpoint_id = 'ckpt_003';
-- 结果: 3968 bytes (5条写入)
 
-- ckpt_003 的 blobs 总量
SELECT SUM(LENGTH(blob_data)) as blobs_total_bytes
FROM checkpoint_blobs
WHERE thread_id = 'thread_abc123'
    AND checkpoint_ns_hash = 'd41d8cd...';
-- 结果: 3072 bytes (3条合并后)
 
-- 压缩率: (3968-3072)/3968 = 22.6% 的冗余被合并消除

字段级速查卡片

checkpoints 表 —— 时间线/分支索引

┌─────────────────┬─────────────────────────────────────────┐
│ thread_id       │ "thread_abc123"                         │
│ checkpoint_id   │ "ckpt_003"                              │
│ parent_id       │ "ckpt_002" ← 指向前一个                   │
│ checkpoint_ns   │ "" (主图) / "subgraph" (子图)            │
│ ns_hash         │ "d41d8cd..." ← 关联 writes 和 blobs      │
│ gmt_create      │ 2026-04-07 10:00:30                     │
└─────────────────┴─────────────────────────────────────────┘

checkpoint_writes 表 —— 执行轨迹(调试/审计)

┌─────────────────┬─────────────────────────────────────────┐
│ thread_id       │ "thread_abc123"                         │
│ checkpoint_id   │ "ckpt_003"                              │
│ task_id         │ "search_products" ← node 名称            │
│ idx             │ 0 / 1 / 2... ← 同一 task 的第 N 次写入    │
│ channel         │ "search_results" ← State 字段名          │
│ blob_data       │ <二进制: 产品列表数据>                     │
│ task_path       │ "()" / "('subgraph',)" ← 子图路径        │
└─────────────────┴─────────────────────────────────────────┘

checkpoint_blobs 表 —— 最终状态(常规读取)

┌─────────────────┬─────────────────────────────────────────┐
│ thread_id       │ "thread_abc123"                         │
│ ns_hash         │ "d41d8cd..." ← 来自 checkpoints 表       │
│ channel         │ "messages" ← State 字段名                │
│ version         │ 0 / 1 / 2... ← 同一 checkpoint 的版本     │
│ blob_data       │ <二进制: 合并后的完整消息列表>              │
└─────────────────┴─────────────────────────────────────────┘