
File Streaming


Continuously ingest files from object storage into an Iceberg table.

File formats

The following file formats have been tested:

  • CSV

Job creation

  • In the left sidebar, under Applications, click Job Templates.
  • Click Marketplace to open the list of preconfigured Marketplace jobs.
  • Find file-streaming-job, open the actions menu, and click Deploy.
  • The Marketplace template opens a pre-filled Create New Job form. Review the defaults and update the values for your environment.

You can also create a job template manually with New Job Template, but the Marketplace flow is recommended because it pre-populates the File Streaming image, Spark config, environment variables, and default config map.

Specify the following parameters. The values below are examples; change them to suit your environment:

  • Name: file-streaming-job
  • Docker image: iomete/iomete-file-streaming:1.0.1
  • Main application file: local:///app/driver.py
  • Environment variables: LOG_LEVEL set to INFO or ERROR
  • Spark config: spark.sql.streaming.schemaInference set to true
[Image: IOMETE Spark Jobs Create]

Environment variables

The Marketplace template includes LOG_LEVEL by default. You can also use environment variables to store sensitive values, such as passwords or secrets, and reference them in your config file with ${VARIABLE_NAME} syntax, for example ${DB_PASSWORD}.

[Image: File Streaming environment variables]
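HOCON implementations typically fall back to the process environment when a substitution such as ${DB_PASSWORD} is not defined in the config file itself. The minimal sketch below imitates that lookup in plain Python so you can see what a resolved value looks like. The helper resolve_env_placeholders is hypothetical, not part of the File Streaming application, and it ignores HOCON details such as quoted strings and the optional ${?NAME} form.

```python
import os
import re

# Matches ${NAME}, where NAME looks like an environment variable identifier.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_env_placeholders(text, env=None):
    """Replace every ${NAME} in text with env[NAME] (os.environ by default).

    Hypothetical helper for illustration only. A missing variable raises
    KeyError instead of silently producing an empty value.
    """
    env = os.environ if env is None else env

    def lookup(match):
        name = match.group(1)
        if name not in env:
            raise KeyError(f"environment variable {name!r} is not set")
        return env[name]

    return _PLACEHOLDER.sub(lookup, text)


if __name__ == "__main__":
    # In a real job, DB_PASSWORD would come from the job's Environment variables.
    print(resolve_env_placeholders("password: ${DB_PASSWORD}", {"DB_PASSWORD": "example"}))
```

Failing loudly on a missing variable is a deliberate choice in this sketch: an unset secret surfaces at startup rather than as a confusing authentication error later.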

Spark config

The Marketplace template enables schema inference for CSV file streaming by setting spark.sql.streaming.schemaInference to true.

[Image: File Streaming Spark config]
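To show what schema inference means for a CSV source, here is a simplified sketch that scans sample rows and widens each column from integer to double to string as needed. It is a conceptual illustration using only the Python standard library, not how Spark implements inference; Spark's CSV inference also recognizes other types, such as booleans and timestamps.

```python
import csv
import io

# Candidate types from narrowest to widest; a column keeps the widest type it needs.
_WIDTH = {"int": 0, "double": 1, "string": 2}


def _value_type(value):
    """Return the narrowest type name that can hold a single CSV value."""
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


def infer_csv_schema(text):
    """Infer {column: type} from CSV text whose first row is a header."""
    reader = csv.DictReader(io.StringIO(text))
    schema = {name: None for name in (reader.fieldnames or [])}
    for row in reader:
        for name in schema:
            value = row.get(name)
            if value is None or value == "":
                continue  # empty cells are treated as nulls and do not change the type
            candidate = _value_type(value)
            current = schema[name]
            if current is None or _WIDTH[candidate] > _WIDTH[current]:
                schema[name] = candidate
    # A column with no non-empty values falls back to string.
    return {name: kind or "string" for name, kind in schema.items()}


sample = "id,price,city\n1,9.5,Baku\n2,10,Berlin\n"
print(infer_csv_schema(sample))  # {'id': 'int', 'price': 'double', 'city': 'string'}
```

Because inference depends on the data it sees, a column that looks numeric in early files can turn out to hold text later; that is one reason to supply explicit column definitions for production streams.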

Config file

  • Config file: The Marketplace template includes a default application.conf file under the Config Maps tab. Review the configuration and update the source path and destination table values for your environment.

    The configuration uses HOCON syntax, which supports JSON-like objects with comments and unquoted keys. The example below matches the default Marketplace application.conf shape.

{
  source: {
    file: {
      format: csv,
      header: true,
      path: "s3a://bucket/path_to_csv_files/",
      max_files_per_trigger: 1,
      latest_first: false,
      max_file_age: "7d"
    }
  }
  destination: {
    schema: default,
    table: csv_file_stream,
    partitions: []
  }
  processing_time: {
    interval: 30,
    unit: seconds # minutes
  }
}
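The max_file_age value "7d" is a duration string meaning seven days. Spark's file streaming source has a maxFileAge option that takes strings in the same style; assuming the application passes the value through to it, the sketch below shows how such strings convert to seconds. duration_to_seconds is a hypothetical helper that handles only a subset of suffixes (s, m or min, h, d).

```python
import re

# Seconds per unit; a subset of the suffixes used in time strings like "7d".
_UNIT_SECONDS = {"s": 1, "m": 60, "min": 60, "h": 3600, "d": 86400}
_DURATION = re.compile(r"(\d+)\s*([a-z]+)")


def duration_to_seconds(value):
    """Convert a duration string such as "7d" or "30s" to whole seconds."""
    match = _DURATION.fullmatch(value.strip().lower())
    if match is None:
        raise ValueError(f"not a duration string: {value!r}")
    amount, unit = match.groups()
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"unsupported unit {unit!r} in {value!r}")
    return int(amount) * _UNIT_SECONDS[unit]


print(duration_to_seconds("7d"))  # 604800, i.e. 7 days
```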

Configuration properties

source.file: Source file configuration.

  • format: File format to read. CSV is currently supported.
  • header: Whether the CSV files include a header row.
  • path: Source directory path, including the filesystem scheme, for example s3a://bucket/path_to_csv_files/.
  • max_files_per_trigger: Maximum number of new files to process per streaming trigger.
  • latest_first: Whether to process the latest files first when there is a backlog.
  • max_file_age: Maximum age of files to be considered by the stream.

destination: Destination Iceberg table configuration.

  • schema: Destination schema or database.
  • table: Destination table.
  • partitions: Optional destination partition columns.

processing_time: Streaming trigger interval configuration.

  • interval: Processing trigger interval.
  • unit: Processing trigger unit, such as seconds or minutes.
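The properties above line up closely with options of Spark's file streaming source (maxFilesPerTrigger, latestFirst, maxFileAge), the CSV header option, and Spark's processingTime trigger, which takes an interval string such as "30 seconds". Assuming the application forwards them this way (the documentation does not confirm it), the sketch below translates the default Marketplace config, written as a Python dict, into those Spark-style settings. The function to_spark_settings and its output shape are hypothetical.

```python
def to_spark_settings(config):
    """Translate a File Streaming config dict into Spark-style reader settings.

    Hypothetical mapping for illustration: option names follow Spark's file
    streaming source and CSV reader; the real application may differ.
    """
    source = config["source"]["file"]
    trigger = config["processing_time"]
    destination = config["destination"]
    return {
        "format": source["format"],
        "path": source["path"],
        # Values are converted to strings, the form Spark stores reader options in.
        "options": {
            "header": str(source["header"]).lower(),
            "maxFilesPerTrigger": str(source["max_files_per_trigger"]),
            "latestFirst": str(source["latest_first"]).lower(),
            "maxFileAge": str(source["max_file_age"]),
        },
        # Spark's processingTime trigger accepts interval strings such as "30 seconds".
        "processing_time": f"{trigger['interval']} {trigger['unit']}",
        "table": f"{destination['schema']}.{destination['table']}",
        "partitions": list(destination.get("partitions", [])),
    }


# The default Marketplace config, written as a Python dict.
DEFAULT_CONFIG = {
    "source": {
        "file": {
            "format": "csv",
            "header": True,
            "path": "s3a://bucket/path_to_csv_files/",
            "max_files_per_trigger": 1,
            "latest_first": False,
            "max_file_age": "7d",
        }
    },
    "destination": {"schema": "default", "table": "csv_file_stream", "partitions": []},
    "processing_time": {"interval": 30, "unit": "seconds"},
}

settings = to_spark_settings(DEFAULT_CONFIG)
print(settings["processing_time"])  # 30 seconds
print(settings["options"])  # {'header': 'true', 'maxFilesPerTrigger': '1', 'latestFirst': 'false', 'maxFileAge': '7d'}
```

In PySpark, settings like these would typically be applied with spark.readStream.format(...).options(**options).load(path) and, on the writer side, .trigger(processingTime=...).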
Note: Use the full source path in source.file.path, including the filesystem scheme, for example s3a://bucket/path_to_csv_files/. The default Marketplace config does not define a source schema and relies on schema inference instead; add one only if you provide explicit column definitions.
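Because source.file.path must include the filesystem scheme, a quick pre-flight check can catch a bare path such as bucket/path_to_csv_files/ before the job is deployed. The sketch below uses urllib.parse from the standard library. The set of accepted schemes is an assumption (only s3a appears in this documentation), and check_source_path is a hypothetical helper, so adjust both to the storage your cluster uses.

```python
from urllib.parse import urlparse

# Assumed allow-lists; the documentation example uses s3a only.
OBJECT_STORE_SCHEMES = {"s3a", "s3", "gs", "abfss"}
ALLOWED_SCHEMES = OBJECT_STORE_SCHEMES | {"hdfs", "file"}


def check_source_path(path):
    """Return path unchanged if it has an allowed scheme, else raise ValueError."""
    parsed = urlparse(path)
    if parsed.scheme not in ALLOWED_SCHEMES:
        raise ValueError(
            f"source.file.path needs a filesystem scheme such as s3a://, got {path!r}"
        )
    # Object-store URIs also need a bucket or container in the authority part.
    if parsed.scheme in OBJECT_STORE_SCHEMES and not parsed.netloc:
        raise ValueError(f"source.file.path is missing a bucket: {path!r}")
    return path


print(check_source_path("s3a://bucket/path_to_csv_files/"))  # prints the path unchanged
```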

Tests

Prepare the dev environment

virtualenv .env  # or: python3 -m venv .env
source .env/bin/activate

pip install -e ".[dev]"

Run tests

python3 -m pytest # or just pytest