Karp 的技术博客
场景:撮合引擎多 Pod 部署,每个 Pod 维护本地 L1 内存深度数据,如何通过 Redis L2 实现跨 Pod 无损同步?

一、背景与问题定义

撮合引擎(Matching Engine)是交易系统的核心,订单簿(Order Book)的买卖深度数据有以下特点:

  • 写频率极高:每秒数千次挂单、撤单、成交
  • 读频率更高:行情推送、风控查询、前端展示
  • 强一致性要求:深度数据不能乱序、不能丢失
  • 延迟敏感:毫秒级响应,不能每次都走 Redis

因此,撮合引擎不会把深度数据持续维护在 Redis,而是:

L1(JVM 堆内存)= 单一真相来源(Source of Truth)
L2(Redis)     = 跨 Pod 同步介质 + 故障恢复快照

核心问题就变成:如何让 L1 的变更,通过 L2,无差别地同步到其他 Pod 的 L1?


二、整体架构

┌─────────────────────────┐      ┌─────────────────────────┐
│       Pod A             │      │       Pod B             │
│  ┌─────────────────┐    │      │  ┌─────────────────┐    │
│  │  L1: OrderBook  │    │      │  │  L1: OrderBook  │    │
│  │  (ConcurrentMap)│    │      │  │  (ConcurrentMap)│    │
│  └────────┬────────┘    │      │  └────────▲────────┘    │
│           │ 写变更事件    │      │           │ 消费事件     │
│  ┌────────▼────────┐    │      │  ┌────────┴────────┐    │
│  │  EventPublisher │    │      │  │  EventConsumer  │    │
│  └────────┬────────┘    │      │  └────────┬────────┘    │
└───────────┼─────────────┘      └───────────┼─────────────┘
            │                                │
            ▼                                │
┌───────────────────────────────────────────────────────┐
│                     Redis                             │
│   Stream(有序事件日志)  +  Hash(全量快照)           │
│                                                       │
│  XADD depth:events * op ADD side BID price 100 qty 5  │
│  HSET depth:snapshot:BTC-USDT BID:100 5               │
└───────────────────────────────────────────────────────┘

核心思路

  1. L1 变更 → 发布增量事件到 Redis Stream
  2. 其他 Pod 消费 Stream → 应用到自身 L1
  3. Redis Hash 维护全量快照,用于新 Pod 启动时快速恢复

三、数据模型设计

3.1 深度变更事件(DepthEvent)

@Data
@Builder
public class DepthEvent {
    private String symbol;        // 交易对,如 BTC-USDT
    private String operation;     // ADD / UPDATE / REMOVE / CLEAR
    private String side;          // BID / ASK
    private BigDecimal price;     // 价格档位
    private BigDecimal quantity;  // 数量(REMOVE 时为 0)
    private long sequence;        // 全局递增序列号,保证顺序
    private long timestamp;       // 事件时间戳(毫秒)
    private String sourceNodeId;  // 来源 Pod ID,避免自己消费自己
}

3.2 L1 本地 OrderBook

public class OrderBook {
    private final String symbol;
    
    // 买盘:价格从高到低
    private final TreeMap<BigDecimal, BigDecimal> bids = 
        new TreeMap<>(Comparator.reverseOrder());
    
    // 卖盘:价格从低到高
    private final TreeMap<BigDecimal, BigDecimal> asks = 
        new TreeMap<>();
    
    // 当前已应用的最大 sequence,用于幂等去重
    private volatile long appliedSequence = -1;

    public synchronized void apply(DepthEvent event) {
        // 幂等保护:跳过已处理的事件
        if (event.getSequence() <= appliedSequence) return;
        
        TreeMap<BigDecimal, BigDecimal> side = 
            "BID".equals(event.getSide()) ? bids : asks;
        
        switch (event.getOperation()) {
            case "ADD":
            case "UPDATE":
                side.put(event.getPrice(), event.getQuantity());
                break;
            case "REMOVE":
                side.remove(event.getPrice());
                break;
            case "CLEAR":
                side.clear();
                break;
        }
        appliedSequence = event.getSequence();
    }
}

四、关键流程实现

4.1 L1 写入 + 发布事件(双写)

撮合引擎处理订单时,先写 L1,再异步发布事件到 Redis

@Service
public class MatchingEngineService {

    private final Map<String, OrderBook> localBooks = new ConcurrentHashMap<>();
    private final DepthEventPublisher publisher;
    private final AtomicLong sequencer = new AtomicLong(0);

    public void onOrderAdd(String symbol, String side, 
                           BigDecimal price, BigDecimal qty) {
        OrderBook book = localBooks.computeIfAbsent(symbol, OrderBook::new);
        
        DepthEvent event = DepthEvent.builder()
            .symbol(symbol)
            .operation("ADD")
            .side(side)
            .price(price)
            .quantity(qty)
            .sequence(sequencer.incrementAndGet())  // 全局自增
            .timestamp(System.currentTimeMillis())
            .sourceNodeId(NodeConfig.NODE_ID)
            .build();
        
        // 1. 先应用到本地 L1
        book.apply(event);
        
        // 2. 异步发布到 Redis Stream(非阻塞)
        publisher.publish(event);
        
        // 3. 定期更新全量快照到 Redis Hash(异步批量)
        snapshotScheduler.markDirty(symbol);
    }
}

4.2 Redis Stream 事件发布

@Component
public class DepthEventPublisher {

    private final StringRedisTemplate redisTemplate;
    
    // 异步发布,不阻塞撮合主线程
    @Async("depthPublishExecutor")
    public void publish(DepthEvent event) {
        Map<String, String> body = new HashMap<>();
        body.put("op",       event.getOperation());
        body.put("symbol",   event.getSymbol());
        body.put("side",     event.getSide());
        body.put("price",    event.getPrice().toPlainString());
        body.put("qty",      event.getQuantity().toPlainString());
        body.put("seq",      String.valueOf(event.getSequence()));
        body.put("ts",       String.valueOf(event.getTimestamp()));
        body.put("node",     event.getSourceNodeId());

        // XADD depth:events:BTC-USDT MAXLEN ~ 10000 * ...
        // MAXLEN ~ 10000 滚动保留最近 1 万条,避免无限增长
        redisTemplate.opsForStream()
            .add(RecordId.autoGenerate(),
                 "depth:events:" + event.getSymbol(),
                 body);
    }
}

4.3 跨 Pod 事件消费(核心)

其他 Pod 通过 Redis Stream Consumer Group 拉取事件,应用到自身 L1:

@Component
public class DepthEventConsumer implements InitializingBean {

    private final StringRedisTemplate redisTemplate;
    private final Map<String, OrderBook> localBooks;
    private static final String GROUP   = "depth-sync-group";
    private static final String STREAM  = "depth:events:";

    @Override
    public void afterPropertiesSet() {
        // 每个 Pod 独立消费(不共享 offset),所以用广播模式:
        // 每个 Pod 独立 Consumer Name = NODE_ID
        // 注意:这里不用 Consumer Group 共享消费,
        //       而是每个 Pod 都从自己上次的 lastId 开始拉取
    }

    @Scheduled(fixedDelay = 10)  // 每 10ms 拉一次
    public void poll() {
        for (String symbol : watchedSymbols) {
            List<MapRecord<String, String, String>> records =
                redisTemplate.opsForStream().read(
                    Consumer.from(GROUP, NodeConfig.NODE_ID),
                    StreamReadOptions.empty().count(500).block(Duration.ofMillis(5)),
                    StreamOffset.create(STREAM + symbol, ReadOffset.lastConsumed())
                );

            if (records == null || records.isEmpty()) continue;

            for (MapRecord<String, String, String> record : records) {
                Map<String, String> body = record.getValue();
                
                // 跳过自己发出的事件(已在发布前应用过 L1)
                if (NodeConfig.NODE_ID.equals(body.get("node"))) {
                    ack(symbol, record.getId());
                    continue;
                }
                
                DepthEvent event = parseEvent(body);
                OrderBook book = localBooks.computeIfAbsent(
                    event.getSymbol(), OrderBook::new);
                
                book.apply(event);  // 幂等应用
                ack(symbol, record.getId());
            }
        }
    }

    private void ack(String symbol, RecordId id) {
        redisTemplate.opsForStream()
            .acknowledge(STREAM + symbol, GROUP, id);
    }
}

五、不丢失保障:全量快照 + 增量回放

只靠 Stream 有一个风险:新 Pod 启动时,历史 Stream 可能已被裁剪(MAXLEN),无法从头回放。

解决方案:快照 + 增量回放,类似 Redis AOF+RDB 的思路。

新 Pod 启动
    │
    ▼
① 加载 Redis Hash 全量快照(毫秒级)
    │
    ▼
② 记录快照对应的 sequence(snapshotSeq)
    │
    ▼
③ 从 snapshotSeq 之后的 Stream 事件逐条回放
    │
    ▼
④ L1 就绪,开始提供服务

5.1 定期写入全量快照

@Scheduled(fixedDelay = 1000)  // 每秒写一次快照
public void flushSnapshot(String symbol) {
    OrderBook book = localBooks.get(symbol);
    if (book == null || !snapshotScheduler.isDirty(symbol)) return;

    Map<String, String> snapshot = new HashMap<>();
    snapshot.put("_seq", String.valueOf(book.getAppliedSequence()));
    
    book.getBids().forEach((price, qty) ->
        snapshot.put("BID:" + price.toPlainString(), qty.toPlainString()));
    book.getAsks().forEach((price, qty) ->
        snapshot.put("ASK:" + price.toPlainString(), qty.toPlainString()));

    // 原子替换快照
    redisTemplate.opsForHash().putAll("depth:snapshot:" + symbol, snapshot);
    snapshotScheduler.clearDirty(symbol);
}

5.2 新 Pod 启动恢复

public void recoverFromRedis(String symbol) {
    // Step 1: 加载全量快照
    Map<Object, Object> snapshot = 
        redisTemplate.opsForHash().entries("depth:snapshot:" + symbol);
    
    if (snapshot.isEmpty()) return; // 首次启动,无快照
    
    long snapshotSeq = Long.parseLong((String) snapshot.get("_seq"));
    OrderBook book = localBooks.computeIfAbsent(symbol, OrderBook::new);
    
    snapshot.forEach((k, v) -> {
        String key = (String) k;
        if (key.startsWith("BID:")) {
            book.getBids().put(new BigDecimal(key.substring(4)), 
                               new BigDecimal((String) v));
        } else if (key.startsWith("ASK:")) {
            book.getAsks().put(new BigDecimal(key.substring(4)), 
                               new BigDecimal((String) v));
        }
    });
    book.setAppliedSequence(snapshotSeq);
    
    // Step 2: 从快照 seq 之后继续回放 Stream 增量
    replayStreamFrom(symbol, snapshotSeq);
    
    log.info("Pod {} recovered {} from seq={}", 
             NodeConfig.NODE_ID, symbol, snapshotSeq);
}

六、sequence 全局唯一方案

多 Pod 并发写入时,sequence 必须全局唯一且单调递增,不能各 Pod 各自维护。

方案对比

方案优点缺点
Redis INCR简单,强一致每次撮合都要 RTT,影响性能
预分配号段减少 RTT,批量申请实现稍复杂
Redis Stream ID天然有序,免维护不是业务 sequence,难对齐快照
Snowflake ID无中心,高性能时钟回拨风险,需处理

推荐:号段预分配 + 本地自增

public class SequenceAllocator {
    
    private final StringRedisTemplate redisTemplate;
    private long currentMax = 0;
    private long cursor = 0;
    private static final int STEP = 1000; // 每次申请 1000 个号

    public synchronized long nextSeq() {
        if (cursor >= currentMax) {
            // 向 Redis 申请下一个号段
            currentMax = redisTemplate.opsForValue()
                .increment("depth:seq:global", STEP);
            cursor = currentMax - STEP;
        }
        return cursor++;
    }
}

七、边界情况处理

7.1 网络分区 / Redis 短暂不可用

@Async
public void publish(DepthEvent event) {
    int retry = 0;
    while (retry < 3) {
        try {
            redisTemplate.opsForStream().add(...);
            return;
        } catch (RedisConnectionException e) {
            retry++;
            // 写入本地 WAL(Write-Ahead Log)兜底
            localWal.append(event);
            Thread.sleep(50 * retry);
        }
    }
    log.error("Redis publish failed, event buffered to WAL: seq={}", 
               event.getSequence());
}

// Redis 恢复后,WAL 重放
@Scheduled(fixedDelay = 5000)
public void replayWal() {
    if (!redisAlive() || localWal.isEmpty()) return;
    localWal.drain().forEach(this::publish);
}

7.2 消费者 Pending 消息处理

Pod 崩溃后,已 ACK 前的消息会停留在 Pending List,重启后需要先处理:

// 启动时检查 PEL(Pending Entry List)
public void claimPendingMessages(String symbol) {
    PendingMessages pending = redisTemplate.opsForStream()
        .pending(STREAM + symbol, GROUP, Range.unbounded(), 100);
    
    pending.forEach(msg -> {
        // 超过 30s 未 ACK 的消息,重新拉取处理
        if (msg.getElapsedTimeSinceLastDelivery().getSeconds() > 30) {
            redisTemplate.opsForStream()
                .claim(STREAM + symbol, GROUP, NodeConfig.NODE_ID,
                       Duration.ofSeconds(30), msg.getId());
        }
    });
}

7.3 事件乱序处理

网络抖动可能导致极小概率乱序,OrderBook.apply() 中已有 sequence 检查,额外增加短暂缓冲排序:

// 消费端维护一个小的排序缓冲区,等待乱序窗口(50ms)
private final DelayQueue<DepthEvent> reorderBuffer = new DelayQueue<>();

public void applyWithReorder(DepthEvent event) {
    reorderBuffer.offer(event); // 按 sequence 排序
    
    DepthEvent ready;
    while ((ready = reorderBuffer.poll()) != null) {
        if (ready.getSequence() == book.getAppliedSequence() + 1) {
            book.apply(ready);
        } else {
            reorderBuffer.offer(ready); // 放回等下一个
            break;
        }
    }
}

八、完整数据流总结

撮合引擎处理订单
       │
       ▼
① 生成 sequence(号段分配器,本地自增)
       │
       ▼
② 应用到本地 L1 OrderBook(同步,微秒级)
       │
       ├──────────────────────────────────────┐
       ▼                                      ▼
③ 异步发布 DepthEvent               ④ 定期写全量快照
  到 Redis Stream                      到 Redis Hash
  (非阻塞,~1ms)                      (每秒,毫秒级)
       │
       ▼
⑤ 其他 Pod 消费 Stream(10ms 轮询)
       │
       ▼
⑥ 过滤自身 sourceNodeId
       │
       ▼
⑦ 幂等应用到各自 L1 OrderBook
       │
       ▼
⑧ ACK 消息

新 Pod 启动路径

加载 Redis Hash 快照 → 记录 snapshotSeq → 
回放 snapshotSeq 之后的 Stream → L1 就绪

九、性能指标参考

指标数值备注
L1 读取延迟< 1μs纯内存,无锁读
事件发布延迟~1ms异步,不阻塞撮合
跨 Pod 同步延迟10~50ms取决于轮询频率
快照恢复时间< 500ms取决于深度档位数
Stream 保留条数10,000MAXLEN,约覆盖几分钟
号段预分配大小1000可根据 TPS 调整

十、小结

问题解决方案
跨 Pod L1 同步Redis Stream 广播增量事件
新 Pod 冷启动Redis Hash 快照 + Stream 增量回放
事件不丢失WAL 本地兜底 + PEL 重试机制
事件不重复sequence 幂等检查
事件不乱序延迟排序缓冲区
sequence 全局唯一号段预分配 + 本地自增
撮合性能不受影响所有 Redis 操作全部异步
核心原则:L1 是真相来源,Redis 只是同步管道和恢复快照。撮合的关键路径上永远不依赖 Redis,Redis 的任何故障都不能影响撮合本身的正确性。

redis

版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2026年05月15日 00:38
0

目录

来自 《Redis 二级缓存在撮合引擎中的落地方案》