Karp 的技术博客
写在前面:我是个后端开发,用过 Redis 的 Pub/Sub,听说过 Kafka,但从来没有认真用过消息队列。这篇文章记录我第一次接触 Apache Pulsar 的全过程,踩了哪些坑,怎么理解那些概念,希望对同样是新手的你有用。

为什么会接触 Pulsar?

项目里有个需求:多个服务之间要传消息,而且要可靠——发出去的消息不能丢,消费失败了要能重试,还要支持多个服务同时消费同一条消息。

Redis Pub/Sub 太轻量,消费者不在线时消息就丢了。Kafka 听起来很重,配置复杂。同事推荐了 Pulsar,说"功能全,概念清晰,适合现在的规模"。

于是就开始学了。


第一步:先搞清楚它是干什么的

在看文档之前,我先问自己:消息队列到底解决什么问题?

想象一个外卖平台:用户下单之后,要通知厨房备餐、通知骑手接单、通知财务记账……如果下单服务直接调用这三个服务的接口,任何一个挂掉都会导致下单失败。

消息队列的思路是:下单服务只管把"有人下单了"这条消息丢进队列,其他服务各自来取,互不影响。

这就是解耦。Pulsar 就是这样一个"消息中转站"。


第二步:启动一个 Pulsar 玩玩

文档里有很多部署方式,新手直接用 Docker,一行命令搞定:

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --name pulsar \
  apachepulsar/pulsar:3.3.0 \
  bin/pulsar standalone

看到这行日志说明成功了:

INFO  - messaging service is ready

两个端口的作用:

  • 6650:客户端连接 Pulsar 用这个(发消息、收消息)
  • 8080:管理后台,可以用浏览器或 curl 查状态

验证一下是否跑起来了:

curl http://localhost:8080/admin/v2/brokers/healthcheck
# 返回 "ok" 就没问题

第三步:搞懂几个概念(用人话解释)

刚开始看文档,一堆术语:Tenant、Namespace、Topic、Subscription、Broker、Bookie……我直接懵了。

后来我用一个比喻理解清楚了:

把 Pulsar 想象成一个大型邮局
Pulsar 概念邮局比喻
Tenant(租户)邮局里不同的企业客户(比如京东、淘宝分别租了一块地方)
Namespace(命名空间)每个企业客户内部划分的业务区域(京东的"订单组"、"物流组")
Topic具体的一个邮箱/信箱("订单创建"信箱)
Producer往信箱里投信的人(寄件方)
Consumer从信箱里取信的人(收件方)
Subscription取信的方式/协议(谁可以取、怎么取)
Broker邮局前台,负责收发调度,自己不存信
Bookie后台仓库,真正存放信件的地方

Topic 的完整格式

Pulsar 的 Topic 名字有固定格式,刚开始总是搞错:

persistent://public/default/my-topic
│            │      │       │
│            │      │       └── Topic 名称(你自己起的)
│            │      └────────── Namespace(命名空间)
│            └───────────────── Tenant(租户)
└────────────────────────────── 消息是否持久化

persistent 意味着消息写到磁盘,服务重启消息不丢失。还有 non-persistent,消息只在内存里,速度快但不可靠。

新手建议:一开始就用 persistent://public/default/你的topic名 这个格式,public/default 是 Pulsar 默认自带的,不用额外创建。


第四步:用命令行发第一条消息

Pulsar 自带命令行工具,可以不写代码直接发消息:

# 发消息(进入 Docker 容器里执行)
docker exec -it pulsar \
  bin/pulsar-client produce persistent://public/default/hello-pulsar \
  --messages "我的第一条消息"

然后开另一个终端接收:

docker exec -it pulsar \
  bin/pulsar-client consume persistent://public/default/hello-pulsar \
  --subscription-name my-first-sub \
  --num-messages 0

--num-messages 0 的意思是一直监听,不自动退出。

看到下面这样的输出就说明消息收到了:

----- got message -----
value : 我的第一条消息

成就感满满! 这时候我大概知道 Pulsar 是怎么工作的了。


第五步:用 Go 代码实现生产者和消费者

光靠命令行不够,我们要在代码里用。我用的是 Go,先安装客户端:

go get github.com/apache/pulsar-client-go/pulsar

生产者:发消息

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    // 第一步:建立连接
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal("连接失败:", err)
    }
    defer client.Close()

    // 第二步:创建 Producer
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "persistent://public/default/hello-pulsar",
    })
    if err != nil {
        log.Fatal("创建 Producer 失败:", err)
    }
    defer producer.Close()

    // 第三步:发消息
    msgID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("Hello from Go!"),
    })
    if err != nil {
        log.Fatal("发送失败:", err)
    }

    fmt.Printf("消息发送成功,消息 ID: %v\n", msgID)
}

消费者:收消息

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // 创建 Consumer,需要指定订阅名
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "persistent://public/default/hello-pulsar",
        SubscriptionName: "my-go-sub",  // 订阅名,同名的 Consumer 共享消费进度
        Type:             pulsar.Shared, // 订阅类型,先用 Shared
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    fmt.Println("开始监听消息...")

    for {
        // Receive 是阻塞的,没消息时会一直等
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Printf("接收出错: %v", err)
            continue
        }

        fmt.Printf("收到消息: %s\n", string(msg.Payload()))

        // 处理完一定要 Ack!否则消息会被反复投递
        consumer.Ack(msg)
    }
}

⚠️ 新手最容易忘的一件事:Ack

Pulsar 默认不会自动确认消息。你处理完之后必须调用 consumer.Ack(msg),告诉 Pulsar "这条我处理好了"。如果不 Ack,Pulsar 认为你没处理成功,下次还会再发给你,就出现重复消费了。


第六步:搞懂订阅类型(这里卡了我挺久的)

第一次看文档说有四种订阅类型,我没搞清楚有什么区别,全部实验了一遍才明白。

Exclusive(独占)

消息 A B C D E
       ↓
    Consumer 1(只有它一个在消费)

同一个订阅名下只允许一个 Consumer 连接。第二个 Consumer 试图连接时会报错。

适合:需要保证顺序,并且单个消费者就够用的场景。

Shared(共享)

消息 A B C D E F
    ↙    ↓    ↘
Consumer1 Consumer2 Consumer3

消息轮流分发给多个 Consumer,可以水平扩展。

缺点:不保证顺序,因为消息被分给不同的人处理。

适合:对顺序没要求,需要多个实例并行处理提高吞吐的场景。

Failover(灾备)

正常情况:
消息 A B C D E → Consumer 1(主)

Consumer 1 挂了:
消息 F G H I  → Consumer 2(自动接管)

平时只有一个主 Consumer 消费,主挂了之后自动切换到备用 Consumer,保证顺序

适合:需要顺序 + 高可用的场景。

Key_Shared(按 Key 分组共享)

消息(带 Key):
order-1: A C E  → Consumer 1(专门处理 order-1)
order-2: B D    → Consumer 2(专门处理 order-2)
order-3: F      → Consumer 3(专门处理 order-3)

相同 Key 的消息永远路由到同一个 Consumer,不同 Key 并行处理。

适合:按业务 ID 保证顺序 + 需要水平扩展(比如交易系统按订单 ID 路由)。


第七步:消息处理失败了怎么办?

这是我第一次没想到的问题:如果处理消息时报错了,该怎么办?

答案是用 Nack(Negative Acknowledge,否定确认):

msg, _ := consumer.Receive(context.Background())

err := processMessage(msg)
if err != nil {
    // 处理失败,告诉 Pulsar 重新投递
    consumer.Nack(msg)
    continue
}

// 处理成功
consumer.Ack(msg)

调用 Nack 之后,这条消息会在一段时间后重新投递给 Consumer。

但如果一条消息一直处理失败,会无限重试吗?不会。Pulsar 有死信队列(Dead Letter Topic)机制:

consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "persistent://public/default/orders",
    SubscriptionName: "order-processor",
    Type:             pulsar.Shared,
    DLQ: &pulsar.DLQPolicy{
        MaxDeliveries:   3,   // 最多重试 3 次
        // 超过 3 次后,消息自动转移到死信 Topic
        DeadLetterTopic: "persistent://public/default/orders-DLQ",
    },
})

重试超过 3 次后,消息会被扔进死信队列,你可以单独处理这些"问题消息",不影响正常流程。


我踩过的坑

坑 1:忘记 Ack,消息一直重复

现象:同一条消息被消费了好多次。

原因:consumer.Receive() 之后忘记调用 consumer.Ack(msg)

教训:Receive 和 Ack 要成对出现,处理完立刻 Ack。


坑 2:Topic 名字格式写错

写成了 hello-pulsar,忘记加前缀 persistent://public/default/

Pulsar 不会直接报错,而是自动帮你补全成 persistent://public/default/hello-pulsar,但有时候行为不符合预期。

教训:始终写完整的 Topic 路径,不要依赖自动补全。


坑 3:消费者启动比生产者晚,消息丢了

现象:生产者发了消息,消费者后来才启动,什么都没收到。

原因:我用的是 non-persistent Topic,消息不落盘,消费者不在线时消息就丢了。

教训:开发测试阶段统一用 persistent Topic,消息持久化到磁盘,消费者随时启动都能拿到历史消息。


坑 4:以为 Shared 模式会保证顺序

发了 1、2、3 三条消息,消费者收到的顺序是 1、3、2。

原因:Shared 模式消息是轮询分发的,多个 Consumer 处理速度不同,顺序无法保证。

教训:需要顺序就用 Exclusive 或 Key_Shared,不要用 Shared。

Pulsar

版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2026年05月14日 15:55
0

目录

来自 《我学 Pulsar 的第一周:从懵逼到能跑起来》