Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/en/antalya/part_export.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ In case a table function is used as the destination, the schema can be omitted a
- **Default**: `{part_name}_{checksum}`
- **Description**: Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported.

### `export_merge_tree_part_allow_lossy_cast` (Optional)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Allow `EXPORT PART`/`EXPORT PARTITION` to apply lossy (non-value-preserving) casts when the source and destination column types differ. When disabled, an export that would require a lossy cast throws instead.

When exporting to Apache Iceberg, the partition value written to the metadata is derived from the source partition columns by casting them to the destination partition-field types and applying the destination partition transform — the same computation the exported data files use. This keeps the Iceberg metadata consistent with the data files.

**Warning:** A lossy cast on a partition column remains semantically truncating. For example, if a table is partitioned by an `Int64` column and some partition values do not fit into a destination `Int32` partition column, both the data files and the Iceberg metadata will contain the truncated `Int32` value (they agree with each other, but the original `Int64` value is lost). Such casts require `export_merge_tree_part_allow_lossy_cast = 1`.


## Examples

Expand Down
10 changes: 10 additions & 0 deletions docs/en/antalya/partition_export.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ Notes:
- Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation.
- Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires.

### `export_merge_tree_part_allow_lossy_cast` (Optional)

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Allow `EXPORT PART`/`EXPORT PARTITION` to apply lossy (non-value-preserving) casts when the source and destination column types differ. When disabled, an export that would require a lossy cast throws instead.

When exporting to Apache Iceberg, the partition value written to the metadata is derived from the source partition columns by casting them to the destination partition-field types and applying the destination partition transform — the same computation the exported data files use. This keeps the Iceberg metadata consistent with the data files.

**Warning:** A lossy cast on a partition column remains semantically truncating. For example, if a table is partitioned by an `Int64` column and some partition values do not fit into a destination `Int32` partition column, both the data files and the Iceberg metadata will contain the truncated `Int32` value (they agree with each other, but the original `Int64` value is lost). Such casts require `export_merge_tree_part_allow_lossy_cast = 1`.

## Examples

### Basic Export to S3
Expand Down
5 changes: 5 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7605,6 +7605,11 @@ Has no effect on `EXPORT PARTITION <id>` (single-partition export).
)", 0) \
DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"(
Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported.
)", 0) \
DECLARE(Bool, export_merge_tree_part_allow_lossy_cast, false, R"(
Allow `EXPORT PART`/`EXPORT PARTITION` to apply lossy (non-value-preserving) casts when the source and destination column types differ. When disabled, an export that would require a lossy cast throws instead.

When exporting to Apache Iceberg, the partition value written to the metadata is derived from the source partition columns by casting them to the destination partition-field types and applying the destination partition transform — the same computation the exported data files use, so the metadata stays consistent with the data. A lossy cast on a partition column remains semantically truncating: both the data files and the metadata contain the truncated value, and such casts require this setting to be enabled.
)", 0) \
\
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"},
{"export_merge_tree_part_allow_lossy_cast", false, false, "New setting to gate lossy casts in EXPORT PART/PARTITION behind explicit acknowledgment"},
});
addSettingsChanges(settings_changes_history, "26.3",
{
Expand Down
7 changes: 7 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ struct ExportReplicatedMergeTreePartitionManifest
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
String filename_pattern;
bool write_full_path_in_iceberg_metadata = false;
bool allow_lossy_cast = false;
String iceberg_metadata_json;

std::string toJsonString() const
Expand Down Expand Up @@ -208,6 +209,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("ttl_seconds", ttl_seconds);
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
json.set("allow_lossy_cast", allow_lossy_cast);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -262,6 +264,11 @@ struct ExportReplicatedMergeTreePartitionManifest

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

/// Default to true for tasks created before this field existed, so an in-flight
/// export scheduled with the old permissive worker behavior is not wrongly rejected
/// on upgrade. New tasks always persist the initiator's actual choice.
manifest.allow_lossy_cast = json->has("allow_lossy_cast") ? json->getValue<bool>("allow_lossy_cast") : true;

return manifest;
}
};
Expand Down
10 changes: 5 additions & 5 deletions src/Storages/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,11 @@ It is currently only implemented in StorageObjectStorage.
struct IcebergCommitExportPartitionArguments
{
std::string metadata_json_string;
/// Partition column values (after transforms). Callers are responsible for
/// populating this: the partition-export path parses them from the persisted
/// JSON string, while the direct EXPORT PART path reads them from the part's
/// partition key.
std::vector<Field> partition_values;
/// Representative source partition-key columns from one exported part (the part's
/// minmax block). The destination derives the Iceberg partition tuple from a row of
/// this block by casting to the destination column types and applying the partition
/// transform, so the metadata partition value matches the exported data files.
Block partition_source_block;
};

virtual void commitExportPartitionTransaction(
Expand Down
31 changes: 30 additions & 1 deletion src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,31 @@ namespace
}
}

/// Mirrors `InterpreterInsertQuery::addInsertToSelectPipeline`: positional match,
/// destination header = `getSampleBlockNonMaterialized()`, all type bridging is done
/// by the CAST inside `makeConvertingActions`. No pre-validation, no per-column
/// lossy/non-lossy classification — restrictions are exactly what INSERT SELECT enforces.
void addExportConvertingActions(
QueryPlan & plan_for_part,
const IStorage & destination_storage,
const ContextPtr & local_context)
{
const auto destination_header
= destination_storage.getInMemoryMetadataPtr()->getSampleBlockNonMaterialized();

auto dag = ActionsDAG::makeConvertingActions(
plan_for_part.getCurrentHeader()->getColumnsWithTypeAndName(),
destination_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position,
local_context);

auto expression_step = std::make_unique<ExpressionStep>(
plan_for_part.getCurrentHeader(),
std::move(dag));
expression_step->setStepDescription("Convert source columns to destination types for export");
plan_for_part.addStep(std::move(expression_step));
}

String buildDestinationFilename(
const MergeTreePartExportManifest & manifest,
const StorageID & storage_id,
Expand Down Expand Up @@ -261,6 +286,10 @@ bool ExportPartTask::executeStep()
/// This is a hack that materializes the columns before the export so they can be exported to tables that have matching columns
materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part);

/// Align the pipeline header with the destination's non-materialized sample block,
/// using the same `makeConvertingActions(Position)` call INSERT SELECT performs.
addExportConvertingActions(plan_for_part, *destination_storage, local_context);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Convert partition values before creating the import sink

When the destination is partitioned object storage and the partition column type changes in a castable way (for example source year String, destination year UInt32, both PARTITION BY year), this new row conversion happens only after destination_storage->import has already computed the S3/Hive partition path from block_with_partition_values using the source part's minmax block. That means a schema pair now accepted by the synchronous validation can either fail inside partition-key evaluation with the destination actions seeing source-typed inputs, or write files under a path based on the uncast source value while the exported rows were cast, so the export is not equivalent to INSERT SELECT.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Convert exported partition values as well

When the partition column itself needs a cast (for example a source UInt16 partition key exported to an Iceberg table whose partition column is String, which canBeSafelyCast accepts without opt-in), this step only converts the row stream. StorageObjectStorage::import still receives the source-typed block_with_partition_values, and the Iceberg commit paths still use the source part's partition.value, so the data files are written with destination types while the object path/manifest partition value remains source-typed; for Iceberg this can fail at commit or record an incorrectly typed partition. Please apply the same conversion to the partition values used for import/commit whenever partition-key columns are cast.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is expected, at least for now.


QueryPlanOptimizationSettings optimization_settings(local_context);
auto pipeline_settings = BuildQueryPipelineSettings(local_context);
auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings);
Expand Down Expand Up @@ -303,7 +332,7 @@ bool ExportPartTask::executeStep()
{
IStorage::IcebergCommitExportPartitionArguments iceberg_args;
iceberg_args.metadata_json_string = manifest.iceberg_metadata_json;
iceberg_args.partition_values = manifest.data_part->partition.value;
iceberg_args.partition_source_block = block_with_partition_values;

destination_storage->commitExportPartitionTransaction(
manifest.transaction_id,
Expand Down
71 changes: 65 additions & 6 deletions src/Storages/MergeTree/ExportPartitionUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <filesystem>
#include <thread>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <DataTypes/Utils.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>

#if USE_AVRO
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
Expand Down Expand Up @@ -37,6 +42,11 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
}

namespace Setting
{
extern const SettingsBool export_merge_tree_part_allow_lossy_cast;
}

namespace FailPoints
{
extern const char iceberg_export_after_commit_before_zk_completed[];
Expand All @@ -47,22 +57,21 @@ namespace fs = std::filesystem;

namespace ExportPartitionUtils
{
std::vector<Field> getPartitionValuesForIcebergCommit(
Block getPartitionSourceBlockForIcebergCommit(
MergeTreeData & storage, const String & partition_id)
{
auto lock = storage.readLockParts();
const auto parts = storage.getDataPartsVectorInPartitionForInternalUsage(
MergeTreeDataPartState::Active, partition_id, lock);

/// todo arthur: bad arguments for now, pick a better one

if (parts.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART,
"Cannot find active part for partition_id '{}' to derive Iceberg partition "
"values. Edge case: the partition may have been dropped after export started, "
"or this replica has not yet received any part for this partition. "
"The commit will be retried.",
partition_id);
return parts.front()->partition.value;
return parts.front()->minmax_idx->getBlock(storage);
}

ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest)
Expand Down Expand Up @@ -92,6 +101,12 @@ namespace ExportPartitionUtils
/// stalls when the setting is only set at the query level.
context_copy->setSetting("allow_insert_into_iceberg", true);

/// Reapply the initiator's lossy-cast decision (persisted in the manifest) so the
/// worker's schema revalidation honors the user's choice. Without this, a task
/// scheduled without the opt-in could still apply a lossy cast if the destination
/// schema drifts to a lossy target between scheduling and execution.
context_copy->setSetting("export_merge_tree_part_allow_lossy_cast", manifest.allow_lossy_cast);

return context_copy;
}

Expand Down Expand Up @@ -204,8 +219,8 @@ namespace ExportPartitionUtils
{
iceberg_args.metadata_json_string = manifest.iceberg_metadata_json;
if (source_storage.getInMemoryMetadataPtr()->hasPartitionKey())
iceberg_args.partition_values =
getPartitionValuesForIcebergCommit(source_storage, manifest.partition_id);
iceberg_args.partition_source_block =
getPartitionSourceBlockForIcebergCommit(source_storage, manifest.partition_id);
}

destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context);
Expand Down Expand Up @@ -519,6 +534,50 @@ namespace ExportPartitionUtils
}
}
#endif

void verifyExportSchemaCastable(
const StorageMetadataPtr & source_metadata,
const StorageMetadataPtr & destination_metadata,
const StorageID & destination_storage_id,
const ContextPtr & context)
{
/// Build (and discard) the same converting DAG the export worker will build
/// later, to surface structural mismatches (column count, untyped casts) early.
Block source_sample_block;
for (const auto & column : source_metadata->getColumns().getReadable())
source_sample_block.insert({column.type->createColumn(), column.type, column.name});

const auto destination_sample_block = destination_metadata->getSampleBlockNonMaterialized();

const auto source_columns = source_sample_block.getColumnsWithTypeAndName();
const auto destination_columns = destination_sample_block.getColumnsWithTypeAndName();

(void) ActionsDAG::makeConvertingActions(
source_columns,
destination_columns,
ActionsDAG::MatchColumnsMode::Position,
context);

/// Lossy casts may silently change values, so reject them unless the user opts in.
if (context->getSettingsRef()[Setting::export_merge_tree_part_allow_lossy_cast])
return;

const size_t num_columns = std::min(source_columns.size(), destination_columns.size());
for (size_t i = 0; i < num_columns; ++i)
{
const auto & source_column = source_columns[i];
const auto & destination_column = destination_columns[i];
if (!canBeSafelyCast(source_column.type, destination_column.type))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot export to {}: column '{}' requires a lossy cast from {} to {}, "
"which may change values. Set `export_merge_tree_part_allow_lossy_cast = 1` "
"to allow lossy casts during export.",
destination_storage_id.getFullTableName(),
destination_column.name,
source_column.type->getName(),
destination_column.type->getName());
}
}
}

}
24 changes: 18 additions & 6 deletions src/Storages/MergeTree/ExportPartitionUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Common/Logger.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "Storages/IStorage.h"
#include <Storages/StorageInMemoryMetadata.h>
#include <config.h>

#if USE_AVRO
Expand All @@ -26,14 +27,15 @@ namespace ExportPartitionUtils

ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest);

/// Returns the partition key values for the given partition_id by reading from
/// the first active local part. Throws LOGICAL_ERROR if no such part is found.
/// Returns the representative source partition-key columns (the first active local part's
/// minmax block) for the given partition_id. The destination recomputes the Iceberg partition
/// tuple from this block by casting to its column types and applying the partition transform.
///
/// Edge case: if the partition was dropped after export started, or this replica
/// has not yet received any part for this partition (extreme replication lag on a
/// recovery path), no active part will be found and the commit will fail. The task
/// will be retried on the next poll cycle or picked up by a different replica.
std::vector<Field> getPartitionValuesForIcebergCommit(
Block getPartitionSourceBlockForIcebergCommit(
MergeTreeData & storage, const String & partition_id);

void commit(
Expand Down Expand Up @@ -88,10 +90,20 @@ namespace ExportPartitionUtils
const std::string & exception_message,
const LoggerPtr & log);

/// Validates that source columns can be exported into the destination with the
/// same positional CAST matching as `INSERT INTO dest SELECT * FROM src`. Lossy
/// casts are rejected unless `export_merge_tree_part_allow_lossy_cast` is set.
/// Throws BAD_ARGUMENTS on any violation.
void verifyExportSchemaCastable(
const StorageMetadataPtr & source_metadata,
const StorageMetadataPtr & destination_metadata,
const StorageID & destination_storage_id,
const ContextPtr & context);

#if USE_AVRO
/// Verifies that the source MergeTree partition key is compatible with the
/// destination Iceberg partition spec by comparing field source-ids and
/// transforms in order. Throws BAD_ARGUMENTS if they do not match.
/// Verifies the source MergeTree partition key matches the destination Iceberg
/// partition spec (source-ids and transforms in order). Throws BAD_ARGUMENTS on
/// mismatch.
void verifyIcebergPartitionCompatibility(
const Poco::JSON::Object::Ptr & metadata_object,
const ASTPtr & partition_key_ast);
Expand Down
Loading
Loading