Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions be/src/core/data_type_serde/data_type_quantilestate_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,26 @@ class DataTypeQuantileStateSerDe : public DataTypeSerDe {

// Serializes rows [start, end) of a ColumnQuantileState into `result`.
//
// Tags `result` with PGenericType::QUANTILE_STATE and appends exactly one
// bytes_value entry per row, holding that row's QuantileState::serialize()
// output. Only the serialized form is emitted, because read_column_from_pb
// decodes every entry with QuantileState::deserialize(); emitting a second,
// differently-encoded entry per row would double the row count on the
// receiving side.
//
// @param column  source column; must be a ColumnQuantileState (assert_cast).
// @param result  protobuf value container that receives the type id and bytes.
// @param start   first row to write (inclusive).
// @param end     one past the last row to write (exclusive).
// @return Status::OK() in all cases visible here.
Status write_column_to_pb(const IColumn& column, PValues& result, int64_t start,
                          int64_t end) const override {
    auto* ptype = result.mutable_type();
    ptype->set_id(PGenericType::QUANTILE_STATE);

    const auto& col = assert_cast<const ColumnQuantileState&>(column);
    result.mutable_bytes_value()->Reserve(cast_set<int>(end - start));
    for (size_t row_num = start; row_num < end; ++row_num) {
        const auto& val = col.get_element(row_num);
        // Size the buffer up front so serialize() can write straight into it.
        size_t size = val.get_serialized_size();
        std::string buf(size, '\0');
        val.serialize(reinterpret_cast<uint8_t*>(buf.data()));
        result.add_bytes_value(std::move(buf));
    }
    return Status::OK();
}
// Rebuilds QuantileState rows from the bytes_value entries of `arg`.
//
// Each entry is decoded with QuantileState::deserialize() and appended to
// `column`, so the column grows by exactly arg.bytes_value_size() rows. The
// raw entry bytes are not additionally inserted via insert_data(): that would
// add a second row per entry.
//
// @param column  destination column; must be a ColumnQuantileState (assert_cast).
// @param arg     protobuf values whose bytes_value entries are decoded.
// @return Status::OK() in all cases visible here.
Status read_column_from_pb(IColumn& column, const PValues& arg) const override {
    auto& col = assert_cast<ColumnQuantileState&>(column);
    const int num_values = arg.bytes_value_size();
    col.reserve(num_values);
    for (int i = 0; i < num_values; ++i) {
        QuantileState value;
        // NOTE(review): the outcome of deserialize() is not inspected here;
        // confirm whether a malformed entry should be surfaced as an error.
        value.deserialize(Slice(arg.bytes_value(i)));
        col.insert_value(std::move(value));
    }
    return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
// under the License.

#include <arrow/array/builder_base.h>
#include <gen_cpp/types.pb.h>
#include <gtest/gtest.h>

#include <cmath>

#include "core/column/column_complex.h"
#include "core/data_type_serde/data_type_quantilestate_serde.h"
#include "util/jsonb_writer.h"
Expand Down Expand Up @@ -62,28 +65,65 @@ TEST(QuantileStateSerdeTest, writeOneCellToJsonb) {
// Round-trips a ColumnQuantileState through write_column_to_pb /
// read_column_from_pb and checks that each state still answers percentile
// queries the same way afterwards. The column holds exactly four states, one
// per representation named in the comments below, at indices 0..3.
TEST(QuantileStateSerdeTest, writeColumnToPb) {
    auto quantile_state_serde = std::make_shared<DataTypeQuantileStateSerDe>(1);
    auto column_quantile_state = ColumnQuantileState::create();

    // EMPTY state
    column_quantile_state->insert_value(QuantileState());

    // SINGLE state
    QuantileState single_state;
    single_state.add_value(123.0);
    column_quantile_state->insert_value(single_state);

    // EXPLICIT state (2~2048 values)
    QuantileState explicit_state;
    explicit_state.add_value(1.0);
    explicit_state.add_value(2.0);
    explicit_state.add_value(3.0);
    column_quantile_state->insert_value(explicit_state);

    // TDIGEST state (>2048 values triggers conversion)
    QuantileState tdigest_state;
    for (int i = 0; i < QUANTILE_STATE_EXPLICIT_NUM + 1; ++i) {
        tdigest_state.add_value(static_cast<double>(i));
    }
    column_quantile_state->insert_value(tdigest_state);

    // The index-based checks below rely on exactly these four rows.
    ASSERT_EQ(column_quantile_state->size(), 4);

    PValues pv = PValues();
    Status st = quantile_state_serde->write_column_to_pb(*column_quantile_state, pv, 0,
                                                         column_quantile_state->size());
    EXPECT_TRUE(st.ok());

    // assert protobuf type id
    EXPECT_EQ(pv.type().id(), PGenericType::QUANTILE_STATE);

    auto except_column = ColumnQuantileState::create();
    st = quantile_state_serde->read_column_from_pb(*except_column, pv);
    EXPECT_TRUE(st.ok()) << st.to_string();
    ASSERT_EQ(except_column->size(), column_quantile_state->size());

    // check column values via logical percentile comparison
    // (values are compared through get_value_by_percentile rather than through
    // the get_data_at() bytes of the column elements)
    // EMPTY
    EXPECT_TRUE(std::isnan(column_quantile_state->get_element(0).get_value_by_percentile(1.0)));
    EXPECT_TRUE(std::isnan(except_column->get_element(0).get_value_by_percentile(1.0)));

    // SINGLE
    EXPECT_EQ(column_quantile_state->get_element(1).get_value_by_percentile(1.0),
              except_column->get_element(1).get_value_by_percentile(1.0));

    // EXPLICIT
    EXPECT_EQ(column_quantile_state->get_element(2).get_value_by_percentile(1.0),
              except_column->get_element(2).get_value_by_percentile(1.0));
    EXPECT_EQ(column_quantile_state->get_element(2).get_value_by_percentile(0.5),
              except_column->get_element(2).get_value_by_percentile(0.5));

    // TDIGEST
    EXPECT_DOUBLE_EQ(column_quantile_state->get_element(3).get_value_by_percentile(0.5),
                     except_column->get_element(3).get_value_by_percentile(0.5));
    EXPECT_DOUBLE_EQ(column_quantile_state->get_element(3).get_value_by_percentile(1.0),
                     except_column->get_element(3).get_value_by_percentile(1.0));

    std::cout << "test write/read_column_to_pb" << std::endl;
}

Expand Down
Loading