Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import (
TYPE_CHECKING,
Any,
cast,
)
from urllib.parse import quote, unquote

Expand Down Expand Up @@ -233,6 +234,7 @@ class ScanPlanningMode(Enum):


ACCESS_DELEGATION_DEFAULT = "vended-credentials"
ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation"
AUTHORIZATION_HEADER = "Authorization"
BEARER_PREFIX = "Bearer"
CATALOG_SCOPE = "catalog"
Expand Down Expand Up @@ -486,6 +488,40 @@ def _resolve_storage_credentials(storage_credentials: list[StorageCredential], l

return best_match.config if best_match else {}

def _should_load_credentials_from_endpoint(self) -> bool:
    """Tell whether table credentials should be fetched from the credentials endpoint.

    Returns True only when the load-credentials capability is among the
    catalog's supported endpoints and the session's access-delegation header
    lists the vended-credentials mechanism.
    """
    endpoint_supported = Capability.V1_LOAD_CREDENTIALS in self._supported_endpoints
    if not endpoint_supported:
        return False

    header_value = cast(str, self._session.headers.get(ACCESS_DELEGATION_HEADER, ""))
    # The spec encodes access delegation as a comma-separated list of mechanisms,
    # so normalise every entry (trim + lowercase) before looking for a match.
    requested_mechanisms = {mechanism.strip().lower() for mechanism in header_value.split(",")}
    return ACCESS_DELEGATION_DEFAULT in requested_mechanisms

def _resolve_table_credentials_from_response_or_endpoint(
    self, identifier_tuple: tuple[str, ...], table_response: TableResponse
) -> Properties:
    """Pick the storage-credential properties to use for a loaded table.

    Storage credentials embedded in the table response win. When the response
    carries none, the table's /credentials endpoint is queried instead, but only
    if ``_should_load_credentials_from_endpoint`` allows it. An empty mapping is
    returned when no location is available or the endpoint must not be used.
    """
    # Java keeps storage credentials with FileIO for path-level selection. PyIceberg
    # turns them into FileIO properties right here, which requires a location to
    # select the matching prefix.
    target_location = table_response.metadata_location or table_response.metadata.location
    if not target_location:
        return {}

    inline_credentials = table_response.storage_credentials
    resolved_inline = self._resolve_storage_credentials(inline_credentials, target_location)
    if inline_credentials:
        # Inline credentials take precedence, even over the credentials endpoint.
        return resolved_inline

    if self._should_load_credentials_from_endpoint():
        loaded = self._load_credentials(identifier_tuple)
        return self._resolve_storage_credentials(loaded.storage_credentials, target_location)

    return {}

def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
merged_properties = {**self.properties, **properties}
if self._auth_manager:
Expand Down Expand Up @@ -852,10 +888,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
session.mount(self.uri, SigV4Adapter(**self.properties))

def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
# Per Iceberg spec: storage-credentials take precedence over config
credential_config = self._resolve_storage_credentials(
table_response.storage_credentials, table_response.metadata_location
)
credential_config = self._resolve_table_credentials_from_response_or_endpoint(identifier_tuple, table_response)
return Table(
identifier=identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
Expand Down Expand Up @@ -904,7 +937,7 @@ def _config_headers(self, session: Session) -> None:
session.headers["Content-type"] = "application/json"
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
session.headers["X-Client-Version"] = f"PyIceberg {__version__}"
session.headers.setdefault("X-Iceberg-Access-Delegation", ACCESS_DELEGATION_DEFAULT)
session.headers.setdefault(ACCESS_DELEGATION_HEADER, ACCESS_DELEGATION_DEFAULT)

def _create_table(
self,
Expand Down
103 changes: 95 additions & 8 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
Capability.V1_DELETE_TABLE,
Capability.V1_RENAME_TABLE,
Capability.V1_REGISTER_TABLE,
Capability.V1_LOAD_CREDENTIALS,
Capability.V1_LIST_VIEWS,
Capability.V1_LOAD_VIEW,
Capability.V1_VIEW_EXISTS,
Expand Down Expand Up @@ -1452,10 +1451,19 @@ def test_load_table_200_loading_mode(


def test_load_table_honor_access_delegation(
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
requests_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
) -> None:
requests_mock.get(
f"{TEST_URI}v1/config",
json={
"defaults": {},
"overrides": {},
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
},
status_code=200,
)
test_headers_with_remote_signing = {**TEST_HEADERS, "X-Iceberg-Access-Delegation": "remote-signing"}
rest_mock.get(
requests_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table",
json=example_table_metadata_with_snapshot_v1_rest_json,
status_code=200,
Expand Down Expand Up @@ -3114,9 +3122,20 @@ def test_resolve_storage_credentials_empty() -> None:
assert RestCatalog._resolve_storage_credentials([], None) == {}


def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
def test_load_table_with_storage_credentials(
requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
) -> None:
requests_mock.get(
f"{TEST_URI}v1/config",
json={
"defaults": {},
"overrides": {},
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
},
status_code=200,
)
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
rest_mock.get(
requests_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table",
json={
"metadata-location": metadata_location,
Expand Down Expand Up @@ -3146,10 +3165,78 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me
assert table.io.properties["s3.access-key-id"] == "vended-key"
assert table.io.properties["s3.secret-access-key"] == "vended-secret"
assert table.io.properties["s3.session-token"] == "vended-token"
assert len(requests_mock.request_history) == 2


def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
rest_mock.get(
def test_load_table_loads_credentials_when_endpoint_supported(
    requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
) -> None:
    """Check that load_table fetches credentials from the endpoint when none are inline."""
    config_url = f"{TEST_URI}v1/config"
    table_url = f"{TEST_URI}v1/namespaces/fokko/tables/table"
    credentials_url = f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
    metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"

    # The catalog config lists the load-credentials endpoint on top of the defaults.
    advertised_endpoints = list(map(str, (*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS)))
    config_payload = {
        "defaults": {},
        "overrides": {},
        "endpoints": advertised_endpoints,
    }
    requests_mock.get(config_url, json=config_payload, status_code=200)

    # The table response has "config" keys but no "storage-credentials" entry.
    table_payload = {
        "metadata-location": metadata_location,
        "metadata": example_table_metadata_with_snapshot_v1,
        "config": {
            "s3.access-key-id": "from-config",
            "s3.secret-access-key": "from-config-secret",
        },
    }
    requests_mock.get(
        table_url,
        json=table_payload,
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    # Two candidate prefixes; the longer one carries the values asserted below.
    short_prefix_credential = {
        "prefix": "s3://warehouse/database/",
        "config": {"s3.access-key-id": "short-prefix-key"},
    }
    long_prefix_credential = {
        "prefix": "s3://warehouse/database/table",
        "config": {
            "s3.access-key-id": "long-prefix-key",
            "s3.secret-access-key": "long-prefix-secret",
        },
    }
    requests_mock.get(
        credentials_url,
        json={"storage-credentials": [short_prefix_credential, long_prefix_credential]},
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
    table = catalog.load_table(("fokko", "table"))

    io_properties = table.io.properties
    assert io_properties["s3.access-key-id"] == "long-prefix-key"
    assert io_properties["s3.secret-access-key"] == "long-prefix-secret"

    # Exactly three requests are expected, in this order: config, table, credentials.
    requested_urls = [request.url for request in requests_mock.request_history]
    assert requested_urls == [config_url, table_url, credentials_url]


def test_load_credentials_with_longest_prefix(requests_mock: Mocker) -> None:
requests_mock.get(
f"{TEST_URI}v1/config",
json={
"defaults": {},
"overrides": {},
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
},
status_code=200,
)
requests_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
json={
"storage-credentials": [
Expand Down Expand Up @@ -3177,7 +3264,7 @@ def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
)

assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"}
assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
assert requests_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"


def test_load_table_without_storage_credentials(
Expand Down