SDK Usage for Spark Jobs

IOMETE's Python SDK (iomete-sdk) provides a convenient way to manage Spark Jobs programmatically. With the SparkJobApiClient, you can create, update, delete, and run jobs, as well as retrieve run logs and metrics.

Installation

pip install iomete-sdk
Info: Requires Python 3.9 or higher. The SDK depends on requests and dataclasses-json.

Prerequisites

You need the following to use the SDK:

  • API Token: Create one from the IOMETE Console. See Resources below.
  • Host: Your dataplane endpoint URL (e.g., https://your-dataplane.iomete.com).
  • Domain: Your IOMETE domain identifier. Both the host and domain can be found in Settings > General in the IOMETE Console.
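As a sketch, these three values can be collected from environment variables before constructing the client. The variable names below (IOMETE_HOST, IOMETE_API_TOKEN, IOMETE_DOMAIN) are illustrative conventions, not names mandated by the SDK:

```python
import os

def load_iomete_config():
    """Collect SDK connection settings from environment variables.

    Raises a clear error up front if any required variable is unset,
    rather than failing later on the first API call.
    """
    required = ("IOMETE_HOST", "IOMETE_API_TOKEN", "IOMETE_DOMAIN")
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "host": os.environ["IOMETE_HOST"],
        "api_key": os.environ["IOMETE_API_TOKEN"],
        "domain": os.environ["IOMETE_DOMAIN"],
    }
```

The resulting dict maps directly onto the client's constructor, e.g. SparkJobApiClient(**load_iomete_config()).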

Initialization

import os
from iomete_sdk.spark import SparkJobApiClient

job_client = SparkJobApiClient(
    host="https://your-dataplane.iomete.com",
    api_key=os.environ["IOMETE_API_TOKEN"],
    domain="your-domain",
)

Parameter | Type | Default  | Description
----------|------|----------|------------
host      | str  | required | Dataplane endpoint URL
api_key   | str  | required | IOMETE API token
domain    | str  | required | IOMETE domain identifier
verify    | bool | True     | SSL certificate verification. Set to False for self-signed certs.

Creating a Job

Register a new Spark Job by sending a create request with the job payload:

job_payload = {
    "name": "my-etl-job",
    "bundleId": "your-bundle-id",
    "namespace": "default",
    "template": {
        "applicationType": "python",
        "image": "iomete/spark-py:3.5.3-v1",
        "mainApplicationFile": "path/to/job.py",
        "instanceConfig": {
            "driverType": "driver-x-small",
            "executorType": "exec-x-small",
            "executorCount": 1
        }
    }
}

job = job_client.create_job(payload=job_payload)
print(job)
Note: The bundleId field is required when creating a job. The SDK raises a ValueError if it is missing.

Job Payload Reference

Field | Type | Required | Description
------|------|----------|------------
name | string | Yes | Job name (must be non-blank)
bundleId | string | Yes | Bundle identifier (SDK validates presence)
namespace | string | Yes | Kubernetes namespace (must be allowed in domain)
template | object | Yes | Job configuration (see template fields below)
description | string | No | Job description
flow | string | No | LEGACY (default) or PRIORITY
priority | string | No | NORMAL (default) or HIGH (requires domain owner)
jobUser | string | No | User identity for the job (defaults to authenticated user)
jobType | string | No | MANUAL (default), SCHEDULED, or STREAMING
schedule | string | Conditional | Cron expression (required when jobType is SCHEDULED, must be omitted otherwise)
concurrency | string | No | ALLOW (default) or FORBID (only applies to SCHEDULED jobs)
resourceTags | array | No | Resource tags for the job
template.image | string | Yes | Docker image for the Spark job (must include tag, e.g., image:tag)
template.mainApplicationFile | string | Yes | Path to the main application file
template.applicationType | string | No | python, java, or scala
template.mainClass | string | No | Main class (for Java/Scala jobs)
template.arguments | array | No | Command-line arguments
template.sparkConf | object | No | Spark configuration key-value pairs
template.envVars | object | No | Environment variables
template.configMaps | array | No | Config maps with key, content, mountPath
template.deps.jars | array | No | JAR file dependencies
template.deps.pyFiles | array | No | Python dependency files
template.deps.packages | array | No | Maven/PIP packages
template.instanceConfig | object | No | Driver/executor sizing
template.instanceConfig.driverType | string | No | Driver node type (platform default applied if omitted)
template.instanceConfig.executorType | string | No | Executor node type (platform default applied if omitted)
template.instanceConfig.executorCount | integer | No | Number of executors (default 1, minimum 1)
template.instanceConfig.singleNodeDeployment | boolean | No | Run driver and executor in a single node (default false)
template.restartPolicy | object | No | Restart policy (e.g., {"type": "Never"})
template.maxExecutionDurationSeconds | string | No | Maximum execution time in seconds (e.g., "3600" for 1 hour, default "86400")
template.volumeId | string | No | Volume identifier
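The conditional rules above (bundleId always required, schedule present exactly when jobType is SCHEDULED) can be checked client-side before calling create_job. The helper below is an illustrative sketch, not part of the SDK; it builds a minimal Python-application payload and enforces those documented rules:

```python
def build_job_payload(name, bundle_id, main_file, image,
                      job_type="MANUAL", schedule=None):
    """Assemble a minimal job payload, validating the documented field rules.

    Illustrative helper only: the SDK accepts a plain dict, so this simply
    fails fast on mistakes the server (or SDK) would otherwise reject.
    """
    if not name or not name.strip():
        raise ValueError("name must be non-blank")
    if not bundle_id:
        raise ValueError("bundleId is required")
    if job_type == "SCHEDULED" and not schedule:
        raise ValueError("schedule is required when jobType is SCHEDULED")
    if job_type != "SCHEDULED" and schedule:
        raise ValueError("schedule must be omitted unless jobType is SCHEDULED")

    payload = {
        "name": name,
        "bundleId": bundle_id,
        "namespace": "default",
        "jobType": job_type,
        "template": {
            "applicationType": "python",
            "image": image,
            "mainApplicationFile": main_file,
        },
    }
    if schedule:
        payload["schedule"] = schedule
    return payload
```

The result can be passed straight to job_client.create_job(payload=...).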

Updating a Job

Modify an existing job's configuration, such as changing its schedule or instance sizing:

update_payload = {
    **job,
    "jobType": "SCHEDULED",
    "schedule": "0 0 */1 * *"
}
updated = job_client.update_job(job_id=job["id"], payload=update_payload)
print(updated)

Listing Jobs

Retrieve all Spark Jobs in your domain:

jobs = job_client.get_jobs()
for j in jobs:
    print(j["name"], j["id"])

Getting a Job

You can retrieve a job by its ID or by name:

# By ID
job = job_client.get_job_by_id(job_id="your-job-id")

# By name
job = job_client.get_job_by_name(job_name="my-etl-job")

Deleting a Job

Remove a job and its configuration permanently:

job_client.delete_job_by_id(job_id=job["id"])

Running a Job

Submit a job run using submit_job_run. You can optionally override configuration like arguments:

# Basic run
run = job_client.submit_job_run(job_id=job["id"], payload={})
print(run)

# With argument overrides
run = job_client.submit_job_run(job_id=job["id"], payload={
    "arguments": ["arg1", "arg2"]
})

Managing Job Runs

Listing Runs

runs = job_client.get_job_runs(job_id=job["id"])
for r in runs:
    print(r["id"], r.get("status"))

Getting a Specific Run

run_info = job_client.get_job_run_by_id(job_id=job["id"], run_id=run["id"])
print(run_info)
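Fetching a run once gives a snapshot; to block until the run finishes, you can poll it in a loop. The sketch below takes any zero-argument callable that returns the run dict (e.g. lambda: job_client.get_job_run_by_id(job_id=..., run_id=...)), so it is not tied to the SDK. The terminal status names are assumptions — check the status values your deployment actually reports:

```python
import time

def wait_for_run(fetch_run, poll_interval=10.0, timeout=3600.0,
                 terminal_statuses=("COMPLETED", "FAILED", "ABORTED")):
    """Poll a job run until it reaches a terminal status or times out.

    fetch_run: zero-arg callable returning the run dict.
    terminal_statuses: assumed terminal values; adjust for your platform.
    """
    deadline = time.monotonic() + timeout
    while True:
        run = fetch_run()
        if run.get("status") in terminal_statuses:
            return run
        if time.monotonic() > deadline:
            raise TimeoutError(f"run still {run.get('status')!r} after {timeout}s")
        time.sleep(poll_interval)
```

Injecting the fetch callable also makes the loop trivial to test with a fake.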

Getting Run Logs

Retrieve logs for a specific run. The time_range parameter controls how far back to fetch logs (defaults to "5m").

logs = job_client.get_job_run_logs(
    job_id=job["id"],
    run_id=run["id"],
    time_range="1h"
)
for entry in logs:
    print(entry["date"], entry["logLine"])

Supported time ranges: 5m, 15m, 30m, 1h, 3h, 6h, 12h, 24h, 2d, 7d, 14d, 30d
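Since each log entry is a dict with date and logLine keys, post-processing is plain list filtering. As a small sketch, the helper below narrows a run's logs down to likely error lines; the marker strings are assumptions about your application's log format, not anything the API defines:

```python
def error_lines(entries, markers=("ERROR", "Exception", "Traceback")):
    """Return only the log entries whose logLine contains an error marker.

    entries: list of {"date": ..., "logLine": ...} dicts as returned
    by get_job_run_logs. Marker strings are illustrative.
    """
    return [e for e in entries
            if any(m in e.get("logLine", "") for m in markers)]
```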

Getting Run Metrics

metrics = job_client.get_job_run_metrics(job_id=job["id"], run_id=run["id"])
print(metrics)

Cancelling a Run

job_client.cancel_job_run(job_id=job["id"], run_id=run["id"])

Flow and Priority

The SDK supports Flow and Priority enums for job configuration:

from iomete_sdk.spark.spark_job import Flow, Priority

job_payload = {
    "name": "priority-job",
    "bundleId": "your-bundle-id",
    "flow": Flow.PRIORITY.value,      # "LEGACY" or "PRIORITY"
    "priority": Priority.HIGH.value,  # "NORMAL" or "HIGH"
    "template": { ... }
}
job = job_client.create_job(payload=job_payload)

Enum     | Values           | Description
---------|------------------|------------
Flow     | LEGACY, PRIORITY | Job execution flow type
Priority | NORMAL, HIGH     | Job scheduling priority

Error Handling

The SDK raises specific exceptions for validation failures, API errors, and network issues:

import requests
from iomete_sdk.api_utils import ClientError

try:
    job_client.create_job(payload={"name": "test"})
except ValueError as e:
    # Validation errors (missing bundleId, invalid flow/priority)
    print(f"Validation error: {e}")
except ClientError as e:
    # API errors (400, 404, 500, etc.)
    print(f"API error {e.status}: {e.content}")
except requests.exceptions.RequestException as e:
    # Network/connection errors
    print(f"Connection error: {e}")

Exception | When
----------|-----
ValueError | Missing bundleId, invalid flow or priority values
ClientError | HTTP errors from the API. Has .status (int) and .content (dict) attributes
RequestException | Network or connection errors (timeouts, DNS failures, etc.)
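Because ClientError carries a numeric .status, transient server-side failures can be retried with backoff. The wrapper below is an illustrative sketch: to stay self-contained it catches broadly and inspects a .status attribute, whereas real code would typically catch ClientError and RequestException specifically, and only retry calls that are safe to repeat:

```python
import random
import time

def with_retries(call, max_attempts=3, base_delay=1.0,
                 retryable_statuses=(429, 500, 502, 503)):
    """Invoke a zero-arg callable, retrying transient HTTP failures.

    An exception is treated as transient when it exposes a .status
    attribute in retryable_statuses; everything else re-raises at once.
    Backoff is exponential with jitter.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception as e:
            status = getattr(e, "status", None)
            if status not in retryable_statuses or attempt == max_attempts:
                raise
            # Exponential backoff with jitter: base * 2^(attempt-1) * [1, 2)
            time.sleep(base_delay * 2 ** (attempt - 1) * (1 + random.random()))
```

Usage would look like with_retries(lambda: job_client.get_jobs()).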

API Reference

The following table lists all methods available on the SparkJobApiClient:

Method | Description
-------|------------
create_job(payload) | Create a new job
update_job(job_id, payload) | Update an existing job
get_jobs() | List all jobs
get_job_by_id(job_id) | Get a job by ID
get_job_by_name(job_name) | Get a job by name
delete_job_by_id(job_id) | Delete a job
submit_job_run(job_id, payload) | Submit a new run
get_job_runs(job_id) | List runs for a job
get_job_run_by_id(job_id, run_id) | Get a specific run
get_job_run_logs(job_id, run_id, time_range) | Get run logs
get_job_run_metrics(job_id, run_id) | Get run metrics
cancel_job_run(job_id, run_id) | Cancel a running job

Resources