diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index f074711854c296..fc5b2ef601abbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3125,6 +3125,19 @@ private String getClientAddrAsString() { return addr == null ? "unknown" : addr.hostname; } + private List getAllCloudPrimaryBackends(CloudReplica cloudReplica) { + CloudSystemInfoService cloudSystemInfoService = (CloudSystemInfoService) Env.getCurrentSystemInfo(); + List backends = Lists.newArrayList(); + Set backendIds = new HashSet<>(); + for (String cloudClusterId : cloudSystemInfoService.getCloudClusterIds()) { + Backend primaryBackend = cloudReplica.getPrimaryBackend(cloudClusterId, true); + if (primaryBackend != null && backendIds.add(primaryBackend.getId())) { + backends.add(primaryBackend); + } + } + return backends; + } + @Override public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request) throws TException { TWaitingTxnStatusResult result = new TWaitingTxnStatusResult(); @@ -3518,7 +3531,7 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos if (Config.isCloudMode()) { CloudReplica cloudReplica = (CloudReplica) replica; if (!request.isSetWarmUpJobId()) { - backends = cloudReplica.getAllPrimaryBes(); + backends = getAllCloudPrimaryBackends(cloudReplica); } else { // On the cloud, the PrimaryBackend of a tablet // indicates the BE where the tablet is stably located, @@ -3541,7 +3554,13 @@ public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfos replicaInfo.setBePort(backend.getBePort()); replicaInfo.setHttpPort(backend.getHttpPort()); replicaInfo.setBrpcPort(backend.getBrpcPort()); - replicaInfo.setReplicaId(replica.getId()); + if (Config.isCloudMode() && backend.getHost().equals(clientAddr)) { + replicaInfo.setReplicaId(replica.getId()); + } else if (Config.isCloudMode()) { + replicaInfo.setReplicaId(backend.getId()); + } else { + replicaInfo.setReplicaId(replica.getId()); + } replicaInfos.add(replicaInfo); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java index 00903892b23448..0ae776a5e76609 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/service/FrontendServiceImplTest.java @@ -21,6 +21,10 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.cloud.catalog.CloudReplica; +import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.DatasourcePrintableMap; @@ -30,6 +34,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.tablefunction.BackendsTableValuedFunction; import org.apache.doris.thrift.TBackendsMetadataParams; import org.apache.doris.thrift.TCreatePartitionRequest; @@ -38,9 +44,12 @@ import org.apache.doris.thrift.TFetchSchemaTableDataResult; import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; +import org.apache.doris.thrift.TGetTabletReplicaInfosRequest; +import org.apache.doris.thrift.TGetTabletReplicaInfosResult; import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNullableStringLiteral; +import org.apache.doris.thrift.TReplicaInfo; import org.apache.doris.thrift.TSchemaTableName; import org.apache.doris.thrift.TSchemaTableRequestParams; import org.apache.doris.thrift.TShowUserRequest; @@ -48,7 +57,9 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.utframe.UtFrameUtils; +import mockit.Mock; import mockit.Mocked; +import mockit.MockUp; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -58,15 +69,95 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; public class FrontendServiceImplTest { private static String runningDir = "fe/mocked/FrontendServiceImplTest/" + UUID.randomUUID().toString() + "/"; private static ConnectContext connectContext; + + private static class TestTabletInvertedIndex extends TabletInvertedIndex { + private final Map> replicasByTabletId = new HashMap<>(); + + void setReplicas(long tabletId, List replicas) { + replicasByTabletId.put(tabletId, replicas); + } + + @Override + public List getReplicas(Long tabletId) { + return getReplicasByTabletId(tabletId); + } + + @Override + public void deleteTablet(long tabletId) { + replicasByTabletId.remove(tabletId); + } + + @Override + public void addReplica(long tabletId, Replica replica) { + replicasByTabletId.computeIfAbsent(tabletId, ignored -> new ArrayList<>()).add(replica); + } + + @Override + public void deleteReplica(long tabletId, long backendId) { + List replicas = replicasByTabletId.get(tabletId); + if (replicas == null) { + return; + } + replicas.removeIf(replica -> replica.getBackendIdWithoutException() == backendId); + } + + @Override + public Replica getReplica(long tabletId, long backendId) { + return getReplicasByTabletId(tabletId).stream() + .filter(replica -> replica.getBackendIdWithoutException() == backendId) + .findFirst() + .orElse(null); + } + + @Override + public List getReplicasByTabletId(long tabletId) { + return replicasByTabletId.getOrDefault(tabletId, Collections.emptyList()); + } + + @Override + protected void innerClear() { + replicasByTabletId.clear(); + } + } + + private static class TestCloudReplica extends CloudReplica { + private final Map primaryBackends; + + TestCloudReplica(long replicaId, Map primaryBackends) { + super(replicaId, -1L, Replica.ReplicaState.NORMAL, 1L, 0, 0, 0, 0, 0, 0); + this.primaryBackends = primaryBackends; + } + + @Override + public Backend getPrimaryBackend(String clusterId, boolean setIfAbsent) { + return primaryBackends.get(clusterId); + } + } + + private static class TestCloudSystemInfoService extends CloudSystemInfoService { + private final List clusterIds; + + TestCloudSystemInfoService(List clusterIds) { + this.clusterIds = clusterIds; + } + + @Override + public List getCloudClusterIds() { + return new ArrayList<>(clusterIds); + } + } + @Rule public ExpectedException expectedException = ExpectedException.none(); @Mocked @@ -303,4 +394,83 @@ public void testFetchAuthenticationIntegrationsSchemaTableData() throws Exceptio Env.getCurrentEnv().getAuthenticationIntegrationMgr().dropAuthenticationIntegration(integrationName, true); } } + + @Test + public void testGetTabletReplicaInfosReturnsAllCloudPrimaryBackends() { + String originalCloudUniqueId = Config.cloud_unique_id; + try { + Config.cloud_unique_id = "test_cloud"; + + long tabletId = 10001L; + long replicaId = 20001L; + Backend localBackend = new Backend(11L, "unknown", 9050); + localBackend.setBePort(9060); + localBackend.setHttpPort(8040); + localBackend.setBrpcPort(8060); + + Backend remoteBackend1 = new Backend(12L, "be-remote-1", 9050); + remoteBackend1.setBePort(9061); + remoteBackend1.setHttpPort(8041); + remoteBackend1.setBrpcPort(8061); + + Backend remoteBackend2 = new Backend(13L, "be-remote-2", 9050); + remoteBackend2.setBePort(9062); + remoteBackend2.setHttpPort(8042); + remoteBackend2.setBrpcPort(8062); + + Map primaryBackends = new HashMap<>(); + primaryBackends.put("cluster-a", localBackend); + primaryBackends.put("cluster-b", remoteBackend1); + primaryBackends.put("cluster-c", remoteBackend1); + primaryBackends.put("cluster-d", remoteBackend2); + + TestTabletInvertedIndex invertedIndex = new TestTabletInvertedIndex(); + invertedIndex.setReplicas(tabletId, + Collections.singletonList(new TestCloudReplica(replicaId, primaryBackends))); + + TestCloudSystemInfoService cloudSystemInfoService = new TestCloudSystemInfoService( + Arrays.asList("cluster-a", "cluster-b", "cluster-c", "cluster-d")); + + new MockUp() { + @Mock + public TabletInvertedIndex getTabletInvertedIndex() { + return invertedIndex; + } + + @Mock + public SystemInfoService getClusterInfo() { + return cloudSystemInfoService; + } + + @Mock + public String getToken() { + return "test-token"; + } + }; + + FrontendServiceImpl impl = new FrontendServiceImpl(exeEnv); + TGetTabletReplicaInfosRequest request = new TGetTabletReplicaInfosRequest(); + request.setTabletIds(Collections.singletonList(tabletId)); + + TGetTabletReplicaInfosResult result = impl.getTabletReplicaInfos(request); + + Assert.assertEquals(TStatusCode.OK, result.getStatus().getStatusCode()); + Assert.assertEquals("test-token", result.getToken()); + + List replicaInfos = result.getTabletReplicaInfos().get(tabletId); + Assert.assertNotNull(replicaInfos); + Assert.assertEquals(3, replicaInfos.size()); + + Assert.assertEquals(localBackend.getHost(), replicaInfos.get(0).getHost()); + Assert.assertEquals(replicaId, replicaInfos.get(0).getReplicaId()); + + Assert.assertEquals(remoteBackend1.getHost(), replicaInfos.get(1).getHost()); + Assert.assertEquals(remoteBackend1.getId(), replicaInfos.get(1).getReplicaId()); + + Assert.assertEquals(remoteBackend2.getHost(), replicaInfos.get(2).getHost()); + Assert.assertEquals(remoteBackend2.getId(), replicaInfos.get(2).getReplicaId()); + } finally { + Config.cloud_unique_id = originalCloudUniqueId; + } + } }