-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathapi.lua
More file actions
126 lines (108 loc) · 3.82 KB
/
api.lua
File metadata and controls
126 lines (108 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
local cartridge = require('cartridge')
local config = require('sharded_queue.router.config')
local metrics = require('sharded_queue.router.metrics')
local utils = require('sharded_queue.utils')
local queue = require('sharded_queue.router.queue')
local consts = require('sharded_queue.consts')
function cfg_call(_, options)
options = options or {}
if options.metrics == nil then
return
end
if type(options.metrics) ~= 'boolean' then
error('"metrics" must be a boolean')
end
if config.metrics ~= options.metrics then
local tubes = cartridge.config_get_deepcopy('tubes') or {}
if tubes['cfg'] ~= nil and tubes['cfg'].metrics == nil then
error('tube "cfg" exist, unable to update a default configuration')
end
tubes['cfg'] = {metrics = options.metrics}
local ok, err = cartridge.config_patch_clusterwide({ tubes = tubes })
if not ok then error(err) end
end
end
local function init(opts)
queue.export_globals()
end
local function validate_config(cfg)
local cfg_tubes = cfg.tubes or {}
local ok, err = utils.validate_tubes(cfg_tubes, false)
if not ok then
return ok, err
end
ok, err = utils.validate_dlq(cfg_tubes or {})
if not ok then
return ok, err
end
return utils.validate_cfg(cfg_tubes['cfg'])
end
local function apply_config(cfg, opts)
local cfg_tubes = cfg.tubes or {}
-- try init tubes --
for tube_name, options in pairs(cfg_tubes) do
if tube_name == 'cfg' then
if options.metrics ~= nil then
config.metrics = options.metrics and true or false
end
elseif queue.map()[tube_name] == nil then
queue.add(tube_name, metrics, options)
if options.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
local dlq_options = table.deepcopy(options or {})
dlq_options.release_limit_policy = nil
dlq_options.release_limit = -1
queue.add(tube_name .. consts.DLQ_SUFFIX, metrics, dlq_options)
end
end
end
-- try drop tubes --
for tube_name, _ in pairs(queue.map()) do
if tube_name ~= 'cfg' and cfg_tubes[tube_name] == nil then
-- Dead letter queue is not in config,
-- so instead we check that main queue is in it.
local tube = cfg_tubes[tube_name:sub(1, -(#consts.DLQ_SUFFIX+1))]
if tube_name:sub(-#consts.DLQ_SUFFIX) == consts.DLQ_SUFFIX and tube
and tube.release_limit_policy == consts.RELEASE_LIMIT_POLICY.DLQ then
break
end
queue.remove(tube_name)
end
end
if config.metrics then
metrics.enable(queue)
else
metrics.disable()
end
end
local function queue_action_wrapper(action)
return function(name, ...)
return queue.call(name, action, ...)
end
end
return {
init = init,
apply_config = apply_config,
validate_config = validate_config,
put = queue_action_wrapper('put'),
take = queue_action_wrapper('take'),
delete = queue_action_wrapper('delete'),
release = queue_action_wrapper('release'),
touch = queue_action_wrapper('touch'),
ack = queue_action_wrapper('ack'),
bury = queue_action_wrapper('bury'),
kick = queue_action_wrapper('kick'),
peek = queue_action_wrapper('peek'),
drop = queue_action_wrapper('drop'),
truncate = queue_action_wrapper('truncate'),
cfg = setmetatable({}, {
__index = config,
__newindex = function() error("Use api.cfg() instead", 2) end,
__call = cfg_call,
__serialize = function() return config end,
}),
statistics = queue.statistics,
_VERSION = require('sharded_queue.version'),
dependencies = {
'cartridge.roles.vshard-router',
},
}