Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/rdma_performance/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class PerformanceTest {

int Init() {
brpc::ChannelOptions options;
options.use_rdma = FLAGS_use_rdma;
options.socket_mode = FLAGS_use_rdma? RDMA : TCP;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_rpc_timeout_ms;
Expand Down
2 changes: 1 addition & 1 deletion example/rdma_performance/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ int main(int argc, char* argv[]) {
g_last_time.store(0, butil::memory_order_relaxed);

brpc::ServerOptions options;
options.use_rdma = FLAGS_use_rdma;
options.socket_mode = FLAGS_use_rdma? RDMA : TCP;
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand Down
15 changes: 4 additions & 11 deletions src/brpc/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "butil/time.h" // gettimeofday_us
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/acceptor.h"
#include "brpc/transport_factory.h"


namespace brpc {
Expand All @@ -40,7 +41,7 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
, _empty_cond(&_map_mutex)
, _force_ssl(false)
, _ssl_ctx(NULL)
, _use_rdma(false)
, socket_mode(TCP)
, _bthread_tag(BTHREAD_TAG_DEFAULT) {
}

Expand Down Expand Up @@ -282,18 +283,10 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
options.fd = in_fd;
butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
options.user = acception->user();
options.need_on_edge_trigger = true;
options.force_ssl = am->_force_ssl;
options.initial_ssl_ctx = am->_ssl_ctx;
#if BRPC_WITH_RDMA
if (am->_use_rdma) {
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
} else {
#else
{
#endif
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
}
options.use_rdma = am->_use_rdma;
options.socket_mode = am->socket_mode;
options.bthread_tag = am->_bthread_tag;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
Expand Down
5 changes: 3 additions & 2 deletions src/brpc/acceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "butil/synchronization/condition_variable.h"
#include "butil/containers/flat_map.h"
#include "brpc/input_messenger.h"
#include "brpc/common.h"


namespace brpc {
Expand Down Expand Up @@ -110,8 +111,8 @@ friend class Server;
bool _force_ssl;
std::shared_ptr<SocketSSLContext> _ssl_ctx;

// Whether to use rdma or not
bool _use_rdma;
// Choose to use a certain socket: 0 TCP, 1 RDMA
Mode socket_mode;

// Acceptor belongs to this tag
bthread_tag_t _bthread_tag;
Expand Down
39 changes: 7 additions & 32 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/rdma/rdma_helper.h"
#include "brpc/policy/esp_authenticator.h"
#include "brpc/transport_factory.h"

namespace brpc {

Expand All @@ -60,7 +61,7 @@ ChannelOptions::ChannelOptions()
, connection_type(CONNECTION_TYPE_UNKNOWN)
, succeed_without_server(true)
, log_succeed_without_server(true)
, use_rdma(false)
, socket_mode(TCP)
, auth(NULL)
, backup_request_policy(NULL)
, retry_policy(NULL)
Expand Down Expand Up @@ -120,7 +121,7 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
} else {
// All disabled ChannelSSLOptions are the same
}
if (opt.use_rdma) {
if (opt.socket_mode == RDMA) {
buf.append("|rdma");
}
butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
Expand Down Expand Up @@ -163,20 +164,6 @@ Channel::~Channel() {
}
}

#if BRPC_WITH_RDMA
static bool OptionsAvailableForRdma(const ChannelOptions* opt) {
if (opt->has_ssl_options()) {
LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
return false;
}
if (!rdma::SupportedByRdma(opt->protocol.name())) {
LOG(WARNING) << "Cannot use " << opt->protocol.name()
<< " over RDMA";
return false;
}
return true;
}
#endif

int Channel::InitChannelOptions(const ChannelOptions* options) {
if (options) { // Override default options if user provided one.
Expand All @@ -191,20 +178,8 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
_options.hc_option.health_check_path = FLAGS_health_check_path;
_options.hc_option.health_check_timeout_ms = FLAGS_health_check_timeout_ms;
}
if (_options.use_rdma) {
#if BRPC_WITH_RDMA
if (!OptionsAvailableForRdma(&_options)) {
return -1;
}
rdma::GlobalRdmaInitializeOrDie();
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
return -1;
}
#else
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
return -1;
#endif
}
auto ret = TransportFactory::ContextInitOrDie(options->socket_mode, false, &_options);
CHECK(ret == 0);
Comment thread
zchuango marked this conversation as resolved.
Outdated

_serialize_request = protocol->serialize_request;
_pack_request = protocol->pack_request;
Expand Down Expand Up @@ -369,7 +344,7 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
return -1;
}
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
&_server_id, ssl_ctx, _options.use_rdma, _options.hc_option) != 0) {
&_server_id, ssl_ctx, _options.socket_mode, _options.hc_option) != 0) {
LOG(ERROR) << "Fail to insert into SocketMap";
return -1;
}
Expand Down Expand Up @@ -406,7 +381,7 @@ int Channel::Init(const char* ns_url,
GetNamingServiceThreadOptions ns_opt;
ns_opt.succeed_without_server = _options.succeed_without_server;
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
ns_opt.use_rdma = _options.use_rdma;
ns_opt.socket_mode = _options.socket_mode;
ns_opt.channel_signature = ComputeChannelSignature(_options);
ns_opt.hc_option = _options.hc_option;
if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {
Expand Down
7 changes: 4 additions & 3 deletions src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "brpc/backup_request_policy.h"
#include "brpc/naming_service_filter.h"
#include "brpc/health_check_option.h"
#include "brpc/common.h"

namespace brpc {

Expand Down Expand Up @@ -105,9 +106,9 @@ struct ChannelOptions {
const ChannelSSLOptions& ssl_options() const { return *_ssl_options; }
ChannelSSLOptions* mutable_ssl_options();

// Let this channel use rdma rather than tcp.
// Default: false
bool use_rdma;
// Let this channel Choose to use a certain socket: 0 TCP, 1 RDMA.
// Default: TCP
Mode socket_mode;

// Turn on authentication for this channel if `auth' is not NULL.
// Note `auth' will not be deleted by channel and must remain valid when
Expand Down
24 changes: 24 additions & 0 deletions src/brpc/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BRPC_COMMON_H
#define BRPC_COMMON_H
enum Mode {
Comment thread
zchuango marked this conversation as resolved.
Outdated
TCP = 0,
Comment thread
zchuango marked this conversation as resolved.
Outdated
RDMA = 1
};
Comment thread
zchuango marked this conversation as resolved.
#endif //BRPC_COMMON_H
2 changes: 1 addition & 1 deletion src/brpc/details/naming_service_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void NamingServiceThread::Actions::ResetServers(
// to pick those Sockets with the right settings during OnAddedServers
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx,
_owner->_options.use_rdma, _owner->_options.hc_option));
_owner->_options.socket_mode, _owner->_options.hc_option));
_added_sockets.push_back(tagged_id);
}

Expand Down
7 changes: 4 additions & 3 deletions src/brpc/details/naming_service_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "brpc/naming_service.h" // NamingService
#include "brpc/naming_service_filter.h" // NamingServiceFilter
#include "brpc/socket_map.h"
#include "brpc/common.h"

namespace brpc {

Expand All @@ -45,11 +46,11 @@ struct GetNamingServiceThreadOptions {
GetNamingServiceThreadOptions()
: succeed_without_server(false)
, log_succeed_without_server(true)
, use_rdma(false) {}
, socket_mode(TCP) {}

bool succeed_without_server;
bool log_succeed_without_server;
bool use_rdma;
Mode socket_mode;
HealthCheckOption hc_option;
ChannelSignature channel_signature;
std::shared_ptr<SocketSSLContext> ssl_ctx;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/input_message_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class InputMessageBase : public Destroyable {
friend class InputMessenger;
friend void* ProcessInputMessage(void*);
friend class Stream;
friend class Transport;
int64_t _received_us;
int64_t _base_real_us;
SocketUniquePtr _socket;
Expand Down
70 changes: 12 additions & 58 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "brpc/protocol.h" // ListProtocols
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/input_messenger.h"

#include "brpc/transport_factory.h"

namespace brpc {

Expand Down Expand Up @@ -112,8 +112,7 @@ ParseResult InputMessenger::CutInputMessage(
// The length of `data' must be PROTO_DUMMY_LEN + 1 to store extra ending char '\0'
char data[PROTO_DUMMY_LEN + 1];
m->_read_buf.copy_to_cstr(data, PROTO_DUMMY_LEN);
if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0 &&
m->_rdma_state == Socket::RDMA_OFF) {
if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0) {
// To avoid timeout when client uses RDMA but server uses TCP
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
Expand Down Expand Up @@ -191,46 +190,13 @@ struct RunLastMessage {
}
};

static void QueueMessage(InputMessageBase* to_run_msg,
int* num_bthread_created,
bthread_keytable_pool_t* keytable_pool) {
if (!to_run_msg) {
return;
}

#if BRPC_WITH_RDMA
if (rdma::FLAGS_rdma_disable_bthread) {
ProcessInputMessage(to_run_msg);
return;
}
#endif
// Create bthread for last_msg. The bthread is not scheduled
// until bthread_flush() is called (in the worse case).

// TODO(gejun): Join threads.
bthread_t th;
bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
tmp.keytable_pool = keytable_pool;
tmp.tag = bthread_self_tag();
bthread_attr_set_name(&tmp, "ProcessInputMessage");

if (!FLAGS_usercode_in_coroutine && bthread_start_background(
&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
++*num_bthread_created;
} else {
ProcessInputMessage(to_run_msg);
}
}

InputMessenger::InputMessageClosure::~InputMessageClosure() noexcept(false) {
InputMessageClosure::~InputMessageClosure() noexcept(false) {
if (_msg) {
ProcessInputMessage(_msg);
}
}

void InputMessenger::InputMessageClosure::reset(InputMessageBase* m) {
void InputMessageClosure::reset(InputMessageBase* m) {
if (_msg) {
ProcessInputMessage(_msg);
}
Expand Down Expand Up @@ -303,7 +269,8 @@ int InputMessenger::ProcessNewMessage(
// This unique_ptr prevents msg to be lost before transfering
// ownership to last_msg
DestroyingPtr<InputMessageBase> msg(pr.message());
QueueMessage(last_msg.release(), &num_bthread_created, m->_keytable_pool);
// QueueMessage(last_msg.release(), &num_bthread_created, m->_keytable_pool, m->socket_mode);
Comment thread
zchuango marked this conversation as resolved.
Outdated
m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
if (_handlers[index].process == NULL) {
LOG(ERROR) << "process of index=" << index << " is NULL";
continue;
Expand Down Expand Up @@ -336,22 +303,19 @@ int InputMessenger::ProcessNewMessage(
// Transfer ownership to last_msg
last_msg.reset(msg.release());
} else {
QueueMessage(msg.release(), &num_bthread_created,
m->_keytable_pool);
last_msg.reset(msg.release());
m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
bthread_flush();
num_bthread_created = 0;
}
}
#if BRPC_WITH_RDMA
// In RDMA polling mode, all messages must be executed in a new bthread and
// not in the bthread where the polling bthread is located, because the
// method for processing messages may call synchronization primitives,
// causing the polling bthread to be scheduled out.
if (rdma::FLAGS_rdma_use_polling) {
QueueMessage(last_msg.release(), &num_bthread_created,
m->_keytable_pool);
if (m->_socket_mode == RDMA) {
m->_transport->QueueMessage(last_msg, &num_bthread_created, true);
}
#endif
if (num_bthread_created) {
bthread_flush();
}
Expand Down Expand Up @@ -414,8 +378,7 @@ void InputMessenger::OnNewMessages(Socket* m) {
}
}

if (m->_rdma_state == Socket::RDMA_OFF && messenger->ProcessNewMessage(
m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
if (messenger->ProcessNewMessage(m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
return;
}
}
Expand Down Expand Up @@ -533,16 +496,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,

int InputMessenger::Create(SocketOptions options, SocketId* id) {
options.user = this;
#if BRPC_WITH_RDMA
if (options.use_rdma) {
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
options.app_connect = std::make_shared<rdma::RdmaConnect>();
} else {
#else
{
#endif
options.on_edge_triggered_events = OnNewMessages;
}
options.need_on_edge_trigger = true;
// Enable keepalive by options or Gflag.
// Priority: options > Gflag.
if (options.keepalive_options || FLAGS_socket_keepalive) {
Expand Down
Loading
Loading