@@ -683,7 +683,7 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
* @return HA group if present or empty if missing
*/
private Optional<ReplicationLogGroup> getHAGroupFromBatch(RegionCoprocessorEnvironment env,
-      MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (miniBatchOp.size() > 0) {
Mutation m = miniBatchOp.getOperation(0);
byte[] haGroupName = m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
@@ -702,7 +702,7 @@ private Optional<ReplicationLogGroup> getHAGroupFromWALKey(RegionCoprocessorEnvir
* @return HA group if present or empty if missing
*/
private Optional<ReplicationLogGroup> getHAGroupFromWALKey(RegionCoprocessorEnvironment env,
-      org.apache.hadoop.hbase.wal.WALKey logKey) {
+      org.apache.hadoop.hbase.wal.WALKey logKey) throws IOException {
byte[] haGroupName =
logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
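For context, a hedged sketch (not part of this diff) of how a client-side mutation might carry the HA group attribute that getHAGroupFromBatch and getHAGroupFromWALKey resolve. The import path for the constants class and the group name are assumptions:

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
// Package assumed; HA_GROUP_NAME_ATTRIB is the constant referenced in the diff above.
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;

public class HaGroupTagSketch {
  // Hypothetical helper: tags a Put with the HA group name attribute. The coprocessor's
  // getHAGroupFromBatch() reads this attribute from the first mutation in the batch.
  static Put tagWithHaGroup(Put put, String haGroupName) {
    put.setAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB,
      Bytes.toBytes(haGroupName));
    return put;
  }
}
```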
@@ -57,6 +57,12 @@ public class ReplicationLogDiscoveryForwarder extends ReplicationLogDiscovery {
public static final String REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY =
"phoenix.replication.forwarder.waiting.buffer.percentage";

/**
* Configuration key for in-progress directory processing probability (percentage)
*/
public static final String REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY =
"phoenix.replication.forwarder.in.progress.processing.probability";

private final ReplicationLogGroup logGroup;
private final double copyThroughputThresholdBytesPerMs;
// the timestamp (in future) at which we will attempt to set the HAGroup state to SYNC
@@ -134,7 +140,7 @@ protected void processFile(Path src) throws IOException {
FileSystem srcFS = replicationLogTracker.getFileSystem();
FileStatus srcStat = srcFS.getFileStatus(src);
long ts = replicationLogTracker.getFileTimestamp(srcStat.getPath());
-    ReplicationShardDirectoryManager remoteShardManager = logGroup.getPeerShardManager();
+    ReplicationShardDirectoryManager remoteShardManager = logGroup.getOrCreatePeerShardManager();
Path dst = remoteShardManager.getWriterPath(ts, logGroup.getServerName().getServerName());
long startTime = EnvironmentEdgeManager.currentTimeMillis();
FileUtil.copy(srcFS, srcStat, remoteShardManager.getFileSystem(), dst, false, false, conf);
@@ -227,4 +233,10 @@ public double getWaitingBufferPercentage() {
return getConf().getDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY,
DEFAULT_WAITING_BUFFER_PERCENTAGE);
}

@Override
public double getInProgressDirectoryProcessProbability() {
return getConf().getDouble(REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
super.getInProgressDirectoryProcessProbability());
}
}
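A minimal usage sketch for the new key, mirroring the test later in this diff; forcing 100% makes in-progress retries deterministic in tests:

```java
import org.apache.hadoop.conf.Configuration;

public class ForwarderConfigSketch {
  // Force in-progress directory processing on every discovery run. The value is a
  // percentage; when unset, the forwarder falls back to the probability returned by
  // ReplicationLogDiscovery#getInProgressDirectoryProcessProbability().
  static Configuration alwaysProcessInProgress(Configuration conf) {
    conf.setDouble(
      ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
      100.0);
    return conf;
  }
}
```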
@@ -36,6 +36,7 @@
import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -165,6 +166,9 @@ public class ReplicationLogGroup {
public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
"phoenix.replication.log.retry.delay.ms";
public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
public static final String REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY =
"phoenix.replication.log.peer.init.timeout.ms";
public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 10_000L;
public static final String WAL_SYNC_TIMEOUT_MS_KEY = "hbase.regionserver.wal.sync.timeout";
public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
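A hedged sketch of tuning the new peer-init timeout; the helper and its 30-second argument are illustrative, while the shipped default above is 10 seconds:

```java
import org.apache.hadoop.conf.Configuration;

public class PeerInitTimeoutSketch {
  // Allow a slower peer namenode more time before getOrCreatePeerShardManager() gives up.
  // Illustrative usage; when unset, DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS (10s) applies.
  static Configuration withPeerInitTimeout(Configuration conf, long timeoutMs) {
    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY, timeoutMs);
    return conf;
  }
}
```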

@@ -180,10 +184,14 @@ public class ReplicationLogGroup {
protected final String haGroupName;
protected final HAGroupStoreManager haGroupStoreManager;
protected final MetricsReplicationLogGroupSource metrics;
-  protected ReplicationShardDirectoryManager peerShardManager;
+  // Cached at init time — HDFS URLs (local and peer) are fixed for the lifetime of this group.
+  // URL changes require RS restart.
+  protected HAGroupStoreRecord haGroupStoreRecord;
   protected ReplicationShardDirectoryManager localShardManager;
+  protected volatile ReplicationShardDirectoryManager peerShardManager;
   protected ReplicationLogDiscoveryForwarder logForwarder;
   protected long syncTimeoutMs;
+  protected long peerInitTimeoutMs;
protected volatile boolean closed = false;

/**
@@ -331,20 +339,24 @@ public Record(String tableName, long commitId, Mutation mutation) {
* @param serverName The server name
* @param haGroupName The HA Group name
* @return ReplicationLogGroup instance
-   * @throws RuntimeException if initialization fails
+   * @throws IOException if initialization fails
*/
public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
-      String haGroupName) {
-    return INSTANCES.computeIfAbsent(haGroupName, k -> {
-      try {
-        ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
-        group.init();
-        return group;
-      } catch (IOException e) {
-        LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
-        throw new RuntimeException(e);
-      }
-    });
+      String haGroupName) throws IOException {
+    try {
+      return INSTANCES.computeIfAbsent(haGroupName, k -> {
+        try {
+          ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
+          group.init();
+          return group;
+        } catch (IOException e) {
+          LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
+          throw new UncheckedIOException(e);
+        }
+      });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
}

/**
@@ -354,21 +366,25 @@ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
* @param haGroupName The HA Group name
* @param haGroupStoreManager HA Group Store Manager instance
* @return ReplicationLogGroup instance
-   * @throws RuntimeException if initialization fails
+   * @throws IOException if initialization fails
*/
public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
-      String haGroupName, HAGroupStoreManager haGroupStoreManager) {
-    return INSTANCES.computeIfAbsent(haGroupName, k -> {
-      try {
-        ReplicationLogGroup group =
-          new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager);
-        group.init();
-        return group;
-      } catch (IOException e) {
-        LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
-        throw new RuntimeException(e);
-      }
-    });
+      String haGroupName, HAGroupStoreManager haGroupStoreManager) throws IOException {
+    try {
+      return INSTANCES.computeIfAbsent(haGroupName, k -> {
+        try {
+          ReplicationLogGroup group =
+            new ReplicationLogGroup(conf, serverName, haGroupName, haGroupStoreManager);
+          group.init();
+          return group;
+        } catch (IOException e) {
+          LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
+          throw new UncheckedIOException(e);
+        }
+      });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
}
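A caller-side sketch of the new checked-exception contract; the wrapper class and group name are illustrative:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;

public class LogGroupCallerSketch {
  // After this change, initialization failures surface as a checked IOException
  // (unwrapped from the UncheckedIOException thrown inside computeIfAbsent) rather
  // than an opaque RuntimeException, so callers can handle or rethrow them explicitly.
  static ReplicationLogGroup open(Configuration conf, ServerName server) throws IOException {
    return ReplicationLogGroup.get(conf, server, "my-ha-group");
  }
}
```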

/**
Expand Down Expand Up @@ -421,9 +437,10 @@ protected void init() throws IOException {
throw new IOException(message);
}
HAGroupStoreRecord record = haRecord.get();
-    // First initialize the shard managers
-    this.peerShardManager = createPeerShardManager(record);
-    this.localShardManager = createLocalShardManager(record);
+    this.haGroupStoreRecord = record;
+    this.localShardManager = createLocalShardManager();
+    this.peerInitTimeoutMs = conf.getLong(REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY,
+      DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS);
// Initialize the replication log forwarder. The log forwarder is only activated when
// we switch to STORE_AND_FORWARD or SYNC_AND_FORWARD mode
this.logForwarder = new ReplicationLogDiscoveryForwarder(this);
@@ -783,21 +800,58 @@ private ReplicationShardDirectoryManager createShardManager(String uri, String l
}
}

-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager createPeerShardManager(HAGroupStoreRecord record)
-      throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
}

-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager createLocalShardManager(HAGroupStoreRecord record)
-      throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {
+      if (peerShardManager != null) {
+        return peerShardManager;
+      }
+      CompletableFuture<ReplicationShardDirectoryManager> future =
+        CompletableFuture.supplyAsync(() -> {
+          try {
+            return createPeerShardManager();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+      try {
+        peerShardManager = future.get(peerInitTimeoutMs, TimeUnit.MILLISECONDS);
+        return peerShardManager;
+      } catch (UncheckedIOException e) {
+        throw e.getCause();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new IOException("Failed to create peer shard manager", e.getCause());
+      } catch (TimeoutException e) {
+        future.cancel(true);
+        throw new IOException("Timed out creating peer shard manager after " + peerInitTimeoutMs
+          + "ms for " + haGroupName, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while creating peer shard manager", e);
+      }
+    }
+  }

-  /** return shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager getPeerShardManager() {
-    return peerShardManager;
+  /** Create a new peer shard manager for the standby cluster */
+  protected ReplicationShardDirectoryManager createPeerShardManager() throws IOException {
+    return createShardManager(haGroupStoreRecord.getPeerHdfsUrl(), STANDBY_DIR);
}
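The bounded-initialization pattern above in isolation, as a hedged generic sketch (all names illustrative). Note that a failure inside supplyAsync reaches the caller wrapped in ExecutionException, which is why the unwrap below goes through getCause():

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedInitSketch {
  interface IOSupplier<T> {
    T get() throws IOException;
  }

  // Run a potentially blocking factory call with a deadline, so the calling thread
  // (for example a disruptor event handler) fails fast instead of hanging on an outage.
  static <T> T createWithTimeout(IOSupplier<T> factory, long timeoutMs) throws IOException {
    CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
      try {
        return factory.get();
      } catch (IOException e) {
        throw new UncheckedIOException(e); // tunnel the checked exception out of the lambda
      }
    });
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true);
      throw new IOException("Initialization timed out after " + timeoutMs + "ms", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted during initialization", e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof UncheckedIOException) {
        throw ((UncheckedIOException) cause).getCause(); // the original IOException
      }
      throw new IOException("Initialization failed", cause);
    }
  }
}
```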

/** return shard manager for the fallback cluster */
@@ -809,14 +863,10 @@ private FileSystem getFileSystem(URI uri) throws IOException {
return FileSystem.get(uri, conf);
}

-  /** Create the standby(synchronous) writer */
-  protected ReplicationLog createStandbyLog() throws IOException {
-    return new ReplicationLog(this, peerShardManager);
-  }
-
-  /** Create the fallback writer */
-  protected ReplicationLog createFallbackLog() throws IOException {
-    return new ReplicationLog(this, localShardManager);
+  /** Create a replication log using the given shard manager */
+  protected ReplicationLog createReplicationLog(ReplicationShardDirectoryManager shardManager)
+      throws IOException {
+    return new ReplicationLog(this, shardManager);
}

/** Returns the log forwarder for this replication group */
@@ -55,7 +55,7 @@ protected StoreAndForwardModeImpl(ReplicationLogGroup logGroup) {
void onEnter() throws IOException {
LOG.info("HAGroup {} entered mode {}", logGroup, this);
// create a log on the fallback cluster
-    log = logGroup.createFallbackLog();
+    log = logGroup.createReplicationLog(logGroup.getLocalShardManager());
log.init();
// Schedule task to periodically set the HAGroupStore state to ACTIVE_NOT_IN_SYNC
startHAGroupStoreUpdateTask();
@@ -44,7 +44,8 @@ protected SyncAndForwardModeImpl(ReplicationLogGroup logGroup) {
void onEnter() throws IOException {
LOG.info("HAGroup {} entered mode {}", logGroup, this);
// create a log on the standby cluster
-    log = logGroup.createStandbyLog();
+    ReplicationShardDirectoryManager peerShardManager = logGroup.getOrCreatePeerShardManager();
+    log = logGroup.createReplicationLog(peerShardManager);
log.init();
// no-op if the forwarder is already started
logGroup.getLogForwarder().start();
@@ -43,7 +43,8 @@ protected SyncModeImpl(ReplicationLogGroup logGroup) {
void onEnter() throws IOException {
LOG.info("HAGroup {} entered mode {}", logGroup, this);
// create a log on the standby cluster
-    log = logGroup.createStandbyLog();
+    ReplicationShardDirectoryManager peerShardManager = logGroup.getOrCreatePeerShardManager();
+    log = logGroup.createReplicationLog(peerShardManager);
log.init();
}

@@ -124,7 +124,7 @@ private Map<String, List<Mutation>> groupLogsByTable() throws Exception {
LogFileAnalyzer analyzer = new LogFileAnalyzer();
// use peer cluster conf
analyzer.setConf(conf2);
-    Path standByLogDir = logGroup.getPeerShardManager().getRootDirectoryPath();
+    Path standByLogDir = logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
LOG.info("Analyzing log files at {}", standByLogDir);
String[] args = { "--check", standByLogDir.toString() };
assertEquals(0, analyzer.run(args));
@@ -165,13 +165,9 @@ public TestableLogGroup(Configuration conf, ServerName serverName, String haGrou
}

@Override
-  protected ReplicationLog createStandbyLog() throws IOException {
-    return spy(new TestableLog(this, peerShardManager, useAlignedRotation));
-  }
-
-  @Override
-  protected ReplicationLog createFallbackLog() throws IOException {
-    return spy(new TestableLog(this, localShardManager, useAlignedRotation));
+  protected ReplicationLog createReplicationLog(ReplicationShardDirectoryManager shardManager)
+      throws IOException {
+    return spy(new TestableLog(this, shardManager, useAlignedRotation));
}

}
@@ -25,6 +25,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;

import java.io.IOException;
import java.util.concurrent.Callable;
@@ -179,4 +180,56 @@ public Object answer(InvocationOnMock invocation) {
}
assertEquals(SYNC, logGroup.getMode());
}

/**
* Tests that the forwarder retries peer shard manager creation when the peer is initially
* unavailable. On the first attempt, getOrCreatePeerShardManager throws; the file is marked
* failed and retried via in-progress processing. On the retry the peer becomes available and
* forwarding succeeds.
*/
@Test
public void testForwarderRetriesPeerCreation() throws Exception {
final String tableName = "TBLFWDRETRY";
final long count = 10L;

// Ensure in-progress files are immediately eligible for retry and always processed
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 0);
conf.setDouble(
ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
100.0);
// Recreate the log group with the updated config
recreateLogGroup();
assertEquals(STORE_AND_FORWARD, logGroup.getMode());

// Make getOrCreatePeerShardManager fail on the first call, then succeed on subsequent calls
doThrow(new IOException("Peer namenode unavailable")).doCallRealMethod().when(logGroup)
.getOrCreatePeerShardManager();

doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
logGroup.setMode(SYNC);
try {
logGroup.sync();
} catch (IOException e) {
throw new RuntimeException(e);
}
return 0L;
}
}).when(haGroupStoreManager).setHAGroupStatusToSync(haGroupName);

// Write some data so the forwarder has files to process
for (long id = 1; id <= count; ++id) {
Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2);
logGroup.append(tableName, id, put);
}
logGroup.sync();

// Wait for the forwarder to eventually succeed after retrying peer creation
long deadline = EnvironmentEdgeManager.currentTimeMillis() + 120_000;
while (logGroup.getMode() != SYNC && EnvironmentEdgeManager.currentTimeMillis() < deadline) {
Thread.sleep(500);
}
assertEquals(SYNC, logGroup.getMode());
}
}