
Kafka Streaming


This job is part of a collection of data movement capabilities. It continuously copies data from Kafka topics into Iceberg tables.

Currently, two deserialization formats are supported:

  1. JSON
  2. AVRO


JSON

In the Spark configuration, a user-defined reference JSON schema can be provided, and the system deserializes the binary data accordingly. Otherwise, it infers the schema from the first row and assumes the remaining rows are compatible with it.
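To illustrate the fallback behavior, here is a minimal, hypothetical sketch (not the job's actual code) of inferring a flat Spark-style schema from the first JSON record; a real implementation would also handle nested structs, arrays, and nulls:

```python
import json

# Map Python value types to Spark SQL type names (simplified sketch).
_TYPE_MAP = {bool: "boolean", int: "long", float: "double", str: "string"}

def infer_schema_from_first_row(raw_record: bytes) -> dict:
    """Infer a flat {field: spark_type} schema from one JSON record."""
    row = json.loads(raw_record)
    return {key: _TYPE_MAP.get(type(value), "string") for key, value in row.items()}

first = b'{"user": "alice", "bytes_used": 1024, "ratio": 0.5}'
schema = infer_schema_from_first_row(first)
# schema == {"user": "string", "bytes_used": "long", "ratio": "double"}
```

All subsequent rows are then parsed against this inferred schema, which is why they must stay structurally compatible with the first one.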


AVRO

Converts binary data according to the schema defined by the user, or retrieves the schema from the schema registry.
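When a schema registry is used, the value schema for a topic is conventionally looked up under the subject `<topic>-value` via the Confluent-compatible REST API. A small sketch of that URL convention (the function name is illustrative, not part of this job's API):

```python
def schema_registry_latest_url(registry_url: str, topic: str) -> str:
    """Build the Confluent Schema Registry endpoint for the latest
    value schema of a topic. Message-value subjects are conventionally
    named '<topic>-value'."""
    return f"{registry_url.rstrip('/')}/subjects/{topic}-value/versions/latest"

url = schema_registry_latest_url("http://schema-registry:8081", "usage.spark.0")
# http://schema-registry:8081/subjects/usage.spark.0-value/versions/latest
```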

Job creation

  • In the left sidebar menu, choose Spark Jobs
  • Click on Create

Specify the following parameters (these are examples, you can change them based on your preference):

  • Name: kafka-streaming-job
  • Docker image: iomete/iomete_kafka_streaming_job:0.2.1
  • Main application file: local:///app/
  • Environment variables: LOG_LEVEL: INFO or ERROR
Environment variables

You can use environment variables to store sensitive values such as passwords and secrets. You can then reference these variables in your config file using the ${DB_PASSWORD} syntax.

Config file

Scroll down, expand the Application configurations section, click Add config file, and paste the following configuration:

kafka: {
  bootstrap_servers: "localhost:9092",
  topic: "usage.spark.0",
  serialization_format: json,
  group_id: group_1,
  starting_offsets: latest,
  trigger: {
    interval: 5
    unit: seconds # minutes
  },
  schema_registry_url: ""
},
database: {
  schema: default,
  table: spark_usage_20
}

Configuration properties


kafka Required properties to connect and configure the Kafka source.

  • bootstrap_servers Kafka broker server.
  • topic Kafka topic name.
  • serialization_format Value data serialization format: json or avro.
  • group_id Consumer group id.
  • starting_offsets Where to start reading when there is no committed offset: latest or earliest.
  • trigger Processing trigger settings.
      • interval Processing trigger interval.
      • unit Processing trigger unit: seconds, minutes
  • schema_registry_url Schema registry URL, used to retrieve AVRO schemas.

database Destination database properties.

  • schema Specify the schema (database) to store into.
  • table Specify the table.
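The interval and unit pair maps naturally onto Spark's processing-time trigger string (e.g. `trigger(processingTime="5 seconds")`); a small sketch of that mapping, assumed rather than taken from the job's source:

```python
def to_processing_time(interval: int, unit: str) -> str:
    """Format trigger settings as a Spark processingTime string,
    e.g. for streaming_query.trigger(processingTime="5 seconds")."""
    if unit not in ("seconds", "minutes"):
        raise ValueError(f"unsupported trigger unit: {unit}")
    return f"{interval} {unit}"

print(to_processing_time(5, "seconds"))  # 5 seconds
```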


Then hit the Create button.


Prepare the dev environment

virtualenv .env #or python3 -m venv .env
source .env/bin/activate

pip install -e ."[dev]"

Run test

python3 -m pytest