Real-time data processing is one of the most sought-after skills nowadays, and Apache Kafka is a clear front-runner for processing data streams at scale. It has been battle-tested at companies such as Netflix and Uber, to name a few.

In this post, I want to share how we can read a real-time stream of tweets from Twitter with a Kafka producer, publish them to a Kafka topic, and in turn read them with a Kafka consumer. Though the workflow seems trivial, it demonstrates Kafka's core use case: the publish-subscribe model. One can envision a scenario where multiple consumers read the same data in parallel, each processing its own downstream business logic in a fault-tolerant fashion.
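To make the publish-subscribe idea concrete, here is a toy, in-memory model in Python (the language used later in this post). It is only a sketch, not Kafka's API: `ToyTopic`, `publish` and `poll` are hypothetical names. What it mirrors is that a Kafka topic is an append-only log and each consumer group tracks its own read offset, so several groups can read the same messages independently. It ignores partitions; in real Kafka, the partitions of a topic are also split among the consumers within one group.

```python
from collections import defaultdict


class ToyTopic:
    """A single-partition, in-memory stand-in for a Kafka topic (not the Kafka API)."""

    def __init__(self):
        self.log = []                    # append-only list of messages
        self.offsets = defaultdict(int)  # consumer group -> next offset to read

    def publish(self, message):
        """Append a message to the end of the log and return its offset."""
        self.log.append(message)
        return len(self.log) - 1

    def poll(self, group):
        """Return every message the group has not read yet and advance its offset."""
        start = self.offsets[group]
        batch = self.log[start:]
        self.offsets[group] = len(self.log)
        return batch


topic = ToyTopic()
topic.publish("tweet 1")
topic.publish("tweet 2")

# Two independent consumer groups each receive the full stream.
print(topic.poll("analytics"))  # ['tweet 1', 'tweet 2']
print(topic.poll("archiver"))   # ['tweet 1', 'tweet 2']

topic.publish("tweet 3")
print(topic.poll("analytics"))  # ['tweet 3']
```

Because reading never removes messages from the log, adding another consumer group does not take anything away from the existing ones.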

Without further ado, let's get into action. But first things first: we need to fire up a Kafka environment on our local workstation to explore Kafka's fundamentals.

Kafka Installation Using Docker Compose

docker-compose.yaml

version: "3.3"
services:
  
  kafka-client:
    container_name: kafka-client
    image: docker.io/mvangala/kafka-client:latest
    command: ["tail", "-f", "/dev/null"]
    depends_on:
      - kafka
    links:
      - "kafka:kafka"
    environment:
      - KAFKA_PORT=9092

  zookeeper:
    container_name: zookeeper
    image: docker.io/wurstmeister/zookeeper:latest
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka
    image: docker.io/wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - "zookeeper"
    links:
      - "zookeeper:zookeeper"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Save the above contents as docker-compose.yaml and run

docker-compose -f docker-compose.yaml up -d
# this will bring up zookeeper, kafka and kafka-client containers
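`docker-compose up -d` returns as soon as the containers are started, which can be a few seconds before the broker accepts connections. If you script the next steps, a small readiness check helps. The sketch below is a hypothetical helper (`wait_for_port` is not part of any Kafka tooling) that simply polls a TCP port using Python's standard library; an open port is only a rough signal that the broker is up, not proof that it is fully ready.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. connection refused) while nothing listens yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, wait_for_port("localhost", 9092) run on your workstation returns True once the kafka container is listening on the port that docker-compose.yaml publishes to the host.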

Let's explore Kafka's basics by logging into the kafka container.

docker exec -it kafka bash
# opens a bash shell inside the kafka container

kafka-topics.sh --create --topic twitter-tweets --partitions=4 --replication-factor 1 --bootstrap-server kafka:9092
# creates the twitter-tweets topic with 4 partitions and a replication factor of 1 (we run a single broker)
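The `--partitions=4` flag splits the topic into four partitions. Kafka only guarantees message ordering within a partition, and with the default partitioner a producer that sets a message key sends every message with that key to the same partition. The sketch below illustrates the hash(key) mod partitions idea; `choose_partition` is a hypothetical helper, and it swaps Kafka's murmur2 hash for `zlib.crc32`, so it shows the principle, not the partition numbers Kafka would actually pick. The get_tweets.py script later in this post sends tweets without a key, so its messages are distributed across the partitions instead.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index in [0, num_partitions).

    Stand-in only: Kafka's default partitioner hashes keys with murmur2,
    while this sketch uses zlib.crc32, so the indexes differ from Kafka's.
    """
    return zlib.crc32(key) % num_partitions


# The twitter-tweets topic was created with 4 partitions.
for user in [b"alice", b"bob", b"alice"]:
    # the same key always lands on the same partition
    print(user.decode(), "->", choose_partition(user, 4))
```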

kafka-console-producer.sh --topic=twitter-tweets --bootstrap-server=kafka:9092
# each line you type here is published as a message to the twitter-tweets topic

Now, open another terminal and launch a Kafka consumer so that we can consume the data from the topic we just created.

docker exec -it kafka bash
# opens a bash shell inside the kafka container

kafka-console-consumer.sh --topic=twitter-tweets --bootstrap-server=kafka:9092
# this consumer can now read messages off of twitter-tweets topic in real time
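By default, kafka-console-consumer.sh starts at the end of the topic, so it prints only messages produced after it connects; adding --from-beginning makes it start at the earliest available offset instead. The simplified model below captures that rule for one partition: a consumer resumes from its group's committed offset if there is one, and otherwise falls back to the `auto.offset.reset` policy (`earliest` or `latest`, which is a real Kafka consumer setting). `start_offset` itself is a hypothetical function, not part of any Kafka client.

```python
def start_offset(log_end, committed=None, auto_offset_reset="latest"):
    """Pick where a consumer starts reading one partition (a simplified model).

    log_end: offset the next produced message will get.
    committed: the group's committed offset, or None if the group has none yet.
    """
    if committed is not None:
        return committed  # resume where the group left off
    if auto_offset_reset == "earliest":
        return 0          # earliest retained offset; 0 in this model (like --from-beginning)
    if auto_offset_reset == "latest":
        return log_end    # only messages produced from now on
    raise ValueError("auto_offset_reset must be 'earliest' or 'latest'")


# A new console consumer on a partition that already holds 3 messages:
print(start_offset(3))              # 3 -> waits for new messages
print(start_offset(3, None, "earliest"))  # 0 -> replays all 3 messages
```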

Now type a message, say the proverbial "hello world", into the producer in the first terminal window, and, as if by magic, it appears in the consumer window.


Now that we have seen the publish-subscribe model in action, let's do the same with real-time Twitter data.

You will need to:

  • create a Twitter Developer account at https://developer.twitter.com;
  • create an app with OAuth 2.0 client credentials;
  • copy the app's API key into an api.key file and its API secret into an api.secret file;
  • finally, upgrade your account (it's free) to elevated access to the Twitter API, so that you can read tweets in bulk.
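The steps above store the key and secret in api.key and api.secret, while get_tweets.py has placeholders to paste them into. If you prefer to keep credentials out of the script, a small helper can load them from those files. `read_credential` is a hypothetical helper, and it assumes each file holds just the one value.

```python
from pathlib import Path


def read_credential(path: str) -> str:
    """Read a single credential from a text file, stripping surrounding whitespace and newlines."""
    value = Path(path).read_text(encoding="utf-8").strip()
    if not value:
        raise ValueError(f"{path} is empty")
    return value


# Possible use inside get_tweets.py, assuming both files sit in the working directory:
# twitter_key = read_credential("api.key")
# twitter_secret = read_credential("api.secret")
```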

Now log into the kafka-client container:

docker exec -it kafka-client bash
# opens a bash shell inside the kafka-client container

cd /mnt
# changes to the /mnt directory

get_tweets.py

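import time
from os import environ as ENV

import tweepy
from kafka import KafkaProducer

# The kafka-client container sets KAFKA_PORT=9092 (see docker-compose.yaml);
# the default is a string so it can be concatenated into the broker address
kafka_port = ENV.get('KAFKA_PORT', '9092')
kafka_producer = KafkaProducer(bootstrap_servers='kafka:' + kafka_port)
kafka_topic = "twitter-tweets"

# App-only authentication (OAuth 2.0 client credentials) with your API key and secret
twitter_key = '<PASTE YOUR api.key HERE>'
twitter_secret = '<PASTE YOUR api.secret HERE>'
twitter_auth = tweepy.AppAuthHandler(twitter_key, twitter_secret)
twitter_client = tweepy.API(twitter_auth)

# Search for tweets matching the query and publish the first 10 to the topic
for tweet in tweepy.Cursor(twitter_client.search_tweets, q='firstworldproblems').items(10):
    kafka_producer.send(kafka_topic, tweet.text.encode('utf-8'))
    time.sleep(5)  # wait 5 seconds between tweets

# send() is asynchronous, so flush any buffered messages before exiting
kafka_producer.flush()
kafka_producer.close()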
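As written, get_tweets.py publishes only the raw tweet text. If downstream consumers also need fields such as the tweet ID, one option is to publish JSON instead. kafka-python's KafkaProducer accepts a `value_serializer` callable that turns each value passed to send() into bytes, and KafkaConsumer accepts a matching `value_deserializer`; the two helpers below are a hypothetical sketch of such a pair, not part of the original script.

```python
import json


def serialize_tweet(tweet: dict) -> bytes:
    """Encode a tweet dict as UTF-8 JSON bytes (usable as a value_serializer)."""
    return json.dumps(tweet, ensure_ascii=False).encode("utf-8")


def deserialize_tweet(raw: bytes) -> dict:
    """Decode bytes produced by serialize_tweet back into a dict (usable as a value_deserializer)."""
    return json.loads(raw.decode("utf-8"))


record = serialize_tweet({"id": 1, "text": "hello world"})
print(record)                     # b'{"id": 1, "text": "hello world"}'
print(deserialize_tweet(record))  # {'id': 1, 'text': 'hello world'}
```

With it, the producer could be created as KafkaProducer(bootstrap_servers='kafka:' + kafka_port, value_serializer=serialize_tweet) and fed kafka_producer.send(kafka_topic, {"id": tweet.id, "text": tweet.text}).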

When you run get_tweets.py, you will see the matching tweets show up in your consumer window, one every five seconds. So, that's a wrap … for now!!

Happy Coding!! 👍
