Merge Into
Merges a set of updates, insertions, and deletions based on a source table into a target Delta table
You can upsert data from an Apache Spark DataFrame into a Delta table using the merge operation. This operation is similar to the SQL MERGE command but has additional support for deletes and extra conditions in updates, inserts, and deletes.
Syntax
MERGE INTO target_table_identifier [AS target_alias]
USING source_table_identifier [AS source_alias]
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
where
-
table_identifier
[database_name.] table_name:A table name, optionally qualified with a database name.delta.<path-to-table>: The location of an existing Delta table.
-
AS alias Define a table alias.
<merge_condition> =
How the rows from one relation are combined with the rows of another relation. An expression with a return type of Boolean.
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = value1 [, column2 = value2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
Operation semantics
Here is a detailed description of the merge programmatic operation.
-
There can be any number of
whenMatchedandwhenNotMatchedclauses.- Multiple matches are allowed when matches are unconditionally deleted (since unconditional delete is not ambiguous even if there are multiple matches).
-
whenMatchedclauses are executed when a source row matches a target table row based on the match condition. These clauses have the following semantics.whenMatchedclauses can have at most oneupdateand onedeleteaction. Theupdateaction inmergeonly updates the specified columns (similar to theupdateoperation) of the matched target row. Thedeleteaction deletes the matched row.- Each
whenMatchedclause can have an optional condition. If this clause condition exists, theupdateordeleteaction is executed for any matching source-target row pair only when the clause condition is true. - If there are multiple
whenMatchedclauses, then they are evaluated in the order they are specified (that is, the order of the clauses matter). AllwhenMatchedclauses, except the last one, must have conditions. - If multiple
whenMatchedclauses have conditions and none of the conditions are true for a matching source-target row pair, then the matched target row is left unchanged. - To update all the columns of the target Delta table with the corresponding columns of the source dataset, use
UPDATE SET *. This is equivalent toUPDATE SET col1 = source.col1 [, col2 = source.col2 ...]for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table, otherwise the query will throw an analysis error
-
whenNotMatchedclauses are executed when a source row does not match any target row based on the match condition. These clauses have the following semantics.whenNotMatchedclauses can have only theinsertaction. The new row is generated based on the specified column and corresponding expressions. You do not need to specify all the columns in the target table. For unspecified target columns,NULLis inserted.- Each
whenNotMatchedclause can have an optional condition. If the clause condition is present, a source row is inserted only if that condition is true for that row. Otherwise, the source column is ignored. - If there are multiple
whenNotMatchedclauses, then they are evaluated in the order they are specified (that is, the order of the clauses matter). AllwhenNotMatchedclauses, except the last one, must have conditions. - To insert all the columns of the target Delta table with the corresponding columns of the source dataset, use
INSERT *. This is equivalent toINSERT (col1 [, col2 ...]) VALUES (source.col1 [, source.col2 ...])for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table, otherwise the query will throw an analysis error.
A MERGE operation can fail if multiple rows of the source dataset match and attempt to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches.
Examples
You can use MERGE INTO for complex operations like deduplicating data, upserting change data, applying SCD Type 2 operations, etc.
Merging new event updates
Suppose you have a table that contains new data for events with eventId. Some of these events may already be present in the events table. To merge the new data into the events table, you want to update the matching rows (that is, eventId already present) and insert the new rows (that is, eventId not present). You can run the following:
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET events.data = updates.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
-- short version
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *;
Data deduplication when writing into Delta tables
A common ETL use case is to collect logs into Delta table by appending them to a table. However, often the sources can generate duplicate log records and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting duplicate records.
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
The dataset containing the new logs needs to be deduplicated within itself. By the SQL semantics of merge, it matches and deduplicates the new data with the existing data in the table, but if there is duplicate data within the new dataset, it is inserted. Hence, deduplicate the new data before merging into the table.
If you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
This is more efficient than the previous command as it looks for duplicates only in the last 7 days of logs, not the entire table.