Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@
visit_with_partner,
)
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
from pyiceberg.table.deletion_vector import DeletionVector
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.table.puffin import PuffinFile
from pyiceberg.transforms import IdentityTransform, TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
from pyiceberg.types import (
Expand Down Expand Up @@ -1141,7 +1141,7 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]
with io.new_input(data_file.file_path).open() as fi:
payload = fi.read()

return PuffinFile(payload).to_vector()
return DeletionVector(payload).to_vector()
else:
raise ValueError(f"Delete file format not supported: {data_file.file_format}")

Expand Down
79 changes: 79 additions & 0 deletions pyiceberg/table/deletion_vector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import math
from typing import TYPE_CHECKING

from pyroaring import BitMap, FrozenBitMap

from pyiceberg.table.puffin import PuffinFile

if TYPE_CHECKING:
import pyarrow as pa

EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"


class DeletionVector:
_deletion_vectors: dict[str, list[BitMap]]

def __init__(self, puffin: bytes) -> None:
puffin_file = PuffinFile(puffin)
self._deletion_vectors = {
blob.properties[PROPERTY_REFERENCED_DATA_FILE]: self._deserialize_bitmap(puffin_file.get_blob_payload(blob))
for blob in puffin_file.footer.blobs
}

@staticmethod
def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
number_of_bitmaps = int.from_bytes(pl[0:8], byteorder="little")
pl = pl[8:]

bitmaps = []
last_key = -1
for _ in range(number_of_bitmaps):
key = int.from_bytes(pl[0:4], byteorder="little")
if key < 0:
raise ValueError(f"Invalid unsigned key: {key}")
if key <= last_key:
raise ValueError("Keys must be sorted in ascending order")
if key > MAX_JAVA_SIGNED:
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
pl = pl[4:]

while last_key < key - 1:
bitmaps.append(EMPTY_BITMAP)
last_key += 1

bm = BitMap().deserialize(pl)
# TODO: Optimize this
pl = pl[len(bm.serialize()) :]
bitmaps.append(bm)

last_key = key

return bitmaps

@staticmethod
def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
import pyarrow as pa

return pa.chunked_array([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps))

def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
return {path: self._bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}
56 changes: 5 additions & 51 deletions pyiceberg/table/puffin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import math
from typing import TYPE_CHECKING, Literal

from pydantic import Field
from pyroaring import BitMap, FrozenBitMap

from pyiceberg.typedef import IcebergBaseModel

if TYPE_CHECKING:
import pyarrow as pa
pass

# Short for: Puffin Fratercula arctica, version 1
MAGIC_BYTES = b"PFA1"
EMPTY_BITMAP = FrozenBitMap()
MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1
PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file"


def _deserialize_bitmap(pl: bytes) -> list[BitMap]:
number_of_bitmaps = int.from_bytes(pl[0:8], byteorder="little")
pl = pl[8:]

bitmaps = []
last_key = -1
for _ in range(number_of_bitmaps):
key = int.from_bytes(pl[0:4], byteorder="little")
if key < 0:
raise ValueError(f"Invalid unsigned key: {key}")
if key <= last_key:
raise ValueError("Keys must be sorted in ascending order")
if key > MAX_JAVA_SIGNED:
raise ValueError(f"Key {key} is too large, max {MAX_JAVA_SIGNED} to maintain compatibility with Java impl")
pl = pl[4:]

while last_key < key - 1:
bitmaps.append(EMPTY_BITMAP)
last_key += 1

bm = BitMap().deserialize(pl)
# TODO: Optimize this
pl = pl[len(bm.serialize()) :]
bitmaps.append(bm)

last_key = key

return bitmaps


class PuffinBlobMetadata(IcebergBaseModel):
Expand All @@ -78,15 +43,9 @@ class Footer(IcebergBaseModel):
properties: dict[str, str] = Field(default_factory=dict)


def _bitmaps_to_chunked_array(bitmaps: list[BitMap]) -> "pa.ChunkedArray":
import pyarrow as pa

return pa.chunked_array([(key_pos << 32) + pos for pos in bitmap] for key_pos, bitmap in enumerate(bitmaps))


class PuffinFile:
footer: Footer
_deletion_vectors: dict[str, list[BitMap]]
_payload: bytes

def __init__(self, puffin: bytes) -> None:
for magic_bytes in [puffin[:4], puffin[-4:]]:
Expand All @@ -105,12 +64,7 @@ def __init__(self, puffin: bytes) -> None:
footer_payload_size_int = int.from_bytes(puffin[-12:-8], byteorder="little")

self.footer = Footer.model_validate_json(puffin[-(footer_payload_size_int + 12) : -12])
puffin = puffin[8:]

self._deletion_vectors = {
blob.properties[PROPERTY_REFERENCED_DATA_FILE]: _deserialize_bitmap(puffin[blob.offset : blob.offset + blob.length])
for blob in self.footer.blobs
}
self._payload = puffin[8:]

def to_vector(self) -> dict[str, "pa.ChunkedArray"]:
return {path: _bitmaps_to_chunked_array(bitmaps) for path, bitmaps in self._deletion_vectors.items()}
def get_blob_payload(self, blob: PuffinBlobMetadata) -> bytes:
return self._payload[blob.offset : blob.offset + blob.length]
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pytest
from pyroaring import BitMap

from pyiceberg.table.puffin import _deserialize_bitmap
from pyiceberg.table.deletion_vector import DeletionVector


def _open_file(file: str) -> bytes:
Expand All @@ -32,7 +32,7 @@ def test_map_empty() -> None:
puffin = _open_file("64mapempty.bin")

expected: list[BitMap] = []
actual = _deserialize_bitmap(puffin)
actual = DeletionVector._deserialize_bitmap(puffin)

assert expected == actual

Expand All @@ -41,7 +41,7 @@ def test_map_bitvals() -> None:
puffin = _open_file("64map32bitvals.bin")

expected = [BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]
actual = _deserialize_bitmap(puffin)
actual = DeletionVector._deserialize_bitmap(puffin)

assert expected == actual

Expand All @@ -61,7 +61,7 @@ def test_map_spread_vals() -> None:
BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
BitMap([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
]
actual = _deserialize_bitmap(puffin)
actual = DeletionVector._deserialize_bitmap(puffin)

assert expected == actual

Expand All @@ -70,4 +70,4 @@ def test_map_high_vals() -> None:
puffin = _open_file("64maphighvals.bin")

with pytest.raises(ValueError, match="Key 4022190063 is too large, max 2147483647 to maintain compatibility with Java impl"):
_ = _deserialize_bitmap(puffin)
_ = DeletionVector._deserialize_bitmap(puffin)