From e86bb0a2c8c91469304d0049f05579dde7b403c0 Mon Sep 17 00:00:00 2001 From: tkhurana Date: Thu, 7 May 2026 14:42:24 -0700 Subject: [PATCH 1/3] PHOENIX-7845 ReplicationLogGroup initialization resilience to standby cluster unavailability --- .../hbase/index/IndexRegionObserver.java | 4 +- .../ReplicationLogDiscoveryForwarder.java | 22 ++++- .../replication/ReplicationLogGroup.java | 98 ++++++++++--------- .../replication/StoreAndForwardModeImpl.java | 2 +- .../replication/SyncAndForwardModeImpl.java | 3 +- .../phoenix/replication/SyncModeImpl.java | 3 +- .../replication/ReplicationLogGroupIT.java | 2 +- .../replication/ReplicationLogBaseTest.java | 10 +- .../ReplicationLogDiscoveryForwarderTest.java | 53 ++++++++++ .../replication/ReplicationLogGroupTest.java | 47 ++++++++- 10 files changed, 182 insertions(+), 62 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 2548ee5f985..1ad0579a451 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -683,7 +683,7 @@ public void preBatchMutate(ObserverContext c, * @return HA group if present or empty if missing */ private Optional getHAGroupFromBatch(RegionCoprocessorEnvironment env, - MiniBatchOperationInProgress miniBatchOp) { + MiniBatchOperationInProgress miniBatchOp) throws IOException { if (miniBatchOp.size() > 0) { Mutation m = miniBatchOp.getOperation(0); byte[] haGroupName = m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB); @@ -702,7 +702,7 @@ private Optional getHAGroupFromBatch(RegionCoprocessorEnvir * @return HA group if present or empty if missing */ private Optional getHAGroupFromWALKey(RegionCoprocessorEnvironment env, - org.apache.hadoop.hbase.wal.WALKey logKey) { + 
org.apache.hadoop.hbase.wal.WALKey logKey) throws IOException { byte[] haGroupName = logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB); if (haGroupName != null) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java index d80994369cf..9d61a052089 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java @@ -57,10 +57,17 @@ public class ReplicationLogDiscoveryForwarder extends ReplicationLogDiscovery { public static final String REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY = "phoenix.replication.forwarder.waiting.buffer.percentage"; + /** + * Configuration key for in-progress directory processing probability (percentage) + */ + public static final String REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY = + "phoenix.replication.forwarder.in.progress.processing.probability"; + private final ReplicationLogGroup logGroup; private final double copyThroughputThresholdBytesPerMs; // the timestamp (in future) at which we will attempt to set the HAGroup state to SYNC private long syncUpdateTS; + private ReplicationShardDirectoryManager peerShardManager; /** * Create a tracker for the replication logs in the fallback cluster. 
@@ -129,12 +136,19 @@ public void init() throws IOException { HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, activeInSync); } + private ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws IOException { + if (peerShardManager == null) { + peerShardManager = logGroup.createPeerShardManager(); + } + return peerShardManager; + } + @Override protected void processFile(Path src) throws IOException { FileSystem srcFS = replicationLogTracker.getFileSystem(); FileStatus srcStat = srcFS.getFileStatus(src); long ts = replicationLogTracker.getFileTimestamp(srcStat.getPath()); - ReplicationShardDirectoryManager remoteShardManager = logGroup.getPeerShardManager(); + ReplicationShardDirectoryManager remoteShardManager = getOrCreatePeerShardManager(); Path dst = remoteShardManager.getWriterPath(ts, logGroup.getServerName().getServerName()); long startTime = EnvironmentEdgeManager.currentTimeMillis(); FileUtil.copy(srcFS, srcStat, remoteShardManager.getFileSystem(), dst, false, false, conf); @@ -227,4 +241,10 @@ public double getWaitingBufferPercentage() { return getConf().getDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY, DEFAULT_WAITING_BUFFER_PERCENTAGE); } + + @Override + public double getInProgressDirectoryProcessProbability() { + return getConf().getDouble(REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY, + super.getInProgressDirectoryProcessProbability()); + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index c88d9b2647b..e024c763e1a 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -180,7 +180,7 @@ public class ReplicationLogGroup { protected final String haGroupName; protected final HAGroupStoreManager 
haGroupStoreManager; protected final MetricsReplicationLogGroupSource metrics; - protected ReplicationShardDirectoryManager peerShardManager; + protected HAGroupStoreRecord haGroupStoreRecord; protected ReplicationShardDirectoryManager localShardManager; protected ReplicationLogDiscoveryForwarder logForwarder; protected long syncTimeoutMs; @@ -331,20 +331,28 @@ public Record(String tableName, long commitId, Mutation mutation) { * @param serverName The server name * @param haGroupName The HA Group name * @return ReplicationLogGroup instance - * @throws RuntimeException if initialization fails + * @throws IOException if initialization fails */ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, - String haGroupName) { - return INSTANCES.computeIfAbsent(haGroupName, k -> { - try { - ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName); - group.init(); - return group; - } catch (IOException e) { - LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); - throw new RuntimeException(e); + String haGroupName) throws IOException { + try { + return INSTANCES.computeIfAbsent(haGroupName, k -> { + try { + ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName); + group.init(); + return group; + } catch (IOException e) { + LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); + // computeIfAbsent does not allow checked exceptions; unwrapped in the outer catch + throw new RuntimeException(e); + } + }); + } catch (RuntimeException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); } - }); + throw e; + } } /** @@ -354,21 +362,29 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, * @param haGroupName The HA Group name * @param haGroupStoreManager HA Group Store Manager instance * @return ReplicationLogGroup instance - * @throws RuntimeException if initialization fails + * 
@throws IOException if initialization fails */ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, - String haGroupName, HAGroupStoreManager haGroupStoreManager) { - return INSTANCES.computeIfAbsent(haGroupName, k -> { - try { - ReplicationLogGroup group = - new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager); - group.init(); - return group; - } catch (IOException e) { - LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); - throw new RuntimeException(e); + String haGroupName, HAGroupStoreManager haGroupStoreManager) throws IOException { + try { + return INSTANCES.computeIfAbsent(haGroupName, k -> { + try { + ReplicationLogGroup group = + new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager); + group.init(); + return group; + } catch (IOException e) { + LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); + // computeIfAbsent does not allow checked exceptions; unwrapped in the outer catch + throw new RuntimeException(e); + } + }); + } catch (RuntimeException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); } - }); + throw e; + } } /** @@ -421,9 +437,8 @@ protected void init() throws IOException { throw new IOException(message); } HAGroupStoreRecord record = haRecord.get(); - // First initialize the shard managers - this.peerShardManager = createPeerShardManager(record); - this.localShardManager = createLocalShardManager(record); + this.haGroupStoreRecord = record; + this.localShardManager = createLocalShardManager(); // Initialize the replication log forwarder. 
The log forwarder is only activated when // we switch to STORE_AND_FORWARD or SYNC_AND_FORWARD mode this.logForwarder = new ReplicationLogDiscoveryForwarder(this); @@ -783,21 +798,14 @@ private ReplicationShardDirectoryManager createShardManager(String uri, String l } } - /** create shard manager for the standby cluster */ - protected ReplicationShardDirectoryManager createPeerShardManager(HAGroupStoreRecord record) - throws IOException { - return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR); - } - /** create shard manager for the fallback cluster */ - protected ReplicationShardDirectoryManager createLocalShardManager(HAGroupStoreRecord record) - throws IOException { - return createShardManager(record.getHdfsUrl(), FALLBACK_DIR); + protected ReplicationShardDirectoryManager createLocalShardManager() throws IOException { + return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR); } - /** return shard manager for the standby cluster */ - protected ReplicationShardDirectoryManager getPeerShardManager() { - return peerShardManager; + /** create shard manager for the standby cluster using stored record */ + protected ReplicationShardDirectoryManager createPeerShardManager() throws IOException { + return createShardManager(haGroupStoreRecord.getPeerHdfsUrl(), STANDBY_DIR); } /** return shard manager for the fallback cluster */ @@ -809,14 +817,10 @@ private FileSystem getFileSystem(URI uri) throws IOException { return FileSystem.get(uri, conf); } - /** Create the standby(synchronous) writer */ - protected ReplicationLog createStandbyLog() throws IOException { - return new ReplicationLog(this, peerShardManager); - } - - /** Create the fallback writer */ - protected ReplicationLog createFallbackLog() throws IOException { - return new ReplicationLog(this, localShardManager); + /** Create a replication log using the given shard manager */ + protected ReplicationLog createReplicationLog(ReplicationShardDirectoryManager shardManager) + throws 
IOException { + return new ReplicationLog(this, shardManager); } /** Returns the log forwarder for this replication group */ diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java index ea18a5b853c..7bcb4c89e09 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java @@ -55,7 +55,7 @@ protected StoreAndForwardModeImpl(ReplicationLogGroup logGroup) { void onEnter() throws IOException { LOG.info("HAGroup {} entered mode {}", logGroup, this); // create a log on the fallback cluster - log = logGroup.createFallbackLog(); + log = logGroup.createReplicationLog(logGroup.getLocalShardManager()); log.init(); // Schedule task to periodically set the HAGroupStore state to ACTIVE_NOT_IN_SYNC startHAGroupStoreUpdateTask(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java index aefce975cfd..b1a13cbf20d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java @@ -44,7 +44,8 @@ protected SyncAndForwardModeImpl(ReplicationLogGroup logGroup) { void onEnter() throws IOException { LOG.info("HAGroup {} entered mode {}", logGroup, this); // create a log on the standby cluster - log = logGroup.createStandbyLog(); + ReplicationShardDirectoryManager peerShardManager = logGroup.createPeerShardManager(); + log = logGroup.createReplicationLog(peerShardManager); log.init(); // no-op if the forwarder is already started logGroup.getLogForwarder().start(); diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java index 75e175ad4e7..8c9edad21cb 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java @@ -43,7 +43,8 @@ protected SyncModeImpl(ReplicationLogGroup logGroup) { void onEnter() throws IOException { LOG.info("HAGroup {} entered mode {}", logGroup, this); // create a log on the standby cluster - log = logGroup.createStandbyLog(); + ReplicationShardDirectoryManager peerShardManager = logGroup.createPeerShardManager(); + log = logGroup.createReplicationLog(peerShardManager); log.init(); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java index 949c3f20295..c4d28a260f3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java @@ -124,7 +124,7 @@ private Map> groupLogsByTable() throws Exception { LogFileAnalyzer analyzer = new LogFileAnalyzer(); // use peer cluster conf analyzer.setConf(conf2); - Path standByLogDir = logGroup.getPeerShardManager().getRootDirectoryPath(); + Path standByLogDir = logGroup.createPeerShardManager().getRootDirectoryPath(); LOG.info("Analyzing log files at {}", standByLogDir); String[] args = { "--check", standByLogDir.toString() }; assertEquals(0, analyzer.run(args)); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java index f748e3d1a8c..d49e2ffc941 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java +++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java @@ -165,13 +165,9 @@ public TestableLogGroup(Configuration conf, ServerName serverName, String haGrou } @Override - protected ReplicationLog createStandbyLog() throws IOException { - return spy(new TestableLog(this, peerShardManager, useAlignedRotation)); - } - - @Override - protected ReplicationLog createFallbackLog() throws IOException { - return spy(new TestableLog(this, localShardManager, useAlignedRotation)); + protected ReplicationLog createReplicationLog(ReplicationShardDirectoryManager shardManager) + throws IOException { + return spy(new TestableLog(this, shardManager, useAlignedRotation)); } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java index 46efbf196e4..2413a12f332 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import java.io.IOException; import java.util.concurrent.Callable; @@ -179,4 +180,56 @@ public Object answer(InvocationOnMock invocation) { } assertEquals(SYNC, logGroup.getMode()); } + + /** + * Tests that the forwarder retries peer shard manager creation when the peer is initially + * unavailable. On the first attempt, createPeerShardManager throws; the file is marked failed and + * retried via in-progress processing. On the retry the peer becomes available and forwarding + * succeeds. 
+ */ + @Test + public void testForwarderRetriesPeerCreation() throws Exception { + final String tableName = "TBLFWDRETRY"; + final long count = 10L; + + // Ensure in-progress files are immediately eligible for retry and always processed + conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 0); + conf.setDouble( + ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY, + 100.0); + // Recreate the log group with the updated config + recreateLogGroup(); + assertEquals(STORE_AND_FORWARD, logGroup.getMode()); + + // Make createPeerShardManager fail on the first call, then succeed on subsequent calls + doThrow(new IOException("Peer namenode unavailable")).doCallRealMethod().when(logGroup) + .createPeerShardManager(); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + logGroup.setMode(SYNC); + try { + logGroup.sync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return 0L; + } + }).when(haGroupStoreManager).setHAGroupStatusToSync(haGroupName); + + // Write some data so the forwarder has files to process + for (long id = 1; id <= count; ++id) { + Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2); + logGroup.append(tableName, id, put); + } + logGroup.sync(); + + // Wait for the forwarder to eventually succeed after retrying peer creation + long deadline = EnvironmentEdgeManager.currentTimeMillis() + 120_000; + while (logGroup.getMode() != SYNC && EnvironmentEdgeManager.currentTimeMillis() < deadline) { + Thread.sleep(500); + } + assertEquals(SYNC, logGroup.getMode()); + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index 727959541e6..77bd8ce6b94 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -1709,7 +1709,8 @@ public void testBothSyncAndSafFailuresTriggersAbort() throws Exception { doAnswer(poisonNewWriter).when(log).createNewWriter(); return log; }; - doAnswer(poisonLog).when(logGroup).createFallbackLog(); + doAnswer(poisonLog).when(logGroup) + .createReplicationLog(any(ReplicationShardDirectoryManager.class)); // Poison the already-initialized SYNC log ReplicationLog activeLog = logGroup.getActiveLog(); @@ -1793,4 +1794,48 @@ public void testCalculateSyncTimeout() throws Exception { recreateLogGroup(); assertEquals("Explicit override should take precedence", 5000L, logGroup.syncTimeoutMs); } + + /** + * Tests that when the peer cluster is unavailable at startup, the group degrades from SYNC to + * STORE_AND_FORWARD and remains functional (append/sync work). + */ + @Test + public void testInitDegradesToSafWhenPeerUnavailable() throws Exception { + final String tableName = "TBLDEG"; + final long commitId = 1L; + final Mutation put = LogFileTestUtil.newPut("row", 1, 1); + + ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName, + haGroupStoreManager, useAlignedRotation())); + doThrow(new IOException("Standby namenode unavailable")).when(group).createPeerShardManager(); + group.init(); + + try { + // Should have degraded to STORE_AND_FORWARD + assertEquals(STORE_AND_FORWARD, group.getMode()); + + // Verify the group is functional — append and sync should work via local shard manager + group.append(tableName, commitId, put); + group.sync(); + } finally { + group.close(); + } + } + + /** + * Tests that when the local cluster is unavailable at startup, init fails with IOException. + * Neither SYNC nor SAF mode can operate without a local shard manager. 
+ */ + @Test + public void testInitFailsWhenLocalUnavailable() throws Exception { + ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName, + haGroupStoreManager, useAlignedRotation())); + doThrow(new IOException("Local namenode unavailable")).when(group).createLocalShardManager(); + try { + group.init(); + fail("Should have thrown IOException when local shard manager is unavailable"); + } catch (IOException e) { + assertTrue(e.getMessage().contains("Local namenode unavailable")); + } + } } From eb87ff6daf7c119a4506691bb1dba905952fc388 Mon Sep 17 00:00:00 2001 From: tkhurana Date: Fri, 8 May 2026 09:37:19 -0700 Subject: [PATCH 2/3] PHOENIX-7845 Addendum: Timeout-bound peer init and use UncheckedIOException - Replace RuntimeException wrapping in get() with UncheckedIOException to avoid misclassifying unrelated RuntimeExceptions with IOException causes - Bound peer shard manager creation with configurable timeout (default 10s) via CompletableFuture.get() to prevent blocking the disruptor handler thread on peer NN outage; TimeoutException triggers SAF degradation - Consolidate peer shard manager into a single lazy synchronized accessor on ReplicationLogGroup; remove per-component caching from forwarder - Cancel the in-flight future on timeout to release the ForkJoinPool thread - Add test for timeout-triggered SAF degradation --- .../ReplicationLogDiscoveryForwarder.java | 10 +-- .../replication/ReplicationLogGroup.java | 76 +++++++++++++++---- .../replication/SyncAndForwardModeImpl.java | 2 +- .../phoenix/replication/SyncModeImpl.java | 2 +- .../replication/ReplicationLogGroupIT.java | 2 +- .../ReplicationLogDiscoveryForwarderTest.java | 10 +-- .../replication/ReplicationLogGroupTest.java | 37 ++++++++- 7 files changed, 106 insertions(+), 33 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java index 9d61a052089..17574215994 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java @@ -67,7 +67,6 @@ public class ReplicationLogDiscoveryForwarder extends ReplicationLogDiscovery { private final double copyThroughputThresholdBytesPerMs; // the timestamp (in future) at which we will attempt to set the HAGroup state to SYNC private long syncUpdateTS; - private ReplicationShardDirectoryManager peerShardManager; /** * Create a tracker for the replication logs in the fallback cluster. @@ -136,19 +135,12 @@ public void init() throws IOException { HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, activeInSync); } - private ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws IOException { - if (peerShardManager == null) { - peerShardManager = logGroup.createPeerShardManager(); - } - return peerShardManager; - } - @Override protected void processFile(Path src) throws IOException { FileSystem srcFS = replicationLogTracker.getFileSystem(); FileStatus srcStat = srcFS.getFileStatus(src); long ts = replicationLogTracker.getFileTimestamp(srcStat.getPath()); - ReplicationShardDirectoryManager remoteShardManager = getOrCreatePeerShardManager(); + ReplicationShardDirectoryManager remoteShardManager = logGroup.getOrCreatePeerShardManager(); Path dst = remoteShardManager.getWriterPath(ts, logGroup.getServerName().getServerName()); long startTime = EnvironmentEdgeManager.currentTimeMillis(); FileUtil.copy(srcFS, srcStat, remoteShardManager.getFileSystem(), dst, false, false, conf); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index 
e024c763e1a..a8461f0bd9e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -36,6 +36,7 @@ import com.lmax.disruptor.dsl.ProducerType; import java.io.IOException; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -165,6 +166,9 @@ public class ReplicationLogGroup { public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY = "phoenix.replication.log.retry.delay.ms"; public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L; + public static final String REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY = + "phoenix.replication.log.peer.init.timeout.ms"; + public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 10_000L; public static final String WAL_SYNC_TIMEOUT_MS_KEY = "hbase.regionserver.wal.sync.timeout"; public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L; @@ -180,10 +184,14 @@ public class ReplicationLogGroup { protected final String haGroupName; protected final HAGroupStoreManager haGroupStoreManager; protected final MetricsReplicationLogGroupSource metrics; + // Cached at init time — HDFS URLs (local and peer) are fixed for the lifetime of this group. + // URL changes require RS restart. 
protected HAGroupStoreRecord haGroupStoreRecord; protected ReplicationShardDirectoryManager localShardManager; + protected volatile ReplicationShardDirectoryManager peerShardManager; protected ReplicationLogDiscoveryForwarder logForwarder; protected long syncTimeoutMs; + protected long peerInitTimeoutMs; protected volatile boolean closed = false; /** @@ -343,15 +351,11 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, return group; } catch (IOException e) { LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); - // computeIfAbsent does not allow checked exceptions; unwrapped in the outer catch - throw new RuntimeException(e); + throw new UncheckedIOException(e); } }); - } catch (RuntimeException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - throw e; + } catch (UncheckedIOException e) { + throw e.getCause(); } } @@ -375,15 +379,11 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName, return group; } catch (IOException e) { LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e); - // computeIfAbsent does not allow checked exceptions; unwrapped in the outer catch - throw new RuntimeException(e); + throw new UncheckedIOException(e); } }); - } catch (RuntimeException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - throw e; + } catch (UncheckedIOException e) { + throw e.getCause(); } } @@ -439,6 +439,8 @@ protected void init() throws IOException { HAGroupStoreRecord record = haRecord.get(); this.haGroupStoreRecord = record; this.localShardManager = createLocalShardManager(); + this.peerInitTimeoutMs = conf.getLong(REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY, + DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS); // Initialize the replication log forwarder. 
The log forwarder is only activated when // we switch to STORE_AND_FORWARD or SYNC_AND_FORWARD mode this.logForwarder = new ReplicationLogDiscoveryForwarder(this); @@ -803,7 +805,51 @@ protected ReplicationShardDirectoryManager createLocalShardManager() throws IOEx return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR); } - /** create shard manager for the standby cluster using stored record */ + /** + * Get or create the peer shard manager. Thread-safe and idempotent — the first successful + * creation is cached; subsequent calls return the cached instance. Bounded by + * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the disruptor handler + * thread on a peer NN outage. + */ + protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws IOException { + ReplicationShardDirectoryManager cached = peerShardManager; + if (cached != null) { + return cached; + } + synchronized (this) { + if (peerShardManager != null) { + return peerShardManager; + } + CompletableFuture future = + CompletableFuture.supplyAsync(() -> { + try { + return createPeerShardManager(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + try { + peerShardManager = future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS); + return peerShardManager; + } catch (UncheckedIOException e) { + throw e.getCause(); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new IOException("Failed to create peer shard manager", e.getCause()); + } catch (TimeoutException e) { + future.cancel(true); + throw new IOException("Timed out creating peer shard manager after " + peerInitTimeoutMs + + "ms for " + haGroupName, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while creating peer shard manager", e); + } + } + } + + /** Create a new peer shard manager for the standby cluster */ protected 
ReplicationShardDirectoryManager createPeerShardManager() throws IOException { return createShardManager(haGroupStoreRecord.getPeerHdfsUrl(), STANDBY_DIR); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java index b1a13cbf20d..02c57f7f3fd 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java @@ -44,7 +44,7 @@ protected SyncAndForwardModeImpl(ReplicationLogGroup logGroup) { void onEnter() throws IOException { LOG.info("HAGroup {} entered mode {}", logGroup, this); // create a log on the standby cluster - ReplicationShardDirectoryManager peerShardManager = logGroup.createPeerShardManager(); + ReplicationShardDirectoryManager peerShardManager = logGroup.getOrCreatePeerShardManager(); log = logGroup.createReplicationLog(peerShardManager); log.init(); // no-op if the forwarder is already started diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java index 8c9edad21cb..0a5b5a48c66 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java @@ -43,7 +43,7 @@ protected SyncModeImpl(ReplicationLogGroup logGroup) { void onEnter() throws IOException { LOG.info("HAGroup {} entered mode {}", logGroup, this); // create a log on the standby cluster - ReplicationShardDirectoryManager peerShardManager = logGroup.createPeerShardManager(); + ReplicationShardDirectoryManager peerShardManager = logGroup.getOrCreatePeerShardManager(); log = logGroup.createReplicationLog(peerShardManager); log.init(); } diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java index c4d28a260f3..651cddde6c4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java @@ -124,7 +124,7 @@ private Map> groupLogsByTable() throws Exception { LogFileAnalyzer analyzer = new LogFileAnalyzer(); // use peer cluster conf analyzer.setConf(conf2); - Path standByLogDir = logGroup.createPeerShardManager().getRootDirectoryPath(); + Path standByLogDir = logGroup.getOrCreatePeerShardManager().getRootDirectoryPath(); LOG.info("Analyzing log files at {}", standByLogDir); String[] args = { "--check", standByLogDir.toString() }; assertEquals(0, analyzer.run(args)); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java index 2413a12f332..65fb764a31b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java @@ -183,9 +183,9 @@ public Object answer(InvocationOnMock invocation) { /** * Tests that the forwarder retries peer shard manager creation when the peer is initially - * unavailable. On the first attempt, createPeerShardManager throws; the file is marked failed and - * retried via in-progress processing. On the retry the peer becomes available and forwarding - * succeeds. + * unavailable. On the first attempt, getOrCreatePeerShardManager throws; the file is marked + * failed and retried via in-progress processing. On the retry the peer becomes available and + * forwarding succeeds. 
*/ @Test public void testForwarderRetriesPeerCreation() throws Exception { @@ -201,9 +201,9 @@ public void testForwarderRetriesPeerCreation() throws Exception { recreateLogGroup(); assertEquals(STORE_AND_FORWARD, logGroup.getMode()); - // Make createPeerShardManager fail on the first call, then succeed on subsequent calls + // Make getOrCreatePeerShardManager fail on the first call, then succeed on subsequent calls doThrow(new IOException("Peer namenode unavailable")).doCallRealMethod().when(logGroup) - .createPeerShardManager(); + .getOrCreatePeerShardManager(); doAnswer(new Answer() { @Override diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index 77bd8ce6b94..2bf723defd0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -1807,7 +1807,8 @@ public void testInitDegradesToSafWhenPeerUnavailable() throws Exception { ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName, haGroupStoreManager, useAlignedRotation())); - doThrow(new IOException("Standby namenode unavailable")).when(group).createPeerShardManager(); + doThrow(new IOException("Standby namenode unavailable")).when(group) + .getOrCreatePeerShardManager(); group.init(); try { @@ -1838,4 +1839,38 @@ public void testInitFailsWhenLocalUnavailable() throws Exception { assertTrue(e.getMessage().contains("Local namenode unavailable")); } } + + /** + * Tests that when peer shard manager creation exceeds the configured timeout, the group degrades + * to STORE_AND_FORWARD instead of blocking the disruptor handler thread indefinitely. 
+ */ + @Test + public void testInitDegradesToSafWhenPeerInitTimesOut() throws Exception { + final String tableName = "TBLTIMEOUT"; + final long commitId = 1L; + final Mutation put = LogFileTestUtil.newPut("row", 1, 1); + + // Set a very short peer init timeout + conf.setLong(ReplicationLogGroup.REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY, 100L); + + ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName, + haGroupStoreManager, useAlignedRotation())); + // Make createPeerShardManager block longer than the configured timeout + doAnswer(invocation -> { + Thread.sleep(5000); + return invocation.callRealMethod(); + }).when(group).createPeerShardManager(); + group.init(); + + try { + // Should have degraded to STORE_AND_FORWARD due to timeout + assertEquals(STORE_AND_FORWARD, group.getMode()); + + // Verify the group is functional + group.append(tableName, commitId, put); + group.sync(); + } finally { + group.close(); + } + } } From fedfdb445b9892431c99e98c9b6c639d6efefd5c Mon Sep 17 00:00:00 2001 From: tkhurana Date: Mon, 11 May 2026 13:40:35 -0700 Subject: [PATCH 3/3] PHOENIX-7845 Addendum: Fix peer shard manager async exception handling - Remove dead catch(UncheckedIOException) block; future.get() wraps supplier exceptions in ExecutionException, not UncheckedIOException - Unwrap UncheckedIOException inside ExecutionException handler to recover the original IOException - Cache the in-flight CompletableFuture to prevent spawning unbounded async tasks when peer HDFS is unavailable; reuse the pending future on timeout, retry only after exceptional completion - Remove ineffective future.cancel(true): CompletableFuture ignores mayInterruptIfRunning, so the call cannot stop the in-flight task --- .../replication/ReplicationLogGroup.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index a8461f0bd9e..ded1fec7b1b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -189,6 +189,7 @@ public class ReplicationLogGroup { protected HAGroupStoreRecord haGroupStoreRecord; protected ReplicationShardDirectoryManager localShardManager; protected volatile ReplicationShardDirectoryManager peerShardManager; + private CompletableFuture peerShardManagerFuture; protected ReplicationLogDiscoveryForwarder logForwarder; protected long syncTimeoutMs; protected long peerInitTimeoutMs; @@ -820,26 +821,29 @@ protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws if (peerShardManager != null) { return peerShardManager; } - CompletableFuture future = - CompletableFuture.supplyAsync(() -> { + if (peerShardManagerFuture == null || peerShardManagerFuture.isCompletedExceptionally()) { + // retry + peerShardManagerFuture = CompletableFuture.supplyAsync(() -> { try { return createPeerShardManager(); } catch (IOException e) { throw new UncheckedIOException(e); } }); + } try { - peerShardManager = future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS); + peerShardManager = peerShardManagerFuture.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS); return peerShardManager; - } catch (UncheckedIOException e) { - throw e.getCause(); } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); + Throwable cause = e.getCause(); + if (cause instanceof UncheckedIOException) { + throw ((UncheckedIOException) cause).getCause(); + } + if (cause instanceof IOException) { + throw (IOException) cause; } - throw new IOException("Failed to create peer shard manager", e.getCause()); + throw new IOException("Failed to create peer shard manager", cause); } catch (TimeoutException e) { - 
future.cancel(true); throw new IOException("Timed out creating peer shard manager after " + peerInitTimeoutMs + "ms for " + haGroupName, e); } catch (InterruptedException e) {