Iceberg Tables: INSERT, MERGE, and DELETE Operations
Writing with SQL
IOMETE supports INSERT INTO, MERGE INTO, and INSERT OVERWRITE.
INSERT INTO
To append new data to a table, use INSERT INTO.
INSERT INTO db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO db.table SELECT ...
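As a minimal sketch of the second form, the following fills in the SELECT with a hypothetical source table and filter (another_db.events, name, and created_at are illustrative, not part of any schema above):
INSERT INTO db.table
SELECT id, name
FROM another_db.events
WHERE created_at >= '2023-01-01'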
MERGE INTO
MERGE INTO queries can express row-level updates.
Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated in an overwrite commit.
MERGE INTO is recommended instead of INSERT OVERWRITE because Iceberg can replace only the affected data files, and because the data overwritten by a dynamic overwrite may change if the table’s partitioning changes.
MERGE INTO syntax
MERGE INTO updates a table, called the target table, using a set of updates from another query, called the source. The update for a row in the target table is found using the ON clause that is like a join condition.
MERGE INTO db.target t -- a target table
USING (SELECT ...) s -- the source updates
ON t.id = s.id -- condition to find updates for target rows
WHEN ... -- updates
Updates to rows in the target table are listed using WHEN MATCHED ... THEN ....
Multiple MATCHED clauses can be added with conditions that determine when each match should be applied. The first matching expression is used.
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
Source rows (updates) that do not match can be inserted:
WHEN NOT MATCHED THEN INSERT *
Inserts also support additional conditions:
WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)
Only one record in the source data can update any given row of the target table, or else an error will be thrown.
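Putting the clauses together, a complete statement might look like the following sketch. The table and column names (db.target, db.updates, id, count, op) are illustrative and assume the target has id and count columns:
MERGE INTO db.target t
USING (SELECT id, count, op FROM db.updates) s
ON t.id = s.id
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.count = t.count + 1
WHEN NOT MATCHED THEN INSERT (id, count) VALUES (s.id, 1)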
INSERT OVERWRITE
INSERT OVERWRITE can replace data in the table with the result of a query. Overwrites are atomic operations for Iceberg tables.
The partitions that will be replaced by INSERT OVERWRITE depend on Spark’s partition overwrite mode and the partitioning of the table. MERGE INTO can rewrite only affected data files and has more easily understood behavior, so it is recommended instead of INSERT OVERWRITE.
Overwrite behavior
Spark’s default overwrite mode is static, but dynamic overwrite mode is recommended when writing to Iceberg tables. Static overwrite mode determines which partitions to overwrite in a table by converting the PARTITION clause to a filter, but the PARTITION clause can only reference table columns.
Dynamic overwrite mode is configured by setting spark.sql.sources.partitionOverwriteMode=dynamic.
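One way to set this for the current session is Spark's SQL SET command; a minimal example:
SET spark.sql.sources.partitionOverwriteMode=dynamic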
To demonstrate the behavior of dynamic and static overwrites, consider a logs table defined by the following DDL:
CREATE TABLE my_app.logs (
uuid string NOT NULL,
level string NOT NULL,
ts timestamp NOT NULL,
message string)
PARTITIONED BY (level, hours(ts))
Dynamic overwrite
When Spark’s overwrite mode is dynamic, partitions that have rows produced by the SELECT query will be replaced.
For example, this query removes duplicate log events from the example logs table.
INSERT OVERWRITE my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid
In dynamic mode, this will replace any partition with rows in the SELECT result. Because the date of all rows is restricted to 1 July, only hours of that day will be replaced.
Static overwrite
When Spark’s overwrite mode is static, the PARTITION clause is converted to a filter that is used to delete from the table. If the PARTITION clause is omitted, all partitions will be replaced.
Because there is no PARTITION clause in the query above, it will drop all existing rows in the table when run in static mode, but will only write the logs from 1 July.
To overwrite just the partitions that were loaded, add a PARTITION clause that aligns with the SELECT query filter:
INSERT OVERWRITE my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(level), first(ts), first(message)
FROM my_app.logs
WHERE level = 'INFO'
GROUP BY uuid
Note that this mode cannot replace hourly partitions like the dynamic example query because the PARTITION clause can only reference table columns, not hidden partitions.
DELETE FROM
Delete queries accept a filter to match rows to delete.
DELETE FROM prod.db.table
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'
If the delete filter matches entire partitions of the table, Iceberg will perform a metadata-only delete. If the filter matches individual rows of a table, then Iceberg will rewrite only the affected data files.
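For example, against the partitioned logs table defined earlier, the two cases might look like the sketches below; the predicates and the uuid value are illustrative:
-- level is a partition column, so this filter matches entire partitions
-- and can typically be executed as a metadata-only delete
DELETE FROM my_app.logs WHERE level = 'DEBUG'
-- uuid is not a partition column, so matching rows are removed by
-- rewriting only the affected data files
DELETE FROM my_app.logs WHERE uuid = '2afc3a4e-5f11-4d9e-9e1c-0d3c6f6a9a01'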
Writing to partitioned tables
Iceberg requires the data to be sorted according to the partition spec per task (Spark partition) before writing to a partitioned table.
Let’s walk through writing data to the sample table below:
CREATE TABLE db.sample (
id bigint,
data string,
category string,
ts timestamp)
PARTITIONED BY (days(ts), category)
To write data to the sample table, your data needs to be sorted by days(ts), category.
If you’re inserting data with a SQL statement, you can use ORDER BY to achieve it, like below:
INSERT INTO db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category
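If the table is written to regularly, it can be more convenient to declare the sort order on the table itself so writers sort automatically. With the Iceberg Spark SQL extensions enabled, a table-level write order can be set roughly like this (a sketch; availability depends on your Iceberg and Spark versions):
ALTER TABLE db.sample WRITE ORDERED BY ts, category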
Type compatibility
Spark and Iceberg support different sets of types. Iceberg does the type conversion automatically, but not for all combinations, so you may want to understand how Iceberg converts types before designing the column types in your tables.
Spark type to Iceberg type
This type conversion table describes how Spark types are converted to Iceberg types. The conversion applies both when creating an Iceberg table and when writing to an Iceberg table via Spark.
| Spark | Iceberg |
|---|---|
| boolean | boolean |
| short | integer |
| byte | integer |
| integer | integer |
| long | long |
| float | float |
| double | double |
| date | date |
| timestamp | timestamp with timezone |
| char | string |
| varchar | string |
| string | string |
| binary | binary |
| decimal | decimal |
| struct | struct |
| array | list |
| map | map |
The table above describes the conversion applied when creating a table. In fact, broader support is applied on write. Here are some points on write:
- Iceberg numeric types (integer, long, float, double, decimal) support promotion during writes. For example, you can write Spark types short, byte, integer, and long to Iceberg type long; see the sketch after this list.
- You can write to the Iceberg fixed type using the Spark binary type. Note that an assertion on the length will be performed.
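To illustrate numeric promotion with a hedged example, assume a table with a bigint (Iceberg long) column and a double column; inserting integer and float values is allowed because integer promotes to long and float to double. The table and column names (db.metrics, id, reading) are hypothetical:
CREATE TABLE db.metrics (id bigint, reading double)
INSERT INTO db.metrics
SELECT cast(42 AS int), cast(3.5 AS float)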
Iceberg type to Spark type
This type conversion table describes how Iceberg types are converted to Spark types. The conversion applies when reading from an Iceberg table via Spark.
| Iceberg | Spark | Notes |
|---|---|---|
| boolean | boolean | |
| integer | integer | |
| long | long | |
| float | float | |
| double | double | |
| date | date | |
| time | Not supported | |
| timestamp with timezone | timestamp | |
| timestamp without timezone | Not supported | |
| string | string | |
| uuid | string | |
| fixed | binary | |
| binary | binary | |
| decimal | decimal | |
| struct | struct | |
| list | array | |
| map | map | |