From 426f6ccec4ad43c9f4aa74998302f2c85e2d8b18 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Fri, 12 Jun 2026 23:21:53 +0200 Subject: [PATCH 01/11] Pull materialized CDCs through the engine again for arrow conversion with a live connection / transaction --- .../include/duckdb_python/pyresult.hpp | 9 + src/duckdb_py/pyrelation.cpp | 25 +- src/duckdb_py/pyresult.cpp | 443 ++++++------------ tests/fast/arrow/test_arrow_refeed.py | 356 ++++++++++++++ 4 files changed, 527 insertions(+), 306 deletions(-) create mode 100644 tests/fast/arrow/test_arrow_refeed.py diff --git a/src/duckdb_py/include/duckdb_python/pyresult.hpp b/src/duckdb_py/include/duckdb_python/pyresult.hpp index 941a203b..ace4d42d 100644 --- a/src/duckdb_py/include/duckdb_python/pyresult.hpp +++ b/src/duckdb_py/include/duckdb_python/pyresult.hpp @@ -71,6 +71,15 @@ struct DuckDBPyResult { unique_ptr FetchNextRaw(QueryResult &result); unique_ptr InitializeNumpyConversion(bool pandas = false); + //! Re-feed an already-MATERIALIZED result (a ColumnDataCollection, e.g. from + //! rel.execute()) back through the engine on the user's own context. The eager + //! variant installs a PhysicalArrowCollector to produce an ArrowQueryResult + //! (parallel); the stream variant produces a lazy StreamQueryResult that co-owns + //! the context (so it survives `del conn`). Never call these on a StreamQueryResult: + //! a lazy result already has a live context and is converted/wrapped directly. + void PromoteMaterializedToArrow(idx_t batch_size); + void PromoteMaterializedToStream(); + private: idx_t chunk_offset = 0; diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp index 51a44f87..96a9dc0a 100644 --- a/src/duckdb_py/pyrelation.cpp +++ b/src/duckdb_py/pyrelation.cpp @@ -801,6 +801,8 @@ unique_ptr DuckDBPyRelation::Distinct() { duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::FetchRecordBatchReader(idx_t rows_per_batch) { AssertResult(); + // FetchRecordBatchReader routes by the stored result's type: a con.execute() stream is + // wrapped directly; a pre-executed relation's MaterializedQueryResult is re-fed. return result->FetchRecordBatchReader(rows_per_batch); } @@ -991,20 +993,15 @@ py::object DuckDBPyRelation::ToArrowCapsule(const py::object &requested_schema) if (!rel) { return py::none(); } - // The PyCapsule protocol doesn't allow custom parameters, so we use the same - // default batch size as fetch_arrow_table / fetch_record_batch. - idx_t batch_size = 1000000; - auto &config = ClientConfig::GetConfig(*rel->context->GetContext()); - ScopedConfigSetting scoped_setting( - config, - [&batch_size](ClientConfig &config) { - config.get_result_collector = [&batch_size](ClientContext &context, PreparedStatementData &data) { - return PhysicalArrowCollector::Create(context, data, batch_size); - }; - }, - [](ClientConfig &config) { config.get_result_collector = nullptr; }); - ExecuteOrThrow(); + // Fresh relation: execute lazily as a stream on the user's own context. The + // resulting StreamQueryResult owns that context, so the capsule survives a + // later `del conn` (natively fixing #492); it shares the connection's single + // active-stream slot, so it must be consumed before reusing the connection. + ExecuteOrThrow(true); } + // FetchArrowCapsule routes by the result's type: a StreamQueryResult (fresh or a + // con.execute() cursor) is wrapped directly; a pre-executed relation's + // MaterializedQueryResult is re-fed through the engine as a stream. AssertResultOpen(); auto res = result->FetchArrowCapsule(); result = nullptr; @@ -1049,8 +1046,10 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::ToRecordBatch(idx_t batch_s if (!rel) { return py::none(); } + // Fresh relation: stream lazily on the user's own context (survives `del conn`). ExecuteOrThrow(true); } + // FetchRecordBatchReader routes by type (stream wrapped directly, materialized re-fed). AssertResultOpen(); auto res = result->FetchRecordBatchReader(batch_size); result = nullptr; diff --git a/src/duckdb_py/pyresult.cpp b/src/duckdb_py/pyresult.cpp index d67ba420..0446f282 100644 --- a/src/duckdb_py/pyresult.cpp +++ b/src/duckdb_py/pyresult.cpp @@ -20,8 +20,16 @@ #include "duckdb/common/exception.hpp" #include "duckdb/common/enums/stream_execution_result.hpp" #include "duckdb_python/arrow/arrow_export_utils.hpp" -#include "duckdb/main/chunk_scan_state/query_result.hpp" #include "duckdb/common/arrow/arrow_query_result.hpp" +#include "duckdb/common/arrow/physical_arrow_collector.hpp" +#include "duckdb/main/chunk_scan_state/query_result.hpp" +#include "duckdb/main/client_config.hpp" +#include "duckdb/main/materialized_query_result.hpp" +#include "duckdb/main/stream_query_result.hpp" +#include "duckdb/parser/expression/star_expression.hpp" +#include "duckdb/parser/query_node/select_node.hpp" +#include "duckdb/parser/statement/select_statement.hpp" +#include "duckdb/parser/tableref/column_data_ref.hpp" using namespace pybind11::literals; @@ -419,20 +427,130 @@ py::dict DuckDBPyResult::FetchTF() { return result_dict; } +// Build a `SELECT * FROM ` statement that scans `collection` in place, to be +// executed directly via ClientContext::PendingQuery(statement, ...). +// +// This deliberately avoids ClientContext::PendingQuery(relation, ...), whose +// RelationStatement constructor eagerly calls relation->GetQuery() and stringifies the +// ENTIRE ColumnDataCollection (formatting every value - an O(rows) pathology that costs +// >8s / 10M rows). The query string is never used for binding (Binder::Bind just calls +// relation->Bind), so executing the SelectStatement skips it entirely while keeping the +// zero-copy, parallel ColumnDataScan. The ColumnDataRef owns the collection (and the bound +// LogicalColumnDataGet takes it over), so the data stays alive for the result - important +// for the lazy StreamQueryResult path. +static unique_ptr MakeColumnDataScanStatement(unique_ptr collection, + const vector &names) { + // The binder rejects a ColumnDataRef with duplicate column names, so de-duplicate + // them (a, a -> a, a_1); callers restore the original output names on the result. + auto deduplicated_names = names; + QueryResult::DeduplicateColumns(deduplicated_names); + auto table_ref = make_uniq(std::move(collection), std::move(deduplicated_names)); + // Binding requires a set alias (BindingAlias::GetAlias asserts otherwise). + table_ref->alias = "materialized"; + auto select_node = make_uniq(); + select_node->select_list.push_back(make_uniq()); + select_node->from_table = std::move(table_ref); + auto select = make_uniq(); + select->node = std::move(select_node); + return select; +} + +void DuckDBPyResult::PromoteMaterializedToArrow(idx_t batch_size) { + // Only an already-materialized result (a ColumnDataCollection in memory) is pulled + // back through the engine. A lazy StreamQueryResult is converted directly instead - + // see FetchArrowTable - so we never materialize a lazy result just to re-feed it. + D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); + auto client_context = result->client_properties.client_context; + if (!client_context) { + throw InternalException("Cannot promote result to Arrow: the originating client context is gone"); + } + // Re-run the in-memory result through the engine with a PhysicalArrowCollector on + // the user's own context. The ColumnDataScan over the collection is parallel, so the + // Arrow conversion runs in parallel, just like the fresh relational to_arrow_table + // path. Running on the user's context means the result carries the user's Arrow + // output settings automatically (no need to replicate them). + auto context = client_context->shared_from_this(); + auto &materialized = result->Cast(); + auto names = result->names; + auto select = MakeColumnDataScanStatement(materialized.TakeCollection(), names); + + auto &config = ClientConfig::GetConfig(*context); + ScopedConfigSetting scoped_setting( + config, + [batch_size](ClientConfig &config) { + config.get_result_collector = [batch_size](ClientContext &context, PreparedStatementData &data) { + return PhysicalArrowCollector::Create(context, data, batch_size); + }; + }, + [](ClientConfig &config) { config.get_result_collector = nullptr; }); + + unique_ptr new_result; + { + D_ASSERT(py::gil_check()); + py::gil_scoped_release release; + auto pending_query = context->PendingQuery(std::move(select), QueryParameters(false)); + new_result = DuckDBPyConnection::CompletePendingQuery(*pending_query); + } + if (new_result->HasError()) { + new_result->ThrowError(); + } + // Re-binding the ColumnDataRef de-duplicates duplicate output column names + // (a, a -> a, a_1). Restore the originals so the Arrow output is unchanged. + new_result->names = std::move(names); + result = std::move(new_result); +} + +void DuckDBPyResult::PromoteMaterializedToStream() { + // As above, only an already-materialized result is re-fed; a lazy StreamQueryResult + // is wrapped directly (it already has a live context). + D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); + auto client_context = result->client_properties.client_context; + if (!client_context) { + throw InternalException("Cannot promote result to an Arrow stream: the originating client context is gone"); + } + // Re-run the materialized result as a lazy stream on the user's own context. The + // resulting StreamQueryResult co-owns that ClientContext (shared_ptr), so the stream + // - and the Arrow conversion it drives - survives `del conn` and runs under a live + // transaction (the geometry/extension correctness win, native #492 fix). The + // ColumnDataRef owns the collection, so the data outlives the temporary relation. + auto context = client_context->shared_from_this(); + auto &materialized = result->Cast(); + auto names = result->names; + auto select = MakeColumnDataScanStatement(materialized.TakeCollection(), names); + + unique_ptr new_result; + { + D_ASSERT(py::gil_check()); + py::gil_scoped_release release; + auto pending_query = context->PendingQuery(std::move(select), QueryParameters(true)); + new_result = DuckDBPyConnection::CompletePendingQuery(*pending_query); + } + if (new_result->HasError()) { + new_result->ThrowError(); + } + new_result->names = std::move(names); + result = std::move(new_result); +} + duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, bool to_polars) { if (!result) { throw InvalidInputException("There is no query result"); } + // Route by result type: + // - ARROW_RESULT: fresh to_arrow_table already ran a PhysicalArrowCollector. + // - MATERIALIZED_RESULT (rel.execute()): a CDC in memory - re-feed it through a + // PhysicalArrowCollector for parallel conversion (-> ARROW_RESULT below). + // - STREAM_RESULT (con.execute()): a lazy result with a live context - convert it + // directly, batch by batch. We never materialize a lazy result just to re-feed it. + if (result->type == QueryResultType::MATERIALIZED_RESULT) { + PromoteMaterializedToArrow(rows_per_batch); + } + auto names = result->names; if (to_polars) { QueryResult::DeduplicateColumns(names); } - if (!result) { - throw InvalidInputException("result closed"); - } - auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); - py::list batches; if (result->type == QueryResultType::ARROW_RESULT) { auto &arrow_result = result->Cast(); @@ -450,28 +568,27 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo TransformDuckToArrowChunk(arrow_schema, data, batches); } } else { - QueryResultChunkScanState scan_state(*result.get()); + // STREAM_RESULT: pull the live stream directly into Arrow batches. + QueryResultChunkScanState scan_state(*result); while (true) { ArrowArray data; idx_t count; - auto &query_result = *result.get(); { D_ASSERT(py::gil_check()); py::gil_scoped_release release; - count = ArrowUtil::FetchChunk(scan_state, query_result.client_properties, rows_per_batch, &data, + count = ArrowUtil::FetchChunk(scan_state, result->client_properties, rows_per_batch, &data, ArrowTypeExtensionData::GetExtensionTypes( - *query_result.client_properties.client_context, query_result.types)); + *result->client_properties.client_context, result->types)); } if (count == 0) { break; } ArrowSchema arrow_schema; - auto result_names = query_result.names; + auto result_names = result->names; if (to_polars) { QueryResult::DeduplicateColumns(result_names); } - ArrowConverter::ToArrowSchema(&arrow_schema, query_result.types, result_names, - query_result.client_properties); + ArrowConverter::ToArrowSchema(&arrow_schema, result->types, result_names, result->client_properties); TransformDuckToArrowChunk(arrow_schema, data, batches); } } @@ -483,6 +600,15 @@ ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) { if (!result) { throw InvalidInputException("There is no query result"); } + // The lazy Arrow surfaces (capsule, reader) need a context-owning StreamQueryResult. + // A pre-executed relation hands us a MaterializedQueryResult (a CDC) - re-feed it as a + // stream so the conversion gets a live, owned context (survives `del conn`, runs under + // a live txn). A StreamQueryResult (con.execute(), or a fresh streaming relation) + // already has a live context, so we wrap it directly - never materializing a lazy + // result. + if (result->type == QueryResultType::MATERIALIZED_RESULT) { + PromoteMaterializedToStream(); + } ResultArrowArrayStreamWrapper *result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch); // The 'result_stream' is part of the 'private_data' of the ArrowArrayStream and its lifetime is bound to that of // the ArrowArrayStream. @@ -501,273 +627,6 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t return py::cast(record_batch_reader); } -// Holds owned copies of the string data for a deep-copied ArrowSchema node. -struct ArrowSchemaCopyData { - string format; - string name; - string metadata; -}; - -static void ReleaseCopiedArrowSchema(ArrowSchema *schema) { - if (!schema || !schema->release) { - return; - } - for (int64_t i = 0; i < schema->n_children; i++) { - if (schema->children[i]->release) { - schema->children[i]->release(schema->children[i]); - } - delete schema->children[i]; - } - delete[] schema->children; - if (schema->dictionary) { - if (schema->dictionary->release) { - schema->dictionary->release(schema->dictionary); - } - delete schema->dictionary; - } - delete reinterpret_cast(schema->private_data); - schema->release = nullptr; -} - -static idx_t ArrowMetadataSize(const char *metadata) { - if (!metadata) { - return 0; - } - // Arrow metadata format: int32 num_entries, then for each entry: - // int32 key_len, key_bytes, int32 value_len, value_bytes - auto ptr = metadata; - int32_t num_entries; - memcpy(&num_entries, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t); - for (int32_t i = 0; i < num_entries; i++) { - int32_t len; - memcpy(&len, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t) + len; - memcpy(&len, ptr, sizeof(int32_t)); - ptr += sizeof(int32_t) + len; - } - return ptr - metadata; -} - -// Deep-copy an ArrowSchema. The Arrow C Data Interface specifies that get_schema -// transfers ownership to the caller, so each call must produce an independent copy. -// Each node owns its string data via an ArrowSchemaCopyData in private_data. -static int ArrowSchemaDeepCopy(const ArrowSchema &source, ArrowSchema *out, string &error) { - out->release = nullptr; - try { - auto data = new ArrowSchemaCopyData(); - data->format = source.format ? source.format : ""; - data->name = source.name ? source.name : ""; - if (source.metadata) { - auto metadata_size = ArrowMetadataSize(source.metadata); - data->metadata.assign(source.metadata, metadata_size); - } - - out->format = data->format.c_str(); - out->name = data->name.c_str(); - out->metadata = source.metadata ? data->metadata.data() : nullptr; - out->flags = source.flags; - out->n_children = source.n_children; - out->dictionary = nullptr; - out->private_data = data; - out->release = ReleaseCopiedArrowSchema; - - if (source.n_children > 0) { - out->children = new ArrowSchema *[source.n_children]; - for (int64_t i = 0; i < source.n_children; i++) { - out->children[i] = new ArrowSchema(); - auto rc = ArrowSchemaDeepCopy(*source.children[i], out->children[i], error); - if (rc != 0) { - for (int64_t j = 0; j <= i; j++) { - if (out->children[j]->release) { - out->children[j]->release(out->children[j]); - } - delete out->children[j]; - } - delete[] out->children; - out->children = nullptr; - out->n_children = 0; - // Release the partially constructed node - delete data; - out->private_data = nullptr; - out->release = nullptr; - return rc; - } - } - } else { - out->children = nullptr; - } - - if (source.dictionary) { - out->dictionary = new ArrowSchema(); - auto rc = ArrowSchemaDeepCopy(*source.dictionary, out->dictionary, error); - if (rc != 0) { - delete out->dictionary; - out->dictionary = nullptr; - return rc; - } - } - } catch (std::exception &e) { - error = e.what(); - return -1; - } - return 0; -} - -// Wraps pre-built Arrow arrays from an ArrowQueryResult into an ArrowArrayStream. -// This avoids the double-materialization that happens when using ResultArrowArrayStreamWrapper -// with an ArrowQueryResult (which throws NotImplementedException from FetchInternal). -// -// The schema is cached eagerly in the constructor (while the ClientContext is still alive) -// so that get_schema can be called after the originating connection has been destroyed. -// ToArrowSchema needs a live ClientContext for transaction access and catalog lookups -// (e.g. CRS conversion for GEOMETRY types). -struct ArrowQueryResultStreamWrapper { - ArrowQueryResultStreamWrapper(unique_ptr result_p) : result(std::move(result_p)), index(0) { - auto &arrow_result = result->Cast(); - arrays = arrow_result.ConsumeArrays(); - - cached_schema.release = nullptr; - ArrowConverter::ToArrowSchema(&cached_schema, result->types, result->names, result->client_properties); - - stream.private_data = this; - stream.get_schema = GetSchema; - stream.get_next = GetNext; - stream.release = Release; - stream.get_last_error = GetLastError; - } - - ~ArrowQueryResultStreamWrapper() { - if (cached_schema.release) { - cached_schema.release(&cached_schema); - } - } - - static int GetSchema(ArrowArrayStream *stream, ArrowSchema *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - return ArrowSchemaDeepCopy(self->cached_schema, out, self->last_error); - } - - static int GetNext(ArrowArrayStream *stream, ArrowArray *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - if (self->index >= self->arrays.size()) { - out->release = nullptr; - return 0; - } - *out = self->arrays[self->index]->arrow_array; - self->arrays[self->index]->arrow_array.release = nullptr; - self->index++; - return 0; - } - - static void Release(ArrowArrayStream *stream) { - if (!stream || !stream->release) { - return; - } - stream->release = nullptr; - delete reinterpret_cast(stream->private_data); - } - - static const char *GetLastError(ArrowArrayStream *stream) { - if (!stream->release) { - return "stream was released"; - } - auto self = reinterpret_cast(stream->private_data); - return self->last_error.c_str(); - } - - ArrowArrayStream stream; - unique_ptr result; - vector> arrays; - ArrowSchema cached_schema; - idx_t index; - string last_error; -}; - -// Wraps an ArrowArrayStream and caches its schema eagerly. -// Used for the slow path (MaterializedQueryResult / StreamQueryResult) where the -// inner stream is a ResultArrowArrayStreamWrapper from DuckDB core. That wrapper's -// get_schema calls ToArrowSchema which needs a live ClientContext, so we fetch it -// once at construction time and return copies from cache afterwards. -struct SchemaCachingStreamWrapper { - SchemaCachingStreamWrapper(ArrowArrayStream inner_p) : inner(inner_p) { - inner_p.release = nullptr; - - cached_schema.release = nullptr; - if (inner.get_schema(&inner, &cached_schema)) { - schema_error = inner.get_last_error(&inner); - schema_ok = false; - } else { - schema_ok = true; - } - - stream.private_data = this; - stream.get_schema = GetSchema; - stream.get_next = GetNext; - stream.release = Release; - stream.get_last_error = GetLastError; - } - - ~SchemaCachingStreamWrapper() { - if (cached_schema.release) { - cached_schema.release(&cached_schema); - } - if (inner.release) { - inner.release(&inner); - } - } - - static int GetSchema(ArrowArrayStream *stream, ArrowSchema *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - if (!self->schema_ok) { - return -1; - } - return ArrowSchemaDeepCopy(self->cached_schema, out, self->schema_error); - } - - static int GetNext(ArrowArrayStream *stream, ArrowArray *out) { - if (!stream->release) { - return -1; - } - auto self = reinterpret_cast(stream->private_data); - return self->inner.get_next(&self->inner, out); - } - - static void Release(ArrowArrayStream *stream) { - if (!stream || !stream->release) { - return; - } - stream->release = nullptr; - delete reinterpret_cast(stream->private_data); - } - - static const char *GetLastError(ArrowArrayStream *stream) { - if (!stream->release) { - return "stream was released"; - } - auto self = reinterpret_cast(stream->private_data); - if (!self->schema_error.empty()) { - return self->schema_error.c_str(); - } - return self->inner.get_last_error(&self->inner); - } - - ArrowArrayStream stream; - ArrowArrayStream inner; - ArrowSchema cached_schema; - bool schema_ok; - string schema_error; -}; - static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) { auto data = PyCapsule_GetPointer(object, "arrow_array_stream"); if (!data) { @@ -781,21 +640,19 @@ static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) { } py::object DuckDBPyResult::FetchArrowCapsule(idx_t rows_per_batch) { - if (result && result->type == QueryResultType::ARROW_RESULT) { - // Fast path: yield pre-built Arrow arrays directly. - auto wrapper = new ArrowQueryResultStreamWrapper(std::move(result)); - auto stream = new ArrowArrayStream(); - *stream = wrapper->stream; - wrapper->stream.release = nullptr; - return py::capsule(stream, "arrow_array_stream", ArrowArrayStreamPyCapsuleDestructor); - } - // Slow path: wrap in SchemaCachingStreamWrapper so the schema is fetched - // eagerly while the ClientContext is still alive. + if (!result) { + throw InvalidInputException("There is no query result"); + } + // The capsule is a lazy streaming object backed by a StreamQueryResult, which + // owns its own ClientContext. The caller (DuckDBPyRelation) ensures `result` is + // a stream: a fresh relation streams on the user's connection; an already- + // executed result is re-fed via PromoteToDedicatedStream(). Because the stream + // owns its context, core's ResultArrowArrayStreamWrapper can lazily compute the + // schema and convert (incl. extension/geometry types under a live transaction) + // after the originating connection is gone - no eager schema caching needed. auto inner_stream = FetchArrowArrayStream(rows_per_batch); - auto wrapper = new SchemaCachingStreamWrapper(inner_stream); auto stream = new ArrowArrayStream(); - *stream = wrapper->stream; - wrapper->stream.release = nullptr; + *stream = inner_stream; return py::capsule(stream, "arrow_array_stream", ArrowArrayStreamPyCapsuleDestructor); } diff --git a/tests/fast/arrow/test_arrow_refeed.py b/tests/fast/arrow/test_arrow_refeed.py new file mode 100644 index 00000000..6524ec91 --- /dev/null +++ b/tests/fast/arrow/test_arrow_refeed.py @@ -0,0 +1,356 @@ +"""Tests for the MaterializedRelation re-feed Arrow design. + +The redesign promotes an already-executed result back into a relation and re-feeds +it through the engine, so there is a single uniform Arrow path. + +Routing is by *result type*, and a lazy result is never materialized: + +* ``MaterializedQueryResult`` (from ``rel.execute()``) — a ColumnDataCollection in + memory — is the only result re-fed back through the engine: ``to_arrow_table`` / + ``pl`` run it through a ``PhysicalArrowCollector`` (eager + parallel); the lazy + surfaces re-run it as a ``StreamQueryResult``. Both run on the user's own context, + which the produced stream co-owns (so it survives ``del conn``). +* ``StreamQueryResult`` (from ``con.execute()``, or a fresh streaming relation) + already has a live context, so it is converted directly: ``to_arrow_table`` pulls + the stream serially; ``__arrow_c_stream__`` / ``to_arrow_reader`` wrap it directly + in core's ``ResultArrowArrayStreamWrapper`` — never copied. Such a reader/capsule + shares the connection's single active-stream slot (consume before reusing the + connection) and survives ``del conn``. + +The headline correctness win — GEOMETRY conversion after the originating connection +closes — is exercised directly (see ``TestGeometryAfterClose``): DuckDB's built-in +``GEOMETRY`` type maps to the ``geoarrow.wkb`` Arrow extension type, whose conversion +needs a live ``ClientContext`` / type-extension registry. The lazy paths run on a +context the ``StreamQueryResult`` co-owns, so the conversion succeeds even after the +originating connection is destroyed. +""" + +import gc + +import pytest + +import duckdb + +pa = pytest.importorskip("pyarrow") + + +# A spread of types that exercise the re-feed: ints, floats, strings, bools, +# nullables, nested struct/list/map, and an enum-backed categorical. +RICH_SQL = """ + SELECT + i AS int_col, + i::DOUBLE AS double_col, + 'row_' || i::VARCHAR AS str_col, + i % 2 = 0 AS bool_col, + CASE WHEN i % 3 = 0 THEN NULL ELSE i END AS nullable_col, + {'x': i, 'y': i::VARCHAR} AS struct_col, + [i, i + 1, i + 2] AS list_col, + MAP {i::VARCHAR: i * 10} AS map_col, + FROM range(1000) t(i) +""" + + +class TestEagerToArrowTablePromotion: + """Eager + parallel promotion of already-executed results matches the fresh path.""" + + def test_cursor_fetch_arrow_table_matches_fresh(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + # con.execute(...) materializes; fetch goes through the promotion path. + actual = conn.execute(RICH_SQL).to_arrow_table() + assert actual.equals(expected) + + def test_preexecuted_relation_to_arrow_table_matches_fresh(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + rel = conn.sql(RICH_SQL) + rel.execute() # forces a MaterializedQueryResult + actual = rel.to_arrow_table() + assert actual.equals(expected) + + def test_cursor_pl_matches_fresh(self): + pl = pytest.importorskip("polars") + conn = duckdb.connect() + sql = "SELECT i AS a, i::VARCHAR AS b FROM range(500) t(i)" + expected = conn.sql(sql).pl() + actual = conn.execute(sql).pl() + assert expected.equals(actual) + assert isinstance(actual, pl.DataFrame) + + def test_cursor_fetch_arrow_table_empty(self): + conn = duckdb.connect() + sql = "SELECT i AS a, i::VARCHAR AS b FROM range(10) t(i) WHERE i < 0" + expected = conn.sql(sql).to_arrow_table() + actual = conn.execute(sql).to_arrow_table() + assert actual.num_rows == 0 + assert actual.schema.equals(expected.schema) + + +class TestLazyCapsuleStreaming: + """``__arrow_c_stream__`` is now a lazy streaming object.""" + + def test_fresh_capsule_matches_to_arrow_table(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + actual = pa.table(conn.sql(RICH_SQL)) + assert actual.equals(expected) + + def test_preexecuted_capsule_matches_to_arrow_table(self): + conn = duckdb.connect() + expected = conn.sql(RICH_SQL).to_arrow_table() + rel = conn.sql(RICH_SQL) + rel.execute() # MaterializedQueryResult -> re-fed through the engine as a stream + actual = pa.table(rel) + assert actual.equals(expected) + + def test_fresh_capsule_survives_del_conn(self): + """Fresh capsule survives ``del conn``: the StreamQueryResult owns the context (#492).""" + conn = duckdb.connect() + capsule = conn.sql(RICH_SQL).__arrow_c_stream__() # noqa: F841 + del conn + gc.collect() + out = duckdb.connect().sql("SELECT * FROM capsule").to_arrow_table() + assert out.num_rows == 1000 + + def test_preexecuted_capsule_survives_del_conn(self): + """Pre-executed (materialized) capsule survives ``del conn``: the re-fed stream owns the context.""" + conn = duckdb.connect() + rel = conn.sql(RICH_SQL) + rel.execute() + capsule = rel.__arrow_c_stream__() # noqa: F841 + del rel, conn + gc.collect() + out = duckdb.connect().sql("SELECT * FROM capsule").to_arrow_table() + assert out.num_rows == 1000 + + def test_fresh_capsule_consume_then_reuse_same_connection(self): + """Streaming-object contract: consume the fresh capsule before reusing the connection.""" + conn = duckdb.connect() + sql = "SELECT i FROM range(100) t(i)" + c1 = conn.sql(sql).__arrow_c_stream__() + first = pa.RecordBatchReader._import_from_c_capsule(c1).read_all() + assert first.num_rows == 100 + # After fully consuming the first stream, the slot is free for the next. + c2 = conn.sql(sql).__arrow_c_stream__() + second = pa.RecordBatchReader._import_from_c_capsule(c2).read_all() + assert second.num_rows == 100 + + +class TestCursorRecordBatchReaderStreaming: + """Cursor reader over a stream: not materialized, survives del conn, shares the active-stream slot.""" + + def test_reader_consume_then_reuse_connection(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT range AS a FROM range(3000)") + reader = conn.execute("SELECT a FROM t").to_arrow_reader(1024) + tbl = reader.read_all() # consume before reusing the connection + assert tbl.num_rows == 3000 + assert tbl.column("a").to_pylist() == list(range(3000)) + # the connection is free to use again + assert conn.execute("SELECT 42").fetchone()[0] == 42 + + def test_reader_survives_del_conn(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT range AS a FROM range(3000)") + reader = conn.execute("SELECT a FROM t").to_arrow_reader(1024) + del conn + gc.collect() + tbl = reader.read_all() + assert tbl.num_rows == 3000 + + def test_cursor_reader_exact_batch_sizes(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT range AS a FROM range(3000)") + reader = conn.execute("SELECT a FROM t").to_arrow_reader(1024) + assert reader.read_next_batch().num_rows == 1024 + assert reader.read_next_batch().num_rows == 1024 + assert reader.read_next_batch().num_rows == 952 + with pytest.raises(StopIteration): + reader.read_next_batch() + + +class TestDuplicateColumnNames: + """Duplicate output column names must survive the re-feed (promotion restores them).""" + + DUP_SQL = "SELECT i AS a, i + 1 AS a, i + 2 AS a FROM range(50) t(i)" + + def test_cursor_fetch_arrow_table_duplicate_columns(self): + conn = duckdb.connect() + tbl = conn.execute(self.DUP_SQL).to_arrow_table() + assert tbl.num_rows == 50 + assert tbl.num_columns == 3 + # Re-feeding through a MaterializedRelation re-binds (which would dedup the + # names); the promotion restores the original names, so pyarrow still sees + # the duplicate 'a' columns exactly as the un-promoted result would. + assert tbl.column_names == ["a", "a", "a"] + assert tbl.column(0).to_pylist() == list(range(50)) + assert tbl.column(1).to_pylist() == list(range(1, 51)) + assert tbl.column(2).to_pylist() == list(range(2, 52)) + + def test_preexecuted_capsule_duplicate_columns(self): + conn = duckdb.connect() + rel = conn.sql(self.DUP_SQL) + rel.execute() + tbl = pa.table(rel) + assert tbl.num_rows == 50 + assert tbl.num_columns == 3 + + def test_cursor_record_batch_duplicate_columns(self): + conn = duckdb.connect() + reader = conn.execute(self.DUP_SQL).to_arrow_reader() + tbl = reader.read_all() + assert tbl.num_rows == 50 + assert tbl.num_columns == 3 + + def test_cursor_pl_duplicate_columns_dedups(self): + pl = pytest.importorskip("polars") + conn = duckdb.connect() + df = conn.execute(self.DUP_SQL).pl() + # polars requires unique column names; the dedup must still apply. + assert isinstance(df, pl.DataFrame) + assert len(set(df.columns)) == 3 + + +class TestEdgeCaseShapes: + # Note: a 0-column result is not constructible via SQL ("SELECT FROM ..." is a + # parser error), so the "0 columns" risk from CONTEXT.md is not reachable here. + + def test_single_row_single_column(self): + conn = duckdb.connect() + tbl = conn.execute("SELECT 42 AS x").to_arrow_table() + assert tbl.num_rows == 1 + assert tbl.column("x").to_pylist() == [42] + + def test_enum_categorical_roundtrip(self): + conn = duckdb.connect() + conn.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')") + conn.execute("CREATE TABLE t AS SELECT 'happy'::mood AS m FROM range(100)") + expected = conn.sql("SELECT m FROM t").to_arrow_table() + actual = conn.execute("SELECT m FROM t").to_arrow_table() + assert actual.equals(expected) + # ENUM should map to a dictionary-encoded Arrow column. + assert pa.types.is_dictionary(actual.schema.field("m").type) + + def test_enum_via_cursor_stream(self): + conn = duckdb.connect() + conn.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')") + conn.execute("CREATE TABLE t AS SELECT 'ok'::mood AS m FROM range(100)") + reader = conn.execute("SELECT m FROM t").to_arrow_reader() + tbl = reader.read_all() + assert tbl.num_rows == 100 + assert pa.types.is_dictionary(tbl.schema.field("m").type) + + +class TestConfigFidelity: + """Re-fed / directly-wrapped results reproduce the user's Arrow output config (e.g. TimeZone).""" + + def test_timezone_preserved_through_cursor_stream_reader(self): + conn = duckdb.connect() + conn.execute("SET TimeZone = 'America/New_York'") + reader = conn.execute("SELECT TIMESTAMPTZ '2021-06-01 12:00:00' AS ts").to_arrow_reader() + tbl = reader.read_all() + assert tbl.schema.field("ts").type.tz == "America/New_York" + + def test_timezone_preserved_through_preexec_capsule(self): + conn = duckdb.connect() + conn.execute("SET TimeZone = 'Asia/Kathmandu'") + rel = conn.sql("SELECT TIMESTAMPTZ '2021-06-01 12:00:00' AS ts") + rel.execute() + tbl = pa.table(rel) + assert tbl.schema.field("ts").type.tz == "Asia/Kathmandu" + + def test_large_buffer_size_preserved_through_cursor_stream(self): + # arrow_large_buffer_size promotes string/blob/list offsets to 64-bit. + conn = duckdb.connect() + conn.execute("SET arrow_large_buffer_size = true") + reader = conn.execute("SELECT 'hello' AS s FROM range(10)").to_arrow_reader() + tbl = reader.read_all() + assert tbl.schema.field("s").type == pa.large_string() + + +class TestLazyStreamMechanism: + """Mechanism behind the GEOMETRY-after-close win: a lazy re-fed stream is correct after del conn.""" + + def test_refed_stream_data_correct_after_del_conn(self): + conn = duckdb.connect() + conn.execute("CREATE TABLE t AS SELECT i, i::VARCHAR AS s FROM range(2000) t(i)") + reader = conn.execute("SELECT * FROM t ORDER BY i").to_arrow_reader(512) + del conn + gc.collect() + tbl = reader.read_all() + assert tbl.num_rows == 2000 + assert tbl.column("i").to_pylist() == list(range(2000)) + + +class TestGeometryAfterClose: + """The headline correctness win, now directly testable. + + DuckDB's built-in GEOMETRY type maps to the ``geoarrow.wkb`` Arrow extension type, + whose conversion requires a live ClientContext / type-extension registry. The lazy + capsule/reader run on a context the StreamQueryResult co-owns, so the conversion + (schema + WKB data) succeeds AFTER the originating connection is destroyed. Before + this design, the conversion ran against a dangling context (use-after-free / wrong + schema). GEOMETRY is a core type here (no spatial extension needed). + """ + + GEOM_SQL = "SELECT id, ('POINT(' || id || ' ' || id || ')')::GEOMETRY AS g FROM range(8) t(id)" + + def _expected(self): + # deterministic query -> compute the reference on an independent connection + return duckdb.connect().sql(self.GEOM_SQL).to_arrow_table() + + @staticmethod + def _assert_geoarrow(tbl) -> None: + field = tbl.schema.field("g") + assert field.metadata is not None + assert field.metadata[b"ARROW:extension:name"] == b"geoarrow.wkb" + + def test_geometry_to_arrow_is_geoarrow_wkb(self): + # sanity: the build really has built-in GEOMETRY mapping to geoarrow.wkb + tbl = self._expected() + self._assert_geoarrow(tbl) + assert tbl.num_rows == 8 + + def test_fresh_capsule_geometry_after_del_conn(self): + expected = self._expected() + conn = duckdb.connect() + capsule = conn.sql(self.GEOM_SQL).__arrow_c_stream__() + del conn + gc.collect() + actual = pa.RecordBatchReader._import_from_c_capsule(capsule).read_all() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist() + + def test_preexecuted_capsule_geometry_after_del_conn(self): + # MaterializedQueryResult -> re-fed as a stream; conversion runs after del conn + expected = self._expected() + conn = duckdb.connect() + rel = conn.sql(self.GEOM_SQL) + rel.execute() + capsule = rel.__arrow_c_stream__() + del rel, conn + gc.collect() + actual = pa.RecordBatchReader._import_from_c_capsule(capsule).read_all() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist() + + def test_cursor_reader_geometry_after_del_conn(self): + # con.execute() stream wrapped directly; conversion runs after del conn + expected = self._expected() + conn = duckdb.connect() + reader = conn.execute(self.GEOM_SQL).to_arrow_reader() + del conn + gc.collect() + actual = reader.read_all() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist() + + def test_preexecuted_to_arrow_table_geometry(self): + # eager parallel re-feed (SelectStatement + PhysicalArrowCollector) of geometry + expected = self._expected() + conn = duckdb.connect() + rel = conn.sql(self.GEOM_SQL) + rel.execute() + actual = rel.to_arrow_table() + self._assert_geoarrow(actual) + assert actual.column("g").to_pylist() == expected.column("g").to_pylist() From 5fd7f6953c9dda38d294862e5e297592a74fa289 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Fri, 12 Jun 2026 23:41:15 +0200 Subject: [PATCH 02/11] strip comments --- src/duckdb_py/pyrelation.cpp | 12 +----- src/duckdb_py/pyresult.cpp | 72 +++++++++--------------------------- 2 files changed, 20 insertions(+), 64 deletions(-) diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp index 96a9dc0a..ff3a7d86 100644 --- a/src/duckdb_py/pyrelation.cpp +++ b/src/duckdb_py/pyrelation.cpp @@ -801,8 +801,6 @@ unique_ptr DuckDBPyRelation::Distinct() { duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::FetchRecordBatchReader(idx_t rows_per_batch) { AssertResult(); - // FetchRecordBatchReader routes by the stored result's type: a con.execute() stream is - // wrapped directly; a pre-executed relation's MaterializedQueryResult is re-fed. return result->FetchRecordBatchReader(rows_per_batch); } @@ -993,15 +991,10 @@ py::object DuckDBPyRelation::ToArrowCapsule(const py::object &requested_schema) if (!rel) { return py::none(); } - // Fresh relation: execute lazily as a stream on the user's own context. The - // resulting StreamQueryResult owns that context, so the capsule survives a - // later `del conn` (natively fixing #492); it shares the connection's single - // active-stream slot, so it must be consumed before reusing the connection. + // Fresh relation: stream lazily on the user's context (capsule survives `del conn`, + // but shares the single active-stream slot - consume before reusing the connection). ExecuteOrThrow(true); } - // FetchArrowCapsule routes by the result's type: a StreamQueryResult (fresh or a - // con.execute() cursor) is wrapped directly; a pre-executed relation's - // MaterializedQueryResult is re-fed through the engine as a stream. AssertResultOpen(); auto res = result->FetchArrowCapsule(); result = nullptr; @@ -1049,7 +1042,6 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyRelation::ToRecordBatch(idx_t batch_s // Fresh relation: stream lazily on the user's own context (survives `del conn`). ExecuteOrThrow(true); } - // FetchRecordBatchReader routes by type (stream wrapped directly, materialized re-fed). AssertResultOpen(); auto res = result->FetchRecordBatchReader(batch_size); result = nullptr; diff --git a/src/duckdb_py/pyresult.cpp b/src/duckdb_py/pyresult.cpp index 0446f282..002b142b 100644 --- a/src/duckdb_py/pyresult.cpp +++ b/src/duckdb_py/pyresult.cpp @@ -427,26 +427,17 @@ py::dict DuckDBPyResult::FetchTF() { return result_dict; } -// Build a `SELECT * FROM ` statement that scans `collection` in place, to be -// executed directly via ClientContext::PendingQuery(statement, ...). -// -// This deliberately avoids ClientContext::PendingQuery(relation, ...), whose -// RelationStatement constructor eagerly calls relation->GetQuery() and stringifies the -// ENTIRE ColumnDataCollection (formatting every value - an O(rows) pathology that costs -// >8s / 10M rows). The query string is never used for binding (Binder::Bind just calls -// relation->Bind), so executing the SelectStatement skips it entirely while keeping the -// zero-copy, parallel ColumnDataScan. The ColumnDataRef owns the collection (and the bound -// LogicalColumnDataGet takes it over), so the data stays alive for the result - important -// for the lazy StreamQueryResult path. +// `SELECT * FROM ` over `collection`, executed as a SelectStatement rather +// than via PendingQuery(relation) - the latter's RelationStatement stringifies the whole +// collection (O(rows)). The ColumnDataRef owns the collection, so it outlives the result +// (needed by the lazy stream path). static unique_ptr MakeColumnDataScanStatement(unique_ptr collection, const vector &names) { - // The binder rejects a ColumnDataRef with duplicate column names, so de-duplicate - // them (a, a -> a, a_1); callers restore the original output names on the result. + // The binder rejects duplicate column names; callers restore the originals afterwards. auto deduplicated_names = names; QueryResult::DeduplicateColumns(deduplicated_names); auto table_ref = make_uniq(std::move(collection), std::move(deduplicated_names)); - // Binding requires a set alias (BindingAlias::GetAlias asserts otherwise). - table_ref->alias = "materialized"; + table_ref->alias = "materialized"; // binding asserts on an unset alias auto select_node = make_uniq(); select_node->select_list.push_back(make_uniq()); select_node->from_table = std::move(table_ref); @@ -455,20 +446,14 @@ static unique_ptr MakeColumnDataScanStatement(unique_ptr ArrowQueryResult. void DuckDBPyResult::PromoteMaterializedToArrow(idx_t batch_size) { - // Only an already-materialized result (a ColumnDataCollection in memory) is pulled - // back through the engine. A lazy StreamQueryResult is converted directly instead - - // see FetchArrowTable - so we never materialize a lazy result just to re-feed it. D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); auto client_context = result->client_properties.client_context; if (!client_context) { throw InternalException("Cannot promote result to Arrow: the originating client context is gone"); } - // Re-run the in-memory result through the engine with a PhysicalArrowCollector on - // the user's own context. The ColumnDataScan over the collection is parallel, so the - // Arrow conversion runs in parallel, just like the fresh relational to_arrow_table - // path. Running on the user's context means the result carries the user's Arrow - // output settings automatically (no need to replicate them). auto context = client_context->shared_from_this(); auto &materialized = result->Cast(); auto names = result->names; @@ -494,25 +479,19 @@ void DuckDBPyResult::PromoteMaterializedToArrow(idx_t batch_size) { if (new_result->HasError()) { new_result->ThrowError(); } - // Re-binding the ColumnDataRef de-duplicates duplicate output column names - // (a, a -> a, a_1). Restore the originals so the Arrow output is unchanged. - new_result->names = std::move(names); + new_result->names = std::move(names); // restore names de-duplicated by re-binding result = std::move(new_result); } +// Re-feed a materialized result as a lazy stream on the user's own context. The +// StreamQueryResult co-owns the context, so conversion survives `del conn` and runs under a +// live transaction (geometry/extension correctness, #492). void DuckDBPyResult::PromoteMaterializedToStream() { - // As above, only an already-materialized result is re-fed; a lazy StreamQueryResult - // is wrapped directly (it already has a live context). D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); auto client_context = result->client_properties.client_context; if (!client_context) { throw InternalException("Cannot promote result to an Arrow stream: the originating client context is gone"); } - // Re-run the materialized result as a lazy stream on the user's own context. The - // resulting StreamQueryResult co-owns that ClientContext (shared_ptr), so the stream - // - and the Arrow conversion it drives - survives `del conn` and runs under a live - // transaction (the geometry/extension correctness win, native #492 fix). The - // ColumnDataRef owns the collection, so the data outlives the temporary relation. auto context = client_context->shared_from_this(); auto &materialized = result->Cast(); auto names = result->names; @@ -536,12 +515,8 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo if (!result) { throw InvalidInputException("There is no query result"); } - // Route by result type: - // - ARROW_RESULT: fresh to_arrow_table already ran a PhysicalArrowCollector. - // - MATERIALIZED_RESULT (rel.execute()): a CDC in memory - re-feed it through a - // PhysicalArrowCollector for parallel conversion (-> ARROW_RESULT below). - // - STREAM_RESULT (con.execute()): a lazy result with a live context - convert it - // directly, batch by batch. We never materialize a lazy result just to re-feed it. + // ARROW_RESULT: fresh collector output. MATERIALIZED: re-feed for parallel conversion. + // STREAM: a live result, converted directly below (never materialized to re-feed). if (result->type == QueryResultType::MATERIALIZED_RESULT) { PromoteMaterializedToArrow(rows_per_batch); } @@ -600,18 +575,13 @@ ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) { if (!result) { throw InvalidInputException("There is no query result"); } - // The lazy Arrow surfaces (capsule, reader) need a context-owning StreamQueryResult. - // A pre-executed relation hands us a MaterializedQueryResult (a CDC) - re-feed it as a - // stream so the conversion gets a live, owned context (survives `del conn`, runs under - // a live txn). A StreamQueryResult (con.execute(), or a fresh streaming relation) - // already has a live context, so we wrap it directly - never materializing a lazy - // result. + // Re-feed a materialized result to get a context-owning stream; a StreamQueryResult is + // wrapped directly (already has a live context). if (result->type == QueryResultType::MATERIALIZED_RESULT) { PromoteMaterializedToStream(); } + // The wrapper is owned by the ArrowArrayStream's private_data (released with the stream). ResultArrowArrayStreamWrapper *result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch); - // The 'result_stream' is part of the 'private_data' of the ArrowArrayStream and its lifetime is bound to that of - // the ArrowArrayStream. return result_stream->stream; } @@ -643,13 +613,7 @@ py::object DuckDBPyResult::FetchArrowCapsule(idx_t rows_per_batch) { if (!result) { throw InvalidInputException("There is no query result"); } - // The capsule is a lazy streaming object backed by a StreamQueryResult, which - // owns its own ClientContext. The caller (DuckDBPyRelation) ensures `result` is - // a stream: a fresh relation streams on the user's connection; an already- - // executed result is re-fed via PromoteToDedicatedStream(). Because the stream - // owns its context, core's ResultArrowArrayStreamWrapper can lazily compute the - // schema and convert (incl. extension/geometry types under a live transaction) - // after the originating connection is gone - no eager schema caching needed. + // Lazy streaming capsule backed by a context-owning stream (see FetchArrowArrayStream). auto inner_stream = FetchArrowArrayStream(rows_per_batch); auto stream = new ArrowArrayStream(); *stream = inner_stream; From 7ddc75fe0d329782650950a9392771b52e07f8e5 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Mon, 15 Jun 2026 17:05:12 +0200 Subject: [PATCH 03/11] run schema fetching in same transaction as arrow data conversion and only once --- src/duckdb_py/arrow/arrow_array_stream.cpp | 4 +- src/duckdb_py/arrow/arrow_export_utils.cpp | 22 ++++-- .../arrow/arrow_array_stream.hpp | 2 +- .../arrow/arrow_export_utils.hpp | 4 + src/duckdb_py/pyrelation.cpp | 73 ++++++++++++++++++- src/duckdb_py/pyresult.cpp | 24 ++---- src/duckdb_py/python_udf.cpp | 3 +- 7 files changed, 103 insertions(+), 29 deletions(-) diff --git a/src/duckdb_py/arrow/arrow_array_stream.cpp b/src/duckdb_py/arrow/arrow_array_stream.cpp index 4f438dec..5b167e5e 100644 --- a/src/duckdb_py/arrow/arrow_array_stream.cpp +++ b/src/duckdb_py/arrow/arrow_array_stream.cpp @@ -14,11 +14,11 @@ namespace duckdb { -void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches) { +void TransformDuckToArrowChunk(py::object pyarrow_schema, ArrowArray &data, py::list &batches) { py::gil_assert(); auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); auto batch_import_func = pyarrow_lib_module.attr("RecordBatch").attr("_import_from_c"); - batches.append(batch_import_func(reinterpret_cast(&data), reinterpret_cast(&arrow_schema))); + batches.append(batch_import_func(reinterpret_cast(&data), pyarrow_schema)); } void VerifyArrowDatasetLoaded() { diff --git a/src/duckdb_py/arrow/arrow_export_utils.cpp b/src/duckdb_py/arrow/arrow_export_utils.cpp index ea30b94b..7d44a1c0 100644 --- a/src/duckdb_py/arrow/arrow_export_utils.cpp +++ b/src/duckdb_py/arrow/arrow_export_utils.cpp @@ -17,18 +17,28 @@ namespace duckdb { namespace pyarrow { -py::object ToArrowTable(const vector &types, const vector &names, const py::list &batches, - ClientProperties &options) { +py::object ToPyArrowSchema(ArrowSchema &schema) { py::gil_scoped_acquire acquire; auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); - auto from_batches_func = pyarrow_lib_module.attr("Table").attr("from_batches"); auto schema_import_func = pyarrow_lib_module.attr("Schema").attr("_import_from_c"); + return schema_import_func(reinterpret_cast(&schema)); +} + +py::object ToArrowTable(const py::list &batches, py::object pyarrow_schema) { + py::gil_scoped_acquire acquire; + + auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); + auto from_batches_func = pyarrow_lib_module.attr("Table").attr("from_batches"); + + return py::cast(from_batches_func(batches, pyarrow_schema)); +} + +py::object ToArrowTable(const vector &types, const vector &names, const py::list &batches, + ClientProperties &options) { ArrowSchema schema; ArrowConverter::ToArrowSchema(&schema, types, names, options); - auto schema_obj = schema_import_func(reinterpret_cast(&schema)); - - return py::cast(from_batches_func(batches, schema_obj)); + return ToArrowTable(batches, ToPyArrowSchema(schema)); } } // namespace pyarrow diff --git a/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp b/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp index 90974ff7..1f790c28 100644 --- a/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp +++ b/src/duckdb_py/include/duckdb_python/arrow/arrow_array_stream.hpp @@ -62,7 +62,7 @@ enum class PyArrowObjectType { PolarsLazyFrame }; -void TransformDuckToArrowChunk(ArrowSchema &arrow_schema, ArrowArray &data, py::list &batches); +void TransformDuckToArrowChunk(py::object pyarrow_schema, ArrowArray &data, py::list &batches); PyArrowObjectType GetArrowType(const py::handle &obj); diff --git a/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp b/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp index 39d1b3dc..9926774f 100644 --- a/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp +++ b/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp @@ -6,9 +6,13 @@ namespace duckdb { namespace pyarrow { +py::object ToPyArrowSchema(ArrowSchema &schema); + py::object ToArrowTable(const vector &types, const vector &names, const py::list &batches, ClientProperties &options); +py::object ToArrowTable(const py::list &batches, py::object pyarrow_schema); + } // namespace pyarrow } // namespace duckdb diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp index ff3a7d86..d04a91f1 100644 --- a/src/duckdb_py/pyrelation.cpp +++ b/src/duckdb_py/pyrelation.cpp @@ -23,6 +23,59 @@ #include "duckdb/common/arrow/physical_arrow_collector.hpp" #include "duckdb_python/arrow/arrow_export_utils.hpp" +namespace { + +// A helper for arrow conversion. We want to be able to fetch a result's schema in the same transaction that +// creates the result, so we have to wrap both calls in the same transaction. This helper always reverts the +// transaction if we haven't committed it explicitly. Note that this is not the same as RunFunctionInTransaction: +// we run _queries_ in a transaction (where each query acquires the context lock) while RFIT runs a function +// while holding the context lock for that duration. +// Note: this is a workaround that is intended to be temporary. We should really just cache the schema in the +// ArrowQueryResult. + +void RunOrThrow(duckdb::ClientContext &context, const char *sql) { + auto result = context.Query(sql, duckdb::QueryParameters(false)); + if (result->HasError()) { + result->ThrowError(); + } +} + +class ArrowConversionTransaction { +public: + explicit ArrowConversionTransaction(duckdb::ClientContext &context_p) : context(context_p), owns(false) { + auto &txn = context.transaction; + if (txn.IsAutoCommit() && !txn.HasActiveTransaction()) { + RunOrThrow(context, "BEGIN TRANSACTION"); + owns = true; + } + } + + ~ArrowConversionTransaction() { + if (owns) { + try { + RunOrThrow(context, "ROLLBACK"); + } catch (...) { // NOLINT + } + } + } + + void Commit() { + if (owns) { + RunOrThrow(context, "COMMIT"); + owns = false; + } + } + + ArrowConversionTransaction(const ArrowConversionTransaction &) = delete; + ArrowConversionTransaction &operator=(const ArrowConversionTransaction &) = delete; + +private: + duckdb::ClientContext &context; + bool owns; +}; + +} // namespace + namespace duckdb { DuckDBPyRelation::DuckDBPyRelation(shared_ptr rel_p) : rel(std::move(rel_p)) { @@ -961,10 +1014,22 @@ PandasDataFrame DuckDBPyRelation::FetchDFChunk(idx_t vectors_per_chunk, bool dat } duckdb::pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, bool to_polars) { + if (!result && !rel) { + return py::none(); + } + // Make sure we have a valid client context + shared_ptr context; + if (rel) { + context = rel->context->GetContext(); + } else if (auto cc = result->GetClientProperties().client_context) { + context = cc->shared_from_this(); + } else { + throw ConnectionException("Cannot fetch an arrow table without a valid connection"); + } + // Start (or piggyback on) a transaction for the conversion + ArrowConversionTransaction conversion_txn(*context); + if (!result) { - if (!rel) { - return py::none(); - } auto &config = ClientConfig::GetConfig(*rel->context->GetContext()); ScopedConfigSetting scoped_setting( config, @@ -979,6 +1044,8 @@ duckdb::pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, AssertResultOpen(); auto res = result->FetchArrowTable(batch_size, to_polars); result = nullptr; + // We must commit the transaction before returning + conversion_txn.Commit(); return res; } diff --git a/src/duckdb_py/pyresult.cpp b/src/duckdb_py/pyresult.cpp index 002b142b..a1f4a270 100644 --- a/src/duckdb_py/pyresult.cpp +++ b/src/duckdb_py/pyresult.cpp @@ -526,21 +526,19 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo QueryResult::DeduplicateColumns(names); } + // Fetch the schema once + ArrowSchema arrow_schema; + ArrowConverter::ToArrowSchema(&arrow_schema, result->types, names, result->client_properties); + auto pyarrow_schema = pyarrow::ToPyArrowSchema(arrow_schema); + py::list batches; if (result->type == QueryResultType::ARROW_RESULT) { auto &arrow_result = result->Cast(); auto arrays = arrow_result.ConsumeArrays(); for (auto &array : arrays) { - ArrowSchema arrow_schema; - auto result_names = arrow_result.names; - if (to_polars) { - QueryResult::DeduplicateColumns(result_names); - } ArrowArray data = array->arrow_array; array->arrow_array.release = nullptr; - ArrowConverter::ToArrowSchema(&arrow_schema, arrow_result.types, result_names, - arrow_result.client_properties); - TransformDuckToArrowChunk(arrow_schema, data, batches); + TransformDuckToArrowChunk(pyarrow_schema, data, batches); } } else { // STREAM_RESULT: pull the live stream directly into Arrow batches. @@ -558,17 +556,11 @@ duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, boo if (count == 0) { break; } - ArrowSchema arrow_schema; - auto result_names = result->names; - if (to_polars) { - QueryResult::DeduplicateColumns(result_names); - } - ArrowConverter::ToArrowSchema(&arrow_schema, result->types, result_names, result->client_properties); - TransformDuckToArrowChunk(arrow_schema, data, batches); + TransformDuckToArrowChunk(pyarrow_schema, data, batches); } } - return pyarrow::ToArrowTable(result->types, names, std::move(batches), result->client_properties); + return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema); } ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) { diff --git a/src/duckdb_py/python_udf.cpp b/src/duckdb_py/python_udf.cpp index a62004d4..9af66b37 100644 --- a/src/duckdb_py/python_udf.cpp +++ b/src/duckdb_py/python_udf.cpp @@ -25,13 +25,14 @@ static py::list ConvertToSingleBatch(vector &types, vector ClientProperties &options, ClientContext &context) { ArrowSchema schema; ArrowConverter::ToArrowSchema(&schema, types, names, options); + auto pyarrow_schema = pyarrow::ToPyArrowSchema(schema); py::list single_batch; ArrowAppender appender(types, STANDARD_VECTOR_SIZE, options, ArrowTypeExtensionData::GetExtensionTypes(context, types)); appender.Append(input, 0, input.size(), input.size()); auto array = appender.Finalize(); - TransformDuckToArrowChunk(schema, array, single_batch); + TransformDuckToArrowChunk(pyarrow_schema, array, single_batch); return single_batch; } From c8c35ebd79889a7a23d8e46deaf35272b9eb17ac Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Tue, 16 Jun 2026 11:20:39 +0200 Subject: [PATCH 04/11] Get the arrow schema in a separate transaction for materialized data only --- src/duckdb_py/arrow/arrow_export_utils.cpp | 2 +- .../arrow/arrow_export_utils.hpp | 2 +- .../include/duckdb_python/pyresult.hpp | 10 +- src/duckdb_py/pyrelation.cpp | 69 +------- src/duckdb_py/pyresult.cpp | 166 ++++++++++-------- 5 files changed, 102 insertions(+), 147 deletions(-) diff --git a/src/duckdb_py/arrow/arrow_export_utils.cpp b/src/duckdb_py/arrow/arrow_export_utils.cpp index 7d44a1c0..8333bbf6 100644 --- a/src/duckdb_py/arrow/arrow_export_utils.cpp +++ b/src/duckdb_py/arrow/arrow_export_utils.cpp @@ -17,7 +17,7 @@ namespace duckdb { namespace pyarrow { -py::object ToPyArrowSchema(ArrowSchema &schema) { +py::object ToPyArrowSchema(const ArrowSchema &schema) { py::gil_scoped_acquire acquire; auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); diff --git a/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp b/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp index 9926774f..6306b116 100644 --- a/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp +++ b/src/duckdb_py/include/duckdb_python/arrow/arrow_export_utils.hpp @@ -6,7 +6,7 @@ namespace duckdb { namespace pyarrow { -py::object ToPyArrowSchema(ArrowSchema &schema); +py::object ToPyArrowSchema(const ArrowSchema &schema); py::object ToArrowTable(const vector &types, const vector &names, const py::list &batches, ClientProperties &options); diff --git a/src/duckdb_py/include/duckdb_python/pyresult.hpp b/src/duckdb_py/include/duckdb_python/pyresult.hpp index ace4d42d..d7da83cc 100644 --- a/src/duckdb_py/include/duckdb_python/pyresult.hpp +++ b/src/duckdb_py/include/duckdb_python/pyresult.hpp @@ -36,15 +36,13 @@ struct DuckDBPyResult { PandasDataFrame FetchDF(bool date_as_object); - duckdb::pyarrow::Table FetchArrowTable(idx_t rows_per_batch, bool to_polars); - PandasDataFrame FetchDFChunk(const idx_t vectors_per_chunk = 1, bool date_as_object = false); py::dict FetchPyTorch(); py::dict FetchTF(); - ArrowArrayStream FetchArrowArrayStream(idx_t rows_per_batch = 1000000); + duckdb::pyarrow::Table FetchArrowTable(idx_t rows_per_batch, bool to_polars); duckdb::pyarrow::RecordBatchReader FetchRecordBatchReader(idx_t rows_per_batch = 1000000); py::object FetchArrowCapsule(idx_t rows_per_batch = 1000000); @@ -78,7 +76,11 @@ struct DuckDBPyResult { //! the context (so it survives `del conn`). Never call these on a StreamQueryResult: //! a lazy result already has a live context and is converted/wrapped directly. void PromoteMaterializedToArrow(idx_t batch_size); - void PromoteMaterializedToStream(); + + template + T RunWithArrowSchema(const std::function &fun, bool dedup_col_names); + duckdb::pyarrow::Table MaterializedResultToArrowTable(const ArrowSchema &arrow_schema, idx_t rows_per_batch); + ArrowArrayStream FetchArrowArrayStream(idx_t rows_per_batch); private: idx_t chunk_offset = 0; diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp index d04a91f1..23040a53 100644 --- a/src/duckdb_py/pyrelation.cpp +++ b/src/duckdb_py/pyrelation.cpp @@ -23,59 +23,6 @@ #include "duckdb/common/arrow/physical_arrow_collector.hpp" #include "duckdb_python/arrow/arrow_export_utils.hpp" -namespace { - -// A helper for arrow conversion. We want to be able to fetch a result's schema in the same transaction that -// creates the result, so we have to wrap both calls in the same transaction. This helper always reverts the -// transaction if we haven't committed it explicitly. Note that this is not the same as RunFunctionInTransaction: -// we run _queries_ in a transaction (where each query acquires the context lock) while RFIT runs a function -// while holding the context lock for that duration. -// Note: this is a workaround that is intended to be temporary. We should really just cache the schema in the -// ArrowQueryResult. - -void RunOrThrow(duckdb::ClientContext &context, const char *sql) { - auto result = context.Query(sql, duckdb::QueryParameters(false)); - if (result->HasError()) { - result->ThrowError(); - } -} - -class ArrowConversionTransaction { -public: - explicit ArrowConversionTransaction(duckdb::ClientContext &context_p) : context(context_p), owns(false) { - auto &txn = context.transaction; - if (txn.IsAutoCommit() && !txn.HasActiveTransaction()) { - RunOrThrow(context, "BEGIN TRANSACTION"); - owns = true; - } - } - - ~ArrowConversionTransaction() { - if (owns) { - try { - RunOrThrow(context, "ROLLBACK"); - } catch (...) { // NOLINT - } - } - } - - void Commit() { - if (owns) { - RunOrThrow(context, "COMMIT"); - owns = false; - } - } - - ArrowConversionTransaction(const ArrowConversionTransaction &) = delete; - ArrowConversionTransaction &operator=(const ArrowConversionTransaction &) = delete; - -private: - duckdb::ClientContext &context; - bool owns; -}; - -} // namespace - namespace duckdb { DuckDBPyRelation::DuckDBPyRelation(shared_ptr rel_p) : rel(std::move(rel_p)) { @@ -1013,22 +960,10 @@ PandasDataFrame DuckDBPyRelation::FetchDFChunk(idx_t vectors_per_chunk, bool dat return result->FetchDFChunk(vectors_per_chunk, date_as_object); } -duckdb::pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, bool to_polars) { +pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, bool to_polars) { if (!result && !rel) { return py::none(); } - // Make sure we have a valid client context - shared_ptr context; - if (rel) { - context = rel->context->GetContext(); - } else if (auto cc = result->GetClientProperties().client_context) { - context = cc->shared_from_this(); - } else { - throw ConnectionException("Cannot fetch an arrow table without a valid connection"); - } - // Start (or piggyback on) a transaction for the conversion - ArrowConversionTransaction conversion_txn(*context); - if (!result) { auto &config = ClientConfig::GetConfig(*rel->context->GetContext()); ScopedConfigSetting scoped_setting( @@ -1044,8 +979,6 @@ duckdb::pyarrow::Table DuckDBPyRelation::ToArrowTableInternal(idx_t batch_size, AssertResultOpen(); auto res = result->FetchArrowTable(batch_size, to_polars); result = nullptr; - // We must commit the transaction before returning - conversion_txn.Commit(); return res; } diff --git a/src/duckdb_py/pyresult.cpp b/src/duckdb_py/pyresult.cpp index a1f4a270..270c1625 100644 --- a/src/duckdb_py/pyresult.cpp +++ b/src/duckdb_py/pyresult.cpp @@ -483,97 +483,91 @@ void DuckDBPyResult::PromoteMaterializedToArrow(idx_t batch_size) { result = std::move(new_result); } -// Re-feed a materialized result as a lazy stream on the user's own context. The -// StreamQueryResult co-owns the context, so conversion survives `del conn` and runs under a -// live transaction (geometry/extension correctness, #492). -void DuckDBPyResult::PromoteMaterializedToStream() { - D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); - auto client_context = result->client_properties.client_context; - if (!client_context) { - throw InternalException("Cannot promote result to an Arrow stream: the originating client context is gone"); +template +T DuckDBPyResult::RunWithArrowSchema(const std::function &fun, bool dedup_col_names) { + D_ASSERT(result); + if (!result->client_properties.client_context) { + throw ConnectionException("Cannot fetch arrow schema without a valid connection"); } - auto context = client_context->shared_from_this(); - auto &materialized = result->Cast(); - auto names = result->names; - auto select = MakeColumnDataScanStatement(materialized.TakeCollection(), names); + auto ctx = result->client_properties.client_context->shared_from_this(); - unique_ptr new_result; - { - D_ASSERT(py::gil_check()); - py::gil_scoped_release release; - auto pending_query = context->PendingQuery(std::move(select), QueryParameters(true)); - new_result = DuckDBPyConnection::CompletePendingQuery(*pending_query); - } - if (new_result->HasError()) { - new_result->ThrowError(); + auto names = result->names; + if (dedup_col_names) { + QueryResult::DeduplicateColumns(names); } - new_result->names = std::move(names); - result = std::move(new_result); + + ArrowSchema arrow_schema; + ctx->RunFunctionInTransaction( + [&] { ArrowConverter::ToArrowSchema(&arrow_schema, result->types, names, result->client_properties); }); + + return fun(arrow_schema); } -duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(idx_t rows_per_batch, bool to_polars) { - if (!result) { - throw InvalidInputException("There is no query result"); - } - // ARROW_RESULT: fresh collector output. MATERIALIZED: re-feed for parallel conversion. - // STREAM: a live result, converted directly below (never materialized to re-feed). +duckdb::pyarrow::Table DuckDBPyResult::MaterializedResultToArrowTable(const ArrowSchema &arrow_schema, + const idx_t rows_per_batch) { + D_ASSERT(result); + D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT); + + auto pyarrow_schema = pyarrow::ToPyArrowSchema(arrow_schema); if (result->type == QueryResultType::MATERIALIZED_RESULT) { PromoteMaterializedToArrow(rows_per_batch); } - - auto names = result->names; - if (to_polars) { - QueryResult::DeduplicateColumns(names); + py::list batches; + auto &arrow_result = result->Cast(); + auto arrays = arrow_result.ConsumeArrays(); + for (auto &array : arrays) { + ArrowArray data = array->arrow_array; + array->arrow_array.release = nullptr; + TransformDuckToArrowChunk(pyarrow_schema, data, batches); } + return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema); +} - // Fetch the schema once - ArrowSchema arrow_schema; - ArrowConverter::ToArrowSchema(&arrow_schema, result->types, names, result->client_properties); - auto pyarrow_schema = pyarrow::ToPyArrowSchema(arrow_schema); - - py::list batches; - if (result->type == QueryResultType::ARROW_RESULT) { - auto &arrow_result = result->Cast(); - auto arrays = arrow_result.ConsumeArrays(); - for (auto &array : arrays) { - ArrowArray data = array->arrow_array; - array->arrow_array.release = nullptr; - TransformDuckToArrowChunk(pyarrow_schema, data, batches); - } - } else { - // STREAM_RESULT: pull the live stream directly into Arrow batches. - QueryResultChunkScanState scan_state(*result); - while (true) { - ArrowArray data; - idx_t count; - { - D_ASSERT(py::gil_check()); - py::gil_scoped_release release; - count = ArrowUtil::FetchChunk(scan_state, result->client_properties, rows_per_batch, &data, - ArrowTypeExtensionData::GetExtensionTypes( - *result->client_properties.client_context, result->types)); - } - if (count == 0) { - break; - } - TransformDuckToArrowChunk(pyarrow_schema, data, batches); - } +duckdb::pyarrow::Table DuckDBPyResult::FetchArrowTable(const idx_t rows_per_batch, const bool to_polars) { + if (!result) { + throw InvalidInputException("There is no query result"); } - return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema); + return RunWithArrowSchema( + [&](const ArrowSchema &schema) -> duckdb::pyarrow::Table { + if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) { + return MaterializedResultToArrowTable(schema, rows_per_batch); + } + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchArrowTable called with unsupported query result: %d", result->type); + } + auto pyarrow_schema = pyarrow::ToPyArrowSchema(schema); + py::list batches; + QueryResultChunkScanState scan_state(*result); + while (true) { + ArrowArray data; + idx_t count; + { + D_ASSERT(py::gil_check()); + py::gil_scoped_release release; + count = ArrowUtil::FetchChunk(scan_state, result->client_properties, rows_per_batch, &data, + ArrowTypeExtensionData::GetExtensionTypes( + *result->client_properties.client_context, result->types)); + } + if (count == 0) { + break; + } + TransformDuckToArrowChunk(pyarrow_schema, data, batches); + } + return pyarrow::ToArrowTable(std::move(batches), pyarrow_schema); + }, + to_polars); } ArrowArrayStream DuckDBPyResult::FetchArrowArrayStream(idx_t rows_per_batch) { if (!result) { throw InvalidInputException("There is no query result"); } - // Re-feed a materialized result to get a context-owning stream; a StreamQueryResult is - // wrapped directly (already has a live context). - if (result->type == QueryResultType::MATERIALIZED_RESULT) { - PromoteMaterializedToStream(); + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchArrowArrayStream called with unsupported query result: %d", result->type); } // The wrapper is owned by the ArrowArrayStream's private_data (released with the stream). - ResultArrowArrayStreamWrapper *result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch); + const auto result_stream = new ResultArrowArrayStreamWrapper(std::move(result), rows_per_batch); return result_stream->stream; } @@ -581,6 +575,20 @@ duckdb::pyarrow::RecordBatchReader DuckDBPyResult::FetchRecordBatchReader(idx_t if (!result) { throw InvalidInputException("There is no query result"); } + + if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) { + constexpr bool dedup_column_names = false; + return RunWithArrowSchema( + [&](const ArrowSchema &schema) -> duckdb::pyarrow::RecordBatchReader { + const auto table = MaterializedResultToArrowTable(schema, rows_per_batch); + return py::cast( + table.attr("to_reader")(py::arg("max_chunksize") = rows_per_batch)); + }, + dedup_column_names); + } + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchRecordBatchReader called with unsupported query result: %d", result->type); + } py::gil_scoped_acquire acquire; auto pyarrow_lib_module = py::module::import("pyarrow").attr("lib"); auto record_batch_reader_func = pyarrow_lib_module.attr("RecordBatchReader").attr("_import_from_c"); @@ -601,11 +609,23 @@ static void ArrowArrayStreamPyCapsuleDestructor(PyObject *object) { delete stream; } -py::object DuckDBPyResult::FetchArrowCapsule(idx_t rows_per_batch) { +py::object DuckDBPyResult::FetchArrowCapsule(const idx_t rows_per_batch) { if (!result) { throw InvalidInputException("There is no query result"); } - // Lazy streaming capsule backed by a context-owning stream (see FetchArrowArrayStream). + + constexpr bool dedup_column_names = false; + if (result->type == QueryResultType::MATERIALIZED_RESULT || result->type == QueryResultType::ARROW_RESULT) { + return RunWithArrowSchema( + [&](const ArrowSchema &schema) -> py::object { + const auto table = MaterializedResultToArrowTable(schema, rows_per_batch); + return table.attr("__arrow_c_stream__")(); + }, + dedup_column_names); + } + if (result->type != QueryResultType::STREAM_RESULT) { + throw InternalException("FetchArrowCapsule called with unsupported query result: %d", result->type); + } auto inner_stream = FetchArrowArrayStream(rows_per_batch); auto stream = new ArrowArrayStream(); *stream = inner_stream; From 321197761923c6a104cc88a099f45d15adf534f2 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Tue, 16 Jun 2026 11:32:14 +0200 Subject: [PATCH 05/11] force windows 2022 runners --- .github/workflows/packaging_wheels.yml | 2 +- .github/workflows/targeted_test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/packaging_wheels.yml b/.github/workflows/packaging_wheels.yml index b59e5387..c7f8c5d7 100644 --- a/.github/workflows/packaging_wheels.yml +++ b/.github/workflows/packaging_wheels.yml @@ -32,7 +32,7 @@ jobs: matrix: python: [ cp310, cp311, cp312, cp313, cp314 ] platform: - - { os: windows-2025, arch: amd64, cibw_system: win } + - { os: windows-2022, arch: amd64, cibw_system: win } - { os: windows-11-arm, arch: ARM64, cibw_system: win } # cibw requires ARM64 to be uppercase - { os: ubuntu-24.04, arch: x86_64, cibw_system: manylinux } - { os: ubuntu-24.04-arm, arch: aarch64, cibw_system: manylinux } diff --git a/.github/workflows/targeted_test.yml b/.github/workflows/targeted_test.yml index d1a828de..eff62fe2 100644 --- a/.github/workflows/targeted_test.yml +++ b/.github/workflows/targeted_test.yml @@ -8,7 +8,7 @@ on: required: true type: choice options: - - 'windows-2025' + - 'windows-2022' - 'windows-11-arm' - 'ubuntu-24.04' - 'ubuntu-24.04-arm' From 305369d48d2b7e3460c36f00842200b560d6447d Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Tue, 16 Jun 2026 13:12:33 +0200 Subject: [PATCH 06/11] Pin duckdb at release hash 08e34c447b --- external/duckdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/duckdb b/external/duckdb index 894e3727..08e34c44 160000 --- a/external/duckdb +++ b/external/duckdb @@ -1 +1 @@ -Subproject commit 894e3727d194d72295d10aa971798de10a82e657 +Subproject commit 08e34c447bae34eaee3723cac61f2878b6bdf787 From fcf4359c15b3dc3c287bdd715977f8aef2c926ee Mon Sep 17 00:00:00 2001 From: DuckDB Labs GitHub Bot Date: Wed, 17 Jun 2026 07:32:19 +0000 Subject: [PATCH 07/11] Bump submodule --- external/duckdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/duckdb b/external/duckdb index 08e34c44..ceb2aef3 160000 --- a/external/duckdb +++ b/external/duckdb @@ -1 +1 @@ -Subproject commit 08e34c447bae34eaee3723cac61f2878b6bdf787 +Subproject commit ceb2aef3e30c5c04cf97eea4af3990a274bd49bb From 61a982227c40dccb0a0940877a99298e0c7eb8f6 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Thu, 18 Jun 2026 12:23:10 +0200 Subject: [PATCH 08/11] pin torch --- external/duckdb | 2 +- pyproject.toml | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/external/duckdb b/external/duckdb index ceb2aef3..08e34c44 160000 --- a/external/duckdb +++ b/external/duckdb @@ -1 +1 @@ -Subproject commit ceb2aef3e30c5c04cf97eea4af3990a274bd49bb +Subproject commit 08e34c447bae34eaee3723cac61f2878b6bdf787 diff --git a/pyproject.toml b/pyproject.toml index 4de54a76..f3bd17dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -257,7 +257,8 @@ test = [ # dependencies used for running tests "pandas<3.0.0; python_version < '3.11'", "pyarrow>=23.0.0; python_version >= '3.10' and (sys_platform != 'win32' or platform_machine != 'ARM64')", "pyarrow>=18.0.0; python_version < '3.10' and (sys_platform != 'win32' or platform_machine != 'ARM64')", - "torch>=2.2.2; python_version < '3.13' and ( sys_platform != 'win32' or platform_machine != 'ARM64' or python_version > '3.11' )", + "torch>=2.2.2; python_version < '3.13' and ( sys_platform != 'win32' or platform_machine != 'ARM64' )", + "torch>=2.2.2,<2.12.1; python_version > '3.11' and python_version < '3.13' and sys_platform == 'win32' and platform_machine == 'ARM64'", "torch>=2.2.2,<2.11.0; python_version >= '3.13' and python_version < '3.14' and ( sys_platform != 'darwin' or platform_machine != 'x86_64' ) and ( sys_platform != 'win32' or platform_machine != 'ARM64' )", "torch>=2.10.0; python_version >= '3.14' and ( sys_platform != 'darwin' or platform_machine != 'x86_64' ) and ( sys_platform != 'win32' or platform_machine != 'ARM64' )", "tensorflow==2.14.0; sys_platform == 'darwin' and python_version < '3.12'", From e9630434c4e539eea6bae92e0f8bc91eba014d25 Mon Sep 17 00:00:00 2001 From: DuckDB Labs GitHub Bot Date: Sun, 21 Jun 2026 06:31:40 +0000 Subject: [PATCH 09/11] Bump submodule --- external/duckdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/duckdb b/external/duckdb index 08e34c44..c4770ecb 160000 --- a/external/duckdb +++ b/external/duckdb @@ -1 +1 @@ -Subproject commit 08e34c447bae34eaee3723cac61f2878b6bdf787 +Subproject commit c4770ecba48065b691843da2e6eb9f91e3fea77b From 4c63e39edb01ee286ffba9f29889c739d039e6e3 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Fri, 26 Jun 2026 09:00:57 +0200 Subject: [PATCH 10/11] bump submodule to June 26 nightly --- external/duckdb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/duckdb b/external/duckdb index 0361de44..06eb6b68 160000 --- a/external/duckdb +++ b/external/duckdb @@ -1 +1 @@ -Subproject commit 0361de441a122f11fe07c40ce4531cd58f5eaf7a +Subproject commit 06eb6b6858c6d568f5fe62855f53c386f13c98c7 From 3d73752407af312d406efbecb1acaabd1f544ef0 Mon Sep 17 00:00:00 2001 From: Evert Lammerts Date: Fri, 26 Jun 2026 10:14:53 +0200 Subject: [PATCH 11/11] QualifiedName and ProfilerPrintFormat --- src/duckdb_py/pyconnection.cpp | 9 +++++---- src/duckdb_py/pyexpression.cpp | 10 +++++----- src/duckdb_py/pyrelation.cpp | 6 +++--- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/duckdb_py/pyconnection.cpp b/src/duckdb_py/pyconnection.cpp index b6e14293..8b9ac8f9 100644 --- a/src/duckdb_py/pyconnection.cpp +++ b/src/duckdb_py/pyconnection.cpp @@ -413,7 +413,7 @@ shared_ptr DuckDBPyConnection::UnregisterUDF(const string &n auto &catalog = Catalog::GetCatalog(context, SYSTEM_CATALOG); DropInfo info; info.type = CatalogType::SCALAR_FUNCTION_ENTRY; - info.name = Identifier(name); + info.NameMutable() = Identifier(name); info.allow_drop_internal = true; info.cascade = false; info.if_not_found = OnEntryNotFound::THROW_EXCEPTION; @@ -1651,11 +1651,12 @@ unique_ptr DuckDBPyConnection::RunQuery(const py::object &quer unique_ptr DuckDBPyConnection::Table(const string &tname) { auto &connection = con.GetConnection(); auto qualified_name = QualifiedName::Parse(tname); - if (qualified_name.schema.empty()) { - qualified_name.schema = DEFAULT_SCHEMA; + if (qualified_name.Schema().empty()) { + qualified_name.SchemaMutable() = DEFAULT_SCHEMA; } try { - return CreateRelation(connection.Table(qualified_name.catalog, qualified_name.schema, qualified_name.name)); + return CreateRelation( + connection.Table(qualified_name.Catalog(), qualified_name.Schema(), qualified_name.Name())); } catch (const CatalogException &) { // CatalogException will be of the type '... is not a table' // Not a table in the database, make a query relation that can perform replacement scans diff --git a/src/duckdb_py/pyexpression.cpp b/src/duckdb_py/pyexpression.cpp index 329b034e..e6d263a6 100644 --- a/src/duckdb_py/pyexpression.cpp +++ b/src/duckdb_py/pyexpression.cpp @@ -325,13 +325,13 @@ shared_ptr DuckDBPyExpression::ColumnExpression(const py::ar } auto qualified_name = QualifiedName::Parse(column_name); - if (!qualified_name.catalog.empty()) { - column_names.push_back(qualified_name.catalog); + if (!qualified_name.Catalog().empty()) { + column_names.push_back(qualified_name.Catalog()); } - if (!qualified_name.schema.empty()) { - column_names.push_back(qualified_name.schema); + if (!qualified_name.Schema().empty()) { + column_names.push_back(qualified_name.Schema()); } - column_names.push_back(qualified_name.name); + column_names.push_back(qualified_name.Name()); } else { for (auto &part : names) { column_names.push_back(Identifier(py::str(part))); diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp index cdb1df57..10e8aa72 100644 --- a/src/duckdb_py/pyrelation.cpp +++ b/src/duckdb_py/pyrelation.cpp @@ -1581,7 +1581,7 @@ DuckDBPyRelation &DuckDBPyRelation::Execute() { void DuckDBPyRelation::InsertInto(const string &table) { AssertRelation(); auto parsed_info = QualifiedName::Parse(table); - auto insert = rel->InsertRel(parsed_info.catalog, parsed_info.schema, parsed_info.name); + auto insert = rel->InsertRel(parsed_info.Catalog(), parsed_info.Schema(), parsed_info.Name()); PyExecuteRelation(insert); } @@ -1645,7 +1645,7 @@ void DuckDBPyRelation::Insert(const py::object ¶ms) const { void DuckDBPyRelation::Create(const string &table) { AssertRelation(); auto parsed_info = QualifiedName::Parse(table); - auto create = rel->CreateRel(parsed_info.schema, parsed_info.name, false); + auto create = rel->CreateRel(parsed_info.Schema(), parsed_info.Name(), false); PyExecuteRelation(create); } @@ -1747,7 +1747,7 @@ string DuckDBPyRelation::Explain(ExplainType type, const string &format) { // An empty format means "auto": the default format, or HTML when running under Jupyter. const bool auto_format = format.empty(); - auto explain_format = auto_format ? GetExplainFormat(type) : ProfilerPrintFormat::FromString(format); + auto explain_format = auto_format ? GetExplainFormat(type) : ProfilerPrintFormat(format); auto res = rel->Explain(type, explain_format); D_ASSERT(res->type == duckdb::QueryResultType::MATERIALIZED_RESULT); auto &materialized = res->Cast();