Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 157 additions & 66 deletions google/cloud/storage/internal/async/writer_connection_resumed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class AsyncWriterConnectionResumedState
buffer_size_lwm_(buffer_size_lwm),
buffer_size_hwm_(buffer_size_hwm) {
finalized_future_ = finalized_.get_future();
closed_future_ = closed_.get_future();
options_ = internal::MakeImmutableOptions(options);
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
Expand Down Expand Up @@ -145,6 +146,19 @@ class AsyncWriterConnectionResumedState
return f;
}

future<Status> Close(storage::WritePayload const& p) {
std::unique_lock<std::mutex> lk(mu_);
if (close_ || closed_promise_completed_) {
return make_ready_future(internal::FailedPreconditionError(
"Close() already called", GCP_ERROR_INFO()));
}
resend_buffer_.Append(WritePayloadImpl::GetImpl(p));
close_ = true;
// Force flush to drain the buffer first.
HandleNewData(std::move(lk), true);
return std::move(closed_future_);
}

future<StatusOr<std::int64_t>> Query() {
return Impl(std::unique_lock<std::mutex>(mu_))->Query();
}
Expand Down Expand Up @@ -219,6 +233,10 @@ class AsyncWriterConnectionResumedState
// FinalizeStep will set the finalizing_ flag.
return FinalizeStep(std::move(lk));
}
if (close_ && !closing_) {
// CloseStep will set the closing_ flag.
return CloseStep(std::move(lk));
}
// If not finalizing, check if an empty flush is needed.
if (flush_) {
state_ = State::kWriting;
Expand Down Expand Up @@ -254,6 +272,25 @@ class AsyncWriterConnectionResumedState
SetFinalized(std::unique_lock<std::mutex>(mu_), std::move(result));
}

void CloseStep(std::unique_lock<std::mutex> lk) {
if (closing_ || state_ != State::kIdle) {
return;
}
state_ = State::kWriting;
closing_ = true;
auto impl = Impl(lk);
lk.unlock();
(void)impl->Close(storage::WritePayload{})
.then([w = WeakFromThis()](auto f) {
if (auto self = w.lock()) return self->OnClose(f.get());
});
}

void OnClose(Status result) {
if (!result.ok()) return Resume(std::move(result));
SetClosed(std::unique_lock<std::mutex>(mu_), std::move(result));
}

void FlushStep(std::unique_lock<std::mutex> lk, absl::Cord payload) {
auto impl = Impl(lk);
lk.unlock();
Expand Down Expand Up @@ -385,93 +422,92 @@ class AsyncWriterConnectionResumedState
append_object_spec.set_generation(first_response_.resource().generation());
ApplyWriteRedirectErrors(append_object_spec, std::move(proto_status));

// Capture the finalization state *before* starting the async resume.
// Capture the finalization and close state *before* starting the async
// resume.
bool was_finalizing;
bool was_closing;
{
std::unique_lock<std::mutex> lk(mu_);
if (state_ == State::kResuming) return;
was_finalizing = finalizing_;
was_closing = closing_;
if (!s.ok() && cancelled_) {
return SetError(std::move(lk), std::move(s));
}
state_ = State::kResuming;
}
// Pass the original status `s` and `was_finalizing` to the callback.
// Pass the original status `s`, `was_finalizing`, and `was_closing` to the
// callback.
factory_(std::move(request))
.then([s, was_finalizing, w = WeakFromThis()](auto f) {
.then([s, was_finalizing, was_closing, w = WeakFromThis()](auto f) {
if (auto self = w.lock())
return self->OnResume(s, was_finalizing, f.get());
return self->OnResume(s, was_finalizing, was_closing, f.get());
});
}

void OnResume(Status const& original_status, bool was_finalizing,
StatusOr<WriteObject::WriteResult> res) {
bool was_closing, StatusOr<WriteObject::WriteResult> res) {
std::unique_lock<std::mutex> lk(mu_);
// Update write_handle from any resume response that contains it.
if (res && res->first_response.has_write_handle()) {
latest_write_handle_ = res->first_response.write_handle();
}

if (was_finalizing) {
// If resuming due to a finalization error, we *must* complete the
// finalized_ promise now, based on the resume attempt's outcome.
if (!res) {
// The resume attempt itself failed. Use that error.
return SetError(std::move(lk), std::move(res).status());
}
// Resume attempt succeeded, check the persisted state.
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
// Resume found the object is finalized. Success.
return SetFinalized(
std::move(lk),
absl::get<google::storage::v2::Object>(std::move(state)));
}
// Resume succeeded, but the object is still not finalized.
// This means the original finalization attempt failed permanently.
// Use the original status that triggered the resume. Reset finalizing_
// before setting the error, as the attempt is now over.
finalizing_ = false;
return SetError(std::move(lk), std::move(original_status));
}

// Resume was *not* triggered by finalization failure.
if (!res) {
// Regular resume attempt failed.
// Resume attempt failed. SetError will complete everything.
return SetError(std::move(lk), std::move(res).status());
}
// Regular resume attempt succeeded. Check state.

// Resume attempt succeeded. Check if finalized.
std::int64_t persisted_offset = 0;
absl::optional<google::storage::v2::ObjectChecksums> checksums;

if (res->first_response.has_resource()) {
if (!res->first_response.has_write_handle()) {
// Found finalized object (maybe finalized concurrently or resumed).
return SetFinalized(std::move(lk),
std::move(*res->first_response.mutable_resource()));
}
auto const& resource = res->first_response.resource();
persisted_offset = resource.size();
if (resource.has_checksums()) {
checksums = resource.checksums();
bool finalized = false;
google::storage::v2::Object finalized_object;

auto const& first_res = res->first_response;
if (first_res.has_resource() && !first_res.has_write_handle()) {
finalized = true;
finalized_object = first_res.resource();
} else if (first_res.has_resource()) {
persisted_offset = first_res.resource().size();
if (first_res.resource().has_checksums()) {
checksums = first_res.resource().checksums();
}
} else if (res->first_response.has_persisted_size()) {
persisted_offset = res->first_response.persisted_size();
if (res->first_response.has_persisted_data_checksums()) {
checksums = res->first_response.persisted_data_checksums();
} else if (first_res.has_persisted_size()) {
persisted_offset = first_res.persisted_size();
if (first_res.has_persisted_data_checksums()) {
checksums = first_res.persisted_data_checksums();
}
} else {
auto state = impl_->PersistedState();
if (absl::holds_alternative<google::storage::v2::Object>(state)) {
// Found finalized object (maybe finalized concurrently or resumed).
return SetFinalized(
std::move(lk),
absl::get<google::storage::v2::Object>(std::move(state)));
finalized = true;
finalized_object =
absl::get<google::storage::v2::Object>(std::move(state));
} else {
persisted_offset = absl::get<std::int64_t>(state);
checksums = impl_->PersistedChecksums();
}
persisted_offset = absl::get<std::int64_t>(state);
checksums = impl_->PersistedChecksums();
}

if (finalized) {
if (was_closing) {
return SetClosed(std::move(lk), Status{});
}
return SetFinalized(std::move(lk), std::move(finalized_object));
}

// Handle failure if we expected finalization/closing but didn't get it.
if (was_finalizing) {
finalizing_ = false;
return SetError(std::move(lk), std::move(original_status));
}
if (was_closing) {
closing_ = false;
return SetError(std::move(lk), std::move(original_status));
}

// Recreate the underlying stream if still active.
auto hash = hash_function_;
if (checksums && checksums->has_crc32c()) {
hash = std::make_shared<
Expand Down Expand Up @@ -517,6 +553,30 @@ class AsyncWriterConnectionResumedState
p.set_value(std::move(object)); // Set value on the moved promise
}

void SetClosed(std::unique_lock<std::mutex> lk, Status status) {
resend_buffer_.Clear();
state_ = State::kIdle;
close_ = false;
closing_ = false;
flush_ = false;
// Check if the promise has already been completed.
if (closed_promise_completed_) {
lk.unlock(); // Release lock before returning
return;
}
// Mark the promise as completed before moving it.
closed_promise_completed_ = true;
auto handlers = ClearHandlers(lk);
// Also clear any pending flush promises on success.
auto pending_flushes = std::move(pending_flush_promises_);
auto p = std::move(closed_); // Move the member promise.
lk.unlock();
// Notify handlers and pending flushes after releasing the lock.
for (auto& h : handlers) h->Execute(status);
for (auto& pf : pending_flushes) pf.set_value(status);
p.set_value(std::move(status)); // Set value on the moved promise.
}

void SetFlushed(std::unique_lock<std::mutex> lk, Status const& result) {
if (!result.ok()) return SetError(std::move(lk), std::move(result));
flush_ = false; // Reset flush flag; WriteLoop may set it again.
Expand Down Expand Up @@ -546,27 +606,32 @@ class AsyncWriterConnectionResumedState
state_ = State::kIdle;
finalize_ = false;
finalizing_ = false; // Reset finalizing flag
close_ = false;
closing_ = false; // Reset closing flag
flush_ = false;

// Always clear handlers and pending flushes on error.
auto handlers = ClearHandlers(lk);
auto pending_flushes = std::move(pending_flush_promises_);

// Check if the finalized promise has already been completed.
if (finalized_promise_completed_) {
// Finalized promise already set, just notify handlers and pending
// flushes.
lk.unlock(); // Release lock before notifying
for (auto& h : handlers) h->Execute(status);
for (auto& pf : pending_flushes) pf.set_value(status);
return;
bool complete_finalized = false;
promise<StatusOr<google::storage::v2::Object>> finalized_to_complete;
if (!finalized_promise_completed_) {
finalized_promise_completed_ = true;
finalized_to_complete = std::move(finalized_);
complete_finalized = true;
}

// Check if the closed promise has already been completed.
bool complete_closed = false;
promise<Status> closed_to_complete;
if (!closed_promise_completed_) {
closed_promise_completed_ = true;
closed_to_complete = std::move(closed_);
complete_closed = true;
}

// Mark the finalized promise as completed *before* moving it under the
// lock.
finalized_promise_completed_ = true;
// Move the finalized promise.
auto p = std::move(finalized_);
lk.unlock(); // Release lock before notifying

// Notify handlers first.
Expand All @@ -575,8 +640,13 @@ class AsyncWriterConnectionResumedState
for (auto& pf : pending_flushes) {
pf.set_value(status);
}
// Set error on the moved finalized promise *once*.
p.set_value(status);
// Set error on the moved promises *once*.
if (complete_finalized) {
finalized_to_complete.set_value(status);
}
if (complete_closed) {
closed_to_complete.set_value(status);
}
}

std::shared_ptr<storage::AsyncWriterConnection> Impl(
Expand Down Expand Up @@ -630,6 +700,14 @@ class AsyncWriterConnectionResumedState
// finalized_.
future<StatusOr<google::storage::v2::Object>> finalized_future_;

// The result of calling `Close()`. Note that only one such call is ever
// made.
promise<Status> closed_;

// Retrieve the future in the constructor, as some operations reset
// closed_.
future<Status> closed_future_;

// Queue of promises for outstanding Flush() calls.
std::deque<promise<Status>> pending_flush_promises_;

Expand Down Expand Up @@ -682,6 +760,15 @@ class AsyncWriterConnectionResumedState
// Tracks if the final promise (`finalized_`) has been completed.
bool finalized_promise_completed_ = false;

// If true, all the data to close an upload is in `resend_buffer_`.
bool close_ = false;

// True if CloseStep has been initiated. Prevents re-entry.
bool closing_ = false;

// Tracks if the final promise (`closed_`) has been completed.
bool closed_promise_completed_ = false;

// Track the latest write handle seen in responses.
absl::optional<google::storage::v2::BidiWriteHandle> latest_write_handle_;
};
Expand Down Expand Up @@ -762,6 +849,10 @@ class AsyncWriterConnectionResumed : public storage::AsyncWriterConnection {
return state_->Flush(std::move(p));
}

future<Status> Close(storage::WritePayload p) override {
return state_->Close(std::move(p));
}

future<StatusOr<std::int64_t>> Query() override { return state_->Query(); }

RpcMetadata GetRequestMetadata() override {
Expand Down
Loading
Loading