Hi,
I have a list of offsets with their corresponding partitions, and I need to commit them manually.
To do so, I loop through the list, assign each partition to the consumer, and then seek to the given offset.
Then I consume the message and pass the resulting ConsumeResult to the Commit method.
Sometimes this runs smoothly, but sometimes it throws a "Local:Waiting for Coordinator" exception.
In both cases, when I consume messages afterwards, I re-consume the same series of messages that I had already committed, or rather tried to commit. So the commits never actually took effect :(
        foreach (var item in cmdparamslist)
        {
            // Build the topic/partition/offset triple for this list entry.
            Partition p = new Partition(Int16.Parse(item.PartitionID));
            TopicPartition tp = new TopicPartition(configuration.GetSection("KafkaSettings").GetSection("Topic").Value, p);
            Offset o = new Offset(long.Parse(item.Offset));
            TopicPartitionOffset tpo = new TopicPartitionOffset(tp, o);

            try
            {
                // Assign the partition, wait a moment, then seek to the offset.
                KafkaConsumer.Assign(tpo);
                await Task.Delay(TimeSpan.FromSeconds(1));
                KafkaConsumer.Seek(tpo);

                // Consume one message and commit its consume result.
                var cr = KafkaConsumer.Consume(cts.Token);
                try
                {
                    KafkaConsumer.Commit(cr);
                }
                catch (TopicPartitionOffsetException e1)
                {
                    Console.WriteLine("exception " + e1);
                }
                catch (KafkaException e)
                {
                    Console.WriteLine("exception " + e);
                }
            }
            catch (KafkaException e)
            {
                Console.WriteLine("exception " + e);
            }
        }
        KafkaConsumer.Close();
    }
    catch (Exception e)
    {
        Console.WriteLine("exception " + e);
    }
}
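
For comparison, Confluent.Kafka also has a `Commit(IEnumerable<TopicPartitionOffset>)` overload that takes explicit offsets, so the offsets could in principle be committed without the assign/seek/consume round trip. The following is only a minimal sketch, not tested against a broker. It assumes the list items carry `PartitionID` and `Offset` as strings like in the loop above (modelled here as tuples), and that the listed offsets belong to messages that were already processed, so the value committed is the offset plus one. The names `OffsetCommitSketch`, `ToCommitOffsets` and `CommitAll` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

public static class OffsetCommitSketch
{
    // Builds one offset per partition to commit. A committed offset marks the
    // NEXT message to read, so 1 is added to the highest listed offset
    // (assumption: the list holds offsets of already-processed messages).
    public static List<TopicPartitionOffset> ToCommitOffsets(
        string topic, IEnumerable<(string PartitionID, string Offset)> items)
    {
        return items
            .GroupBy(i => int.Parse(i.PartitionID))
            .Select(g => new TopicPartitionOffset(
                new TopicPartition(topic, new Partition(g.Key)),
                new Offset(g.Max(i => long.Parse(i.Offset)) + 1)))
            .ToList();
    }

    // Commits the given offsets in a single synchronous call.
    public static void CommitAll(
        IConsumer<Ignore, Ignore> consumer, IEnumerable<TopicPartitionOffset> offsets)
    {
        try
        {
            consumer.Commit(offsets);
        }
        catch (TopicPartitionOffsetException e)
        {
            // Report the per-partition results carried by the exception.
            foreach (var r in e.Results)
                Console.WriteLine($"{r.Topic} [{r.Partition}]: {r.Error}");
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"commit failed: {e.Error.Reason}");
        }
    }
}
```

Usage would look like `OffsetCommitSketch.CommitAll(consumer, OffsetCommitSketch.ToCommitOffsets(topic, items))`, where `consumer` is built with the same group ID that will later read the topic.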

Consumer / Client configuration:
var conf = new ConsumerConfig
{
    GroupId = Guid.NewGuid().ToString(),
    BootstrapServers = configuration.GetSection("KafkaSettings").GetSection("RemoteServers").Value,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SaslMechanism = SaslMechanism.Gssapi,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    EnableAutoCommit = false
};
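
One detail that may be relevant here: Kafka stores committed offsets per consumer group, so a consumer created with a fresh `Guid.NewGuid()` group ID will not see offsets that were committed under a different group ID, and with `AutoOffsetReset.Earliest` it would start again from the beginning of the partition. A sketch of the same configuration with a stable group ID, assuming a hypothetical `GroupId` key under `KafkaSettings`:

```csharp
var conf = new ConsumerConfig
{
    // Assumption: a fixed group ID read from a hypothetical "GroupId" setting,
    // so that later runs look up committed offsets under the same group.
    GroupId = configuration.GetSection("KafkaSettings").GetSection("GroupId").Value,
    BootstrapServers = configuration.GetSection("KafkaSettings").GetSection("RemoteServers").Value,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SaslMechanism = SaslMechanism.Gssapi,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    EnableAutoCommit = false
};
```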
I am using Confluent.Kafka version 1.6.2.
Could someone please help me?