From 755793c10949249020ba4ff5980d4990178a1948 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 13:50:24 -0800 Subject: [PATCH 01/10] deletion vector write --- pyiceberg/table/puffin.py | 126 ++++++++++++++++++++++++++++++++++++- tests/table/test_puffin.py | 77 ++++++++++++++++++++++- 2 files changed, 201 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 917d387f45..da0074a954 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -14,8 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import io import math -from typing import TYPE_CHECKING, Literal +import zlib +from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional from pydantic import Field from pyroaring import BitMap, FrozenBitMap @@ -27,6 +29,7 @@ # Short for: Puffin Fratercula arctica, version 1 MAGIC_BYTES = b"PFA1" +DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64" EMPTY_BITMAP = FrozenBitMap() MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1 PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file" @@ -62,6 +65,35 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: return bitmaps +def _serialize_bitmaps(bitmaps: Dict[int, BitMap]) -> bytes: + """ + Serializes a dictionary of bitmaps into a byte array. + + The format is: + - 8 bytes: number of bitmaps (little-endian) + - For each bitmap: + - 4 bytes: key (little-endian) + - n bytes: serialized bitmap + """ + with io.BytesIO() as out: + sorted_keys = sorted(bitmaps.keys()) + + # number of bitmaps + out.write(len(sorted_keys).to_bytes(8, "little")) + + for key in sorted_keys: + if key < 0: + raise ValueError(f"Invalid unsigned key: {key}") + if key > MAX_JAVA_SIGNED: + raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl") + + # key + out.write(key.to_bytes(4, "little")) + # bitmap + out.write(bitmaps[key].serialize()) + return out.getvalue() + + class PuffinBlobMetadata(IcebergBaseModel): type: Literal["deletion-vector-v1"] = Field() fields: list[int] = Field() @@ -114,3 +146,95 @@ def __init__(self, puffin: bytes) -> None: def to_vector(self) -> dict[str, "pa.ChunkedArray"]: return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()} + + +class PuffinWriter: + _blobs: List[PuffinBlobMetadata] + _blob_payloads: List[bytes] + + def __init__(self) -> None: + self._blobs = [] + self._blob_payloads = [] + + def add( + self, + positions: Iterable[int], + referenced_data_file: str, + ) -> None: + # 1. Create bitmaps from positions + bitmaps: Dict[int, BitMap] = {} + cardinality = 0 + for pos in positions: + cardinality += 1 + key = pos >> 32 + low_bits = pos & 0xFFFFFFFF + if key not in bitmaps: + bitmaps[key] = BitMap() + bitmaps[key].add(low_bits) + + # 2. Serialize bitmaps for the vector payload + vector_payload = _serialize_bitmaps(bitmaps) + + # 3. Construct the full blob payload for deletion-vector-v1 + with io.BytesIO() as blob_payload_buffer: + # Magic bytes for DV + blob_payload_buffer.write(DELETION_VECTOR_MAGIC) + # The vector itself + blob_payload_buffer.write(vector_payload) + + # The content for CRC calculation + crc_content = blob_payload_buffer.getvalue() + crc32 = zlib.crc32(crc_content) + + # The full blob to be stored in the Puffin file + with io.BytesIO() as full_blob_buffer: + # Combined length of the vector and magic bytes stored as 4 bytes, big-endian + full_blob_buffer.write(len(crc_content).to_bytes(4, "big")) + # The content (magic + vector) + full_blob_buffer.write(crc_content) + # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian + full_blob_buffer.write(crc32.to_bytes(4, "big")) + + self._blob_payloads.append(full_blob_buffer.getvalue()) + + # 4. Create blob metadata + properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)} + + self._blobs.append( + PuffinBlobMetadata( + type="deletion-vector-v1", + fields=[], + snapshot_id=-1, + sequence_number=-1, + offset=0, # Will be set later + length=0, # Will be set later + properties=properties, + compression_codec=None, # Explicitly None + ) + ) + + def finish(self) -> bytes: + with io.BytesIO() as out: + payload_buffer = io.BytesIO() + for blob_payload in self._blob_payloads: + payload_buffer.write(blob_payload) + + # Set offsets and lengths in metadata + current_offset = 4 # Start after file magic + for i, blob_payload in enumerate(self._blob_payloads): + self._blobs[i].offset = current_offset + self._blobs[i].length = len(blob_payload) + current_offset += len(blob_payload) + + footer = Footer(blobs=self._blobs) + footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") + + # Final assembly + out.write(MAGIC_BYTES) + out.write(payload_buffer.getvalue()) + out.write(footer_payload_bytes) + out.write(len(footer_payload_bytes).to_bytes(4, "little")) + out.write((0).to_bytes(4, "little")) # flags + out.write(MAGIC_BYTES) + + return out.getvalue() diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index bf8c82014c..0e9c881860 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap +from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter def _open_file(file: str) -> bytes: @@ -71,3 +71,78 @@ def test_map_high_vals() -> None: with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"): _ = _deserialize_bitmap(puffin) + + +def test_puffin_round_trip(): + # Define some deletion positions for multiple files + deletions1 = [10, 20, 30] + deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + + file1_path = "path/to/data1.parquet" + file2_path = "path/to/data2.parquet" + + # Write the Puffin file + writer = PuffinWriter() + writer.add(positions=deletions1, referenced_data_file=file1_path) + writer.add(positions=deletions2, referenced_data_file=file2_path) + puffin_bytes = writer.finish() + + # Read the Puffin file back + reader = PuffinFile(puffin_bytes) + + # Assert footer metadata + assert len(reader.footer.blobs) == 2 + + blob1_meta = reader.footer.blobs[0] + assert blob1_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file1_path + assert blob1_meta.properties["cardinality"] == str(len(deletions1)) + + blob2_meta = reader.footer.blobs[1] + assert blob2_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file2_path + assert blob2_meta.properties["cardinality"] == str(len(deletions2)) + + # Assert the content of deletion vectors + read_vectors = reader.to_vector() + + assert file1_path in read_vectors + assert file2_path in read_vectors + + assert read_vectors[file1_path].to_pylist() == sorted(deletions1) + assert read_vectors[file2_path].to_pylist() == sorted(deletions2) + + +def test_write_and_read_puffin_file(): + writer = PuffinWriter() + writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") + writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") + puffin_bytes = writer.finish() + + reader = PuffinFile(puffin_bytes) + + assert len(reader.footer.blobs) == 2 + blob1 = reader.footer.blobs[0] + blob2 = reader.footer.blobs[1] + + assert blob1.properties["referenced-data-file"] == "file1.parquet" + assert blob1.properties["cardinality"] == "3" + assert blob1.type == "deletion-vector-v1" + assert blob1.snapshot_id == -1 + assert blob1.sequence_number == -1 + assert blob1.compression_codec is None + + assert blob2.properties["referenced-data-file"] == "file2.parquet" + assert blob2.properties["cardinality"] == "3" + + vectors = reader.to_vector() + assert len(vectors) == 2 + assert vectors["file1.parquet"].to_pylist() == [1, 2, 3] + assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] + + +def test_puffin_file_with_no_blobs(): + writer = PuffinWriter() + puffin_bytes = writer.finish() + + reader = PuffinFile(puffin_bytes) + assert len(reader.footer.blobs) == 0 + assert len(reader.to_vector()) == 0 From 9b10a4f37f36fd16227aa48ffbf2f911202abc5e Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 14:10:57 -0800 Subject: [PATCH 02/10] test fix --- pyiceberg/table/puffin.py | 12 +++++++----- tests/table/test_puffin.py | 6 +++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index da0074a954..8a7d4c2215 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -219,14 +219,16 @@ def finish(self) -> bytes: for blob_payload in self._blob_payloads: payload_buffer.write(blob_payload) - # Set offsets and lengths in metadata - current_offset = 4 # Start after file magic + updated_blobs_metadata: List[PuffinBlobMetadata] = [] + current_offset = 4 # Start after file magic (4 bytes) for i, blob_payload in enumerate(self._blob_payloads): - self._blobs[i].offset = current_offset - self._blobs[i].length = len(blob_payload) + original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True) + original_metadata_dict["offset"] = current_offset + original_metadata_dict["length"] = len(blob_payload) + updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) current_offset += len(blob_payload) - footer = Footer(blobs=self._blobs) + footer = Footer(blobs=updated_blobs_metadata) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") # Final assembly diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 0e9c881860..c71afd24af 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter +from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter, PROPERTY_REFERENCED_DATA_FILE def _open_file(file: str) -> bytes: @@ -94,11 +94,11 @@ def test_puffin_round_trip(): assert len(reader.footer.blobs) == 2 blob1_meta = reader.footer.blobs[0] - assert blob1_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file1_path + assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path assert blob1_meta.properties["cardinality"] == str(len(deletions1)) blob2_meta = reader.footer.blobs[1] - assert blob2_meta.properties[PuffinFile.PROPERTY_REFERENCED_DATA_FILE] == file2_path + assert blob2_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file2_path assert blob2_meta.properties["cardinality"] == str(len(deletions2)) # Assert the content of deletion vectors From c90ad387c1c7920e1e6e68f3f300b8003294cc1b Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 8 Dec 2025 14:17:09 -0800 Subject: [PATCH 03/10] lint fixes --- pyiceberg/table/puffin.py | 15 ++++++++------- tests/table/test_puffin.py | 14 +++++++------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 8a7d4c2215..c54173e01a 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -17,7 +17,8 @@ import io import math import zlib -from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional +from collections.abc import Iterable +from typing import TYPE_CHECKING, Literal from pydantic import Field from pyroaring import BitMap, FrozenBitMap @@ -65,9 +66,9 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: return bitmaps -def _serialize_bitmaps(bitmaps: Dict[int, BitMap]) -> bytes: +def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes: """ - Serializes a dictionary of bitmaps into a byte array. + Serialize a dictionary of bitmaps into a byte array. The format is: - 8 bytes: number of bitmaps (little-endian) @@ -149,8 +150,8 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: class PuffinWriter: - _blobs: List[PuffinBlobMetadata] - _blob_payloads: List[bytes] + _blobs: list[PuffinBlobMetadata] + _blob_payloads: list[bytes] def __init__(self) -> None: self._blobs = [] @@ -162,7 +163,7 @@ def add( referenced_data_file: str, ) -> None: # 1. Create bitmaps from positions - bitmaps: Dict[int, BitMap] = {} + bitmaps: dict[int, BitMap] = {} cardinality = 0 for pos in positions: cardinality += 1 @@ -219,7 +220,7 @@ def finish(self) -> bytes: for blob_payload in self._blob_payloads: payload_buffer.write(blob_payload) - updated_blobs_metadata: List[PuffinBlobMetadata] = [] + updated_blobs_metadata: list[PuffinBlobMetadata] = [] current_offset = 4 # Start after file magic (4 bytes) for i, blob_payload in enumerate(self._blob_payloads): original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True) diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index c71afd24af..403b2e038f 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -19,7 +19,7 @@ import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import _deserialize_bitmap, PuffinFile, PuffinWriter, PROPERTY_REFERENCED_DATA_FILE +from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter, _deserialize_bitmap def _open_file(file: str) -> bytes: @@ -73,10 +73,10 @@ def test_map_high_vals() -> None: _ = _deserialize_bitmap(puffin) -def test_puffin_round_trip(): +def test_puffin_round_trip() -> None: # Define some deletion positions for multiple files deletions1 = [10, 20, 30] - deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position file1_path = "path/to/data1.parquet" file2_path = "path/to/data2.parquet" @@ -92,7 +92,7 @@ def test_puffin_round_trip(): # Assert footer metadata assert len(reader.footer.blobs) == 2 - + blob1_meta = reader.footer.blobs[0] assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path assert blob1_meta.properties["cardinality"] == str(len(deletions1)) @@ -103,7 +103,7 @@ def test_puffin_round_trip(): # Assert the content of deletion vectors read_vectors = reader.to_vector() - + assert file1_path in read_vectors assert file2_path in read_vectors @@ -111,7 +111,7 @@ def test_puffin_round_trip(): assert read_vectors[file2_path].to_pylist() == sorted(deletions2) -def test_write_and_read_puffin_file(): +def test_write_and_read_puffin_file() -> None: writer = PuffinWriter() writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") @@ -139,7 +139,7 @@ def test_write_and_read_puffin_file(): assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] -def test_puffin_file_with_no_blobs(): +def test_puffin_file_with_no_blobs() -> None: writer = PuffinWriter() puffin_bytes = writer.finish() From 842d6a532dccff2605461040c7b172a0d0b59b32 Mon Sep 17 00:00:00 2001 From: Gabriel Lesperance Date: Wed, 10 Dec 2025 21:13:10 -0500 Subject: [PATCH 04/10] test: Add Spark interop test for Puffin DV reader Verify pyiceberg's PuffinFile reader can parse deletion vectors written by Spark. Uses coalesce(1) to force Spark to create DVs instead of COW. --- .../integration/test_puffin_spark_interop.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 tests/integration/test_puffin_spark_interop.py diff --git a/tests/integration/test_puffin_spark_interop.py b/tests/integration/test_puffin_spark_interop.py new file mode 100644 index 0000000000..be19276cd4 --- /dev/null +++ b/tests/integration/test_puffin_spark_interop.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import pytest +from pyspark.sql import SparkSession + +from pyiceberg.catalog.rest import RestCatalog +from pyiceberg.manifest import ManifestContent +from pyiceberg.table.puffin import PuffinFile + + +def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None: + for sql in sqls: + spark.sql(sql) + + +@pytest.mark.integration +def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None: + """Verify pyiceberg can read Puffin DVs written by Spark.""" + identifier = "default.spark_puffin_format_test" + + run_spark_commands(spark, [f"DROP TABLE IF EXISTS {identifier}"]) + run_spark_commands( + spark, + [ + f""" + CREATE TABLE {identifier} (id BIGINT) + USING iceberg + TBLPROPERTIES ( + 'format-version' = '3', + 'write.delete.mode' = 'merge-on-read' + ) + """, + ], + ) + + df = spark.range(1, 51) + df.coalesce(1).writeTo(identifier).append() + + files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect() + assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}" + + run_spark_commands(spark, [f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)"]) + + table = session_catalog.load_table(identifier) + current_snapshot = table.current_snapshot() + assert current_snapshot is not None + + manifests = current_snapshot.manifests(table.io) + delete_manifests = [m for m in manifests if m.content == ManifestContent.DELETES] + assert len(delete_manifests) > 0, "Expected delete manifest with DVs" + + delete_manifest = delete_manifests[0] + entries = list(delete_manifest.fetch_manifest_entry(table.io)) + assert len(entries) > 0, "Expected at least one delete file entry" + + delete_entry = entries[0] + puffin_path = delete_entry.data_file.file_path + assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}" + + input_file = table.io.new_input(puffin_path) + with input_file.open() as f: + puffin_bytes = f.read() + + puffin = PuffinFile(puffin_bytes) + + assert len(puffin.footer.blobs) == 1, "Expected exactly one blob" + + blob = puffin.footer.blobs[0] + assert blob.type == "deletion-vector-v1" + assert "referenced-data-file" in blob.properties + assert blob.properties["cardinality"] == "4" + + dv_dict = puffin.to_vector() + assert len(dv_dict) == 1, "Expected one data file's deletions" + + for data_file_path, chunked_array in dv_dict.items(): + positions = chunked_array.to_pylist() + assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}" + assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}" From 952461887254ac2b472ff09ede4f5d166d4b7a4c Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Thu, 11 Dec 2025 16:43:05 -0800 Subject: [PATCH 05/10] PR comments --- pyiceberg/table/puffin.py | 28 +++++++++++------ tests/table/test_puffin.py | 62 +++++++++++++++----------------------- 2 files changed, 44 insertions(+), 46 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index c54173e01a..18516b33fc 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -152,27 +152,34 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: class PuffinWriter: _blobs: list[PuffinBlobMetadata] _blob_payloads: list[bytes] + _created_by: str | None - def __init__(self) -> None: + def __init__(self, created_by: str | None = None) -> None: self._blobs = [] self._blob_payloads = [] + self._created_by = created_by - def add( + def set_blob( self, positions: Iterable[int], referenced_data_file: str, ) -> None: + # We only support one blob at the moment + self._blobs = [] + self._blob_payloads = [] + # 1. Create bitmaps from positions bitmaps: dict[int, BitMap] = {} - cardinality = 0 for pos in positions: - cardinality += 1 key = pos >> 32 low_bits = pos & 0xFFFFFFFF if key not in bitmaps: bitmaps[key] = BitMap() bitmaps[key].add(low_bits) + # Calculate the cardinality from the bitmaps + cardinality = sum(len(bm) for bm in bitmaps.values()) + # 2. Serialize bitmaps for the vector payload vector_payload = _serialize_bitmaps(bitmaps) @@ -204,13 +211,13 @@ def add( self._blobs.append( PuffinBlobMetadata( type="deletion-vector-v1", - fields=[], + fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors snapshot_id=-1, sequence_number=-1, - offset=0, # Will be set later - length=0, # Will be set later + offset=0, # TODO: Use DeleteFileIndex data + length=0, # TODO: Use DeleteFileIndex data properties=properties, - compression_codec=None, # Explicitly None + compression_codec=None, ) ) @@ -229,12 +236,15 @@ def finish(self) -> bytes: updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) current_offset += len(blob_payload) - footer = Footer(blobs=updated_blobs_metadata) + footer = Footer( + blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {} + ) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") # Final assembly out.write(MAGIC_BYTES) out.write(payload_buffer.getvalue()) + out.write(MAGIC_BYTES) out.write(footer_payload_bytes) out.write(len(footer_payload_bytes).to_bytes(4, "little")) out.write((0).to_bytes(4, "little")) # flags diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 403b2e038f..c39a9da0fd 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -74,68 +74,55 @@ def test_map_high_vals() -> None: def test_puffin_round_trip() -> None: - # Define some deletion positions for multiple files - deletions1 = [10, 20, 30] - deletions2 = [5, (1 << 32) + 1] # Test with a high-bit position + # Define some deletion positions for a file + deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate - file1_path = "path/to/data1.parquet" - file2_path = "path/to/data2.parquet" + file_path = "path/to/data.parquet" # Write the Puffin file - writer = PuffinWriter() - writer.add(positions=deletions1, referenced_data_file=file1_path) - writer.add(positions=deletions2, referenced_data_file=file2_path) + writer = PuffinWriter(created_by="my-test-app") + writer.set_blob(positions=deletions, referenced_data_file=file_path) puffin_bytes = writer.finish() # Read the Puffin file back reader = PuffinFile(puffin_bytes) # Assert footer metadata - assert len(reader.footer.blobs) == 2 - - blob1_meta = reader.footer.blobs[0] - assert blob1_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file1_path - assert blob1_meta.properties["cardinality"] == str(len(deletions1)) + assert reader.footer.properties["created-by"] == "my-test-app" + assert len(reader.footer.blobs) == 1 - blob2_meta = reader.footer.blobs[1] - assert blob2_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file2_path - assert blob2_meta.properties["cardinality"] == str(len(deletions2)) + blob_meta = reader.footer.blobs[0] + assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path + assert blob_meta.properties["cardinality"] == str(len(set(deletions))) # Assert the content of deletion vectors read_vectors = reader.to_vector() - assert file1_path in read_vectors - assert file2_path in read_vectors - - assert read_vectors[file1_path].to_pylist() == sorted(deletions1) - assert read_vectors[file2_path].to_pylist() == sorted(deletions2) + assert file_path in read_vectors + assert read_vectors[file_path].to_pylist() == sorted(list(set(deletions))) def test_write_and_read_puffin_file() -> None: writer = PuffinWriter() - writer.add(positions=[1, 2, 3], referenced_data_file="file1.parquet") - writer.add(positions=[4, 5, 6], referenced_data_file="file2.parquet") + writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet") + writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet") puffin_bytes = writer.finish() reader = PuffinFile(puffin_bytes) - assert len(reader.footer.blobs) == 2 - blob1 = reader.footer.blobs[0] - blob2 = reader.footer.blobs[1] - - assert blob1.properties["referenced-data-file"] == "file1.parquet" - assert blob1.properties["cardinality"] == "3" - assert blob1.type == "deletion-vector-v1" - assert blob1.snapshot_id == -1 - assert blob1.sequence_number == -1 - assert blob1.compression_codec is None + assert len(reader.footer.blobs) == 1 + blob = reader.footer.blobs[0] - assert blob2.properties["referenced-data-file"] == "file2.parquet" - assert blob2.properties["cardinality"] == "3" + assert blob.properties["referenced-data-file"] == "file2.parquet" + assert blob.properties["cardinality"] == "3" + assert blob.type == "deletion-vector-v1" + assert blob.snapshot_id == -1 + assert blob.sequence_number == -1 + assert blob.compression_codec is None vectors = reader.to_vector() - assert len(vectors) == 2 - assert vectors["file1.parquet"].to_pylist() == [1, 2, 3] + assert len(vectors) == 1 + assert "file1.parquet" not in vectors assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] @@ -146,3 +133,4 @@ def test_puffin_file_with_no_blobs() -> None: reader = PuffinFile(puffin_bytes) assert len(reader.footer.blobs) == 0 assert len(reader.to_vector()) == 0 + assert "created-by" not in reader.footer.properties From e23a67d210ba3b6a274514bb1748ed690a85b794 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Thu, 11 Dec 2025 16:46:01 -0800 Subject: [PATCH 06/10] lint --- pyiceberg/table/puffin.py | 10 ++++------ tests/integration/test_puffin_spark_interop.py | 2 +- tests/table/test_puffin.py | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 18516b33fc..8acf21f974 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -211,11 +211,11 @@ def set_blob( self._blobs.append( PuffinBlobMetadata( type="deletion-vector-v1", - fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors + fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors snapshot_id=-1, sequence_number=-1, - offset=0, # TODO: Use DeleteFileIndex data - length=0, # TODO: Use DeleteFileIndex data + offset=0, # TODO: Use DeleteFileIndex data + length=0, # TODO: Use DeleteFileIndex data properties=properties, compression_codec=None, ) @@ -236,9 +236,7 @@ def finish(self) -> bytes: updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) current_offset += len(blob_payload) - footer = Footer( - blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {} - ) + footer = Footer(blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {}) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") # Final assembly diff --git a/tests/integration/test_puffin_spark_interop.py b/tests/integration/test_puffin_spark_interop.py index be19276cd4..d4c6735fca 100644 --- a/tests/integration/test_puffin_spark_interop.py +++ b/tests/integration/test_puffin_spark_interop.py @@ -87,7 +87,7 @@ def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: Rest dv_dict = puffin.to_vector() assert len(dv_dict) == 1, "Expected one data file's deletions" - for data_file_path, chunked_array in dv_dict.items(): + for _data_file_path, chunked_array in dv_dict.items(): positions = chunked_array.to_pylist() assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}" assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}" diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index c39a9da0fd..1ea0913e29 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -99,7 +99,7 @@ def test_puffin_round_trip() -> None: read_vectors = reader.to_vector() assert file_path in read_vectors - assert read_vectors[file_path].to_pylist() == sorted(list(set(deletions))) + assert read_vectors[file_path].to_pylist() == sorted(set(deletions)) def test_write_and_read_puffin_file() -> None: From 72ebba819e77ffaad1b9f307a1adb0f225b3b91c Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Wed, 10 Jun 2026 08:05:36 +0900 Subject: [PATCH 07/10] Test: lock in agreed DV field id and blob framing PuffinFile reads only the serialized vector, skipping a blob's length prefix, deletion-vector magic and CRC-32, so the round-trip tests never exercise that framing. Add coverage for review items agreed on the original PR (#2822) that were not yet asserted by any test: - Assert the blob `fields` is [2147483645] (Java MetadataColumns.ROW_POSITION, INT_MAX - 2), required for Java/Spark interoperability (raised by @ebyhr). - Assert the deletion-vector blob framing at the byte level: the length prefix, the deletion-vector magic, and the CRC-32 over magic + vector. --- tests/table/test_puffin.py | 41 +++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 1ea0913e29..09393517e8 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -14,12 +14,20 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import zlib from os import path import pytest from pyroaring import BitMap -from pyiceberg.table.puffin import PROPERTY_REFERENCED_DATA_FILE, PuffinFile, PuffinWriter, _deserialize_bitmap +from pyiceberg.table.puffin import ( + DELETION_VECTOR_MAGIC, + MAGIC_BYTES, + PROPERTY_REFERENCED_DATA_FILE, + PuffinFile, + PuffinWriter, + _deserialize_bitmap, +) def _open_file(file: str) -> bytes: @@ -116,6 +124,9 @@ def test_write_and_read_puffin_file() -> None: assert blob.properties["referenced-data-file"] == "file2.parquet" assert blob.properties["cardinality"] == "3" assert blob.type == "deletion-vector-v1" + # Reserved field id of the row position column (Java MetadataColumns.ROW_POSITION, INT_MAX - 2); + # required for Java/Spark interoperability. + assert blob.fields == [2147483645] assert blob.snapshot_id == -1 assert blob.sequence_number == -1 assert blob.compression_codec is None @@ -126,6 +137,34 @@ def test_write_and_read_puffin_file() -> None: assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] +def test_deletion_vector_blob_framing_is_spec_compliant() -> None: + # PuffinFile reads only the serialized vector, skipping the blob's length prefix, + # deletion-vector magic and CRC-32. Assert that framing directly at the byte level so + # the bytes an external reader (Java/Spark) relies on stay spec-compliant. + positions = [0, 1, 5, (1 << 32) + 7] + writer = PuffinWriter() + writer.set_blob(positions=positions, referenced_data_file="file.parquet") + puffin_bytes = writer.finish() + + # The Puffin file begins with the magic. + assert puffin_bytes[:4] == MAGIC_BYTES + + blob = PuffinFile(puffin_bytes).footer.blobs[0] + blob_bytes = puffin_bytes[blob.offset : blob.offset + blob.length] + + # Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian), + # where the length and CRC-32 both cover the magic bytes plus the vector. + length_prefix = int.from_bytes(blob_bytes[0:4], "big") + dv_magic = blob_bytes[4:8] + vector = blob_bytes[8 : 4 + length_prefix] + crc = int.from_bytes(blob_bytes[4 + length_prefix : 8 + length_prefix], "big") + + assert dv_magic == DELETION_VECTOR_MAGIC + assert length_prefix == len(dv_magic) + len(vector) + assert blob.length == 4 + length_prefix + 4 + assert crc == zlib.crc32(dv_magic + vector) + + def test_puffin_file_with_no_blobs() -> None: writer = PuffinWriter() puffin_bytes = writer.finish() From 4ecfd186de753643eeec96940124998745ec1762 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Wed, 10 Jun 2026 12:45:57 +0900 Subject: [PATCH 08/10] Address review comments - Default created-by footer property to 'PyIceberg version {version}' - Move the Spark interop reader test to a separate PR - Remove numbered and self-evident comments - Name the row position field id constant - Validate positions in set_blob (non-negative, non-empty) - Simplify blob framing and finish() assembly --- pyiceberg/table/puffin.py | 93 +++++++++---------- .../integration/test_puffin_spark_interop.py | 93 ------------------- tests/table/test_puffin.py | 21 ++++- 3 files changed, 62 insertions(+), 145 deletions(-) delete mode 100644 tests/integration/test_puffin_spark_interop.py diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 8acf21f974..581123bda8 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import importlib.metadata import io import math import zlib @@ -34,6 +35,9 @@ EMPTY_BITMAP = FrozenBitMap() MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1 PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file" +# Reserved field id of the row position (_pos) metadata column, referenced by +# deletion-vector-v1 blob metadata (Java: MetadataColumns.ROW_POSITION) +ROW_POSITION_FIELD_ID = 2147483645 def _deserialize_bitmap(pl: bytes) -> list[BitMap]: @@ -150,98 +154,85 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: class PuffinWriter: + """Writes a Puffin file containing a single deletion-vector-v1 blob.""" + _blobs: list[PuffinBlobMetadata] _blob_payloads: list[bytes] - _created_by: str | None + _created_by: str def __init__(self, created_by: str | None = None) -> None: self._blobs = [] self._blob_payloads = [] - self._created_by = created_by + self._created_by = ( + created_by if created_by is not None else f"PyIceberg version {importlib.metadata.version('pyiceberg')}" + ) def set_blob( self, positions: Iterable[int], referenced_data_file: str, ) -> None: + """Set the deletion vector blob for a data file, replacing any previously set blob. + + Args: + positions: Zero-based positions of the deleted rows in the referenced data file. + referenced_data_file: Location of the data file the deletion vector applies to. + """ # We only support one blob at the moment self._blobs = [] self._blob_payloads = [] - # 1. Create bitmaps from positions bitmaps: dict[int, BitMap] = {} for pos in positions: + if pos < 0: + raise ValueError(f"Invalid position: {pos}, positions must be non-negative") key = pos >> 32 low_bits = pos & 0xFFFFFFFF if key not in bitmaps: bitmaps[key] = BitMap() bitmaps[key].add(low_bits) - # Calculate the cardinality from the bitmaps - cardinality = sum(len(bm) for bm in bitmaps.values()) + if not bitmaps: + raise ValueError("Deletion vector must contain at least one position") - # 2. Serialize bitmaps for the vector payload + cardinality = sum(len(bm) for bm in bitmaps.values()) vector_payload = _serialize_bitmaps(bitmaps) - # 3. Construct the full blob payload for deletion-vector-v1 - with io.BytesIO() as blob_payload_buffer: - # Magic bytes for DV - blob_payload_buffer.write(DELETION_VECTOR_MAGIC) - # The vector itself - blob_payload_buffer.write(vector_payload) - - # The content for CRC calculation - crc_content = blob_payload_buffer.getvalue() - crc32 = zlib.crc32(crc_content) - - # The full blob to be stored in the Puffin file - with io.BytesIO() as full_blob_buffer: - # Combined length of the vector and magic bytes stored as 4 bytes, big-endian - full_blob_buffer.write(len(crc_content).to_bytes(4, "big")) - # The content (magic + vector) - full_blob_buffer.write(crc_content) - # A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian - full_blob_buffer.write(crc32.to_bytes(4, "big")) - - self._blob_payloads.append(full_blob_buffer.getvalue()) - - # 4. Create blob metadata - properties = {PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)} + # deletion-vector-v1 blob layout: combined length of magic and vector (4 bytes, big-endian), + # the DV magic bytes, the serialized vector, and a CRC-32 checksum of magic + vector (4 bytes, big-endian) + blob_content = DELETION_VECTOR_MAGIC + vector_payload + self._blob_payloads.append( + len(blob_content).to_bytes(4, "big") + blob_content + zlib.crc32(blob_content).to_bytes(4, "big") + ) self._blobs.append( PuffinBlobMetadata( type="deletion-vector-v1", - fields=[2147483645], # Java INT_MAX - 2, reserved field id for deletion vectors + fields=[ROW_POSITION_FIELD_ID], + # -1 means the snapshot id and sequence number are inherited at commit time snapshot_id=-1, sequence_number=-1, - offset=0, # TODO: Use DeleteFileIndex data - length=0, # TODO: Use DeleteFileIndex data - properties=properties, + # offset and length are placeholders; finish() fills them in when assembling the file + offset=0, + length=0, + properties={PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)}, compression_codec=None, ) ) def finish(self) -> bytes: + """Serialize the Puffin file and return its contents as bytes.""" with io.BytesIO() as out: - payload_buffer = io.BytesIO() - for blob_payload in self._blob_payloads: - payload_buffer.write(blob_payload) - - updated_blobs_metadata: list[PuffinBlobMetadata] = [] - current_offset = 4 # Start after file magic (4 bytes) - for i, blob_payload in enumerate(self._blob_payloads): - original_metadata_dict = self._blobs[i].model_dump(by_alias=True, exclude_none=True) - original_metadata_dict["offset"] = current_offset - original_metadata_dict["length"] = len(blob_payload) - updated_blobs_metadata.append(PuffinBlobMetadata(**original_metadata_dict)) - current_offset += len(blob_payload) - - footer = Footer(blobs=updated_blobs_metadata, properties={"created-by": self._created_by} if self._created_by else {}) + out.write(MAGIC_BYTES) + + blobs_metadata: list[PuffinBlobMetadata] = [] + for blob_metadata, blob_payload in zip(self._blobs, self._blob_payloads, strict=True): + blobs_metadata.append(blob_metadata.model_copy(update={"offset": out.tell(), "length": len(blob_payload)})) + out.write(blob_payload) + + footer = Footer(blobs=blobs_metadata, properties={"created-by": self._created_by}) footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8") - # Final assembly - out.write(MAGIC_BYTES) - out.write(payload_buffer.getvalue()) out.write(MAGIC_BYTES) out.write(footer_payload_bytes) out.write(len(footer_payload_bytes).to_bytes(4, "little")) diff --git a/tests/integration/test_puffin_spark_interop.py b/tests/integration/test_puffin_spark_interop.py deleted file mode 100644 index d4c6735fca..0000000000 --- a/tests/integration/test_puffin_spark_interop.py +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import pytest -from pyspark.sql import SparkSession - -from pyiceberg.catalog.rest import RestCatalog -from pyiceberg.manifest import ManifestContent -from pyiceberg.table.puffin import PuffinFile - - -def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None: - for sql in sqls: - spark.sql(sql) - - -@pytest.mark.integration -def test_read_spark_written_puffin_dv(spark: SparkSession, session_catalog: RestCatalog) -> None: - """Verify pyiceberg can read Puffin DVs written by Spark.""" - identifier = "default.spark_puffin_format_test" - - run_spark_commands(spark, [f"DROP TABLE IF EXISTS {identifier}"]) - run_spark_commands( - spark, - [ - f""" - CREATE TABLE {identifier} (id BIGINT) - USING iceberg - TBLPROPERTIES ( - 'format-version' = '3', - 'write.delete.mode' = 'merge-on-read' - ) - """, - ], - ) - - df = spark.range(1, 51) - df.coalesce(1).writeTo(identifier).append() - - files_before = spark.sql(f"SELECT * FROM {identifier}.files").collect() - assert len(files_before) == 1, f"Expected 1 file, got {len(files_before)}" - - run_spark_commands(spark, [f"DELETE FROM {identifier} WHERE id IN (10, 20, 30, 40)"]) - - table = session_catalog.load_table(identifier) - current_snapshot = table.current_snapshot() - assert current_snapshot is not None - - manifests = current_snapshot.manifests(table.io) - delete_manifests = [m for m in manifests if m.content == ManifestContent.DELETES] - assert len(delete_manifests) > 0, "Expected delete manifest with DVs" - - delete_manifest = delete_manifests[0] - entries = list(delete_manifest.fetch_manifest_entry(table.io)) - assert len(entries) > 0, "Expected at least one delete file entry" - - delete_entry = entries[0] - puffin_path = delete_entry.data_file.file_path - assert puffin_path.endswith(".puffin"), f"Expected Puffin file, got: {puffin_path}" - - input_file = table.io.new_input(puffin_path) - with input_file.open() as f: - puffin_bytes = f.read() - - puffin = PuffinFile(puffin_bytes) - - assert len(puffin.footer.blobs) == 1, "Expected exactly one blob" - - blob = puffin.footer.blobs[0] - assert blob.type == "deletion-vector-v1" - assert "referenced-data-file" in blob.properties - assert blob.properties["cardinality"] == "4" - - dv_dict = puffin.to_vector() - assert len(dv_dict) == 1, "Expected one data file's deletions" - - for _data_file_path, chunked_array in dv_dict.items(): - positions = chunked_array.to_pylist() - assert len(positions) == 4, f"Expected 4 deleted positions, got {len(positions)}" - assert sorted(positions) == [9, 19, 29, 39], f"Unexpected positions: {positions}" diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 09393517e8..f0bcf2b8f5 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import importlib.metadata import zlib from os import path @@ -172,4 +173,22 @@ def test_puffin_file_with_no_blobs() -> None: reader = PuffinFile(puffin_bytes) assert len(reader.footer.blobs) == 0 assert len(reader.to_vector()) == 0 - assert "created-by" not in reader.footer.properties + + +def test_puffin_writer_default_created_by() -> None: + puffin_bytes = PuffinWriter().finish() + + reader = PuffinFile(puffin_bytes) + assert reader.footer.properties["created-by"] == f"PyIceberg version {importlib.metadata.version('pyiceberg')}" + + +def test_set_blob_rejects_negative_positions() -> None: + writer = PuffinWriter() + with pytest.raises(ValueError, match="Invalid position: -1"): + writer.set_blob(positions=[1, -1], referenced_data_file="file.parquet") + + +def test_set_blob_rejects_empty_positions() -> None: + writer = PuffinWriter() + with pytest.raises(ValueError, match="Deletion vector must contain at least one position"): + writer.set_blob(positions=[], referenced_data_file="file.parquet") From eb81422753d2194abb762047c636fcc5f1067975 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Wed, 10 Jun 2026 17:52:28 +0900 Subject: [PATCH 09/10] Accept an OutputFile in PuffinWriter and write the file in finish() --- pyiceberg/table/puffin.py | 18 +++++++++---- tests/table/test_puffin.py | 53 +++++++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py index 581123bda8..f3d08bcc55 100644 --- a/pyiceberg/table/puffin.py +++ b/pyiceberg/table/puffin.py @@ -24,6 +24,7 @@ from pydantic import Field from pyroaring import BitMap, FrozenBitMap +from pyiceberg.io import OutputFile from pyiceberg.typedef import IcebergBaseModel if TYPE_CHECKING: @@ -154,13 +155,15 @@ def to_vector(self) -> dict[str, "pa.ChunkedArray"]: class PuffinWriter: - """Writes a Puffin file containing a single deletion-vector-v1 blob.""" + """Writes a Puffin file containing a single deletion-vector-v1 blob to an output file.""" + _output_file: OutputFile _blobs: list[PuffinBlobMetadata] _blob_payloads: list[bytes] _created_by: str - def __init__(self, created_by: str | None = None) -> None: + def __init__(self, output_file: OutputFile, created_by: str | None = None) -> None: + self._output_file = output_file self._blobs = [] self._blob_payloads = [] self._created_by = ( @@ -220,8 +223,8 @@ def set_blob( ) ) - def finish(self) -> bytes: - """Serialize the Puffin file and return its contents as bytes.""" + def finish(self) -> int: + """Write the Puffin file to the output file and return its size in bytes.""" with io.BytesIO() as out: out.write(MAGIC_BYTES) @@ -239,4 +242,9 @@ def finish(self) -> bytes: out.write((0).to_bytes(4, "little")) # flags out.write(MAGIC_BYTES) - return out.getvalue() + puffin_bytes = out.getvalue() + + with self._output_file.create(overwrite=True) as output_stream: + output_stream.write(puffin_bytes) + + return len(puffin_bytes) diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index f0bcf2b8f5..9a4323c5e8 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -17,10 +17,12 @@ import importlib.metadata import zlib from os import path +from pathlib import Path import pytest from pyroaring import BitMap +from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.table.puffin import ( DELETION_VECTOR_MAGIC, MAGIC_BYTES, @@ -82,18 +84,25 @@ def test_map_high_vals() -> None: _ = _deserialize_bitmap(puffin) -def test_puffin_round_trip() -> None: +def _new_writer(tmp_path: Path, created_by: str | None = None) -> tuple[PuffinWriter, Path]: + puffin_path = tmp_path / "test.puffin" + return PuffinWriter(PyArrowFileIO().new_output(str(puffin_path)), created_by=created_by), puffin_path + + +def test_puffin_round_trip(tmp_path: Path) -> None: # Define some deletion positions for a file deletions = [5, (1 << 32) + 1, 5] # Test with a high-bit position and duplicate file_path = "path/to/data.parquet" # Write the Puffin file - writer = PuffinWriter(created_by="my-test-app") + writer, puffin_path = _new_writer(tmp_path, created_by="my-test-app") writer.set_blob(positions=deletions, referenced_data_file=file_path) - puffin_bytes = writer.finish() + size = writer.finish() # Read the Puffin file back + puffin_bytes = puffin_path.read_bytes() + assert size == len(puffin_bytes) reader = PuffinFile(puffin_bytes) # Assert footer metadata @@ -111,13 +120,13 @@ def test_puffin_round_trip() -> None: assert read_vectors[file_path].to_pylist() == sorted(set(deletions)) -def test_write_and_read_puffin_file() -> None: - writer = PuffinWriter() +def test_write_and_read_puffin_file(tmp_path: Path) -> None: + writer, puffin_path = _new_writer(tmp_path) writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet") writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet") - puffin_bytes = writer.finish() + writer.finish() - reader = PuffinFile(puffin_bytes) + reader = PuffinFile(puffin_path.read_bytes()) assert len(reader.footer.blobs) == 1 blob = reader.footer.blobs[0] @@ -138,14 +147,15 @@ def test_write_and_read_puffin_file() -> None: assert vectors["file2.parquet"].to_pylist() == [4, 5, 6] -def test_deletion_vector_blob_framing_is_spec_compliant() -> None: +def test_deletion_vector_blob_framing_is_spec_compliant(tmp_path: Path) -> None: # PuffinFile reads only the serialized vector, skipping the blob's length prefix, # deletion-vector magic and CRC-32. Assert that framing directly at the byte level so # the bytes an external reader (Java/Spark) relies on stay spec-compliant. positions = [0, 1, 5, (1 << 32) + 7] - writer = PuffinWriter() + writer, puffin_path = _new_writer(tmp_path) writer.set_blob(positions=positions, referenced_data_file="file.parquet") - puffin_bytes = writer.finish() + writer.finish() + puffin_bytes = puffin_path.read_bytes() # The Puffin file begins with the magic. assert puffin_bytes[:4] == MAGIC_BYTES @@ -166,29 +176,30 @@ def test_deletion_vector_blob_framing_is_spec_compliant() -> None: assert crc == zlib.crc32(dv_magic + vector) -def test_puffin_file_with_no_blobs() -> None: - writer = PuffinWriter() - puffin_bytes = writer.finish() +def test_puffin_file_with_no_blobs(tmp_path: Path) -> None: + writer, puffin_path = _new_writer(tmp_path) + writer.finish() - reader = PuffinFile(puffin_bytes) + reader = PuffinFile(puffin_path.read_bytes()) assert len(reader.footer.blobs) == 0 assert len(reader.to_vector()) == 0 -def test_puffin_writer_default_created_by() -> None: - puffin_bytes = PuffinWriter().finish() +def test_puffin_writer_default_created_by(tmp_path: Path) -> None: + writer, puffin_path = _new_writer(tmp_path) + writer.finish() - reader = PuffinFile(puffin_bytes) + reader = PuffinFile(puffin_path.read_bytes()) assert reader.footer.properties["created-by"] == f"PyIceberg version {importlib.metadata.version('pyiceberg')}" -def test_set_blob_rejects_negative_positions() -> None: - writer = PuffinWriter() +def test_set_blob_rejects_negative_positions(tmp_path: Path) -> None: + writer, _ = _new_writer(tmp_path) with pytest.raises(ValueError, match="Invalid position: -1"): writer.set_blob(positions=[1, -1], referenced_data_file="file.parquet") -def test_set_blob_rejects_empty_positions() -> None: - writer = PuffinWriter() +def test_set_blob_rejects_empty_positions(tmp_path: Path) -> None: + writer, _ = _new_writer(tmp_path) with pytest.raises(ValueError, match="Deletion vector must contain at least one position"): writer.set_blob(positions=[], referenced_data_file="file.parquet") From a6d2f319a517551dd6aa659af1ff909262ed4e4e Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Wed, 10 Jun 2026 18:07:26 +0900 Subject: [PATCH 10/10] Add unit tests for sparse bitmap keys and the Java key range limit --- tests/table/test_puffin.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/table/test_puffin.py b/tests/table/test_puffin.py index 9a4323c5e8..5bc091033d 100644 --- a/tests/table/test_puffin.py +++ b/tests/table/test_puffin.py @@ -120,6 +120,18 @@ def test_puffin_round_trip(tmp_path: Path) -> None: assert read_vectors[file_path].to_pylist() == sorted(set(deletions)) +def test_puffin_round_trip_with_sparse_bitmap_keys(tmp_path: Path) -> None: + # High bits 0 and 2 are present while 1 is absent; the writer must emit sorted keys + # and the reader pads the missing key with an empty bitmap. + positions = [3, (2 << 32) + 4] + writer, puffin_path = _new_writer(tmp_path) + writer.set_blob(positions=positions, referenced_data_file="file.parquet") + writer.finish() + + vectors = PuffinFile(puffin_path.read_bytes()).to_vector() + assert vectors["file.parquet"].to_pylist() == positions + + def test_write_and_read_puffin_file(tmp_path: Path) -> None: writer, puffin_path = _new_writer(tmp_path) writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet") @@ -203,3 +215,9 @@ def test_set_blob_rejects_empty_positions(tmp_path: Path) -> None: writer, _ = _new_writer(tmp_path) with pytest.raises(ValueError, match="Deletion vector must contain at least one position"): writer.set_blob(positions=[], referenced_data_file="file.parquet") + + +def test_set_blob_rejects_position_exceeding_java_key_range(tmp_path: Path) -> None: + writer, _ = _new_writer(tmp_path) + with pytest.raises(ValueError, match="Key 2147483648 is too large, max 2147483647"): + writer.set_blob(positions=[(2**31) << 32], referenced_data_file="file.parquet")