
Run Spark Jobs with Airflow

The IOMETE Airflow Plugin is an extension designed to make it easy for developers to trigger and manage Spark jobs from within their Airflow workflows.
In this documentation, we will guide you through the installation, configuration, and usage of the plugin
to help you get started quickly and efficiently.

Installation

Prerequisites

  • Python 3.7 or later
  • Apache Airflow 2.x

Set Up Airflow (Optional)

note

Skip this step if you already have Airflow installed and configured.

We will cover two ways of installing Airflow with the IOMETE plugin (Operator) locally:

1. Docker Setup
Clone our Airflow plugin repository and run the docker-compose file. It will build the latest version from source and start the Airflow webserver at http://localhost:8080. It may take a couple of minutes to start.

docker-compose up --build

2. Helm Install
You can install Airflow directly on Kubernetes using the official Helm chart. Just change the Docker image to iomete/airflow:2.7.1 in the values.yaml file, as sketched below. It will run Airflow with the IOMETE plugin pre-installed.
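
The commands below are a minimal sketch of that Helm installation using the official Apache Airflow chart. The release name, namespace, and the images.airflow.repository / images.airflow.tag value keys are assumptions based on the chart's standard values.yaml layout, so adjust them for your environment.

helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow \
  --namespace airflow --create-namespace \
  --set images.airflow.repository=iomete/airflow \
  --set images.airflow.tag=2.7.1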

Manually Install IOMETE Airflow Plugin

note

Skip this step if you installed Airflow with one of the methods above.

If you already have Airflow installed and configured, you can install the IOMETE Airflow Plugin manually. In your Airflow home directory, run the following command (you may need to restart the Airflow webserver afterwards):

pip install iomete-airflow-plugin

Configure IOMETE Variables

Before using the IometeOperator, you need to configure the required parameters in your Airflow environment.
From the Airflow UI, navigate to Admin > Variables and add the following variables (a CLI alternative is shown after the list):

  1. iomete_access_token - Your IOMETE API token. In the Resources section below, you can find a link to the detailed documentation on how to create an API token.

  2. iomete_host - The host of the IOMETE platform. For example: https://sandbox.iomete.cloud
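
If you prefer the command line, the same variables can be set with the standard Airflow CLI. The values below are placeholders; substitute your own host and token.

airflow variables set iomete_host "https://sandbox.iomete.cloud"
airflow variables set iomete_access_token "<your-api-token>"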


Usage

With the IOMETE Airflow Plugin installed and configured, you can now use the IometeOperator in your Airflow DAGs to trigger Spark jobs.

All your DAGs should be added to the dags/ folder in your Airflow home directory (or to the directory where you ran docker-compose).

Parameters (a minimal sketch follows this list):

  1. task_id: A unique identifier for the task in the DAG (required).

  2. job_id: The Spark job ID or name from the IOMETE platform.

    job_id = "0978b946-42b1-4f8d-a807-8acfc32347f9"

    # or

    job_id = "my-spark-job-name"
  3. config_override: [Optional] Configuration overrides for the Spark job. Example:

    {
      "arguments": ["string"],
      "envVars": {
        "key": "value"
      },
      "sparkConf": {
        "spark.example.variable": "sample_value"
      }
    }
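
As a minimal sketch of how these parameters fit together, the operator can be instantiated as shown below; the task ID, job name, and override values are placeholders. Here config_override is passed as a plain dict matching the structure above; the Overriding Config example below shows how to template it from DAG params instead.

from iomete_airflow_plugin.iomete_operator import IometeOperator

# Minimal instantiation; attach it to a DAG as in the Example DAGs below.
spark_task = IometeOperator(
    task_id="run-my-spark-job",
    job_id="my-spark-job-name",  # or the job's UUID from the IOMETE platform
    config_override={
        "arguments": ["arg1"],
        "envVars": {"ENV_NAME": "value"},
        "sparkConf": {"spark.example.variable": "sample_value"},
    },
)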

Example DAGs

Link to Source

The source of the examples below can be found in this GitHub repo.

Overriding Config

You can dynamically change the config_override params on each run by choosing "Run w/ config" in Airflow's UI.

from airflow import DAG, utils
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": utils.dates.days_ago(0, second=1),
}

# You can change params from Airflow's UI by choosing "Run w/ config"
dag = DAG(
    dag_id="iomete-dag-catalog-sync-runner",
    default_args=args,
    schedule_interval=None,
    params={
        "job_id": "0761a510-3a66-4c72-b06e-9d071f30d85d",  # or name
        "config_override": {
            # everything inside config_override should be camelCase
            "envVars": {
                "env1": "value1"
            },
            "arguments": ["arg1"],
            "sparkConf": {
                "spark.example.variable": "sample_value"
            }
        }
    }
)

iomete_task = IometeOperator(
    task_id="iomete-task-catalog-sync-runner",
    job_id="{{ params.job_id }}",
    config_override="{{ params.config_override }}",
    dag=dag,
)

iomete_task

Sequential Execution Example

from airflow import DAG, utils
from iomete_airflow_plugin.iomete_operator import IometeOperator

args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "start_date": utils.dates.days_ago(0, second=1),
}

dag = DAG(dag_id="iomete-dag-sequential-execution", default_args=args, schedule_interval=None)

task1 = IometeOperator(
    task_id="first-task-catalog-sync",
    job_id="0761a510-3a66-4c72-b06e-9d071f30d85d",  # we use job id here
    dag=dag,
)

task2 = IometeOperator(
    task_id="second-task-sql-runner",
    job_id="sql-runner",  # we use job name here
    dag=dag,
)

# task2 will be executed after task1
task1 >> task2

Support

For support and further assistance, you can use IOMETE Platform's support section or contact the IOMETE support team at support@iomete.com.

Resources