In this post, I share my learnings from a real-world use case built on the Apache Beam + SQL Server + Apache Spark stack. If you are new to Apache Beam, you may read about it here. If you want to follow along with the code and would like to bring up the Spark Cluster, you may read about it here.

We will use SQL Server as our database, connect to it over JDBC, use Apache Beam to process the data elements in parallel on an Apache Spark cluster, and finally write each processed set to a file.

I went with the Apache Beam Java SDK due to its maturity with respect to the number of available IO transforms.

1) SQL Server Database

Let’s first,

  • launch a SQL Server docker container,
  • create a user, a database, a table, and
  • load some data.
version: "3.4"

services:

  sql-server:
    container_name: sql-server
    image: mcr.microsoft.com/mssql/server:2019-CU5-ubuntu-16.04
    user: root
    environment:
      - ACCEPT_EULA=Y
      - SA_PASSWORD=<SA_PASSWORD>
      - MSSQL_PID=Enterprise
    ports:
      - "1433:1433"
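With the compose file saved (e.g. as docker-compose.yml — the filename here is an assumption), you can bring the container up and open an interactive sqlcmd session in it. The /opt/mssql-tools/bin/sqlcmd path is where Microsoft's SQL Server image ships the client tools:

```shell
# Start the SQL Server container in the background
docker-compose up -d

# Open an interactive sqlcmd session inside the container,
# authenticating as sa with the password from the compose file
docker exec -it sql-server /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P '<SA_PASSWORD>'
```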

Log in to the sql-server container and create a database and a user,

CREATE DATABASE beamsql;
USE beamsql;
CREATE LOGIN <USERNAME> WITH PASSWORD = '<USERPASS>';
CREATE USER <USERNAME> FOR LOGIN <USERNAME>;
GRANT CREATE TABLE, SELECT, INSERT TO <USERNAME>;

Log in with the <USERNAME> and <USERPASS> you used above to create a table and load some mock data.

USE beamsql;
CREATE TABLE MOVIE (title varchar(50), director varchar(50));
INSERT INTO MOVIE (title, director) 
    VALUES ('Good Will Hunting', 'Gus Van Sant'), ('The Revenant', 'Alejandro')
    , ('Inception', 'Chris Nolan'), ('Interstellar', 'Chris Nolan'), ('The Martian', 'Ridley Scott')
    , ('Gravity', 'Alfonso'), ('Baahubali', 'Rajamouli'), ('Pushpa', 'Sukumar')
    , ('Dangal', 'Nitesh Tiwari'), ('Anniyan', 'Shankar'), ('The Fall', 'Tarsem Singh')
    , ('Life of Pi', 'Ang Lee'), ('Parasite', 'Bong Joon');

2) Beam Pipeline

Using the Eclipse IDE, create a Maven project with a pom.xml (which includes the Spark and Beam Java dependencies) as below,

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.mvangala.beam-sql</groupId>
  <artifactId>beam-sql</artifactId>
  <version>0.0.1-SNAPSHOT</version>
<properties>
    <spark.version>3.2.0</spark.version>
  </properties>
<dependencies>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.35.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.13</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-jdbc -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-jdbc</artifactId>
    <version>2.35.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc -->
<dependency>
    <groupId>com.microsoft.sqlserver</groupId>
    <artifactId>mssql-jdbc</artifactId>
    <version>10.1.0.jre11-preview</version>
</dependency>
<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.35.0</version>
   <scope>runtime</scope>
</dependency>

</dependencies>

<build>
  <plugins>
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
  </plugins>
</build>
</project>

Save the code below as BeamSQL.java.

package beamsql;

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.Create;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import java.sql.ResultSet;
import java.sql.Statement;

import java.util.ArrayList;

public class BeamSQL {
  public static void main(String[] args) throws SQLException {
      PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
      Pipeline p = Pipeline.create(opts);
      ArrayList<String> results = getDBData();
      PCollection<String> lines = p.apply(Create.of(results)).setCoder(StringUtf8Coder.of());
      lines.apply(TextIO.write().to("/mnt/file.txt"));
      p.run().waitUntilFinish();
  }
	
  public static ArrayList<String> getDBData() {
      Connection connection = null;
      ResultSet resultSet = null;
      ArrayList<String> results = new ArrayList<String>();
      try {
        String dbURL = "jdbc:sqlserver://sql-server:1433;databaseName=beamsql;encrypt=true;trustServerCertificate=true";
        String user = "<USERNAME>";
        String pass = "<USERPASS>";
        connection = DriverManager.getConnection(dbURL, user, pass);
            
        if (connection != null) {
          Statement statement = connection.createStatement();
          // Create and execute a SELECT SQL statement.

          String selectSql = "SELECT TOP 20 * FROM MOVIE";
          resultSet = statement.executeQuery(selectSql);
          while (resultSet.next()) {
            System.out.println(resultSet.getString(1) + ";" + resultSet.getString(2));
            results.add(resultSet.getString(1) + ";" + resultSet.getString(2));
          }
        }
      } catch (SQLException ex) {
        ex.printStackTrace();
      } finally {
        try {
          if (connection != null && !connection.isClosed()) {
            connection.close();
          }
        } catch (SQLException ex) {
          ex.printStackTrace();
        }
      }
      return results;
  }
}
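As an aside, the pom already pulls in beam-sdks-java-io-jdbc, so instead of hand-rolling the JDBC query in the driver as above, the read could also be expressed as a Beam transform that runs on the workers. This is a minimal sketch, not a drop-in replacement — the connection string, <USERNAME>, and <USERPASS> mirror the placeholders used above:

```java
// Sketch: reading MOVIE rows with Beam's JdbcIO instead of a manual JDBC query.
// Each worker opens its own connection; the row mapper mirrors the
// "title;director" formatting used in getDBData() above.
PCollection<String> lines = p.apply(JdbcIO.<String>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "jdbc:sqlserver://sql-server:1433;databaseName=beamsql;encrypt=true;trustServerCertificate=true")
        .withUsername("<USERNAME>")
        .withPassword("<USERPASS>"))
    .withQuery("SELECT title, director FROM MOVIE")
    .withRowMapper((JdbcIO.RowMapper<String>) rs ->
        rs.getString(1) + ";" + rs.getString(2))
    .withCoder(StringUtf8Coder.of()));
```

With this approach the read itself becomes part of the pipeline graph, instead of happening eagerly in main() before the pipeline starts.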

3) Run Beam Pipeline on Spark Cluster

Run mvn clean package to generate beam-sql-0.0.1-SNAPSHOT-shaded.jar, then copy this jar onto the spark-master docker container. If you don’t have a Spark Cluster running locally, check my blog post here.

docker cp beam-sql-0.0.1-SNAPSHOT-shaded.jar spark-master:/mnt/
# this copies the jar to spark-master docker container

spark-submit --class beamsql.BeamSQL beam-sql-0.0.1-SNAPSHOT-shaded.jar --runner=SparkRunner
# this runs the beam pipeline in spark-context

When you list the files in /mnt, you will notice 7 output files, each holding about 2 rows of data from the database table MOVIE. Apache Beam splits the PCollection into bundles for parallel processing in ParDo, and the final write transform produced one shard per bundle of 2 elements (the shard count can be further customized as documented here), thus resulting in 7 output files.
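The file count follows from the bundle size: with the 13 rows inserted earlier and about 2 elements per shard, a ceiling division (a back-of-the-envelope check, not Beam's actual sharding logic) lands on 7:

```java
public class ShardCount {
    public static void main(String[] args) {
        int rows = 13;     // rows inserted into the MOVIE table
        int perShard = 2;  // observed elements per output file
        // ceiling division: the last shard may hold fewer rows
        int shards = (rows + perShard - 1) / perShard;
        System.out.println(shards);  // prints 7
    }
}
```

If a single output file is preferred, TextIO also supports pinning the shard count explicitly, e.g. TextIO.write().to("/mnt/file.txt").withNumShards(1).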

This simple example demonstrates the power of the Apache Beam programming model and SDKs in parallelizing complex workflows in a distributed computing environment, almost effortlessly.


Happy Coding!! :+1:
