diff --git a/cloud/src/common/CMakeLists.txt b/cloud/src/common/CMakeLists.txt index d3ba8914f9fb3e..c83e11a9eb6ab3 100644 --- a/cloud/src/common/CMakeLists.txt +++ b/cloud/src/common/CMakeLists.txt @@ -10,6 +10,7 @@ set(COMMON_FILES logging.cpp bvars.cpp encryption_util.cpp + http_helper.cpp metric.cpp kms.cpp network_util.cpp diff --git a/cloud/src/common/configbase.cpp b/cloud/src/common/configbase.cpp index ae32ffb5f4da4f..877d9ed755910a 100644 --- a/cloud/src/common/configbase.cpp +++ b/cloud/src/common/configbase.cpp @@ -462,4 +462,53 @@ std::pair set_config(std::unordered_mapfind(name); + std::string value = (it != full_conf_map->end()) ? it->second : ""; + + if (!first) oss << ","; + first = false; + oss << "[\"" << name << "\",\"" << field.type << "\",\"" << value << "\"," + << (field.valmutable ? "true" : "false") << "]"; + } + oss << "]"; + return oss.str(); +} + +std::pair update_config(const std::string& configs, bool persist, + const std::string& custom_conf_path) { + if (configs.empty()) { + return {false, "query param `configs` should not be empty"}; + } + + std::unordered_map conf_map; + std::istringstream ss(configs); + std::string conf; + while (std::getline(ss, conf, ',')) { + auto pos = conf.find('='); + if (pos == std::string::npos) { + return {false, fmt::format("config {} is invalid", conf)}; + } + std::string key = conf.substr(0, pos); + std::string val = conf.substr(pos + 1); + trim(key); + trim(val); + conf_map.emplace(std::move(key), std::move(val)); + } + + return set_config(std::move(conf_map), persist, custom_conf_path); +} + } // namespace doris::cloud::config diff --git a/cloud/src/common/configbase.h b/cloud/src/common/configbase.h index f1cc3716ba320c..b6eff9ae101c06 100644 --- a/cloud/src/common/configbase.h +++ b/cloud/src/common/configbase.h @@ -181,4 +181,9 @@ std::pair set_config(std::unordered_map update_config(const std::string& configs, bool persist, + const std::string& custom_conf_path); + } // namespace doris::cloud::config diff --git a/cloud/src/common/http_helper.cpp b/cloud/src/common/http_helper.cpp new file mode 100644 index 00000000000000..85566c1fd230f0 --- /dev/null +++ b/cloud/src/common/http_helper.cpp @@ -0,0 +1,1135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "http_helper.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "common/metric.h" +#include "cpp/s3_rate_limiter.h" +#include "meta-service/meta_service.h" +#include "meta-service/meta_service_helper.h" +#include "meta-service/meta_service_http.h" +#include "recycler/recycler.h" +#include "recycler/recycler_service.h" +namespace doris::cloud { + +template +static google::protobuf::util::Status parse_json_message(const std::string& unresolved_path, + const std::string& body, Message* req) { + static_assert(std::is_base_of_v); + auto st = google::protobuf::util::JsonStringToMessage(body, req); + if (!st.ok()) { + std::string msg = "failed to strictly parse http request for '" + unresolved_path + + "' error: " + st.ToString(); + LOG_WARNING(msg).tag("body", encryt_sk(hide_access_key(body))); + + // ignore unknown fields + google::protobuf::util::JsonParseOptions json_parse_options; + json_parse_options.ignore_unknown_fields = true; + return google::protobuf::util::JsonStringToMessage(body, req, json_parse_options); + } + return {}; +} + +#define PARSE_MESSAGE_OR_RETURN(ctrl, req) \ + do { \ + std::string body = ctrl->request_attachment().to_string(); \ + auto& unresolved_path = ctrl->http_request().unresolved_path(); \ + auto st = parse_json_message(unresolved_path, body, &req); \ + if (!st.ok()) { \ + std::string msg = "parse http request '" + unresolved_path + "': " + st.ToString(); \ + LOG_WARNING(msg).tag("body", encryt_sk(hide_access_key(body))); \ + return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, msg); \ + } \ + } while (0) + +const std::unordered_map& get_http_handlers() { + using MS = MetaServiceImpl; + using RS = RecyclerServiceImpl; + + static std::unordered_map handlers { + // MetaService APIs + {"add_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"rename_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"update_cluster_endpoint", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"update_cluster_mysql_user_name", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"add_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"decommission_node", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_cluster_status", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"notify_decommissioned", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_vcluster_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"create_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_create_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"rename_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"enable_instance_sse", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"disable_instance_sse", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_instance_status", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"add_obj_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"legacy_update_ak_sk", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"update_ak_sk", + {.handler = [](void* s, + brpc::Controller* c) { return process_update_ak_sk((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"show_storage_vaults", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_obj_store_info((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"add_hdfs_vault", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_storage_vault((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"add_s3_vault", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_storage_vault((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_s3_vault", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_storage_vault((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_s3_vault", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_storage_vault((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"drop_hdfs_vault", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_storage_vault((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_obj_info", + {.handler = + [](void* s, brpc::Controller* c) { + return process_alter_obj_store_info((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + + {"decode_key", + {.handler = [](void* s, brpc::Controller* c) { return process_decode_key((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"encode_key", + {.handler = [](void* s, brpc::Controller* c) { return process_encode_key((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_value", + {.handler = [](void* s, brpc::Controller* c) { return process_get_value((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_value", + {.handler = [](void* s, brpc::Controller* c) { return process_set_value((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"show_meta_ranges", + {.handler = [](void* s, + brpc::Controller* c) { return process_show_meta_ranges((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"txn_lazy_commit", + {.handler = [](void* s, + brpc::Controller* c) { return process_txn_lazy_commit((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"injection_point", + {.handler = [](void* s, + brpc::Controller* c) { return process_injection_point((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"fix_tablet_stats", + {.handler = [](void* s, + brpc::Controller* c) { return process_fix_tablet_stats((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"fix_tablet_db_id", + {.handler = [](void* s, + brpc::Controller* c) { return process_fix_tablet_db_id((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"get_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_instance_info((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_obj_store_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_obj_store_info((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_cluster", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_cluster((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_tablet_stats", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_tablet_stats((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_stage", + {.handler = [](void* s, brpc::Controller* c) { return process_get_stage((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"get_cluster_status", + {.handler = [](void* s, + brpc::Controller* c) { return process_get_cluster_status((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + {"list_snapshot", + {.handler = [](void* s, + brpc::Controller* c) { return process_list_snapshot((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"set_snapshot_property", + {.handler = + [](void* s, brpc::Controller* c) { + return process_set_snapshot_property((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"get_snapshot_property", + {.handler = + [](void* s, brpc::Controller* c) { + return process_get_snapshot_property((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"set_multi_version_status", + {.handler = + [](void* s, brpc::Controller* c) { + return process_set_multi_version_status((MS*)s, c); + }, + .role = HttpRole::META_SERVICE}}, + {"compact_snapshot", + {.handler = [](void* s, + brpc::Controller* c) { return process_compact_snapshot((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"decouple_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_decouple_instance((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"abort_txn", + {.handler = [](void* s, brpc::Controller* c) { return process_abort_txn((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"abort_tablet_job", + {.handler = [](void* s, + brpc::Controller* c) { return process_abort_tablet_job((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_ram_user", + {.handler = [](void* s, + brpc::Controller* c) { return process_alter_ram_user((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"alter_iam", + {.handler = [](void* s, brpc::Controller* c) { return process_alter_iam((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"adjust_rate_limit", + {.handler = [](void* s, + brpc::Controller* c) { return process_adjust_rate_limit((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + {"list_rate_limit", + {.handler = [](void* s, + brpc::Controller* c) { return process_query_rate_limit((MS*)s, c); }, + .role = HttpRole::META_SERVICE}}, + + // Recycler APIs + {"recycle_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_recycle_instance((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"statistics_recycle", + {.handler = [](void* s, + brpc::Controller* c) { return process_statistics_recycle((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"recycle_copy_jobs", + {.handler = [](void* s, + brpc::Controller* c) { return process_recycle_copy_jobs((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"recycle_job_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_recycle_job_info((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"check_instance", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_instance((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"check_job_info", + {.handler = [](void* s, + brpc::Controller* c) { return process_check_job_info((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"check_meta", + {.handler = [](void* s, brpc::Controller* c) { return process_check_meta((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + {"adjust_rate_limiter", + {.handler = [](void* s, + brpc::Controller* c) { return process_adjust_rate_limiter((RS*)s, c); }, + .role = HttpRole::RECYCLER}}, + + // Shared APIs + {"show_config", + {.handler = [](void* s, + brpc::Controller* c) { return process_show_config((MS*)s, c); }, + .role = HttpRole::BOTH}}, + {"update_config", + {.handler = [](void* s, + brpc::Controller* c) { return process_update_config((MS*)s, c); }, + .role = HttpRole::BOTH}}, + }; + + return handlers; +} + +HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, + {"drop_cluster", AlterClusterRequest::DROP_CLUSTER}, + {"rename_cluster", AlterClusterRequest::RENAME_CLUSTER}, + {"update_cluster_endpoint", AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT}, + {"update_cluster_mysql_user_name", AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME}, + {"add_node", AlterClusterRequest::ADD_NODE}, + {"drop_node", AlterClusterRequest::DROP_NODE}, + {"decommission_node", AlterClusterRequest::DECOMMISSION_NODE}, + {"set_cluster_status", AlterClusterRequest::SET_CLUSTER_STATUS}, + {"notify_decommissioned", AlterClusterRequest::NOTIFY_DECOMMISSIONED}, + {"alter_vcluster_info", AlterClusterRequest::ALTER_VCLUSTER_INFO}, + }; + + auto& path = ctrl->http_request().unresolved_path(); + auto body = ctrl->request_attachment().to_string(); + auto it = operations.find(remove_version_prefix(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter cluster operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterClusterRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + req.set_op(it->second); + AlterClusterResponse resp; + service->alter_cluster(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_get_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + GetObjStoreInfoResponse resp; + service->get_obj_store_info(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"add_obj_info", AlterObjStoreInfoRequest::ADD_OBJ_INFO}, + {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}, + {"alter_obj_info", AlterObjStoreInfoRequest::ALTER_OBJ_INFO}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(remove_version_prefix(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter obj store info operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_op(it->second); + + AlterObjStoreInfoResponse resp; + service->alter_obj_store_info(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_storage_vault(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map operations { + {"drop_s3_vault", AlterObjStoreInfoRequest::DROP_S3_VAULT}, + {"add_s3_vault", AlterObjStoreInfoRequest::ADD_S3_VAULT}, + {"alter_s3_vault", AlterObjStoreInfoRequest::ALTER_S3_VAULT}, + {"drop_hdfs_vault", AlterObjStoreInfoRequest::DROP_HDFS_INFO}, + {"add_hdfs_vault", AlterObjStoreInfoRequest::ADD_HDFS_INFO}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(remove_version_prefix(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter storage vault operation: " + path; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterObjStoreInfoRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_op(it->second); + + AlterObjStoreInfoResponse resp; + service->alter_storage_vault(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_update_ak_sk(MetaServiceImpl* service, brpc::Controller* ctrl) { + UpdateAkSkRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + UpdateAkSkResponse resp; + service->update_ak_sk(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_create_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { + CreateInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + CreateInstanceResponse resp; + service->create_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { + static std::unordered_map> + operations {{"rename_instance", {AlterInstanceRequest::RENAME}}, + {"enable_instance_sse", {AlterInstanceRequest::ENABLE_SSE}}, + {"disable_instance_sse", {AlterInstanceRequest::DISABLE_SSE}}, + {"drop_instance", {AlterInstanceRequest::DROP}}, + {"set_instance_status", + {AlterInstanceRequest::SET_NORMAL, AlterInstanceRequest::SET_OVERDUE}}}; + + auto& path = ctrl->http_request().unresolved_path(); + auto it = operations.find(remove_version_prefix(path)); + if (it == operations.end()) { + std::string msg = "not supportted alter instance operation: '" + path + + "', remove version prefix=" + std::string(remove_version_prefix(path)); + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + AlterInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + // for unresolved path whose corresponding operation is signal, we need set operation by ourselves. + if ((it->second).size() == 1) { + req.set_op((it->second)[0]); + } + AlterInstanceResponse resp; + service->alter_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl) { + AbortTxnRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AbortTxnResponse resp; + service->abort_txn(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_abort_tablet_job(MetaServiceImpl* service, brpc::Controller* ctrl) { + FinishTabletJobRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + req.set_action(FinishTabletJobRequest::ABORT); + FinishTabletJobResponse resp; + service->finish_tablet_job(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_ram_user(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterRamUserRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AlterRamUserResponse resp; + service->alter_ram_user(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterIamRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + AlterIamResponse resp; + service->alter_iam(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; + auto rpc_name = std::string {http_query(uri, "rpc_name")}; + auto instance_id = std::string {http_query(uri, "instance_id")}; + + auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { + DCHECK(!qps_limit_str.empty()); + int64_t qps_limit = -1; + try { + qps_limit = std::stoll(qps_limit_str); + } catch (const std::exception& ex) { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); + } + if (qps_limit < 0) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "`qps_limit` should not be less than 0"); + } + if (cb(qps_limit)) { + return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); + } + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("failed to adjust rate limit for qps_limit={}, " + "rpc_name={}, instance_id={}, plz ensure correct " + "rpc/instance name", + qps_limit_str, rpc_name, instance_id)); + }; + + auto set_global_qps_limit = [process_set_qps_limit, service]() { + return process_set_qps_limit([service](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit); + }); + }; + + auto set_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); + }); + }; + + auto set_instance_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); + }); + }; + + auto set_instance_rpc_qps_limit = [&]() { + return process_set_qps_limit([&](int64_t qps_limit) { + return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); + }); + }; + + auto process_invalid_arguments = [&]() -> HttpResponse { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid argument: qps_limit(required)={}, " + "rpc_name(optional)={}, instance_id(optional)={}", + qps_limit_str, rpc_name, instance_id)); + }; + + // We have 3 optional params and 2^3 combination, and 4 of them are illegal. + // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. + std::array, 8> processors; + std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); + processors[0b001] = std::move(set_global_qps_limit); + processors[0b011] = std::move(set_rpc_qps_limit); + processors[0b101] = std::move(set_instance_qps_limit); + processors[0b111] = std::move(set_instance_rpc_qps_limit); + + uint8_t level = (0x01 & !qps_limit_str.empty()) | ((0x01 & !rpc_name.empty()) << 1) | + ((0x01 & !instance_id.empty()) << 2); + + DCHECK_LT(level, 8); + + return processors[level](); +} + +HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { + auto rate_limiter = service->rate_limiter(); + rapidjson::Document d; + d.SetObject(); + auto get_qps_limit = [&d](std::string_view rpc_name, + std::shared_ptr rpc_limiter) { + rapidjson::Document node; + node.SetObject(); + rapidjson::Document sub; + sub.SetObject(); + auto get_qps_token_limit = [&](std::string_view instance_id, + std::shared_ptr qps_token) { + sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), + qps_token->max_qps_limit(), d.GetAllocator()); + }; + rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); + + node.AddMember("RPC qps limit", rpc_limiter->max_qps_limit(), d.GetAllocator()); + node.AddMember("instance specific qps limit", sub, d.GetAllocator()); + d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); + }; + rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); + + rapidjson::StringBuffer sb; + rapidjson::PrettyWriter writer(sb); + d.Accept(writer); + return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); +} + +// Recycler HTTP handlers +HttpResponse process_recycle_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { + std::string request_body = cntl->request_attachment().to_string(); + RecycleInstanceRequest req; + auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); + if (!st.ok()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "failed to parse RecycleInstanceRequest"); + } + RecycleInstanceResponse res; + service->recycle_instance(cntl, &req, &res, nullptr); + return http_text_reply(res.status(), res.status().msg()); +} + +HttpResponse process_statistics_recycle(RecyclerServiceImpl* service, brpc::Controller* cntl) { + std::string request_body = cntl->request_attachment().to_string(); + StatisticsRecycleRequest req; + auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); + if (!st.ok()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "failed to parse StatisticsRecycleRequest"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + service->statistics_recycle(req, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_recycle_copy_jobs(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + recycle_copy_jobs(service->txn_kv(), *instance_id, code, msg, + service->recycler()->thread_pool_group(), service->txn_lazy_committer()); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_recycle_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg, key; + job_recycle_key({*instance_id}, &key); + recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + if (!service->checker()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "checker not enabled"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + service->check_instance(*instance_id, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); + if (!instance_id || instance_id->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); + } + MetaServiceCode code = MetaServiceCode::OK; + std::string msg, key; + job_check_key({*instance_id}, &key); + recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); + return http_text_reply(code, msg, msg); +} + +HttpResponse process_check_meta(RecyclerServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + const auto* instance_id = uri.GetQuery("instance_id"); + const auto* host = uri.GetQuery("host"); + const auto* port = uri.GetQuery("port"); + const auto* user = uri.GetQuery("user"); + const auto* password = uri.GetQuery("password"); + if (!instance_id || instance_id->empty() || !host || host->empty() || !port || port->empty() || + !password || !user || user->empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "missing required parameters"); + } + std::string msg; + check_meta(service->txn_kv(), *instance_id, *host, *port, *user, *password, msg); + return http_text_reply(MetaServiceCode::OK, msg, msg); +} + +HttpResponse process_adjust_rate_limiter(RecyclerServiceImpl*, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + const auto* type_string = uri.GetQuery("type"); + const auto* speed = uri.GetQuery("speed"); + const auto* burst = uri.GetQuery("burst"); + const auto* limit = uri.GetQuery("limit"); + if (!type_string || type_string->empty() || !speed || !burst || !limit || + (*type_string != "get" && *type_string != "put")) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "invalid arguments"); + } + auto max_speed = speed->empty() ? 0 : std::stoul(*speed); + auto max_burst = burst->empty() ? 0 : std::stoul(*burst); + auto max_limit = limit->empty() ? 0 : std::stoul(*limit); + if (reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed, max_burst, + max_limit) != 0) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "adjust failed"); + } + return http_json_reply(MetaServiceCode::OK, ""); +} + +HttpResponse process_show_config(MetaServiceImpl*, brpc::Controller* cntl) { + auto& uri = cntl->http_request().uri(); + std::string_view conf_name = http_query(uri, "conf_key"); + + if (config::full_conf_map == nullptr) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "config map not initialized"); + } + + std::string result = config::show_config(std::string(conf_name)); + return http_json_reply(MetaServiceCode::OK, "", result); +} + +HttpResponse process_update_config(MetaServiceImpl* service, brpc::Controller* cntl) { + const auto& uri = cntl->http_request().uri(); + bool persist = (http_query(uri, "persist") == "true"); + auto configs = std::string {http_query(uri, "configs")}; + auto reason = std::string {http_query(uri, "reason")}; + LOG(INFO) << "modify configs for reason=" << reason << ", configs=" << configs + << ", persist=" << http_query(uri, "persist"); + + if (auto [succ, cause] = config::update_config(configs, persist, config::custom_conf_path); + !succ) { + LOG(WARNING) << cause; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, cause); + } + return http_json_reply(MetaServiceCode::OK, ""); +} + +HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view key = http_query(uri, "key"); + if (key.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no key to decode"); + } + + bool unicode = http_query(uri, "unicode") != "false"; + std::string body = prettify_key(key, unicode); + if (body.empty()) { + std::string msg = "failed to decode key, key=" + std::string(key); + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + return http_text_reply(MetaServiceCode::OK, "", body); +} + +HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl) { + return process_http_encode_key(ctrl->http_request().uri()); +} + +HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); +} + +HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_set_value(service->txn_kv().get(), ctrl); +} + +// show all key ranges and their count. +HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto txn_kv = std::dynamic_pointer_cast(service->txn_kv()); + if (!txn_kv) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "this method only support fdb txn kv"); + } + + std::vector partition_boundaries; + TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries); + if (code != TxnErrorCode::TXN_OK) { + auto msg = fmt::format("failed to get boundaries, code={}", code); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); + } + std::unordered_map partition_count; + get_kv_range_boundaries_count(partition_boundaries, partition_count); + + // sort ranges by count + std::vector> meta_ranges; + meta_ranges.reserve(partition_count.size()); + for (auto&& [key, count] : partition_count) { + meta_ranges.emplace_back(key, count); + } + + std::sort(meta_ranges.begin(), meta_ranges.end(), + [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; }); + + std::string body = fmt::format("total meta ranges: {}\n", partition_boundaries.size()); + for (auto&& [key, count] : meta_ranges) { + body += fmt::format("{}: {}\n", key, count); + } + return http_text_reply(MetaServiceCode::OK, "", body); +} + +HttpResponse process_get_instance_info(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view instance_id = http_query(uri, "instance_id"); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + + InstanceInfoPB instance; + auto [code, msg] = service->get_instance_info(std::string(instance_id), + std::string(cloud_unique_id), &instance); + return http_json_reply_message(code, msg, instance); +} + +HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetClusterRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + + bool get_all_cluster_info = false; + // if cluster_id、cluster_name、mysql_user_name all empty, get this instance's all cluster info. + if (req.cluster_id().empty() && req.cluster_name().empty() && req.mysql_user_name().empty()) { + get_all_cluster_info = true; + } + + GetClusterResponse resp; + service->get_cluster(ctrl, &req, &resp, nullptr); + + if (resp.status().code() == MetaServiceCode::OK) { + if (get_all_cluster_info) { + return http_json_reply_message(resp.status(), resp); + } else { + // ATTN: only returns the first cluster pb. + return http_json_reply_message(resp.status(), resp.cluster(0)); + } + } else { + return http_json_reply(resp.status()); + } +} + +HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetTabletStatsRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetTabletStatsResponse resp; + service->get_tablet_stats(ctrl, &req, &resp, nullptr); + + std::string body; + if (resp.status().code() == MetaServiceCode::OK) { + body = resp.DebugString(); + } + return http_text_reply(resp.status(), body); +} + +HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + std::string_view table_id = http_query(uri, "table_id"); + std::string_view tablet_id = http_query(uri, "tablet_id"); + + MetaServiceResponseStatus st = service->fix_tablet_stats( + std::string(cloud_unique_id), std::string(table_id), std::string(tablet_id)); + return http_text_reply(st, st.DebugString()); +} + +HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string tablet_id_str(http_query(uri, "tablet_id")); + std::string db_id_str(http_query(uri, "db_id")); + + int64_t tablet_id = 0, db_id = 0; + try { + db_id = std::stol(db_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("db_id {} must be a number, meet error={}", db_id_str, e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + try { + tablet_id = std::stol(tablet_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("tablet_id {} must be a number, meet error={}", tablet_id_str, + e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + auto [code, msg] = service->fix_tablet_db_id(instance_id, tablet_id, db_id); + return http_text_reply(code, msg, ""); +} + +HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetStageRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetStageResponse resp; + service->get_stage(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_get_cluster_status(MetaServiceImpl* service, brpc::Controller* ctrl) { + GetClusterStatusRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + GetClusterStatusResponse resp; + service->get_cluster_status(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string txn_id_str(http_query(uri, "txn_id")); + if (instance_id.empty() || txn_id_str.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id or txn_id is empty"); + } + + int64_t txn_id = 0; + try { + txn_id = std::stol(txn_id_str); + } catch (const std::exception& e) { + auto msg = fmt::format("txn_id {} must be a number, meet error={}", txn_id_str, e.what()); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); + } + + DCHECK_GT(txn_id, 0); + + auto txn_lazy_committer = service->txn_lazy_committer(); + if (!txn_lazy_committer) { + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer is nullptr"); + } + + std::shared_ptr task = txn_lazy_committer->submit(instance_id, txn_id); + auto [code, msg] = task->wait(); + return http_json_reply(code, msg); +} + +HttpResponse process_list_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { + ListSnapshotRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + ListSnapshotResponse resp; + service->list_snapshot(ctrl, &req, &resp, nullptr); + return http_json_reply_message(resp.status(), resp); +} + +HttpResponse process_compact_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + if (instance_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty"); + } + CompactSnapshotRequest req; + req.set_instance_id(instance_id); + CompactSnapshotResponse resp; + service->compact_snapshot(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_decouple_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + if (instance_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty"); + } + auto [code, msg] = service->snapshot_manager()->decouple_instance(instance_id); + return http_json_reply(code, msg); +} + +HttpResponse process_set_snapshot_property(MetaServiceImpl* service, brpc::Controller* ctrl) { + AlterInstanceRequest req; + PARSE_MESSAGE_OR_RETURN(ctrl, req); + auto* properties = req.mutable_properties(); + if (properties->contains("status")) { + std::string status = properties->at("status"); + if (status != "ENABLED" && status != "DISABLED") { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "Invalid value for status property: " + status + + ", expected 'ENABLED' or 'DISABLED' (case insensitive)"); + } + std::string_view is_enable = (status == "ENABLED") ? "true" : "false"; + const std::string& property_name = + AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT); + (*properties)[property_name] = is_enable; + properties->erase("status"); + } + if (properties->contains("max_reserved_snapshots")) { + const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( + AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS); + (*properties)[property_name] = properties->at("max_reserved_snapshots"); + properties->erase("max_reserved_snapshots"); + } + if (properties->contains("snapshot_interval_seconds")) { + const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( + AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS); + (*properties)[property_name] = properties->at("snapshot_interval_seconds"); + properties->erase("snapshot_interval_seconds"); + } + req.set_op(AlterInstanceRequest::SET_SNAPSHOT_PROPERTY); + AlterInstanceResponse resp; + service->alter_instance(ctrl, &req, &resp, nullptr); + return http_json_reply(resp.status()); +} + +HttpResponse process_set_multi_version_status(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string instance_id(http_query(uri, "instance_id")); + std::string cloud_unique_id(http_query(uri, "cloud_unique_id")); + std::string multi_version_status_str(http_query(uri, "multi_version_status")); + + // Prefer instance_id if provided, fallback to cloud_unique_id + if (instance_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty instance id"); + } + + if (multi_version_status_str.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "multi_version_status is required"); + } + + // Parse multi_version_status from string to enum + MultiVersionStatus multi_version_status; + std::string multi_version_status_upper = multi_version_status_str; + std::ranges::transform(multi_version_status_upper, multi_version_status_upper.begin(), + ::toupper); + + if (multi_version_status_upper == "MULTI_VERSION_DISABLED") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_DISABLED; + } else if (multi_version_status_upper == "MULTI_VERSION_WRITE_ONLY") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_WRITE_ONLY; + } else if (multi_version_status_upper == "MULTI_VERSION_READ_WRITE") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_READ_WRITE; + } else if (multi_version_status_upper == "MULTI_VERSION_ENABLED") { + multi_version_status = MultiVersionStatus::MULTI_VERSION_ENABLED; + } else { + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + "invalid multi_version_status value. Supported values: MULTI_VERSION_DISABLED, " + "MULTI_VERSION_WRITE_ONLY, MULTI_VERSION_READ_WRITE, MULTI_VERSION_ENABLED"); + } + // Call snapshot manager directly + auto [code, msg] = service->snapshot_manager()->set_multi_version_status(instance_id, + multi_version_status); + + return http_json_reply(code, msg); +} + +HttpResponse process_get_snapshot_property(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + std::string_view instance_id = http_query(uri, "instance_id"); + std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); + + if (instance_id.empty() && cloud_unique_id.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + "empty instance_id and cloud_unique_id"); + } + + InstanceInfoPB instance; + auto [code, msg] = service->get_instance_info(std::string(instance_id), + std::string(cloud_unique_id), &instance); + if (code != MetaServiceCode::OK) { + return http_json_reply(code, msg); + } + + // Build snapshot properties response + rapidjson::Document doc; + doc.SetObject(); + + // Snapshot switch status + std::string_view switch_status; + switch (instance.snapshot_switch_status()) { + case SNAPSHOT_SWITCH_DISABLED: + switch_status = "UNSUPPORTED"; + break; + case SNAPSHOT_SWITCH_OFF: + switch_status = "DISABLED"; + break; + case SNAPSHOT_SWITCH_ON: + switch_status = "ENABLED"; + break; + default: + switch_status = "UNKNOWN"; + break; + } + doc.AddMember("status", rapidjson::StringRef(switch_status.data(), switch_status.size()), + doc.GetAllocator()); + + // Max reserved snapshots + if (instance.has_max_reserved_snapshot()) { + doc.AddMember("max_reserved_snapshots", instance.max_reserved_snapshot(), + doc.GetAllocator()); + } + + // Snapshot interval seconds + if (instance.has_snapshot_interval_seconds()) { + doc.AddMember("snapshot_interval_seconds", instance.snapshot_interval_seconds(), + doc.GetAllocator()); + } + + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + doc.Accept(writer); + + return http_json_reply(MetaServiceCode::OK, "", buffer.GetString()); +} + +} // namespace doris::cloud diff --git a/cloud/src/common/http_helper.h b/cloud/src/common/http_helper.h new file mode 100644 index 00000000000000..4ab84dbfd339b3 --- /dev/null +++ b/cloud/src/common/http_helper.h @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "meta-service/meta_service_http.h" + +namespace doris::cloud { + +[[maybe_unused]] static std::string_view remove_version_prefix(std::string_view path) { + if (path.size() > 3 && path.starts_with("v1/")) { + path.remove_prefix(3); + } + return path; +} + +const std::unordered_map& get_http_handlers(); + +// injection_point_http.cpp +[[maybe_unused]] HttpResponse process_injection_point(MetaServiceImpl* service, + brpc::Controller* ctrl); + +// MetaService Http handlers +[[maybe_unused]] HttpResponse process_alter_cluster(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_obj_store_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_storage_vault(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_update_ak_sk(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_create_instance(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_instance(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_abort_tablet_job(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_ram_user(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_query_rate_limit(MetaServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl); + +// show all key ranges and their count. +[[maybe_unused]] HttpResponse process_show_meta_ranges(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_instance_info(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_tablet_stats(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_cluster_status(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_list_snapshot(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_compact_snapshot(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_decouple_instance(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_snapshot_property(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_set_multi_version_status(MetaServiceImpl* service, + brpc::Controller* ctrl); + +[[maybe_unused]] HttpResponse process_get_snapshot_property(MetaServiceImpl* service, + brpc::Controller* ctrl); + +// Recycler HTTP handlers +[[maybe_unused]] HttpResponse process_recycle_instance(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_statistics_recycle(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_recycle_copy_jobs(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_recycle_job_info(RecyclerServiceImpl* service, + brpc::Controller* cntl); +[[maybe_unused]] HttpResponse process_check_instance(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_check_job_info(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_check_meta(RecyclerServiceImpl* service, + brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_adjust_rate_limiter(RecyclerServiceImpl*, + brpc::Controller* cntl); + +// Both http handlers +[[maybe_unused]] HttpResponse process_show_config(MetaServiceImpl*, brpc::Controller* cntl); + +[[maybe_unused]] HttpResponse process_update_config(MetaServiceImpl* service, + brpc::Controller* cntl); + +} // namespace doris::cloud diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index 78c608a6dc4271..42eeebf5fb7837 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -32,9 +32,7 @@ #include #include -#include #include -#include #include #include #include @@ -43,30 +41,15 @@ #include #include -#include "common/config.h" -#include "common/configbase.h" +#include "common/http_helper.h" #include "common/logging.h" -#include "common/string_util.h" #include "meta-service/meta_service_helper.h" #include "meta-store/txn_kv.h" -#include "meta-store/txn_kv_error.h" #include "meta_service.h" -#include "rate-limiter/rate_limiter.h" +#include "recycler/recycler_service.h" namespace doris::cloud { -#define PARSE_MESSAGE_OR_RETURN(ctrl, req) \ - do { \ - std::string body = ctrl->request_attachment().to_string(); \ - auto& unresolved_path = ctrl->http_request().unresolved_path(); \ - auto st = parse_json_message(unresolved_path, body, &req); \ - if (!st.ok()) { \ - std::string msg = "parse http request '" + unresolved_path + "': " + st.ToString(); \ - LOG_WARNING(msg).tag("body", encryt_sk(body)); \ - return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, msg); \ - } \ - } while (0) - extern std::string get_instance_id(const std::shared_ptr& rc_mgr, const std::string& cloud_unique_id); @@ -182,885 +165,20 @@ static std::string format_http_request(brpc::Controller* cntl) { return ss.str(); } -static std::string_view remove_version_prefix(std::string_view path) { - if (path.size() > 3 && path.substr(0, 3) == "v1/") path.remove_prefix(3); - return path; -} - -HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl); - -static HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, - {"drop_cluster", AlterClusterRequest::DROP_CLUSTER}, - {"rename_cluster", AlterClusterRequest::RENAME_CLUSTER}, - {"update_cluster_endpoint", AlterClusterRequest::UPDATE_CLUSTER_ENDPOINT}, - {"update_cluster_mysql_user_name", AlterClusterRequest::UPDATE_CLUSTER_MYSQL_USER_NAME}, - {"add_node", AlterClusterRequest::ADD_NODE}, - {"drop_node", AlterClusterRequest::DROP_NODE}, - {"decommission_node", AlterClusterRequest::DECOMMISSION_NODE}, - {"set_cluster_status", AlterClusterRequest::SET_CLUSTER_STATUS}, - {"notify_decommissioned", AlterClusterRequest::NOTIFY_DECOMMISSIONED}, - {"alter_vcluster_info", AlterClusterRequest::ALTER_VCLUSTER_INFO}, - }; - - auto& path = ctrl->http_request().unresolved_path(); - auto body = ctrl->request_attachment().to_string(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter cluster operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterClusterRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - req.set_op(it->second); - AlterClusterResponse resp; - service->alter_cluster(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_get_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - GetObjStoreInfoResponse resp; - service->get_obj_store_info(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_alter_obj_store_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"add_obj_info", AlterObjStoreInfoRequest::ADD_OBJ_INFO}, - {"legacy_update_ak_sk", AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK}, - {"alter_obj_info", AlterObjStoreInfoRequest::ALTER_OBJ_INFO}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter obj store info operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_op(it->second); - - AlterObjStoreInfoResponse resp; - service->alter_obj_store_info(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_storage_vault(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map operations { - {"drop_s3_vault", AlterObjStoreInfoRequest::DROP_S3_VAULT}, - {"add_s3_vault", AlterObjStoreInfoRequest::ADD_S3_VAULT}, - {"alter_s3_vault", AlterObjStoreInfoRequest::ALTER_S3_VAULT}, - {"drop_hdfs_vault", AlterObjStoreInfoRequest::DROP_HDFS_INFO}, - {"add_hdfs_vault", AlterObjStoreInfoRequest::ADD_HDFS_INFO}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter storage vault operation: " + path; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterObjStoreInfoRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_op(it->second); - - AlterObjStoreInfoResponse resp; - service->alter_storage_vault(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_update_ak_sk(MetaServiceImpl* service, brpc::Controller* ctrl) { - UpdateAkSkRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - UpdateAkSkResponse resp; - service->update_ak_sk(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_create_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { - CreateInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - CreateInstanceResponse resp; - service->create_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { - static std::unordered_map> - operations {{"rename_instance", {AlterInstanceRequest::RENAME}}, - {"enable_instance_sse", {AlterInstanceRequest::ENABLE_SSE}}, - {"disable_instance_sse", {AlterInstanceRequest::DISABLE_SSE}}, - {"drop_instance", {AlterInstanceRequest::DROP}}, - {"set_instance_status", - {AlterInstanceRequest::SET_NORMAL, AlterInstanceRequest::SET_OVERDUE}}}; - - auto& path = ctrl->http_request().unresolved_path(); - auto it = operations.find(remove_version_prefix(path)); - if (it == operations.end()) { - std::string msg = "not supportted alter instance operation: '" + path + - "', remove version prefix=" + std::string(remove_version_prefix(path)); - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - AlterInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - // for unresolved path whose corresponding operation is signal, we need set operation by ourselves. - if ((it->second).size() == 1) { - req.set_op((it->second)[0]); - } - AlterInstanceResponse resp; - service->alter_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_abort_txn(MetaServiceImpl* service, brpc::Controller* ctrl) { - AbortTxnRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AbortTxnResponse resp; - service->abort_txn(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_abort_tablet_job(MetaServiceImpl* service, brpc::Controller* ctrl) { - FinishTabletJobRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - req.set_action(FinishTabletJobRequest::ABORT); - FinishTabletJobResponse resp; - service->finish_tablet_job(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_ram_user(MetaServiceImpl* service, brpc::Controller* ctrl) { - AlterRamUserRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AlterRamUserResponse resp; - service->alter_ram_user(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller* ctrl) { - AlterIamRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - AlterIamResponse resp; - service->alter_iam(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - auto qps_limit_str = std::string {http_query(uri, "qps_limit")}; - auto rpc_name = std::string {http_query(uri, "rpc_name")}; - auto instance_id = std::string {http_query(uri, "instance_id")}; - - auto process_set_qps_limit = [&](std::function cb) -> HttpResponse { - DCHECK(!qps_limit_str.empty()); - int64_t qps_limit = -1; - try { - qps_limit = std::stoll(qps_limit_str); - } catch (const std::exception& ex) { - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what())); - } - if (qps_limit < 0) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "`qps_limit` should not be less than 0"); - } - if (cb(qps_limit)) { - return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit"); - } - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("failed to adjust rate limit for qps_limit={}, " - "rpc_name={}, instance_id={}, plz ensure correct " - "rpc/instance name", - qps_limit_str, rpc_name, instance_id)); - }; - - auto set_global_qps_limit = [process_set_qps_limit, service]() { - return process_set_qps_limit([service](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit); - }); - }; - - auto set_rpc_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name); - }); - }; - - auto set_instance_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id); - }); - }; - - auto set_instance_rpc_qps_limit = [&]() { - return process_set_qps_limit([&](int64_t qps_limit) { - return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id); - }); - }; - - auto process_invalid_arguments = [&]() -> HttpResponse { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("invalid argument: qps_limit(required)={}, " - "rpc_name(optional)={}, instance_id(optional)={}", - qps_limit_str, rpc_name, instance_id)); - }; - - // We have 3 optional params and 2^3 combination, and 4 of them are illegal. - // We register callbacks for them in porcessors accordings to the level, represented by 3 bits. - std::array, 8> processors; - std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments)); - processors[0b001] = std::move(set_global_qps_limit); - processors[0b011] = std::move(set_rpc_qps_limit); - processors[0b101] = std::move(set_instance_qps_limit); - processors[0b111] = std::move(set_instance_rpc_qps_limit); - - uint8_t level = (0x01 & !qps_limit_str.empty()) | ((0x01 & !rpc_name.empty()) << 1) | - ((0x01 & !instance_id.empty()) << 2); - - DCHECK_LT(level, 8); - - return processors[level](); -} - -static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) { - auto rate_limiter = service->rate_limiter(); - rapidjson::Document d; - d.SetObject(); - auto get_qps_limit = [&d](std::string_view rpc_name, - std::shared_ptr rpc_limiter) { - rapidjson::Document node; - node.SetObject(); - rapidjson::Document sub; - sub.SetObject(); - auto get_qps_token_limit = [&](std::string_view instance_id, - std::shared_ptr qps_token) { - sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()), - qps_token->max_qps_limit(), d.GetAllocator()); - }; - rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit)); - - node.AddMember("RPC qps limit", rpc_limiter->max_qps_limit(), d.GetAllocator()); - node.AddMember("instance specific qps limit", sub, d.GetAllocator()); - d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator()); - }; - rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit)); - - rapidjson::StringBuffer sb; - rapidjson::PrettyWriter writer(sb); - d.Accept(writer); - return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); -} - -static HttpResponse process_show_config(MetaServiceImpl*, brpc::Controller* cntl) { - auto& uri = cntl->http_request().uri(); - std::string_view conf_name = http_query(uri, "conf_key"); - - if (config::full_conf_map == nullptr) { - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "config map not initialized"); - } - - rapidjson::Document d; - d.SetArray(); - - for (auto& [name, field] : *config::Register::_s_field_map) { - if (!conf_name.empty() && name != conf_name) { - continue; - } - auto it = config::full_conf_map->find(name); - std::string value = (it != config::full_conf_map->end()) ? it->second : ""; - - rapidjson::Value entry(rapidjson::kArrayType); - entry.PushBack(rapidjson::Value(name.c_str(), d.GetAllocator()), d.GetAllocator()); - entry.PushBack(rapidjson::Value(field.type, d.GetAllocator()), d.GetAllocator()); - entry.PushBack(rapidjson::Value(value.c_str(), d.GetAllocator()), d.GetAllocator()); - entry.PushBack(rapidjson::Value(field.valmutable), d.GetAllocator()); - d.PushBack(entry, d.GetAllocator()); - } - - rapidjson::StringBuffer sb; - rapidjson::PrettyWriter writer(sb); - d.Accept(writer); - return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); -} - -static HttpResponse process_update_config(MetaServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - bool persist = (http_query(uri, "persist") == "true"); - auto configs = std::string {http_query(uri, "configs")}; - auto reason = std::string {http_query(uri, "reason")}; - LOG(INFO) << "modify configs for reason=" << reason << ", configs=" << configs - << ", persist=" << http_query(uri, "persist"); - if (configs.empty()) [[unlikely]] { - LOG(WARNING) << "query param `config` should not be empty"; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "query param `config` should not be empty"); - } - std::unordered_map conf_map; - auto conf_list = split(configs, ','); - for (const auto& conf : conf_list) { - auto conf_pair = split(conf, '='); - if (conf_pair.size() != 2) [[unlikely]] { - LOG(WARNING) << "failed to split config=[{}] from `k=v` pattern" << conf; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("config {} is invalid", configs)); - } - trim(conf_pair[0]); - trim(conf_pair[1]); - conf_map.emplace(std::move(conf_pair[0]), std::move(conf_pair[1])); - } - if (auto [succ, cause] = - config::set_config(std::move(conf_map), persist, config::custom_conf_path); - !succ) { - LOG(WARNING) << cause; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, cause); - } - return http_json_reply(MetaServiceCode::OK, ""); -} - -static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view key = http_query(uri, "key"); - if (key.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no key to decode"); - } - - bool unicode = http_query(uri, "unicode") != "false"; - std::string body = prettify_key(key, unicode); - if (body.empty()) { - std::string msg = "failed to decode key, key=" + std::string(key); - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - return http_text_reply(MetaServiceCode::OK, "", body); -} - -static HttpResponse process_encode_key(MetaServiceImpl*, brpc::Controller* ctrl) { - return process_http_encode_key(ctrl->http_request().uri()); -} - -static HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller* ctrl) { - return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); -} - -static HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { - return process_http_set_value(service->txn_kv().get(), ctrl); -} - -// show all key ranges and their count. -static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto txn_kv = std::dynamic_pointer_cast(service->txn_kv()); - if (!txn_kv) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "this method only support fdb txn kv"); - } - - std::vector partition_boundaries; - TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries); - if (code != TxnErrorCode::TXN_OK) { - auto msg = fmt::format("failed to get boundaries, code={}", code); - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); - } - std::unordered_map partition_count; - get_kv_range_boundaries_count(partition_boundaries, partition_count); - - // sort ranges by count - std::vector> meta_ranges; - meta_ranges.reserve(partition_count.size()); - for (auto&& [key, count] : partition_count) { - meta_ranges.emplace_back(key, count); - } - - std::sort(meta_ranges.begin(), meta_ranges.end(), - [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; }); - - std::string body = fmt::format("total meta ranges: {}\n", partition_boundaries.size()); - for (auto&& [key, count] : meta_ranges) { - body += fmt::format("{}: {}\n", key, count); - } - return http_text_reply(MetaServiceCode::OK, "", body); -} - -static HttpResponse process_get_instance_info(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view instance_id = http_query(uri, "instance_id"); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - - InstanceInfoPB instance; - auto [code, msg] = service->get_instance_info(std::string(instance_id), - std::string(cloud_unique_id), &instance); - return http_json_reply_message(code, msg, instance); -} - -static HttpResponse process_get_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetClusterRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - - bool get_all_cluster_info = false; - // if cluster_id、cluster_name、mysql_user_name all empty, get this instance's all cluster info. - if (req.cluster_id().empty() && req.cluster_name().empty() && req.mysql_user_name().empty()) { - get_all_cluster_info = true; - } - - GetClusterResponse resp; - service->get_cluster(ctrl, &req, &resp, nullptr); - - if (resp.status().code() == MetaServiceCode::OK) { - if (get_all_cluster_info) { - return http_json_reply_message(resp.status(), resp); - } else { - // ATTN: only returns the first cluster pb. - return http_json_reply_message(resp.status(), resp.cluster(0)); - } - } else { - return http_json_reply(resp.status()); - } -} - -static HttpResponse process_get_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetTabletStatsRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetTabletStatsResponse resp; - service->get_tablet_stats(ctrl, &req, &resp, nullptr); - - std::string body; - if (resp.status().code() == MetaServiceCode::OK) { - body = resp.DebugString(); - } - return http_text_reply(resp.status(), body); -} - -static HttpResponse process_fix_tablet_stats(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - std::string_view table_id = http_query(uri, "table_id"); - std::string_view tablet_id = http_query(uri, "tablet_id"); - - MetaServiceResponseStatus st = service->fix_tablet_stats( - std::string(cloud_unique_id), std::string(table_id), std::string(tablet_id)); - return http_text_reply(st, st.DebugString()); -} - -static HttpResponse process_fix_tablet_db_id(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string tablet_id_str(http_query(uri, "tablet_id")); - std::string db_id_str(http_query(uri, "db_id")); - - int64_t tablet_id = 0, db_id = 0; - try { - db_id = std::stol(db_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("db_id {} must be a number, meet error={}", db_id_str, e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - try { - tablet_id = std::stol(tablet_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("tablet_id {} must be a number, meet error={}", tablet_id_str, - e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - - auto [code, msg] = service->fix_tablet_db_id(instance_id, tablet_id, db_id); - return http_text_reply(code, msg, ""); -} - -static HttpResponse process_get_stage(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetStageRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetStageResponse resp; - service->get_stage(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_get_cluster_status(MetaServiceImpl* service, brpc::Controller* ctrl) { - GetClusterStatusRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - GetClusterStatusResponse resp; - service->get_cluster_status(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_txn_lazy_commit(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string txn_id_str(http_query(uri, "txn_id")); - if (instance_id.empty() || txn_id_str.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id or txn_id is empty"); - } - - int64_t txn_id = 0; - try { - txn_id = std::stol(txn_id_str); - } catch (const std::exception& e) { - auto msg = fmt::format("txn_id {} must be a number, meet error={}", txn_id_str, e.what()); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, msg); - } - - DCHECK_GT(txn_id, 0); - - auto txn_lazy_committer = service->txn_lazy_committer(); - if (!txn_lazy_committer) { - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer is nullptr"); - } - - std::shared_ptr task = txn_lazy_committer->submit(instance_id, txn_id); - auto [code, msg] = task->wait(); - return http_json_reply(code, msg); -} - -static HttpResponse process_unknown(MetaServiceImpl*, brpc::Controller* cntl) { - // ATTN: To be compatible with cloud manager versions higher than this MS - const auto& uri = cntl->http_request().uri(); - std::string query_params; - for (auto it = uri.QueryBegin(); it != uri.QueryEnd(); ++it) { - if (!query_params.empty()) query_params += "&"; - query_params += it->first + "=" + it->second; - } - LOG(WARNING) << "unknown http request path=" << uri.path() << " query_params=[" << query_params - << "]"; - return http_json_reply(MetaServiceCode::OK, "no handler found for path: " + uri.path()); -} - -static HttpResponse process_list_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { - ListSnapshotRequest req; - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - if (instance_id.empty()) { - PARSE_MESSAGE_OR_RETURN(ctrl, req); - } else { - req.set_instance_id(instance_id); - } - - ListSnapshotResponse resp; - service->list_snapshot(ctrl, &req, &resp, nullptr); - return http_json_reply_message(resp.status(), resp); -} - -static HttpResponse process_drop_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string snapshot_id(http_query(uri, "snapshot_id")); - if (instance_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty"); - } - if (snapshot_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "snapshot_id is empty"); - } - DropSnapshotRequest req; - req.set_snapshot_id(snapshot_id); - DropSnapshotResponse resp; - service->snapshot_manager()->drop_snapshot(instance_id, req, &resp); - return http_json_reply(resp.status()); -} - -static HttpResponse process_compact_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - if (instance_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty"); - } - CompactSnapshotRequest req; - req.set_instance_id(instance_id); - CompactSnapshotResponse resp; - service->compact_snapshot(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_decouple_instance(MetaServiceImpl* service, brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - if (instance_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty"); - } - auto [code, msg] = service->snapshot_manager()->decouple_instance(instance_id); - return http_json_reply(code, msg); -} - -static HttpResponse process_set_snapshot_property(MetaServiceImpl* service, - brpc::Controller* ctrl) { - AlterInstanceRequest req; - PARSE_MESSAGE_OR_RETURN(ctrl, req); - auto* properties = req.mutable_properties(); - if (properties->contains("status")) { - std::string status = properties->at("status"); - if (status != "ENABLED" && status != "DISABLED") { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "Invalid value for status property: " + status + - ", expected 'ENABLED' or 'DISABLED' (case insensitive)"); - } - std::string_view is_enable = (status == "ENABLED") ? "true" : "false"; - const std::string& property_name = - AlterInstanceRequest::SnapshotProperty_Name(AlterInstanceRequest::ENABLE_SNAPSHOT); - (*properties)[property_name] = is_enable; - properties->erase("status"); - } - if (properties->contains("max_reserved_snapshots")) { - const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( - AlterInstanceRequest::MAX_RESERVED_SNAPSHOTS); - (*properties)[property_name] = properties->at("max_reserved_snapshots"); - properties->erase("max_reserved_snapshots"); - } - if (properties->contains("snapshot_interval_seconds")) { - const std::string& property_name = AlterInstanceRequest::SnapshotProperty_Name( - AlterInstanceRequest::SNAPSHOT_INTERVAL_SECONDS); - (*properties)[property_name] = properties->at("snapshot_interval_seconds"); - properties->erase("snapshot_interval_seconds"); - } - req.set_op(AlterInstanceRequest::SET_SNAPSHOT_PROPERTY); - AlterInstanceResponse resp; - service->alter_instance(ctrl, &req, &resp, nullptr); - return http_json_reply(resp.status()); -} - -static HttpResponse process_set_multi_version_status(MetaServiceImpl* service, - brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string instance_id(http_query(uri, "instance_id")); - std::string cloud_unique_id(http_query(uri, "cloud_unique_id")); - std::string multi_version_status_str(http_query(uri, "multi_version_status")); - - // Prefer instance_id if provided, fallback to cloud_unique_id - if (instance_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty instance id"); - } - - if (multi_version_status_str.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "multi_version_status is required"); - } - - // Parse multi_version_status from string to enum - MultiVersionStatus multi_version_status; - std::string multi_version_status_upper = multi_version_status_str; - std::ranges::transform(multi_version_status_upper, multi_version_status_upper.begin(), - ::toupper); - - if (multi_version_status_upper == "MULTI_VERSION_DISABLED") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_DISABLED; - } else if (multi_version_status_upper == "MULTI_VERSION_WRITE_ONLY") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_WRITE_ONLY; - } else if (multi_version_status_upper == "MULTI_VERSION_READ_WRITE") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_READ_WRITE; - } else if (multi_version_status_upper == "MULTI_VERSION_ENABLED") { - multi_version_status = MultiVersionStatus::MULTI_VERSION_ENABLED; - } else { - return http_json_reply( - MetaServiceCode::INVALID_ARGUMENT, - "invalid multi_version_status value. Supported values: MULTI_VERSION_DISABLED, " - "MULTI_VERSION_WRITE_ONLY, MULTI_VERSION_READ_WRITE, MULTI_VERSION_ENABLED"); - } - // Call snapshot manager directly - auto [code, msg] = service->snapshot_manager()->set_multi_version_status(instance_id, - multi_version_status); - - return http_json_reply(code, msg); -} - -static HttpResponse process_get_snapshot_property(MetaServiceImpl* service, - brpc::Controller* ctrl) { - auto& uri = ctrl->http_request().uri(); - std::string_view instance_id = http_query(uri, "instance_id"); - std::string_view cloud_unique_id = http_query(uri, "cloud_unique_id"); - - if (instance_id.empty() && cloud_unique_id.empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "empty instance_id and cloud_unique_id"); - } - - InstanceInfoPB instance; - auto [code, msg] = service->get_instance_info(std::string(instance_id), - std::string(cloud_unique_id), &instance); - if (code != MetaServiceCode::OK) { - return http_json_reply(code, msg); - } - - // Build snapshot properties response - rapidjson::Document doc; - doc.SetObject(); - - // Snapshot switch status - std::string_view switch_status; - switch (instance.snapshot_switch_status()) { - case SNAPSHOT_SWITCH_DISABLED: - switch_status = "UNSUPPORTED"; - break; - case SNAPSHOT_SWITCH_OFF: - switch_status = "DISABLED"; - break; - case SNAPSHOT_SWITCH_ON: - switch_status = "ENABLED"; - break; - default: - switch_status = "UNKNOWN"; - break; - } - doc.AddMember("status", rapidjson::StringRef(switch_status.data(), switch_status.size()), - doc.GetAllocator()); - - // Max reserved snapshots - if (instance.has_max_reserved_snapshot()) { - doc.AddMember("max_reserved_snapshots", instance.max_reserved_snapshot(), - doc.GetAllocator()); - } - - // Snapshot interval seconds - if (instance.has_snapshot_interval_seconds()) { - doc.AddMember("snapshot_interval_seconds", instance.snapshot_interval_seconds(), - doc.GetAllocator()); - } - - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - doc.Accept(writer); - - return http_json_reply(MetaServiceCode::OK, "", buffer.GetString()); -} - void MetaServiceImpl::http(::google::protobuf::RpcController* controller, const MetaServiceHttpRequest*, MetaServiceHttpResponse*, ::google::protobuf::Closure* done) { - using HttpHandler = HttpResponse (*)(MetaServiceImpl*, brpc::Controller*); - static std::unordered_map http_handlers { - // for alter cluster. - {"add_cluster", process_alter_cluster}, - {"drop_cluster", process_alter_cluster}, - {"rename_cluster", process_alter_cluster}, - {"update_cluster_endpoint", process_alter_cluster}, - {"update_cluster_mysql_user_name", process_alter_cluster}, - {"add_node", process_alter_cluster}, - {"drop_node", process_alter_cluster}, - {"decommission_node", process_alter_cluster}, - {"set_cluster_status", process_alter_cluster}, - {"notify_decommissioned", process_alter_cluster}, - {"alter_vcluster_info", process_alter_cluster}, - {"v1/add_cluster", process_alter_cluster}, - {"v1/drop_cluster", process_alter_cluster}, - {"v1/rename_cluster", process_alter_cluster}, - {"v1/update_cluster_endpoint", process_alter_cluster}, - {"v1/update_cluster_mysql_user_name", process_alter_cluster}, - {"v1/add_node", process_alter_cluster}, - {"v1/drop_node", process_alter_cluster}, - {"v1/decommission_node", process_alter_cluster}, - {"v1/set_cluster_status", process_alter_cluster}, - {"v1/alter_vcluster_info", process_alter_cluster}, - // for alter instance - {"create_instance", process_create_instance}, - {"drop_instance", process_alter_instance}, - {"rename_instance", process_alter_instance}, - {"enable_instance_sse", process_alter_instance}, - {"disable_instance_sse", process_alter_instance}, - {"set_instance_status", process_alter_instance}, - {"v1/create_instance", process_create_instance}, - {"v1/drop_instance", process_alter_instance}, - {"v1/rename_instance", process_alter_instance}, - {"v1/enable_instance_sse", process_alter_instance}, - {"v1/disable_instance_sse", process_alter_instance}, - {"v1/set_instance_status", process_alter_instance}, - // for alter obj store info - {"add_obj_info", process_alter_obj_store_info}, - {"legacy_update_ak_sk", process_alter_obj_store_info}, - {"update_ak_sk", process_update_ak_sk}, - {"v1/add_obj_info", process_alter_obj_store_info}, - {"v1/legacy_update_ak_sk", process_alter_obj_store_info}, - {"v1/update_ak_sk", process_update_ak_sk}, - {"show_storage_vaults", process_get_obj_store_info}, - {"add_hdfs_vault", process_alter_storage_vault}, - {"add_s3_vault", process_alter_storage_vault}, - {"alter_s3_vault", process_alter_storage_vault}, - {"drop_s3_vault", process_alter_storage_vault}, - {"drop_hdfs_vault", process_alter_storage_vault}, - {"alter_obj_info", process_alter_obj_store_info}, - {"v1/alter_obj_info", process_alter_obj_store_info}, - {"v1/alter_s3_vault", process_alter_storage_vault}, - - // for tools - {"decode_key", process_decode_key}, - {"encode_key", process_encode_key}, - {"get_value", process_get_value}, - {"set_value", process_set_value}, - {"show_meta_ranges", process_show_meta_ranges}, - {"txn_lazy_commit", process_txn_lazy_commit}, - {"injection_point", process_injection_point}, - {"fix_tablet_stats", process_fix_tablet_stats}, - {"fix_tablet_db_id", process_fix_tablet_db_id}, - {"v1/decode_key", process_decode_key}, - {"v1/encode_key", process_encode_key}, - {"v1/get_value", process_get_value}, - {"v1/set_value", process_set_value}, - {"v1/show_meta_ranges", process_show_meta_ranges}, - {"v1/txn_lazy_commit", process_txn_lazy_commit}, - {"v1/injection_point", process_injection_point}, - {"v1/fix_tablet_stats", process_fix_tablet_stats}, - {"v1/fix_tablet_db_id", process_fix_tablet_db_id}, - // for get - {"get_instance", process_get_instance_info}, - {"get_obj_store_info", process_get_obj_store_info}, - {"get_cluster", process_get_cluster}, - {"get_tablet_stats", process_get_tablet_stats}, - {"get_stage", process_get_stage}, - {"get_cluster_status", process_get_cluster_status}, - {"v1/get_instance", process_get_instance_info}, - {"v1/get_obj_store_info", process_get_obj_store_info}, - {"v1/get_cluster", process_get_cluster}, - {"v1/get_tablet_stats", process_get_tablet_stats}, - {"v1/get_stage", process_get_stage}, - {"v1/get_cluster_status", process_get_cluster_status}, - // snapshot related - {"list_snapshot", process_list_snapshot}, - {"drop_snapshot", process_drop_snapshot}, - {"set_snapshot_property", process_set_snapshot_property}, - {"get_snapshot_property", process_get_snapshot_property}, - {"set_multi_version_status", process_set_multi_version_status}, - {"v1/list_snapshot", process_list_snapshot}, - {"v1/drop_snapshot", process_drop_snapshot}, - {"v1/set_snapshot_property", process_set_snapshot_property}, - {"v1/get_snapshot_property", process_get_snapshot_property}, - {"v1/set_multi_version_status", process_set_multi_version_status}, - {"compact_snapshot", process_compact_snapshot}, - {"v1/compact_snapshot", process_compact_snapshot}, - {"decouple_instance", process_decouple_instance}, - {"v1/decouple_instance", process_decouple_instance}, - // misc - {"abort_txn", process_abort_txn}, - {"abort_tablet_job", process_abort_tablet_job}, - {"alter_ram_user", process_alter_ram_user}, - {"alter_iam", process_alter_iam}, - {"adjust_rate_limit", process_adjust_rate_limit}, - {"list_rate_limit", process_query_rate_limit}, - {"update_config", process_update_config}, - {"show_config", process_show_config}, - {"v1/abort_txn", process_abort_txn}, - {"v1/abort_tablet_job", process_abort_tablet_job}, - {"v1/alter_ram_user", process_alter_ram_user}, - {"v1/alter_iam", process_alter_iam}, - {"v1/adjust_rate_limit", process_adjust_rate_limit}, - {"v1/list_rate_limit", process_query_rate_limit}, - {"v1/update_config", process_update_config}, - {"v1/show_config", process_show_config}, - }; - auto* cntl = static_cast(controller); brpc::ClosureGuard closure_guard(done); - // Prepare input request info LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << cntl->http_request().uri().path(); std::string http_request = format_http_request(cntl); std::string http_request_for_log = encryt_sk(http_request); http_request_for_log = hide_ak(http_request_for_log); + const auto& unresolved_path = cntl->http_request().unresolved_path(); + const auto& handlers = get_http_handlers(); + auto it = handlers.find(remove_version_prefix(unresolved_path)); // Auth auto token = http_query(cntl->http_request().uri(), "token"); @@ -1075,15 +193,16 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, return; } - // Process http request - auto& unresolved_path = cntl->http_request().unresolved_path(); - HttpHandler handler = process_unknown; - auto it = http_handlers.find(unresolved_path); - if (it != http_handlers.end()) { - handler = it->second; + if (it == handlers.end() || + (it->second.role != HttpRole::META_SERVICE && it->second.role != HttpRole::BOTH)) { + std::string msg = "http path not found or not allowed"; + cntl->http_response().set_status_code(404); + cntl->response_attachment().append(msg); + cntl->response_attachment().append("\n"); + return; } - auto [status_code, msg, body] = handler(this, cntl); + auto [status_code, msg, body] = it->second.handler(this, cntl); cntl->http_response().set_status_code(status_code); cntl->response_attachment().append(body); cntl->response_attachment().append("\n"); diff --git a/cloud/src/meta-service/meta_service_http.h b/cloud/src/meta-service/meta_service_http.h index 1dca1d3d64dd33..7a9d0044780065 100644 --- a/cloud/src/meta-service/meta_service_http.h +++ b/cloud/src/meta-service/meta_service_http.h @@ -77,4 +77,16 @@ inline static HttpResponse http_text_reply(const MetaServiceResponseStatus& stat return http_text_reply(status.code(), status.msg(), body); } +// Forward declarations +class MetaServiceImpl; +class RecyclerServiceImpl; + +// Role-based HTTP handler +enum class HttpRole { META_SERVICE = 1, RECYCLER = 2, BOTH = 3 }; + +struct HttpHandlerInfo { + std::function handler; + HttpRole role; +}; + } // namespace doris::cloud diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp index d9342f7ee7b84a..e66f197edaabd9 100644 --- a/cloud/src/recycler/recycler_service.cpp +++ b/cloud/src/recycler/recycler_service.cpp @@ -37,8 +37,8 @@ #include "common/config.h" #include "common/configbase.h" #include "common/defer.h" +#include "common/http_helper.h" #include "common/logging.h" -#include "common/string_util.h" #include "common/util.h" #include "cpp/s3_rate_limiter.h" #include "meta-service/meta_service_http.h" @@ -507,217 +507,25 @@ void check_meta(const std::shared_ptr& txn_kv, const std::string& instanc #endif } -static HttpResponse process_recycle_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { - std::string request_body = cntl->request_attachment().to_string(); - RecycleInstanceRequest req; - auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); - if (!st.ok()) { - std::string msg = "failed to RecycleInstanceRequest, error: " + st.message().ToString(); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - RecycleInstanceResponse res; - service->recycle_instance(cntl, &req, &res, nullptr); - return http_text_reply(res.status(), res.status().msg()); -} - -static HttpResponse process_statistics_recycle(RecyclerServiceImpl* service, - brpc::Controller* cntl) { - std::string request_body = cntl->request_attachment().to_string(); - StatisticsRecycleRequest req; - auto st = google::protobuf::util::JsonStringToMessage(request_body, &req); - if (!st.ok()) { - std::string msg = "failed to StatisticsRecycleRequest, error: " + st.message().ToString(); - LOG(WARNING) << msg; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); - } - MetaServiceCode code = MetaServiceCode::OK; - std::string msg; - service->statistics_recycle(req, code, msg); - return http_text_reply(code, msg, msg); -} - -static HttpResponse process_recycle_copy_jobs(RecyclerServiceImpl* service, - brpc::Controller* cntl) { - const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); - } - MetaServiceCode code = MetaServiceCode::OK; - std::string msg; - recycle_copy_jobs(service->txn_kv(), *instance_id, code, msg, - service->recycler()->thread_pool_group(), service->txn_lazy_committer()); - return http_text_reply(code, msg, msg); -} - -static HttpResponse process_recycle_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { - const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); - } - MetaServiceCode code = MetaServiceCode::OK; - std::string msg; - std::string key; - job_recycle_key({*instance_id}, &key); - recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); - return http_text_reply(code, msg, msg); -} - -static HttpResponse process_check_instance(RecyclerServiceImpl* service, brpc::Controller* cntl) { - const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); - } - if (!service->checker()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "checker not enabled"); - } - MetaServiceCode code = MetaServiceCode::OK; - std::string msg; - service->check_instance(*instance_id, code, msg); - return http_text_reply(code, msg, msg); -} - -static HttpResponse process_check_job_info(RecyclerServiceImpl* service, brpc::Controller* cntl) { - const auto* instance_id = cntl->http_request().uri().GetQuery("instance_id"); - if (instance_id == nullptr || instance_id->empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "no instance id"); - } - MetaServiceCode code = MetaServiceCode::OK; - std::string msg; - std::string key; - job_check_key({*instance_id}, &key); - recycle_job_info(service->txn_kv(), *instance_id, key, code, msg); - return http_text_reply(code, msg, msg); -} - -static HttpResponse process_check_meta(RecyclerServiceImpl* service, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - const auto* instance_id = uri.GetQuery("instance_id"); - const auto* host = uri.GetQuery("host"); - const auto* port = uri.GetQuery("port"); - const auto* user = uri.GetQuery("user"); - const auto* password = uri.GetQuery("password"); - if (instance_id == nullptr || instance_id->empty() || host == nullptr || host->empty() || - port == nullptr || port->empty() || password == nullptr || user == nullptr || - user->empty()) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "no instance id or mysql conn str info"); - } - LOG(INFO) << " host " << *host << " port " << *port << " user " << *user << " instance " - << *instance_id; - std::string msg; - check_meta(service->txn_kv(), *instance_id, *host, *port, *user, *password, msg); - return http_text_reply(MetaServiceCode::OK, msg, msg); -} - -static HttpResponse process_adjust_rate_limiter(RecyclerServiceImpl*, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - const auto* type_string = uri.GetQuery("type"); - const auto* speed = uri.GetQuery("speed"); - const auto* burst = uri.GetQuery("burst"); - const auto* limit = uri.GetQuery("limit"); - if (type_string == nullptr || type_string->empty() || speed == nullptr || burst == nullptr || - limit == nullptr || (*type_string != "get" && *type_string != "put")) { - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "argument not suitable"); - } - auto max_speed = speed->empty() ? 0 : std::stoul(*speed); - auto max_burst = burst->empty() ? 0 : std::stoul(*burst); - auto max_limit = limit->empty() ? 0 : std::stoul(*limit); - if (0 != reset_s3_rate_limiter(string_to_s3_rate_limit_type(*type_string), max_speed, max_burst, - max_limit)) { - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "adjust failed"); - } - return http_json_reply(MetaServiceCode::OK, ""); -} - -static HttpResponse process_show_config(RecyclerServiceImpl*, brpc::Controller* cntl) { - const auto* conf_key_ptr = cntl->http_request().uri().GetQuery("conf_key"); - std::string conf_name = conf_key_ptr ? *conf_key_ptr : ""; - - if (config::full_conf_map == nullptr) { - return http_json_reply(MetaServiceCode::UNDEFINED_ERR, "config map not initialized"); - } - - rapidjson::Document d; - d.SetArray(); - for (auto& [name, field] : *config::Register::_s_field_map) { - if (!conf_name.empty() && name != conf_name) { - continue; - } - auto it = config::full_conf_map->find(name); - std::string value = (it != config::full_conf_map->end()) ? it->second : ""; - - rapidjson::Value entry(rapidjson::kArrayType); - entry.PushBack(rapidjson::Value(name.c_str(), d.GetAllocator()), d.GetAllocator()); - entry.PushBack(rapidjson::Value(field.type, d.GetAllocator()), d.GetAllocator()); - entry.PushBack(rapidjson::Value(value.c_str(), d.GetAllocator()), d.GetAllocator()); - entry.PushBack(rapidjson::Value(field.valmutable), d.GetAllocator()); - d.PushBack(entry, d.GetAllocator()); - } - rapidjson::StringBuffer sb; - rapidjson::PrettyWriter writer(sb); - d.Accept(writer); - return http_json_reply(MetaServiceCode::OK, "", sb.GetString()); -} - -static HttpResponse process_update_config(RecyclerServiceImpl*, brpc::Controller* cntl) { - const auto& uri = cntl->http_request().uri(); - bool persist = (uri.GetQuery("persist") != nullptr && *uri.GetQuery("persist") == "true"); - const auto* configs_ptr = uri.GetQuery("configs"); - const auto* reason_ptr = uri.GetQuery("reason"); - std::string configs = configs_ptr ? *configs_ptr : ""; - std::string reason = reason_ptr ? *reason_ptr : ""; - LOG(INFO) << "modify configs for reason=" << reason << ", configs=" << configs - << ", persist=" << persist; - if (configs.empty()) { - LOG(WARNING) << "query param `configs` should not be empty"; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - "query param `configs` should not be empty"); - } - std::unordered_map conf_map; - auto conf_list = split(configs, ','); - for (const auto& conf : conf_list) { - auto conf_pair = split(conf, '='); - if (conf_pair.size() != 2) { - LOG(WARNING) << "failed to split config=[" << conf << "] from `k=v` pattern"; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, - fmt::format("config {} is invalid", conf)); - } - trim(conf_pair[0]); - trim(conf_pair[1]); - conf_map.emplace(std::move(conf_pair[0]), std::move(conf_pair[1])); - } - if (auto [succ, cause] = - config::set_config(std::move(conf_map), persist, config::custom_conf_path); - !succ) { - LOG(WARNING) << cause; - return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, cause); - } - return http_json_reply(MetaServiceCode::OK, ""); -} - void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller, const ::doris::cloud::MetaServiceHttpRequest*, ::doris::cloud::MetaServiceHttpResponse*, ::google::protobuf::Closure* done) { - using HttpHandler = HttpResponse (*)(RecyclerServiceImpl*, brpc::Controller*); - static const std::unordered_map http_handlers { - {"recycle_instance", process_recycle_instance}, - {"statistics_recycle", process_statistics_recycle}, - {"recycle_copy_jobs", process_recycle_copy_jobs}, - {"recycle_job_info", process_recycle_job_info}, - {"check_instance", process_check_instance}, - {"check_job_info", process_check_job_info}, - {"check_meta", process_check_meta}, - {"adjust_rate_limiter", process_adjust_rate_limiter}, - {"show_config", process_show_config}, - {"update_config", process_update_config}, - }; - auto* cntl = static_cast(controller); LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << cntl->http_request().uri().path(); brpc::ClosureGuard closure_guard(done); + const auto& unresolved_path = cntl->http_request().unresolved_path(); + const auto& handlers = get_http_handlers(); + auto it = handlers.find(remove_version_prefix(unresolved_path)); + if (it == handlers.end() || + (it->second.role != HttpRole::RECYCLER && it->second.role != HttpRole::BOTH)) { + std::string msg = "http path not found or not allowed"; + cntl->http_response().set_status_code(404); + cntl->response_attachment().append(msg); + cntl->response_attachment().append("\n"); + return; + } // Auth const auto* token = cntl->http_request().uri().GetQuery("token"); @@ -731,18 +539,7 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller, return; } - const auto& unresolved_path = cntl->http_request().unresolved_path(); - auto it = http_handlers.find(unresolved_path); - if (it == http_handlers.end()) { - std::string msg = "http path " + cntl->http_request().uri().path() + - " not found, it may be not implemented"; - cntl->http_response().set_status_code(404); - cntl->response_attachment().append(msg); - cntl->response_attachment().append("\n"); - return; - } - - auto [status_code, msg, body] = it->second(this, cntl); + auto [status_code, msg, body] = it->second.handler(this, cntl); cntl->http_response().set_status_code(status_code); cntl->response_attachment().append(body); cntl->response_attachment().append("\n"); diff --git a/cloud/src/recycler/recycler_service.h b/cloud/src/recycler/recycler_service.h index a9a9f739f1e449..576dee85a2eba2 100644 --- a/cloud/src/recycler/recycler_service.h +++ b/cloud/src/recycler/recycler_service.h @@ -27,6 +27,8 @@ namespace doris::cloud { class Recycler; class Checker; +struct RecyclerThreadPoolGroup; +class TxnLazyCommitter; class RecyclerServiceImpl : public cloud::RecyclerService { public: @@ -63,4 +65,14 @@ class RecyclerServiceImpl : public cloud::RecyclerService { extern int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, size_t max_burst, size_t limit); +void recycle_copy_jobs(const std::shared_ptr& txn_kv, const std::string& instance_id, + MetaServiceCode& code, std::string& msg, + RecyclerThreadPoolGroup thread_pool_group, + std::shared_ptr txn_lazy_committer); +void recycle_job_info(const std::shared_ptr& txn_kv, const std::string& instance_id, + std::string_view key, MetaServiceCode& code, std::string& msg); +void check_meta(const std::shared_ptr& txn_kv, const std::string& instance_id, + const std::string& host, const std::string& port, const std::string& user, + const std::string& password, std::string& msg); + } // namespace doris::cloud diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index 52fc38921f3c82..d0500bbfdac1d4 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -610,6 +610,74 @@ TEST(MetaServiceHttpTest, InstanceTestWithVersion) { } } +TEST(MetaServiceHttpTest, AlterClusterTestWithVersion) { + config::enable_cluster_name_check = true; + + HttpContext ctx; + { + CreateInstanceRequest req; + req.set_instance_id(mock_instance); + req.set_user_id("test_user"); + req.set_name("test_name"); + ObjectStoreInfoPB obj; + obj.set_ak("123"); + obj.set_sk("321"); + obj.set_bucket("456"); + obj.set_prefix("654"); + obj.set_endpoint("789"); + obj.set_region("987"); + obj.set_external_endpoint("888"); + obj.set_provider(ObjectStoreInfoPB::BOS); + req.mutable_obj_info()->CopyFrom(obj); + + auto [status_code, resp] = + ctx.forward("v1/create_instance", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_name(mock_cluster_name); + req.mutable_cluster()->set_type(ClusterPB::COMPUTE); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward("v1/add_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + GetClusterRequest req; + req.set_cloud_unique_id("1:" + mock_instance + ":xxxx"); + req.set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward_with_result("v1/get_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.status.code(), MetaServiceCode::OK); + ASSERT_TRUE(resp.result.has_value()); + ASSERT_EQ(resp.result->cluster_name(), mock_cluster_name); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + req.mutable_cluster()->set_cluster_name("rename_cluster_name"); + auto [status_code, resp] = ctx.forward("v1/rename_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } + + { + AlterClusterRequest req; + req.set_instance_id(mock_instance); + req.mutable_cluster()->set_cluster_id(mock_cluster_id); + auto [status_code, resp] = ctx.forward("v1/drop_cluster", req); + ASSERT_EQ(status_code, 200); + ASSERT_EQ(resp.code(), MetaServiceCode::OK); + } +} + TEST(MetaServiceHttpTest, AlterClusterTest) { config::enable_cluster_name_check = true; @@ -1397,8 +1465,8 @@ TEST(MetaServiceHttpTest, GetTabletStatsTest) { TEST(MetaServiceHttpTest, ToUnknownUrlTest) { HttpContext ctx; auto [status_code, content] = ctx.query("unkown_resource_xxxxxx", ""); - ASSERT_EQ(status_code, 200); - ASSERT_NE(content.find("\"code\": \"OK\""), std::string::npos); + ASSERT_EQ(status_code, 404); + ASSERT_EQ(content, "http path not found or not allowed\n"); } TEST(MetaServiceHttpTest, UnknownFields) { @@ -1820,7 +1888,7 @@ TEST(MetaServiceHttpTest, UpdateConfig) { { auto [status_code, content] = ctx.query("update_config", ""); ASSERT_EQ(status_code, 400); - std::string msg = "query param `config` should not be empty"; + std::string msg = "query param `configs` should not be empty"; ASSERT_NE(content.find(msg), std::string::npos); } {