Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions google-cloud-pubsub/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ end
gem "autotest-suffix", "~> 1.1"
gem "avro", "~> 1.12"
gem "bigdecimal", ">= 3.2", "< 5"
gem "google-style", "~> 1.31.1"
gem "minitest", "~> 5.25"
gem "google-style", "~> 1.32.0"
gem "irb", "~> 1.17"
gem "minitest", "~> 5.20"
gem "minitest-autotest", "~> 1.1"
gem "minitest-focus", "~> 1.4"
gem "minitest-mock", "~> 5.27"
gem "minitest-reporters", "~> 1.7.0", require: false
gem "minitest-rg", "~> 5.3"
gem "ostruct", "~> 0.6"
Expand Down
2 changes: 1 addition & 1 deletion google-cloud-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ If the logger's `progname` is not set to `"pubsub"`, these debug logs will be su

## Supported Ruby Versions

This library is supported on Ruby 3.1+.
This library is supported on Ruby 3.2+.

Google provides official support for Ruby versions that are actively supported
by Ruby Core—that is, Ruby versions that are either in normal maintenance or in
Expand Down
24 changes: 19 additions & 5 deletions google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class MessageListener
attr_reader :callback_threads
attr_reader :push_threads

attr_reader :shutdown_behavior
attr_reader :shutdown_timeout

##
# @private Implementation attributes.
attr_reader :stream_pool, :thread_pool, :buffer, :service
Expand All @@ -83,7 +86,7 @@ class MessageListener
##
# @private Create an empty {MessageListener} object.
def initialize subscription_name, callback, deadline: nil, message_ordering: nil, streams: nil, inventory: nil,
threads: {}, service: nil
threads: {}, shutdown_behavior: :wait_for_processing, shutdown_timeout: nil, service: nil
super() # to init MonitorMixin

@callback = callback
Expand All @@ -95,6 +98,8 @@ def initialize subscription_name, callback, deadline: nil, message_ordering: nil
@message_ordering = message_ordering
@callback_threads = Integer(threads[:callback] || 8)
@push_threads = Integer(threads[:push] || 4)
@shutdown_behavior = shutdown_behavior || :wait_for_processing
@shutdown_timeout = shutdown_timeout
@exactly_once_delivery_enabled = nil

@service = service
Expand Down Expand Up @@ -140,11 +145,18 @@ def start
#
# @return [MessageListener] returns self so calls can be chained.
#
def stop
def stop shutdown_behavior: nil, shutdown_timeout: nil
shutdown_behavior ||= @shutdown_behavior
shutdown_timeout ||= @shutdown_timeout

unless [:wait_for_processing, :nack_immediately].include? shutdown_behavior
raise ArgumentError, "Invalid shutdown_behavior: #{shutdown_behavior}"
end

synchronize do
@started = false
@stopped = true
@stream_pool.map(&:stop)
@stream_pool.each { |s| s.stop shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout }
wait_stop_buffer_thread!
self
end
Expand Down Expand Up @@ -182,8 +194,10 @@ def wait! timeout = nil
#
# @return [MessageListener] returns self so calls can be chained.
#
def stop! timeout = nil
stop
def stop! timeout = nil, shutdown_behavior: nil, shutdown_timeout: nil
shutdown_behavior ||= @shutdown_behavior
shutdown_timeout ||= timeout || @shutdown_timeout
stop shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout
wait! timeout
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,21 @@ def empty?
end
end

def wait_until_empty timeout = nil
synchronize do
if timeout
target_time = Time.now + timeout
while !@inventory.empty?
remaining = target_time - Time.now
break if remaining <= 0
@wait_cond.wait remaining
end
else
@wait_cond.wait_while { !@inventory.empty? }
end
end
end

def start
@background_thread ||= Thread.new { background_run }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def initialize subscriber
@subscriber = subscriber

@request_queue = nil
@stopped = nil
@streaming_stopped = nil
@fully_stopped = nil
@paused = nil
@pause_cond = new_cond
@exactly_once_delivery_enabled = false
Expand Down Expand Up @@ -93,9 +94,9 @@ def start
self
end

def stop
def stop shutdown_behavior: :wait_for_processing, shutdown_timeout: nil
synchronize do
break if @stopped
break if @streaming_stopped

subscriber.service.logger.log :info, "subscriber-streams" do
"stopping stream for subscription #{@subscriber.subscription_name}"
Expand All @@ -105,23 +106,60 @@ def stop
@request_queue&.push self

# Signal to the background thread that we are stopped.
@stopped = true
@streaming_stopped = true
@pause_cond.broadcast

# Now that the reception thread is stopped, immediately stop the
# callback thread pool. All queued callbacks will see the stream
# is stopped and perform a noop.
@callback_thread_pool.shutdown

# Once all the callbacks are stopped, we can stop the inventory.
@inventory.stop
if shutdown_behavior == :nack_immediately
nack_unprocessed_messages!
@fully_stopped = true
@callback_thread_pool.shutdown
@inventory.stop
else
# :wait_for_processing
@shutdown_thread = Thread.new do
if shutdown_timeout
wait_time = [shutdown_timeout - 30, 0].max
@inventory.wait_until_empty wait_time
if !@inventory.empty?
nack_unprocessed_messages!
end
else
@inventory.wait_until_empty nil
end
synchronize do
@fully_stopped = true
@callback_thread_pool.shutdown
@inventory.stop
end
end
end
end

self
end

def nack_unprocessed_messages!
synchronize do
ack_ids = @inventory.ack_ids
unless ack_ids.empty?
begin
subscriber.service.modify_ack_deadline subscriber.subscription_name, ack_ids, 0
rescue StandardError => e
subscriber.service.logger.log :error, "subscriber-streams" do
"Failed to nack unprocessed messages: #{e.message}"
end
end
@inventory.remove(*ack_ids)
end
end
end

def stopped?
synchronize { @stopped }
synchronize { @streaming_stopped }
end

def fully_stopped?
synchronize { @fully_stopped }
end

def paused?
Expand All @@ -133,6 +171,7 @@ def running?
end

def wait! timeout = nil
@shutdown_thread&.join timeout
# Wait for all queued callbacks to be processed.
@callback_thread_pool.wait_for_termination timeout

Expand Down Expand Up @@ -222,15 +261,15 @@ class RestartStream < StandardError; end
def background_run
synchronize do
# Don't allow a stream to restart if already stopped
if @stopped
if @streaming_stopped
subscriber.service.logger.log :debug, "subscriber-streams" do
"not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \
" stopped"
end
return
end

@stopped = false
@streaming_stopped = false
@paused = false

# signal to the previous queue to shut down
Expand All @@ -252,14 +291,14 @@ def background_run

loop do
synchronize do
if @paused && !@stopped
if @paused && !@streaming_stopped
@pause_cond.wait
next
end
end

# Break loop, close thread if stopped
break if synchronize { @stopped }
break if synchronize { @streaming_stopped }

begin
# Cannot synchronize the enumerator, causes deadlock
Expand Down Expand Up @@ -297,7 +336,7 @@ def background_run

# Has the loop broken but we aren't stopped?
# Could be GRPC has thrown an internal error, so restart.
raise RestartStream unless synchronize { @stopped }
raise RestartStream unless synchronize { @streaming_stopped }

# We must be stopped, tell the stream to quit.
stop
Expand Down Expand Up @@ -372,7 +411,7 @@ def perform_callback_sync rec_msg
subscriber.service.logger.log :info, "callback-delivery" do
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks"
end
@subscriber.callback.call rec_msg unless stopped?
@subscriber.callback.call rec_msg unless fully_stopped?
rescue StandardError => e
subscriber.service.logger.log :info, "callback-exceptions" do
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \
Expand Down
11 changes: 9 additions & 2 deletions google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ def wait_for_messages max: 100
# messages ({ReceivedMessage#nack!},
# {ReceivedMessage#modify_ack_deadline!}). Default is 4.
#
# @param [Symbol] shutdown_behavior Defines how active messages are treated during a stop. Use `:wait_for_processing`
# to wait for tasks to finish, or `:nack_immediately` to skip waiting and drop them. Default is `:wait_for_processing`.
# @param [Integer] shutdown_timeout Specifies precisely how long the wind-down state holds. Defaults to Nil. Optional.
#
# @yield [received_message] a block for processing new messages
# @yieldparam [ReceivedMessage] received_message the newly received
# message
Expand Down Expand Up @@ -408,13 +412,16 @@ def wait_for_messages max: 100
# # Shut down the subscriber when ready to stop receiving messages.
# listener.stop!
#
def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}, &block
def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {},
shutdown_behavior: :wait_for_processing, shutdown_timeout: nil, &block
ensure_service!
deadline ||= self.deadline
message_ordering = message_ordering? if message_ordering.nil?

MessageListener.new name, block, deadline: deadline, streams: streams, inventory: inventory,
message_ordering: message_ordering, threads: threads, service: service
message_ordering: message_ordering, threads: threads,
shutdown_behavior: shutdown_behavior, shutdown_timeout: shutdown_timeout,
service: service
end

##
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,55 @@ def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
listener.stop
listener.wait!
end

it "should nack unprocessed messages when stopped with nack_immediately" do
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
response_groups = [[pull_res1]]

stub = StreamingPullStub.new response_groups

subscriber.service.mocked_subscription_admin = stub

listener = subscriber.listen streams: 1 do |msg|
sleep 0.5
end

listener.start

# Wait for message to be pulled and added to inventory
sleep 0.1

listener.stop shutdown_behavior: :nack_immediately
listener.wait!

# Verifies that exactly one 0-second ModifyAckDeadline (NACK) was dispatched.
assert_equal 1, stub.modify_ack_deadline_requests.count { |req| req[2] == 0 }
end

it "should wait for processing when stopped with wait_for_processing" do
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
response_groups = [[pull_res1]]

stub = StreamingPullStub.new response_groups
called = false

subscriber.service.mocked_subscription_admin = stub

listener = subscriber.listen streams: 1 do |msg|
sleep 0.5
called = true
end

listener.start

# Wait for message to be pulled and added to inventory
sleep 0.1

listener.stop shutdown_behavior: :wait_for_processing
listener.wait!

assert called
# Confirms that NO 0-second ModifyAckDeadline (NACK) interventions were sent.
assert_equal 0, stub.modify_ack_deadline_requests.count { |req| req[2] == 0 }
end
end
1 change: 1 addition & 0 deletions google-cloud-pubsub/test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
gem "minitest"
require "minitest/autorun"
require "minitest/focus"
require "minitest/mock"
require "minitest/rg"
require "ostruct"
require "json"
Expand Down
Loading