Dremio Blog: Iceberg Reflections Optimization
February 8, 2023
Dremio maintains physically optimized representations of source data known as Data Reflections, which can be thought of as a hybrid between materialized views and indexes. Dremio's query optimizer can accelerate a query by using one or more reflections to partially or entirely satisfy it, rather than processing the raw data in the underlying data source. Dremio takes advantage of open data lake table formats and stores reflections as Apache Iceberg tables.
Large reflections should be designed to maximize partition and split pruning when used to accelerate queries. The basic idea is that if your reflection contains 1B rows, the reflection data files should be partitioned and sorted so that queries scan as few physical files as possible. This can only happen if the reflection splits are designed to match query access patterns. For example, if your reflection contains a year's worth of event data and is partitioned by date, then running reports on the last 7 days of event data will only need to access the splits (or files) from 7 partitions.
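To make this concrete, here is a minimal sketch of that design in Dremio SQL. The events table and its columns are hypothetical, and reflection DDL of this form may differ slightly across Dremio versions (the UI accomplishes the same thing):

-- Partition the reflection by the date column that reports filter on,
-- so a last-7-days report only touches 7 partitions.
ALTER TABLE events
CREATE RAW REFLECTION events_by_date
USING DISPLAY (event_date, event_type, user_id)
PARTITION BY (event_date)

-- A report like this can then prune down to the last 7 date partitions.
SELECT event_type, count(1)
FROM events
WHERE event_date >= DATE_SUB(CURRENT_DATE, 7)
GROUP BY 1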
When running a query that is accelerated by a reflection, the way the reflection fields are partitioned and sorted, together with the SQL filters on those fields, directly impacts how partitions and splits are pruned and scanned. In the Dremio UI, pruning can be seen in the leaf operators of the final physical plan, which can be found in the Final Physical Transformation phase. To access it, go to the 'Jobs' section, click on the specific job you ran, and then open Raw Profile -> Planning, as shown in the screenshot below.
In the following examples, we use the SF Incidents dataset (2.2M records) with a raw reflection partitioned on the Category field and sorted by DayOfWeek. There are 39 distinct values for Category, so we will have 39 partitions. The reflection is defined as shown in the image below.
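For reference, an equivalent definition in SQL might look like the sketch below. The reflection name and display column list are illustrative, and the exact DDL can vary by Dremio version; this post used the UI shown above:

ALTER TABLE incidents_iceberg
CREATE RAW REFLECTION incidents_raw
USING DISPLAY (IncidentNum, Category, DayOfWeek, PdDistrict)
PARTITION BY (Category)
LOCALSORT BY (DayOfWeek)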
After the reflection is built, we can check how many splits (across partitions) were created on the physical disk.
find . -name "*.parquet" | wc -l
115
Note: For this demonstration, we set the support key store.parquet.block-size to 1MB to artificially generate a lot of splits. We also set the distributed store to local PDFS for ease of demonstration and understanding, though this is not recommended for production.
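If you want to reproduce this setup, support keys can be changed in the UI or, as a sketch, with SQL along these lines (1MB = 1048576 bytes; verify the key name and units on your version before changing it):

ALTER SYSTEM SET "store.parquet.block-size" = 1048576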
Here is the Iceberg manifest list associated with this reflection, located at:
/Users/bennychow/dev/dremioData/pdfs/accelerator/9f8a6fc9-a285-4cb2-b49c-b4f5707358e2/4a0ee4f6-bde4-46e2-bdbe-9cbd4e55d869_0/metadata/snap-1989587507948539403-1-c2c2f63b-daea-4e83-83ed-ca8d16f1dbe7.avro
As seen in the image above, this manifest list contains 18 manifest files. Each record in the list points to a manifest file and includes the lower and upper bound partition values for the Parquet files within that manifest file. When SQL queries hit this table, if there are non-expression-based filters on the partition columns, the first stage of pruning can happen directly on this manifest list to reduce the number of manifest files that need to be read.
Here is the content of the manifest file with partition lower bound LARCENY/THEFT.
As you can see, this manifest file contains 21 splits (Parquet files) with Category values between LARCENY/THEFT and PORNOGRAPHY/OBSCENE MATERIAL. For each Parquet file, the manifest file records the lower and upper bound values for every field. This information is leveraged for the second level of pruning when there are filters on non-partition fields.
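If you would rather not open the Avro files directly, Dremio also exposes Iceberg metadata functions that surface much of the same information. A sketch, shown against a regular Iceberg table (whether reflection tables under __accelerator can be addressed this way depends on your environment, and the output columns vary by version):

-- One row per manifest file, including partition value summaries
SELECT * FROM TABLE( table_manifests('incidents_iceberg') )

-- One row per data file (split), including record counts and partition values
SELECT * FROM TABLE( table_files('incidents_iceberg') )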
Notice in the manifest file snippet above how each Parquet file contains mostly unique DayOfWeek values. This is because we specified a sort on the DayOfWeek column in the reflection definition. This becomes important when filtering on DayOfWeek, because it enables a second stage of Parquet file pruning.
Example 1:
Let’s take a look at the split pruning for an example query whose WHERE clause lines up directly with the reflection’s partition and sort columns:
SELECT PdDistrict, count(1) FROM incidents_iceberg where Category = 'LARCENY/THEFT' and (DayOfWeek = 'Monday' or DayOfWeek = 'Sunday') group by 1
00-00 Screen : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 12011.199999999997, cumulative cost = {1022234.1199999998 rows, 6199344.203288001 cpu, 0.0 io, 9.84285184E8 network, 2113971.2 memory}, id = 21877 00-01 Project(Fragment=[$0], Records=[$1], Path=[$2], Metadata=[$3], Partition=[$4], FileSize=[$5], IcebergMetadata=[$6], fileschema=[$7], PartitionData=[$8], OperationType=[$9]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 12011.199999999997, cumulative cost = {1021032.9999999998 rows, 6198143.083288001 cpu, 0.0 io, 9.84285184E8 network, 2113971.2 memory}, id = 21876 00-02 WriterCommitter(final=[/Users/bennychow/dev/dremioData/pdfs/results/1cfc17ec-6707-161f-2255-14be46b60800]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 12011.199999999997, cumulative cost = {1009021.7999999998 rows, 6198141.882168001 cpu, 0.0 io, 9.84285184E8 network, 2113971.2 memory}, id = 21875 00-03 Writer : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 12011.199999999997, cumulative cost = {997010.5999999999 rows, 6186130.682168 cpu, 0.0 io, 9.84285184E8 network, 2113971.2 memory}, id = 21874 00-04 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 12011.199999999997, cumulative cost = {984999.3999999999 rows, 6174119.482168 cpu, 0.0 io, 9.84285184E8 network, 2113971.2 memory}, id = 21818 00-05 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 12011.199999999997, cumulative cost = {972988.2 rows, 6174119.241944 cpu, 0.0 io, 9.84285184E8 network, 2113971.2 memory}, id = 21817 00-06 HashAgg(group=[{0}], EXPR$1=[COUNT($1)]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 12011.199999999997, cumulative cost = {960977.0 rows, 6174119.00172 cpu, 0.0 io, 9.84285184E8 network, 2113971.2 memory}, id = 21816 00-07 UnionExchange : rowType = RecordType(VARCHAR(65536) PdDistrict, INTEGER $f1): rowcount = 120112.0, cumulative cost = {840865.0 rows, 3771879.00172 cpu, 0.0 io, 9.84285184E8 network, 0.0 memory}, id = 21815 01-01 Project(PdDistrict=[$2], $f1=[1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, INTEGER $f1): rowcount = 120112.0, cumulative cost = {720753.0 rows, 2810983.00172 cpu, 0.0 io, 327680.0 network, 0.0 memory}, id = 21814 01-02 SelectionVectorRemover : rowType = RecordType(VARCHAR(65536) Category, VARCHAR(65536) DayOfWeek, VARCHAR(65536) PdDistrict): rowcount = 120112.0, cumulative cost = {600641.0 rows, 2330533.8006 cpu, 0.0 io, 327680.0 network, 0.0 memory}, id = 21813 01-03 Filter(condition=[OR(=($1, 'Saturday'), 
=($1, 'Sunday'))]) : rowType = RecordType(VARCHAR(65536) Category, VARCHAR(65536) DayOfWeek, VARCHAR(65536) PdDistrict): rowcount = 120112.0, cumulative cost = {480529.0 rows, 2210421.8006 cpu, 0.0 io, 327680.0 network, 0.0 memory}, id = 21812 01-04 TableFunction(filters=[[Filter on `DayOfWeek`: `DayOfWeek` in ( 'Saturday', 'Sunday') ]], columns=[`Category`, `DayOfWeek`, `PdDistrict`], Table Function Type=[DATA_FILE_SCAN], table=["__accelerator"."9f8a6fc9-a285-4cb2-b49c-b4f5707358e2"."4a0ee4f6-bde4-46e2-bdbe-9cbd4e55d869"]) : rowType = RecordType(VARCHAR(65536) Category, VARCHAR(65536) DayOfWeek, VARCHAR(65536) PdDistrict): rowcount = 240224.0, cumulative cost = {240305.0 rows, 240585.0006 cpu, 0.0 io, 327680.0 network, 0.0 memory}, id = 21811 01-05 Project(splitsIdentity=[$0], splits=[$1], colIds=[$2]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds): rowcount = 20.0, cumulative cost = {81.0 rows, 361.0006 cpu, 0.0 io, 327680.0 network, 0.0 memory}, id = 21810 01-06 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, INTEGER E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 20.0, cumulative cost = {61.0 rows, 361.0 cpu, 0.0 io, 327680.0 network, 0.0 memory}, id = 21809 02-01 TableFunction(columns=[`splitsIdentity`, `splits`, `colIds`, `E_X_P_R_H_A_S_H_F_I_E_L_D`], Table Function Type=[SPLIT_ASSIGNMENT]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, INTEGER E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 20.0, cumulative cost = {41.0 rows, 41.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 21808 02-02 TableFunction(columns=[`splitsIdentity`, `splits`, `colIds`], Table Function Type=[SPLIT_GEN_MANIFEST_SCAN], ManifestFile Filter AnyColExpression=[((not_null(ref(name="Category")) and not_null(ref(name="DayOfWeek"))) and (ref(name="Category") == "LARCENY/THEFT" and (ref(name="DayOfWeek") == "Saturday" or ref(name="DayOfWeek") == "Sunday")))]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds): rowcount = 20.0, cumulative cost = {21.0 rows, 21.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 21807 02-03 IcebergManifestList(table=["__accelerator"."9f8a6fc9-a285-4cb2-b49c-b4f5707358e2"."4a0ee4f6-bde4-46e2-bdbe-9cbd4e55d869"], snapshot=[1989587507948539403], columns=[`splitsIdentity`, `splits`, `colIds`], splits=[1], metadataFileLocation=[/Users/bennychow/dev/dremioData/pdfs/accelerator/9f8a6fc9-a285-4cb2-b49c-b4f5707358e2/4a0ee4f6-bde4-46e2-bdbe-9cbd4e55d869_0/metadata/00000-6c11ee5b-8581-410f-aa15-99736a48ea50.metadata.json], ManifestList Filter Expression =[(not_null(ref(name="Category")) and ref(name="Category") == "LARCENY/THEFT")], manifestContent=[DATA]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 21806
In operator 02-03 (IcebergManifestList), we scan the 18 records in the Iceberg manifest list. We can see that the plan contains the following filter:
ManifestList Filter Expression =[(not_null(ref(name="Category")) and ref(name="Category") == "LARCENY/THEFT")]
Since the Category field is partitioned in the reflection definition and filtered on in the query, we can prune the 18 manifest files down to 7. This can be manually verified by confirming that the value "LARCENY/THEFT" falls between the lower and upper bounds of exactly 7 manifest files.
Furthermore, the ICEBERG_SUB_SCAN operator metrics under 02-xx-03 help validate this result.
If we now look at operator 02-02, we can see that we scan the records in each of the 7 manifest files. Here’s the filter:
ManifestFile Filter AnyColExpression=[((not_null(ref(name="Category")) and not_null(ref(name="DayOfWeek"))) and (ref(name="Category") == "LARCENY/THEFT" and (ref(name="DayOfWeek") == "Monday" or ref(name="DayOfWeek") == "Sunday")))]
This prunes the Parquet files across the 7 manifest files down to 8 splits.
In this example, because we sorted the reflection by DayOfWeek, the Parquet files in each partition were sorted by DayOfWeek. As a result, a filter on DayOfWeek is likely to successfully prune splits. However, planning does not currently look at manifest file content, so the planner does not know that applying the DayOfWeek filter would leave only 8 splits. Here the planner estimated the split count as 20, as seen in operator 02-02’s estimate.
We can also confirm that 8 splits were distributed across three threads in phase 01 as seen in the snippet below:
For comparison, here is the same query run against a reflection with no partitioning or sort (89 total splits), as seen in the snippets below. Even though the filters can be applied by the Iceberg API at the manifest file level, we still have to read all 89 splits because every Parquet file could contain data matching the filter conditions.
00-00 Screen : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 55375.59999999999, cumulative cost = {4380566.959999999 rows, 2.6365941667838007E7 cpu, 0.0 io, 6.819135488E8 network, 1.0720716160000002E7 memory}, id = 26670 00-01 Project(Fragment=[$0], Records=[$1], Path=[$2], Metadata=[$3], Partition=[$4], FileSize=[$5], IcebergMetadata=[$6], fileschema=[$7], PartitionData=[$8], OperationType=[$9]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 55375.59999999999, cumulative cost = {4375029.399999999 rows, 2.636040410783801E7 cpu, 0.0 io, 6.819135488E8 network, 1.0720716160000002E7 memory}, id = 26669 00-02 WriterCommitter(final=[/Users/bennychow/dev/dremioData/pdfs/results/1cfc0a98-f43e-d309-1f6a-28a314c03900]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 55375.59999999999, cumulative cost = {4319653.8 rows, 2.6360398570278008E7 cpu, 0.0 io, 6.819135488E8 network, 1.0720716160000002E7 memory}, id = 26668 00-03 Writer : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 55375.59999999999, cumulative cost = {4264278.2 rows, 2.6305022970278006E7 cpu, 0.0 io, 6.819135488E8 network, 1.0720716160000002E7 memory}, id = 26667 00-04 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 55375.59999999999, cumulative cost = {4208902.600000001 rows, 2.6249647370278005E7 cpu, 0.0 io, 6.819135488E8 network, 1.0720716160000002E7 memory}, id = 26666 00-05 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 55375.59999999999, cumulative cost = {4153527.0000000005 rows, 2.6249646262766004E7 cpu, 0.0 io, 6.819135488E8 network, 1.0720716160000002E7 memory}, id = 26665 00-06 HashAgg(group=[{0}], EXPR$1=[$SUM0($1)]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 55375.59999999999, cumulative cost = {4098151.4000000004 rows, 2.6249645155254003E7 cpu, 0.0 io, 6.819135488E8 network, 1.0720716160000002E7 memory}, id = 26664 00-07 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 55375.59999999999, cumulative cost = {4042775.8000000003 rows, 2.5142133155254003E7 cpu, 0.0 io, 6.819135488E8 network, 9746105.600000001 memory}, id = 26663 00-08 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 55375.59999999999, cumulative cost = {3987400.2 rows, 2.5142132047742E7 cpu, 0.0 io, 6.819135488E8 network, 9746105.600000001 memory}, id = 26662 01-01 Project(PdDistrict=[$0], EXPR$1=[$1], 
E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32($0)]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 55375.59999999999, cumulative cost = {3932024.6 rows, 2.4256122447742E7 cpu, 0.0 io, 1458176.0 network, 9746105.600000001 memory}, id = 26661 01-02 HashAgg(group=[{0}], EXPR$1=[COUNT($1)]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 55375.59999999999, cumulative cost = {3876649.0 rows, 2.403461894023E7 cpu, 0.0 io, 1458176.0 network, 9746105.600000001 memory}, id = 26660 01-03 Project(PdDistrict=[$2], $f1=[1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, INTEGER $f1): rowcount = 553756.0, cumulative cost = {3322893.0 rows, 1.295949894023E7 cpu, 0.0 io, 1458176.0 network, 0.0 memory}, id = 26659 01-04 SelectionVectorRemover : rowType = RecordType(VARCHAR(65536) Category, VARCHAR(65536) DayOfWeek, VARCHAR(65536) PdDistrict): rowcount = 553756.0, cumulative cost = {2769137.0 rows, 1.074446940267E7 cpu, 0.0 io, 1458176.0 network, 0.0 memory}, id = 26658 01-05 Filter(condition=[OR(=($1, 'Monday'), =($1, 'Sunday'))]) : rowType = RecordType(VARCHAR(65536) Category, VARCHAR(65536) DayOfWeek, VARCHAR(65536) PdDistrict): rowcount = 553756.0, cumulative cost = {2215381.0 rows, 1.019071340267E7 cpu, 0.0 io, 1458176.0 network, 0.0 memory}, id = 26657 01-06 TableFunction(filters=[[Filter on `DayOfWeek`: `DayOfWeek` in ( 'Monday', 'Sunday') , Filter on `Category`: equal(`Category`, 'LARCENY/THEFT') ]], columns=[`Category`, `DayOfWeek`, `PdDistrict`], Table Function Type=[DATA_FILE_SCAN], table=["__accelerator"."9f8a6fc9-a285-4cb2-b49c-b4f5707358e2"."e4745aa7-7017-46a0-98b7-694f101d02d8"]) : rowType = RecordType(VARCHAR(65536) Category, VARCHAR(65536) DayOfWeek, VARCHAR(65536) PdDistrict): rowcount = 1107512.0, cumulative cost = {1107869.0 rows, 1109115.00267 cpu, 0.0 io, 1458176.0 network, 0.0 memory}, id = 26656 01-07 Project(splitsIdentity=[$0], splits=[$1], colIds=[$2]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds): rowcount = 89.0, cumulative cost = {357.0 rows, 1603.00267 cpu, 0.0 io, 1458176.0 network, 0.0 memory}, id = 26655 01-08 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, INTEGER E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 89.0, cumulative cost = {268.0 rows, 1603.0 cpu, 0.0 io, 1458176.0 network, 0.0 memory}, id = 26654 02-01 TableFunction(columns=[`splitsIdentity`, `splits`, `colIds`, `E_X_P_R_H_A_S_H_F_I_E_L_D`], Table Function Type=[SPLIT_ASSIGNMENT]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, INTEGER E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 89.0, cumulative cost = {179.0 rows, 179.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 26653 02-02 TableFunction(columns=[`splitsIdentity`, `splits`, `colIds`], Table Function Type=[SPLIT_GEN_MANIFEST_SCAN], ManifestFile Filter AnyColExpression=[((not_null(ref(name="Category")) and not_null(ref(name="DayOfWeek"))) and (ref(name="Category") == "LARCENY/THEFT" and (ref(name="DayOfWeek") == "Monday" or ref(name="DayOfWeek") == "Sunday")))]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) 
splits, VARBINARY(65536) colIds): rowcount = 89.0, cumulative cost = {90.0 rows, 90.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 26652 02-03 IcebergManifestList(table=["__accelerator"."9f8a6fc9-a285-4cb2-b49c-b4f5707358e2"."e4745aa7-7017-46a0-98b7-694f101d02d8"], snapshot=[2941837639883764846], columns=[`splitsIdentity`, `splits`, `colIds`], splits=[1], metadataFileLocation=[/Users/bennychow/dev/dremioData/pdfs/accelerator/9f8a6fc9-a285-4cb2-b49c-b4f5707358e2/e4745aa7-7017-46a0-98b7-694f101d02d8_0/metadata/00000-512e8a6d-ecf6-400e-90bc-04496f3b9165.metadata.json], manifestContent=[DATA]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 26651
Example 2:
Here is another query that filters on the partition column, but the predicate is an expression that cannot be evaluated directly against the manifest list using the Iceberg APIs:
SELECT PdDistrict, count(1) FROM incidents_iceberg where Category like '%THEFT%' group by 1
As a result, the filtering happens in Dremio after reading all of the splits:
00-00 Screen : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 60704.999999999985, cumulative cost = {2373670.5 rows, 1.773242169792E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.1752488E7 memory}, id = 23788 00-01 Project(Fragment=[$0], Records=[$1], Path=[$2], Metadata=[$3], Partition=[$4], FileSize=[$5], IcebergMetadata=[$6], fileschema=[$7], PartitionData=[$8], OperationType=[$9]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 60704.999999999985, cumulative cost = {2367600.0 rows, 1.772635119792E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.1752488E7 memory}, id = 23787 00-02 WriterCommitter(final=[/Users/bennychow/dev/dremioData/pdfs/results/1cfc1149-c12f-b247-4dd7-00462cdbcc00]) : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 60704.999999999985, cumulative cost = {2306895.0 rows, 1.7726345127419997E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.1752488E7 memory}, id = 23786 00-03 Writer : rowType = RecordType(VARCHAR(65536) Fragment, BIGINT Records, VARCHAR(65536) Path, VARBINARY(65536) Metadata, INTEGER Partition, BIGINT FileSize, VARBINARY(65536) IcebergMetadata, VARBINARY(65536) fileschema, VARBINARY(65536) ARRAY PartitionData, INTEGER OperationType): rowcount = 60704.999999999985, cumulative cost = {2246190.0 rows, 1.7665640127419997E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.1752488E7 memory}, id = 23785 00-04 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 60704.999999999985, cumulative cost = {2185485.0 rows, 1.7604935127419997E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.1752488E7 memory}, id = 23784 00-05 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 60704.999999999985, cumulative cost = {2124780.0 rows, 1.7604933913319997E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.1752488E7 memory}, id = 23783 00-06 HashAgg(group=[{0}], EXPR$1=[$SUM0($1)]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 60704.999999999985, cumulative cost = {2064075.0 rows, 1.7604932699219998E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.1752488E7 memory}, id = 23782 00-07 Project(PdDistrict=[$0], EXPR$1=[$1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 60704.999999999985, cumulative cost = {2003370.0 rows, 1.6390832699219998E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.068408E7 memory}, id = 23781 00-08 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 60704.999999999985, cumulative cost = {1942665.0 rows, 1.6390831485119998E7 cpu, 0.0 io, 7.462092799999998E8 network, 1.068408E7 memory}, id = 23780 01-01 Project(PdDistrict=[$0], EXPR$1=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32($0)]) : rowType = RecordType(VARCHAR(65536) 
PdDistrict, BIGINT EXPR$1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 60704.999999999985, cumulative cost = {1881960.0 rows, 1.5419551485119998E7 cpu, 0.0 io, 266240.0 network, 1.068408E7 memory}, id = 23779 01-02 HashAgg(group=[{0}], EXPR$1=[COUNT($1)]) : rowType = RecordType(VARCHAR(65536) PdDistrict, BIGINT EXPR$1): rowcount = 60704.999999999985, cumulative cost = {1821255.0 rows, 1.5176730271019999E7 cpu, 0.0 io, 266240.0 network, 1.068408E7 memory}, id = 23778 01-03 Project(PdDistrict=[$1], $f1=[1]) : rowType = RecordType(VARCHAR(65536) PdDistrict, INTEGER $f1): rowcount = 607050.0, cumulative cost = {1214205.0 rows, 3035730.27102 cpu, 0.0 io, 266240.0 network, 0.0 memory}, id = 23777 01-04 TableFunction(columns=[`Category`, `PdDistrict`], Table Function Type=[DATA_FILE_SCAN], table=["__accelerator"."9f8a6fc9-a285-4cb2-b49c-b4f5707358e2"."4a0ee4f6-bde4-46e2-bdbe-9cbd4e55d869"]) : rowType = RecordType(VARCHAR(65536) Category, VARCHAR(65536) PdDistrict): rowcount = 607050.0, cumulative cost = {607155.0 rows, 607524.20052 cpu, 0.0 io, 266240.0 network, 0.0 memory}, id = 23776 01-05 Project(splitsIdentity=[$0], splits=[$1], colIds=[$2], Category_val=[$3]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, VARCHAR(65536) Category_val): rowcount = 13.0, cumulative cost = {105.0 rows, 474.20052 cpu, 0.0 io, 266240.0 network, 0.0 memory}, id = 23775 01-06 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, VARCHAR(65536) Category_val, INTEGER E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 13.0, cumulative cost = {92.0 rows, 474.2 cpu, 0.0 io, 266240.0 network, 0.0 memory}, id = 23774 02-01 TableFunction(columns=[`splitsIdentity`, `splits`, `colIds`, `Category_val`, `E_X_P_R_H_A_S_H_F_I_E_L_D`], Table Function Type=[SPLIT_ASSIGNMENT]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, VARCHAR(65536) Category_val, INTEGER E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 13.0, cumulative cost = {79.0 rows, 266.2 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23773 02-02 SelectionVectorRemover : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, VARCHAR(65536) Category_val): rowcount = 13.0, cumulative cost = {66.0 rows, 253.2 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23772 02-03 Filter(condition=[LIKE($3, '%THEFT%':VARCHAR(7))]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, VARCHAR(65536) Category_val): rowcount = 13.0, cumulative cost = {53.0 rows, 240.2 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23771 02-04 TableFunction(columns=[`splitsIdentity`, `splits`, `colIds`, `Category_val`], Table Function Type=[SPLIT_GEN_MANIFEST_SCAN]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds, VARCHAR(65536) Category_val): rowcount = 26.0, cumulative cost = {27.0 rows, 27.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23770 02-05 
IcebergManifestList(table=["__accelerator"."9f8a6fc9-a285-4cb2-b49c-b4f5707358e2"."4a0ee4f6-bde4-46e2-bdbe-9cbd4e55d869"], snapshot=[1989587507948539403], columns=[`splitsIdentity`, `splits`, `colIds`], splits=[1], metadataFileLocation=[/Users/bennychow/dev/dremioData/pdfs/accelerator/9f8a6fc9-a285-4cb2-b49c-b4f5707358e2/4a0ee4f6-bde4-46e2-bdbe-9cbd4e55d869_0/metadata/00000-6c11ee5b-8581-410f-aa15-99736a48ea50.metadata.json], manifestContent=[DATA]) : rowType = RecordType(RecordType(VARCHAR(65536) path, BIGINT offset, BIGINT length, BIGINT fileLength) splitsIdentity, VARBINARY(65536) splits, VARBINARY(65536) colIds): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23769
Notice how we have IcebergManifestList → TableFunction → Filter → SelectionVectorRemover. Unlike the first example, operators 02-05 and 02-04 no longer include any ManifestList or ManifestFile filters. Instead, we read all 115 splits along with each split’s Category value, and apply the LIKE filter in operators 02-03 and 02-02 to prune the splits down to 26 (and thus scan only 26 Parquet files).
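One practical takeaway: if the set of matching partition values is known up front, rewriting the expression predicate as equality or IN predicates restores manifest-level pruning. A sketch (the list of THEFT categories here is assumed for illustration, not derived from the actual data):

SELECT PdDistrict, count(1) FROM incidents_iceberg where Category IN ('LARCENY/THEFT', 'VEHICLE THEFT') group by 1

Because these are non-expression filters on the partition column, the planner can push them into the ManifestList and ManifestFile filters just as in Example 1.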
Partition Stats and Best Cost Plan
All of the above examples looked at physical plans after the planner had already determined the best cost logical plan, with or without reflections. For logical planning to decide which reflections to use, the planner needs row count estimates so that it can pick the lowest cost scan among equivalent scans. (It’s a little more involved than this, since the planner can also add relational operators to make query sub-trees equivalent.)
Using the same incidents dataset as above, let’s take a look at how the planner picks the best cost plan among three scenarios:
- Scan on the incidents table (i.e. incidents_parquet) - Partitioned by Category
- Scan on Raw Reflection 1 - Partitioned by DayOfWeek
- Scan on Raw Reflection 2 - No partitions
Here’s how the reflections are defined in Dremio.
When running a user query, the query Raw Profile’s ‘Acceleration’ tab will contain the same definition information as shown here:
So, given this SQL:
select IncidentNum, Category, DayOfWeek from "incidents_parquet" where dir0 = '0_BURGLARY' and DayOfWeek = 'Sunday'
Let’s walk through Logical Planning and understand why the planner still chose to scan the base table “incidents_parquet” instead of scanning either reflection.
During the Logical Planning phase, we first generate canonicalized alternatives of the user query: equivalent forms of the original query that make it more likely that a bottom-up algebraic search will match. This is shown below:
Then we go through the candidate reflections and try to match each one into the alternative plans. In this case, each reflection matched, generating 3 alternative plans.
The result of searching across the matched plans for the lowest cost (i.e. best cost) plan is shown in the Logical Planning phase. This is the plan that is passed on to physical planning.
Logical Planning phase
ProjectRel(IncidentNum=[$1], Category=[$2], DayOfWeek=[CAST('Sunday':VARCHAR(65536)):VARCHAR(65536)]): rowcount = 13731.449999999999, cumulative cost = {27462.899999999998 rows, 109851.874629 cpu, 54925.799999999996 io, 54925.799999999996 network, 0.0 memory}, id = 9567
  FilesystemScanDrel(table=[LocalTestData.incidents.incidents_parquet], snapshot=[1082532425399104838], columns=[`dir0`, `IncidentNum`, `Category`, `DayOfWeek`], splits=[1], filters=[[Filter on `DayOfWeek`: equal(`DayOfWeek`, 'Sunday') ]], partitionFilters=[partition_range_filter:=($0, '0_BURGLARY');non_partition_range_filter:=($3, 'Sunday');]): rowcount = 13731.449999999999, cumulative cost = {13731.449999999999 rows, 54925.799999999996 cpu, 54925.799999999996 io, 54925.799999999996 network, 0.0 memory}, id = 9556
Among all possible equivalent plans, the above plan had the lowest cost. We can see the following:
- Scanning incidents_parquet directly was cheaper than going to either reflection
- The estimated row count was 13731.449999999999
- A partition filter partition_range_filter:=($0, '0_BURGLARY') was applied, and the planner was able to get an exact row count from the Avro partition stats file. This count was further reduced by non_partition_range_filter:=($3, 'Sunday')
Best Cost Plan for Raw Reflection 1 - Partition by DayOfWeek
ProjectRel(IncidentNum=[$1], Category=[$2], DayOfWeek=[CAST('Sunday':VARCHAR(65536)):VARCHAR(65536)]): rowcount = 17268.566310991286, cumulative cost = {34537.13262198257 rows, 138148.87585925648 cpu, 69074.26524396514 io, 69074.26524396514 network, 0.0 memory}, id = 9571
  FilesystemScanDrel(table=["__accelerator"."4d0311b7-6107-4949-a1f8-588e6eecd846"."80e5d929-5ddf-4b6f-a8f0-c36f9437a0c7"], snapshot=[7272199703133699700], columns=[`dir0`, `IncidentNum`, `Category`, `DayOfWeek`], splits=[1], rowAdjust=[0.39079056935221795], filters=[[Filter on `dir0`: equal(`dir0`, '0_BURGLARY') ]], partitionFilters=[partition_range_filter:=($3, 'Sunday');non_partition_range_filter:=($0, '0_BURGLARY');]): rowcount = 17268.566310991286, cumulative cost = {17268.566310991286 rows, 69074.26524396514 cpu, 69074.26524396514 io, 69074.26524396514 network, 0.0 memory}, id = 9566
For each reflection, the query profile’s Acceleration tab picks out the best cost plan containing that reflection, for reporting purposes. This may not be the plan that was selected during logical planning, as is the case here. We can see the following:
- 4d0311b7-6107-4949-a1f8-588e6eecd846 is the reflection id
- The estimated row count was 17268.566310991286
- A partition filter partition_range_filter:=($3, 'Sunday') was applied, and the planner was able to get an exact row count from the Avro partition stats file. The count was further reduced by non_partition_range_filter:=($0, '0_BURGLARY'). However, 17,268 > 13,731, so this plan was not selected.
Best Cost Plan for Raw Reflection 2 - No Partitions
ProjectRel(IncidentNum=[$1], Category=[$2], DayOfWeek=[CAST('Sunday':VARCHAR(65536)):VARCHAR(65536)]): rowcount = 116857.41616199167, cumulative cost = {233714.83232398334 rows, 934861.6664442567 cpu, 467429.6646479667 io, 467429.6646479667 network, 0.0 memory}, id = 9575
  FilesystemScanDrel(table=["__accelerator"."83bd18c2-99df-455b-a10c-048c77a62ab5"."f5ca5ccd-ba04-449d-8f42-f80dbc55ccf2"], snapshot=[3517197514336406320], columns=[`dir0`, `IncidentNum`, `Category`, `DayOfWeek`], splits=[1], rowAdjust=[0.39079056935221795], filters=[[Filter on `DayOfWeek`: equal(`DayOfWeek`, 'Sunday') , Filter on `dir0`: equal(`dir0`, '0_BURGLARY') ]], partitionFilters=[non_partition_range_filter:AND(=($0, '0_BURGLARY'), =($3, 'Sunday'));]): rowcount = 116857.41616199167, cumulative cost = {116857.41616199167 rows, 467429.6646479667 cpu, 467429.6646479667 io, 467429.6646479667 network, 0.0 memory}, id = 9563
This reflection was not selected. We can see the following:
- 83bd18c2-99df-455b-a10c-048c77a62ab5 is the reflection id
- The row count estimate was very high: 116857.41616199167. The total number of records is only about 2M.
- The row count estimate was derived from non_partition_range_filter:AND(=($0, '0_BURGLARY'), =($3, 'Sunday')). Since neither column is a partition column, this reflection had no stats on them, so the planner fell back to a simple heuristic based on the filters and total row count to estimate the post-filter row count.
- It is possible to improve on these simple heuristics when statistics have been explicitly collected for the table, as in the sketch below. Read more here.
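As a sketch, statistics collection on the base table can be triggered with a statement along these lines (verify the exact syntax supported by your Dremio version):

ANALYZE TABLE incidents_parquet FOR ALL COLUMNS COMPUTE STATISTICS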
In summary, a reflection containing a large number of records should be designed to maximize partition and split pruning based on user query patterns. The examples in this blog show how to verify that your queries read only the necessary data, and they provide insight into how the cost-based planner picks among alternative query plans. Finally, while the discussion centered on Dremio Reflections, the same concepts apply to any Apache Iceberg table read by Dremio’s Sonar query engine.
If you are interested in exploring reflections and trying out Dremio’s lakehouse platform, you can test drive it here.