Skip to main content

Writes

Writing with SQL

IOMETE supports INSERT INTO, MERGE INTO, and INSERT OVERWRITE


INSERT INTO

To append new data to a table, use INSERT INTO.

INSERT INTO db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO db.table SELECT ...

MERGE INTO

MERGE INTO queries that can express row-level updates.

Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated in an overwrite commit.

info

MERGE INTO is recommended instead of INSERT OVERWRITE because Iceberg can replace only the affected data files, and because the data overwritten by a dynamic overwrite may change if the table’s partitioning changes.


MERGE INTO syntax

MERGE INTO updates a table, called the target table, using a set of updates from another query, called the source. The update for a row in the target table is found using the ON clause that is like a join condition.

MERGE INTO db.target t   -- a target table
USING (SELECT ...) s -- the source updates
ON t.id = s.id -- condition to find updates for target rows
WHEN ... -- updates

Updates to rows in the target table are listed using WHEN MATCHED ... THEN .... Multiple MATCHED clauses can be added with conditions that determine when each match should be applied. The first matching expression is used.

WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1

Source rows (updates) that do not match can be inserted:

WHEN NOT MATCHED THEN INSERT *

Inserts also support additional conditions:

WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)

Only one record in the source data can update any given row of the target table, or else an error will be thrown.


INSERT OVERWRITE

INSERT OVERWRITE can replace data in the table with the result of a query. Overwrites are atomic operations for Iceberg tables.

The partitions that will be replaced by INSERT OVERWRITE depends on Spark’s partition overwrite mode and the partitioning of a table. MERGE INTO can rewrite only affected data files and has more easily understood behavior, so it is recommended instead of INSERT OVERWRITE.


Overwrite behavior

Spark’s default overwrite mode is static, but dynamic overwrite mode is recommended when writing to Iceberg tables. Static overwrite mode determines which partitions to overwrite in a table by converting the PARTITION clause to a filter, but the PARTITION clause can only reference table columns.

Dynamic overwrite mode is configured by setting spark.sql.sources.partitionOverwriteMode=dynamic.

To demonstrate the behavior of dynamic and static overwrites, consider a logs table defined by the following DDL:

CREATE TABLE my_app.logs (
uuid string NOT NULL,
level string NOT NULL,
ts timestamp NOT NULL,
message string)
PARTITIONED BY (level, hours(ts))

Dynamic overwrite

When Spark’s overwrite mode is dynamic, partitions that have rows produced by the SELECT query will be replaced.

For example, this query removes duplicate log events from the example logs table.

INSERT OVERWRITE my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid

In dynamic mode, this will replace any partition with rows in the SELECT result. Because the date of all rows is restricted to 1 July, only hours of that day will be replaced.


Static overwrite

When Spark’s overwrite mode is static, the PARTITION clause is converted to a filter that is used to delete from the table. If the PARTITION clause is omitted, all partitions will be replaced.

Because there is no PARTITION clause in the query above, it will drop all existing rows in the table when run in static mode, but will only write the logs from 1 July.

To overwrite just the partitions that were loaded, add a PARTITION clause that aligns with the SELECT query filter:

INSERT OVERWRITE my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid

Note that this mode cannot replace hourly partitions like the dynamic example query because the PARTITION clause can only reference table columns, not hidden partitions.


DELETE FROM

Delete queries accept a filter to match rows to delete.

DELETE FROM prod.db.table
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'

If the delte filter matches entire partitions of the table, Iceberg will perform a metadata-only delete. If the filter matches individual rows of a table, then Iceberg will rewrite only the affected data files.


Writing to partitioned tables

Iceberg requires the data to be sorted according to the partition spec per task (Spark partition) in prior to write against partitioned table.

Let’s go through writing the data against below sample table:

CREATE TABLE db.sample (
id bigint,
data string,
category string,
ts timestamp)
PARTITIONED BY (days(ts), category)

To write data to the sample table, your data needs to be sorted by days(ts), category. If you’re inserting data with SQL statement, you can use ORDER BY to achieve it, like below:

INSERT INTO db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category

Type compatibility

Spark and Iceberg support different set of types. Iceberg does the type conversion automatically, but not for all combinations, so you may want to understand the type conversion in Iceberg in prior to design the types of columns in your tables.

Spark type to Iceberg type

This type conversion table describes how Spark types are converted to the Iceberg types. The conversion applies on both creating Iceberg table and writing to Iceberg table via Spark.

SparkIceberg
booleanboolean
shortinteger
byteinteger
integerinteger
longlong
floatfloat
doubledouble
datedate
timestamptimestamp with timezone
charstring
varcharstring
stringstring
binarybinary
decimaldecimal
structstruct
arraylist
mapmap

info

The table is based on representing conversion during creating table. In fact, broader supports are applied on write. Here’re some points on write:

  • Iceberg numeric types (integer , long , float , double , decimal) support promotion during writes. e.g. You can write Spark types short, byte, integer, long to Iceberg type long.
  • You can write to Iceberg fixed type using Spark binary type. Note that assertion on the length will be performed.

Iceberg type to Spark type

This type conversion table describes how Iceberg types are converted to the Spark types. The conversion applies on reading from Iceberg table via Spark.

IcebergSparkNotes
booleanboolean
integerinteger
longlong
floatfloat
doubledouble
datedate
timeNot supported
timestamp with timezonetimestamp
timestamp without timezoneNot supported
stringstring
uuidstring
fixedbinary
binarybinary
decimaldecimal
structstruct
listarray
mapmap