File Streaming
Continuously ingest files from object storage into an Iceberg table.
File formats
Tested file formats.
- CSV
Job creation
- In the left sidebar, under Applications, click Job Templates.
- Click Marketplace to open the list of preconfigured Marketplace jobs.
- Find
file-streaming-job, open the actions menu, and click Deploy. - The Marketplace template opens a pre-filled Create New Job form. Review the defaults and update the values for your environment.
You can also create a job template manually with New Job Template, but the Marketplace flow is recommended because it pre-populates the File Streaming image, Spark config, environment variables, and default config map.
Specify the following parameters (these are examples, you can change them based on your preference):
- Name:
file-streaming-job - Docker image:
iomete/iomete-file-streaming:1.0.1 - Main application file:
local:///app/driver.py - Environment variables:
LOG_LEVEL:INFOorERROR - Spark config:
spark.sql.streaming.schemaInference:true


Environment variables
The Marketplace template includes LOG_LEVEL by default. You can also use environment variables to store sensitive values, such as passwords or secrets, and reference them in your config file using the ${DB_PASSWORD} syntax.


Spark config
The Marketplace template enables schema inference for CSV file streaming.


Config file
-
Config file: The Marketplace template includes a default
application.conffile under theConfig Mapstab. Review the configuration and update the source path and destination table values for your environment.The configuration uses HOCON syntax, which supports JSON-like objects with comments and unquoted keys. The example below matches the default Marketplace
application.confshape.
{
source: {
file: {
format: csv,
header: true,
path: "s3a://bucket/path_to_csv_files/",
max_files_per_trigger: 1,
latest_first: false,
max_file_age: "7d"
}
}
destination: {
schema: default,
table: csv_file_stream,
partitions: []
}
processing_time: {
interval: 30,
unit: seconds # minutes
}
}
Configuration properties
| Property | Description |
|---|---|
source.file | Source file configuration.
|
destination | Destination Iceberg table configuration.
|
processing_time | Streaming trigger interval configuration.
|
Use the full source path in source.file.path, including the filesystem scheme, for example s3a://bucket/path_to_csv_files/. The default Marketplace config omits schema; only add it if you provide explicit column definitions.
Tests
Prepare the dev environment
virtualenv .env #or python3 -m venv .env
source .env/bin/activate
pip install -e ."[dev]"
Run test
python3 -m pytest # or just pytest