

Introduction to Kafka Stream Processing in Python
Posted by ODSC Community, February 25, 2022

Let us first provide a quick introduction to Apache Kafka for those who are not aware of this technology.
Kafka, in a nutshell, is an open-source distributed event streaming platform by Apache. Kafka allows us to build and manage real-time data streaming pipelines.
For the sake of this article, you need to be aware of 4 main Kafka concepts.
- Topic: All Kafka messages pass through topics. A topic is simply a way for us to organize and group a collection of messages. We can have multiple topics, each with a unique name.
- Consumers: A consumer is an entity within Kafka (commonly referred to as a subscriber) that is responsible for connecting (or subscribing) to a particular topic to read its messages.
- Producers: A producer is an entity within Kafka (commonly referred to as a publisher) that is responsible for writing (or publishing) messages to a particular topic.
- Consumer Groups: In Kafka, we can have multiple topics with multiple consumers subscribed to them. We can also have multiple services (i.e. external applications) subscribed to the same topics. To prevent overlaps and data issues, every service can have its own consumer group to keep track of which messages were already processed (commonly referred to as offsets).
TL;DR
- Apache Kafka is a message-passing system.
- Messages are organized in topics.
- Producers send data to topics.
- Consumers read data from topics.
- Consumer groups manage a set of consumers.
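To make these roles concrete, here is a minimal Python sketch using the kafka-python client (a separate client library from the one we use later in this article); the topic name, group id, and payload are placeholders, and it assumes a broker reachable at localhost:9092, which we set up in the next section.

from kafka import KafkaProducer, KafkaConsumer

# a producer publishes messages to the 'test' topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test', b'hello from a producer')
producer.flush()

# a consumer in the 'demo-group' consumer group reads from the same topic;
# the group keeps track of offsets, i.e. which messages were already processed
consumer = KafkaConsumer(
    'test',
    bootstrap_servers='localhost:9092',
    group_id='demo-group',
    auto_offset_reset='earliest',
)
for record in consumer:   # this loop blocks and keeps polling for new messages
    print(record.value)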
Setting up a Local Kafka Instance
wurstmeister provides a really good repository for running Kafka in a docker container.
> git clone https://github.com/wurstmeister/kafka-docker.git
> cd kafka-docker
> docker-compose up -d

# to scale your Kafka cluster we can use:
# docker-compose scale kafka=<NUMBER_OF_KAFKA_BROKERS_YOU_WANT>
If we want our Kafka cluster to be accessible externally (i.e. from your terminal or services), we need to update the docker-compose.yml file.
vim docker-compose.yml and update it with the following:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
and then run docker-compose up -d again in a terminal.
The above will open the Kafka listeners on localhost:9092.
Once the docker image is built, we should see that the Kafka instance is running.

A running Kafka instance
To enter the Kafka bash shell, we can use:
kafka-docker % docker exec -i -t -u root $(docker ps | grep docker_kafka | cut -d' ' -f1) /bin/bash
Now is the time for us to create our first topic. We can do that by:
$KAFKA_HOME/bin/kafka-topics.sh --create --partitions 4 --bootstrap-server kafka:9092 --topic test
To list all available topics in our Kafka instance, we can use:
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
To test that all is working correctly, let us send some messages.
Start a producer on the topic test:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic=test

Start a consumer on the topic test:
$KAFKA_HOME/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server kafka:9092 --topic=test

We can confirm that all messages published by the producer were read by the consumer.
Introducing Faust
Faust is a stream and event processing framework developed by Robinhood. The Python community eventually forked the repository, giving birth to faust-streaming. We will be using faust-streaming throughout this article.
The concepts behind Faust are relatively straightforward and heavily based on the ideas behind Kafka Streams. Faust relies on asynchronous programming and therefore requires Python version ≥ 3.6.0 (because of the async/await keywords).
The main concepts behind Faust are:
- Faust App — instantiates Faust and connects to our Kafka brokers.
- Agents — create our stream processors. An agent is an async def function, defined using the @app.agent decorator.
We can install Faust through pip using:
pip install faust-streaming
Building our First Faust App
Running a Faust app is very easy. Let us build our first app in myapp.py.
import faust


class Test(faust.Record):
    msg: str


app = faust.App('myapp', broker='kafka://localhost:9092')
topic = app.topic('test', value_type=Test)


@app.agent(topic)
async def hello(messages):
    async for message in messages:
        print(f'Received {message.msg}')


@app.timer(interval=5.0)
async def example_sender():
    await hello.send(
        value=Test(msg='Hello World!'),
    )


if __name__ == '__main__':
    app.main()
We can run our app using:
faust -A myapp worker -l info
This will start the Worker instance of myapp (handled by Faust). The log Worker ready signals that the worker has started successfully and is ready to start processing the stream.
This app will send a message to our test Kafka topic every 5 seconds and have the agent consume it in real-time and print it out for us.
Sample output from running our first Faust app
Let us start dissecting the code bit by bit.
The Faust App
app = faust.App('myapp', broker='kafka://localhost:9092')
Here we give our app a name (which will also be used to create a Kafka consumer group) and specify the location of our Kafka brokers. Assuming that you are following this guide with me, you do not need to change the broker settings. Of course, if you have different broker URLs, specify those!
These are the minimum parameters required to get started with Faust. The full list of app parameters can be found here.
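As a rough sketch of what a more explicitly configured app could look like (the extra keyword arguments below are optional Faust settings as I understand them; verify the names and defaults against the documentation):

import faust

app = faust.App(
    'myapp',                           # app id, also used as the Kafka consumer group name
    broker='kafka://localhost:9092',   # one or more Kafka broker URLs
    value_serializer='json',           # default serializer for message values
    topic_partitions=4,                # default number of partitions for topics Faust creates
)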
We also specify the topic that we want to use via the app.topic() function. We pass in the value_type parameter to enforce a message structure. This parameter is mostly relevant when sending messages to a Kafka topic. If we do not need to enforce any message type, we can set value_type to bytes instead (value_type=bytes).
A message type can be enforced by creating a Python class that extends the class faust.Record.
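For instance, a richer message structure could look like the following (the Order class and its fields are hypothetical, purely for illustration; the serializer class keyword is a Faust Record option):

import faust

class Order(faust.Record, serializer='json'):
    order_id: str
    user_id: str
    amount: float
    currency: str = 'USD'   # fields can declare default values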
The Agent
We indicate that a function is a Faust agent by using the @app.agent() decorator. We pass the topic that we want our agent to subscribe to as part of the decorator.
The agent should always be a coroutine. Therefore, we should define the agent function using the async def keywords.
Since our agent is subscribed to a topic, we need to have an asynchronous loop to go over the messages in the stream. We do this using the async for keywords.
Inside this for-loop, we now have access to every single message being consumed by our agent. Here, we can start implementing our stream processing logic.
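As a small sketch of such logic (the filtering rule and the uppercase transformation are arbitrary examples, reusing the app, topic, and Test record from the first app above):

@app.agent(topic)
async def shout(messages):
    async for message in messages:
        # ignore empty payloads
        if not message.msg:
            continue
        # arbitrary per-message transformation
        processed = message.msg.upper()
        print(f'Processed: {processed}')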
Besides subscribing an agent to a Kafka topic, we can also have our agent publish messages to some Kafka topic. The agent decorator accepts a parameter — sink — which defines the Kafka topic for publishing messages.
import faust
import logging
from asyncio import sleep

log = logging.getLogger(__name__)


class Test(faust.Record):
    msg: str


app = faust.App('myapp', broker='kafka://localhost:9092')
source_topic = app.topic('test', value_type=Test)
destination_topic = app.topic('test', value_type=Test)


# specify the source_topic and destination_topic to the agent
@app.agent(source_topic, sink=[destination_topic])
async def hello(messages):
    async for message in messages:
        if message is not None:
            log.info(message.msg)
            # the yield keyword is used to send the message to the destination_topic
            yield Test(msg='This message is from the AGENT')
            # sleep for 2 seconds
            await sleep(2)
        else:
            log.info('No message received')


if __name__ == '__main__':
    app.main()
In the above example, we pass in the destination topic as a sink to our agent (in this case, the same test topic we used for our source). On consumption of the first message (which we can send via the producer as shown above), the agent will start executing the logic inside the asynchronous loop.
We are yielding another message to the sink/destination topic. In our case, the destination topic is the same as the source topic, simulating the behavior of constantly receiving messages in our Kafka topic.
Batch Processing
The agent also provides us with the ability to process any Kafka stream in batches. We can achieve this behavior through the stream.take() function.
# specify the source_topic and destination_topic to the agent
@app.agent(source_topic, sink=[destination_topic])
async def hello(messages_stream):
    async for records in messages_stream.take(5, within=10):
        # do something
        yield Test(msg='This message is from the AGENT')
The take() function takes in 2 parameters:
- max_ — the maximum number of messages in the batch
- within — the timeout for waiting for max_ messages; once it expires, the batch is processed with whatever messages have arrived
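As a quick sketch of batch logic (reusing the topics, record, and logger from the sink example above; the batch summary message is arbitrary), the loop body below simply counts the records in each batch before yielding a summary downstream:

@app.agent(source_topic, sink=[destination_topic])
async def batch_counter(messages_stream):
    # collect up to 5 records, or whatever arrives within 10 seconds
    async for records in messages_stream.take(5, within=10):
        batch_size = len(records)   # records is a list of Test instances
        log.info('Received a batch of %d messages', batch_size)
        yield Test(msg=f'Processed a batch of {batch_size} messages')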
We also use another app decorator in our example — @app.timer(interval=5.0). This coroutine is similar to an agent; however, it is not subscribed to any Kafka stream or topic. Instead, it is triggered every interval (in our case, every 5 seconds).
These decorators are called Actions. You can find the full list of accepted actions here.
We can have multiple actions defined in our app (as done in our example).
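For example (these are two other actions described in the Faust documentation; treat the exact signatures as something to double-check there), a startup task and a cron-style job could look like this:

# runs once, when the worker has fully started
@app.task
async def on_started():
    print('Worker is up and running')


# runs on a cron schedule (here: every day at 08:00), independent of any topic
@app.crontab('0 8 * * *')
async def daily_ping():
    print('Good morning from the crontab action')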
Project Structure
In our example, we wrote the entire Faust application in a single Python script. For production code or a more complex application, it is best to organize the project into a proper hierarchy.
Faust recommends the following project structure for larger projects:
+ proj/
    - setup.py
    - MANIFEST.in
    - README.rst
    - setup.cfg
    + proj/
        - __init__.py
        - __main__.py
        - app.py          # APP initialisation and app settings
        + <APP_NAME>/
            - __init__.py
            - agents.py   # all agents and actions
            - models.py   # message structures as python classes
            - views.py    # all web views
Note: with this structure, we would have several agents and other action decorators spread across different modules. To tell Faust to search for these decorators, we can pass the autodiscover=True option to the Faust App in app.py.
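A minimal sketch of what proj/app.py could then contain (the origin setting, which tells Faust the top-level package to scan, is taken from the Faust project-layout docs as I recall them; verify against the current documentation):

import faust

app = faust.App(
    'proj',
    broker='kafka://localhost:9092',
    autodiscover=True,   # search the package for agents, timers, and other actions
    origin='proj',       # the top-level package Faust should scan
)


def main() -> None:
    app.main()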
Conclusion
In this article, we discussed how to spawn a Kafka cluster in Docker and how to robustly process its stream of events from Python using Faust. We discussed how we can launch a minimal Faust app to subscribe to a Kafka stream and process its events.
Is this all? Definitely not. Faust is a huge library that is continuously evolving and there are many other amazing features. But for now, we have the basics to get started with building high-performance distributed systems.
I highly recommend you go over the Faust documentation and continue experimenting with the different functionalities that this library offers!
Originally posted here. Reposted with permission by David Farrugia, Data Scientist | AI Enthusiast and Researcher