diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 2548ee5f985..1ad0579a451 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -683,7 +683,7 @@ public void preBatchMutate(ObserverContext c,
    * @return HA group if present or empty if missing
    */
   private Optional getHAGroupFromBatch(RegionCoprocessorEnvironment env,
-    MiniBatchOperationInProgress miniBatchOp) {
+    MiniBatchOperationInProgress miniBatchOp) throws IOException {
     if (miniBatchOp.size() > 0) {
       Mutation m = miniBatchOp.getOperation(0);
       byte[] haGroupName = m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
@@ -702,7 +702,7 @@ private Optional getHAGroupFromBatch(RegionCoprocessorEnvir
    * @return HA group if present or empty if missing
    */
   private Optional getHAGroupFromWALKey(RegionCoprocessorEnvironment env,
-    org.apache.hadoop.hbase.wal.WALKey logKey) {
+    org.apache.hadoop.hbase.wal.WALKey logKey) throws IOException {
     byte[] haGroupName =
       logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
     if (haGroupName != null) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
index d80994369cf..17574215994 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
@@ -57,6 +57,12 @@ public class ReplicationLogDiscoveryForwarder extends ReplicationLogDiscovery {
   public static final String REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY =
     "phoenix.replication.forwarder.waiting.buffer.percentage";
 
+  /**
+   * Configuration key for in-progress directory processing probability (percentage)
+   */
+  public static final String REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY =
+    "phoenix.replication.forwarder.in.progress.processing.probability";
+
   private final ReplicationLogGroup logGroup;
   private final double copyThroughputThresholdBytesPerMs;
   // the timestamp (in future) at which we will attempt to set the HAGroup state to SYNC
@@ -134,7 +140,7 @@ protected void processFile(Path src) throws IOException {
     FileSystem srcFS = replicationLogTracker.getFileSystem();
     FileStatus srcStat = srcFS.getFileStatus(src);
     long ts = replicationLogTracker.getFileTimestamp(srcStat.getPath());
-    ReplicationShardDirectoryManager remoteShardManager = logGroup.getPeerShardManager();
+    ReplicationShardDirectoryManager remoteShardManager = logGroup.getOrCreatePeerShardManager();
     Path dst = remoteShardManager.getWriterPath(ts, logGroup.getServerName().getServerName());
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
     FileUtil.copy(srcFS, srcStat, remoteShardManager.getFileSystem(), dst, false, false, conf);
@@ -227,4 +233,10 @@ public double getWaitingBufferPercentage() {
     return getConf().getDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY,
       DEFAULT_WAITING_BUFFER_PERCENTAGE);
   }
+
+  @Override
+  public double getInProgressDirectoryProcessProbability() {
+    return getConf().getDouble(REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
+      super.getInProgressDirectoryProcessProbability());
+  }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index c88d9b2647b..ded1fec7b1b 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -36,6 +36,7 @@
 import com.lmax.disruptor.dsl.ProducerType;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -165,6 +166,9 @@ public class ReplicationLogGroup {
   public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
     "phoenix.replication.log.retry.delay.ms";
   public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+  public static final String REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY =
+    "phoenix.replication.log.peer.init.timeout.ms";
+  public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 10_000L;
   public static final String WAL_SYNC_TIMEOUT_MS_KEY = "hbase.regionserver.wal.sync.timeout";
   public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
@@ -180,10 +184,15 @@ public class ReplicationLogGroup {
   protected final String haGroupName;
   protected final HAGroupStoreManager haGroupStoreManager;
   protected final MetricsReplicationLogGroupSource metrics;
-  protected ReplicationShardDirectoryManager peerShardManager;
+  // Cached at init time — HDFS URLs (local and peer) are fixed for the lifetime of this group.
+  // URL changes require RS restart.
+  protected HAGroupStoreRecord haGroupStoreRecord;
   protected ReplicationShardDirectoryManager localShardManager;
+  protected volatile ReplicationShardDirectoryManager peerShardManager;
+  private CompletableFuture peerShardManagerFuture;
   protected ReplicationLogDiscoveryForwarder logForwarder;
   protected long syncTimeoutMs;
+  protected long peerInitTimeoutMs;
   protected volatile boolean closed = false;
 
   /**
@@ -331,20 +340,24 @@ public Record(String tableName, long commitId, Mutation mutation) {
    * @param serverName The server name
    * @param haGroupName The HA Group name
    * @return ReplicationLogGroup instance
-   * @throws RuntimeException if initialization fails
+   * @throws IOException if initialization fails
    */
   public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
-    String haGroupName) {
-    return INSTANCES.computeIfAbsent(haGroupName, k -> {
-      try {
-        ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
-        group.init();
-        return group;
-      } catch (IOException e) {
-        LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
-        throw new RuntimeException(e);
-      }
-    });
+    String haGroupName) throws IOException {
+    try {
+      return INSTANCES.computeIfAbsent(haGroupName, k -> {
+        try {
+          ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
+          group.init();
+          return group;
+        } catch (IOException e) {
+          LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
+          throw new UncheckedIOException(e);
+        }
+      });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
   }
 
   /**
@@ -354,21 +367,25 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
    * @param haGroupName The HA Group name
    * @param haGroupStoreManager HA Group Store Manager instance
    * @return ReplicationLogGroup instance
-   * @throws RuntimeException if initialization fails
+   * @throws IOException if initialization fails
    */
   public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
-    String haGroupName, HAGroupStoreManager haGroupStoreManager) {
-    return INSTANCES.computeIfAbsent(haGroupName, k -> {
-      try {
-        ReplicationLogGroup group =
-          new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager);
-        group.init();
-        return group;
-      } catch (IOException e) {
-        LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
-        throw new RuntimeException(e);
-      }
-    });
+    String haGroupName, HAGroupStoreManager haGroupStoreManager) throws IOException {
+    try {
+      return INSTANCES.computeIfAbsent(haGroupName, k -> {
+        try {
+          ReplicationLogGroup group =
+            new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager);
+          group.init();
+          return group;
+        } catch (IOException e) {
+          LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
+          throw new UncheckedIOException(e);
+        }
+      });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
   }
 
   /**
@@ -421,9 +438,10 @@ protected void init() throws IOException {
       throw new IOException(message);
     }
     HAGroupStoreRecord record = haRecord.get();
-    // First initialize the shard managers
-    this.peerShardManager = createPeerShardManager(record);
-    this.localShardManager = createLocalShardManager(record);
+    this.haGroupStoreRecord = record;
+    this.localShardManager = createLocalShardManager();
+    this.peerInitTimeoutMs = conf.getLong(REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY,
+      DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS);
     // Initialize the replication log forwarder. The log forwarder is only activated when
     // we switch to STORE_AND_FORWARD or SYNC_AND_FORWARD mode
     this.logForwarder = new ReplicationLogDiscoveryForwarder(this);
@@ -783,21 +801,61 @@ private ReplicationShardDirectoryManager createShardManager(String uri, String l
     }
   }
 
-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager createPeerShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
   }
 
-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager createLocalShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {
+      if (peerShardManager != null) {
+        return peerShardManager;
+      }
+      if (peerShardManagerFuture == null || peerShardManagerFuture.isCompletedExceptionally()) {
+        // retry
+        peerShardManagerFuture = CompletableFuture.supplyAsync(() -> {
+          try {
+            return createPeerShardManager();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+      }
+      try {
+        peerShardManager = peerShardManagerFuture.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS);
+        return peerShardManager;
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof UncheckedIOException) {
+          throw ((UncheckedIOException) cause).getCause();
+        }
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+        throw new IOException("Failed to create peer shard manager", cause);
+      } catch (TimeoutException e) {
+        throw new IOException("Timed out creating peer shard manager after " + peerInitTimeoutMs
+          + "ms for " + haGroupName, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while creating peer shard manager", e);
+      }
+    }
   }
 
-  /** return shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager getPeerShardManager() {
-    return peerShardManager;
+  /** Create a new peer shard manager for the standby cluster */
+  protected ReplicationShardDirectoryManager createPeerShardManager() throws IOException {
+    return createShardManager(haGroupStoreRecord.getPeerHdfsUrl(), STANDBY_DIR);
   }
 
   /** return shard manager for the fallback cluster */
@@ -809,14 +867,10 @@ private FileSystem getFileSystem(URI uri) throws IOException {
     return FileSystem.get(uri, conf);
   }
 
-  /** Create the standby(synchronous) writer */
-  protected ReplicationLog createStandbyLog() throws IOException {
-    return new ReplicationLog(this, peerShardManager);
-  }
-
-  /** Create the fallback writer */
-  protected ReplicationLog createFallbackLog() throws IOException {
-    return new ReplicationLog(this, localShardManager);
+  /** Create a replication log using the given shard manager */
+  protected ReplicationLog createReplicationLog(ReplicationShardDirectoryManager shardManager)
+    throws IOException {
+    return new ReplicationLog(this, shardManager);
   }
 
   /** Returns the log forwarder for this replication group */
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index ea18a5b853c..7bcb4c89e09 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -55,7 +55,7 @@ protected StoreAndForwardModeImpl(ReplicationLogGroup logGroup) {
   void onEnter() throws IOException {
     LOG.info("HAGroup {} entered mode {}", logGroup, this);
     // create a log on the fallback cluster
-    log = logGroup.createFallbackLog();
+    log = logGroup.createReplicationLog(logGroup.getLocalShardManager());
     log.init();
     // Schedule task to periodically set the HAGroupStore state to ACTIVE_NOT_IN_SYNC
     startHAGroupStoreUpdateTask();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index aefce975cfd..02c57f7f3fd 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -44,7 +44,8 @@ protected SyncAndForwardModeImpl(ReplicationLogGroup logGroup) {
   void onEnter() throws IOException {
     LOG.info("HAGroup {} entered mode {}", logGroup, this);
     // create a log on the standby cluster
-    log = logGroup.createStandbyLog();
+    ReplicationShardDirectoryManager peerShardManager = logGroup.getOrCreatePeerShardManager();
+    log = logGroup.createReplicationLog(peerShardManager);
     log.init();
     // no-op if the forwarder is already started
     logGroup.getLogForwarder().start();
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index 75e175ad4e7..0a5b5a48c66 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -43,7 +43,8 @@ protected SyncModeImpl(ReplicationLogGroup logGroup) {
   void onEnter() throws IOException {
     LOG.info("HAGroup {} entered mode {}", logGroup, this);
     // create a log on the standby cluster
-    log = logGroup.createStandbyLog();
+    ReplicationShardDirectoryManager peerShardManager = logGroup.getOrCreatePeerShardManager();
+    log = logGroup.createReplicationLog(peerShardManager);
     log.init();
   }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 949c3f20295..651cddde6c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -124,7 +124,7 @@ private Map> groupLogsByTable() throws Exception {
     LogFileAnalyzer analyzer = new LogFileAnalyzer();
     // use peer cluster conf
     analyzer.setConf(conf2);
-    Path standByLogDir = logGroup.getPeerShardManager().getRootDirectoryPath();
+    Path standByLogDir = logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
     LOG.info("Analyzing log files at {}", standByLogDir);
     String[] args = { "--check", standByLogDir.toString() };
     assertEquals(0, analyzer.run(args));
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index f748e3d1a8c..d49e2ffc941 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -165,13 +165,9 @@ public TestableLogGroup(Configuration conf, ServerName serverName, String haGrou
     }
 
     @Override
-    protected ReplicationLog createStandbyLog() throws IOException {
-      return spy(new TestableLog(this, peerShardManager, useAlignedRotation));
-    }
-
-    @Override
-    protected ReplicationLog createFallbackLog() throws IOException {
-      return spy(new TestableLog(this, localShardManager, useAlignedRotation));
+    protected ReplicationLog createReplicationLog(ReplicationShardDirectoryManager shardManager)
+      throws IOException {
+      return spy(new TestableLog(this, shardManager, useAlignedRotation));
     }
   }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
index 46efbf196e4..65fb764a31b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
@@ -25,6 +25,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 
 import java.io.IOException;
 import java.util.concurrent.Callable;
@@ -179,4 +180,56 @@ public Object answer(InvocationOnMock invocation) {
     }
     assertEquals(SYNC, logGroup.getMode());
   }
+
+  /**
+   * Tests that the forwarder retries peer shard manager creation when the peer is initially
+   * unavailable. On the first attempt, getOrCreatePeerShardManager throws; the file is marked
+   * failed and retried via in-progress processing. On the retry the peer becomes available and
+   * forwarding succeeds.
+   */
+  @Test
+  public void testForwarderRetriesPeerCreation() throws Exception {
+    final String tableName = "TBLFWDRETRY";
+    final long count = 10L;
+
+    // Ensure in-progress files are immediately eligible for retry and always processed
+    conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 0);
+    conf.setDouble(
+      ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
+      100.0);
+    // Recreate the log group with the updated config
+    recreateLogGroup();
+    assertEquals(STORE_AND_FORWARD, logGroup.getMode());
+
+    // Make getOrCreatePeerShardManager fail on the first call, then succeed on subsequent calls
+    doThrow(new IOException("Peer namenode unavailable")).doCallRealMethod().when(logGroup)
+      .getOrCreatePeerShardManager();
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        logGroup.setMode(SYNC);
+        try {
+          logGroup.sync();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        return 0L;
+      }
+    }).when(haGroupStoreManager).setHAGroupStatusToSync(haGroupName);
+
+    // Write some data so the forwarder has files to process
+    for (long id = 1; id <= count; ++id) {
+      Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2);
+      logGroup.append(tableName, id, put);
+    }
+    logGroup.sync();
+
+    // Wait for the forwarder to eventually succeed after retrying peer creation
+    long deadline = EnvironmentEdgeManager.currentTimeMillis() + 120_000;
+    while (logGroup.getMode() != SYNC && EnvironmentEdgeManager.currentTimeMillis() < deadline) {
+      Thread.sleep(500);
+    }
+    assertEquals(SYNC, logGroup.getMode());
+  }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 727959541e6..2bf723defd0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -1709,7 +1709,8 @@ public void testBothSyncAndSafFailuresTriggersAbort() throws Exception {
       doAnswer(poisonNewWriter).when(log).createNewWriter();
       return log;
     };
-    doAnswer(poisonLog).when(logGroup).createFallbackLog();
+    doAnswer(poisonLog).when(logGroup)
+      .createReplicationLog(any(ReplicationShardDirectoryManager.class));
 
     // Poison the already-initialized SYNC log
     ReplicationLog activeLog = logGroup.getActiveLog();
@@ -1793,4 +1794,83 @@ public void testCalculateSyncTimeout() throws Exception {
     recreateLogGroup();
     assertEquals("Explicit override should take precedence", 5000L, logGroup.syncTimeoutMs);
   }
+
+  /**
+   * Tests that when the peer cluster is unavailable at startup, the group degrades from SYNC to
+   * STORE_AND_FORWARD and remains functional (append/sync work).
+   */
+  @Test
+  public void testInitDegradesToSafWhenPeerUnavailable() throws Exception {
+    final String tableName = "TBLDEG";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName,
+      haGroupStoreManager, useAlignedRotation()));
+    doThrow(new IOException("Standby namenode unavailable")).when(group)
+      .getOrCreatePeerShardManager();
+    group.init();
+
+    try {
+      // Should have degraded to STORE_AND_FORWARD
+      assertEquals(STORE_AND_FORWARD, group.getMode());
+
+      // Verify the group is functional — append and sync should work via local shard manager
+      group.append(tableName, commitId, put);
+      group.sync();
+    } finally {
+      group.close();
+    }
+  }
+
+  /**
+   * Tests that when the local cluster is unavailable at startup, init fails with IOException.
+   * Neither SYNC nor SAF mode can operate without a local shard manager.
+   */
+  @Test
+  public void testInitFailsWhenLocalUnavailable() throws Exception {
+    ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName,
+      haGroupStoreManager, useAlignedRotation()));
+    doThrow(new IOException("Local namenode unavailable")).when(group).createLocalShardManager();
+    try {
+      group.init();
+      fail("Should have thrown IOException when local shard manager is unavailable");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Local namenode unavailable"));
+    }
+  }
+
+  /**
+   * Tests that when peer shard manager creation exceeds the configured timeout, the group degrades
+   * to STORE_AND_FORWARD instead of blocking the disruptor handler thread indefinitely.
+   */
+  @Test
+  public void testInitDegradesToSafWhenPeerInitTimesOut() throws Exception {
+    final String tableName = "TBLTIMEOUT";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Set a very short peer init timeout
+    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY, 100L);
+
+    ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName,
+      haGroupStoreManager, useAlignedRotation()));
+    // Make createPeerShardManager block longer than the configured timeout
+    doAnswer(invocation -> {
+      Thread.sleep(5000);
+      return invocation.callRealMethod();
+    }).when(group).createPeerShardManager();
+    group.init();
+
+    try {
+      // Should have degraded to STORE_AND_FORWARD due to timeout
+      assertEquals(STORE_AND_FORWARD, group.getMode());
+
+      // Verify the group is functional
+      group.append(tableName, commitId, put);
+      group.sync();
+    } finally {
+      group.close();
+    }
+  }
 }