Consumer
A consumer is a process that attaches to a topic via a subscription and then receives messages.
Subscription Types - describe the way the consumers receive the messages from topics
- Exclusive - Only one consumer can subscribe, guaranteeing message order.
- Shared - Multiple consumers can subscribe, messages are delivered round-robin, offering good scalability but no order guarantee.
- Failover - Similar to shared subscriptions, but multiple consumers can subscribe, and one actively receives messages.
Example
let topic = "/default/test_topic".to_string();
let mut consumer = client
.new_consumer()
.with_topic(topic.clone())
.with_consumer_name("test_consumer")
.with_subscription("test_subscription")
.with_subscription_type(SubType::Exclusive)
.build();
// Subscribe to the topic
let consumer_id = consumer.subscribe().await?;
println!("The Consumer with ID: {:?} was created", consumer_id);
let _schema = client.get_schema(topic).await.unwrap();
// Start receiving messages
let mut message_stream = consumer.receive().await?;
while let Some(message) = message_stream.next().await {
//process the message and ack for receive
}
ctx := context.Background()
topic := "/default/test_topic"
subType := danube.Exclusive
consumer, err := client.NewConsumer(ctx).
WithConsumerName("test_consumer").
WithTopic(topic).
WithSubscription("test_subscription").
WithSubscriptionType(subType).
Build()
if err != nil {
log.Fatalf("Failed to initialize the consumer: %v", err)
}
consumerID, err := consumer.Subscribe(ctx)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
log.Printf("The Consumer with ID: %v was created", consumerID)
// Receiving messages
streamClient, err := consumer.Receive(ctx)
if err != nil {
log.Fatalf("Failed to receive messages: %v", err)
}
for {
msg, err := streamClient.Recv()
//process the message and ack for receive
}
Complete example
For a complete example implementation of the above code using producers and consumers, check the examples: