Real-Time Data Processing Using Apache Kafka
Real-time data processing is one of the most sought-after skills nowadays. Apache Kafka is a clear front runner for processing data streams at scale and has been battle-tested at corporate giants such as Netflix and Uber, to name a few.
In this post, I want to share how we can read a real-time stream of tweets from Twitter via a Kafka Producer into a Kafka Stream (a.k.a. Topic), which is in turn read by a Kafka Consumer. Though this workflow seems trivial, it demonstrates Kafka’s core use case: the publish-subscribe model. One can envision a scenario where multiple consumers read the same data elements in parallel, each processing its downstream business logic in a fault-tolerant fashion.
Without further ado, let’s get into action. But first things first: we need to fire up a Kafka environment on our local workstation to explore Kafka’s fundamentals.
Kafka Installation using Compose
docker-compose.yaml
version: "3.3"
services:
  kafka-client:
    container_name: kafka-client
    image: docker.io/mvangala/kafka-client:latest
    command: ["tail", "-f", "/dev/null"]
    depends_on:
      - kafka
    links:
      - "kafka:kafka"
    environment:
      - KAFKA_PORT=9092
  zookeeper:
    container_name: zookeeper
    image: docker.io/wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    container_name: kafka
    image: docker.io/wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - "zookeeper"
    links:
      - "zookeeper:zookeeper"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Save the above contents as docker-compose.yaml and run
docker-compose -f docker-compose.yaml up -d
# this will bring up zookeeper, kafka and kafka-client containers
Let’s explore Kafka’s basics by logging onto the kafka container.
docker exec -it kafka bash
# this will launch you into kafka container
kafka-topics.sh --create --topic twitter-tweets --partitions=4 --zookeeper zookeeper --replication-factor 1
# this creates a kafka-topic
kafka-console-producer.sh --topic=twitter-tweets --bootstrap-server=kafka:9092
# with this kafka-producer, you can start pushing data onto the kafka-topic you created
Now, open another terminal to launch a Kafka Consumer so that we can consume the data off of the Kafka Topic we have created.
docker exec -it kafka bash
# this will launch you into kafka container
kafka-console-consumer.sh --topic=twitter-tweets --bootstrap-server=kafka:9092
# this consumer can now read messages off of twitter-tweets topic in real time
Now type, say, the proverbial hello world phrase into the kafka-producer we opened in the first terminal window. The message magically appears in the kafka-consumer window you opened.
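The console consumer can also be approximated in Python, which makes the multiple-consumer scenario from the introduction easier to try out. The following is a minimal sketch, assuming the kafka-python package is installed and the broker is reachable at kafka:9092 as in the compose file; the group name demo-readers and the function names are hypothetical.

```python
"""Sketch: a Python stand-in for kafka-console-consumer.sh (assumes kafka-python)."""


def decode_message(value: bytes) -> str:
    """Turn a raw Kafka message payload into text, replacing undecodable bytes."""
    return value.decode("utf-8", errors="replace")


def consume(topic: str = "twitter-tweets",
            servers: str = "kafka:9092",
            group_id: str = "demo-readers") -> None:
    """Print every message on the topic; blocks until interrupted."""
    # Imported lazily so decode_message() works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id=group_id,             # hypothetical consumer group name
        auto_offset_reset="earliest",  # read from the oldest message if the group has no committed offset
    )
    for message in consumer:
        print(f"partition={message.partition} offset={message.offset} "
              f"{decode_message(message.value)}")
```

Calling consume() from two terminals with the same group_id should split the topic’s four partitions between the two processes, while giving each process a different group_id lets both receive every message, which is the publish-subscribe behaviour described earlier.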

Now that we have seen the publish-subscribe model in action, let’s do this with real-time Twitter data.
You will need to:
- create a Twitter Developer Account at https://developer.twitter.com.
- create an app with OAuth2 client credentials.
- copy key into the api.key file and secret into the api.secret file.
- finally, upgrade your account (it’s free) to have elevated access to the Twitter API, so that you can read the tweets in bulk.
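Since the key and secret end up in the api.key and api.secret files, one option is to read them at runtime rather than pasting them into source code. This is a minimal sketch, assuming both files sit in the same directory; the helper names read_credential and load_twitter_credentials are hypothetical.

```python
from pathlib import Path
from typing import Tuple


def read_credential(path: str) -> str:
    """Return the contents of a credential file with surrounding whitespace removed."""
    return Path(path).read_text(encoding="utf-8").strip()


def load_twitter_credentials(directory: str = ".") -> Tuple[str, str]:
    """Read (key, secret) from api.key and api.secret inside the given directory."""
    base = Path(directory)
    key = read_credential(str(base / "api.key"))
    secret = read_credential(str(base / "api.secret"))
    return key, secret
```

With this in place, something like twitter_key, twitter_secret = load_twitter_credentials("/mnt") could supply the values passed to tweepy, assuming the files were copied to /mnt.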
Now log onto the kafka-client container using:
docker exec -it kafka-client bash
# this will launch you into kafka-client container
cd /mnt
# this changes to /mnt directory
get_tweets.py
import tweepy
from kafka import KafkaProducer
from os import environ as ENV
import sys
import time

# KAFKA_PORT is set in docker-compose.yaml; the default must be a string
# because it is concatenated into the bootstrap address below
kafka_port = ENV.get('KAFKA_PORT', '9092')
kafka_producer = KafkaProducer(bootstrap_servers = 'kafka' + ':' + kafka_port)
kafka_topic = "twitter-tweets"

twitter_key = b'<PASTE YOUR api.key HERE>'
twitter_secret = b'<PASTE YOUR api.secret HERE>'
twitter_auth = tweepy.AppAuthHandler(twitter_key, twitter_secret)
twitter_client = tweepy.API(twitter_auth)

# publish the text of 10 matching tweets, one every 5 seconds
for tweet in tweepy.Cursor(twitter_client.search_tweets, q = 'firstworldproblems').items(10):
    kafka_producer.send(kafka_topic, str.encode(tweet.text))
    time.sleep(5) # wait for 5 seconds

kafka_producer.flush() # send() is asynchronous; deliver anything still buffered
sys.exit(0)
When you run the script, you will see the tweets show up in your consumer in real time. So, that’s a wrap … for now!!
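The script publishes only the tweet text as raw bytes. If downstream consumers need more than the text, one possibility is to publish each tweet as JSON instead. This is a rough sketch rather than part of the original script; the function names and the choice of fields (id and text) are assumptions.

```python
import json


def tweet_to_bytes(tweet_id: int, text: str) -> bytes:
    """Encode a tweet as UTF-8 JSON so consumers can parse individual fields."""
    payload = {"id": tweet_id, "text": text}
    # ensure_ascii=False keeps emoji and accented characters readable in the payload
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


def bytes_to_tweet(raw: bytes) -> dict:
    """Decode a payload produced by tweet_to_bytes() back into a dict."""
    return json.loads(raw.decode("utf-8"))
```

In the loop, the send call would then become kafka_producer.send(kafka_topic, tweet_to_bytes(tweet.id, tweet.text)), and a Python consumer could call bytes_to_tweet() on each message value.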
Happy Coding!!