-
Notifications
You must be signed in to change notification settings - Fork 18
Auto cast values on export part and partition just like insert select #1779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
99002b9
2fffd64
5271dc1
305a318
63d8f7e
267dd33
2534320
8f869ac
7cd3ee9
a5c01b8
c5693b6
a289a6a
308159a
5161c90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,6 +99,31 @@ namespace | |
| } | ||
| } | ||
|
|
||
| /// Mirrors `InterpreterInsertQuery::addInsertToSelectPipeline`: positional match, | ||
| /// destination header = `getSampleBlockNonMaterialized()`, all type bridging is done | ||
| /// by the CAST inside `makeConvertingActions`. No pre-validation, no per-column | ||
| /// lossy/non-lossy classification — restrictions are exactly what INSERT SELECT enforces. | ||
| void addExportConvertingActions( | ||
| QueryPlan & plan_for_part, | ||
| const IStorage & destination_storage, | ||
| const ContextPtr & local_context) | ||
| { | ||
| const auto destination_header | ||
| = destination_storage.getInMemoryMetadataPtr()->getSampleBlockNonMaterialized(); | ||
|
|
||
| auto dag = ActionsDAG::makeConvertingActions( | ||
| plan_for_part.getCurrentHeader()->getColumnsWithTypeAndName(), | ||
| destination_header.getColumnsWithTypeAndName(), | ||
| ActionsDAG::MatchColumnsMode::Position, | ||
| local_context); | ||
|
|
||
| auto expression_step = std::make_unique<ExpressionStep>( | ||
| plan_for_part.getCurrentHeader(), | ||
| std::move(dag)); | ||
| expression_step->setStepDescription("Convert source columns to destination types for export"); | ||
| plan_for_part.addStep(std::move(expression_step)); | ||
| } | ||
|
|
||
| String buildDestinationFilename( | ||
| const MergeTreePartExportManifest & manifest, | ||
| const StorageID & storage_id, | ||
|
|
@@ -261,6 +286,10 @@ bool ExportPartTask::executeStep() | |
| /// This is a hack that materializes the columns before the export so they can be exported to tables that have matching columns | ||
| materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part); | ||
|
|
||
| /// Align the pipeline header with the destination's non-materialized sample block, | ||
| /// using the same `makeConvertingActions(Position)` call INSERT SELECT performs. | ||
| addExportConvertingActions(plan_for_part, *destination_storage, local_context); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the partition column itself needs a cast (for example a source Useful? React with 👍 / 👎.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is expected, at least for now. |
||
|
|
||
| QueryPlanOptimizationSettings optimization_settings(local_context); | ||
| auto pipeline_settings = BuildQueryPipelineSettings(local_context); | ||
| auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); | ||
|
|
@@ -303,7 +332,7 @@ bool ExportPartTask::executeStep() | |
| { | ||
| IStorage::IcebergCommitExportPartitionArguments iceberg_args; | ||
| iceberg_args.metadata_json_string = manifest.iceberg_metadata_json; | ||
| iceberg_args.partition_values = manifest.data_part->partition.value; | ||
| iceberg_args.partition_source_block = block_with_partition_values; | ||
|
|
||
| destination_storage->commitExportPartitionTransaction( | ||
| manifest.transaction_id, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the destination is partitioned object storage and the partition column type changes in a castable way (for example source
year String, destinationyear UInt32, bothPARTITION BY year), this new row conversion happens only afterdestination_storage->importhas already computed the S3/Hive partition path fromblock_with_partition_valuesusing the source part's minmax block. That means a schema pair now accepted by the synchronous validation can either fail inside partition-key evaluation with the destination actions seeing source-typed inputs, or write files under a path based on the uncast source value while the exported rows were cast, so the export is not equivalent toINSERT SELECT.Useful? React with 👍 / 👎.