Spark Data Pipeline End To End
I work at a company that has a lot of data, distributed over multiple sources. We have Kafka topics (millions of messages per minute) with real-time data running through them. We have multiple PostgreSQL servers that are constantly being updated. Our mission was to gather data from multiple sources (PostgreSQL, Kafka and others) and locations (regions) and bring it all to a BI platform (in our case Redshift).
The challenges, as we will see, came from several areas.
Our application is installed across continents, so obviously we have a Kafka cluster per region. In addition, since the throughput of data was very high, we also ended up creating cells of customers and deploying each cell on a different Kafka cluster so that we could handle the load.
Even Kafka has its limits: with so many messages on a limited number of partitions, you need another layer of balancing. Once you hit this limit you need to create a new Kafka cluster per "partition" of another type, in our case regions and then cells. Regions obviously follow geography; the next level of partitioning, "cells", is arbitrary, which also allows us to group customers according to their SLA.
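To make the region/cell idea concrete, here is a minimal sketch of how a producer might be routed to the right Kafka cluster. The cluster map, host names and naming scheme are hypothetical, not our actual topology:

```python
# Illustrative only: route a customer to the Kafka cluster for its
# region and cell. All names below are made up for the example.
CLUSTERS = {
    ("eu", "cell-1"): "kafka-eu-cell1.internal:9092",
    ("eu", "cell-2"): "kafka-eu-cell2.internal:9092",
    ("us", "cell-1"): "kafka-us-cell1.internal:9092",
}

def bootstrap_servers(region: str, cell: str) -> str:
    """Return the bootstrap servers for a customer's region and cell."""
    try:
        return CLUSTERS[(region, cell)]
    except KeyError:
        raise ValueError(f"no Kafka cluster for region={region!r}, cell={cell!r}")
```

Because each (region, cell) pair is its own cluster, adding capacity means adding a cell rather than fighting partition limits on one cluster.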
From PostgreSQL we need to get two types of tables: facts & dimensions.
There are two ways to sync data from a relational DB — pull vs push.
We started out with the pull paradigm.
When pulling dimensions, which are tables that do not change much, we can sync them once a day. Using Spark we select all the data from each table and do an upsert to Redshift.
For facts we need to work harder. Since this data changes a lot, we need a column in the database that tells us when a row was last updated. In Spark we select all the rows changed since the last sync time and append them to the table in Redshift.
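The incremental pull boils down to building a query against the watermark column. A minimal sketch, assuming an `updated_at` column maintained by the source application (the column and table names are illustrative; Spark would run the resulting query through its JDBC reader):

```python
from datetime import datetime

def incremental_fact_query(table: str, last_sync: datetime) -> str:
    """Build the pull query for fact rows changed since the last sync.

    Assumes the source table has an `updated_at` column; the result set
    is appended to the corresponding Redshift table.
    """
    return (
        f"SELECT * FROM {table} "
        f"WHERE updated_at > '{last_sync.isoformat()}'"
    )
```

After a successful run, the pipeline stores the new watermark so the next run picks up where this one left off.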
A side note on Redshift. Redshift does not support an UPSERT command, so to get the same effect you need to delete all records in the destination table that are about to be updated, and then append the new records. To make the whole process atomic we used a transaction. For more information see One Trick to support UPSERT.
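The delete-then-append trick can be sketched as SQL generated around a staging table. The table and key names here are illustrative, and the exact statements your loader runs may differ:

```python
def redshift_upsert_sql(target: str, staging: str, key: str) -> str:
    """Emulate UPSERT on Redshift: inside one transaction, delete the
    target rows that are about to be replaced, then append the fresh
    rows from a staging table. Names are placeholders for the example."""
    return "\n".join([
        "BEGIN;",
        f"DELETE FROM {target} USING {staging} "
        f"WHERE {target}.{key} = {staging}.{key};",
        f"INSERT INTO {target} SELECT * FROM {staging};",
        "END;",
    ])
```

Wrapping both statements in one transaction is what makes the swap atomic: readers never see the table with the old rows deleted but the new rows not yet inserted.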
The solution of pulling data is very simple but has quite a few inherent issues. First, the dimension tables: even if they were not updated, we still read all the records and rewrite them in Redshift. This could be optimized by adding an updated column to the original PostgreSQL table.
The bigger issue is the facts. If a row was updated multiple times between pulls, we would see only its latest state and miss the intermediate updates.
This is a “by design” bug in the case of pull.
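A toy model makes the limitation obvious: the source table only ever holds the latest version of a row, so a snapshot query cannot recover the history, no matter what watermark you use. The row shape and timestamps below are invented for the illustration:

```python
# Between two polls, row 1 goes created -> paid -> shipped.
row_versions = [
    {"id": 1, "status": "created", "updated_at": 10},
    {"id": 1, "status": "paid",    "updated_at": 20},  # intermediate update
    {"id": 1, "status": "shipped", "updated_at": 30},  # final state
]

# The source table keeps only the latest version of each row...
table = {v["id"]: v for v in row_versions}

# ...so pulling "rows updated since t=5" yields one row, not three events.
pulled = [r for r in table.values() if r["updated_at"] > 5]
```

Three changes happened, but the pull sees a single row in its final state; the "paid" step is gone for good.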
To solve this we need to move to a push mechanism that sends us every update. The architect chose to use CDC (change data capture) to push all changes to a Kafka topic. From this topic we can then batch-update Redshift with all relevant data.
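Since CDC delivers every change while Redshift only needs the end state of each batch, a common step is to compact a batch of events to the latest state per primary key before the delete+insert. The event shape (`id`/`op`/`data`) is illustrative, not our exact CDC format:

```python
def compact_cdc_batch(events):
    """Reduce a batch of CDC events to the latest state per primary key,
    so one delete+insert per key suffices when updating Redshift.
    Assumes events arrive in commit order within the batch."""
    latest = {}
    for event in events:
        latest[event["id"]] = event  # later events overwrite earlier ones
    return list(latest.values())

batch = [
    {"id": 1, "op": "insert", "data": {"status": "created"}},
    {"id": 1, "op": "update", "data": {"status": "paid"}},
    {"id": 2, "op": "insert", "data": {"status": "created"}},
]
compacted = compact_cdc_batch(batch)
```

Unlike the pull approach, nothing is lost here: every change reached the topic, and compaction is a deliberate choice made on the consumer side.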
For more information on the CDC process, see Sigal's article on Building a CDC — Lessons learned.
The Spark applications for all the data pipelines ran on top of Kubernetes with the Spark Operator.
Since we started out with two types of pipelines (Kafka, PostgreSQL), we created a base class for each, and each subclass implemented its specific ETL.
This is a bit different from what we usually do. Usually we create a Spark application per pipeline, but in this case the amount of common code was large, so we kept all the pipelines in the same Git repository, and in the Helm chart we define which main class and which resource configuration to use.
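The base-class pattern can be sketched as follows. This is a simplified stand-in, not our actual code: the extract and load steps are stubbed, and only the class name `AccountDimensionToRedshiftApp` comes from the real config; everything else is illustrative.

```python
from abc import ABC, abstractmethod

class PostgresPipeline(ABC):
    """Shared base class for PostgreSQL pipelines; each concrete
    pipeline implements only its specific ETL step."""

    def run(self) -> None:
        rows = self.extract()
        rows = self.transform(rows)
        self.load(rows)

    def extract(self):
        # Common code: read from PostgreSQL (stubbed for the sketch).
        return [{"account_id": 1, "name": "acme"}]

    @abstractmethod
    def transform(self, rows):
        """Pipeline-specific ETL logic."""

    def load(self, rows) -> None:
        # Common code: write to Redshift (stubbed for the sketch).
        self.loaded = rows

class AccountDimensionToRedshiftApp(PostgresPipeline):
    def transform(self, rows):
        return [{**r, "name": r["name"].upper()} for r in rows]
```

Each subclass is a separate Spark main class, and the Helm chart decides which one a given CronJob launches.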
To monitor the process we sent statistics to Datadog: per pipeline, the number of records synced and the number updated.
For scheduling the pipelines we used Kubernetes cron-scheduled tasks.
Deployment of all jobs must of course be automated, so we have a Jenkins job that deploys the Helm chart to Kubernetes.
To make life easy, we added a config section in the values file so that each new pipeline needs to be configured in only one place. For example:
```yaml
- class: "com.clouddb.dimension.AccountDimensionToRedshiftApp"
  javaOptions: "-XX:+UseG1GC -XX:G1HeapRegionSize=8M -XX:+UseStringDeduplication -Dlog4j.configuration=file:/opt/log4j-config/log4j.properties"
  schedule: "0 0 * * *"
```
This way there is one place for all pipelines, while we still have the option to change the configuration per pipeline. Obviously the spark-scheduled-application.yaml needs to be templated to support this.
Our application is hosted on AWS, so we kept all our secrets in AWS Secrets Manager. This application needs multiple secrets, one for each connection. Since we also have a policy of rotating passwords, the secrets need to be controlled by the source application; on the other hand we do not want to share secrets, so we create a secret per source per application.
In our case we need a secret for PostgreSQL (read-only), a secret to connect to Kafka (also read-only) and a secret to connect to Redshift (read-write).
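The "secret per source per application" rule can be captured in a tiny naming helper. The convention and names below are hypothetical, purely to show the idea:

```python
def secret_name(application: str, source: str) -> str:
    """Build the Secrets Manager name for one application/source pair.
    The `application/source` naming convention is made up for this sketch."""
    return f"{application}/{source}"

# One secret per source per application: rotating the PostgreSQL
# password touches only this pipeline's read-only secret, nothing shared.
needed = [
    secret_name("spark-pipelines", source)
    for source in ("postgresql-ro", "kafka-ro", "redshift-rw")
]
```

Because each secret belongs to exactly one (application, source) pair, the owning source can rotate its credentials on its own schedule without coordinating with other consumers.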
Creating a full environment for our application needs to be a single click. So when I want to deploy the application to QA, I need a Jenkins job that creates all the resources we need (Redshift, AWS roles, Kubernetes namespace…).
Since we believe in automating our environment, in an "infrastructure as code" fashion, we used Pulumi to create it. We chose Go as our language since it is used in other back-end projects and is more familiar to the dev team.
Once Jenkins runs Pulumi to create the external environment, we can deploy the Helm chart to create all the resources in the Kubernetes namespace.
Last but not least, I would like to mention our decision on the Git repository. How many repositories to create (for example, separating code from deployment) is obviously open for debate, but we found that having all the code related to each Spark application in the same repository is a time saver.
Since the programmer is in charge of the application from end to end (including deployment), changes to the code might also bring with them changes to the deployment in the Helm chart or in Pulumi, so we found it time-saving to keep all of them in the same repository.
So when you create a branch and build the application, you can use the same branch to deploy the Helm chart.
Our folder structure is as follows:
Each company decides how to distribute the work between teams. In our case, DevOps created the platform for the dev team to deploy on, and gave support when DevOps issues arose. Personally, taking an application from the code all the way to full deployment in Kubernetes was very rewarding.