From 61b6bd2fb345aadd747888997d7636f6b6f4b5e8 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Sat, 4 Jul 2026 15:42:07 -0700 Subject: [PATCH] stream: avoid draining merged iter sources Defer reading the next value from each source in merge() until the merged consumer resumes after receiving the previous value. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/consumers.js | 32 ++++++++++--------- .../test-stream-iter-consumers-merge.js | 27 ++++++++++++++++ 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/lib/internal/streams/iter/consumers.js b/lib/internal/streams/iter/consumers.js index 7d89607fee0a3b..7a2d5719b9b97b 100644 --- a/lib/internal/streams/iter/consumers.js +++ b/lib/internal/streams/iter/consumers.js @@ -463,20 +463,11 @@ function merge(...args) { if (result.done) { activeCount--; } else { - ArrayPrototypePush(ready, result.value); - // Immediately request the next value from this source - // (at most one pending .next() per source) - PromisePrototypeThen( - iterator.next(), - (r) => onSettled(iterator, r), - (err) => { - ArrayPrototypePush(ready, { __proto__: null, error: err }); - if (waitResolve) { - waitResolve(); - waitResolve = null; - } - }, - ); + ArrayPrototypePush(ready, { + __proto__: null, + iterator, + value: result.value, + }); } if (waitResolve) { waitResolve(); @@ -513,7 +504,18 @@ function merge(...args) { if (item?.error) { throw item.error; } - yield item; + yield item.value; + PromisePrototypeThen( + item.iterator.next(), + (r) => onSettled(item.iterator, r), + (err) => { + ArrayPrototypePush(ready, { __proto__: null, error: err }); + if (waitResolve) { + waitResolve(); + waitResolve = null; + } + }, + ); } // If sources are still active, wait for the next settlement diff --git a/test/parallel/test-stream-iter-consumers-merge.js b/test/parallel/test-stream-iter-consumers-merge.js index b551599f731462..859fcf8c1580fa 100644 --- a/test/parallel/test-stream-iter-consumers-merge.js +++ b/test/parallel/test-stream-iter-consumers-merge.js @@ -170,6 +170,32 @@ async function testMergeSignalDuringPendingMultiSourceRead() { await assert.rejects(next, { name: 'AbortError' }); } +async function testMergeDoesNotDrainSourcesWhileIdle() { + function source(n) { + return { + __proto__: null, + pulls: 0, + async *[Symbol.asyncIterator]() { + while (this.pulls < n) { + yield [Buffer.from(`${++this.pulls}`)]; + } + }, + }; + } + + const a = source(5); + const b = source(5); + const iterator = merge(a, b)[Symbol.asyncIterator](); + + await iterator.next(); + await new Promise(setImmediate); + + assert.strictEqual(a.pulls, 1); + assert.strictEqual(b.pulls, 1); + + await iterator.return?.(); +} + // merge() accepts string sources (normalized via from()) async function testMergeStringSources() { const batches = []; @@ -306,6 +332,7 @@ Promise.all([ testMergeConsumerBreak(), testMergeSignalMidIteration(), testMergeSignalDuringPendingMultiSourceRead(), + testMergeDoesNotDrainSourcesWhileIdle(), testMergeStringSources(), testMergeObjectLikeSources(), testMergeCleanupErrorOnly(),