本文翻译自 How to build a distributed queue in a single JSON file on object storage,原载于 Hacker News。
turbopuffer 最近替换了他们的内部索引任务队列系统。这个队列用于在数据写入 WAL(Write-Ahead Log)后通知索引节点构建和更新搜索索引。队列不属于写入路径,纯粹是一个用于调度异步索引工作的通知系统。
之前的版本将队列分片到各个索引节点,所以如果一个节点变慢,分配给它的所有任务都会被阻塞,即使其他节点空闲也无济于事。新版本使用对象存储上的单个队列文件配合无状态 broker,实现了 FIFO 执行、至少一次投递保证,尾延迟相比之前降低了 10 倍。
为什么如此执着于基于对象存储构建?因为它简单、可预测、易于运维,而且极具扩展性。只要在它的边界内设计,就能保证性能。
与其直接展示最终设计,不如让我们从最简单的可行方案开始,逐步添加必要的复杂性。
Step 1: queue.json
turbopuffer 任务队列的数据总量很小,远小于 1 GiB,完全可以放入内存。所以最简单的功能设计就是一个单文件(比如 queue.json),每次写入时覆盖整个队列内容。
队列 pusher(生产者)读取队列内容,追加新任务到末尾,然后使用 compare-and-set (CAS) 写入。
队列 worker(消费者)同样使用 CAS 将第一个未认领的任务标记为进行中(○ → ◐),然后开始处理。
我们把 pusher 和 worker 统称为 clients,push 和 claim 操作统称为 requests。
queue.json
┌──────────────────────────────────────┐
│ {"jobs": ["◐", "○", "○", "○", "○",]} │
└──────────────────────────────────────┘
▲ ▲
│ │
CAS write │ CAS write │
│ │
┌─────┴────┐ ┌──────┴───┐
│ worker │ │ pusher │
└──────────┘ └──────────┘
CAS(compare-and-set)原语保证了原子性。只有当 queue.json 自读取后没有变化时,写入才会成功。如果变化了,客户端就读取新内容重试。这提供了强一致性保证,无需复杂的锁机制。
这个最简单的队列效果出奇地好!对于每秒 1 个请求(GCS 的限制)的场景,它已经是生产级别了——这要归功于对象存储为我们做的一切。
但大多数队列(包括 turbopuffer 的)每秒收到的请求远不止一个。我们需要更高的吞吐量。
Step 2: queue.json 配合 Group Commit
对象存储有很多优点,但低写入延迟不在其中。替换一个文件可能需要 200ms,所以我们不能一个一个写入任务,而需要批量处理。当有写入正在进行时,我们将传入的请求缓存在内存中。一旦写入完成,立即将缓冲区作为下一次 CAS 写入刷新出去。
这种技术通常称为 group commit,turbopuffer 也用同样的模式来批量写入 WAL。传统数据库也用这个技术来合并 fsync(2) 调用,最大化磁盘的提交吞吐量。
queue.json
┌───────────────────────────────────────────────────────────────┐
│ {"jobs": ["◐", "◐", "◐", "◐", "○", "○", "○", "○", "○", "○",]} │
└───────────────────────────────────────────────────────────────┘
▲ ▲
│ │
│ │
group commit │ group commit │
│ │
┌── buffer ──┴──────┐ ┌── buffer ───────┴─┐
│ ┌───┬───┬───┬───┐ │ │ ┌───┬───┬───┬───┐ │
│ │ ◐ │ ◐ │ ◐ │ ◐ │ │ │ │ ○ │ ○ │ ○ │ ○ │ │
│ └───┴───┴───┴───┘ │ │ └───┴───┴───┴───┘ │
└─────────▲─────────┘ └─────────▲─────────┘
│ │
│ │
┌─────┴────┐ ┌─────┴────┐
│ worker │ │ pusher │
└──────────┘ └──────────┘
Group commit 通过解耦写入速率和请求速率解决了吞吐量问题。扩展瓶颈从写入延迟(~200ms/写入)转移到了网络带宽(~10 GB/s)——这远超 turbopuffer 跟踪索引任务的需求。
然而,问题还没完全解决。在任何 turbopuffer 区域,当新数据写入多个 namespace 时,几十甚至上百个客户端会争抢这个单一的队列对象。
由于 CAS 通过强制每次写入在时间上不重叠来保证强一致性,我们只能达到 1 / ~200ms = ~5 写入/秒(而且 GCS 还有 1 RPS 的限制)。
问题不再是吞吐量。我们需要更少的写入者。
注:这个设计加上分片到本地队列,大致就是 turbopuffer 在这次更新之前的生产环境实现。下面的章节描述的是 turbopuffer 当前的生产索引队列。
Step 3: queue.json 配合 Brokered Group Commit
为了消除对队列对象的争抢,我们引入一个无状态的 broker,负责所有与对象存储的交互。所有客户端现在必须通过 broker,而不是直接写入对象存储。
broker 代表所有客户端运行单个 group commit 循环,所以没有人争抢对象。关键的是,它只有在 group commit 落地到对象存储后才确认写入。在数据持久化之前,没有客户端可以继续。
queue.json
┌───────────────────────────────────────────────────────────────────────────────────┐
│ {"jobs": ["◐", "◐", "◐", "◐", "○", "○", "○", "○", "○", "○", "○", "○", "○", "○",]} │
└───────────────────────────────────────────────────────────────────────────────────┘
▲
│
│ brokered group commit
│
╔═ broker ═════════════════════════════════╧════════════════════════════════════════╗
║ ║
║ ┌─ buffer ────────────────────────────────────────────────────────────────────┐ ║
║ │ ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┐ │ ║
║ │ │ ◐ │ ◐ │ ◐ │ ◐ │ ◐ │ ◐ │ ◐ │ ◐ │ ◐ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ │ ║
║ │ └───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┘ │ ║
║ └─────────────────────────────────────────────────────────────────────────────┘ ║
║ ║
╚═══════════════════════════════════════════════════════════════════════════════════╝
▲ ▲ ▲ ▲ ▲ ▲
│ │ │ │ │ │
┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐
│ worker │ │ worker │ │ worker │ │ pusher │ │ pusher │ │ pusher │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘
现在 broker 成了瓶颈,但单个 broker 进程可以轻松服务成百上千个客户端,因为写入的数据量很小。它只是保持连接开放、在等待 I/O 时在内存中缓冲请求。对象存储承担了繁重的工作。
扩展性问题解决了。系统现在可以处理 turbopuffer 的索引流量。但我们还需要高可用性。
Step 4: queue.json 配合 HA Brokered Group Commit
broker 的机器随时可能宕机。同样,某个 worker 可能认领了任务却永远无法完成。这两个问题的解决方案形状相同——发现问题消失后交接责任——但细节不同。
如果客户端到 broker 的任何请求超时,我们就启动一个新的 broker。客户端需要能找到新 broker,所以我们将 broker 的地址写入 queue.json。
broker 是无状态的,所以迁移既简单又廉价。如果同时有多个 broker?没关系:CAS 即使有两个 broker 也能保证正确性。旧的 broker 最终会在 queue.json 上遇到 CAS 失败,发现自己不再是 broker 了。唯一的代价是短暂的争抢导致的性能下降。
对于任务认领,我们添加心跳机制。worker 定期向 broker 发送时间戳确认仍在处理中,broker 将这个心跳时间写入 queue.json 中该任务的信息。如果队列中某个任务的最后心跳超过某个超时阈值,我们假设原来的 worker 已经消失,下一个 worker 接手继续处理。
queue.json
┌──────────────────────────────────────────────────────────────────────────────────┐
│ { │ read
│ "broker": "10.0.0.42:3000", │◀──┐
│ "jobs": ["◐(♥)", "◐(♥)", "◐(♥)", "◐(♥)", "◐(♥)", "○", "○", "○", "○", "○",] │ │
│ } │ │
└──────────────────────────────────────────────────────────────────────────────────┘ │
▲ │
│ │
│ brokered group commit │
│ │
╔═ broker ═════════════════════════════════╧════════════════════════════════════════╗ │
║ ║ │
║ ┌─ buffer ────────────────────────────────────────────────────────────────────┐ ║ │
║ │ ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┐ │ ║ │
║ │ │ ◐ │ ◐ │ ◐ │ ◐ │ ◐ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ ○ │ │ ║ │
║ │ └───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┘ │ ║ │
║ └─────────────────────────────────────────────────────────────────────────────┘ ║ │
║ ║ │
╚═══════════════════════════════════════════════════════════════════════════════════╝ │
▲ ▲ ▲ ▲ ▲ ▲ │
│ │ │ │ │ │ │
┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐ │
│ worker │ │ worker │ │ worker │ │ pusher │ │ pusher │ │ pusher │─────┘
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘
总结
我们只用了对象存储上的单个文件和少量无状态进程,就构建了一个可靠的分布式任务队列。它轻松处理我们的吞吐量,保证至少一次投递,并且可以在需要时故障转移到任何节点。
熟悉 turbopuffer 核心架构的读者会发现其中的相似之处。对象存储提供的原语虽然不多,但非常强大。一旦理解了它们的行为,就可以利用它们构建弹性、高性能、高扩展的分布式系统——使用已有的基础设施。
关键要点:
- 从简单开始:一个 JSON 文件 + CAS 原语就能实现基本的分布式队列
- Group Commit 解耦吞吐量:通过批量写入将写入速率与请求速率解耦
- 无状态 Broker 消除争抢:让单一 broker 负责所有对象存储交互
- CAS 保证正确性:即使出现多个 broker,CAS 也能保证系统正确运行
- 心跳机制处理失败:通过心跳检测失败的任务并重新分配
- 拥抱对象存储的特性:简单、可预测、易运维、高扩展