场景:撮合引擎多 Pod 部署,每个 Pod 维护本地 L1 内存深度数据,如何通过 Redis L2 实现跨 Pod 无损同步?
一、背景与问题定义
撮合引擎(Matching Engine)是交易系统的核心,订单簿(Order Book)的买卖深度数据有以下特点:
- 写频率极高:每秒数千次挂单、撤单、成交
- 读频率更高:行情推送、风控查询、前端展示
- 强一致性要求:深度数据不能乱序、不能丢失
- 延迟敏感:毫秒级响应,不能每次都走 Redis
因此,撮合引擎不会把深度数据持续维护在 Redis,而是:
L1(JVM 堆内存)= 单一真相来源(Source of Truth)
L2(Redis) = 跨 Pod 同步介质 + 故障恢复快照核心问题就变成:如何让 L1 的变更,通过 L2,无差别地同步到其他 Pod 的 L1?
二、整体架构
┌─────────────────────────┐ ┌─────────────────────────┐
│ Pod A │ │ Pod B │
│ ┌─────────────────┐ │ │ ┌─────────────────┐ │
│ │ L1: OrderBook │ │ │ │ L1: OrderBook │ │
│ │ (ConcurrentMap)│ │ │ │ (ConcurrentMap)│ │
│ └────────┬────────┘ │ │ └────────▲────────┘ │
│ │ 写变更事件 │ │ │ 消费事件 │
│ ┌────────▼────────┐ │ │ ┌────────┴────────┐ │
│ │ EventPublisher │ │ │ │ EventConsumer │ │
│ └────────┬────────┘ │ │ └────────┬────────┘ │
└───────────┼─────────────┘ └───────────┼─────────────┘
│ │
▼ │
┌───────────────────────────────────────────────────────┐
│ Redis │
│ Stream(有序事件日志) + Hash(全量快照) │
│ │
│ XADD depth:events * op ADD side BID price 100 qty 5 │
│ HSET depth:snapshot:BTC-USDT BID:100 5 │
└───────────────────────────────────────────────────────┘核心思路:
- L1 变更 → 发布增量事件到 Redis Stream
- 其他 Pod 消费 Stream → 应用到自身 L1
- Redis Hash 维护全量快照,用于新 Pod 启动时快速恢复
三、数据模型设计
3.1 深度变更事件(DepthEvent)
@Data
@Builder
public class DepthEvent {
private String symbol; // 交易对,如 BTC-USDT
private String operation; // ADD / UPDATE / REMOVE / CLEAR
private String side; // BID / ASK
private BigDecimal price; // 价格档位
private BigDecimal quantity; // 数量(REMOVE 时为 0)
private long sequence; // 全局递增序列号,保证顺序
private long timestamp; // 事件时间戳(毫秒)
private String sourceNodeId; // 来源 Pod ID,避免自己消费自己
}3.2 L1 本地 OrderBook
public class OrderBook {
private final String symbol;
// 买盘:价格从高到低
private final TreeMap<BigDecimal, BigDecimal> bids =
new TreeMap<>(Comparator.reverseOrder());
// 卖盘:价格从低到高
private final TreeMap<BigDecimal, BigDecimal> asks =
new TreeMap<>();
// 当前已应用的最大 sequence,用于幂等去重
private volatile long appliedSequence = -1;
public synchronized void apply(DepthEvent event) {
// 幂等保护:跳过已处理的事件
if (event.getSequence() <= appliedSequence) return;
TreeMap<BigDecimal, BigDecimal> side =
"BID".equals(event.getSide()) ? bids : asks;
switch (event.getOperation()) {
case "ADD":
case "UPDATE":
side.put(event.getPrice(), event.getQuantity());
break;
case "REMOVE":
side.remove(event.getPrice());
break;
case "CLEAR":
side.clear();
break;
}
appliedSequence = event.getSequence();
}
}四、关键流程实现
4.1 L1 写入 + 发布事件(双写)
撮合引擎处理订单时,先写 L1,再异步发布事件到 Redis:
@Service
public class MatchingEngineService {
private final Map<String, OrderBook> localBooks = new ConcurrentHashMap<>();
private final DepthEventPublisher publisher;
private final AtomicLong sequencer = new AtomicLong(0);
public void onOrderAdd(String symbol, String side,
BigDecimal price, BigDecimal qty) {
OrderBook book = localBooks.computeIfAbsent(symbol, OrderBook::new);
DepthEvent event = DepthEvent.builder()
.symbol(symbol)
.operation("ADD")
.side(side)
.price(price)
.quantity(qty)
.sequence(sequencer.incrementAndGet()) // 全局自增
.timestamp(System.currentTimeMillis())
.sourceNodeId(NodeConfig.NODE_ID)
.build();
// 1. 先应用到本地 L1
book.apply(event);
// 2. 异步发布到 Redis Stream(非阻塞)
publisher.publish(event);
// 3. 定期更新全量快照到 Redis Hash(异步批量)
snapshotScheduler.markDirty(symbol);
}
}4.2 Redis Stream 事件发布
@Component
public class DepthEventPublisher {
private final StringRedisTemplate redisTemplate;
// 异步发布,不阻塞撮合主线程
@Async("depthPublishExecutor")
public void publish(DepthEvent event) {
Map<String, String> body = new HashMap<>();
body.put("op", event.getOperation());
body.put("symbol", event.getSymbol());
body.put("side", event.getSide());
body.put("price", event.getPrice().toPlainString());
body.put("qty", event.getQuantity().toPlainString());
body.put("seq", String.valueOf(event.getSequence()));
body.put("ts", String.valueOf(event.getTimestamp()));
body.put("node", event.getSourceNodeId());
// XADD depth:events:BTC-USDT MAXLEN ~ 10000 * ...
// MAXLEN ~ 10000 滚动保留最近 1 万条,避免无限增长
redisTemplate.opsForStream()
.add(RecordId.autoGenerate(),
"depth:events:" + event.getSymbol(),
body);
}
}4.3 跨 Pod 事件消费(核心)
其他 Pod 通过 Redis Stream Consumer Group 拉取事件,应用到自身 L1:
@Component
public class DepthEventConsumer implements InitializingBean {
private final StringRedisTemplate redisTemplate;
private final Map<String, OrderBook> localBooks;
private static final String GROUP = "depth-sync-group";
private static final String STREAM = "depth:events:";
@Override
public void afterPropertiesSet() {
// 每个 Pod 独立消费(不共享 offset),所以用广播模式:
// 每个 Pod 独立 Consumer Name = NODE_ID
// 注意:这里不用 Consumer Group 共享消费,
// 而是每个 Pod 都从自己上次的 lastId 开始拉取
}
@Scheduled(fixedDelay = 10) // 每 10ms 拉一次
public void poll() {
for (String symbol : watchedSymbols) {
List<MapRecord<String, String, String>> records =
redisTemplate.opsForStream().read(
Consumer.from(GROUP, NodeConfig.NODE_ID),
StreamReadOptions.empty().count(500).block(Duration.ofMillis(5)),
StreamOffset.create(STREAM + symbol, ReadOffset.lastConsumed())
);
if (records == null || records.isEmpty()) continue;
for (MapRecord<String, String, String> record : records) {
Map<String, String> body = record.getValue();
// 跳过自己发出的事件(已在发布前应用过 L1)
if (NodeConfig.NODE_ID.equals(body.get("node"))) {
ack(symbol, record.getId());
continue;
}
DepthEvent event = parseEvent(body);
OrderBook book = localBooks.computeIfAbsent(
event.getSymbol(), OrderBook::new);
book.apply(event); // 幂等应用
ack(symbol, record.getId());
}
}
}
private void ack(String symbol, RecordId id) {
redisTemplate.opsForStream()
.acknowledge(STREAM + symbol, GROUP, id);
}
}五、不丢失保障:全量快照 + 增量回放
只靠 Stream 有一个风险:新 Pod 启动时,历史 Stream 可能已被裁剪(MAXLEN),无法从头回放。
解决方案:快照 + 增量回放,类似 Redis AOF+RDB 的思路。
新 Pod 启动
│
▼
① 加载 Redis Hash 全量快照(毫秒级)
│
▼
② 记录快照对应的 sequence(snapshotSeq)
│
▼
③ 从 snapshotSeq 之后的 Stream 事件逐条回放
│
▼
④ L1 就绪,开始提供服务5.1 定期写入全量快照
@Scheduled(fixedDelay = 1000) // 每秒写一次快照
public void flushSnapshot(String symbol) {
OrderBook book = localBooks.get(symbol);
if (book == null || !snapshotScheduler.isDirty(symbol)) return;
Map<String, String> snapshot = new HashMap<>();
snapshot.put("_seq", String.valueOf(book.getAppliedSequence()));
book.getBids().forEach((price, qty) ->
snapshot.put("BID:" + price.toPlainString(), qty.toPlainString()));
book.getAsks().forEach((price, qty) ->
snapshot.put("ASK:" + price.toPlainString(), qty.toPlainString()));
// 原子替换快照
redisTemplate.opsForHash().putAll("depth:snapshot:" + symbol, snapshot);
snapshotScheduler.clearDirty(symbol);
}5.2 新 Pod 启动恢复
public void recoverFromRedis(String symbol) {
// Step 1: 加载全量快照
Map<Object, Object> snapshot =
redisTemplate.opsForHash().entries("depth:snapshot:" + symbol);
if (snapshot.isEmpty()) return; // 首次启动,无快照
long snapshotSeq = Long.parseLong((String) snapshot.get("_seq"));
OrderBook book = localBooks.computeIfAbsent(symbol, OrderBook::new);
snapshot.forEach((k, v) -> {
String key = (String) k;
if (key.startsWith("BID:")) {
book.getBids().put(new BigDecimal(key.substring(4)),
new BigDecimal((String) v));
} else if (key.startsWith("ASK:")) {
book.getAsks().put(new BigDecimal(key.substring(4)),
new BigDecimal((String) v));
}
});
book.setAppliedSequence(snapshotSeq);
// Step 2: 从快照 seq 之后继续回放 Stream 增量
replayStreamFrom(symbol, snapshotSeq);
log.info("Pod {} recovered {} from seq={}",
NodeConfig.NODE_ID, symbol, snapshotSeq);
}六、sequence 全局唯一方案
多 Pod 并发写入时,sequence 必须全局唯一且单调递增,不能各 Pod 各自维护。
方案对比
| 方案 | 优点 | 缺点 |
|---|---|---|
| Redis INCR | 简单,强一致 | 每次撮合都要 RTT,影响性能 |
| 预分配号段 | 减少 RTT,批量申请 | 实现稍复杂 |
| Redis Stream ID | 天然有序,免维护 | 不是业务 sequence,难对齐快照 |
| Snowflake ID | 无中心,高性能 | 时钟回拨风险,需处理 |
推荐:号段预分配 + 本地自增
public class SequenceAllocator {
private final StringRedisTemplate redisTemplate;
private long currentMax = 0;
private long cursor = 0;
private static final int STEP = 1000; // 每次申请 1000 个号
public synchronized long nextSeq() {
if (cursor >= currentMax) {
// 向 Redis 申请下一个号段
currentMax = redisTemplate.opsForValue()
.increment("depth:seq:global", STEP);
cursor = currentMax - STEP;
}
return cursor++;
}
}七、边界情况处理
7.1 网络分区 / Redis 短暂不可用
@Async
public void publish(DepthEvent event) {
int retry = 0;
while (retry < 3) {
try {
redisTemplate.opsForStream().add(...);
return;
} catch (RedisConnectionException e) {
retry++;
// 写入本地 WAL(Write-Ahead Log)兜底
localWal.append(event);
Thread.sleep(50 * retry);
}
}
log.error("Redis publish failed, event buffered to WAL: seq={}",
event.getSequence());
}
// Redis 恢复后,WAL 重放
@Scheduled(fixedDelay = 5000)
public void replayWal() {
if (!redisAlive() || localWal.isEmpty()) return;
localWal.drain().forEach(this::publish);
}7.2 消费者 Pending 消息处理
Pod 崩溃后,已 ACK 前的消息会停留在 Pending List,重启后需要先处理:
// 启动时检查 PEL(Pending Entry List)
public void claimPendingMessages(String symbol) {
PendingMessages pending = redisTemplate.opsForStream()
.pending(STREAM + symbol, GROUP, Range.unbounded(), 100);
pending.forEach(msg -> {
// 超过 30s 未 ACK 的消息,重新拉取处理
if (msg.getElapsedTimeSinceLastDelivery().getSeconds() > 30) {
redisTemplate.opsForStream()
.claim(STREAM + symbol, GROUP, NodeConfig.NODE_ID,
Duration.ofSeconds(30), msg.getId());
}
});
}7.3 事件乱序处理
网络抖动可能导致极小概率乱序,OrderBook.apply() 中已有 sequence 检查,额外增加短暂缓冲排序:
// 消费端维护一个小的排序缓冲区,等待乱序窗口(50ms)
private final DelayQueue<DepthEvent> reorderBuffer = new DelayQueue<>();
public void applyWithReorder(DepthEvent event) {
reorderBuffer.offer(event); // 按 sequence 排序
DepthEvent ready;
while ((ready = reorderBuffer.poll()) != null) {
if (ready.getSequence() == book.getAppliedSequence() + 1) {
book.apply(ready);
} else {
reorderBuffer.offer(ready); // 放回等下一个
break;
}
}
}八、完整数据流总结
撮合引擎处理订单
│
▼
① 生成 sequence(号段分配器,本地自增)
│
▼
② 应用到本地 L1 OrderBook(同步,微秒级)
│
├──────────────────────────────────────┐
▼ ▼
③ 异步发布 DepthEvent ④ 定期写全量快照
到 Redis Stream 到 Redis Hash
(非阻塞,~1ms) (每秒,毫秒级)
│
▼
⑤ 其他 Pod 消费 Stream(10ms 轮询)
│
▼
⑥ 过滤自身 sourceNodeId
│
▼
⑦ 幂等应用到各自 L1 OrderBook
│
▼
⑧ ACK 消息新 Pod 启动路径:
加载 Redis Hash 快照 → 记录 snapshotSeq →
回放 snapshotSeq 之后的 Stream → L1 就绪九、性能指标参考
| 指标 | 数值 | 备注 |
|---|---|---|
| L1 读取延迟 | < 1μs | 纯内存,无锁读 |
| 事件发布延迟 | ~1ms | 异步,不阻塞撮合 |
| 跨 Pod 同步延迟 | 10~50ms | 取决于轮询频率 |
| 快照恢复时间 | < 500ms | 取决于深度档位数 |
| Stream 保留条数 | 10,000 | MAXLEN,约覆盖几分钟 |
| 号段预分配大小 | 1000 | 可根据 TPS 调整 |
十、小结
| 问题 | 解决方案 |
|---|---|
| 跨 Pod L1 同步 | Redis Stream 广播增量事件 |
| 新 Pod 冷启动 | Redis Hash 快照 + Stream 增量回放 |
| 事件不丢失 | WAL 本地兜底 + PEL 重试机制 |
| 事件不重复 | sequence 幂等检查 |
| 事件不乱序 | 延迟排序缓冲区 |
| sequence 全局唯一 | 号段预分配 + 本地自增 |
| 撮合性能不受影响 | 所有 Redis 操作全部异步 |
核心原则:L1 是真相来源,Redis 只是同步管道和恢复快照。撮合的关键路径上永远不依赖 Redis,Redis 的任何故障都不能影响撮合本身的正确性。