Skip to content

Write with parallel#772

Merged
ColinLeeo merged 6 commits intodevelopfrom
write_with_parallel
Apr 10, 2026
Merged

Write with parallel#772
ColinLeeo merged 6 commits intodevelopfrom
write_with_parallel

Conversation

@ColinLeeo
Copy link
Copy Markdown
Contributor

@ColinLeeo ColinLeeo commented Apr 9, 2026

This change introduces thread pool-based write parallelization to the TsFile C++ write path.

Threading is controlled at compile time via the ENABLE_THREADS CMake option, with parallel_write_enabled_ and write_thread_count_ added to ConfigValue for runtime control.

The core approach: write_table() first sorts the Tablet by device (via the new Tablet::sort_by_device()) to make same-device rows contiguous, then precomputes page boundaries from page_writer_max_point_num_ and the current time page's existing point count, and finally dispatches the time column and all value columns into the thread pool in parallel — each column writes the same row segments and explicitly seals pages at the precomputed boundaries, preserving time/value page alignment required by the aligned model.

On the testing side, TsFileWriterTableTest has been converted to a gtest parameterized test, running both Serial
(parallel_write_enabled_=false) and Parallel (parallel_write_enabled_=true) suites — all 34 test cases pass under a single build configuration.

@ColinLeeo ColinLeeo force-pushed the write_with_parallel branch from 38b7336 to d0cf8a4 Compare April 9, 2026 13:17
@ColinLeeo ColinLeeo force-pushed the write_with_parallel branch from d0cf8a4 to 9207296 Compare April 9, 2026 13:24
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Apr 10, 2026

Codecov Report

❌ Patch coverage is 80.80808% with 19 lines in your changes missing coverage. Please review.
✅ Project coverage is 62.63%. Comparing base (e2fba46) to head (f311bf4).

Files with missing lines Patch % Lines
cpp/src/writer/tsfile_writer.cc 74.24% 5 Missing and 12 partials ⚠️
cpp/src/common/thread_pool.h 91.66% 0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           develop     #772      +/-   ##
===========================================
+ Coverage    62.46%   62.63%   +0.17%     
===========================================
  Files          705      706       +1     
  Lines        42677    42672       -5     
  Branches      6308     6287      -21     
===========================================
+ Hits         26658    26728      +70     
+ Misses       15040    14960      -80     
- Partials       979      984       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces optional thread pool–based parallelization to the TsFile C++ table write path, alongside new configuration knobs and updated tests to validate both serial and parallel modes.

Changes:

  • Adds ENABLE_THREADS build option plus runtime config flags (parallel_write_enabled_, write_thread_count_) to control column-parallel writes.
  • Refactors TsFileWriter::write_table() to write time/value columns in parallel using precomputed page boundaries.
  • Converts TsFileWriterTableTest to a parameterized gtest suite that runs in both serial and parallel configurations.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
cpp/test/writer/table_view/tsfile_writer_table_test.cc Parameterizes tests to run with parallel write enabled/disabled.
cpp/src/writer/tsfile_writer.h Adds a per-writer thread pool member under ENABLE_THREADS.
cpp/src/writer/tsfile_writer.cc Implements segmented column writes and parallel dispatch in write_table().
cpp/src/common/thread_pool.h Adds a fixed-size thread pool implementation (new file).
cpp/src/common/tablet.h Exposes small accessors (get_timestamp, is_null) and makes class section explicitly public.
cpp/src/common/global.h Adds setters/getters for parallel write config and thread count.
cpp/src/common/global.cc Initializes new config fields with defaults.
cpp/src/common/config/config.h Adds new config fields to ConfigValue.
cpp/CMakeLists.txt Adds ENABLE_THREADS option and compile definition.
Comments suppressed due to low confidence (1)

cpp/test/writer/table_view/tsfile_writer_table_test.cc:53

  • SetUp() mutates the process-global g_config_value_.parallel_write_enabled_ but does not restore the previous value in TearDown(). Since this is a global config, it can leak into other tests in the same binary and create order-dependent behavior. Save the old value in SetUp() and restore it in TearDown() (and consider using set_parallel_write_enabled() rather than writing the struct field directly).
    void SetUp() override {
        libtsfile_init();
        g_config_value_.parallel_write_enabled_ = GetParam();
        file_name_ = std::string("tsfile_writer_table_test_") +
                     generate_random_string(10) + std::string(".tsfile");
        remove(file_name_.c_str());
        int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
        flags |= O_BINARY;
#endif
        mode_t mode = 0666;
        write_file_.create(file_name_, flags, mode);
    }
    void TearDown() override {
        remove(file_name_.c_str());
        libtsfile_destroy();
    }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

#include <future>
#include <mutex>
#include <queue>
#include <thread>
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadPool uses std::result_of in the submit(F&&) template but the header does not include <type_traits>, where std::result_of is defined. This makes the header non-self-contained and can fail to compile depending on transitive includes. Add the missing include (or switch to std::invoke_result if/when the project moves off C++11).

Suggested change
#include <thread>
#include <thread>
#include <type_traits>

Copilot uses AI. Check for mistakes.
Comment on lines +90 to +106
void worker_loop() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lk(mu_);
cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
{
std::lock_guard<std::mutex> lk(mu_);
active_--;
}
cv_done_.notify_one();
}
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker_loop() executes task() without any exception handling. If a task throws, the worker thread will terminate and active_ will never be decremented, causing wait_all() (and any code waiting on futures) to potentially block forever. Wrap task execution in a try/catch that ensures active_ is decremented and optionally stores/logs the exception.

Copilot uses AI. Check for mistakes.
message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}")

if (ENABLE_THREADS)
add_definitions(-DENABLE_THREADS)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ENABLE_THREADS adds a compile definition, but the build does not link against the platform thread library (e.g., -pthread / Threads::Threads). On many toolchains this will cause unresolved symbols when using std::thread. When ENABLE_THREADS is ON, find_package(Threads REQUIRED) and link Threads::Threads to the relevant targets (at least tsfile and any object libs that end up in it).

Suggested change
add_definitions(-DENABLE_THREADS)
add_definitions(-DENABLE_THREADS)
find_package(Threads REQUIRED)
link_libraries(Threads::Threads)

Copilot uses AI. Check for mistakes.
Comment on lines +1192 to +1202
// Write one column in segments defined by page_boundaries, sealing
// at each boundary. Works for both time and value columns.
// We control page sealing explicitly at precomputed boundaries, so
// auto-seal must be disabled — otherwise a segment of exactly
// page_max_points would trigger auto-seal AND our explicit seal,
// double-sealing (sealing an empty page → crash).
auto write_time_in_segments = [this, &tablet, &page_boundaries, si,
ei](TimeChunkWriter* tcw) -> int {
int r = E_OK;
tcw->set_enable_page_seal_if_full(false);
uint32_t seg_start = si;
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new segmented write path disables set_enable_page_seal_if_full and seals only at precomputed point-count boundaries. That bypasses the existing page-full logic which also enforces page_writer_max_memory_bytes_ (see TimeChunkWriter::is_cur_page_full() / ValueChunkWriter::is_cur_page_full()). For varlen columns (STRING/TEXT/BLOB) or when strict sizing is expected, pages can grow past the memory threshold because auto-seal is off. Consider keeping the previous strict/varlen behavior (or at least ensuring memory-threshold-based sealing remains effective) before parallelizing.

Copilot uses AI. Check for mistakes.
Comment on lines +1171 to +1176
uint32_t time_cur_points = time_chunk_writer->get_point_numer();
const uint32_t first_seg_cap =
(time_cur_points > 0 && time_cur_points < page_max_points)
? (page_max_points - time_cur_points)
: page_max_points;

Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first_seg_cap is derived from time_chunk_writer->get_point_numer(), but there is no handling for the case where the existing unsealed time page is already full (by points or memory). Previously write_table explicitly sealed a full leftover page before continuing. With auto-seal disabled later, writing into an already-full page can produce oversized pages or unexpected behavior. Consider sealing the current page(s) when the existing page meets the configured thresholds before starting segmented writes.

Copilot uses AI. Check for mistakes.
Comment on lines +1210 to +1214
if (seg_start < ei) {
r = time_write_column(tcw, tablet, seg_start, ei);
}
tcw->set_enable_page_seal_if_full(true);
return r;
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write_time_in_segments / write_value_in_segments unconditionally restore set_enable_page_seal_if_full(true) at the end. That changes writer behavior when g_config_value_.strict_page_size_ is false (and may also override any previous state), making sealing behavior inconsistent across calls. Restore the flag to the intended configuration value (or the prior state) instead of forcing true.

Copilot uses AI. Check for mistakes.
Comment on lines +202 to +203
common::ThreadPool thread_pool_{
(size_t)common::g_config_value_.write_thread_count_};
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread_pool_ is constructed using the current value of common::g_config_value_.write_thread_count_ at member-initialization time. This makes the pool size effectively fixed per TsFileWriter instance and can also be problematic if the global config hasn't been initialized yet (or is accidentally 0), leading to a pool with 0 workers and tasks that never run. Consider clamping to >=1 and initializing the pool in the constructor after libtsfile_init()/config init (or lazily on first parallel use), and clarify how runtime changes to write_thread_count_ are intended to take effect.

Suggested change
common::ThreadPool thread_pool_{
(size_t)common::g_config_value_.write_thread_count_};
static size_t get_thread_pool_size() {
const int thread_count = common::g_config_value_.write_thread_count_;
return thread_count > 0 ? static_cast<size_t>(thread_count) : size_t{1};
}
common::ThreadPool thread_pool_{get_thread_pool_size()};

Copilot uses AI. Check for mistakes.
Comment on lines +174 to +178
FORCE_INLINE int set_write_thread_count(int32_t count) {
if (count < 1 || count > 64) return E_INVALID_ARG;
g_config_value_.write_thread_count_ = count;
return E_OK;
}
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_write_thread_count() updates the global config, but TsFileWriter's thread_pool_ is constructed once and never resized. As written, calling this at runtime will not affect existing writers and may mislead API consumers. Either document that it must be called before constructing writers, or adjust the writer/pool to honor runtime updates.

Copilot uses AI. Check for mistakes.
Comment on lines +1232 to +1236
if (seg_start < ei) {
r = value_write_column(vcw, tablet, col_idx, seg_start, ei);
}
vcw->set_enable_page_seal_if_full(true);
return r;
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write_value_in_segments restores vcw->set_enable_page_seal_if_full(true) unconditionally. This can override the intended strict_page_size_ setting and makes the writer state depend on whether write_table() ran. Restore to the configured value (or the previous state) instead of forcing true.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

@jt2594838 jt2594838 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduce this feature in ReadMe

@ColinLeeo ColinLeeo merged commit 87f66d9 into develop Apr 10, 2026
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants