From 34f39de7b0ec2423bfe53ce6986a748511235950 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 4 Jul 2026 22:41:39 +0200 Subject: [PATCH 1/2] stream: speed up async iteration over WHATWG byte streams for await / reader.read() loops over byte streams were ~4x slower than over default streams. Three per-chunk costs, none required by the spec: - ArrayBufferViewGetBuffer/ByteLength/ByteOffset went through ReflectGet(view.constructor.prototype, ...), a reflective get that is ~3.5x slower than the original prototype getters from primordials and spoofable through a user-defined .constructor to boot. - The buffered fast paths in ReadableStreamDefaultReader.read() and the async iterator only covered default controllers, so byte streams with queued data still allocated a read request and PromiseWithResolvers per chunk. Byte-queue dequeue is fully synchronous (it is the queue-filled arm of the byte controller's pull steps), so both fast paths now resolve directly from the byte queue. - readableByteStreamControllerEnqueue re-ran the reader brand check and re-loaded the read request list four times per chunk across HasDefaultReader / ProcessReadRequestsUsingQueue / GetNumReadRequests / FulfillReadRequest; it now does a single pass. The async iterator also reuses its read request object across reads (at most one is ever in flight). benchmark/webstreams interleaved same-day A/B, --runs 10: readable-async-iterator bytes +16.3% (***), readable-read byob +9.1% (***), all other rows neutral. Profiler harness: parked byte iteration +14%, buffered byte iteration +37%, buffered byte read loop +18%, default-stream rows at parity. WPT streams/compression/encoding subtests identical to baseline. Signed-off-by: Matteo Collina --- lib/internal/webstreams/readablestream.js | 137 +++++++++++++++------- lib/internal/webstreams/util.js | 32 +++-- 2 files changed, 118 insertions(+), 51 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index e0a84f30ec8380..a9d57ddeeb1614 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -501,6 +501,12 @@ class ReadableStream { current: undefined, }; let started = false; + // A single reusable read request: at most one read is ever in flight + // (next() chains through state.current), and the request is consumed + // before the next read starts, so only its promise record changes + // per read. + // eslint-disable-next-line no-use-before-define + const readRequest = new ReadableStreamAsyncIteratorReadRequest(reader, state, undefined); // The nextSteps function is not an async function in order // to make it more efficient. Because nextSteps explicitly @@ -519,8 +525,8 @@ class ReadableStream { } const promise = PromiseWithResolvers(); - // eslint-disable-next-line no-use-before-define - readableStreamDefaultReaderRead(reader, new ReadableStreamAsyncIteratorReadRequest(reader, state, promise)); + readRequest.promise = promise; + readableStreamDefaultReaderRead(reader, readRequest); return promise.promise; } @@ -574,28 +580,38 @@ class ReadableStream { } // No read is in flight. Mirror the buffered fast path of // ReadableStreamDefaultReader.read(): when data is already queued - // in a default controller, resolve immediately without allocating - // a read request. The result settles synchronously, so leaving + // in the controller, resolve immediately without allocating a + // read request. The result settles synchronously, so leaving // state.current undefined matches the state the slow path reaches // once its read request callbacks have settled. const stream = reader[kState].stream; - if (!state.done && stream !== undefined) { + if (!state.done && stream !== undefined && + stream[kState].state === 'readable') { const controller = stream[kState].controller; - if (stream[kState].state === 'readable' && - isReadableStreamDefaultController(controller) && - controller[kState].queue.length > 0) { - stream[kState].disturbed = true; - const chunk = dequeueValue(controller); - - if (controller[kState].closeRequested && - !controller[kState].queue.length) { - readableStreamDefaultControllerClearAlgorithms(controller); - readableStreamClose(stream); - } else { - readableStreamDefaultControllerCallPullIfNeeded(controller); + if (isReadableStreamDefaultController(controller)) { + if (controller[kState].queue.length > 0) { + stream[kState].disturbed = true; + const chunk = dequeueValue(controller); + + if (controller[kState].closeRequested && + !controller[kState].queue.length) { + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamClose(stream); + } else { + readableStreamDefaultControllerCallPullIfNeeded(controller); + } + + return PromiseResolve({ done: false, value: chunk }); } + } else if (controller[kState].queueTotalSize > 0) { + // Byte controller with buffered data: same shape as above via + // the queue-filled arm of the byte controller's pull steps. + stream[kState].disturbed = true; + return PromiseResolve({ + done: false, - return PromiseResolve({ done: false, value: chunk }); + value: readableByteStreamControllerDequeueChunk(controller), + }); } } state.current = nextSteps(); @@ -918,24 +934,36 @@ class ReadableStreamDefaultReader { const stream = this[kState].stream; const controller = stream[kState].controller; - // Fast path: if data is already buffered in a default controller, + // Fast path: if data is already buffered in the controller's queue, // return a resolved promise immediately without creating a read request. // This is spec-compliant because read() returns a Promise, and // Promise.resolve() callbacks still run in the microtask queue. - if (stream[kState].state === 'readable' && - isReadableStreamDefaultController(controller) && - controller[kState].queue.length > 0) { - stream[kState].disturbed = true; - const chunk = dequeueValue(controller); + if (stream[kState].state === 'readable') { + if (isReadableStreamDefaultController(controller)) { + if (controller[kState].queue.length > 0) { + stream[kState].disturbed = true; + const chunk = dequeueValue(controller); + + if (controller[kState].closeRequested && !controller[kState].queue.length) { + readableStreamDefaultControllerClearAlgorithms(controller); + readableStreamClose(stream); + } else { + readableStreamDefaultControllerCallPullIfNeeded(controller); + } - if (controller[kState].closeRequested && !controller[kState].queue.length) { - readableStreamDefaultControllerClearAlgorithms(controller); - readableStreamClose(stream); - } else { - readableStreamDefaultControllerCallPullIfNeeded(controller); + return PromiseResolve({ done: false, value: chunk }); + } + } else if (controller[kState].queueTotalSize > 0) { + // Byte controller with buffered data: mirror the queue-filled arm + // of its pull steps (which never consults pendingPullIntos) minus + // the read request. + stream[kState].disturbed = true; + return PromiseResolve({ + done: false, + + value: readableByteStreamControllerDequeueChunk(controller), + }); } - - return PromiseResolve({ done: false, value: chunk }); } // Slow path: create request and go through normal flow @@ -3044,9 +3072,23 @@ function readableByteStreamControllerEnqueue(controller, chunk) { } } - if (readableStreamHasDefaultReader(stream)) { - readableByteStreamControllerProcessReadRequestsUsingQueue(controller); - if (!readableStreamGetNumReadRequests(stream)) { + // Single consolidated pass over the reader state. The spec routes this + // through HasDefaultReader / ProcessReadRequestsUsingQueue / + // GetNumReadRequests / FulfillReadRequest, which would re-run the same + // reader brand check and re-load the read request list four times on + // this per-chunk path. + const { reader } = stream[kState]; + if (reader !== undefined && + reader[kState] !== undefined && + reader[kType] === 'ReadableStreamDefaultReader') { + const { readRequests } = reader[kState]; + if (readRequests.length && controller[kState].queueTotalSize > 0) { + // Only possible when data was enqueued while the stream was not + // being read; read requests otherwise never coexist with a + // non-empty queue. + readableByteStreamControllerProcessReadRequestsUsingQueue(controller); + } + if (!readRequests.length) { readableByteStreamControllerEnqueueChunkToQueue( controller, transferredBuffer, @@ -3060,7 +3102,8 @@ function readableByteStreamControllerEnqueue(controller, chunk) { } const transferredView = new Uint8Array(transferredBuffer, byteOffset, byteLength); - readableStreamFulfillReadRequest(stream, transferredView, false); + const readRequest = ArrayPrototypeShift(readRequests); + readRequest[kChunk](transferredView); } } else if (readableStreamHasBYOBReader(stream)) { readableByteStreamControllerEnqueueChunkToQueue( @@ -3395,22 +3438,28 @@ function readableByteStreamControllerCancelSteps(controller, reason) { return result; } -function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) { - const { - queue, - queueTotalSize, - } = controller[kState]; - assert(queueTotalSize > 0); +// Dequeues the first chunk of the byte queue as a Uint8Array view, +// handling queue drain (close-on-empty or pull) before the view is +// created. This is the [[queueTotalSize]] > 0 arm of the byte +// controller's pull steps; it is also called directly from the +// buffered fast paths in ReadableStreamDefaultReader.read() and the +// async iterator, which resolve with the view without allocating a +// read request. +function readableByteStreamControllerDequeueChunk(controller) { + assert(controller[kState].queueTotalSize > 0); const { buffer, byteOffset, byteLength, - } = ArrayPrototypeShift(queue); + } = ArrayPrototypeShift(controller[kState].queue); controller[kState].queueTotalSize -= byteLength; readableByteStreamControllerHandleQueueDrain(controller); - const view = new Uint8Array(buffer, byteOffset, byteLength); - readRequest[kChunk](view); + return new Uint8Array(buffer, byteOffset, byteLength); +} + +function readableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) { + readRequest[kChunk](readableByteStreamControllerDequeueChunk(controller)); } function readableByteStreamControllerProcessReadRequestsUsingQueue(controller) { diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index cd203b77c2d22a..fb5e6a3a8fc7d0 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -7,14 +7,19 @@ const { ArrayPrototypePush, ArrayPrototypeShift, AsyncIteratorPrototype, + DataViewPrototypeGetBuffer, + DataViewPrototypeGetByteLength, + DataViewPrototypeGetByteOffset, FunctionPrototypeCall, MathMax, NumberIsNaN, PromisePrototypeThen, PromiseReject, PromiseResolve, - ReflectGet, Symbol, + TypedArrayPrototypeGetBuffer, + TypedArrayPrototypeGetByteLength, + TypedArrayPrototypeGetByteOffset, Uint8Array, } = primordials; @@ -41,6 +46,10 @@ const { const assert = require('internal/assert'); +const { + isDataView, +} = require('internal/util/types'); + const { validateFunction, } = require('internal/validators'); @@ -93,20 +102,29 @@ function customInspect(depth, options, name, data) { return `${name} ${inspect(data, opts)}`; } -// These are defensive to work around the possibility that -// the buffer, byteLength, and byteOffset properties on -// ArrayBuffer and ArrayBufferView's may have been tampered with. +// These use the original prototype getters so that user tampering with +// the buffer, byteLength, and byteOffset properties on ArrayBuffer and +// ArrayBufferView's is not observed. They run once or more per chunk on +// every byte-stream path, so they must not go through a reflective get +// (the previous view.constructor.prototype lookup was both slower and +// spoofable via a user-defined .constructor). function ArrayBufferViewGetBuffer(view) { - return ReflectGet(view.constructor.prototype, 'buffer', view); + return isDataView(view) ? + DataViewPrototypeGetBuffer(view) : + TypedArrayPrototypeGetBuffer(view); } function ArrayBufferViewGetByteLength(view) { - return ReflectGet(view.constructor.prototype, 'byteLength', view); + return isDataView(view) ? + DataViewPrototypeGetByteLength(view) : + TypedArrayPrototypeGetByteLength(view); } function ArrayBufferViewGetByteOffset(view) { - return ReflectGet(view.constructor.prototype, 'byteOffset', view); + return isDataView(view) ? + DataViewPrototypeGetByteOffset(view) : + TypedArrayPrototypeGetByteOffset(view); } function cloneAsUint8Array(view) { From fa8ad294ad5cdedaca9372fc14a5d21154da5dc8 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 4 Jul 2026 22:41:39 +0200 Subject: [PATCH 2/2] benchmark: add bytes variant to webstreams async-iterator Signed-off-by: Matteo Collina --- .../webstreams/readable-async-iterator.js | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/benchmark/webstreams/readable-async-iterator.js b/benchmark/webstreams/readable-async-iterator.js index 8cdc4785db3dac..73ff3e1ef2b165 100644 --- a/benchmark/webstreams/readable-async-iterator.js +++ b/benchmark/webstreams/readable-async-iterator.js @@ -6,23 +6,40 @@ const { const bench = common.createBenchmark(main, { n: [1e5], + type: ['normal', 'bytes'], }); -async function main({ n }) { - const rs = new ReadableStream({ - pull: function(controller) { - controller.enqueue(1); - }, - }); +async function main({ n, type }) { + const rs = type === 'bytes' ? + new ReadableStream({ + type: 'bytes', + pull: function(controller) { + controller.enqueue(new Uint8Array(1)); + }, + }) : + new ReadableStream({ + pull: function(controller) { + controller.enqueue(1); + }, + }); let x = 0; bench.start(); - for await (const chunk of rs) { - x += chunk; - if (x > n) { - break; + if (type === 'bytes') { + for await (const chunk of rs) { + x += chunk.byteLength; + if (x > n) { + break; + } + } + } else { + for await (const chunk of rs) { + x += chunk; + if (x > n) { + break; + } } } // Use x to ensure V8 does not optimize away the loop as a noop.