Kafka to Pandas

Loading data from kafka to a pandas dataframe

Kafka does not naturally play well together. This is because of pandas being in memory and Kafka is designed to work with enterprise applications. Pandas pretty much the thing for data exploration and you want to actually dive into what is in a Kafka topic. This guide will show how to load a subset of data from a topic into a pandas dataframe.

Introduction

In this tutorial we walk through setting up a local version of Kafka, pushing some sample data to it. Then we will read from that topic and load the data into a pandas dataframe. The code for this project can be found on github.

setup

To set up kafka using homebrew follow these steps.

  1. brew cask install homebrew/cask-versions/adoptopenjdk8
  2. brew install Kafka
  3. brew services start zookeeper
  4. brew services start Kafka

From there we will create a topic with the following command. Kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic foo

The last piece of the setup that we will need to do is to install Kafka-python and pandas. If you cloned the repo you can run pip install -r requirements.txt otherwise run pip install kafka-python pandas

Produce Data

Next up we are going to send some data to our Kafka topic.

def producer():
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: 
                         json.dumps(x).encode('utf-8'))
 
    for i in range(1000):
        num = {"number": i}
        producer.send(topic, num)

The producer will send the records to the topic as JSON that is encoded as bytes. The value serializer field handles the conversion. The byte conversion is happening to keep Kafka from yelling at us.

Consume the records

First, we will set up a Kafka consumer like this.

    consumer = KafkaConsumer(
    topics,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

This consumer will allow us to read the Kafka topics that we specified. In our case foo and the value deserializer will turn the blob back to a dictionary. For the purposes of this example, we are going to read from offset zero. If you need to read from an arbitrary offset use consumer.seek to set the starting offset in a partition.

The standard way to consumer messages using Kafka python is.

for message in consumer:
    # do something with the message

However, that is a blocking call so we are going to use poll instead.

consumer.poll(timeout_ms=1000, max_records=100)

Just because the max records are set at 100 in this example does not guarantee that we will get 100 records back!

reading the data into the dataframe

The data that is returned from polling kafka, is a dictionary where each key is a combinaition of the topic and partition that the data came from. The value is a list of consumer records. These consumer records are python class implemented by the Kafka python library. The following code will extract each record value and place it in a list.

    l = []
    for m in messages.values():
        for i in m:
            l.append(i.value)

We are iterating through the values in the dictionary that was returned by the consumers poll. We only passed a single topic to the consumer which has one partition so in this case that dictionary only has one record.

That value is a list of consumer records. There is metadata in those records like offsets, timestamps, and more. Exploring what is in the consumer record is a good exercise to know what data that Kafka is storing. For this, we care about the actual value that we placed on the topic.

Once we have that list of dictionaries that we can turn it into a pandas dataframe like so.

df = pd.DataFrame(l)

To verify the read was successful we will print the head of the dataframe.

print(df.head())

From here we’ve successfully read records from Kafka into a pandas dataframe. At this point, you can do whatever data exploration and manipulation that you desire.


640 Words

2021-02-20 01:56 +0000