diff --git a/google-cloud-pubsub/Gemfile b/google-cloud-pubsub/Gemfile index f90528b3cf90..90231329e079 100644 --- a/google-cloud-pubsub/Gemfile +++ b/google-cloud-pubsub/Gemfile @@ -22,10 +22,12 @@ end gem "autotest-suffix", "~> 1.1" gem "avro", "~> 1.12" gem "bigdecimal", ">= 3.2", "< 5" -gem "google-style", "~> 1.31.1" -gem "minitest", "~> 5.25" +gem "google-style", "~> 1.32.0" +gem "irb", "~> 1.17" +gem "minitest", "~> 5.20" gem "minitest-autotest", "~> 1.1" gem "minitest-focus", "~> 1.4" +gem "minitest-mock", "~> 5.27" gem "minitest-reporters", "~> 1.7.0", require: false gem "minitest-rg", "~> 5.3" gem "ostruct", "~> 0.6" diff --git a/google-cloud-pubsub/README.md b/google-cloud-pubsub/README.md index 8bf14a5864a7..e7217eb456f7 100644 --- a/google-cloud-pubsub/README.md +++ b/google-cloud-pubsub/README.md @@ -131,7 +131,7 @@ If the logger's `progname` is not set to `"pubsub"`, these debug logs will be su ## Supported Ruby Versions -This library is supported on Ruby 3.1+. +This library is supported on Ruby 3.2+. Google provides official support for Ruby versions that are actively supported by Ruby Core—that is, Ruby versions that are either in normal maintenance or in diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb index 814ed067ee7d..93974b217af3 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb @@ -72,6 +72,9 @@ class MessageListener attr_reader :callback_threads attr_reader :push_threads + attr_reader :shutdown_behavior + attr_reader :shutdown_timeout + ## # @private Implementation attributes. attr_reader :stream_pool, :thread_pool, :buffer, :service @@ -83,7 +86,7 @@ class MessageListener ## # @private Create an empty {MessageListener} object. def initialize subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil, - threads: {}, service: nil + threads: {}, shutdown_behavior: :wait_for_processing, shutdown_timeout: nil, service: nil super() # to init MonitorMixin @callback = callback @@ -95,6 +98,8 @@ def initialize subscription_name, callback, deadline: nil, message_ordering: nil @message_ordering = message_ordering @callback_threads = Integer(threads[:callback] || 8) @push_threads = Integer(threads[:push] || 4) + @shutdown_behavior = shutdown_behavior || :wait_for_processing + @shutdown_timeout = shutdown_timeout @exactly_once_delivery_enabled = nil @service = service @@ -140,11 +145,18 @@ def start # # @return [MessageListener] returns self so calls can be chained. # - def stop + def stop shutdown_behavior: nil, shutdown_timeout: nil + shutdown_behavior ||= @shutdown_behavior + shutdown_timeout ||= @shutdown_timeout + + unless [:wait_for_processing, :nack_immediately].include? shutdown_behavior + raise ArgumentError, "Invalid shutdown_behavior: #{shutdown_behavior}" + end + synchronize do @started = false @stopped = true - @stream_pool.map(&:stop) + @stream_pool.each { |s| s.stop shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout } wait_stop_buffer_thread! self end @@ -182,8 +194,10 @@ def wait! timeout = nil # # @return [MessageListener] returns self so calls can be chained. # - def stop! timeout = nil - stop + def stop! timeout = nil, shutdown_behavior: nil, shutdown_timeout: nil + shutdown_behavior ||= @shutdown_behavior + shutdown_timeout ||= timeout || @shutdown_timeout + stop shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout wait! timeout end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb index 5dca89f5665e..0dffac30f2ce 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb @@ -109,6 +109,21 @@ def empty? end end + def wait_until_empty timeout = nil + synchronize do + if timeout + target_time = Time.now + timeout + while !@inventory.empty? + remaining = target_time - Time.now + break if remaining <= 0 + @wait_cond.wait remaining + end + else + @wait_cond.wait_while { !@inventory.empty? } + end + end + end + def start @background_thread ||= Thread.new { background_run } diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index fa563dd17f57..be65deb1fe0a 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -57,7 +57,8 @@ def initialize subscriber @subscriber = subscriber @request_queue = nil - @stopped = nil + @streaming_stopped = nil + @fully_stopped = nil @paused = nil @pause_cond = new_cond @exactly_once_delivery_enabled = false @@ -93,9 +94,9 @@ def start self end - def stop + def stop shutdown_behavior: :wait_for_processing, shutdown_timeout: nil synchronize do - break if @stopped + break if @streaming_stopped subscriber.service.logger.log :info, "subscriber-streams" do "stopping stream for subscription #{@subscriber.subscription_name}" @@ -105,23 +106,60 @@ def stop @request_queue&.push self # Signal to the background thread that we are stopped. - @stopped = true + @streaming_stopped = true @pause_cond.broadcast - # Now that the reception thread is stopped, immediately stop the - # callback thread pool. All queued callbacks will see the stream - # is stopped and perform a noop. - @callback_thread_pool.shutdown - - # Once all the callbacks are stopped, we can stop the inventory. - @inventory.stop + if shutdown_behavior == :nack_immediately + nack_unprocessed_messages! + @fully_stopped = true + @callback_thread_pool.shutdown + @inventory.stop + else + # :wait_for_processing + @shutdown_thread = Thread.new do + if shutdown_timeout + wait_time = [shutdown_timeout - 30, 0].max + @inventory.wait_until_empty wait_time + if !@inventory.empty? + nack_unprocessed_messages! + end + else + @inventory.wait_until_empty nil + end + synchronize do + @fully_stopped = true + @callback_thread_pool.shutdown + @inventory.stop + end + end + end end self end + def nack_unprocessed_messages! + synchronize do + ack_ids = @inventory.ack_ids + unless ack_ids.empty? + begin + subscriber.service.modify_ack_deadline subscriber.subscription_name, ack_ids, 0 + rescue StandardError => e + subscriber.service.logger.log :error, "subscriber-streams" do + "Failed to nack unprocessed messages: #{e.message}" + end + end + @inventory.remove(*ack_ids) + end + end + end + def stopped? - synchronize { @stopped } + synchronize { @streaming_stopped } + end + + def fully_stopped? + synchronize { @fully_stopped } end def paused? @@ -133,6 +171,7 @@ def running? end def wait! timeout = nil + @shutdown_thread&.join timeout # Wait for all queued callbacks to be processed. @callback_thread_pool.wait_for_termination timeout @@ -222,7 +261,7 @@ class RestartStream < StandardError; end def background_run synchronize do # Don't allow a stream to restart if already stopped - if @stopped + if @streaming_stopped subscriber.service.logger.log :debug, "subscriber-streams" do "not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \ " stopped" @@ -230,7 +269,7 @@ def background_run return end - @stopped = false + @streaming_stopped = false @paused = false # signal to the previous queue to shut down @@ -252,14 +291,14 @@ def background_run loop do synchronize do - if @paused && !@stopped + if @paused && !@streaming_stopped @pause_cond.wait next end end # Break loop, close thread if stopped - break if synchronize { @stopped } + break if synchronize { @streaming_stopped } begin # Cannot synchronize the enumerator, causes deadlock @@ -297,7 +336,7 @@ def background_run # Has the loop broken but we aren't stopped? # Could be GRPC has thrown an internal error, so restart. - raise RestartStream unless synchronize { @stopped } + raise RestartStream unless synchronize { @streaming_stopped } # We must be stopped, tell the stream to quit. stop @@ -372,7 +411,7 @@ def perform_callback_sync rec_msg subscriber.service.logger.log :info, "callback-delivery" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks" end - @subscriber.callback.call rec_msg unless stopped? + @subscriber.callback.call rec_msg unless fully_stopped? rescue StandardError => e subscriber.service.logger.log :info, "callback-exceptions" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \ diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb index 3abb7156f423..b5b8c8e656f7 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb @@ -325,6 +325,10 @@ def wait_for_messages max: 100 # messages ({ReceivedMessage#nack!}, # {ReceivedMessage#modify_ack_deadline!}). Default is 4. # + # @param [Symbol] shutdown_behavior Defines how active messages are treated during a stop. Use `:wait_for_processing` + # to wait for tasks to finish, or `:nack_immediately` to skip waiting and drop them. Default is `:wait_for_processing`. + # @param [Integer] shutdown_timeout Specifies precisely how long the wind-down state holds. Defaults to Nil. Optional. + # # @yield [received_message] a block for processing new messages # @yieldparam [ReceivedMessage] received_message the newly received # message @@ -408,13 +412,16 @@ def wait_for_messages max: 100 # # Shut down the subscriber when ready to stop receiving messages. # listener.stop! # - def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}, &block + def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}, + shutdown_behavior: :wait_for_processing, shutdown_timeout: nil, &block ensure_service! deadline ||= self.deadline message_ordering = message_ordering? if message_ordering.nil? MessageListener.new name, block, deadline: deadline, streams: streams, inventory: inventory, - message_ordering: message_ordering, threads: threads, service: service + message_ordering: message_ordering, threads: threads, + shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout, + service: service end ## diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb index 5f457b90405c..675d43a49094 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb @@ -217,4 +217,55 @@ def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds: listener.stop listener.wait! end + + it "should nack unprocessed messages when stopped with nack_immediately" do + pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + response_groups = [[pull_res1]] + + stub = StreamingPullStub.new response_groups + + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + sleep 0.5 + end + + listener.start + + # Wait for message to be pulled and added to inventory + sleep 0.1 + + listener.stop shutdown_behavior: :nack_immediately + listener.wait! + + # Verifies that exactly one 0-second ModifyAckDeadline (NACK) was dispatched. + assert_equal 1, stub.modify_ack_deadline_requests.count { |req| req[2] == 0 } + end + + it "should wait for processing when stopped with wait_for_processing" do + pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + response_groups = [[pull_res1]] + + stub = StreamingPullStub.new response_groups + called = false + + subscriber.service.mocked_subscription_admin = stub + + listener = subscriber.listen streams: 1 do |msg| + sleep 0.5 + called = true + end + + listener.start + + # Wait for message to be pulled and added to inventory + sleep 0.1 + + listener.stop shutdown_behavior: :wait_for_processing + listener.wait! + + assert called + # Confirms that NO 0-second ModifyAckDeadline (NACK) interventions were sent. + assert_equal 0, stub.modify_ack_deadline_requests.count { |req| req[2] == 0 } + end end diff --git a/google-cloud-pubsub/test/helper.rb b/google-cloud-pubsub/test/helper.rb index 39275e46b58d..f1f9fb76017f 100644 --- a/google-cloud-pubsub/test/helper.rb +++ b/google-cloud-pubsub/test/helper.rb @@ -17,6 +17,7 @@ gem "minitest" require "minitest/autorun" require "minitest/focus" +require "minitest/mock" require "minitest/rg" require "ostruct" require "json"