Karp 的技术博客

在落地 Pulsar 的过程中,开发者最容易踩的坑集中在两个问题上:

问题一:"我有多个消费者,消息是怎么分发的?顺序能保证吗?"

问题二:"消费者处理业务逻辑很慢,会不会把整个消费流程阻塞住?"

这两个问题的答案,分别对应 Pulsar 的订阅模式(Subscription Type)消费模型(Consumption Pattern)

本文基于 Go 语言,通过完整可运行的代码,逐一拆解四种订阅模式,并给出生产环境中解决阻塞问题的异步消费方案。


环境准备

# 1. 启动本地 Pulsar(Docker)
docker run -d --name pulsar \
  -p 6650:6650 \
  apachepulsar/pulsar:latest \
  bin/pulsar standalone

# 2. 初始化 Go 项目
go mod init pulsar-demo
go get github.com/apache/pulsar-client-go/pulsar

# 3. 运行 Demo
go run pulsar_subscription_demo.go

一、基础概念:订阅模式是什么?

Pulsar 的订阅(Subscription) 是消费者与 Topic 之间的绑定关系,由三个要素确定:

Topic  ──────────────────→  Subscription
                              │
                              ├─ SubscriptionName(订阅名,唯一标识)
                              ├─ Type(订阅模式,决定分发规则)
                              └─ Cursor(消费进度,记录哪些消息已 Ack)

订阅模式在 Consumer 订阅时声明,一旦该订阅名创建,模式不可更改。

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "persistent://public/default/my-topic",
    SubscriptionName: "my-sub",
    Type:             pulsar.KeyShared,  // ← 在这里声明,只此一次
})

Pulsar 提供四种订阅模式,覆盖从"严格有序"到"最大吞吐"的所有需求:

严格有序 ◄─────────────────────────────────► 最大吞吐
    │                                           │
 Exclusive     Failover      Key_Shared      Shared
 (独占)      (主备)      (Key路由)     (共享)

二、Topic 命名规则

在看代码之前,先理解 Pulsar Topic 的完整命名格式:

persistent://  public  /  default  /  my-topic
─────────────  ──────     ───────     ────────
存储类型         租户       命名空间     Topic名称

persistent     = 消息持久化到磁盘(BookKeeper)
non-persistent = 消息不落盘,Broker 重启即丢失
public         = 默认租户(内置)
default        = 默认命名空间(内置)

三层命名空间(租户 / 命名空间 / Topic)正是 Pulsar 原生多租户的基础,每一层都可以独立配置权限、配额和存储策略。


三、创建客户端连接

客户端是整个应用的单例,一个进程通常只需要一个 Client。

func newClient() pulsar.Client {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        // Broker 地址,格式 pulsar://host:6650
        // 生产环境通常是 pulsar+ssl://host:6651(开启 TLS)
        URL: "pulsar://localhost:6650",

        // 单次操作超时(Send / Receive 超时后返回 error,而非永久阻塞)
        OperationTimeout: 30 * time.Second,

        // TCP 连接建立超时(网络不通时快速失败,而非等待系统默认超时)
        ConnectionTimeout: 30 * time.Second,

        // 生产环境建议补充:
        // Authentication: pulsar.NewAuthenticationToken("eyJ..."),  // Token 认证
        // TLSOptions: &pulsar.TLSOptions{InsecureSkipVerify: false}, // TLS 加密
    })
    if err != nil {
        log.Fatalf("创建 Pulsar 客户端失败: %v", err)
    }
    return client
}

四、Producer:发送消息

4.1 普通消息(无 Key)

func produceMessages(client pulsar.Client, topic string, messages []string) {
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: topic,

        // Pulsar 默认开启批量发送(Batching):
        // 将 10ms 内的多条消息合并成一批发给 Broker,显著提升吞吐量。
        // 若需要严格逐条确认(如金融场景),可关闭:
        // EnableBatching: false,

        // 生产环境建议开启压缩,降低网络带宽:
        // CompressionType: pulsar.LZ4,   // 压缩率低,速度快
        // CompressionType: pulsar.ZSTD,  // 压缩率高,CPU 消耗略高
    })
    if err != nil {
        log.Fatalf("创建 Producer 失败: %v", err)
    }
    defer producer.Close() // 关闭时会刷新缓冲区,确保所有消息发出

    for _, msg := range messages {
        // Send() 同步发送:阻塞直到 Broker 确认写入 BookKeeper 多数派副本
        // 返回的 MsgID 是消息的唯一标识,格式:(LedgerID, EntryID, BatchIndex)
        // 可用于日志追踪、消息回溯、审计
        msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte(msg), // 消息体是原始字节,序列化格式由业务自定(JSON/Protobuf等)
        })
        if err != nil {
            log.Printf("发送失败: %v", err)
        } else {
            fmt.Printf("[Producer] 发送: %-20s → MsgID: %v\n", msg, msgID)
        }
    }
}

4.2 带 Key 的消息(Key_Shared 模式专用)

func produceMessagesWithKey(client pulsar.Client, topic string, messages []struct{ key, value string }) {
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: topic,
    })
    if err != nil {
        log.Fatalf("创建 Producer 失败: %v", err)
    }
    defer producer.Close()

    for _, m := range messages {
        _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            // Key 决定消息路由到哪个 Consumer
            // 路由规则:ConsumerIndex = hash(Key) % ActiveConsumerCount
            // 同一个 Key → 始终路由到同一个 Consumer → 该 Key 下消息严格有序
            //
            // 典型 Key 设计:
            //   用户ID  → 同一用户的操作按序处理
            //   订单ID  → 同一订单的状态变更按序处理
            //   设备ID  → 同一 IoT 设备的数据按序入库
            Key:     m.key,
            Payload: []byte(m.value),
        })
        if err != nil {
            log.Printf("发送失败: %v", err)
        } else {
            fmt.Printf("[Producer] Key=%-10s  Value=%s\n", m.key, m.value)
        }
    }
}

五、四种订阅模式详解

5.1 Exclusive —— 独占订阅

核心特点: 同一订阅名在同一时刻只允许 1 个 Consumer 连接,第二个会被 Broker 直接拒绝。

Broker
  msg-1 ──────────────→ Consumer-1(唯一的 Active Consumer)
  msg-2 ──────────────→ Consumer-1
  msg-3 ──────────────→ Consumer-1
                         ↑
              Consumer-2 尝试连接 → 被 Broker 拒绝(ConsumerBusy)

消息顺序:✅ 严格全局有序

func demoExclusive() {
    client := newClient()
    defer client.Close()

    topic := "persistent://public/default/demo-exclusive"

    // 第一个 Consumer:订阅成功,成为唯一的 Active Consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topic,
        SubscriptionName: "exclusive-sub",
        Type:             pulsar.Exclusive, // 独占模式

        // ReceiverQueueSize:Consumer 本地预拉取队列大小(默认 1000)
        // Pulsar 提前从 Broker 拉取消息缓存到本地内存,减少每次 Receive() 的网络延迟。
        // 设太大:内存占用高;设太小:网络往返频繁。
        // 业务处理快(<10ms)可设大(5000+);业务处理慢可设小(100~500)。
        ReceiverQueueSize: 100,
    })
    if err != nil {
        log.Fatalf("订阅失败: %v", err)
    }
    defer consumer.Close()

    // 第二个 Consumer:尝试以相同订阅名连接,会被 Broker 拒绝
    _, err2 := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topic,
        SubscriptionName: "exclusive-sub", // 同一订阅名 → 必然冲突
        Type:             pulsar.Exclusive,
    })
    if err2 != nil {
        // 这是预期行为,说明 Exclusive 语义生效
        fmt.Printf("[预期行为] 第二个 Consumer 被拒绝: %v\n", err2)
    }

    go produceMessages(client, topic, []string{"msg-1", "msg-2", "msg-3"})
    time.Sleep(500 * time.Millisecond)

    for i := 0; i < 3; i++ {
        // WithTimeout 防止 Receive() 在没有消息时永久阻塞
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        msg, err := consumer.Receive(ctx) // 阻塞,直到有消息到达或超时
        cancel()
        if err != nil {
            break
        }

        fmt.Printf("[Consumer] 收到: %s\n", string(msg.Payload()))

        // ⚠️ 黄金法则:先执行业务逻辑,成功后再 Ack
        // 顺序不能颠倒!先 Ack 再处理,业务失败后消息永久丢失
        time.Sleep(100 * time.Millisecond) // 模拟业务处理(写DB、调接口等)

        // Ack:通知 Broker 该消息已成功处理,Cursor 可以推进
        // Ack 后 Broker 不会再投递这条消息
        consumer.Ack(msg)
        fmt.Printf("[Consumer] Ack: %s\n", string(msg.Payload()))
    }
}

适用场景:

✅ 适合❌ 不适合
开发测试环境生产高可用(宕机无法切换)
严格单消费者 + 全局有序需要水平扩展消费能力
业务逻辑简单,处理速度快处理慢,容易积压

5.2 Failover —— 主备订阅

核心特点: 多个 Consumer 可以连接同一订阅,但同一时刻只有主(Active)Consumer 接收消息。主 Consumer 断开后,备(Standby)Consumer 自动升级为 Active,无缝接管。

正常状态:
  msg-1 ──→ Consumer-1(Active)
  msg-2 ──→ Consumer-1
             Consumer-2(Standby,保持连接,等待接管)

Consumer-1 宕机:
  Broker 检测到心跳超时(约 30s,可配置)
             ↓
  Consumer-2 自动升为 Active
  msg-3 ──→ Consumer-2(从上次 Cursor 位置继续,不丢消息)

主备选举规则: Pulsar 按 Consumer 连接顺序决定优先级,先连接的优先级高。

消息顺序:✅ 严格全局有序

func demoFailover() {
    client := newClient()
    defer client.Close()

    topic := "persistent://public/default/demo-failover"

    // 主消费者:第一个连接,自动成为 Active
    primaryConsumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topic,
        SubscriptionName: "failover-sub",
        Type:             pulsar.Failover,
    })
    if err != nil {
        log.Fatalf("主Consumer订阅失败: %v", err)
    }
    defer primaryConsumer.Close()
    fmt.Println("[主Consumer] Active,正在接收消息")

    // 备用消费者:第二个连接,自动成为 Standby
    // 与 Broker 保持心跳连接,随时准备接管
    // 注意:必须使用相同的 SubscriptionName
    standbyConsumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topic,
        SubscriptionName: "failover-sub", // 同一订阅名
        Type:             pulsar.Failover,
    })
    if err != nil {
        log.Fatalf("备Consumer订阅失败: %v", err)
    }
    defer standbyConsumer.Close()
    fmt.Println("[备Consumer] Standby,等待主Consumer故障")
    fmt.Println("[说明] 主Consumer宕机 → 备Consumer自动接管 → 从Cursor断点继续 → 不丢消息")

    go produceMessages(client, topic, []string{"msg-1", "msg-2", "msg-3"})
    time.Sleep(500 * time.Millisecond)

    for i := 0; i < 3; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        msg, err := primaryConsumer.Receive(ctx)
        cancel()
        if err != nil {
            break
        }
        fmt.Printf("[主Consumer] 处理: %s\n", string(msg.Payload()))

        // 若主Consumer在 Ack 前宕机:
        // 备Consumer接管后,Broker 会重新投递这条未 Ack 的消息(At-Least-Once)
        primaryConsumer.Ack(msg)
    }
}

与 Exclusive 的区别:

维度ExclusiveFailover
第二个 Consumer❌ 被拒绝✅ 可连接(Standby)
故障切换❌ 无法切换✅ 自动切换(毫秒级)
消费顺序✅ 严格有序✅ 严格有序
适合场景简单单消费者高可用 + 严格有序

5.3 Shared —— 共享订阅

核心特点: 多个 Consumer 同时连接,消息以 Round-Robin(轮询)方式分发,每条消息只投递给一个 Consumer。吞吐量最高,但不保证顺序。

Broker(9条消息,3个Consumer轮询):
  msg-1 ──→ Consumer-1(50ms 处理)
  msg-2 ──→ Consumer-2(100ms 处理)
  msg-3 ──→ Consumer-3(150ms 处理)
  msg-4 ──→ Consumer-1
  msg-5 ──→ Consumer-2
  ...

完成顺序(按处理速度):
  msg-1 ✅ (50ms)
  msg-4 ✅ (100ms)
  msg-2 ✅ (150ms)  ← 乱序!
  ...

消息顺序:❌ 不保证

Individual Ack(Pulsar 核心优势):

Pulsar Cursor(位图):
  msg-1 ✅  msg-2 ❌  msg-3 ✅  msg-4 ✅  msg-5 ❌
                ↑                              ↑
         只重投这两条,其余不受影响

Kafka Offset(线性):
  offset-1 ✅  offset-2 ❌  offset-3 ✅  offset-4 ✅
                    ↑
     必须等这里提交,否则重启从 offset-2 开始重放
     导致 offset-3、offset-4 被重复消费!
func demoShared() {
    client := newClient()
    defer client.Close()

    topic := "persistent://public/default/demo-shared"

    var wg sync.WaitGroup

    // 启动 3 个并行 Consumer,模拟水平扩展
    // 水平扩展只需启动更多 Consumer 实例,Broker 自动均衡分发,无需重启或 Rebalance
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        consumerID := i

        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            topic,
            SubscriptionName: "shared-sub",  // 同一订阅名,Shared 模式允许多 Consumer 并存
            Type:             pulsar.Shared,

            // Broker 根据 Consumer 本地队列的剩余容量推送消息(背压机制)
            // 队列满 → Broker 暂停推送 → Consumer 处理完腾出空间 → 继续推送
            // 这是 Pulsar 的流控机制(Flow Control),防止 Consumer OOM
            ReceiverQueueSize: 100,
        })
        if err != nil {
            log.Printf("Consumer-%d 订阅失败: %v", consumerID, err)
            wg.Done()
            continue
        }

        go func(c pulsar.Consumer, id int) {
            defer wg.Done()
            defer c.Close()

            for {
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                msg, err := c.Receive(ctx)
                cancel()
                if err != nil {
                    return // 超时,没有更多消息,退出循环
                }

                // 模拟不同 Consumer 处理速度不同(演示乱序)
                processingTime := time.Duration(id*50) * time.Millisecond
                time.Sleep(processingTime)

                fmt.Printf("[Consumer-%d] 收到: %-12s  耗时: %dms\n",
                    id, string(msg.Payload()), processingTime.Milliseconds())

                // Shared 模式下每条消息独立 Ack(Individual Ack)
                // msg-2 未 Ack 不会影响 msg-1、msg-3 的 Ack 和 Cursor 推进
                c.Ack(msg)
            }
        }(consumer, consumerID)
    }

    time.Sleep(300 * time.Millisecond)

    // 生产 9 条消息,预期被 3 个 Consumer 轮询分摊(约各 3 条)
    produceMessages(client, topic, []string{
        "task-1", "task-2", "task-3",
        "task-4", "task-5", "task-6",
        "task-7", "task-8", "task-9",
    })

    wg.Wait()
    // 观察输出:完成顺序不等于发送顺序(乱序是正常的)
}

适用场景:

  • 日志收集、监控告警(量大、顺序无关)
  • 任务队列:发邮件、发短信、图片压缩等幂等任务
  • 需要弹性水平扩展的通用队列
⚠️ 注意: Shared 模式下任务必须幂等设计,因为 Consumer 宕机时未 Ack 的消息会重新投递,可能导致重复处理。

5.4 Key_Shared —— Key 路由订阅 ⭐ 生产最推荐

核心特点: 融合 Shared(高并发)和 Failover(有序)的优点。同一个 Key 的消息固定路由到同一个 Consumer(该 Key 下严格有序),不同 Key 的消息分发到不同 Consumer(并行处理)。

路由规则:ConsumerIndex = hash(MessageKey) % ActiveConsumerCount

Broker(3个Consumer,3个用户的订单事件):
  user-A/下单 ──→ Consumer-1  ┐
  user-A/支付 ──→ Consumer-1  ├── user-A 严格有序:下单→支付→发货
  user-A/发货 ──→ Consumer-1  ┘

  user-B/下单 ──→ Consumer-2  ┐
  user-B/支付 ──→ Consumer-2  ├── user-B 严格有序:下单→支付→发货
  user-B/发货 ──→ Consumer-2  ┘

  user-C/下单 ──→ Consumer-3  ┐
  user-C/支付 ──→ Consumer-3  ├── user-C 严格有序:下单→支付→发货
  user-C/发货 ──→ Consumer-3  ┘

三个用户并行处理,互不等待 → 高吞吐 + 有序
func demoKeyShared() {
    client := newClient()
    defer client.Close()

    topic := "persistent://public/default/demo-key-shared"

    var wg sync.WaitGroup

    // 启动 3 个 Consumer(Key_Shared 模式)
    // Broker 将不同的 Key 哈希分配到这 3 个 Consumer
    // 当 Consumer 数量变化时,Pulsar 自动重新分配 Key → Consumer 的映射(无需重启)
    for i := 0; i < 3; i++ {
        wg.Add(1)
        consumerID := i + 1

        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            topic,
            SubscriptionName: "key-shared-sub",
            Type:             pulsar.KeyShared, // Key 路由模式

            // ⚠️ Key_Shared 关键配置:
            // 若某个 Consumer 有消息长时间未 Ack,该 Consumer 负责的所有 Key 后续消息都会阻塞!
            // 原因:为保证 Key 内有序,下一条同 Key 消息必须等上一条 Ack 后才能投递。
            // 解决方案:配置 AckTimeout,超时自动重投,避免永久阻塞整个 Key 的消费链路。
            // AckTimeout:      30 * time.Second,
            // AckTimeoutTimer: 1 * time.Second,
        })
        if err != nil {
            log.Printf("Consumer-%d 订阅失败: %v", consumerID, err)
            wg.Done()
            continue
        }

        go func(c pulsar.Consumer, id int) {
            defer wg.Done()
            defer c.Close()

            for {
                ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
                msg, err := c.Receive(ctx)
                cancel()
                if err != nil {
                    return
                }

                // msg.Key() 返回消息的路由 Key
                // 关键观察:同一个 Key(如 user-A)始终由同一个 Consumer 处理
                fmt.Printf("[Consumer-%d] Key=%-10s  Value=%-6s  ← 同Key消息在此Consumer内严格有序\n",
                    id, msg.Key(), string(msg.Payload()))

                time.Sleep(50 * time.Millisecond) // 模拟业务处理

                // Ack 之后,Broker 才会将同 Key 的下一条消息投递过来
                c.Ack(msg)
            }
        }(consumer, consumerID)
    }

    time.Sleep(300 * time.Millisecond)

    // 生产三个用户的订单事件(模拟乱序到达的现实场景)
    // 预期:每个用户的事件在其对应 Consumer 内严格按序处理
    produceMessagesWithKey(client, topic, []struct{ key, value string }{
        {"user-A", "下单"},
        {"user-B", "下单"},
        {"user-A", "支付"},  // user-A:必须在 下单 之后处理
        {"user-C", "下单"},
        {"user-B", "支付"},
        {"user-A", "发货"},  // user-A:必须在 支付 之后处理
        {"user-C", "支付"},
        {"user-B", "发货"},
        {"user-C", "发货"},
    })

    wg.Wait()
}

与 Kafka 分区方案的对比:

维度Kafka 分区Pulsar Key_Shared
同 Key 有序
并行消费✅(受分区数限制)✅(不受限)
扩容方式增加分区 → Rebalance(有停顿)增加 Consumer → 自动重路由(无停顿)
Consumer 数 > 分区数❌ 有 Consumer 空闲✅ 所有 Consumer 均可工作

六、Ack 机制深度解析

6.1 三种 Ack 操作

// ✅ 正常 Ack:业务成功,消息处理完毕,Cursor 推进
consumer.Ack(msg)

// ❌ Negative Ack:业务失败,主动要求重新投递
// Broker 在 negativeAckRedeliveryDelay 后重新投递(默认 1 分钟)
consumer.NegativeAcknowledge(msg)

// ⏱️ Ack 超时兜底:若超过 ackTimeout 未 Ack,Broker 自动重新投递
// 在 ConsumerOptions 中配置:
// AckTimeout: 30 * time.Second,

6.2 Ack 的黄金法则

❌ 错误顺序(先 Ack 再处理):
   Receive → Ack → 业务逻辑失败 → 消息永久丢失(无法找回)

✅ 正确顺序(先处理再 Ack):
   Receive → 业务逻辑 → 成功 → Ack
                       → 失败 → NegativeAcknowledge(触发重投)

6.3 Pulsar Cursor vs Kafka Offset

Kafka(线性 Offset):
  [✅1][✅2][❌3][✅4][✅5]
              ↑
  Offset 卡在 3,重启后从 3 开始重放
  4 和 5 被重复消费 → At-Least-Once 在这里很痛

Pulsar(位图 Cursor):
  [✅1][✅2][❌3][✅4][✅5]
              ↑
  只记录 3 未确认,重启后只重投 3
  4 和 5 不受影响 → 更精准的 At-Least-Once

七、异步消费:解决业务阻塞问题

7.1 问题:同步消费的吞吐量瓶颈

同步消费(单线程):

Consumer 线程
  ├─ Receive msg-1
  ├─ 业务逻辑(写 DB,耗时 2s)← 整个消费循环阻塞 2s
  ├─ Ack msg-1
  ├─ Receive msg-2
  ├─ 业务逻辑(2s)
  ├─ Ack msg-2
  ...

5 条消息串行处理:总耗时 = 5 × 2s = 10s

7.2 解决方案:Goroutine 池异步消费

func demoAsyncConsume() {
    client := newClient()
    defer client.Close()

    consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "persistent://public/default/demo-async",
        SubscriptionName: "async-sub",
        Type:             pulsar.Shared,

        // ReceiverQueueSize 设大:
        // 主循环 Receive() 从本地内存队列取消息(无网络 I/O,极快)
        // Goroutine 池满时主循环短暂阻塞,本地队列依然在接收 Broker 推送的消息
        // 避免 Broker 端因 Consumer 无响应而减少推送速率
        ReceiverQueueSize: 1000,
    })
    defer consumer.Close()

    // Semaphore Channel:Go 惯用的"有界并发"实现
    // 容量 = 最大并发 Goroutine 数(根据业务特性调整)
    // CPU 密集型任务:goroutine 数 ≈ CPU 核心数
    // IO 密集型任务:goroutine 数可设较大(如 50~200),让 CPU 在等待 IO 时处理其他任务
    sem := make(chan struct{}, 10)

    go produceMessages(client, consumer.Topic(), []string{
        "task-1", "task-2", "task-3", "task-4", "task-5",
    })
    time.Sleep(300 * time.Millisecond)

    startTime := time.Now()
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        msg, err := consumer.Receive(ctx) // 主循环:只取消息,极快
        cancel()
        if err != nil {
            break
        }

        wg.Add(1)
        sem <- struct{}{} // 占槽;若 10 个 Goroutine 都在运行,这里短暂阻塞(自然背压)

        // 立刻启动 Goroutine,主循环不等待,马上 Receive() 下一条
        go func(m pulsar.Message) {
            defer wg.Done()
            defer func() { <-sem }() // 释放槽,让主循环可以继续

            fmt.Printf("[Goroutine] 开始处理: %s  (t=%s)\n",
                string(m.Payload()), time.Since(startTime).Round(time.Millisecond))

            // 耗时业务逻辑(调外部接口、写 DB 等)
            // 这 2 秒内,主循环已经在取下一条消息并启动新 Goroutine 了
            time.Sleep(2 * time.Second)

            // ✅ 业务成功 → Ack
            consumer.Ack(m)

            // ❌ 业务失败时改为:
            // consumer.NegativeAcknowledge(m)

            fmt.Printf("[Goroutine] Ack 完成: %s  (t=%s)\n",
                string(m.Payload()), time.Since(startTime).Round(time.Millisecond))
        }(msg)
    }

    wg.Wait()
    fmt.Printf("全部完成,总耗时: %s(同步需 ~10s)\n",
        time.Since(startTime).Round(time.Millisecond))
}

效果对比(5条消息,每条业务耗时 2 秒):

同步消费:msg-1(2s) → msg-2(2s) → msg-3(2s) → msg-4(2s) → msg-5(2s) = 10s

异步消费:
  t=0ms   → 取 msg-1,启动 Goroutine-1
  t=1ms   → 取 msg-2,启动 Goroutine-2
  t=2ms   → 取 msg-3,启动 Goroutine-3
  t=3ms   → 取 msg-4,启动 Goroutine-4
  t=4ms   → 取 msg-5,启动 Goroutine-5
  t=2004ms → 所有 Goroutine 完成 ≈ 2s ✅

八、各模式横向对比

维度ExclusiveFailoverSharedKey_Shared
并发 Consumer 数1多(主+备)
全局消息顺序
Key 内消息顺序
自动故障转移
水平扩展
吞吐量最高
典型场景测试/简单消费金融账务日志/任务队列订单/用户流

九、选型决策树

需要消费者高可用(故障自动切换)?
├─ 否 → Exclusive(简单场景/测试)
└─ 是
    └─ 需要严格全局顺序?
        ├─ 是 → Failover(金融账务、配置变更)
        └─ 否
            └─ 需要 per-Key 顺序(同一实体的消息有序)?
                ├─ 是 → Key_Shared ⭐(订单、用户行为、IoT 设备)
                └─ 否 → Shared(日志、告警、幂等任务队列)

十、生产环境配置建议

Consumer 关键参数

consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "persistent://tenant/ns/topic",
    SubscriptionName: "prod-sub",
    Type:             pulsar.KeyShared,

    // 预拉取队列:根据业务处理速度调整
    // 处理快(<5ms):设 5000+;处理慢(>100ms):设 100~500
    ReceiverQueueSize: 1000,

    // Ack 超时:防止 Consumer 卡死导致消息积压
    // Key_Shared 模式下尤其重要
    AckTimeout:      30 * time.Second,
    AckTimeoutTimer: 1 * time.Second,

    // Negative Ack 重投延迟:失败后多久重试(默认 1 分钟)
    // 防止频繁重试打爆下游服务
    NackRedeliveryDelay: 5 * time.Second,

    // 死信队列:消息重试超过次数后发送到 DLQ,避免阻塞正常消费
    DLQ: &pulsar.DLQPolicy{
        MaxDeliveries:   3,                  // 最多重试 3 次
        DeadLetterTopic: "my-topic-DLQ",     // 死信 Topic
    },
})

Producer 关键参数

producer, _ := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           "persistent://tenant/ns/topic",
    CompressionType: pulsar.LZ4,             // 开启压缩
    BatchingMaxPublishDelay: 10 * time.Millisecond, // 批量发送窗口

    // 开启幂等发送(防止网络重试导致重复写入)
    // 结合事务 API 可实现端到端 Exactly-Once
})

总结

Pulsar 订阅模式的设计哲学是:把"顺序"和"吞吐"的权衡暴露给开发者,而不是在系统内部做妥协。

  • 需要有序:Exclusive 或 Failover,系统保证全局顺序
  • 需要吞吐:Shared,牺牲顺序换取最大并发
  • 两者都要:Key_Shared,在 Key 粒度上有序,跨 Key 并行

消费模型上,同步消费适合入门,异步消费(Goroutine 池)才是生产标准。Ack 必须在业务逻辑成功后发出,这是 At-Least-Once 语义的基石。


参考资料

golang

版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2026年05月25日 08:37
1

目录

来自 《Apache Pulsar 订阅模式与消费模型深度解析(Go 实战)》