# ======================================================================================
# pyiceberg/table/puffin.py -- code added by this change
#
# Only the parts of the module that the patch shows in full are reproduced below.
# Unchanged definitions that appear only as partial patch context (_deserialize_bitmap,
# PuffinBlobMetadata, Footer, PuffinFile, ...) are not reconstructed here.
# ======================================================================================
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib.metadata
import io
import math
import operator
import zlib
from collections.abc import Iterable
from typing import TYPE_CHECKING, Literal

from pydantic import Field
from pyroaring import BitMap, FrozenBitMap

from pyiceberg.io import OutputFile
from pyiceberg.typedef import IcebergBaseModel

# NOTE(review): the patch context stops at the `if TYPE_CHECKING:` line, so the guarded
# import(s) are not visible and are intentionally not guessed at here.

# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
DELETION_VECTOR_MAGIC = b"\xd1\xd3\x39\x64"
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
# Reserved field id of the row position (_pos) metadata column, referenced by
# deletion-vector-v1 blob metadata (Java: MetadataColumns.ROW_POSITION)
ROW_POSITION_FIELD_ID = 2147483645


def _serialize_bitmaps(bitmaps: dict[int, BitMap]) -> bytes:
    """Serialize a dictionary of bitmaps into a byte string.

    The format is:
    - 8 bytes: number of bitmaps (little-endian)
    - For each bitmap, in ascending key order:
        - 4 bytes: key (little-endian)
        - n bytes: serialized bitmap (the output of ``BitMap.serialize()``)

    Args:
        bitmaps: Bitmaps indexed by their integer key.

    Returns:
        The concatenated serialized representation.

    Raises:
        ValueError: If a key is negative or larger than ``MAX_JAVA_SIGNED``.
    """
    sorted_keys = sorted(bitmaps)

    # number of bitmaps
    parts: list[bytes] = [len(sorted_keys).to_bytes(8, "little")]

    for key in sorted_keys:
        if key < 0:
            raise ValueError(f"Invalid unsigned key: {key}")
        if key > MAX_JAVA_SIGNED:
            raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")

        # key
        parts.append(key.to_bytes(4, "little"))
        # bitmap
        parts.append(bitmaps[key].serialize())

    return b"".join(parts)


class PuffinWriter:
    """Writes a Puffin file containing a single deletion-vector-v1 blob to an output file.

    Call ``set_blob`` to provide the deleted positions, then ``finish`` to assemble the
    file in memory and write it to the output file.
    """

    _output_file: OutputFile
    _blobs: list[PuffinBlobMetadata]
    _blob_payloads: list[bytes]
    _created_by: str

    def __init__(self, output_file: OutputFile, created_by: str | None = None) -> None:
        """Create a writer targeting ``output_file``.

        Args:
            output_file: Destination the Puffin file is written to by ``finish``.
            created_by: Value of the footer's ``created-by`` property. Defaults to
                ``"PyIceberg version <installed pyiceberg version>"``.
        """
        self._output_file = output_file
        self._blobs = []
        self._blob_payloads = []
        self._created_by = (
            created_by if created_by is not None else f"PyIceberg version {importlib.metadata.version('pyiceberg')}"
        )

    def set_blob(
        self,
        positions: Iterable[int],
        referenced_data_file: str,
    ) -> None:
        """Set the deletion vector blob for a data file, replacing any previously set blob.

        The writer's state is only modified once the new blob has been built
        successfully; if this method raises, a previously set blob is left untouched.

        Args:
            positions: Zero-based positions of the deleted rows in the referenced data file.
                Duplicates are collapsed. Any integer-like value (implementing
                ``__index__``) is accepted.
            referenced_data_file: Location of the data file the deletion vector applies to.

        Raises:
            ValueError: If a position is negative, if no positions are given, or if a
                position's high 32 bits exceed ``MAX_JAVA_SIGNED``.
            TypeError: If a position is not an integer.
        """
        # Split each 64-bit position into a 32-bit key (high bits) and a 32-bit value
        # (low bits) stored in that key's bitmap.
        bitmaps: dict[int, BitMap] = {}
        for raw_pos in positions:
            # Normalise integer-like inputs to a plain int so the bit arithmetic and
            # int.to_bytes() used during serialization behave uniformly.
            pos = operator.index(raw_pos)
            if pos < 0:
                raise ValueError(f"Invalid position: {pos}, positions must be non-negative")
            key = pos >> 32
            bitmap = bitmaps.get(key)
            if bitmap is None:
                bitmap = bitmaps[key] = BitMap()
            bitmap.add(pos & 0xFFFFFFFF)

        if not bitmaps:
            raise ValueError("Deletion vector must contain at least one position")

        cardinality = sum(len(bm) for bm in bitmaps.values())
        vector_payload = _serialize_bitmaps(bitmaps)

        # deletion-vector-v1 blob layout: combined length of magic and vector (4 bytes, big-endian),
        # the DV magic bytes, the serialized vector, and a CRC-32 checksum of magic + vector (4 bytes, big-endian)
        blob_content = DELETION_VECTOR_MAGIC + vector_payload
        blob_payload = len(blob_content).to_bytes(4, "big") + blob_content + zlib.crc32(blob_content).to_bytes(4, "big")

        blob_metadata = PuffinBlobMetadata(
            type="deletion-vector-v1",
            fields=[ROW_POSITION_FIELD_ID],
            # -1 means the snapshot id and sequence number are inherited at commit time
            snapshot_id=-1,
            sequence_number=-1,
            # offset and length are placeholders; finish() fills them in when assembling the file
            offset=0,
            length=0,
            properties={PROPERTY_REFERENCED_DATA_FILE: referenced_data_file, "cardinality": str(cardinality)},
            compression_codec=None,
        )

        # We only support one blob at the moment, so the new blob replaces the old one.
        # The swap happens last so that a failure above cannot discard the previous blob.
        self._blobs = [blob_metadata]
        self._blob_payloads = [blob_payload]

    def finish(self) -> int:
        """Write the Puffin file to the output file and return its size in bytes.

        The file consists of the Puffin magic, the blob payloads, and the footer: magic,
        JSON footer payload, payload size (4 bytes, little-endian), flags (4 zero bytes)
        and the magic again. An existing file at the destination is overwritten.

        Returns:
            The number of bytes written.
        """
        with io.BytesIO() as out:
            out.write(MAGIC_BYTES)

            blobs_metadata: list[PuffinBlobMetadata] = []
            for blob_metadata, blob_payload in zip(self._blobs, self._blob_payloads, strict=True):
                # Record where this blob actually lands in the file.
                blobs_metadata.append(blob_metadata.model_copy(update={"offset": out.tell(), "length": len(blob_payload)}))
                out.write(blob_payload)

            footer = Footer(blobs=blobs_metadata, properties={"created-by": self._created_by})
            footer_payload_bytes = footer.model_dump_json(by_alias=True, exclude_none=True).encode("utf-8")

            out.write(MAGIC_BYTES)
            out.write(footer_payload_bytes)
            out.write(len(footer_payload_bytes).to_bytes(4, "little"))
            out.write((0).to_bytes(4, "little"))  # flags
            out.write(MAGIC_BYTES)

            puffin_bytes = out.getvalue()

        with self._output_file.create(overwrite=True) as output_stream:
            output_stream.write(puffin_bytes)

        return len(puffin_bytes)


# ======================================================================================
# tests/table/test_puffin.py -- tests added by this change
#
# Pre-existing helpers and tests that appear only as partial patch context (_open_file,
# test_map_high_vals, ...) are not reconstructed here.
# ======================================================================================
import importlib.metadata
import zlib
from os import path
from pathlib import Path

import pytest
from pyroaring import BitMap

from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.table.puffin import (
    DELETION_VECTOR_MAGIC,
    MAGIC_BYTES,
    PROPERTY_REFERENCED_DATA_FILE,
    PuffinFile,
    PuffinWriter,
    _deserialize_bitmap,
)


def _new_writer(tmp_path: Path, created_by: str | None = None) -> tuple[PuffinWriter, Path]:
    """Return a writer targeting ``tmp_path / "test.puffin"`` together with that path."""
    puffin_path = tmp_path / "test.puffin"
    return PuffinWriter(PyArrowFileIO().new_output(str(puffin_path)), created_by=created_by), puffin_path


def test_puffin_round_trip(tmp_path: Path) -> None:
    # Define some deletion positions for a file
    deletions = [5, (1 << 32) + 1, 5]  # Test with a high-bit position and duplicate

    file_path = "path/to/data.parquet"

    # Write the Puffin file
    writer, puffin_path = _new_writer(tmp_path, created_by="my-test-app")
    writer.set_blob(positions=deletions, referenced_data_file=file_path)
    size = writer.finish()

    # Read the Puffin file back
    puffin_bytes = puffin_path.read_bytes()
    assert size == len(puffin_bytes)
    reader = PuffinFile(puffin_bytes)

    # Assert footer metadata
    assert reader.footer.properties["created-by"] == "my-test-app"
    assert len(reader.footer.blobs) == 1

    blob_meta = reader.footer.blobs[0]
    assert blob_meta.properties[PROPERTY_REFERENCED_DATA_FILE] == file_path
    assert blob_meta.properties["cardinality"] == str(len(set(deletions)))

    # Assert the content of deletion vectors
    read_vectors = reader.to_vector()

    assert file_path in read_vectors
    assert read_vectors[file_path].to_pylist() == sorted(set(deletions))


def test_puffin_round_trip_with_sparse_bitmap_keys(tmp_path: Path) -> None:
    # High bits 0 and 2 are present while 1 is absent; the writer must emit sorted keys
    # and the reader pads the missing key with an empty bitmap.
    positions = [3, (2 << 32) + 4]
    writer, puffin_path = _new_writer(tmp_path)
    writer.set_blob(positions=positions, referenced_data_file="file.parquet")
    writer.finish()

    vectors = PuffinFile(puffin_path.read_bytes()).to_vector()
    assert vectors["file.parquet"].to_pylist() == positions


def test_write_and_read_puffin_file(tmp_path: Path) -> None:
    writer, puffin_path = _new_writer(tmp_path)
    writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
    writer.set_blob(positions=[4, 5, 6], referenced_data_file="file2.parquet")
    writer.finish()

    reader = PuffinFile(puffin_path.read_bytes())

    assert len(reader.footer.blobs) == 1
    blob = reader.footer.blobs[0]

    assert blob.properties["referenced-data-file"] == "file2.parquet"
    assert blob.properties["cardinality"] == "3"
    assert blob.type == "deletion-vector-v1"
    # Reserved field id of the row position column (Java MetadataColumns.ROW_POSITION, INT_MAX - 2);
    # required for Java/Spark interoperability.
    assert blob.fields == [2147483645]
    assert blob.snapshot_id == -1
    assert blob.sequence_number == -1
    assert blob.compression_codec is None

    vectors = reader.to_vector()
    assert len(vectors) == 1
    assert "file1.parquet" not in vectors
    assert vectors["file2.parquet"].to_pylist() == [4, 5, 6]


def test_deletion_vector_blob_framing_is_spec_compliant(tmp_path: Path) -> None:
    # PuffinFile reads only the serialized vector, skipping the blob's length prefix,
    # deletion-vector magic and CRC-32. Assert that framing directly at the byte level so
    # the bytes an external reader (Java/Spark) relies on stay spec-compliant.
    positions = [0, 1, 5, (1 << 32) + 7]
    writer, puffin_path = _new_writer(tmp_path)
    writer.set_blob(positions=positions, referenced_data_file="file.parquet")
    writer.finish()
    puffin_bytes = puffin_path.read_bytes()

    # The Puffin file begins with the magic.
    assert puffin_bytes[:4] == MAGIC_BYTES

    blob = PuffinFile(puffin_bytes).footer.blobs[0]
    blob_bytes = puffin_bytes[blob.offset : blob.offset + blob.length]

    # Layout: length (4B big-endian) | DV magic (4B) | vector | CRC-32 (4B big-endian),
    # where the length and CRC-32 both cover the magic bytes plus the vector.
    length_prefix = int.from_bytes(blob_bytes[0:4], "big")
    dv_magic = blob_bytes[4:8]
    vector = blob_bytes[8 : 4 + length_prefix]
    crc = int.from_bytes(blob_bytes[4 + length_prefix : 8 + length_prefix], "big")

    assert dv_magic == DELETION_VECTOR_MAGIC
    assert length_prefix == len(dv_magic) + len(vector)
    assert blob.length == 4 + length_prefix + 4
    assert crc == zlib.crc32(dv_magic + vector)


def test_puffin_file_with_no_blobs(tmp_path: Path) -> None:
    writer, puffin_path = _new_writer(tmp_path)
    writer.finish()

    reader = PuffinFile(puffin_path.read_bytes())
    assert len(reader.footer.blobs) == 0
    assert len(reader.to_vector()) == 0


def test_puffin_writer_default_created_by(tmp_path: Path) -> None:
    writer, puffin_path = _new_writer(tmp_path)
    writer.finish()

    reader = PuffinFile(puffin_path.read_bytes())
    assert reader.footer.properties["created-by"] == f"PyIceberg version {importlib.metadata.version('pyiceberg')}"


def test_set_blob_rejects_negative_positions(tmp_path: Path) -> None:
    writer, _ = _new_writer(tmp_path)
    with pytest.raises(ValueError, match="Invalid position: -1"):
        writer.set_blob(positions=[1, -1], referenced_data_file="file.parquet")


def test_set_blob_rejects_empty_positions(tmp_path: Path) -> None:
    writer, _ = _new_writer(tmp_path)
    with pytest.raises(ValueError, match="Deletion vector must contain at least one position"):
        writer.set_blob(positions=[], referenced_data_file="file.parquet")


def test_set_blob_rejects_position_exceeding_java_key_range(tmp_path: Path) -> None:
    writer, _ = _new_writer(tmp_path)
    with pytest.raises(ValueError, match="Key 2147483648 is too large, max 2147483647"):
        writer.set_blob(positions=[(2**31) << 32], referenced_data_file="file.parquet")


def test_failed_set_blob_keeps_previously_set_blob(tmp_path: Path) -> None:
    # A rejected set_blob() call must not discard the blob that was set before it.
    writer, puffin_path = _new_writer(tmp_path)
    writer.set_blob(positions=[1, 2, 3], referenced_data_file="file1.parquet")
    with pytest.raises(ValueError, match="Invalid position: -1"):
        writer.set_blob(positions=[4, -1], referenced_data_file="file2.parquet")
    writer.finish()

    reader = PuffinFile(puffin_path.read_bytes())
    assert len(reader.footer.blobs) == 1

    vectors = reader.to_vector()
    assert list(vectors) == ["file1.parquet"]
    assert vectors["file1.parquet"].to_pylist() == [1, 2, 3]