Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ public static HRegionLocation getRegionLocation(Connection connection, byte[] re
}
Get get = new Get(row);
get.addFamily(HConstants.CATALOG_FAMILY);
get.setPriority(HConstants.INTERNAL_READ_QOS);
Result r = get(getMetaHTable(connection), get);
RegionLocations locations = getRegionLocations(r);
return locations == null
Expand All @@ -310,6 +311,7 @@ public static Result getCatalogFamilyRow(Connection connection, RegionInfo ri)
throws IOException {
Get get = new Get(getMetaKeyForRegion(ri));
get.addFamily(HConstants.CATALOG_FAMILY);
get.setPriority(HConstants.INTERNAL_READ_QOS);
return get(getMetaHTable(connection), get);
}

Expand Down Expand Up @@ -339,9 +341,7 @@ public static RegionInfo parseRegionInfoFromRegionName(byte[] regionName) throws
*/
public static Result getRegionResult(Connection connection, RegionInfo regionInfo)
throws IOException {
Get get = new Get(getMetaKeyForRegion(regionInfo));
get.addFamily(HConstants.CATALOG_FAMILY);
return get(getMetaHTable(connection), get);
return getCatalogFamilyRow(connection, regionInfo);
}

/**
Expand Down Expand Up @@ -576,6 +576,7 @@ private static Scan getMetaScan(Configuration conf, int rowUpperLimit) {
scan.setReadType(Scan.ReadType.PREAD);
}
scan.setCaching(scannerCaching);
scan.setPriority(HConstants.INTERNAL_READ_QOS);
return scan;
}

Expand Down Expand Up @@ -776,9 +777,11 @@ private static void scanMeta(Connection connection, @Nullable final byte[] start
}

if (LOG.isTraceEnabled()) {
LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(startRow)
+ " stopping at row=" + Bytes.toStringBinary(stopRow) + " for max=" + rowUpperLimit
+ " with caching=" + scan.getCaching());
LOG.trace(
"Scanning META starting at row={} stopping at row={} for max={} with caching={} "
+ "priority={}",
Bytes.toStringBinary(startRow), Bytes.toStringBinary(stopRow), rowUpperLimit,
scan.getCaching(), scan.getPriority());
}

int currentRow = 0;
Expand Down Expand Up @@ -1774,7 +1777,7 @@ private static void updateLocation(Connection connection, RegionInfo regionInfo,
addRegionInfo(put, regionInfo);
addLocation(put, sn, openSeqNum, regionInfo.getReplicaId());
putToMetaTable(connection, put);
LOG.info("Updated row {} with server=", regionInfo.getRegionNameAsString(), sn);
LOG.info("Updated row {} with server = {}", regionInfo.getRegionNameAsString(), sn);
}

/**
Expand Down Expand Up @@ -1899,7 +1902,7 @@ public static Put addLocation(Put p, ServerName sn, long openSeqNum, int replica
.setType(Cell.Type.Put).setValue(Bytes.toBytes(sn.getAddress().toString())).build())
.add(builder.clear().setRow(p.getRow()).setFamily(getCatalogFamily())
.setQualifier(getStartCodeColumn(replicaId)).setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put).setValue(Bytes.toBytes(sn.getStartcode())).build())
.setType(Cell.Type.Put).setValue(Bytes.toBytes(sn.getStartCode())).build())
.add(builder.clear().setRow(p.getRow()).setFamily(getCatalogFamily())
.setQualifier(getSeqNumColumn(replicaId)).setTimestamp(p.getTimestamp())
.setType(Cell.Type.Put).setValue(Bytes.toBytes(openSeqNum)).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,8 @@ public enum OperationStatusCode {
public static final int ADMIN_QOS = 100;
public static final int HIGH_QOS = 200;
public static final int SYSTEMTABLE_QOS = HIGH_QOS;
// QOS for internal meta read requests
public static final int INTERNAL_READ_QOS = 250;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also thought of another approch where we can define INTERNAL_READ_QOS in AnnotationReadingPriorityFunction (hbase-server module) and then let the master pass the priority while requesting to MetaTableAccessor, but that way we would end up doing many changes.

/**
* @deprecated the name "META_QOS" is a bit ambiguous, actually only meta region transition can
* use this priority, and you should not use this directly. Will be removed in 3.0.0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand All @@ -32,7 +33,10 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
"hbase.ipc.server.metacallqueue.read.ratio";
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.metacallqueue.scan.ratio";
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;
public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
"hbase.ipc.server.metacallqueue.handler.factor";
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f;
private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f;

public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
Expand All @@ -46,6 +50,22 @@ protected float getReadShare(final Configuration conf) {

@Override
protected float getScanShare(final Configuration conf) {
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_SCAN_SHARE);
}

@Override
public boolean dispatch(CallRunner callTask) {
RpcCall call = callTask.getRpcCall();
int level = call.getHeader().getPriority();
final boolean toWriteQueue = isWriteRequest(call.getHeader(), call.getParam());
// dispatch client system read request to read handlers
// dispatch internal system read request to scan handlers
final boolean toScanQueue = getNumScanQueues() > 0 && level == HConstants.INTERNAL_READ_QOS;
return dispatchTo(toWriteQueue, toScanQueue, callTask);
}

@Override
protected float getCallQueueHandlerFactor(Configuration conf) {
return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,8 @@ private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration
((ConfigurationObserver) balancer).onConfigurationChange(conf);
}
}

protected int getNumScanQueues() {
return numScanQueues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
this.conf = conf;
this.abortable = abortable;

float callQueuesHandlersFactor = this.conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
float callQueuesHandlersFactor = getCallQueueHandlerFactor(conf);
if (
Float.compare(callQueuesHandlersFactor, 1.0f) > 0
|| Float.compare(0.0f, callQueuesHandlersFactor) > 0
Expand Down Expand Up @@ -468,4 +468,8 @@ public void onConfigurationChange(Configuration conf) {
}
}
}

protected float getCallQueueHandlerFactor(Configuration conf) {
return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testBasic() throws IOException, InterruptedException {
RpcScheduler scheduler = new SimpleRpcScheduler(conf, 10, 0, 0, qosFunction, 0);
scheduler.init(CONTEXT);
scheduler.start();
CallRunner task = createMockTask();
CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
verify(task, timeout(10000)).run();
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {

int totalCallMethods = 10;
for (int i = totalCallMethods; i > 0; i--) {
CallRunner task = createMockTask();
CallRunner task = createMockTask(HConstants.NORMAL_QOS);
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
}
Expand All @@ -185,9 +185,9 @@ public void testCallQueueInfo() throws IOException, InterruptedException {

@Test
public void testHandlerIsolation() throws IOException, InterruptedException {
CallRunner generalTask = createMockTask();
CallRunner priorityTask = createMockTask();
CallRunner replicationTask = createMockTask();
CallRunner generalTask = createMockTask(HConstants.NORMAL_QOS);
CallRunner priorityTask = createMockTask(HConstants.HIGH_QOS + 1);
CallRunner replicationTask = createMockTask(HConstants.REPLICATION_QOS);
List<CallRunner> tasks = ImmutableList.of(generalTask, priorityTask, replicationTask);
Map<CallRunner, Integer> qos = ImmutableMap.of(generalTask, 0, priorityTask,
HConstants.HIGH_QOS + 1, replicationTask, HConstants.REPLICATION_QOS);
Expand Down Expand Up @@ -227,10 +227,12 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
}

private CallRunner createMockTask() {
private CallRunner createMockTask(int priority) {
ServerCall call = mock(ServerCall.class);
CallRunner task = mock(CallRunner.class);
RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build();
when(task.getRpcCall()).thenReturn(call);
when(call.getHeader()).thenReturn(header);
return task;
}

Expand Down Expand Up @@ -707,7 +709,7 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except
@Test
public void testMetaRWScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);

Expand All @@ -728,36 +730,37 @@ public void testMetaRWScanQueues() throws Exception {
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getParam()).thenReturn(putCall.param);

CallRunner getCallTask = mock(CallRunner.class);
ServerCall getCall = mock(ServerCall.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
when(getCallTask.getRpcCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);

CallRunner scanCallTask = mock(CallRunner.class);
ServerCall scanCall = mock(ServerCall.class);
scanCall.param = ScanRequest.newBuilder().build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
when(scanCall.getHeader()).thenReturn(scanHead);
when(scanCall.getParam()).thenReturn(scanCall.param);
CallRunner clientReadCallTask = mock(CallRunner.class);
ServerCall clientReadCall = mock(ServerCall.class);
RequestHeader clientReadHead = RequestHeader.newBuilder().setMethodName("get").build();
when(clientReadCallTask.getRpcCall()).thenReturn(clientReadCall);
when(clientReadCall.getHeader()).thenReturn(clientReadHead);

CallRunner internalReadCallTask = mock(CallRunner.class);
ServerCall internalReadCall = mock(ServerCall.class);
internalReadCall.param = ScanRequest.newBuilder().build();
RequestHeader masterReadHead = RequestHeader.newBuilder().setMethodName("scan")
.setPriority(HConstants.INTERNAL_READ_QOS).build();
when(internalReadCallTask.getRpcCall()).thenReturn(internalReadCall);
when(internalReadCall.getHeader()).thenReturn(masterReadHead);
when(internalReadCall.getParam()).thenReturn(internalReadCall.param);

ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
doAnswerTaskExecution(getCallTask, work, 2, 1000);
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
doAnswerTaskExecution(clientReadCallTask, work, 2, 1000);
doAnswerTaskExecution(internalReadCallTask, work, 3, 1000);

// There are 3 queues: [puts], [gets], [scans]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(clientReadCallTask);
scheduler.dispatch(clientReadCallTask);
scheduler.dispatch(clientReadCallTask);
scheduler.dispatch(internalReadCallTask);
scheduler.dispatch(internalReadCallTask);
scheduler.dispatch(internalReadCallTask);

while (work.size() < 6) {
Thread.sleep(100);
Expand Down