Apache Iceberg has recently grown in popularity because it adds data warehouse-like capabilities to your data lake, making it easier to analyze all your data—structured and unstructured. It offers several benefits such as schema evolution, hidden partitioning, time travel, and more that improve the productivity of data engineers and data analysts. However, you need to regularly maintain Iceberg tables to keep them in a healthy state so that read queries can perform faster. This blog discusses a few problems that you might encounter with Iceberg tables and offers strategies for optimizing them in each of those scenarios. You can take advantage of a combination of the strategies provided and adapt them to your particular use cases.
Every time a write operation occurs on an Iceberg table, a new snapshot is created. Over a period of time, this can cause the table’s metadata.json file to become bloated and the number of old, potentially unnecessary data/delete files in the data store to grow, increasing storage costs. A bloated metadata.json file can increase both read and write times, because the large metadata file has to be read and rewritten on every operation. Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of the table metadata small. Expiring snapshots is a relatively cheap operation and uses metadata to determine newly unreachable files.
We can expire old snapshots using the expire_snapshots procedure.
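For example, a minimal sketch in Spark SQL; the catalog name my_catalog and the table db.sample are placeholders, and the timestamp and retention count should be tuned to your own requirements:

```sql
-- Expire snapshots older than a given timestamp, but always keep the
-- most recent 10 snapshots. Catalog and table names are placeholders.
CALL my_catalog.system.expire_snapshots(
  table       => 'db.sample',
  older_than  => TIMESTAMP '2023-06-30 00:00:00.000',
  retain_last => 10
);
```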
Over time the snapshots might reference many manifest files. This can slow down query planning and increase the runtime of metadata queries. Furthermore, when first created, the manifests may not lend themselves well to partition pruning, which increases the overall runtime of the query. On the other hand, if the manifests are well organized into discrete bounds of partitions, then partition pruning can prune away entire subtrees of data files.
We can solve the too-many-manifest-files problem with the rewrite_manifests procedure and potentially get a well-balanced hierarchical tree of data files.
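A minimal sketch of invoking it in Spark SQL; catalog and table names are placeholders:

```sql
-- Rewrite the table's manifests so they are clustered along partition
-- boundaries, which helps partition pruning during query planning.
CALL my_catalog.system.rewrite_manifests(
  table => 'db.sample'
);
```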
Merge-on-read vs copy-on-write
Since Iceberg V2, whenever existing data needs to be updated (via delete, update, or merge statements), there are two options available: copy-on-write and merge-on-read. With the copy-on-write option, the data files touched by a delete, update, or merge operation are read and entirely new data files are written with the necessary modifications. Iceberg doesn’t delete the old data files, so if you want to query the table as it was before the modifications were applied, you can use Iceberg’s time travel feature. In a later blog, we will go into details about how to take advantage of the time travel feature. If you decide that the old data files are no longer needed, you can get rid of them by expiring the older snapshots as discussed above.
With the merge-on-read option, instead of rewriting entire data files at write time, only a delete file is written. This can be an equality delete file or a positional delete file. As of this writing, Spark doesn’t write equality deletes, but it is capable of reading them. The advantage of this option is that your writes can be much quicker because you are not rewriting entire data files. Suppose you want to delete a specific user’s data in a table because of GDPR requirements; Iceberg will simply write a delete file specifying the locations of that user’s data in the corresponding data files. Whenever you read the table, Iceberg dynamically applies those deletes and presents a logical table in which the user’s data is deleted, even though the corresponding records are still present in the physical data files.
We enable the merge-on-read option for our customers by default. You can enable or disable it per operation by setting the following table properties based on your requirements. See Write properties.
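As an illustration, a hedged sketch of setting the per-operation write modes as table properties; the table name is a placeholder, and the Write properties documentation remains the authoritative reference:

```sql
-- Choose merge-on-read (or copy-on-write) per row-level operation type.
-- Table name is a placeholder.
ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.merge.mode'  = 'merge-on-read'
);
```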
Serializable vs snapshot isolation
The default isolation guarantee provided for the delete, update, and merge operations is serializable isolation. You can also change the isolation level to snapshot isolation. Both serializable and snapshot isolation provide a read-consistent view of your data, but serializable isolation is the stronger guarantee. For instance, suppose you have an employee table that maintains employee salaries, and you want to delete all records corresponding to employees with a salary greater than $100,000. Let’s say this salary table has five data files and three of them contain records of employees with a salary greater than $100,000. When you initiate the delete operation, those three files are selected; if your delete mode is merge-on-read, a delete file is written that points to the positions to delete in those three data files, and if your delete mode is copy-on-write, all three data files are simply rewritten.
Irrespective of the delete mode, assume that while the delete operation is happening, another user writes a new data file containing a salary greater than $100,000. If the isolation guarantee you chose is snapshot, the delete operation will succeed and only the salary records in the original three data files are removed from your table. The records in the data file written while your delete operation was in progress will remain intact. On the other hand, if your isolation guarantee is serializable, your delete operation will fail and you will have to retry the delete from scratch. Depending on your use case, you might want to reduce your isolation level to “snapshot.”
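If you do decide to relax it, a hedged sketch of setting the per-operation isolation level as table properties; the table name is a placeholder:

```sql
-- Relax the isolation level from the default (serializable) to snapshot
-- for row-level operations, if your use case tolerates it.
ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
  'write.delete.isolation-level' = 'snapshot',
  'write.update.isolation-level' = 'snapshot',
  'write.merge.isolation-level'  = 'snapshot'
);
```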
The presence of too many delete files will eventually reduce read performance, because in the Iceberg V2 spec, every time a data file is read, all the corresponding delete files also need to be read (the Iceberg community is currently considering introducing a concept called “delete vector” in the future, which might work differently from the current spec). This can be very costly. Position delete files might also contain dangling deletes, that is, references to data that is no longer present in any of the current snapshots.
Compacting the position delete files mitigates the problem somewhat by reducing the number of delete files that need to be read and by compressing the delete data better. In addition, the procedure also removes dangling deletes.
Rewrite position delete files
Iceberg provides a rewrite position delete files procedure in Spark SQL.
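A minimal sketch of invoking it; catalog and table names are placeholders, and the procedure is available in recent Iceberg releases:

```sql
-- Compact position delete files and drop dangling deletes.
CALL my_catalog.system.rewrite_position_delete_files(
  table => 'db.sample'
);
```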
But the presence of delete files still poses a performance problem. Also, regulatory requirements might force you to eventually physically delete the data rather than perform a logical deletion. This can be addressed by doing a major compaction and removing the delete files entirely, which is addressed later in the blog.
We typically want to minimize the number of files we are touching during a read. Opening files is costly. File formats like Parquet work better if the underlying file size is large. Reading more of the same file is cheaper than opening a new file. In Parquet, typically you want your files to be around 512 MB and row-group sizes to be around 128 MB. During the write phase these are controlled by “write.target-file-size-bytes” and “write.parquet.row-group-size-bytes” respectively. You might want to leave the Iceberg defaults alone unless you know what you are doing.
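If you ever do need to change them, a sketch of where those knobs live; the values shown are the documented defaults and the table name is a placeholder:

```sql
-- Defaults are usually fine; shown here only to illustrate the properties.
ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
  'write.target-file-size-bytes'       = '536870912',   -- ~512 MB target data files
  'write.parquet.row-group-size-bytes' = '134217728'    -- ~128 MB Parquet row groups
);
```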
In Spark, for example, the in-memory size of a Spark task needs to be much larger to reach those defaults, because the data is compressed when written to disk as Parquet/ORC. So getting your files to the desirable size is not easy unless your Spark task size is big enough.
Another problem arises with partitions. Unless aligned properly, a Spark task might touch multiple partitions. Let’s say you have 100 Spark tasks and each of them needs to write to 100 partitions; together they will write 10,000 small files. Let’s call this problem partition amplification.
The amplification problem can be addressed at write time by setting the appropriate write distribution mode in the write properties. Insert distribution is controlled by “write.distribution-mode” and defaults to none. Delete distribution is controlled by “write.delete.distribution-mode” and defaults to hash. Update distribution is controlled by “write.update.distribution-mode” and defaults to hash. Merge distribution is controlled by “write.merge.distribution-mode” and defaults to none.
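A hedged sketch of overriding these modes as table properties; the table name is a placeholder and hash is shown only as an example value:

```sql
-- Set the distribution mode per operation type.
ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
  'write.distribution-mode'        = 'hash',
  'write.delete.distribution-mode' = 'hash',
  'write.update.distribution-mode' = 'hash',
  'write.merge.distribution-mode'  = 'hash'
);
```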
The three write distribution modes that are available in Iceberg as of this writing are none, hash, and range. When your mode is none, no data shuffle occurs. You should use this mode only when you don’t care about the partition amplification problem or when you know that each task in your job only writes to a specific partition.
When the mode is set to hash, the data is shuffled using a hash of the partition key so that each resulting task writes to only a specific partition. When the mode is range, the data is distributed and ordered by the partition key, or by the sort key if the table has a SortOrder.
Using hash or range can get tricky because you are now repartitioning the data based on the number of partitions your table might have. This can cause your Spark tasks after the shuffle to be either too small or too large. This problem can be mitigated by enabling adaptive query execution in Spark by setting “spark.sql.adaptive.enabled=true” (this is enabled by default from Spark 3.2). Several configs are available in Spark to adjust the behavior of adaptive query execution. Leaving the defaults as they are unless you know exactly what you are doing is probably the best option.
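For completeness, a minimal sketch of enabling it at the session level in Spark SQL; this is already the default on Spark 3.2 and later:

```sql
-- Enable adaptive query execution for the current session
-- (on by default from Spark 3.2 onward).
SET spark.sql.adaptive.enabled = true;
```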
Even though the partition amplification problem can be mitigated by setting the write distribution mode appropriate for your job, the resultant files could still be small simply because the Spark tasks writing them are small. Your job cannot write more data than it has.
To address the small files problem and delete files problem, Iceberg provides a feature to rewrite data files. This feature is currently available only with Spark. The rest of the blog will go into this in more detail. This feature can be used to compact or even expand your data files, incorporate deletes from delete files corresponding to the data files that are being rewritten, provide better data ordering so that more data could be filtered directly at read time, and more. It is one of the most powerful tools in your toolbox that Iceberg provides.
RewriteDataFiles
Iceberg provides a rewrite data files procedure in Spark SQL.
See the RewriteDataFiles Javadoc for all the supported options.
Now let’s discuss the strategy option, because understanding it helps you get more out of the rewrite data files procedure. There are three strategies available: Bin Pack, Sort, and Z Order. Note that when using the Spark procedure, the Z Order strategy is invoked by simply setting the sort_order to “zorder(columns…).”
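For illustration, hedged sketches of invoking each strategy through the Spark SQL procedure; catalog, table, and column names are placeholders:

```sql
-- Bin Pack (the default strategy): compact small files into larger ones.
CALL my_catalog.system.rewrite_data_files(
  table => 'db.sample'
);

-- Sort strategy: rewrite files ordered by the given columns
-- (category and id are placeholder column names).
CALL my_catalog.system.rewrite_data_files(
  table      => 'db.sample',
  strategy   => 'sort',
  sort_order => 'category ASC NULLS LAST, id DESC NULLS FIRST'
);

-- Z Order: invoked through the sort strategy by passing zorder(...)
-- as the sort order.
CALL my_catalog.system.rewrite_data_files(
  table      => 'db.sample',
  strategy   => 'sort',
  sort_order => 'zorder(category, id)'
);
```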
Iceberg uses optimistic concurrency control when committing new snapshots. So, when we use rewrite data files to update our data, a new snapshot is created. But before that snapshot is committed, a check is done to see whether there are any conflicts. If a conflict occurs, all the work done could potentially be discarded. It is important to plan maintenance operations to minimize potential conflicts. Let us discuss some of the sources of conflicts.
Iceberg provides several features that a modern data lake needs. With a little care, planning, and understanding of Iceberg’s architecture, you can take maximum advantage of all the awesome features it provides.
To try some of these Iceberg features yourself you can sign up for one of our next live hands-on labs.
You can also watch the webinar to learn more about Apache Iceberg and see the demo to learn the latest capabilities.