From 74e0d7bce4caf1d8d0d5649d99fc48af2342cebf Mon Sep 17 00:00:00 2001
From: Yuya Ebihara
Date: Sat, 13 Jun 2026 09:07:36 +0900
Subject: [PATCH] Extract DeletionVector logic from PuffinFile

---
 pyiceberg/io/pyarrow.py                       |  4 +-
 pyiceberg/table/deletion_vector.py            | 79 +++++++++++++++++++
 pyiceberg/table/puffin.py                     | 56 ++-----------
 ...test_puffin.py => test_deletion_vector.py} | 10 +--
 4 files changed, 91 insertions(+), 58 deletions(-)
 create mode 100644 pyiceberg/table/deletion_vector.py
 rename tests/table/{test_puffin.py => test_deletion_vector.py} (88%)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 4ec7a73afe..c73d77eca2 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -144,10 +144,10 @@
     visit_with_partner,
 )
 from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
+from pyiceberg.table.deletion_vector import DeletionVector
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
-from pyiceberg.table.puffin import PuffinFile
 from pyiceberg.transforms import IdentityTransform, TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
 from pyiceberg.types import (
@@ -1141,7 +1141,7 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
         with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()
 
-        return PuffinFile(payload).to_vector()
+        return DeletionVector(payload).to_vector()
     else:
         raise ValueError(f"Delete file format not supported: {data_file.file_format}")
 
diff --git a/pyiceberg/table/deletion_vector.py b/pyiceberg/table/deletion_vector.py
new file mode 100644
index 0000000000..c4cc395d6f
--- /dev/null
+++ b/pyiceberg/table/deletion_vector.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import math
+from typing import TYPE_CHECKING
+
+from pyroaring import BitMap, FrozenBitMap
+
+from pyiceberg.table.puffin import PuffinFile
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+EMPTY_BITMAP = FrozenBitMap()
+MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
+PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
+
+
+class DeletionVector:
+    _deletion_vectors: dict[str, list[BitMap]]
+
+    def __init__(self, puffin: bytes) -> None:
+        puffin_file = PuffinFile(puffin)
+        self._deletion_vectors = {
+            blob.properties[PROPERTY_REFERENCED_DATA_FILE]: self._deserialize_bitmap(puffin_file.get_blob_payload(blob))
+            for blob in puffin_file.footer.blobs
+        }
+
+    @staticmethod
+    def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
+        number_of_bitmaps = int.from_bytes(pl[0:8], byteorder="little")
+        pl = pl[8:]
+
+        bitmaps = []
+        last_key = -1
+        for _ in range(number_of_bitmaps):
+            key = int.from_bytes(pl[0:4], byteorder="little")
+            if key < 0:
+                raise ValueError(f"Invalid unsigned key: {key}")
+            if key <= last_key:
+                raise ValueError("Keys must be sorted in ascending order")
+            if key > MAX_JAVA_SIGNED:
+                raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
+            pl = pl[4:]
+
+            while last_key < key - 1:
+                bitmaps.append(EMPTY_BITMAP)
+                last_key += 1
+
+            bm = BitMap().deserialize(pl)
+            # TODO: Optimize this
+            pl = pl[len(bm.serialize()) :]
+            bitmaps.append(bm)
+
+            last_key = key
+
+        return bitmaps
+
+    @staticmethod
+    def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
+        import pyarrow as pa
+
+        return pa.chunked_array([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps))
+
+    def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
+        return {path: self._bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}
diff --git a/pyiceberg/table/puffin.py b/pyiceberg/table/puffin.py
index 917d387f45..3fd048e72e 100644
--- a/pyiceberg/table/puffin.py
+++ b/pyiceberg/table/puffin.py
@@ -14,52 +14,17 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import math
 from typing import TYPE_CHECKING, Literal
 
 from pydantic import Field
-from pyroaring import BitMap, FrozenBitMap
 
 from pyiceberg.typedef import IcebergBaseModel
 
 if TYPE_CHECKING:
-    import pyarrow as pa
+    pass
 
 # Short for: Puffin Fratercula arctica, version 1
 MAGIC_BYTES = b"PFA1"
-EMPTY_BITMAP = FrozenBitMap()
-MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
-PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"
-
-
-def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
-    number_of_bitmaps = int.from_bytes(pl[0:8], byteorder="little")
-    pl = pl[8:]
-
-    bitmaps = []
-    last_key = -1
-    for _ in range(number_of_bitmaps):
-        key = int.from_bytes(pl[0:4], byteorder="little")
-        if key < 0:
-            raise ValueError(f"Invalid unsigned key: {key}")
-        if key <= last_key:
-            raise ValueError("Keys must be sorted in ascending order")
-        if key > MAX_JAVA_SIGNED:
-            raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
-        pl = pl[4:]
-
-        while last_key < key - 1:
-            bitmaps.append(EMPTY_BITMAP)
-            last_key += 1
-
-        bm = BitMap().deserialize(pl)
-        # TODO: Optimize this
-        pl = pl[len(bm.serialize()) :]
-        bitmaps.append(bm)
-
-        last_key = key
-
-    return bitmaps
 
 
 class PuffinBlobMetadata(IcebergBaseModel):
@@ -78,15 +43,9 @@ class Footer(IcebergBaseModel):
     properties: dict[str, str] = Field(default_factory=dict)
 
 
-def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
-    import pyarrow as pa
-
-    return pa.chunked_array([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps))
-
-
 class PuffinFile:
     footer: Footer
-    _deletion_vectors: dict[str, list[BitMap]]
+    _payload: bytes
 
     def __init__(self, puffin: bytes) -> None:
         for magic_bytes in [puffin[:4], puffin[-4:]]:
@@ -105,12 +64,7 @@ def __init__(self, puffin: bytes) -> None:
         footer_payload_size_int = int.from_bytes(puffin[-12:-8], byteorder="little")
 
         self.footer = Footer.model_validate_json(puffin[-(footer_payload_size_int + 12) : -12])
-        puffin = puffin[8:]
-
-        self._deletion_vectors = {
-            blob.properties[PROPERTY_REFERENCED_DATA_FILE]: _deserialize_bitmap(puffin[blob.offset : blob.offset + blob.length])
-            for blob in self.footer.blobs
-        }
+        self._payload = puffin[8:]
 
-    def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
-        return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}
+    def get_blob_payload(self, blob: PuffinBlobMetadata) -> bytes:
+        return self._payload[blob.offset : blob.offset + blob.length]
diff --git a/tests/table/test_puffin.py b/tests/table/test_deletion_vector.py
similarity index 88%
rename from tests/table/test_puffin.py
rename to tests/table/test_deletion_vector.py
index bf8c82014c..788216f8b3 100644
--- a/tests/table/test_puffin.py
+++ b/tests/table/test_deletion_vector.py
@@ -19,7 +19,7 @@
 import pytest
 from pyroaring import BitMap
 
-from pyiceberg.table.puffin import _deserialize_bitmap
+from pyiceberg.table.deletion_vector import DeletionVector
 
 
 def _open_file(file: str) -> bytes:
@@ -32,7 +32,7 @@ def test_map_empty() -> None:
     puffin = _open_file("64mapempty.bin")
 
     expected: list[BitMap] = []
-    actual = _deserialize_bitmap(puffin)
+    actual = DeletionVector._deserialize_bitmap(puffin)
 
     assert expected == actual
 
@@ -41,7 +41,7 @@ def test_map_bitvals() -> None:
     puffin = _open_file("64map32bitvals.bin")
 
     expected = [BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]
-    actual = _deserialize_bitmap(puffin)
+    actual = DeletionVector._deserialize_bitmap(puffin)
 
     assert expected == actual
 
@@ -61,7 +61,7 @@ def test_map_spread_vals() -> None:
         BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
         BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
     ]
-    actual = _deserialize_bitmap(puffin)
+    actual = DeletionVector._deserialize_bitmap(puffin)
 
     assert expected == actual
 
@@ -70,4 +70,4 @@ def test_map_high_vals() -> None:
     puffin = _open_file("64maphighvals.bin")
 
     with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
-        _ = _deserialize_bitmap(puffin)
+        _ = DeletionVector._deserialize_bitmap(puffin)