提交 d185a6aa 编写于 作者: weixin_47267244's avatar weixin_47267244

修复+优化代码

上级 a27891e4
......@@ -290,7 +290,7 @@ class AMQPQueueDriverHandler implements IQueueDriver
}
return $messageId;
}, $this->redisPoolName);
}, $this->redisPoolName, true);
}
/**
......@@ -344,7 +344,7 @@ class AMQPQueueDriverHandler implements IQueueDriver
Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($score, $message) {
$redis->zAdd($this->getRedisQueueKey(QueueType::WORKING), $score, json_encode($message->toArray(), \JSON_THROW_ON_ERROR | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE));
}, $this->redisPoolName);
}, $this->redisPoolName, true);
return $message;
}
......@@ -363,7 +363,7 @@ class AMQPQueueDriverHandler implements IQueueDriver
*/
public function delete(IMessage $message): bool
{
return Redis::use(fn (\Imi\Redis\RedisHandler $redis) => $redis->sAdd($this->getRedisQueueKey('deleted'), $message->getMessageId()) > 0, $this->redisPoolName);
return Redis::use(fn (\Imi\Redis\RedisHandler $redis) => $redis->sAdd($this->getRedisQueueKey('deleted'), $message->getMessageId()) > 0, $this->redisPoolName, true);
}
/**
......@@ -417,7 +417,7 @@ class AMQPQueueDriverHandler implements IQueueDriver
{
}
}
}, $this->redisPoolName);
}, $this->redisPoolName, true);
}
/**
......@@ -546,7 +546,7 @@ class AMQPQueueDriverHandler implements IQueueDriver
$status['delay'] = $delayReady;
return new QueueStatus($status);
}, $this->redisPoolName);
}, $this->redisPoolName, true);
}
/**
......@@ -650,7 +650,7 @@ class AMQPQueueDriverHandler implements IQueueDriver
$amqpMessage->setRoutingKey(AMQPQueueDriver::ROUTING_TIMEOUT);
$this->timeoutPublisher->publish($amqpMessage);
}
}, $this->redisPoolName);
}, $this->redisPoolName, true);
}
/**
......@@ -672,6 +672,6 @@ class AMQPQueueDriverHandler implements IQueueDriver
$this->getRedisQueueKey('deleted'),
$redis->_serialize($messageId),
$delete,
], 1) > 0, $this->redisPoolName);
], 1) > 0, $this->redisPoolName, true);
}
}
......@@ -44,12 +44,28 @@ class RedisQueueDriver implements IQueueDriver
*/
protected float $timespan = 0.03;
private ?string $keyName = null;
public function __construct(string $name, array $config = [])
{
$this->name = $name;
$this->traitConstruct($config);
}
public function __init(): void
{
Redis::use(function (\Imi\Redis\RedisHandler $redis) {
if ($redis->isCluster())
{
$this->keyName = '{' . $this->name . '}';
}
else
{
$this->keyName = $this->name;
}
}, $this->poolName, true);
}
/**
* {@inheritDoc}
*/
......@@ -153,7 +169,7 @@ class RedisQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -161,27 +177,27 @@ class RedisQueueDriver implements IQueueDriver
*/
public function pop(float $timeout = 0): ?IMessage
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($timeout) {
$time = $useTime = 0;
do
$time = $useTime = 0;
do
{
if ($timeout > 0)
{
if ($timeout > 0)
if ($time)
{
if ($time)
$leftTime = $timeout - $useTime;
if ($leftTime > $this->timespan)
{
$leftTime = $timeout - $useTime;
if ($leftTime > $this->timespan)
{
usleep((int) ($this->timespan * 1000000));
}
}
else
{
$time = microtime(true);
usleep((int) ($this->timespan * 1000000));
}
}
$this->parseDelayMessages();
$this->parseTimeoutMessages();
else
{
$time = microtime(true);
}
}
$result = Redis::use(function (\Imi\Redis\RedisHandler $redis) {
$this->parseDelayMessages($redis);
$this->parseTimeoutMessages($redis);
$result = $redis->evalEx(<<<'LUA'
-- 从列表弹出
local messageId = redis.call('lpop', KEYS[1])
......@@ -207,19 +223,6 @@ class RedisQueueDriver implements IQueueDriver
$this->getMessageKeyPrefix(),
microtime(true),
], 3);
if ($result && \is_array($result))
{
$data = [];
$length = \count($result);
for ($i = 0; $i < $length; $i += 2)
{
$data[$result[$i]] = $result[$i + 1];
}
$message = new Message();
$message->loadFromArray($data);
return $message;
}
if (false === $result)
{
if (null === ($error = $redis->getLastError()))
......@@ -231,15 +234,30 @@ class RedisQueueDriver implements IQueueDriver
throw new QueueException('Queue pop failed, ' . $error);
}
}
if ($timeout < 0)
return $result;
}, $this->poolName, true);
if ($result && \is_array($result))
{
$data = [];
$length = \count($result);
for ($i = 0; $i < $length; $i += 2)
{
return null;
$data[$result[$i]] = $result[$i + 1];
}
$message = new Message();
$message->loadFromArray($data);
return $message;
}
if ($timeout < 0)
{
return null;
}
while (($useTime = (microtime(true) - $time)) < $timeout);
}
while (($useTime = (microtime(true) - $time)) < $timeout);
return null;
}, $this->poolName);
return null;
}
/**
......@@ -279,7 +297,7 @@ class RedisQueueDriver implements IQueueDriver
}
return 1 == $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -303,7 +321,7 @@ class RedisQueueDriver implements IQueueDriver
Redis::use(static function (\Imi\Redis\RedisHandler $redis) use ($keys) {
$redis->del(...$keys);
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -340,7 +358,7 @@ class RedisQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -349,24 +367,10 @@ class RedisQueueDriver implements IQueueDriver
public function fail(IMessage $message, bool $requeue = false): int
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($message, $requeue) {
if ($requeue)
{
$operation = <<<'LUA'
-- 加入队列
redis.call('rpush', KEYS[2], ARGV[1]);
LUA;
}
else
{
$operation = <<<'LUA'
-- 加入失败队列
redis.call('rpush', KEYS[2], ARGV[1])
LUA;
}
$result = $redis->evalEx(<<<LUA
$result = $redis->evalEx(<<<'LUA'
-- 从工作队列删除
redis.call('zrem', KEYS[1], ARGV[1])
{$operation}
redis.call('rpush', KEYS[2], ARGV[1])
return true
LUA, [
$this->getQueueKey(QueueType::WORKING),
......@@ -387,7 +391,7 @@ class RedisQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -415,7 +419,7 @@ class RedisQueueDriver implements IQueueDriver
}
return new QueueStatus($status);
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -449,7 +453,7 @@ class RedisQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -483,7 +487,7 @@ class RedisQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -491,42 +495,40 @@ class RedisQueueDriver implements IQueueDriver
*
* 返回消息数量
*/
protected function parseDelayMessages(int $count = 100): int
protected function parseDelayMessages(\Imi\Redis\RedisHandler $redis, int $count = 100): int
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($count) {
$result = $redis->evalEx(<<<'LUA'
-- 查询消息ID
local messageIds = redis.call('zrevrangebyscore', KEYS[2], ARGV[1], 0, 'limit', 0, ARGV[2])
local messageIdCount = table.getn(messageIds)
if 0 == messageIdCount then
return 0
end
-- 加入队列
redis.call('rpush', KEYS[1], unpack(messageIds))
-- 从延时队列删除
redis.call('zrem', KEYS[2], unpack(messageIds))
return messageIdCount
LUA, [
$this->getQueueKey(QueueType::READY),
$this->getQueueKey(QueueType::DELAY),
microtime(true),
$count,
], 2);
if (false === $result)
$result = $redis->evalEx(<<<'LUA'
-- 查询消息ID
local messageIds = redis.call('zrevrangebyscore', KEYS[2], ARGV[1], 0, 'limit', 0, ARGV[2])
local messageIdCount = table.getn(messageIds)
if 0 == messageIdCount then
return 0
end
-- 加入队列
redis.call('rpush', KEYS[1], unpack(messageIds))
-- 从延时队列删除
redis.call('zrem', KEYS[2], unpack(messageIds))
return messageIdCount
LUA, [
$this->getQueueKey(QueueType::READY),
$this->getQueueKey(QueueType::DELAY),
microtime(true),
$count,
], 2);
if (false === $result)
{
if (null === ($error = $redis->getLastError()))
{
if (null === ($error = $redis->getLastError()))
{
throw new QueueException('Queue parseDelayMessages failed');
}
else
{
throw new QueueException('Queue parseDelayMessages failed, ' . $error);
}
throw new QueueException('Queue parseDelayMessages failed');
}
else
{
throw new QueueException('Queue parseDelayMessages failed, ' . $error);
}
}
return $result;
}, $this->poolName);
return $result;
}
/**
......@@ -534,42 +536,40 @@ class RedisQueueDriver implements IQueueDriver
*
* 返回消息数量
*/
protected function parseTimeoutMessages(int $count = 100): int
protected function parseTimeoutMessages(\Imi\Redis\RedisHandler $redis, int $count = 100): int
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($count) {
$result = $redis->evalEx(<<<'LUA'
-- 查询消息ID
local messageIds = redis.call('zrevrangebyscore', KEYS[1], ARGV[1], 0, 'limit', 0, ARGV[2])
local messageIdCount = table.getn(messageIds)
if 0 == messageIdCount then
return 0
end
-- 加入超时队列
redis.call('rpush', KEYS[2], unpack(messageIds))
-- 从工作队列删除
redis.call('zrem', KEYS[1], unpack(messageIds))
return messageIdCount
LUA, [
$this->getQueueKey(QueueType::WORKING),
$this->getQueueKey(QueueType::TIMEOUT),
microtime(true),
$count,
], 2);
if (false === $result)
$result = $redis->evalEx(<<<'LUA'
-- 查询消息ID
local messageIds = redis.call('zrevrangebyscore', KEYS[1], ARGV[1], 0, 'limit', 0, ARGV[2])
local messageIdCount = table.getn(messageIds)
if 0 == messageIdCount then
return 0
end
-- 加入超时队列
redis.call('rpush', KEYS[2], unpack(messageIds))
-- 从工作队列删除
redis.call('zrem', KEYS[1], unpack(messageIds))
return messageIdCount
LUA, [
$this->getQueueKey(QueueType::WORKING),
$this->getQueueKey(QueueType::TIMEOUT),
microtime(true),
$count,
], 2);
if (false === $result)
{
if (null === ($error = $redis->getLastError()))
{
if (null === ($error = $redis->getLastError()))
{
throw new QueueException('Queue parseTimeoutMessages failed');
}
else
{
throw new QueueException('Queue parseTimeoutMessages failed, ' . $error);
}
throw new QueueException('Queue parseTimeoutMessages failed');
}
else
{
throw new QueueException('Queue parseTimeoutMessages failed, ' . $error);
}
}
return (int) $result;
}, $this->poolName);
return (int) $result;
}
/**
......@@ -577,18 +577,7 @@ class RedisQueueDriver implements IQueueDriver
*/
public function getMessageKeyPrefix(): string
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) {
if ($redis->isCluster())
{
$name = '{' . $this->name . '}';
}
else
{
$name = $this->name;
}
return $this->prefix . $name . ':message:';
}, $this->poolName);
return $this->prefix . $this->keyName . ':message:';
}
/**
......@@ -596,18 +585,7 @@ class RedisQueueDriver implements IQueueDriver
*/
public function getMessageIdKey(): string
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) {
if ($redis->isCluster())
{
$name = '{' . $this->name . '}';
}
else
{
$name = $this->name;
}
return $this->prefix . $name . ':message:id';
}, $this->poolName);
return $this->prefix . $this->keyName . ':message:id';
}
/**
......@@ -615,17 +593,6 @@ class RedisQueueDriver implements IQueueDriver
*/
public function getQueueKey(int $queueType): string
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) use ($queueType) {
if ($redis->isCluster())
{
$name = '{' . $this->name . '}';
}
else
{
$name = $this->name;
}
return $this->prefix . $name . ':' . strtolower(QueueType::getName($queueType));
}, $this->poolName);
return $this->prefix . $this->keyName . ':' . strtolower(QueueType::getName($queueType));
}
}
......@@ -79,12 +79,28 @@ class RedisStreamQueueDriver implements IQueueDriver
*/
private bool $groupInited = false;
private ?string $keyName = null;
public function __construct(string $name, array $config = [])
{
$this->name = $name;
$this->traitConstruct($config);
}
public function __init(): void
{
Redis::use(function (\Imi\Redis\RedisHandler $redis) {
if ($redis->isCluster())
{
$this->keyName = '{' . $this->name . '}';
}
else
{
$this->keyName = $this->name;
}
}, $this->poolName, true);
}
/**
* {@inheritDoc}
*/
......@@ -129,7 +145,7 @@ class RedisStreamQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -163,7 +179,7 @@ class RedisStreamQueueDriver implements IQueueDriver
$message->loadFromArray($data);
return $message;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -187,7 +203,7 @@ class RedisStreamQueueDriver implements IQueueDriver
}
return 1 == $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -197,7 +213,7 @@ class RedisStreamQueueDriver implements IQueueDriver
{
Redis::use(function (\Imi\Redis\RedisHandler $redis) {
$redis->del($this->getQueueKey());
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -223,7 +239,7 @@ class RedisStreamQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -264,7 +280,7 @@ class RedisStreamQueueDriver implements IQueueDriver
return $result ? 1 : 0;
}
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -319,7 +335,7 @@ class RedisStreamQueueDriver implements IQueueDriver
$status['ready'] = $status['timeout'] = $status['delay'] = 0;
return new QueueStatus($status);
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -375,7 +391,7 @@ class RedisStreamQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -450,7 +466,7 @@ class RedisStreamQueueDriver implements IQueueDriver
}
return $result;
}, $this->poolName);
}, $this->poolName, true);
}
/**
......@@ -458,18 +474,7 @@ class RedisStreamQueueDriver implements IQueueDriver
*/
public function getQueueKey(): string
{
return Redis::use(function (\Imi\Redis\RedisHandler $redis) {
if ($redis->isCluster())
{
$name = '{' . $this->name . '}';
}
else
{
$name = $this->name;
}
return $this->prefix . $name;
}, $this->poolName);
return $this->prefix . $this->keyName;
}
protected function prepareGroup(\Imi\Redis\RedisHandler $redis): void
......
......@@ -229,8 +229,8 @@ class PoolManager
*/
public static function releaseResource(IPoolResource $resource): void
{
$resource->getPool()->release($resource);
self::removeResourceFromRequestContext($resource);
$resource->getPool()->release($resource);
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册