Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cpp/src/common/path.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "common/path.h"

#include "constant/tsfile_constant.h"

#ifdef ENABLE_ANTLR4
#include "parser/path_nodes_generator.h"
#endif
Expand Down Expand Up @@ -47,10 +49,20 @@ Path::Path(const std::string& path_sc, bool if_split) {
IDeviceID::split_string(path_sc, '.');
#endif
if (nodes.size() > 1) {
device_id_ = std::make_shared<StringArrayDeviceID>(
std::vector<std::string>(nodes.begin(), nodes.end() - 1));
// Build device_id from the same string form as writers use so
// StringArrayDeviceID::split_device_id_string canonicalizes
// multi-segment paths (e.g. root.sg1.FeederA) consistently.
std::string device_str;
for (size_t j = 0; j + 1 < nodes.size(); ++j) {
if (j > 0) {
device_str += PATH_SEPARATOR;
}
device_str += nodes[j];
}
device_id_ = std::make_shared<StringArrayDeviceID>(device_str);
measurement_ = nodes[nodes.size() - 1];
full_path_ = device_id_->get_device_name() + "." + measurement_;
full_path_ = device_id_->get_device_name() + PATH_SEPARATOR +
measurement_;
} else {
full_path_ = path_sc;
device_id_ = std::make_shared<StringArrayDeviceID>();
Expand Down
11 changes: 10 additions & 1 deletion cpp/src/file/tsfile_io_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include "file/tsfile_io_reader.h"

#include <algorithm>
#include <vector>

#include "common/allocator/alloc_base.h"

using namespace common;
Expand Down Expand Up @@ -457,8 +460,14 @@ int TsFileIOReader::get_timeseries_indexes(
get_time_column_metadata(top_node, timeseries_index, pa);
}

// Sort names so slot order is identical on all platforms (unordered_set
// iteration order differs between libc++ and libstdc++).
std::vector<std::string> sorted_names(measurement_names.begin(),
measurement_names.end());
std::sort(sorted_names.begin(), sorted_names.end());

int64_t idx = 0;
for (const auto& measurement_name : measurement_names) {
for (const auto& measurement_name : sorted_names) {
if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node,
measurement_index_entry,
measurement_ie_end_offset))) {
Expand Down
22 changes: 15 additions & 7 deletions cpp/src/reader/block/single_device_tsblock_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "single_device_tsblock_reader.h"

#include <unordered_set>

namespace storage {

SingleDeviceTsBlockReader::SingleDeviceTsBlockReader(
Expand Down Expand Up @@ -139,14 +141,20 @@ int SingleDeviceTsBlockReader::init_internal(DeviceQueryTask* device_query_task,
col_appenders_[i] = new common::ColAppender(i, current_block_);
}
row_appender_ = new common::RowAppender(current_block_);
std::vector<ITimeseriesIndex*> time_series_indexs(
device_query_task_->get_column_mapping()
->get_measurement_columns()
.size());
// ColumnMapping already stores the measurement columns in an unordered_set.
// TsFileIOReader::get_timeseries_indexes sorts the names before filling
// slots, so slot order does not follow this set's iteration order.
std::unordered_set<std::string> meas_names(
device_query_task_->get_column_mapping()->get_measurement_columns());
if (meas_names.empty()) {
for (const auto& f :
device_query_task_->get_internal_row_scan_fields()) {
meas_names.insert(f);
}
}
std::vector<ITimeseriesIndex*> time_series_indexs(meas_names.size());
if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes(
device_query_task->get_device_id(),
device_query_task->get_column_mapping()->get_measurement_columns(),
time_series_indexs, pa_))) {
device_query_task->get_device_id(), meas_names, time_series_indexs,
pa_))) {
return ret;
}

Expand Down
77 changes: 53 additions & 24 deletions cpp/src/reader/qds_without_timegenerator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "qds_without_timegenerator.h"

#include "utils/errno_define.h"
#include "utils/util_define.h"

using namespace common;
Expand Down Expand Up @@ -49,9 +50,8 @@ int QDSWithoutTimeGenerator::init_internal(TsFileIOReader* io_reader,
io_reader_ = io_reader;
qe_ = qe;

std::vector<Path> paths = qe_->selected_series_;
size_t origin_path_count = paths.size();
std::vector<Path> valid_paths;
const std::vector<Path> paths = qe_->selected_series_;
const size_t origin_path_count = paths.size();
std::vector<std::string> column_names;
std::vector<common::TSDataType> data_types;
column_names.reserve(origin_path_count);
Expand All @@ -61,52 +61,71 @@ int QDSWithoutTimeGenerator::init_internal(TsFileIOReader* io_reader,
if (global_time_expression != nullptr) {
global_time_filter = global_time_expression->filter_;
}
index_lookup_.clear();
index_lookup_.insert({"time", 0});
ssi_vec_.clear();
path_has_ssi_.clear();
path_has_ssi_.reserve(origin_path_count);

auto register_path_columns = [&](size_t i, const Path& p) {
column_names.push_back(p.full_path_);
const uint32_t col_idx = static_cast<uint32_t>(i) + 1;
index_lookup_.insert({p.measurement_, col_idx});
if (p.full_path_ != p.measurement_) {
index_lookup_.insert({p.full_path_, col_idx});
}
};

for (size_t i = 0; i < origin_path_count; i++) {
TsFileSeriesScanIterator* ssi = nullptr;
ret = io_reader_->alloc_ssi(paths[i].device_id_, paths[i].measurement_,
ssi, pa_, global_time_filter);
if (ret == E_MEASUREMENT_NOT_EXIST) {
ssi_vec_.push_back(nullptr);
path_has_ssi_.push_back(0);
register_path_columns(i, paths[i]);
continue;
}
if (ret != 0) {
return ret;
} else {
index_lookup_.insert({paths[i].measurement_, i + 1});
if (paths[i].full_path_ != paths[i].measurement_) {
index_lookup_.insert({paths[i].full_path_, i + 1});
}
ssi_vec_.push_back(ssi);
valid_paths.push_back(paths[i]);
column_names.push_back(paths[i].full_path_);
}
ssi_vec_.push_back(ssi);
path_has_ssi_.push_back(1);
register_path_columns(i, paths[i]);
}

size_t path_count = valid_paths.size();
const size_t path_count = origin_path_count;
is_single_path_ = (path_count == 1);
// Only push offset/limit to SSI for single-path; multi-path applies at
// merge.
for (size_t i = 0; i < path_count; i++) {
ssi_vec_[i]->set_row_range(is_single_path_ ? remaining_offset_ : 0,
is_single_path_ ? remaining_limit_ : -1);
}
row_record_ = new RowRecord(path_count + 1);

tsblocks_.resize(path_count);
time_iters_.resize(path_count);
value_iters_.resize(path_count);

for (size_t i = 0; i < path_count; i++) {
if (!path_has_ssi_[i]) {
tsblocks_[i] = nullptr;
time_iters_[i] = nullptr;
value_iters_[i] = nullptr;
data_types.push_back(TSDataType::NULL_TYPE);
continue;
}
ssi_vec_[i]->set_row_range(is_single_path_ ? remaining_offset_ : 0,
is_single_path_ ? remaining_limit_ : -1);
get_next_tsblock(i, true);
data_types.push_back(value_iters_[i] != nullptr
? value_iters_[i]->get_data_type()
: TSDataType::NULL_TYPE);
}
row_record_ = new RowRecord(path_count + 1);
// Single-path: SSI may have consumed offset/limit by skipping chunks/pages
// during first get_next_tsblock(); sync so QDS does not double-apply.
if (is_single_path_) {
if (is_single_path_ && path_count > 0 && path_has_ssi_[0]) {
remaining_offset_ = ssi_vec_[0]->get_row_offset();
remaining_limit_ = ssi_vec_[0]->get_row_limit();
}
result_set_metadata_ =
std::make_shared<ResultSetMetadata>(column_names, data_types);
return E_OK; // ignore invalid timeseries
return E_OK;
}

void QDSWithoutTimeGenerator::close() {
Expand All @@ -128,13 +147,17 @@ void QDSWithoutTimeGenerator::close() {

ASSERT(ssi_vec_.size() == tsblocks_.size());
for (size_t i = 0; i < ssi_vec_.size(); i++) {
ssi_vec_[i]->revert_tsblock();
if (path_has_ssi_[i]) {
ssi_vec_[i]->revert_tsblock();
}
}
for (size_t i = 0; i < ssi_vec_.size(); i++) {
TsFileSeriesScanIterator* ssi = ssi_vec_[i];
io_reader_->revert_ssi(ssi);
if (path_has_ssi_[i]) {
io_reader_->revert_ssi(ssi_vec_[i]);
}
}
ssi_vec_.clear();
path_has_ssi_.clear();
if (qe_ != nullptr) {
delete qe_;
qe_ = nullptr;
Expand Down Expand Up @@ -283,6 +306,9 @@ std::shared_ptr<ResultSetMetadata> QDSWithoutTimeGenerator::get_metadata() {
}

int QDSWithoutTimeGenerator::get_next_tsblock(uint32_t index, bool alloc_mem) {
if (index >= ssi_vec_.size() || ssi_vec_[index] == nullptr) {
return E_OK;
}
if (tsblocks_[index] != nullptr) {
delete time_iters_[index];
time_iters_[index] = nullptr;
Expand Down Expand Up @@ -320,6 +346,9 @@ int QDSWithoutTimeGenerator::get_next_tsblock(uint32_t index, bool alloc_mem) {
int QDSWithoutTimeGenerator::get_next_tsblock_with_hint(uint32_t index,
bool alloc_mem,
int64_t min_time_hint) {
if (index >= ssi_vec_.size() || ssi_vec_[index] == nullptr) {
return E_OK;
}
if (tsblocks_[index] != nullptr) {
delete time_iters_[index];
time_iters_[index] = nullptr;
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/reader/qds_without_timegenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class QDSWithoutTimeGenerator : public ResultSet {
time_iters_(),
value_iters_(),
heap_time_(),
path_has_ssi_(),
remaining_offset_(0),
remaining_limit_(-1),
is_single_path_(false) {}
Expand Down Expand Up @@ -70,6 +71,9 @@ class QDSWithoutTimeGenerator : public ResultSet {
std::vector<common::ColIterator*> value_iters_;
std::multimap<int64_t, uint32_t>
heap_time_; // key-->time, value-->path_index
/** Same length as ssi_vec_; false if measurement missing (Java
* EmptyFileSeriesReader). */
std::vector<uint8_t> path_has_ssi_;
int remaining_offset_;
int remaining_limit_;
bool is_single_path_;
Expand Down
59 changes: 55 additions & 4 deletions cpp/src/reader/table_query_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,44 @@

#include "reader/table_query_executor.h"

#include <vector>

#include "common/db_common.h"
#include "utils/db_utils.h"

namespace storage {
namespace {

/**
 * Pick the FIELD columns that drive the row scan for a query whose selection
 * contains no FIELD column (TAG-only).
 *
 * Walks the table's measurement schemas in schema order and keeps only the
 * columns whose category is FIELD; every kept name is passed through
 * to_lowercase_inplace before it is stored.
 *
 * @param table_schema schema of the queried table; it is dereferenced
 *                     unconditionally, so it must not be null.
 * @return every VECTOR-typed FIELD column when at least one exists, otherwise
 *         every FIELD column (possibly none), in schema order.
 *
 * NOTE(review): when several VECTOR fields exist, all of them are returned;
 * there is no fallback to the full FIELD list for that case. Confirm this is
 * the intended handling of an ambiguous time axis.
 */
std::vector<std::string> collect_tag_only_scan_fields(
    const std::shared_ptr<TableSchema>& table_schema) {
    std::vector<std::string> all_fields;
    std::vector<std::string> vector_fields;
    // Bind by const reference: no copy if the getter returns a reference, and
    // the temporary's lifetime is extended if it returns by value.
    const auto& categories = table_schema->get_column_categories();
    const auto& schemas = table_schema->get_measurement_schemas();
    // The two containers are indexed in lockstep; stop at the shorter one.
    for (size_t i = 0; i < schemas.size() && i < categories.size(); ++i) {
        if (categories[i] != common::ColumnCategory::FIELD) {
            continue;
        }
        std::string name = schemas[i]->measurement_name_;
        to_lowercase_inplace(name);
        if (schemas[i]->data_type_ == common::TSDataType::VECTOR) {
            vector_fields.push_back(name);
        }
        all_fields.push_back(std::move(name));
    }
    // VECTOR fields take precedence whenever any exist (one or many).
    if (!vector_fields.empty()) {
        return vector_fields;
    }
    return all_fields;
}

} // namespace
int TableQueryExecutor::query(const std::string& table_name,
const std::vector<std::string>& columns,
Filter* time_filter, Filter* tag_filter,
Expand Down Expand Up @@ -136,10 +171,26 @@ int TableQueryExecutor::query(const std::string& table_name,
data_types.push_back(table_schema->get_data_types()[ind]);
}

auto device_task_iterator =
std::unique_ptr<DeviceTaskIterator>(new DeviceTaskIterator(
lower_case_column_names, table_root, column_mapping,
meta_data_querier_, tag_filter, table_schema));
std::vector<std::string> internal_row_scan_fields;
const auto column_categories = table_schema->get_column_categories();
bool selection_has_field = false;
for (const auto& col : lower_case_column_names) {
int ind = table_schema->find_column_index(col);
if (ind >= 0 && static_cast<size_t>(ind) < column_categories.size() &&
column_categories[static_cast<size_t>(ind)] ==
common::ColumnCategory::FIELD) {
selection_has_field = true;
break;
}
}
if (!selection_has_field) {
internal_row_scan_fields = collect_tag_only_scan_fields(table_schema);
}

auto device_task_iterator = std::unique_ptr<DeviceTaskIterator>(
new DeviceTaskIterator(lower_case_column_names, table_root,
column_mapping, meta_data_querier_, tag_filter,
table_schema, internal_row_scan_fields));

std::unique_ptr<TsBlockReader> tsblock_reader;
switch (table_query_ordering_) {
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/reader/task/device_query_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ namespace storage {
// Factory for DeviceQueryTask: takes sizeof(DeviceQueryTask) bytes from the
// page arena `pa` and constructs the task in that buffer with placement new.
// Returns nullptr when pa.alloc() returns nullptr; otherwise returns the
// constructed task.
// NOTE(review): this span shows both the pre-change and post-change lines of
// the diff (two parameter-list endings and two constructor-argument lists);
// the later line(s) of each pair carry the added internal_row_scan_fields
// parameter.
DeviceQueryTask* DeviceQueryTask::create_device_query_task(
std::shared_ptr<IDeviceID> device_id, std::vector<std::string> column_names,
std::shared_ptr<ColumnMapping> column_mapping, MetaIndexNode* index_root,
std::shared_ptr<TableSchema> table_schema, common::PageArena& pa) {
std::shared_ptr<TableSchema> table_schema, common::PageArena& pa,
const std::vector<std::string>& internal_row_scan_fields) {
// Raw storage comes from the arena rather than a direct heap allocation.
void* buf = pa.alloc(sizeof(DeviceQueryTask));
if (UNLIKELY(buf == nullptr)) {
return nullptr;
}
// Construct in place inside the arena-provided buffer.
DeviceQueryTask* task = new (buf) DeviceQueryTask(
device_id, column_names, column_mapping, index_root, table_schema);
device_id, std::move(column_names), std::move(column_mapping),
index_root, table_schema, internal_row_scan_fields);
return task;
}

Expand Down
Loading
Loading