Skip to content
Open
8 changes: 8 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ struct PAIMON_EXPORT Options {
/// Default value is false.
static const char DELETION_VECTORS_ENABLED[];

/// "deletion-vector.index-file.target-size" - The target size of deletion vector index file.
/// Default value is 2MB.
static const char DELETION_VECTOR_INDEX_FILE_TARGET_SIZE[];

/// "deletion-vectors.bitmap64" - Enable 64 bit bitmap implementation. Note that only 64 bit
/// bitmap implementation is compatible with Iceberg. Default value is "false".
static const char DELETION_VECTOR_BITMAP64[];

/// @note `CHANGELOG_PRODUCER` currently only supports `none`
///
/// "changelog-producer" - Whether to double write to a changelog file. This changelog file
Expand Down
11 changes: 11 additions & 0 deletions include/paimon/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ enum class StatusCode : char {
IOError = 5,
CapacityError = 6,
IndexError = 7,
Cancelled = 8,
UnknownError = 9,
NotImplemented = 10,
SerializationError = 11,
Expand Down Expand Up @@ -176,6 +177,12 @@ class PAIMON_MUST_USE_TYPE PAIMON_EXPORT Status : public util::EqualityComparabl
return Status::FromArgs(StatusCode::IndexError, std::forward<Args>(args)...);
}

/// Return an error status for a cancelled operation.
///
/// The status carries StatusCode::Cancelled; all arguments are perfectly forwarded to
/// Status::FromArgs (presumably to build the status message -- confirm against FromArgs).
template <typename... Args>
static Status Cancelled(Args&&... args) {
return Status::FromArgs(StatusCode::Cancelled, std::forward<Args>(args)...);
}

/// Return an error status when a container's capacity would exceed its limits
template <typename... Args>
static Status CapacityError(Args&&... args) {
Expand Down Expand Up @@ -223,6 +230,10 @@ class PAIMON_MUST_USE_TYPE PAIMON_EXPORT Status : public util::EqualityComparabl
bool IsInvalid() const {
return code() == StatusCode::Invalid;
}
/// Return true iff the status indicates a cancelled operation, i.e. its code is
/// StatusCode::Cancelled.
bool IsCancelled() const {
return code() == StatusCode::Cancelled;
}
/// Return true iff the status indicates an IO-related failure.
bool IsIOError() const {
return code() == StatusCode::IOError;
Expand Down
5 changes: 5 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,14 @@ if(PAIMON_BUILD_TESTS)
core/catalog/file_system_catalog_test.cpp
core/catalog/catalog_test.cpp
core/catalog/identifier_test.cpp
core/compact/compact_deletion_file_test.cpp
core/core_options_test.cpp
core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
core/deletionvectors/bitmap_deletion_vector_test.cpp
core/deletionvectors/bucketed_dv_maintainer_test.cpp
core/deletionvectors/deletion_file_writer_test.cpp
core/deletionvectors/deletion_vector_test.cpp
core/deletionvectors/deletion_vector_index_file_writer_test.cpp
core/deletionvectors/deletion_vectors_index_file_test.cpp
core/index/index_in_data_file_dir_path_factory_test.cpp
core/index/deletion_vector_meta_test.cpp
Expand Down Expand Up @@ -540,6 +543,7 @@ if(PAIMON_BUILD_TESTS)
core/manifest/partition_entry_test.cpp
core/manifest/file_entry_test.cpp
core/manifest/index_manifest_entry_serializer_test.cpp
core/manifest/index_manifest_file_handler_test.cpp
core/mergetree/levels_test.cpp
core/mergetree/lookup_file_test.cpp
core/mergetree/lookup_levels_test.cpp
Expand Down Expand Up @@ -586,6 +590,7 @@ if(PAIMON_BUILD_TESTS)
core/operation/append_only_file_store_scan_test.cpp
core/operation/key_value_file_store_scan_test.cpp
core/operation/file_store_scan_test.cpp
core/operation/file_system_write_restore_test.cpp
core/operation/file_store_write_test.cpp
core/operation/manifest_file_merger_test.cpp
core/operation/merge_file_split_read_test.cpp
Expand Down
14 changes: 14 additions & 0 deletions src/paimon/common/data/binary_row.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sstream>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <variant>
#include <vector>
Expand Down Expand Up @@ -163,6 +164,19 @@ struct hash<std::pair<paimon::BinaryRow, int32_t>> {
}
};

/// for std::unordered_map<std::tuple<paimon::BinaryRow, int32_t, std::string>, ...>
template <>
struct hash<std::tuple<paimon::BinaryRow, int32_t, std::string>> {
size_t operator()(
const std::tuple<paimon::BinaryRow, int32_t, std::string>& partition_bucket_type) const {
const auto& [partition, bucket, index_type] = partition_bucket_type;
size_t hash = paimon::MurmurHashUtils::HashUnsafeBytes(
reinterpret_cast<const void*>(&bucket), 0, sizeof(bucket), partition.HashCode());
return paimon::MurmurHashUtils::HashUnsafeBytes(index_type.data(), 0, index_type.size(),
hash);
}
};

template <>
struct hash<paimon::BinaryRow> {
size_t operator()(const paimon::BinaryRow& row) const {
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const char Options::SORT_ENGINE[] = "sort-engine";
const char Options::IGNORE_DELETE[] = "ignore-delete";
const char Options::FIELDS_DEFAULT_AGG_FUNC[] = "fields.default-aggregate-function";
const char Options::DELETION_VECTORS_ENABLED[] = "deletion-vectors.enabled";
const char Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE[] =
"deletion-vector.index-file.target-size";
const char Options::DELETION_VECTOR_BITMAP64[] = "deletion-vectors.bitmap64";
const char Options::CHANGELOG_PRODUCER[] = "changelog-producer";
const char Options::FORCE_LOOKUP[] = "force-lookup";
const char Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE[] =
Expand Down
5 changes: 4 additions & 1 deletion src/paimon/common/utils/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ std::string Status::CodeAsString(StatusCode code) {
case StatusCode::IndexError:
type = "Index error";
break;
case StatusCode::Cancelled:
type = "Cancelled";
break;
case StatusCode::UnknownError:
type = "Unknown error";
break;
Expand Down Expand Up @@ -129,7 +132,7 @@ void Status::Abort() const {
}

void Status::Abort(const std::string& message) const {
std::cerr << "-- Arrow Fatal Error --\n";
std::cerr << "-- Paimon Fatal Error --\n";
if (!message.empty()) {
std::cerr << message << "\n";
}
Expand Down
11 changes: 10 additions & 1 deletion src/paimon/core/append/bucketed_append_compact_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ BucketedAppendCompactManager::BucketedAppendCompactManager(
const std::vector<std::shared_ptr<DataFileMeta>>& restored,
const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer, int32_t min_file_num,
int64_t target_file_size, int64_t compaction_file_size, bool force_rewrite_all_files,
CompactRewriter rewriter, const std::shared_ptr<CompactionMetrics::Reporter>& reporter)
CompactRewriter rewriter, const std::shared_ptr<CompactionMetrics::Reporter>& reporter,
const std::shared_ptr<std::atomic_bool>& cancel_flag)
: executor_(executor),
dv_maintainer_(dv_maintainer),
min_file_num_(min_file_num),
Expand All @@ -38,12 +39,18 @@ BucketedAppendCompactManager::BucketedAppendCompactManager(
[](const std::shared_ptr<DataFileMeta>& lhs, const std::shared_ptr<DataFileMeta>& rhs) {
return lhs->min_sequence_number > rhs->min_sequence_number;
}),
cancel_flag_(cancel_flag ? cancel_flag : std::make_shared<std::atomic_bool>(false)),
logger_(Logger::GetLogger("BucketedAppendCompactManager")) {
for (const auto& file : restored) {
to_compact_.push(file);
}
}

// Cancels the compaction managed by this object.
//
// Sets the shared cancel flag to true and then delegates to
// CompactFutureManager::CancelCompaction(). The flag is held through a shared_ptr, so any
// other holder of the same pointer observes the change (presumably the rewriter polls it to
// leave its rewrite loop early -- confirm against the rewriter implementation).
void BucketedAppendCompactManager::CancelCompaction() {
// Raise the flag before invoking the base-class cancellation.
cancel_flag_->store(true, std::memory_order_relaxed);
CompactFutureManager::CancelCompaction();
}

Status BucketedAppendCompactManager::TriggerCompaction(bool full_compaction) {
if (full_compaction) {
PAIMON_RETURN_NOT_OK(TriggerFullCompaction());
Expand Down Expand Up @@ -71,6 +78,7 @@ Status BucketedAppendCompactManager::TriggerFullCompaction() {
compacting.push_back(to_compact_.top());
to_compact_.pop();
}
cancel_flag_->store(false, std::memory_order_relaxed);
auto compact_task = std::make_shared<FullCompactTask>(reporter_, dv_maintainer_, compacting,
compaction_file_size_,
force_rewrite_all_files_, rewriter_);
Expand All @@ -87,6 +95,7 @@ void BucketedAppendCompactManager::TriggerCompactionWithBestEffort() {
}
std::optional<std::vector<std::shared_ptr<DataFileMeta>>> picked = PickCompactBefore();
if (picked) {
cancel_flag_->store(false, std::memory_order_relaxed);
compacting_ = picked.value();
auto compact_task = std::make_shared<AutoCompactTask>(reporter_, dv_maintainer_,
compacting_.value(), rewriter_);
Expand Down
7 changes: 6 additions & 1 deletion src/paimon/core/append/bucketed_append_compact_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -72,9 +73,12 @@ class BucketedAppendCompactManager : public CompactFutureManager {
int32_t min_file_num, int64_t target_file_size,
int64_t compaction_file_size, bool force_rewrite_all_files,
CompactRewriter rewriter,
const std::shared_ptr<CompactionMetrics::Reporter>& reporter);
const std::shared_ptr<CompactionMetrics::Reporter>& reporter,
const std::shared_ptr<std::atomic_bool>& cancel_flag);
~BucketedAppendCompactManager() override = default;

void CancelCompaction() override;

Status TriggerCompaction(bool full_compaction) override;

bool ShouldWaitForLatestCompaction() const override {
Expand Down Expand Up @@ -195,6 +199,7 @@ class BucketedAppendCompactManager : public CompactFutureManager {
std::shared_ptr<CompactionMetrics::Reporter> reporter_;
std::optional<std::vector<std::shared_ptr<DataFileMeta>>> compacting_;
DataFileMetaPriorityQueue to_compact_;
std::shared_ptr<std::atomic_bool> cancel_flag_;
std::unique_ptr<Logger> logger_;
};

Expand Down
56 changes: 55 additions & 1 deletion src/paimon/core/append/bucketed_append_compact_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

#include "paimon/core/append/bucketed_append_compact_manager.h"

#include <atomic>
#include <chrono>
#include <future>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand All @@ -27,6 +31,7 @@
#include "paimon/core/stats/simple_stats.h"
#include "paimon/executor.h"
#include "paimon/result.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {

Expand Down Expand Up @@ -70,7 +75,8 @@ class BucketedAppendCompactManagerTest : public testing::Test {
BucketedAppendCompactManager manager(
executor_, to_compact_before_pick,
/*dv_maintainer=*/nullptr, min_file_num, target_file_size, threshold,
/*force_rewrite_all_files=*/false, /*rewriter=*/nullptr, /*reporter=*/nullptr);
/*force_rewrite_all_files=*/false, /*rewriter=*/nullptr, /*reporter=*/nullptr,
/*cancel_flag=*/std::make_shared<std::atomic_bool>(false));
auto actual = manager.PickCompactBefore();
if (expected_present) {
ASSERT_TRUE(actual.has_value());
Expand Down Expand Up @@ -260,4 +266,52 @@ TEST_F(BucketedAppendCompactManagerTest, TestPick) {
NewFile(2601, 2610), NewFile(2611, 2620), NewFile(2621, 2630)});
}

// Checks that CancelCompaction() raises the cancel flag shared with the rewriter, so that a
// rewriter which loops on that flag is able to exit.
TEST_F(BucketedAppendCompactManagerTest, TestCancelCompactionPropagatesToRewriteLoop) {
// The same flag instance is captured by the rewriter and handed to the manager.
auto cancel_flag = std::make_shared<std::atomic_bool>(false);
// Promise/future pair used by the rewriter to report that it has left its loop.
auto exit_signal = std::make_shared<std::promise<void>>();
auto exit_future = exit_signal->get_future();

// Rewriter that polls the cancel flag every millisecond; once the flag is set it fulfils
// the promise and returns an Invalid status. The input files are ignored.
auto rewriter = [cancel_flag,
exit_signal](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> {
while (!cancel_flag->load(std::memory_order_relaxed)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
exit_signal->set_value();
return Status::Invalid("compaction cancelled in rewrite loop");
};

BucketedAppendCompactManager manager(
executor_, {NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)},
/*dv_maintainer=*/nullptr,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/700,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr, cancel_flag);

// Start a full compaction (presumably running the rewriter on executor_ -- confirm), then
// request cancellation.
ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
manager.CancelCompaction();

// Wait up to one second for the rewriter to signal that it left the polling loop.
EXPECT_EQ(exit_future.wait_for(std::chrono::seconds(1)), std::future_status::ready);
}

// Checks that triggering a full compaction clears a cancel flag that was already raised
// before the manager was created.
TEST_F(BucketedAppendCompactManagerTest, TestTriggerCompactionResetsCancelFlag) {
    // The flag starts out set, as if an earlier compaction had been cancelled.
    const auto cancel_flag = std::make_shared<std::atomic_bool>(true);

    // Pass-through rewriter: hands back exactly the files it was given.
    auto passthrough_rewriter =
        [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
        -> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };

    // Four restored files, matching min_file_num below.
    const std::vector<std::shared_ptr<DataFileMeta>> restored_files = {
        NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)};

    BucketedAppendCompactManager manager(executor_, restored_files,
                                         /*dv_maintainer=*/nullptr,
                                         /*min_file_num=*/4,
                                         /*target_file_size=*/1024,
                                         /*compaction_file_size=*/700,
                                         /*force_rewrite_all_files=*/false, passthrough_rewriter,
                                         /*reporter=*/nullptr, cancel_flag);

    ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));

    // The trigger must have reset the shared flag to false.
    EXPECT_FALSE(cancel_flag->load(std::memory_order_relaxed));
}

} // namespace paimon::test
5 changes: 5 additions & 0 deletions src/paimon/core/compact/compact_deletion_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@

namespace paimon {

/// Deletion File from compaction.
class CompactDeletionFile {
public:
virtual ~CompactDeletionFile() = default;

/// Used by async compaction: the deletion file is generated as soon as the compaction task
/// completes, so when the compact result is updated (updateCompactResult), the old deletion
/// files need to be merged (i.e. simply deleted).
static Result<std::shared_ptr<CompactDeletionFile>> GenerateFiles(
const std::shared_ptr<BucketedDvMaintainer>& maintainer);

Expand All @@ -41,6 +45,7 @@ class CompactDeletionFile {
virtual void Clean() = 0;
};

/// A generated files implementation of `CompactDeletionFile`.
class GeneratedDeletionFile : public CompactDeletionFile,
public std::enable_shared_from_this<GeneratedDeletionFile> {
public:
Expand Down
Loading
Loading