Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.phoenix.coprocessor;

import static org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED;

import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
Expand All @@ -30,7 +33,9 @@
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerMetadataCache;
import org.apache.phoenix.cache.ServerMetadataCacheImpl;
Expand All @@ -46,6 +51,7 @@
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.ReplicationLogGroup;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
Expand All @@ -61,6 +67,7 @@ public class PhoenixRegionServerEndpoint extends
private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
protected ServerName serverName;
private ExecutorService prewarmExecutor;

// regionserver level thread pool used by Uncovered Indexes to scan data table rows
Expand All @@ -69,6 +76,9 @@ public class PhoenixRegionServerEndpoint extends
@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.conf = env.getConfiguration();
if (env instanceof RegionServerCoprocessorEnvironment) {
this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName();
}
this.metricsSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
initUncoveredIndexThreadPool(this.conf);
Expand All @@ -89,6 +99,10 @@ public void start(CoprocessorEnvironment env) throws IOException {
public void stop(CoprocessorEnvironment env) throws IOException {
// Stop replication log replay
ReplicationLogReplayService.getInstance(conf).stop();
// Close all ReplicationLogGroup instances belonging to this server
if (serverName != null) {
ReplicationLogGroup.closeAll(serverName);
}
RegionServerCoprocessor.super.stop(env);
if (uncoveredIndexThreadPool != null) {
uncoveredIndexThreadPool
Expand Down Expand Up @@ -282,13 +296,26 @@ private void startHAGroupStoreClientPrewarming() {
}

// Phase 2: Prewarm individual HAGroupStoreClients with retry
// and eagerly initialize ReplicationLogGroup instances
boolean shouldInitReplicationLogGroup = serverName != null && conf
.getBoolean(SYNCHRONOUS_REPLICATION_ENABLED, DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED);
try {
while (!pending.isEmpty()) {
Iterator<String> iterator = pending.iterator();
while (iterator.hasNext()) {
String haGroup = iterator.next();
try {
manager.getClusterRoleRecord(haGroup);
if (shouldInitReplicationLogGroup) {
try {
ReplicationLogGroup.get(conf, serverName, haGroup);
LOGGER.info("Eagerly initialized ReplicationLogGroup {} on server {}", haGroup,
serverName);
} catch (Exception e) {
LOGGER.warn("Failed to eagerly initialize ReplicationLogGroup for HA group: {}."
+ " Will be lazily initialized on first mutation.", haGroup, e);
}
}
iterator.remove();
LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)", haGroup,
pending.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ public void close() {
if (closed) {
return;
}
LOG.info("Closing HAGroup {}", this);
// setting closed to true prevents future producers to add events to the ring buffer
closed = true;
// Remove from instances cache
Expand All @@ -652,6 +653,25 @@ public void close() {
}
}

/**
* Close all ReplicationLogGroup instances belonging to the given server. Called during
* RegionServer shutdown.
*/
/**
 * Closes every ReplicationLogGroup instance owned by the given region server. Invoked from the
 * RegionServer coprocessor's stop() hook during shutdown. A failure to close one group is logged
 * and does not prevent the remaining groups from being closed.
 * @param serverName the server whose log groups should be closed
 */
public static void closeAll(ServerName serverName) {
  LOG.info("Closing all ReplicationLogGroup instances for server {}, total count={}", serverName,
    INSTANCES.size());
  // Iterate over a snapshot: close() removes the group from INSTANCES, so iterating the live
  // view while closing would risk a ConcurrentModificationException / skipped entries.
  for (ReplicationLogGroup candidate : new ArrayList<>(INSTANCES.values())) {
    if (!candidate.serverName.equals(serverName)) {
      continue;
    }
    try {
      candidate.close();
    } catch (Exception e) {
      LOG.warn("Error closing ReplicationLogGroup for HA group: {}", candidate.haGroupName, e);
    }
  }
}

private void shutdownDisruptorExecutor() {
disruptorExecutor.shutdown();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
*/
package org.apache.phoenix.end2end;

import java.io.IOException;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.phoenix.cache.ServerMetadataCache;
import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;

Expand All @@ -29,13 +25,6 @@
* support keeping multiple cache instances.
*/
public class PhoenixRegionServerEndpointTestImpl extends PhoenixRegionServerEndpoint {
protected ServerName serverName;

@Override
public void start(CoprocessorEnvironment env) throws IOException {
super.start(env);
this.serverName = ((RegionServerCoprocessorEnvironment) env).getServerName();
}

@Override
public ServerMetadataCache getServerMetadataCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.phoenix.replication;

import static org.apache.phoenix.replication.ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY;
import static org.apache.phoenix.replication.ReplicationShardDirectoryManager.PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -106,7 +105,6 @@ public void setUpBase() throws Exception {
// small value of replication round duration
conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY,
TEST_REPLICATION_ROUND_DURATION_SECONDS);
conf.setDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY, 0.0);
overrideConf(conf);

// initialize the group store record
Expand Down Expand Up @@ -136,12 +134,16 @@ protected void recreateLogGroup() throws Exception {
}

/**
 * Creates, spies on, and initializes a TestableLogGroup for this test's HA group. The rotation
 * alignment behavior is controlled by {@link #useAlignedRotation()}.
 * @return an initialized Mockito spy of the log group under test
 * @throws Exception if group initialization fails
 */
private ReplicationLogGroup createAndInitLogGroup() throws Exception {
  // NOTE(review): the pasted diff contained both the old and new construction statements;
  // only the new-side statement (with the useAlignedRotation flag) is kept here.
  ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, haGroupName,
    haGroupStoreManager, useAlignedRotation()));
  group.init();
  return group;
}

/**
 * Hook controlling whether TestableLog uses the production-aligned initial delay for its
 * rotation task. Defaults to {@code false}, which makes TestableLog delay a full round so the
 * rotation task never fires early when a test starts near a round boundary. Subclasses override
 * to return {@code true} to exercise the production alignment.
 */
protected boolean useAlignedRotation() {
return false;
}

protected static void waitForRotationTick(int roundDurationSeconds) throws InterruptedException {
Thread.sleep((long) (roundDurationSeconds * 1000 * 1.25));
LOG.info("Waking up after waiting for rotation tick");
Expand All @@ -154,41 +156,48 @@ private HAGroupStoreRecord initHAGroupStoreRecord() {
}

static class TestableLogGroup extends ReplicationLogGroup {
private final boolean useAlignedRotation;

public TestableLogGroup(Configuration conf, ServerName serverName, String haGroupName,
HAGroupStoreManager haGroupStoreManager) {
HAGroupStoreManager haGroupStoreManager, boolean useAlignedRotation) {
super(conf, serverName, haGroupName, haGroupStoreManager);
this.useAlignedRotation = useAlignedRotation;
}

@Override
protected ReplicationLog createStandbyLog() throws IOException {
return spy(new TestableLog(this, peerShardManager));
return spy(new TestableLog(this, peerShardManager, useAlignedRotation));
}

@Override
protected ReplicationLog createFallbackLog() throws IOException {
return spy(new TestableLog(this, localShardManager));
return spy(new TestableLog(this, localShardManager, useAlignedRotation));
}

}

/**
* Testable version of ReplicationLog that allows spying on the log. Overrides
* startRotationExecutor to always use a full round as initial delay so that the rotation task
* never fires unexpectedly when a test happens to start near a round boundary.
* startRotationExecutor to use a configurable initial delay. By default uses a full round
* duration so the rotation task never fires early when the test happens to start near a round
* boundary. Set {@code useAlignedRotation} to true to use the production-aligned delay.
*/
static class TestableLog extends ReplicationLog {
private final boolean useAlignedRotation;

/**
 * Constructs a testable log over the given group and shard manager.
 * @param useAlignedRotation when true, use the production-aligned initial rotation delay;
 *          when false, delay a full round so the rotation task cannot fire early in tests
 */
public TestableLog(ReplicationLogGroup logGroup, ReplicationShardDirectoryManager shardManager,
  boolean useAlignedRotation) {
  // NOTE(review): the pasted diff interleaved the old two-arg and new three-arg signatures;
  // this is the new-side three-arg signature.
  super(logGroup, shardManager);
  this.useAlignedRotation = useAlignedRotation;
}

@Override
protected void startRotationExecutor() {
  // NOTE(review): the pasted diff left the old unconditional call interleaved with the new
  // conditional; reconstructed here as the new-side if/else.
  if (useAlignedRotation) {
    // Exercise the production scheduling path.
    super.startRotationExecutor();
  } else {
    // Use a full round as the initial delay so the rotation task never fires early when the
    // test happens to start close to a round boundary (e.g. initialDelay of 852ms on a 60s
    // round).
    super.startRotationExecutor(rotationTimeMs);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ protected void overrideConf(Configuration conf) {
conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, 20);
}

/**
 * Use the production-aligned initial rotation delay in this test class, instead of the
 * base-class default of delaying a full round.
 */
@Override
protected boolean useAlignedRotation() {
return true;
}

@Before
public void setUp() throws IOException {
ReplicationMode mode = logGroup.getMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,67 +1082,77 @@ public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOExc
.when(discovery)
.processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file3Prefix)));

// Process in-progress directory
discovery.processInProgressDirectory();
// Inject an advancing clock so that rename timestamps from markInProgress are always
// strictly before the renameTimestampThreshold computed on the next loop iteration
AtomicLong clock = new AtomicLong(EnvironmentEdgeManager.currentTime());
EnvironmentEdgeManager.injectEdge(clock::getAndIncrement);

// Verify that markInProgress was called 7 times (5 initially + 2 for retries)
Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class));
try {
// Process in-progress directory
discovery.processInProgressDirectory();

// Verify that markInProgress was called for each expected file
// Files 1 and 3 are called twice (initial attempt + retry), others once
for (int i = 0; i < allInProgressFiles.size(); i++) {
Path expectedFile = allInProgressFiles.get(i);
String expectedPrefix = extractPrefix(expectedFile.getName());
int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are retried
Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress(
Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that markInProgress was called 7 times (5 initially + 2 for retries)
Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class));

// Verify that processFile was called for each file in the directory (i.e. 5 + 2 times for
// failed once that would succeed in next retry)
Mockito.verify(discovery, Mockito.times(7)).processFile(Mockito.any(Path.class));
// Verify that markInProgress was called for each expected file
// Files 1 and 3 are called twice (initial attempt + retry), others once
for (int i = 0; i < allInProgressFiles.size(); i++) {
Path expectedFile = allInProgressFiles.get(i);
String expectedPrefix = extractPrefix(expectedFile.getName());
int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are retried
Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress(
Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix)));
}

// Verify that processFile was called for each specific file (using prefix matching)
// Files 1 and 3 should be called twice (fail once, succeed on retry), others once
for (int i = 0; i < allInProgressFiles.size(); i++) {
Path expectedFile = allInProgressFiles.get(i);
String expectedPrefix = extractPrefix(expectedFile.getName());
int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are called twice (fail +
// retry success)
Mockito.verify(discovery, Mockito.times(expectedTimes))
.processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix)));
}
// Verify that processFile was called for each file in the directory (i.e. 5 + 2 times for
// failed once that would succeed in next retry)
Mockito.verify(discovery, Mockito.times(7)).processFile(Mockito.any(Path.class));

// Verify that markCompleted was called for each successfully processed file
Mockito.verify(fileTracker, Mockito.times(5)).markCompleted(Mockito.any(Path.class));
// Verify that processFile was called for each specific file (using prefix matching)
// Files 1 and 3 should be called twice (fail once, succeed on retry), others once
for (int i = 0; i < allInProgressFiles.size(); i++) {
Path expectedFile = allInProgressFiles.get(i);
String expectedPrefix = extractPrefix(expectedFile.getName());
int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are called twice (fail +
// retry success)
Mockito.verify(discovery, Mockito.times(expectedTimes)).processFile(
Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix)));
}

// Verify that markCompleted was called for 2 intermittent failed processed file
Mockito.verify(fileTracker, Mockito.times(2)).markFailed(Mockito.any(Path.class));
// Verify that markCompleted was called for each successfully processed file
Mockito.verify(fileTracker, Mockito.times(5)).markCompleted(Mockito.any(Path.class));

// Verify that markFailed was called once ONLY for failed files
String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName());
Mockito.verify(fileTracker, Mockito.times(1))
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix1)));
String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName());
Mockito.verify(fileTracker, Mockito.times(1))
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix3)));
// Verify that markCompleted was called for 2 intermittent failed processed file
Mockito.verify(fileTracker, Mockito.times(2)).markFailed(Mockito.any(Path.class));

// Verify that markFailed was NOT called for files processed successfully in first iteration
String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName());
Mockito.verify(fileTracker, Mockito.never())
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix0)));
String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName());
Mockito.verify(fileTracker, Mockito.never())
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix2)));
String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName());
Mockito.verify(fileTracker, Mockito.never())
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix4)));
// Verify that markFailed was called once ONLY for failed files
String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName());
Mockito.verify(fileTracker, Mockito.times(1))
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix1)));
String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName());
Mockito.verify(fileTracker, Mockito.times(1))
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix3)));

// Verify that markCompleted was called for each successfully processed file with correct paths
for (Path expectedFile : allInProgressFiles) {
String expectedPrefix = extractPrefix(expectedFile.getName());
Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix)));
// Verify that markFailed was NOT called for files processed successfully in first iteration
String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName());
Mockito.verify(fileTracker, Mockito.never())
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix0)));
String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName());
Mockito.verify(fileTracker, Mockito.never())
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix2)));
String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName());
Mockito.verify(fileTracker, Mockito.never())
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix4)));

// Verify that markCompleted was called for each successfully processed file with correct
// paths
for (Path expectedFile : allInProgressFiles) {
String expectedPrefix = extractPrefix(expectedFile.getName());
Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(
Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix)));
}
} finally {
EnvironmentEdgeManager.reset();
}
}

Expand Down