kafka-node几个消费者(kafka-node several consumers)

有一个基本的例子,就像一个消费者的魅力。 它接收消息。 但是添加一个额外的消费者将被忽略。

let kafka = require('kafka-node'); let client = new kafka.Client(); let producer = new kafka.Producer(client); let consumer1 = new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]); let consumer2 = new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}]); producer.on('ready', function () { producer.send([{topic:'topic1', messages: 'topic 1 msg' ], (err,data)=>{ console.log(err,'1 sent'); }); producer.send([{topic:'topic2', messages: 'topic 1 msg'}], (err,data)=>{ console.log(err, '2 sent'); }); }); producer.on('error', function (err) { console.log('err', err); }) consumer1.on('message',(message) =>{ console.log(11, message); }); consumer2.on('message',(message) =>{ console.log(22, message); })

消费者2的“22”事件永远不会触发的问题。 如果我使用命令行工具进行检查,则存在有关该主题的数据

Have a basic example that works like a charm for 1 consumer. It receives the messages. But adding one additional consumer will be ignored.

let kafka = require('kafka-node'); let client = new kafka.Client(); let producer = new kafka.Producer(client); let consumer1 = new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]); let consumer2 = new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}]); producer.on('ready', function () { producer.send([{topic:'topic1', messages: 'topic 1 msg' ], (err,data)=>{ console.log(err,'1 sent'); }); producer.send([{topic:'topic2', messages: 'topic 1 msg'}], (err,data)=>{ console.log(err, '2 sent'); }); }); producer.on('error', function (err) { console.log('err', err); }) consumer1.on('message',(message) =>{ console.log(11, message); }); consumer2.on('message',(message) =>{ console.log(22, message); })

The issue that event with '22' for consumer2 never fires. Data on that topic exist if I check it with command line tools

最满意答案

您忘记在使用者参数中添加使用者组。 在您的情况下,两个使用者都属于一个使用者组(默认情况下称为kafka-node-group )。 拥有一个消费者群体意味着每个消费者群体将传递一次消息。 由于您的主题有0个分区,因此只有第一个消费者才会处理该消息。

如果要向两个消费者发送相同的消息,则必须有两个消费者组(每个组一个消费者)。 你可以这样做:

let consumer1 = new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {groupId: 'group1'}); let consumer2 = new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}], {groupId: 'group2'});

如果您希望消费者一起处理消息(按分区),则需要增加主题中的分区数。

You forgot to add a consumer group in consumer parameter. In your case both consumers belong to one consumer group (which is by default called kafka-node-group). Having one consumer group means that every message will be delivered once per consumer group. And since your topic has 0 partitions only the first consumer will process the message.

If you want to send the same message to both consumers you have to have two consumer groups (one consumer per group). You can do it following way:

let consumer1 = new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {groupId: 'group1'}); let consumer2 = new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}], {groupId: 'group2'});

If you want your consumers process messages together (by partitions) you need to increase number of partitions in your topic.

更多推荐