Run Spark Jobs with Airflow
The IOMETE Airflow Plugin is an extension that lets you trigger and manage Spark jobs from your Airflow workflows. It submits job runs, monitors their status, and supports cancellation on task kill.
Installation
Prerequisites
- Python 3.10 or later (up to 3.12)
- Apache Airflow 3.x
Setup Airflow (Optional)
Skip this step if you already have Airflow installed and configured.
Two ways to install Airflow with the IOMETE plugin locally:
1. Docker Setup
Clone the Airflow plugin repository and run Docker Compose. It builds the latest version from source and starts the Airflow webserver on http://localhost:8080. Startup may take a couple of minutes.

```shell
docker-compose up --build
```
2. Helm Install
Install Airflow on Kubernetes using the official Helm chart.
Update the docker image in values.yaml to use the IOMETE Airflow image matching your Airflow 3.x version. This runs Airflow with the IOMETE plugin pre-installed.
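Assuming the official Apache Airflow Helm chart, the install might look like the following sketch (the release name and namespace are placeholders; your values.yaml must reference the IOMETE Airflow image):

```shell
# Add the official Apache Airflow chart repository.
helm repo add apache-airflow https://airflow.apache.org
helm repo update

# Install Airflow with a values.yaml that points at the IOMETE Airflow image.
helm install airflow apache-airflow/airflow \
  --namespace airflow --create-namespace \
  -f values.yaml
```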
Manually Install IOMETE Airflow Plugin
Skip this step if you installed Airflow with one of the methods above.
If you already have Airflow installed, install the plugin manually. In your Airflow home directory, run:
```shell
pip install iomete-airflow-plugin
```
Restart the Airflow webserver after installation.
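To confirm the plugin was picked up after the restart, the airflow CLI can list loaded plugins (assuming the CLI is on your PATH):

```shell
# Lists loaded plugins; iomete_airflow_plugin should appear in the output.
airflow plugins
```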
Configuration
The operator needs API credentials and endpoint details to communicate with IOMETE. From the Airflow UI, navigate to Admin > Variables and add the following:
| Variable | Required | Description |
|---|---|---|
| iomete_access_token | Yes | Personal access token for the IOMETE API. See the Resources section below. |
| iomete_host | Yes | IOMETE platform host URL. Example: https://sandbox.iomete.cloud |
| iomete_domain | Yes | IOMETE domain identifier. Required since plugin v2.0.0. |
| iomete_host_verify | No | SSL certificate verification. Defaults to True. Set to False to disable. |
If you need to use a different prefix for these variables (e.g., to connect to multiple IOMETE environments), set the variable_prefix parameter on the operator. The default prefix is iomete_.
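Instead of the UI, the same variables can be set with the airflow CLI (a sketch; the values shown are placeholders for your own environment):

```shell
# Set the required IOMETE variables (replace the values with your own).
airflow variables set iomete_host "https://sandbox.iomete.cloud"
airflow variables set iomete_domain "my-domain"
airflow variables set iomete_access_token "<your-personal-access-token>"

# Optional: disable SSL certificate verification (not recommended for production).
airflow variables set iomete_host_verify "False"
```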
Usage
Once the plugin is installed and configured, you can use the IometeOperator in your DAGs to trigger Spark jobs.
All DAGs should be placed in the dags/ folder in your Airflow home directory (or the directory where you ran docker-compose).
Operator Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| task_id | str | Yes | — | Unique identifier for the task in the DAG. |
| job_id | str | Yes | — | Spark Job ID (UUID) or name from the IOMETE platform. |
| config_override | dict or str | No | {} | Configuration overrides for the Spark job (see below). |
| polling_period_seconds | int | No | 10 | Interval in seconds between status checks while the job is running. |
| do_xcom_push | bool | No | False | Push job_id and job_run_id to XCom for downstream tasks. |
| variable_prefix | str | No | iomete_ | Prefix for the Airflow Variables used by this operator. |
Config Override
The config_override parameter accepts a dict (or JSON string) with these optional fields:
```json
{
  "arguments": ["arg1", "arg2"],
  "envVars": {
    "key": "value"
  },
  "sparkConf": {
    "spark.example.variable": "sample_value"
  }
}
```
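Because config_override accepts either a dict or a JSON string, it can help to build the override in Python and serialize it yourself. A minimal stdlib-only sketch (the operator itself is not involved here):

```python
import json

# Build the override as a plain Python dict.
config_override = {
    "arguments": ["arg1", "arg2"],
    "envVars": {"key": "value"},
    "sparkConf": {"spark.example.variable": "sample_value"},
}

# The operator accepts either the dict itself or its JSON string form.
config_override_json = json.dumps(config_override)
print(config_override_json)

# Round-tripping shows both forms carry the same configuration.
assert json.loads(config_override_json) == config_override
```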
Job Run States
The operator monitors the job run and reports these states:
| State | Description | Final? |
|---|---|---|
| ENQUEUED | Job is queued | No |
| SUBMITTED | Job is being deployed (~1 min) | No |
| RUNNING | Job is executing | No |
| COMPLETED | Job finished successfully | Yes |
| FAILED | Job failed | Yes |
| ABORTED | Job was cancelled | Yes |
| ABORTING | Job cancellation in progress | No |
The operator raises an AirflowException if the job ends in FAILED or ABORTED state.
If the Airflow task is killed, the operator automatically cancels the running job in IOMETE.
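The monitoring behavior can be illustrated with a simplified polling loop. This is an illustrative sketch of the state machine above, not the plugin's actual implementation; get_job_run_state is a hypothetical stand-in for the IOMETE API call:

```python
import time

FINAL_STATES = {"COMPLETED", "FAILED", "ABORTED"}

class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""

def wait_for_job_run(get_job_run_state, polling_period_seconds=10):
    """Poll until the job run reaches a final state, mirroring the table above."""
    while True:
        state = get_job_run_state()  # hypothetical API call returning a state string
        if state in FINAL_STATES:
            if state in ("FAILED", "ABORTED"):
                raise AirflowException(f"Job run ended in state {state}")
            return state
        time.sleep(polling_period_seconds)

# Simulated run: the job passes through non-final states, then completes.
states = iter(["ENQUEUED", "SUBMITTED", "RUNNING", "COMPLETED"])
print(wait_for_job_run(lambda: next(states), polling_period_seconds=0))  # COMPLETED
```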
Example DAGs
The source for the examples below can be found in the GitHub repository.
Single Task
```python
import pendulum
from airflow import DAG
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": pendulum.today("UTC"),
}

dag = DAG(dag_id="iomete-task", default_args=args, schedule=None)

task = IometeOperator(
    task_id="iomete-catalog-sync-task",
    job_id="0761a510-3a66-4c72-b06e-9d071f30d85d",
    dag=dag,
)
```
Task with Config Overrides
You can dynamically change config_override params with each run by choosing "Run w/ config" from Airflow's UI.
```python
import pendulum
from airflow import DAG
from airflow.sdk.definitions.param import ParamsDict
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": pendulum.today("UTC"),
}

dag = DAG(
    dag_id="iomete-task-with-args",
    default_args=args,
    schedule=None,
    render_template_as_native_obj=True,
    params=ParamsDict({
        "job_id": "11ef35a6-ff9d-4996-bf16-7a7ef0baf4fc",
        "config_override": {
            "envVars": {"env1": "value1"},
            "arguments": ["arg1"],
            "sparkConf": {"spark.example.variable": "sample_value"},
        },
    }),
)

task = IometeOperator(
    task_id="iomete-catalog-sync-task-with-config",
    job_id="{{ params.job_id }}",
    config_override="{{ params.config_override }}",
    dag=dag,
)
```
Sequential Execution
```python
import pendulum
from airflow import DAG
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": pendulum.today("UTC"),
}

dag = DAG(dag_id="iomete-demo", default_args=args, schedule=None)

catalog_task = IometeOperator(
    task_id="task-01-sync-catalog",
    job_id="iomete-catalog-sync",  # using the job name
    dag=dag,
)

sql_task = IometeOperator(
    task_id="task-02-run-sql",
    job_id="sql-runner",  # using the job name
    dag=dag,
)

# catalog_task runs first, then sql_task
catalog_task >> sql_task
```
Using XCom to Pass Data Between Tasks
When do_xcom_push=True, the operator pushes job_id and job_run_id to XCom, allowing downstream tasks to reference the job run.
```python
task = IometeOperator(
    task_id="my-task",
    job_id="my-spark-job",
    do_xcom_push=True,
    dag=dag,
)
```
Downstream tasks can then access the values:
```python
job_run_id = "{{ ti.xcom_pull(task_ids='my-task', key='job_run_id') }}"
job_id = "{{ ti.xcom_pull(task_ids='my-task', key='job_id') }}"
```
Support
For support and assistance, use the IOMETE platform's support section or contact the IOMETE support team at support@iomete.com.