diff --git a/CHANGELOG.md b/CHANGELOG.md index bdd18aa5..847f84d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +* Add `tnt_crud_storage_nil_bucket_id_compat_total` metric to track operations + performed without bucket_ref (legacy router compatibility mode). + +### Fixed +* Fixed storage compatibility with routers < 1.7.0 by handling nil bucket_id + in get, update, and delete operations. Now storage skips bucket referencing + and logs a rate-limited warning about reduced rebalance safety during rolling upgrades. + ## [1.7.4] - 12-02-26 ### Fixed diff --git a/README.md b/README.md index 55cb850f..abcae3a8 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,21 @@ router and storage instances. > > Requests to `vinyl` spaces always run in safe mode regardless of current safe mode status. +> [!IMPORTANT] +> +> **Rolling Upgrade Order (< 1.7.0 to >= 1.7.0)** +> +> To maintain rebalance safety guarantees, follow this order: +> +> 1. Upgrade routers first. +> 2. Upgrade storages afterwards. +> +> Old routers (< 1.7.0) do not send `bucket_id` to storages. If you upgrade storages first, +> new storages will operate in a compatibility mode with reduced rebalance safety for +> `get`, `update` and `delete` operations. You will see warnings and +> `tnt_crud_storage_nil_bucket_id_compat_total` metric increments until all routers are +> upgraded. + ### Sandbox The repository provide a simple sandbox application with a test dataset on a single instance. diff --git a/crud/common/compat_warn.lua b/crud/common/compat_warn.lua new file mode 100644 index 00000000..57616e84 --- /dev/null +++ b/crud/common/compat_warn.lua @@ -0,0 +1,29 @@ +local ratelimit = require('crud.ratelimit') +local rebalance = require('crud.common.rebalance') + +local WARN_INTERVAL_SECONDS = 60 +local WARN_BURST_COUNT = 1 + +local compat_rl = ratelimit.new({ + interval = WARN_INTERVAL_SECONDS, + burst = WARN_BURST_COUNT, +}) + +--- Logs a warning when bucket_id is missing (old router compatibility mode). +local function log_nil_bucket_id(operation, space_name, engine) + local msg = string.format( + "crud.%s_on_storage called without bucket_id. " .. + "Old router compatibility mode is active: bucket_ref is skipped, " .. + "rebalance safety is reduced. Please upgrade routers to restore full guarantees. " .. + "(space=%q engine=%q)", + operation, space_name, engine + ) + + compat_rl:log_warn(msg) + + rebalance.inc_nil_bucket_id_compat(operation, engine) +end + +return { + log_nil_bucket_id = log_nil_bucket_id, +} diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua index 8d3e21d5..d84c311f 100644 --- a/crud/common/rebalance.lua +++ b/crud/common/rebalance.lua @@ -17,6 +17,7 @@ local rebalance = { _metrics = { safe_mode_enabled_gauge = nil, router_cache_clear_ts_gauge = nil, + nil_bucket_id_compat_counter = nil, }, } @@ -155,9 +156,25 @@ function rebalance.init_storage_metrics() 'tnt_crud_storage_safe_mode_enabled', "is safe mode enabled on this storage instance" ) + rebalance._metrics.nil_bucket_id_compat_counter = metrics.counter( + 'tnt_crud_storage_nil_bucket_id_compat_total', + "count of storage operations performed with skipped bucket_ref (legacy router compatibility mode)" + ) metrics.register_callback(rebalance._metrics.storage_callback) end +function rebalance.inc_nil_bucket_id_compat(operation, engine) + if not has_metrics_module or not rebalance._metrics.nil_bucket_id_compat_counter then + return + end + + rebalance._metrics.nil_bucket_id_compat_counter:inc(1, { + operation = operation, + engine = engine, + safe_mode = rebalance.safe_mode_status() and 'enabled' or 'disabled', + }) +end + function rebalance._metrics.router_callback() if not rebalance._metrics.router_cache_clear_ts_gauge then return end rebalance._metrics.router_cache_clear_ts_gauge:set(rebalance.router_cache_clear_ts) diff --git a/crud/delete.lua b/crud/delete.lua index a8d7f087..b4e79ce3 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -2,6 +2,7 @@ local checks = require('checks') local errors = require('errors') local call = require('crud.common.call') +local compat_warn = require('crud.common.compat_warn') local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') @@ -20,7 +21,7 @@ local CRUD_DELETE_FUNC_NAME = utils.get_storage_call(DELETE_FUNC_NAME) local function delete_on_storage(space_name, key, field_names, opts) dev_checks('string', '?', '?table', { - bucket_id = 'number|cdata', + bucket_id = '?number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -44,9 +45,16 @@ local function delete_on_storage(space_name, key, field_names, opts) return nil, err end - local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id, space.engine) - if not ref_ok then - return nil, bucket_ref_err + -- Skip bucket reference if bucket_id is not provided to support old routers. + local unref = nil + if opts.bucket_id ~= nil then + local ref_ok, bucket_ref_err, unref_func = bucket_ref_unref.bucket_refrw(opts.bucket_id, space.engine) + if not ref_ok then + return nil, bucket_ref_err + end + unref = unref_func + else + compat_warn.log_nil_bucket_id('delete', space_name, space.engine) end -- add_space_schema_hash is false because @@ -58,9 +66,11 @@ local function delete_on_storage(space_name, key, field_names, opts) fetch_latest_metadata = opts.fetch_latest_metadata, }, space, key) - local unref_ok, err_unref = unref(opts.bucket_id, space.engine) - if not unref_ok then - return nil, err_unref + if unref ~= nil then + local unref_ok, err_unref = unref(opts.bucket_id, space.engine) + if not unref_ok then + return nil, err_unref + end end return result diff --git a/crud/get.lua b/crud/get.lua index 760d8490..7a22971a 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -2,6 +2,7 @@ local checks = require('checks') local errors = require('errors') local call = require('crud.common.call') +local compat_warn = require('crud.common.compat_warn') local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') @@ -20,7 +21,7 @@ local CRUD_GET_FUNC_NAME = utils.get_storage_call(GET_FUNC_NAME) local function get_on_storage(space_name, key, field_names, opts) dev_checks('string', '?', '?table', { - bucket_id = 'number|cdata', + bucket_id = '?number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -43,9 +44,16 @@ local function get_on_storage(space_name, key, field_names, opts) return nil, err end - local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refro(opts.bucket_id, space.engine) - if not ref_ok then - return nil, bucket_ref_err + -- Skip bucket reference if bucket_id is not provided to support old routers. + local unref = nil + if opts.bucket_id ~= nil then + local ref_ok, bucket_ref_err, unref_func = bucket_ref_unref.bucket_refro(opts.bucket_id, space.engine) + if not ref_ok then + return nil, bucket_ref_err + end + unref = unref_func + else + compat_warn.log_nil_bucket_id('get', space_name, space.engine) end -- add_space_schema_hash is false because @@ -56,9 +64,11 @@ local function get_on_storage(space_name, key, field_names, opts) fetch_latest_metadata = opts.fetch_latest_metadata, }, space, key) - local unref_ok, err_unref = unref(opts.bucket_id, space.engine) - if not unref_ok then - return nil, err_unref + if unref ~= nil then + local unref_ok, err_unref = unref(opts.bucket_id, space.engine) + if not unref_ok then + return nil, err_unref + end end return result diff --git a/crud/ratelimit.lua b/crud/ratelimit.lua index e39f0a63..32db95c4 100644 --- a/crud/ratelimit.lua +++ b/crud/ratelimit.lua @@ -117,6 +117,7 @@ local function log_ratelimited_closure(lvl) end Ratelimit.log_crit = log_ratelimited_closure(S_CRIT) +Ratelimit.log_warn = log_ratelimited_closure(S_WARN) return { new = ratelimit_new, diff --git a/crud/update.lua b/crud/update.lua index 8f8b4dfd..a0381d28 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -2,6 +2,7 @@ local checks = require('checks') local errors = require('errors') local call = require('crud.common.call') +local compat_warn = require('crud.common.compat_warn') local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') @@ -20,7 +21,7 @@ local CRUD_UPDATE_FUNC_NAME = utils.get_storage_call(UPDATE_FUNC_NAME) local function update_on_storage(space_name, key, operations, field_names, opts) dev_checks('string', '?', 'table', '?table', { - bucket_id = 'number|cdata', + bucket_id = '?number|cdata', sharding_key_hash = '?number', sharding_func_hash = '?number', skip_sharding_hash_check = '?boolean', @@ -44,9 +45,16 @@ local function update_on_storage(space_name, key, operations, field_names, opts) return nil, err end - local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id, space.engine) - if not ref_ok then - return nil, bucket_ref_err + -- Skip bucket reference if bucket_id is not provided to support old routers. + local unref = nil + if opts.bucket_id ~= nil then + local ref_ok, bucket_ref_err, unref_func = bucket_ref_unref.bucket_refrw(opts.bucket_id, space.engine) + if not ref_ok then + return nil, bucket_ref_err + end + unref = unref_func + else + compat_warn.log_nil_bucket_id('update', space_name, space.engine) end -- add_space_schema_hash is false because @@ -73,9 +81,11 @@ local function update_on_storage(space_name, key, operations, field_names, opts) }, space, key, operations) end - local unref_ok, err_unref = unref(opts.bucket_id, space.engine) - if not unref_ok then - return nil, err_unref + if unref ~= nil then + local unref_ok, err_unref = unref(opts.bucket_id, space.engine) + if not unref_ok then + return nil, err_unref + end end return res, err diff --git a/test/integration/metrics_test.lua b/test/integration/metrics_test.lua index 7bc1d3ce..dae2715b 100644 --- a/test/integration/metrics_test.lua +++ b/test/integration/metrics_test.lua @@ -7,11 +7,15 @@ local pgroup = t.group('metrics_integration', helpers.backend_matrix({ })) local function before_all(g) + -- Disable checks to avoid 'fiber is not registered' error. + g.old_dev_checks_value = os.getenv('TARANTOOL_CRUD_ENABLE_INTERNAL_CHECKS') + helpers.disable_dev_checks() helpers.start_default_cluster(g, 'srv_stats') end local function after_all(g) helpers.stop_cluster(g.cluster, g.params.backend) + os.setenv('TARANTOOL_CRUD_ENABLE_INTERNAL_CHECKS', g.old_dev_checks_value) end local function before_each(g) @@ -141,3 +145,61 @@ pgroup.test_router_cache_metrics = function(g) end end) end + +--- Perform operations with nil bucket_id (emulating old router < 1.7.0). +pgroup.test_nil_bucket_id_compat_metric = function(g) + local has_metrics_module = pcall(require, 'metrics') + t.skip_if(not has_metrics_module, 'No metrics module in current version') + + local storage = g.cluster:server('s1-master') + + local common_opts = {bucket_id = nil, skip_sharding_hash_check = true} + local ops = { + { + name = 'get', + args = {'customers', 1, nil, common_opts} + }, + { + name = 'update', + args = {'customers', 1, {{'=', 'name', 'Test'}}, common_opts} + }, + { + name = 'delete', + args = {'customers', 1, common_opts} + }, + } + + for _, op in ipairs(ops) do + storage:call('_crud.' .. op.name .. '_on_storage', op.args) + end + + local observed = storage:eval([[ + return require('metrics').collect({ invoke_callbacks = true }) + ]]) + + local found_metrics = {} + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_storage_nil_bucket_id_compat_total' then + table.insert(found_metrics, m) + end + end + + local function get_metric_value_by_label(metrics_list, op_name, engine_name) + for _, m in ipairs(metrics_list) do + if m.label_pairs.operation == op_name and m.label_pairs.engine == engine_name then + return m.value + end + end + return nil + end + + local engine = 'memtx' + + local get_val = get_metric_value_by_label(found_metrics, 'get', engine) + local update_val = get_metric_value_by_label(found_metrics, 'update', engine) + local delete_val = get_metric_value_by_label(found_metrics, 'delete', engine) + + t.assert_equals(get_val, 1) + t.assert_equals(update_val, 1) + t.assert_equals(delete_val, 1) +end diff --git a/test/integration/simple_operations_test.lua b/test/integration/simple_operations_test.lua index 4963a5c9..851d72e6 100644 --- a/test/integration/simple_operations_test.lua +++ b/test/integration/simple_operations_test.lua @@ -1931,3 +1931,47 @@ pgroup.test_get_invalid_bucket_id = function(g) t.assert_str_contains(err.err or err.str, expected_err) end end + +pgroup.before_test('test_storage_compat_nil_bucket_id', function(g) + local storage = g.cluster:server('s1-master') + + -- Disable yield checks to avoid 'fiber is not registered' error + storage:eval([[ + local yield_checks = require('crud.common.yield_checks') + rawset(_G, '_old_check_no_yields', yield_checks.check_no_yields) + yield_checks.check_no_yields = function() end + ]]) +end) + +pgroup.after_test('test_storage_compat_nil_bucket_id', function(g) + local storage = g.cluster:server('s1-master') + + storage:eval([[ + local yield_checks = require('crud.common.yield_checks') + yield_checks.check_no_yields = rawget(_G, '_old_check_no_yields') + rawset(_G, '_old_check_no_yields', nil) + ]]) +end) + +pgroup.test_storage_compat_nil_bucket_id = function(g) + local storage = g.cluster:server('s1-master') + + -- get_on_storage with nil bucket_id (emulating old router < 1.7.0) + local _, err = storage:call('_crud.get_on_storage', { + 'customers', 1, nil, {bucket_id = nil, skip_sharding_hash_check = true}, + }) + t.assert_equals(err, nil) + + -- update_on_storage with nil bucket_id + local _, err = storage:call('_crud.update_on_storage', { + 'customers', 1, {{'=', 'name', 'UpdatedUser'}}, nil, + {bucket_id = nil, skip_sharding_hash_check = true}, + }) + t.assert_equals(err, nil) + + -- delete_on_storage with nil bucket_id + local _, err = storage:call('_crud.delete_on_storage', { + 'customers', 1, nil, {bucket_id = nil, skip_sharding_hash_check = true}, + }) + t.assert_equals(err, nil) +end