From 77169d2712535d6fe38833e8aa573934a6ebe8ca Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 09:39:29 +0100 Subject: [PATCH 1/9] Refactor Swoole concurrency to receiver fan-out; remove AMQP broker Move the unit of concurrency from "N consumers each running the whole loop" to "one receiver fanning message processing out to a bounded pool of coroutines". This decouples the concurrency level from the connection count and makes the Locking decorator useful instead of a footgun. - Add Concurrency\Executor with Inline (default, sequential) and Coroutine (bounded fan-out via a semaphore Channel + WaitGroup) strategies. maxCoroutines now lives on the executor. - Broker\Redis takes two connections: a dedicated `receive` connection for the blocking pop loop and a `work` connection (Locking-wrapped when concurrent) for bookkeeping/acks. Processing is handed to the executor; `work` defaults to `receive` so existing single-connection callers are unchanged. - Adapter\Swoole drops the N-coroutine wrapper and maxCoroutines; it is back to runtime setup plus coroutine-local DI context. - Remove the AMQP broker entirely (broker, test, server, compose services, CI matrix entry, php-amqplib dependency). php-amqplib channels are not coroutine-safe and AMQP was not on the active path. - Wire the Swoole E2E worker to the concurrent two-connection broker. - Make the priority-ordering tests use a queue no worker consumes, so they no longer race the live consumer. - Add CI matrix entries for the Concurrency, Locking and SwooleConcurrency suites. - Disable composer platform-check: pools 1.0.3 declares php >=8.4 while the project and its Docker images target 8.3, which otherwise aborts the runtime guard. Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/workflows/tests.yml | 4 +- composer.json | 4 +- composer.lock | 312 +----------------- docker-compose.yml | 24 -- src/Queue/Adapter/Swoole.php | 75 +---- src/Queue/Broker/AMQP.php | 226 ------------- src/Queue/Broker/Redis.php | 181 ++++++---- src/Queue/Concurrency/Coroutine.php | 60 ++++ src/Queue/Concurrency/Executor.php | 29 ++ src/Queue/Concurrency/Inline.php | 21 ++ src/Queue/Error/ConsumerFailures.php | 25 -- tests/Queue/E2E/Adapter/AMQPTest.php | 20 -- tests/Queue/E2E/Adapter/ConcurrencyTest.php | 81 +++++ .../Queue/E2E/Adapter/InMemoryConnection.php | 197 +++++++++++ .../E2E/Adapter/SwooleConcurrencyTest.php | 157 +++++---- .../E2E/Adapter/SwooleRedisClusterTest.php | 15 +- tests/Queue/E2E/Adapter/SwooleTest.php | 15 +- tests/Queue/servers/AMQP/Dockerfile | 3 - tests/Queue/servers/AMQP/worker.php | 42 --- tests/Queue/servers/Swoole/worker.php | 11 +- 20 files changed, 650 insertions(+), 852 deletions(-) delete mode 100644 src/Queue/Broker/AMQP.php create mode 100644 src/Queue/Concurrency/Coroutine.php create mode 100644 src/Queue/Concurrency/Executor.php create mode 100644 src/Queue/Concurrency/Inline.php delete mode 100644 src/Queue/Error/ConsumerFailures.php delete mode 100644 tests/Queue/E2E/Adapter/AMQPTest.php create mode 100644 tests/Queue/E2E/Adapter/ConcurrencyTest.php create mode 100644 tests/Queue/E2E/Adapter/InMemoryConnection.php delete mode 100644 tests/Queue/servers/AMQP/Dockerfile delete mode 100644 tests/Queue/servers/AMQP/worker.php diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ed5bb80..ce53b47 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,10 +15,12 @@ jobs: matrix: adapter: [ - AMQP, + Concurrency, + Locking, Pool, SwooleRedisCluster, Swoole, + SwooleConcurrency, Workerman, ] diff --git a/composer.json b/composer.json index 751c7ab..ad6281c 100644 --- a/composer.json +++ b/composer.json @@ -27,7 +27,6 @@ }, "require": { "php": ">=8.3", - "php-amqplib/php-amqplib": "^3.7", "utopia-php/di": "0.3.*", "utopia-php/lock": "0.2.*", "utopia-php/servers": "0.4.*", @@ -51,6 +50,7 @@ "allow-plugins": { "php-http/discovery": true, "tbachert/spi": true - } + }, + "platform-check": false } } diff --git a/composer.lock b/composer.lock index 87cfa7d..7e8f3cf 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1d5f6649fb727b85e109128125f7ff9d", + "content-hash": "f6db92bdd8d8ccbb163f74f7a79be6a5", "packages": [ { "name": "brick/math", @@ -741,206 +741,6 @@ }, "time": "2026-01-21T04:14:03+00:00" }, - { - "name": "paragonie/constant_time_encoding", - "version": "v3.1.3", - "source": { - "type": "git", - "url": "https://github.com/paragonie/constant_time_encoding.git", - "reference": "d5b01a39b3415c2cd581d3bd3a3575c1ebbd8e77" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/paragonie/constant_time_encoding/zipball/d5b01a39b3415c2cd581d3bd3a3575c1ebbd8e77", - "reference": "d5b01a39b3415c2cd581d3bd3a3575c1ebbd8e77", - "shasum": "" - }, - "require": { - "php": "^8" - }, - "require-dev": { - "infection/infection": "^0", - "nikic/php-fuzzer": "^0", - "phpunit/phpunit": "^9|^10|^11", - "vimeo/psalm": "^4|^5|^6" - }, - "type": "library", - "autoload": { - "psr-4": { - "ParagonIE\\ConstantTime\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Paragon Initiative Enterprises", - "email": "security@paragonie.com", - "homepage": "https://paragonie.com", - "role": "Maintainer" - }, - { - "name": "Steve 'Sc00bz' Thomas", - "email": "steve@tobtu.com", - "homepage": "https://www.tobtu.com", - "role": "Original Developer" - } - ], - "description": "Constant-time Implementations of RFC 4648 Encoding (Base-64, Base-32, Base-16)", - "keywords": [ - "base16", - "base32", - "base32_decode", - "base32_encode", - "base64", - "base64_decode", - "base64_encode", - "bin2hex", - "encoding", - "hex", - "hex2bin", - "rfc4648" - ], - "support": { - "email": "info@paragonie.com", - "issues": "https://github.com/paragonie/constant_time_encoding/issues", - "source": "https://github.com/paragonie/constant_time_encoding" - }, - "time": "2025-09-24T15:06:41+00:00" - }, - { - "name": "paragonie/random_compat", - "version": "v9.99.100", - "source": { - "type": "git", - "url": "https://github.com/paragonie/random_compat.git", - "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/paragonie/random_compat/zipball/996434e5492cb4c3edcb9168db6fbb1359ef965a", - "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a", - "shasum": "" - }, - "require": { - "php": ">= 7" - }, - "require-dev": { - "phpunit/phpunit": "4.*|5.*", - "vimeo/psalm": "^1" - }, - "suggest": { - "ext-libsodium": "Provides a modern crypto API that can be used to generate random bytes." - }, - "type": "library", - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Paragon Initiative Enterprises", - "email": "security@paragonie.com", - "homepage": "https://paragonie.com" - } - ], - "description": "PHP 5.x polyfill for random_bytes() and random_int() from PHP 7", - "keywords": [ - "csprng", - "polyfill", - "pseudorandom", - "random" - ], - "support": { - "email": "info@paragonie.com", - "issues": "https://github.com/paragonie/random_compat/issues", - "source": "https://github.com/paragonie/random_compat" - }, - "time": "2020-10-15T08:29:30+00:00" - }, - { - "name": "php-amqplib/php-amqplib", - "version": "v3.7.4", - "source": { - "type": "git", - "url": "https://github.com/php-amqplib/php-amqplib.git", - "reference": "381b6f7c600e0e0c7463cdd7f7a1a3bc6268e5fd" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/381b6f7c600e0e0c7463cdd7f7a1a3bc6268e5fd", - "reference": "381b6f7c600e0e0c7463cdd7f7a1a3bc6268e5fd", - "shasum": "" - }, - "require": { - "ext-mbstring": "*", - "ext-sockets": "*", - "php": "^7.2||^8.0", - "phpseclib/phpseclib": "^2.0|^3.0" - }, - "conflict": { - "php": "7.4.0 - 7.4.1" - }, - "replace": { - "videlalvaro/php-amqplib": "self.version" - }, - "require-dev": { - "ext-curl": "*", - "nategood/httpful": "^0.2.20", - "phpunit/phpunit": "^7.5|^9.5", - "squizlabs/php_codesniffer": "^3.6" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "3.0-dev" - } - }, - "autoload": { - "psr-4": { - "PhpAmqpLib\\": "PhpAmqpLib/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "LGPL-2.1-or-later" - ], - "authors": [ - { - "name": "Alvaro Videla", - "role": "Original Maintainer" - }, - { - "name": "Raúl Araya", - "email": "nubeiro@gmail.com", - "role": "Maintainer" - }, - { - "name": "Luke Bakken", - "email": "luke@bakken.io", - "role": "Maintainer" - }, - { - "name": "Ramūnas Dronga", - "email": "github@ramuno.lt", - "role": "Maintainer" - } - ], - "description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.", - "homepage": "https://github.com/php-amqplib/php-amqplib/", - "keywords": [ - "message", - "queue", - "rabbitmq" - ], - "support": { - "issues": "https://github.com/php-amqplib/php-amqplib/issues", - "source": "https://github.com/php-amqplib/php-amqplib/tree/v3.7.4" - }, - "time": "2025-11-23T17:00:56+00:00" - }, { "name": "php-http/discovery", "version": "1.20.0", @@ -1020,116 +820,6 @@ }, "time": "2024-10-02T11:20:13+00:00" }, - { - "name": "phpseclib/phpseclib", - "version": "3.0.52", - "source": { - "type": "git", - "url": "https://github.com/phpseclib/phpseclib.git", - "reference": "2adaefc83df2ec548558307690f376dd7d4f4fce" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/2adaefc83df2ec548558307690f376dd7d4f4fce", - "reference": "2adaefc83df2ec548558307690f376dd7d4f4fce", - "shasum": "" - }, - "require": { - "paragonie/constant_time_encoding": "^1|^2|^3", - "paragonie/random_compat": "^1.4|^2.0|^9.99.99", - "php": ">=5.6.1" - }, - "require-dev": { - "phpunit/phpunit": "*" - }, - "suggest": { - "ext-dom": "Install the DOM extension to load XML formatted public keys.", - "ext-gmp": "Install the GMP (GNU Multiple Precision) extension in order to speed up arbitrary precision integer arithmetic operations.", - "ext-libsodium": "SSH2/SFTP can make use of some algorithms provided by the libsodium-php extension.", - "ext-mcrypt": "Install the Mcrypt extension in order to speed up a few other cryptographic operations.", - "ext-openssl": "Install the OpenSSL extension in order to speed up a wide variety of cryptographic operations." - }, - "type": "library", - "autoload": { - "files": [ - "phpseclib/bootstrap.php" - ], - "psr-4": { - "phpseclib3\\": "phpseclib/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Jim Wigginton", - "email": "terrafrost@php.net", - "role": "Lead Developer" - }, - { - "name": "Patrick Monnerat", - "email": "pm@datasphere.ch", - "role": "Developer" - }, - { - "name": "Andreas Fischer", - "email": "bantu@phpbb.com", - "role": "Developer" - }, - { - "name": "Hans-Jürgen Petrich", - "email": "petrich@tronic-media.com", - "role": "Developer" - }, - { - "name": "Graham Campbell", - "email": "graham@alt-three.com", - "role": "Developer" - } - ], - "description": "PHP Secure Communications Library - Pure-PHP implementations of RSA, AES, SSH2, SFTP, X.509 etc.", - "homepage": "http://phpseclib.sourceforge.net", - "keywords": [ - "BigInteger", - "aes", - "asn.1", - "asn1", - "blowfish", - "crypto", - "cryptography", - "encryption", - "rsa", - "security", - "sftp", - "signature", - "signing", - "ssh", - "twofish", - "x.509", - "x509" - ], - "support": { - "issues": "https://github.com/phpseclib/phpseclib/issues", - "source": "https://github.com/phpseclib/phpseclib/tree/3.0.52" - }, - "funding": [ - { - "url": "https://github.com/terrafrost", - "type": "github" - }, - { - "url": "https://www.patreon.com/phpseclib", - "type": "patreon" - }, - { - "url": "https://tidelift.com/funding/github/packagist/phpseclib/phpseclib", - "type": "tidelift" - } - ], - "time": "2026-04-27T07:02:15+00:00" - }, { "name": "psr/container", "version": "2.0.2", diff --git a/docker-compose.yml b/docker-compose.yml index 6b230a3..5419ca3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,7 +8,6 @@ services: - ./tests:/usr/src/code/tests depends_on: - swoole - - swoole-amqp - swoole-redis-cluster - workerman @@ -35,18 +34,6 @@ services: redis-cluster-0: condition: service_healthy - swoole-amqp: - container_name: swoole-amqp - build: ./tests/Queue/servers/AMQP/. - command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php - volumes: - - ./vendor:/usr/src/code/vendor - - ./src:/usr/src/code/src - - ./tests:/usr/src/code/tests - depends_on: - amqp: - condition: service_healthy - workerman: container_name: workerman build: ./tests/Queue/servers/Workerman/. @@ -116,14 +103,3 @@ services: test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] start_interval: 1s start_period: 0s - - amqp: - image: rabbitmq:4 - environment: - RABBITMQ_DEFAULT_USER: amqp - RABBITMQ_DEFAULT_PASS: amqp - RABBITMQ_DEFAULT_VHOST: "/" - healthcheck: - test: [ "CMD", "rabbitmqctl", "node_health_check" ] - start_interval: 1s - start_period: 0s diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index aab5623..c197c93 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -3,12 +3,9 @@ namespace Utopia\Queue\Adapter; use Swoole\Coroutine; -use Swoole\Coroutine\Channel; use Swoole\Process; use Utopia\DI\Container; use Utopia\Queue\Adapter; -use Utopia\Queue\Consumer; -use Utopia\Queue\Error\ConsumerFailures; use Utopia\Queue\Message; class Swoole extends Adapter @@ -24,18 +21,6 @@ class Swoole extends Adapter /** @var callable[] */ protected array $onWorkerStop = []; - public function __construct( - Consumer $consumer, - int $workerNum, - string $queue, - string $namespace = 'utopia-queue', - protected int $maxCoroutines = 1, - Container $resources = new Container(), - ) { - parent::__construct($consumer, $workerNum, $queue, $namespace, $resources); - $this->maxCoroutines = \max(1, $maxCoroutines); - } - public function start(): self { for ($i = 0; $i < $this->workerNum; $i++) { @@ -79,54 +64,30 @@ protected function spawnWorker(int $workerId): void $this->workers[$pid] = $process; } + /** + * Drives the consumer's receive loop. Concurrency is the broker's concern + * now (it fans message processing out via an Executor); the adapter only + * gives each processed message its own DI container, stored in the + * coroutine context so it stays isolated across concurrent handlers. + */ public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { - $messageCallback = function (Message $message) use ($messageCallback) { - Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); - - return $messageCallback($message); - }; - - $errorCallback = function (?Message $message, \Throwable $error) use ($errorCallback) { - if ($message === null) { + $this->consumer->consume( + $this->queue, + function (Message $message) use ($messageCallback) { Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); - } - $errorCallback($message, $error); - }; - - $channel = new Channel($this->maxCoroutines); - $errors = []; - - for ($i = 0; $i < $this->maxCoroutines; $i++) { - Coroutine::create(function () use ($messageCallback, $successCallback, $errorCallback, $channel, &$errors) { - try { - $this->consumer->consume( - $this->queue, - $messageCallback, - $successCallback, - $errorCallback, - ); - } catch (\Throwable $error) { - $errors[] = $error; - $this->consumer->close(); - $channel->push(true); - return; + return $messageCallback($message); + }, + $successCallback, + function (?Message $message, \Throwable $error) use ($errorCallback) { + if ($message === null) { + Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); } - $channel->push(true); - }); - } - - for ($i = 0; $i < $this->maxCoroutines; $i++) { - $channel->pop(); - } - - $channel->close(); - - if ($errors !== []) { - throw new ConsumerFailures($errors); - } + $errorCallback($message, $error); + }, + ); } public function context(): Container diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php deleted file mode 100644 index 3ec2964..0000000 --- a/src/Queue/Broker/AMQP.php +++ /dev/null @@ -1,226 +0,0 @@ -exchangeArguments[$key] = $value; - } - - public function setQueueArgument(string $key, string $value): void - { - $this->queueArguments[$key] = $value; - } - - public function setConsumerArguments(string $key, string $value): void - { - $this->consumerArguments[$key] = $value; - } - - public function configureConnection(callable $callback): void - { - $this->connectionConfigHook = $callback; - } - - public function configureChannel(callable $callback): void - { - $this->channelConfigHook = $callback; - } - - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void - { - $processMessage = function (AMQPMessage $amqpMessage) use ($messageCallback, $successCallback, $errorCallback) { - try { - $nextMessage = json_decode($amqpMessage->getBody(), associative: true) ?? false; - if (!$nextMessage) { - $amqpMessage->nack(); - return; - } - - $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; - $message = new Message($nextMessage); - - $result = $messageCallback($message); - match (true) { - $result instanceof Commit => $amqpMessage->ack(true), - $result instanceof NoCommit => null, - default => $amqpMessage->ack() - }; - $successCallback($message); - } catch (Retryable $e) { - $amqpMessage->nack(requeue: true); - $errorCallback($message ?? null, $e); - } catch (\Throwable $th) { - $amqpMessage->nack(); - $errorCallback($message ?? null, $th); - } - }; - - $this->withChannel(function (AMQPChannel $channel) use ($queue, $processMessage) { - // It's good practice for the consumer to set up exchange and queues. - // This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and - // dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages. - - // 1. Declare the exchange and a dead-letter-exchange. - $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - - // 2. Declare the working queue and configure the DLX for receiving rejected messages. - $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"]))); - $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); - - // 3. Declare the dead-letter-queue and bind it to the DLX. - $channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments)); - $channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name); - - // 4. Instruct to consume on the working queue. - $channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments)); - }); - - // Run ->consume in own callback to avoid re-running queue creation flow on error. - $this->withChannel(function (AMQPChannel $channel) { - // 5. Consume. This blocks until the connection gets closed. - $channel->consume(); - }); - } - - public function close(): void - { - $this->channel?->stopConsume(); - $this->channel?->getConnection()?->close(); - } - - public function enqueue(Queue $queue, array $payload, bool $priority = false): bool - { - $payload = [ - 'pid' => \uniqid(more_entropy: true), - 'queue' => $queue->name, - 'timestamp' => time(), - 'payload' => $payload - ]; - $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); - $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { - $channel->basic_publish($message, $queue->namespace, routing_key: $queue->name); - }); - return true; - } - - public function retry(Queue $queue, ?int $limit = null): void - { - // This is a no-op for AMQP - } - - /** - * @throws \Exception - */ - public function getQueueSize(Queue $queue, bool $failedJobs = false): int - { - $queueName = $queue->name; - if ($failedJobs) { - $queueName = $queueName . '.failed'; - } - - $messageCount = 0; - $this->withChannel(function (AMQPChannel $channel) use ($queueName, &$messageCount) { - try { - [, $messageCount] = $channel->queue_declare($queueName, passive: true); - } catch (AMQPProtocolChannelException $e) { - $this->channel = null; - if ($e->getCode() === 404) { - return; - } - - throw $e; - } - }); - - return $messageCount; - } - - /** - * @param callable(AMQPChannel $channel): void $callback - * @throws \Exception - */ - private function withChannel(callable $callback): void - { - $createChannel = function (): AMQPChannel { - $connection = new AMQPStreamConnection( - $this->host, - $this->port, - $this->user, - $this->password, - $this->vhost, - connection_timeout: $this->connectTimeout, - read_write_timeout: $this->readWriteTimeout, - heartbeat: $this->heartbeat, - ); - if (is_callable($this->connectionConfigHook)) { - call_user_func($this->connectionConfigHook, $connection); - } - $channel = $connection->channel(); - if (is_callable($this->channelConfigHook)) { - call_user_func($this->channelConfigHook, $channel); - } - return $channel; - }; - - if (!$this->channel) { - $this->channel = $createChannel(); - } - - try { - $callback($this->channel); - } catch (\Throwable) { - // createChannel() might throw, in that case set the channel to `null` first. - $this->channel = null; - // try creating a new connection once, if this still fails, throw the error - $this->channel = $createChannel(); - $callback($this->channel); - } - } -} diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 161e687..bae09a8 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -2,6 +2,8 @@ namespace Utopia\Queue\Broker; +use Utopia\Queue\Concurrency\Executor; +use Utopia\Queue\Concurrency\Inline; use Utopia\Queue\Connection; use Utopia\Queue\Consumer; use Utopia\Queue\Message; @@ -14,6 +16,22 @@ class Redis implements Publisher, Consumer private const int RECONNECT_BACKOFF_MS = 100; private const int RECONNECT_MAX_BACKOFF_MS = 5_000; + /** + * Dedicated to the blocking receive loop, which is driven by a single + * coroutine. Because it has exactly one user it never needs locking. + */ + private readonly Connection $receive; + + /** + * Carries every other command — the per-message bookkeeping, acknowledging + * and publishing. When messages are processed concurrently (see the + * {@see Executor}), several coroutines share this connection, so it should + * be wrapped in {@see \Utopia\Queue\Connection\Locking}. + */ + private readonly Connection $work; + + private readonly Executor $executor; + private bool $closed = false; /** * @var (callable(Queue, \Throwable, int, int): void)|null @@ -24,8 +42,22 @@ class Redis implements Publisher, Consumer */ private $reconnectSuccessCallback = null; - public function __construct(private readonly Connection $connection) - { + /** + * @param Connection $receive Connection used for the blocking receive loop. + * @param Connection|null $work Connection used for every other command; defaults + * to $receive, which is safe while processing inline. + * Pass a separate, locked connection when using a + * concurrent executor. + * @param Executor $executor Strategy for processing each received message. + */ + public function __construct( + Connection $receive, + ?Connection $work = null, + Executor $executor = new Inline(), + ) { + $this->receive = $receive; + $this->work = $work ?? $receive; + $this->executor = $executor; } public function setReconnectCallback(?callable $callback): self @@ -49,10 +81,11 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe while (!$this->closed) { /** - * Waiting for next Job. + * Waiting for next Job. The receive loop runs in a single coroutine, + * so the blocking pop has exclusive use of its connection. */ try { - $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); + $nextMessage = $this->receive->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); if ($reconnectAttempt > 0) { $this->triggerReconnectSuccessCallback($queue, $reconnectAttempt); } @@ -67,7 +100,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $reconnectAttempt++; try { - $this->connection->close(); + $this->receive->close(); } catch (\Throwable) { } @@ -89,59 +122,91 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $message = new Message($nextMessage); /** - * Move Job to Jobs and it's PID to the processing list. + * Hand the message off to the executor. Inline runs it here and now; + * a concurrent executor fans it out onto a coroutine while the loop + * goes back to receiving (bounded by the executor's backpressure). */ - $this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage, $queue->jobTtl); - $this->connection->leftPush("{$queue->namespace}.processing.{$queue->name}", $message->getPid()); + $this->executor->submit(function () use ($queue, $message, $nextMessage, $messageCallback, $successCallback, $errorCallback) { + $this->process($queue, $message, $nextMessage, $messageCallback, $successCallback, $errorCallback); + }); + } + + /** + * Wait for in-flight messages to finish before returning, so a graceful + * shutdown does not abandon work that was already received. + */ + $this->executor->drain(); + } + /** + * Process a single received message: record it as in-flight, run the + * handler, then acknowledge or fail it. Every command here runs on the + * $work connection so it is safe to invoke from many coroutines at once. + * + * @param array $raw The decoded message payload. + */ + private function process( + Queue $queue, + Message $message, + array $raw, + callable $messageCallback, + callable $successCallback, + callable $errorCallback, + ): void { + $pid = $message->getPid(); + + /** + * Move Job to Jobs and it's PID to the processing list. + */ + $this->work->setArray("{$queue->namespace}.jobs.{$queue->name}.{$pid}", $raw, $queue->jobTtl); + $this->work->leftPush("{$queue->namespace}.processing.{$queue->name}", $pid); + + /** + * Increment Total Jobs Received from Stats. + */ + $this->work->increment("{$queue->namespace}.stats.{$queue->name}.total"); + + try { /** - * Increment Total Jobs Received from Stats. + * Increment Processing Jobs from Stats. */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.total"); + $this->work->increment("{$queue->namespace}.stats.{$queue->name}.processing"); - try { - /** - * Increment Processing Jobs from Stats. - */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.processing"); - - $messageCallback($message); - - /** - * Remove Jobs if successful. - */ - $this->connection->remove("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}"); - - /** - * Increment Successful Jobs from Stats. - */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.success"); - - - $successCallback($message); - } catch (\Throwable $th) { - /** - * Move failed Job to Failed list. - */ - $this->connection->leftPush("{$queue->namespace}.failed.{$queue->name}", $message->getPid()); - - /** - * Increment Failed Jobs from Stats. - */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.failed"); - - $errorCallback($message, $th); - } finally { - /** - * Remove Job from Processing. - */ - $this->connection->listRemove("{$queue->namespace}.processing.{$queue->name}", $message->getPid()); - - /** - * Decrease Processing Jobs from Stats. - */ - $this->connection->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); - } + $messageCallback($message); + + /** + * Remove Jobs if successful. + */ + $this->work->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + + /** + * Increment Successful Jobs from Stats. + */ + $this->work->increment("{$queue->namespace}.stats.{$queue->name}.success"); + + $successCallback($message); + } catch (\Throwable $th) { + /** + * Move failed Job to Failed list. + */ + $this->work->leftPush("{$queue->namespace}.failed.{$queue->name}", $pid); + + /** + * Increment Failed Jobs from Stats. + */ + $this->work->increment("{$queue->namespace}.stats.{$queue->name}.failed"); + + $errorCallback($message, $th); + } finally { + /** + * Remove Job from Processing. + */ + $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); + + /** + * Decrease Processing Jobs from Stats. + */ + $this->work->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); } } @@ -183,9 +248,9 @@ public function enqueue(Queue $queue, array $payload, bool $priority = false): b 'payload' => $payload ]; if ($priority) { - return $this->connection->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + return $this->work->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } - return $this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + return $this->work->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } /** @@ -198,7 +263,7 @@ public function retry(Queue $queue, ?int $limit = null): void $processed = 0; while (true) { - $pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT); + $pid = $this->work->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT); // No more jobs to retry if ($pid === false) { @@ -229,7 +294,7 @@ public function retry(Queue $queue, ?int $limit = null): void private function getJob(Queue $queue, string $pid): Message|false { - $value = $this->connection->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + $value = $this->work->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); if ($value === false) { return false; @@ -245,6 +310,6 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int if ($failedJobs) { $queueName = "{$queue->namespace}.failed.{$queue->name}"; } - return $this->connection->listSize($queueName); + return $this->work->listSize($queueName); } } diff --git a/src/Queue/Concurrency/Coroutine.php b/src/Queue/Concurrency/Coroutine.php new file mode 100644 index 0000000..66e853b --- /dev/null +++ b/src/Queue/Concurrency/Coroutine.php @@ -0,0 +1,60 @@ +maxCoroutines = \max(1, $maxCoroutines); + } + + public function submit(callable $task): void + { + $slots = $this->slots ??= new Channel($this->maxCoroutines); + $waitGroup = $this->waitGroup ??= new WaitGroup(); + + // Acquire a slot; blocks the caller once $maxCoroutines are in flight. + $slots->push(true); + $waitGroup->add(); + + SwooleCoroutine::create(function () use ($task, $slots, $waitGroup) { + try { + $task(); + } finally { + $waitGroup->done(); + $slots->pop(); + } + }); + } + + public function drain(): void + { + $this->waitGroup?->wait(); + } +} diff --git a/src/Queue/Concurrency/Executor.php b/src/Queue/Concurrency/Executor.php new file mode 100644 index 0000000..d387d4e --- /dev/null +++ b/src/Queue/Concurrency/Executor.php @@ -0,0 +1,29 @@ +errors; - } -} diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php deleted file mode 100644 index e8557a3..0000000 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ /dev/null @@ -1,20 +0,0 @@ -submit(function () use (&$order) { + $order[] = 1; + }); + $executor->submit(function () use (&$order) { + $order[] = 2; + }); + $executor->drain(); + + $this->assertSame([1, 2], $order); + } + + /** + * The coroutine executor runs every submitted task, but never lets more + * than $maxCoroutines of them run at the same time. + */ + public function testCoroutineBoundsConcurrencyToLimit(): void + { + $active = 0; + $maxActive = 0; + $completed = 0; + + \Swoole\Coroutine\run(function () use (&$active, &$maxActive, &$completed) { + $executor = new Coroutine(maxCoroutines: 3); + + for ($i = 0; $i < 9; $i++) { + $executor->submit(function () use (&$active, &$maxActive, &$completed) { + $active++; + $maxActive = \max($maxActive, $active); + \Swoole\Coroutine::sleep(0.02); + $active--; + $completed++; + }); + } + + $executor->drain(); + }); + + $this->assertSame(9, $completed, 'every task should run'); + $this->assertSame(3, $maxActive, 'no more than maxCoroutines run concurrently'); + } + + /** + * drain() must block until in-flight tasks have finished, so a graceful + * shutdown does not abandon work. + */ + public function testCoroutineDrainWaitsForInFlightTasks(): void + { + $finished = false; + + \Swoole\Coroutine\run(function () use (&$finished) { + $executor = new Coroutine(maxCoroutines: 2); + + $executor->submit(function () use (&$finished) { + \Swoole\Coroutine::sleep(0.05); + $finished = true; + }); + + $executor->drain(); + }); + + $this->assertTrue($finished, 'drain() should not return before submitted work completes'); + } +} diff --git a/tests/Queue/E2E/Adapter/InMemoryConnection.php b/tests/Queue/E2E/Adapter/InMemoryConnection.php new file mode 100644 index 0000000..236cc8b --- /dev/null +++ b/tests/Queue/E2E/Adapter/InMemoryConnection.php @@ -0,0 +1,197 @@ +> */ + private array $lists = []; + + /** @var array */ + private array $values = []; + + /** @var array */ + private array $counters = []; + + public function rightPushArray(string $queue, array $payload): bool + { + $this->lists[$queue][] = $payload; + + return true; + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + $value = $this->pop($queue, fromTail: true); + + return \is_array($value) ? $value : false; + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + $value = $this->rightPopArray($queue, $timeout); + if (\is_array($value)) { + $this->lists[$destination] ??= []; + \array_unshift($this->lists[$destination], $value); + } + + return $value; + } + + public function leftPushArray(string $queue, array $payload): bool + { + $this->lists[$queue] ??= []; + \array_unshift($this->lists[$queue], $payload); + + return true; + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + $value = $this->pop($queue, fromTail: false); + + return \is_array($value) ? $value : false; + } + + public function rightPush(string $queue, string $payload): bool + { + $this->lists[$queue][] = $payload; + + return true; + } + + public function rightPop(string $queue, int $timeout): string|false + { + $value = $this->pop($queue, fromTail: true); + + return \is_string($value) ? $value : false; + } + + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + $value = $this->rightPop($queue, $timeout); + if (\is_string($value)) { + $this->lists[$destination] ??= []; + \array_unshift($this->lists[$destination], $value); + } + + return $value; + } + + public function leftPush(string $queue, string $payload): bool + { + $this->lists[$queue] ??= []; + \array_unshift($this->lists[$queue], $payload); + + return true; + } + + public function leftPop(string $queue, int $timeout): string|false + { + $value = $this->pop($queue, fromTail: false); + + return \is_string($value) ? $value : false; + } + + public function listRemove(string $queue, string $key): bool + { + $list = $this->lists[$queue] ?? []; + $index = \array_search($key, $list, true); + if ($index === false) { + return false; + } + + unset($list[$index]); + $this->lists[$queue] = \array_values($list); + + return true; + } + + public function listSize(string $key): int + { + return \count($this->lists[$key] ?? []); + } + + public function listRange(string $key, int $total, int $offset): array + { + return \array_slice($this->lists[$key] ?? [], $offset, $total); + } + + public function remove(string $key): bool + { + unset($this->values[$key]); + + return true; + } + + public function move(string $queue, string $destination): bool + { + return true; + } + + public function set(string $key, string $value, int $ttl = 0): bool + { + $this->values[$key] = $value; + + return true; + } + + public function get(string $key): array|string|null + { + return $this->values[$key] ?? null; + } + + public function setArray(string $key, array $value, int $ttl = 0): bool + { + $this->values[$key] = $value; + + return true; + } + + public function increment(string $key): int + { + return $this->counters[$key] = ($this->counters[$key] ?? 0) + 1; + } + + public function decrement(string $key): int + { + return $this->counters[$key] = ($this->counters[$key] ?? 0) - 1; + } + + public function ping(): bool + { + return true; + } + + public function close(): void + { + } + + /** + * Pop from either end, yielding when empty so a tight receive loop does not + * monopolise the scheduler. + */ + private function pop(string $queue, bool $fromTail): mixed + { + if (empty($this->lists[$queue])) { + if (Coroutine::getCid() !== -1) { + Coroutine::sleep(0.005); + } + + return null; + } + + return $fromTail ? \array_pop($this->lists[$queue]) : \array_shift($this->lists[$queue]); + } +} diff --git a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php index d82c514..16c8c43 100644 --- a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php +++ b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php @@ -3,92 +3,105 @@ namespace Tests\E2E\Adapter; use PHPUnit\Framework\TestCase; -use Utopia\Queue\Adapter\Swoole; -use Utopia\Queue\Consumer; -use Utopia\Queue\Error\ConsumerFailures; +use Utopia\Queue\Concurrency\Coroutine; +use Utopia\Queue\Concurrency\Inline; +use Utopia\Queue\Broker\Redis; use Utopia\Queue\Queue; class SwooleConcurrencyTest extends TestCase { - public function testMaxCoroutinesConsumeInParallel(): void + private const string QUEUE = 'concurrency'; + private const string NAMESPACE = 'tests'; + + /** + * With a concurrent executor the broker receives messages on its receive + * connection and processes them on overlapping coroutines, bounded by the + * executor's limit — one receiver fanning out to many handlers. + */ + public function testRedisFansProcessingOutAcrossCoroutines(): void { - $consumer = new ConcurrentConsumer(); - - \Swoole\Coroutine\run(function () use ($consumer) { - $adapter = new Swoole($consumer, 1, 'swoole-concurrency', maxCoroutines: 3); - $adapter->consume(fn () => null, fn () => null, fn () => null); + $connection = new InMemoryConnection(); + $queue = new Queue(self::QUEUE, self::NAMESPACE); + $this->enqueue($connection, $queue, 9); + + $active = 0; + $maxActive = 0; + $processed = 0; + + \Swoole\Coroutine\run(function () use ($connection, $queue, &$active, &$maxActive, &$processed) { + $broker = new Redis( + receive: $connection, + work: $connection, + executor: new Coroutine(maxCoroutines: 3), + ); + + $broker->consume( + $queue, + function () use ($broker, &$active, &$maxActive, &$processed) { + $active++; + $maxActive = \max($maxActive, $active); + \Swoole\Coroutine::sleep(0.02); + $active--; + + if (++$processed === 9) { + $broker->close(); + } + }, + fn () => null, + fn () => null, + ); }); - $this->assertSame(3, $consumer->consumeCalls); - $this->assertSame(3, $consumer->maxActive); + $this->assertSame(9, $processed, 'every enqueued message should be processed'); + $this->assertSame(3, $maxActive, 'concurrency is bounded by the executor limit'); } - public function testPreservesAllCoroutineConsumerErrors(): void + /** + * The default (Inline) executor processes one message fully before the next + * is received, so handlers never overlap. + */ + public function testInlineProcessesOneMessageAtATime(): void { - $consumer = new FailingConcurrentConsumer(); - $failure = null; - - \Swoole\Coroutine\run(function () use ($consumer, &$failure) { - $adapter = new Swoole($consumer, 1, 'swoole-concurrency', maxCoroutines: 3); - - try { - $adapter->consume(fn () => null, fn () => null, fn () => null); - } catch (ConsumerFailures $error) { - $failure = $error; - } + $connection = new InMemoryConnection(); + $queue = new Queue(self::QUEUE, self::NAMESPACE); + $this->enqueue($connection, $queue, 5); + + $active = 0; + $maxActive = 0; + $processed = 0; + + \Swoole\Coroutine\run(function () use ($connection, $queue, &$active, &$maxActive, &$processed) { + $broker = new Redis(receive: $connection, executor: new Inline()); + + $broker->consume( + $queue, + function () use ($broker, &$active, &$maxActive, &$processed) { + $active++; + $maxActive = \max($maxActive, $active); + $active--; + + if (++$processed === 5) { + $broker->close(); + } + }, + fn () => null, + fn () => null, + ); }); - $this->assertInstanceOf(ConsumerFailures::class, $failure); - $this->assertSame(3, $consumer->consumeCalls); - $this->assertSame(3, $consumer->closed); - $this->assertCount(3, $failure->getErrors()); - $messages = \array_map(fn (\Throwable $error) => $error->getMessage(), $failure->getErrors()); - \sort($messages); - - $this->assertSame(['consumer 1 failed', 'consumer 2 failed', 'consumer 3 failed'], $messages); - $this->assertSame($failure->getErrors()[0], $failure->getPrevious()); - } -} - -final class ConcurrentConsumer implements Consumer -{ - public int $active = 0; - public int $consumeCalls = 0; - public int $maxActive = 0; - - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void - { - $this->consumeCalls++; - $this->active++; - $this->maxActive = \max($this->maxActive, $this->active); - - \Swoole\Coroutine::sleep(0.05); - - $this->active--; - } - - public function close(): void - { - } -} - -final class FailingConcurrentConsumer implements Consumer -{ - public int $closed = 0; - public int $consumeCalls = 0; - - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void - { - $this->consumeCalls++; - $id = $this->consumeCalls; - - \Swoole\Coroutine::sleep(0.05); - - throw new \RuntimeException("consumer {$id} failed"); + $this->assertSame(5, $processed); + $this->assertSame(1, $maxActive, 'inline processing never overlaps'); } - public function close(): void + private function enqueue(InMemoryConnection $connection, Queue $queue, int $count): void { - $this->closed++; + for ($i = 0; $i < $count; $i++) { + $connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", [ + 'pid' => "pid-{$i}", + 'queue' => $queue->name, + 'timestamp' => 0, + 'payload' => ['n' => $i], + ]); + } } } diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php index d7a6e2e..0b42999 100644 --- a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -31,7 +31,12 @@ protected function getQueue(): Queue public function testPriorityJobIsConsumedBeforeNormalJobs(): void { $connection = $this->getConnection(); - $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Priority ordering is purely a property of enqueue, so use a queue no + // worker consumes — otherwise the live consumer drains these jobs before + // we can observe their order. + $queue = new Queue($this->getQueue()->name . '-priority', $this->getQueue()->namespace); + $key = "{$queue->namespace}.queue.{$queue->name}"; // Flush any leftover state from previous runs. while ($connection->rightPopArray($key, 1) !== false) { @@ -39,12 +44,12 @@ public function testPriorityJobIsConsumedBeforeNormalJobs(): void } // Enqueue three normal jobs (pushed to head/left). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-1']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-2']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-3']); // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + $this->getPublisher()->enqueue($queue, ['order' => 'priority'], priority: true); // The first pop should yield the priority job. $first = $connection->rightPopArray($key, 1); diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php index 641b1c3..9b02275 100644 --- a/tests/Queue/E2E/Adapter/SwooleTest.php +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -27,7 +27,12 @@ protected function getQueue(): Queue public function testPriorityJobIsConsumedBeforeNormalJobs(): void { $connection = $this->getConnection(); - $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Priority ordering is purely a property of enqueue, so use a queue no + // worker consumes — otherwise the live consumer drains these jobs before + // we can observe their order. + $queue = new Queue($this->getQueue()->name . '-priority', $this->getQueue()->namespace); + $key = "{$queue->namespace}.queue.{$queue->name}"; // Flush any leftover state from previous runs. while ($connection->rightPopArray($key, 1) !== false) { @@ -35,12 +40,12 @@ public function testPriorityJobIsConsumedBeforeNormalJobs(): void } // Enqueue three normal jobs (pushed to head/left). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-1']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-2']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-3']); // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + $this->getPublisher()->enqueue($queue, ['order' => 'priority'], priority: true); // The first pop should yield the priority job. $first = $connection->rightPopArray($key, 1); diff --git a/tests/Queue/servers/AMQP/Dockerfile b/tests/Queue/servers/AMQP/Dockerfile deleted file mode 100644 index 65460fa..0000000 --- a/tests/Queue/servers/AMQP/Dockerfile +++ /dev/null @@ -1,3 +0,0 @@ -FROM phpswoole/swoole:php8.3-alpine - -RUN apk add autoconf build-base diff --git a/tests/Queue/servers/AMQP/worker.php b/tests/Queue/servers/AMQP/worker.php deleted file mode 100644 index 2f20540..0000000 --- a/tests/Queue/servers/AMQP/worker.php +++ /dev/null @@ -1,42 +0,0 @@ -job() - ->inject('message') - ->param( - key: 'aliasValue', - default: '', - validator: new Text(length: 255, min: 0), - description: 'alias resolution test value', - optional: true, - aliases: ['alias_value', 'aliased'], - ) - ->action(handleRequest(...)); - -$server - ->error() - ->inject('error') - ->action(function ($th) { - echo $th->getMessage() . PHP_EOL; - }); - -$server->workerStart()->action(function () { - echo 'Worker Started' . PHP_EOL; -}); - -$server->workerStop()->action(function () { - echo 'Worker Stopped' . PHP_EOL; -}); - -$server->start(); diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 01f3ab9..1723232 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -5,11 +5,20 @@ use Utopia\Queue\Server; use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Concurrency\Coroutine; +use Utopia\Queue\Connection\Locking; use Utopia\Queue\Connection\Redis as RedisConnection; use Utopia\Queue\Broker\Redis; use Utopia\Validator\Text; -$consumer = new Redis(new RedisConnection('redis')); +// One connection drives the blocking receive loop; a separate, locked +// connection carries the bookkeeping for the handlers fanned out across +// coroutines. +$consumer = new Redis( + receive: new RedisConnection('redis'), + work: new Locking(new RedisConnection('redis')), + executor: new Coroutine(maxCoroutines: 5), +); $adapter = new Swoole($consumer, 12, 'swoole'); $server = new Server($adapter); From 7d124ca8f4d2e31e4b0cc6e0ea3ee3c315f60248 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 10:01:48 +0100 Subject: [PATCH 2/9] Move concurrency to the adapter; fix arg ordering; require PHP 8.4 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review: the broker should not own the executor. With AMQP gone the push-based constraint that forced it there is gone, so the loop moves to the adapter and the broker becomes plain primitives. - Consumer is now receive()/commit()/reject() instead of consume(). Brokers expose primitives and hold no loop, executor, or coroutine knowledge. - The base Adapter owns the receive loop and an Executor (default Inline). The Swoole adapter selects the Coroutine executor and takes maxCoroutines — concurrency is now purely a property of the adapter. Per-message context is set via setContext(), overridden by Swoole to stay coroutine-local. - Broker\Redis: receive() does the blocking pop + claim on the receive connection; commit()/reject() ack on the work connection. Reconnect state moved to instance fields so it persists across receive() calls. - Broker\Pool delegates the new primitives. Fix a pre-existing argument-ordering bug: Server::getArguments built the arguments array in two passes (params then injections), so it was keyed by declared order but not iterated in it. call_user_func_array passes integer-keyed values positionally in iteration order, so handlers with an inject() before a param() received arguments swapped (e.g. a string where a Message was expected). ksort the arguments before invoking. Require PHP 8.4 (utopia-php/pools 1.0.3 already requires it) and move the Docker images to phpswoole/swoole:php8.4-alpine, replacing the earlier platform-check workaround. Co-Authored-By: Claude Opus 4.8 (1M context) --- Dockerfile | 2 +- composer.json | 5 +- composer.lock | 4 +- src/Queue/Adapter.php | 84 ++++++-- src/Queue/Adapter/Swoole.php | 55 +++-- src/Queue/Broker/Pool.php | 19 +- src/Queue/Broker/Redis.php | 204 +++++++----------- src/Queue/Consumer.php | 32 +-- src/Queue/Server.php | 6 + .../Adapter/RedisReconnectCallbackTest.php | 23 +- .../Queue/E2E/Adapter/ServerTelemetryTest.php | 61 ++++-- .../E2E/Adapter/SwooleConcurrencyTest.php | 98 ++++----- tests/Queue/servers/Swoole/Dockerfile | 2 +- tests/Queue/servers/Swoole/worker.php | 8 +- .../servers/SwooleRedisCluster/Dockerfile | 2 +- tests/Queue/servers/Workerman/Dockerfile | 2 +- 16 files changed, 312 insertions(+), 295 deletions(-) diff --git a/Dockerfile b/Dockerfile index 565cb52..77d8286 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ COPY composer.json /usr/local/src/ RUN composer install --ignore-platform-reqs -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine WORKDIR /usr/local/src/ diff --git a/composer.json b/composer.json index ad6281c..a31ff05 100644 --- a/composer.json +++ b/composer.json @@ -26,7 +26,7 @@ "lint": "vendor/bin/pint --test" }, "require": { - "php": ">=8.3", + "php": ">=8.4", "utopia-php/di": "0.3.*", "utopia-php/lock": "0.2.*", "utopia-php/servers": "0.4.*", @@ -50,7 +50,6 @@ "allow-plugins": { "php-http/discovery": true, "tbachert/spi": true - }, - "platform-check": false + } } } diff --git a/composer.lock b/composer.lock index 7e8f3cf..8f50714 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "f6db92bdd8d8ccbb163f74f7a79be6a5", + "content-hash": "83028abc9b8135fb855a26a701e630aa", "packages": [ { "name": "brick/math", @@ -4200,7 +4200,7 @@ "prefer-stable": false, "prefer-lowest": false, "platform": { - "php": ">=8.3" + "php": ">=8.4" }, "platform-dev": { "ext-redis": "*" diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index d88f191..c665527 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -3,12 +3,31 @@ namespace Utopia\Queue; use Utopia\DI\Container; +use Utopia\Queue\Concurrency\Executor; +use Utopia\Queue\Concurrency\Inline; abstract class Adapter { + /** + * How long the receive loop blocks for a message before looping back to + * re-check whether it should keep running. + */ + protected const int RECEIVE_TIMEOUT = 2; + public Queue $queue; protected ?Container $context = null; + /** + * Strategy for processing each received message. Defaults to running them + * inline, one at a time; a concurrent adapter (e.g. Swoole) swaps in an + * executor that fans processing out across coroutines. This is where the + * adapter — not the broker — owns the concurrency model. + */ + protected Executor $executor; + + /** Set to break out of the receive loop on its next iteration. */ + protected bool $stopped = false; + public function __construct( public Consumer $consumer, public int $workerNum, @@ -17,6 +36,7 @@ public function __construct( protected Container $resources = new Container(), ) { $this->queue = new Queue($queue, $namespace); + $this->executor = new Inline(); } /** @@ -31,24 +51,56 @@ abstract public function start(): self; */ abstract public function stop(): self; + /** + * Receive messages on a single loop and hand each one to the executor. With + * the default {@see Inline} executor a message is processed before the next + * is received; a concurrent executor processes several at once while the + * loop keeps receiving (bounded by the executor's back-pressure). + */ public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { - $this->consumer->consume( - $this->queue, - function (Message $message) use ($messageCallback) { - $this->context = new Container($this->resources()); - - return $messageCallback($message); - }, - $successCallback, - function (?Message $message, \Throwable $error) use ($errorCallback) { - if ($message === null) { - $this->context = new Container($this->resources()); - } - - $errorCallback($message, $error); - }, - ); + $this->stopped = false; + + while (!$this->stopped) { + $message = $this->consumer->receive($this->queue, static::RECEIVE_TIMEOUT); + + if ($message === null) { + continue; + } + + $this->executor->submit(function () use ($message, $messageCallback, $successCallback, $errorCallback) { + $this->process($message, $messageCallback, $successCallback, $errorCallback); + }); + } + + $this->executor->drain(); + } + + /** + * Process one message: give it a fresh context, run the handler, then + * acknowledge or reject it. + */ + protected function process(Message $message, callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + $this->setContext(new Container($this->resources())); + + try { + $messageCallback($message); + $this->consumer->commit($this->queue, $message); + $successCallback($message); + } catch (\Throwable $error) { + $this->consumer->reject($this->queue, $message); + $errorCallback($message, $error); + } + } + + /** + * Install the per-message context container. Overridden by concurrent + * adapters to keep the container coroutine-local. + */ + protected function setContext(Container $context): void + { + $this->context = $context; } public function resources(): Container diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index c197c93..de48aa0 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -6,7 +6,8 @@ use Swoole\Process; use Utopia\DI\Container; use Utopia\Queue\Adapter; -use Utopia\Queue\Message; +use Utopia\Queue\Concurrency\Coroutine as CoroutineExecutor; +use Utopia\Queue\Consumer; class Swoole extends Adapter { @@ -21,6 +22,25 @@ class Swoole extends Adapter /** @var callable[] */ protected array $onWorkerStop = []; + /** + * @param int $maxCoroutines Number of messages a worker may process + * concurrently. This is where the Swoole adapter + * owns its concurrency: it fans message processing + * out across coroutines while the receive loop + * keeps pulling work. + */ + public function __construct( + Consumer $consumer, + int $workerNum, + string $queue, + string $namespace = 'utopia-queue', + int $maxCoroutines = 1, + Container $resources = new Container(), + ) { + parent::__construct($consumer, $workerNum, $queue, $namespace, $resources); + $this->executor = new CoroutineExecutor($maxCoroutines); + } + public function start(): self { for ($i = 0; $i < $this->workerNum; $i++) { @@ -48,7 +68,10 @@ protected function spawnWorker(int $workerId): void Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); Coroutine\run(function () use ($workerId) { - Process::signal(SIGTERM, fn () => $this->consumer->close()); + Process::signal(SIGTERM, function () { + $this->stopped = true; + $this->consumer->close(); + }); foreach ($this->onWorkerStart as $callback) { $callback((string)$workerId); @@ -65,29 +88,12 @@ protected function spawnWorker(int $workerId): void } /** - * Drives the consumer's receive loop. Concurrency is the broker's concern - * now (it fans message processing out via an Executor); the adapter only - * gives each processed message its own DI container, stored in the - * coroutine context so it stays isolated across concurrent handlers. + * Store the per-message container in the coroutine context so concurrent + * handlers each see their own, rather than sharing one on the adapter. */ - public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void + protected function setContext(Container $context): void { - $this->consumer->consume( - $this->queue, - function (Message $message) use ($messageCallback) { - Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); - - return $messageCallback($message); - }, - $successCallback, - function (?Message $message, \Throwable $error) use ($errorCallback) { - if ($message === null) { - Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); - } - - $errorCallback($message, $error); - }, - ); + Coroutine::getContext()[self::CONTEXT_KEY] = $context; } public function context(): Container @@ -108,9 +114,12 @@ protected function reap(): void public function stop(): self { + $this->stopped = true; + foreach ($this->workers as $pid => $process) { Process::kill($pid, SIGTERM); } + return $this; } diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index 5fcdcc7..df0a765 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -3,6 +3,7 @@ namespace Utopia\Queue\Broker; use Utopia\Queue\Consumer; +use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; use Utopia\Pools\Pool as UtopiaPool; @@ -30,12 +31,18 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int return $this->delegatePublish(__FUNCTION__, \func_get_args()); } - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback, - ): void { + public function receive(Queue $queue, int $timeout): ?Message + { + return $this->delegateConsumer(__FUNCTION__, \func_get_args()); + } + + public function commit(Queue $queue, Message $message): void + { + $this->delegateConsumer(__FUNCTION__, \func_get_args()); + } + + public function reject(Queue $queue, Message $message): void + { $this->delegateConsumer(__FUNCTION__, \func_get_args()); } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index bae09a8..4c869f1 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -2,8 +2,6 @@ namespace Utopia\Queue\Broker; -use Utopia\Queue\Concurrency\Executor; -use Utopia\Queue\Concurrency\Inline; use Utopia\Queue\Connection; use Utopia\Queue\Consumer; use Utopia\Queue\Message; @@ -17,22 +15,22 @@ class Redis implements Publisher, Consumer private const int RECONNECT_MAX_BACKOFF_MS = 5_000; /** - * Dedicated to the blocking receive loop, which is driven by a single - * coroutine. Because it has exactly one user it never needs locking. + * Dedicated to the blocking receive loop and its claim writes. A single + * caller drives it, so it never needs locking. */ private readonly Connection $receive; /** - * Carries every other command — the per-message bookkeeping, acknowledging - * and publishing. When messages are processed concurrently (see the - * {@see Executor}), several coroutines share this connection, so it should - * be wrapped in {@see \Utopia\Queue\Connection\Locking}. + * Carries acknowledgements and publishing. When the adapter processes + * messages concurrently, several coroutines share this connection, so it + * should be wrapped in {@see \Utopia\Queue\Connection\Locking}. Defaults to + * $receive, which is safe while processing inline. */ private readonly Connection $work; - private readonly Executor $executor; - private bool $closed = false; + private int $reconnectAttempt = 0; + private int $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; /** * @var (callable(Queue, \Throwable, int, int): void)|null */ @@ -43,21 +41,17 @@ class Redis implements Publisher, Consumer private $reconnectSuccessCallback = null; /** - * @param Connection $receive Connection used for the blocking receive loop. - * @param Connection|null $work Connection used for every other command; defaults - * to $receive, which is safe while processing inline. - * Pass a separate, locked connection when using a - * concurrent executor. - * @param Executor $executor Strategy for processing each received message. + * @param Connection $receive Connection used for the blocking receive loop. + * @param Connection|null $work Connection used for acknowledgements and publishing; + * defaults to $receive. Pass a separate, locked + * connection when an adapter processes concurrently. */ public function __construct( Connection $receive, ?Connection $work = null, - Executor $executor = new Inline(), ) { $this->receive = $receive; $this->work = $work ?? $receive; - $this->executor = $executor; } public function setReconnectCallback(?callable $callback): self @@ -74,140 +68,92 @@ public function setReconnectSuccessCallback(?callable $callback): self return $this; } - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void + public function receive(Queue $queue, int $timeout): ?Message { - $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; - $reconnectAttempt = 0; - - while (!$this->closed) { - /** - * Waiting for next Job. The receive loop runs in a single coroutine, - * so the blocking pop has exclusive use of its connection. - */ - try { - $nextMessage = $this->receive->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); - if ($reconnectAttempt > 0) { - $this->triggerReconnectSuccessCallback($queue, $reconnectAttempt); - } - - $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; - $reconnectAttempt = 0; - } catch (\RedisException|\RedisClusterException $e) { - if ($this->closed) { - break; - } - - $reconnectAttempt++; + if ($this->closed) { + return null; + } - try { - $this->receive->close(); - } catch (\Throwable) { - } + /** + * Wait for the next Job. The receive loop is single-threaded, so the + * blocking pop and the claim writes below have exclusive use of this + * connection and need no locking. + */ + try { + $nextMessage = $this->receive->rightPopArray("{$queue->namespace}.queue.{$queue->name}", $timeout); + if ($this->reconnectAttempt > 0) { + $this->triggerReconnectSuccessCallback($queue, $this->reconnectAttempt); + } - $sleepMs = \mt_rand(0, $reconnectBackoffMs); - $this->triggerReconnectCallback($queue, $e, $reconnectAttempt, $sleepMs); + $this->reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; + $this->reconnectAttempt = 0; + } catch (\RedisException|\RedisClusterException $e) { + if ($this->closed) { + return null; + } - \usleep($sleepMs * 1000); - $reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2); + $this->reconnectAttempt++; - continue; + try { + $this->receive->close(); + } catch (\Throwable) { } - if (!$nextMessage) { - continue; - } + $sleepMs = \mt_rand(0, $this->reconnectBackoffMs); + $this->triggerReconnectCallback($queue, $e, $this->reconnectAttempt, $sleepMs); - $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; + \usleep($sleepMs * 1000); + $this->reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $this->reconnectBackoffMs * 2); - $message = new Message($nextMessage); + return null; + } - /** - * Hand the message off to the executor. Inline runs it here and now; - * a concurrent executor fans it out onto a coroutine while the loop - * goes back to receiving (bounded by the executor's backpressure). - */ - $this->executor->submit(function () use ($queue, $message, $nextMessage, $messageCallback, $successCallback, $errorCallback) { - $this->process($queue, $message, $nextMessage, $messageCallback, $successCallback, $errorCallback); - }); + if (!$nextMessage) { + return null; } + $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; + + $message = new Message($nextMessage); + $pid = $message->getPid(); + /** - * Wait for in-flight messages to finish before returning, so a graceful - * shutdown does not abandon work that was already received. + * Claim the Job: record it under Jobs, add its PID to the processing + * list and bump the received/processing stats. */ - $this->executor->drain(); + $this->receive->setArray("{$queue->namespace}.jobs.{$queue->name}.{$pid}", $nextMessage, $queue->jobTtl); + $this->receive->leftPush("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->receive->increment("{$queue->namespace}.stats.{$queue->name}.total"); + $this->receive->increment("{$queue->namespace}.stats.{$queue->name}.processing"); + + return $message; } - /** - * Process a single received message: record it as in-flight, run the - * handler, then acknowledge or fail it. Every command here runs on the - * $work connection so it is safe to invoke from many coroutines at once. - * - * @param array $raw The decoded message payload. - */ - private function process( - Queue $queue, - Message $message, - array $raw, - callable $messageCallback, - callable $successCallback, - callable $errorCallback, - ): void { + public function commit(Queue $queue, Message $message): void + { $pid = $message->getPid(); /** - * Move Job to Jobs and it's PID to the processing list. + * Remove the Job, bump the success stat, then clear it from processing. */ - $this->work->setArray("{$queue->namespace}.jobs.{$queue->name}.{$pid}", $raw, $queue->jobTtl); - $this->work->leftPush("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->work->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + $this->work->increment("{$queue->namespace}.stats.{$queue->name}.success"); + $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->work->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); + } + + public function reject(Queue $queue, Message $message): void + { + $pid = $message->getPid(); /** - * Increment Total Jobs Received from Stats. + * Move the Job to the failed list, bump the failed stat, then clear it + * from processing. */ - $this->work->increment("{$queue->namespace}.stats.{$queue->name}.total"); - - try { - /** - * Increment Processing Jobs from Stats. - */ - $this->work->increment("{$queue->namespace}.stats.{$queue->name}.processing"); - - $messageCallback($message); - - /** - * Remove Jobs if successful. - */ - $this->work->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); - - /** - * Increment Successful Jobs from Stats. - */ - $this->work->increment("{$queue->namespace}.stats.{$queue->name}.success"); - - $successCallback($message); - } catch (\Throwable $th) { - /** - * Move failed Job to Failed list. - */ - $this->work->leftPush("{$queue->namespace}.failed.{$queue->name}", $pid); - - /** - * Increment Failed Jobs from Stats. - */ - $this->work->increment("{$queue->namespace}.stats.{$queue->name}.failed"); - - $errorCallback($message, $th); - } finally { - /** - * Remove Job from Processing. - */ - $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); - - /** - * Decrease Processing Jobs from Stats. - */ - $this->work->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); - } + $this->work->leftPush("{$queue->namespace}.failed.{$queue->name}", $pid); + $this->work->increment("{$queue->namespace}.stats.{$queue->name}.failed"); + $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->work->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); } public function close(): void diff --git a/src/Queue/Consumer.php b/src/Queue/Consumer.php index c71ce6e..fad5bd1 100644 --- a/src/Queue/Consumer.php +++ b/src/Queue/Consumer.php @@ -2,24 +2,28 @@ namespace Utopia\Queue; -use Utopia\Queue\Result\Commit; -use Utopia\Queue\Result\NoCommit; - interface Consumer { /** - * @param Queue $queue - * @param callable(Message $message): (Commit|NoCommit|mixed) $messageCallback - * @param callable(Message $message): void $successCallback - * @param callable(Message $message, \Throwable $th): void $errorCallback - * @return void + * Block for up to $timeout seconds for the next message and claim it as + * in-flight. Returns null when nothing arrived in that window (or the + * consumer is closing), so the caller's loop can re-check its state. + * + * The loop that calls this is single-threaded; concurrency, if any, comes + * from the adapter fanning {@see commit()}/{@see reject()} out across + * coroutines. + */ + public function receive(Queue $queue, int $timeout): ?Message; + + /** + * Acknowledge a message as successfully processed. + */ + public function commit(Queue $queue, Message $message): void; + + /** + * Mark a message as failed. */ - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback - ): void; + public function reject(Queue $queue, Message $message): void; /** * Closes the consumer and frees any underlying resources. diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 97f8100..a0b1a72 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -434,6 +434,12 @@ protected function getArguments(Container $context, Hook $hook, array $payload = ); } + // Params and injections are collected in two passes, so $arguments ends + // up keyed by declared order but not necessarily iterated in it. Sort by + // key: call_user_func_array passes integer-keyed values positionally in + // iteration order, so an unordered array would mis-assign arguments. + \ksort($arguments); + return $arguments; } diff --git a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php index 481ab9e..12de2bd 100644 --- a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php +++ b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php @@ -27,12 +27,12 @@ public function testReconnectCallbackReceivesAttemptContext(): void $broker->close(); }); - $broker->consume( - $queue, - fn () => null, - fn () => null, - fn () => null, - ); + // The adapter would call receive() in a loop; a failed pop reconnects + // and returns null, after which the callback has closed the broker so + // further calls are no-ops. + for ($i = 0; $i < 3; $i++) { + $broker->receive($queue, 1); + } $this->assertSame(1, $connection->popAttempts); $this->assertCount(1, $calls); @@ -61,12 +61,11 @@ public function testReconnectSuccessCallbackReceivesAttemptCount(): void $broker->close(); }); - $broker->consume( - $queue, - fn () => null, - fn () => null, - fn () => null, - ); + // First receive() fails and reconnects; the second succeeds (empty pop) + // and fires the success callback, which closes the broker. + for ($i = 0; $i < 3; $i++) { + $broker->receive($queue, 1); + } $this->assertSame(2, $connection->popAttempts); $this->assertCount(1, $calls); diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index 7607d0a..bb4417d 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -204,6 +204,17 @@ public function stop(): self return $this; } + /** + * Drain every message the consumer offers, then return — the loop is + * bounded for tests rather than running until a stop signal. + */ + public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + while (($message = $this->consumer->receive($this->queue, 0)) !== null) { + $this->process($message, $messageCallback, $successCallback, $errorCallback); + } + } + public function workerStart(callable $callback): self { $this->onWorkerStart[] = $callback; @@ -219,21 +230,30 @@ public function workerStop(callable $callback): self class ServerTelemetryConsumer implements Consumer { - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback - ): void { - $message = new Message([ + private bool $delivered = false; + + public function receive(Queue $queue, int $timeout): ?Message + { + if ($this->delivered) { + return null; + } + + $this->delivered = true; + + return new Message([ 'pid' => 'test-pid', 'queue' => $queue->name, 'timestamp' => time() - 1, 'payload' => [], ]); + } - $messageCallback($message); - $successCallback($message); + public function commit(Queue $queue, Message $message): void + { + } + + public function reject(Queue $queue, Message $message): void + { } public function close(): void @@ -250,16 +270,19 @@ public function __construct(private array $messages) { } - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback - ): void { - foreach ($this->messages as $message) { - $messageCallback($message); - $successCallback($message); - } + public function receive(Queue $queue, int $timeout): ?Message + { + $message = array_shift($this->messages); + + return $message instanceof Message ? $message : null; + } + + public function commit(Queue $queue, Message $message): void + { + } + + public function reject(Queue $queue, Message $message): void + { } public function close(): void diff --git a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php index 16c8c43..a3224db 100644 --- a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php +++ b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php @@ -3,8 +3,7 @@ namespace Tests\E2E\Adapter; use PHPUnit\Framework\TestCase; -use Utopia\Queue\Concurrency\Coroutine; -use Utopia\Queue\Concurrency\Inline; +use Utopia\Queue\Adapter\Swoole; use Utopia\Queue\Broker\Redis; use Utopia\Queue\Queue; @@ -14,74 +13,62 @@ class SwooleConcurrencyTest extends TestCase private const string NAMESPACE = 'tests'; /** - * With a concurrent executor the broker receives messages on its receive - * connection and processes them on overlapping coroutines, bounded by the - * executor's limit — one receiver fanning out to many handlers. + * The Swoole adapter owns concurrency: it receives on a single loop and + * fans message processing out across coroutines, with at most + * maxCoroutines handlers running at once. */ - public function testRedisFansProcessingOutAcrossCoroutines(): void + public function testAdapterFansProcessingOutUpToMaxCoroutines(): void { - $connection = new InMemoryConnection(); - $queue = new Queue(self::QUEUE, self::NAMESPACE); - $this->enqueue($connection, $queue, 9); - - $active = 0; - $maxActive = 0; - $processed = 0; - - \Swoole\Coroutine\run(function () use ($connection, $queue, &$active, &$maxActive, &$processed) { - $broker = new Redis( - receive: $connection, - work: $connection, - executor: new Coroutine(maxCoroutines: 3), - ); + [$processed, $maxActive] = $this->runWorker(messages: 9, maxCoroutines: 3); - $broker->consume( - $queue, - function () use ($broker, &$active, &$maxActive, &$processed) { - $active++; - $maxActive = \max($maxActive, $active); - \Swoole\Coroutine::sleep(0.02); - $active--; + $this->assertSame(9, $processed, 'every enqueued message should be processed'); + $this->assertSame(3, $maxActive, 'concurrency is bounded by maxCoroutines'); + } - if (++$processed === 9) { - $broker->close(); - } - }, - fn () => null, - fn () => null, - ); - }); + /** + * With maxCoroutines of 1 the adapter degrades to sequential processing — + * handlers never overlap. + */ + public function testMaxCoroutinesOfOneProcessesSequentially(): void + { + [$processed, $maxActive] = $this->runWorker(messages: 5, maxCoroutines: 1); - $this->assertSame(9, $processed, 'every enqueued message should be processed'); - $this->assertSame(3, $maxActive, 'concurrency is bounded by the executor limit'); + $this->assertSame(5, $processed); + $this->assertSame(1, $maxActive, 'a single coroutine never overlaps'); } /** - * The default (Inline) executor processes one message fully before the next - * is received, so handlers never overlap. + * Enqueue $messages jobs, run the adapter's consume loop until they are all + * processed, and report how many ran and the peak concurrency observed. + * + * @return array{0: int, 1: int} [processed, maxActive] */ - public function testInlineProcessesOneMessageAtATime(): void + private function runWorker(int $messages, int $maxCoroutines): array { $connection = new InMemoryConnection(); + $broker = new Redis($connection); $queue = new Queue(self::QUEUE, self::NAMESPACE); - $this->enqueue($connection, $queue, 5); $active = 0; $maxActive = 0; $processed = 0; - \Swoole\Coroutine\run(function () use ($connection, $queue, &$active, &$maxActive, &$processed) { - $broker = new Redis(receive: $connection, executor: new Inline()); + \Swoole\Coroutine\run(function () use ($broker, $queue, $messages, $maxCoroutines, &$active, &$maxActive, &$processed) { + for ($i = 0; $i < $messages; $i++) { + $broker->enqueue($queue, ['n' => $i]); + } + + $adapter = new Swoole($broker, 1, self::QUEUE, self::NAMESPACE, maxCoroutines: $maxCoroutines); - $broker->consume( - $queue, - function () use ($broker, &$active, &$maxActive, &$processed) { + $adapter->consume( + function () use ($adapter, $messages, &$active, &$maxActive, &$processed) { $active++; $maxActive = \max($maxActive, $active); + \Swoole\Coroutine::sleep(0.02); $active--; - if (++$processed === 5) { - $broker->close(); + if (++$processed === $messages) { + $adapter->stop(); } }, fn () => null, @@ -89,19 +76,6 @@ function () use ($broker, &$active, &$maxActive, &$processed) { ); }); - $this->assertSame(5, $processed); - $this->assertSame(1, $maxActive, 'inline processing never overlaps'); - } - - private function enqueue(InMemoryConnection $connection, Queue $queue, int $count): void - { - for ($i = 0; $i < $count; $i++) { - $connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", [ - 'pid' => "pid-{$i}", - 'queue' => $queue->name, - 'timestamp' => 0, - 'payload' => ['n' => $i], - ]); - } + return [$processed, $maxActive]; } } diff --git a/tests/Queue/servers/Swoole/Dockerfile b/tests/Queue/servers/Swoole/Dockerfile index eb30cec..b7ebd91 100644 --- a/tests/Queue/servers/Swoole/Dockerfile +++ b/tests/Queue/servers/Swoole/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 1723232..a8eeaa6 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -5,21 +5,19 @@ use Utopia\Queue\Server; use Utopia\Queue\Adapter\Swoole; -use Utopia\Queue\Concurrency\Coroutine; use Utopia\Queue\Connection\Locking; use Utopia\Queue\Connection\Redis as RedisConnection; use Utopia\Queue\Broker\Redis; use Utopia\Validator\Text; // One connection drives the blocking receive loop; a separate, locked -// connection carries the bookkeeping for the handlers fanned out across -// coroutines. +// connection carries the acks for the handlers the Swoole adapter fans out +// across coroutines (maxCoroutines). $consumer = new Redis( receive: new RedisConnection('redis'), work: new Locking(new RedisConnection('redis')), - executor: new Coroutine(maxCoroutines: 5), ); -$adapter = new Swoole($consumer, 12, 'swoole'); +$adapter = new Swoole($consumer, 12, 'swoole', maxCoroutines: 5); $server = new Server($adapter); $server->job() diff --git a/tests/Queue/servers/SwooleRedisCluster/Dockerfile b/tests/Queue/servers/SwooleRedisCluster/Dockerfile index eb30cec..b7ebd91 100644 --- a/tests/Queue/servers/SwooleRedisCluster/Dockerfile +++ b/tests/Queue/servers/SwooleRedisCluster/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/Workerman/Dockerfile b/tests/Queue/servers/Workerman/Dockerfile index 1704dd1..84dbd1e 100644 --- a/tests/Queue/servers/Workerman/Dockerfile +++ b/tests/Queue/servers/Workerman/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine RUN apk add autoconf build-base From 487b64e0c37be66c0fe44f17a4ec5c45dee0fbfa Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 10:52:05 +0100 Subject: [PATCH 3/9] Inline coroutine fan-out in the Swoole adapter; drop Executor abstraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove the Concurrency\Executor/Inline/Coroutine classes. The base adapter processes sequentially; the Swoole adapter inlines the coroutine fan-out (semaphore Channel + WaitGroup) directly. The Swoole adapter has a single model parameterised by workerNum and maxCoroutines — no inline-vs-concurrent split. - Harden Adapter::process(): it now never throws. A failing commit/reject or error callback is routed to $errorCallback instead of escaping the coroutine and being swallowed by Swoole's default handler (error-visibility regression raised in review). The Swoole coroutine body also logs as a last resort. - Broker\Redis::getJob() tolerates a missing job (get() returning null), which could otherwise crash retry() via new Message(null). - Trim the over-explanatory doc comments added in the refactor. Co-Authored-By: Claude Opus 4.8 (1M context) --- .github/workflows/tests.yml | 1 - src/Queue/Adapter.php | 62 ++++++-------- src/Queue/Adapter/Swoole.php | 55 ++++++++++--- src/Queue/Broker/Redis.php | 41 +++------- src/Queue/Concurrency/Coroutine.php | 60 -------------- src/Queue/Concurrency/Executor.php | 29 ------- src/Queue/Concurrency/Inline.php | 21 ----- src/Queue/Consumer.php | 11 +-- tests/Queue/E2E/Adapter/ConcurrencyTest.php | 81 ------------------- .../Queue/E2E/Adapter/InMemoryConnection.php | 14 +--- .../Adapter/RedisReconnectCallbackTest.php | 5 +- .../Queue/E2E/Adapter/ServerTelemetryTest.php | 5 +- .../E2E/Adapter/SwooleConcurrencyTest.php | 21 ++--- tests/Queue/servers/Swoole/worker.php | 4 +- 14 files changed, 94 insertions(+), 316 deletions(-) delete mode 100644 src/Queue/Concurrency/Coroutine.php delete mode 100644 src/Queue/Concurrency/Executor.php delete mode 100644 src/Queue/Concurrency/Inline.php delete mode 100644 tests/Queue/E2E/Adapter/ConcurrencyTest.php diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ce53b47..a725f94 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,7 +15,6 @@ jobs: matrix: adapter: [ - Concurrency, Locking, Pool, SwooleRedisCluster, diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index c665527..5fa4c38 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -3,29 +3,16 @@ namespace Utopia\Queue; use Utopia\DI\Container; -use Utopia\Queue\Concurrency\Executor; -use Utopia\Queue\Concurrency\Inline; abstract class Adapter { - /** - * How long the receive loop blocks for a message before looping back to - * re-check whether it should keep running. - */ + /** Seconds to block for a message before re-checking the stop flag. */ protected const int RECEIVE_TIMEOUT = 2; public Queue $queue; protected ?Container $context = null; - /** - * Strategy for processing each received message. Defaults to running them - * inline, one at a time; a concurrent adapter (e.g. Swoole) swaps in an - * executor that fans processing out across coroutines. This is where the - * adapter — not the broker — owns the concurrency model. - */ - protected Executor $executor; - - /** Set to break out of the receive loop on its next iteration. */ + /** Set to break out of the receive loop. */ protected bool $stopped = false; public function __construct( @@ -36,7 +23,6 @@ public function __construct( protected Container $resources = new Container(), ) { $this->queue = new Queue($queue, $namespace); - $this->executor = new Inline(); } /** @@ -52,10 +38,7 @@ abstract public function start(): self; abstract public function stop(): self; /** - * Receive messages on a single loop and hand each one to the executor. With - * the default {@see Inline} executor a message is processed before the next - * is received; a concurrent executor processes several at once while the - * loop keeps receiving (bounded by the executor's back-pressure). + * Receive and process messages one at a time until stopped. */ public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { @@ -68,36 +51,39 @@ public function consume(callable $messageCallback, callable $successCallback, ca continue; } - $this->executor->submit(function () use ($message, $messageCallback, $successCallback, $errorCallback) { - $this->process($message, $messageCallback, $successCallback, $errorCallback); - }); + $this->process($message, $messageCallback, $successCallback, $errorCallback); } - - $this->executor->drain(); } /** - * Process one message: give it a fresh context, run the handler, then - * acknowledge or reject it. + * Run the handler for one message, then commit or reject it. Never throws: + * any failure — including a failing commit/reject or error callback — is + * routed to $errorCallback so it can't escape and be lost (e.g. swallowed + * by a coroutine's default handler). */ protected function process(Message $message, callable $messageCallback, callable $successCallback, callable $errorCallback): void { - $this->setContext(new Container($this->resources())); - try { - $messageCallback($message); - $this->consumer->commit($this->queue, $message); - $successCallback($message); + $this->setContext(new Container($this->resources())); + + try { + $messageCallback($message); + $this->consumer->commit($this->queue, $message); + $successCallback($message); + } catch (\Throwable $error) { + $this->consumer->reject($this->queue, $message); + $errorCallback($message, $error); + } } catch (\Throwable $error) { - $this->consumer->reject($this->queue, $message); - $errorCallback($message, $error); + try { + $errorCallback($message, $error); + } catch (\Throwable) { + // Nothing left to do — the error callback itself failed. + } } } - /** - * Install the per-message context container. Overridden by concurrent - * adapters to keep the container coroutine-local. - */ + /** Install the per-message context container. */ protected function setContext(Container $context): void { $this->context = $context; diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index de48aa0..b074b4a 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -3,10 +3,11 @@ namespace Utopia\Queue\Adapter; use Swoole\Coroutine; +use Swoole\Coroutine\Channel; +use Swoole\Coroutine\WaitGroup; use Swoole\Process; use Utopia\DI\Container; use Utopia\Queue\Adapter; -use Utopia\Queue\Concurrency\Coroutine as CoroutineExecutor; use Utopia\Queue\Consumer; class Swoole extends Adapter @@ -22,13 +23,9 @@ class Swoole extends Adapter /** @var callable[] */ protected array $onWorkerStop = []; - /** - * @param int $maxCoroutines Number of messages a worker may process - * concurrently. This is where the Swoole adapter - * owns its concurrency: it fans message processing - * out across coroutines while the receive loop - * keeps pulling work. - */ + /** Messages a worker may process concurrently. */ + protected int $maxCoroutines; + public function __construct( Consumer $consumer, int $workerNum, @@ -38,7 +35,7 @@ public function __construct( Container $resources = new Container(), ) { parent::__construct($consumer, $workerNum, $queue, $namespace, $resources); - $this->executor = new CoroutineExecutor($maxCoroutines); + $this->maxCoroutines = \max(1, $maxCoroutines); } public function start(): self @@ -88,9 +85,45 @@ protected function spawnWorker(int $workerId): void } /** - * Store the per-message container in the coroutine context so concurrent - * handlers each see their own, rather than sharing one on the adapter. + * Receive on a single loop and process each message on its own coroutine, + * at most $maxCoroutines at a time. The channel is a semaphore: push() + * blocks the loop once the pool is full until a handler frees a slot. */ + public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + $this->stopped = false; + $slots = new Channel($this->maxCoroutines); + $waitGroup = new WaitGroup(); + + while (!$this->stopped) { + $message = $this->consumer->receive($this->queue, static::RECEIVE_TIMEOUT); + + if ($message === null) { + continue; + } + + $slots->push(true); + $waitGroup->add(); + + Coroutine::create(function () use ($message, $messageCallback, $successCallback, $errorCallback, $slots, $waitGroup) { + try { + $this->process($message, $messageCallback, $successCallback, $errorCallback); + } catch (\Throwable $error) { + // process() is total; last-resort net so a stray throw is + // logged, not swallowed by Swoole's default handler. + \error_log('Uncaught error while processing queue message: ' . $error->getMessage()); + } finally { + $waitGroup->done(); + $slots->pop(); + } + }); + } + + // Let in-flight handlers finish before returning. + $waitGroup->wait(); + } + + /** Keep the per-message container coroutine-local so handlers don't share it. */ protected function setContext(Container $context): void { Coroutine::getContext()[self::CONTEXT_KEY] = $context; diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 4c869f1..6b45f54 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -14,18 +14,10 @@ class Redis implements Publisher, Consumer private const int RECONNECT_BACKOFF_MS = 100; private const int RECONNECT_MAX_BACKOFF_MS = 5_000; - /** - * Dedicated to the blocking receive loop and its claim writes. A single - * caller drives it, so it never needs locking. - */ + /** Drives the blocking receive loop and its claim writes (single caller). */ private readonly Connection $receive; - /** - * Carries acknowledgements and publishing. When the adapter processes - * messages concurrently, several coroutines share this connection, so it - * should be wrapped in {@see \Utopia\Queue\Connection\Locking}. Defaults to - * $receive, which is safe while processing inline. - */ + /** Carries acks and publishing; wrap in Locking when shared by coroutines. */ private readonly Connection $work; private bool $closed = false; @@ -41,10 +33,8 @@ class Redis implements Publisher, Consumer private $reconnectSuccessCallback = null; /** - * @param Connection $receive Connection used for the blocking receive loop. - * @param Connection|null $work Connection used for acknowledgements and publishing; - * defaults to $receive. Pass a separate, locked - * connection when an adapter processes concurrently. + * @param Connection|null $work Defaults to $receive; pass a separate, locked + * connection when processing concurrently. */ public function __construct( Connection $receive, @@ -74,11 +64,6 @@ public function receive(Queue $queue, int $timeout): ?Message return null; } - /** - * Wait for the next Job. The receive loop is single-threaded, so the - * blocking pop and the claim writes below have exclusive use of this - * connection and need no locking. - */ try { $nextMessage = $this->receive->rightPopArray("{$queue->namespace}.queue.{$queue->name}", $timeout); if ($this->reconnectAttempt > 0) { @@ -117,10 +102,7 @@ public function receive(Queue $queue, int $timeout): ?Message $message = new Message($nextMessage); $pid = $message->getPid(); - /** - * Claim the Job: record it under Jobs, add its PID to the processing - * list and bump the received/processing stats. - */ + // Claim: store the job, mark it processing, bump received stats. $this->receive->setArray("{$queue->namespace}.jobs.{$queue->name}.{$pid}", $nextMessage, $queue->jobTtl); $this->receive->leftPush("{$queue->namespace}.processing.{$queue->name}", $pid); $this->receive->increment("{$queue->namespace}.stats.{$queue->name}.total"); @@ -133,9 +115,6 @@ public function commit(Queue $queue, Message $message): void { $pid = $message->getPid(); - /** - * Remove the Job, bump the success stat, then clear it from processing. - */ $this->work->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); $this->work->increment("{$queue->namespace}.stats.{$queue->name}.success"); $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); @@ -146,10 +125,6 @@ public function reject(Queue $queue, Message $message): void { $pid = $message->getPid(); - /** - * Move the Job to the failed list, bump the failed stat, then clear it - * from processing. - */ $this->work->leftPush("{$queue->namespace}.failed.{$queue->name}", $pid); $this->work->increment("{$queue->namespace}.stats.{$queue->name}.failed"); $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); @@ -242,12 +217,14 @@ private function getJob(Queue $queue, string $pid): Message|false { $value = $this->work->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); - if ($value === false) { + // Missing/expired jobs return false or null depending on the driver. + if (!\is_string($value)) { return false; } $job = json_decode($value, true); - return new Message($job); + + return \is_array($job) ? new Message($job) : false; } public function getQueueSize(Queue $queue, bool $failedJobs = false): int diff --git a/src/Queue/Concurrency/Coroutine.php b/src/Queue/Concurrency/Coroutine.php deleted file mode 100644 index 66e853b..0000000 --- a/src/Queue/Concurrency/Coroutine.php +++ /dev/null @@ -1,60 +0,0 @@ -maxCoroutines = \max(1, $maxCoroutines); - } - - public function submit(callable $task): void - { - $slots = $this->slots ??= new Channel($this->maxCoroutines); - $waitGroup = $this->waitGroup ??= new WaitGroup(); - - // Acquire a slot; blocks the caller once $maxCoroutines are in flight. - $slots->push(true); - $waitGroup->add(); - - SwooleCoroutine::create(function () use ($task, $slots, $waitGroup) { - try { - $task(); - } finally { - $waitGroup->done(); - $slots->pop(); - } - }); - } - - public function drain(): void - { - $this->waitGroup?->wait(); - } -} diff --git a/src/Queue/Concurrency/Executor.php b/src/Queue/Concurrency/Executor.php deleted file mode 100644 index d387d4e..0000000 --- a/src/Queue/Concurrency/Executor.php +++ /dev/null @@ -1,29 +0,0 @@ -submit(function () use (&$order) { - $order[] = 1; - }); - $executor->submit(function () use (&$order) { - $order[] = 2; - }); - $executor->drain(); - - $this->assertSame([1, 2], $order); - } - - /** - * The coroutine executor runs every submitted task, but never lets more - * than $maxCoroutines of them run at the same time. - */ - public function testCoroutineBoundsConcurrencyToLimit(): void - { - $active = 0; - $maxActive = 0; - $completed = 0; - - \Swoole\Coroutine\run(function () use (&$active, &$maxActive, &$completed) { - $executor = new Coroutine(maxCoroutines: 3); - - for ($i = 0; $i < 9; $i++) { - $executor->submit(function () use (&$active, &$maxActive, &$completed) { - $active++; - $maxActive = \max($maxActive, $active); - \Swoole\Coroutine::sleep(0.02); - $active--; - $completed++; - }); - } - - $executor->drain(); - }); - - $this->assertSame(9, $completed, 'every task should run'); - $this->assertSame(3, $maxActive, 'no more than maxCoroutines run concurrently'); - } - - /** - * drain() must block until in-flight tasks have finished, so a graceful - * shutdown does not abandon work. - */ - public function testCoroutineDrainWaitsForInFlightTasks(): void - { - $finished = false; - - \Swoole\Coroutine\run(function () use (&$finished) { - $executor = new Coroutine(maxCoroutines: 2); - - $executor->submit(function () use (&$finished) { - \Swoole\Coroutine::sleep(0.05); - $finished = true; - }); - - $executor->drain(); - }); - - $this->assertTrue($finished, 'drain() should not return before submitted work completes'); - } -} diff --git a/tests/Queue/E2E/Adapter/InMemoryConnection.php b/tests/Queue/E2E/Adapter/InMemoryConnection.php index 236cc8b..1edb66d 100644 --- a/tests/Queue/E2E/Adapter/InMemoryConnection.php +++ b/tests/Queue/E2E/Adapter/InMemoryConnection.php @@ -6,12 +6,9 @@ use Utopia\Queue\Connection; /** - * A minimal in-memory {@see Connection} for tests: lists are plain arrays and - * counters live in a map. Enough to drive the Redis broker's consume loop - * without a real Redis server. - * - * When a pop finds an empty list it yields (inside a coroutine) so a busy - * receive loop cannot starve the handler coroutines it is feeding. + * Minimal in-memory {@see Connection} for tests, backing the broker without a + * real Redis server. An empty pop yields so a busy receive loop doesn't starve + * the handler coroutines. */ class InMemoryConnection implements Connection { @@ -178,10 +175,7 @@ public function close(): void { } - /** - * Pop from either end, yielding when empty so a tight receive loop does not - * monopolise the scheduler. - */ + /** Pop from either end, yielding when empty so the receive loop doesn't spin. */ private function pop(string $queue, bool $fromTail): mixed { if (empty($this->lists[$queue])) { diff --git a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php index 12de2bd..df6e71f 100644 --- a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php +++ b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php @@ -27,9 +27,8 @@ public function testReconnectCallbackReceivesAttemptContext(): void $broker->close(); }); - // The adapter would call receive() in a loop; a failed pop reconnects - // and returns null, after which the callback has closed the broker so - // further calls are no-ops. + // A failed pop reconnects and returns null; the callback then closes + // the broker, so the remaining calls are no-ops. for ($i = 0; $i < 3; $i++) { $broker->receive($queue, 1); } diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index bb4417d..e016197 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -204,10 +204,7 @@ public function stop(): self return $this; } - /** - * Drain every message the consumer offers, then return — the loop is - * bounded for tests rather than running until a stop signal. - */ + /** Drain every message the consumer offers, then return (bounded for tests). */ public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { while (($message = $this->consumer->receive($this->queue, 0)) !== null) { diff --git a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php index a3224db..511ed67 100644 --- a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php +++ b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php @@ -12,34 +12,25 @@ class SwooleConcurrencyTest extends TestCase private const string QUEUE = 'concurrency'; private const string NAMESPACE = 'tests'; - /** - * The Swoole adapter owns concurrency: it receives on a single loop and - * fans message processing out across coroutines, with at most - * maxCoroutines handlers running at once. - */ - public function testAdapterFansProcessingOutUpToMaxCoroutines(): void + public function testProcessesUpToMaxCoroutinesAtOnce(): void { [$processed, $maxActive] = $this->runWorker(messages: 9, maxCoroutines: 3); - $this->assertSame(9, $processed, 'every enqueued message should be processed'); + $this->assertSame(9, $processed); $this->assertSame(3, $maxActive, 'concurrency is bounded by maxCoroutines'); } - /** - * With maxCoroutines of 1 the adapter degrades to sequential processing — - * handlers never overlap. - */ - public function testMaxCoroutinesOfOneProcessesSequentially(): void + public function testOneCoroutineNeverOverlaps(): void { [$processed, $maxActive] = $this->runWorker(messages: 5, maxCoroutines: 1); $this->assertSame(5, $processed); - $this->assertSame(1, $maxActive, 'a single coroutine never overlaps'); + $this->assertSame(1, $maxActive); } /** - * Enqueue $messages jobs, run the adapter's consume loop until they are all - * processed, and report how many ran and the peak concurrency observed. + * Run the consume loop until $messages are processed; return the count and + * the peak concurrency observed. * * @return array{0: int, 1: int} [processed, maxActive] */ diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index a8eeaa6..243e141 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -10,9 +10,7 @@ use Utopia\Queue\Broker\Redis; use Utopia\Validator\Text; -// One connection drives the blocking receive loop; a separate, locked -// connection carries the acks for the handlers the Swoole adapter fans out -// across coroutines (maxCoroutines). +// Separate receive (blocking pop) and work (locked, shared by coroutines) connections. $consumer = new Redis( receive: new RedisConnection('redis'), work: new Locking(new RedisConnection('redis')), From 2be4c1b1df55b9d20c27312be26d06b32160c807 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 12:18:59 +0100 Subject: [PATCH 4/9] Trim comments to non-obvious WHYs; rename Redis $work to $commands - Cut the explanatory doc comments down to the few that carry non-obvious rationale (process() never throws, the two-connection split, the call_user_func_array argument ordering); drop the rest that just restated the code. - Rename the Redis broker's second connection from $work to $commands, which says what it carries (acks + publishing) rather than a vague "work". Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Queue/Adapter.php | 15 +++-------- src/Queue/Adapter/Swoole.php | 13 ++++------ src/Queue/Broker/Redis.php | 36 ++++++++++++--------------- src/Queue/Consumer.php | 17 +++---------- src/Queue/Server.php | 6 ++--- tests/Queue/servers/Swoole/worker.php | 4 +-- 6 files changed, 32 insertions(+), 59 deletions(-) diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index 5fa4c38..d9141e4 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -6,13 +6,10 @@ abstract class Adapter { - /** Seconds to block for a message before re-checking the stop flag. */ protected const int RECEIVE_TIMEOUT = 2; public Queue $queue; protected ?Container $context = null; - - /** Set to break out of the receive loop. */ protected bool $stopped = false; public function __construct( @@ -37,9 +34,6 @@ abstract public function start(): self; */ abstract public function stop(): self; - /** - * Receive and process messages one at a time until stopped. - */ public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { $this->stopped = false; @@ -56,10 +50,8 @@ public function consume(callable $messageCallback, callable $successCallback, ca } /** - * Run the handler for one message, then commit or reject it. Never throws: - * any failure — including a failing commit/reject or error callback — is - * routed to $errorCallback so it can't escape and be lost (e.g. swallowed - * by a coroutine's default handler). + * Never throws: a failing handler, commit, reject, or error callback is all + * routed to $errorCallback so nothing escapes (and is lost) on a coroutine. */ protected function process(Message $message, callable $messageCallback, callable $successCallback, callable $errorCallback): void { @@ -78,12 +70,11 @@ protected function process(Message $message, callable $messageCallback, callable try { $errorCallback($message, $error); } catch (\Throwable) { - // Nothing left to do — the error callback itself failed. + // the error callback itself failed; nothing left to do } } } - /** Install the per-message context container. */ protected function setContext(Container $context): void { $this->context = $context; diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index b074b4a..668152a 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -23,7 +23,6 @@ class Swoole extends Adapter /** @var callable[] */ protected array $onWorkerStop = []; - /** Messages a worker may process concurrently. */ protected int $maxCoroutines; public function __construct( @@ -85,9 +84,9 @@ protected function spawnWorker(int $workerId): void } /** - * Receive on a single loop and process each message on its own coroutine, - * at most $maxCoroutines at a time. The channel is a semaphore: push() - * blocks the loop once the pool is full until a handler frees a slot. + * Receive on one loop, process each message on its own coroutine. The + * channel caps concurrency at $maxCoroutines: push() blocks the loop while + * the pool is full. */ public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { @@ -109,8 +108,7 @@ public function consume(callable $messageCallback, callable $successCallback, ca try { $this->process($message, $messageCallback, $successCallback, $errorCallback); } catch (\Throwable $error) { - // process() is total; last-resort net so a stray throw is - // logged, not swallowed by Swoole's default handler. + // process() is total; net for a stray throw so it isn't lost \error_log('Uncaught error while processing queue message: ' . $error->getMessage()); } finally { $waitGroup->done(); @@ -119,13 +117,12 @@ public function consume(callable $messageCallback, callable $successCallback, ca }); } - // Let in-flight handlers finish before returning. $waitGroup->wait(); } - /** Keep the per-message container coroutine-local so handlers don't share it. */ protected function setContext(Container $context): void { + // coroutine-local so concurrent handlers don't share a context Coroutine::getContext()[self::CONTEXT_KEY] = $context; } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 6b45f54..329bbd5 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -18,7 +18,7 @@ class Redis implements Publisher, Consumer private readonly Connection $receive; /** Carries acks and publishing; wrap in Locking when shared by coroutines. */ - private readonly Connection $work; + private readonly Connection $commands; private bool $closed = false; private int $reconnectAttempt = 0; @@ -32,16 +32,12 @@ class Redis implements Publisher, Consumer */ private $reconnectSuccessCallback = null; - /** - * @param Connection|null $work Defaults to $receive; pass a separate, locked - * connection when processing concurrently. - */ public function __construct( Connection $receive, - ?Connection $work = null, + ?Connection $commands = null, ) { $this->receive = $receive; - $this->work = $work ?? $receive; + $this->commands = $commands ?? $receive; } public function setReconnectCallback(?callable $callback): self @@ -115,20 +111,20 @@ public function commit(Queue $queue, Message $message): void { $pid = $message->getPid(); - $this->work->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); - $this->work->increment("{$queue->namespace}.stats.{$queue->name}.success"); - $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); - $this->work->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); + $this->commands->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + $this->commands->increment("{$queue->namespace}.stats.{$queue->name}.success"); + $this->commands->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->commands->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); } public function reject(Queue $queue, Message $message): void { $pid = $message->getPid(); - $this->work->leftPush("{$queue->namespace}.failed.{$queue->name}", $pid); - $this->work->increment("{$queue->namespace}.stats.{$queue->name}.failed"); - $this->work->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); - $this->work->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); + $this->commands->leftPush("{$queue->namespace}.failed.{$queue->name}", $pid); + $this->commands->increment("{$queue->namespace}.stats.{$queue->name}.failed"); + $this->commands->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->commands->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); } public function close(): void @@ -169,9 +165,9 @@ public function enqueue(Queue $queue, array $payload, bool $priority = false): b 'payload' => $payload ]; if ($priority) { - return $this->work->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + return $this->commands->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } - return $this->work->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + return $this->commands->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } /** @@ -184,7 +180,7 @@ public function retry(Queue $queue, ?int $limit = null): void $processed = 0; while (true) { - $pid = $this->work->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT); + $pid = $this->commands->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT); // No more jobs to retry if ($pid === false) { @@ -215,7 +211,7 @@ public function retry(Queue $queue, ?int $limit = null): void private function getJob(Queue $queue, string $pid): Message|false { - $value = $this->work->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + $value = $this->commands->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); // Missing/expired jobs return false or null depending on the driver. if (!\is_string($value)) { @@ -233,6 +229,6 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int if ($failedJobs) { $queueName = "{$queue->namespace}.failed.{$queue->name}"; } - return $this->work->listSize($queueName); + return $this->commands->listSize($queueName); } } diff --git a/src/Queue/Consumer.php b/src/Queue/Consumer.php index 0f69aa0..b5f332c 100644 --- a/src/Queue/Consumer.php +++ b/src/Queue/Consumer.php @@ -4,24 +4,15 @@ interface Consumer { - /** - * Block up to $timeout seconds for the next message and claim it. Returns - * null on timeout so the caller can re-check its state. - */ + /** Block up to $timeout seconds for the next message and claim it, or null on timeout. */ public function receive(Queue $queue, int $timeout): ?Message; - /** - * Acknowledge a message as successfully processed. - */ + /** Acknowledge a processed message. */ public function commit(Queue $queue, Message $message): void; - /** - * Mark a message as failed. - */ + /** Mark a message as failed. */ public function reject(Queue $queue, Message $message): void; - /** - * Close the consumer and free any underlying resources. - */ + /** Close the consumer and free resources. */ public function close(): void; } diff --git a/src/Queue/Server.php b/src/Queue/Server.php index a0b1a72..ca7e7f3 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -434,10 +434,8 @@ protected function getArguments(Container $context, Hook $hook, array $payload = ); } - // Params and injections are collected in two passes, so $arguments ends - // up keyed by declared order but not necessarily iterated in it. Sort by - // key: call_user_func_array passes integer-keyed values positionally in - // iteration order, so an unordered array would mis-assign arguments. + // call_user_func_array passes integer keys in iteration order, not key + // order, so sort the two-pass (params, then injections) array by key. \ksort($arguments); return $arguments; diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 243e141..e0ec82d 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -10,10 +10,10 @@ use Utopia\Queue\Broker\Redis; use Utopia\Validator\Text; -// Separate receive (blocking pop) and work (locked, shared by coroutines) connections. +// Dedicated blocking-receive connection; a separate locked connection for commands. $consumer = new Redis( receive: new RedisConnection('redis'), - work: new Locking(new RedisConnection('redis')), + commands: new Locking(new RedisConnection('redis')), ); $adapter = new Swoole($consumer, 12, 'swoole', maxCoroutines: 5); $server = new Server($adapter); From 95064b17fb6c119689b41199965541f009ec0e48 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 12:34:21 +0100 Subject: [PATCH 5/9] Use constructor property promotion for Redis $receive $commands keeps its body assignment because of the `?? $receive` fallback, but $receive is a plain assignment and can be promoted (as the broker's original single connection was). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Queue/Broker/Redis.php | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 329bbd5..7f275a1 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -14,9 +14,6 @@ class Redis implements Publisher, Consumer private const int RECONNECT_BACKOFF_MS = 100; private const int RECONNECT_MAX_BACKOFF_MS = 5_000; - /** Drives the blocking receive loop and its claim writes (single caller). */ - private readonly Connection $receive; - /** Carries acks and publishing; wrap in Locking when shared by coroutines. */ private readonly Connection $commands; @@ -33,10 +30,10 @@ class Redis implements Publisher, Consumer private $reconnectSuccessCallback = null; public function __construct( - Connection $receive, + // Drives the blocking receive loop and its claim writes (single caller). + private readonly Connection $receive, ?Connection $commands = null, ) { - $this->receive = $receive; $this->commands = $commands ?? $receive; } From 3d0131c6271ca57737b9d18f3d3eee8904df9b49 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 12:42:57 +0100 Subject: [PATCH 6/9] Require both Redis connections explicitly; drop the $work fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No implicit "$commands defaults to $receive" — callers pass both connections, which makes the two-connection model explicit and lets both be promoted. Pass the same connection twice when one suffices (sequential/inline). All call sites (workers, tests) updated accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Queue/Broker/Redis.php | 9 +++------ tests/Queue/E2E/Adapter/PoolTest.php | 2 +- .../Queue/E2E/Adapter/RedisReconnectCallbackTest.php | 4 ++-- tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php | 2 +- tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php | 2 +- tests/Queue/E2E/Adapter/SwooleTest.php | 2 +- tests/Queue/E2E/Adapter/WorkermanTest.php | 3 +-- tests/Queue/servers/SwooleRedisCluster/worker.php | 12 +++++++----- tests/Queue/servers/Workerman/worker.php | 5 ++++- 9 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 7f275a1..47b0023 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -14,9 +14,6 @@ class Redis implements Publisher, Consumer private const int RECONNECT_BACKOFF_MS = 100; private const int RECONNECT_MAX_BACKOFF_MS = 5_000; - /** Carries acks and publishing; wrap in Locking when shared by coroutines. */ - private readonly Connection $commands; - private bool $closed = false; private int $reconnectAttempt = 0; private int $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; @@ -30,11 +27,11 @@ class Redis implements Publisher, Consumer private $reconnectSuccessCallback = null; public function __construct( - // Drives the blocking receive loop and its claim writes (single caller). + // Blocking receive loop + claim writes (single caller). private readonly Connection $receive, - ?Connection $commands = null, + // Acks and publishing; wrap in Locking when shared by coroutines. + private readonly Connection $commands, ) { - $this->commands = $commands ?? $receive; } public function setReconnectCallback(?callable $callback): self diff --git a/tests/Queue/E2E/Adapter/PoolTest.php b/tests/Queue/E2E/Adapter/PoolTest.php index 3c46be0..44b12c7 100644 --- a/tests/Queue/E2E/Adapter/PoolTest.php +++ b/tests/Queue/E2E/Adapter/PoolTest.php @@ -15,7 +15,7 @@ class PoolTest extends Base protected function getPublisher(): Publisher { $pool = new UtopiaPool(new Stack(), 'redis', 1, function () { - return new RedisBroker(new Redis('redis', 6379)); + return new RedisBroker(new Redis('redis', 6379), new Redis('redis', 6379)); }); return new Pool($pool, $pool); diff --git a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php index df6e71f..5757e66 100644 --- a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php +++ b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php @@ -13,7 +13,7 @@ public function testReconnectCallbackReceivesAttemptContext(): void { $queue = new Queue('reconnect-callback'); $connection = new FailingRedisConnection(); - $broker = new RedisBroker($connection); + $broker = new RedisBroker($connection, $connection); $calls = []; $broker->setReconnectCallback(function (Queue $queue, \Throwable $error, int $attempt, int $sleepMs) use (&$calls, $broker): void { @@ -47,7 +47,7 @@ public function testReconnectSuccessCallbackReceivesAttemptCount(): void { $queue = new Queue('reconnect-success-callback'); $connection = new RecoveringRedisConnection(); - $broker = new RedisBroker($connection); + $broker = new RedisBroker($connection, $connection); $calls = []; $broker->setReconnectCallback(fn () => null); diff --git a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php index 511ed67..30a7d9b 100644 --- a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php +++ b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php @@ -37,7 +37,7 @@ public function testOneCoroutineNeverOverlaps(): void private function runWorker(int $messages, int $maxCoroutines): array { $connection = new InMemoryConnection(); - $broker = new Redis($connection); + $broker = new Redis($connection, $connection); $queue = new Queue(self::QUEUE, self::NAMESPACE); $active = 0; diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php index 0b42999..d31fe7d 100644 --- a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -20,7 +20,7 @@ private function getConnection(): RedisCluster protected function getPublisher(): Publisher { - return new Redis($this->getConnection()); + return new Redis($this->getConnection(), $this->getConnection()); } protected function getQueue(): Queue diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php index 9b02275..6262ee7 100644 --- a/tests/Queue/E2E/Adapter/SwooleTest.php +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -16,7 +16,7 @@ private function getConnection(): Redis protected function getPublisher(): Publisher { - return new RedisBroker($this->getConnection()); + return new RedisBroker($this->getConnection(), $this->getConnection()); } protected function getQueue(): Queue diff --git a/tests/Queue/E2E/Adapter/WorkermanTest.php b/tests/Queue/E2E/Adapter/WorkermanTest.php index a9c7899..8b6a7ef 100644 --- a/tests/Queue/E2E/Adapter/WorkermanTest.php +++ b/tests/Queue/E2E/Adapter/WorkermanTest.php @@ -11,8 +11,7 @@ class WorkermanTest extends Base { protected function getPublisher(): Publisher { - $connection = new Redis('redis', 6379); - return new RedisPublisher($connection); + return new RedisPublisher(new Redis('redis', 6379), new Redis('redis', 6379)); } protected function getQueue(): Queue diff --git a/tests/Queue/servers/SwooleRedisCluster/worker.php b/tests/Queue/servers/SwooleRedisCluster/worker.php index a851186..51cfcef 100644 --- a/tests/Queue/servers/SwooleRedisCluster/worker.php +++ b/tests/Queue/servers/SwooleRedisCluster/worker.php @@ -9,12 +9,14 @@ use Utopia\Queue\Server; use Utopia\Validator\Text; +$nodes = [ + 'redis-cluster-0:6379', + 'redis-cluster-1:6379', + 'redis-cluster-2:6379', +]; $consumer = new Redis( - new RedisCluster([ - 'redis-cluster-0:6379', - 'redis-cluster-1:6379', - 'redis-cluster-2:6379', - ]), + receive: new RedisCluster($nodes), + commands: new RedisCluster($nodes), ); $adapter = new Swoole($consumer, 12, 'swoole-redis-cluster'); $server = new Server($adapter); diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index d26fa44..f78b19f 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -9,7 +9,10 @@ use Utopia\Queue\Broker\Redis; use Utopia\Validator\Text; -$consumer = new Redis(new RedisConnection('redis')); +$consumer = new Redis( + receive: new RedisConnection('redis'), + commands: new RedisConnection('redis'), +); $adapter = new Workerman($consumer, 12, 'wokerman'); $server = new Queue\Server($adapter); From d0faa5eb2bbb542b6649f470603234fdf3e0591a Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 13:23:51 +0100 Subject: [PATCH 7/9] Drop setContext; reset context in the loop, lazy per-coroutine on Swoole MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the setContext seam. process() no longer manages context — the base loop assigns a fresh container per message, and the Swoole adapter creates one lazily per coroutine in context() (each message already runs in its own coroutine, so it's naturally isolated). Fewer moving parts; process() is now purely run-handler-then-commit/reject. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Queue/Adapter.php | 8 +------- src/Queue/Adapter/Swoole.php | 10 +++------- tests/Queue/E2E/Adapter/ServerTelemetryTest.php | 1 + 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index d9141e4..9dd1b12 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -45,6 +45,7 @@ public function consume(callable $messageCallback, callable $successCallback, ca continue; } + $this->context = new Container($this->resources()); $this->process($message, $messageCallback, $successCallback, $errorCallback); } } @@ -56,8 +57,6 @@ public function consume(callable $messageCallback, callable $successCallback, ca protected function process(Message $message, callable $messageCallback, callable $successCallback, callable $errorCallback): void { try { - $this->setContext(new Container($this->resources())); - try { $messageCallback($message); $this->consumer->commit($this->queue, $message); @@ -75,11 +74,6 @@ protected function process(Message $message, callable $messageCallback, callable } } - protected function setContext(Container $context): void - { - $this->context = $context; - } - public function resources(): Container { return $this->resources; diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index 668152a..932fb65 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -120,16 +120,12 @@ public function consume(callable $messageCallback, callable $successCallback, ca $waitGroup->wait(); } - protected function setContext(Container $context): void - { - // coroutine-local so concurrent handlers don't share a context - Coroutine::getContext()[self::CONTEXT_KEY] = $context; - } - public function context(): Container { + // Each message runs in its own coroutine, so the container is created + // lazily per coroutine and stays isolated across concurrent handlers. if (Coroutine::getCid() !== -1) { - return Coroutine::getContext()[self::CONTEXT_KEY] ?? $this->resources(); + return Coroutine::getContext()[self::CONTEXT_KEY] ??= new Container($this->resources()); } return $this->resources(); diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index e016197..5fad74f 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -208,6 +208,7 @@ public function stop(): self public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { while (($message = $this->consumer->receive($this->queue, 0)) !== null) { + $this->context = new Container($this->resources()); $this->process($message, $messageCallback, $successCallback, $errorCallback); } } From 42560b0bf49c6f0bf9943226591c713123b396af Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 13:27:05 +0100 Subject: [PATCH 8/9] Flatten Adapter::process() error handling Collapse the nested try/catch into one try with guarded reject/error calls. Same guarantee (process never throws) with less nesting, and $errorCallback now always receives the original handler error rather than a reject error when reject() also fails. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Queue/Adapter.php | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index 9dd1b12..8969b5a 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -51,25 +51,24 @@ public function consume(callable $messageCallback, callable $successCallback, ca } /** - * Never throws: a failing handler, commit, reject, or error callback is all - * routed to $errorCallback so nothing escapes (and is lost) on a coroutine. + * Never throws: a failed handler is rejected and reported to $errorCallback; + * a failing reject or callback is swallowed rather than left to escape (and + * be lost on a coroutine). */ protected function process(Message $message, callable $messageCallback, callable $successCallback, callable $errorCallback): void { try { + $messageCallback($message); + $this->consumer->commit($this->queue, $message); + $successCallback($message); + } catch (\Throwable $error) { try { - $messageCallback($message); - $this->consumer->commit($this->queue, $message); - $successCallback($message); - } catch (\Throwable $error) { $this->consumer->reject($this->queue, $message); - $errorCallback($message, $error); + } catch (\Throwable) { } - } catch (\Throwable $error) { try { $errorCallback($message, $error); } catch (\Throwable) { - // the error callback itself failed; nothing left to do } } } From 6db5c8feb6ed72dce4dbb8081ebf51e5279d9b55 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 4 Jun 2026 13:30:39 +0100 Subject: [PATCH 9/9] Remove dead Commit/NoCommit/Retryable; collapse Pool delegate helpers - Delete Result\Commit, Result\NoCommit and Error\Retryable. They were the AMQP acknowledgement protocol and are unreferenced now that AMQP is gone. - Collapse Pool's delegatePublish/delegateConsumer into a single delegate() that takes the target pool, removing the duplicated wrapper. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Queue/Broker/Pool.php | 34 +++++++++++----------------------- src/Queue/Error/Retryable.php | 10 ---------- src/Queue/Result/Commit.php | 7 ------- src/Queue/Result/NoCommit.php | 7 ------- 4 files changed, 11 insertions(+), 47 deletions(-) delete mode 100644 src/Queue/Error/Retryable.php delete mode 100644 src/Queue/Result/Commit.php delete mode 100644 src/Queue/Result/NoCommit.php diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index df0a765..ab16b24 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -18,32 +18,32 @@ public function __construct( public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { - return $this->delegatePublish(__FUNCTION__, \func_get_args()); + return $this->delegate($this->publisher, __FUNCTION__, \func_get_args()); } public function retry(Queue $queue, ?int $limit = null): void { - $this->delegatePublish(__FUNCTION__, \func_get_args()); + $this->delegate($this->publisher, __FUNCTION__, \func_get_args()); } public function getQueueSize(Queue $queue, bool $failedJobs = false): int { - return $this->delegatePublish(__FUNCTION__, \func_get_args()); + return $this->delegate($this->publisher, __FUNCTION__, \func_get_args()); } public function receive(Queue $queue, int $timeout): ?Message { - return $this->delegateConsumer(__FUNCTION__, \func_get_args()); + return $this->delegate($this->consumer, __FUNCTION__, \func_get_args()); } public function commit(Queue $queue, Message $message): void { - $this->delegateConsumer(__FUNCTION__, \func_get_args()); + $this->delegate($this->consumer, __FUNCTION__, \func_get_args()); } public function reject(Queue $queue, Message $message): void { - $this->delegateConsumer(__FUNCTION__, \func_get_args()); + $this->delegate($this->consumer, __FUNCTION__, \func_get_args()); } public function close(): void @@ -51,23 +51,11 @@ public function close(): void // TODO: Implement closing all connections in the pool } - protected function delegatePublish(string $method, array $args): mixed + /** + * @param array $args + */ + protected function delegate(?UtopiaPool $pool, string $method, array $args): mixed { - return $this->publisher?->use(function (Publisher $adapter) use ( - $method, - $args, - ) { - return $adapter->$method(...$args); - }); - } - - protected function delegateConsumer(string $method, array $args): mixed - { - return $this->consumer?->use(function (Consumer $adapter) use ( - $method, - $args, - ) { - return $adapter->$method(...$args); - }); + return $pool?->use(fn (Publisher|Consumer $adapter) => $adapter->$method(...$args)); } } diff --git a/src/Queue/Error/Retryable.php b/src/Queue/Error/Retryable.php deleted file mode 100644 index 9c7064b..0000000 --- a/src/Queue/Error/Retryable.php +++ /dev/null @@ -1,10 +0,0 @@ -