From 0f80215a55ca83ce1ab41323c4b8539202b7595c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 13 Jun 2026 14:36:32 -0700 Subject: [PATCH] Load table credentials from credentials endpoint --- pyiceberg/catalog/rest/__init__.py | 43 ++++++++++-- tests/catalog/test_rest.py | 103 ++++++++++++++++++++++++++--- 2 files changed, 133 insertions(+), 13 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index c85ce02609..4dfa6fad36 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -21,6 +21,7 @@ from typing import ( TYPE_CHECKING, Any, + cast, ) from urllib.parse import quote, unquote @@ -233,6 +234,7 @@ class ScanPlanningMode(Enum): ACCESS_DELEGATION_DEFAULT = "vended-credentials" +ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation" AUTHORIZATION_HEADER = "Authorization" BEARER_PREFIX = "Bearer" CATALOG_SCOPE = "catalog" @@ -486,6 +488,40 @@ def _resolve_storage_credentials(storage_credentials: list[StorageCredential], l return best_match.config if best_match else {} + def _should_load_credentials_from_endpoint(self) -> bool: + if Capability.V1_LOAD_CREDENTIALS not in self._supported_endpoints: + return False + + access_delegation = cast(str, self._session.headers.get(ACCESS_DELEGATION_HEADER, "")) + # The spec encodes access delegation as a comma-separated list of mechanisms. + # Load credentials only when vended-credentials is requested. + return any(delegation.strip().lower() == ACCESS_DELEGATION_DEFAULT for delegation in access_delegation.split(",")) + + def _resolve_table_credentials_from_response_or_endpoint( + self, identifier_tuple: tuple[str, ...], table_response: TableResponse + ) -> Properties: + """Resolve credentials from the table response or load them from the credentials endpoint. + + Inline storage-credentials from the table response take precedence. When the response + does not include credentials, the catalog advertises loadCredentials, and vended + credentials are requested, load credentials from the table's /credentials endpoint. + """ + location = table_response.metadata_location or table_response.metadata.location + # Java keeps storage credentials with FileIO for path-level selection. PyIceberg resolves + # them to FileIO properties here, so it needs a location to choose a matching prefix. + if not location: + return {} + + credential_config = self._resolve_storage_credentials(table_response.storage_credentials, location) + if table_response.storage_credentials: + return credential_config + + if not self._should_load_credentials_from_endpoint(): + return {} + + credentials_response = self._load_credentials(identifier_tuple) + return self._resolve_storage_credentials(credentials_response.storage_credentials, location) + def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO: merged_properties = {**self.properties, **properties} if self._auth_manager: @@ -852,10 +888,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin session.mount(self.uri, SigV4Adapter(**self.properties)) def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table: - # Per Iceberg spec: storage-credentials take precedence over config - credential_config = self._resolve_storage_credentials( - table_response.storage_credentials, table_response.metadata_location - ) + credential_config = self._resolve_table_credentials_from_response_or_endpoint(identifier_tuple, table_response) return Table( identifier=identifier_tuple, metadata_location=table_response.metadata_location, # type: ignore @@ -904,7 +937,7 @@ def _config_headers(self, session: Session) -> None: session.headers["Content-type"] = "application/json" session.headers["User-Agent"] = f"PyIceberg/{__version__}" session.headers["X-Client-Version"] = f"PyIceberg {__version__}" - session.headers.setdefault("X-Iceberg-Access-Delegation", ACCESS_DELEGATION_DEFAULT) + session.headers.setdefault(ACCESS_DELEGATION_HEADER, ACCESS_DELEGATION_DEFAULT) def _create_table( self, diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 23bb14cb54..29cf9f7c38 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -104,7 +104,6 @@ Capability.V1_DELETE_TABLE, Capability.V1_RENAME_TABLE, Capability.V1_REGISTER_TABLE, - Capability.V1_LOAD_CREDENTIALS, Capability.V1_LIST_VIEWS, Capability.V1_LOAD_VIEW, Capability.V1_VIEW_EXISTS, @@ -1452,10 +1451,19 @@ def test_load_table_200_loading_mode( def test_load_table_honor_access_delegation( - rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any] + requests_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any] ) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]], + }, + status_code=200, + ) test_headers_with_remote_signing = {**TEST_HEADERS, "X-Iceberg-Access-Delegation": "remote-signing"} - rest_mock.get( + requests_mock.get( f"{TEST_URI}v1/namespaces/fokko/tables/table", json=example_table_metadata_with_snapshot_v1_rest_json, status_code=200, @@ -3114,9 +3122,20 @@ def test_resolve_storage_credentials_empty() -> None: assert RestCatalog._resolve_storage_credentials([], None) == {} -def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None: +def test_load_table_with_storage_credentials( + requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any] +) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]], + }, + status_code=200, + ) metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json" - rest_mock.get( + requests_mock.get( f"{TEST_URI}v1/namespaces/fokko/tables/table", json={ "metadata-location": metadata_location, @@ -3146,10 +3165,78 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me assert table.io.properties["s3.access-key-id"] == "vended-key" assert table.io.properties["s3.secret-access-key"] == "vended-secret" assert table.io.properties["s3.session-token"] == "vended-token" + assert len(requests_mock.request_history) == 2 -def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None: - rest_mock.get( +def test_load_table_loads_credentials_when_endpoint_supported( + requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any] +) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]], + }, + status_code=200, + ) + metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json" + requests_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json={ + "metadata-location": metadata_location, + "metadata": example_table_metadata_with_snapshot_v1, + "config": { + "s3.access-key-id": "from-config", + "s3.secret-access-key": "from-config-secret", + }, + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + requests_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials", + json={ + "storage-credentials": [ + { + "prefix": "s3://warehouse/database/", + "config": {"s3.access-key-id": "short-prefix-key"}, + }, + { + "prefix": "s3://warehouse/database/table", + "config": { + "s3.access-key-id": "long-prefix-key", + "s3.secret-access-key": "long-prefix-secret", + }, + }, + ], + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("fokko", "table")) + + assert table.io.properties["s3.access-key-id"] == "long-prefix-key" + assert table.io.properties["s3.secret-access-key"] == "long-prefix-secret" + assert [request.url for request in requests_mock.request_history] == [ + f"{TEST_URI}v1/config", + f"{TEST_URI}v1/namespaces/fokko/tables/table", + f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials", + ] + + +def test_load_credentials_with_longest_prefix(requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]], + }, + status_code=200, + ) + requests_mock.get( f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials", json={ "storage-credentials": [ @@ -3177,7 +3264,7 @@ def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None: ) assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"} - assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials" + assert requests_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials" def test_load_table_without_storage_credentials(