From 9b1bb8311499567550d52b0bd612459a39ccaf69 Mon Sep 17 00:00:00 2001 From: tkhurana Date: Tue, 5 May 2026 17:13:30 -0700 Subject: [PATCH] PHOENIX-7769 Initialize and Close replication log groups as part of RS startup and shutdown --- .../PhoenixRegionServerEndpoint.java | 27 +++++ .../replication/ReplicationLogGroup.java | 20 +++ .../PhoenixRegionServerEndpointTestImpl.java | 11 -- .../replication/ReplicationLogBaseTest.java | 37 +++--- .../ReplicationLogDiscoveryForwarderTest.java | 5 + .../ReplicationLogDiscoveryTest.java | 114 ++++++++++-------- 6 files changed, 137 insertions(+), 77 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java index cdb80df9267..3f35b40cef8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.coprocessor; +import static org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED; + import com.google.protobuf.ByteString; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; @@ -30,7 +33,9 @@ import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.ServerMetadataCache; import org.apache.phoenix.cache.ServerMetadataCacheImpl; @@ -46,6 +51,7 @@ import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.replication.ReplicationLogGroup; import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.util.ClientUtil; import org.apache.phoenix.util.SchemaUtil; @@ -61,6 +67,7 @@ public class PhoenixRegionServerEndpoint extends private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class); private MetricsMetadataCachingSource metricsSource; protected Configuration conf; + protected ServerName serverName; private ExecutorService prewarmExecutor; // regionserver level thread pool used by Uncovered Indexes to scan data table rows @@ -69,6 +76,9 @@ public class PhoenixRegionServerEndpoint extends @Override public void start(CoprocessorEnvironment env) throws IOException { this.conf = env.getConfiguration(); + if (env instanceof RegionServerCoprocessorEnvironment) { + this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName(); + } this.metricsSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource(); initUncoveredIndexThreadPool(this.conf); @@ -89,6 +99,10 @@ public void start(CoprocessorEnvironment env) throws IOException { public void stop(CoprocessorEnvironment env) throws IOException { // Stop replication log replay ReplicationLogReplayService.getInstance(conf).stop(); + // Close all ReplicationLogGroup instances belonging to this server + if (serverName != null) { + ReplicationLogGroup.closeAll(serverName); + } RegionServerCoprocessor.super.stop(env); if (uncoveredIndexThreadPool != null) { uncoveredIndexThreadPool @@ -282,6 +296,9 @@ private void startHAGroupStoreClientPrewarming() { } // Phase 2: Prewarm individual HAGroupStoreClients with retry + // and eagerly initialize ReplicationLogGroup instances + boolean shouldInitReplicationLogGroup = serverName != null && conf + .getBoolean(SYNCHRONOUS_REPLICATION_ENABLED, DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED); try { while (!pending.isEmpty()) { Iterator iterator = pending.iterator(); @@ -289,6 +306,16 @@ private void startHAGroupStoreClientPrewarming() { String haGroup = iterator.next(); try { manager.getClusterRoleRecord(haGroup); + if (shouldInitReplicationLogGroup) { + try { + ReplicationLogGroup.get(conf, serverName, haGroup); + LOGGER.info("Eagerly initialized ReplicationLogGroup {} on server {}", haGroup, + serverName); + } catch (Exception e) { + LOGGER.warn("Failed to eagerly initialize ReplicationLogGroup for HA group: {}." + + " Will be lazily initialized on first mutation.", haGroup, e); + } + } iterator.remove(); LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)", haGroup, pending.size()); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java index d4ac31b4931..c88d9b2647b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java @@ -630,6 +630,7 @@ public void close() { if (closed) { return; } + LOG.info("Closing HAGroup {}", this); // setting closed to true prevents future producers to add events to the ring buffer closed = true; // Remove from instances cache @@ -652,6 +653,25 @@ public void close() { } } + /** + * Close all ReplicationLogGroup instances belonging to the given server. Called during + * RegionServer shutdown. + */ + public static void closeAll(ServerName serverName) { + LOG.info("Closing all ReplicationLogGroup instances for server {}, total count={}", serverName, + INSTANCES.size()); + List groups = new ArrayList<>(INSTANCES.values()); + for (ReplicationLogGroup group : groups) { + if (group.serverName.equals(serverName)) { + try { + group.close(); + } catch (Exception e) { + LOG.warn("Error closing ReplicationLogGroup for HA group: {}", group.haGroupName, e); + } + } + } + } + private void shutdownDisruptorExecutor() { disruptorExecutor.shutdown(); try { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java index eda9c73c1ef..a0a6e657966 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointTestImpl.java @@ -17,10 +17,6 @@ */ package org.apache.phoenix.end2end; -import java.io.IOException; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.phoenix.cache.ServerMetadataCache; import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint; @@ -29,13 +25,6 @@ * support keeping multiple cache instances. */ public class PhoenixRegionServerEndpointTestImpl extends PhoenixRegionServerEndpoint { - protected ServerName serverName; - - @Override - public void start(CoprocessorEnvironment env) throws IOException { - super.start(env); - this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName(); - } @Override public ServerMetadataCache getServerMetadataCache() { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java index 063020523e3..f748e3d1a8c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java @@ -17,7 +17,6 @@ */ package org.apache.phoenix.replication; -import static org.apache.phoenix.replication.ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY; import static org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -106,7 +105,6 @@ public void setUpBase() throws Exception { // small value of replication round duration conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, TEST_REPLICATION_ROUND_DURATION_SECONDS); - conf.setDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY, 0.0); overrideConf(conf); // initialize the group store record @@ -136,12 +134,16 @@ protected void recreateLogGroup() throws Exception { } private ReplicationLogGroup createAndInitLogGroup() throws Exception { - ReplicationLogGroup group = - spy(new TestableLogGroup(conf, serverName, haGroupName, haGroupStoreManager)); + ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName, + haGroupStoreManager, useAlignedRotation())); group.init(); return group; } + protected boolean useAlignedRotation() { + return false; + } + protected static void waitForRotationTick(int roundDurationSeconds) throws InterruptedException { Thread.sleep((long) (roundDurationSeconds * 1000 * 1.25)); LOG.info("Waking up after waiting for rotation tick"); @@ -154,41 +156,48 @@ private HAGroupStoreRecord initHAGroupStoreRecord() { } static class TestableLogGroup extends ReplicationLogGroup { + private final boolean useAlignedRotation; public TestableLogGroup(Configuration conf, ServerName serverName, String haGroupName, - HAGroupStoreManager haGroupStoreManager) { + HAGroupStoreManager haGroupStoreManager, boolean useAlignedRotation) { super(conf, serverName, haGroupName, haGroupStoreManager); + this.useAlignedRotation = useAlignedRotation; } @Override protected ReplicationLog createStandbyLog() throws IOException { - return spy(new TestableLog(this, peerShardManager)); + return spy(new TestableLog(this, peerShardManager, useAlignedRotation)); } @Override protected ReplicationLog createFallbackLog() throws IOException { - return spy(new TestableLog(this, localShardManager)); + return spy(new TestableLog(this, localShardManager, useAlignedRotation)); } } /** * Testable version of ReplicationLog that allows spying on the log. Overrides - * startRotationExecutor to always use a full round as initial delay so that the rotation task - * never fires unexpectedly when a test happens to start near a round boundary. + * startRotationExecutor to use a configurable initial delay. By default uses a full round + * duration so the rotation task never fires early when the test happens to start near a round + * boundary. Set {@code useAlignedRotation} to true to use the production-aligned delay. */ static class TestableLog extends ReplicationLog { + private final boolean useAlignedRotation; - public TestableLog(ReplicationLogGroup logGroup, - ReplicationShardDirectoryManager shardManager) { + public TestableLog(ReplicationLogGroup logGroup, ReplicationShardDirectoryManager shardManager, + boolean useAlignedRotation) { super(logGroup, shardManager); + this.useAlignedRotation = useAlignedRotation; } @Override protected void startRotationExecutor() { - // Use a full round as the initial delay so the rotation task never fires early when the - // test happens to start close to a round boundary (e.g. initialDelay of 852ms on a 60s round) - super.startRotationExecutor(rotationTimeMs); + if (useAlignedRotation) { + super.startRotationExecutor(); + } else { + super.startRotationExecutor(rotationTimeMs); + } } @Override diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java index 770956b70dc..46efbf196e4 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java @@ -62,6 +62,11 @@ protected void overrideConf(Configuration conf) { conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20); } + @Override + protected boolean useAlignedRotation() { + return true; + } + @Before public void setUp() throws IOException { ReplicationMode mode = logGroup.getMode(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java index 6ed7d8723fb..4af410e56e3 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java @@ -1082,67 +1082,77 @@ public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOExc .when(discovery) .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file3Prefix))); - // Process in-progress directory - discovery.processInProgressDirectory(); + // Inject an advancing clock so that rename timestamps from markInProgress are always + // strictly before the renameTimestampThreshold computed on the next loop iteration + AtomicLong clock = new AtomicLong(EnvironmentEdgeManager.currentTime()); + EnvironmentEdgeManager.injectEdge(clock::getAndIncrement); - // Verify that markInProgress was called 7 times (5 initially + 2 for retries) - Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class)); + try { + // Process in-progress directory + discovery.processInProgressDirectory(); - // Verify that markInProgress was called for each expected file - // Files 1 and 3 are called twice (initial attempt + retry), others once - for (int i = 0; i < allInProgressFiles.size(); i++) { - Path expectedFile = allInProgressFiles.get(i); - String expectedPrefix = extractPrefix(expectedFile.getName()); - int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are retried - Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress( - Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); - } + // Verify that markInProgress was called 7 times (5 initially + 2 for retries) + Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class)); - // Verify that processFile was called for each file in the directory (i.e. 5 + 2 times for - // failed once that would succeed in next retry) - Mockito.verify(discovery, Mockito.times(7)).processFile(Mockito.any(Path.class)); + // Verify that markInProgress was called for each expected file + // Files 1 and 3 are called twice (initial attempt + retry), others once + for (int i = 0; i < allInProgressFiles.size(); i++) { + Path expectedFile = allInProgressFiles.get(i); + String expectedPrefix = extractPrefix(expectedFile.getName()); + int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are retried + Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } - // Verify that processFile was called for each specific file (using prefix matching) - // Files 1 and 3 should be called twice (fail once, succeed on retry), others once - for (int i = 0; i < allInProgressFiles.size(); i++) { - Path expectedFile = allInProgressFiles.get(i); - String expectedPrefix = extractPrefix(expectedFile.getName()); - int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are called twice (fail + - // retry success) - Mockito.verify(discovery, Mockito.times(expectedTimes)) - .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); - } + // Verify that processFile was called for each file in the directory (i.e. 5 + 2 times for + // failed once that would succeed in next retry) + Mockito.verify(discovery, Mockito.times(7)).processFile(Mockito.any(Path.class)); - // Verify that markCompleted was called for each successfully processed file - Mockito.verify(fileTracker, Mockito.times(5)).markCompleted(Mockito.any(Path.class)); + // Verify that processFile was called for each specific file (using prefix matching) + // Files 1 and 3 should be called twice (fail once, succeed on retry), others once + for (int i = 0; i < allInProgressFiles.size(); i++) { + Path expectedFile = allInProgressFiles.get(i); + String expectedPrefix = extractPrefix(expectedFile.getName()); + int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are called twice (fail + + // retry success) + Mockito.verify(discovery, Mockito.times(expectedTimes)).processFile( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } - // Verify that markCompleted was called for 2 intermittent failed processed file - Mockito.verify(fileTracker, Mockito.times(2)).markFailed(Mockito.any(Path.class)); + // Verify that markCompleted was called for each successfully processed file + Mockito.verify(fileTracker, Mockito.times(5)).markCompleted(Mockito.any(Path.class)); - // Verify that markFailed was called once ONLY for failed files - String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName()); - Mockito.verify(fileTracker, Mockito.times(1)) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix1))); - String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName()); - Mockito.verify(fileTracker, Mockito.times(1)) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix3))); + // Verify that markCompleted was called for 2 intermittent failed processed file + Mockito.verify(fileTracker, Mockito.times(2)).markFailed(Mockito.any(Path.class)); - // Verify that markFailed was NOT called for files processed successfully in first iteration - String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName()); - Mockito.verify(fileTracker, Mockito.never()) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix0))); - String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName()); - Mockito.verify(fileTracker, Mockito.never()) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix2))); - String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName()); - Mockito.verify(fileTracker, Mockito.never()) - .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix4))); + // Verify that markFailed was called once ONLY for failed files + String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName()); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix1))); + String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName()); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix3))); - // Verify that markCompleted was called for each successfully processed file with correct paths - for (Path expectedFile : allInProgressFiles) { - String expectedPrefix = extractPrefix(expectedFile.getName()); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( - Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + // Verify that markFailed was NOT called for files processed successfully in first iteration + String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix0))); + String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix2))); + String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix4))); + + // Verify that markCompleted was called for each successfully processed file with correct + // paths + for (Path expectedFile : allInProgressFiles) { + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + } finally { + EnvironmentEdgeManager.reset(); } }