Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 42 additions & 104 deletions be/src/olap/collection_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <set>
#include <sstream>
#include <stack>

#include "common/exception.h"
#include "olap/rowset/rowset.h"
Expand Down Expand Up @@ -108,94 +109,15 @@ Status CollectionStatistics::collect(
return Status::OK();
}

// Searches the expression tree rooted at `expr` for a slot reference.
//
// Each visited node is first passed through VExpr::expr_without_cast
// (presumably this unwraps cast nodes -- confirm against VExpr). Nodes are
// visited depth-first, children in their stored order, and the first node whose
// type is SLOT_REF is returned.
//
// Returns nullptr when `expr` is null or no slot reference is found. The
// returned pointer is non-owning; it points into the tree reachable from `expr`.
vectorized::VSlotRef* find_slot_ref(const vectorized::VExprSPtr& expr) {
    if (expr == nullptr) {
        return nullptr;
    }

    const auto unwrapped = vectorized::VExpr::expr_without_cast(expr);
    if (unwrapped->node_type() != TExprNodeType::SLOT_REF) {
        // Not a slot reference itself: descend, stopping at the first hit.
        for (const auto& child : unwrapped->children()) {
            vectorized::VSlotRef* found = find_slot_ref(child);
            if (found != nullptr) {
                return found;
            }
        }
        return nullptr;
    }

    return static_cast<vectorized::VSlotRef*>(unwrapped.get());
}

// Collects the analyzed terms of a single match predicate into `collect_infos`.
//
// The predicate is expected to have two children: children()[0] is the left
// expression, which must contain a slot reference, and children()[1] is the
// right operand, which is read as a literal.
//
// For every inverted index of the referenced column that passes both
// InvertedIndexAnalyzer::should_analyzer and
// IndexReaderHelper::is_need_similarity_score, the literal value is analyzed
// into terms. The terms are merged into the map entry keyed by
// "<col_unique_id>[.<suffix_path>]" (as a wide string). An entry is created on
// first use, and its index_meta is set only at creation.
//
// @param state          runtime state; supplies the descriptor table and timezone
// @param tablet_schema  schema used to resolve the column and its inverted indexes
// @param expr           the match predicate expression
// @param collect_infos  output map, must not be null
// @return Status::OK() on success; INVERTED_INDEX_NOT_SUPPORTED when the
//         predicate is malformed, the slot/column cannot be resolved, or
//         (outside BE_TEST builds) the column has no inverted index.
Status handle_match_pred(RuntimeState* state, const TabletSchemaSPtr& tablet_schema,
                         const vectorized::VExprSPtr& expr,
                         std::unordered_map<std::wstring, CollectInfo>* collect_infos) {
    DCHECK(collect_infos != nullptr);

    // Validate the predicate's shape before indexing into children(): a
    // malformed expression would otherwise be an out-of-bounds access, and a
    // DCHECK alone does not protect release builds.
    if (expr == nullptr || expr->children().size() < 2 || expr->children()[1] == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Match predicate must have a left expression "
                "and a right literal");
    }
    const auto& children = expr->children();

    auto* left_slot_ref = find_slot_ref(children[0]);
    if (left_slot_ref == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find slot reference in match predicate "
                "left expression");
    }
    // NOTE(review): the static_cast assumes the right child really is a
    // VLiteral; its node type is not verified here -- confirm with the planner
    // contract for match predicates.
    auto* right_literal = static_cast<vectorized::VLiteral*>(children[1].get());

    const auto* sd = state->desc_tbl().get_slot_descriptor(left_slot_ref->slot_id());
    if (sd == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find slot descriptor for slot_id={}",
                left_slot_ref->slot_id());
    }
    int32_t col_idx = tablet_schema->field_index(left_slot_ref->column_name());
    if (col_idx == -1) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Cannot find column index for column={}",
                left_slot_ref->column_name());
    }

    const auto& column = tablet_schema->column(col_idx);
    auto index_metas = tablet_schema->inverted_indexs(sd->col_unique_id(), column.suffix_path());
#ifndef BE_TEST
    // Test builds skip this check and simply collect nothing for the column.
    if (index_metas.empty()) {
        return Status::Error<ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
                "Index statistics collection failed: Score query is not supported without inverted "
                "index for column={}",
                left_slot_ref->column_name());
    }
#endif

    auto format_options = vectorized::DataTypeSerDe::get_default_format_options();
    format_options.timezone = &state->timezone_obj();
    for (const auto* index_meta : index_metas) {
        if (!InvertedIndexAnalyzer::should_analyzer(index_meta->properties())) {
            continue;
        }
        if (!segment_v2::IndexReaderHelper::is_need_similarity_score(expr->op(), index_meta)) {
            continue;
        }

        auto term_infos = InvertedIndexAnalyzer::get_analyse_result(
                right_literal->value(format_options), index_meta->properties());
        if (term_infos.empty()) {
            LOG(WARNING) << "Index statistics collection: no terms extracted from literal value, "
                         << "col_unique_id=" << index_meta->col_unique_ids()[0];
            continue;
        }

        std::string field_name = std::to_string(index_meta->col_unique_ids()[0]);
        if (!column.suffix_path().empty()) {
            field_name += "." + column.suffix_path();
        }
        std::wstring ws_field_name = StringHelper::to_wstring(field_name);

        // Single lookup: create the entry if it is missing, otherwise reuse it.
        // index_meta is recorded only when the entry is first created.
        auto [entry, inserted] = collect_infos->try_emplace(ws_field_name);
        if (inserted) {
            entry->second.index_meta = index_meta;
        }
        entry->second.term_infos.insert(term_infos.begin(), term_infos.end());
    }
    return Status::OK();
}

Status CollectionStatistics::extract_collect_info(
RuntimeState* state, const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
const TabletSchemaSPtr& tablet_schema,
std::unordered_map<std::wstring, CollectInfo>* collect_infos) {
const TabletSchemaSPtr& tablet_schema, CollectInfoMap* collect_infos) {
DCHECK(collect_infos != nullptr);

std::unordered_map<TExprNodeType::type, PredicateCollectorPtr> collectors;
collectors[TExprNodeType::MATCH_PRED] = std::make_unique<MatchPredicateCollector>();
collectors[TExprNodeType::SEARCH_EXPR] = std::make_unique<SearchPredicateCollector>();

for (const auto& root_expr_ctx : common_expr_ctxs_push_down) {
const auto& root_expr = root_expr_ctx->root();
if (root_expr == nullptr) {
Expand All @@ -206,27 +128,35 @@ Status CollectionStatistics::extract_collect_info(
stack.emplace(root_expr);

while (!stack.empty()) {
const auto& expr = stack.top();
auto expr = stack.top();
stack.pop();

if (expr->node_type() == TExprNodeType::MATCH_PRED) {
RETURN_IF_ERROR(handle_match_pred(state, tablet_schema, expr, collect_infos));
if (!expr) {
continue;
}

auto collector_it = collectors.find(expr->node_type());
if (collector_it != collectors.end()) {
RETURN_IF_ERROR(
collector_it->second->collect(state, tablet_schema, expr, collect_infos));
}

const auto& children = expr->children();
for (int32_t i = static_cast<int32_t>(children.size()) - 1; i >= 0; --i) {
if (!children[i]->children().empty()) {
stack.emplace(children[i]);
}
for (const auto& child : children) {
stack.push(child);
}
}
}

LOG(INFO) << "Extracted collect info for " << collect_infos->size() << " fields";

return Status::OK();
}

Status CollectionStatistics::process_segment(
const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema,
const std::unordered_map<std::wstring, CollectInfo>& collect_infos, io::IOContext* io_ctx) {
Status CollectionStatistics::process_segment(const RowsetSharedPtr& rowset, int32_t seg_id,
const TabletSchema* tablet_schema,
const CollectInfoMap& collect_infos,
io::IOContext* io_ctx) {
auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
auto rowset_meta = rowset->rowset_meta();

Expand All @@ -238,36 +168,42 @@ Status CollectionStatistics::process_segment(
RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx));

int32_t total_seg_num_docs = 0;

for (const auto& [ws_field_name, collect_info] : collect_infos) {
lucene::search::IndexSearcher* index_searcher = nullptr;
lucene::index::IndexReader* index_reader = nullptr;

#ifdef BE_TEST
auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
auto* reader = lucene::index::IndexReader::open(compound_reader.get());
auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);

auto* index_reader = index_searcher->getReader();
auto searcher_ptr = std::make_shared<lucene::search::IndexSearcher>(reader, true);
index_searcher = searcher_ptr.get();
index_reader = index_searcher->getReader();
#else
InvertedIndexCacheHandle inverted_index_cache_handle;
auto index_file_key = idx_file_reader->get_index_file_cache_key(collect_info.index_meta);
InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key);

if (!InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
&inverted_index_cache_handle)) {
auto compound_reader =
DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
auto* reader = lucene::index::IndexReader::open(compound_reader.get());
size_t reader_size = reader->getTermInfosRAMUsed();
auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);
auto searcher_ptr = std::make_shared<lucene::search::IndexSearcher>(reader, true);
auto* cache_value = new InvertedIndexSearcherCache::CacheValue(
std::move(index_searcher), reader_size, UnixMillis());
std::move(searcher_ptr), reader_size, UnixMillis());
InvertedIndexSearcherCache::instance()->insert(searcher_cache_key, cache_value,
&inverted_index_cache_handle);
}

auto searcher_variant = inverted_index_cache_handle.get_index_searcher();
auto index_searcher = std::get<FulltextIndexSearcherPtr>(searcher_variant);
auto* index_reader = index_searcher->getReader();
auto index_searcher_ptr = std::get<FulltextIndexSearcherPtr>(searcher_variant);
index_searcher = index_searcher_ptr.get();
index_reader = index_searcher->getReader();
#endif

total_seg_num_docs = std::max(total_seg_num_docs, index_reader->maxDoc());

_total_num_tokens[ws_field_name] +=
index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0);

Expand All @@ -277,7 +213,9 @@ Status CollectionStatistics::process_segment(
_term_doc_freqs[ws_field_name][iter->term()] += iter->doc_freq();
}
}

_total_num_docs += total_seg_num_docs;

return Status::OK();
}

Expand Down
19 changes: 4 additions & 15 deletions be/src/olap/collection_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/be_mock_util.h"
#include "olap/olap_common.h"
#include "olap/predicate_collector.h"
#include "olap/rowset/segment_v2/inverted_index/query/query_info.h"
#include "runtime/runtime_state.h"
#include "vec/exprs/vexpr_fwd.h"
Expand All @@ -44,18 +45,6 @@ class TabletIndex;
class TabletSchema;
using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;

// Ordering functor for segment_v2::TermInfo that compares only the `term`
// member. Two TermInfo values with equal `term` are therefore equivalent under
// this ordering, so an ordered container using it keeps one element per term.
struct TermInfoComparer {
    bool operator()(const segment_v2::TermInfo& lhs, const segment_v2::TermInfo& rhs) const {
        const auto& left_term = lhs.term;
        const auto& right_term = rhs.term;
        return left_term < right_term;
    }
};

// Per-field data gathered from query predicates for index statistics
// collection: the set of terms to look up and the index they belong to.
class CollectInfo {
public:
// Terms for this field, ordered and de-duplicated by TermInfoComparer
// (i.e. by their `term` member only).
std::set<segment_v2::TermInfo, TermInfoComparer> term_infos;
// Index metadata for this field; null until assigned. Non-owning raw pointer
// -- presumably the TabletIndex is owned by the tablet schema and must outlive
// this object; TODO confirm.
const TabletIndex* index_meta = nullptr;
};

class CollectionStatistics {
public:
CollectionStatistics() = default;
Expand All @@ -74,10 +63,9 @@ class CollectionStatistics {
Status extract_collect_info(RuntimeState* state,
const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
const TabletSchemaSPtr& tablet_schema,
std::unordered_map<std::wstring, CollectInfo>* collect_infos);
CollectInfoMap* collect_infos);
Status process_segment(const RowsetSharedPtr& rowset, int32_t seg_id,
const TabletSchema* tablet_schema,
const std::unordered_map<std::wstring, CollectInfo>& collect_infos,
const TabletSchema* tablet_schema, const CollectInfoMap& collect_infos,
io::IOContext* io_ctx);

uint64_t get_term_doc_freq_by_col(const std::wstring& lucene_col_name,
Expand All @@ -95,6 +83,7 @@ class CollectionStatistics {
MOCK_DEFINE(friend class BM25SimilarityTest;)
MOCK_DEFINE(friend class CollectionStatisticsTest;)
MOCK_DEFINE(friend class BooleanQueryTest;)
MOCK_DEFINE(friend class OccurBooleanQueryTest;)
};
using CollectionStatisticsPtr = std::shared_ptr<CollectionStatistics>;

Expand Down
Loading
Loading