diff --git a/.github/workflows/packaging_wheels.yml b/.github/workflows/packaging_wheels.yml index 82ed76a8..fed70203 100644 --- a/.github/workflows/packaging_wheels.yml +++ b/.github/workflows/packaging_wheels.yml @@ -32,7 +32,7 @@ jobs: matrix: python: [ cp314 ] platform: - - { os: windows-2025, arch: amd64, cibw_system: win } + - { os: windows-2022, arch: amd64, cibw_system: win } - { os: windows-11-arm, arch: ARM64, cibw_system: win } - { os: ubuntu-24.04, arch: x86_64, cibw_system: manylinux } - { os: ubuntu-24.04-arm, arch: aarch64, cibw_system: manylinux } diff --git a/.github/workflows/targeted_test.yml b/.github/workflows/targeted_test.yml index d1a828de..eff62fe2 100644 --- a/.github/workflows/targeted_test.yml +++ b/.github/workflows/targeted_test.yml @@ -8,7 +8,7 @@ on: required: true type: choice options: - - 'windows-2025' + - 'windows-2022' - 'windows-11-arm' - 'ubuntu-24.04' - 'ubuntu-24.04-arm' diff --git a/external/duckdb b/external/duckdb index 0361de44..06eb6b68 160000 --- a/external/duckdb +++ b/external/duckdb @@ -1 +1 @@ -Subproject commit 0361de441a122f11fe07c40ce4531cd58f5eaf7a +Subproject commit 06eb6b6858c6d568f5fe62855f53c386f13c98c7 diff --git a/src/duckdb_py/arrow/arrow_array_stream.cpp b/src/duckdb_py/arrow/arrow_array_stream.cpp index a4877f6a..ed9e2275 100644 --- a/src/duckdb_py/arrow/arrow_array_stream.cpp +++ b/src/duckdb_py/arrow/arrow_array_stream.cpp @@ -14,11 +14,11 @@ namespace duckdb { -void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches) { +void TransformDuckToArrowChunk(py::object pyarrow_schema, ArrowArray &data, py::list &batches) { py::gil_assert(); auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); auto batch_import_func = pyarrow_lib_module.attr("RecordBatch").attr("_import_from_c"); - batches.append(batch_import_func(reinterpret_cast(&data), reinterpret_cast(&arrow_schema))); + batches.append(batch_import_func(reinterpret_cast(&data), pyarrow_schema)); } void VerifyArrowDatasetLoaded() { diff --git a/src/duckdb_py/arrow/arrow_export_utils.cpp b/src/duckdb_py/arrow/arrow_export_utils.cpp index ea30b94b..8333bbf6 100644 --- a/src/duckdb_py/arrow/arrow_export_utils.cpp +++ b/src/duckdb_py/arrow/arrow_export_utils.cpp @@ -17,18 +17,28 @@ namespace duckdb { namespace pyarrow { -py::object ToArrowTable(const vector &types, const vector &names, const py::list &batches, - ClientProperties &options) { +py::object ToPyArrowSchema(const ArrowSchema &schema) { py::gil_scoped_acquire acquire; auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); - auto from_batches_func = pyarrow_lib_module.attr("Table").attr("from_batches"); auto schema_import_func = pyarrow_lib_module.attr("Schema").attr("_import_from_c"); + return schema_import_func(reinterpret_cast(&schema)); +} + +py::object ToArrowTable(const py::list &batches, py::object pyarrow_schema) { + py::gil_scoped_acquire acquire; + + auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); + auto from_batches_func = pyarrow_lib_module.attr("Table").attr("from_batches"); + + return py::cast(from_batches_func(batches, pyarrow_schema)); +} + +py::object ToArrowTable(const vector &types, const vector &names, const py::list &batches, + ClientProperties &options) { ArrowSchema schema; ArrowConverter::ToArrowSchema(&schema, types, names, options); - auto schema_obj = schema_import_func(reinterpret_cast(&schema)); - - return py::cast(from_batches_func(batches, schema_obj)); + return ToArrowTable(batches, ToPyArrowSchema(schema)); } } // namespace pyarrow diff --git a/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp b/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp index 90974ff7..1f790c28 100644 --- a/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp +++ b/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp @@ -62,7 +62,7 @@ enum class PyArrowObjectType { PolarsLazyFrame }; -void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches); +void TransformDuckToArrowChunk(py::object pyarrow_schema, ArrowArray &data, py::list &batches); PyArrowObjectType GetArrowType(const py::handle &obj); diff --git a/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp b/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp index 39d1b3dc..6306b116 100644 --- a/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp +++ b/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp @@ -6,9 +6,13 @@ namespace duckdb { namespace pyarrow { +py::object ToPyArrowSchema(const ArrowSchema &schema); + py::object ToArrowTable(const vector &types, const vector &names, const py::list &batches, ClientProperties &options); +py::object ToArrowTable(const py::list &batches, py::object pyarrow_schema); + } // namespace pyarrow } // namespace duckdb diff --git a/src/duckdb_py/include/duckdb_python/pyresult.hpp b/src/duckdb_py/include/duckdb_python/pyresult.hpp index 941a203b..d7da83cc 100644 --- a/src/duckdb_py/include/duckdb_python/pyresult.hpp +++ b/src/duckdb_py/include/duckdb_python/pyresult.hpp @@ -36,15 +36,13 @@ struct DuckDBPyResult { PandasDataFrame FetchDF(bool date_as_object); - duckdb::pyarrow::Table FetchArrowTable(idx_t rows_per_batch, bool to_polars); - PandasDataFrame FetchDFChunk(const idx_t vectors_per_chunk = 1, bool date_as_object = false); py::dict FetchPyTorch(); py::dict FetchTF(); - ArrowArrayStream FetchArrowArrayStream(idx_t rows_per_batch = 1000000); + duckdb::pyarrow::Table FetchArrowTable(idx_t rows_per_batch, bool to_polars); duckdb::pyarrow::RecordBatchReader FetchRecordBatchReader(idx_t rows_per_batch = 1000000); py::object FetchArrowCapsule(idx_t rows_per_batch = 1000000); @@ -71,6 +69,19 @@ struct DuckDBPyResult { unique_ptr FetchNextRaw(QueryResult &result); unique_ptr InitializeNumpyConversion(bool pandas = false); + //! Re-feed an already-MATERIALIZED result (a ColumnDataCollection, e.g. from + //! rel.execute()) back through the engine on the user's own context. The eager + //! variant installs a PhysicalArrowCollector to produce an ArrowQueryResult + //! (parallel); the stream variant produces a lazy StreamQueryResult that co-owns + //! the context (so it survives `del conn`). Never call these on a StreamQueryResult: + //! a lazy result already has a live context and is converted/wrapped directly. + void PromoteMaterializedToArrow(idx_t batch_size); + + template + T RunWithArrowSchema(const std::function &fun, bool dedup_col_names); + duckdb::pyarrow::Table MaterializedResultToArrowTable(const ArrowSchema &arrow_schema, idx_t rows_per_batch); + ArrowArrayStream FetchArrowArrayStream(idx_t rows_per_batch); + private: idx_t chunk_offset = 0; diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp index b6e14293..8b9ac8f9 100644 --- a/src/duckdb_py/pyconnection.cpp +++ b/src/duckdb_py/pyconnection.cpp @@ -413,7 +413,7 @@ shared_ptr DuckDBPyConnection::UnregisterUDF(const string &n auto &catalog = Catalog::GetCatalog(context, SYSTEM_CATALOG); DropInfo info; info.type = CatalogType::SCALAR_FUNCTION_ENTRY; - info.name = Identifier(name); + info.NameMutable() = Identifier(name); info.allow_drop_internal = true; info.cascade = false; info.if_not_found = OnEntryNotFound::THROW_EXCEPTION; @@ -1651,11 +1651,12 @@ unique_ptr DuckDBPyConnection::RunQuery(const py::object &quer unique_ptr DuckDBPyConnection::Table(const string &tname) { auto &connection = con.GetConnection(); auto qualified_name = QualifiedName::Parse(tname); - if (qualified_name.schema.empty()) { - qualified_name.schema = DEFAULT_SCHEMA; + if (qualified_name.Schema().empty()) { + qualified_name.SchemaMutable() = DEFAULT_SCHEMA; } try { - return CreateRelation(connection.Table(qualified_name.catalog, qualified_name.schema, qualified_name.name)); + return CreateRelation( + connection.Table(qualified_name.Catalog(), qualified_name.Schema(), qualified_name.Name())); } catch (const CatalogException &) { // CatalogException will be of the type '... is not a table' // Not a table in the database, make a query relation that can perform replacement scans diff --git a/src/duckdb_py/pyexpression.cpp b/src/duckdb_py/pyexpression.cpp index 329b034e..e6d263a6 100644 --- a/src/duckdb_py/pyexpression.cpp +++ b/src/duckdb_py/pyexpression.cpp @@ -325,13 +325,13 @@ shared_ptr DuckDBPyExpression::ColumnExpression(const py::ar } auto qualified_name = QualifiedName::Parse(column_name); - if (!qualified_name.catalog.empty()) { - column_names.push_back(qualified_name.catalog); + if (!qualified_name.Catalog().empty()) { + column_names.push_back(qualified_name.Catalog()); } - if (!qualified_name.schema.empty()) { - column_names.push_back(qualified_name.schema); + if (!qualified_name.Schema().empty()) { + column_names.push_back(qualified_name.Schema()); } - column_names.push_back(qualified_name.name); + column_names.push_back(qualified_name.Name()); } else { for (auto &part : names) { column_names.push_back(Identifier(py::str(part))); diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp index daf439e0..10e8aa72 100644 --- a/src/duckdb_py/pyrelation.cpp +++ b/src/duckdb_py/pyrelation.cpp @@ -960,11 +960,11 @@ PandasDataFrame DuckDBPyRelation::FetchDFChunk(idx_t vectors_per_chunk, bool dat return result->FetchDFChunk(vectors_per_chunk, date_as_object); } -duckdb::pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, bool to_polars) { +pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, bool to_polars) { + if (!result && !rel) { + return py::none(); + } if (!result) { - if (!rel) { - return py::none(); - } auto &config = ClientConfig::GetConfig(*rel->context->GetContext()); ScopedConfigSetting scoped_setting( config, @@ -991,19 +991,9 @@ py::object DuckDBPyRelation::ToArrowCapsule(const py::object &requested_schema) if (!rel) { return py::none(); } - // The PyCapsule protocol doesn't allow custom parameters, so we use the same - // default batch size as fetch_arrow_table / fetch_record_batch. - idx_t batch_size = 1000000; - auto &config = ClientConfig::GetConfig(*rel->context->GetContext()); - ScopedConfigSetting scoped_setting( - config, - [&batch_size](ClientConfig &config) { - config.get_result_collector = [&batch_size](ClientContext &context, PreparedStatementData &data) { - return PhysicalArrowCollector::Create(context, data, batch_size); - }; - }, - [](ClientConfig &config) { config.get_result_collector = nullptr; }); - ExecuteOrThrow(); + // Fresh relation: stream lazily on the user's context (capsule survives `del conn`, + // but shares the single active-stream slot - consume before reusing the connection). + ExecuteOrThrow(true); } AssertResultOpen(); auto res = result->FetchArrowCapsule(); @@ -1049,6 +1039,7 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::ToRecordBatch(idx_t batch_s if (!rel) { return py::none(); } + // Fresh relation: stream lazily on the user's own context (survives `del conn`). ExecuteOrThrow(true); } AssertResultOpen(); @@ -1590,7 +1581,7 @@ DuckDBPyRelation &DuckDBPyRelation::Execute() { void DuckDBPyRelation::InsertInto(const string &table) { AssertRelation(); auto parsed_info = QualifiedName::Parse(table); - auto insert = rel->InsertRel(parsed_info.catalog, parsed_info.schema, parsed_info.name); + auto insert = rel->InsertRel(parsed_info.Catalog(), parsed_info.Schema(), parsed_info.Name()); PyExecuteRelation(insert); } @@ -1654,7 +1645,7 @@ void DuckDBPyRelation::Insert(const py::object ¶ms) const { void DuckDBPyRelation::Create(const string &table) { AssertRelation(); auto parsed_info = QualifiedName::Parse(table); - auto create = rel->CreateRel(parsed_info.schema, parsed_info.name, false); + auto create = rel->CreateRel(parsed_info.Schema(), parsed_info.Name(), false); PyExecuteRelation(create); } @@ -1756,7 +1747,7 @@ string DuckDBPyRelation::Explain(ExplainType type, const string &format) { // An empty format means "auto": the default format, or HTML when running under Jupyter. const bool auto_format = format.empty(); - auto explain_format = auto_format ? GetExplainFormat(type) : ProfilerPrintFormat::FromString(format); + auto explain_format = auto_format ? GetExplainFormat(type) : ProfilerPrintFormat(format); auto res = rel->Explain(type, explain_format); D_ASSERT(res->type == duckdb::QueryResultType::MATERIALIZED_RESULT); auto &materialized = res->Cast(); diff --git a/src/duckdb_py/pyresult.cpp b/src/duckdb_py/pyresult.cpp index 1e96cd87..5de8f848 100644 --- a/src/duckdb_py/pyresult.cpp +++ b/src/duckdb_py/pyresult.cpp @@ -14,8 +14,16 @@ #include "duckdb/common/exception.hpp" #include "duckdb/common/enums/stream_execution_result.hpp" #include "duckdb_python/arrow/arrow_export_utils.hpp" -#include "duckdb/main/chunk_scan_state/query_result.hpp" #include "duckdb/common/arrow/arrow_query_result.hpp" +#include "duckdb/common/arrow/physical_arrow_collector.hpp" +#include "duckdb/main/chunk_scan_state/query_result.hpp" +#include "duckdb/main/client_config.hpp" +#include "duckdb/main/materialized_query_result.hpp" +#include "duckdb/main/stream_query_result.hpp" +#include "duckdb/parser/expression/star_expression.hpp" +#include "duckdb/parser/query_node/select_node.hpp" +#include "duckdb/parser/statement/select_statement.hpp" +#include "duckdb/parser/tableref/column_data_ref.hpp" using namespace pybind11::literals; @@ -413,73 +421,153 @@ py::dict DuckDBPyResult::FetchTF() { return result_dict; } -duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, bool to_polars) { - if (!result) { - throw InvalidInputException("There is no query result"); +// `SELECT * FROM ` over `collection`, executed as a SelectStatement rather +// than via PendingQuery(relation) - the latter's RelationStatement stringifies the whole +// collection (O(rows)). The ColumnDataRef owns the collection, so it outlives the result +// (needed by the lazy stream path). +static unique_ptr MakeColumnDataScanStatement(unique_ptr collection, + const vector &names) { + // The binder rejects duplicate column names; callers restore the originals afterwards. + auto deduplicated_names = names; + QueryResult::DeduplicateColumns(deduplicated_names); + // Core's ColumnDataRef now takes case-insensitive Identifiers; promote the runtime names explicitly. + vector expected_names; + expected_names.reserve(deduplicated_names.size()); + for (auto &name : deduplicated_names) { + expected_names.emplace_back(std::move(name)); + } + auto table_ref = make_uniq(std::move(collection), std::move(expected_names)); + table_ref->alias = "materialized"; // binding asserts on an unset alias + auto select_node = make_uniq(); + select_node->select_list.push_back(make_uniq()); + select_node->from_table = std::move(table_ref); + auto select = make_uniq(); + select->node = std::move(select_node); + return select; +} + +// Re-feed a materialized result through a PhysicalArrowCollector on the user's own context +// (parallel conversion, correct Arrow settings) -> ArrowQueryResult. +void DuckDBPyResult::PromoteMaterializedToArrow(idx_t batch_size) { + D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); + auto client_context = result->client_properties.client_context; + if (!client_context) { + throw InternalException("Cannot promote result to Arrow: the originating client context is gone"); + } + auto context = client_context->shared_from_this(); + auto &materialized = result->Cast(); + auto names = result->names; + auto select = MakeColumnDataScanStatement(materialized.TakeCollection(), names); + + auto &config = ClientConfig::GetConfig(*context); + ScopedConfigSetting scoped_setting( + config, + [batch_size](ClientConfig &config) { + config.get_result_collector = [batch_size](ClientContext &context, PreparedStatementData &data) { + return PhysicalArrowCollector::Create(context, data, batch_size); + }; + }, + [](ClientConfig &config) { config.get_result_collector = nullptr; }); + + unique_ptr new_result; + { + D_ASSERT(py::gil_check()); + py::gil_scoped_release release; + auto pending_query = context->PendingQuery(std::move(select), QueryParameters(false)); + new_result = DuckDBPyConnection::CompletePendingQuery(*pending_query); } + if (new_result->HasError()) { + new_result->ThrowError(); + } + new_result->names = std::move(names); // restore names de-duplicated by re-binding + result = std::move(new_result); +} + +template +T DuckDBPyResult::RunWithArrowSchema(const std::function &fun, bool dedup_col_names) { + D_ASSERT(result); + if (!result->client_properties.client_context) { + throw ConnectionException("Cannot fetch arrow schema without a valid connection"); + } + auto ctx = result->client_properties.client_context->shared_from_this(); + auto names = result->names; - if (to_polars) { + if (dedup_col_names) { QueryResult::DeduplicateColumns(names); } - if (!result) { - throw InvalidInputException("result closed"); - } - auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); + ArrowSchema arrow_schema; + ctx->RunFunctionInTransaction( + [&] { ArrowConverter::ToArrowSchema(&arrow_schema, result->types, names, result->client_properties); }); + + return fun(arrow_schema); +} +duckdb::pyarrow::Table DuckDBPyResult::MaterializedResultToArrowTable(const ArrowSchema &arrow_schema, + const idx_t rows_per_batch) { + D_ASSERT(result); + D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT); + + auto pyarrow_schema = pyarrow::ToPyArrowSchema(arrow_schema); + if (result->type == QueryResultType::MATERIALIZED_RESULT) { + PromoteMaterializedToArrow(rows_per_batch); + } py::list batches; - if (result->type == QueryResultType::ARROW_RESULT) { - auto &arrow_result = result->Cast(); - auto arrays = arrow_result.ConsumeArrays(); - for (auto &array : arrays) { - ArrowSchema arrow_schema; - auto result_names = arrow_result.names; - if (to_polars) { - QueryResult::DeduplicateColumns(result_names); - } - ArrowArray data = array->arrow_array; - array->arrow_array.release = nullptr; - ArrowConverter::ToArrowSchema(&arrow_schema, arrow_result.types, result_names, - arrow_result.client_properties); - TransformDuckToArrowChunk(arrow_schema, data, batches); - } - } else { - QueryResultChunkScanState scan_state(*result.get()); - while (true) { - ArrowArray data; - idx_t count; - auto &query_result = *result.get(); - { - D_ASSERT(py::gil_check()); - py::gil_scoped_release release; - count = ArrowUtil::FetchChunk(scan_state, query_result.client_properties, rows_per_batch, &data, - ArrowTypeExtensionData::GetExtensionTypes( - *query_result.client_properties.client_context, query_result.types)); - } - if (count == 0) { - break; - } - ArrowSchema arrow_schema; - auto result_names = query_result.names; - if (to_polars) { - QueryResult::DeduplicateColumns(result_names); - } - ArrowConverter::ToArrowSchema(&arrow_schema, query_result.types, result_names, - query_result.client_properties); - TransformDuckToArrowChunk(arrow_schema, data, batches); - } + auto &arrow_result = result->Cast(); + auto arrays = arrow_result.ConsumeArrays(); + for (auto &array : arrays) { + ArrowArray data = array->arrow_array; + array->arrow_array.release = nullptr; + TransformDuckToArrowChunk(pyarrow_schema, data, batches); } + return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema); +} - return pyarrow::ToArrowTable(result->types, names, std::move(batches), result->client_properties); +duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(const idx_t rows_per_batch, const bool to_polars) { + if (!result) { + throw InvalidInputException("There is no query result"); + } + + return RunWithArrowSchema( + [&](const ArrowSchema &schema) -> duckdb::pyarrow::Table { + if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) { + return MaterializedResultToArrowTable(schema, rows_per_batch); + } + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchArrowTable called with unsupported query result: %d", result->type); + } + auto pyarrow_schema = pyarrow::ToPyArrowSchema(schema); + py::list batches; + QueryResultChunkScanState scan_state(*result); + while (true) { + ArrowArray data; + idx_t count; + { + D_ASSERT(py::gil_check()); + py::gil_scoped_release release; + count = ArrowUtil::FetchChunk(scan_state, result->client_properties, rows_per_batch, &data, + ArrowTypeExtensionData::GetExtensionTypes( + *result->client_properties.client_context, result->types)); + } + if (count == 0) { + break; + } + TransformDuckToArrowChunk(pyarrow_schema, data, batches); + } + return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema); + }, + to_polars); } ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) { if (!result) { throw InvalidInputException("There is no query result"); } - ResultArrowArrayStreamWrapper *result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch); - // The 'result_stream' is part of the 'private_data' of the ArrowArrayStream and its lifetime is bound to that of - // the ArrowArrayStream. + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchArrowArrayStream called with unsupported query result: %d", result->type); + } + // The wrapper is owned by the ArrowArrayStream's private_data (released with the stream). + const auto result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch); return result_stream->stream; } @@ -487,6 +575,20 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t if (!result) { throw InvalidInputException("There is no query result"); } + + if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) { + constexpr bool dedup_column_names = false; + return RunWithArrowSchema( + [&](const ArrowSchema &schema) -> duckdb::pyarrow::RecordBatchReader { + const auto table = MaterializedResultToArrowTable(schema, rows_per_batch); + return py::cast( + table.attr("to_reader")(py::arg("max_chunksize") = rows_per_batch)); + }, + dedup_column_names); + } + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchRecordBatchReader called with unsupported query result: %d", result->type); + } py::gil_scoped_acquire acquire; auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); auto record_batch_reader_func = pyarrow_lib_module.attr("RecordBatchReader").attr("_import_from_c"); @@ -495,273 +597,6 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t return py::cast(record_batch_reader); } -// Holds owned copies of the string data for a deep-copied ArrowSchema node. -struct ArrowSchemaCopyData { - string format; - string name; - string metadata; -}; - -static void ReleaseCopiedArrowSchema(ArrowSchema *schema) { - if (!schema || !schema->release) { - return; - } - for (int64_t i = 0; i < schema->n_children; i++) { - if (schema->children[i]->release) { - schema->children[i]->release(schema->children[i]); - } - delete schema->children[i]; - } - delete[] schema->children; - if (schema->dictionary) { - if (schema->dictionary->release) { - schema->dictionary->release(schema->dictionary); - } - delete schema->dictionary; - } - delete reinterpret_cast(schema->private_data); - schema->release = nullptr; -} - -static idx_t ArrowMetadataSize(const char *metadata) { - if (!metadata) { - return 0; - } - // Arrow metadata format: int32 num_entries, then for each entry: - // int32 key_len, key_bytes, int32 value_len, value_bytes - auto ptr = metadata; - int32_t num_entries; - memcpy(&num_entries, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t); - for (int32_t i = 0; i < num_entries; i++) { - int32_t len; - memcpy(&len, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t) + len; - memcpy(&len, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t) + len; - } - return ptr - metadata; -} - -// Deep-copy an ArrowSchema. The Arrow C Data Interface specifies that get_schema -// transfers ownership to the caller, so each call must produce an independent copy. -// Each node owns its string data via an ArrowSchemaCopyData in private_data. -static int ArrowSchemaDeepCopy(const ArrowSchema &source, ArrowSchema *out, string &error) { - out->release = nullptr; - try { - auto data = new ArrowSchemaCopyData(); - data->format = source.format ? source.format : ""; - data->name = source.name ? source.name : ""; - if (source.metadata) { - auto metadata_size = ArrowMetadataSize(source.metadata); - data->metadata.assign(source.metadata, metadata_size); - } - - out->format = data->format.c_str(); - out->name = data->name.c_str(); - out->metadata = source.metadata ? data->metadata.data() : nullptr; - out->flags = source.flags; - out->n_children = source.n_children; - out->dictionary = nullptr; - out->private_data = data; - out->release = ReleaseCopiedArrowSchema; - - if (source.n_children > 0) { - out->children = new ArrowSchema *[source.n_children]; - for (int64_t i = 0; i < source.n_children; i++) { - out->children[i] = new ArrowSchema(); - auto rc = ArrowSchemaDeepCopy(*source.children[i], out->children[i], error); - if (rc != 0) { - for (int64_t j = 0; j <= i; j++) { - if (out->children[j]->release) { - out->children[j]->release(out->children[j]); - } - delete out->children[j]; - } - delete[] out->children; - out->children = nullptr; - out->n_children = 0; - // Release the partially constructed node - delete data; - out->private_data = nullptr; - out->release = nullptr; - return rc; - } - } - } else { - out->children = nullptr; - } - - if (source.dictionary) { - out->dictionary = new ArrowSchema(); - auto rc = ArrowSchemaDeepCopy(*source.dictionary, out->dictionary, error); - if (rc != 0) { - delete out->dictionary; - out->dictionary = nullptr; - return rc; - } - } - } catch (std::exception &e) { - error = e.what(); - return -1; - } - return 0; -} - -// Wraps pre-built Arrow arrays from an ArrowQueryResult into an ArrowArrayStream. -// This avoids the double-materialization that happens when using ResultArrowArrayStreamWrapper -// with an ArrowQueryResult (which throws NotImplementedException from FetchInternal). -// -// The schema is cached eagerly in the constructor (while the ClientContext is still alive) -// so that get_schema can be called after the originating connection has been destroyed. -// ToArrowSchema needs a live ClientContext for transaction access and catalog lookups -// (e.g. CRS conversion for GEOMETRY types). -struct ArrowQueryResultStreamWrapper { - ArrowQueryResultStreamWrapper(unique_ptr result_p) : result(std::move(result_p)), index(0) { - auto &arrow_result = result->Cast(); - arrays = arrow_result.ConsumeArrays(); - - cached_schema.release = nullptr; - ArrowConverter::ToArrowSchema(&cached_schema, result->types, result->names, result->client_properties); - - stream.private_data = this; - stream.get_schema = GetSchema; - stream.get_next = GetNext; - stream.release = Release; - stream.get_last_error = GetLastError; - } - - ~ArrowQueryResultStreamWrapper() { - if (cached_schema.release) { - cached_schema.release(&cached_schema); - } - } - - static int GetSchema(ArrowArrayStream *stream, ArrowSchema *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - return ArrowSchemaDeepCopy(self->cached_schema, out, self->last_error); - } - - static int GetNext(ArrowArrayStream *stream, ArrowArray *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - if (self->index >= self->arrays.size()) { - out->release = nullptr; - return 0; - } - *out = self->arrays[self->index]->arrow_array; - self->arrays[self->index]->arrow_array.release = nullptr; - self->index++; - return 0; - } - - static void Release(ArrowArrayStream *stream) { - if (!stream || !stream->release) { - return; - } - stream->release = nullptr; - delete reinterpret_cast(stream->private_data); - } - - static const char *GetLastError(ArrowArrayStream *stream) { - if (!stream->release) { - return "stream was released"; - } - auto self = reinterpret_cast(stream->private_data); - return self->last_error.c_str(); - } - - ArrowArrayStream stream; - unique_ptr result; - vector> arrays; - ArrowSchema cached_schema; - idx_t index; - string last_error; -}; - -// Wraps an ArrowArrayStream and caches its schema eagerly. -// Used for the slow path (MaterializedQueryResult / StreamQueryResult) where the -// inner stream is a ResultArrowArrayStreamWrapper from DuckDB core. That wrapper's -// get_schema calls ToArrowSchema which needs a live ClientContext, so we fetch it -// once at construction time and return copies from cache afterwards. -struct SchemaCachingStreamWrapper { - SchemaCachingStreamWrapper(ArrowArrayStream inner_p) : inner(inner_p) { - inner_p.release = nullptr; - - cached_schema.release = nullptr; - if (inner.get_schema(&inner, &cached_schema)) { - schema_error = inner.get_last_error(&inner); - schema_ok = false; - } else { - schema_ok = true; - } - - stream.private_data = this; - stream.get_schema = GetSchema; - stream.get_next = GetNext; - stream.release = Release; - stream.get_last_error = GetLastError; - } - - ~SchemaCachingStreamWrapper() { - if (cached_schema.release) { - cached_schema.release(&cached_schema); - } - if (inner.release) { - inner.release(&inner); - } - } - - static int GetSchema(ArrowArrayStream *stream, ArrowSchema *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - if (!self->schema_ok) { - return -1; - } - return ArrowSchemaDeepCopy(self->cached_schema, out, self->schema_error); - } - - static int GetNext(ArrowArrayStream *stream, ArrowArray *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - return self->inner.get_next(&self->inner, out); - } - - static void Release(ArrowArrayStream *stream) { - if (!stream || !stream->release) { - return; - } - stream->release = nullptr; - delete reinterpret_cast(stream->private_data); - } - - static const char *GetLastError(ArrowArrayStream *stream) { - if (!stream->release) { - return "stream was released"; - } - auto self = reinterpret_cast(stream->private_data); - if (!self->schema_error.empty()) { - return self->schema_error.c_str(); - } - return self->inner.get_last_error(&self->inner); - } - - ArrowArrayStream stream; - ArrowArrayStream inner; - ArrowSchema cached_schema; - bool schema_ok; - string schema_error; -}; - static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) { auto data = PyCapsule_GetPointer(object, "arrow_array_stream"); if (!data) { @@ -774,22 +609,26 @@ static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) { delete stream; } -py::object DuckDBPyResult::FetchArrowCapsule(idx_t rows_per_batch) { - if (result && result->type == QueryResultType::ARROW_RESULT) { - // Fast path: yield pre-built Arrow arrays directly. - auto wrapper = new ArrowQueryResultStreamWrapper(std::move(result)); - auto stream = new ArrowArrayStream(); - *stream = wrapper->stream; - wrapper->stream.release = nullptr; - return py::capsule(stream, "arrow_array_stream", ArrowArrayStreamPyCapsuleDestructor); +py::object DuckDBPyResult::FetchArrowCapsule(const idx_t rows_per_batch) { + if (!result) { + throw InvalidInputException("There is no query result"); + } + + constexpr bool dedup_column_names = false; + if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) { + return RunWithArrowSchema( + [&](const ArrowSchema &schema) -> py::object { + const auto table = MaterializedResultToArrowTable(schema, rows_per_batch); + return table.attr("__arrow_c_stream__")(); + }, + dedup_column_names); + } + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchArrowCapsule called with unsupported query result: %d", result->type); } - // Slow path: wrap in SchemaCachingStreamWrapper so the schema is fetched - // eagerly while the ClientContext is still alive. auto inner_stream = FetchArrowArrayStream(rows_per_batch); - auto wrapper = new SchemaCachingStreamWrapper(inner_stream); auto stream = new ArrowArrayStream(); - *stream = wrapper->stream; - wrapper->stream.release = nullptr; + *stream = inner_stream; return py::capsule(stream, "arrow_array_stream", ArrowArrayStreamPyCapsuleDestructor); } diff --git a/src/duckdb_py/python_udf.cpp b/src/duckdb_py/python_udf.cpp index 77b5f96a..85fa243b 100644 --- a/src/duckdb_py/python_udf.cpp +++ b/src/duckdb_py/python_udf.cpp @@ -25,13 +25,14 @@ static py::list ConvertToSingleBatch(vector &types, vector ClientProperties &options, ClientContext &context) { ArrowSchema schema; ArrowConverter::ToArrowSchema(&schema, types, names, options); + auto pyarrow_schema = pyarrow::ToPyArrowSchema(schema); py::list single_batch; ArrowAppender appender(types, STANDARD_VECTOR_SIZE, options, ArrowTypeExtensionData::GetExtensionTypes(context, types)); appender.Append(input, 0, input.size(), input.size()); auto array = appender.Finalize(); - TransformDuckToArrowChunk(schema, array, single_batch); + TransformDuckToArrowChunk(pyarrow_schema, array, single_batch); return single_batch; } diff --git a/tests/fast/arrow/test_arrow_refeed.py b/tests/fast/arrow/test_arrow_refeed.py new file mode 100644 index 00000000..6524ec91 --- /dev/null +++ b/tests/fast/arrow/test_arrow_refeed.py @@ -0,0 +1,356 @@ +"""Tests for the MaterializedRelation re-feed Arrow design. + +The redesign promotes an already-executed result back into a relation and re-feeds +it through the engine, so there is a single uniform Arrow path. + +Routing is by *result type*, and a lazy result is never materialized: + +* ``MaterializedQueryResult`` (from ``rel.execute()``) — a ColumnDataCollection in + memory — is the only result re-fed back through the engine: ``to_arrow_table`` / + ``pl`` run it through a ``PhysicalArrowCollector`` (eager + parallel); the lazy + surfaces re-run it as a ``StreamQueryResult``. Both run on the user's own context, + which the produced stream co-owns (so it survives ``del conn``). +* ``StreamQueryResult`` (from ``con.execute()``, or a fresh streaming relation) + already has a live context, so it is converted directly: ``to_arrow_table`` pulls + the stream serially; ``__arrow_c_stream__`` / ``to_arrow_reader`` wrap it directly + in core's ``ResultArrowArrayStreamWrapper`` — never copied. Such a reader/capsule + shares the connection's single active-stream slot (consume before reusing the + connection) and survives ``del conn``. + +The headline correctness win — GEOMETRY conversion after the originating connection +closes — is exercised directly (see ``TestGeometryAfterClose``): DuckDB's built-in +``GEOMETRY`` type maps to the ``geoarrow.wkb`` Arrow extension type, whose conversion +needs a live ``ClientContext`` / type-extension registry. The lazy paths run on a +context the ``StreamQueryResult`` co-owns, so the conversion succeeds even after the +originating connection is destroyed. +""" + +import gc + +import pytest + +import duckdb + +pa = pytest.importorskip("pyarrow") + + +# A spread of types that exercise the re-feed: ints, floats, strings, bools, +# nullables, nested struct/list/map, and an enum-backed categorical. +RICH_SQL = """ + SELECT + i AS int_col, + i::DOUBLE AS double_col, + 'row_' || i::VARCHAR AS str_col, + i % 2 = 0 AS bool_col, + CASE WHEN i % 3 = 0 THEN NULL ELSE i END AS nullable_col, + {'x': i, 'y': i::VARCHAR} AS struct_col, + [i, i + 1, i + 2] AS list_col, + MAP {i::VARCHAR: i * 10} AS map_col, + FROM range(1000) t(i) +""" + + +class TestEagerToArrowTablePromotion: + """Eager + parallel promotion of already-executed results matches the fresh path.""" + + def test_cursor_fetch_arrow_table_matches_fresh(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + # con.execute(...) materializes; fetch goes through the promotion path. + actual = conn.execute(RICH_SQL).to_arrow_table() + assert actual.equals(expected) + + def test_preexecuted_relation_to_arrow_table_matches_fresh(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + rel = conn.sql(RICH_SQL) + rel.execute() # forces a MaterializedQueryResult + actual = rel.to_arrow_table() + assert actual.equals(expected) + + def test_cursor_pl_matches_fresh(self): + pl = pytest.importorskip("polars") + conn = duckdb.connect() + sql = "SELECT i AS a, i::VARCHAR AS b FROM range(500) t(i)" + expected = conn.sql(sql).pl() + actual = conn.execute(sql).pl() + assert expected.equals(actual) + assert isinstance(actual, pl.DataFrame) + + def test_cursor_fetch_arrow_table_empty(self): + conn = duckdb.connect() + sql = "SELECT i AS a, i::VARCHAR AS b FROM range(10) t(i) WHERE i < 0" + expected = conn.sql(sql).to_arrow_table() + actual = conn.execute(sql).to_arrow_table() + assert actual.num_rows == 0 + assert actual.schema.equals(expected.schema) + + +class TestLazyCapsuleStreaming: + """``__arrow_c_stream__`` is now a lazy streaming object.""" + + def test_fresh_capsule_matches_to_arrow_table(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + actual = pa.table(conn.sql(RICH_SQL)) + assert actual.equals(expected) + + def test_preexecuted_capsule_matches_to_arrow_table(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + rel = conn.sql(RICH_SQL) + rel.execute() # MaterializedQueryResult -> re-fed through the engine as a stream + actual = pa.table(rel) + assert actual.equals(expected) + + def test_fresh_capsule_survives_del_conn(self): + """Fresh capsule survives ``del conn``: the StreamQueryResult owns the context (#492).""" + conn = duckdb.connect() + capsule = conn.sql(RICH_SQL).__arrow_c_stream__() # noqa: F841 + del conn + gc.collect() + out = duckdb.connect().sql("SELECT * FROM capsule").to_arrow_table() + assert out.num_rows == 1000 + + def test_preexecuted_capsule_survives_del_conn(self): + """Pre-executed (materialized) capsule survives ``del conn``: the re-fed stream owns the context.""" + conn = duckdb.connect() + rel = conn.sql(RICH_SQL) + rel.execute() + capsule = rel.__arrow_c_stream__() # noqa: F841 + del rel, conn + gc.collect() + out = duckdb.connect().sql("SELECT * FROM capsule").to_arrow_table() + assert out.num_rows == 1000 + + def test_fresh_capsule_consume_then_reuse_same_connection(self): + """Streaming-object contract: consume the fresh capsule before reusing the connection.""" + conn = duckdb.connect() + sql = "SELECT i FROM range(100) t(i)" + c1 = conn.sql(sql).__arrow_c_stream__() + first = pa.RecordBatchReader._import_from_c_capsule(c1).read_all() + assert first.num_rows == 100 + # After fully consuming the first stream, the slot is free for the next. + c2 = conn.sql(sql).__arrow_c_stream__() + second = pa.RecordBatchReader._import_from_c_capsule(c2).read_all() + assert second.num_rows == 100 + + +class TestCursorRecordBatchReaderStreaming: + """Cursor reader over a stream: not materialized, survives del conn, shares the active-stream slot.""" + + def test_reader_consume_then_reuse_connection(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT range AS a FROM range(3000)") + reader = conn.execute("SELECT a FROM t").to_arrow_reader(1024) + tbl = reader.read_all() # consume before reusing the connection + assert tbl.num_rows == 3000 + assert tbl.column("a").to_pylist() == list(range(3000)) + # the connection is free to use again + assert conn.execute("SELECT 42").fetchone()[0] == 42 + + def test_reader_survives_del_conn(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT range AS a FROM range(3000)") + reader = conn.execute("SELECT a FROM t").to_arrow_reader(1024) + del conn + gc.collect() + tbl = reader.read_all() + assert tbl.num_rows == 3000 + + def test_cursor_reader_exact_batch_sizes(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT range AS a FROM range(3000)") + reader = conn.execute("SELECT a FROM t").to_arrow_reader(1024) + assert reader.read_next_batch().num_rows == 1024 + assert reader.read_next_batch().num_rows == 1024 + assert reader.read_next_batch().num_rows == 952 + with pytest.raises(StopIteration): + reader.read_next_batch() + + +class TestDuplicateColumnNames: + """Duplicate output column names must survive the re-feed (promotion restores them).""" + + DUP_SQL = "SELECT i AS a, i + 1 AS a, i + 2 AS a FROM range(50) t(i)" + + def test_cursor_fetch_arrow_table_duplicate_columns(self): + conn = duckdb.connect() + tbl = conn.execute(self.DUP_SQL).to_arrow_table() + assert tbl.num_rows == 50 + assert tbl.num_columns == 3 + # Re-feeding through a MaterializedRelation re-binds (which would dedup the + # names); the promotion restores the original names, so pyarrow still sees + # the duplicate 'a' columns exactly as the un-promoted result would. + assert tbl.column_names == ["a", "a", "a"] + assert tbl.column(0).to_pylist() == list(range(50)) + assert tbl.column(1).to_pylist() == list(range(1, 51)) + assert tbl.column(2).to_pylist() == list(range(2, 52)) + + def test_preexecuted_capsule_duplicate_columns(self): + conn = duckdb.connect() + rel = conn.sql(self.DUP_SQL) + rel.execute() + tbl = pa.table(rel) + assert tbl.num_rows == 50 + assert tbl.num_columns == 3 + + def test_cursor_record_batch_duplicate_columns(self): + conn = duckdb.connect() + reader = conn.execute(self.DUP_SQL).to_arrow_reader() + tbl = reader.read_all() + assert tbl.num_rows == 50 + assert tbl.num_columns == 3 + + def test_cursor_pl_duplicate_columns_dedups(self): + pl = pytest.importorskip("polars") + conn = duckdb.connect() + df = conn.execute(self.DUP_SQL).pl() + # polars requires unique column names; the dedup must still apply. + assert isinstance(df, pl.DataFrame) + assert len(set(df.columns)) == 3 + + +class TestEdgeCaseShapes: + # Note: a 0-column result is not constructible via SQL ("SELECT FROM ..." is a + # parser error), so the "0 columns" risk from CONTEXT.md is not reachable here. + + def test_single_row_single_column(self): + conn = duckdb.connect() + tbl = conn.execute("SELECT 42 AS x").to_arrow_table() + assert tbl.num_rows == 1 + assert tbl.column("x").to_pylist() == [42] + + def test_enum_categorical_roundtrip(self): + conn = duckdb.connect() + conn.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')") + conn.execute("CREATE TABLE t AS SELECT 'happy'::mood AS m FROM range(100)") + expected = conn.sql("SELECT m FROM t").to_arrow_table() + actual = conn.execute("SELECT m FROM t").to_arrow_table() + assert actual.equals(expected) + # ENUM should map to a dictionary-encoded Arrow column. + assert pa.types.is_dictionary(actual.schema.field("m").type) + + def test_enum_via_cursor_stream(self): + conn = duckdb.connect() + conn.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')") + conn.execute("CREATE TABLE t AS SELECT 'ok'::mood AS m FROM range(100)") + reader = conn.execute("SELECT m FROM t").to_arrow_reader() + tbl = reader.read_all() + assert tbl.num_rows == 100 + assert pa.types.is_dictionary(tbl.schema.field("m").type) + + +class TestConfigFidelity: + """Re-fed / directly-wrapped results reproduce the user's Arrow output config (e.g. TimeZone).""" + + def test_timezone_preserved_through_cursor_stream_reader(self): + conn = duckdb.connect() + conn.execute("SET TimeZone = 'America/New_York'") + reader = conn.execute("SELECT TIMESTAMPTZ '2021-06-01 12:00:00' AS ts").to_arrow_reader() + tbl = reader.read_all() + assert tbl.schema.field("ts").type.tz == "America/New_York" + + def test_timezone_preserved_through_preexec_capsule(self): + conn = duckdb.connect() + conn.execute("SET TimeZone = 'Asia/Kathmandu'") + rel = conn.sql("SELECT TIMESTAMPTZ '2021-06-01 12:00:00' AS ts") + rel.execute() + tbl = pa.table(rel) + assert tbl.schema.field("ts").type.tz == "Asia/Kathmandu" + + def test_large_buffer_size_preserved_through_cursor_stream(self): + # arrow_large_buffer_size promotes string/blob/list offsets to 64-bit. + conn = duckdb.connect() + conn.execute("SET arrow_large_buffer_size = true") + reader = conn.execute("SELECT 'hello' AS s FROM range(10)").to_arrow_reader() + tbl = reader.read_all() + assert tbl.schema.field("s").type == pa.large_string() + + +class TestLazyStreamMechanism: + """Mechanism behind the GEOMETRY-after-close win: a lazy re-fed stream is correct after del conn.""" + + def test_refed_stream_data_correct_after_del_conn(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT i, i::VARCHAR AS s FROM range(2000) t(i)") + reader = conn.execute("SELECT * FROM t ORDER BY i").to_arrow_reader(512) + del conn + gc.collect() + tbl = reader.read_all() + assert tbl.num_rows == 2000 + assert tbl.column("i").to_pylist() == list(range(2000)) + + +class TestGeometryAfterClose: + """The headline correctness win, now directly testable. + + DuckDB's built-in GEOMETRY type maps to the ``geoarrow.wkb`` Arrow extension type, + whose conversion requires a live ClientContext / type-extension registry. The lazy + capsule/reader run on a context the StreamQueryResult co-owns, so the conversion + (schema + WKB data) succeeds AFTER the originating connection is destroyed. Before + this design, the conversion ran against a dangling context (use-after-free / wrong + schema). GEOMETRY is a core type here (no spatial extension needed). + """ + + GEOM_SQL = "SELECT id, ('POINT(' || id || ' ' || id || ')')::GEOMETRY AS g FROM range(8) t(id)" + + def _expected(self): + # deterministic query -> compute the reference on an independent connection + return duckdb.connect().sql(self.GEOM_SQL).to_arrow_table() + + @staticmethod + def _assert_geoarrow(tbl) -> None: + field = tbl.schema.field("g") + assert field.metadata is not None + assert field.metadata[b"ARROW:extension:name"] == b"geoarrow.wkb" + + def test_geometry_to_arrow_is_geoarrow_wkb(self): + # sanity: the build really has built-in GEOMETRY mapping to geoarrow.wkb + tbl = self._expected() + self._assert_geoarrow(tbl) + assert tbl.num_rows == 8 + + def test_fresh_capsule_geometry_after_del_conn(self): + expected = self._expected() + conn = duckdb.connect() + capsule = conn.sql(self.GEOM_SQL).__arrow_c_stream__() + del conn + gc.collect() + actual = pa.RecordBatchReader._import_from_c_capsule(capsule).read_all() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist() + + def test_preexecuted_capsule_geometry_after_del_conn(self): + # MaterializedQueryResult -> re-fed as a stream; conversion runs after del conn + expected = self._expected() + conn = duckdb.connect() + rel = conn.sql(self.GEOM_SQL) + rel.execute() + capsule = rel.__arrow_c_stream__() + del rel, conn + gc.collect() + actual = pa.RecordBatchReader._import_from_c_capsule(capsule).read_all() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist() + + def test_cursor_reader_geometry_after_del_conn(self): + # con.execute() stream wrapped directly; conversion runs after del conn + expected = self._expected() + conn = duckdb.connect() + reader = conn.execute(self.GEOM_SQL).to_arrow_reader() + del conn + gc.collect() + actual = reader.read_all() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist() + + def test_preexecuted_to_arrow_table_geometry(self): + # eager parallel re-feed (SelectStatement + PhysicalArrowCollector) of geometry + expected = self._expected() + conn = duckdb.connect() + rel = conn.sql(self.GEOM_SQL) + rel.execute() + actual = rel.to_arrow_table() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist()