diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 18bbb08f35e6d..6d96e523a25d8 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -77,6 +77,7 @@ class AsyncWriterConnectionResumedState buffer_size_lwm_(buffer_size_lwm), buffer_size_hwm_(buffer_size_hwm) { finalized_future_ = finalized_.get_future(); + closed_future_ = closed_.get_future(); options_ = internal::MakeImmutableOptions(options); auto state = impl_->PersistedState(); if (absl::holds_alternative(state)) { @@ -145,6 +146,19 @@ class AsyncWriterConnectionResumedState return f; } + future Close(storage::WritePayload const& p) { + std::unique_lock lk(mu_); + if (close_ || closed_promise_completed_) { + return make_ready_future(internal::FailedPreconditionError( + "Close() already called", GCP_ERROR_INFO())); + } + resend_buffer_.Append(WritePayloadImpl::GetImpl(p)); + close_ = true; + // Force flush to drain the buffer first. + HandleNewData(std::move(lk), true); + return std::move(closed_future_); + } + future> Query() { return Impl(std::unique_lock(mu_))->Query(); } @@ -219,6 +233,10 @@ class AsyncWriterConnectionResumedState // FinalizeStep will set the finalizing_ flag. return FinalizeStep(std::move(lk)); } + if (close_ && !closing_) { + // CloseStep will set the closing_ flag. + return CloseStep(std::move(lk)); + } // If not finalizing, check if an empty flush is needed. if (flush_) { state_ = State::kWriting; @@ -254,6 +272,25 @@ class AsyncWriterConnectionResumedState SetFinalized(std::unique_lock(mu_), std::move(result)); } + void CloseStep(std::unique_lock lk) { + if (closing_ || state_ != State::kIdle) { + return; + } + state_ = State::kWriting; + closing_ = true; + auto impl = Impl(lk); + lk.unlock(); + (void)impl->Close(storage::WritePayload{}) + .then([w = WeakFromThis()](auto f) { + if (auto self = w.lock()) return self->OnClose(f.get()); + }); + } + + void OnClose(Status result) { + if (!result.ok()) return Resume(std::move(result)); + SetClosed(std::unique_lock(mu_), std::move(result)); + } + void FlushStep(std::unique_lock lk, absl::Cord payload) { auto impl = Impl(lk); lk.unlock(); @@ -385,93 +422,92 @@ class AsyncWriterConnectionResumedState append_object_spec.set_generation(first_response_.resource().generation()); ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status)); - // Capture the finalization state *before* starting the async resume. + // Capture the finalization and close state *before* starting the async + // resume. bool was_finalizing; + bool was_closing; { std::unique_lock lk(mu_); if (state_ == State::kResuming) return; was_finalizing = finalizing_; + was_closing = closing_; if (!s.ok() && cancelled_) { return SetError(std::move(lk), std::move(s)); } state_ = State::kResuming; } - // Pass the original status `s` and `was_finalizing` to the callback. + // Pass the original status `s`, `was_finalizing`, and `was_closing` to the + // callback. factory_(std::move(request)) - .then([s, was_finalizing, w = WeakFromThis()](auto f) { + .then([s, was_finalizing, was_closing, w = WeakFromThis()](auto f) { if (auto self = w.lock()) - return self->OnResume(s, was_finalizing, f.get()); + return self->OnResume(s, was_finalizing, was_closing, f.get()); }); } void OnResume(Status const& original_status, bool was_finalizing, - StatusOr res) { + bool was_closing, StatusOr res) { std::unique_lock lk(mu_); // Update write_handle from any resume response that contains it. if (res && res->first_response.has_write_handle()) { latest_write_handle_ = res->first_response.write_handle(); } - if (was_finalizing) { - // If resuming due to a finalization error, we *must* complete the - // finalized_ promise now, based on the resume attempt's outcome. - if (!res) { - // The resume attempt itself failed. Use that error. - return SetError(std::move(lk), std::move(res).status()); - } - // Resume attempt succeeded, check the persisted state. - auto state = impl_->PersistedState(); - if (absl::holds_alternative(state)) { - // Resume found the object is finalized. Success. - return SetFinalized( - std::move(lk), - absl::get(std::move(state))); - } - // Resume succeeded, but the object is still not finalized. - // This means the original finalization attempt failed permanently. - // Use the original status that triggered the resume. Reset finalizing_ - // before setting the error, as the attempt is now over. - finalizing_ = false; - return SetError(std::move(lk), std::move(original_status)); - } - - // Resume was *not* triggered by finalization failure. if (!res) { - // Regular resume attempt failed. + // Resume attempt failed. SetError will complete everything. return SetError(std::move(lk), std::move(res).status()); } - // Regular resume attempt succeeded. Check state. + + // Resume attempt succeeded. Check if finalized. std::int64_t persisted_offset = 0; absl::optional checksums; - - if (res->first_response.has_resource()) { - if (!res->first_response.has_write_handle()) { - // Found finalized object (maybe finalized concurrently or resumed). - return SetFinalized(std::move(lk), - std::move(*res->first_response.mutable_resource())); - } - auto const& resource = res->first_response.resource(); - persisted_offset = resource.size(); - if (resource.has_checksums()) { - checksums = resource.checksums(); + bool finalized = false; + google::storage::v2::Object finalized_object; + + auto const& first_res = res->first_response; + if (first_res.has_resource() && !first_res.has_write_handle()) { + finalized = true; + finalized_object = first_res.resource(); + } else if (first_res.has_resource()) { + persisted_offset = first_res.resource().size(); + if (first_res.resource().has_checksums()) { + checksums = first_res.resource().checksums(); } - } else if (res->first_response.has_persisted_size()) { - persisted_offset = res->first_response.persisted_size(); - if (res->first_response.has_persisted_data_checksums()) { - checksums = res->first_response.persisted_data_checksums(); + } else if (first_res.has_persisted_size()) { + persisted_offset = first_res.persisted_size(); + if (first_res.has_persisted_data_checksums()) { + checksums = first_res.persisted_data_checksums(); } } else { auto state = impl_->PersistedState(); if (absl::holds_alternative(state)) { - // Found finalized object (maybe finalized concurrently or resumed). - return SetFinalized( - std::move(lk), - absl::get(std::move(state))); + finalized = true; + finalized_object = + absl::get(std::move(state)); + } else { + persisted_offset = absl::get(state); + checksums = impl_->PersistedChecksums(); } - persisted_offset = absl::get(state); - checksums = impl_->PersistedChecksums(); } + if (finalized) { + if (was_closing) { + return SetClosed(std::move(lk), Status{}); + } + return SetFinalized(std::move(lk), std::move(finalized_object)); + } + + // Handle failure if we expected finalization/closing but didn't get it. + if (was_finalizing) { + finalizing_ = false; + return SetError(std::move(lk), std::move(original_status)); + } + if (was_closing) { + closing_ = false; + return SetError(std::move(lk), std::move(original_status)); + } + + // Recreate the underlying stream if still active. auto hash = hash_function_; if (checksums && checksums->has_crc32c()) { hash = std::make_shared< @@ -517,6 +553,30 @@ class AsyncWriterConnectionResumedState p.set_value(std::move(object)); // Set value on the moved promise } + void SetClosed(std::unique_lock lk, Status status) { + resend_buffer_.Clear(); + state_ = State::kIdle; + close_ = false; + closing_ = false; + flush_ = false; + // Check if the promise has already been completed. + if (closed_promise_completed_) { + lk.unlock(); // Release lock before returning + return; + } + // Mark the promise as completed before moving it. + closed_promise_completed_ = true; + auto handlers = ClearHandlers(lk); + // Also clear any pending flush promises on success. + auto pending_flushes = std::move(pending_flush_promises_); + auto p = std::move(closed_); // Move the member promise. + lk.unlock(); + // Notify handlers and pending flushes after releasing the lock. + for (auto& h : handlers) h->Execute(status); + for (auto& pf : pending_flushes) pf.set_value(status); + p.set_value(std::move(status)); // Set value on the moved promise. + } + void SetFlushed(std::unique_lock lk, Status const& result) { if (!result.ok()) return SetError(std::move(lk), std::move(result)); flush_ = false; // Reset flush flag; WriteLoop may set it again. @@ -546,6 +606,8 @@ class AsyncWriterConnectionResumedState state_ = State::kIdle; finalize_ = false; finalizing_ = false; // Reset finalizing flag + close_ = false; + closing_ = false; // Reset closing flag flush_ = false; // Always clear handlers and pending flushes on error. @@ -553,20 +615,23 @@ class AsyncWriterConnectionResumedState auto pending_flushes = std::move(pending_flush_promises_); // Check if the finalized promise has already been completed. - if (finalized_promise_completed_) { - // Finalized promise already set, just notify handlers and pending - // flushes. - lk.unlock(); // Release lock before notifying - for (auto& h : handlers) h->Execute(status); - for (auto& pf : pending_flushes) pf.set_value(status); - return; + bool complete_finalized = false; + promise> finalized_to_complete; + if (!finalized_promise_completed_) { + finalized_promise_completed_ = true; + finalized_to_complete = std::move(finalized_); + complete_finalized = true; + } + + // Check if the closed promise has already been completed. + bool complete_closed = false; + promise closed_to_complete; + if (!closed_promise_completed_) { + closed_promise_completed_ = true; + closed_to_complete = std::move(closed_); + complete_closed = true; } - // Mark the finalized promise as completed *before* moving it under the - // lock. - finalized_promise_completed_ = true; - // Move the finalized promise. - auto p = std::move(finalized_); lk.unlock(); // Release lock before notifying // Notify handlers first. @@ -575,8 +640,13 @@ class AsyncWriterConnectionResumedState for (auto& pf : pending_flushes) { pf.set_value(status); } - // Set error on the moved finalized promise *once*. - p.set_value(status); + // Set error on the moved promises *once*. + if (complete_finalized) { + finalized_to_complete.set_value(status); + } + if (complete_closed) { + closed_to_complete.set_value(status); + } } std::shared_ptr Impl( @@ -630,6 +700,14 @@ class AsyncWriterConnectionResumedState // finalized_. future> finalized_future_; + // The result of calling `Close()`. Note that only one such call is ever + // made. + promise closed_; + + // Retrieve the future in the constructor, as some operations reset + // closed_. + future closed_future_; + // Queue of promises for outstanding Flush() calls. std::deque> pending_flush_promises_; @@ -682,6 +760,15 @@ class AsyncWriterConnectionResumedState // Tracks if the final promise (`finalized_`) has been completed. bool finalized_promise_completed_ = false; + // If true, all the data to close an upload is in `resend_buffer_`. + bool close_ = false; + + // True if CloseStep has been initiated. Prevents re-entry. + bool closing_ = false; + + // Tracks if the final promise (`closed_`) has been completed. + bool closed_promise_completed_ = false; + // Track the latest write handle seen in responses. absl::optional latest_write_handle_; }; @@ -762,6 +849,10 @@ class AsyncWriterConnectionResumed : public storage::AsyncWriterConnection { return state_->Flush(std::move(p)); } + future Close(storage::WritePayload p) override { + return state_->Close(std::move(p)); + } + future> Query() override { return state_->Query(); } RpcMetadata GetRequestMetadata() override { diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index 07543ed643638..e510ddad65e3c 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -32,6 +32,7 @@ namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +using ::google::cloud::storage::testing::canonical_errors::PermanentError; using ::google::cloud::storage::testing::canonical_errors::TransientError; using ::google::cloud::storage_mocks::MockAsyncWriterConnection; using ::google::cloud::testing_util::AsyncSequencer; @@ -39,6 +40,8 @@ using ::google::cloud::testing_util::IsOkAndHolds; using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::StatusIs; using ::testing::_; +using ::testing::Eq; +using ::testing::ResultOf; using ::testing::Return; using ::testing::VariantWith; @@ -962,6 +965,239 @@ TEST(WriterConnectionResumed, ResumeUsesChecksumsFromFirstResponse) { EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk)); } +TEST(WriterConnectionResumed, CloseEmpty) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, UploadId).WillOnce(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillRepeatedly([&](auto) { + return sequencer.PushBack("Close").then([](auto f) -> Status { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).Times(0); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + EXPECT_EQ(connection->UploadId(), "test-upload-id"); + EXPECT_THAT(connection->PersistedState(), VariantWith(0)); + + auto close = connection->Close({}); + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + EXPECT_STATUS_OK(close.get()); +} + +TEST(WriterConnectionResumed, DuplicateCloseFails) { + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([](auto) { + return make_ready_future(Status{}); + }); + + MockFactory mock_factory; + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + + auto close1 = connection->Close({}); + auto close2 = connection->Close({}); + + EXPECT_STATUS_OK(close1.get()); + EXPECT_THAT(close2.get(), StatusIs(StatusCode::kFailedPrecondition)); +} + +TEST(WriterConnectionResumed, CloseWithPayload) { + AsyncSequencer sequencer; + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + auto expected_write_size = [](std::size_t n) { + return ResultOf( + "payload size", [](auto payload) { return payload.size(); }, Eq(n)); + }; + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + // The payload is flushed first. + EXPECT_CALL(*mock, Flush(expected_write_size(8 * 1024))).WillOnce([&](auto) { + return sequencer.PushBack("Flush").then([](auto) { return Status{}; }); + }); + EXPECT_CALL(*mock, Query).WillOnce([&]() { + return sequencer.PushBack("Query").then([](auto) { + return make_status_or(static_cast(8 * 1024)); + }); + }); + // Then the stream is closed with empty payload. + EXPECT_CALL(*mock, Close(expected_write_size(0))).WillOnce([&](auto) { + return sequencer.PushBack("Close").then([](auto) { return Status{}; }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).Times(0); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + + auto close = connection->Close(TestPayload(8 * 1024)); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Query"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + EXPECT_STATUS_OK(close.get()); +} + +TEST(WriterConnectionResumed, CloseFailsAndResumeFails) { + AsyncSequencer sequencer; + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + auto resume_error = PermanentError(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([&](auto) { + return sequencer.PushBack("Close").then( + [](auto) { return TransientError(); }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).WillOnce([&](auto) { + return sequencer.PushBack("Retry").then( + [&](auto) { return StatusOr(resume_error); }); + }); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + + auto close = connection->Close({}); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Retry"); + next.first.set_value(true); + + EXPECT_THAT(close.get(), StatusIs(resume_error.code())); +} + +TEST(WriterConnectionResumed, CloseFailsAndResumeSucceedsButNotClosed) { + AsyncSequencer sequencer; + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + auto close_error = TransientError(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([&](auto) { + return sequencer.PushBack("Close").then( + [close_error](auto) { return close_error; }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).WillOnce([&](auto) { + return sequencer.PushBack("Resume").then([](auto) { + google::storage::v2::BidiWriteObjectResponse response; + response.set_persisted_size(0); + return StatusOr( + WriteObject::WriteResult{nullptr, response}); + }); + }); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + + auto close = connection->Close({}); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Resume"); + next.first.set_value(true); + + EXPECT_THAT(close.get(), StatusIs(close_error.code())); +} + +TEST(WriterConnectionResumed, CloseFailsAndResumeSucceedsAndFinalized) { + AsyncSequencer sequencer; + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + auto close_error = TransientError(); + + auto mock = std::make_unique(); + EXPECT_CALL(*mock, UploadId).WillRepeatedly(Return("test-upload-id")); + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Close).WillOnce([&](auto) { + return sequencer.PushBack("Close").then( + [close_error](auto) { return close_error; }); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).WillOnce([&](auto) { + return sequencer.PushBack("Resume").then([](auto) { + google::storage::v2::BidiWriteObjectResponse response; + *response.mutable_resource() = TestObject(); + return StatusOr( + WriteObject::WriteResult{nullptr, response}); + }); + }); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + + auto close = connection->Close({}); + ASSERT_FALSE(close.is_ready()); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Close"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Resume"); + next.first.set_value(true); + + EXPECT_STATUS_OK(close.get()); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal