In this article we will go through how Kubernetes works as a cluster resource manager for deploying Apache Spark applications. Spark has evolved a lot over the last few years, and with the release of Spark 3.x there is huge interest in how Kubernetes can help us run our applications anywhere. Kubernetes as a resource manager is now Generally Available (GA) for production use. For those who are new to Big Data or Spark, Apache Spark supports four different resource managers -
- Standalone
- YARN (Yet Another Resource Negotiator)
- Apache Mesos
- Kubernetes
Kubernetes is the latest resource manager to join the Spark project, and it went GA this year. With containerization gaining traction around the world, I’m sure K8s will soon become the de facto deployment mode for Apache Spark.
So without any further ado, let’s jump straight into it.
Introduction
Spark is a general-purpose distributed cluster computing framework, similar to Apache Hadoop, in which a bunch of machines are connected together to form a compute cluster that processes huge volumes of data, sometimes called Big Data.
Spark uses what is called a Master-Slave architecture, which means there are Master Nodes (a highly available cluster typically has 3 Master Nodes) and one or more Worker Nodes. The Master Nodes manage the Worker Nodes with the help of a Cluster Manager, and the Executors run on the Worker Nodes, where they process the data in the form of tasks.
Spark provides two different modes for running your applications - Cluster Mode and Client Mode. A detailed explanation of both is available in Apache Spark’s official documentation.
That was a brief introduction to Apache Spark; now let’s see how we can use it to run our jobs on top of a Kubernetes Cluster. Generally, there are three ways to run Spark applications on Kubernetes. Let me show you how all three work -
- Spark Submit CLI (Local Development/ Testing)
- KubernetesPodOperator for Apache Airflow (Recommended for Production)
- SparkK8sOperator (I have not used it in Production, but others do, I guess)
Before we look into how these three methods work, let’s look at the Spark Architecture and the Spark on K8s Architecture.
Architecture
Kubernetes as Cluster Manager
Prerequisites
We need to meet the following prerequisites before running our Apache Spark job on a K8s Cluster.
Spark Docker Image
In order to run Spark jobs on Kubernetes, you will need a Docker image with Spark in it. I have created an image using Spark 3.0.1, which you can use as well. The Docker image below is hosted on Docker Hub and is publicly available.
1. Spark 3.0.1 Dockerfile
2. Spark 3.0.1 Docker Image
Spark Job
I’m going to use the following PySpark job during this exercise -
Kubernetes Namespace and Service Account
To submit Spark jobs in your Kubernetes Cluster, you will need a namespace along with a Service Account that has the permissions required to create pods in the cluster. You can use the following commands -
kubectl create namespace spark
kubectl create serviceaccount spark -n spark
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
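Before moving on, it can be worth confirming that the Service Account really is allowed to create pods. Here is a minimal sketch that wraps `kubectl auth can-i` and impersonates the Service Account. It assumes kubectl is installed, points at the right cluster, and that your own user is allowed to impersonate; the helper names `can_i_command` and `service_account_can` are hypothetical and exist only for this illustration.

```python
import subprocess


def can_i_command(verb, resource, namespace="spark", service_account="spark"):
    """Build a `kubectl auth can-i` command that impersonates the service account."""
    user = f"system:serviceaccount:{namespace}:{service_account}"
    return ["kubectl", "auth", "can-i", verb, resource, "-n", namespace, f"--as={user}"]


def service_account_can(verb, resource, **kwargs):
    """Run the check against the cluster; kubectl prints 'yes' or 'no' on stdout."""
    result = subprocess.run(
        can_i_command(verb, resource, **kwargs),
        capture_output=True,
        text=True,
        check=False,  # 'no' answers exit non-zero, which is not an error here
    )
    return result.stdout.strip() == "yes"


# Example (needs a reachable cluster, so it is left commented out):
# for resource in ("pods", "services", "configmaps"):
#     print(resource, service_account_can("create", resource))
```

The commented loop checks pods, services and configmaps because the Spark driver creates all three when it runs on Kubernetes.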
Once all the prerequisites are in place, let's see how we can use the different methods to submit Spark jobs on Kubernetes.
Spark Submit CLI
To submit a Spark job using the Spark Submit CLI, we need the Spark binaries at some location from which we can submit the job. For this exercise, I simply created a pod from the Spark image and submitted the job from it. You can create this pod on both your On-Premise K8s Cluster and AWS EKS, where it acts as the client that submits the jobs.
Spark Jump Pod
apiVersion: v1
kind: Pod
metadata:
  name: spark-jump-pod
  namespace: spark
spec:
  serviceAccountName: spark
  containers:
  - image: dataengineeringe2e/spark-ubuntu-3.0.1
    name: spark-jump-pod
    command: ["bash"]
    args: ["-c", "while true; do echo hello; sleep 10;done"]
    imagePullPolicy: Always
Once the Spark Jump Pod is up and running, you can connect to it to run the Spark Submit commands below.
kubectl -n spark exec -it spark-jump-pod -- bash
Once connected to the pod, use the commands below to submit your Spark application in Cluster Mode to process data in Ceph and S3 respectively.
On-Premise Rancher Kubernetes Cluster processing data in Ceph
Some key points to note here about Ceph -
- Ceph is an Object Store similar to S3. It works with the same APIs, SDKs and Hadoop JARs that are required for AWS S3
- Ceph can be deployed on Bare Metal servers using the cephadm utility
- Ceph can be deployed on Kubernetes using Rook Ceph
- Ceph Installation Guide - Link
To learn more about how a Spark job authenticates with Ceph or S3, I recommend reading the Hadoop-AWS Integration documentation - Link
Now, let me explain some of the Ceph-related properties used below so that you have a better understanding of them -
spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
If you use this Credentials Provider, you have to specify the values of fs.s3a.access.key and fs.s3a.secret.key. Ceph uses the same terminology as S3. We need to create a Ceph Bucket and a Ceph Object Gateway User. The Ceph Object Gateway user has an Access Key and a Secret Key attached to it, and has to be given access to the respective Ceph bucket using Ceph Bucket Policies. This is exactly like creating an IAM User in AWS and giving that IAM User access to the respective S3 Bucket. Let's see what a Ceph Object Gateway user and its access keys look like -
spark.hadoop.fs.s3a.access.key=<CEPH_ACCESS_KEY>
This property, as the name suggests, is the Access Key of the Ceph Object Gateway user that has access to the required Ceph Bucket.
spark.hadoop.fs.s3a.secret.key=<CEPH_SECRET_KEY>
This property, as the name suggests, is the Secret Key of the Ceph Object Gateway user that has access to the required Ceph Bucket.
spark.hadoop.fs.s3a.endpoint=http://ceph.example.com:8080
This property is the Ceph endpoint. In AWS we don't have to pass it, as it resolves to the S3 endpoint automatically. However, for S3-compatible object stores like Ceph or MinIO, we need to set this property.
spark.hadoop.fs.s3a.connection.ssl.enabled=false
This property should be set to true if your Ceph endpoint has HTTPS enabled. In my case, it's set to false.
spark.hadoop.fs.s3a.path.style.access=true
Useful for S3-compatible storage providers, as it removes the need to set up DNS for virtual hosting.
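To keep the S3A properties described above in one place, here is a minimal sketch of a helper that returns them as a dictionary. The function name `s3a_properties` is hypothetical, and deriving the SSL flag from the endpoint's URL scheme is my own assumption for the illustration. The property names and values are the ones explained above.

```python
from urllib.parse import urlparse


def s3a_properties(access_key, secret_key, endpoint=None):
    """Return spark.hadoop.fs.s3a.* properties for AWS S3 or an S3-compatible store.

    With endpoint=None the properties target AWS S3. With an endpoint URL
    (for example a Ceph Object Gateway) the endpoint-specific ones are added.
    """
    props = {
        "spark.hadoop.fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }
    if endpoint is not None:
        # Assumption: enable SSL exactly when the endpoint URL uses https.
        uses_https = urlparse(endpoint).scheme == "https"
        props["spark.hadoop.fs.s3a.endpoint"] = endpoint
        props["spark.hadoop.fs.s3a.connection.ssl.enabled"] = str(uses_https).lower()
        props["spark.hadoop.fs.s3a.path.style.access"] = "true"
    return props


ceph = s3a_properties("<CEPH_ACCESS_KEY>", "<CEPH_SECRET_KEY>", "http://ceph.example.com:8080")
aws = s3a_properties("<AWS_ACCESS_KEY>", "<AWS_SECRET_KEY>")

# Print the properties that only the Ceph setup needs.
for key in sorted(ceph.keys() - aws.keys()):
    print(f"{key}={ceph[key]}")
```

Each key/value pair maps directly to one `--conf key=value` flag of spark-submit.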
All the other properties used below are well documented in the official Spark documentation - Link
/opt/spark/bin/spark-submit \
--master k8s://https://rancher.example.com:6443 \
--deploy-mode cluster \
--name amazon-data-review \
--conf spark.kubernetes.driver.pod.name=amazon-data-review \
--conf spark.kubernetes.executor.podNamePrefix=amazon-data-review \
--conf spark.kubernetes.namespace=spark \
--conf spark.executor.instances=2 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=55g \
--conf spark.kubernetes.container.image=dataengineeringe2e/spark-ubuntu-3.0.1 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.hadoop.fs.s3a.access.key=<CEPH_ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<CEPH_SECRET_KEY> \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.endpoint=http://ceph.example.com:8080 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.eventLog.enabled=false \
s3a://spark-demo/Ceph_S3_Data_Read_PySpark.py
AWS EKS processing data in S3
/opt/spark/bin/spark-submit \
--master k8s://https://14HH948AC611F5A7F020B62A5C366F04.yl4.us-east-1.eks.amazonaws.com:443 \
--deploy-mode cluster \
--name amazon-data-review \
--conf spark.kubernetes.driver.pod.name=amazon-data-review \
--conf spark.kubernetes.executor.podNamePrefix=amazon-data-review \
--conf spark.kubernetes.namespace=spark \
--conf spark.executor.instances=2 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=55g \
--conf spark.kubernetes.container.image=dataengineeringe2e/spark-ubuntu-3.0.1 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.hadoop.fs.s3a.access.key=<AWS_ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<AWS_SECRET_KEY> \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.eventLog.enabled=false \
s3a://spark-demo/Ceph_S3_Data_Read_PySpark.py
CONCLUSION - Ceph and S3 follow the same terminology, and without changing a single line of code in your Spark application you can submit it on both an On-Premise and an AWS K8s Cluster.
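To make that point concrete, here is a rough sketch that assembles both submissions from one shared set of properties and then lists what differs. The helper `spark_submit_command` is hypothetical, `<EKS_API_ENDPOINT>` is a placeholder, and only a subset of the properties from the commands above is included to keep it short. The credential properties are left out because their names are identical on both clusters.

```python
def spark_submit_command(master, app, conf, name="amazon-data-review",
                         spark_submit="/opt/spark/bin/spark-submit"):
    """Assemble a cluster-mode spark-submit command as a list of arguments."""
    command = [spark_submit, "--master", master, "--deploy-mode", "cluster", "--name", name]
    for key, value in conf.items():
        command += ["--conf", f"{key}={value}"]
    command.append(app)  # the application file always comes last
    return command


# Properties shared by both clusters (subset of the commands above).
common = {
    "spark.kubernetes.namespace": "spark",
    "spark.executor.instances": "2",
    "spark.kubernetes.container.image": "dataengineeringe2e/spark-ubuntu-3.0.1",
}
# Properties needed only when talking to the Ceph endpoint.
ceph_only = {
    "spark.hadoop.fs.s3a.endpoint": "http://ceph.example.com:8080",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.path.style.access": "true",
}
app = "s3a://spark-demo/Ceph_S3_Data_Read_PySpark.py"

on_prem = spark_submit_command("k8s://https://rancher.example.com:6443", app, {**common, **ceph_only})
eks = spark_submit_command("k8s://https://<EKS_API_ENDPOINT>:443", app, common)

# Arguments that appear only in the on-premise command.
extra = [arg for arg in on_prem if arg not in eks]
print(extra)
```

The application file is the same in both lists; only the master URL and the Ceph endpoint properties change. A list of this shape is also what the `arguments` parameter of the KubernetesPodOperator takes.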
KubernetesPodOperator in Airflow DAG
Let’s see how we can use the KubernetesPodOperator within our Airflow DAG to submit our job.
On-Premise Rancher Kubernetes Cluster processing data in Ceph
# Airflow DEMO DAG
from airflow import DAG
from datetime import timedelta, datetime
from kubernetes.client import models as k8s
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

args = {
    "owner": "prateek.dubey",
    "email": ["dataengineeringe2e@gmail.com"],
    "depends_on_past": False,
    "start_date": datetime(2019,1,1),
    "catchup": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

dag = DAG(dag_id='kubernetes_sample_dag', default_args=args, schedule_interval=None)

amazon_data_review = KubernetesPodOperator(
    namespace='spark',
    image='dataengineeringe2e/spark-ubuntu-3.0.1',
    image_pull_policy='Always',
    service_account_name='spark',
    name='amazon_data_review',
    task_id='amazon_data_review',
    in_cluster=True,
    get_logs=True,
    arguments=[
        '/opt/spark/bin/spark-submit',
        '--master', 'k8s://https://rancher.example.com:6443',
        '--deploy-mode', 'cluster',
        '--name', 'amazon-data-review',
        '--conf', 'spark.kubernetes.driver.pod.name=amazon-data-review',
        '--conf', 'spark.kubernetes.executor.podNamePrefix=amazon-data-review',
        '--conf', 'spark.kubernetes.namespace=spark',
        '--conf', 'spark.kubernetes.container.image=dataengineeringe2e/spark-ubuntu-3.0.1',
        '--conf', 'spark.kubernetes.container.image.pullPolicy=Always',
        '--conf', 'spark.kubernetes.authenticate.driver.serviceAccountName=spark',
        '--conf', 'spark.kubernetes.authenticate.executor.serviceAccountName=spark',
        '--conf', 'spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt',
        '--conf', 'spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token',
        '--conf', 'spark.hadoop.fs.s3a.access.key=<CEPH_ACCESS_KEY>',
        '--conf', 'spark.hadoop.fs.s3a.secret.key=<CEPH_SECRET_KEY>',
        '--conf', 'spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
        '--conf', 'spark.hadoop.fs.s3a.endpoint=http://ceph.example.com:8080',
        '--conf', 'spark.hadoop.fs.s3a.connection.ssl.enabled=false',
        '--conf', 'spark.hadoop.fs.s3a.path.style.access=true',
        '--conf', 'spark.executor.instances=2',
        '--conf', 'spark.executor.cores=3',
        '--conf', 'spark.executor.memory=55g',
        '--conf', 'spark.eventLog.enabled=false',
        's3a://spark-demo/Ceph_S3_Data_Read_PySpark.py'
    ],
    dag=dag
)

amazon_data_review
SparkK8sOperator by Google Cloud
SparkK8sOperator by Google Cloud is another great way to run Spark applications on a K8s Cluster. You can install the operator by following the official documentation -
Let’s see how we can use the SparkK8sOperator to submit our job. In this case, I would suggest putting the Spark config properties within the PySpark job itself. You can refer to the following code to understand how to define Spark config properties in the Spark job itself - https://github.com/dprateek1991/Data-Engineering-Courseware/blob/master/deep-dive/apache-spark/pyspark/Ceph_S3_Data_Read_PySpark_with_config.py
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: amazon-data-review
  namespace: spark
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "dataengineeringe2e/spark-ubuntu-3.0.1"
  imagePullPolicy: Always
  mainApplicationFile: s3a://spark-demo/Ceph_S3_Data_Read_PySpark.py
  sparkVersion: "3.0.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.0.1
    serviceAccount: spark
  executor:
    cores: 3
    instances: 2
    memory: "55g"
    labels:
      version: 3.0.1
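As for keeping the Spark config properties inside the PySpark job, as suggested above, here is a minimal sketch of how that could look. It is not the linked file. The names `JOB_CONF` and `build_session` are hypothetical, the property values are the placeholders used earlier in this article, and PySpark is assumed to be available inside the Spark image.

```python
# Properties that would otherwise be passed as --conf flags to spark-submit.
JOB_CONF = {
    "spark.hadoop.fs.s3a.aws.credentials.provider":
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "spark.hadoop.fs.s3a.access.key": "<CEPH_ACCESS_KEY>",
    "spark.hadoop.fs.s3a.secret.key": "<CEPH_SECRET_KEY>",
    "spark.hadoop.fs.s3a.endpoint": "http://ceph.example.com:8080",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.path.style.access": "true",
}


def build_session(app_name, conf):
    """Create (or reuse) a SparkSession with the given properties applied."""
    # Imported here so the module can be loaded where PySpark is not installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Inside the job itself (left commented out, as it needs a Spark runtime):
# spark = build_session("amazon-data-review", JOB_CONF)
```

With the properties set this way, the SparkApplication manifest only has to describe the driver and executor resources.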
I hope you enjoyed going through this article and are keen to explore Kubernetes for Spark.