From 64d39b60baf72617dbfec05e5842ca92061ab7b6 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 13 Jun 2026 13:52:20 -0700 Subject: [PATCH] Add REST load credentials support --- pyiceberg/catalog/rest/__init__.py | 32 +++++++++++++++++++++++++++++ tests/catalog/test_rest.py | 33 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index d085c6fd87..c85ce02609 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -147,6 +147,7 @@ class Endpoints: create_table: str = "namespaces/{namespace}/tables" register_table: str = "namespaces/{namespace}/register" load_table: str = "namespaces/{namespace}/tables/{table}" + load_credentials: str = "namespaces/{namespace}/tables/{table}/credentials" update_table: str = "namespaces/{namespace}/tables/{table}" drop_table: str = "namespaces/{namespace}/tables/{table}" table_exists: str = "namespaces/{namespace}/tables/{table}" @@ -181,6 +182,7 @@ class Capability: V1_DELETE_TABLE = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_table}") V1_RENAME_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.rename_table}") V1_REGISTER_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.register_table}") + V1_LOAD_CREDENTIALS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_credentials}") V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_views}") V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_view}") @@ -293,6 +295,10 @@ class TableResponse(IcebergBaseModel): storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list) +class LoadCredentialsResponse(IcebergBaseModel): + storage_credentials: list[StorageCredential] = Field(alias="storage-credentials") + + class ViewResponse(IcebergBaseModel): metadata_location: str | None = Field(alias="metadata-location", default=None) metadata: ViewMetadata @@ -545,6 +551,32 @@ def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> Sca return ScanTasks.model_validate_json(response.text) + @retry(**_RETRY_ARGS) + def _load_credentials( + self, + identifier: str | Identifier, + ) -> LoadCredentialsResponse: + """Load raw vended storage credentials for a table.""" + self._check_endpoint(Capability.V1_LOAD_CREDENTIALS) + response = self._session.get( + self.url(Endpoints.load_credentials, prefixed=True, **self._split_identifier_for_path(identifier)), + ) + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {404: NoSuchTableError}) + + return LoadCredentialsResponse.model_validate_json(response.text) + + def load_credentials( + self, + identifier: str | Identifier, + location: str, + ) -> Properties: + """Load vended storage credentials and return the best match for a location.""" + credentials_response = self._load_credentials(identifier) + return self._resolve_storage_credentials(credentials_response.storage_credentials, location) + def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]: """Plan a table scan and return FileScanTasks. diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 1eb9f26a56..23bb14cb54 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -104,6 +104,7 @@ Capability.V1_DELETE_TABLE, Capability.V1_RENAME_TABLE, Capability.V1_REGISTER_TABLE, + Capability.V1_LOAD_CREDENTIALS, Capability.V1_LIST_VIEWS, Capability.V1_LOAD_VIEW, Capability.V1_VIEW_EXISTS, @@ -3147,6 +3148,38 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me assert table.io.properties["s3.session-token"] == "vended-token" +def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials", + json={ + "storage-credentials": [ + { + "prefix": "s3://warehouse/database/", + "config": {"s3.access-key-id": "short-prefix-key"}, + }, + { + "prefix": "s3://warehouse/database/table", + "config": { + "s3.access-key-id": "long-prefix-key", + "s3.secret-access-key": "long-prefix-secret", + }, + }, + ], + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + + credentials = catalog.load_credentials( + ("fokko", "table"), + "s3://warehouse/database/table/data/file.parquet", + ) + + assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"} + assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials" + + def test_load_table_without_storage_credentials( rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any] ) -> None: