Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e045fa2
Fixed iceberg alter table modify rename
subkanthi May 27, 2026
ae62718
Resolved merge conflicts
subkanthi May 27, 2026
067b3f7
Fixed iceberg test_alter_orphan_metadata_cleanup_on_catalog_failure test
subkanthi May 27, 2026
5be2aba
Fix ai audit defects
subkanthi Jun 1, 2026
8270a09
Fixed alter drop test and resolved merge conflicts
subkanthi Jun 1, 2026
bcbbf5e
Changed modify column test to include Int32
subkanthi Jun 1, 2026
2e7a9e7
Merge branch 'antalya-26.3' into antalya_26_3_fix_alter_table_iceberg…
subkanthi Jun 1, 2026
38d8f13
Merge branch 'antalya-26.3' into antalya_26_3_fix_alter_table_iceberg…
subkanthi Jun 1, 2026
f6e082a
Fixed failing tests
subkanthi Jun 1, 2026
889ec3f
Merge branch 'antalya-26.3' into antalya_26_3_fix_alter_table_iceberg…
subkanthi Jun 1, 2026
3d2c287
Resolved merge conflicts
subkanthi Jun 4, 2026
c6a6fec
Fixed bug with set-current-schema introduced by merge conflict
subkanthi Jun 4, 2026
fc533cd
Merge branch 'antalya-26.3' into antalya_26_3_fix_alter_table_iceberg…
subkanthi Jun 5, 2026
81c311d
Fix metadata not initialized error
subkanthi Jun 6, 2026
01c9fa6
Merge branch 'antalya_26_3_fix_alter_table_iceberg_new' of https://gi…
subkanthi Jun 6, 2026
6324578
Set current schema-id to -1 before performing an update
subkanthi Jun 8, 2026
4dc8f90
Added test for modify column
subkanthi Jun 12, 2026
90cf057
Merge branch 'antalya-26.3' into antalya_26_3_fix_alter_table_iceberg…
subkanthi Jun 15, 2026
810890a
Merge branch 'antalya_26_3_fix_alter_table_iceberg_new' of https://gi…
subkanthi Jun 18, 2026
f6e2c07
Add isDatalakeCatalog to IDatabase
subkanthi Jun 18, 2026
20cfc23
Fixed integration tests
subkanthi Jun 18, 2026
e1f3006
Rollback change to fix failing tests in Utils.cpp
subkanthi Jun 19, 2026
0ef5ae8
Fixed update unit test
subkanthi Jun 19, 2026
cee5096
Fixed path of s3 in create table test
subkanthi Jun 19, 2026
8d29dd5
Merge branch 'antalya-26.3' into antalya_26_3_fix_alter_table_iceberg…
subkanthi Jun 19, 2026
f43581c
Revert back changes to test_database_iceberg/test.py
subkanthi Jun 22, 2026
5102206
Merge branch 'antalya_26_3_fix_alter_table_iceberg_new' of https://gi…
subkanthi Jun 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ static struct InitFiu
ONCE(iceberg_writes_cleanup) \
ONCE(iceberg_writes_non_retry_cleanup) \
ONCE(iceberg_writes_post_publish_throw) \
ONCE(iceberg_alter_catalog_update_metadata_fail) \
REGULAR(iceberg_alter_orphan_metadata_cleanup_fail) \
REGULAR(datalake_iceberg_metadata_create_fail) \
ONCE(iceberg_export_after_commit_before_zk_completed) \
REGULAR(export_partition_commit_always_throw) \
ONCE(export_partition_status_change_throw) \
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DataLake/DatabaseDataLake.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DatabaseDataLake final : public IDatabase, WithContext

bool shouldBeEmptyOnDetach() const override { return false; }
bool isRemoteDatabase() const override { return true; }
bool isDatalakeCatalog() const override { return true; }

bool empty() const override;

Expand Down
280 changes: 230 additions & 50 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <Poco/JSON/Stringifier.h>
#include <Poco/Net/HTTPRequest.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/CurrentThread.h>
Expand Down Expand Up @@ -32,6 +33,7 @@
#include <Interpreters/Context.h>
#include <filesystem>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>
Expand All @@ -45,6 +47,9 @@
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>

#include <sstream>
#include <unordered_set>


namespace DB::ErrorCodes
{
Expand All @@ -54,6 +59,10 @@ namespace DB::ErrorCodes
extern const int CATALOG_NAMESPACE_DISABLED;
}

namespace DB::FailPoints
{
extern const char iceberg_alter_catalog_update_metadata_fail[];
}
namespace ProfileEvents
{
extern const Event DataLakeRestCatalogLoadConfig;
Expand Down Expand Up @@ -141,6 +150,218 @@ String encodeNamespaceForURI(const String & namespace_name)

}

namespace
{
Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj)
{
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
obj->stringify(oss);

Poco::JSON::Parser parser;
return parser.parse(oss.str()).extract<Poco::JSON::Object::Ptr>();
}

void collectSchemaFieldIdsFromFields(const Poco::JSON::Array::Ptr & fields, std::unordered_set<Int32> & ids)
{
if (!fields)
return;

for (UInt32 i = 0; i < fields->size(); ++i)
{
auto field = fields->getObject(i);
ids.insert(field->getValue<Int32>(DB::Iceberg::f_id));
if (field->has(DB::Iceberg::f_fields))
collectSchemaFieldIdsFromFields(field->getArray(DB::Iceberg::f_fields), ids);
}
}

/// Returns true when the table default sort order references a column absent from the new schema.
bool sortOrderIncompatibleWithSchema(
const Poco::JSON::Object::Ptr & metadata, const Poco::JSON::Object::Ptr & new_schema_obj)
{
if (!metadata->has(DB::Iceberg::f_sort_orders) || !metadata->has(DB::Iceberg::f_default_sort_order_id))
return false;

const Int64 default_sort_order_id = metadata->getValue<Int64>(DB::Iceberg::f_default_sort_order_id);
auto sort_orders = metadata->getArray(DB::Iceberg::f_sort_orders);

Poco::JSON::Object::Ptr default_sort_order;
for (UInt32 i = 0; i < sort_orders->size(); ++i)
{
auto sort_order = sort_orders->getObject(i);
if (sort_order->getValue<Int64>(DB::Iceberg::f_order_id) == default_sort_order_id)
{
default_sort_order = sort_order;
break;
}
}

if (!default_sort_order || !default_sort_order->has(DB::Iceberg::f_fields))
return false;

auto sort_fields = default_sort_order->getArray(DB::Iceberg::f_fields);
if (sort_fields->size() == 0)
return false;

std::unordered_set<Int32> new_schema_field_ids;
if (new_schema_obj->has(DB::Iceberg::f_fields))
collectSchemaFieldIdsFromFields(new_schema_obj->getArray(DB::Iceberg::f_fields), new_schema_field_ids);

for (UInt32 i = 0; i < sort_fields->size(); ++i)
{
auto field = sort_fields->getObject(i);
if (!field->has(DB::Iceberg::f_source_id))
continue;

const Int32 source_id = field->getValue<Int32>(DB::Iceberg::f_source_id);
if (!new_schema_field_ids.contains(source_id))
return true;
}

return false;
}
}

Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
const String & namespace_name, const String & table_name, Poco::JSON::Object::Ptr new_snapshot)
{

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existing code moved to a separate function.

if (!new_snapshot)
return nullptr;

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

// Schema-change commit path (ALTER TABLE add/drop/modify/rename column).
if (new_snapshot->has(DB::Iceberg::f_schemas))
{
if (!new_snapshot->has(DB::Iceberg::f_current_schema_id))
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
"Iceberg update-metadata for {}.{} is missing '{}' field",
namespace_name, table_name, DB::Iceberg::f_current_schema_id);

const Int32 new_schema_id = new_snapshot->getValue<Int32>(DB::Iceberg::f_current_schema_id);
const Int32 old_schema_id = new_schema_id - 1;

Poco::JSON::Object::Ptr new_schema_obj;
auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas);
for (UInt32 i = 0; i < schemas->size(); ++i)
{
auto s = schemas->getObject(i);
if (s->getValue<Int32>(DB::Iceberg::f_schema_id) == new_schema_id)
{
new_schema_obj = s;
break;
}
}
if (!new_schema_obj)
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
"Iceberg update-metadata for {}.{}: no schema object matching current-schema-id={}",
namespace_name, table_name, new_schema_id);

Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj);
if (!schema_for_rest->has("identifier-field-ids"))
{
Poco::JSON::Array::Ptr empty_identifier_field_ids = new Poco::JSON::Array;
schema_for_rest->set("identifier-field-ids", empty_identifier_field_ids);
}

if (old_schema_id >= 0)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-current-schema-id");
requirement->set("current-schema-id", old_schema_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);
request_body->set("requirements", requirements);
}

Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
{
Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object;
add_schema->set("action", "add-schema");
add_schema->set("schema", schema_for_rest);
if (new_snapshot->has(DB::Iceberg::f_last_column_id))
add_schema->set("last-column-id", new_snapshot->getValue<Int32>(DB::Iceberg::f_last_column_id));
updates->add(add_schema);
}
{
Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object;
set_current_schema->set("action", "set-current-schema");
// Iceberg REST spec: schema-id == -1 means "the last added schema".
// The catalog assigns schema ids itself and may reuse an existing id when
// the new schema is identical, so we must not assume our locally computed id.
set_current_schema->set("schema-id", -1);
updates->add(set_current_schema);
}
if (sortOrderIncompatibleWithSchema(new_snapshot, new_schema_obj))
{
Poco::JSON::Object::Ptr unsorted_sort_order = new Poco::JSON::Object;
unsorted_sort_order->set(DB::Iceberg::f_order_id, 0);
unsorted_sort_order->set(DB::Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array));

Poco::JSON::Object::Ptr add_sort_order = new Poco::JSON::Object;
add_sort_order->set("action", "add-sort-order");
add_sort_order->set("sort-order", unsorted_sort_order);
updates->add(add_sort_order);

Poco::JSON::Object::Ptr set_default_sort_order = new Poco::JSON::Object;
set_default_sort_order->set("action", "set-default-sort-order");
set_default_sort_order->set("sort-order-id", -1);
updates->add(set_default_sort_order);
}
request_body->set("updates", updates);
}
else
{
// Snapshot-append commit path (INSERT / position-delete mutation).
if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);
request_body->set("requirements", requirements);
}
}

Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}
{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));
updates->add(set_snapshot);
}
request_body->set("updates", updates);
}

return request_body;
}

std::string RestCatalog::Config::toString() const
{
DB::WriteBufferFromOwnString wb;
Expand Down Expand Up @@ -1140,66 +1361,25 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const
{
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name;

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);

request_body->set("requirements", requirements);
}
}

{
Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
fiu_do_on(DB::FailPoints::iceberg_alter_catalog_update_metadata_fail, { return false; });

{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}

{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));
const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name;

updates->add(set_snapshot);
}
request_body->set("updates", updates);
}
// Throws DB::Exception(DATALAKE_DATABASE_ERROR) on malformed metadata (programming error).
auto request_body = buildUpdateMetadataRequestBody(namespace_name, table_name, new_snapshot);
if (!request_body)
return true; // nothing to commit

try
{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogUpdateTable);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::DataLakeRestCatalogUpdateTableMicroseconds);
sendRequest(endpoint, request_body);
}
catch (const DB::HTTPException &)
catch (const DB::HTTPException & ex)
{
LOG_WARNING(log, "Iceberg REST updateMetadata for {}.{} failed: {}",
namespace_name, table_name, ex.displayText());
return false;
}
return true;
Expand Down
10 changes: 10 additions & 0 deletions src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ struct AccessToken
}
};

/// Builds the JSON body for `POST .../namespaces/{ns}/tables/{table}` (Iceberg REST update).
///
/// Returns `nullptr` when `new_snapshot` is null (nothing to commit). Throws
/// `DB::Exception(DATALAKE_DATABASE_ERROR)` with a specific message when the metadata
/// blob is malformed (e.g. missing `current-schema-id`, no schema object matching it).
Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
const String & namespace_name,
const String & table_name,
Poco::JSON::Object::Ptr new_snapshot);

class RestCatalog : public ICatalog, public DB::WithContext
{
public:
Expand Down
Loading
Loading