diff --git a/src/Components/amqp/src/Queue/AMQPQueueDriverHandler.php b/src/Components/amqp/src/Queue/AMQPQueueDriverHandler.php index 02ed7485201fc906e764f0094ffd30b00ec9459d..8612b2618470baf2d214bea0004eb7510f9ef5b3 100644 --- a/src/Components/amqp/src/Queue/AMQPQueueDriverHandler.php +++ b/src/Components/amqp/src/Queue/AMQPQueueDriverHandler.php @@ -290,7 +290,7 @@ class AMQPQueueDriverHandler implements IQueueDriver } return $messageId; - }, $this->redisPoolName); + }, $this->redisPoolName, true); } /** @@ -344,7 +344,7 @@ class AMQPQueueDriverHandler implements IQueueDriver Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($score, $message) { $redis->zAdd($this->getRedisQueueKey(QueueType::WORKING), $score, json_encode($message->toArray(), \JSON_THROW_ON_ERROR | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE)); - }, $this->redisPoolName); + }, $this->redisPoolName, true); return $message; } @@ -363,7 +363,7 @@ class AMQPQueueDriverHandler implements IQueueDriver */ public function delete(IMessage $message): bool { - return Redis::use(fn (\Imi\Redis\RedisHandler $redis) => $redis->sAdd($this->getRedisQueueKey('deleted'), $message->getMessageId()) > 0, $this->redisPoolName); + return Redis::use(fn (\Imi\Redis\RedisHandler $redis) => $redis->sAdd($this->getRedisQueueKey('deleted'), $message->getMessageId()) > 0, $this->redisPoolName, true); } /** @@ -417,7 +417,7 @@ class AMQPQueueDriverHandler implements IQueueDriver { } } - }, $this->redisPoolName); + }, $this->redisPoolName, true); } /** @@ -546,7 +546,7 @@ class AMQPQueueDriverHandler implements IQueueDriver $status['delay'] = $delayReady; return new QueueStatus($status); - }, $this->redisPoolName); + }, $this->redisPoolName, true); } /** @@ -650,7 +650,7 @@ class AMQPQueueDriverHandler implements IQueueDriver $amqpMessage->setRoutingKey(AMQPQueueDriver::ROUTING_TIMEOUT); $this->timeoutPublisher->publish($amqpMessage); } - }, $this->redisPoolName); + }, $this->redisPoolName, true); } /** @@ -672,6 +672,6 @@ class AMQPQueueDriverHandler implements IQueueDriver $this->getRedisQueueKey('deleted'), $redis->_serialize($messageId), $delete, - ], 1) > 0, $this->redisPoolName); + ], 1) > 0, $this->redisPoolName, true); } } diff --git a/src/Components/queue/src/Driver/RedisQueueDriver.php b/src/Components/queue/src/Driver/RedisQueueDriver.php index 83806972ed1d82b30cef45d0e8135017796095e1..0aafef3d14e371c2e4ac52e2e49fe9d617dec2c6 100644 --- a/src/Components/queue/src/Driver/RedisQueueDriver.php +++ b/src/Components/queue/src/Driver/RedisQueueDriver.php @@ -44,12 +44,28 @@ class RedisQueueDriver implements IQueueDriver */ protected float $timespan = 0.03; + private ?string $keyName = null; + public function __construct(string $name, array $config = []) { $this->name = $name; $this->traitConstruct($config); } + public function __init(): void + { + Redis::use(function (\Imi\Redis\RedisHandler $redis) { + if ($redis->isCluster()) + { + $this->keyName = '{' . $this->name . '}'; + } + else + { + $this->keyName = $this->name; + } + }, $this->poolName, true); + } + /** * {@inheritDoc} */ @@ -153,7 +169,7 @@ class RedisQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -161,27 +177,27 @@ class RedisQueueDriver implements IQueueDriver */ public function pop(float $timeout = 0): ?IMessage { - return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($timeout) { - $time = $useTime = 0; - do + $time = $useTime = 0; + do + { + if ($timeout > 0) { - if ($timeout > 0) + if ($time) { - if ($time) + $leftTime = $timeout - $useTime; + if ($leftTime > $this->timespan) { - $leftTime = $timeout - $useTime; - if ($leftTime > $this->timespan) - { - usleep((int) ($this->timespan * 1000000)); - } - } - else - { - $time = microtime(true); + usleep((int) ($this->timespan * 1000000)); } } - $this->parseDelayMessages(); - $this->parseTimeoutMessages(); + else + { + $time = microtime(true); + } + } + $result = Redis::use(function (\Imi\Redis\RedisHandler $redis) { + $this->parseDelayMessages($redis); + $this->parseTimeoutMessages($redis); $result = $redis->evalEx(<<<'LUA' -- 从列表弹出 local messageId = redis.call('lpop', KEYS[1]) @@ -207,19 +223,6 @@ class RedisQueueDriver implements IQueueDriver $this->getMessageKeyPrefix(), microtime(true), ], 3); - if ($result && \is_array($result)) - { - $data = []; - $length = \count($result); - for ($i = 0; $i < $length; $i += 2) - { - $data[$result[$i]] = $result[$i + 1]; - } - $message = new Message(); - $message->loadFromArray($data); - - return $message; - } if (false === $result) { if (null === ($error = $redis->getLastError())) @@ -231,15 +234,30 @@ class RedisQueueDriver implements IQueueDriver throw new QueueException('Queue pop failed, ' . $error); } } - if ($timeout < 0) + + return $result; + }, $this->poolName, true); + if ($result && \is_array($result)) + { + $data = []; + $length = \count($result); + for ($i = 0; $i < $length; $i += 2) { - return null; + $data[$result[$i]] = $result[$i + 1]; } + $message = new Message(); + $message->loadFromArray($data); + + return $message; + } + if ($timeout < 0) + { + return null; } - while (($useTime = (microtime(true) - $time)) < $timeout); + } + while (($useTime = (microtime(true) - $time)) < $timeout); - return null; - }, $this->poolName); + return null; } /** @@ -279,7 +297,7 @@ class RedisQueueDriver implements IQueueDriver } return 1 == $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -303,7 +321,7 @@ class RedisQueueDriver implements IQueueDriver Redis::use(static function (\Imi\Redis\RedisHandler $redis) use ($keys) { $redis->del(...$keys); - }, $this->poolName); + }, $this->poolName, true); } /** @@ -340,7 +358,7 @@ class RedisQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -349,24 +367,10 @@ class RedisQueueDriver implements IQueueDriver public function fail(IMessage $message, bool $requeue = false): int { return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($message, $requeue) { - if ($requeue) - { - $operation = <<<'LUA' - -- 加入队列 - redis.call('rpush', KEYS[2], ARGV[1]); - LUA; - } - else - { - $operation = <<<'LUA' - -- 加入失败队列 - redis.call('rpush', KEYS[2], ARGV[1]) - LUA; - } - $result = $redis->evalEx(<<evalEx(<<<'LUA' -- 从工作队列删除 redis.call('zrem', KEYS[1], ARGV[1]) - {$operation} + redis.call('rpush', KEYS[2], ARGV[1]) return true LUA, [ $this->getQueueKey(QueueType::WORKING), @@ -387,7 +391,7 @@ class RedisQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -415,7 +419,7 @@ class RedisQueueDriver implements IQueueDriver } return new QueueStatus($status); - }, $this->poolName); + }, $this->poolName, true); } /** @@ -449,7 +453,7 @@ class RedisQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -483,7 +487,7 @@ class RedisQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -491,42 +495,40 @@ class RedisQueueDriver implements IQueueDriver * * 返回消息数量 */ - protected function parseDelayMessages(int $count = 100): int + protected function parseDelayMessages(\Imi\Redis\RedisHandler $redis, int $count = 100): int { - return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($count) { - $result = $redis->evalEx(<<<'LUA' - -- 查询消息ID - local messageIds = redis.call('zrevrangebyscore', KEYS[2], ARGV[1], 0, 'limit', 0, ARGV[2]) - local messageIdCount = table.getn(messageIds) - if 0 == messageIdCount then - return 0 - end - -- 加入队列 - redis.call('rpush', KEYS[1], unpack(messageIds)) - -- 从延时队列删除 - redis.call('zrem', KEYS[2], unpack(messageIds)) - return messageIdCount - LUA, [ - $this->getQueueKey(QueueType::READY), - $this->getQueueKey(QueueType::DELAY), - microtime(true), - $count, - ], 2); - - if (false === $result) + $result = $redis->evalEx(<<<'LUA' + -- 查询消息ID + local messageIds = redis.call('zrevrangebyscore', KEYS[2], ARGV[1], 0, 'limit', 0, ARGV[2]) + local messageIdCount = table.getn(messageIds) + if 0 == messageIdCount then + return 0 + end + -- 加入队列 + redis.call('rpush', KEYS[1], unpack(messageIds)) + -- 从延时队列删除 + redis.call('zrem', KEYS[2], unpack(messageIds)) + return messageIdCount + LUA, [ + $this->getQueueKey(QueueType::READY), + $this->getQueueKey(QueueType::DELAY), + microtime(true), + $count, + ], 2); + + if (false === $result) + { + if (null === ($error = $redis->getLastError())) { - if (null === ($error = $redis->getLastError())) - { - throw new QueueException('Queue parseDelayMessages failed'); - } - else - { - throw new QueueException('Queue parseDelayMessages failed, ' . $error); - } + throw new QueueException('Queue parseDelayMessages failed'); } + else + { + throw new QueueException('Queue parseDelayMessages failed, ' . $error); + } + } - return $result; - }, $this->poolName); + return $result; } /** @@ -534,42 +536,40 @@ class RedisQueueDriver implements IQueueDriver * * 返回消息数量 */ - protected function parseTimeoutMessages(int $count = 100): int + protected function parseTimeoutMessages(\Imi\Redis\RedisHandler $redis, int $count = 100): int { - return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($count) { - $result = $redis->evalEx(<<<'LUA' - -- 查询消息ID - local messageIds = redis.call('zrevrangebyscore', KEYS[1], ARGV[1], 0, 'limit', 0, ARGV[2]) - local messageIdCount = table.getn(messageIds) - if 0 == messageIdCount then - return 0 - end - -- 加入超时队列 - redis.call('rpush', KEYS[2], unpack(messageIds)) - -- 从工作队列删除 - redis.call('zrem', KEYS[1], unpack(messageIds)) - return messageIdCount - LUA, [ - $this->getQueueKey(QueueType::WORKING), - $this->getQueueKey(QueueType::TIMEOUT), - microtime(true), - $count, - ], 2); - - if (false === $result) + $result = $redis->evalEx(<<<'LUA' + -- 查询消息ID + local messageIds = redis.call('zrevrangebyscore', KEYS[1], ARGV[1], 0, 'limit', 0, ARGV[2]) + local messageIdCount = table.getn(messageIds) + if 0 == messageIdCount then + return 0 + end + -- 加入超时队列 + redis.call('rpush', KEYS[2], unpack(messageIds)) + -- 从工作队列删除 + redis.call('zrem', KEYS[1], unpack(messageIds)) + return messageIdCount + LUA, [ + $this->getQueueKey(QueueType::WORKING), + $this->getQueueKey(QueueType::TIMEOUT), + microtime(true), + $count, + ], 2); + + if (false === $result) + { + if (null === ($error = $redis->getLastError())) { - if (null === ($error = $redis->getLastError())) - { - throw new QueueException('Queue parseTimeoutMessages failed'); - } - else - { - throw new QueueException('Queue parseTimeoutMessages failed, ' . $error); - } + throw new QueueException('Queue parseTimeoutMessages failed'); + } + else + { + throw new QueueException('Queue parseTimeoutMessages failed, ' . $error); } + } - return (int) $result; - }, $this->poolName); + return (int) $result; } /** @@ -577,18 +577,7 @@ class RedisQueueDriver implements IQueueDriver */ public function getMessageKeyPrefix(): string { - return Redis::use(function (\Imi\Redis\RedisHandler $redis) { - if ($redis->isCluster()) - { - $name = '{' . $this->name . '}'; - } - else - { - $name = $this->name; - } - - return $this->prefix . $name . ':message:'; - }, $this->poolName); + return $this->prefix . $this->keyName . ':message:'; } /** @@ -596,18 +585,7 @@ class RedisQueueDriver implements IQueueDriver */ public function getMessageIdKey(): string { - return Redis::use(function (\Imi\Redis\RedisHandler $redis) { - if ($redis->isCluster()) - { - $name = '{' . $this->name . '}'; - } - else - { - $name = $this->name; - } - - return $this->prefix . $name . ':message:id'; - }, $this->poolName); + return $this->prefix . $this->keyName . ':message:id'; } /** @@ -615,17 +593,6 @@ class RedisQueueDriver implements IQueueDriver */ public function getQueueKey(int $queueType): string { - return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($queueType) { - if ($redis->isCluster()) - { - $name = '{' . $this->name . '}'; - } - else - { - $name = $this->name; - } - - return $this->prefix . $name . ':' . strtolower(QueueType::getName($queueType)); - }, $this->poolName); + return $this->prefix . $this->keyName . ':' . strtolower(QueueType::getName($queueType)); } } diff --git a/src/Components/queue/src/Driver/RedisStreamQueueDriver.php b/src/Components/queue/src/Driver/RedisStreamQueueDriver.php index 10f55de3e76e813b204d7da980aee8b17b01ae23..5df1a8a0bfa0bd9570d460ac4492cf8f900662a2 100644 --- a/src/Components/queue/src/Driver/RedisStreamQueueDriver.php +++ b/src/Components/queue/src/Driver/RedisStreamQueueDriver.php @@ -79,12 +79,28 @@ class RedisStreamQueueDriver implements IQueueDriver */ private bool $groupInited = false; + private ?string $keyName = null; + public function __construct(string $name, array $config = []) { $this->name = $name; $this->traitConstruct($config); } + public function __init(): void + { + Redis::use(function (\Imi\Redis\RedisHandler $redis) { + if ($redis->isCluster()) + { + $this->keyName = '{' . $this->name . '}'; + } + else + { + $this->keyName = $this->name; + } + }, $this->poolName, true); + } + /** * {@inheritDoc} */ @@ -129,7 +145,7 @@ class RedisStreamQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -163,7 +179,7 @@ class RedisStreamQueueDriver implements IQueueDriver $message->loadFromArray($data); return $message; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -187,7 +203,7 @@ class RedisStreamQueueDriver implements IQueueDriver } return 1 == $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -197,7 +213,7 @@ class RedisStreamQueueDriver implements IQueueDriver { Redis::use(function (\Imi\Redis\RedisHandler $redis) { $redis->del($this->getQueueKey()); - }, $this->poolName); + }, $this->poolName, true); } /** @@ -223,7 +239,7 @@ class RedisStreamQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -264,7 +280,7 @@ class RedisStreamQueueDriver implements IQueueDriver return $result ? 1 : 0; } - }, $this->poolName); + }, $this->poolName, true); } /** @@ -319,7 +335,7 @@ class RedisStreamQueueDriver implements IQueueDriver $status['ready'] = $status['timeout'] = $status['delay'] = 0; return new QueueStatus($status); - }, $this->poolName); + }, $this->poolName, true); } /** @@ -375,7 +391,7 @@ class RedisStreamQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -450,7 +466,7 @@ class RedisStreamQueueDriver implements IQueueDriver } return $result; - }, $this->poolName); + }, $this->poolName, true); } /** @@ -458,18 +474,7 @@ class RedisStreamQueueDriver implements IQueueDriver */ public function getQueueKey(): string { - return Redis::use(function (\Imi\Redis\RedisHandler $redis) { - if ($redis->isCluster()) - { - $name = '{' . $this->name . '}'; - } - else - { - $name = $this->name; - } - - return $this->prefix . $name; - }, $this->poolName); + return $this->prefix . $this->keyName; } protected function prepareGroup(\Imi\Redis\RedisHandler $redis): void diff --git a/src/Pool/PoolManager.php b/src/Pool/PoolManager.php index 510b60185d4edf9964265a925b4bcfd427eb0768..63d7c59fb8d9348cd07eaa01ea6b671b19d5afbc 100644 --- a/src/Pool/PoolManager.php +++ b/src/Pool/PoolManager.php @@ -229,8 +229,8 @@ class PoolManager */ public static function releaseResource(IPoolResource $resource): void { - $resource->getPool()->release($resource); self::removeResourceFromRequestContext($resource); + $resource->getPool()->release($resource); } /**