diff --git a/redis/takeover_master.lua b/redis/takeover_master.lua new file mode 100644 index 0000000..54f7e46 --- /dev/null +++ b/redis/takeover_master.lua @@ -0,0 +1,50 @@ +-- Atomically attempt to take over as master when current master is dead during setup +-- Returns 1 if takeover succeeded, 0 otherwise + +local master_status_key = KEYS[1] +local master_worker_id_key = KEYS[2] +local master_setup_heartbeat_key = KEYS[3] + +local new_worker_id = ARGV[1] +local current_time = tonumber(ARGV[2]) +local heartbeat_timeout = tonumber(ARGV[3]) +local redis_ttl = tonumber(ARGV[4]) + +-- Step 1: Verify status is still 'setup' +local status = redis.call('get', master_status_key) +if status ~= 'setup' then + return 0 +end + +-- Step 2: Check if heartbeat is stale or missing +local last_heartbeat = redis.call('get', master_setup_heartbeat_key) +if last_heartbeat then + local heartbeat_age = current_time - tonumber(last_heartbeat) + if heartbeat_age < heartbeat_timeout then + -- Master is still alive, heartbeat is fresh + return 0 + end +end +-- If no heartbeat exists and status is 'setup', master may have died before first heartbeat +-- Allow takeover in this case (heartbeat_timeout acts as grace period for initial setup) + +-- Step 3: Delete old master-status to allow SETNX +redis.call('del', master_status_key) + +-- Step 4: Atomically try to claim master role +local claimed = redis.call('setnx', master_status_key, 'setup') +if claimed == 0 then + -- Another worker beat us to it + return 0 +end + +-- Step 5: We got master role - update worker ID and heartbeat +redis.call('set', master_worker_id_key, new_worker_id) +redis.call('set', master_setup_heartbeat_key, current_time) + +-- Set TTLs +redis.call('expire', master_status_key, redis_ttl) +redis.call('expire', master_worker_id_key, redis_ttl) +redis.call('expire', master_setup_heartbeat_key, redis_ttl) + +return 1 diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 9e9c133..200e889 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -15,6 +15,7 @@ class Configuration attr_accessor :timing_redis_url attr_accessor :write_duration_averages attr_accessor :heartbeat_grace_period, :heartbeat_interval + attr_accessor :master_setup_heartbeat_interval, :master_setup_heartbeat_timeout attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout @@ -66,7 +67,9 @@ def initialize( branch: nil, timing_redis_url: nil, heartbeat_grace_period: 30, - heartbeat_interval: 10 + heartbeat_interval: 10, + master_setup_heartbeat_interval: 5, + master_setup_heartbeat_timeout: 30 ) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @@ -105,6 +108,8 @@ def initialize( @write_duration_averages = false @heartbeat_grace_period = heartbeat_grace_period @heartbeat_interval = heartbeat_interval + @master_setup_heartbeat_interval = master_setup_heartbeat_interval + @master_setup_heartbeat_timeout = master_setup_heartbeat_timeout end def queue_init_timeout diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 9ebdd58..27326ab 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -56,9 +56,27 @@ def progress def wait_for_master(timeout: 120) return true if master? + last_takeover_check = CI::Queue.time_now.to_f + (timeout * 10 + 1).to_i.times do return true if queue_initialized? + # Periodically check if master is dead and attempt takeover + current_time = CI::Queue.time_now.to_f + if current_time - last_takeover_check >= config.master_setup_heartbeat_interval + last_takeover_check = current_time + + if queue_initializing? && master_setup_heartbeat_stale? + if respond_to?(:attempt_master_takeover, true) && attempt_master_takeover + # Takeover succeeded - run master setup + if respond_to?(:execute_master_setup, true) + execute_master_setup + return true + end + end + end + end + sleep 0.1 end raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting." @@ -97,6 +115,19 @@ def master_worker_id redis.get(key('master-worker-id')) end + # Check if the master setup heartbeat is stale (or missing) + # Returns true if heartbeat is older than master_setup_heartbeat_timeout + def master_setup_heartbeat_stale? + heartbeat = redis.get(key('master-setup-heartbeat')) + return true unless heartbeat # No heartbeat = stale (master may have died before first heartbeat) + + current_time = CI::Queue.time_now.to_f + heartbeat_age = current_time - heartbeat.to_f + heartbeat_age >= config.master_setup_heartbeat_timeout + rescue *CONNECTION_ERRORS + false # On connection error, don't attempt takeover + end + private attr_reader :redis, :redis_url diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 7de1f17..8e04eeb 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -2,6 +2,8 @@ require 'ci/queue/static' require 'concurrent/set' +require 'digest/sha2' +require 'set' module CI module Queue @@ -37,18 +39,10 @@ def populate(tests, random: Random.new) # All workers need an index of tests to resolve IDs @index = tests.map { |t| [t.id, t] }.to_h @total = tests.size + @random = random + @tests = tests - if acquire_master_role? - executables = reorder_tests(tests, random: random) - - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - - store_chunk_metadata(chunks) if chunks.any? - - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) - end + execute_master_setup if acquire_master_role? register_worker_presence @@ -294,6 +288,44 @@ def with_heartbeat(test_id) heartbeat_thread&.join(1) # Wait up to 1 second for thread to finish end + # Runs a block while sending periodic heartbeats during master setup. + # This allows other workers to detect if the master dies during setup. + def with_master_setup_heartbeat + return yield unless config.master_setup_heartbeat_interval&.positive? + + # Send initial heartbeat immediately + send_master_setup_heartbeat + + stop_heartbeat = false + heartbeat_thread = Thread.new do + until stop_heartbeat + sleep(config.master_setup_heartbeat_interval) + break if stop_heartbeat + + begin + send_master_setup_heartbeat + rescue StandardError => e + warn("[master-setup-heartbeat] Failed to send heartbeat: #{e.message}") + end + end + end + + yield + ensure + stop_heartbeat = true + heartbeat_thread&.kill + heartbeat_thread&.join(1) + end + + # Send a heartbeat to indicate master is still alive during setup + def send_master_setup_heartbeat + current_time = CI::Queue.time_now.to_f + redis.set(key('master-setup-heartbeat'), current_time) + redis.expire(key('master-setup-heartbeat'), config.redis_ttl) + rescue *CONNECTION_ERRORS => e + warn("[master-setup-heartbeat] Connection error: #{e.message}") + end + def ensure_connection_and_script(script) # Pre-initialize Redis connection and script in current thread context # This ensures background threads use the same initialized connection @@ -439,18 +471,38 @@ def try_to_reserve_lost_test def push(tests) @total = tests.size + return unless @master - if @master - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') + # Use WATCH/MULTI for atomic check-and-push to prevent TOCTOU race. + # If master-worker-id changes between WATCH and MULTI, transaction aborts. + result = redis.watch(key('master-worker-id')) do |rd| + current_master = rd.get(key('master-worker-id')) - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) + if current_master && current_master != worker_id + # We're not the master anymore, unwatch and abort + rd.unwatch + :not_master + else + # We're still master, execute atomic transaction + rd.multi do |transaction| + transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.set(key('total'), @total) + transaction.set(key('master-status'), 'ready') + + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end end end + + # result is nil if WATCH detected a change (race condition) + # result is :not_master if we detected we're not master + # result is an array of responses if transaction succeeded + if result.nil? || result == :not_master + warn "Worker #{worker_id} lost master role (race detected), aborting push" + @master = false + end rescue *CONNECTION_ERRORS raise if @master end @@ -467,9 +519,15 @@ def acquire_master_role? begin redis.set(key('master-worker-id'), worker_id) redis.expire(key('master-worker-id'), config.redis_ttl) + + # Set initial heartbeat immediately to prevent premature takeover + # This closes the window where status='setup' but no heartbeat exists + redis.set(key('master-setup-heartbeat'), CI::Queue.time_now.to_f) + redis.expire(key('master-setup-heartbeat'), config.redis_ttl) + warn "Worker #{worker_id} elected as master" rescue *CONNECTION_ERRORS - # If setting master-worker-id fails, we still have master status + # If setting master-worker-id/heartbeat fails, we still have master status # Log but don't lose master role warn("Failed to set master-worker-id: #{$!.message}") end @@ -480,6 +538,43 @@ def acquire_master_role? false end + # Attempt to take over as master when current master appears dead during setup. + # Uses atomic Lua script to ensure only one worker can win the takeover. + # Returns true if takeover succeeded, false otherwise. + def attempt_master_takeover + return false if @master # Already master + + warn "Worker #{worker_id} attempting to takeover as master from #{master_worker_id}" + + current_time = CI::Queue.time_now.to_f + result = eval_script( + :takeover_master, + keys: [ + key('master-status'), + key('master-worker-id'), + key('master-setup-heartbeat') + ], + argv: [ + worker_id, + current_time, + config.master_setup_heartbeat_timeout, + config.redis_ttl + ] + ) + + if result == 1 + @master = true + warn "Worker #{worker_id} took over as master (previous master died during setup)" + true + else + warn "Failed to takeover as master. Current master is #{master_worker_id}" + false + end + rescue *CONNECTION_ERRORS => e + warn "[takeover] Connection error during takeover attempt: #{e.message}" + false + end + def register_worker_presence register redis.expire(key('workers'), config.redis_ttl) @@ -487,6 +582,23 @@ def register_worker_presence raise if master? end + # Shared logic for master setup - reorders tests, stores chunk metadata, and pushes to queue. + # Used by both initial master setup (populate) and takeover. + def execute_master_setup + return unless @master && @index + with_master_setup_heartbeat do + executables = reorder_tests(@tests, random: @random) + + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } + + store_chunk_metadata(chunks) if chunks.any? + + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end + end + def store_chunk_metadata(chunks) # Batch operations to avoid exceeding Redis multi operation limits # Each chunk requires 4 commands (set, expire, sadd, hset), so batch conservatively diff --git a/ruby/test/ci/queue/redis/master_setup_heartbeat_test.rb b/ruby/test/ci/queue/redis/master_setup_heartbeat_test.rb new file mode 100644 index 0000000..2cfea5c --- /dev/null +++ b/ruby/test/ci/queue/redis/master_setup_heartbeat_test.rb @@ -0,0 +1,395 @@ +# frozen_string_literal: true + +require 'test_helper' + +module CI + module Queue + module Redis + class MasterSetupHeartbeatTest < Minitest::Test + TEST_LIST = %w[ + ATest#test_foo + ATest#test_bar + BTest#test_foo + ].freeze + + def setup + @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') + @redis = ::Redis.new(url: @redis_url) + @redis.flushdb + end + + def teardown + @redis.flushdb + end + + # === Configuration Tests === + + def test_master_setup_heartbeat_interval_default + config = Configuration.new + assert_equal 5, config.master_setup_heartbeat_interval + end + + def test_master_setup_heartbeat_timeout_default + config = Configuration.new + assert_equal 30, config.master_setup_heartbeat_timeout + end + + def test_master_setup_heartbeat_interval_configurable + config = Configuration.new(master_setup_heartbeat_interval: 10) + assert_equal 10, config.master_setup_heartbeat_interval + end + + def test_master_setup_heartbeat_timeout_configurable + config = Configuration.new(master_setup_heartbeat_timeout: 60) + assert_equal 60, config.master_setup_heartbeat_timeout + end + + # === master_setup_heartbeat_stale? Tests === + + def test_master_setup_heartbeat_stale_when_no_heartbeat_exists + worker = create_worker(1) + assert worker.master_setup_heartbeat_stale?, 'Should be stale when no heartbeat exists' + end + + def test_master_setup_heartbeat_not_stale_when_fresh + # Set a fresh heartbeat + @redis.set('build:42:master-setup-heartbeat', CI::Queue.time_now.to_f) + + worker = create_worker(1, master_setup_heartbeat_timeout: 30) + refute worker.master_setup_heartbeat_stale?, 'Should not be stale when heartbeat is fresh' + end + + def test_master_setup_heartbeat_stale_when_old + # Set a stale heartbeat (older than timeout) + old_time = CI::Queue.time_now.to_f - 60 # 60 seconds ago + @redis.set('build:42:master-setup-heartbeat', old_time) + + worker = create_worker(1, master_setup_heartbeat_timeout: 30) + assert worker.master_setup_heartbeat_stale?, 'Should be stale when heartbeat is older than timeout' + end + + def test_master_setup_heartbeat_exactly_at_timeout + # Set heartbeat exactly at timeout boundary + timeout = 30 + boundary_time = CI::Queue.time_now.to_f - timeout + @redis.set('build:42:master-setup-heartbeat', boundary_time) + + worker = create_worker(1, master_setup_heartbeat_timeout: timeout) + assert worker.master_setup_heartbeat_stale?, 'Should be stale at exactly timeout' + end + + # === send_master_setup_heartbeat Tests === + + def test_send_master_setup_heartbeat_sets_key + worker = create_worker(1) + worker.send(:send_master_setup_heartbeat) + + heartbeat = @redis.get('build:42:master-setup-heartbeat') + refute_nil heartbeat, 'Heartbeat key should be set' + assert_in_delta CI::Queue.time_now.to_f, heartbeat.to_f, 1.0 + end + + def test_send_master_setup_heartbeat_sets_ttl + worker = create_worker(1) + worker.send(:send_master_setup_heartbeat) + + ttl = @redis.ttl('build:42:master-setup-heartbeat') + assert ttl > 0, 'Heartbeat should have TTL set' + end + + # === acquire_master_role? Tests === + + def test_acquire_master_role_sets_initial_heartbeat + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + worker.populate(tests, random: Random.new(0)) + + heartbeat = @redis.get('build:42:master-setup-heartbeat') + refute_nil heartbeat, 'Initial heartbeat should be set after acquiring master' + end + + def test_acquire_master_role_sets_master_worker_id + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + worker.populate(tests, random: Random.new(0)) + + master_id = @redis.get('build:42:master-worker-id') + assert_equal '1', master_id + end + + # === attempt_master_takeover Tests === + + def test_attempt_master_takeover_fails_when_already_master + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + worker.populate(tests, random: Random.new(0)) + + refute worker.send(:attempt_master_takeover), 'Should not attempt takeover when already master' + end + + def test_attempt_master_takeover_fails_when_heartbeat_fresh + # Set up initial master + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + @redis.set('build:42:master-setup-heartbeat', CI::Queue.time_now.to_f) + + # Second worker tries to take over - should fail + worker2 = create_worker(2, master_setup_heartbeat_timeout: 30) + refute worker2.send(:attempt_master_takeover), 'Should not takeover when heartbeat is fresh' + + # Original master should still be set + assert_equal '1', @redis.get('build:42:master-worker-id') + end + + def test_attempt_master_takeover_succeeds_when_heartbeat_stale + # Set up initial master with stale heartbeat + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Second worker tries to take over - should succeed + worker2 = create_worker(2, master_setup_heartbeat_timeout: 30) + assert worker2.send(:attempt_master_takeover), 'Should takeover when heartbeat is stale' + + # Worker 2 should now be master + assert_equal '2', @redis.get('build:42:master-worker-id') + assert worker2.master?, 'Worker 2 should be master after takeover' + end + + def test_attempt_master_takeover_fails_when_status_not_setup + # Set up master that completed setup + @redis.set('build:42:master-status', 'ready') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Second worker tries to take over - should fail because status is 'ready' + worker2 = create_worker(2, master_setup_heartbeat_timeout: 30) + refute worker2.send(:attempt_master_takeover), 'Should not takeover when status is not setup' + end + + def test_attempt_master_takeover_only_one_wins + # Set up initial master with stale heartbeat + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Multiple workers try to take over simultaneously + workers = (2..5).map { |i| create_worker(i, master_setup_heartbeat_timeout: 30) } + + results = workers.map { |w| w.send(:attempt_master_takeover) } + + # Exactly one should succeed + assert_equal 1, results.count(true), 'Exactly one worker should win takeover' + end + + # === with_master_setup_heartbeat Tests === + + def test_with_master_setup_heartbeat_sends_heartbeats + worker = create_worker(1, master_setup_heartbeat_interval: 0.1) + + heartbeat_times = [] + worker.send(:with_master_setup_heartbeat) do + 3.times do + sleep 0.15 + heartbeat = @redis.get('build:42:master-setup-heartbeat') + heartbeat_times << heartbeat.to_f if heartbeat + end + end + + # Should have received multiple heartbeats + assert heartbeat_times.size >= 2, "Should have at least 2 heartbeat readings, got #{heartbeat_times.size}" + # Heartbeats should be progressively newer + assert heartbeat_times == heartbeat_times.sort, 'Heartbeats should be progressively newer' + end + + def test_with_master_setup_heartbeat_stops_after_block + worker = create_worker(1, master_setup_heartbeat_interval: 0.1) + + worker.send(:with_master_setup_heartbeat) do + sleep 0.2 + end + + initial_heartbeat = @redis.get('build:42:master-setup-heartbeat').to_f + sleep 0.3 + final_heartbeat = @redis.get('build:42:master-setup-heartbeat').to_f + + # Heartbeat should not have updated after block completed + assert_equal initial_heartbeat, final_heartbeat, 'Heartbeat should stop updating after block' + end + + # === wait_for_master with Takeover Tests === + + def test_wait_for_master_triggers_takeover_when_master_dead + # Set up a dead master (status = setup, but stale heartbeat) + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Worker 2 waits for master - should trigger takeover + worker2 = create_worker( + 2, + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Populate will trigger wait_for_master for non-master workers + # We need to simulate this by having populate fail to acquire master + # but then wait_for_master should detect dead master and takeover + + # First, create the index without populating + worker2.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + worker2.instance_variable_set(:@total, tests.size) + worker2.instance_variable_set(:@random, Random.new(0)) + worker2.instance_variable_set(:@tests, tests) + worker2.instance_variable_set(:@master, false) + + # Now call wait_for_master - should detect dead master and take over + result = worker2.wait_for_master(timeout: 5) + + assert result, 'wait_for_master should return true after takeover' + assert worker2.master?, 'Worker 2 should be master after takeover' + assert_equal '2', @redis.get('build:42:master-worker-id') + end + + def test_wait_for_master_does_not_takeover_when_master_alive + # Master worker sets up and sends heartbeats + worker1 = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Start master in background thread + master_thread = Thread.new do + worker1.populate(tests, random: Random.new(0)) + end + + # Worker 2 waits - should wait for master to finish, not take over + worker2 = create_worker( + 2, + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1, + populate: false + ) + + result = worker2.wait_for_master(timeout: 5) + master_thread.join + + assert result, 'wait_for_master should return true' + refute worker2.master?, 'Worker 2 should not be master' + assert worker1.master?, 'Worker 1 should be master' + end + + # === Push with WATCH/MULTI Tests === + + def test_push_aborts_when_master_id_changes + worker1 = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Set up worker1 as master + worker1.instance_variable_set(:@master, true) + worker1.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + @redis.set('build:42:master-worker-id', '1') + + # Simulate race: another worker changed master-worker-id between WATCH and MULTI + # We can't directly test the race, but we can verify the behavior + # when master-worker-id doesn't match + + # Change master-worker-id before push + @redis.set('build:42:master-worker-id', '2') + + worker1.send(:push, TEST_LIST) + + # Worker1 should have detected the race and aborted + refute worker1.master?, 'Worker1 should lose master role after race detection' + end + + def test_push_succeeds_when_master_id_matches + worker1 = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Actually populate to set up master correctly + worker1.populate(tests, random: Random.new(0)) + + assert worker1.master?, 'Worker1 should be master' + assert_equal 'ready', @redis.get('build:42:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:42:queue') + end + + # === execute_master_setup Tests === + + def test_execute_master_setup_runs_with_heartbeat + worker = create_worker(1, master_setup_heartbeat_interval: 0.1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Set up worker as master with required state + worker.instance_variable_set(:@master, true) + worker.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + worker.instance_variable_set(:@tests, tests) + worker.instance_variable_set(:@random, Random.new(0)) + @redis.set('build:42:master-worker-id', '1') + + worker.send(:execute_master_setup) + + # Queue should be populated + assert_equal 'ready', @redis.get('build:42:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:42:queue') + end + + def test_execute_master_setup_does_nothing_when_not_master + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + worker.instance_variable_set(:@master, false) + worker.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + + worker.send(:execute_master_setup) + + # Queue should not be populated + assert_nil @redis.get('build:42:master-status') + end + + private + + class MockTest + attr_reader :id + + def initialize(id) + @id = id + end + + def <=>(other) + id <=> other.id + end + + def flaky? + false + end + + def tests + [self] + end + end + + def create_worker(id, **options) + skip_populate = options.delete(:populate) == false + config_options = { + build_id: '42', + worker_id: id.to_s, + timeout: 0.2, + timing_redis_url: @redis_url, + master_setup_heartbeat_interval: options.delete(:master_setup_heartbeat_interval) || 5, + master_setup_heartbeat_timeout: options.delete(:master_setup_heartbeat_timeout) || 30 + }.merge(options) + + CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new(**config_options) + ) + end + end + end + end +end diff --git a/ruby/test/integration/master_setup_heartbeat_integration_test.rb b/ruby/test/integration/master_setup_heartbeat_integration_test.rb new file mode 100644 index 0000000..9e4d902 --- /dev/null +++ b/ruby/test/integration/master_setup_heartbeat_integration_test.rb @@ -0,0 +1,325 @@ +# frozen_string_literal: true + +require 'test_helper' +require 'timeout' + +module Integration + class MasterSetupHeartbeatIntegrationTest < Minitest::Test + TEST_LIST = %w[ + ATest#test_foo + ATest#test_bar + BTest#test_foo + BTest#test_bar + CTest#test_foo + ].freeze + + def setup + @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') + @redis = ::Redis.new(url: @redis_url) + @redis.flushdb + @threads = [] + end + + def teardown + @threads.each(&:kill) + @threads.each { |t| t.join(1) } + @redis.flushdb + end + + # === Scenario: Master dies during setup, follower takes over === + + def test_follower_takes_over_when_master_dies_during_setup + # Simulate master that starts setup but dies before completing + # by setting up the 'setup' state without completing it + + # Create a "dead" master state + @redis.set('build:100:master-status', 'setup') + @redis.set('build:100:master-worker-id', 'dead_master') + # Set a stale heartbeat (old enough to trigger takeover) + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:100:master-setup-heartbeat', old_time) + + # Start a follower worker that should detect dead master and take over + tests = TEST_LIST.map { |id| MockTest.new(id) } + follower = create_worker( + 1, + build_id: '100', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.2 + ) + + # Populate will trigger wait_for_master which should detect dead master + follower.populate(tests, random: Random.new(0)) + + # Follower should have taken over + assert follower.master?, 'Follower should become master after takeover' + assert_equal '1', @redis.get('build:100:master-worker-id') + assert_equal 'ready', @redis.get('build:100:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:100:queue') + end + + def test_multiple_followers_competing_for_takeover + # Simulate dead master + @redis.set('build:101:master-status', 'setup') + @redis.set('build:101:master-worker-id', 'dead_master') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:101:master-setup-heartbeat', old_time) + + tests = TEST_LIST.map { |id| MockTest.new(id) } + workers = [] + results = [] + mutex = Mutex.new + + # Start multiple followers simultaneously + 5.times do |i| + @threads << Thread.new do + worker = create_worker( + i + 1, + build_id: '101', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + begin + worker.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + mutex.synchronize do + workers << worker + results << worker.master? + end + rescue StandardError => e + mutex.synchronize { results << false } + end + end + end + + # Wait for all threads to complete + @threads.each { |t| t.join(10) } + + # Exactly one worker should be master + master_count = results.count(true) + assert_equal 1, master_count, "Expected exactly 1 master, got #{master_count}" + + # Queue should be properly set up + assert_equal 'ready', @redis.get('build:101:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:101:queue') + end + + def test_follower_waits_for_healthy_master_to_complete + tests = TEST_LIST.map { |id| MockTest.new(id) } + master_completed = false + follower_completed = false + + # Start master worker that takes time to complete setup + @threads << Thread.new do + master = create_worker( + 1, + build_id: '102', + master_setup_heartbeat_interval: 0.1 + ) + # Simulate slow setup by adding sleep in the middle + # The with_master_setup_heartbeat should keep heartbeat fresh + master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + master_completed = true + end + + # Give master time to start setup + sleep 0.2 + + # Start follower - should wait for master, not take over + @threads << Thread.new do + follower = create_worker( + 2, + build_id: '102', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + follower.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + follower_completed = true + refute follower.master?, 'Follower should not become master when master is healthy' + end + + # Wait for both to complete + @threads.each { |t| t.join(10) } + + assert master_completed, 'Master should have completed' + assert follower_completed, 'Follower should have completed' + + # Master should still be worker 1 + assert_equal '1', @redis.get('build:102:master-worker-id') + end + + def test_heartbeat_prevents_premature_takeover + # Test that a fresh heartbeat prevents takeover even with short timeout + + # First, a real master starts up and begins setup + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Create master and acquire role + master = create_worker( + 1, + build_id: '103', + master_setup_heartbeat_interval: 0.1 + ) + + # Manually acquire master role to control timing + master.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + master.instance_variable_set(:@total, tests.size) + master.instance_variable_set(:@tests, tests) + master.instance_variable_set(:@random, Random.new(0)) + + # Acquire master role - this sets up initial state + master.send(:acquire_master_role?) + assert master.master?, 'Worker 1 should be master' + + # Send a fresh heartbeat + master.send(:send_master_setup_heartbeat) + + # Now try to have another worker take over - should fail + worker2 = create_worker( + 2, + build_id: '103', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + + # Worker 2 should not be able to take over because heartbeat is fresh + refute worker2.send(:attempt_master_takeover), 'Should not takeover when heartbeat is fresh' + assert_equal '1', @redis.get('build:103:master-worker-id') + end + + def test_end_to_end_normal_operation_with_heartbeat + # Test that normal operation with heartbeat works correctly + tests = TEST_LIST.map { |id| MockTest.new(id) } + processed_tests = [] + mutex = Mutex.new + + # Start master + master = create_worker( + 1, + build_id: '104', + master_setup_heartbeat_interval: 0.2 + ) + master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Start follower + follower = create_worker( + 2, + build_id: '104', + master_setup_heartbeat_interval: 0.2 + ) + follower.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Process tests from both workers + [master, follower].each do |worker| + @threads << Thread.new do + worker.poll do |test| + mutex.synchronize { processed_tests << test.id } + worker.acknowledge(test) + end + end + end + + # Wait for processing to complete + @threads.each { |t| t.join(10) } + + # All tests should be processed + assert_equal TEST_LIST.sort, processed_tests.sort + assert master.exhausted? + assert follower.exhausted? + end + + def test_takeover_with_immediate_queue_initialization + # Test that after takeover, the new master properly initializes the queue + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Set up dead master state + @redis.set('build:105:master-status', 'setup') + @redis.set('build:105:master-worker-id', 'dead_master') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:105:master-setup-heartbeat', old_time) + + # New worker takes over + new_master = create_worker( + 1, + build_id: '105', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + new_master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Verify queue is fully functional + assert new_master.master? + assert new_master.queue_initialized? + refute new_master.exhausted? + assert_equal TEST_LIST.size, new_master.size + end + + def test_no_takeover_when_master_finishes_quickly + # Test that no takeover happens when master finishes before timeout + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Master completes setup quickly + master = create_worker( + 1, + build_id: '106', + master_setup_heartbeat_interval: 0.1 + ) + master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Give it a moment + sleep 0.1 + + # Follower joins - should not attempt takeover + follower = create_worker( + 2, + build_id: '106', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + follower.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Master should still be worker 1 + assert master.master? + refute follower.master? + assert_equal '1', @redis.get('build:106:master-worker-id') + end + + private + + class MockTest + attr_reader :id + + def initialize(id) + @id = id + end + + def <=>(other) + id <=> other.id + end + + def flaky? + false + end + + def tests + [self] + end + end + + def create_worker(id, **options) + build_id = options.delete(:build_id) || '42' + config_options = { + build_id: build_id, + worker_id: id.to_s, + timeout: 1, + timing_redis_url: @redis_url, + master_setup_heartbeat_interval: options.delete(:master_setup_heartbeat_interval) || 5, + master_setup_heartbeat_timeout: options.delete(:master_setup_heartbeat_timeout) || 30 + }.merge(options) + + CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new(**config_options) + ) + end + end +end