0%

Pulsar 消息订阅

Apache Pulsar 消息订阅


介绍

该文摘自Apache Pulsar 官方文档Apache Pulsar Messaging#订阅

当有有数据进入到Pulsar后,需要将数据消费出来,这里就涉及订阅

订阅是命名好的配置规则,指导消息如何投递给消费者。

消息订阅

Pulsar 中有四种订阅模式: 独占(Exclusive),共享(Share),灾备(Failover)和key共享(Key_Shared)

独占(Exclusive)

Exclusive模式为默认订阅模式

在独占模式下,仅允许单个使用者附加到订阅。如果多个使用者使用相同的订阅来订阅主题,则会发生错误。

在下图中,仅允许消费者A-0消费消息

灾备(Failover)

在灾备模式下,多个消费者(Consumer)可以附加到同一订阅,主消费者会消费非分区主题或者分区主题中的每个分区的消息。当主消费者断开连接时,分区将被重新分配给其中一个故障转移消费者,而新分配的消费者将成为新的主消费者。 发生这种情况时,所有未确认(ack)的消息都将传递给新的主消费者。

对于分区主题来说,Broker 将按照消费者的优先级和消费者名称的词汇表顺序对消费者进行排序。 然后试图将主题均匀的分配给优先级最高的消费者。

对于非分区主题来说,Broker 会根据消费者订阅非分区主题的顺序选择消费者。

在下图中,消费者B-0是主要消费者,如果消费者B-0断开连接,则消费者B-1将是排队接收消息的下一个消费者。

共享(Share)

在共享模式下,多个消费者者可以附加到同一订阅,消息通过round robin(轮询机制)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。

当消费者断开连接,所有被发送给它,但没有来得及确认(ack)的消息将被重新安排,分发给其它存活的消费者。

在下图中,Consumer-C-1和Consumer-C-2可以订阅该主题,但是Consumer-C-3和其他消费者也可以订阅该主题。

共享模式的局限性: 使用共享模式时,不保证消息顺序。不能在共享模式下使用累积确认(Cumulative Ack)。

key共享(Key_Shared)

key共享模式是共享模式的一种,不同的是它按key对消息做投递,相同的key的消息会被投递到同一个消费者上

Key_Shared模式的局限性: 使用Key_Shared模式时,需要为消息指定密钥或orderingKey。 不能在Key_Shared模式下使用累积确认(Cumulative Ack)。 生产者应禁用批处理或使用基于密钥的批处理生成器。

可以在 broker.config 中禁用 Key_Shared 模式

多主题订阅

当consumer订阅pulsar的主题时,默认指定订阅了一个主题,例如:persistent://public/default/my-topic

从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。

可以用以下两种方式定义topic的列表

  • 基于正则表达式(regex),例如persistent://public/default/finance-.*
  • 通过明确指定的topic列表

当使用正则匹配订阅多个主题的时候,所有的主题必须是在同一个命名空间里面的

当订阅多个主题的时候,Pulsar客户端将自动调用Pulsar API找到符合匹配规则的主题列表,然后订阅这些主题。 如果此时有暂不存在的主题,那么一旦这些主题被创建,消费者会自动订阅这些主题。

如下是 Java 订阅多个主题的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient pulsarClient = // Instantiate Pulsar client object

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(allTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();

// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(someTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();

关于代码示例,请参阅 Java

总结

Pulsar中的订阅实际上与Apache Kafka中的Consumer Group的概念类似。

独占(Exclusive): 在任何时间,一个消费者组(订阅)中有且只有一个消费者来消费 Topic 中的消息。
灾备(Failover): 多个消费者(Consumer)可以附加到同一订阅。 但是,一个订阅中的所有消费者,只会有一个消费者被选为该订阅的主消费者。 其他消费者将被指定为故障转移消费者。 当主消费者断开连接时,分区将被重新分配给其中一个故障转移消费者,而新分配的消费者将成为新的主消费者。 发生这种情况时,所有未确认(ack)的消息都将传递给新的主消费者。
共享(Share): 使用共享订阅,在同一个订阅背后,用户按照应用的需求挂载任意多的消费者。 订阅中的所有消息以循环分发形式发送给订阅背后的多个消费者,并且一个消息仅传递给一个消费者。当消费者断开连接时,所有传递给它但是未被确认(ack)的消息将被重新分配和组织,以便发送给该订阅上剩余的剩余消费者。

独占订阅或灾备订阅的消费者能够对消息进行单条确认和累积确认;共享订阅的消费者只允许对消息进行单条确认。


参考链接