
Iceberg Spark Procedures for Snapshot and Metadata

Spark Procedures

Usage

Procedures can be used from any configured Iceberg catalog with CALL. All procedures are in the namespace system.

CALL supports passing arguments by name (recommended) or by position. Mixing positional and named arguments in the same call is not supported.

tip

Throughout this page, catalog_name is a placeholder. Replace it with the name of your configured Iceberg catalog.

Named arguments

All procedure arguments are named. When passing arguments by name, arguments can be in any order and any optional argument can be omitted.

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)
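As a concrete illustration of the template above, the two calls below should be equivalent, because named arguments may appear in any order. This is only a sketch: it borrows the rollback_to_snapshot procedure and the db.sample table used later on this page, and the snapshot ID 1 is a placeholder.

```sql
-- Named arguments in the order they are documented
CALL catalog_name.system.rollback_to_snapshot(table => 'db.sample', snapshot_id => 1)

-- The same call with the arguments reordered
CALL catalog_name.system.rollback_to_snapshot(snapshot_id => 1, table => 'db.sample')
```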

Positional arguments

When passing arguments by position, only the ending arguments may be omitted if they are optional.

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)
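The sketch below illustrates the rule about omitting trailing optional arguments. It uses expire_snapshots (described later on this page), whose positional order is assumed to be table, older_than, retain_last, as in that procedure's own example. The timestamp and counts are placeholders, and the calls are alternatives rather than a sequence to run.

```sql
-- All optional arguments omitted
CALL catalog_name.system.expire_snapshots('db.sample')

-- Only the trailing retain_last argument omitted
CALL catalog_name.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000')

-- To set retain_last while leaving older_than at its default, switch to named arguments
CALL catalog_name.system.expire_snapshots(table => 'db.sample', retain_last => 100)
```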

Snapshot management

rollback_to_snapshot

Roll back a table to a specific snapshot ID.

To roll back to a specific time, use rollback_to_timestamp.

info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to update |
| snapshot_id | Yes | long | Snapshot ID to roll back to |

Output

| Output Name | Type | Description |
|---|---|---|
| previous_snapshot_id | long | The current snapshot ID before the rollback |
| current_snapshot_id | long | The new current snapshot ID |

Example

Roll back table db.sample to snapshot ID 1:

CALL catalog_name.system.rollback_to_snapshot('db.sample', 1)
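In practice the snapshot ID has to come from somewhere. One possible approach, assuming the Iceberg snapshots metadata table is queryable as catalog_name.db.sample.snapshots in your setup, is to list the snapshots first and then pass the chosen ID to the procedure. The ID 1 below is a placeholder.

```sql
-- List snapshots, newest first, to pick a snapshot ID to roll back to
SELECT committed_at, snapshot_id, parent_id, operation
FROM catalog_name.db.sample.snapshots
ORDER BY committed_at DESC

-- Roll back using a snapshot_id taken from the query above (1 is a placeholder)
CALL catalog_name.system.rollback_to_snapshot(table => 'db.sample', snapshot_id => 1)
```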

rollback_to_timestamp

Roll back a table to the snapshot that was current at some time.

info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to update |
| timestamp | Yes | timestamp | Timestamp to roll back to |

Output

| Output Name | Type | Description |
|---|---|---|
| previous_snapshot_id | long | The current snapshot ID before the rollback |
| current_snapshot_id | long | The new current snapshot ID |

Example

Roll back db.sample to a specific day and time.

CALL catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000')

set_current_snapshot

Sets the current snapshot ID for a table.

Unlike rollback, the snapshot is not required to be an ancestor of the current table state.

You can specify either a snapshot_id or a ref (branch or tag name) to set the current snapshot.

info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to update |
| snapshot_id | Either | long | Snapshot ID to set as current (use this or ref, not both) |
| ref | Either | string | Branch or tag name to set as current (use this or snapshot_id, not both) |

Output

| Output Name | Type | Description |
|---|---|---|
| previous_snapshot_id | long | The current snapshot ID before the change |
| current_snapshot_id | long | The new current snapshot ID |

Examples

Set the current snapshot for db.sample to snapshot ID 1:

CALL catalog_name.system.set_current_snapshot('db.sample', 1)

Set the current snapshot for db.sample to a branch or tag named audit-branch:

CALL catalog_name.system.set_current_snapshot(table => 'db.sample', ref => 'audit-branch')

cherrypick_snapshot

Cherry-picks changes from a snapshot into the current table state.

Cherry-picking creates a new snapshot from an existing snapshot without altering or removing the original.

Only append and dynamic overwrite snapshots can be cherry-picked.

info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to update |
| snapshot_id | Yes | long | The snapshot ID to cherry-pick |

Output

| Output Name | Type | Description |
|---|---|---|
| source_snapshot_id | long | The table's current snapshot before the cherry-pick |
| current_snapshot_id | long | The snapshot ID created by applying the cherry-pick |

Examples

Cherry-pick snapshot 1:

CALL catalog_name.system.cherrypick_snapshot('my_table', 1)

Cherry-pick snapshot 1 with named arguments:

CALL catalog_name.system.cherrypick_snapshot(snapshot_id => 1, table => 'my_table')

publish_changes

Publishes staged Write-Audit-Publish (WAP) changes. When using WAP, writes are staged with a wap_id and only become visible after being explicitly published.

info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to publish to |
| wap_id | Yes | string | The WAP ID of the staged changes |

Example

Publish staged WAP changes identified by my_wap_id to db.sample:

CALL catalog_name.system.publish_changes(table => 'db.sample', wap_id => 'my_wap_id')
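For context, an end-to-end WAP flow might look like the sketch below. It rests on assumptions not stated on this page: that the table property write.wap.enabled turns staging on, that the Spark session setting spark.wap.id tags staged writes, and that db.sample has two columns matching the hypothetical inserted values. Verify these against your Iceberg version before relying on them.

```sql
-- Enable WAP on the table (assumed Iceberg table property)
ALTER TABLE catalog_name.db.sample SET TBLPROPERTIES ('write.wap.enabled' = 'true')

-- Tag subsequent writes in this session with a WAP ID (assumed Spark session setting)
SET spark.wap.id = my_wap_id

-- This write is staged rather than made visible; the row values are hypothetical
INSERT INTO catalog_name.db.sample VALUES (1, 'a')

-- After auditing the staged data, publish it
CALL catalog_name.system.publish_changes(table => 'db.sample', wap_id => 'my_wap_id')
```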

fast_forward

Fast-forwards a branch to match the state of another branch. The branch being fast-forwarded (branch) must be an ancestor of the branch it is fast-forwarded to (to).

info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table containing the branches |
| branch | Yes | string | Name of the branch to fast-forward |
| to | Yes | string | Name of the branch to fast-forward to |

Example

Fast-forward the main branch to match audit-branch:

CALL catalog_name.system.fast_forward(table => 'db.sample', branch => 'main', to => 'audit-branch')

Metadata management

Many maintenance actions can be performed using Iceberg stored procedures.

expire_snapshots

Each write/update/delete/upsert/compaction in Iceberg produces a new snapshot while keeping the old data and metadata
around for snapshot isolation and time travel. The expire_snapshots procedure can be used to remove older snapshots
and their files which are no longer needed.

This procedure will remove old snapshots and data files which are uniquely required by those old snapshots. This means
the expire_snapshots procedure will never remove files which are still required by a non-expired snapshot.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to update |
| older_than | No | timestamp | Timestamp before which snapshots will be removed (defaults to 5 days ago) |
| retain_last | No | int | Number of ancestor snapshots to preserve regardless of older_than (defaults to 1) |
| max_concurrent_deletes | No | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
| stream_results | No | boolean | When true, the files to delete are sent to the Spark driver one RDD partition at a time (by default, all files are sent to the driver at once). Setting this to true is recommended to avoid driver OOM when the list of files to delete is large |

If older_than and retain_last are omitted, the table's expiration properties will be used.

Output

| Output Name | Type | Description |
|---|---|---|
| deleted_data_files_count | long | Number of data files deleted by this operation |
| deleted_manifest_files_count | long | Number of manifest files deleted by this operation |
| deleted_manifest_lists_count | long | Number of manifest list files deleted by this operation |

Examples

Remove snapshots older than a specific day and time, but retain the last 100 snapshots:

CALL catalog_name.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)
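The same expiration can be written with named arguments, which also makes it easier to set the optional tuning arguments from the table above. This is a sketch: the thread pool size of 4 is an arbitrary illustrative value, not a recommendation.

```sql
CALL catalog_name.system.expire_snapshots(
  table => 'db.sample',
  older_than => TIMESTAMP '2021-06-30 00:00:00.000',
  retain_last => 100,
  max_concurrent_deletes => 4,  -- arbitrary example value
  stream_results => true        -- send files to the driver partition by partition
)
```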

remove_orphan_files

Removes files that are not referenced by any metadata file of an Iceberg table and can therefore be considered "orphaned".

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to clean |
| older_than | No | timestamp | Remove orphan files created before this timestamp (defaults to 3 days ago) |
| location | No | string | Directory to look for files in (defaults to the table's location) |
| dry_run | No | boolean | When true, don't actually remove files (defaults to false) |
| max_concurrent_deletes | No | int | Size of the thread pool used for delete file actions (by default, no thread pool is used) |
| file_list_view | No | string | Name of a temporary view that returns the file list to compare against the table's metadata. Overrides the default listing |
| equal_schemes | No | boolean | When true, compare only the path component of file URIs, ignoring scheme differences (defaults to false) |
| equal_authorities | No | boolean | When true, compare only the path component of file URIs, ignoring authority differences (defaults to false) |
| prefix_mismatch_mode | No | string | Action when a location prefix mismatch is detected. Supported values: ERROR, IGNORE, DELETE (defaults to ERROR) |

Output

| Output Name | Type | Description |
|---|---|---|
| orphan_file_location | string | The path to each file determined to be an orphan by this command |

Examples

List all the files that are candidates for removal by performing a dry run of the remove_orphan_files command on this table without actually removing them:

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true)

Remove any files in the tablelocation/data folder which are not known to the table db.sample.

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')
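Because removal is irreversible, one cautious pattern is to preview first and delete second, using the same arguments both times. The sketch below assumes the cutoff timestamp is a placeholder you would replace with a value appropriate for your table.

```sql
-- Step 1: preview. Returns candidate paths without deleting anything
CALL catalog_name.system.remove_orphan_files(
  table => 'db.sample',
  older_than => TIMESTAMP '2021-06-30 00:00:00.000',
  dry_run => true
)

-- Step 2: after reviewing the listed paths, repeat the call without dry_run
CALL catalog_name.system.remove_orphan_files(
  table => 'db.sample',
  older_than => TIMESTAMP '2021-06-30 00:00:00.000'
)
```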

rewrite_data_files

Iceberg tracks each data file in a table. More data files lead to more metadata stored in manifest files, and small data files cause an unnecessary amount of metadata and less efficient queries because of file open costs.

Iceberg can compact data files in parallel using Spark with the rewriteDataFiles action. This will combine small files into larger files to reduce metadata overhead and runtime file open cost.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to update |
| strategy | No | string | Name of the strategy: binpack or sort (defaults to binpack) |
| sort_order | No | string | Comma-separated list of sort_order_column entries, where each sort_order_column is a space-separated sort specification for one column (ColumnName SortDirection NullOrder). SortDirection can be ASC or DESC. NullOrder can be NULLS FIRST or NULLS LAST |
| options | No | map<string, string> | Options to be used for actions |
| where | No | string | Predicate as a string used for filtering the files. Note that all files that may contain data matching the filter will be selected for rewriting |

Output

| Output Name | Type | Description |
|---|---|---|
| rewritten_data_files_count | int | Number of data files that were rewritten by this command |
| added_data_files_count | int | Number of new data files that were written by this command |

Examples

Rewrite the data files in table db.sample using the default rewrite algorithm of bin-packing to combine small files
and also split large files according to the default write size of the table.

CALL catalog_name.system.rewrite_data_files('db.sample')

Rewrite the data files in table db.sample by sorting all the data on id and name
using the same defaults as bin-pack to determine which files to rewrite.

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')

Rewrite the data files in table db.sample using the bin-pack strategy in any partition where 2 or more files need to be rewritten.

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))

Rewrite the data files in table db.sample and select the files that may contain data matching the filter (id = 3 and name = "foo") to be rewritten.

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"')
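The arguments shown in the examples above can also be combined in a single call. The sketch below sorts only the files that may match a filter and lowers the file-count threshold. The column id is the one used in the examples above, and the filter value is hypothetical.

```sql
CALL catalog_name.system.rewrite_data_files(
  table => 'db.sample',
  strategy => 'sort',
  sort_order => 'id ASC NULLS LAST',
  where => 'id >= 100',                    -- hypothetical filter
  options => map('min-input-files', '2')
)
```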

rewrite_manifests

Rewrite manifests for a table to optimize scan planning.

Data files in manifests are sorted by fields in the partition spec. This procedure runs in parallel using a Spark job.

info

This procedure invalidates all cached Spark plans that reference the affected table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to update |
| use_caching | No | boolean | Use Spark caching during operation (defaults to true) |
| spec_id | No | int | Rewrite manifests for a specific partition spec ID (defaults to all specs) |

Output

| Output Name | Type | Description |
|---|---|---|
| rewritten_manifests_count | int | Number of manifests that were rewritten by this command |
| added_manifests_count | int | Number of new manifest files that were written by this command |

Examples

Rewrite the manifests in table db.sample and align manifest files with table partitioning.

CALL catalog_name.system.rewrite_manifests('db.sample')

Rewrite the manifests in table db.sample and disable the use of Spark caching. This could be done to avoid memory issues on executors.

CALL catalog_name.system.rewrite_manifests('db.sample', false)

Table migration

The snapshot and migrate procedures help test and migrate existing Hive or Spark tables to Iceberg.

snapshot

Create a lightweight temporary copy of a table for testing, without changing the source table.

The newly created table can be changed or written to without affecting the source table, but the snapshot uses the original table's data files.

When inserts or overwrites run on the snapshot, new files are placed in the snapshot table's location rather than the original table location.

When finished testing a snapshot table, clean it up by running DROP TABLE.

info

Because tables created by snapshot are not the sole owners of their data files, they are prohibited from
actions like expire_snapshots which would physically delete data files. Iceberg deletes, which only affect metadata,
are still allowed. In addition, any operations which affect the original data files will disrupt the snapshot's
integrity. DELETE statements executed against the original Hive table will remove original data files and the
snapshot table will no longer be able to access them.

See migrate to replace an existing table with an Iceberg table.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| source_table | Yes | string | Name of the table to snapshot |
| table | Yes | string | Name of the new Iceberg table to create |
| location | No | string | Table location for the new table (delegated to the catalog by default) |
| properties | No | map<string, string> | Properties to add to the newly created table |
| parallelism | No | int | Number of threads to use for file reading (defaults to 1) |

Output

| Output Name | Type | Description |
|---|---|---|
| imported_files_count | long | Number of files added to the new table |

Examples

Create an isolated Iceberg table named db.snap that references table db.sample, at the
catalog's default location for db.snap.

CALL catalog_name.system.snapshot('db.sample', 'db.snap')

Create an isolated Iceberg table named db.snap that references table db.sample, at
a manually specified location /tmp/temptable/.

CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/')
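A typical test cycle, following the description above, is to create the snapshot table, exercise it, and then drop it. The sketch below uses named arguments, and the property key and value are hypothetical, chosen only for illustration.

```sql
-- Create the snapshot table and tag it with a property (key and value are hypothetical)
CALL catalog_name.system.snapshot(
  source_table => 'db.sample',
  table => 'db.snap',
  properties => map('purpose', 'migration-test')
)

-- Query the snapshot table as a sanity check; the source table is not modified
SELECT count(*) FROM catalog_name.db.snap

-- Clean up when testing is done
DROP TABLE catalog_name.db.snap
```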

migrate

Replace a table with an Iceberg table, loaded with the source's data files.

Table schema, partitioning, properties, and location will be copied from the source table.

Migrate will fail if any table partition uses an unsupported format. Supported formats are Avro, Parquet, and ORC.
Existing data files are added to the Iceberg table's metadata and can be read using a name-to-id mapping created from the original table schema.

To leave the original table intact while testing, use snapshot to create a new temporary table that shares the source table's data files and schema.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to migrate |
| properties | No | map<string, string> | Properties for the new Iceberg table |
| drop_backup | No | boolean | When true, the backup table created during migration is dropped (defaults to false) |
| backup_table_name | No | string | Name of the backup table (defaults to the original table name with _backup suffix) |
| parallelism | No | int | Number of threads to use for file reading (defaults to 1) |

Output

| Output Name | Type | Description |
|---|---|---|
| migrated_files_count | long | Number of files appended to the Iceberg table |

Examples

Migrate the table db.sample in Spark's default catalog to an Iceberg table and add a property 'foo' set to 'bar':

CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'))

Migrate db.sample in the current catalog to an Iceberg table without adding any additional properties:

CALL catalog_name.system.migrate('db.sample')
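The optional arguments from the table above can be set by name. The sketch below keeps the backup table under an explicit name and reads files with several threads. The backup name and the thread count are hypothetical values, and whether the backup name may be qualified with a database is not covered here.

```sql
CALL catalog_name.system.migrate(
  table => 'db.sample',
  backup_table_name => 'sample_before_iceberg',  -- hypothetical backup name
  parallelism => 4                               -- arbitrary example value
)
```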

add_files

Attempts to directly add files from a Hive or file based table into a given Iceberg table. Unlike migrate or
snapshot, add_files can import files from a specific partition or partitions and does not create a new Iceberg table.
This command will create metadata for the new files and will not move them. This procedure will not analyze the schema
of the files to determine if they actually match the schema of the Iceberg table. Upon completion, the Iceberg table
will then treat these files as if they are part of the set of files owned by Iceberg. This means any subsequent
expire_snapshots calls will be able to physically delete the added files. This method should not be used if
migrate or snapshot are possible.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Table to which files will be added |
| source_table | Yes | string | Table where files should come from; paths are also possible in the form of `file_format`.`path` |
| partition_filter | No | map<string, string> | A map of partitions in the source table to import from |

Warning: Schema is not validated. Adding files with a different schema to the Iceberg table will cause issues.

Warning: Files added by this method can be physically deleted by Iceberg operations.

Examples

Add the files from table db.src_tbl, a Hive or Spark table registered in the session catalog, to Iceberg table
db.tbl. Only add files that exist within partitions where part_col_1 is equal to A.

CALL spark_catalog.system.add_files(
  table => 'db.tbl',
  source_table => 'db.src_tbl',
  partition_filter => map('part_col_1', 'A')
)

Add files from a parquet file based table at location path/to/table to the Iceberg table db.tbl. Add all
files regardless of what partition they belong to.

CALL spark_catalog.system.add_files(
  table => 'db.tbl',
  source_table => '`parquet`.`path/to/table`'
)

register_table

Registers an existing Iceberg table in the catalog using a metadata file location. This is useful when the table data already exists but is not yet tracked by the catalog.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name to register the table as |
| metadata_file | Yes | string | Path to the Iceberg metadata file for the table |

Output

| Output Name | Type | Description |
|---|---|---|
| current_snapshot_id | long | The current snapshot ID of the table |

Example

Register an existing Iceberg table from a metadata file:

CALL catalog_name.system.register_table(table => 'db.new_table', metadata_file => 'path/to/metadata.json')

Metadata information

ancestors_of

Reports the live snapshot IDs of the ancestors (parents) of a specified snapshot.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the table to report live snapshot IDs for |
| snapshot_id | No | long | Snapshot whose ancestors should be reported (defaults to the current snapshot) |

tip: Using snapshot_id

Given a snapshot history with a rollback to B followed by the addition of C' -> D':

A -> B -> C -> D
      \-> C' -> (D')

Not specifying the snapshot ID would return A -> B -> C' -> D', while providing the snapshot ID of D as an argument would return A -> B -> C -> D.

Output

| Output Name | Type | Description |
|---|---|---|
| snapshot_id | long | The ancestor snapshot ID |
| timestamp | long | Snapshot creation time |

Examples

Get all ancestors of the current snapshot (the default):

CALL spark_catalog.system.ancestors_of('db.tbl')

Get all ancestors of a particular snapshot, using positional or named arguments:

CALL spark_catalog.system.ancestors_of('db.tbl', 1)
CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')

create_changelog_view

Creates a view that contains the changes between two snapshots of a table. Each row in the view includes a _change_type column indicating whether the row was inserted, deleted, or updated.

Usage

| Argument Name | Required? | Type | Description |
|---|---|---|---|
| table | Yes | string | Name of the source table |
| changelog_view | No | string | Name of the changelog view to create (defaults to the table name with _changelog_view suffix) |
| options | No | map<string, string> | Options for the changelog view |
| net_changes | No | boolean | When true, returns only net changes (removes intermediate changes that cancel out). Defaults to false |
| compute_updates | No | boolean | When true, computes update rows from paired insert/delete rows. Requires identifier_columns. Defaults to false |
| identifier_columns | No | array | Columns used to identify rows for computing updates |

Examples

Create a changelog view for db.sample:

CALL catalog_name.system.create_changelog_view(table => 'db.sample')

Create a changelog view with net changes and update computation:

CALL catalog_name.system.create_changelog_view(
  table => 'db.sample',
  changelog_view => 'db.sample_changes',
  net_changes => true,
  compute_updates => true,
  identifier_columns => array('id')
)
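Once created, the view can be queried like any other relation. The sketch below assumes the view is registered in the current Spark session under the hypothetical name sample_changes and can be referenced without a catalog prefix. It relies only on the _change_type column described above.

```sql
-- Create the changelog view under an explicit, hypothetical name
CALL catalog_name.system.create_changelog_view(
  table => 'db.sample',
  changelog_view => 'sample_changes'
)

-- Count changed rows by change type
SELECT _change_type, count(*) AS row_count
FROM sample_changes
GROUP BY _change_type
```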