diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ed5bb80..a725f94 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,10 +15,11 @@ jobs: matrix: adapter: [ - AMQP, + Locking, Pool, SwooleRedisCluster, Swoole, + SwooleConcurrency, Workerman, ] diff --git a/Dockerfile b/Dockerfile index 565cb52..77d8286 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ COPY composer.json /usr/local/src/ RUN composer install --ignore-platform-reqs -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine WORKDIR /usr/local/src/ diff --git a/composer.json b/composer.json index 751c7ab..a31ff05 100644 --- a/composer.json +++ b/composer.json @@ -26,8 +26,7 @@ "lint": "vendor/bin/pint --test" }, "require": { - "php": ">=8.3", - "php-amqplib/php-amqplib": "^3.7", + "php": ">=8.4", "utopia-php/di": "0.3.*", "utopia-php/lock": "0.2.*", "utopia-php/servers": "0.4.*", diff --git a/composer.lock b/composer.lock index 87cfa7d..8f50714 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1d5f6649fb727b85e109128125f7ff9d", + "content-hash": "83028abc9b8135fb855a26a701e630aa", "packages": [ { "name": "brick/math", @@ -741,206 +741,6 @@ }, "time": "2026-01-21T04:14:03+00:00" }, - { - "name": "paragonie/constant_time_encoding", - "version": "v3.1.3", - "source": { - "type": "git", - "url": "https://github.com/paragonie/constant_time_encoding.git", - "reference": "d5b01a39b3415c2cd581d3bd3a3575c1ebbd8e77" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/paragonie/constant_time_encoding/zipball/d5b01a39b3415c2cd581d3bd3a3575c1ebbd8e77", - "reference": "d5b01a39b3415c2cd581d3bd3a3575c1ebbd8e77", - "shasum": "" - }, - "require": { - "php": "^8" - }, - "require-dev": { - "infection/infection": "^0", - "nikic/php-fuzzer": "^0", - "phpunit/phpunit": "^9|^10|^11", - "vimeo/psalm": "^4|^5|^6" - }, - "type": "library", - "autoload": { - "psr-4": { - "ParagonIE\\ConstantTime\\": "src/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Paragon Initiative Enterprises", - "email": "security@paragonie.com", - "homepage": "https://paragonie.com", - "role": "Maintainer" - }, - { - "name": "Steve 'Sc00bz' Thomas", - "email": "steve@tobtu.com", - "homepage": "https://www.tobtu.com", - "role": "Original Developer" - } - ], - "description": "Constant-time Implementations of RFC 4648 Encoding (Base-64, Base-32, Base-16)", - "keywords": [ - "base16", - "base32", - "base32_decode", - "base32_encode", - "base64", - "base64_decode", - "base64_encode", - "bin2hex", - "encoding", - "hex", - "hex2bin", - "rfc4648" - ], - "support": { - "email": "info@paragonie.com", - "issues": "https://github.com/paragonie/constant_time_encoding/issues", - "source": "https://github.com/paragonie/constant_time_encoding" - }, - "time": "2025-09-24T15:06:41+00:00" - }, - { - "name": "paragonie/random_compat", - "version": "v9.99.100", - "source": { - "type": "git", - "url": "https://github.com/paragonie/random_compat.git", - "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/paragonie/random_compat/zipball/996434e5492cb4c3edcb9168db6fbb1359ef965a", - "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a", - "shasum": "" - }, - "require": { - "php": ">= 7" - }, - "require-dev": { - "phpunit/phpunit": "4.*|5.*", - "vimeo/psalm": "^1" - }, - "suggest": { - "ext-libsodium": "Provides a modern crypto API that can be used to generate random bytes." - }, - "type": "library", - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Paragon Initiative Enterprises", - "email": "security@paragonie.com", - "homepage": "https://paragonie.com" - } - ], - "description": "PHP 5.x polyfill for random_bytes() and random_int() from PHP 7", - "keywords": [ - "csprng", - "polyfill", - "pseudorandom", - "random" - ], - "support": { - "email": "info@paragonie.com", - "issues": "https://github.com/paragonie/random_compat/issues", - "source": "https://github.com/paragonie/random_compat" - }, - "time": "2020-10-15T08:29:30+00:00" - }, - { - "name": "php-amqplib/php-amqplib", - "version": "v3.7.4", - "source": { - "type": "git", - "url": "https://github.com/php-amqplib/php-amqplib.git", - "reference": "381b6f7c600e0e0c7463cdd7f7a1a3bc6268e5fd" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/381b6f7c600e0e0c7463cdd7f7a1a3bc6268e5fd", - "reference": "381b6f7c600e0e0c7463cdd7f7a1a3bc6268e5fd", - "shasum": "" - }, - "require": { - "ext-mbstring": "*", - "ext-sockets": "*", - "php": "^7.2||^8.0", - "phpseclib/phpseclib": "^2.0|^3.0" - }, - "conflict": { - "php": "7.4.0 - 7.4.1" - }, - "replace": { - "videlalvaro/php-amqplib": "self.version" - }, - "require-dev": { - "ext-curl": "*", - "nategood/httpful": "^0.2.20", - "phpunit/phpunit": "^7.5|^9.5", - "squizlabs/php_codesniffer": "^3.6" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "3.0-dev" - } - }, - "autoload": { - "psr-4": { - "PhpAmqpLib\\": "PhpAmqpLib/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "LGPL-2.1-or-later" - ], - "authors": [ - { - "name": "Alvaro Videla", - "role": "Original Maintainer" - }, - { - "name": "Raúl Araya", - "email": "nubeiro@gmail.com", - "role": "Maintainer" - }, - { - "name": "Luke Bakken", - "email": "luke@bakken.io", - "role": "Maintainer" - }, - { - "name": "Ramūnas Dronga", - "email": "github@ramuno.lt", - "role": "Maintainer" - } - ], - "description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.", - "homepage": "https://github.com/php-amqplib/php-amqplib/", - "keywords": [ - "message", - "queue", - "rabbitmq" - ], - "support": { - "issues": "https://github.com/php-amqplib/php-amqplib/issues", - "source": "https://github.com/php-amqplib/php-amqplib/tree/v3.7.4" - }, - "time": "2025-11-23T17:00:56+00:00" - }, { "name": "php-http/discovery", "version": "1.20.0", @@ -1020,116 +820,6 @@ }, "time": "2024-10-02T11:20:13+00:00" }, - { - "name": "phpseclib/phpseclib", - "version": "3.0.52", - "source": { - "type": "git", - "url": "https://github.com/phpseclib/phpseclib.git", - "reference": "2adaefc83df2ec548558307690f376dd7d4f4fce" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/2adaefc83df2ec548558307690f376dd7d4f4fce", - "reference": "2adaefc83df2ec548558307690f376dd7d4f4fce", - "shasum": "" - }, - "require": { - "paragonie/constant_time_encoding": "^1|^2|^3", - "paragonie/random_compat": "^1.4|^2.0|^9.99.99", - "php": ">=5.6.1" - }, - "require-dev": { - "phpunit/phpunit": "*" - }, - "suggest": { - "ext-dom": "Install the DOM extension to load XML formatted public keys.", - "ext-gmp": "Install the GMP (GNU Multiple Precision) extension in order to speed up arbitrary precision integer arithmetic operations.", - "ext-libsodium": "SSH2/SFTP can make use of some algorithms provided by the libsodium-php extension.", - "ext-mcrypt": "Install the Mcrypt extension in order to speed up a few other cryptographic operations.", - "ext-openssl": "Install the OpenSSL extension in order to speed up a wide variety of cryptographic operations." - }, - "type": "library", - "autoload": { - "files": [ - "phpseclib/bootstrap.php" - ], - "psr-4": { - "phpseclib3\\": "phpseclib/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Jim Wigginton", - "email": "terrafrost@php.net", - "role": "Lead Developer" - }, - { - "name": "Patrick Monnerat", - "email": "pm@datasphere.ch", - "role": "Developer" - }, - { - "name": "Andreas Fischer", - "email": "bantu@phpbb.com", - "role": "Developer" - }, - { - "name": "Hans-Jürgen Petrich", - "email": "petrich@tronic-media.com", - "role": "Developer" - }, - { - "name": "Graham Campbell", - "email": "graham@alt-three.com", - "role": "Developer" - } - ], - "description": "PHP Secure Communications Library - Pure-PHP implementations of RSA, AES, SSH2, SFTP, X.509 etc.", - "homepage": "http://phpseclib.sourceforge.net", - "keywords": [ - "BigInteger", - "aes", - "asn.1", - "asn1", - "blowfish", - "crypto", - "cryptography", - "encryption", - "rsa", - "security", - "sftp", - "signature", - "signing", - "ssh", - "twofish", - "x.509", - "x509" - ], - "support": { - "issues": "https://github.com/phpseclib/phpseclib/issues", - "source": "https://github.com/phpseclib/phpseclib/tree/3.0.52" - }, - "funding": [ - { - "url": "https://github.com/terrafrost", - "type": "github" - }, - { - "url": "https://www.patreon.com/phpseclib", - "type": "patreon" - }, - { - "url": "https://tidelift.com/funding/github/packagist/phpseclib/phpseclib", - "type": "tidelift" - } - ], - "time": "2026-04-27T07:02:15+00:00" - }, { "name": "psr/container", "version": "2.0.2", @@ -4510,7 +4200,7 @@ "prefer-stable": false, "prefer-lowest": false, "platform": { - "php": ">=8.3" + "php": ">=8.4" }, "platform-dev": { "ext-redis": "*" diff --git a/docker-compose.yml b/docker-compose.yml index 6b230a3..5419ca3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,7 +8,6 @@ services: - ./tests:/usr/src/code/tests depends_on: - swoole - - swoole-amqp - swoole-redis-cluster - workerman @@ -35,18 +34,6 @@ services: redis-cluster-0: condition: service_healthy - swoole-amqp: - container_name: swoole-amqp - build: ./tests/Queue/servers/AMQP/. - command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php - volumes: - - ./vendor:/usr/src/code/vendor - - ./src:/usr/src/code/src - - ./tests:/usr/src/code/tests - depends_on: - amqp: - condition: service_healthy - workerman: container_name: workerman build: ./tests/Queue/servers/Workerman/. @@ -116,14 +103,3 @@ services: test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] start_interval: 1s start_period: 0s - - amqp: - image: rabbitmq:4 - environment: - RABBITMQ_DEFAULT_USER: amqp - RABBITMQ_DEFAULT_PASS: amqp - RABBITMQ_DEFAULT_VHOST: "/" - healthcheck: - test: [ "CMD", "rabbitmqctl", "node_health_check" ] - start_interval: 1s - start_period: 0s diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index d88f191..8969b5a 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -6,8 +6,11 @@ abstract class Adapter { + protected const int RECEIVE_TIMEOUT = 2; + public Queue $queue; protected ?Container $context = null; + protected bool $stopped = false; public function __construct( public Consumer $consumer, @@ -33,22 +36,41 @@ abstract public function stop(): self; public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { - $this->consumer->consume( - $this->queue, - function (Message $message) use ($messageCallback) { - $this->context = new Container($this->resources()); - - return $messageCallback($message); - }, - $successCallback, - function (?Message $message, \Throwable $error) use ($errorCallback) { - if ($message === null) { - $this->context = new Container($this->resources()); - } + $this->stopped = false; + + while (!$this->stopped) { + $message = $this->consumer->receive($this->queue, static::RECEIVE_TIMEOUT); + + if ($message === null) { + continue; + } + + $this->context = new Container($this->resources()); + $this->process($message, $messageCallback, $successCallback, $errorCallback); + } + } + /** + * Never throws: a failed handler is rejected and reported to $errorCallback; + * a failing reject or callback is swallowed rather than left to escape (and + * be lost on a coroutine). + */ + protected function process(Message $message, callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + try { + $messageCallback($message); + $this->consumer->commit($this->queue, $message); + $successCallback($message); + } catch (\Throwable $error) { + try { + $this->consumer->reject($this->queue, $message); + } catch (\Throwable) { + } + try { $errorCallback($message, $error); - }, - ); + } catch (\Throwable) { + } + } } public function resources(): Container diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index aab5623..932fb65 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -4,12 +4,11 @@ use Swoole\Coroutine; use Swoole\Coroutine\Channel; +use Swoole\Coroutine\WaitGroup; use Swoole\Process; use Utopia\DI\Container; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; -use Utopia\Queue\Error\ConsumerFailures; -use Utopia\Queue\Message; class Swoole extends Adapter { @@ -24,12 +23,14 @@ class Swoole extends Adapter /** @var callable[] */ protected array $onWorkerStop = []; + protected int $maxCoroutines; + public function __construct( Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue', - protected int $maxCoroutines = 1, + int $maxCoroutines = 1, Container $resources = new Container(), ) { parent::__construct($consumer, $workerNum, $queue, $namespace, $resources); @@ -63,7 +64,10 @@ protected function spawnWorker(int $workerId): void Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]); Coroutine\run(function () use ($workerId) { - Process::signal(SIGTERM, fn () => $this->consumer->close()); + Process::signal(SIGTERM, function () { + $this->stopped = true; + $this->consumer->close(); + }); foreach ($this->onWorkerStart as $callback) { $callback((string)$workerId); @@ -79,60 +83,49 @@ protected function spawnWorker(int $workerId): void $this->workers[$pid] = $process; } + /** + * Receive on one loop, process each message on its own coroutine. The + * channel caps concurrency at $maxCoroutines: push() blocks the loop while + * the pool is full. + */ public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void { - $messageCallback = function (Message $message) use ($messageCallback) { - Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); + $this->stopped = false; + $slots = new Channel($this->maxCoroutines); + $waitGroup = new WaitGroup(); - return $messageCallback($message); - }; + while (!$this->stopped) { + $message = $this->consumer->receive($this->queue, static::RECEIVE_TIMEOUT); - $errorCallback = function (?Message $message, \Throwable $error) use ($errorCallback) { if ($message === null) { - Coroutine::getContext()[self::CONTEXT_KEY] = new Container($this->resources()); + continue; } - $errorCallback($message, $error); - }; - - $channel = new Channel($this->maxCoroutines); - $errors = []; + $slots->push(true); + $waitGroup->add(); - for ($i = 0; $i < $this->maxCoroutines; $i++) { - Coroutine::create(function () use ($messageCallback, $successCallback, $errorCallback, $channel, &$errors) { + Coroutine::create(function () use ($message, $messageCallback, $successCallback, $errorCallback, $slots, $waitGroup) { try { - $this->consumer->consume( - $this->queue, - $messageCallback, - $successCallback, - $errorCallback, - ); + $this->process($message, $messageCallback, $successCallback, $errorCallback); } catch (\Throwable $error) { - $errors[] = $error; - $this->consumer->close(); - $channel->push(true); - return; + // process() is total; net for a stray throw so it isn't lost + \error_log('Uncaught error while processing queue message: ' . $error->getMessage()); + } finally { + $waitGroup->done(); + $slots->pop(); } - - $channel->push(true); }); } - for ($i = 0; $i < $this->maxCoroutines; $i++) { - $channel->pop(); - } - - $channel->close(); - - if ($errors !== []) { - throw new ConsumerFailures($errors); - } + $waitGroup->wait(); } public function context(): Container { + // Each message runs in its own coroutine, so the container is created + // lazily per coroutine and stays isolated across concurrent handlers. if (Coroutine::getCid() !== -1) { - return Coroutine::getContext()[self::CONTEXT_KEY] ?? $this->resources(); + return Coroutine::getContext()[self::CONTEXT_KEY] ??= new Container($this->resources()); } return $this->resources(); @@ -147,9 +140,12 @@ protected function reap(): void public function stop(): self { + $this->stopped = true; + foreach ($this->workers as $pid => $process) { Process::kill($pid, SIGTERM); } + return $this; } diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php deleted file mode 100644 index 3ec2964..0000000 --- a/src/Queue/Broker/AMQP.php +++ /dev/null @@ -1,226 +0,0 @@ -exchangeArguments[$key] = $value; - } - - public function setQueueArgument(string $key, string $value): void - { - $this->queueArguments[$key] = $value; - } - - public function setConsumerArguments(string $key, string $value): void - { - $this->consumerArguments[$key] = $value; - } - - public function configureConnection(callable $callback): void - { - $this->connectionConfigHook = $callback; - } - - public function configureChannel(callable $callback): void - { - $this->channelConfigHook = $callback; - } - - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void - { - $processMessage = function (AMQPMessage $amqpMessage) use ($messageCallback, $successCallback, $errorCallback) { - try { - $nextMessage = json_decode($amqpMessage->getBody(), associative: true) ?? false; - if (!$nextMessage) { - $amqpMessage->nack(); - return; - } - - $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; - $message = new Message($nextMessage); - - $result = $messageCallback($message); - match (true) { - $result instanceof Commit => $amqpMessage->ack(true), - $result instanceof NoCommit => null, - default => $amqpMessage->ack() - }; - $successCallback($message); - } catch (Retryable $e) { - $amqpMessage->nack(requeue: true); - $errorCallback($message ?? null, $e); - } catch (\Throwable $th) { - $amqpMessage->nack(); - $errorCallback($message ?? null, $th); - } - }; - - $this->withChannel(function (AMQPChannel $channel) use ($queue, $processMessage) { - // It's good practice for the consumer to set up exchange and queues. - // This approach uses TOPICs (https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) and - // dead-letter-exchanges (https://www.rabbitmq.com/docs/dlx) for failed messages. - - // 1. Declare the exchange and a dead-letter-exchange. - $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - - // 2. Declare the working queue and configure the DLX for receiving rejected messages. - $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"]))); - $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); - - // 3. Declare the dead-letter-queue and bind it to the DLX. - $channel->queue_declare("{$queue->name}.failed", durable: true, auto_delete: false, arguments: new AMQPTable($this->queueArguments)); - $channel->queue_bind("{$queue->name}.failed", "{$queue->namespace}.failed", routing_key: $queue->name); - - // 4. Instruct to consume on the working queue. - $channel->basic_consume($queue->name, callback: $processMessage, arguments: new AMQPTable($this->consumerArguments)); - }); - - // Run ->consume in own callback to avoid re-running queue creation flow on error. - $this->withChannel(function (AMQPChannel $channel) { - // 5. Consume. This blocks until the connection gets closed. - $channel->consume(); - }); - } - - public function close(): void - { - $this->channel?->stopConsume(); - $this->channel?->getConnection()?->close(); - } - - public function enqueue(Queue $queue, array $payload, bool $priority = false): bool - { - $payload = [ - 'pid' => \uniqid(more_entropy: true), - 'queue' => $queue->name, - 'timestamp' => time(), - 'payload' => $payload - ]; - $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); - $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { - $channel->basic_publish($message, $queue->namespace, routing_key: $queue->name); - }); - return true; - } - - public function retry(Queue $queue, ?int $limit = null): void - { - // This is a no-op for AMQP - } - - /** - * @throws \Exception - */ - public function getQueueSize(Queue $queue, bool $failedJobs = false): int - { - $queueName = $queue->name; - if ($failedJobs) { - $queueName = $queueName . '.failed'; - } - - $messageCount = 0; - $this->withChannel(function (AMQPChannel $channel) use ($queueName, &$messageCount) { - try { - [, $messageCount] = $channel->queue_declare($queueName, passive: true); - } catch (AMQPProtocolChannelException $e) { - $this->channel = null; - if ($e->getCode() === 404) { - return; - } - - throw $e; - } - }); - - return $messageCount; - } - - /** - * @param callable(AMQPChannel $channel): void $callback - * @throws \Exception - */ - private function withChannel(callable $callback): void - { - $createChannel = function (): AMQPChannel { - $connection = new AMQPStreamConnection( - $this->host, - $this->port, - $this->user, - $this->password, - $this->vhost, - connection_timeout: $this->connectTimeout, - read_write_timeout: $this->readWriteTimeout, - heartbeat: $this->heartbeat, - ); - if (is_callable($this->connectionConfigHook)) { - call_user_func($this->connectionConfigHook, $connection); - } - $channel = $connection->channel(); - if (is_callable($this->channelConfigHook)) { - call_user_func($this->channelConfigHook, $channel); - } - return $channel; - }; - - if (!$this->channel) { - $this->channel = $createChannel(); - } - - try { - $callback($this->channel); - } catch (\Throwable) { - // createChannel() might throw, in that case set the channel to `null` first. - $this->channel = null; - // try creating a new connection once, if this still fails, throw the error - $this->channel = $createChannel(); - $callback($this->channel); - } - } -} diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index 5fcdcc7..ab16b24 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -3,6 +3,7 @@ namespace Utopia\Queue\Broker; use Utopia\Queue\Consumer; +use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; use Utopia\Pools\Pool as UtopiaPool; @@ -17,50 +18,44 @@ public function __construct( public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { - return $this->delegatePublish(__FUNCTION__, \func_get_args()); + return $this->delegate($this->publisher, __FUNCTION__, \func_get_args()); } public function retry(Queue $queue, ?int $limit = null): void { - $this->delegatePublish(__FUNCTION__, \func_get_args()); + $this->delegate($this->publisher, __FUNCTION__, \func_get_args()); } public function getQueueSize(Queue $queue, bool $failedJobs = false): int { - return $this->delegatePublish(__FUNCTION__, \func_get_args()); + return $this->delegate($this->publisher, __FUNCTION__, \func_get_args()); } - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback, - ): void { - $this->delegateConsumer(__FUNCTION__, \func_get_args()); + public function receive(Queue $queue, int $timeout): ?Message + { + return $this->delegate($this->consumer, __FUNCTION__, \func_get_args()); } - public function close(): void + public function commit(Queue $queue, Message $message): void { - // TODO: Implement closing all connections in the pool + $this->delegate($this->consumer, __FUNCTION__, \func_get_args()); + } + + public function reject(Queue $queue, Message $message): void + { + $this->delegate($this->consumer, __FUNCTION__, \func_get_args()); } - protected function delegatePublish(string $method, array $args): mixed + public function close(): void { - return $this->publisher?->use(function (Publisher $adapter) use ( - $method, - $args, - ) { - return $adapter->$method(...$args); - }); + // TODO: Implement closing all connections in the pool } - protected function delegateConsumer(string $method, array $args): mixed + /** + * @param array $args + */ + protected function delegate(?UtopiaPool $pool, string $method, array $args): mixed { - return $this->consumer?->use(function (Consumer $adapter) use ( - $method, - $args, - ) { - return $adapter->$method(...$args); - }); + return $pool?->use(fn (Publisher|Consumer $adapter) => $adapter->$method(...$args)); } } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 161e687..47b0023 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -15,6 +15,8 @@ class Redis implements Publisher, Consumer private const int RECONNECT_MAX_BACKOFF_MS = 5_000; private bool $closed = false; + private int $reconnectAttempt = 0; + private int $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; /** * @var (callable(Queue, \Throwable, int, int): void)|null */ @@ -24,8 +26,12 @@ class Redis implements Publisher, Consumer */ private $reconnectSuccessCallback = null; - public function __construct(private readonly Connection $connection) - { + public function __construct( + // Blocking receive loop + claim writes (single caller). + private readonly Connection $receive, + // Acks and publishing; wrap in Locking when shared by coroutines. + private readonly Connection $commands, + ) { } public function setReconnectCallback(?callable $callback): self @@ -42,107 +48,77 @@ public function setReconnectSuccessCallback(?callable $callback): self return $this; } - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void + public function receive(Queue $queue, int $timeout): ?Message { - $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; - $reconnectAttempt = 0; + if ($this->closed) { + return null; + } + + try { + $nextMessage = $this->receive->rightPopArray("{$queue->namespace}.queue.{$queue->name}", $timeout); + if ($this->reconnectAttempt > 0) { + $this->triggerReconnectSuccessCallback($queue, $this->reconnectAttempt); + } + + $this->reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; + $this->reconnectAttempt = 0; + } catch (\RedisException|\RedisClusterException $e) { + if ($this->closed) { + return null; + } + + $this->reconnectAttempt++; - while (!$this->closed) { - /** - * Waiting for next Job. - */ try { - $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); - if ($reconnectAttempt > 0) { - $this->triggerReconnectSuccessCallback($queue, $reconnectAttempt); - } + $this->receive->close(); + } catch (\Throwable) { + } - $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; - $reconnectAttempt = 0; - } catch (\RedisException|\RedisClusterException $e) { - if ($this->closed) { - break; - } + $sleepMs = \mt_rand(0, $this->reconnectBackoffMs); + $this->triggerReconnectCallback($queue, $e, $this->reconnectAttempt, $sleepMs); - $reconnectAttempt++; + \usleep($sleepMs * 1000); + $this->reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $this->reconnectBackoffMs * 2); - try { - $this->connection->close(); - } catch (\Throwable) { - } + return null; + } - $sleepMs = \mt_rand(0, $reconnectBackoffMs); - $this->triggerReconnectCallback($queue, $e, $reconnectAttempt, $sleepMs); + if (!$nextMessage) { + return null; + } - \usleep($sleepMs * 1000); - $reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2); + $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; - continue; - } + $message = new Message($nextMessage); + $pid = $message->getPid(); - if (!$nextMessage) { - continue; - } + // Claim: store the job, mark it processing, bump received stats. + $this->receive->setArray("{$queue->namespace}.jobs.{$queue->name}.{$pid}", $nextMessage, $queue->jobTtl); + $this->receive->leftPush("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->receive->increment("{$queue->namespace}.stats.{$queue->name}.total"); + $this->receive->increment("{$queue->namespace}.stats.{$queue->name}.processing"); - $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; + return $message; + } - $message = new Message($nextMessage); + public function commit(Queue $queue, Message $message): void + { + $pid = $message->getPid(); - /** - * Move Job to Jobs and it's PID to the processing list. - */ - $this->connection->setArray("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}", $nextMessage, $queue->jobTtl); - $this->connection->leftPush("{$queue->namespace}.processing.{$queue->name}", $message->getPid()); + $this->commands->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + $this->commands->increment("{$queue->namespace}.stats.{$queue->name}.success"); + $this->commands->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->commands->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); + } - /** - * Increment Total Jobs Received from Stats. - */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.total"); + public function reject(Queue $queue, Message $message): void + { + $pid = $message->getPid(); - try { - /** - * Increment Processing Jobs from Stats. - */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.processing"); - - $messageCallback($message); - - /** - * Remove Jobs if successful. - */ - $this->connection->remove("{$queue->namespace}.jobs.{$queue->name}.{$message->getPid()}"); - - /** - * Increment Successful Jobs from Stats. - */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.success"); - - - $successCallback($message); - } catch (\Throwable $th) { - /** - * Move failed Job to Failed list. - */ - $this->connection->leftPush("{$queue->namespace}.failed.{$queue->name}", $message->getPid()); - - /** - * Increment Failed Jobs from Stats. - */ - $this->connection->increment("{$queue->namespace}.stats.{$queue->name}.failed"); - - $errorCallback($message, $th); - } finally { - /** - * Remove Job from Processing. - */ - $this->connection->listRemove("{$queue->namespace}.processing.{$queue->name}", $message->getPid()); - - /** - * Decrease Processing Jobs from Stats. - */ - $this->connection->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); - } - } + $this->commands->leftPush("{$queue->namespace}.failed.{$queue->name}", $pid); + $this->commands->increment("{$queue->namespace}.stats.{$queue->name}.failed"); + $this->commands->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); + $this->commands->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); } public function close(): void @@ -183,9 +159,9 @@ public function enqueue(Queue $queue, array $payload, bool $priority = false): b 'payload' => $payload ]; if ($priority) { - return $this->connection->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + return $this->commands->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } - return $this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + return $this->commands->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } /** @@ -198,7 +174,7 @@ public function retry(Queue $queue, ?int $limit = null): void $processed = 0; while (true) { - $pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT); + $pid = $this->commands->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT); // No more jobs to retry if ($pid === false) { @@ -229,14 +205,16 @@ public function retry(Queue $queue, ?int $limit = null): void private function getJob(Queue $queue, string $pid): Message|false { - $value = $this->connection->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); + $value = $this->commands->get("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); - if ($value === false) { + // Missing/expired jobs return false or null depending on the driver. + if (!\is_string($value)) { return false; } $job = json_decode($value, true); - return new Message($job); + + return \is_array($job) ? new Message($job) : false; } public function getQueueSize(Queue $queue, bool $failedJobs = false): int @@ -245,6 +223,6 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int if ($failedJobs) { $queueName = "{$queue->namespace}.failed.{$queue->name}"; } - return $this->connection->listSize($queueName); + return $this->commands->listSize($queueName); } } diff --git a/src/Queue/Consumer.php b/src/Queue/Consumer.php index c71ce6e..b5f332c 100644 --- a/src/Queue/Consumer.php +++ b/src/Queue/Consumer.php @@ -2,27 +2,17 @@ namespace Utopia\Queue; -use Utopia\Queue\Result\Commit; -use Utopia\Queue\Result\NoCommit; - interface Consumer { - /** - * @param Queue $queue - * @param callable(Message $message): (Commit|NoCommit|mixed) $messageCallback - * @param callable(Message $message): void $successCallback - * @param callable(Message $message, \Throwable $th): void $errorCallback - * @return void - */ - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback - ): void; + /** Block up to $timeout seconds for the next message and claim it, or null on timeout. */ + public function receive(Queue $queue, int $timeout): ?Message; + + /** Acknowledge a processed message. */ + public function commit(Queue $queue, Message $message): void; + + /** Mark a message as failed. */ + public function reject(Queue $queue, Message $message): void; - /** - * Closes the consumer and frees any underlying resources. - */ + /** Close the consumer and free resources. */ public function close(): void; } diff --git a/src/Queue/Error/ConsumerFailures.php b/src/Queue/Error/ConsumerFailures.php deleted file mode 100644 index 9523e87..0000000 --- a/src/Queue/Error/ConsumerFailures.php +++ /dev/null @@ -1,25 +0,0 @@ -errors; - } -} diff --git a/src/Queue/Error/Retryable.php b/src/Queue/Error/Retryable.php deleted file mode 100644 index 9c7064b..0000000 --- a/src/Queue/Error/Retryable.php +++ /dev/null @@ -1,10 +0,0 @@ -> */ + private array $lists = []; + + /** @var array */ + private array $values = []; + + /** @var array */ + private array $counters = []; + + public function rightPushArray(string $queue, array $payload): bool + { + $this->lists[$queue][] = $payload; + + return true; + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + $value = $this->pop($queue, fromTail: true); + + return \is_array($value) ? $value : false; + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + $value = $this->rightPopArray($queue, $timeout); + if (\is_array($value)) { + $this->lists[$destination] ??= []; + \array_unshift($this->lists[$destination], $value); + } + + return $value; + } + + public function leftPushArray(string $queue, array $payload): bool + { + $this->lists[$queue] ??= []; + \array_unshift($this->lists[$queue], $payload); + + return true; + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + $value = $this->pop($queue, fromTail: false); + + return \is_array($value) ? $value : false; + } + + public function rightPush(string $queue, string $payload): bool + { + $this->lists[$queue][] = $payload; + + return true; + } + + public function rightPop(string $queue, int $timeout): string|false + { + $value = $this->pop($queue, fromTail: true); + + return \is_string($value) ? $value : false; + } + + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + $value = $this->rightPop($queue, $timeout); + if (\is_string($value)) { + $this->lists[$destination] ??= []; + \array_unshift($this->lists[$destination], $value); + } + + return $value; + } + + public function leftPush(string $queue, string $payload): bool + { + $this->lists[$queue] ??= []; + \array_unshift($this->lists[$queue], $payload); + + return true; + } + + public function leftPop(string $queue, int $timeout): string|false + { + $value = $this->pop($queue, fromTail: false); + + return \is_string($value) ? $value : false; + } + + public function listRemove(string $queue, string $key): bool + { + $list = $this->lists[$queue] ?? []; + $index = \array_search($key, $list, true); + if ($index === false) { + return false; + } + + unset($list[$index]); + $this->lists[$queue] = \array_values($list); + + return true; + } + + public function listSize(string $key): int + { + return \count($this->lists[$key] ?? []); + } + + public function listRange(string $key, int $total, int $offset): array + { + return \array_slice($this->lists[$key] ?? [], $offset, $total); + } + + public function remove(string $key): bool + { + unset($this->values[$key]); + + return true; + } + + public function move(string $queue, string $destination): bool + { + return true; + } + + public function set(string $key, string $value, int $ttl = 0): bool + { + $this->values[$key] = $value; + + return true; + } + + public function get(string $key): array|string|null + { + return $this->values[$key] ?? null; + } + + public function setArray(string $key, array $value, int $ttl = 0): bool + { + $this->values[$key] = $value; + + return true; + } + + public function increment(string $key): int + { + return $this->counters[$key] = ($this->counters[$key] ?? 0) + 1; + } + + public function decrement(string $key): int + { + return $this->counters[$key] = ($this->counters[$key] ?? 0) - 1; + } + + public function ping(): bool + { + return true; + } + + public function close(): void + { + } + + /** Pop from either end, yielding when empty so the receive loop doesn't spin. */ + private function pop(string $queue, bool $fromTail): mixed + { + if (empty($this->lists[$queue])) { + if (Coroutine::getCid() !== -1) { + Coroutine::sleep(0.005); + } + + return null; + } + + return $fromTail ? \array_pop($this->lists[$queue]) : \array_shift($this->lists[$queue]); + } +} diff --git a/tests/Queue/E2E/Adapter/PoolTest.php b/tests/Queue/E2E/Adapter/PoolTest.php index 3c46be0..44b12c7 100644 --- a/tests/Queue/E2E/Adapter/PoolTest.php +++ b/tests/Queue/E2E/Adapter/PoolTest.php @@ -15,7 +15,7 @@ class PoolTest extends Base protected function getPublisher(): Publisher { $pool = new UtopiaPool(new Stack(), 'redis', 1, function () { - return new RedisBroker(new Redis('redis', 6379)); + return new RedisBroker(new Redis('redis', 6379), new Redis('redis', 6379)); }); return new Pool($pool, $pool); diff --git a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php index 481ab9e..5757e66 100644 --- a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php +++ b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php @@ -13,7 +13,7 @@ public function testReconnectCallbackReceivesAttemptContext(): void { $queue = new Queue('reconnect-callback'); $connection = new FailingRedisConnection(); - $broker = new RedisBroker($connection); + $broker = new RedisBroker($connection, $connection); $calls = []; $broker->setReconnectCallback(function (Queue $queue, \Throwable $error, int $attempt, int $sleepMs) use (&$calls, $broker): void { @@ -27,12 +27,11 @@ public function testReconnectCallbackReceivesAttemptContext(): void $broker->close(); }); - $broker->consume( - $queue, - fn () => null, - fn () => null, - fn () => null, - ); + // A failed pop reconnects and returns null; the callback then closes + // the broker, so the remaining calls are no-ops. + for ($i = 0; $i < 3; $i++) { + $broker->receive($queue, 1); + } $this->assertSame(1, $connection->popAttempts); $this->assertCount(1, $calls); @@ -48,7 +47,7 @@ public function testReconnectSuccessCallbackReceivesAttemptCount(): void { $queue = new Queue('reconnect-success-callback'); $connection = new RecoveringRedisConnection(); - $broker = new RedisBroker($connection); + $broker = new RedisBroker($connection, $connection); $calls = []; $broker->setReconnectCallback(fn () => null); @@ -61,12 +60,11 @@ public function testReconnectSuccessCallbackReceivesAttemptCount(): void $broker->close(); }); - $broker->consume( - $queue, - fn () => null, - fn () => null, - fn () => null, - ); + // First receive() fails and reconnects; the second succeeds (empty pop) + // and fires the success callback, which closes the broker. + for ($i = 0; $i < 3; $i++) { + $broker->receive($queue, 1); + } $this->assertSame(2, $connection->popAttempts); $this->assertCount(1, $calls); diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index 7607d0a..5fad74f 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -204,6 +204,15 @@ public function stop(): self return $this; } + /** Drain every message the consumer offers, then return (bounded for tests). */ + public function consume(callable $messageCallback, callable $successCallback, callable $errorCallback): void + { + while (($message = $this->consumer->receive($this->queue, 0)) !== null) { + $this->context = new Container($this->resources()); + $this->process($message, $messageCallback, $successCallback, $errorCallback); + } + } + public function workerStart(callable $callback): self { $this->onWorkerStart[] = $callback; @@ -219,21 +228,30 @@ public function workerStop(callable $callback): self class ServerTelemetryConsumer implements Consumer { - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback - ): void { - $message = new Message([ + private bool $delivered = false; + + public function receive(Queue $queue, int $timeout): ?Message + { + if ($this->delivered) { + return null; + } + + $this->delivered = true; + + return new Message([ 'pid' => 'test-pid', 'queue' => $queue->name, 'timestamp' => time() - 1, 'payload' => [], ]); + } - $messageCallback($message); - $successCallback($message); + public function commit(Queue $queue, Message $message): void + { + } + + public function reject(Queue $queue, Message $message): void + { } public function close(): void @@ -250,16 +268,19 @@ public function __construct(private array $messages) { } - public function consume( - Queue $queue, - callable $messageCallback, - callable $successCallback, - callable $errorCallback - ): void { - foreach ($this->messages as $message) { - $messageCallback($message); - $successCallback($message); - } + public function receive(Queue $queue, int $timeout): ?Message + { + $message = array_shift($this->messages); + + return $message instanceof Message ? $message : null; + } + + public function commit(Queue $queue, Message $message): void + { + } + + public function reject(Queue $queue, Message $message): void + { } public function close(): void diff --git a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php index d82c514..30a7d9b 100644 --- a/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php +++ b/tests/Queue/E2E/Adapter/SwooleConcurrencyTest.php @@ -4,91 +4,69 @@ use PHPUnit\Framework\TestCase; use Utopia\Queue\Adapter\Swoole; -use Utopia\Queue\Consumer; -use Utopia\Queue\Error\ConsumerFailures; +use Utopia\Queue\Broker\Redis; use Utopia\Queue\Queue; class SwooleConcurrencyTest extends TestCase { - public function testMaxCoroutinesConsumeInParallel(): void - { - $consumer = new ConcurrentConsumer(); - - \Swoole\Coroutine\run(function () use ($consumer) { - $adapter = new Swoole($consumer, 1, 'swoole-concurrency', maxCoroutines: 3); - $adapter->consume(fn () => null, fn () => null, fn () => null); - }); + private const string QUEUE = 'concurrency'; + private const string NAMESPACE = 'tests'; - $this->assertSame(3, $consumer->consumeCalls); - $this->assertSame(3, $consumer->maxActive); - } - - public function testPreservesAllCoroutineConsumerErrors(): void + public function testProcessesUpToMaxCoroutinesAtOnce(): void { - $consumer = new FailingConcurrentConsumer(); - $failure = null; - - \Swoole\Coroutine\run(function () use ($consumer, &$failure) { - $adapter = new Swoole($consumer, 1, 'swoole-concurrency', maxCoroutines: 3); + [$processed, $maxActive] = $this->runWorker(messages: 9, maxCoroutines: 3); - try { - $adapter->consume(fn () => null, fn () => null, fn () => null); - } catch (ConsumerFailures $error) { - $failure = $error; - } - }); - - $this->assertInstanceOf(ConsumerFailures::class, $failure); - $this->assertSame(3, $consumer->consumeCalls); - $this->assertSame(3, $consumer->closed); - $this->assertCount(3, $failure->getErrors()); - $messages = \array_map(fn (\Throwable $error) => $error->getMessage(), $failure->getErrors()); - \sort($messages); - - $this->assertSame(['consumer 1 failed', 'consumer 2 failed', 'consumer 3 failed'], $messages); - $this->assertSame($failure->getErrors()[0], $failure->getPrevious()); + $this->assertSame(9, $processed); + $this->assertSame(3, $maxActive, 'concurrency is bounded by maxCoroutines'); } -} -final class ConcurrentConsumer implements Consumer -{ - public int $active = 0; - public int $consumeCalls = 0; - public int $maxActive = 0; - - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void + public function testOneCoroutineNeverOverlaps(): void { - $this->consumeCalls++; - $this->active++; - $this->maxActive = \max($this->maxActive, $this->active); - - \Swoole\Coroutine::sleep(0.05); + [$processed, $maxActive] = $this->runWorker(messages: 5, maxCoroutines: 1); - $this->active--; + $this->assertSame(5, $processed); + $this->assertSame(1, $maxActive); } - public function close(): void + /** + * Run the consume loop until $messages are processed; return the count and + * the peak concurrency observed. + * + * @return array{0: int, 1: int} [processed, maxActive] + */ + private function runWorker(int $messages, int $maxCoroutines): array { - } -} + $connection = new InMemoryConnection(); + $broker = new Redis($connection, $connection); + $queue = new Queue(self::QUEUE, self::NAMESPACE); -final class FailingConcurrentConsumer implements Consumer -{ - public int $closed = 0; - public int $consumeCalls = 0; + $active = 0; + $maxActive = 0; + $processed = 0; - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void - { - $this->consumeCalls++; - $id = $this->consumeCalls; - - \Swoole\Coroutine::sleep(0.05); + \Swoole\Coroutine\run(function () use ($broker, $queue, $messages, $maxCoroutines, &$active, &$maxActive, &$processed) { + for ($i = 0; $i < $messages; $i++) { + $broker->enqueue($queue, ['n' => $i]); + } - throw new \RuntimeException("consumer {$id} failed"); - } + $adapter = new Swoole($broker, 1, self::QUEUE, self::NAMESPACE, maxCoroutines: $maxCoroutines); + + $adapter->consume( + function () use ($adapter, $messages, &$active, &$maxActive, &$processed) { + $active++; + $maxActive = \max($maxActive, $active); + \Swoole\Coroutine::sleep(0.02); + $active--; + + if (++$processed === $messages) { + $adapter->stop(); + } + }, + fn () => null, + fn () => null, + ); + }); - public function close(): void - { - $this->closed++; + return [$processed, $maxActive]; } } diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php index d7a6e2e..d31fe7d 100644 --- a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -20,7 +20,7 @@ private function getConnection(): RedisCluster protected function getPublisher(): Publisher { - return new Redis($this->getConnection()); + return new Redis($this->getConnection(), $this->getConnection()); } protected function getQueue(): Queue @@ -31,7 +31,12 @@ protected function getQueue(): Queue public function testPriorityJobIsConsumedBeforeNormalJobs(): void { $connection = $this->getConnection(); - $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Priority ordering is purely a property of enqueue, so use a queue no + // worker consumes — otherwise the live consumer drains these jobs before + // we can observe their order. + $queue = new Queue($this->getQueue()->name . '-priority', $this->getQueue()->namespace); + $key = "{$queue->namespace}.queue.{$queue->name}"; // Flush any leftover state from previous runs. while ($connection->rightPopArray($key, 1) !== false) { @@ -39,12 +44,12 @@ public function testPriorityJobIsConsumedBeforeNormalJobs(): void } // Enqueue three normal jobs (pushed to head/left). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-1']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-2']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-3']); // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + $this->getPublisher()->enqueue($queue, ['order' => 'priority'], priority: true); // The first pop should yield the priority job. $first = $connection->rightPopArray($key, 1); diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php index 641b1c3..6262ee7 100644 --- a/tests/Queue/E2E/Adapter/SwooleTest.php +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -16,7 +16,7 @@ private function getConnection(): Redis protected function getPublisher(): Publisher { - return new RedisBroker($this->getConnection()); + return new RedisBroker($this->getConnection(), $this->getConnection()); } protected function getQueue(): Queue @@ -27,7 +27,12 @@ protected function getQueue(): Queue public function testPriorityJobIsConsumedBeforeNormalJobs(): void { $connection = $this->getConnection(); - $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Priority ordering is purely a property of enqueue, so use a queue no + // worker consumes — otherwise the live consumer drains these jobs before + // we can observe their order. + $queue = new Queue($this->getQueue()->name . '-priority', $this->getQueue()->namespace); + $key = "{$queue->namespace}.queue.{$queue->name}"; // Flush any leftover state from previous runs. while ($connection->rightPopArray($key, 1) !== false) { @@ -35,12 +40,12 @@ public function testPriorityJobIsConsumedBeforeNormalJobs(): void } // Enqueue three normal jobs (pushed to head/left). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-1']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-2']); + $this->getPublisher()->enqueue($queue, ['order' => 'normal-3']); // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). - $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + $this->getPublisher()->enqueue($queue, ['order' => 'priority'], priority: true); // The first pop should yield the priority job. $first = $connection->rightPopArray($key, 1); diff --git a/tests/Queue/E2E/Adapter/WorkermanTest.php b/tests/Queue/E2E/Adapter/WorkermanTest.php index a9c7899..8b6a7ef 100644 --- a/tests/Queue/E2E/Adapter/WorkermanTest.php +++ b/tests/Queue/E2E/Adapter/WorkermanTest.php @@ -11,8 +11,7 @@ class WorkermanTest extends Base { protected function getPublisher(): Publisher { - $connection = new Redis('redis', 6379); - return new RedisPublisher($connection); + return new RedisPublisher(new Redis('redis', 6379), new Redis('redis', 6379)); } protected function getQueue(): Queue diff --git a/tests/Queue/servers/AMQP/Dockerfile b/tests/Queue/servers/AMQP/Dockerfile deleted file mode 100644 index 65460fa..0000000 --- a/tests/Queue/servers/AMQP/Dockerfile +++ /dev/null @@ -1,3 +0,0 @@ -FROM phpswoole/swoole:php8.3-alpine - -RUN apk add autoconf build-base diff --git a/tests/Queue/servers/AMQP/worker.php b/tests/Queue/servers/AMQP/worker.php deleted file mode 100644 index 2f20540..0000000 --- a/tests/Queue/servers/AMQP/worker.php +++ /dev/null @@ -1,42 +0,0 @@ -job() - ->inject('message') - ->param( - key: 'aliasValue', - default: '', - validator: new Text(length: 255, min: 0), - description: 'alias resolution test value', - optional: true, - aliases: ['alias_value', 'aliased'], - ) - ->action(handleRequest(...)); - -$server - ->error() - ->inject('error') - ->action(function ($th) { - echo $th->getMessage() . PHP_EOL; - }); - -$server->workerStart()->action(function () { - echo 'Worker Started' . PHP_EOL; -}); - -$server->workerStop()->action(function () { - echo 'Worker Stopped' . PHP_EOL; -}); - -$server->start(); diff --git a/tests/Queue/servers/Swoole/Dockerfile b/tests/Queue/servers/Swoole/Dockerfile index eb30cec..b7ebd91 100644 --- a/tests/Queue/servers/Swoole/Dockerfile +++ b/tests/Queue/servers/Swoole/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 01f3ab9..e0ec82d 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -5,12 +5,17 @@ use Utopia\Queue\Server; use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Connection\Locking; use Utopia\Queue\Connection\Redis as RedisConnection; use Utopia\Queue\Broker\Redis; use Utopia\Validator\Text; -$consumer = new Redis(new RedisConnection('redis')); -$adapter = new Swoole($consumer, 12, 'swoole'); +// Dedicated blocking-receive connection; a separate locked connection for commands. +$consumer = new Redis( + receive: new RedisConnection('redis'), + commands: new Locking(new RedisConnection('redis')), +); +$adapter = new Swoole($consumer, 12, 'swoole', maxCoroutines: 5); $server = new Server($adapter); $server->job() diff --git a/tests/Queue/servers/SwooleRedisCluster/Dockerfile b/tests/Queue/servers/SwooleRedisCluster/Dockerfile index eb30cec..b7ebd91 100644 --- a/tests/Queue/servers/SwooleRedisCluster/Dockerfile +++ b/tests/Queue/servers/SwooleRedisCluster/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/SwooleRedisCluster/worker.php b/tests/Queue/servers/SwooleRedisCluster/worker.php index a851186..51cfcef 100644 --- a/tests/Queue/servers/SwooleRedisCluster/worker.php +++ b/tests/Queue/servers/SwooleRedisCluster/worker.php @@ -9,12 +9,14 @@ use Utopia\Queue\Server; use Utopia\Validator\Text; +$nodes = [ + 'redis-cluster-0:6379', + 'redis-cluster-1:6379', + 'redis-cluster-2:6379', +]; $consumer = new Redis( - new RedisCluster([ - 'redis-cluster-0:6379', - 'redis-cluster-1:6379', - 'redis-cluster-2:6379', - ]), + receive: new RedisCluster($nodes), + commands: new RedisCluster($nodes), ); $adapter = new Swoole($consumer, 12, 'swoole-redis-cluster'); $server = new Server($adapter); diff --git a/tests/Queue/servers/Workerman/Dockerfile b/tests/Queue/servers/Workerman/Dockerfile index 1704dd1..84dbd1e 100644 --- a/tests/Queue/servers/Workerman/Dockerfile +++ b/tests/Queue/servers/Workerman/Dockerfile @@ -1,4 +1,4 @@ -FROM phpswoole/swoole:php8.3-alpine +FROM phpswoole/swoole:php8.4-alpine RUN apk add autoconf build-base diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index d26fa44..f78b19f 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -9,7 +9,10 @@ use Utopia\Queue\Broker\Redis; use Utopia\Validator\Text; -$consumer = new Redis(new RedisConnection('redis')); +$consumer = new Redis( + receive: new RedisConnection('redis'), + commands: new RedisConnection('redis'), +); $adapter = new Workerman($consumer, 12, 'wokerman'); $server = new Queue\Server($adapter);