Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 82 additions & 8 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,31 @@ class Redis implements Connection
protected int $port;
protected ?string $user;
protected ?string $password;
protected float $connectTimeout;
protected float $readTimeout;
protected ?\Redis $redis = null;

public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null)
/**
* @param string $host Redis host.
* @param int $port Redis port.
* @param string|null $user Redis ACL username (optional).
* @param string|null $password Redis password (optional).
* @param float $connectTimeout Connection timeout in seconds (0 = no timeout).
* @param float $readTimeout Socket read timeout in seconds (-1 = infinite).
* Use -1 for consumers so blocking commands (BRPOP/BLPOP)
* are not interrupted; the per-call blockingReadTimeout()
* helper adds a safety buffer automatically.
* Use a positive value (e.g. 5) for publishers so a hung
* Redis fails fast rather than blocking indefinitely.
*/
public function __construct(string $host, int $port = 6379, ?string $user = null, ?string $password = null, float $connectTimeout = 5, float $readTimeout = -1)
{
$this->host = $host;
$this->port = $port;
$this->user = $user;
$this->password = $password;
$this->connectTimeout = $connectTimeout;
$this->readTimeout = $readTimeout;
}

public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false
Expand All @@ -30,16 +47,30 @@ public function rightPopLeftPushArray(string $queue, string $destination, int $t

return json_decode($response, true);
}

public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false
{
$response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout);
$redis = $this->getRedis();
$prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT);
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout));
try {
$response = $redis->bRPopLPush($queue, $destination, $timeout);
} catch (\RedisException $e) {
$this->redis = null;
throw $e;
} finally {
if ($this->redis) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev);
}
}

if (!$response) {
return false;
}

return $response;
}

public function rightPushArray(string $queue, array $value): bool
{
return !!$this->getRedis()->rPush($queue, json_encode($value));
Expand Down Expand Up @@ -73,7 +104,19 @@ public function rightPopArray(string $queue, int $timeout): array|false

public function rightPop(string $queue, int $timeout): string|false
{
$response = $this->getRedis()->brPop([$queue], $timeout);
$redis = $this->getRedis();
$prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT);
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout));
try {
$response = $redis->brPop([$queue], $timeout);
} catch (\RedisException $e) {
$this->redis = null;
throw $e;
} finally {
if ($this->redis) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev);
}
}

if (empty($response)) {
return false;
Expand All @@ -84,18 +127,30 @@ public function rightPop(string $queue, int $timeout): string|false

public function leftPopArray(string $queue, int $timeout): array|false
{
$response = $this->getRedis()->blPop($queue, $timeout);
$response = $this->leftPop($queue, $timeout);

if (empty($response)) {
if ($response === false) {
return false;
}

return json_decode($response[1], true) ?? false;
return json_decode($response, true) ?? false;
}

public function leftPop(string $queue, int $timeout): string|false
{
$response = $this->getRedis()->blPop($queue, $timeout);
$redis = $this->getRedis();
$prev = $redis->getOption(\Redis::OPT_READ_TIMEOUT);
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout));
try {
$response = $redis->blPop($queue, $timeout);
} catch (\RedisException $e) {
$this->redis = null;
throw $e;
} finally {
if ($this->redis) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $prev);
}
}

if (empty($response)) {
return false;
Expand Down Expand Up @@ -186,8 +241,27 @@ protected function getRedis(): \Redis

$this->redis = new \Redis();

$this->redis->connect($this->host, $this->port);
$this->redis->connect($this->host, $this->port, $this->connectTimeout);
$this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout);
Comment on lines +244 to +245
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if auth() is called anywhere else or if there's a base class handling it
rg -n 'auth\(' src/Queue/Connection/Redis.php
echo "---"
# Check if Connection interface or any parent class handles auth
rg -n 'auth' src/Queue/Connection/
echo "---"
# Check git history for auth in getRedis
git log --oneline -10 -- src/Queue/Connection/Redis.php

Repository: utopia-php/queue

Length of output: 150


🏁 Script executed:

# Let's examine the Redis.php file to see constructor and getRedis method
cat -n src/Queue/Connection/Redis.php | head -100

Repository: utopia-php/queue

Length of output: 4220


🏁 Script executed:

# Continue reading Redis.php to see the rest of the constructor and getRedis method
cat -n src/Queue/Connection/Redis.php | sed -n '100,280p'

Repository: utopia-php/queue

Length of output: 5685


🏁 Script executed:

# Check if there's a parent class or interface
rg -n 'class Redis|interface Connection' src/Queue/Connection/

Repository: utopia-php/queue

Length of output: 207


🏁 Script executed:

# Check the Connection interface definition
cat -n src/Queue/Connection.php

Repository: utopia-php/queue

Length of output: 1786


🏁 Script executed:

# Check if RedisCluster handles auth differently
rg -n 'auth' src/Queue/Connection/RedisCluster.php

Repository: utopia-php/queue

Length of output: 42


Call auth() or auth(password) in getRedis() to authenticate with stored credentials.

The constructor accepts and stores $user and $password (line 30), but getRedis() never calls the PHP Redis auth() method. This means credentials are accepted but silently ignored, bypassing authentication entirely. Either implement authentication by calling auth() in getRedis() after connecting, or remove the unused parameters from the constructor.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Connection/Redis.php` around lines 244 - 245, getRedis() currently
connects but never authenticates using the stored constructor credentials ($user
and $password), so either call $this->redis->auth($this->password) or
$this->redis->auth($this->user, $this->password) immediately after connect()
(and only when credentials are present), and handle failures by throwing or
logging an exception; update getRedis() to perform this auth step (or remove the
unused $user/$password fields from the constructor if you prefer) so
authentication is not silently ignored.


return $this->redis;
}

/**
* Returns the read timeout to use for a blocking command.
* Ensures the socket does not time out before Redis returns.
* A $timeout of 0 means block indefinitely, so we use -1 (infinite).
*/
private function blockingReadTimeout(int $timeout): float
{
if ($timeout <= 0) {
return -1;
}
// Add 1s buffer so the socket outlasts the Redis-side block timeout.
// Also respect an explicit readTimeout if it is already larger.
if ($this->readTimeout < 0) {
return -1;
}
return max((float)($timeout + 1), $this->readTimeout);
}
}
91 changes: 83 additions & 8 deletions src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,31 @@

class RedisCluster implements Connection
{
// OPT_READ_TIMEOUT (3) is defined on \Redis but not on \RedisCluster in all
// environments (e.g. Swoole replaces RedisCluster with its own coroutine-aware
// class that omits the constant). The numeric value is stable across phpredis.
private const OPT_READ_TIMEOUT = 3;

protected array $seeds;
protected float $connectTimeout;
protected float $readTimeout;
protected ?\RedisCluster $redis = null;

public function __construct(array $seeds)
/**
* @param array $seeds Cluster seed nodes in "host:port" format.
* @param float $connectTimeout Connection timeout in seconds per node (0 = no timeout).
* @param float $readTimeout Socket read timeout in seconds (-1 = infinite).
* Use -1 for consumers so blocking commands (BRPOP/BLPOP)
* are not interrupted; the per-call blockingReadTimeout()
* helper adds a safety buffer automatically.
* Use a positive value (e.g. 5) for publishers so a hung
* node fails fast rather than blocking indefinitely.
*/
public function __construct(array $seeds, float $connectTimeout = 5, float $readTimeout = -1)
{
$this->seeds = $seeds;
$this->connectTimeout = $connectTimeout;
$this->readTimeout = $readTimeout;
}

public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false
Expand All @@ -24,16 +43,30 @@ public function rightPopLeftPushArray(string $queue, string $destination, int $t

return json_decode($response, true);
}

public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false
{
$response = $this->getRedis()->bRPopLPush($queue, $destination, $timeout);
$redis = $this->getRedis();
$prev = $redis->getOption(self::OPT_READ_TIMEOUT);
$redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout));
try {
$response = $redis->bRPopLPush($queue, $destination, $timeout);
} catch (\RedisException $e) {
$this->redis = null;
throw $e;
} finally {
if ($this->redis) {
$redis->setOption(self::OPT_READ_TIMEOUT, $prev);
}
}

if (!$response) {
return false;
}

return $response;
}

public function rightPushArray(string $queue, array $value): bool
{
return !!$this->getRedis()->rPush($queue, json_encode($value));
Expand Down Expand Up @@ -67,7 +100,19 @@ public function rightPopArray(string $queue, int $timeout): array|false

public function rightPop(string $queue, int $timeout): string|false
{
$response = $this->getRedis()->brPop([$queue], $timeout);
$redis = $this->getRedis();
$prev = $redis->getOption(self::OPT_READ_TIMEOUT);
$redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout));
try {
$response = $redis->brPop([$queue], $timeout);
} catch (\RedisException $e) {
$this->redis = null;
throw $e;
} finally {
if ($this->redis) {
$redis->setOption(self::OPT_READ_TIMEOUT, $prev);
}
}

if (empty($response)) {
return false;
Expand All @@ -78,18 +123,30 @@ public function rightPop(string $queue, int $timeout): string|false

public function leftPopArray(string $queue, int $timeout): array|false
{
$response = $this->getRedis()->blPop($queue, $timeout);
$response = $this->leftPop($queue, $timeout);

if (empty($response)) {
if ($response === false) {
return false;
}

return json_decode($response[1], true) ?? false;
return json_decode($response, true) ?? false;
}

public function leftPop(string $queue, int $timeout): string|false
{
$response = $this->getRedis()->blPop($queue, $timeout);
$redis = $this->getRedis();
$prev = $redis->getOption(self::OPT_READ_TIMEOUT);
$redis->setOption(self::OPT_READ_TIMEOUT, $this->blockingReadTimeout($timeout));
try {
$response = $redis->blPop($queue, $timeout);
} catch (\RedisException $e) {
$this->redis = null;
throw $e;
} finally {
if ($this->redis) {
$redis->setOption(self::OPT_READ_TIMEOUT, $prev);
}
}

if (empty($response)) {
return false;
Expand Down Expand Up @@ -181,7 +238,25 @@ protected function getRedis(): \RedisCluster
return $this->redis;
}

$this->redis = new \RedisCluster(null, $this->seeds);
$this->redis = new \RedisCluster(null, $this->seeds, $this->connectTimeout, $this->readTimeout);
return $this->redis;
}

/**
* Returns the read timeout to use for a blocking command.
* Ensures the socket does not time out before Redis returns.
* A $timeout of 0 means block indefinitely, so we use -1 (infinite).
*/
private function blockingReadTimeout(int $timeout): float
{
if ($timeout <= 0) {
return -1;
}
// Add 1s buffer so the socket outlasts the Redis-side block timeout.
// Also respect an explicit readTimeout if it is already larger.
if ($this->readTimeout < 0) {
return -1;
}
return max((float)($timeout + 1), $this->readTimeout);
}
}