Big-Data-Processing (FullStack & beyond) Part-3
Web Sockets & Kafka Streams
In Part-2, we saw our web app fetch data from the graph database via an API layer. Users can now look up the movies of their favorite actors, or the cast of their favorite movies, through the web form without much latency.
In this post, we will explore streaming these results back to the server and, in the process, get our hands dirty with Web Sockets and Kafka Streams. The following video demonstrates the learning outcomes of this post with respect to Sockets & Streams.
In my experience working with Web Sockets in multiple languages (Python, Rust and NodeJs), the NodeJs websocket library, ws, is the most pleasant to work with, especially when it comes to seamless integration with web client frameworks such as Angular. And, thanks to Microservices Based Architecture with Docker, it is easy to glue together systems written in different languages, so that we get the best of what each programming language ecosystem has to offer.
With that being said, let’s add stream functionality using web sockets to our UI codebase.
services/ws.service.ts
import { Injectable } from '@angular/core';
import { of, Subject } from 'rxjs';
import { catchError, switchAll } from 'rxjs/operators';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

export const WS_ENDPOINT = 'ws://localhost:8443/';

@Injectable({
  providedIn: 'root'
})
export class WsService {
  private socket$: WebSocketSubject<any>;
  private socketClosed: boolean = false;
  private messagesSubject$: any = new Subject();
  // Flattened stream of the messages received from the server.
  public messages$ = this.messagesSubject$.pipe(
    switchAll(), catchError(e => { throw e })
  );

  constructor() { }

  public connect(): void {
    // Open a new socket only if there is none yet or the previous one was closed.
    if (!this.socket$ || this.socket$.closed || this.socketClosed) {
      this.socket$ = this.getNewWebSocket();
      this.socketClosed = false; // reset, otherwise every later connect() would reconnect
      this.socket$.subscribe({
        // when there is a message from the server.
        next: msg => this.messagesSubject$.next(of(msg)),
        // when there's an error from WebSocket API.
        error: err => console.log(err),
        // when connection is closed.
        complete: () => {
          console.log('complete');
          this.socketClosed = true;
        }
      });
    }
  }

  private getNewWebSocket() {
    return webSocket(WS_ENDPOINT);
  }

  // Push a message to the server over the open socket.
  sendMessage(msg: any): void {
    this.socket$.next(msg);
  }

  close() {
    this.socket$.complete();
  }
}
app.component.html
...
<div class="stream-container" *ngIf="loading || data.length > 0">
  <button mat-raised-button class="stream-btn" (click)="stream();">Stream</button>
</div>
<div *ngIf="notifications.length > 0" class="stream-container">
  <ng-container *ngFor="let notification of notifications;">
    <p>{{ notification }}</p>
  </ng-container>
</div>
app.component.scss
.stream-container {
  display: flex;
  flex-direction: column;
  justify-content: center;
  align-items: center;
  padding: 10px;
}
.stream-btn {
  padding: 5px;
  font-size: 2rem;
}
app.component.ts
// ...
import { WsService } from './services/ws.service';
// ...
  public loading: boolean = false;
  public notifications: string[] = [];

  constructor(
    private wsService: WsService
    , private movieService: MovieService
  ) { }

  // ...

  public stream() {
    // Messages sent before the socket is open are buffered by the webSocket subject.
    this.wsService.connect();
    this.data.forEach((m, i) => {
      this.wsService.sendMessage(m);
      // Notify the user once the last record has been handed to the socket.
      if (i === this.data.length - 1) {
        this.notifications.push("Messages were streamed successfully.");
      }
    });
  }
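It helps to know what actually travels over the socket. By default, the RxJS `webSocket` subject serializes each outgoing value with `JSON.stringify`, so every item of `data` reaches the server as one JSON text frame. Below is a minimal sketch of that step, assuming a hypothetical `Movie` shape (the real shape of `data` comes from the API built in Part-2 and is not shown in this post); `toWireFrame` is an illustrative helper, not part of the app.

```typescript
// Hypothetical record shape, for illustration only; the actual items in
// `data` come from the API built in Part-2.
interface Movie {
  title: string;
  actor: string;
}

// Mirrors the default serializer of the RxJS webSocket subject (JSON.stringify).
function toWireFrame(record: Movie): string {
  return JSON.stringify(record);
}

const sample: Movie[] = [
  { title: 'Movie A', actor: 'Actor X' },
  { title: 'Movie B', actor: 'Actor X' },
];

// One text frame per record, in the same order as the forEach loop in stream().
const frames: string[] = sample.map(toWireFrame);
frames.forEach((frame) => console.log(frame));
```

So the server should expect JSON text in each `message` event rather than a JavaScript object.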
It’s time to fire up a NodeJs container and serve our server-side application, which captures the stream events from our client and in turn publishes them onto a Kafka topic.
docker run --name web-sock-nodejs --rm -it -p 8443:8443 node:latest bash
Once on the container,
mkdir /mnt/web-sock-app && cd $_
npm init
npm install --save express kafkajs ws
Add the following content to /mnt/web-sock-app/index.js.
'use strict';

const EXPRESS = require('express');
const HTTP = require('http');
const WebSocket = require('ws');
const { Kafka } = require('kafkajs');

const PORT = 8443;
const HOST = '0.0.0.0';

const kafka = new Kafka({
  clientId: 'web-sock-app',
  brokers: ['kafka:9092']
});
const producer = kafka.producer();
const topic = 'test-topic';

// Publish a single message onto the Kafka topic.
const sendMessage = (msg) => {
  return producer
    .send({
      topic,
      messages: [{ value: msg }],
    })
    .then(console.log)
    .catch(e => console.error(`[example/producer] ${e.message}`, e));
};

// Make sure the producer is connected, then publish the message.
const stream = async (msg) => {
  await producer.connect();
  return sendMessage(msg);
};

const app = EXPRESS();
const server = HTTP.createServer(app);
// noServer: upgrade requests from the HTTP server are handed to ws manually below.
const wss = new WebSocket.Server({ noServer: true });

server.on('upgrade', function (request, socket, head) {
  wss.handleUpgrade(request, socket, head, function (ws) {
    wss.emit('connection', ws, request);
  });
});

wss.on('connection', function (ws, request) {
  // Forward every message received from the client to Kafka.
  ws.on('message', function (message) {
    console.log(`Received message ${message}`);
    stream(message).catch(e => console.error(`[example/producer] ${e.message}`, e));
  });
  ws.on('close', function () {
    console.log("closing websocket");
  });
});

server.listen(PORT, HOST, () => console.log(`app listening on port ${PORT}!`));
Run the application using,
node index.js
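The server above forwards every frame to Kafka unchanged. If you would rather reject malformed frames before publishing, a small guard could sit in front of `stream`. The following is only a sketch under that assumption: `checkFrame` and `FrameCheck` are hypothetical names that do not appear in index.js, and the rule "must be a JSON object" is an illustrative choice.

```typescript
// Hypothetical helper, not part of the original index.js.
interface FrameCheck {
  ok: boolean;
  value?: string;  // normalized JSON text, set when ok is true
  reason?: string; // why the frame was rejected, set when ok is false
}

// Accept a frame only if it is valid JSON describing a (non-array) object.
function checkFrame(raw: string): FrameCheck {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (e) {
    return { ok: false, reason: 'invalid JSON' };
  }
  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
    return { ok: false, reason: 'expected a JSON object' };
  }
  // Re-serialize so the Kafka message value is normalized JSON text.
  return { ok: true, value: JSON.stringify(parsed) };
}

console.log(checkFrame('{"title":"Movie A"}'));
console.log(checkFrame('not json'));
```

Inside the `message` handler, one could call `checkFrame(message.toString())` and invoke `stream` only when `ok` is true.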
Disclaimer:
In a production app, it’s recommended to publish key-value messages onto the Kafka topic. With a proper hash function applied to the key, messages with the same key will always be sent to the same partition of the topic. In addition, it’s advisable to use the Apache Avro format when serializing messages onto the Kafka topic.
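To make the partitioning remark concrete: a keyed producer hashes the message key and takes the result modulo the number of partitions, so equal keys always land on the same partition. The sketch below illustrates the idea only. It uses an FNV-1a style hash as a stand-in (Kafka's default partitioner uses murmur2), and the choice of the actor name as key, the sample records and the partition count of 3 are all assumptions for illustration.

```typescript
// FNV-1a style 32-bit hash over UTF-16 code units.
// A stand-in for illustration, not Kafka's murmur2.
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    // Multiply by the FNV prime 16777619 using shifts, keeping 32 bits.
    hash = (hash + (hash << 1) + (hash << 4) + (hash << 7) + (hash << 8) + (hash << 24)) >>> 0;
  }
  return hash >>> 0;
}

// Equal keys always map to the same partition for a fixed partition count.
function partitionFor(key: string, numPartitions: number): number {
  return fnv1a(key) % numPartitions;
}

// Hypothetical keyed messages: the actor name is the key, the record is the value.
const keyed = [
  { key: 'Actor X', value: JSON.stringify({ title: 'Movie A' }) },
  { key: 'Actor Y', value: JSON.stringify({ title: 'Movie B' }) },
  { key: 'Actor X', value: JSON.stringify({ title: 'Movie C' }) },
];

const NUM_PARTITIONS = 3; // assumed partition count

keyed.forEach((m) => console.log(m.key, '->', partitionFor(m.key, NUM_PARTITIONS)));
```

With kafkajs, the same `{ key, value }` shape can be passed in the `messages` array of `producer.send`, and the library's partitioner then takes care of the key-to-partition mapping.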
If you don’t have a Kafka cluster set up, please refer to this post. Go ahead and create test-topic, and you should see the messages streaming onto the consumer as you play with the IMDB form on our web-ui.
Happy Coding, or better to say, Happy Streaming!