diff --git a/docs/en/antalya/part_export.md b/docs/en/antalya/part_export.md index 03f9f479991b..73d467c5d9b1 100644 --- a/docs/en/antalya/part_export.md +++ b/docs/en/antalya/part_export.md @@ -107,6 +107,16 @@ In case a table function is used as the destination, the schema can be omitted a - **Default**: `{part_name}_{checksum}` - **Description**: Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported. +### `export_merge_tree_part_allow_lossy_cast` (Optional) + +- **Type**: `Bool` +- **Default**: `false` +- **Description**: Allow `EXPORT PART`/`EXPORT PARTITION` to apply lossy (non-value-preserving) casts when the source and destination column types differ. When disabled, an export that would require a lossy cast throws instead. + + When exporting to Apache Iceberg, the partition value written to the metadata is derived from the source partition columns by casting them to the destination partition-field types and applying the destination partition transform — the same computation the exported data files use. This keeps the Iceberg metadata consistent with the data files. + + **Warning:** A lossy cast on a partition column remains semantically truncating. For example, if a table is partitioned by an `Int64` column and some partition values do not fit into a destination `Int32` partition column, both the data files and the Iceberg metadata will contain the truncated `Int32` value (they agree with each other, but the original `Int64` value is lost). Such casts require `export_merge_tree_part_allow_lossy_cast = 1`. 
+ ## Examples diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md index 975915859482..bb17a57f87cd 100644 --- a/docs/en/antalya/partition_export.md +++ b/docs/en/antalya/partition_export.md @@ -111,6 +111,16 @@ Notes: - Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation. - Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires. +### `export_merge_tree_part_allow_lossy_cast` (Optional) + +- **Type**: `Bool` +- **Default**: `false` +- **Description**: Allow `EXPORT PART`/`EXPORT PARTITION` to apply lossy (non-value-preserving) casts when the source and destination column types differ. When disabled, an export that would require a lossy cast throws instead. + + When exporting to Apache Iceberg, the partition value written to the metadata is derived from the source partition columns by casting them to the destination partition-field types and applying the destination partition transform — the same computation the exported data files use. This keeps the Iceberg metadata consistent with the data files. + + **Warning:** A lossy cast on a partition column remains semantically truncating. For example, if a table is partitioned by an `Int64` column and some partition values do not fit into a destination `Int32` partition column, both the data files and the Iceberg metadata will contain the truncated `Int32` value (they agree with each other, but the original `Int64` value is lost). Such casts require `export_merge_tree_part_allow_lossy_cast = 1`. 
+ ## Examples ### Basic Export to S3 diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index ba022db0f83a..d905075c5b71 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7605,6 +7605,11 @@ Has no effect on `EXPORT PARTITION ` (single-partition export). )", 0) \ DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"( Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported. +)", 0) \ + DECLARE(Bool, export_merge_tree_part_allow_lossy_cast, false, R"( +Allow `EXPORT PART`/`EXPORT PARTITION` to apply lossy (non-value-preserving) casts when the source and destination column types differ. When disabled, an export that would require a lossy cast throws instead. + +When exporting to Apache Iceberg, the partition value written to the metadata is derived from the source partition columns by casting them to the destination partition-field types and applying the destination partition transform — the same computation the exported data files use, so the metadata stays consistent with the data. A lossy cast on a partition column remains semantically truncating: both the data files and the metadata contain the truncated value, and such casts require this setting to be enabled. 
)", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 50cc8c572561..3a72ec667415 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -43,6 +43,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() { {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"export_merge_tree_partition_task_timeout_seconds", "3600", "86400", "Increase default value to make it more realistic"}, + {"export_merge_tree_part_allow_lossy_cast", false, false, "New setting to gate lossy casts in EXPORT PART/PARTITION behind explicit acknowledgment"}, }); addSettingsChanges(settings_changes_history, "26.3", { diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index acfabc28ca61..db26836bdb2e 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -174,6 +174,7 @@ struct ExportReplicatedMergeTreePartitionManifest MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; String filename_pattern; bool write_full_path_in_iceberg_metadata = false; + bool allow_lossy_cast = false; String iceberg_metadata_json; std::string toJsonString() const @@ -208,6 +209,7 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("ttl_seconds", ttl_seconds); json.set("task_timeout_seconds", task_timeout_seconds); json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); + json.set("allow_lossy_cast", allow_lossy_cast); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); Poco::JSON::Stringifier::stringify(json, oss); @@ -262,6 +264,11 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.write_full_path_in_iceberg_metadata = 
json->getValue("write_full_path_in_iceberg_metadata"); + /// Default to true for tasks created before this field existed, so an in-flight + /// export scheduled with the old permissive worker behavior is not wrongly rejected + /// on upgrade. New tasks always persist the initiator's actual choice. + manifest.allow_lossy_cast = json->has("allow_lossy_cast") ? json->getValue("allow_lossy_cast") : true; + return manifest; } }; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0a07b892cef8..93caf3f65ebe 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -504,11 +504,11 @@ It is currently only implemented in StorageObjectStorage. struct IcebergCommitExportPartitionArguments { std::string metadata_json_string; - /// Partition column values (after transforms). Callers are responsible for - /// populating this: the partition-export path parses them from the persisted - /// JSON string, while the direct EXPORT PART path reads them from the part's - /// partition key. - std::vector partition_values; + /// Representative source partition-key columns from one exported part (the part's + /// minmax block). The destination derives the Iceberg partition tuple from a row of + /// this block by casting to the destination column types and applying the partition + /// transform, so the metadata partition value matches the exported data files. + Block partition_source_block; }; virtual void commitExportPartitionTransaction( diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index 94b25f63ad9f..e626e7364eaa 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -99,6 +99,31 @@ namespace } } + /// Mirrors `InterpreterInsertQuery::addInsertToSelectPipeline`: positional match, + /// destination header = `getSampleBlockNonMaterialized()`, all type bridging is done + /// by the CAST inside `makeConvertingActions`. 
This step itself does no per-column + /// lossy/non-lossy classification; the lossy-cast gate runs earlier, in `verifyExportSchemaCastable`. + void addExportConvertingActions( + QueryPlan & plan_for_part, + const IStorage & destination_storage, + const ContextPtr & local_context) + { + const auto destination_header + = destination_storage.getInMemoryMetadataPtr()->getSampleBlockNonMaterialized(); + + auto dag = ActionsDAG::makeConvertingActions( + plan_for_part.getCurrentHeader()->getColumnsWithTypeAndName(), + destination_header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Position, + local_context); + + auto expression_step = std::make_unique( + plan_for_part.getCurrentHeader(), + std::move(dag)); + expression_step->setStepDescription("Convert source columns to destination types for export"); + plan_for_part.addStep(std::move(expression_step)); + } + String buildDestinationFilename( const MergeTreePartExportManifest & manifest, const StorageID & storage_id, @@ -261,6 +286,10 @@ bool ExportPartTask::executeStep() /// This is a hack that materializes the columns before the export so they can be exported to tables that have matching columns materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part); + /// Align the pipeline header with the destination's non-materialized sample block, + /// using the same `makeConvertingActions(Position)` call INSERT SELECT performs. 
+ addExportConvertingActions(plan_for_part, *destination_storage, local_context); + QueryPlanOptimizationSettings optimization_settings(local_context); auto pipeline_settings = BuildQueryPipelineSettings(local_context); auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); @@ -303,7 +332,7 @@ bool ExportPartTask::executeStep() { IStorage::IcebergCommitExportPartitionArguments iceberg_args; iceberg_args.metadata_json_string = manifest.iceberg_metadata_json; - iceberg_args.partition_values = manifest.data_part->partition.value; + iceberg_args.partition_source_block = block_with_partition_values; destination_storage->commitExportPartitionTransaction( manifest.transaction_id, diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 4a447c8de3ab..679e07d0b132 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -9,7 +9,12 @@ #include #include #include +#include +#include +#include +#include #include +#include #if USE_AVRO #include @@ -37,6 +42,11 @@ namespace ErrorCodes extern const int NETWORK_ERROR; } +namespace Setting +{ + extern const SettingsBool export_merge_tree_part_allow_lossy_cast; +} + namespace FailPoints { extern const char iceberg_export_after_commit_before_zk_completed[]; @@ -47,14 +57,13 @@ namespace fs = std::filesystem; namespace ExportPartitionUtils { - std::vector getPartitionValuesForIcebergCommit( + Block getPartitionSourceBlockForIcebergCommit( MergeTreeData & storage, const String & partition_id) { auto lock = storage.readLockParts(); const auto parts = storage.getDataPartsVectorInPartitionForInternalUsage( MergeTreeDataPartState::Active, partition_id, lock); - - /// todo arthur: bad arguments for now, pick a better one + if (parts.empty()) throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Cannot find active part for partition_id '{}' to derive Iceberg partition " @@ -62,7 +71,7 @@ 
namespace ExportPartitionUtils "or this replica has not yet received any part for this partition. " "The commit will be retried.", partition_id); - return parts.front()->partition.value; + return parts.front()->minmax_idx->getBlock(storage); } ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest) @@ -92,6 +101,12 @@ namespace ExportPartitionUtils /// stalls when the setting is only set at the query level. context_copy->setSetting("allow_insert_into_iceberg", true); + /// Reapply the initiator's lossy-cast decision (persisted in the manifest) so the + /// worker's schema revalidation honors the user's choice. Without this, a task + /// scheduled without the opt-in could still apply a lossy cast if the destination + /// schema drifts to a lossy target between scheduling and execution. + context_copy->setSetting("export_merge_tree_part_allow_lossy_cast", manifest.allow_lossy_cast); + return context_copy; } @@ -204,8 +219,8 @@ namespace ExportPartitionUtils { iceberg_args.metadata_json_string = manifest.iceberg_metadata_json; if (source_storage.getInMemoryMetadataPtr()->hasPartitionKey()) - iceberg_args.partition_values = - getPartitionValuesForIcebergCommit(source_storage, manifest.partition_id); + iceberg_args.partition_source_block = + getPartitionSourceBlockForIcebergCommit(source_storage, manifest.partition_id); } destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, iceberg_args, context); @@ -519,6 +534,50 @@ namespace ExportPartitionUtils } } #endif + + void verifyExportSchemaCastable( + const StorageMetadataPtr & source_metadata, + const StorageMetadataPtr & destination_metadata, + const StorageID & destination_storage_id, + const ContextPtr & context) + { + /// Build (and discard) the same converting DAG the export worker will build + /// later, to surface structural mismatches (column count, untyped casts) early. 
+ Block source_sample_block; + for (const auto & column : source_metadata->getColumns().getReadable()) + source_sample_block.insert({column.type->createColumn(), column.type, column.name}); + + const auto destination_sample_block = destination_metadata->getSampleBlockNonMaterialized(); + + const auto source_columns = source_sample_block.getColumnsWithTypeAndName(); + const auto destination_columns = destination_sample_block.getColumnsWithTypeAndName(); + + (void) ActionsDAG::makeConvertingActions( + source_columns, + destination_columns, + ActionsDAG::MatchColumnsMode::Position, + context); + + /// Lossy casts may silently change values, so reject them unless the user opts in. + if (context->getSettingsRef()[Setting::export_merge_tree_part_allow_lossy_cast]) + return; + + const size_t num_columns = std::min(source_columns.size(), destination_columns.size()); + for (size_t i = 0; i < num_columns; ++i) + { + const auto & source_column = source_columns[i]; + const auto & destination_column = destination_columns[i]; + if (!canBeSafelyCast(source_column.type, destination_column.type)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Cannot export to {}: column '{}' requires a lossy cast from {} to {}, " + "which may change values. 
Set `export_merge_tree_part_allow_lossy_cast = 1` " + "to allow lossy casts during export.", + destination_storage_id.getFullTableName(), + destination_column.name, + source_column.type->getName(), + destination_column.type->getName()); + } + } } } diff --git a/src/Storages/MergeTree/ExportPartitionUtils.h b/src/Storages/MergeTree/ExportPartitionUtils.h index eb67d288d71e..0434bc59a2cb 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.h +++ b/src/Storages/MergeTree/ExportPartitionUtils.h @@ -7,6 +7,7 @@ #include #include #include "Storages/IStorage.h" +#include #include #if USE_AVRO @@ -26,14 +27,15 @@ namespace ExportPartitionUtils ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest); - /// Returns the partition key values for the given partition_id by reading from - /// the first active local part. Throws LOGICAL_ERROR if no such part is found. + /// Returns the representative source partition-key columns (the first active local part's + /// minmax block) for the given partition_id. The destination recomputes the Iceberg partition + /// tuple from this block by casting to its column types and applying the partition transform. /// /// Edge case: if the partition was dropped after export started, or this replica /// has not yet received any part for this partition (extreme replication lag on a /// recovery path), no active part will be found and the commit will fail. The task /// will be retried on the next poll cycle or picked up by a different replica. 
- std::vector getPartitionValuesForIcebergCommit( + Block getPartitionSourceBlockForIcebergCommit( MergeTreeData & storage, const String & partition_id); void commit( @@ -88,10 +90,20 @@ namespace ExportPartitionUtils const std::string & exception_message, const LoggerPtr & log); + /// Validates that source columns can be exported into the destination with the + /// same positional CAST matching as `INSERT INTO dest SELECT * FROM src`. Lossy + /// casts are rejected with BAD_ARGUMENTS unless `export_merge_tree_part_allow_lossy_cast` is set. + /// Structural mismatches (e.g. column count) throw from `ActionsDAG::makeConvertingActions`. + void verifyExportSchemaCastable( + const StorageMetadataPtr & source_metadata, + const StorageMetadataPtr & destination_metadata, + const StorageID & destination_storage_id, + const ContextPtr & context); + #if USE_AVRO - /// Verifies that the source MergeTree partition key is compatible with the - /// destination Iceberg partition spec by comparing field source-ids and - /// transforms in order. Throws BAD_ARGUMENTS if they do not match. + /// Verifies the source MergeTree partition key matches the destination Iceberg + /// partition spec (source-ids and transforms in order). Throws BAD_ARGUMENTS on + /// mismatch. 
void verifyIcebergPartitionCompatibility( const Poco::JSON::Object::Ptr & metadata_object, const ASTPtr & partition_key_ast); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a07bd992066e..937c675fab2f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -6742,7 +6743,9 @@ void MergeTreeData::exportPartToTable( metadata_object->stringify(oss); iceberg_metadata_json = oss.str(); - ExportPartitionUtils::verifyIcebergPartitionCompatibility(metadata_object, source_metadata_ptr->getPartitionKeyAST()); + ExportPartitionUtils::verifyIcebergPartitionCompatibility( + metadata_object, + source_metadata_ptr->getPartitionKeyAST()); } #else (void)iceberg_metadata_json_; @@ -6750,17 +6753,12 @@ void MergeTreeData::exportPartToTable( #endif } - const auto & source_columns = source_metadata_ptr->getColumns(); + /// Positional CAST matching, like `INSERT INTO dest SELECT * FROM src`. + ExportPartitionUtils::verifyExportSchemaCastable( + source_metadata_ptr, destination_metadata_ptr, dest_storage->getStorageID(), query_context); - const auto & destination_columns = destination_metadata_ptr->getColumns(); - - /// compare all source readable columns with all destination insertable columns - /// this allows us to skip ephemeral columns - if (source_columns.getReadable().sizeOfDifference(destination_columns.getInsertable())) - throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); - - /// for data lakes this check is performed differently. It is a bit more complex as we need to convert the iceberg partition spec - /// to the MergeTree partition spec and compare the two. + /// Iceberg partition compatibility is checked above; here we only need the + /// partition-key ASTs to match (partition-column types follow the lossy-cast gate). 
if (!dest_storage->isDataLake()) { if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST())) diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 85c91ba5de19..a90c49b35210 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -226,7 +226,7 @@ class IDataLakeMetadata : boost::noncopyable const String & /* transaction_id */, Int64 /* original_schema_id */, Int64 /* partition_spec_id */, - const std::vector & /* partition_values */, + const Block & /* partition_source_block */, SharedHeader /* sample_block */, const std::vector & /* data_file_paths */, ContextPtr /* context */) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 6016be1cd341..9819a8783516 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -1524,6 +1525,57 @@ Poco::JSON::Object::Ptr lookupSchema(const Poco::JSON::Object::Ptr & meta, Int64 "Schema with id {} not found in table metadata", schema_id); } +/// Derive the Iceberg partition tuple for an exported part from a representative source row. +/// The MergeTree `partition.value` is the source partition-key expression result; it is neither +/// cast to the destination column types nor expressed through the Iceberg transform, so it must +/// not be written to metadata directly. Within a MergeTree partition the transform result is +/// constant, so a single representative value per partition-source column (taken from the part's +/// minmax block) suffices: cast it to the destination column type and run the same transform the +/// data uses. 
The result is transform-correct and consistent with the exported data files. +std::vector recomputeExportPartitionValues( + ChunkPartitioner & partitioner, + const SharedHeader & sample_block, + const Block & partition_source_block) +{ + const auto & partition_columns = partitioner.getColumns(); + if (partition_columns.empty()) + return {}; + + for (const auto & column_name : partition_columns) + if (!partition_source_block.has(column_name)) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Partition source column '{}' required by the Iceberg partition transform is missing " + "from the representative source block while committing an export.", column_name); + + Columns columns; + columns.reserve(sample_block->columns()); + for (size_t i = 0; i < sample_block->columns(); ++i) + { + const auto & dest_column = sample_block->getByPosition(i); + if (partition_source_block.has(dest_column.name)) + { + const auto & source = partition_source_block.getByName(dest_column.name); + ColumnWithTypeAndName representative{source.column->cut(0, 1), source.type, source.name}; + columns.push_back(castColumn(representative, dest_column.type)); + } + else + { + auto column = dest_column.type->createColumn(); + column->insertDefault(); + columns.push_back(std::move(column)); + } + } + + auto partitioned = partitioner.partitionChunk(Chunk(std::move(columns), 1)); + if (partitioned.size() != 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Recomputing Iceberg partition values produced {} partitions for a single representative row; " + "a MergeTree partition must map to exactly one Iceberg partition.", partitioned.size()); + + const auto & key = partitioned.front().first; + return std::vector(key.begin(), key.end()); +} + } bool IcebergMetadata::commitImportPartitionTransactionImpl( @@ -1826,7 +1878,7 @@ void IcebergMetadata::commitExportPartitionTransaction( const String & transaction_id, Int64 original_schema_id, Int64 partition_spec_id, - const std::vector & partition_values, + const 
Block & partition_source_block, SharedHeader sample_block, const std::vector & data_file_paths, ContextPtr context) @@ -1895,6 +1947,10 @@ void IcebergMetadata::commitExportPartitionTransaction( const auto partition_columns = partitioner.getColumns(); const auto partition_types = partitioner.getResultTypes(); + /// Recompute the partition tuple via the destination transform so the metadata partition + /// value matches the exported data (rather than the raw source MergeTree partition value). + const auto partition_values = recomputeExportPartitionValues(partitioner, sample_block, partition_source_block); + const auto metadata_compression_method = persistent_components.metadata_compression_method; FileNamesGenerator filename_generator = FileNamesGenerator( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 2e28df93e0e6..2ec8efe4a9b7 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -141,7 +141,7 @@ class IcebergMetadata : public IDataLakeMetadata const String & transaction_id, Int64 original_schema_id, Int64 partition_spec_id, - const std::vector & partition_values, + const Block & partition_source_block, SharedHeader sample_block, const std::vector & data_file_paths, ContextPtr context) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index c08db678c46c..50f8c373767d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -680,7 +680,8 @@ void StorageObjectStorage::commitExportPartitionTransaction( /// Parse the Iceberg metadata snapshot (stored in ZooKeeper at export-start time) only to /// extract the schema-id and partition-spec-id that were current when the export began. 
/// partition_columns and partition_types are derived inside commitExportPartitionTransaction - /// from the same JSON, so only partition_values need to be carried here. + /// from the same JSON; the representative source partition columns are carried here so the + /// partition tuple can be recomputed through the destination transform. Poco::JSON::Parser iceberg_parser; Poco::JSON::Object::Ptr iceberg_metadata = iceberg_parser.parse(iceberg_commit_export_partition_arguments.metadata_json_string).extract(); @@ -695,7 +696,7 @@ void StorageObjectStorage::commitExportPartitionTransaction( transaction_id, original_schema_id, partition_spec_id, - iceberg_commit_export_partition_arguments.partition_values, + iceberg_commit_export_partition_arguments.partition_source_block, std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), exported_paths, local_context); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7aaecac34832..10eee4e5db95 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -79,6 +79,7 @@ #include #include #include +#include #include #include @@ -224,6 +225,7 @@ namespace Setting extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations; extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts; + extern const SettingsBool export_merge_tree_part_allow_lossy_cast; extern const SettingsExportPartitionAllOnError export_merge_tree_partition_all_on_error; extern const SettingsString export_merge_tree_part_filename_pattern; extern const SettingsBool write_full_path_in_iceberg_metadata; @@ -8387,19 +8389,17 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & auto src_snapshot = getInMemoryMetadataPtr(); auto destination_snapshot = dest_storage->getInMemoryMetadataPtr(); - /// compare all source readable 
columns with all destination insertable columns - /// this allows us to skip ephemeral columns - if (src_snapshot->getColumns().getReadable().sizeOfDifference(destination_snapshot->getColumns().getInsertable())) - throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + /// Positional CAST matching, like `INSERT INTO dest SELECT * FROM src`. + ExportPartitionUtils::verifyExportSchemaCastable( + src_snapshot, destination_snapshot, dest_storage->getStorageID(), query_context); - /// for data lakes this check is performed later. It is a bit more complex as we need to convert the iceberg partition spec - /// to the MergeTree partition spec and compare the two. + /// Iceberg partition compatibility is checked below; here we only need the + /// partition-key ASTs to match (partition-column types follow the lossy-cast gate). if (!dest_storage->isDataLake()) { if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST())) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key"); } - zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly(); @@ -8538,6 +8538,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value; manifest.write_full_path_in_iceberg_metadata = query_context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata]; + manifest.allow_lossy_cast = query_context->getSettingsRef()[Setting::export_merge_tree_part_allow_lossy_cast]; if (dest_storage->isDataLake()) { @@ -8571,7 +8572,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & const auto metadata_object = iceberg_metadata->getMetadataJSON(query_context); 
ExportPartitionUtils::verifyIcebergPartitionCompatibility( - metadata_object, src_snapshot->getPartitionKeyAST()); + metadata_object, + src_snapshot->getPartitionKeyAST()); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); diff --git a/tests/integration/test_export_merge_tree_part_to_iceberg/test.py b/tests/integration/test_export_merge_tree_part_to_iceberg/test.py index b371727a08cd..486adf1f2b17 100644 --- a/tests/integration/test_export_merge_tree_part_to_iceberg/test.py +++ b/tests/integration/test_export_merge_tree_part_to_iceberg/test.py @@ -69,11 +69,15 @@ def get_part(node, table: str, partition_id: str) -> str: ).strip() -def export_part(node, table: str, part: str, dest: str) -> None: +def export_part(node, table: str, part: str, dest: str, extra_settings: str = "") -> None: + settings = ( + "allow_experimental_export_merge_tree_part = 1, " + "allow_experimental_insert_into_iceberg = 1" + ) + if extra_settings: + settings += ", " + extra_settings node.query( - f"ALTER TABLE {table} EXPORT PART '{part}' TO TABLE {dest} " - f"SETTINGS allow_experimental_export_merge_tree_part = 1, " - f"allow_experimental_insert_into_iceberg = 1" + f"ALTER TABLE {table} EXPORT PART '{part}' TO TABLE {dest} SETTINGS {settings}" ) @@ -121,6 +125,42 @@ def assert_part_log(node, table: str, part: str) -> None: ) +def wait_for_failed_export_part( + node, + table: str, + part: str, + timeout: int = 60, + poll_interval: float = 0.5, +) -> str: + """Poll system.part_log until a failed ExportPart event appears for *part*. + + Returns the exception text recorded on the log entry, which lets callers + assert on the specific runtime error that propagated from the export worker. 
+ """ + deadline = time.time() + timeout + last_seen = "" + while time.time() < deadline: + node.query("SYSTEM FLUSH LOGS") + row = node.query( + f"SELECT error, exception FROM system.part_log " + f"WHERE event_type = 'ExportPart' " + f"AND database = currentDatabase() " + f"AND table = '{table}' " + f"AND part_name = '{part}' " + f"AND error != 0 " + f"ORDER BY event_time DESC LIMIT 1" + ).strip() + if row: + _error, exception = row.split("\t", 1) + return exception + last_seen = row + time.sleep(poll_interval) + raise TimeoutError( + f"Failed ExportPart event for part {part!r} in table {table!r} " + f"did not appear in system.part_log within {timeout}s (last row: {last_seen!r})" + ) + + # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- @@ -349,6 +389,38 @@ def test_export_part_with_year_transform_partition(cluster): node.query(f"DROP TABLE IF EXISTS {iceberg}") +def test_export_part_partition_column_lossless_widening(cluster): + """A lossless widening of a partition column (year Int32 -> Int64) round-trips.""" + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_pcol_widening_{sfx}" + iceberg = f"iceberg_pcol_widening_{sfx}" + + make_mt(node, mt, "id Int32, year Int32", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int64", "year") + + node.query(f"INSERT INTO {mt} VALUES (1, 2020), (2, 2020), (3, 2020)") + + part_2020 = get_part(node, mt, "2020") + export_part(node, mt, part_2020, iceberg) + wait_for_export_part(node, mt, part_2020) + + count = int(node.query(f"SELECT count() FROM {iceberg}").strip()) + assert count == 3, f"Expected 3 rows in Iceberg table after export, got {count}" + + result = node.query( + f"SELECT id, toTypeName(year), year FROM {iceberg} ORDER BY id" + ).strip() + assert result == "1\tInt64\t2020\n2\tInt64\t2020\n3\tInt64\t2020", ( + f"Unexpected widened partition-column data:\n{result}" + ) + + 
assert_part_log(node, mt, part_2020) + + node.query(f"DROP TABLE IF EXISTS {mt} SYNC") + node.query(f"DROP TABLE IF EXISTS {iceberg}") + + def test_export_part_partition_key_mismatch_is_rejected(cluster): """ EXPORT PART must synchronously reject (BAD_ARGUMENTS) when the source @@ -479,3 +551,205 @@ def test_export_part_writes_column_statistics(cluster): node.query(f"DROP TABLE IF EXISTS {mt} SYNC") node.query(f"DROP TABLE IF EXISTS {iceberg}") + + +def test_export_part_column_count_mismatch_source_more_is_rejected(cluster): + """ + Source has 3 columns (id, year, extra), destination has 2 (id, year). + The ALTER must be rejected synchronously with NUMBER_OF_COLUMNS_DOESNT_MATCH + and the Iceberg table must remain empty. + """ + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_count_more_{sfx}" + iceberg = f"iceberg_count_more_{sfx}" + + make_mt(node, mt, "id Int32, year Int32, extra String", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int32", "year") + + node.query(f"INSERT INTO {mt} VALUES (1, 2020, 'foo'), (2, 2020, 'bar')") + part_2020 = get_part(node, mt, "2020") + + error = node.query_and_get_error( + f"ALTER TABLE {mt} EXPORT PART '{part_2020}' TO TABLE {iceberg} " + f"SETTINGS allow_experimental_export_merge_tree_part = 1, " + f"allow_experimental_insert_into_iceberg = 1" + ) + assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in error, ( + f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source>dest column count, " + f"got: {error!r}" + ) + + node.query(f"DROP TABLE IF EXISTS {mt} SYNC") + node.query(f"DROP TABLE IF EXISTS {iceberg}") + + +def test_export_part_column_count_mismatch_source_fewer_is_rejected(cluster): + """ + Source has 2 columns (id, year), destination has 3 (id, year, extra). + Same expected synchronous rejection as the source>dest case. 
+ """ + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_count_fewer_{sfx}" + iceberg = f"iceberg_count_fewer_{sfx}" + + make_mt(node, mt, "id Int32, year Int32", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int32, extra String", "year") + + node.query(f"INSERT INTO {mt} VALUES (1, 2020), (2, 2020)") + part_2020 = get_part(node, mt, "2020") + + error = node.query_and_get_error( + f"ALTER TABLE {mt} EXPORT PART '{part_2020}' TO TABLE {iceberg} " + f"SETTINGS allow_experimental_export_merge_tree_part = 1, " + f"allow_experimental_insert_into_iceberg = 1" + ) + assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in error, ( + f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source Int32) succeeds once the user opts in via + export_merge_tree_part_allow_lossy_cast.""" + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_narrow_fit_{sfx}" + iceberg = f"iceberg_narrow_fit_{sfx}" + + make_mt(node, mt, "id Int64, year Int32", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int32", "year") + + node.query(f"INSERT INTO {mt} VALUES (1, 2020), (2, 2020)") + part_2020 = get_part(node, mt, "2020") + + export_part(node, mt, part_2020, iceberg, "export_merge_tree_part_allow_lossy_cast = 1") + wait_for_export_part(node, mt, part_2020) + + count = int(node.query(f"SELECT count() FROM {iceberg}").strip()) + assert count == 2, f"Expected 2 rows in Iceberg table after export, got {count}" + + result = node.query( + f"SELECT id, toTypeName(id), year FROM {iceberg} ORDER BY id" + ).strip() + assert result == "1\tInt32\t2020\n2\tInt32\t2020", ( + f"Unexpected narrowed data:\n{result}" + ) + + assert_part_log(node, mt, part_2020) + + node.query(f"DROP TABLE IF EXISTS {mt} SYNC") + node.query(f"DROP TABLE IF EXISTS {iceberg}") + + +def test_export_part_runtime_cast_failure_propagates_async(cluster): + """A String value that cannot be parsed as the destination Int32 passes the + synchronous lossy-cast gate (with 
export_merge_tree_part_allow_lossy_cast = 1) but + fails at runtime in the async worker; the failure surfaces in system.part_log and + Iceberg is left empty. + + (Integer overflow is not used because the internal cast uses CastType::nonAccurate, + which wraps rather than throwing.) + """ + node = cluster.instances["node1"] + sfx = unique_suffix() + mt = f"mt_runtime_cast_fail_{sfx}" + iceberg = f"iceberg_runtime_cast_fail_{sfx}" + + make_mt(node, mt, "id String, year Int32", "year") + make_iceberg_s3(node, iceberg, "id Int32, year Int32", "year") + + node.query(f"INSERT INTO {mt} VALUES ('not a number', 2020)") + part_2020 = get_part(node, mt, "2020") + + export_part(node, mt, part_2020, iceberg, "export_merge_tree_part_allow_lossy_cast = 1") + + exception = wait_for_failed_export_part(node, mt, part_2020) + assert exception, ( + f"Expected non-empty exception text on failed ExportPart entry, got {exception!r}" + ) + + count = int(node.query(f"SELECT count() FROM {iceberg}").strip()) + assert count == 0, ( + f"Expected 0 rows in Iceberg table after failed export, got {count}" + ) + + node.query(f"DROP TABLE IF EXISTS {mt} SYNC") + node.query(f"DROP TABLE IF EXISTS {iceberg}") diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py index 8b49589a3005..00cf5b5ff6f2 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py @@ -984,3 +984,661 @@ def test_export_partition_writes_column_statistics(cluster): entries = fetch_manifest_entries(node, query_id) assert_exported_stats(entries) + + +def test_export_partition_column_count_mismatch_source_more_is_rejected(cluster): + """ + Source has 3 columns (id, year, extra), destination has 2 (id, year). 
+ The ALTER must be rejected synchronously with NUMBER_OF_COLUMNS_DOESNT_MATCH, + nothing must be scheduled in system.replicated_partition_exports, and the + Iceberg table must remain empty. + """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_count_more_{uid}" + iceberg_table = f"iceberg_count_more_{uid}" + + make_rmt(node, mt_table, "id Int64, year Int32, extra String", "year", + replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020, 'foo'), (2, 2020, 'bar')") + + make_iceberg_s3(node, iceberg_table, "id Int64, year Int32", partition_by="year") + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in error, ( + f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source>dest column count, " + f"got: {error!r}" + ) + + rows_in_system_view = node.query( + f"SELECT count() FROM system.replicated_partition_exports " + f"WHERE source_table = '{mt_table}' " + f" AND destination_table = '{iceberg_table}' " + f" AND partition_id = '2020'" + ).strip() + assert rows_in_system_view == "0", ( + f"Expected no row in system.replicated_partition_exports after a " + f"synchronously-rejected export, got {rows_in_system_view}." + ) + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 0, ( + f"Expected 0 rows in Iceberg table after rejected export, got {count}" + ) + + +def test_export_partition_column_count_mismatch_source_fewer_is_rejected(cluster): + """ + Source has 2 columns (id, year), destination has 3 (id, year, extra). + Same expected synchronous rejection as the source>dest case. 
+ """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_count_fewer_{uid}" + iceberg_table = f"iceberg_count_fewer_{uid}" + + make_rmt(node, mt_table, "id Int64, year Int32", "year", replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020)") + + make_iceberg_s3(node, iceberg_table, "id Int64, year Int32, extra String", + partition_by="year") + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + assert "NUMBER_OF_COLUMNS_DOESNT_MATCH" in error, ( + f"Expected NUMBER_OF_COLUMNS_DOESNT_MATCH for source Int64) and the + partition column (year Int32 -> Int64) round-trips.""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_widen_{uid}" + iceberg_table = f"iceberg_widen_{uid}" + + make_rmt(node, mt_table, "id Int32, year Int32", "year", replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020)") + + make_iceberg_s3(node, iceberg_table, "id Int64, year Int64", partition_by="year") + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + wait_for_export_status(node, mt_table, iceberg_table, "2020", "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 2, f"Expected 2 rows in Iceberg table after export, got {count}" + + result = node.query( + f"SELECT id, toTypeName(id), year, toTypeName(year) FROM {iceberg_table} ORDER BY id" + ).strip() + assert result == "1\tInt64\t2020\tInt64\n2\tInt64\t2020\tInt64", ( + f"Unexpected widened data:\n{result}" + ) + + +def test_export_partition_with_castable_narrowing_values_fit(cluster): + """A lossy narrowing (id Int64 -> Int32) succeeds once the user opts in via + export_merge_tree_part_allow_lossy_cast.""" + node = cluster.instances["replica1"] + + 
uid = unique_suffix() + mt_table = f"mt_narrow_fit_{uid}" + iceberg_table = f"iceberg_narrow_fit_{uid}" + + make_rmt(node, mt_table, "id Int64, year Int32", "year", replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020)") + + make_iceberg_s3(node, iceberg_table, "id Int32, year Int32", partition_by="year") + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table}", + settings={ + "allow_insert_into_iceberg": 1, + "export_merge_tree_part_allow_lossy_cast": 1, + }, + ) + wait_for_export_status(node, mt_table, iceberg_table, "2020", "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 2, f"Expected 2 rows in Iceberg table after export, got {count}" + + result = node.query( + f"SELECT id, toTypeName(id), year FROM {iceberg_table} ORDER BY id" + ).strip() + assert result == "1\tInt32\t2020\n2\tInt32\t2020", ( + f"Unexpected narrowed data:\n{result}" + ) + + +def test_export_partition_lossy_cast_rejected_without_optin(cluster): + """A lossy narrowing (id Int64 -> Int32) is rejected synchronously with + BAD_ARGUMENTS unless export_merge_tree_part_allow_lossy_cast is set.""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_lossy_reject_{uid}" + iceberg_table = f"iceberg_lossy_reject_{uid}" + + make_rmt(node, mt_table, "id Int64, year Int32", "year", replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020)") + + make_iceberg_s3(node, iceberg_table, "id Int32, year Int32", partition_by="year") + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table} " + f"SETTINGS allow_insert_into_iceberg = 1" + ) + assert "BAD_ARGUMENTS" in error, f"Expected BAD_ARGUMENTS, got: {error!r}" + assert "lossy cast" in error, f"Expected 'lossy cast' in error, got: {error!r}" + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + 
assert count == 0, f"Expected no rows after a rejected export, got {count}" + + +def test_export_partition_runtime_cast_failure_propagates_async(cluster): + """A String value that cannot be parsed as the destination Int32 passes the + synchronous lossy-cast gate (with export_merge_tree_part_allow_lossy_cast = 1) but + fails at runtime in the async worker, marking the export FAILED and leaving Iceberg + empty. + + (Integer overflow is not used because the internal cast uses CastType::nonAccurate, + which wraps rather than throwing.) + """ + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_runtime_cast_fail_{uid}" + iceberg_table = f"iceberg_runtime_cast_fail_{uid}" + + make_rmt(node, mt_table, "id String, year Int32", "year", replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES ('not a number', 2020)") + + make_iceberg_s3(node, iceberg_table, "id Int32, year Int32", partition_by="year") + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {iceberg_table} " + f"SETTINGS export_merge_tree_partition_max_retries = 1, allow_insert_into_iceberg = 1, " + f"export_merge_tree_part_allow_lossy_cast = 1" + ) + + wait_for_export_status(node, mt_table, iceberg_table, "2020", "FAILED", timeout=60) + + exception_count = int(node.query( + f"SELECT any(exception_count) FROM system.replicated_partition_exports " + f"WHERE source_table = '{mt_table}' " + f" AND destination_table = '{iceberg_table}' " + f" AND partition_id = '2020'" + ).strip()) + assert exception_count > 0, ( + "Expected non-zero exception_count after a failed runtime cast" + ) + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 0, ( + f"Expected 0 rows in Iceberg table after failed export, got {count}" + ) + + +def test_export_partition_all_iceberg_types(cluster): + """Every getIcebergType-supported type round-trips through an EXPORT PARTITION: + scalars use narrower source types (explicit lossless 
widening CASTs), plus + Array/Map/Tuple nested columns.""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_all_types_{uid}" + iceberg_table = f"iceberg_all_types_{uid}" + + # Scalar source types are strictly narrower than the destination; the export inserts + # a positional widening CAST per column (Int8->Int16, UInt32->UInt64, ...). Nested + # columns keep the same type on both sides. + source_columns = ( + "i16 Int8, u16 UInt8, u32 UInt16, u64 UInt32, " + "id Int16, big Int32, f32 Float32, f64 Float64, " + "d Date, d32 Date32, dt DateTime, dt64 DateTime64(6), " + "s String, uid UUID, " + "arr Array(Int32), m Map(String, Int64), tup Tuple(a Int32, b String), " + "year Int32" + ) + dest_columns = ( + "i16 Int16, u16 UInt16, u32 UInt32, u64 UInt64, " + "id Int32, big Int64, f32 Float32, f64 Float64, " + "d Date, d32 Date32, dt DateTime, dt64 DateTime64(6), " + "s String, uid UUID, " + "arr Array(Int32), m Map(String, Int64), tup Tuple(a Int32, b String), " + "year Int32" + ) + + make_rmt(node, mt_table, source_columns, "year", replica_name="replica1") + make_iceberg_s3(node, iceberg_table, dest_columns, partition_by="year") + + node.query( + f""" + INSERT INTO {mt_table} + (i16, u16, u32, u64, id, big, f32, f64, d, d32, dt, dt64, s, uid, arr, m, tup, year) + VALUES ( + -100, 200, 50000, 4000000000, + 12345, 1000000000, 3.14, 2.718281828459045, + '2024-01-15', '2024-01-15', '2024-01-15 12:30:45', '2024-01-15 12:30:45.123456', + 'hello iceberg', '550e8400-e29b-41d4-a716-446655440000', + [1, 2, 3], {{'a': 10, 'b': 20}}, (7, 'seven'), 2024 + ) + """ + ) + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2024' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + wait_for_export_status(node, mt_table, iceberg_table, "2024", "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 1, f"Expected 1 row in Iceberg table, got {count}" + + result = 
node.query( + f""" + SELECT + i16, u16, u32, u64, id, big, + toString(d), toString(d32), toString(dt), + s, toString(uid), + arr, m['a'], m['b'], tup.a, tup.b, year + FROM {iceberg_table} + """ + ).strip() + expected = "\t".join([ + "-100", "200", "50000", "4000000000", + "12345", "1000000000", + "2024-01-15", "2024-01-15", "2024-01-15 12:30:45.000000", + "hello iceberg", "550e8400-e29b-41d4-a716-446655440000", + "[1,2,3]", "10", "20", "7", "seven", "2024", + ]) + assert result == expected, f"Unexpected round-trip data:\n{result!r}\nexpected:\n{expected!r}" + + # Floats compared with a tolerance to avoid formatting flakiness. + floats_ok = node.query( + f"SELECT abs(f32 - 3.14) < 1e-4 AND abs(f64 - 2.718281828459045) < 1e-12 FROM {iceberg_table}" + ).strip() + assert floats_ok == "1", f"Float round-trip outside tolerance: {floats_ok!r}" + + # DateTime64 sub-second component: assert the date part is preserved (exact format varies). + ts_result = node.query(f"SELECT dt64 FROM {iceberg_table}").strip() + assert "2024-01-15" in ts_result, f"DateTime64 date component missing: {ts_result!r}" + + +def test_export_partition_all_iceberg_types_lossy(cluster): + """Lossy narrowing casts across types succeed with the opt-in flag: values that + fit round-trip, Float64 -> Float32 loses precision, and Nullable columns carry + both NULL and non-NULL (the latter via a lossy Nullable(Int64) -> Nullable(Int32)).""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_lossy_types_{uid}" + iceberg_table = f"iceberg_lossy_types_{uid}" + + # Each source column is wider than the destination, so the export inserts a lossy + # narrowing CAST (allowed only because export_merge_tree_part_allow_lossy_cast=1). + # Int8/UInt8 are not Iceberg-representable, so the narrowest integer dest is Int16. 
+ source_columns = ( + "big Int64, ubig UInt64, mid Int32, " + "f Float64, dt DateTime64(6), d Date32, " + "opt_s Nullable(String), opt_i Nullable(Int64), year Int32" + ) + dest_columns = ( + "big Int32, ubig UInt32, mid Int16, " + "f Float32, dt DateTime, d Date, " + "opt_s Nullable(String), opt_i Nullable(Int32), year Int32" + ) + + make_rmt(node, mt_table, source_columns, "year", replica_name="replica1") + make_iceberg_s3(node, iceberg_table, dest_columns, partition_by="year") + + # Values chosen to fit the destination types (the async cast wraps on overflow + # rather than throwing, so out-of-range values would silently corrupt instead). + # opt_s is NULL and opt_i is set, covering both nullable paths in one row. + node.query( + f""" + INSERT INTO {mt_table} (big, ubig, mid, f, dt, d, opt_s, opt_i, year) + VALUES ( + 1000000, 2000000000, 30000, + 2.718281828459045, '2024-01-15 12:30:45.123456', '2024-01-15', + NULL, 100, 2024 + ) + """ + ) + + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2024' TO TABLE {iceberg_table}", + settings={ + "allow_insert_into_iceberg": 1, + "export_merge_tree_part_allow_lossy_cast": 1, + }, + ) + wait_for_export_status(node, mt_table, iceberg_table, "2024", "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 1, f"Expected 1 row in Iceberg table, got {count}" + + result = node.query( + f"SELECT big, ubig, mid, toString(d), toString(dt), opt_s, opt_i, year FROM {iceberg_table}" + ).strip() + expected = "\t".join([ + "1000000", "2000000000", "30000", + "2024-01-15", "2024-01-15 12:30:45.000000", "\\N", "100", "2024", + ]) + assert result == expected, f"Unexpected lossy round-trip data:\n{result!r}\nexpected:\n{expected!r}" + + # Float64 -> Float32 stays within Float32 precision but is no longer exact. 
+ f_checks = node.query( + f"SELECT abs(f - 2.718281828459045) < 1e-6, abs(f - 2.718281828459045) > 1e-9 FROM {iceberg_table}" + ).strip() + assert f_checks == "1\t1", f"Expected Float32 precision loss within tolerance, got: {f_checks!r}" + + +def _data_file_partition_records(entries): + """Partition dicts of the non-delete data files described by manifest entries.""" + records = [] + for entry in entries: + data_file = entry.get("data_file") or {} + if data_file.get("content", 0) not in (0, None): + continue + partition = data_file.get("partition") + if partition is not None: + records.append(partition) + return records + + +def _partition_scalar(partition, field): + """Read a partition field value, tolerating an Avro-union ``{type: value}`` wrapper.""" + value = partition.get(field) + if isinstance(value, dict): + assert len(value) == 1, f"Unexpected partition union shape for {field!r}: {value!r}" + value = next(iter(value.values())) + return value + + +def test_export_partition_bucket_transform_metadata_matches_data(cluster): + """A bucket[N] partition column whose type changes Int64 -> String records the + destination murmur(String) bucket in the Iceberg metadata, matching the exported + data rather than the source hashLong bucket.""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_bucket_xform_{uid}" + iceberg_table = f"iceberg_bucket_xform_{uid}" + + # N=16, key=42 diverges: icebergBucket(16, 42::Int64)=14 (source/old hashLong) but + # icebergBucket(16, '42')=6 (destination/new murmur over the exported String). 
+ make_rmt(node, mt_table, "id Int64, key Int64", "icebergBucket(16, key)", + replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES (1, 42), (2, 42)") + + make_iceberg_s3(node, iceberg_table, "id Int64, key String", + partition_by="icebergBucket(16, key)") + + pid = first_partition_id(node, mt_table) + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + wait_for_export_status(node, mt_table, iceberg_table, pid, "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 2, f"Expected 2 rows after export, got {count}" + + string_bucket = int(node.query( + f"SELECT DISTINCT icebergBucket(16, key) FROM {iceberg_table}" + ).strip()) + long_bucket = int(node.query( + f"SELECT DISTINCT icebergBucket(16, toInt64(key)) FROM {iceberg_table}" + ).strip()) + assert string_bucket != long_bucket, ( + f"Test setup invalid: String and Int64 buckets coincide ({string_bucket}); " + f"pick a different N/key so the transform diverges." + ) + + query_id = f"bucket_xform_{uid}" + node.query( + f"SELECT * FROM {iceberg_table}", + query_id=query_id, + settings={"iceberg_metadata_log_level": "manifest_file_entry"}, + ) + entries = fetch_manifest_entries(node, query_id) + partitions = _data_file_partition_records(entries) + assert partitions, "No data-file partition records found in manifest entries" + meta_values = {int(_partition_scalar(p, "key")) for p in partitions} + assert meta_values == {string_bucket}, ( + f"Metadata bucket {meta_values} must equal the destination String bucket " + f"{string_bucket} (not the source Int64 bucket {long_bucket})." 
+ ) + + +def test_export_partition_month_transform_metadata_matches_data(cluster): + """A month-transform partition records a months-since-epoch value in metadata that + matches the value derived from the exported data, and a transform-filtered read + returns the rows.""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_month_xform_{uid}" + iceberg_table = f"iceberg_month_xform_{uid}" + + make_rmt(node, mt_table, "id Int64, event_date Date", + "toMonthNumSinceEpoch(event_date)", replica_name="replica1") + node.query( + f"INSERT INTO {mt_table} VALUES " + f"(1, '2024-03-05'), (2, '2024-03-20'), (3, '2024-03-31')" + ) + + make_iceberg_s3(node, iceberg_table, "id Int64, event_date Date", + partition_by="toMonthNumSinceEpoch(event_date)") + + pid = first_partition_id(node, mt_table) + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + wait_for_export_status(node, mt_table, iceberg_table, pid, "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 3, f"Expected 3 rows after export, got {count}" + + month_num = int(node.query( + f"SELECT DISTINCT toMonthNumSinceEpoch(event_date) FROM {iceberg_table}" + ).strip()) + + query_id = f"month_xform_{uid}" + node.query( + f"SELECT * FROM {iceberg_table}", + query_id=query_id, + settings={"iceberg_metadata_log_level": "manifest_file_entry"}, + ) + entries = fetch_manifest_entries(node, query_id) + partitions = _data_file_partition_records(entries) + assert partitions, "No data-file partition records found in manifest entries" + meta_values = {int(_partition_scalar(p, "event_date")) for p in partitions} + assert meta_values == {month_num}, ( + f"Metadata month {meta_values} must equal toMonthNumSinceEpoch over the data " + f"({month_num})." 
+ ) + + filtered = int(node.query( + f"SELECT count() FROM {iceberg_table} " + f"WHERE toMonthNumSinceEpoch(event_date) = {month_num}" + ).strip()) + assert filtered == 3, f"Transform-filtered read expected 3 rows, got {filtered}" + + +def test_export_partition_identity_type_change_metadata_matches_data(cluster): + """An identity partition column whose type changes UInt16 -> String records the + destination String value in the Iceberg metadata, matching the exported data.""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_identity_xform_{uid}" + iceberg_table = f"iceberg_identity_xform_{uid}" + + make_rmt(node, mt_table, "id Int32, year UInt16", "year", replica_name="replica1") + node.query(f"INSERT INTO {mt_table} VALUES (1, 2024), (2, 2024)") + + make_iceberg_s3(node, iceberg_table, "id Int32, year String", partition_by="year") + + pid = first_partition_id(node, mt_table) + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg_table}", + settings={"allow_insert_into_iceberg": 1}, + ) + wait_for_export_status(node, mt_table, iceberg_table, pid, "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 2, f"Expected 2 rows after export, got {count}" + + data_year = node.query(f"SELECT DISTINCT year FROM {iceberg_table}").strip() + assert data_year == "2024", f"Expected exported year '2024' (String), got {data_year!r}" + + query_id = f"identity_xform_{uid}" + node.query( + f"SELECT * FROM {iceberg_table}", + query_id=query_id, + settings={"iceberg_metadata_log_level": "manifest_file_entry"}, + ) + entries = fetch_manifest_entries(node, query_id) + partitions = _data_file_partition_records(entries) + assert partitions, "No data-file partition records found in manifest entries" + meta_values = {str(_partition_scalar(p, "year")) for p in partitions} + assert meta_values == {"2024"}, ( + f"Metadata partition {meta_values} must equal the destination String 
value " + f"'2024' (not the source integer representation)." + ) + + +def test_export_partition_multicolumn_identity_metadata_matches_data(cluster): + """A multi-column identity partition (event_date Date, retention UInt64 -> Int64) + records per-column values in the Iceberg metadata that match the exported data.""" + node = cluster.instances["replica1"] + + uid = unique_suffix() + mt_table = f"mt_multicol_{uid}" + iceberg_table = f"iceberg_multicol_{uid}" + + # Iceberg has no unsigned types, so retention widens UInt64 -> Int64; the cast is + # not value-preserving per canBeSafelyCast, hence the lossy opt-in below. + make_rmt(node, mt_table, "id Int64, event_date Date, retention UInt64", + "(event_date, retention)", replica_name="replica1") + node.query( + f"INSERT INTO {mt_table} VALUES " + f"(1, '2024-03-05', 30), (2, '2024-03-05', 30), (3, '2024-03-05', 30)" + ) + + make_iceberg_s3(node, iceberg_table, "id Int64, event_date Date, retention Int64", + partition_by="(event_date, retention)") + + pid = first_partition_id(node, mt_table) + node.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{pid}' TO TABLE {iceberg_table}", + settings={ + "allow_insert_into_iceberg": 1, + "export_merge_tree_part_allow_lossy_cast": 1, + }, + ) + wait_for_export_status(node, mt_table, iceberg_table, pid, "COMPLETED") + + count = int(node.query(f"SELECT count() FROM {iceberg_table}").strip()) + assert count == 3, f"Expected 3 rows after export, got {count}" + + data_retention = int(node.query( + f"SELECT DISTINCT retention FROM {iceberg_table}" + ).strip()) + assert data_retention == 30, f"Expected exported retention 30, got {data_retention}" + + days = int(node.query( + f"SELECT DISTINCT toInt64(event_date) FROM {iceberg_table}" + ).strip()) + + query_id = f"multicol_{uid}" + node.query( + f"SELECT * FROM {iceberg_table}", + query_id=query_id, + settings={"iceberg_metadata_log_level": "manifest_file_entry"}, + ) + entries = fetch_manifest_entries(node, query_id) + partitions = 
_data_file_partition_records(entries) + assert partitions, "No data-file partition records found in manifest entries" + + meta_dates = {int(_partition_scalar(p, "event_date")) for p in partitions} + assert meta_dates == {days}, ( + f"Metadata event_date {meta_dates} must equal days-since-epoch {days}." + ) + meta_retentions = {int(_partition_scalar(p, "retention")) for p in partitions} + assert meta_retentions == {30}, ( + f"Metadata retention {meta_retentions} must equal the exported value 30." + ) + + filtered = int(node.query( + f"SELECT count() FROM {iceberg_table} " + f"WHERE event_date = '2024-03-05' AND retention = 30" + ).strip()) + assert filtered == 3, f"Partition-filtered read expected 3 rows, got {filtered}" diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 3161e3b67100..d403538999df 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -1532,6 +1532,86 @@ def test_export_partition_all(cluster): assert row_count == 3, f"Expected 3 rows in S3 after EXPORT PARTITION ALL, got {row_count}" +def test_export_partition_partition_column_castable_type_mismatch(cluster): + """A lossy partition-column cast (year String -> UInt16) is rejected synchronously + when export_merge_tree_part_allow_lossy_cast is off, scheduling nothing.""" + skip_if_remote_database_disk_enabled(cluster) + node = cluster.instances["replica1"] + + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"pkey_cast_mismatch_partition_mt_{postfix}" + s3_table = f"pkey_cast_mismatch_partition_s3_{postfix}" + + # Source: year String; destination: year UInt16. PARTITION BY year on + # both sides — same AST text — to defeat the AST equivalence check. 
+ node.query( + f"CREATE TABLE {mt_table} (id UInt64, year String) " + f"ENGINE = ReplicatedMergeTree('/clickhouse/tables/{mt_table}', 'replica1') " + f"PARTITION BY year " + f"ORDER BY tuple()" + ) + node.query( + f"CREATE TABLE {s3_table} (id UInt64, year UInt16) " + f"ENGINE = S3(s3_conn, filename='{s3_table}', " + f"format=Parquet, partition_strategy='hive') " + f"PARTITION BY year" + ) + + node.query( + f"INSERT INTO {mt_table} VALUES (1, '2020'), (2, '2020'), (3, '2020')" + ) + + # With a String partition column the partition_id is the SipHash of the + # value rather than the textual representation — look it up so we can + # reference the partition explicitly in EXPORT PARTITION ID and in + # subsequent system.replicated_partition_exports queries. + partition_id = node.query( + f"SELECT partition_id FROM system.parts " + f"WHERE database = currentDatabase() AND table = '{mt_table}' " + f" AND active " + f"ORDER BY name LIMIT 1" + ).strip() + assert partition_id, ( + "Expected one active part on the source table after INSERT; " + "system.parts returned nothing." + ) + + error = node.query_and_get_error( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '{partition_id}' " + f"TO TABLE {s3_table}" + ) + assert "BAD_ARGUMENTS" in error, ( + f"Expected BAD_ARGUMENTS for a lossy partition-column cast, " + f"got: {error!r}" + ) + assert "requires a lossy cast" in error and "'year'" in error, ( + f"Expected the error message to report the lossy cast on column " + f"'year', got: {error!r}" + ) + + # Nothing scheduled: no row in system.replicated_partition_exports. + rows_in_system_view = node.query( + f"SELECT count() FROM system.replicated_partition_exports " + f"WHERE source_table = '{mt_table}' " + f" AND destination_table = '{s3_table}' " + f" AND partition_id = '{partition_id}'" + ).strip() + assert rows_in_system_view == "0", ( + f"Expected no row in system.replicated_partition_exports after a " + f"synchronously-rejected export, got {rows_in_system_view}." 
+ ) + + # Nothing written: no parquet file under any year=*/ partition prefix. + files_in_s3 = node.query( + f"SELECT count() FROM s3(s3_conn, " + f"filename='{s3_table}/year=*/*.parquet', format='One')" + ).strip() + assert files_in_s3 == "0", ( + f"Expected no Parquet files in S3 after a synchronously-rejected " + f"export, found {files_in_s3}." + ) + + def test_export_partition_all_failure_modes(cluster): """Cover the three values of `export_merge_tree_partition_all_on_error`. diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql index 7cb70af024a2..8200233a7322 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql @@ -1,6 +1,6 @@ -- Tags: no-parallel, no-fasttest -DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table, 03572_ephemeral_mt_table, 03572_matching_ephemeral_s3_table; +DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table, 03572_ephemeral_mt_table, 03572_matching_ephemeral_s3_table, 03572_partition_type_mismatch_mt, 03572_partition_type_mismatch_s3, 03572_lossy_mt, 03572_lossy_s3, 03572_lossless_mt, 03572_lossless_s3; SET allow_experimental_export_merge_tree_part=1; @@ -9,10 +9,12 @@ CREATE TABLE 03572_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTIT INSERT INTO 03572_mt_table VALUES (1, 2020); -- Create a table with a different partition key and export a partition to it. It should throw +-- on the partition-key AST mismatch (schema compat now follows INSERT SELECT positional semantics, +-- so the column shape matches and the partition-key check is what fires). 
CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table -SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} DROP TABLE 03572_invalid_schema_table; @@ -27,13 +29,42 @@ ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION extractKey -- It is a table function, but the engine does not support exports/imports, should throw ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION url('a.parquet'); -- {serverError NOT_IMPLEMENTED} --- Test that destination table can not have a column that matches the source ephemeral +-- Source-side ephemeral columns are not readable, so the destination must not declare a matching +-- ordinary column or the column count will not align under positional matching. 
CREATE TABLE 03572_ephemeral_mt_table (id UInt64, year UInt16, name String EPHEMERAL) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); CREATE TABLE 03572_matching_ephemeral_s3_table (id UInt64, year UInt16, name String) ENGINE = S3(s3_conn, filename='03572_matching_ephemeral_s3_table', format='Parquet', partition_strategy='hive') PARTITION BY year; INSERT INTO 03572_ephemeral_mt_table (id, year, name) VALUES (1, 2020, 'alice'); -ALTER TABLE 03572_ephemeral_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_matching_ephemeral_s3_table; -- {serverError INCOMPATIBLE_COLUMNS} +ALTER TABLE 03572_ephemeral_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_matching_ephemeral_s3_table; -- {serverError NUMBER_OF_COLUMNS_DOESNT_MATCH} -DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table, 03572_ephemeral_mt_table, 03572_matching_ephemeral_s3_table; +-- Partition columns follow the same lossy-cast gate as any other column (no special +-- exact-type guard). String -> UInt16 is a lossy cast, so with the default +-- export_merge_tree_part_allow_lossy_cast = 0 it is rejected synchronously. 
+CREATE TABLE 03572_partition_type_mismatch_mt (id UInt64, year String) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); +CREATE TABLE 03572_partition_type_mismatch_s3 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_partition_type_mismatch_s3', format='Parquet', partition_strategy='hive') PARTITION BY year; + +ALTER TABLE 03572_partition_type_mismatch_mt EXPORT PART '2020_1_1_0' TO TABLE 03572_partition_type_mismatch_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} + +CREATE TABLE 03572_lossy_mt (id Int64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); +CREATE TABLE 03572_lossy_s3 (id Int32, year UInt16) ENGINE = S3(s3_conn, filename='03572_lossy_s3', format='Parquet', partition_strategy='hive') PARTITION BY year; + +ALTER TABLE 03572_lossy_mt EXPORT PART '2020_1_1_0' TO TABLE 03572_lossy_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} + +-- With the acknowledgment setting enabled, the lossy cast passes validation and reaches the +-- part lookup, which fails because the part does not exist. +ALTER TABLE 03572_lossy_mt EXPORT PART '2020_1_1_0' TO TABLE 03572_lossy_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_allow_lossy_cast = 1; -- {serverError NO_SUCH_DATA_PART} + +-- A lossless widening cast (Int32 -> Int64) passes validation without the setting and reaches +-- the part lookup, which fails because the part does not exist. 
+CREATE TABLE 03572_lossless_mt (id Int32, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); +CREATE TABLE 03572_lossless_s3 (id Int64, year UInt16) ENGINE = S3(s3_conn, filename='03572_lossless_s3', format='Parquet', partition_strategy='hive') PARTITION BY year; + +ALTER TABLE 03572_lossless_mt EXPORT PART '2020_1_1_0' TO TABLE 03572_lossless_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NO_SUCH_DATA_PART} + +DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table, 03572_ephemeral_mt_table, 03572_matching_ephemeral_s3_table, 03572_partition_type_mismatch_mt, 03572_partition_type_mismatch_s3, 03572_lossy_mt, 03572_lossy_s3, 03572_lossless_mt, 03572_lossless_s3; diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql index f8f23532f0a7..0a199755c40a 100644 --- a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql @@ -1,16 +1,18 @@ -- Tags: no-parallel, no-fasttest -DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table; +DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table, 03572_rmt_partition_type_mismatch_mt, 03572_rmt_partition_type_mismatch_s3, 03572_rmt_lossy_mt, 03572_rmt_lossy_s3, 03572_rmt_lossless_mt, 03572_rmt_lossless_s3; CREATE TABLE 03572_rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt/03572_rmt_table', 'replica1') PARTITION BY year ORDER BY tuple(); INSERT INTO 03572_rmt_table VALUES (1, 2020); -- Create a table with a different partition key and export a partition to it. 
It should throw +-- on the partition-key AST mismatch (schema compat now follows INSERT SELECT positional semantics, +-- so the column shape matches and the partition-key check is what fires). CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table -SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} DROP TABLE 03572_invalid_schema_table; @@ -19,4 +21,33 @@ CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_ ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED} -DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table; +-- Partition columns follow the same lossy-cast gate as any other column (no special +-- exact-type guard). String -> UInt16 is a lossy cast, so with the default +-- export_merge_tree_part_allow_lossy_cast = 0 it is rejected synchronously. 
+CREATE TABLE 03572_rmt_partition_type_mismatch_mt (id UInt64, year String) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt_pcol_type/03572_rmt_partition_type_mismatch_mt', 'replica1') PARTITION BY year ORDER BY tuple(); +CREATE TABLE 03572_rmt_partition_type_mismatch_s3 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_rmt_partition_type_mismatch_s3', format='Parquet', partition_strategy='hive') PARTITION BY year; + +ALTER TABLE 03572_rmt_partition_type_mismatch_mt EXPORT PART '2020_0_0_0' TO TABLE 03572_rmt_partition_type_mismatch_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} + +-- A lossy cast on a non-partition column (Int64 -> Int32) is rejected synchronously by default. +CREATE TABLE 03572_rmt_lossy_mt (id Int64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt_lossy/03572_rmt_lossy_mt', 'replica1') PARTITION BY year ORDER BY tuple(); +CREATE TABLE 03572_rmt_lossy_s3 (id Int32, year UInt16) ENGINE = S3(s3_conn, filename='03572_rmt_lossy_s3', format='Parquet', partition_strategy='hive') PARTITION BY year; + +ALTER TABLE 03572_rmt_lossy_mt EXPORT PART '2020_0_0_0' TO TABLE 03572_rmt_lossy_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError BAD_ARGUMENTS} + +-- With the acknowledgment setting enabled, the lossy cast passes validation and reaches the +-- part lookup, which fails because the part does not exist. +ALTER TABLE 03572_rmt_lossy_mt EXPORT PART '2020_0_0_0' TO TABLE 03572_rmt_lossy_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_allow_lossy_cast = 1; -- {serverError NO_SUCH_DATA_PART} + +-- A lossless widening cast (Int32 -> Int64) passes validation without the setting and reaches +-- the part lookup, which fails because the part does not exist. 
+CREATE TABLE 03572_rmt_lossless_mt (id Int32, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt_lossless/03572_rmt_lossless_mt', 'replica1') PARTITION BY year ORDER BY tuple(); +CREATE TABLE 03572_rmt_lossless_s3 (id Int64, year UInt16) ENGINE = S3(s3_conn, filename='03572_rmt_lossless_s3', format='Parquet', partition_strategy='hive') PARTITION BY year; + +ALTER TABLE 03572_rmt_lossless_mt EXPORT PART '2020_0_0_0' TO TABLE 03572_rmt_lossless_s3 +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NO_SUCH_DATA_PART} + +DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table, 03572_rmt_partition_type_mismatch_mt, 03572_rmt_partition_type_mismatch_s3, 03572_rmt_lossy_mt, 03572_rmt_lossy_s3, 03572_rmt_lossless_mt, 03572_rmt_lossless_s3;