Web Sockets & Kafka Streams

In Part-2, we have seen our web app fetching data from graph database via an API layer. Now, users can get the movies info of their favorite actors or the cast of their favorite movies using the web form with out much latency.

In this post, we will explore streaming these results back to the server and in the process get our hands dirty with Web Sockets and Kafka Streams. Following video demonstrates the learning outcomes of this post with respect to Sockets & Streams.

As far as my experience goes working with Web Sockets in multiple languages - Python, Rust and NodeJs, NodeJs websocket library - ws - is more amicable to work with especially in seamless integration with web client frameworks such as Angular. And, thanks to Microservices Based Architecture with Docker, it is ever so easy to glue together systems of different languages cohesively so that we are using best of the benefits programming language ecosystem has to offer.

With that being said, let’s add stream functionality using web sockets to our UI codebase.

services/ws.service.ts

import { Injectable, Inject } from '@angular/core';
import { Observable, of } from  'rxjs';
import {map} from 'rxjs/operators';

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, tap, switchAll } from 'rxjs/operators';
import { EMPTY, Subject } from 'rxjs';
export const WS_ENDPOINT = 'ws://localhost:8443/';

@Injectable({
    providedIn: 'root'
})
export class WsService {
    private socket$: WebSocketSubject<any>;
    private socketClosed: boolean = false;
    private messagesSubject$: any = new Subject();
    public messages$ = this.messagesSubject$.pipe(
        switchAll(), catchError(e => { throw e })
    );

    constructor() { }

    public connect(): void {
  
        if (!this.socket$ || this.socket$.closed || this.socketClosed) {
          this.socket$ = this.getNewWebSocket();
          this.socket$.subscribe(
            msg => {
              this.messagesSubject$.next(of(msg))
            }, // when there is a message from the server.

            err => console.log(err), // when there's an error from WebSocket API.

            () => {
              console.log('complete');
              this.socketClosed = true;
            } // when connection is closed.

          );
        }
      }
      
      private getNewWebSocket() {
        return webSocket(WS_ENDPOINT);
      }
    
      sendMessage(msg: any): Observable<any> {
        return of(this.socket$.next(msg));
      }
    
      close() {
        this.socket$.complete(); 
      }
}

app.component.html

...
<div class="stream-container" *ngIf="loading || data.length > 0">
  <button mat-raised-button class="stream-btn" (click)="stream();">Stream</button> 
</div>
<div *ngIf="notifications.length > 0" class="stream-container">
  <ng-container *ngFor="let notification of notifications;">
    <p></p>
  </ng-container>
</div>

app.component.scss

.stream-container {
    display: flex;
    flex-direction: column;
    justify-content: center;
    align-items: center;
    padding: 10px;
}

.stream-btn {
    padding: 5px;
    size: 2rem;
}

app.component.ts

// ...

import { WsService } from './services/ws.service';
// ...

public loading: boolean = false;
public notifications: string[] = [];

constructor(
    private wsService: WsService
    , private movieService: MovieService  
) { }

// ...

public stream() {
    this.wsService.connect();
    this.data.forEach((m, i) => {
      this.wsService.sendMessage(m);
      if (i === this.data.length - 1) {
        this.notifications.push("Messages were streamed successfully.");
      }
    });
}

It’s time to fire up NodeJs container and serve our server side application that captures the stream events from our client and publishes the events in turn onto Kafka topic.

docker run --name web-sock-nodejs --rm -it -p 8443:8443 node:latest bash

Once on the container,

mkdir /mnt/web-sock-app && cd $_
npm init
npm install --save express kafkajs ws

Add the following content to /mnt/web-sock-app/index.js.

'use strict';
const EXPRESS = require('express');
const HTTP = require('http');
const WebSocket = require('ws');
const { Kafka } = require('kafkajs');

const PORT = 8443;
const HOST = '0.0.0.0';

const kafka = new Kafka({
  clientId: 'web-sock-app',
  brokers: ['kafka:9092']
});

const producer = kafka.producer();
const topic = 'test-topic';

const sendMessage = (msg) => {
  return producer
    .send({
      topic,
      messages: Array({
        value: msg
      }),
    })
    .then(console.log)
    .catch(e => console.error(`[example/producer] ${e.message}`, e))
}

const stream = async (msg) => {
  await producer.connect();
  sendMessage(msg);
}

var app = EXPRESS();
const server = HTTP.createServer(app);
const wss = new WebSocket.Server({noServer: true});

server.on('upgrade', function (request, socket, head) {
  wss.handleUpgrade(request, socket, head, function (ws) {
      wss.emit('connection', ws, request);
  });
});

wss.on('connection', function (ws, request) {
  ws.on('message', function (message) {
    console.log(`Received message ${message}`);
    stream(message).catch(e => console.error(`[example/producer] ${e.message}`, e));
  });
  ws.on('close', function () {
    console.log("closing websocket");
  });
});

server.listen(PORT, HOST, () => console.log('app listening on port 8443!'));

Run the application using,

node index.js

Disclaimer:
In a production app, it’s recommended to use key-value based messages onto kafka topic. In doing so, with proper hash function, identical message will always be sent to the same partition of the kafka topic. In addition, it’s best advised to use Apache Avro format when serializing the messages onto Kafka Topic.

If you don’t have Kafka Cluster set up, please refer to this post. Go ahead and create test-topic and you should see the messages getting streamed onto the consumer as you play with the IMDB form on our web-ui.

big-data-part3

Happy Coding, or better to say, Happy Streaming! :+1:

Buy Me A Coffee