From 5ec03a9700f77ba6dd465e84b4775d4a7e4323e9 Mon Sep 17 00:00:00 2001 From: "Daniel Q. Kim" Date: Wed, 20 May 2026 09:24:50 +0200 Subject: [PATCH 1/2] Fix Iceberg read optimization returning NULLs for stats-less manifests When an Iceberg manifest's per-file column statistics are absent or empty (common for non-Spark writers like pyiceberg with default settings), DataFileMetaInfo::columns_info is empty. The optimization in StorageObjectStorageSource::createReader misread this as "all columns are absent from the file" and returned constant NULLs for every row while still returning the correct row count. Result: silent data loss on icebergLocal, icebergS3, icebergAzure, icebergHDFS, and all *Cluster variants. Gate the optimization's absent-NULL loop directly on columns_info.empty() instead of introducing a separate stats-presence flag. When no usable per-column stats were parsed -- whether the manifest omitted the stats fields entirely or declared them but left them empty -- fall through to the Parquet reader, which correctly handles physically-present columns (read normally) and schema-evolved-absent columns (handled by IcebergMetadata::getInitialSchemaByPath setting the file's own schema as initial_header). columns_info is already serialized to workers in the cluster JSON path, so this changes no serialization format and keeps the fork's DataFileMetaInfo serde identical to upstream. Closes Altinity/ClickHouse#1545. Mirror of Altinity/ClickHouse#1688 (antalya-25.8 fix). Signed-off-by: Daniel Q. Kim --- .../StorageObjectStorageSource.cpp | 59 +- ...t_iceberg_read_optimization_empty_stats.py | 685 ++++++++++++++++++ 2 files changed, 716 insertions(+), 28 deletions(-) create mode 100644 tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index ee0cf191045c..a4a92ab58599 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -886,40 +886,43 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade } } } - for (const auto & column : requested_columns_list) + if (file_meta_data.has_value() && !file_meta_data.value()->columns_info.empty()) { - const auto & column_name = column.first; + for (const auto & column : requested_columns_list) + { + const auto & column_name = column.first; - if (file_meta_data.value()->columns_info.contains(column_name)) - continue; + if (file_meta_data.value()->columns_info.contains(column_name)) + continue; - if (!column.second.second.type->isNullable()) - continue; + if (!column.second.second.type->isNullable()) + continue; - /// With View over Iceberg table we have someting like 'materialize(time)' as column_name - /// Simple cheap check - if (column_name.starts_with("materialize(") && column_name.ends_with(")")) - continue; + /// With View over Iceberg table we have someting like 'materialize(time)' as column_name + /// Simple cheap check + if (column_name.starts_with("materialize(") && column_name.ends_with(")")) + continue; - /// Skip columns produced by prewhere or row-level filter expressions — - /// they are computed at read time, not stored in the file. - if (format_filter_info - && ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name) - || (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name))) - continue; + /// Skip columns produced by prewhere or row-level filter expressions — + /// they are computed at read time, not stored in the file. + if (format_filter_info + && ((format_filter_info->prewhere_info && column_name == format_filter_info->prewhere_info->prewhere_column_name) + || (format_filter_info->row_level_filter && column_name == format_filter_info->row_level_filter->column_name))) + continue; - /// Column is nullable and absent in file - constant_columns_with_values[column.second.first] = - ConstColumnWithValue{ - column.second.second, - Field() - }; - constant_columns.insert(column_name); - - LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'", - object_info->getPath(), - column_name, - column.second.second.type); + /// Column is nullable and absent in file + constant_columns_with_values[column.second.first] = + ConstColumnWithValue{ + column.second.second, + Field() + }; + constant_columns.insert(column_name); + + LOG_DEBUG(log, "In file {} constant column '{}' type '{}' with value 'NULL'", + object_info->getPath(), + column_name, + column.second.second.type); + } } } diff --git a/tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py new file mode 100644 index 000000000000..34dc2fda2918 --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py @@ -0,0 +1,685 @@ +#!/usr/bin/env python3 +""" +Reproducer for Altinity/ClickHouse#1545: +icebergLocal() returns all-NULL columns when allow_experimental_iceberg_read_optimization=1 +and the Iceberg manifest has no column statistics. + +Root cause +---------- +The manifest Avro writer schema intentionally omits the optional stats fields +(value_counts, column_sizes, null_value_counts, lower_bounds, upper_bounds). +AvroForIcebergDeserializer.hasPath() therefore returns false for all three stats +paths in ManifestFile.cpp. columns_infos is left empty. + +StorageObjectStorageSource::createReader's second loop (the schema-evolution +"absent column as NULL" path) iterates over every requested column. Because +columns_infos is empty, every nullable column is absent → each is injected as a +constant NULL. With all requested columns constant, need_only_count is set to +true and the Parquet file is read in count-only mode; the output is the correct +number of rows, but every value is NULL. + +Tests +----- +- test_iceberg_local_returns_actual_rows_with_stats_less_manifest + FAILS on unpatched code: returns all-NULL rows instead of real data. + +- test_iceberg_local_returns_correct_rows_when_optimization_disabled + PASSES on unpatched code: regression guard — optimization=0 bypasses the + buggy path and reads the Parquet file normally. +""" + +import json +import os +import tempfile +import time +import uuid + +import avro.datafile +import avro.io +import avro.schema +import pyarrow as pa +import pyarrow.parquet as pq + +from helpers.iceberg_utils import get_uuid_str +from helpers.s3_tools import LocalUploader + + +# Iceberg v2 manifest list Avro schema (minimal fields needed by ClickHouse). +_MANIFEST_LIST_SCHEMA_STR = json.dumps({ + "type": "record", + "name": "manifest_file", + "fields": [ + {"name": "manifest_path", "type": "string"}, + {"name": "manifest_length", "type": "long"}, + {"name": "partition_spec_id", "type": "int"}, + {"name": "content", "type": "int"}, + {"name": "sequence_number", "type": "long"}, + {"name": "min_sequence_number", "type": "long"}, + {"name": "added_snapshot_id", "type": "long"}, + {"name": "added_files_count", "type": "int"}, + {"name": "existing_files_count", "type": "int"}, + {"name": "deleted_files_count", "type": "int"}, + {"name": "added_rows_count", "type": "long"}, + {"name": "existing_rows_count", "type": "long"}, + {"name": "deleted_rows_count", "type": "long"}, + ], +}) + +# Manifest entry Avro schema that deliberately omits all per-column stats fields: +# column_sizes, value_counts, null_value_counts, lower_bounds, upper_bounds. +# +# Absent fields → AvroForIcebergDeserializer.hasPath() returns false for each → +# ManifestFile.cpp skips stats collection → columns_infos is empty → +# the optimization treats every nullable column as a "schema-evolution absent column" +# and injects a constant NULL. +_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR = json.dumps({ + "type": "record", + "name": "manifest_entry", + "fields": [ + {"name": "status", "type": "int"}, + {"name": "snapshot_id", "type": ["null", "long"]}, + {"name": "sequence_number", "type": ["null", "long"]}, + {"name": "file_sequence_number", "type": ["null", "long"]}, + { + "name": "data_file", + "type": { + "type": "record", + "name": "r2", + "fields": [ + {"name": "content", "type": "int"}, + {"name": "file_path", "type": "string"}, + {"name": "file_format", "type": "string"}, + { + "name": "partition", + "type": {"type": "record", "name": "r102", "fields": []}, + }, + {"name": "record_count", "type": "long"}, + {"name": "file_size_in_bytes", "type": "long"}, + ], + }, + }, + ], +}) + + +def _write_avro(schema, records, path, metadata=None): + with open(path, "wb") as f: + writer = avro.datafile.DataFileWriter(f, avro.io.DatumWriter(), schema) + if metadata: + for k, v in metadata.items(): + writer.set_meta(k, v if isinstance(v, bytes) else v.encode("utf-8")) + for rec in records: + writer.append(rec) + writer.close() + + +def _create_stats_less_iceberg_table(tmpdir, table_name, container_base): + """ + Build a minimal Iceberg v2 table under tmpdir/table_name. The manifest is + written with a stats-less Avro schema to trigger the optimization bug. + + container_base is the absolute path inside the ClickHouse container where the + table will be placed after upload. File paths embedded in the Avro records and + metadata.json must reference container_base because they are interpreted at + query time inside the container. + + Returns the local table directory path (tmpdir/table_name). + """ + table_local = os.path.join(tmpdir, table_name) + data_local = os.path.join(table_local, "data") + meta_local = os.path.join(table_local, "metadata") + os.makedirs(data_local) + os.makedirs(meta_local) + + # Parquet data file with two nullable columns. + arrow_schema = pa.schema([ + pa.field("id", pa.int32(), nullable=True), + pa.field("data", pa.string(), nullable=True), + ]) + pq.write_table( + pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "data": pa.array(["hello", "world", "iceberg"], type=pa.string()), + }, + schema=arrow_schema, + ), + os.path.join(data_local, "00000-0-data.parquet"), + ) + data_size = os.path.getsize(os.path.join(data_local, "00000-0-data.parquet")) + data_container_path = f"{container_base}/data/00000-0-data.parquet" + + snapshot_id = 1 + seq_number = 1 + ts_ms = int(time.time() * 1000) + + # Iceberg schema and partition-spec to embed as Avro file-level metadata. + # ManifestFile.cpp requires both keys to be present in the manifest's Avro header. + iceberg_schema = { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": False, "type": "int"}, + {"id": 2, "name": "data", "required": False, "type": "string"}, + ], + } + + # Manifest file (stats-less Avro schema). + manifest_local_path = os.path.join(meta_local, "00000-0-manifest.avro") + manifest_container_path = f"{container_base}/metadata/00000-0-manifest.avro" + _write_avro( + avro.schema.parse(_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR), + [{ + "status": 1, # ADDED + "snapshot_id": snapshot_id, + "sequence_number": seq_number, + "file_sequence_number": seq_number, + "data_file": { + "content": 0, # DATA + "file_path": data_container_path, + "file_format": "PARQUET", + "partition": {}, + "record_count": 3, + "file_size_in_bytes": data_size, + }, + }], + manifest_local_path, + metadata={ + "schema": json.dumps(iceberg_schema), + "partition-spec": "[]", + }, + ) + manifest_size = os.path.getsize(manifest_local_path) + + # Manifest list. + mlist_filename = f"snap-{snapshot_id}-0-manifest-list.avro" + mlist_local_path = os.path.join(meta_local, mlist_filename) + mlist_container_path = f"{container_base}/metadata/{mlist_filename}" + _write_avro( + avro.schema.parse(_MANIFEST_LIST_SCHEMA_STR), + [{ + "manifest_path": manifest_container_path, + "manifest_length": manifest_size, + "partition_spec_id": 0, + "content": 0, # DATA + "sequence_number": seq_number, + "min_sequence_number": seq_number, + "added_snapshot_id": snapshot_id, + "added_files_count": 1, + "existing_files_count": 0, + "deleted_files_count": 0, + "added_rows_count": 3, + "existing_rows_count": 0, + "deleted_rows_count": 0, + }], + mlist_local_path, + ) + + # Table metadata JSON. + metadata = { + "format-version": 2, + "table-uuid": str(uuid.uuid4()), + "location": container_base, + "last-sequence-number": seq_number, + "last-updated-ms": ts_ms, + "last-column-id": 2, + "current-schema-id": 0, + "schemas": [{ + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": False, "type": "int"}, + {"id": 2, "name": "data", "required": False, "type": "string"}, + ], + }], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "current-snapshot-id": snapshot_id, + "snapshots": [{ + "snapshot-id": snapshot_id, + "sequence-number": seq_number, + "timestamp-ms": ts_ms, + "manifest-list": mlist_container_path, + "summary": {"operation": "append"}, + "schema-id": 0, + }], + "snapshot-log": [{"timestamp-ms": ts_ms, "snapshot-id": snapshot_id}], + "metadata-log": [], + "refs": {"main": {"snapshot-id": snapshot_id, "type": "branch"}}, + } + with open(os.path.join(meta_local, "v1.metadata.json"), "w") as f: + json.dump(metadata, f, indent=2) + + return table_local + + +def _upload_to_container(instance, local_table_dir, container_base): + uploader = LocalUploader(instance) + for root, _dirs, files in os.walk(local_table_dir): + for fname in files: + local_path = os.path.join(root, fname) + rel = os.path.relpath(local_path, local_table_dir) + uploader.upload_file(local_path, os.path.join(container_base, rel)) + + +# Manifest entry schema with value_counts only (partial stats). +# column_sizes and null_value_counts are absent. value_counts alone is enough to +# populate columns_info so the absent-NULL guard (columns_info.empty()) is not triggered. +_MANIFEST_ENTRY_PARTIAL_STATS_SCHEMA_STR = json.dumps({ + "type": "record", + "name": "manifest_entry", + "fields": [ + {"name": "status", "type": "int"}, + {"name": "snapshot_id", "type": ["null", "long"]}, + {"name": "sequence_number", "type": ["null", "long"]}, + {"name": "file_sequence_number", "type": ["null", "long"]}, + { + "name": "data_file", + "type": { + "type": "record", + "name": "r2", + "fields": [ + {"name": "content", "type": "int"}, + {"name": "file_path", "type": "string"}, + {"name": "file_format", "type": "string"}, + { + "name": "partition", + "type": {"type": "record", "name": "r102", "fields": []}, + }, + {"name": "record_count", "type": "long"}, + {"name": "file_size_in_bytes", "type": "long"}, + # Only value_counts is present (column_sizes and null_value_counts absent). + { + "name": "value_counts", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k81v81", + "fields": [ + {"name": "key", "type": "int"}, + {"name": "value", "type": "long"}, + ], + }, + }, + }, + ], + }, + }, + ], +}) + +# Manifest entry schema with all three stats fields (value_counts, column_sizes, +# null_value_counts). Represents a manifest written by a full-stats writer such as Spark. +_MANIFEST_ENTRY_FULL_STATS_SCHEMA_STR = json.dumps({ + "type": "record", + "name": "manifest_entry", + "fields": [ + {"name": "status", "type": "int"}, + {"name": "snapshot_id", "type": ["null", "long"]}, + {"name": "sequence_number", "type": ["null", "long"]}, + {"name": "file_sequence_number", "type": ["null", "long"]}, + { + "name": "data_file", + "type": { + "type": "record", + "name": "r2", + "fields": [ + {"name": "content", "type": "int"}, + {"name": "file_path", "type": "string"}, + {"name": "file_format", "type": "string"}, + { + "name": "partition", + "type": {"type": "record", "name": "r102", "fields": []}, + }, + {"name": "record_count", "type": "long"}, + {"name": "file_size_in_bytes", "type": "long"}, + { + "name": "column_sizes", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k81v81", + "fields": [ + {"name": "key", "type": "int"}, + {"name": "value", "type": "long"}, + ], + }, + }, + }, + { + "name": "value_counts", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k81v81_vc", + "fields": [ + {"name": "key", "type": "int"}, + {"name": "value", "type": "long"}, + ], + }, + }, + }, + { + "name": "null_value_counts", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "k81v81_nc", + "fields": [ + {"name": "key", "type": "int"}, + {"name": "value", "type": "long"}, + ], + }, + }, + }, + ], + }, + }, + ], +}) + + +def _create_iceberg_table_with_schema(tmpdir, table_name, container_base, + manifest_schema_str, manifest_extra_fields=None): + """ + Build a minimal Iceberg v2 table. manifest_extra_fields is added to each + manifest entry's data_file record when using schemas that include stats. + """ + table_local = os.path.join(tmpdir, table_name) + data_local = os.path.join(table_local, "data") + meta_local = os.path.join(table_local, "metadata") + os.makedirs(data_local) + os.makedirs(meta_local) + + arrow_schema = pa.schema([ + pa.field("id", pa.int32(), nullable=True), + pa.field("data", pa.string(), nullable=True), + ]) + pq.write_table( + pa.table( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "data": pa.array(["hello", "world", "iceberg"], type=pa.string()), + }, + schema=arrow_schema, + ), + os.path.join(data_local, "00000-0-data.parquet"), + ) + data_size = os.path.getsize(os.path.join(data_local, "00000-0-data.parquet")) + data_container_path = f"{container_base}/data/00000-0-data.parquet" + + snapshot_id = 1 + seq_number = 1 + ts_ms = int(time.time() * 1000) + + iceberg_schema = { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": False, "type": "int"}, + {"id": 2, "name": "data", "required": False, "type": "string"}, + ], + } + + data_file_record = { + "content": 0, + "file_path": data_container_path, + "file_format": "PARQUET", + "partition": {}, + "record_count": 3, + "file_size_in_bytes": data_size, + } + if manifest_extra_fields: + data_file_record.update(manifest_extra_fields) + + manifest_local_path = os.path.join(meta_local, "00000-0-manifest.avro") + manifest_container_path = f"{container_base}/metadata/00000-0-manifest.avro" + _write_avro( + avro.schema.parse(manifest_schema_str), + [{ + "status": 1, + "snapshot_id": snapshot_id, + "sequence_number": seq_number, + "file_sequence_number": seq_number, + "data_file": data_file_record, + }], + manifest_local_path, + metadata={ + "schema": json.dumps(iceberg_schema), + "partition-spec": "[]", + }, + ) + manifest_size = os.path.getsize(manifest_local_path) + + mlist_filename = f"snap-{snapshot_id}-0-manifest-list.avro" + mlist_local_path = os.path.join(meta_local, mlist_filename) + mlist_container_path = f"{container_base}/metadata/{mlist_filename}" + _write_avro( + avro.schema.parse(_MANIFEST_LIST_SCHEMA_STR), + [{ + "manifest_path": manifest_container_path, + "manifest_length": manifest_size, + "partition_spec_id": 0, + "content": 0, + "sequence_number": seq_number, + "min_sequence_number": seq_number, + "added_snapshot_id": snapshot_id, + "added_files_count": 1, + "existing_files_count": 0, + "deleted_files_count": 0, + "added_rows_count": 3, + "existing_rows_count": 0, + "deleted_rows_count": 0, + }], + mlist_local_path, + ) + + metadata = { + "format-version": 2, + "table-uuid": str(uuid.uuid4()), + "location": container_base, + "last-sequence-number": seq_number, + "last-updated-ms": ts_ms, + "last-column-id": 2, + "current-schema-id": 0, + "schemas": [{ + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": False, "type": "int"}, + {"id": 2, "name": "data", "required": False, "type": "string"}, + ], + }], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "current-snapshot-id": snapshot_id, + "snapshots": [{ + "snapshot-id": snapshot_id, + "sequence-number": seq_number, + "timestamp-ms": ts_ms, + "manifest-list": mlist_container_path, + "summary": {"operation": "append"}, + "schema-id": 0, + }], + "snapshot-log": [{"timestamp-ms": ts_ms, "snapshot-id": snapshot_id}], + "metadata-log": [], + "refs": {"main": {"snapshot-id": snapshot_id, "type": "branch"}}, + } + with open(os.path.join(meta_local, "v1.metadata.json"), "w") as f: + json.dump(metadata, f, indent=2) + + return table_local + + +def test_iceberg_local_returns_actual_rows_with_stats_less_manifest( + started_cluster_iceberg_no_spark, +): + """ + FAILS on unpatched code (Altinity#1545). + + With allow_experimental_iceberg_read_optimization=1 (the default), reading an + icebergLocal table whose manifest omits column statistics returns all-NULL rows + instead of real data. The correct output is 3 rows: (1,'hello'), (2,'world'), + (3,'iceberg'). + """ + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_opt_on_stats_less_" + get_uuid_str() + container_base = ( + f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" + ) + + with tempfile.TemporaryDirectory() as tmpdir: + local_dir = _create_stats_less_iceberg_table(tmpdir, table_name, container_base) + _upload_to_container(instance, local_dir, container_base) + + result = instance.query( + f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" + " ORDER BY id" + " SETTINGS allow_experimental_iceberg_read_optimization=1" + ).strip() + + assert result == "1\thello\n2\tworld\n3\ticeberg", ( + f"Got: {result!r}\n" + "All-NULL rows means the bug fired: empty columns_infos caused the " + "optimization to treat every nullable column as an absent schema-evolution " + "column and inject constant NULL (need_only_count=1)." + ) + + +def test_iceberg_local_returns_correct_rows_when_optimization_disabled( + started_cluster_iceberg_no_spark, +): + """ + PASSES on unpatched code. + + With allow_experimental_iceberg_read_optimization=0 the optimization block is + skipped entirely and the Parquet file is read normally. This test is a + regression guard: it must keep passing after the bug is fixed. + """ + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_opt_off_stats_less_" + get_uuid_str() + container_base = ( + f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" + ) + + with tempfile.TemporaryDirectory() as tmpdir: + local_dir = _create_stats_less_iceberg_table(tmpdir, table_name, container_base) + _upload_to_container(instance, local_dir, container_base) + + result = instance.query( + f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" + " ORDER BY id" + " SETTINGS allow_experimental_iceberg_read_optimization=0" + ).strip() + + assert result == "1\thello\n2\tworld\n3\ticeberg" + + +def test_iceberg_local_partial_stats_manifest_reads_correctly( + started_cluster_iceberg_no_spark, +): + """ + Regression test for Altinity#1545: partial stats (value_counts only). + + A manifest that includes value_counts but omits column_sizes and + null_value_counts still populates columns_info. The fix must treat the + columns as real (not absent), so real data is returned. + """ + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_opt_on_partial_stats_" + get_uuid_str() + container_base = ( + f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" + ) + + # value_counts for field_id 1 (id) and 2 (data): 3 rows each. + extra = { + "value_counts": [ + {"key": 1, "value": 3}, + {"key": 2, "value": 3}, + ], + } + + with tempfile.TemporaryDirectory() as tmpdir: + local_dir = _create_iceberg_table_with_schema( + tmpdir, table_name, container_base, + _MANIFEST_ENTRY_PARTIAL_STATS_SCHEMA_STR, + manifest_extra_fields=extra, + ) + _upload_to_container(instance, local_dir, container_base) + + result = instance.query( + f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" + " ORDER BY id" + " SETTINGS allow_experimental_iceberg_read_optimization=1" + ).strip() + + assert result == "1\thello\n2\tworld\n3\ticeberg", ( + f"Got: {result!r}\n" + "Partial-stats manifest (value_counts only) should not trigger the absent-NULL " + "path because columns_info is non-empty. All-NULL rows means the guard " + "fired incorrectly for partial-stats manifests." + ) + + +def test_iceberg_local_full_stats_manifest_reads_correctly( + started_cluster_iceberg_no_spark, +): + """ + Control test: full-stats manifest (Spark-like) must continue to return real data. + + A manifest with all three stats fields (column_sizes, value_counts, + null_value_counts) is the common case written by Spark. This test verifies the + optimization does not regress for the normal path after the fix. + """ + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_opt_on_full_stats_" + get_uuid_str() + container_base = ( + f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" + ) + + # Full stats for field_id 1 (id) and 2 (data). + extra = { + "column_sizes": [ + {"key": 1, "value": 64}, + {"key": 2, "value": 128}, + ], + "value_counts": [ + {"key": 1, "value": 3}, + {"key": 2, "value": 3}, + ], + "null_value_counts": [ + {"key": 1, "value": 0}, + {"key": 2, "value": 0}, + ], + } + + with tempfile.TemporaryDirectory() as tmpdir: + local_dir = _create_iceberg_table_with_schema( + tmpdir, table_name, container_base, + _MANIFEST_ENTRY_FULL_STATS_SCHEMA_STR, + manifest_extra_fields=extra, + ) + _upload_to_container(instance, local_dir, container_base) + + result = instance.query( + f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" + " ORDER BY id" + " SETTINGS allow_experimental_iceberg_read_optimization=1" + ).strip() + + assert result == "1\thello\n2\tworld\n3\ticeberg", ( + f"Got: {result!r}\n" + "Full-stats manifest should return real data when optimization is enabled." + ) From 056eb1a3ec8926a0a1a9541cf767571b4e9219b5 Mon Sep 17 00:00:00 2001 From: "Daniel Q. Kim" Date: Mon, 22 Jun 2026 14:55:28 +0200 Subject: [PATCH 2/2] Rework Iceberg empty-stats tests per review feedback Address @zvonand's review of #1895: the tests were poorly scoped. The C++ fix (gate the optimization's absent-NULL loop on `!columns_info.empty()`) is unchanged; only the integration tests are reworked. - Drop the `allow_experimental_iceberg_read_optimization=0` test. The fix only changes behavior when the optimization is enabled, so the disabled path is unrelated to this change and belongs in a separate PR if at all. - Add `test_stats_less_manifest_schema_evolution_absent_column`, a discriminating test where the Iceberg schema declares `id`+`data` but the Parquet file was written under the older schema and physically contains only `id`. On the fixed build the stats-less manifest falls through to the Parquet reader, reads `id` for real and synthesizes the absent `data` column as NULL via `IcebergMetadata::getInitialSchemaByPath` (the file's own schema becomes `initial_header`). On the unpatched build `id` is NULL too, so the result discriminates. - Add `test_stats_less_manifest_cluster_returns_real_data`, exercising the cluster JSON serialization path with `icebergLocalCluster`. An empty `columns_info` must round-trip to the workers and still read real data, confirming the bug also affected the `*Cluster` variants. - Add `test_non_empty_stats_absent_column_still_null`, a non-regression guard that the optimization's legitimate absent-NULL path (a column absent from a non-empty `columns_info`) keeps returning NULL while present columns read for real. - Merge the partial-stats and full-stats cases into one parametrized `test_non_empty_stats_returns_real_data`. - Keep the Iceberg fixtures generated in code rather than checked in as static files (zvonand's readability comment). Each manifest entry and the `metadata.json` embed per-test runtime values -- the UUID-suffixed container path, the byte sizes of the freshly written Parquet/Avro files and a random `table-uuid` -- so they are templates filled at run time, not static blobs. The one static part, the Avro schemas, is deduplicated into module constants, and a module-level comment explains the choice. Verified on the fixed build: all 6 cases pass. With the guard reverted to run the loop unconditionally, the three stats-less tests fail with all-NULL rows while the two non-empty-stats guards pass. Signed-off-by: Daniel Q. Kim Co-Authored-By: Claude Opus 4.8 --- ...t_iceberg_read_optimization_empty_stats.py | 895 ++++++++---------- 1 file changed, 380 insertions(+), 515 deletions(-) diff --git a/tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py index 34dc2fda2918..fe0c8e6d085f 100644 --- a/tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py +++ b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_read_optimization_empty_stats.py @@ -1,31 +1,48 @@ #!/usr/bin/env python3 """ -Reproducer for Altinity/ClickHouse#1545: -icebergLocal() returns all-NULL columns when allow_experimental_iceberg_read_optimization=1 -and the Iceberg manifest has no column statistics. +Regression coverage for Altinity/ClickHouse#1545: +the Iceberg read optimization (``allow_experimental_iceberg_read_optimization=1``) +returned all-NULL columns when the manifest carried no per-file column statistics. Root cause ---------- -The manifest Avro writer schema intentionally omits the optional stats fields -(value_counts, column_sizes, null_value_counts, lower_bounds, upper_bounds). -AvroForIcebergDeserializer.hasPath() therefore returns false for all three stats -paths in ManifestFile.cpp. columns_infos is left empty. - -StorageObjectStorageSource::createReader's second loop (the schema-evolution -"absent column as NULL" path) iterates over every requested column. Because -columns_infos is empty, every nullable column is absent → each is injected as a -constant NULL. With all requested columns constant, need_only_count is set to -true and the Parquet file is read in count-only mode; the output is the correct -number of rows, but every value is NULL. - -Tests ------ -- test_iceberg_local_returns_actual_rows_with_stats_less_manifest - FAILS on unpatched code: returns all-NULL rows instead of real data. - -- test_iceberg_local_returns_correct_rows_when_optimization_disabled - PASSES on unpatched code: regression guard — optimization=0 bypasses the - buggy path and reads the Parquet file normally. +When a manifest's optional stats fields (``value_counts``, ``column_sizes``, +``null_value_counts``, ``lower_bounds``, ``upper_bounds``) are all absent -- +common for non-Spark writers such as pyiceberg with default settings -- +``DataFileMetaInfo::columns_info`` is left empty. The optimization's +absent-column loop in ``StorageObjectStorageSource::createReader`` then iterated +every requested column, found none in the empty ``columns_info`` map, and +injected each nullable column as a constant ``NULL``. With every requested +column constant, ``need_only_count`` became true and the Parquet file was read +in count-only mode: correct row count, every value ``NULL``. + +The fix gates that loop on ``!columns_info.empty()``; an empty map now falls +through to the Parquet reader, which reads physically-present columns normally +and synthesizes schema-evolved-absent columns as ``NULL`` via +``IcebergMetadata::getInitialSchemaByPath``. + +Test taxonomy +------------- +Discriminating (fail on the unpatched build, pass on the fixed build): + - test_stats_less_manifest_returns_real_data + - test_stats_less_manifest_schema_evolution_absent_column +Cluster/serialization path (the cluster JSON round-trip of an empty +``columns_info``): + - test_stats_less_manifest_cluster_returns_real_data +Non-regression guards (pass on both builds; the optimization's legitimate +absent-NULL and present-column behavior must keep working when stats exist): + - test_non_empty_stats_absent_column_still_null + - test_non_empty_stats_returns_real_data + +Why the Iceberg fixtures are generated in code rather than checked in as static +files: every manifest entry and the table ``metadata.json`` embed per-test +runtime values -- the UUID-suffixed container path, the byte sizes of the +freshly written Parquet and Avro files, and a random ``table-uuid`` -- so they +are templates filled at run time, not the static blobs that a checked-in fixture +(e.g. ``tests/queries/0_stateless/data_minio/.../v1.metadata.json``) would be. +The one genuinely static part, the Avro schemas, is deduplicated into the +module-level constants below; the stats-less manifest entry -- the artifact +under test -- is necessarily produced in code. """ import json @@ -39,11 +56,20 @@ import avro.schema import pyarrow as pa import pyarrow.parquet as pq +import pytest from helpers.iceberg_utils import get_uuid_str from helpers.s3_tools import LocalUploader +# The three rows every fixture stores; ``data`` is omitted from the Parquet file +# when a test needs a column the file physically lacks. +_IDS = [1, 2, 3] +_DATA = ["hello", "world", "iceberg"] +_REAL_DATA_RESULT = "1\thello\n2\tworld\n3\ticeberg" +_DATA_NULL_RESULT = "1\t\\N\n2\t\\N\n3\t\\N" + + # Iceberg v2 manifest list Avro schema (minimal fields needed by ClickHouse). _MANIFEST_LIST_SCHEMA_STR = json.dumps({ "type": "record", @@ -65,46 +91,84 @@ ], }) -# Manifest entry Avro schema that deliberately omits all per-column stats fields: -# column_sizes, value_counts, null_value_counts, lower_bounds, upper_bounds. -# -# Absent fields → AvroForIcebergDeserializer.hasPath() returns false for each → -# ManifestFile.cpp skips stats collection → columns_infos is empty → -# the optimization treats every nullable column as a "schema-evolution absent column" -# and injects a constant NULL. -_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR = json.dumps({ - "type": "record", - "name": "manifest_entry", - "fields": [ - {"name": "status", "type": "int"}, - {"name": "snapshot_id", "type": ["null", "long"]}, - {"name": "sequence_number", "type": ["null", "long"]}, - {"name": "file_sequence_number", "type": ["null", "long"]}, - { - "name": "data_file", + +def _data_file_fields(stats_fields): + """data_file record fields, optionally followed by the given stats arrays.""" + fields = [ + {"name": "content", "type": "int"}, + {"name": "file_path", "type": "string"}, + {"name": "file_format", "type": "string"}, + {"name": "partition", "type": {"type": "record", "name": "r102", "fields": []}}, + {"name": "record_count", "type": "long"}, + {"name": "file_size_in_bytes", "type": "long"}, + ] + for i, name in enumerate(stats_fields): + fields.append({ + "name": name, "type": { - "type": "record", - "name": "r2", - "fields": [ - {"name": "content", "type": "int"}, - {"name": "file_path", "type": "string"}, - {"name": "file_format", "type": "string"}, - { - "name": "partition", - "type": {"type": "record", "name": "r102", "fields": []}, - }, - {"name": "record_count", "type": "long"}, - {"name": "file_size_in_bytes", "type": "long"}, - ], + "type": "array", + "items": { + "type": "record", + "name": f"k81v81_{i}", + "fields": [ + {"name": "key", "type": "int"}, + {"name": "value", "type": "long"}, + ], + }, }, - }, - ], -}) + }) + return fields -def _write_avro(schema, records, path, metadata=None): +def _manifest_entry_schema(stats_fields): + """Iceberg v2 manifest entry Avro schema carrying the given stats arrays.""" + return json.dumps({ + "type": "record", + "name": "manifest_entry", + "fields": [ + {"name": "status", "type": "int"}, + {"name": "snapshot_id", "type": ["null", "long"]}, + {"name": "sequence_number", "type": ["null", "long"]}, + {"name": "file_sequence_number", "type": ["null", "long"]}, + { + "name": "data_file", + "type": { + "type": "record", + "name": "r2", + "fields": _data_file_fields(stats_fields), + }, + }, + ], + }) + + +# Stats-less manifest: omits every optional stats field -> columns_info is empty. +_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR = _manifest_entry_schema([]) +# Partial stats: value_counts only (column_sizes and null_value_counts absent); +# enough to populate columns_info so the empty-stats guard is not triggered. +_MANIFEST_ENTRY_PARTIAL_STATS_SCHEMA_STR = _manifest_entry_schema(["value_counts"]) +# Full stats: the common Spark-written shape. +_MANIFEST_ENTRY_FULL_STATS_SCHEMA_STR = _manifest_entry_schema( + ["column_sizes", "value_counts", "null_value_counts"] +) + + +def _schema_fields(with_data): + fields = [{"id": 1, "name": "id", "required": False, "type": "int"}] + if with_data: + fields.append({"id": 2, "name": "data", "required": False, "type": "string"}) + return fields + + +def _iceberg_schema(schema_id, with_data): + return {"type": "struct", "schema-id": schema_id, "fields": _schema_fields(with_data)} + + +def _write_avro(schema_str, records, path, metadata=None): with open(path, "wb") as f: - writer = avro.datafile.DataFileWriter(f, avro.io.DatumWriter(), schema) + writer = avro.datafile.DataFileWriter( + f, avro.io.DatumWriter(), avro.schema.parse(schema_str) + ) if metadata: for k, v in metadata.items(): writer.set_meta(k, v if isinstance(v, bytes) else v.encode("utf-8")) @@ -113,17 +177,32 @@ def _write_avro(schema, records, path, metadata=None): writer.close() -def _create_stats_less_iceberg_table(tmpdir, table_name, container_base): +def _create_iceberg_table( + tmpdir, + table_name, + container_base, + *, + manifest_entry_schema_str, + file_has_data_column=True, + data_file_stats=None, + schema_evolution=False, +): """ - Build a minimal Iceberg v2 table under tmpdir/table_name. The manifest is - written with a stats-less Avro schema to trigger the optimization bug. + Build a minimal Iceberg v2 table under ``tmpdir/table_name``. container_base is the absolute path inside the ClickHouse container where the - table will be placed after upload. File paths embedded in the Avro records and - metadata.json must reference container_base because they are interpreted at - query time inside the container. - - Returns the local table directory path (tmpdir/table_name). + table is placed after upload; file paths embedded in the Avro records and + metadata.json must reference it because they are interpreted at query time + inside the container. + + file_has_data_column controls whether the Parquet file physically contains + the ``data`` column. data_file_stats, when given, supplies the manifest + entry's stats arrays (e.g. ``value_counts``). schema_evolution lays the + table out with two schemas (0: ``id`` only, 1: ``id``+``data``) and two + snapshots so the data file resolves to the older schema id, exercising the + Parquet-reader fall-through that synthesizes the absent ``data`` column. + + Returns the local table directory path. """ table_local = os.path.join(tmpdir, table_name) data_local = os.path.join(table_local, "data") @@ -131,125 +210,137 @@ def _create_stats_less_iceberg_table(tmpdir, table_name, container_base): os.makedirs(data_local) os.makedirs(meta_local) - # Parquet data file with two nullable columns. - arrow_schema = pa.schema([ - pa.field("id", pa.int32(), nullable=True), - pa.field("data", pa.string(), nullable=True), - ]) + arrow_fields = [pa.field("id", pa.int32(), nullable=True)] + table_data = {"id": pa.array(_IDS, type=pa.int32())} + if file_has_data_column: + arrow_fields.append(pa.field("data", pa.string(), nullable=True)) + table_data["data"] = pa.array(_DATA, type=pa.string()) pq.write_table( - pa.table( - { - "id": pa.array([1, 2, 3], type=pa.int32()), - "data": pa.array(["hello", "world", "iceberg"], type=pa.string()), - }, - schema=arrow_schema, - ), + pa.table(table_data, schema=pa.schema(arrow_fields)), os.path.join(data_local, "00000-0-data.parquet"), ) data_size = os.path.getsize(os.path.join(data_local, "00000-0-data.parquet")) data_container_path = f"{container_base}/data/00000-0-data.parquet" - snapshot_id = 1 - seq_number = 1 - ts_ms = int(time.time() * 1000) + ts_ms = int(time.time() * 1000) - # Iceberg schema and partition-spec to embed as Avro file-level metadata. - # ManifestFile.cpp requires both keys to be present in the manifest's Avro header. - iceberg_schema = { - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": False, "type": "int"}, - {"id": 2, "name": "data", "required": False, "type": "string"}, - ], + # The manifest entry's snapshot_id is the snapshot that wrote the data file. + # Under schema evolution that snapshot (id 1) carries the older schema (0), + # so the iterator resolves the file to schema 0 while reading the table at + # schema 1 -- the condition that makes getInitialSchemaByPath return the + # file's own schema as initial_header. + entry_snapshot_id = 1 + write_schema_id = 0 # schema the data file was written with + + data_file_record = { + "content": 0, # DATA + "file_path": data_container_path, + "file_format": "PARQUET", + "partition": {}, + "record_count": len(_IDS), + "file_size_in_bytes": data_size, } + if data_file_stats: + data_file_record.update(data_file_stats) - # Manifest file (stats-less Avro schema). manifest_local_path = os.path.join(meta_local, "00000-0-manifest.avro") manifest_container_path = f"{container_base}/metadata/00000-0-manifest.avro" _write_avro( - avro.schema.parse(_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR), + manifest_entry_schema_str, [{ "status": 1, # ADDED - "snapshot_id": snapshot_id, - "sequence_number": seq_number, - "file_sequence_number": seq_number, - "data_file": { - "content": 0, # DATA - "file_path": data_container_path, - "file_format": "PARQUET", - "partition": {}, - "record_count": 3, - "file_size_in_bytes": data_size, - }, + "snapshot_id": entry_snapshot_id, + "sequence_number": 1, + "file_sequence_number": 1, + "data_file": data_file_record, }], manifest_local_path, metadata={ - "schema": json.dumps(iceberg_schema), + # The manifest's own schema is the data file's write schema. + "schema": json.dumps(_iceberg_schema(write_schema_id, file_has_data_column)), "partition-spec": "[]", }, ) manifest_size = os.path.getsize(manifest_local_path) - # Manifest list. - mlist_filename = f"snap-{snapshot_id}-0-manifest-list.avro" - mlist_local_path = os.path.join(meta_local, mlist_filename) - mlist_container_path = f"{container_base}/metadata/{mlist_filename}" - _write_avro( - avro.schema.parse(_MANIFEST_LIST_SCHEMA_STR), - [{ - "manifest_path": manifest_container_path, - "manifest_length": manifest_size, - "partition_spec_id": 0, - "content": 0, # DATA - "sequence_number": seq_number, - "min_sequence_number": seq_number, - "added_snapshot_id": snapshot_id, - "added_files_count": 1, - "existing_files_count": 0, - "deleted_files_count": 0, - "added_rows_count": 3, - "existing_rows_count": 0, - "deleted_rows_count": 0, - }], - mlist_local_path, - ) + def _write_manifest_list(snapshot_id): + filename = f"snap-{snapshot_id}-0-manifest-list.avro" + local_path = os.path.join(meta_local, filename) + container_path = f"{container_base}/metadata/{filename}" + _write_avro( + _MANIFEST_LIST_SCHEMA_STR, + [{ + "manifest_path": manifest_container_path, + "manifest_length": manifest_size, + "partition_spec_id": 0, + "content": 0, # DATA + "sequence_number": 1, + "min_sequence_number": 1, + "added_snapshot_id": entry_snapshot_id, + "added_files_count": 1, + "existing_files_count": 0, + "deleted_files_count": 0, + "added_rows_count": len(_IDS), + "existing_rows_count": 0, + "deleted_rows_count": 0, + }], + local_path, + ) + return container_path + + if schema_evolution: + # Two schemas, two snapshots: the data file was written under schema 0 + # (id only); the table later evolved to schema 1 (id + data). + schemas = [_iceberg_schema(0, with_data=False), _iceberg_schema(1, with_data=True)] + current_schema = 1 + last_column_id = 2 + snapshots = [ + {"snapshot-id": 1, "schema-id": 0, "manifest-list": _write_manifest_list(1)}, + {"snapshot-id": 2, "schema-id": 1, "manifest-list": _write_manifest_list(2)}, + ] + current_snapshot = 2 + last_seq = 2 + else: + # Single schema (id + data); the data file resolves to it directly. + schemas = [_iceberg_schema(0, with_data=True)] + current_schema = 0 + last_column_id = 2 + snapshots = [ + {"snapshot-id": 1, "schema-id": 0, "manifest-list": _write_manifest_list(1)}, + ] + current_snapshot = 1 + last_seq = 1 - # Table metadata JSON. metadata = { "format-version": 2, "table-uuid": str(uuid.uuid4()), "location": container_base, - "last-sequence-number": seq_number, + "last-sequence-number": last_seq, "last-updated-ms": ts_ms, - "last-column-id": 2, - "current-schema-id": 0, - "schemas": [{ - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": False, "type": "int"}, - {"id": 2, "name": "data", "required": False, "type": "string"}, - ], - }], + "last-column-id": last_column_id, + "current-schema-id": current_schema, + "schemas": schemas, "default-spec-id": 0, "partition-specs": [{"spec-id": 0, "fields": []}], "last-partition-id": 999, "default-sort-order-id": 0, "sort-orders": [{"order-id": 0, "fields": []}], "properties": {}, - "current-snapshot-id": snapshot_id, - "snapshots": [{ - "snapshot-id": snapshot_id, - "sequence-number": seq_number, - "timestamp-ms": ts_ms, - "manifest-list": mlist_container_path, - "summary": {"operation": "append"}, - "schema-id": 0, - }], - "snapshot-log": [{"timestamp-ms": ts_ms, "snapshot-id": snapshot_id}], + "current-snapshot-id": current_snapshot, + "snapshots": [ + { + "snapshot-id": s["snapshot-id"], + "sequence-number": s["snapshot-id"], + "timestamp-ms": ts_ms, + "manifest-list": s["manifest-list"], + "summary": {"operation": "append"}, + "schema-id": s["schema-id"], + } + for s in snapshots + ], + "snapshot-log": [{"timestamp-ms": ts_ms, "snapshot-id": s["snapshot-id"]} for s in snapshots], "metadata-log": [], - "refs": {"main": {"snapshot-id": snapshot_id, "type": "branch"}}, + "refs": {"main": {"snapshot-id": current_snapshot, "type": "branch"}}, } with open(os.path.join(meta_local, "v1.metadata.json"), "w") as f: json.dump(metadata, f, indent=2) @@ -257,7 +348,7 @@ def _create_stats_less_iceberg_table(tmpdir, table_name, container_base): return table_local -def _upload_to_container(instance, local_table_dir, container_base): +def _upload_to_node(instance, local_table_dir, container_base): uploader = LocalUploader(instance) for root, _dirs, files in os.walk(local_table_dir): for fname in files: @@ -266,420 +357,194 @@ def _upload_to_container(instance, local_table_dir, container_base): uploader.upload_file(local_path, os.path.join(container_base, rel)) -# Manifest entry schema with value_counts only (partial stats). -# column_sizes and null_value_counts are absent. value_counts alone is enough to -# populate columns_info so the absent-NULL guard (columns_info.empty()) is not triggered. -_MANIFEST_ENTRY_PARTIAL_STATS_SCHEMA_STR = json.dumps({ - "type": "record", - "name": "manifest_entry", - "fields": [ - {"name": "status", "type": "int"}, - {"name": "snapshot_id", "type": ["null", "long"]}, - {"name": "sequence_number", "type": ["null", "long"]}, - {"name": "file_sequence_number", "type": ["null", "long"]}, - { - "name": "data_file", - "type": { - "type": "record", - "name": "r2", - "fields": [ - {"name": "content", "type": "int"}, - {"name": "file_path", "type": "string"}, - {"name": "file_format", "type": "string"}, - { - "name": "partition", - "type": {"type": "record", "name": "r102", "fields": []}, - }, - {"name": "record_count", "type": "long"}, - {"name": "file_size_in_bytes", "type": "long"}, - # Only value_counts is present (column_sizes and null_value_counts absent). - { - "name": "value_counts", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "k81v81", - "fields": [ - {"name": "key", "type": "int"}, - {"name": "value", "type": "long"}, - ], - }, - }, - }, - ], - }, - }, - ], -}) - -# Manifest entry schema with all three stats fields (value_counts, column_sizes, -# null_value_counts). Represents a manifest written by a full-stats writer such as Spark. -_MANIFEST_ENTRY_FULL_STATS_SCHEMA_STR = json.dumps({ - "type": "record", - "name": "manifest_entry", - "fields": [ - {"name": "status", "type": "int"}, - {"name": "snapshot_id", "type": ["null", "long"]}, - {"name": "sequence_number", "type": ["null", "long"]}, - {"name": "file_sequence_number", "type": ["null", "long"]}, - { - "name": "data_file", - "type": { - "type": "record", - "name": "r2", - "fields": [ - {"name": "content", "type": "int"}, - {"name": "file_path", "type": "string"}, - {"name": "file_format", "type": "string"}, - { - "name": "partition", - "type": {"type": "record", "name": "r102", "fields": []}, - }, - {"name": "record_count", "type": "long"}, - {"name": "file_size_in_bytes", "type": "long"}, - { - "name": "column_sizes", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "k81v81", - "fields": [ - {"name": "key", "type": "int"}, - {"name": "value", "type": "long"}, - ], - }, - }, - }, - { - "name": "value_counts", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "k81v81_vc", - "fields": [ - {"name": "key", "type": "int"}, - {"name": "value", "type": "long"}, - ], - }, - }, - }, - { - "name": "null_value_counts", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "k81v81_nc", - "fields": [ - {"name": "key", "type": "int"}, - {"name": "value", "type": "long"}, - ], - }, - }, - }, - ], - }, - }, - ], -}) +def _container_base(table_name): + return f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" -def _create_iceberg_table_with_schema(tmpdir, table_name, container_base, - manifest_schema_str, manifest_extra_fields=None): - """ - Build a minimal Iceberg v2 table. manifest_extra_fields is added to each - manifest entry's data_file record when using schemas that include stats. - """ - table_local = os.path.join(tmpdir, table_name) - data_local = os.path.join(table_local, "data") - meta_local = os.path.join(table_local, "metadata") - os.makedirs(data_local) - os.makedirs(meta_local) - - arrow_schema = pa.schema([ - pa.field("id", pa.int32(), nullable=True), - pa.field("data", pa.string(), nullable=True), - ]) - pq.write_table( - pa.table( - { - "id": pa.array([1, 2, 3], type=pa.int32()), - "data": pa.array(["hello", "world", "iceberg"], type=pa.string()), - }, - schema=arrow_schema, - ), - os.path.join(data_local, "00000-0-data.parquet"), - ) - data_size = os.path.getsize(os.path.join(data_local, "00000-0-data.parquet")) - data_container_path = f"{container_base}/data/00000-0-data.parquet" +def _query_local(instance, container_base, optimization=1): + return instance.query( + f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" + " ORDER BY id" + f" SETTINGS allow_experimental_iceberg_read_optimization={optimization}" + ).strip() - snapshot_id = 1 - seq_number = 1 - ts_ms = int(time.time() * 1000) - iceberg_schema = { - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": False, "type": "int"}, - {"id": 2, "name": "data", "required": False, "type": "string"}, - ], - } +def test_stats_less_manifest_returns_real_data(started_cluster_iceberg_no_spark): + """ + Discriminating reproducer for Altinity#1545. - data_file_record = { - "content": 0, - "file_path": data_container_path, - "file_format": "PARQUET", - "partition": {}, - "record_count": 3, - "file_size_in_bytes": data_size, - } - if manifest_extra_fields: - data_file_record.update(manifest_extra_fields) + A stats-less manifest (empty columns_info) must read real data, not all-NULL + rows. Unpatched build: every nullable column is injected as a constant NULL + and the result is 3 all-NULL rows. Fixed build: the empty-stats guard falls + through to the Parquet reader and returns the real values. + """ + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_stats_less_real_" + get_uuid_str() + container_base = _container_base(table_name) - manifest_local_path = os.path.join(meta_local, "00000-0-manifest.avro") - manifest_container_path = f"{container_base}/metadata/00000-0-manifest.avro" - _write_avro( - avro.schema.parse(manifest_schema_str), - [{ - "status": 1, - "snapshot_id": snapshot_id, - "sequence_number": seq_number, - "file_sequence_number": seq_number, - "data_file": data_file_record, - }], - manifest_local_path, - metadata={ - "schema": json.dumps(iceberg_schema), - "partition-spec": "[]", - }, - ) - manifest_size = os.path.getsize(manifest_local_path) + with tempfile.TemporaryDirectory() as tmpdir: + local_dir = _create_iceberg_table( + tmpdir, table_name, container_base, + manifest_entry_schema_str=_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR, + ) + _upload_to_node(instance, local_dir, container_base) - mlist_filename = f"snap-{snapshot_id}-0-manifest-list.avro" - mlist_local_path = os.path.join(meta_local, mlist_filename) - mlist_container_path = f"{container_base}/metadata/{mlist_filename}" - _write_avro( - avro.schema.parse(_MANIFEST_LIST_SCHEMA_STR), - [{ - "manifest_path": manifest_container_path, - "manifest_length": manifest_size, - "partition_spec_id": 0, - "content": 0, - "sequence_number": seq_number, - "min_sequence_number": seq_number, - "added_snapshot_id": snapshot_id, - "added_files_count": 1, - "existing_files_count": 0, - "deleted_files_count": 0, - "added_rows_count": 3, - "existing_rows_count": 0, - "deleted_rows_count": 0, - }], - mlist_local_path, + result = _query_local(instance, container_base) + assert result == _REAL_DATA_RESULT, ( + f"Got: {result!r}\n" + "All-NULL rows mean the bug fired: empty columns_info caused the " + "optimization to treat every nullable column as absent and inject " + "constant NULL (need_only_count=1)." ) - metadata = { - "format-version": 2, - "table-uuid": str(uuid.uuid4()), - "location": container_base, - "last-sequence-number": seq_number, - "last-updated-ms": ts_ms, - "last-column-id": 2, - "current-schema-id": 0, - "schemas": [{ - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": False, "type": "int"}, - {"id": 2, "name": "data", "required": False, "type": "string"}, - ], - }], - "default-spec-id": 0, - "partition-specs": [{"spec-id": 0, "fields": []}], - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [{"order-id": 0, "fields": []}], - "properties": {}, - "current-snapshot-id": snapshot_id, - "snapshots": [{ - "snapshot-id": snapshot_id, - "sequence-number": seq_number, - "timestamp-ms": ts_ms, - "manifest-list": mlist_container_path, - "summary": {"operation": "append"}, - "schema-id": 0, - }], - "snapshot-log": [{"timestamp-ms": ts_ms, "snapshot-id": snapshot_id}], - "metadata-log": [], - "refs": {"main": {"snapshot-id": snapshot_id, "type": "branch"}}, - } - with open(os.path.join(meta_local, "v1.metadata.json"), "w") as f: - json.dump(metadata, f, indent=2) - - return table_local - -def test_iceberg_local_returns_actual_rows_with_stats_less_manifest( - started_cluster_iceberg_no_spark, -): +def test_stats_less_manifest_schema_evolution_absent_column(started_cluster_iceberg_no_spark): """ - FAILS on unpatched code (Altinity#1545). + Discriminating test for the schema-evolution fall-through (highest risk). + + The Iceberg schema declares ``id``+``data`` but the Parquet file was written + under the older schema and physically contains only ``id``. With the + stats-less manifest, the fixed build must fall through to the Parquet reader, + read ``id`` for real, and synthesize the absent ``data`` column as NULL via + IcebergMetadata::getInitialSchemaByPath (file schema as initial_header). - With allow_experimental_iceberg_read_optimization=1 (the default), reading an - icebergLocal table whose manifest omits column statistics returns all-NULL rows - instead of real data. The correct output is 3 rows: (1,'hello'), (2,'world'), - (3,'iceberg'). + Unpatched build: the optimization marks BOTH columns (including ``id``) + constant NULL, so ``id`` comes back NULL too -- the discriminator. """ - instance = started_cluster_iceberg_no_spark.instances["node1"] - table_name = "test_opt_on_stats_less_" + get_uuid_str() - container_base = ( - f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" - ) + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_stats_less_evolve_" + get_uuid_str() + container_base = _container_base(table_name) with tempfile.TemporaryDirectory() as tmpdir: - local_dir = _create_stats_less_iceberg_table(tmpdir, table_name, container_base) - _upload_to_container(instance, local_dir, container_base) - - result = instance.query( - f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" - " ORDER BY id" - " SETTINGS allow_experimental_iceberg_read_optimization=1" - ).strip() + local_dir = _create_iceberg_table( + tmpdir, table_name, container_base, + manifest_entry_schema_str=_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR, + file_has_data_column=False, + schema_evolution=True, + ) + _upload_to_node(instance, local_dir, container_base) - assert result == "1\thello\n2\tworld\n3\ticeberg", ( + result = _query_local(instance, container_base) + assert result == _DATA_NULL_RESULT, ( f"Got: {result!r}\n" - "All-NULL rows means the bug fired: empty columns_infos caused the " - "optimization to treat every nullable column as an absent schema-evolution " - "column and inject constant NULL (need_only_count=1)." + "Expected real 'id' with NULL 'data'. If 'id' is also NULL the bug " + "fired; if the query errors, the Parquet fall-through did not synthesize " + "the schema-evolved-absent column as NULL." ) -def test_iceberg_local_returns_correct_rows_when_optimization_disabled( - started_cluster_iceberg_no_spark, -): +def test_stats_less_manifest_cluster_returns_real_data(started_cluster_iceberg_no_spark): """ - PASSES on unpatched code. - - With allow_experimental_iceberg_read_optimization=0 the optimization block is - skipped entirely and the Parquet file is read normally. This test is a - regression guard: it must keep passing after the bug is fixed. + Cluster/serialization-path coverage for Altinity#1545. + + icebergLocalCluster distributes file-level tasks to workers, serializing + DataFileMetaInfo (and its empty columns_info) through the cluster JSON path. + An empty columns_info must survive the round-trip and still read real data on + the worker, exercising the same fix on the *Cluster variants. The table is + uploaded to every node because local storage is read by each worker from its + own filesystem. """ - instance = started_cluster_iceberg_no_spark.instances["node1"] - table_name = "test_opt_off_stats_less_" + get_uuid_str() - container_base = ( - f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" - ) + cluster = started_cluster_iceberg_no_spark + instance = cluster.instances["node1"] + table_name = "test_stats_less_cluster_" + get_uuid_str() + container_base = _container_base(table_name) with tempfile.TemporaryDirectory() as tmpdir: - local_dir = _create_stats_less_iceberg_table(tmpdir, table_name, container_base) - _upload_to_container(instance, local_dir, container_base) + local_dir = _create_iceberg_table( + tmpdir, table_name, container_base, + manifest_entry_schema_str=_MANIFEST_ENTRY_NO_STATS_SCHEMA_STR, + ) + for node in cluster.instances.values(): + _upload_to_node(node, local_dir, container_base) result = instance.query( - f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" + f"SELECT * FROM icebergLocalCluster('cluster_simple', local, path='{container_base}', format=Parquet)" " ORDER BY id" - " SETTINGS allow_experimental_iceberg_read_optimization=0" + " SETTINGS allow_experimental_iceberg_read_optimization=1" ).strip() - - assert result == "1\thello\n2\tworld\n3\ticeberg" + assert result == _REAL_DATA_RESULT, ( + f"Got: {result!r}\n" + "All-NULL rows mean empty columns_info was mishandled on the worker after " + "the cluster JSON round-trip." + ) -def test_iceberg_local_partial_stats_manifest_reads_correctly( - started_cluster_iceberg_no_spark, -): +def test_non_empty_stats_absent_column_still_null(started_cluster_iceberg_no_spark): """ - Regression test for Altinity#1545: partial stats (value_counts only). + Non-regression guard: the optimization's legitimate absent-NULL path. - A manifest that includes value_counts but omits column_sizes and - null_value_counts still populates columns_info. The fix must treat the - columns as real (not absent), so real data is returned. + The Iceberg schema declares ``id``+``data`` but the Parquet file contains + only ``id``, and the manifest carries stats for ``id`` only (non-empty + columns_info). The optimization must keep marking the genuinely-absent + ``data`` column NULL while ``id`` reads real values. Passes on both builds; + the fix only changes the empty-columns_info case. """ - instance = started_cluster_iceberg_no_spark.instances["node1"] - table_name = "test_opt_on_partial_stats_" + get_uuid_str() - container_base = ( - f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" - ) + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_nonempty_absent_" + get_uuid_str() + container_base = _container_base(table_name) - # value_counts for field_id 1 (id) and 2 (data): 3 rows each. - extra = { - "value_counts": [ - {"key": 1, "value": 3}, - {"key": 2, "value": 3}, - ], - } + # value_counts for field_id 1 (id) only -> columns_info = {id}. + stats = {"value_counts": [{"key": 1, "value": len(_IDS)}]} with tempfile.TemporaryDirectory() as tmpdir: - local_dir = _create_iceberg_table_with_schema( + local_dir = _create_iceberg_table( tmpdir, table_name, container_base, - _MANIFEST_ENTRY_PARTIAL_STATS_SCHEMA_STR, - manifest_extra_fields=extra, + manifest_entry_schema_str=_MANIFEST_ENTRY_PARTIAL_STATS_SCHEMA_STR, + file_has_data_column=False, + data_file_stats=stats, ) - _upload_to_container(instance, local_dir, container_base) + _upload_to_node(instance, local_dir, container_base) - result = instance.query( - f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" - " ORDER BY id" - " SETTINGS allow_experimental_iceberg_read_optimization=1" - ).strip() - - assert result == "1\thello\n2\tworld\n3\ticeberg", ( + result = _query_local(instance, container_base) + assert result == _DATA_NULL_RESULT, ( f"Got: {result!r}\n" - "Partial-stats manifest (value_counts only) should not trigger the absent-NULL " - "path because columns_info is non-empty. All-NULL rows means the guard " - "fired incorrectly for partial-stats manifests." + "With non-empty stats the optimization must still read 'id' and inject " + "the absent 'data' column as NULL." ) -def test_iceberg_local_full_stats_manifest_reads_correctly( - started_cluster_iceberg_no_spark, +@pytest.mark.parametrize( + "manifest_entry_schema_str,stats", + [ + ( + _MANIFEST_ENTRY_PARTIAL_STATS_SCHEMA_STR, + {"value_counts": [{"key": 1, "value": 3}, {"key": 2, "value": 3}]}, + ), + ( + _MANIFEST_ENTRY_FULL_STATS_SCHEMA_STR, + { + "column_sizes": [{"key": 1, "value": 64}, {"key": 2, "value": 128}], + "value_counts": [{"key": 1, "value": 3}, {"key": 2, "value": 3}], + "null_value_counts": [{"key": 1, "value": 0}, {"key": 2, "value": 0}], + }, + ), + ], + ids=["partial_stats", "full_stats"], +) +def test_non_empty_stats_returns_real_data( + started_cluster_iceberg_no_spark, manifest_entry_schema_str, stats ): """ - Control test: full-stats manifest (Spark-like) must continue to return real data. - - A manifest with all three stats fields (column_sizes, value_counts, - null_value_counts) is the common case written by Spark. This test verifies the - optimization does not regress for the normal path after the fix. + Non-regression guard: manifests that do populate columns_info (partial + value_counts-only, and full Spark-style stats) must keep returning real data + when the optimization is enabled. These are the cases the empty-stats guard + must not disturb. """ - instance = started_cluster_iceberg_no_spark.instances["node1"] - table_name = "test_opt_on_full_stats_" + get_uuid_str() - container_base = ( - f"/var/lib/clickhouse/user_files/iceberg_data/default/{table_name}" - ) - - # Full stats for field_id 1 (id) and 2 (data). - extra = { - "column_sizes": [ - {"key": 1, "value": 64}, - {"key": 2, "value": 128}, - ], - "value_counts": [ - {"key": 1, "value": 3}, - {"key": 2, "value": 3}, - ], - "null_value_counts": [ - {"key": 1, "value": 0}, - {"key": 2, "value": 0}, - ], - } + instance = started_cluster_iceberg_no_spark.instances["node1"] + table_name = "test_nonempty_real_" + get_uuid_str() + container_base = _container_base(table_name) with tempfile.TemporaryDirectory() as tmpdir: - local_dir = _create_iceberg_table_with_schema( + local_dir = _create_iceberg_table( tmpdir, table_name, container_base, - _MANIFEST_ENTRY_FULL_STATS_SCHEMA_STR, - manifest_extra_fields=extra, + manifest_entry_schema_str=manifest_entry_schema_str, + data_file_stats=stats, ) - _upload_to_container(instance, local_dir, container_base) - - result = instance.query( - f"SELECT * FROM icebergLocal(local, path='{container_base}', format=Parquet)" - " ORDER BY id" - " SETTINGS allow_experimental_iceberg_read_optimization=1" - ).strip() + _upload_to_node(instance, local_dir, container_base) - assert result == "1\thello\n2\tworld\n3\ticeberg", ( + result = _query_local(instance, container_base) + assert result == _REAL_DATA_RESULT, ( f"Got: {result!r}\n" - "Full-stats manifest should return real data when optimization is enabled." + "A manifest with non-empty stats must return real data when the " + "optimization is enabled." )