[server] Implement JBOD Phase 1 Local Multi-Directory Support#3030
[server] Implement JBOD Phase 1 Local Multi-Directory Support#3030hanliu0830 wants to merge 1 commit intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Implements Phase 1 of JBOD (multi local disk) support in the Fluss TabletServer by introducing a LocalDiskManager that validates/locks multiple local directories, persists per-disk identity, and enables per-directory checkpointing + placement decisions across Log/KV/Replica/RemoteLog components.
Changes:
- Add
data.dirsconfig option and a newLocalDiskManagerthat validates/locks directories and manages per-disk metadata + simple load counters. - Plumb multi-directory awareness through
TabletServer,LogManager,KvManager,ReplicaManager,Replica, andRemoteLogManager(per-dir checkpoints, placement, cache). - Extend/adjust unit tests to cover multi-directory placement and per-directory checkpoint/cache behavior.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-server/src/test/java/org/apache/fluss/server/storage/LocalDiskManagerTest.java | New unit tests for directory validation, locking, disk.properties, and placement counters. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java | Test base now initializes LocalDiskManager and supports tagged JBOD multi-dir tests. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java | Adds JBOD multi-dir tests for distribution and per-dir high-watermark checkpointing. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java | Updates replica-manager construction in tests to pass LocalDiskManager. |
| fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java | Adds JBOD multi-dir test verifying index cache is tied to replica directory. |
| fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogIndexCacheTest.java | Uses per-dir RemoteLogIndexCache accessor. |
| fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java | Adds JBOD tests for per-dir recovery checkpoints and clean shutdown markers. |
| fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java | Updates tests to pass explicit data dir and close/recreate LocalDiskManager. |
| fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java | Updates KV tests to pass LocalDiskManager and explicit data dir for tablet creation. |
| fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java | Refactors to operate on a list of data dirs; lists tablets per dir; adds per-dir executor helper. |
| fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java | Creates/closes LocalDiskManager and passes it into log/kv/replica managers. |
| fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java | New component that owns multi-dir validation, locking, disk identity, and placement counters. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java | Per-dir high-watermark checkpoints; placement uses LocalDiskManager; remote log manager ctor updated. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java | Replica now carries dataDir to ensure Log/KV tablets and paths are created in the chosen directory. |
| fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java | Maintains per-dir remote index caches; resolves cache based on bucket/log location. |
| fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java | Multi-dir recovery/shutdown, per-dir recovery-point checkpoints, and explicit data-dir log creation. |
| fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java | Multi-dir tablet creation APIs; plumbs selected data dir into KV tablet creation. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds new data.dirs configuration option. |
Comments suppressed due to low confidence (1)
fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java:134
- With multiple
data.dirs, log recovery/shutdown now creates per-directory thread pools. BecauserecoveryThreadsPerDataDiris sourced fromnetty.server.num-worker-threads(default 8), the total thread count scales asnumDataDirs * netty.server.num-worker-threads(plus extra per-dir executors), which can be much higher than before. Consider introducing a dedicated recovery thread config (total vs per-dir) or otherwise capping threads to avoid startup/shutdown thread explosion on hosts with many disks.
return new LogManager(
localDiskManager,
conf,
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
scheduler,
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public int lookupPositionForOffset(RemoteLogSegment remoteLogSegment, long offset) { | ||
| return remoteLogIndexCache.lookupPosition(remoteLogSegment, offset); | ||
| RemoteLogIndexCache remoteLogIndexCache = | ||
| remoteLogIndexCacheForBucket(remoteLogSegment.tableBucket()); | ||
| return remoteLogIndexCache == null | ||
| ? -1 |
There was a problem hiding this comment.
lookupPositionForOffset returns -1 when remoteLogIndexCacheForBucket(...) cannot resolve a cache. That firstStartPos is propagated into RemoteLogFetchInfo and the client uses it as the starting position within the first remote segment; -1 effectively causes the client to read from the beginning of the segment (wrong offset / duplicates). Consider failing fast (throw) or providing a deterministic fallback cache so a valid position is always computed for buckets that have remote segments.
| for (File dataDir : localDiskManager.dataDirs()) { | ||
| remoteLogIndexCachesByDir.put( | ||
| dataDir, | ||
| new RemoteLogIndexCache( | ||
| (int) | ||
| conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE) | ||
| .getBytes(), |
There was a problem hiding this comment.
REMOTE_LOG_INDEX_FILE_CACHE_SIZE is applied per configured dataDir by creating one RemoteLogIndexCache per directory, which multiplies the effective on-disk cache budget by #data.dirs (and changes the meaning of the config option, whose description implies a total budget). Consider splitting the configured cache size across directories or updating the configuration semantics/documentation accordingly.
| for (File dataDir : localDiskManager.dataDirs()) { | |
| remoteLogIndexCachesByDir.put( | |
| dataDir, | |
| new RemoteLogIndexCache( | |
| (int) | |
| conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE) | |
| .getBytes(), | |
| List<File> dataDirs = localDiskManager.dataDirs(); | |
| long totalRemoteLogIndexFileCacheSizeBytes = | |
| conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE).getBytes(); | |
| int perDataDirRemoteLogIndexFileCacheSizeBytes = | |
| dataDirs.isEmpty() | |
| ? (int) totalRemoteLogIndexFileCacheSizeBytes | |
| : (int) (totalRemoteLogIndexFileCacheSizeBytes / dataDirs.size()); | |
| for (File dataDir : dataDirs) { | |
| remoteLogIndexCachesByDir.put( | |
| dataDir, | |
| new RemoteLogIndexCache( | |
| perDataDirRemoteLogIndexFileCacheSizeBytes, |
| String serverId = properties.getProperty(SERVER_ID_KEY); | ||
| return new DiskProperties(parsedVersion, diskId, serverId); | ||
| } catch (NoSuchFileException e) { | ||
| LOG.warn("No disk.properties file under dir {}", file.getAbsolutePath()); |
There was a problem hiding this comment.
Missing disk.properties appears to be an expected state on first startup for a fresh directory, but this is logged at WARN. This can create noisy logs during normal bootstraps; consider lowering this to INFO/DEBUG (or only warn when the directory is non-empty / previously initialized).
| LOG.warn("No disk.properties file under dir {}", file.getAbsolutePath()); | |
| LOG.info("No disk.properties file under dir {}", file.getAbsolutePath()); |
| /** Test for {@link LogManager}. */ | ||
| final class LogManagerTest extends LogTestBase { | ||
|
|
||
| private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; |
There was a problem hiding this comment.
JBOD_MULTI_DIR_TAG duplicates the same tag constant already defined in ReplicaTestBase (ReplicaTestBase.JBOD_MULTI_DIR_TAG). Reusing the shared constant would avoid accidental drift if the tag string changes.
| private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; | |
| private static final String JBOD_MULTI_DIR_TAG = | |
| org.apache.fluss.server.replica.ReplicaTestBase.JBOD_MULTI_DIR_TAG; |
(The sections below can be removed for hotfixes or typos)
-->
Purpose
Linked issue: close #145
Brief change log
Tests
API and Format
Documentation