-
-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathSentinelClient.php
More file actions
93 lines (80 loc) · 2.86 KB
/
SentinelClient.php
File metadata and controls
93 lines (80 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
<?php
declare(strict_types=1);
namespace Clue\React\Redis;
use Clue\React\Redis\Io\Factory;
use Clue\React\Redis\Io\StreamingClient;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
use React\Socket\ConnectorInterface;
use function React\Promise\reject;
use function React\Promise\resolve;
/**
 * Resolves the current Redis master through a list of Sentinel nodes.
 *
 * Sentinels are queried one at a time, in the order given, until one of them
 * answers `SENTINEL get-master-addr-by-name` for the configured master name.
 * The address can be consumed directly (masterAddress()) or turned into a
 * role-verified, cached connection to the master (masterConnection()).
 */
class SentinelClient
{
    /** @var array<string> sentinel URLs, tried in order */
    private $urls;

    /** @var string master name to look up on the sentinels */
    private $masterName;

    /** @var Factory */
    private $factory;

    /** @var ?StreamingClient connection whose ROLE reply was "master"; unset until verified */
    private $masterClient;

    /**
     * @param array<string> $urls list of sentinel addresses
     * @param string $masterName sentinel master name
     * @param ?ConnectorInterface $connector passed through to the client factory
     * @param ?LoopInterface $loop event loop; Loop::get() is used when omitted
     */
    public function __construct(
        array $urls,
        string $masterName,
        ?ConnectorInterface $connector = null,
        ?LoopInterface $loop = null
    ) {
        $this->urls = $urls;
        $this->masterName = $masterName;
        $this->factory = new Factory($loop ?? Loop::get(), $connector);
    }

    /**
     * Asks the sentinels, one after another, for the address of the master.
     *
     * @return PromiseInterface resolves with "host:port" from the first sentinel
     *     that answers; rejects with the last sentinel's error when all of them
     *     fail, or with the seed RuntimeException when the URL list is empty
     */
    public function masterAddress(): PromiseInterface
    {
        // Seed the chain with a rejection so the first sentinel is queried by the
        // same "previous attempt failed" handler as every later one.
        $chain = reject(new \RuntimeException('Initial reject promise'));

        foreach ($this->urls as $url) {
            // No fulfillment handler: once a sentinel has answered, the address
            // passes through the remaining links untouched.
            $chain = $chain->then(null, function () use ($url) {
                return $this->onError($url);
            });
        }

        return $chain;
    }

    /**
     * Connects to the master reported by Sentinel and verifies its role.
     *
     * The connection is cached only after its ROLE reply reports "master", so a
     * failed verification never leaves a non-master client behind for later calls.
     * Calls made while no verified client is cached each perform their own lookup.
     *
     * @param string $masterUriPath appended to the "host:port" address as-is
     * @param array $masterUriParams encoded with http_build_query() and appended as query string
     * @return PromiseInterface resolves with the verified StreamingClient; rejects with a
     *     RuntimeException when the node does not report the "master" role, or with the
     *     underlying error when lookup, connect or ROLE fails
     */
    public function masterConnection(string $masterUriPath = '', array $masterUriParams = []): PromiseInterface
    {
        if (isset($this->masterClient)) {
            return resolve($this->masterClient);
        }

        return $this
            ->masterAddress()
            ->then(function (string $masterUrl) use ($masterUriPath, $masterUriParams) {
                $query = $masterUriParams ? '?' . http_build_query($masterUriParams) : '';

                return $this->factory->createClient($masterUrl . $masterUriPath . $query);
            })
            ->then(function (StreamingClient $client) {
                return resolve($client->role())->then(
                    function (array $role) use ($client) {
                        $actualRole = $role[0] ?? '';

                        if ($actualRole !== 'master') {
                            // Nobody else holds this connection; release it instead of leaking it.
                            $client->close();

                            return reject(new \RuntimeException("Invalid master role: {$actualRole}"));
                        }

                        // Cache only now that the node has proven to be a master.
                        $this->masterClient = $client;

                        return $client;
                    },
                    function ($reason) use ($client) {
                        // ROLE itself failed: drop the unverified connection and propagate the error.
                        $client->close();

                        return reject($reason);
                    }
                );
            });
    }

    /**
     * Queries a single sentinel for the master address.
     *
     * Named after its role in masterAddress(): it runs when the previous attempt
     * in the chain was rejected.
     *
     * @param string $nextUrl URL of the sentinel to query
     * @return PromiseInterface resolves with "host:port" (IPv6 hosts in brackets); rejects
     *     when the sentinel cannot be reached, the command fails, or the reply carries no address
     */
    private function onError(string $nextUrl): PromiseInterface
    {
        return $this->factory
            ->createClient($nextUrl)
            ->then(function (StreamingClient $client) {
                // The sentinel connection is only needed for this one lookup, so it is
                // closed as soon as the command settles, whatever the outcome.
                return resolve($client->sentinel('get-master-addr-by-name', $this->masterName))->then(
                    function ($response) use ($client) {
                        $client->close();

                        return $response;
                    },
                    function ($reason) use ($client) {
                        $client->close();

                        return reject($reason);
                    }
                );
            })
            ->then(function ($response) {
                // A reply without both host and port (e.g. nil for an unknown master
                // name) cannot be turned into an address.
                if (!\is_array($response) || !isset($response[0], $response[1])) {
                    return reject(new \RuntimeException(
                        "Sentinel returned no address for master \"{$this->masterName}\""
                    ));
                }

                $host = (string) $response[0];

                // An IPv6 literal must be bracketed, otherwise its colons are
                // indistinguishable from the port separator.
                if (strpos($host, ':') !== false && $host[0] !== '[') {
                    $host = '[' . $host . ']';
                }

                return $host . ':' . $response[1]; // host:port
            });
    }
}