This article is about some of the work I have been doing in my prior and current organizations around Data Infrastructure. For the past few years, I have been using Kubernetes as the container orchestration platform to manage Docker containers and to run Big Data and ML workloads. I thought it was time to share that knowledge and my learnings with everyone. Let's walk through how we built a Data Platform on Kubernetes.

State of Data Infrastructure

When I joined my current organization, we were using CDH 6.1 (Cloudera Distribution of Hadoop) On-Premise on Bare Metal CentOS servers to run our Data and ML workloads. We leveraged CDH to manage our Apache Kafka cluster, Apache Hive, Apache Hadoop, and Apache Spark. Our Data Engineer was scheduling Data & ML Pipelines using crontab on the Gateway Node. We were missing some important functionalities such as Data Governance, proper Job Scheduling, Observability/Monitoring, any DevOps processes and, most importantly, reliability.


To meet Data Governance requirements, I started by adding Apache Atlas, Apache Ranger, and HUE to our Data Infrastructure. These technologies helped us add Access Controls on data stored in Hive and HDFS, along with providing Metadata Discovery, Lineage and Cataloging capabilities. I started running all these services as Docker containers and integrated them with the CDH services. Integration with CDH was not an easy task, as it required continuous updates to configuration properties and multiple restarts of the Cloudera cluster.

[Image: Cloudera Data Platform]


This is where the motivation to replace CDH began. Our requirements were simple -

  • Redesign the platform so that integrating other services would be seamless.
  • Reduce the frequent downtime in our CDH environment, which we were facing due to memory issues and improper utilisation of compute resources (CPU and Memory).
  • We had plans to move some workloads from On-Premise to AWS in the future, so we wanted our migration to the Cloud to be seamless.

Therefore, I started working on a Modern Data Platform with the aim of building a Cloud Agnostic, Cloud Native and Cost Effective solution. At my previous organization, Grab, I gathered solid experience designing systems and infrastructure. My aim was to use that experience to build a Data Platform using widely acknowledged Open Source technologies in the field of Big Data, one that can be deployed anywhere (On-Premise or Cloud).

I started with a solution for AWS and did a POC to show our leadership how we could leverage Kubernetes to build a Data Platform. The solution looks like below -

[Image: Data Platform on Kubernetes - AWS]


The POC was a success and we were able to achieve our key tasks, which were:-

  • Build a Data Platform solution on Kubernetes.
  • Setup JupyterHub on Kubernetes for our Data Scientists and Data Engineers.
  • Run Apache Spark jobs on Kubernetes to process terabytes of data.
  • Schedule Apache Spark jobs using Apache Airflow, also running on Kubernetes.

We were able to prove the value of building such a platform, not just for our own project but also across different projects. We were also able to present a good CI/CD solution around the platform. My proposal was to use EKS (Elastic Kubernetes Service) as the container orchestration platform for deploying all Data Platform services.

This paved the way for finally replacing CDH running On-Premise with our current solution running on the Rancher Kubernetes Platform. The solution looks like below -

[Image: Data Platform on Kubernetes - On-Premise]


If you look closely, we were able to design a near-identical Data Platform that can run on both AWS and On-Premise Bare Metal.

I started with a Highly Available Rancher 2.5.5 Kubernetes Cluster with 3 Master Nodes and 3 Worker Nodes. Since then, we have grown our cluster 8x, all on Ubuntu OS. We decided to replace CentOS with Ubuntu as CentOS support is being discontinued. The reason for choosing Rancher was simple - it's completely Open Source.

Some key learnings while building such a platform On-Premise were:-
  • Read the documentation carefully. Rancher has written amazing documentation on how to set up a Highly Available, Production Ready Kubernetes Cluster.
  • Running HDFS on Kubernetes is possible, however we haven't come across any articles/blogs stating that companies have used it in Production. Therefore we decided to replace HDFS with Ceph. Ceph can be deployed on Kubernetes using Rook, and also on Bare Metal without Kubernetes.
  • Use MetalLB as a Load Balancer for running K8s applications On-Premise.
  • Use a Bind9 DNS Server and ExternalDNS to keep the DNS Server updated with Ingress records.
  • We used Ceph for creating Persistent Volumes with the Read Write Once (RWO) Access Mode and Longhorn for the Read Write Many (RWX) Access Mode. Longhorn is a distributed storage platform created by Rancher for Kubernetes. A minimal sketch of requesting such volumes follows this list.
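
For illustration, below is a minimal sketch of requesting such volumes through the Kubernetes Python client. The storage class names (rook-ceph-block for Ceph RWO and longhorn for RWX) and the sizes are assumptions and will depend on how Rook and Longhorn are installed in your cluster.

# pvc_sketch.py - hypothetical example; storage class names and sizes are assumptions
from kubernetes import client, config

def create_pvc(name, storage_class, access_mode, size, namespace='default'):
    # Build a PersistentVolumeClaim with the requested storage class and access mode
    pvc = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1PersistentVolumeClaimSpec(
            storage_class_name=storage_class,
            access_modes=[access_mode],
            resources=client.V1ResourceRequirements(requests={'storage': size})
        )
    )
    client.CoreV1Api().create_namespaced_persistent_volume_claim(namespace, pvc)

config.load_kube_config()  # use config.load_incluster_config() when running inside the cluster

# RWO volume backed by Ceph (provisioned via Rook) and an RWX volume backed by Longhorn
create_pvc('spark-scratch', 'rook-ceph-block', 'ReadWriteOnce', '50Gi')
create_pvc('shared-workspace', 'longhorn', 'ReadWriteMany', '10Gi')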

Let me also take you through how I built our CI/CD solution for this platform. I decided to use GitLab for CI and ArgoCD for CD. We also used GitLab to store our Docker images. I would like to cover our CICD as a two-part solution, one part covering our Kubernetes Platform and the other our Data Pipelines.

CI/CD Kubernetes Platform

[Image: CI/CD - Kubernetes Platform]


This is a standard CICD process used across organizations. We use GitLab to store all artifacts and also leverage GitLab for CI. In our case, since our platform runs On-Premise, we used GitLab as our Docker Registry as well. I set up GitLab CI pipelines to build and publish images to the GitLab Container Registry, and used ArgoCD to deploy any changes made to our Helm Charts back onto our Rancher Kubernetes Cluster.

CI/CD Data Pipelines

[Image: CI/CD - Data Pipelines]


To set up a CICD process for our Data ETL pipelines, I used the same approach as for our K8s cluster. We use Apache Airflow to schedule our ETL pipelines, and a GitLab CI pipeline pushes the PySpark scripts to a Ceph bucket. Our Airflow deployment has Git-Sync enabled, so DAGs are automatically synced to the Airflow scheduler, and thus to the Airflow webserver, every 60 seconds.
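
For illustration, here is a minimal sketch of what that CI step could look like, pushing a PySpark script to the Ceph bucket through Ceph's S3-compatible Object Gateway using boto3. The endpoint and credential environment variable names are assumptions; the bucket and key match the s3a path used in the DAG below.

# upload_scripts.py - hypothetical CI step; endpoint and credential variables are assumptions
import os
import boto3

# The Ceph RADOS Gateway exposes an S3-compatible API, so boto3 works against it
s3 = boto3.client(
    's3',
    endpoint_url=os.environ.get('CEPH_ENDPOINT', 'http://ceph-rgw.internal:8080'),
    aws_access_key_id=os.environ['CEPH_ACCESS_KEY'],
    aws_secret_access_key=os.environ['CEPH_SECRET_KEY']
)

# Upload the PySpark script referenced by the Airflow DAG
s3.upload_file(
    Filename='scripts/demo/Ceph_PySpark_Read_Raw_Data.py',
    Bucket='airflow',
    Key='scripts/demo/Ceph_PySpark_Read_Raw_Data.py'
)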

I used the KubernetesPodOperator within our DAGs to submit Spark jobs onto our K8s Cluster. Below is an example of a DAG using a Docker image present in the GitLab Container Registry:-
# Airflow DEMO DAG

from airflow import DAG
from datetime import timedelta, datetime
from kubernetes.client import models as k8s
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

args = {
    "owner": "prateek.dubey",
    "email": ["dataengineeringe2e@gmail.com"],
    "depends_on_past": False,
    "start_date": datetime(2019,1,1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

dag = DAG(dag_id='kubernetes_sample_dag', default_args=args, schedule_interval=None, catchup=False)

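# Launch a pod in the 'airflow' namespace whose container runs spark-submit in cluster mode against the Kubernetes API server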
ceph_raw_data_read = KubernetesPodOperator(
        namespace='airflow',
        image='spark-executor-3.0.1',
        image_pull_policy='Always',
        image_pull_secrets=[k8s.V1LocalObjectReference('gcr')],
        service_account_name='spark',
        name='prateek-ceph-raw-data-read',
        task_id='ceph_raw_data_read',
        in_cluster=True,
        get_logs=True,
        arguments=[
                '/opt/spark/bin/spark-submit',
                '--master', 'k8s://https://<K8s_API_Server_Host>:<K8s_API_Server_Port>',
                '--deploy-mode', 'cluster',
                '--name', 'prateek-ceph-raw-data-read',
                '--conf', 'spark.kubernetes.driver.pod.name=prateek-ceph-raw-data-read',
                '--conf', 'spark.kubernetes.executor.podNamePrefix=prateek-ceph-raw-data-read',
                '--conf', 'spark.kubernetes.namespace=airflow',
                '--conf', 'spark.kubernetes.container.image=spark-executor-3.0.1',
                '--conf', 'spark.kubernetes.container.image.pullPolicy=Always',
                '--conf', 'spark.kubernetes.container.image.pullSecrets=gcr',
                '--conf', 'spark.kubernetes.authenticate.driver.serviceAccountName=spark',
                '--conf', 'spark.kubernetes.authenticate.executor.serviceAccountName=spark',
                '--conf', 'spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt',
                '--conf', 'spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token',
                '--conf', 'spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.EnvironmentVariableCredentialsProvider',
                '--conf', 'spark.hadoop.fs.s3a.endpoint=http://<Ceph_Endpoint>:8080',
                '--conf', 'spark.hadoop.fs.s3a.connection.ssl.enabled=false',
                '--conf', 'spark.hadoop.fs.s3a.path.style.access=true',
                '--conf', 'spark.executor.instances=2',
                '--conf', 'spark.executor.cores=6',
                '--conf', 'spark.executor.memory=55g',
                's3a://airflow/scripts/demo/Ceph_PySpark_Read_Raw_Data.py'
            ],
        dag=dag
    )

ceph_raw_data_read
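
The PySpark script referenced above is not part of this article; below is a minimal sketch of what such a job could look like, reading raw data from Ceph through the s3a connector. The bucket and dataset path are hypothetical, and the s3a endpoint and credentials come from the --conf flags passed to spark-submit in the DAG.

# Ceph_PySpark_Read_Raw_Data.py - minimal sketch; bucket and path are hypothetical
from pyspark.sql import SparkSession

# The s3a endpoint, credentials provider and path-style access are set via the spark-submit --conf flags
spark = SparkSession.builder.appName('ceph-raw-data-read').getOrCreate()

# Read raw JSON events from a Ceph bucket exposed through the S3-compatible gateway
raw_df = spark.read.json('s3a://datalake/raw/events/')
print('Raw event count: {}'.format(raw_df.count()))

spark.stop()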

Lastly, before I conclude this article, I would like to say that I hope my work sheds some light on how to use Kubernetes On-Premise and how to build a Data Platform on it efficiently.

Stay tuned for my upcoming article on how we run JupyterHub on EKS and On-Premise Kubernetes.