SDK Usage for Spark Jobs
IOMETE's Python SDK (iomete-sdk) provides a convenient way to manage Spark Jobs programmatically.
With the SparkJobApiClient, you can create, update, delete, and run jobs, as well as retrieve run logs and metrics.
Installation
pip install iomete-sdk
Requires Python 3.9 or higher. The SDK depends on requests and dataclasses-json.
Prerequisites
You need the following to use the SDK:
- API Token: Create one from the IOMETE Console. See Resources below.
- Host: Your dataplane endpoint URL (e.g., https://your-dataplane.iomete.com).
- Domain: Your IOMETE domain identifier. Both the host and domain can be found in Settings > General in the IOMETE Console.
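To keep credentials out of source code, the host, domain, and token can be read from the environment before constructing the client. A minimal sketch; the variable names IOMETE_HOST, IOMETE_DOMAIN, and IOMETE_API_TOKEN are illustrative conventions, not names prescribed by the SDK:

```python
import os

def load_iomete_config():
    """Read connection settings from the environment, failing fast with a
    clear message if any is missing. The variable names are illustrative."""
    config = {}
    for key, env_var in [("host", "IOMETE_HOST"),
                         ("domain", "IOMETE_DOMAIN"),
                         ("api_key", "IOMETE_API_TOKEN")]:
        value = os.environ.get(env_var)
        if not value:
            raise RuntimeError(f"Missing required environment variable: {env_var}")
        config[key] = value
    return config
```

The returned dict maps directly onto the client's constructor arguments, so it can be splatted in: `SparkJobApiClient(**load_iomete_config())`.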
Initialization
import os
from iomete_sdk.spark import SparkJobApiClient
job_client = SparkJobApiClient(
host="https://your-dataplane.iomete.com",
api_key=os.environ["IOMETE_API_TOKEN"],
domain="your-domain"
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | required | Dataplane endpoint URL |
| api_key | str | required | IOMETE API token |
| domain | str | required | IOMETE domain identifier |
| verify | bool | True | SSL certificate verification. Set to False for self-signed certs. |
Creating a Job
Register a new Spark Job by sending a create request with the job payload:
- Python
- cURL
job_payload = {
"name": "my-etl-job",
"bundleId": "your-bundle-id",
"namespace": "default",
"template": {
"applicationType": "python",
"image": "iomete/spark-py:3.5.3-v1",
"mainApplicationFile": "path/to/job.py",
"instanceConfig": {
"driverType": "driver-x-small",
"executorType": "exec-x-small",
"executorCount": 1
}
}
}
job = job_client.create_job(payload=job_payload)
print(job)
curl -X POST 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs' \
-H 'X-API-TOKEN: YOUR_API_TOKEN' \
-H 'Content-Type: application/json' \
-d '{
"name": "my-etl-job",
"bundleId": "your-bundle-id",
"namespace": "default",
"template": {
"applicationType": "python",
"image": "iomete/spark-py:3.5.3-v1",
"mainApplicationFile": "path/to/job.py",
"instanceConfig": {
"driverType": "driver-x-small",
"executorType": "exec-x-small",
"executorCount": 1
}
}
}'
The bundleId field is required when creating a job. The SDK raises a ValueError if it is missing.
Job Payload Reference
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Job name (must be non-blank) |
| bundleId | string | Yes | Bundle identifier (SDK validates presence) |
| namespace | string | Yes | Kubernetes namespace (must be allowed in domain) |
| template | object | Yes | Job configuration (see template fields below) |
| description | string | No | Job description |
| flow | string | No | LEGACY (default) or PRIORITY |
| priority | string | No | NORMAL (default) or HIGH (requires domain owner) |
| jobUser | string | No | User identity for the job (defaults to authenticated user) |
| jobType | string | No | MANUAL (default), SCHEDULED, or STREAMING |
| schedule | string | Conditional | Cron expression (required when jobType is SCHEDULED, must be omitted otherwise) |
| concurrency | string | No | ALLOW (default) or FORBID (only applies to SCHEDULED jobs) |
| resourceTags | array | No | Resource tags for the job |
| template.image | string | Yes | Docker image for the Spark job (must include tag, e.g., image:tag) |
| template.mainApplicationFile | string | Yes | Path to the main application file |
| template.applicationType | string | No | python, java, or scala |
| template.mainClass | string | No | Main class (for Java/Scala jobs) |
| template.arguments | array | No | Command-line arguments |
| template.sparkConf | object | No | Spark configuration key-value pairs |
| template.envVars | object | No | Environment variables |
| template.configMaps | array | No | Config maps with key, content, mountPath |
| template.deps.jars | array | No | JAR file dependencies |
| template.deps.pyFiles | array | No | Python dependency files |
| template.deps.packages | array | No | Maven/PIP packages |
| template.instanceConfig | object | No | Driver/executor sizing |
| template.instanceConfig.driverType | string | No | Driver node type (platform default applied if omitted) |
| template.instanceConfig.executorType | string | No | Executor node type (platform default applied if omitted) |
| template.instanceConfig.executorCount | integer | No | Number of executors (default 1, minimum 1) |
| template.instanceConfig.singleNodeDeployment | boolean | No | Run driver and executor in a single node (default false) |
| template.restartPolicy | object | No | Restart policy (e.g., {"type": "Never"}) |
| template.maxExecutionDurationSeconds | string | No | Maximum execution time in seconds (e.g., "3600" for 1 hour, default "86400") |
| template.volumeId | string | No | Volume identifier |
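Several of these rules can be checked client-side before a request is sent, which gives faster feedback than a round trip to the API. A sketch that mirrors the documented constraints (non-blank name, required bundleId, schedule only for SCHEDULED jobs, tagged image); the server remains the source of truth:

```python
def validate_job_payload(payload: dict) -> list[str]:
    """Return a list of problems found in a job payload.

    The checks mirror the documented payload rules; the API performs the
    authoritative validation.
    """
    problems = []
    if not str(payload.get("name", "")).strip():
        problems.append("name must be non-blank")
    if not payload.get("bundleId"):
        problems.append("bundleId is required")
    job_type = payload.get("jobType", "MANUAL")
    if job_type == "SCHEDULED" and not payload.get("schedule"):
        problems.append("schedule is required when jobType is SCHEDULED")
    if job_type != "SCHEDULED" and payload.get("schedule"):
        problems.append("schedule must be omitted unless jobType is SCHEDULED")
    template = payload.get("template") or {}
    if ":" not in template.get("image", ""):
        problems.append("template.image must include a tag (image:tag)")
    if not template.get("mainApplicationFile"):
        problems.append("template.mainApplicationFile is required")
    return problems
```

Calling this before create_job turns a would-be 400 response (or a ValueError from the SDK) into an explicit list of fixes.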
Updating a Job
Modify an existing job's configuration, such as changing its schedule or instance sizing:
- Python
- cURL
update_payload = {
**job,
"jobType": "SCHEDULED",
"schedule": "0 0 */1 * *"
}
updated = job_client.update_job(job_id=job["id"], payload=update_payload)
print(updated)
curl -X PUT 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID' \
-H 'X-API-TOKEN: YOUR_API_TOKEN' \
-H 'Content-Type: application/json' \
-d '{
"name": "my-etl-job",
"bundleId": "your-bundle-id",
"namespace": "default",
"jobType": "SCHEDULED",
"schedule": "0 0 */1 * *",
"template": {
"applicationType": "python",
"image": "iomete/spark-py:3.5.3-v1",
"mainApplicationFile": "path/to/job.py",
"instanceConfig": {
"driverType": "driver-x-small",
"executorType": "exec-x-small",
"executorCount": 1
}
}
}'
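The schedule value is a standard five-field cron expression (minute, hour, day-of-month, month, day-of-week). A quick local sanity check can catch a malformed string before the update call; this only checks the field count, not the field contents:

```python
def looks_like_cron(expr: str) -> bool:
    """Rough sanity check: a cron schedule has exactly five
    whitespace-separated fields. Field contents are not validated."""
    return len(expr.split()) == 5
```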
Listing Jobs
Retrieve all Spark Jobs in your domain:
- Python
- cURL
jobs = job_client.get_jobs()
for j in jobs:
print(j["name"], j["id"])
curl -X GET 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Getting a Job
You can retrieve a job by its ID or by name:
- Python
- cURL
# By ID
job = job_client.get_job_by_id(job_id="your-job-id")
# By name
job = job_client.get_job_by_name(job_name="my-etl-job")
# By ID
curl -X GET 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
# By name
curl -X GET 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/name/my-etl-job' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Deleting a Job
Remove a job and its configuration permanently:
- Python
- cURL
job_client.delete_job_by_id(job_id=job["id"])
curl -X DELETE 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Running a Job
Submit a job run using submit_job_run. You can optionally override parts of the job configuration, such as the arguments:
- Python
- cURL
# Basic run
run = job_client.submit_job_run(job_id=job["id"], payload={})
print(run)
# With argument overrides
run = job_client.submit_job_run(job_id=job["id"], payload={
"arguments": ["arg1", "arg2"]
})
# Basic run
curl -X POST 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID/runs' \
-H 'X-API-TOKEN: YOUR_API_TOKEN' \
-H 'Content-Type: application/json' \
-d '{}'
# With argument overrides
curl -X POST 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID/runs' \
-H 'X-API-TOKEN: YOUR_API_TOKEN' \
-H 'Content-Type: application/json' \
-d '{"arguments": ["arg1", "arg2"]}'
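After submitting a run, a common next step is to wait for it to finish. A generic polling sketch; note that the "status" field name and the terminal values ("FINISHED", "FAILED", "CANCELLED") are assumptions about the run payload, so check them against your deployment:

```python
import time

def wait_for_run(fetch_run, poll_interval=10.0, timeout=3600.0,
                 terminal_statuses=("FINISHED", "FAILED", "CANCELLED")):
    """Poll fetch_run() until the run reaches a terminal status or timeout.

    fetch_run is a zero-argument callable returning the run dict, e.g.
        lambda: job_client.get_job_run_by_id(job_id=..., run_id=...)
    The status field name and terminal values are assumptions.
    """
    deadline = time.monotonic() + timeout
    while True:
        run = fetch_run()
        status = run.get("status")
        if status in terminal_statuses:
            return run
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still in status {status!r} after {timeout}s")
        time.sleep(poll_interval)
```

Taking the fetch as a callable keeps the helper independent of the client, which also makes it easy to test with a stub.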
Managing Job Runs
Listing Runs
- Python
- cURL
runs = job_client.get_job_runs(job_id=job["id"])
for r in runs:
print(r["id"], r.get("status"))
curl -X GET 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID/runs' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Getting a Specific Run
- Python
- cURL
run_info = job_client.get_job_run_by_id(job_id=job["id"], run_id=run["id"])
print(run_info)
curl -X GET 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID/runs/RUN_ID' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Getting Run Logs
Retrieve logs for a specific run. The time_range parameter controls how far back to fetch logs (defaults to "5m").
- Python
- cURL
logs = job_client.get_job_run_logs(
job_id=job["id"],
run_id=run["id"],
time_range="1h"
)
for entry in logs:
print(entry["date"], entry["logLine"])
curl -X GET 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID/runs/RUN_ID/logs?range=1h' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Supported time ranges: 5m, 15m, 30m, 1h, 3h, 6h, 12h, 24h, 2d, 7d, 14d, 30d
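Since only these range values are accepted, a client-side check avoids a failed request. A small sketch based on the list above:

```python
# The documented set of accepted log time ranges.
SUPPORTED_LOG_RANGES = ("5m", "15m", "30m", "1h", "3h", "6h",
                        "12h", "24h", "2d", "7d", "14d", "30d")

def check_time_range(time_range: str) -> str:
    """Return time_range if it is a documented value, otherwise raise
    ValueError listing the allowed options."""
    if time_range not in SUPPORTED_LOG_RANGES:
        raise ValueError(
            f"Unsupported time_range {time_range!r}; "
            f"expected one of: {', '.join(SUPPORTED_LOG_RANGES)}"
        )
    return time_range
```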
Getting Run Metrics
- Python
- cURL
metrics = job_client.get_job_run_metrics(job_id=job["id"], run_id=run["id"])
print(metrics)
curl -X GET 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID/runs/RUN_ID/metrics' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Cancelling a Run
- Python
- cURL
job_client.cancel_job_run(job_id=job["id"], run_id=run["id"])
curl -X DELETE 'https://your-dataplane.iomete.com/api/v2/domains/your-domain/sdk/spark/jobs/JOB_ID/runs/RUN_ID' \
-H 'X-API-TOKEN: YOUR_API_TOKEN'
Flow and Priority
The SDK supports Flow and Priority enums for job configuration:
from iomete_sdk.spark.spark_job import Flow, Priority
job_payload = {
"name": "priority-job",
"bundleId": "your-bundle-id",
"flow": Flow.PRIORITY.value, # "LEGACY" or "PRIORITY"
"priority": Priority.HIGH.value, # "NORMAL" or "HIGH"
"template": { ... }
}
job = job_client.create_job(payload=job_payload)
| Enum | Values | Description |
|---|---|---|
| Flow | LEGACY, PRIORITY | Job execution flow type |
| Priority | NORMAL, HIGH | Job scheduling priority |
Error Handling
The SDK raises specific exceptions for validation failures, API errors, and network issues:
import requests
from iomete_sdk.api_utils import ClientError
try:
job_client.create_job(payload={"name": "test"})
except ValueError as e:
# Validation errors (missing bundleId, invalid flow/priority)
print(f"Validation error: {e}")
except ClientError as e:
# API errors (400, 404, 500, etc.)
print(f"API error {e.status}: {e.content}")
except requests.exceptions.RequestException as e:
# Network/connection errors
print(f"Connection error: {e}")
| Exception | When |
|---|---|
| ValueError | Missing bundleId, invalid flow or priority values |
| ClientError | HTTP errors from the API. Has .status (int) and .content (dict) attributes |
| RequestException | Network or connection errors (timeouts, DNS failures, etc.) |
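Transient failures (network errors, 5xx responses) are often worth retrying with backoff, while validation errors are not. A generic sketch that takes the retry decision as a callback, so it stays independent of the SDK's exception types; for the SDK you might pass something like `lambda e: isinstance(e, ClientError) and e.status >= 500`:

```python
import time

def with_retries(call, is_retryable, attempts=3, backoff=1.0):
    """Invoke call(); on a retryable exception, sleep with exponential
    backoff and retry up to `attempts` times, re-raising the last error.

    is_retryable(exc) decides whether a given exception should be retried.
    """
    for attempt in range(attempts):
        try:
            return call()
        except Exception as exc:
            if attempt == attempts - 1 or not is_retryable(exc):
                raise
            time.sleep(backoff * (2 ** attempt))
```

Example usage: `with_retries(lambda: job_client.get_jobs(), is_retryable=lambda e: not isinstance(e, ValueError))`.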
API Reference
The following table lists all methods available on the SparkJobApiClient:
| Method | Description |
|---|---|
| create_job(payload) | Create a new job |
| update_job(job_id, payload) | Update an existing job |
| get_jobs() | List all jobs |
| get_job_by_id(job_id) | Get a job by ID |
| get_job_by_name(job_name) | Get a job by name |
| delete_job_by_id(job_id) | Delete a job |
| submit_job_run(job_id, payload) | Submit a new run |
| get_job_runs(job_id) | List runs for a job |
| get_job_run_by_id(job_id, run_id) | Get a specific run |
| get_job_run_logs(job_id, run_id, time_range) | Get run logs |
| get_job_run_metrics(job_id, run_id) | Get run metrics |
| cancel_job_run(job_id, run_id) | Cancel a running job |