-
Notifications
You must be signed in to change notification settings - Fork 507
[WIP]Support range-based reads for deletion vectors #3478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -147,7 +147,7 @@ | |
| from pyiceberg.table.locations import load_location_provider | ||
| from pyiceberg.table.metadata import TableMetadata | ||
| from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping | ||
| from pyiceberg.table.puffin import PuffinFile | ||
| from pyiceberg.table.puffin import PuffinFile, _bitmaps_to_chunked_array, _deserialize_dv_blob | ||
| from pyiceberg.transforms import IdentityTransform, TruncateTransform | ||
| from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion | ||
| from pyiceberg.types import ( | ||
|
|
@@ -192,6 +192,8 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
| ONE_MEGABYTE = 1024 * 1024 | ||
| # Match Iceberg Java's Integer.MAX_VALUE limit before reading a DV content range into memory. | ||
| _MAX_DELETION_VECTOR_CONTENT_SIZE = 2**31 - 1 | ||
| BUFFER_SIZE = "buffer-size" | ||
| ICEBERG_SCHEMA = b"iceberg.schema" | ||
| # The PARQUET: in front means that it is Parquet specific, in this case the field_id | ||
|
|
@@ -1116,6 +1118,27 @@ def _get_file_format(file_format: FileFormat, **kwargs: dict[str, Any]) -> ds.Fi | |
| raise ValueError(f"Unsupported file format: {file_format}") | ||
|
|
||
|
|
||
| def _validate_deletion_vector(data_file: DataFile) -> tuple[int, int, str]: | ||
| content_offset = getattr(data_file, "content_offset", None) | ||
| content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None) | ||
| referenced_data_file = getattr(data_file, "referenced_data_file", None) | ||
|
|
||
| if content_offset is None: | ||
| raise ValueError(f"Invalid deletion vector, content offset is missing: {data_file.file_path}") | ||
| if content_size_in_bytes is None: | ||
| raise ValueError(f"Invalid deletion vector, content size is missing: {data_file.file_path}") | ||
| if content_offset < 0: | ||
| raise ValueError(f"Invalid deletion vector, content offset cannot be negative: {content_offset}") | ||
| if content_size_in_bytes < 0: | ||
| raise ValueError(f"Invalid deletion vector, content size cannot be negative: {content_size_in_bytes}") | ||
| if content_size_in_bytes > _MAX_DELETION_VECTOR_CONTENT_SIZE: | ||
| raise ValueError(f"Cannot read deletion vector larger than 2GB: {content_size_in_bytes}") | ||
| if referenced_data_file is None: | ||
| raise ValueError(f"Invalid deletion vector, referenced data file is missing: {data_file.file_path}") | ||
|
|
||
| return content_offset, content_size_in_bytes, referenced_data_file | ||
|
|
||
|
|
||
| def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray]: | ||
| if data_file.file_format == FileFormat.PARQUET: | ||
| with io.new_input(data_file.file_path).open() as fi: | ||
|
|
@@ -1139,6 +1162,22 @@ def _read_deletes(io: FileIO, data_file: DataFile) -> dict[str, pa.ChunkedArray] | |
| } | ||
| elif data_file.file_format == FileFormat.PUFFIN: | ||
| with io.new_input(data_file.file_path).open() as fi: | ||
| content_offset = getattr(data_file, "content_offset", None) | ||
| content_size_in_bytes = getattr(data_file, "content_size_in_bytes", None) | ||
| if content_offset is not None or content_size_in_bytes is not None: | ||
|
Comment on lines
1163
to
+1167
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two fields are never None when the file format is Puffin, right? https://iceberg.apache.org/spec/#data-file-fields
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct @ebyhr , these should always exist. so I think we can do a stricter read on these fields. |
||
| # A DV is declared as PUFFIN in the manifest, but the content range points directly | ||
| # to the serialized bitmap blob, so avoid parsing the entire file as a Puffin file. | ||
| content_offset, content_size_in_bytes, referenced_data_file = _validate_deletion_vector(data_file) | ||
|
|
||
| fi.seek(content_offset) | ||
| payload = fi.read(content_size_in_bytes) | ||
| if len(payload) != content_size_in_bytes: | ||
| raise ValueError( | ||
| f"Could not read deletion vector, expected {content_size_in_bytes} bytes, got {len(payload)}" | ||
| ) | ||
| bitmaps = _deserialize_dv_blob(payload, data_file.record_count) | ||
| return {referenced_data_file: _bitmaps_to_chunked_array(bitmaps)} | ||
|
Comment on lines
+1168
to
+1179
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as below, I think if we abstract away a read_deletion_vector(io, delete_file) -> bytes API it'll be a bit cleaner. That takes care of all the offset handling, io, deserialization, validation etc . |
||
|
|
||
| payload = fi.read() | ||
|
|
||
| return PuffinFile(payload).to_vector() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,8 @@ | |
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| import math | ||
| import struct | ||
| import zlib | ||
| from typing import TYPE_CHECKING, Literal | ||
|
|
||
| from pydantic import Field | ||
|
|
@@ -30,6 +32,12 @@ | |
| EMPTY_BITMAP = FrozenBitMap() | ||
| MAX_JAVA_SIGNED = int(math.pow(2, 31)) - 1 | ||
| PROPERTY_REFERENCED_DATA_FILE = "referenced-data-file" | ||
| _DV_BLOB_LENGTH = struct.Struct(">I") | ||
| _DV_BLOB_MAGIC = struct.Struct("<I") | ||
| _DV_BLOB_CRC = struct.Struct(">I") | ||
| _DV_BLOB_MAGIC_NUMBER = 1681511377 | ||
| _ROARING_BITMAP_COUNT_SIZE_BYTES = 8 | ||
| _DV_BLOB_MIN_SIZE_BYTES = _DV_BLOB_LENGTH.size + _DV_BLOB_MAGIC.size + _ROARING_BITMAP_COUNT_SIZE_BYTES + _DV_BLOB_CRC.size | ||
|
Comment on lines
+35
to
+40
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we'd want to decouple the DV specific parts into a separate module, table/deletion_vector.py? That would expose something like
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to decoupling the logic from this class. I submitted a PR yesterday (#3491) that addresses this issue. |
||
|
|
||
|
|
||
| def _deserialize_bitmap(pl: bytes) -> list[BitMap]: | ||
|
|
@@ -62,6 +70,40 @@ def _deserialize_bitmap(pl: bytes) -> list[BitMap]: | |
| return bitmaps | ||
|
|
||
|
|
||
| def _deserialize_dv_blob(blob: bytes, record_count: int | None = None) -> list[BitMap]: | ||
| # The DV blob encoding matches Iceberg Java's BitmapPositionDeleteIndex: | ||
| # 4-byte big-endian bitmap-data length, 4-byte little-endian magic number, | ||
| # portable Roaring bitmap data, and 4-byte big-endian CRC-32. | ||
| if len(blob) < _DV_BLOB_MIN_SIZE_BYTES: | ||
| raise ValueError(f"Invalid deletion vector blob length: {len(blob)}") | ||
|
|
||
| bitmap_data_length = _DV_BLOB_LENGTH.unpack_from(blob)[0] | ||
| expected_bitmap_data_length = len(blob) - _DV_BLOB_LENGTH.size - _DV_BLOB_CRC.size | ||
| if bitmap_data_length != expected_bitmap_data_length: | ||
| raise ValueError(f"Invalid bitmap data length: {bitmap_data_length}, expected {expected_bitmap_data_length}") | ||
|
|
||
| bitmap_data_offset = _DV_BLOB_LENGTH.size | ||
| crc_offset = bitmap_data_offset + bitmap_data_length | ||
| bitmap_data = blob[bitmap_data_offset:crc_offset] | ||
|
|
||
| magic_number = _DV_BLOB_MAGIC.unpack_from(bitmap_data)[0] | ||
| if magic_number != _DV_BLOB_MAGIC_NUMBER: | ||
| raise ValueError(f"Invalid magic number: {magic_number}, expected {_DV_BLOB_MAGIC_NUMBER}") | ||
|
|
||
| checksum = zlib.crc32(bitmap_data) & 0xFFFFFFFF | ||
| expected_checksum = _DV_BLOB_CRC.unpack_from(blob, crc_offset)[0] | ||
| if checksum != expected_checksum: | ||
| raise ValueError("Invalid CRC") | ||
|
|
||
| bitmaps = _deserialize_bitmap(bitmap_data[_DV_BLOB_MAGIC.size :]) | ||
| if record_count is not None: | ||
| cardinality = sum(len(bitmap) for bitmap in bitmaps) | ||
| if cardinality != record_count: | ||
| raise ValueError(f"Invalid cardinality: {cardinality}, expected {record_count}") | ||
|
|
||
| return bitmaps | ||
|
|
||
|
|
||
| class PuffinBlobMetadata(IcebergBaseModel): | ||
| type: Literal["deletion-vector-v1"] = Field() | ||
| fields: list[int] = Field() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be a helper in the deletion_vector module I mentioned below that I think we should introduce. This class looks like it's kinda become a bit of a kitchen sink of different things but the original intent I think was for fileIO and so I don't think we would want to have the low level DV validation here.