
Run Spark Jobs with Airflow

The IOMETE Airflow Plugin is an extension that lets you trigger and manage Spark jobs from your Airflow workflows. It submits job runs, monitors their status, and supports cancellation on task kill.

Installation

Prerequisites

  • Python 3.10 or later (up to 3.12)
  • Apache Airflow 3.x

Setup Airflow (Optional)

note

Skip this step if you already have Airflow installed and configured.

Two ways to install Airflow with the IOMETE plugin locally:

1. Docker Setup: Clone the Airflow plugin repository and run docker-compose. This builds the latest version from source and starts the Airflow webserver on http://localhost:8080; startup may take a couple of minutes.

docker-compose up --build

2. Helm Install: Install Airflow on Kubernetes using the official Helm chart. Update the Docker image in values.yaml to the IOMETE Airflow image matching your Airflow 3.x version. This runs Airflow with the IOMETE plugin pre-installed.

Manually Install IOMETE Airflow Plugin

note

Skip this step if you installed Airflow with one of the methods above.

If you already have Airflow installed, install the plugin manually. In your Airflow home directory, run:

pip install iomete-airflow-plugin

Restart the Airflow webserver after installation.

Configuration

The operator needs API credentials and endpoint details to communicate with IOMETE. In the Airflow UI, navigate to Admin > Variables and add the following:

| Variable | Required | Description |
|---|---|---|
| iomete_access_token | Yes | Personal access token for the IOMETE API. See the Resources section below. |
| iomete_host | Yes | IOMETE platform host URL. Example: https://sandbox.iomete.cloud |
| iomete_domain | Yes | IOMETE domain identifier. Required since plugin v2.0.0. |
| iomete_host_verify | No | SSL certificate verification. Defaults to True; set to False to disable. |
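If you prefer the CLI to the Admin > Variables page, the same variables can be set with Airflow's variables set command (values below are placeholders):

```shell
# Set the required IOMETE variables; replace the placeholder values.
airflow variables set iomete_access_token "<your-personal-access-token>"
airflow variables set iomete_host "https://sandbox.iomete.cloud"
airflow variables set iomete_domain "<your-domain>"
```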
Custom variable prefix

If you need to use a different prefix for these variables (e.g., to connect to multiple IOMETE environments), set the variable_prefix parameter on the operator. The default prefix is iomete_.
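As a sketch of how the prefix is applied (assuming the operator simply prepends variable_prefix to each variable name):

```python
# Assumed naming scheme: the operator reads "<prefix><name>" for each
# setting, so a prefix of "prod_iomete_" maps to these Airflow Variables.
prefix = "prod_iomete_"
names = ["access_token", "host", "domain", "host_verify"]
variables = [prefix + name for name in names]
print(variables)
```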

Usage

Once the plugin is installed and configured, you can use the IometeOperator in your DAGs to trigger Spark jobs.

All DAGs should be placed in the dags/ folder in your Airflow home directory (or the directory where you ran docker-compose).

Operator Parameters

| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| task_id | str | Yes | — | Unique identifier for the task in the DAG. |
| job_id | str | Yes | — | Spark Job ID (UUID) or name from the IOMETE platform. |
| config_override | dict or str | No | {} | Configuration overrides for the Spark Job (see below). |
| polling_period_seconds | int | No | 10 | Interval in seconds between status checks while the job is running. |
| do_xcom_push | bool | No | False | Push job_id and job_run_id to XCom for downstream tasks. |
| variable_prefix | str | No | iomete_ | Prefix for the Airflow Variables used by this operator. |

Config Override

The config_override parameter accepts a dict (or JSON string) with these optional fields:

{
  "arguments": ["arg1", "arg2"],
  "envVars": {
    "key": "value"
  },
  "sparkConf": {
    "spark.example.variable": "sample_value"
  }
}
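In a DAG file, the same structure can be built as a plain Python dict and passed to config_override directly, or serialized to a JSON string; both forms are accepted. The field values here are illustrative:

```python
import json

# Overrides for a single job run; every field is optional.
config_override = {
    "arguments": ["arg1", "arg2"],  # arguments passed to the Spark job
    "envVars": {"key": "value"},  # environment variables for the run
    "sparkConf": {"spark.example.variable": "sample_value"},  # Spark settings
}

# Equivalent JSON-string form of the same overrides.
config_override_json = json.dumps(config_override)
```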

Job Run States

The operator monitors the job run and reports these states:

| State | Description | Final? |
|---|---|---|
| ENQUEUED | Job is queued | No |
| SUBMITTED | Job is being deployed (~1 min) | No |
| RUNNING | Job is executing | No |
| COMPLETED | Job finished successfully | Yes |
| FAILED | Job failed | Yes |
| ABORTED | Job was cancelled | Yes |
| ABORTING | Job cancellation in progress | No |

The operator raises an AirflowException if the job ends in FAILED or ABORTED state. If the Airflow task is killed, the operator automatically cancels the running job in IOMETE.
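The monitoring loop can be pictured roughly like this (a simplified sketch, not the plugin's actual implementation; get_state stands in for the IOMETE status API call):

```python
import time

# States after which the job run will not change again.
FINAL_STATES = {"COMPLETED", "FAILED", "ABORTED"}

def wait_for_run(get_state, polling_period_seconds=10):
    """Poll the job run until it reaches a final state, then return it."""
    while True:
        state = get_state()
        if state in FINAL_STATES:
            return state
        time.sleep(polling_period_seconds)
```

An exception would then be raised for FAILED or ABORTED, mirroring the operator's behavior.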

Example DAGs

Link to Source

The source for the examples below can be found in the GitHub repository.

Single Task

import pendulum
from airflow import DAG
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": pendulum.today("UTC"),
}

dag = DAG(dag_id="iomete-task", default_args=args, schedule=None)

task = IometeOperator(
    task_id="iomete-catalog-sync-task",
    job_id="0761a510-3a66-4c72-b06e-9d071f30d85d",
    dag=dag,
)

Task with Config Overrides

Overriding Config

You can dynamically change config_override params with each run by choosing "Run w/ config" from Airflow's UI.

import pendulum
from airflow import DAG
from airflow.sdk.definitions.param import ParamsDict
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": pendulum.today("UTC"),
}

dag = DAG(
    dag_id="iomete-task-with-args",
    default_args=args,
    schedule=None,
    render_template_as_native_obj=True,
    params=ParamsDict({
        "job_id": "11ef35a6-ff9d-4996-bf16-7a7ef0baf4fc",
        "config_override": {
            "envVars": {
                "env1": "value1"
            },
            "arguments": ["arg1"],
            "sparkConf": {
                "spark.example.variable": "sample_value"
            }
        }
    }),
)

task = IometeOperator(
    task_id="iomete-catalog-sync-task-with-config",
    job_id="{{ params.job_id }}",
    config_override="{{ params.config_override }}",
    dag=dag,
)

Sequential Execution

import pendulum
from airflow import DAG
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": pendulum.today("UTC"),
}

dag = DAG(dag_id="iomete-demo", default_args=args, schedule=None)

catalog_task = IometeOperator(
    task_id="task-01-sync-catalog",
    job_id="iomete-catalog-sync",  # using job name
    dag=dag,
)

sql_task = IometeOperator(
    task_id="task-02-run-sql",
    job_id="sql-runner",  # using job name
    dag=dag,
)

# sql_task runs first, then catalog_task
sql_task >> catalog_task

Using XCom to Pass Data Between Tasks

When do_xcom_push=True, the operator pushes job_id and job_run_id to XCom, allowing downstream tasks to reference the job run.

task = IometeOperator(
    task_id="my-task",
    job_id="my-spark-job",
    do_xcom_push=True,
    dag=dag,
)

Downstream tasks can then access the values:

job_run_id = "{{ ti.xcom_pull(task_ids='my-task', key='job_run_id') }}"
job_id = "{{ ti.xcom_pull(task_ids='my-task', key='job_id') }}"

Support

For support and assistance, use the IOMETE platform's support section or contact the IOMETE support team at support@iomete.com.

Resources