Simple kafka consumer and producer using fastapi

Lemoncode21
3 min readApr 17, 2023

--

In this chapter, we are going to see how to implement the Apache Kafka in FastAPI application. Apache Kafka is an open source project used to publish and subscribe the messages based on the fault-tolerant messaging system. It is fast, scalable and distributed by design. If you are a beginner to Kafka, or want to gain a better understanding on it. https://kafka.apache.org/

Article Content

  • Setup or Install Kafka
  • Configuration FastAPI and Kafka
  • Producing Messages
  • Consuming a Message

1.Setup or Install Kafka

On this occasion I did install kafka using docker compose and also added kafdrop as kafka ui:

version: '3.3'

services:
zookeeper:
image: 'bitnami/zookeeper:3.7.0'
container_name: zookeeper
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- ./bitnami/zookeeper:/bitnami/zookeeper

kafka:
image: 'bitnami/kafka:2.8.0'
container_name: kafka
ports:
- "9093:9093"
expose:
- "9093"
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CREATE_TOPICS="kafka_capstone_event_bus:1:1"
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
volumes:
- ./bitnami/kafka:/bitnami/kafka

kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:9092"
JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
depends_on:
- kafka

2.Configuration FastAPI and Kafka

To start off we need to create a virtual environment and FastAPI.

# Create dir
mkdir app

cd app

#create virtual environment
python -m venv ./venv

#activate virtual environment (Windows)
.\venv\Scripts\activate

after That you can install fastapi

pip install fastapi

We will need an ASGI (Asynchronous Server Gateway Interface) server, in this case we will use Uvicorn.

pip install uvicorn

In the initial configuration we add the file main.py:

from fastapi import FastAPI

app = FastAPI()

#define endpoint
@app.get("/")
def home():
return "Welcome Home"

Let us initialize configration kafka in config.py file:

import asyncio

# env Variable
KAFKA_BOOTSTRAP_SERVERS= "localhost:9093"
KAFKA_TOPIC="kafka"
KAFKA_CONSUMER_GROUP="group-id"
loop = asyncio.get_event_loop()

And we create message schema in schema.py file:

from pydantic import BaseModel

class Message(BaseModel):
message : str

3.Producing Messages

To produce messages into Apache Kafka, we add some router in router.py file.

@route.post('/create_message')
async def send(message: Message):
producer = AIOKafkaProducer(
loop=loop, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
await producer.start()
try:
print(f'Sendding message with value: {message}')
value_json = json.dumps(message.__dict__).encode('utf-8')
await producer.send_and_wait(topic=KAFKA_TOPIC, value=value_json)
finally:
await producer.stop()

4.Consuming a Message

To consume messages, we need to write a Consumer function file as shown below.

async def consume():
consumer = AIOKafkaConsumer(KAFKA_TOPIC, loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, group_id=KAFKA_CONSUMER_GROUP)
await consumer.start()
try:
async for msg in consumer:
print(f'Consumer msg: {msg}')
finally:
await consumer.stop()

and finally the router.py file as below :

from fastapi import APIRouter
from schema import Message
from config import loop, KAFKA_BOOTSTRAP_SERVERS, KAFKA_CONSUMER_GROUP, KAFKA_TOPIC
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import json

route = APIRouter()


@route.post('/create_message')
async def send(message: Message):
producer = AIOKafkaProducer(
loop=loop, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
await producer.start()
try:
print(f'Sendding message with value: {message}')
value_json = json.dumps(message.__dict__).encode('utf-8')
await producer.send_and_wait(topic=KAFKA_TOPIC, value=value_json)
finally:
await producer.stop()


async def consume():
consumer = AIOKafkaConsumer(KAFKA_TOPIC, loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, group_id=KAFKA_CONSUMER_GROUP)
await consumer.start()
try:
async for msg in consumer:
print(f'Consumer msg: {msg}')
finally:
await consumer.stop()

before we do the test, we add some more scripts to the main file finally like this:

from fastapi import FastAPI
import router
import asyncio

app = FastAPI()

@app.get('/')
async def Home():
return "welcome home"

app.include_router(router.route)
asyncio.create_task(router.consume())

--

--

Lemoncode21
Lemoncode21

No responses yet