Karp 的技术博客
标签:Kafka Golang 定时任务 事件驱动 架构设计
Go Kafka 库:github.com/segmentio/kafka-go

为什么要替换定时任务?

定时任务在小规模系统中运行良好,但随着业务增长,问题逐渐暴露:

问题描述
时间耦合必须等到固定时间点才触发,实时性差
重复执行多实例部署需要额外分布式锁,稍有不慎就重复执行
全量扫描通常需要扫全表找待处理数据,数据量大时拖垮数据库
失败难追踪没有自然的重试机制,失败了难以回溯
扩展性差任务量增大时无法动态扩容消费能力

方案一:事件驱动替代状态轮询

适用场景

  • 订单支付后触发发货
  • 用户注册后发送欢迎邮件
  • 状态流转触发下游处理

流程对比

❌ 旧方案(定时轮询)

┌─────────────────────────────────────────────┐
│  Cron Job(每 5 分钟)                        │
│       ↓                                      │
│  SELECT * FROM orders WHERE status='paid'    │  ← 全表扫描,数据量大时极慢
│       ↓                                      │
│  for each order → 触发发货逻辑               │
│       ↓                                      │
│  UPDATE orders SET status='shipping'         │
└─────────────────────────────────────────────┘
  问题:最多延迟 5 分钟响应,高峰期扫表慢


✅ 新方案(事件驱动)

  支付服务                 Kafka                  发货服务
    │                       │                        │
    │  支付成功              │                        │
    │──发布 order.paid ────→│                        │
    │                       │──推送消息 ────────────→│
    │                       │                        │ 毫秒级实时触发发货
    │                       │                        │ 处理完提交 offset
    │                       │                        │

Go 实现

// ========== Producer:支付完成发事件 ==========

package payment

import (
    "context"
    "encoding/json"
    "time"

    "github.com/segmentio/kafka-go"
)

// OrderPaidEvent 是订单支付成功的事件结构体
// 这个结构体会被序列化为 JSON 作为 Kafka 消息的 Value 发送
type OrderPaidEvent struct {
    OrderID   string    `json:"order_id"`
    UserID    string    `json:"user_id"`
    Amount    float64   `json:"amount"`
    PaidAt    time.Time `json:"paid_at"`
    // MessageID 是消费端幂等去重的唯一标识
    // 消费者收到消息后先用这个 ID 检查是否已处理过,防止重复消费
    MessageID string    `json:"message_id"`
}

// writer 是全局共享的 Kafka 生产者,kafka-go 的 Writer 是协程安全的,可以复用
var writer = &kafka.Writer{
    // Addr:Kafka Broker 地址列表
    // 填多个地址是为了高可用:其中一个 Broker 宕机时,Producer 会自动切换到其他地址
    // 这里只需要填部分 Broker 地址,Producer 连上后会自动发现集群其他节点(Bootstrap)
    Addr: kafka.TCP("kafka1:9092", "kafka2:9092", "kafka3:9092"),

    // Topic:消息写入的目标 Topic 名称
    // 命名规范:{业务域}.{实体}.{事件类型},便于管理和监控
    Topic: "order.order.paid",

    // Balancer:决定消息路由到哪个 Partition 的策略
    // kafka.Hash{} 表示对消息的 Key 做哈希取模,相同 Key 的消息永远路由到同一 Partition
    // 这样可以保证同一个 OrderID 的消息是有序的(Kafka 只保证单 Partition 内有序)
    // 其他可选值:
    //   &kafka.RoundRobin{} → 轮询,适合不需要顺序保证的场景,吞吐更均匀
    //   &kafka.LeastBytes{} → 优先发给堆积最少的 Partition
    Balancer: &kafka.Hash{},

    // RequiredAcks:发送消息后需要等待多少副本确认才算成功
    // kafka.RequireAll  = acks=-1,等待所有 ISR 副本写入确认,最安全,不丢消息(推荐生产使用)
    // kafka.RequireOne  = acks=1, 只等 Leader 确认,Leader 宕机可能丢消息
    // kafka.RequireNone = acks=0, 不等任何确认,性能最高但可能丢消息
    RequiredAcks: kafka.RequireAll,

    // Async:是否异步发送
    // false(默认):同步发送,WriteMessages 会阻塞直到 Broker 确认,适合对可靠性要求高的场景
    // true:异步发送,WriteMessages 立即返回,错误通过 Completion 回调处理,吞吐更高但需额外处理错误
    Async: false,
}

// OnPaymentSuccess 在支付成功时调用,将事件发布到 Kafka
func OnPaymentSuccess(ctx context.Context, orderID, userID string, amount float64) error {
    event := OrderPaidEvent{
        OrderID:   orderID,
        UserID:    userID,
        Amount:    amount,
        PaidAt:    time.Now(),
        // 用 orderID + 事件类型 拼接成幂等 ID
        // 保证同一笔订单的支付成功事件即使重试发送,消费端也只处理一次
        MessageID: orderID + "_paid",
    }
    payload, _ := json.Marshal(event)

    return writer.WriteMessages(ctx, kafka.Message{
        // Key:消息路由键,决定这条消息去哪个 Partition
        // 这里用 OrderID 作为 Key,保证同一个订单的所有事件都在同一个 Partition,保序
        Key:   []byte(orderID),
        Value: payload,
    })
}
// ========== Consumer:发货服务实时消费 ==========

package shipping

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

func StartConsumer(ctx context.Context) {
    r := kafka.NewReader(kafka.ReaderConfig{
        // Brokers:Kafka 集群地址,填多个保证高可用,逻辑同 Producer
        Brokers: []string{"kafka1:9092", "kafka2:9092"},

        // Topic:要消费的 Topic 名称,需与 Producer 写入的 Topic 一致
        Topic: "order.order.paid",

        // GroupID:消费者组 ID
        // 同一个 GroupID 的多个消费者实例共同消费这个 Topic
        // Kafka 会自动将 Partition 分配给组内不同的消费者,实现负载均衡
        // 不同 GroupID 的消费者相互独立,都能收到全量消息(广播语义)
        // 例如:发货服务和通知服务可以用不同的 GroupID 同时消费同一个 Topic
        GroupID: "shipping-service-group",

        // MinBytes:每次拉取请求等待的最小数据量(字节)
        // 设为 1 表示有消息就立即返回,延迟最低,适合实时性要求高的场景
        // 设大一点(如 1MB)可以减少请求次数,提升吞吐,但会增加延迟
        MinBytes: 1,

        // MaxBytes:每次拉取请求返回的最大数据量(字节)
        // 10e6 = 10MB,防止一次拉取过多消息导致内存压力
        // 需要结合单条消息大小和处理能力来设置
        MaxBytes: 10e6,

        // CommitInterval:自动提交 offset 的时间间隔
        // 0 表示关闭自动提交,改为手动调用 CommitMessages 控制
        // 手动提交的好处:只有业务处理成功后才提交,避免消息丢失
        // 如果设为自动提交(如 time.Second),消息拉下来还没处理完就提交了,
        // 一旦进程崩溃,这批消息就再也不会被处理(消息丢失)
        CommitInterval: 0,

        // StartOffset:当 GroupID 第一次消费这个 Topic 时,从哪里开始
        // kafka.FirstOffset  = 从最早的消息开始消费(不遗漏历史消息)
        // kafka.LastOffset   = 从最新的消息开始消费(忽略历史消息)
        // 注意:这个配置只对新的 GroupID 生效,已有 offset 记录的 Group 会从上次位置继续
        StartOffset: kafka.FirstOffset,

        // MaxWait:当消息不足 MinBytes 时,最多等待多久再返回
        // 设为 3s 表示即使消息量不够,等 3 秒后也强制返回,避免消费者长时间阻塞
        MaxWait: 3 * time.Second,
    })
    defer r.Close()

    for {
        // FetchMessage:拉取一条消息,但不提交 offset
        // 与 ReadMessage 的区别:ReadMessage 会自动提交,FetchMessage 需要手动 CommitMessages
        // 这里用 FetchMessage 配合手动提交,保证"处理成功才提交"
        msg, err := r.FetchMessage(ctx)
        if err != nil {
            log.Printf("拉取消息失败: %v", err)
            continue
        }

        var event OrderPaidEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("消息解析失败,跳过: %v", err)
            // 解析失败说明消息格式有问题,重试也没用,直接提交跳过
            // 否则这条消息会一直阻塞,后续消息无法消费
            r.CommitMessages(ctx, msg)
            continue
        }

        // 幂等检查 + 业务处理
        if err := processWithIdempotent(ctx, event); err != nil {
            log.Printf("处理失败,发往 DLQ: %v", err)
            sendToDLQ(ctx, msg) // 失败消息发到死信队列,不阻塞主流程
        }

        // 只有业务处理完成(无论成功还是进入 DLQ)后,才提交 offset
        // 这保证了 At Least Once 语义:消息至少被处理一次
        if err := r.CommitMessages(ctx, msg); err != nil {
            log.Printf("offset 提交失败: %v", err)
        }
    }
}

func processWithIdempotent(ctx context.Context, event OrderPaidEvent) error {
    // 用 Redis SetNX(Set if Not eXists)实现幂等去重
    // SetNX 是原子操作:Key 不存在时设置并返回 true,已存在时返回 false
    // TTL 设为 24 小时:超过这个时间窗口的重复消息概率极低,可以接受
    ok, _ := rdb.SetNX(ctx,
        "msg:processed:"+event.MessageID, // Key:用消息唯一 ID 标识
        1,                                // Value:随意,只用 Key 来判断是否存在
        24*time.Hour,                     // TTL:24 小时后 Key 自动删除,释放内存
    ).Result()

    if !ok {
        // ok=false 说明 Key 已存在,这是一条重复消息,直接跳过
        log.Printf("重复消息,跳过: %s", event.MessageID)
        return nil
    }
    // 确认是新消息,执行业务逻辑
    return shippingService.Trigger(event.OrderID)
}

方案二:延迟消息替代延时定时任务

适用场景

  • 下单 30 分钟未支付自动取消
  • 发货 7 天后自动确认收货
  • 优惠券到期提醒

时间轮原理

时间轮(Timing Wheel) 是一种高效处理大量定时任务的数据结构。

想象一个时钟表盘,表盘被均匀分成 N 个槽(Slot),每个槽对应一个时间刻度(如 1 秒)。
指针每隔一个刻度向前走一格,走到哪个槽就触发该槽中所有到期的任务。

       0s
      ┌──┐
 59s──┤  ├──1s       ← 指针当前在 0s,每秒走一格
      │  │
 ...  │  │  ...      每个槽里挂着"在这个时刻到期"的任务列表
      │  │
 30s──┴──┘           到期的任务从槽中取出并执行

优点:插入任务 O(1),触发任务 O(1),远优于最小堆的 O(log n)
适合:海量定时任务(如百万级订单超时取消)

对比最小堆(Go time.AfterFunc 内部实现):

  • 最小堆:任务量大时,每次插入/删除都是 O(log n),任务多了性能下降
  • 时间轮:无论多少任务,插入和触发都是 O(1),更适合高并发延迟场景

流程图

                    ┌─────────────────────────────────────────────┐
                    │              延迟调度服务                     │
                    │                                              │
  业务服务           │   delay-queue Topic                          │   真实 Topic
    │               │        │                                     │       │
    │  下单成功      │        │                                     │       │
    │─写入 ────────→│────────→  consume 拉取消息                   │       │
    │  execute_at   │             │                                │       │
    │  = now+30min  │             ↓                                │       │
    │               │         时间轮(每秒 tick)                   │       │
    │               │         槽内挂载到期任务                      │       │
    │               │             │  30分钟后到期                   │       │
    │               │             └──────────────────────────────→│───────→  取消订单
    │               │                   转发到 order.cancel Topic  │       │  Consumer
    │               │                                              │       │
    └───────────────┴──────────────────────────────────────────────┴───────┘

Go 实现

// ========== 发送延迟消息 ==========

package delay

import (
    "context"
    "encoding/json"
    "time"

    "github.com/segmentio/kafka-go"
)

// DelayMessage 是延迟消息的信封结构
// 它不是真正的业务消息,而是一个"包裹",里面装着真正要投递的内容和投递时间
type DelayMessage struct {
    // RealTopic:到期后消息需要被转发到的真实 Topic
    RealTopic string `json:"real_topic"`
    // Key:转发时使用的消息 Key,保证路由到正确的 Partition
    Key string `json:"key"`
    // Value:真正的业务消息内容(原样转发,不解析)
    Value json.RawMessage `json:"value"`
    // ExecuteAt:消息应该被触发的时间点(Unix 毫秒时间戳)
    // 调度器会不断检查当前时间是否 >= ExecuteAt,到期则转发
    ExecuteAt int64 `json:"execute_at"`
}

// delay-queue 是所有延迟消息的"暂存区"
// 所有需要延迟执行的消息都先写到这里,由调度器统一管理投递时机
var writer = &kafka.Writer{
    Addr:         kafka.TCP("kafka1:9092"),
    Topic:        "delay-queue",
    RequiredAcks: kafka.RequireAll,
}

// SendDelay 将一条业务消息包装成延迟消息发送到暂存 Topic
// realTopic:最终要投递的 Topic
// key:      消息的路由 Key
// value:    业务消息内容(会被原样透传)
// delay:    延迟时长,如 30*time.Minute
func SendDelay(ctx context.Context, realTopic, key string, value any, delay time.Duration) error {
    payload, _ := json.Marshal(value)
    msg := DelayMessage{
        RealTopic: realTopic,
        Key:       key,
        Value:     payload,
        // 当前时间 + 延迟时长 = 应该被触发的绝对时间点
        ExecuteAt: time.Now().Add(delay).UnixMilli(),
    }
    body, _ := json.Marshal(msg)

    return writer.WriteMessages(ctx, kafka.Message{
        Key:   []byte(key),
        Value: body,
    })
}

// OnOrderCreated 下单时调用,发送 30 分钟后自动取消的延迟消息
func OnOrderCreated(ctx context.Context, orderID string) {
    SendDelay(ctx,
        "order.order.cancel",                    // 到期后投递到这个 Topic
        orderID,                                 // 路由 Key
        map[string]string{"order_id": orderID},  // 业务内容
        30*time.Minute,                          // 延迟 30 分钟
    )
}
// ========== 延迟调度服务(时间轮转发)==========

package delay

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// timerTask 是内存时间轮中的一个待触发任务
type timerTask struct {
    msg       DelayMessage  // 解析后的延迟消息,包含目标 Topic 和业务内容
    commitMsg kafka.Message // 原始 Kafka 消息,转发成功后用于提交 delay-queue 的 offset
}

// Scheduler 是延迟调度器的主体
// 它扮演"中间人"角色:从 delay-queue 拉取消息 → 放入时间轮等待 → 到期后转发到真实 Topic
type Scheduler struct {
    reader    *kafka.Reader            // 从 delay-queue 拉取消息的消费者
    writers   map[string]*kafka.Writer // 按目标 Topic 缓存 Writer,避免重复创建连接
    timerCh   chan timerTask           // 时间轮到期后,将任务投入这个 channel,由 dispatch 协程处理
}

func NewScheduler() *Scheduler {
    return &Scheduler{
        reader: kafka.NewReader(kafka.ReaderConfig{
            Brokers: []string{"kafka1:9092"},
            Topic:   "delay-queue",
            // 调度器是单独的服务,用独立的 GroupID
            // 保证 delay-queue 的每条消息只被调度器处理一次
            GroupID:        "delay-scheduler-group",
            CommitInterval: 0,       // 手动提交:转发成功后才提交,防止消息丢失
            MinBytes:       1,
            MaxBytes:       10e6,
        }),
        writers: make(map[string]*kafka.Writer),
        // timerCh 缓冲区设为 10000:
        // 允许最多 10000 个任务同时到期等待被 dispatch 消费
        // 缓冲区满时,时间轮的 goroutine 会阻塞,起到背压作用
        timerCh: make(chan timerTask, 10000),
    }
}

func (s *Scheduler) Start(ctx context.Context) {
    go s.consume(ctx)  // 协程1:持续从 delay-queue 拉取消息,放入时间轮
    go s.dispatch(ctx) // 协程2:持续从 timerCh 取到期任务,转发到真实 Topic
}

// consume 从 delay-queue 拉取消息,计算剩余等待时间,用 time.Sleep 模拟时间轮槽
// 注意:每条消息会启动一个独立 goroutine 等待到期,适合任务量适中的场景
// 若任务量极大(百万级),建议替换为 github.com/RussellLuo/timingwheel 等成熟时间轮库
func (s *Scheduler) consume(ctx context.Context) {
    for {
        // FetchMessage 拉取消息,不自动提交 offset
        m, err := s.reader.FetchMessage(ctx)
        if err != nil {
            log.Printf("delay-queue 拉取失败: %v", err)
            continue
        }

        var dm DelayMessage
        if err := json.Unmarshal(m.Value, &dm); err != nil {
            log.Printf("延迟消息解析失败,跳过: %v", err)
            s.reader.CommitMessages(ctx, m) // 格式错误,跳过这条消息
            continue
        }

        // 计算距离到期还有多久
        wait := time.Until(time.UnixMilli(dm.ExecuteAt))
        if wait < 0 {
            wait = 0 // 已经过期的消息立即投递(如服务重启后的补偿)
        }

        // 为每条消息启动一个 goroutine,sleep 到期后投入 dispatch channel
        // 这就是简化版"时间轮":每个 goroutine 相当于时间轮的一个槽
        go func(task timerTask, wait time.Duration) {
            log.Printf("延迟任务入轮: key=%s, 等待=%v", task.msg.Key, wait)

            // select 监听两个事件:
            // 1. wait 到期:正常触发投递
            // 2. ctx 取消:服务关闭时优雅退出,不再等待
            select {
            case <-time.After(wait):
                s.timerCh <- task // 到期,投入 dispatch 队列
            case <-ctx.Done():
                log.Printf("调度器关闭,任务丢弃: key=%s", task.msg.Key)
                // 生产环境应将未到期任务持久化到 DB,重启后恢复
            }
        }(timerTask{msg: dm, commitMsg: m}, wait)
    }
}

// dispatch 从 timerCh 取出到期任务,转发到真实 Topic,然后提交 delay-queue 的 offset
// 这个顺序非常重要:必须先确认转发成功,再提交 offset
// 如果先提交 offset 再转发失败,delay-queue 里的这条消息就永久丢失了
func (s *Scheduler) dispatch(ctx context.Context) {
    for {
        select {
        case task := <-s.timerCh:
            w := s.getWriter(task.msg.RealTopic) // 获取目标 Topic 的 Writer
            err := w.WriteMessages(ctx, kafka.Message{
                Key:   []byte(task.msg.Key),
                Value: task.msg.Value, // 原样转发业务内容
            })
            if err != nil {
                log.Printf("转发到 %s 失败: %v,消息将重新投递", task.msg.RealTopic, err)
                // 转发失败:不提交 offset,消息会在下次服务重启后重新调度
                // 注意:这可能导致轻微的重复投递,消费端需要做幂等处理
                continue
            }
            // 转发成功后才提交 delay-queue 的 offset,防止重复调度
            if err := s.reader.CommitMessages(ctx, task.commitMsg); err != nil {
                log.Printf("offset 提交失败: %v", err)
            }
            log.Printf("延迟消息转发成功: key=%s → topic=%s", task.msg.Key, task.msg.RealTopic)

        case <-ctx.Done():
            return
        }
    }
}

// getWriter 按 Topic 获取或创建 Writer,避免重复建立连接
func (s *Scheduler) getWriter(topic string) *kafka.Writer {
    if w, ok := s.writers[topic]; ok {
        return w
    }
    w := &kafka.Writer{
        Addr:         kafka.TCP("kafka1:9092"),
        Topic:        topic,
        RequiredAcks: kafka.RequireAll,
    }
    s.writers[topic] = w
    return w
}

方案三:流式窗口替代定时批处理

适用场景

  • 每小时统计订单金额报表
  • 每天对账汇总
  • 实时监控指标聚合

流程图

❌ 旧方案(定时批处理)

  Cron 每小时触发
       ↓
  SELECT SUM(amount) FROM orders         ← 重复全量计算,数据库压力大
  WHERE created_at BETWEEN ? AND ?
       ↓
  写入报表表


✅ 新方案(滚动窗口聚合)

  订单服务          Kafka              聚合消费者              Doris
    │                │                    │                     │
    │ 每笔订单 ──────→│                    │                     │
    │                │── 实时推送 ────────→│                     │
    │                │                    │ 内存累加             │
    │                │                    │ 整点窗口关闭          │
    │                │                    │──── 批量写入 ────────→│
    │                │                    │                     │ 报表 / BI 查询

Go 实现

package aggregator

import (
    "context"
    "encoding/json"
    "log"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
)

type OrderEvent struct {
    OrderID    string    `json:"order_id"`
    MerchantID string    `json:"merchant_id"`
    Amount     float64   `json:"amount"`
    CreatedAt  time.Time `json:"created_at"`
}

// WindowResult 是一个时间窗口内的聚合结果
// 例如:某商户在 14:00~15:00 的订单总金额和笔数
type WindowResult struct {
    MerchantID  string
    WindowStart time.Time // 窗口开始时间(整点)
    WindowEnd   time.Time // 窗口结束时间(下一个整点)
    TotalAmount float64
    Count       int64
}

// WindowAggregator 是滚动窗口聚合器
type WindowAggregator struct {
    mu      sync.Mutex               // 保护 windows map 的并发安全
    windows map[string]*WindowResult // key = merchantID|windowStart,value = 该窗口的聚合数据
    ticker  *time.Ticker             // 定时触发窗口关闭和刷新
}

func NewWindowAggregator() *WindowAggregator {
    wa := &WindowAggregator{
        windows: make(map[string]*WindowResult),
        // 每小时触发一次窗口刷新,将上一个小时的聚合结果写入 Doris
        ticker: time.NewTicker(time.Hour),
    }
    go wa.flushLoop()
    return wa
}

func (wa *WindowAggregator) Start(ctx context.Context) {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"kafka1:9092"},
        Topic:   "order.order.created",
        GroupID: "hourly-aggregator-group",
        // 聚合场景可以适当增大 MinBytes,减少请求次数,提升吞吐
        MinBytes: 1e3,  // 1KB
        MaxBytes: 10e6, // 10MB
        // 最多等 500ms,即使消息量不够 MinBytes 也返回,保证聚合的实时性
        MaxWait: 500 * time.Millisecond,
    })
    defer r.Close()

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            continue
        }

        var event OrderEvent
        if err := json.Unmarshal(m.Value, &event); err != nil {
            r.CommitMessages(ctx, m)
            continue
        }

        // 将消息累加到对应的时间窗口
        wa.accumulate(event)
        r.CommitMessages(ctx, m)
    }
}

// accumulate 将一条订单消息的金额累加到对应的时间窗口中
func (wa *WindowAggregator) accumulate(event OrderEvent) {
    // Truncate(time.Hour) 将时间对齐到整点
    // 例如:14:37:22 → 14:00:00,这样同一小时内的消息都落入同一个窗口
    windowStart := event.CreatedAt.Truncate(time.Hour)
    // key = merchantID|windowStart,唯一标识一个窗口
    key := event.MerchantID + "|" + windowStart.String()

    wa.mu.Lock()
    defer wa.mu.Unlock()

    // 第一次看到这个窗口,初始化
    if _, ok := wa.windows[key]; !ok {
        wa.windows[key] = &WindowResult{
            MerchantID:  event.MerchantID,
            WindowStart: windowStart,
            WindowEnd:   windowStart.Add(time.Hour),
        }
    }
    wa.windows[key].TotalAmount += event.Amount
    wa.windows[key].Count++
}

// flushLoop 每小时将当前所有窗口的聚合结果写入 Doris,然后清空内存
// 注意:这里有个简化:整点时写入上一个小时的数据
// 生产环境建议用水位线(Watermark)机制处理迟到消息
func (wa *WindowAggregator) flushLoop() {
    for range wa.ticker.C {
        // 原子性地取出所有窗口数据并重置,减少锁持有时间
        wa.mu.Lock()
        toFlush := wa.windows
        wa.windows = make(map[string]*WindowResult)
        wa.mu.Unlock()

        for _, result := range toFlush {
            if err := doris.Insert(result); err != nil {
                log.Printf("写入 Doris 失败: %v", err)
                // 生产环境应有重试或补偿机制,如写入本地文件后告警人工处理
            }
        }
        log.Printf("窗口刷新完成,写入 %d 条聚合结果", len(toFlush))
    }
}

方案四:心跳超时检测替代健康检查定时任务

适用场景

  • IoT 设备在线检测
  • 微服务实例存活检测
  • 长连接客户端超时判断

流程图

  设备 / 服务                Kafka                  心跳消费者              Redis
    │                         │                         │                    │
    │  每 30s 发心跳            │                         │                    │
    │──→ device.heartbeat ────→│                         │                    │
    │                         │──── 实时推送 ───────────→│                    │
    │                         │                         │ SET device:online  │
    │                         │                         │ :deviceID EX 90 ──→│  TTL 刷新
    │                         │                         │                    │
    │    (超过 90s 无心跳)    │                         │                    │
    │                         │                         │         Key 过期 ──→ 触发过期事件
    │                         │                         │                    │
    │                         │                         │             告警服务 ←─ 订阅过期事件
    │                         │                         │          钉钉 / PagerDuty 通知

Go 实现

// ========== 设备心跳 Producer ==========

package device

import (
    "context"
    "encoding/json"
    "time"

    "github.com/segmentio/kafka-go"
)

type HeartbeatEvent struct {
    DeviceID  string    `json:"device_id"`
    IP        string    `json:"ip"`
    Timestamp time.Time `json:"timestamp"`
}

var writer = &kafka.Writer{
    Addr:  kafka.TCP("kafka1:9092"),
    Topic: "device.heartbeat",
    // kafka.Hash{} 保证同一设备的心跳路由到同一 Partition,便于顺序消费
    Balancer: &kafka.Hash{},
    // 心跳消息对可靠性要求略低,允许偶尔丢失(网络抖动导致的)
    // 使用 RequireOne 而非 RequireAll,降低发送延迟
    // 因为即使少发一两条心跳,只要下一条在 TTL 内到达,设备仍被认为在线
    RequiredAcks: kafka.RequireOne,
    // Async=true:异步发送,心跳不阻塞主业务流程
    // 发送失败由 Completion 回调处理,或直接忽略(下次心跳会补偿)
    Async: true,
}

// StartHeartbeat 启动设备心跳,每 30 秒发送一次
// 心跳间隔(30s)必须小于 Redis TTL(90s)的 1/2,留足余量应对网络延迟
func StartHeartbeat(ctx context.Context, deviceID, ip string) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return // 收到停止信号,优雅退出
        case <-ticker.C:
            event := HeartbeatEvent{
                DeviceID:  deviceID,
                IP:        ip,
                Timestamp: time.Now(),
            }
            payload, _ := json.Marshal(event)
            writer.WriteMessages(ctx, kafka.Message{
                Key:   []byte(deviceID), // 用 DeviceID 作为 Key,保证同设备路由一致
                Value: payload,
            })
        }
    }
}
// ========== 心跳消费者:刷新 Redis TTL ==========

package monitor

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
    "github.com/segmentio/kafka-go"
)

const (
    // HeartbeatTTL:Redis Key 的存活时长
    // 设计原则:TTL = 心跳间隔 × 3
    // 心跳间隔 30s,TTL = 90s,允许连续丢失 2 次心跳后才判定为离线
    // 这样可以容忍短暂的网络抖动,避免误报
    HeartbeatTTL = 90 * time.Second
)

func StartHeartbeatConsumer(ctx context.Context, rdb *redis.Client) {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"kafka1:9092"},
        Topic:   "device.heartbeat",
        GroupID: "device-monitor-group",
        // 心跳场景要求低延迟,MinBytes=1 有消息立即消费
        MinBytes: 1,
        MaxBytes: 10e6,
        MaxWait:  1 * time.Second,
    })
    defer r.Close()

    for {
        m, err := r.FetchMessage(ctx)
        if err != nil {
            continue
        }

        var event HeartbeatEvent
        if err := json.Unmarshal(m.Value, &event); err != nil {
            r.CommitMessages(ctx, m)
            continue
        }

        // 每次收到心跳就刷新 Redis 中这台设备的 Key,并重置 TTL
        // Set 会覆盖旧值并重置过期时间(比 Expire 命令更原子)
        // Key 格式:device:online:{deviceID},Value 存 IP 便于排查
        rdb.Set(ctx,
            "device:online:"+event.DeviceID,
            event.IP,
            HeartbeatTTL,
        )

        r.CommitMessages(ctx, m)
        log.Printf("心跳更新: device=%s ip=%s ttl=%v", event.DeviceID, event.IP, HeartbeatTTL)
    }
}
// ========== Redis Key 过期监听:触发离线告警 ==========

package monitor

import (
    "context"
    "log"
    "strings"

    "github.com/redis/go-redis/v9"
)

// StartOfflineListener 监听 Redis Key 过期事件,设备 Key 过期即代表设备离线
//
// 前置条件:需要在 Redis 配置中开启 Keyspace 通知
// redis.conf 中设置:notify-keyspace-events "KEx"
//   K = Keyspace 事件(以 __keyspace@<db>__ 为前缀)
//   E = Keyevent 事件(以 __keyevent@<db>__ 为前缀)
//   x = 过期事件(expired)
//
// 或者运行时执行:CONFIG SET notify-keyspace-events KEx
func StartOfflineListener(ctx context.Context, rdb *redis.Client) {
    // 订阅 db0 中所有 Key 的过期事件
    // 格式:__keyevent@{dbIndex}__:expired
    pubsub := rdb.PSubscribe(ctx, "__keyevent@0__:expired")
    defer pubsub.Close()

    log.Println("开始监听设备离线事件...")

    for msg := range pubsub.Channel() {
        // msg.Payload 是过期的 Key 名称,如 "device:online:device-001"
        key := msg.Payload

        // 只处理设备心跳相关的 Key,过滤其他业务的 Key
        if !strings.HasPrefix(key, "device:online:") {
            continue
        }

        // 从 Key 中提取 DeviceID
        deviceID := strings.TrimPrefix(key, "device:online:")
        log.Printf("⚠️  设备离线: %s", deviceID)

        // 异步发送告警,不阻塞监听循环
        go alertService.SendOfflineAlert(ctx, deviceID)
    }
}

方案五:死信队列(DLQ)兜底

无论哪种方案,消费失败的消息都需要有去处,不能阻塞主流程。

流程图

  主 Topic
     │
     ↓
  Consumer 处理
     │
     ├── 成功 ──→ CommitOffset,结束
     │
     └── 失败
           │
           ├── 可重试(网络抖动等)──→ 指数退避重试(1s → 2s → 4s,最多 3 次)
           │                                │
           │                                └── 仍失败 ──→ 写入 DLQ Topic
           │
           └── 不可重试(数据格式错误等)──────→ 写入 DLQ Topic
                                                       │
                                               ┌───────┴────────┐
                                               │   DLQ Consumer  │
                                               │  人工审查 / 修复  │
                                               │  修复后重新投递   │
                                               └────────────────┘

Go 实现

package consumer

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// dlqWriter 专门用于写入死信队列
// 死信队列本身要求高可靠,必须用 RequireAll
var dlqWriter = &kafka.Writer{
    Addr:         kafka.TCP("kafka1:9092"),
    RequiredAcks: kafka.RequireAll,
}

// NonRetryableError 表示不可重试的错误(如消息格式错误、业务逻辑校验失败)
// 遇到这类错误继续重试没有意义,应该直接进 DLQ 等人工处理
type NonRetryableError struct{ msg string }
func (e NonRetryableError) Error() string { return e.msg }

// processWithRetry 带重试和 DLQ 的消息处理入口
// handler:真正的业务处理函数,由调用方传入
func processWithRetry(ctx context.Context, msg kafka.Message, handler func(kafka.Message) error) error {
    maxRetry := 3

    for i := 0; i < maxRetry; i++ {
        err := handler(msg)
        if err == nil {
            return nil // 处理成功,退出
        }

        // 判断是否为不可重试错误
        if _, ok := err.(NonRetryableError); ok {
            log.Printf("不可重试错误,直接发往 DLQ: %v", err)
            return sendToDLQ(ctx, msg, err)
        }

        if i < maxRetry-1 {
            // 指数退避:第 1 次失败等 1s,第 2 次等 2s,第 3 次等 4s
            // 避免瞬间大量重试打垮下游服务
            wait := time.Duration(1<<i) * time.Second
            log.Printf("第 %d 次重试,%v 后重试: %v", i+1, wait, err)
            select {
            case <-time.After(wait):
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }

    // 超出最大重试次数,发往 DLQ
    log.Printf("超出最大重试次数(%d),发往 DLQ", maxRetry)
    return sendToDLQ(ctx, msg, fmt.Errorf("超出最大重试次数"))
}

// sendToDLQ 将失败消息发送到死信队列 Topic
// DLQ Topic 命名规范:原 Topic 名 + ".DLQ"
// 例如:order.order.paid → order.order.paid.DLQ
func sendToDLQ(ctx context.Context, msg kafka.Message, reason error) error {
    dlqTopic := msg.Topic + ".DLQ"

    // 将原始消息的 Header 带过去,并追加失败信息,便于排查
    headers := append(msg.Headers,
        // 记录消息来自哪个 Topic,方便 DLQ 消费者知道从哪里来的
        kafka.Header{Key: "original-topic", Value: []byte(msg.Topic)},
        // 记录失败原因,人工查看时可以快速定位问题
        kafka.Header{Key: "fail-reason", Value: []byte(reason.Error())},
        // 记录失败时间,便于判断消息已经在队列里积压了多久
        kafka.Header{Key: "fail-time", Value: []byte(time.Now().Format(time.RFC3339))},
        // 记录原始 Partition 和 Offset,便于追溯
        kafka.Header{Key: "original-partition", Value: []byte(fmt.Sprintf("%d", msg.Partition))},
        kafka.Header{Key: "original-offset", Value: []byte(fmt.Sprintf("%d", msg.Offset))},
    )

    return dlqWriter.WriteMessages(ctx, kafka.Message{
        Topic:   dlqTopic,
        Key:     msg.Key,   // 保留原始 Key,便于路由和排查
        Value:   msg.Value, // 保留原始消息体,便于人工重新投递
        Headers: headers,
    })
}

方案对比总结

定时任务场景Kafka 替换方案实时性复杂度
状态变更触发下游事件驱动(方案一)毫秒级
N 分钟后执行某操作延迟消息 + 时间轮(方案二)秒级
定时批量统计报表滚动窗口聚合(方案三)分钟级
心跳 / 超时检测Kafka + Redis TTL(方案四)秒级
失败补偿 / 兜底死信队列 DLQ(方案五)人工介入

不适合替换的场景

核心判断标准:
触发条件是"某件事发生了"→ 用 Kafka 事件驱动
触发条件是"到了某个时间点"→ Cron 定时任务仍是最简选择,不要过度设计

golang

版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2026年04月07日 15:39
1

目录

来自 《使用 Kafka 替换定时任务的架构方案》