Kafka - Interview Questions
What is the poll loop in Kafka?
A consumer subscribes to topics in Kafka, but it is the poll loop that tells the consumer whether new data has arrived. The poll loop handles coordination, partition rebalances, heartbeats, and data fetching. It is the core of the consumer API: the consumer keeps polling the server for new data. Let's look at a poll loop in Kafka:
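try {
    // Loop forever; the consumer keeps polling Kafka for new data.
    while (true) {
        // Wait up to 100 ms for records. Newer clients take a Duration instead:
        // consumer.poll(Duration.ofMillis(100))
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            log.debug(String.format(
                "topic = %s, partition = %d, offset = %d, customer = %s, country = %s",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value()));

            // Keep a running count of customers per country (the record value).
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);

            // Print the current counts as JSON, indented by 4 spaces.
            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    // Close the consumer before exiting.
    consumer.close();
}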
* This section is an infinite loop. Consumers keep polling Kafka for new data.

* consumer.poll(100): This call is critical for the consumer. The argument is a timeout in milliseconds that sets how long the consumer waits for data to arrive from the Kafka broker when none is available yet. A consumer that stops polling is considered dead, and its assigned partitions go to another consumer. If we pass 0, the function returns immediately. Newer client versions also accept a Duration, as in poll(Duration.ofMillis(100)).

* The poll() call returns the result set. Each record carries the topic and partition it came from, its offset within the partition, and its key and value. We then iterate through the result set and do our custom processing on each record.

* Once processing is complete, the result is usually written to a data store. Here the goal is a running count of customers from each country, so the loop updates a hashtable and prints it as JSON.

* The consumer should always call close() before exiting. This closes the active network connections and sockets. It also triggers a rebalance immediately, rather than waiting for the consumer group coordinator to discover that the consumer has stopped and reassign its partitions to other consumers.
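
The running count described above can also be written more compactly with Map.merge. The following is a minimal sketch in plain Java, not the consumer code itself: the class name CountryCounter and the helper countByCountry are hypothetical, and a plain list of strings stands in for the record values that successive poll() calls would return.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CountryCounter {
    // Hypothetical helper: tallies how many times each country value occurs.
    static Map<String, Integer> countByCountry(List<String> countries) {
        // TreeMap keeps keys sorted, so the printed order is deterministic.
        Map<String, Integer> counts = new TreeMap<>();
        for (String country : countries) {
            // merge() stores 1 for a new key, otherwise adds 1 to the existing count.
            counts.merge(country, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Stand-in for the values of records returned by poll().
        List<String> values = List.of("US", "IN", "US");
        System.out.println(countByCountry(values)); // prints {IN=1, US=2}
    }
}
```

Inside a real poll loop, the same merge call could replace the containsKey/get/put sequence, assuming custCountryMap is a Map<String, Integer>.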