From 29677a7b369053f46efad60d66ebcf02fb4db77f Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Sat, 6 Jun 2026 18:29:24 +0100 Subject: [PATCH] Add end-to-end response streaming via streamRequest() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add streamRequest(RequestInterface, callable $sink) to the Adapter interface, the Client wrapper, and both transports. It delivers the response body to the sink chunk-by-chunk as it arrives, keeping memory bounded regardless of body size; the returned response carries the status and headers with an empty body. Both adapters use their native push mechanism in the calling context — no curl_multi, no child coroutine: - cURL forwards CURLOPT_WRITEFUNCTION to the sink (sendRequest and streamRequest now share a transfer() core). - Swoole forwards write_func to the sink in the main coroutine (sendRequest/streamRequest share perform()). sendRequest() is unchanged: still PSR-18, buffered, decodable. Tests (shared AdapterContract, so proven on curl and Swoole): - stream a body to a sink, asserting incremental delivery - 8 MiB request and response payloads round-trip; the streamed response holds peak memory under 2 MiB - redirects are not followed when streaming The test server now runs with post_max_size=64M/memory_limit=256M so large request payloads are accepted. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 49 ++++++++ src/Client.php | 18 +++ src/Client/Adapter.php | 15 +++ src/Client/Adapter/Curl/Client.php | 68 ++++++++--- src/Client/Adapter/SwooleCoroutine/Client.php | 64 ++++++---- tests/Client/Adapter/AdapterContract.php | 111 ++++++++++++++++++ tests/ClientTest.php | 32 +++++ tests/Server/Http.php | 2 +- tests/server.php | 39 ++++++ 9 files changed, 361 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index d24f212..1b55671 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ It provides: - `Utopia\Client`, a PSR-18 client wrapper with immutable headers, auth, base URI, and timeout helpers. - `Utopia\Client\Adapter\Curl\Client`, a cURL transport for regular PHP runtimes. - `Utopia\Client\Adapter\SwooleCoroutine\Client`, a Swoole coroutine transport. +- `streamRequest()`, which delivers the response body to a sink callback chunk-by-chunk so large downloads and event streams are consumed with bounded memory (see [Streaming Responses](#streaming-responses)). - `Utopia\Psr7\*` PSR-7 messages and PSR-17 factories. - Request factories for JSON, forms, query strings, raw bodies, and multipart uploads. - Response decoders for JSON, form-encoded, and multipart payloads. @@ -159,6 +160,54 @@ foreach ($parts as $part) { } ``` +## Streaming Responses + +`streamRequest()` delivers the response body to a sink callback chunk-by-chunk as +it arrives, so large downloads, Server-Sent Events, and LLM token streams are +consumed with bounded memory — the whole body is never held at once. It returns a +response carrying the status and headers; the body is empty because the body was +handed to the sink. Both adapters support it. + +```php +streamRequest($request, function (string $chunk): void { + echo $chunk; +}); + +echo $response->getStatusCode(); +``` + +The sink runs as each chunk arrives, which means it also applies backpressure: the +transfer does not read ahead while the sink is still working. To stop early, throw +from the sink. + +```php +streamRequest($request, function (string $chunk) use (&$buffer): void { + $buffer .= $chunk; + + while (($newline = strpos($buffer, "\n")) !== false) { + $line = substr($buffer, 0, $newline); + $buffer = substr($buffer, $newline + 1); + // handle $line + } +}); +``` + +Notes: + +- Use `sendRequest()` for normal requests — it buffers the body and returns a + fully decodable response (`->json()`, `->form()`, `->multipart()`). +- `streamRequest()` returns only once the stream ends. For an unbounded stream + (e.g. SSE), set the transport timeout to no-limit (`CURLOPT_TIMEOUT_MS => 0` on + cURL, `timeout => -1` on Swoole) and stop by throwing from the sink. +- The Swoole adapter must run inside a coroutine, like `sendRequest()`. + ## Timeouts Timeout values are seconds. The helpers are immutable and delegate to the selected adapter. diff --git a/src/Client.php b/src/Client.php index 5b05314..549c780 100644 --- a/src/Client.php +++ b/src/Client.php @@ -100,6 +100,24 @@ public function sendRequest(RequestInterface $request): ResponseInterface ); } + /** + * Send a request and pass each response body chunk to $sink as it arrives. + * The returned response carries the status and headers; its body is empty. + * + * @param callable(string): void $sink + * + * @throws ClientExceptionInterface + */ + public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface + { + return $this->adapter->streamRequest( + $this->applyHeaders( + $this->applyBaseUri($request), + ), + $sink, + ); + } + private function applyBaseUri(RequestInterface $request): RequestInterface { if (!$this->baseUri instanceof \Psr\Http\Message\UriInterface) { diff --git a/src/Client/Adapter.php b/src/Client/Adapter.php index 43ee8a2..a38561a 100644 --- a/src/Client/Adapter.php +++ b/src/Client/Adapter.php @@ -4,11 +4,26 @@ namespace Utopia\Client; +use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; interface Adapter extends ClientInterface { public function withTimeout(float $seconds): static; public function withConnectTimeout(float $seconds): static; + + /** + * Send a request and pass each response body chunk to $sink as it arrives, + * keeping memory bounded regardless of body size. The returned response + * carries the status and headers; its body is empty because the body was + * delivered to $sink. + * + * @param callable(string): void $sink + * + * @throws ClientExceptionInterface + */ + public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface; } diff --git a/src/Client/Adapter/Curl/Client.php b/src/Client/Adapter/Curl/Client.php index 92a8c15..5d6b530 100644 --- a/src/Client/Adapter/Curl/Client.php +++ b/src/Client/Adapter/Curl/Client.php @@ -74,6 +74,53 @@ public function withConnectTimeout(float $seconds): static * @throws ClientExceptionInterface */ public function sendRequest(RequestInterface $request): ResponseInterface + { + $body = ''; + + $parsed = $this->transfer($request, static function (string $chunk) use (&$body): void { + $body .= $chunk; + }); + + return $this->responseBuilder->build( + $parsed['status'], + $parsed['reason'], + $parsed['headers'], + $body, + $parsed['protocol'], + ); + } + + /** + * @param callable(string): void $sink + * + * @throws ClientExceptionInterface + */ + public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface + { + $parsed = $this->transfer($request, $sink); + + return $this->responseBuilder->build( + $parsed['status'], + $parsed['reason'], + $parsed['headers'], + '', + $parsed['protocol'], + ); + } + + /** + * Run the transfer, forwarding each body chunk to $sink, and return the + * parsed status line and headers. cURL invokes the write callback as data + * arrives, so a streaming $sink sees the body chunk-by-chunk and the body + * is never fully held in memory. + * + * @param callable(string): void $sink + * + * @return array{protocol: string, status: int, reason: string, headers: array>} + * + * @throws ClientExceptionInterface + */ + private function transfer(RequestInterface $request, callable $sink): array { if (!\extension_loaded('curl')) { throw new AdapterPreconditionException($request, 'The curl extension is required.'); @@ -87,14 +134,13 @@ public function sendRequest(RequestInterface $request): ResponseInterface } $headers = ''; - $body = ''; $handle = curl_init($url); if (!$handle instanceof CurlHandle) { throw new AdapterInitializationException($request, 'Unable to initialize curl.'); } - $options = $this->options($request, $headers, $body); + $options = $this->options($request, $headers, $sink); try { if (curl_setopt_array($handle, $options) === false) { @@ -123,22 +169,16 @@ public function sendRequest(RequestInterface $request): ResponseInterface throw new InvalidResponseException($request, 'Received an invalid HTTP response.'); } - return $this->responseBuilder->build( - $parsed['status'], - $parsed['reason'], - $parsed['headers'], - $body, - $parsed['protocol'], - ); + return $parsed; } /** - * @param-out string $headers - * @param-out string $body + * @param-out string $headers + * @param callable(string): void $sink * * @return array */ - private function options(RequestInterface $request, string &$headers, string &$body): array + private function options(RequestInterface $request, string &$headers, callable $sink): array { $options = [ \CURLOPT_CUSTOMREQUEST => $request->getMethod(), @@ -153,9 +193,9 @@ private function options(RequestInterface $request, string &$headers, string &$b return \strlen($line); }, \CURLOPT_RETURNTRANSFER => false, - \CURLOPT_WRITEFUNCTION => static function (CurlHandle $handle, string $chunk) use (&$body): int { + \CURLOPT_WRITEFUNCTION => static function (CurlHandle $handle, string $chunk) use ($sink): int { unset($handle); - $body .= $chunk; + $sink($chunk); return \strlen($chunk); }, diff --git a/src/Client/Adapter/SwooleCoroutine/Client.php b/src/Client/Adapter/SwooleCoroutine/Client.php index f69cf70..e01f0ce 100644 --- a/src/Client/Adapter/SwooleCoroutine/Client.php +++ b/src/Client/Adapter/SwooleCoroutine/Client.php @@ -80,6 +80,31 @@ public function withConnectTimeout(float $seconds): static * @throws ClientExceptionInterface */ public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->perform($request, null); + } + + /** + * @param callable(string): void $sink + * + * @throws ClientExceptionInterface + */ + public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface + { + return $this->perform($request, $sink); + } + + /** + * Execute the request. When $sink is given, Swoole's write callback forwards + * each body chunk to it as data arrives, so the body is never fully held in + * memory and the returned response has an empty body; otherwise the body is + * buffered onto the response. + * + * @param (callable(string): void)|null $sink + * + * @throws ClientExceptionInterface + */ + private function perform(RequestInterface $request, ?callable $sink): ResponseInterface { if (!\extension_loaded('swoole')) { throw new AdapterPreconditionException($request, 'The swoole extension is required.'); @@ -107,10 +132,17 @@ public function sendRequest(RequestInterface $request): ResponseInterface throw new AdapterInitializationException($request, $throwable->getMessage(), (int) $throwable->getCode(), $throwable); } + $settings = $this->settings + [self::SETTING_HTTP2 => false]; + + if ($sink !== null) { + $settings['write_func'] = static function (SwooleClient $cli, string $chunk) use ($sink): void { + unset($cli); + $sink($chunk); + }; + } + try { - if ($client->set($this->settings + [ - self::SETTING_HTTP2 => false, - ]) === false) { + if ($client->set($settings) === false) { throw new InvalidArgumentException('Unable to configure Swoole client settings.'); } @@ -121,6 +153,12 @@ public function sendRequest(RequestInterface $request): ResponseInterface if ($client->setHeaders($this->requestHeaders($request)) === false) { throw new InvalidArgumentException('Unable to configure Swoole request headers.'); } + + $body = (string) $request->getBody(); + + if ($body !== '' && $client->setData($body) === false) { + throw new InvalidArgumentException('Unable to configure Swoole request body.'); + } } catch (InvalidArgumentException $invalidArgumentException) { $client->close(); @@ -131,24 +169,6 @@ public function sendRequest(RequestInterface $request): ResponseInterface throw new InvalidArgumentException($throwable->getMessage(), (int) $throwable->getCode(), $throwable); } - $body = (string) $request->getBody(); - - if ($body !== '') { - try { - if ($client->setData($body) === false) { - throw new InvalidArgumentException('Unable to configure Swoole request body.'); - } - } catch (InvalidArgumentException $invalidArgumentException) { - $client->close(); - - throw $invalidArgumentException; - } catch (Throwable $throwable) { - $client->close(); - - throw new InvalidArgumentException($throwable->getMessage(), (int) $throwable->getCode(), $throwable); - } - } - try { $result = $client->execute($this->path($request)); } catch (Throwable $throwable) { @@ -184,7 +204,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface $headers = []; } - $responseBody = $client->body; + $responseBody = $sink === null ? $client->body : ''; if (!\is_string($responseBody)) { $responseBody = ''; diff --git a/tests/Client/Adapter/AdapterContract.php b/tests/Client/Adapter/AdapterContract.php index 5f414e3..beabd9b 100644 --- a/tests/Client/Adapter/AdapterContract.php +++ b/tests/Client/Adapter/AdapterContract.php @@ -28,6 +28,11 @@ abstract class AdapterContract extends TestCase { + /** + * Payload size exercised by the request and response payload-size contracts. + */ + private const int PAYLOAD_SIZE = 8 * 1024 * 1024; + /** * @param array $transportOptions */ @@ -126,6 +131,25 @@ public function testItDoesNotFollowRedirectsByDefault(): void }); } + public function testItDoesNotFollowRedirectsWhenStreaming(): void + { + Http::serve(function (int $port): void { + $request = new Request\Factory()->createRequest(Method::GET, 'http://127.0.0.1:' . $port . '/redirect'); + $client = $this->createAdapter(); + + $received = ''; + + $response = $this->sendStream($client, $request, function (string $chunk) use (&$received): void { + $received .= $chunk; + }); + + $this->assertSame(302, $response->getStatusCode()); + $this->assertSame('/final', $response->getHeaderLine(Header::LOCATION)); + $this->assertSame('redirect', $received); + $this->assertSame('', (string) $response->getBody()); + }); + } + public function testItPreservesDuplicateMixedCaseHeadersAndBinaryBodies(): void { Http::serve(function (int $port): void { @@ -356,6 +380,23 @@ public function testItRoundTripsLargeRequestBodies(): void }); } + public function testItSendsLargeRequestPayloads(): void + { + Http::serve(function (int $port): void { + $body = str_repeat('a', self::PAYLOAD_SIZE); + $streamFactory = new Stream\Factory(); + $request = new Request\Factory() + ->createRequest(Method::POST, 'http://127.0.0.1:' . $port . '/body-info') + ->withHeader(Header::CONTENT_TYPE, ContentType::OCTET_STREAM) + ->withBody($streamFactory->createStream($body)); + $client = $this->createAdapter(); + + $response = $this->send($client, $request); + + $this->assertSame(self::PAYLOAD_SIZE . ':' . hash('sha256', $body), (string) $response->getBody()); + }); + } + public function testTimeoutHelpersReturnConfiguredClones(): void { $client = $this->createAdapter(); @@ -542,6 +583,49 @@ public function testItSendsZeroStringBodiesAsNonEmptyBodies(): void }); } + public function testItStreamsResponseBodiesToASink(): void + { + Http::serve(function (int $port): void { + $request = new Request\Factory()->createRequest(Method::GET, 'http://127.0.0.1:' . $port . '/stream'); + $client = $this->createAdapter(); + + $received = ''; + + $response = $this->sendStream($client, $request, function (string $chunk) use (&$received): void { + $received .= $chunk; + }); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame("chunk0\nchunk1\nchunk2\nchunk3\nchunk4\n", $received); + $this->assertSame('', (string) $response->getBody()); + }); + } + + public function testItStreamsLargeResponsesWithBoundedMemory(): void + { + Http::serve(function (int $port): void { + $request = new Request\Factory()->createRequest(Method::GET, 'http://127.0.0.1:' . $port . '/stream-large'); + $client = $this->createAdapter(); + + $expected = self::PAYLOAD_SIZE; + $hash = hash_init('sha256'); + $read = 0; + $baseline = memory_get_usage(); + $peak = 0; + + $response = $this->sendStream($client, $request, function (string $chunk) use ($hash, &$read, $baseline, &$peak): void { + hash_update($hash, $chunk); + $read += \strlen($chunk); + $peak = max($peak, memory_get_usage() - $baseline); + }); + + $this->assertSame(200, $response->getStatusCode()); + $this->assertSame($expected, $read); + $this->assertSame(hash('sha256', str_repeat('a', $expected)), hash_final($hash)); + $this->assertLessThan(2 * 1_048_576, $peak, 'Streaming must not buffer the whole body.'); + }); + } + private function send(Adapter $client, RequestInterface $request): ResponseInterface { $response = null; @@ -565,4 +649,31 @@ private function send(Adapter $client, RequestInterface $request): ResponseInter return $response; } + + /** + * @param callable(string): void $sink + */ + private function sendStream(Adapter $client, RequestInterface $request, callable $sink): ResponseInterface + { + $response = null; + $thrown = null; + + $this->runAdapter(function () use ($client, $request, $sink, &$response, &$thrown): void { + try { + $response = $client->streamRequest($request, $sink); + } catch (Throwable $throwable) { + $thrown = $throwable; + } + }); + + if ($thrown instanceof Throwable) { + throw $thrown; + } + + if (!$response instanceof ResponseInterface) { + self::fail('Adapter did not return a response.'); + } + + return $response; + } } diff --git a/tests/ClientTest.php b/tests/ClientTest.php index a6c1229..e7d9129 100644 --- a/tests/ClientTest.php +++ b/tests/ClientTest.php @@ -117,6 +117,26 @@ public function testItRejectsRelativeBaseUris(): void $client->withBaseUri('/api'); } + + public function testItStreamsThroughTheAdapterApplyingBaseUriAndHeaders(): void + { + $requestFactory = new Request\Factory(); + $received = ''; + $client = new Client(new RecordingAdapter()) + ->withBaseUri('https://api.example.com/v1') + ->withHeaders(['Accept' => 'application/json']); + + $response = $client->streamRequest( + $requestFactory->createRequest('GET', 'users'), + function (string $chunk) use (&$received): void { + $received .= $chunk; + }, + ); + + $this->assertSame('chunk', $received); + $this->assertSame('https://api.example.com/v1/users', $response->getHeaderLine('X-Request-Uri')); + $this->assertSame('application/json', $response->getHeaderLine('X-Request-Accept')); + } } final class RecordingAdapter implements Adapter @@ -173,4 +193,16 @@ public function sendRequest(RequestInterface $request): ResponseInterface return $response; } + + /** + * @param callable(string): void $sink + * + * @throws ClientExceptionInterface + */ + public function streamRequest(RequestInterface $request, callable $sink): ResponseInterface + { + $sink('chunk'); + + return $this->sendRequest($request); + } } diff --git a/tests/Server/Http.php b/tests/Server/Http.php index be19a0e..667bfdf 100644 --- a/tests/Server/Http.php +++ b/tests/Server/Http.php @@ -86,7 +86,7 @@ private static function availablePort(): int private static function start(int $port): mixed { $server = proc_open( - [\PHP_BINARY, '-S', '127.0.0.1:' . $port, \dirname(__DIR__) . '/server.php'], + [\PHP_BINARY, '-d', 'post_max_size=64M', '-d', 'memory_limit=256M', '-S', '127.0.0.1:' . $port, \dirname(__DIR__) . '/server.php'], [ 0 => ['pipe', 'r'], 1 => ['pipe', 'w'], diff --git a/tests/server.php b/tests/server.php index eadf732..cbcbf71 100644 --- a/tests/server.php +++ b/tests/server.php @@ -139,6 +139,45 @@ return; } +if ($path === '/stream') { + http_response_code(200); + header('Content-Type: text/plain;charset=UTF-8'); + + while (ob_get_level() > 0) { + ob_end_flush(); + } + + for ($i = 0; $i < 5; $i++) { + echo 'chunk' . $i . "\n"; + flush(); + usleep(20_000); + } + + return; +} + +if ($path === '/stream-large') { + $chunkSize = 65_536; + $chunkCount = 128; + + http_response_code(200); + header('Content-Type: application/octet-stream'); + header('Content-Length: ' . ($chunkSize * $chunkCount)); + + while (ob_get_level() > 0) { + ob_end_flush(); + } + + $chunk = str_repeat('a', $chunkSize); + + for ($i = 0; $i < $chunkCount; $i++) { + echo $chunk; + flush(); + } + + return; +} + if ($path === '/slow') { sleep(1); http_response_code(200);