Merge Into
Merges a set of updates, insertions, and deletions based on a source table into a target Delta table.
You can upsert data from an Apache Spark DataFrame into a Delta table using the merge operation. This operation is similar to the SQL MERGE command but has additional support for deletes and extra conditions in updates, inserts, and deletes.
Syntax
MERGE INTO target_table_identifier [AS target_alias]
USING source_table_identifier [AS source_alias]
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
where

- table_identifier
  - [database_name.] table_name: A table name, optionally qualified with a database name.
  - delta.`<path-to-table>`: The location of an existing Delta table.
- AS alias: Define a table alias.
- <merge_condition> = How the rows from one relation are combined with the rows of another relation. An expression with a return type of Boolean.
- <matched_action> =
  DELETE |
  UPDATE SET * |
  UPDATE SET column1 = value1 [, column2 = value2 ...]
- <not_matched_action> =
  INSERT * |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
Operation semantics
Here is a detailed description of the merge operation's semantics; a sketch combining the clauses follows the list. (The whenMatched and whenNotMatched clauses below correspond to WHEN MATCHED and WHEN NOT MATCHED in the SQL syntax.)
- There can be any number of whenMatched and whenNotMatched clauses.
  - Multiple matches are allowed when matches are unconditionally deleted (an unconditional delete is not ambiguous, even if there are multiple matches).
- whenMatched clauses are executed when a source row matches a target table row based on the match condition. These clauses have the following semantics.
  - whenMatched clauses can have at most one update and one delete action. The update action in merge only updates the specified columns (similar to the update operation) of the matched target row. The delete action deletes the matched row.
  - Each whenMatched clause can have an optional condition. If the clause condition exists, the update or delete action is executed for any matching source-target row pair only when the clause condition is true.
  - If there are multiple whenMatched clauses, they are evaluated in the order they are specified (that is, the order of the clauses matters). All whenMatched clauses, except the last one, must have conditions.
  - If multiple whenMatched clauses have conditions and none of the conditions are true for a matching source-target row pair, the matched target row is left unchanged.
  - To update all the columns of the target Delta table with the corresponding columns of the source dataset, use UPDATE SET *. This is equivalent to UPDATE SET col1 = source.col1 [, col2 = source.col2 ...] for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise the query throws an analysis error.
- whenNotMatched clauses are executed when a source row does not match any target row based on the match condition. These clauses have the following semantics.
  - whenNotMatched clauses can have only the insert action. The new row is generated based on the specified columns and corresponding expressions. You do not need to specify all the columns in the target table; for unspecified target columns, NULL is inserted.
  - Each whenNotMatched clause can have an optional condition. If the clause condition is present, a source row is inserted only if that condition is true for that row. Otherwise, the source row is ignored.
  - If there are multiple whenNotMatched clauses, they are evaluated in the order they are specified (that is, the order of the clauses matters). All whenNotMatched clauses, except the last one, must have conditions.
  - To insert all the columns of the target Delta table with the corresponding columns of the source dataset, use INSERT *. This is equivalent to INSERT (col1 [, col2 ...]) VALUES (source.col1 [, source.col2 ...]) for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise the query throws an analysis error.
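For example, here is a minimal sketch that combines the clauses above (the customers and updates tables, and the customerId, deleted, and address columns, are hypothetical). Source rows flagged as deleted remove the matching target rows, the remaining matches are updated, and unmatched source rows are inserted. The conditional DELETE must come before the unconditional UPDATE, because every WHEN MATCHED clause except the last one must have a condition.

MERGE INTO customers
USING updates
ON customers.customerId = updates.customerId
WHEN MATCHED AND updates.deleted = true THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET customers.address = updates.address
WHEN NOT MATCHED THEN
  INSERT (customerId, address) VALUES (updates.customerId, updates.address)

Because the clauses are evaluated in order, a matched row with deleted = true is consumed by the first clause and never reaches the UPDATE; any target columns not listed in the INSERT are set to NULL.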
A MERGE operation can fail if multiple rows of the source dataset match and attempt to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches.
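One way to guarantee at most one match per key is to rank the source rows and keep only the latest row for each key inside the USING clause. A minimal sketch, assuming a hypothetical target table and an updates source that both have key, value, and ts (timestamp) columns:

MERGE INTO target
USING (
  -- keep only the most recent source row per key
  SELECT key, value, ts
  FROM (
    SELECT *, row_number() OVER (PARTITION BY key ORDER BY ts DESC) AS rn
    FROM updates
  ) ranked
  WHERE rn = 1
) latest
ON target.key = latest.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *

Because the deduplicated source latest has exactly the target's columns, UPDATE SET * and INSERT * apply here without an analysis error.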
Examples
You can use MERGE INTO for complex operations such as deduplicating data, upserting change data, and applying SCD Type 2 operations.
Merging new event updates
Suppose you have a table that contains new data for events with eventId. Some of these events may already be present in the events table. To merge the new data into the events table, you want to update the matching rows (that is, eventId already present) and insert the new rows (that is, eventId not present). You can run the following:
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET events.data = updates.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data);
-- short version
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *;
Data deduplication when writing into Delta tables
A common ETL use case is to collect logs into a Delta table by appending them. However, the sources often generate duplicate log records, and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting the duplicate records.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
The dataset containing the new logs needs to be deduplicated within itself. By the SQL semantics of merge, the new data is matched and deduplicated against the existing data in the table, but duplicates within the new dataset are still inserted. Hence, deduplicate the new data before merging it into the table.
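If the duplicates within the new dataset are exact row copies, one possible way to drop them inline is a DISTINCT subquery in the USING clause (a sketch reusing the tables above):

MERGE INTO logs
USING (SELECT DISTINCT * FROM newDedupedLogs) newLogs
ON logs.uniqueId = newLogs.uniqueId
WHEN NOT MATCHED THEN
  INSERT *

If the duplicates share a uniqueId but differ in other columns, pick one row per uniqueId instead, as in the ranking sketch earlier.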
If you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date and then specifying the date range of the target table to match on:
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
This is more efficient than the previous command as it looks for duplicates only in the last 7 days of logs, not the entire table.