Simple kafka consumer and producer using fastapi
In this chapter, we are going to see how to implement the Apache Kafka in FastAPI application. Apache Kafka is an open source project used to publish and subscribe the messages based on the fault-tolerant messaging system. It is fast, scalable and distributed by design. If you are a beginner to Kafka, or want to gain a better understanding on it. https://kafka.apache.org/
Article Content
- Setup or Install Kafka
- Configuration FastAPI and Kafka
- Producing Messages
- Consuming a Message
1.Setup or Install Kafka
On this occasion I did install kafka using docker compose and also added kafdrop as kafka ui:
version: '3.3'
services:
zookeeper:
image: 'bitnami/zookeeper:3.7.0'
container_name: zookeeper
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- ./bitnami/zookeeper:/bitnami/zookeeper
kafka:
image: 'bitnami/kafka:2.8.0'
container_name: kafka
ports:
- "9093:9093"
expose:
- "9093"
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CREATE_TOPICS="kafka_capstone_event_bus:1:1"
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
volumes:
- ./bitnami/kafka:/bitnami/kafka
kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:9092"
JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
depends_on:
- kafka
2.Configuration FastAPI and Kafka
To start off we need to create a virtual environment and FastAPI.
# Create dir
mkdir app
cd app
#create virtual environment
python -m venv ./venv
#activate virtual environment (Windows)
.\venv\Scripts\activate
after That you can install fastapi
pip install fastapi
We will need an ASGI (Asynchronous Server Gateway Interface) server, in this case we will use Uvicorn.
pip install uvicorn
In the initial configuration we add the file main.py:
from fastapi import FastAPI
app = FastAPI()
#define endpoint
@app.get("/")
def home():
return "Welcome Home"
Let us initialize configration kafka in config.py file:
import asyncio
# env Variable
KAFKA_BOOTSTRAP_SERVERS= "localhost:9093"
KAFKA_TOPIC="kafka"
KAFKA_CONSUMER_GROUP="group-id"
loop = asyncio.get_event_loop()
And we create message schema in schema.py file:
from pydantic import BaseModel
class Message(BaseModel):
message : str
3.Producing Messages
To produce messages into Apache Kafka, we add some router in router.py file.
@route.post('/create_message')
async def send(message: Message):
producer = AIOKafkaProducer(
loop=loop, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
await producer.start()
try:
print(f'Sendding message with value: {message}')
value_json = json.dumps(message.__dict__).encode('utf-8')
await producer.send_and_wait(topic=KAFKA_TOPIC, value=value_json)
finally:
await producer.stop()
4.Consuming a Message
To consume messages, we need to write a Consumer function file as shown below.
async def consume():
consumer = AIOKafkaConsumer(KAFKA_TOPIC, loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, group_id=KAFKA_CONSUMER_GROUP)
await consumer.start()
try:
async for msg in consumer:
print(f'Consumer msg: {msg}')
finally:
await consumer.stop()
and finally the router.py file as below :
from fastapi import APIRouter
from schema import Message
from config import loop, KAFKA_BOOTSTRAP_SERVERS, KAFKA_CONSUMER_GROUP, KAFKA_TOPIC
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import json
route = APIRouter()
@route.post('/create_message')
async def send(message: Message):
producer = AIOKafkaProducer(
loop=loop, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
await producer.start()
try:
print(f'Sendding message with value: {message}')
value_json = json.dumps(message.__dict__).encode('utf-8')
await producer.send_and_wait(topic=KAFKA_TOPIC, value=value_json)
finally:
await producer.stop()
async def consume():
consumer = AIOKafkaConsumer(KAFKA_TOPIC, loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, group_id=KAFKA_CONSUMER_GROUP)
await consumer.start()
try:
async for msg in consumer:
print(f'Consumer msg: {msg}')
finally:
await consumer.stop()
before we do the test, we add some more scripts to the main file finally like this:
from fastapi import FastAPI
import router
import asyncio
app = FastAPI()
@app.get('/')
async def Home():
return "welcome home"
app.include_router(router.route)
asyncio.create_task(router.consume())