Skip to content

Commit fbed520

Browse files
committed
fix
1 parent 5e2fcd7 commit fbed520

10 files changed

Lines changed: 45 additions & 29 deletions

src/paimon/core/global_index/global_index_scan_impl.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,11 @@ Status GlobalIndexScanImpl::Scan() {
114114
options_.DataFilePrefix(), options_.LegacyPartitionNameEnabled(), external_paths,
115115
global_index_external_path, options_.IndexFileInDataFileDir(), pool_));
116116

117-
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<IndexManifestFile> index_manifest_file,
118-
IndexManifestFile::Create(
119-
options_.GetFileSystem(), options_.GetManifestFormat(),
120-
options_.GetManifestCompression(), path_factory_, pool_, options_));
117+
PAIMON_ASSIGN_OR_RAISE(
118+
std::unique_ptr<IndexManifestFile> index_manifest_file,
119+
IndexManifestFile::Create(options_.GetFileSystem(), options_.GetManifestFormat(),
120+
options_.GetManifestCompression(), path_factory_,
121+
options_.GetBucket(), pool_, options_));
121122
auto index_file_handler =
122123
std::make_unique<IndexFileHandler>(options_.GetFileSystem(), std::move(index_manifest_file),
123124
std::make_shared<IndexFilePathFactories>(path_factory_),

src/paimon/core/index/index_file_handler_test.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ class IndexFileHandlerTest : public testing::Test {
6868
/*index_file_in_data_file_dir=*/core_options.IndexFileInDataFileDir(),
6969
memory_pool_));
7070
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<IndexManifestFile> index_manifest_file,
71-
IndexManifestFile::Create(core_options.GetFileSystem(),
72-
core_options.GetManifestFormat(),
73-
core_options.GetManifestCompression(),
74-
path_factory, memory_pool_, core_options));
71+
IndexManifestFile::Create(
72+
core_options.GetFileSystem(), core_options.GetManifestFormat(),
73+
core_options.GetManifestCompression(), path_factory,
74+
core_options.GetBucket(), memory_pool_, core_options));
7575
auto path_factories = std::make_shared<IndexFilePathFactories>(path_factory);
7676
return std::make_unique<IndexFileHandler>(
7777
core_options.GetFileSystem(), std::move(index_manifest_file), path_factories,

src/paimon/core/manifest/index_manifest_file.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class MemoryPool;
4444
Result<std::unique_ptr<IndexManifestFile>> IndexManifestFile::Create(
4545
const std::shared_ptr<FileSystem>& file_system, const std::shared_ptr<FileFormat>& file_format,
4646
const std::string& compression, const std::shared_ptr<FileStorePathFactory>& path_factory,
47-
const std::shared_ptr<MemoryPool>& pool, const CoreOptions& options) {
47+
int32_t bucket_mode, const std::shared_ptr<MemoryPool>& pool, const CoreOptions& options) {
4848
std::shared_ptr<arrow::DataType> data_type =
4949
VersionedObjectSerializer<IndexManifestEntry>::VersionType(IndexManifestEntry::DataType());
5050

@@ -69,19 +69,21 @@ IndexManifestFile::IndexManifestFile(const std::shared_ptr<FileSystem>& file_sys
6969
const std::shared_ptr<WriterBuilder>& writer_builder,
7070
const std::string& compression,
7171
const std::shared_ptr<PathFactory>& path_factory,
72-
const std::shared_ptr<MemoryPool>& pool)
72+
int32_t bucket_mode, const std::shared_ptr<MemoryPool>& pool)
7373
: ObjectsFile<IndexManifestEntry>(file_system, reader_builder, writer_builder,
7474
std::make_unique<IndexManifestEntrySerializer>(pool),
75-
compression, path_factory, pool) {}
75+
compression, path_factory, pool),
76+
bucket_mode_(bucket_mode) {}
7677

7778
Result<std::optional<std::string>> IndexManifestFile::WriteIndexFiles(
7879
const std::optional<std::string>& previous_index_manifest,
7980
const std::vector<IndexManifestEntry>& new_index_files) {
8081
if (new_index_files.empty()) {
8182
return previous_index_manifest;
8283
}
83-
PAIMON_ASSIGN_OR_RAISE(std::string file, IndexManifestFileHandler::Write(
84-
previous_index_manifest, new_index_files, this));
84+
PAIMON_ASSIGN_OR_RAISE(std::string file,
85+
IndexManifestFileHandler::Write(previous_index_manifest, new_index_files,
86+
bucket_mode_, this));
8587
return std::optional<std::string>(file);
8688
}
8789

src/paimon/core/manifest/index_manifest_file.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ class IndexManifestFile : public ObjectsFile<IndexManifestEntry> {
4646
static Result<std::unique_ptr<IndexManifestFile>> Create(
4747
const std::shared_ptr<FileSystem>& file_system,
4848
const std::shared_ptr<FileFormat>& file_format, const std::string& compression,
49-
const std::shared_ptr<FileStorePathFactory>& path_factory,
50-
const std::shared_ptr<MemoryPool>& pool, int32_t bucket_mode, const CoreOptions& options);
49+
const std::shared_ptr<FileStorePathFactory>& path_factory, int32_t bucket_mode,
50+
const std::shared_ptr<MemoryPool>& pool, const CoreOptions& options);
5151

5252
/// Write new index files to index manifest.
5353
Result<std::optional<std::string>> WriteIndexFiles(
@@ -59,7 +59,9 @@ class IndexManifestFile : public ObjectsFile<IndexManifestEntry> {
5959
const std::shared_ptr<ReaderBuilder>& reader_builder,
6060
const std::shared_ptr<WriterBuilder>& writer_builder,
6161
const std::string& compression,
62-
const std::shared_ptr<PathFactory>& path_factory,
62+
const std::shared_ptr<PathFactory>& path_factory, int32_t bucket_mode,
6363
const std::shared_ptr<MemoryPool>& pool);
64+
65+
const int32_t bucket_mode_;
6466
};
6567
} // namespace paimon

src/paimon/core/manifest/index_manifest_file_handler.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ std::vector<IndexManifestEntry> IndexManifestFileHandler::GlobalFileNameCombiner
106106

107107
Result<std::string> IndexManifestFileHandler::Write(
108108
const std::optional<std::string>& previous_index_manifest,
109-
const std::vector<IndexManifestEntry>& new_index_entries,
109+
const std::vector<IndexManifestEntry>& new_index_entries, int32_t bucket_mode,
110110
IndexManifestFile* index_manifest_file) {
111111
std::vector<IndexManifestEntry> entries;
112112
if (previous_index_manifest != std::nullopt) {
@@ -135,7 +135,7 @@ Result<std::string> IndexManifestFileHandler::Write(
135135
for (const auto& index_type : index_types) {
136136
PAIMON_ASSIGN_OR_RAISE(
137137
std::unique_ptr<IndexManifestFileHandler::IndexManifestFileCombiner> combiner,
138-
GetIndexManifestFileCombine(index_type));
138+
GetIndexManifestFileCombine(index_type, bucket_mode));
139139
std::vector<IndexManifestEntry> typed_previous_entries = previous[index_type];
140140
std::vector<IndexManifestEntry> typed_current_entries = current[index_type];
141141
std::vector<IndexManifestEntry> combined_entries =
@@ -162,15 +162,15 @@ IndexManifestFileHandler::SeparateIndexEntries(
162162
}
163163

164164
Result<std::unique_ptr<IndexManifestFileHandler::IndexManifestFileCombiner>>
165-
IndexManifestFileHandler::GetIndexManifestFileCombine(const std::string& index_type) {
165+
IndexManifestFileHandler::GetIndexManifestFileCombine(const std::string& index_type,
166+
int32_t bucket_mode) {
166167
if (index_type != DeletionVectorsIndexFile::DELETION_VECTORS_INDEX && index_type != "HASH") {
167168
return std::make_unique<GlobalFileNameCombiner>();
168169
}
169170
if (index_type == DeletionVectorsIndexFile::DELETION_VECTORS_INDEX && bucket_mode == -1) {
170171
return Status::NotImplemented("not yet support dv with BUCKET_UNAWARE mode");
171-
} else {
172-
return std::make_unique<BucketedCombiner>();
173172
}
173+
return std::make_unique<BucketedCombiner>();
174174
}
175175

176176
} // namespace paimon

src/paimon/core/manifest/index_manifest_file_handler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,6 @@ class IndexManifestFileHandler {
6666
const std::vector<IndexManifestEntry>& index_entries);
6767

6868
static Result<std::unique_ptr<IndexManifestFileCombiner>> GetIndexManifestFileCombine(
69-
const std::string& index_type);
69+
const std::string& index_type, int32_t bucket_mode);
7070
};
7171
} // namespace paimon

src/paimon/core/mergetree/lookup_levels.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,21 @@ template <typename T>
187187
Status LookupLevels<T>::CreateSstFileFromDataFile(const std::shared_ptr<DataFileMeta>& file,
188188
const std::string& kv_file_path) {
189189
// Prepare reader to iterate KeyValue
190+
auto dv_factory =
191+
[this](const std::string& file_name) -> Result<std::shared_ptr<DeletionVector>> {
192+
auto iter = deletion_file_map_.find(file_name);
193+
if (iter != deletion_file_map_.end()) {
194+
PAIMON_ASSIGN_OR_RAISE(
195+
std::shared_ptr<DeletionVector> dv,
196+
DeletionVector::Read(options_.GetFileSystem().get(), iter->second, pool_.get()));
197+
return dv;
198+
}
199+
return std::shared_ptr<DeletionVector>();
200+
};
190201
PAIMON_ASSIGN_OR_RAISE(
191202
std::vector<std::unique_ptr<FileBatchReader>> raw_readers,
192203
split_read_->CreateRawFileReaders(partition_, {file}, read_schema_,
193-
/*predicate=*/nullptr, deletion_file_map_,
204+
/*predicate=*/nullptr, dv_factory,
194205
/*row_ranges=*/std::nullopt, data_file_path_factory_));
195206
if (raw_readers.size() != 1) {
196207
return Status::Invalid("Unexpected, CreateSstFileFromDataFile only create single reader");

src/paimon/core/operation/file_store_commit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ Result<std::unique_ptr<FileStoreCommit>> FileStoreCommit::Create(
121121
std::shared_ptr<IndexManifestFile> index_manifest_file,
122122
IndexManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(),
123123
options.GetManifestCompression(), path_factory,
124-
ctx->GetMemoryPool(), options));
124+
options.GetBucket(), ctx->GetMemoryPool(), options));
125125

126126
auto expire_snapshots = std::make_shared<ExpireSnapshots>(
127127
snapshot_manager, path_factory, manifest_list, manifest_file, options.GetFileSystem(),

src/paimon/core/operation/file_store_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ Result<std::unique_ptr<FileStoreWrite>> FileStoreWrite::Create(std::unique_ptr<W
133133
std::unique_ptr<IndexManifestFile> index_manifest_file,
134134
IndexManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(),
135135
options.GetManifestCompression(), file_store_path_factory,
136-
ctx->GetMemoryPool(), options));
136+
options.GetBucket(), ctx->GetMemoryPool(), options));
137137
auto index_file_handler = std::make_shared<IndexFileHandler>(
138138
options.GetFileSystem(), std::move(index_manifest_file),
139139
std::make_shared<IndexFilePathFactories>(file_store_path_factory),

src/paimon/core/table/source/table_scan.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ class TableScanImpl {
140140
const CoreOptions& core_options, const std::shared_ptr<FileStorePathFactory>& path_factory,
141141
const std::shared_ptr<MemoryPool>& memory_pool) {
142142
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<IndexManifestFile> index_manifest_file,
143-
IndexManifestFile::Create(core_options.GetFileSystem(),
144-
core_options.GetManifestFormat(),
145-
core_options.GetManifestCompression(),
146-
path_factory, memory_pool, core_options));
143+
IndexManifestFile::Create(
144+
core_options.GetFileSystem(), core_options.GetManifestFormat(),
145+
core_options.GetManifestCompression(), path_factory,
146+
core_options.GetBucket(), memory_pool, core_options));
147147
return std::make_unique<IndexFileHandler>(
148148
core_options.GetFileSystem(), std::move(index_manifest_file),
149149
std::make_shared<IndexFilePathFactories>(path_factory),

0 commit comments

Comments
 (0)