Introduction to Kafka Stream Processing in Python

by ITECHNEWS
March 2, 2022

Let us first provide a quick introduction to Apache Kafka for readers who are not familiar with it.

Kafka, in a nutshell, is an open-source distributed event streaming platform by Apache. Kafka allows us to build and manage real-time data streaming pipelines.

For this article, you need to be familiar with four main Kafka concepts.

  1. Topic: All Kafka messages pass through topics. A topic is simply a way for us to organize and group a collection of messages. We can have multiple topics, each with a unique name.
  2. Consumers: A consumer is an entity within Kafka (commonly referred to as a subscriber) that is responsible for connecting (or subscribing) to a particular topic to read its messages.
  3. Producers: A producer is an entity within Kafka (commonly referred to as a publisher) that is responsible for writing (or publishing) messages to a particular topic.
  4. Consumer Groups: In Kafka, we can have multiple topics with multiple consumers subscribed to them, and multiple services (i.e., external applications) subscribed to the same topics. To prevent overlaps and data issues, each service can use its own consumer group. The consumers in a group share the work of reading a topic, and the group keeps track of which messages have already been processed (these positions are called offsets).

TL;DR

Apache Kafka is a message-passing system.

Messages are organized in topics.

Producers send data to topics.

Consumers read data from topics.

Consumer groups manage a set of consumers.
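To make the relationship between topics, producers, consumer groups and offsets concrete, here is a toy in-memory model. It is not the Kafka API: ToyTopic and ToyConsumerGroup are hypothetical names, and the model deliberately ignores partitions, replication and persistence. It only shows why two services with separate consumer groups can each read every message once.

```python
from collections import defaultdict


class ToyTopic:
    """A toy, in-memory stand-in for a Kafka topic: an append-only list of messages.

    This is NOT the Kafka API; it only models the concepts listed above.
    """

    def __init__(self, name):
        self.name = name
        self.log = []  # messages in the order they were produced

    def produce(self, message):
        # A producer appends (publishes) a message to the end of the topic.
        self.log.append(message)


class ToyConsumerGroup:
    """Tracks, per topic, the offset of the next message this group should read."""

    def __init__(self, group_id):
        self.group_id = group_id
        self.offsets = defaultdict(int)  # topic name -> next offset to read

    def poll(self, topic, max_messages=10):
        # A consumer in this group reads from the group's committed offset onwards...
        start = self.offsets[topic.name]
        batch = topic.log[start:start + max_messages]
        # ...and commits the new offset so the same messages are not processed twice.
        self.offsets[topic.name] = start + len(batch)
        return batch


orders = ToyTopic("orders")
for i in range(3):
    orders.produce(f"order-{i}")

billing = ToyConsumerGroup("billing-service")
shipping = ToyConsumerGroup("shipping-service")

print(billing.poll(orders))   # ['order-0', 'order-1', 'order-2']
print(billing.poll(orders))   # [] (billing has already processed everything)
print(shipping.poll(orders))  # ['order-0', 'order-1', 'order-2'] (separate offsets)
```

Because each group keeps its own offsets, adding a new service never "steals" messages from an existing one.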

Setting up a Local Kafka Instance

The wurstmeister/kafka-docker repository provides a convenient setup for running Kafka in a Docker container.

> git clone https://github.com/wurstmeister/kafka-docker.git
> cd kafka-docker
> docker-compose up -d
# to scale your Kafka cluster, we can use:
# docker-compose up -d --scale kafka=<NUMBER_OF_KAFKA_BROKERS_YOU_WANT>

If we want our Kafka cluster to be accessible externally (i.e., from your host terminal or from services running outside Docker), we need to update the docker-compose.yml file.

Open docker-compose.yml (for example, with vim docker-compose.yml) and update it with the following:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
     - "2181:2181"
  kafka:
    build: .
    ports:
     - "9092:9092"
    expose:
     - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
     - /var/run/docker.sock:/var/run/docker.sock

Then run docker-compose up -d in a terminal.

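This exposes the broker's OUTSIDE listener on localhost:9092 for clients on the host, while other containers reach the broker through the INSIDE listener at kafka:9093.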

Once the Docker image is built, we should see that the Kafka instance is running.

[Figure: A running Kafka instance]
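Before opening a shell, we can also confirm from the host that the OUTSIDE listener accepts connections. The sketch below uses only the standard library; port_is_open is a hypothetical helper name, and a successful TCP connection only proves that something is listening on the port, not that the Kafka broker is healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host could not be resolved.
        return False


if __name__ == "__main__":
    print("Kafka listener reachable:", port_is_open("localhost", 9092))
```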

To open a bash shell inside the Kafka container, we can use:

kafka-docker % docker exec -i -t -u root $(docker ps | grep docker_kafka | cut -d' ' -f1) /bin/bash

Now we can create our first topic, test, with four partitions:

$KAFKA_HOME/bin/kafka-topics.sh --create --partitions 4 --bootstrap-server kafka:9092 --topic test
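With --partitions 4, the test topic is split into four partitions, and the consumers in a group divide these partitions among themselves. When a producer sends a keyed message, Kafka's default partitioner (in the Java client) hashes the key with murmur2 and takes the result modulo the partition count, so messages with the same key always land in the same partition and keep their relative order. The sketch below imitates that idea with zlib.crc32 as a stand-in hash, so its partition numbers will not match the ones Kafka would choose; choose_partition is a hypothetical name.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition by hashing it.

    Toy stand-in for Kafka's partitioner: crc32 is used only because it is in the
    standard library; Kafka's Java client uses murmur2, so real results differ.
    """
    return zlib.crc32(key) % num_partitions


keys = [b"user-1", b"user-2", b"user-1"]
partitions = [choose_partition(k, 4) for k in keys]
print(partitions)
# The same key always maps to the same partition, which preserves per-key ordering.
assert partitions[0] == partitions[2]
```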

To list all available topics in our Kafka instance, we can use:

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list

To test that all is working correctly, let us send some messages.

Start a producer on the topic test:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic=test
[Figure: Test producer]

Start a consumer on the topic test:

$KAFKA_HOME/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server kafka:9092 --topic=test
[Figure: Test consumer]

We can confirm that all messages published by the producer were read by the consumer.

Introducing Faust

Faust is a stream and event processing framework developed by Robinhood. The Python community later forked the repository, giving birth to faust-streaming. We will use faust-streaming throughout this article.

The concepts behind Faust are relatively straightforward and heavily based on the ideas behind Kafka Streams. Faust uses asynchronous programming and therefore requires Python 3.6 or later (it relies on async/await, including asynchronous generators, which were introduced in Python 3.6).
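The async/await requirement is easiest to see in a small example. Faust agents are coroutines that iterate over a stream with async for, and (as in the sink example later) they may also yield, which makes them asynchronous generators. The plain-asyncio sketch below involves no Faust or Kafka; message_stream and consume are hypothetical names, and asyncio.run requires Python 3.7+.

```python
import asyncio


async def message_stream(messages):
    """An asynchronous generator (Python 3.6+): `yield` inside `async def`."""
    for msg in messages:
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield msg


async def consume():
    received = []
    # `async for` drives the asynchronous generator one item at a time.
    async for msg in message_stream(["a", "b", "c"]):
        received.append(msg.upper())
    return received


print(asyncio.run(consume()))  # ['A', 'B', 'C']
```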

The main concepts behind Faust are:

  1. Faust App: instantiates Faust and connects to our Kafka brokers.
  2. Agents: create stream processors. An agent is an async def function (a coroutine), which we define using the @app.agent decorator.

We can install Faust through pip using:

pip install faust-streaming

Building our First Faust App

Running a Faust app is very easy. Let us build our first app in myapp.py.

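import faust


# The message structure: every value on the topic has a single string field, msg.
class Test(faust.Record):
    msg: str


# Name the app (also used as the consumer group id) and point it at the broker.
app = faust.App('myapp', broker='kafka://localhost:9092')
topic = app.topic('test', value_type=Test)


# The agent consumes every message that arrives on the test topic.
@app.agent(topic)
async def hello(messages):
    async for message in messages:
        print(f'Received {message.msg}')


# The timer sends a message to the agent's topic (test) every 5 seconds.
@app.timer(interval=5.0)
async def example_sender():
    await hello.send(
        value=Test(msg='Hello World!'),
    )


if __name__ == '__main__':
    app.main()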

We can run our app using:

faust -A myapp worker -l info

This will start a worker instance of myapp (managed by Faust). The log message Worker ready signals that the worker has started successfully and is ready to process the stream.

This app will send a message to our test Kafka topic every 5 seconds and have the agent consume it in real-time and print it out for us.

[Figure: Sample output from running our first Faust app]
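To see the timer/agent interaction without a broker, the sketch below models the test topic as an asyncio.Queue. It is an analogy rather than Faust's machinery: timer_sender and hello_agent are hypothetical names, the interval is shortened to 0.1 seconds, and a None sentinel ends the demo because a real Kafka stream never finishes.

```python
import asyncio


async def timer_sender(queue: asyncio.Queue, interval: float, count: int) -> None:
    """Plays the role of @app.timer: put a message on the 'topic' every `interval` seconds."""
    for _ in range(count):
        await queue.put("Hello World!")
        await asyncio.sleep(interval)
    await queue.put(None)  # sentinel so the consumer stops (a real stream never ends)


async def hello_agent(queue: asyncio.Queue) -> list:
    """Plays the role of the @app.agent: consume messages as they arrive."""
    received = []
    while True:
        message = await queue.get()
        if message is None:
            break
        print(f"Received {message}")
        received.append(message)
    return received


async def main() -> list:
    topic = asyncio.Queue()  # in-memory stand-in for the Kafka topic
    # Run the sender and the consumer concurrently, like Faust runs timers and agents.
    _, received = await asyncio.gather(
        timer_sender(topic, interval=0.1, count=3),
        hello_agent(topic),
    )
    return received


if __name__ == "__main__":
    asyncio.run(main())
```

The agent never polls on a schedule of its own; it simply awaits the next message, which is why it reacts as soon as the timer publishes.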

Let us start dissecting the code bit by bit.

The Faust App

app = faust.App('myapp', broker='kafka://localhost:9092')

Here we give our app a name (which will also be used to create a Kafka consumer group) and specify the location of our Kafka brokers. Assuming that you are following this guide with me, you do not need to change the broker settings. Of course, if you have different broker URLs, specify those!

These are the minimum parameters required to get started with Faust. The full list of app parameters can be found in the Faust documentation.

We also declare the topic we want to use with the app.topic() function. The value_type parameter enforces a message structure: Faust uses it to serialize the messages we send to the topic and to deserialize the messages our agents receive. If we do not need to enforce any message type, we can use raw bytes instead (value_type=bytes).

A message type can be enforced by creating a Python class that extends the class faust.Record.
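A faust.Record behaves much like a Python dataclass: typed fields that are serialized (JSON by default) when a message is sent and rebuilt into a typed object when it is consumed. The sketch below imitates that round trip with the standard library only. It is an analogy, not Faust's implementation (Faust's actual payload may differ, for example by including extra metadata), and encode/decode are hypothetical helper names.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Test:
    # Mirrors the `msg: str` field of the faust.Record in myapp.py.
    msg: str


def encode(record: Test) -> bytes:
    """Serialize a record to JSON bytes, roughly what happens before a message is sent."""
    return json.dumps(asdict(record)).encode("utf-8")


def decode(payload: bytes) -> Test:
    """Rebuild a typed record from raw bytes, roughly what value_type=Test gives the agent."""
    return Test(**json.loads(payload))


raw = encode(Test(msg="Hello World!"))
print(raw)          # b'{"msg": "Hello World!"}'
print(decode(raw))  # Test(msg='Hello World!')
```

With value_type=bytes, the agent would instead receive something like raw above and would have to decode it itself.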

The Agent

We indicate that a function is a Faust agent by using the @app.agent() decorator. We pass the topic that we want our agent to subscribe to as part of the decorator.

The agent should always be a coroutine. Therefore, we should define the agent function using the async def keywords.

Since our agent is subscribed to a topic, we need to have an asynchronous loop to go over the messages in the stream. We do this using the async for keywords.

Inside this for-loop, we now have access to every single message being consumed by our agent. Here, we can start implementing our stream processing logic.

Besides subscribing an agent to a Kafka topic, we can also have our agent publish messages to other Kafka topics. The agent decorator accepts a sink parameter: a list of destinations (such as Kafka topics) that receive every value the agent yields.

import faust
import logging
from asyncio import sleep

log = logging.getLogger(__name__)


class Test(faust.Record):
    msg: str


app = faust.App('myapp', broker='kafka://localhost:9092')
source_topic = app.topic('test', value_type=Test)
# The destination is deliberately the same 'test' topic, so the agent feeds itself.
destination_topic = app.topic('test', value_type=Test)


# specify the source_topic and destination_topic to the agent
@app.agent(source_topic, sink=[destination_topic])
async def hello(messages):
    async for message in messages:
        if message is not None:
            log.info(message.msg)
            # the yield keyword is used to send the message to the destination_topic
            yield Test(msg='This message is from the AGENT')
            # sleep for 2 seconds before handling the next message
            await sleep(2)
        else:
            log.info('No message received')


if __name__ == '__main__':
    app.main()

In the above example, we pass in the destination topic as a sink to our agent (in this case, the same test topic we used for our source). On consumption of the first message (which we can send via the producer as shown above), the agent will start executing the logic inside the asynchronous loop.

We are yielding another message to the sink/destination topic. In our case, the destination topic is the same as the source topic, simulating the behavior of constantly receiving messages in our Kafka topic.
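Conceptually, a sink turns every value the agent yields into a new message for the destination. The sketch below models that flow in plain asyncio: run_with_sinks is a hypothetical driver, and a Python list's append method stands in for a topic (Faust also accepts other agents and callables as sinks). Unlike the example above, the destination here is separate from the source, so there is no feedback loop and the demo terminates.

```python
import asyncio


async def agent(messages):
    """Like the Faust agent above: process each message and yield a new one."""
    async for message in messages:
        yield f"processed: {message}"


async def run_with_sinks(agent_fn, source, sinks):
    """Hypothetical driver: forward every value the agent yields to each sink."""
    async for result in agent_fn(source):
        for sink in sinks:
            sink(result)  # here each sink is a plain callable


async def source():
    # A finite stand-in for the source topic.
    for msg in ["a", "b"]:
        yield msg


destination = []
asyncio.run(run_with_sinks(agent, source(), [destination.append]))
print(destination)  # ['processed: a', 'processed: b']
```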

Batch Processing

The agent also provides us with the ability to process any Kafka stream in batches. We can achieve this behavior through the stream.take() function.

# specify the source_topic and destination_topic to the agent
@app.agent(source_topic, sink=[destination_topic])
async def hello(messages_stream):
    async for records in messages_stream.take(5, within=10):
        # do something
        yield Test(msg='This message is from the AGENT')

The take() method takes two parameters:

  • max_: the maximum number of messages in a batch
  • within: the timeout (in seconds) to wait for max_ messages; when it expires, the messages received so far are processed as a smaller batch
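To make the max_/within semantics concrete, here is a plain-asyncio approximation of take(). It is not Faust's implementation: the stream is modeled as an asyncio.Queue, the _END sentinel is a hypothetical way to finish the demo, and starting the timeout when the first message of a batch arrives is an assumption of this sketch.

```python
import asyncio
from typing import AsyncIterator, List

_END = object()  # sentinel marking the end of this finite demo stream


async def take(queue: asyncio.Queue, max_: int, within: float) -> AsyncIterator[List]:
    """Yield batches of up to `max_` items, or whatever arrived within `within` seconds."""
    loop = asyncio.get_running_loop()
    batch: List = []
    deadline = None  # no deadline until the first item of a batch arrives
    while True:
        timeout = None if deadline is None else max(0.0, deadline - loop.time())
        try:
            item = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            # Time is up: hand over the partial batch instead of waiting for max_ items.
            yield batch
            batch, deadline = [], None
            continue
        if item is _END:
            if batch:
                yield batch  # flush whatever is left
            return
        if not batch:
            deadline = loop.time() + within  # the timer starts with the batch's first item
        batch.append(item)
        if len(batch) >= max_:
            yield batch  # full batch: flush immediately
            batch, deadline = [], None


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(7):
        queue.put_nowait(i)
    queue.put_nowait(_END)
    return [batch async for batch in take(queue, max_=5, within=1.0)]


print(asyncio.run(demo()))  # [[0, 1, 2, 3, 4], [5, 6]]
```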

We also use another app decorator in our first example: @app.timer(interval=5.0). Like the agent, this is a coroutine; however, it is not subscribed to any Kafka stream or topic. Instead, it is triggered once every interval (in our case, every 5 seconds).

These decorators are called Actions. You can find the full list of accepted actions in the Faust documentation.

We can have multiple actions defined in our app (as done in our example).

Project Structure

In our example, we wrote the entire Faust application in a single Python script. For production code or a more complex application, it is best to organize the project into a proper hierarchy.

Faust recommends the following project structure for larger projects:

+ proj/
    - setup.py
    - MANIFEST.in
    - README.rst
    - setup.cfg
    + proj/
        - __init__.py
        - __main__.py
        - app.py       # APP initialisation and app settings
        + <APP_NAME>/
            - __init__.py
            - agents.py  # all agents and actions
            - models.py  # message structures as python classes
            - views.py   # all web views

Note: with this structure, we would have several agents and other action decorators in different modules. To tell Faust to search for these decorators, we can pass the autodiscover=True option to the Faust App in the app.py.
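Why is autodiscovery needed? A decorator such as @app.agent only registers its function when the module that defines it is imported, so agents living in proj/<APP_NAME>/agents.py are invisible until something imports that module. The sketch below shows the core idea with importlib and pkgutil. Faust's own autodiscovery is implemented differently and has its own options; autodiscover here is a hypothetical function, and skipping __main__ modules is a choice of this sketch (importing one would run the project's entry point).

```python
import importlib
import pkgutil
from typing import List


def autodiscover(package_name: str) -> List[str]:
    """Import every submodule of `package_name` so module-level decorators run."""
    package = importlib.import_module(package_name)
    imported = []
    # walk_packages yields every module and subpackage below the package's path.
    for info in pkgutil.walk_packages(package.__path__, prefix=package.__name__ + "."):
        if info.name.endswith(".__main__"):
            continue  # importing __main__ would execute the entry point
        importlib.import_module(info.name)
        imported.append(info.name)
    return imported


# For the layout above, autodiscover("proj") would import proj.app,
# proj.<APP_NAME>.agents, proj.<APP_NAME>.models and so on.
```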

Conclusion

In this article, we discussed how to spawn a Kafka cluster in Docker and how to process its stream of events from Python using Faust.

Source: David Farrugia, Data Scientist | AI Enthusiast and Researcher