Run Spark Jobs with Airflow
The IOMETE Airflow Plugin is an extension designed to make it easy for developers to trigger and manage
Spark jobs from their Airflow workflows.
In this documentation, we will guide you through the installation, configuration, and usage of the plugin
to help you get started quickly and efficiently.
Installation
Prerequisites
- Python 3.7 or later
- Apache Airflow 2.x
Setup Airflow (Optional)
Skip this step if you already have Airflow installed and configured.
We will cover two ways of installing Airflow with the IOMETE plugin (operator) locally:
1. Docker Setup
Clone our Airflow plugin repository and run the docker-compose file.
It builds the latest version from source and starts the Airflow webserver on http://localhost:8080. It may take a couple of minutes to start.
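The clone step might look like this; the repository URL and directory name are placeholders for wherever the plugin repository is hosted:

# clone the IOMETE Airflow plugin repository and switch into it
git clone <iomete-airflow-plugin-repository-url>
cd iomete-airflow-plugin

Then, from the repository root, start the stack: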
docker-compose up --build
2. Helm Install
You can install Airflow directly on Kubernetes using the official Helm chart.
Just change the Docker image to iomete/airflow:2.7.1
in the values.yaml
file. This image runs Airflow with the IOMETE plugin preinstalled.
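A minimal sketch of that override, assuming the key layout of the official apache-airflow Helm chart (verify the keys against the chart version you use):

# values.yaml -- point the official Apache Airflow chart at the IOMETE image
images:
  airflow:
    repository: iomete/airflow
    tag: "2.7.1"

You can then install or upgrade the release with the official chart, for example:

helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow -f values.yaml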
Manually Install IOMETE Airflow Plugin
Skip this step if you installed Airflow with one of the methods above.
If you already have Airflow installed and configured, you can install the IOMETE Airflow Plugin manually. In the environment where Airflow is installed, run the following command (you may need to restart the Airflow webserver afterwards):
pip install iomete-airflow-plugin
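To confirm that Airflow picked up the plugin, you can list the registered plugins with the Airflow 2.x CLI; the IOMETE plugin should appear in the output:

# list plugins registered with this Airflow installation
airflow plugins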
Configure IOMETE Variables
Before using the IometeOperator, you need to configure the required parameters in your Airflow environment.
From the Airflow UI, navigate to Admin > Variables and add the following variables:
- iomete_access_token: The API token used to authenticate against the IOMETE platform. In the Resources section below, you can find a link to the detailed documentation on how to create an API token.
- iomete_host: The host of the IOMETE platform. For example: https://sandbox.iomete.cloud
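If you prefer the command line, the same variables can be set with the Airflow CLI (the token value below is a placeholder):

# set the IOMETE variables from the command line
airflow variables set iomete_host https://sandbox.iomete.cloud
airflow variables set iomete_access_token <your-api-token>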
Usage
With the IOMETE Airflow Plugin installed and configured,
you can now use the IometeOperator
in your Airflow DAGs to trigger Spark jobs.
All your DAGs should be placed in the dags/
folder in your Airflow home directory (or in the directory where you ran docker-compose).
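For example, assuming a DAG file named my_iomete_dag.py (a hypothetical file name) and a standard AIRFLOW_HOME location:

# copy the DAG file into the folder Airflow scans for DAGs
cp my_iomete_dag.py "$AIRFLOW_HOME/dags/"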
Parameters:
- task_id: A unique identifier for the task in the DAG (required).
- job_id: The Spark job ID or name from the IOMETE platform. For example:
  job_id = "0978b946-42b1-4f8d-a807-8acfc32347f9"
  # or
  job_id = "my-spark-job-name"
- config_override: [Optional] Configuration overrides for the Spark job. Example:
  {
    "arguments": ["string"],
    "envVars": {
      "key": "value"
    },
    "sparkConf": {
      "spark.example.variable": "sample_value"
    }
  }
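Putting the parameters together, a minimal task definition might look like the sketch below; it assumes config_override accepts a plain dict as described above, the job name is a placeholder, and `dag` refers to an existing DAG object. Complete DAG examples follow in the next section.

from iomete_airflow_plugin.iomete_operator import IometeOperator

# minimal sketch: trigger an IOMETE Spark job by name and override one Spark conf value
run_spark_job = IometeOperator(
    task_id="run-my-spark-job",
    job_id="my-spark-job-name",  # or the job's UUID
    config_override={
        "sparkConf": {"spark.example.variable": "sample_value"}
    },
    dag=dag,  # assumes an existing DAG object named `dag`
)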
Example DAGs
The source of the examples below can be found in this GitHub repo.
You can dynamically change config_override
params with each run by choosing "Run w/ config" from Airflow's UI.
from airflow import DAG
from airflow.utils.dates import days_ago
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": days_ago(0, second=1),
}

# You can change params from Airflow's UI by choosing "Run w/ config"
dag = DAG(
    dag_id="iomete-dag-catalog-sync-runner",
    default_args=args,
    schedule_interval=None,
    params={
        "job_id": "0761a510-3a66-4c72-b06e-9d071f30d85d",  # or the job name
        "config_override": {
            # everything inside config_override should be camelCase
            "envVars": {
                "env1": "value1"
            },
            "arguments": ["arg1"],
            "sparkConf": {
                "spark.example.variable": "sample_value"
            }
        }
    }
)

iomete_task = IometeOperator(
    task_id="iomete-task-catalog-sync-runner",
    job_id="{{ params.job_id }}",
    config_override="{{ params.config_override }}",
    dag=dag,
)
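If you want to change the job or overrides for a single run from the command line instead of the UI, you can pass a run configuration; when core.dag_run_conf_overrides_params is enabled in your Airflow config, the values in --conf override the DAG's params:

# trigger the DAG and override the job_id param for this run only
airflow dags trigger iomete-dag-catalog-sync-runner \
    --conf '{"job_id": "my-spark-job-name"}'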
Sequential execution example
from airflow import DAG
from airflow.utils.dates import days_ago
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": days_ago(0, second=1),
}

dag = DAG(dag_id="iomete-dag-sequential-execution", default_args=args, schedule_interval=None)

task1 = IometeOperator(
    task_id="first-task-catalog-sync",
    job_id="0761a510-3a66-4c72-b06e-9d071f30d85d",  # referenced by job ID
    dag=dag,
)

task2 = IometeOperator(
    task_id="second-task-sql-runner",
    job_id="sql-runner",  # referenced by job name
    dag=dag,
)

# task2 will be executed after task1
task1 >> task2
Support
For support and further assistance, you can use the IOMETE Platform's support section or contact the IOMETE support team at support@iomete.com.