[net] Add SendPutReq to RCurlConnection for S3 uploads #22376
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -552,6 +552,45 @@ void ReverseDisplacements(std::vector<std::size_t> &displacements, ROOT::Interna | |
| } | ||
| } | ||
|
|
||
| struct RUploadState { | ||
| const unsigned char *fData = nullptr; | ||
| std::size_t fLength = 0; | ||
| std::size_t fOffset = 0; | ||
| std::string fErrorMsg; ///< Set on failure, reported back to the caller via RStatus | ||
| }; | ||
|
|
||
| std::size_t CallbackPutRead(char *buffer, std::size_t size, std::size_t nmemb, void *userdata) | ||
| { | ||
| auto *state = static_cast<RUploadState *>(userdata); | ||
| assert(state->fOffset <= state->fLength); | ||
|
|
||
| std::size_t remaining = state->fLength - state->fOffset; | ||
|
|
||
| if (remaining == 0) | ||
| return 0; | ||
|
|
||
| std::size_t requested = size * nmemb; | ||
| if (requested > remaining) { | ||
| state->fErrorMsg = "curl requested " + std::to_string(requested) + | ||
| " bytes but only " + std::to_string(remaining) + " remain"; | ||
| return CURL_READFUNC_ABORT; | ||
|
Contributor
How can this return value be distinguished from a valid read of the same number of bytes? You should probably return something like
Contributor
I think this is up to libcurl: the function signature and the constant are prescribed by the library. I'd assume libcurl never asks for 256 MiB or more in a single callback. We can assert so.
Contributor
I still think it's pretty bad interface-wise to mix up valid and invalid return values, especially if they are both positive and separated only by some unclear boundary. Since we're in control of the function signature, I don't think it costs us much to clearly separate valid and invalid return values - unless there is some reason I'm not seeing that makes that undesirable. Unless you mean we assert that this function always returns valid values, in which case ok - we just never return an error.
Contributor
But we are not in control of the function signature. It's a libcurl callback. So I think asserting the conditions is all we can do.
Contributor
Author
I'll add an assert(requested < CURL_READFUNC_ABORT) to verify that curl never asks for ≥ 256 MiB in a single callback invocation, and remove the CURL_READFUNC_ABORT return path entirely. That way there is no ambiguity.
Contributor
Ah, that's what I missed :)
||
| } | ||
|
|
||
| memcpy(buffer, state->fData + state->fOffset, requested); | ||
| state->fOffset += requested; | ||
| return requested; | ||
| } | ||
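For illustration, the outcome of the review thread above might look roughly like the sketch below. It is not the code that was merged. It assumes the callback copies at most the remaining payload (clamping rather than erroring when curl offers a larger buffer), and it uses a local constant `kReadFuncAbort` as a stand-in for libcurl's `CURL_READFUNC_ABORT` so that the snippet builds without `<curl/curl.h>`. The `RUploadState` here is a trimmed copy of the struct in the diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>

// Stand-in for libcurl's CURL_READFUNC_ABORT (0x10000000, i.e. 256 MiB);
// an assumption made so that this sketch compiles without libcurl.
constexpr std::size_t kReadFuncAbort = 0x10000000;

// Trimmed copy of the upload state from the diff (error message omitted).
struct RUploadState {
   const unsigned char *fData = nullptr;
   std::size_t fLength = 0;
   std::size_t fOffset = 0;
};

// Copies up to size * nmemb bytes of the remaining payload into buffer.
// Returns the number of bytes copied; 0 signals the end of the upload.
std::size_t CallbackPutRead(char *buffer, std::size_t size, std::size_t nmemb, void *userdata)
{
   auto *state = static_cast<RUploadState *>(userdata);
   assert(state->fOffset <= state->fLength);

   const std::size_t requested = size * nmemb;
   // A byte count this large could not be told apart from the abort code.
   assert(requested < kReadFuncAbort);

   // Never hand out more than what is left of the payload.
   const std::size_t nbytes = std::min(requested, state->fLength - state->fOffset);
   if (nbytes == 0)
      return 0;

   std::memcpy(buffer, state->fData + state->fOffset, nbytes);
   state->fOffset += nbytes;
   return nbytes;
}
```

With the abort path gone, every return value is a plain byte count, and the assertion documents the assumption that curl never offers a buffer of 256 MiB or more.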
|
|
||
| int CallbackPutSeek(void *userdata, curl_off_t offset, int origin) | ||
|
Contributor
Like above, add a brief doc comment.
||
| { | ||
| auto *state = static_cast<RUploadState *>(userdata); | ||
| if (origin != SEEK_SET) | ||
|
Contributor
Add a comment that explains that curl promises to only use
||
| return CURL_SEEKFUNC_CANTSEEK; | ||
| if (offset < 0 || static_cast<std::size_t>(offset) > state->fLength) | ||
| return CURL_SEEKFUNC_FAIL; | ||
| state->fOffset = static_cast<std::size_t>(offset); | ||
| return CURL_SEEKFUNC_OK; | ||
| } | ||
|
|
||
| std::string GetCurlErrorString(CURLcode code) | ||
| { | ||
| return std::string(curl_easy_strerror(code)) + " (" + std::to_string(code) + ")"; | ||
|
|
@@ -660,6 +699,35 @@ void ROOT::Internal::RCurlConnection::SetOptions() | |
| R__ASSERT(rc == CURLE_OK); | ||
| } | ||
|
|
||
| void ROOT::Internal::RCurlConnection::ResetHandle() | ||
|
Contributor
Add documentation.
||
| { | ||
| auto rc = curl_easy_setopt(fHandle, CURLOPT_NOBODY, 0L); | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
Contributor
Is it really necessary to do this assertion for every line? If so, consider adding a helper function to do so. Also, instead of The error can then be propagated by the callers of
Contributor
I think a hard assert is ok here. These calls (option setting) should never fail. However, libcurl does provide a return code, so we had better check it.
||
| rc = curl_easy_setopt(fHandle, CURLOPT_HTTPGET, 0L); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_UPLOAD, 0L); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_RANGE, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_READFUNCTION, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_READDATA, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_SEEKFUNCTION, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_SEEKDATA, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(-1)); | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
|
||
| #ifndef HAS_CURL_EASY_HEADER | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_HEADERFUNCTION, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_HEADERDATA, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| #endif | ||
| } | ||
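The thread above leaves open whether to keep the hard asserts or to propagate errors. If the error-propagating route were taken, a helper could look roughly like the sketch below. Everything in it is hypothetical: `FakeSetOpt` stands in for `curl_easy_setopt` so that the snippet builds without libcurl, and `CheckSetOpt` and `ResetOptions` are illustrative names that do not appear in the PR.

```cpp
#include <string>

// Hypothetical stand-in for curl_easy_setopt(): 0 means success, like CURLE_OK.
// Option 99 is made to fail so that the error path can be exercised.
int FakeSetOpt(int option)
{
   return option == 99 ? 1 : 0;
}

// Hypothetical helper: checks one return code and records the first failure in `error`.
// Returns true as long as no failure has been recorded.
bool CheckSetOpt(int rc, const char *option, std::string &error)
{
   if (rc != 0 && error.empty())
      error = std::string("cannot set ") + option + " (code " + std::to_string(rc) + ")";
   return error.empty();
}

// Sets a few options; returns an empty string on success, or the first error message.
std::string ResetOptions(bool injectFailure)
{
   std::string error;
   CheckSetOpt(FakeSetOpt(1), "option 1", error);
   CheckSetOpt(FakeSetOpt(injectFailure ? 99 : 2), "option 2", error);
   CheckSetOpt(FakeSetOpt(3), "option 3", error);
   return error;
}
```

A caller would then turn a non-empty message into whatever error type the surrounding code uses. The same helper shape also works with an assertion inside it, which would keep the current behaviour while removing the repeated `R__ASSERT` lines.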
|
|
||
| ROOT::RResult<void> ROOT::Internal::RCurlConnection::SetUrl(const std::string &url) | ||
| { | ||
| CURLU *cu = curl_url(); | ||
|
|
@@ -731,17 +799,9 @@ ROOT::Internal::RCurlConnection::RStatus ROOT::Internal::RCurlConnection::SendHe | |
| { | ||
| remoteSize = kUnknownSize; | ||
|
|
||
| ResetHandle(); | ||
| auto rc = curl_easy_setopt(fHandle, CURLOPT_NOBODY, 1); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_RANGE, NULL); // may have been set by a previous SendRangesReq() on the object | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
|
||
| #ifndef HAS_CURL_EASY_HEADER | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_HEADERFUNCTION, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_HEADERDATA, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| #endif | ||
|
|
||
| RStatus status; | ||
| Perform(status); | ||
|
|
@@ -778,6 +838,7 @@ ROOT::Internal::RCurlConnection::SendRangesReq(std::size_t N, RUserRange *ranges | |
| return RStatus(RStatus::kSuccess); | ||
| } | ||
|
|
||
| ResetHandle(); | ||
| auto rc = curl_easy_setopt(fHandle, CURLOPT_HTTPGET, 1); | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
|
||
|
|
@@ -849,6 +910,58 @@ ROOT::Internal::RCurlConnection::SendRangesReq(std::size_t N, RUserRange *ranges | |
| return status; | ||
| } | ||
|
|
||
| ROOT::Internal::RCurlConnection::RStatus | ||
| ROOT::Internal::RCurlConnection::SendPutReq(const unsigned char *data, std::size_t length) | ||
| { | ||
| static constexpr std::size_t kExpect100Threshold = 4096; | ||
|
|
||
| ResetHandle(); | ||
|
|
||
| auto rc = curl_easy_setopt(fHandle, CURLOPT_UPLOAD, 1L); | ||
|
|
||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(length)); | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
|
||
| // For small uploads, disable Expect: 100-continue to avoid an extra round trip. | ||
| // For large uploads, let curl send it (the default) so the server can reject early. | ||
| struct curl_slist *extraHeaders = nullptr; | ||
| if (length < kExpect100Threshold) { | ||
| extraHeaders = curl_slist_append(extraHeaders, "Expect:"); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_HTTPHEADER, extraHeaders); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| } | ||
|
Comment on lines +925 to +932
Contributor
Sorry, probably not needed after all
||
|
|
||
| RUploadState uploadState{data, length, 0, {}}; | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_READFUNCTION, CallbackPutRead); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_READDATA, &uploadState); | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
|
||
| // Seek callback is required when CURLOPT_FOLLOWLOCATION is enabled: if the server responds | ||
| // with a redirect, curl needs to rewind the upload data and resend it to the new location. | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_SEEKFUNCTION, CallbackPutSeek); | ||
| R__ASSERT(rc == CURLE_OK); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_SEEKDATA, &uploadState); | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
Contributor
Same as above, we should probably return a
Contributor
Author
Since it would touch all the existing methods too (
Contributor
Can be in this PR but as a separate commit
||
|
|
||
| RStatus status; | ||
| Perform(status); | ||
|
|
||
| if (extraHeaders) | ||
| curl_slist_free_all(extraHeaders); | ||
| rc = curl_easy_setopt(fHandle, CURLOPT_HTTPHEADER, NULL); | ||
| R__ASSERT(rc == CURLE_OK); | ||
|
|
||
| if (!uploadState.fErrorMsg.empty()) { | ||
| status.fStatusCode = RStatus::kIOError; | ||
| if (!status.fStatusMsg.empty()) | ||
| status.fStatusMsg += "; "; | ||
| status.fStatusMsg += uploadState.fErrorMsg; | ||
| } | ||
|
|
||
| return status; | ||
| } | ||
|
|
||
| void ROOT::Internal::RCurlConnection::SetCredentials(const RS3Credentials &credentials) | ||
| { | ||
| ClearCredentials(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,11 +4,68 @@ | |
|
|
||
| #include "TServerSocket.h" | ||
|
|
||
| #include <algorithm> | ||
| #include <cctype> | ||
| #include <cstdint> | ||
| #include <cstring> | ||
| #include <string> | ||
| #include <thread> | ||
|
|
||
| /// Convert a string to lower-case in place and return a reference to it. | ||
| static std::string &ToLower(std::string &s) | ||
| { | ||
| std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return std::tolower(c); }); | ||
|
Contributor
I think this function should just return the new string by value, rather than modifying the input (to make the calling site a bit prettier)
||
| return s; | ||
| } | ||
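A by-value variant along the lines of the comment above could look like the following sketch. It is only one possible shape, not the version that ended up in the PR. The argument is taken by value so that the caller's string is left untouched.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

/// Return a lower-case copy of the input string.
std::string ToLower(std::string s)
{
   // The unsigned char parameter avoids undefined behaviour in std::tolower for negative char values.
   std::transform(s.begin(), s.end(), s.begin(),
                  [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
   return s;
}
```

The call sites then shrink to a single expression, for example `ToLower(headers).find("content-length: ")`, without the intermediate copy that the in-place version needs.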
|
|
||
| /// Accept a PUT request: read headers + body, optionally respond to Expect: 100-continue, send 200 OK. | ||
| static void TaskRecvPut(TServerSocket *serverSocket, std::string *requestHeaders, std::string *requestBody) | ||
| { | ||
| requestHeaders->clear(); | ||
| requestBody->clear(); | ||
| auto sock = serverSocket->Accept(); | ||
|
|
||
| const char *eof = "\r\n\r\n"; | ||
| const std::size_t eofLen = strlen(eof); | ||
| std::size_t nextInEof = 0; | ||
| char c; | ||
| while (sock->RecvRaw(&c, 1)) { | ||
| requestHeaders->push_back(c); | ||
| if (c == eof[nextInEof]) { | ||
| if (++nextInEof == eofLen) | ||
| break; | ||
| } else { | ||
| nextInEof = 0; | ||
| } | ||
| } | ||
|
|
||
| // If the client sent Expect: 100-continue, respond with HTTP 100 before reading the body | ||
| std::string headersLower = *requestHeaders; | ||
| ToLower(headersLower); | ||
| if (headersLower.find("expect: 100-continue") != std::string::npos) { | ||
| const char *continueResponse = "HTTP/1.1 100 Continue\r\n\r\n"; | ||
| sock->SendRaw(continueResponse, strlen(continueResponse)); | ||
| } | ||
|
|
||
| // Parse content-length (case-insensitive) | ||
| std::size_t contentLength = 0; | ||
| auto pos = headersLower.find("content-length: "); | ||
| if (pos != std::string::npos) { | ||
|
|
||
| auto valStart = pos + strlen("content-length: "); | ||
| auto valEnd = headersLower.find("\r\n", valStart); | ||
| contentLength = std::stoul(headersLower.substr(valStart, valEnd - valStart)); | ||
| } | ||
|
|
||
| if (contentLength > 0) { | ||
| requestBody->resize(contentLength); | ||
| sock->RecvRaw(&(*requestBody)[0], contentLength); | ||
| } | ||
|
|
||
| const char *response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; | ||
|
|
||
| sock->SendRaw(response, strlen(response)); | ||
| sock->Close(); | ||
| } | ||
|
|
||
| static void TaskRecv(TServerSocket *serverSocket, std::string *request) | ||
| { | ||
| request->clear(); | ||
|
|
@@ -63,3 +120,32 @@ TEST(RCurlConnection, Cred) | |
| threadRecv.join(); | ||
| EXPECT_EQ(std::string::npos, request.find("\r\nAuthorization: ")); | ||
| } | ||
|
|
||
| TEST(RCurlConnection, Put) | ||
| { | ||
| TServerSocket sock(0, false, TServerSocket::kDefaultBacklog, -1, ESocketBindOption::kInaddrLoopback); | ||
| const std::string url = | ||
| std::string("http://") + sock.GetLocalInetAddress().GetHostAddress() + ":" + std::to_string(sock.GetLocalPort()); | ||
|
|
||
| const unsigned char payload[] = "Hello, S3!"; | ||
| const std::size_t payloadLen = sizeof(payload) - 1; // exclude null terminator | ||
|
|
||
| std::string headers; | ||
| std::string body; | ||
| std::thread threadRecv(TaskRecvPut, &sock, &headers, &body); | ||
|
|
||
| ROOT::Internal::RCurlConnection conn(url); | ||
| auto status = conn.SendPutReq(payload, payloadLen); | ||
|
|
||
| threadRecv.join(); | ||
| EXPECT_TRUE(static_cast<bool>(status)); | ||
| EXPECT_EQ(0u, headers.find("PUT ")); | ||
|
|
||
| // Normalize headers to lower-case for case-insensitive matching | ||
| std::string headersLower = headers; | ||
| ToLower(headersLower); | ||
| auto clPos = headersLower.find("content-length: " + std::to_string(payloadLen)); | ||
| ASSERT_NE(std::string::npos, clPos) << "content-length header not found in request"; | ||
|
|
||
| EXPECT_EQ(std::string(reinterpret_cast<const char *>(payload), payloadLen), body); | ||
| } | ||
Please add a brief doc comment (in particular, to document what this function returns).