diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java index da3a577824..511b494c52 100644 --- a/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java @@ -130,7 +130,7 @@ public static AMRecord getAMRecord(final ChildData childData) throws IOException public void start() throws Exception { ZkConfig zkConf = new ZkConfig(this.conf); client = zkConf.createCuratorFramework(); - cache = new TreeCache(client, zkConf.getZkNamespace()); + cache = new TreeCache(client, zkConf.getZkAMNamespace()); client.start(); client.blockUntilConnected(); cache.start(); diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkConfig.java b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkConfig.java index af59072532..c76183ff8f 100644 --- a/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkConfig.java +++ b/tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkConfig.java @@ -44,7 +44,8 @@ public class ZkConfig { public final static String DEFAULT_COMPUTE_GROUP_NAME = "default-compute"; private final String zkQuorum; - private final String zkNamespace; + private final String zkAMNamespace; + private final String zkTaskNameSpace; private final int curatorBackoffSleepMs; private final int curatorMaxRetries; private final int sessionTimeoutMs; @@ -54,27 +55,42 @@ public ZkConfig(Configuration conf) { zkQuorum = conf.get(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM); Preconditions.checkArgument(!Strings.isNullOrEmpty(zkQuorum), "zkQuorum cannot be null or empty"); - String fullZkNamespace = ZK_NAMESPACE_PREFIX; + String fullZkAMNamespace = ZK_NAMESPACE_PREFIX; - String namespace = conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, + String amNamespace = 
conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE, TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE_DEFAULT); - Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace cannot be null or empty"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(amNamespace), "namespace cannot be null or empty"); - fullZkNamespace = appendNamespace(fullZkNamespace, namespace); + fullZkAMNamespace = appendNamespace(fullZkAMNamespace, amNamespace); + + String fullZkTaskNameSpace = ZK_NAMESPACE_PREFIX; + + String taskNamespace = conf.get(TezConfiguration.TEZ_TASK_REGISTRY_NAMESPACE, + TezConfiguration.TEZ_TASK_REGISTRY_NAMESPACE_DEFAULT); + Preconditions.checkArgument(!Strings.isNullOrEmpty(taskNamespace), "taskNamespace cannot be null or empty"); + + fullZkTaskNameSpace = appendNamespace(fullZkTaskNameSpace, taskNamespace); boolean enableComputeGroups = conf.getBoolean(TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS, TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS_DEFAULT); if (enableComputeGroups) { final String subNamespace = System.getenv(COMPUTE_GROUP_NAME_ENV); if (subNamespace != null && !subNamespace.isEmpty()) { - fullZkNamespace = appendNamespace(fullZkNamespace, subNamespace); - LOG.info("Compute groups enabled: subNamespace: {} fullZkNamespace: {}", subNamespace, fullZkNamespace); + fullZkAMNamespace = appendNamespace(fullZkAMNamespace, subNamespace); + fullZkTaskNameSpace = appendNamespace(fullZkTaskNameSpace, subNamespace); + LOG.info( + "Compute groups enabled: subNamespace: {} fullZkNamespace: {} fullZkTaskNameSpace: {}", + subNamespace, + fullZkAMNamespace, + fullZkTaskNameSpace); } } else { - LOG.info("Compute groups disabled: fullZkNamespace: {}", fullZkNamespace); + LOG.info("Compute groups disabled: fullZkNamespace: {} fullZkTaskNameSpace: {}", fullZkAMNamespace, fullZkTaskNameSpace); } - zkNamespace = fullZkNamespace; - LOG.info("Using ZK namespace: {}", fullZkNamespace); + zkAMNamespace = fullZkAMNamespace; + zkTaskNameSpace = 
fullZkTaskNameSpace; + LOG.info("Using ZK namespace: {}", fullZkAMNamespace); + LOG.info("Using ZK task namespace: {}", fullZkTaskNameSpace); curatorBackoffSleepMs = Math.toIntExact(conf.getTimeDuration(TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP, TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP_DEFAULT, TimeUnit.MILLISECONDS)); @@ -90,8 +106,12 @@ public String getZkQuorum() { return zkQuorum; } - public String getZkNamespace() { - return zkNamespace; + public String getZkAMNamespace() { + return zkAMNamespace; + } + + public String getZkTaskNameSpace() { + return zkTaskNameSpace; } public int getCuratorBackoffSleepMs() { diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java index f376ba2384..8c3953a38b 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezClient; import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; @@ -66,6 +67,14 @@ public final class TezCommonUtils { public static final String TEZ_SYSTEM_SUB_DIR = ".tez"; + public static JobTokenSecretManager createJobTokenSecretManager(Configuration conf) { + String clusterId = conf.get(TezConfiguration.TEZ_AM_CLUSTER_ID); + if (clusterId != null) { + return new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(clusterId.getBytes()), conf); + } + return new JobTokenSecretManager(conf); + } + private TezCommonUtils() {} /** diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 240546b103..0a12944d92 100644 --- 
a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1250,6 +1250,9 @@ public TezConfiguration(boolean loadDefaults) { TEZ_TASK_PREFIX + "scale.memory.non-concurrent-inputs.enabled"; public static final boolean TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT = false; + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_CLUSTER_ID = TEZ_AM_PREFIX + "cluster-id"; + @Private @Unstable /** @@ -2420,4 +2423,12 @@ static Set getPropertySet() { @ConfigurationScope(Scope.AM) @ConfigurationProperty public static final String TEZ_AM_STANDALONE_CONFS = TEZ_AM_PREFIX + "standalone.confs"; + + /** + * String value. Namespace in ZooKeeper registry for the TezChild + */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty + public static final String TEZ_TASK_REGISTRY_NAMESPACE = TEZ_TASK_PREFIX + "registry.namespace"; + public static final String TEZ_TASK_REGISTRY_NAMESPACE_DEFAULT = "/tez_am/workers"; } diff --git a/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkConfig.java b/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkConfig.java index 7a4ab20db4..e2f655eded 100644 --- a/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkConfig.java +++ b/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkConfig.java @@ -40,7 +40,7 @@ public void testBasicConfiguration() { ZkConfig zkConfig = new ZkConfig(conf); assertEquals("localhost:2181", zkConfig.getZkQuorum()); - assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace()); + assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkAMNamespace()); } @Test @@ -80,7 +80,7 @@ public void testCustomConfigurationValues() { ZkConfig zkConfig = new ZkConfig(conf); assertEquals("zk1:2181,zk2:2181", zkConfig.getZkQuorum()); - assertEquals("/tez-external-sessions/custom-namespace", zkConfig.getZkNamespace()); 
+ assertEquals("/tez-external-sessions/custom-namespace", zkConfig.getZkAMNamespace()); assertEquals(2000, zkConfig.getCuratorBackoffSleepMs()); assertEquals(5, zkConfig.getCuratorMaxRetries()); assertEquals(200000, zkConfig.getSessionTimeoutMs()); @@ -95,7 +95,7 @@ public void testNamespaceWithLeadingSlash() { ZkConfig zkConfig = new ZkConfig(conf); - assertEquals("/tez-external-sessions/namespace-with-slash", zkConfig.getZkNamespace()); + assertEquals("/tez-external-sessions/namespace-with-slash", zkConfig.getZkAMNamespace()); } @Test @@ -106,7 +106,7 @@ public void testNamespaceWithoutLeadingSlash() { ZkConfig zkConfig = new ZkConfig(conf); - assertEquals("/tez-external-sessions/namespace-without-slash", zkConfig.getZkNamespace()); + assertEquals("/tez-external-sessions/namespace-without-slash", zkConfig.getZkAMNamespace()); } @Test @@ -118,7 +118,7 @@ public void testComputeGroupsDisabled() { ZkConfig zkConfig = new ZkConfig(conf); - assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace()); + assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkAMNamespace()); } @Test @@ -132,7 +132,7 @@ public void testComputeGroupsEnabledWithoutEnvVar() { ZkConfig zkConfig = new ZkConfig(conf); // Namespace should start with base namespace (env var not set, so no sub-namespace added) - assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace()); + assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkAMNamespace()); } @Test @@ -225,6 +225,6 @@ public void testDefaultNamespace() { // Don't set namespace, should use default ZkConfig zkConfig = new ZkConfig(conf); assertEquals("/tez-external-sessions" + TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE_DEFAULT, - zkConfig.getZkNamespace()); + zkConfig.getZkAMNamespace()); } } diff --git a/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkFrameworkClient.java 
b/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkFrameworkClient.java index efb5f98733..cb0c0efed9 100644 --- a/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkFrameworkClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/registry/zookeeper/TestZkFrameworkClient.java @@ -224,7 +224,7 @@ private void registerMockAM(TezConfiguration tezConf, ApplicationId appId, Strin // Wait for connection curatorClient.blockUntilConnected(); - String namespace = zkConfig.getZkNamespace(); + String namespace = zkConfig.getZkAMNamespace(); String path = namespace + "/" + appId.toString(); // Create parent directories if needed diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/zookeeper/ZkAMRegistry.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/zookeeper/ZkAMRegistry.java index db07124afc..458ccbbb82 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/zookeeper/ZkAMRegistry.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/zookeeper/ZkAMRegistry.java @@ -63,7 +63,7 @@ public ZkAMRegistry(String externalId) { public void init(Configuration conf) { zkConfig = new ZkConfig(conf); this.client = zkConfig.createCuratorFramework(); - this.namespace = zkConfig.getZkNamespace(); + this.namespace = zkConfig.getZkAMNamespace(); LOG.info("ZkAMRegistry initialized"); } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c119d572ab..70234094f7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -20,6 +20,8 @@ +import static org.apache.tez.frameworkplugins.FrameworkMode.STANDALONE_ZOOKEEPER; + import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -130,6 +132,8 @@ import 
org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; +import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; import org.apache.tez.dag.app.dag.DAG; @@ -514,7 +518,7 @@ protected void serviceInit(final Configuration conf) throws Exception { containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf); addIfService(containerHeartbeatHandler, true); - jobTokenSecretManager = new JobTokenSecretManager(amConf); + jobTokenSecretManager = TezCommonUtils.createJobTokenSecretManager(amConf); sessionToken = frameworkService.getAMExtensions().getSessionToken( appAttemptID, jobTokenSecretManager, amCredentials); @@ -2469,6 +2473,49 @@ public static void main(String[] args) { amPluginDescriptorProto = confProto.getAmPluginDescriptor(); } + String frameworkMode = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); + if (STANDALONE_ZOOKEEPER.name().equalsIgnoreCase(frameworkMode)) { + LOG.info("External AM: Injecting Standalone ZK Service Plugins dynamically"); + + TezEntityDescriptorProto schedulerEntity = + TezEntityDescriptorProto.newBuilder() + .setClassName("org.apache.tez.dag.app.rm.ZookeeperTaskScheduler") + .build(); + TezNamedEntityDescriptorProto schedulerNamed = + TezNamedEntityDescriptorProto.newBuilder() + .setName("zk_scheduler") + .setEntityDescriptor(schedulerEntity) + .build(); + + TezEntityDescriptorProto launcherEntity = + TezEntityDescriptorProto.newBuilder() + .setClassName("org.apache.tez.dag.app.launcher.NoOpContainerLauncher") + .build(); + TezNamedEntityDescriptorProto launcherNamed = + TezNamedEntityDescriptorProto.newBuilder() + .setName("zk_launcher") + 
.setEntityDescriptor(launcherEntity) + .build(); + + TezEntityDescriptorProto commEntity = + TezEntityDescriptorProto.newBuilder() + .setClassName("org.apache.tez.dag.app.TezTaskCommunicatorImpl") + .build(); + TezNamedEntityDescriptorProto commNamed = + TezNamedEntityDescriptorProto.newBuilder() + .setName("zk_communicator") + .setEntityDescriptor(commEntity) + .build(); + + amPluginDescriptorProto = + AMPluginDescriptorProto.newBuilder() + .setContainersEnabled(false) + .addTaskSchedulers(schedulerNamed) + .addContainerLaunchers(launcherNamed) + .addTaskCommunicators(commNamed) + .build(); + } + UserGroupInformation.setConfiguration(conf); Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 411e3cd0d4..57ba9a7011 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -411,11 +411,12 @@ private ContainerTask getContainerTask(ContainerId containerId) throws IOExcepti if (getContext().isKnownContainer(containerId)) { LOG.info("Container with id: " + containerId + " is valid, but no longer registered, and will be killed"); + task = TASK_FOR_INVALID_JVM; } else { LOG.info("Container with id: " + containerId - + " is invalid and will be killed"); + + " is not yet registered, asking it to wait"); + task = null; } - task = TASK_FOR_INVALID_JVM; } else { synchronized (containerInfo) { getContext().containerAlive(containerId); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 392739e292..c57cd3e8d9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java 
@@ -39,6 +39,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.function.Supplier; import javax.annotation.Nullable; @@ -1129,6 +1131,32 @@ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, } } + if (!isLocal) { + taskSchedulerName = + resolveFallbackName( + taskSchedulerName, + "Task scheduler", + "scheduler", + appContext::getTaskScheduerIdentifier, + () -> appContext.getTaskSchedulerName(0)); + + containerLauncherName = + resolveFallbackName( + containerLauncherName, + "Container launcher", + "launcher", + appContext::getContainerLauncherIdentifier, + () -> appContext.getContainerLauncherName(0)); + + taskCommName = + resolveFallbackName( + taskCommName, + "Task communicator", + "communicator", + appContext::getTaskCommunicatorIdentifier, + () -> appContext.getTaskCommunicatorName(0)); + } + try { taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName); } catch (Exception e) { @@ -5022,4 +5050,27 @@ public void initShuffleDeletionContext(int deletionHeight) { vShuffleDeletionContext.setSpannedVertices(this); this.vShuffleDeletionContext = vShuffleDeletionContext; } + + /** Helper method to resolve plugin fallback configurations. 
*/ + private String resolveFallbackName( + String currentName, + String logPrefix, + String logSuffix, + Function identifierProvider, + Supplier fallbackProvider) { + + if (identifierProvider.apply(currentName) == null) { + String fallback = fallbackProvider.get(); + if (fallback != null) { + LOG.info( + "{} {} not found, using default {} {} at index 0", + logPrefix, + currentName, + logSuffix, + fallback); + return fallback; + } + } + return currentName; + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/NoOpContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/NoOpContainerLauncher.java new file mode 100644 index 0000000000..24eccc37c0 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/NoOpContainerLauncher.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A No-Op Container Launcher where TezChild processes are started externally (e.g., via Docker + * Compose or Kubernetes). + */ +public class NoOpContainerLauncher extends ContainerLauncher { + + private static final Logger LOG = LoggerFactory.getLogger(NoOpContainerLauncher.class); + + public NoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + } + + /** This method is no-op it's just informing AM to change container state */ + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) { + LOG.info("Container is externally managed: {}", launchRequest.getContainerId()); + + // Immediately tell AM that the container is "launched" + getContext().containerLaunched(launchRequest.getContainerId()); + } + + /** This method is no-op it's just informing AM to change container state */ + @Override + public void stopContainer(ContainerStopRequest stopRequest) { + LOG.info("Lifecycle is externally managed for container: {}", stopRequest.getContainerId()); + getContext().containerStopRequested(stopRequest.getContainerId()); + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ZookeeperTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ZookeeperTaskScheduler.java new file mode 100644 index 0000000000..28ee9f22c3 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ZookeeperTaskScheduler.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tez.dag.app.rm; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.annotation.Nullable; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.client.registry.zookeeper.ZkConfig; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A TaskScheduler implementation that discovers externally managed workers via Zookeeper. 
This + * scheduler does not communicate with YARN RM/NM and manages containers discovered via ZK. + */ +public class ZookeeperTaskScheduler extends TaskScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperTaskScheduler.class); + + private record TaskRequest(Object task, Object clientCookie) {} + + private CuratorFramework zkClient; + private CuratorCache workerCache; + + private final Queue availableContainers = new ConcurrentLinkedQueue<>(); + private final Queue pendingTasks = new ConcurrentLinkedQueue<>(); + + public ZookeeperTaskScheduler(TaskSchedulerContext taskSchedulerContext) { + super(taskSchedulerContext); + } + + @Override + public void initialize() throws Exception { + Configuration conf; + try { + conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload()); + } catch (IOException e) { + LOG.warn("Failed to derive configuration from UserPayload, using default Configuration"); + conf = new Configuration(false); + } + + String zkQuorum = conf.get(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM); + String appId = getContext().getApplicationAttemptId().getApplicationId().toString(); + + ZkConfig zkconfig = new ZkConfig(conf); + zkClient = CuratorFrameworkFactory.newClient(zkQuorum, zkconfig.getRetryPolicy()); + zkClient.start(); + + + String workerPath = zkconfig.getZkTaskNameSpace() + "/" + appId; + + if (zkClient.checkExists().forPath(workerPath) == null) { + zkClient.create().creatingParentsIfNeeded().forPath(workerPath); + } + + workerCache = CuratorCache.build(zkClient, workerPath); + + CuratorCacheListener listener = + CuratorCacheListener.builder() + .forCreates( + node -> { + String path = node.getPath(); + if (path.equals(workerPath)) { + return; + } + String containerIdStr = path.substring(path.lastIndexOf('/') + 1); + String host = new String(node.getData()); + + LOG.info("Discovered new TezChild via ZK: {} on host {}", containerIdStr, host); + + try { + ContainerId cId = 
ContainerId.fromString(containerIdStr); + NodeId mockNodeId = NodeId.newInstance(host, 0); + + // Using hardcoded resource value for now + Container mockContainer = + Container.newInstance( + cId, + mockNodeId, + "dummy:0", + Resource.newInstance(1024, 1), + Priority.newInstance(1), + null); + + availableContainers.offer(mockContainer); + + // Inform the AM IMMEDIATELY so it adds the cotainer to the Idle Pool + getContext().containerAllocated(mockContainer); + tryAllocate(); + + } catch (Exception e) { + LOG.error( + "Failed to parse and allocate discovered container: {}", containerIdStr, e); + } + }) + .forDeletes( + node -> { + LOG.warn("TezChild removed from ZK: {}", node.getPath()); + }) + .build(); + + workerCache.listenable().addListener(listener); + } + + @Override + public void start() throws Exception { + workerCache.start(); + } + + @Override + public void shutdown() throws Exception { + if (workerCache != null) { + workerCache.close(); + } + if (zkClient != null) { + zkClient.close(); + } + } + + @Override + public Resource getAvailableResources() { + // Return hardcoded resource based on available containers + return Resource.newInstance(1024 * availableContainers.size(), availableContainers.size()); + } + + @Override + public Resource getTotalResources() { + // Hardcoded total resources + return Resource.newInstance(1024 * 100, 100); + } + + @Override + public int getClusterNodeCount() { + return availableContainers.size(); + } + + @Override + public void blacklistNode(NodeId nodeId) { + // No-op for standalone mode + } + + @Override + public void unblacklistNode(NodeId nodeId) { + // No-op for standalone mode + } + + @Override + public void allocateTask( + Object task, + Resource capability, + String[] hosts, + String[] racks, + Priority priority, + Object containerSignature, + Object clientCookie) { + pendingTasks.offer(new TaskRequest(task, clientCookie)); + tryAllocate(); + } + + @Override + public void allocateTask( + Object task, + Resource 
capability, + ContainerId containerId, + Priority priority, + Object containerSignature, + Object clientCookie) { + // Affinity requested, but fallback to general allocation in standalone mode + allocateTask(task, capability, null, null, priority, containerSignature, clientCookie); + } + + private synchronized void tryAllocate() { + while (!pendingTasks.isEmpty() && !availableContainers.isEmpty()) { + TaskRequest request = pendingTasks.poll(); + Container container = availableContainers.poll(); + + if (request != null && container != null) { + LOG.info("Assigning Task to container: {}", container.getId().toString()); + getContext().taskAllocated(request.task(), request.clientCookie(), container); + } + } + } + + @Override + public boolean deallocateTask( + Object task, + boolean taskComplete, + TaskAttemptEndReason endReason, + @Nullable String diagnostics) { + // This scheduler doesn't manage container reuse explicitly, so just return true + return true; + } + + @Override + public Object deallocateContainer(ContainerId containerId) { + return null; + } + + @Override + public void setShouldUnregister() { + // No-op + } + + @Override + public boolean hasUnregistered() { + return false; + } + + @Override + public void dagComplete() { + // No-op + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/zookeeper/TestZkAMRegistry.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/zookeeper/TestZkAMRegistry.java index cf3b9872e0..b5673ba504 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/zookeeper/TestZkAMRegistry.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/zookeeper/TestZkAMRegistry.java @@ -169,7 +169,7 @@ public void testAddAndRemoveAmRecordUpdatesZooKeeper() throws Exception { // Add record and verify node contents registry.add(record); - String path = zkConfig.getZkNamespace() + "/" + appId.toString(); + String path = zkConfig.getZkAMNamespace() + "/" + appId.toString(); 
byte[] data = checkClient.getData().forPath(path); assertNotNull("Data should be written to ZooKeeper for AMRecord", data); diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml index 5940a996ac..6c4e7e5d36 100644 --- a/tez-dist/pom.xml +++ b/tez-dist/pom.xml @@ -135,7 +135,7 @@ /bin/bash - ${project.basedir}/src/docker/tez-am/build-am-docker.sh + ${project.basedir}/src/docker/build.sh -tez ${project.version} -repo apache diff --git a/tez-dist/src/docker/tez-am/Dockerfile.am b/tez-dist/src/docker/Dockerfile similarity index 88% rename from tez-dist/src/docker/tez-am/Dockerfile.am rename to tez-dist/src/docker/Dockerfile index 01647f336c..9d16ff4332 100644 --- a/tez-dist/src/docker/tez-am/Dockerfile.am +++ b/tez-dist/src/docker/Dockerfile @@ -47,9 +47,8 @@ RUN set -ex; \ # Set necessary environment variables ENV TEZ_HOME=/opt/tez \ - TEZ_CONF_DIR=/opt/tez/conf - -ENV TEZ_CLIENT_VERSION=$TEZ_VERSION + TEZ_CONF_DIR=/opt/tez/conf \ + TEZ_CLIENT_VERSION=$TEZ_VERSION ENV PATH=$TEZ_HOME/bin:$PATH @@ -57,15 +56,15 @@ COPY --from=env --chown=tez /opt/tez $TEZ_HOME RUN mkdir -p $TEZ_CONF_DIR && chown tez:tez $TEZ_CONF_DIR -COPY --chown=tez am-entrypoint.sh / +COPY --chown=tez entrypoint.sh am-entrypoint.sh child-entrypoint.sh / COPY --chown=tez conf $TEZ_CONF_DIR # Create Extension Point Directory RUN mkdir -p /opt/tez/plugins && chown tez:tez /opt/tez/plugins && chmod 755 /opt/tez/plugins -RUN chmod +x /am-entrypoint.sh +RUN chmod +x /entrypoint.sh /am-entrypoint.sh /child-entrypoint.sh USER tez WORKDIR $TEZ_HOME -ENTRYPOINT ["/am-entrypoint.sh"] +ENTRYPOINT ["/entrypoint.sh"] diff --git a/tez-dist/src/docker/README.md b/tez-dist/src/docker/README.md new file mode 100644 index 0000000000..f815e225d8 --- /dev/null +++ b/tez-dist/src/docker/README.md @@ -0,0 +1,196 @@ + + +# Apache Tez Docker + +This directory contains a unified Docker implementation for running TezAM +and TezChild process from a single container image. 
Based on the +`TEZ_COMPONENT` environment variable, the entrypoint is dynamically selected. + +1. Building the docker image: + + ```bash + mvn clean install -DskipTests -Pdocker + ``` + + Alternatively, you can build it explicitly via the provided script: + + ```bash + ./tez-dist/src/docker/build.sh -tez <tez-version> -repo apache + ``` + +2. Local Zookeeper Setup (Standalone): + + If you are running the AM container, use the official ZooKeeper Docker + image (refer to docker-compose.yml): + + ```bash + docker pull zookeeper:3.8.4 + + docker run -d \ + --name zookeeper-server \ + -p 2181:2181 \ + -p 8080:8080 \ + -e ZOO_MY_ID=1 \ + zookeeper:3.8.4 + ``` + +3. Running the Tez containers explicitly: + + **Running the Tez AM:** + + ```bash + export TEZ_VERSION=1.0.0-SNAPSHOT + + docker run --rm \ + -p 10001:10001 \ + --env-file tez-dist/src/docker/am.env \ + --name tez-am \ + --hostname localhost \ + apache/tez:$TEZ_VERSION + ``` + + * `TEZ_VERSION` corresponds to the Maven `${project.version}`. + Set this environment variable in your shell before running the commands. + + * Expose ports using the `-p` flag based on the + `tez.am.client.am.port-range` property in `tez-site.xml`. + + * The `--hostname` flag configures the container's hostname, allowing + services on the host (e.g., macOS) to connect to it. + + * Ensure the `--env-file` flag is included, or at a minimum, pass + `-e TEZ_FRAMEWORK_MODE=STANDALONE_ZOOKEEPER` and `-e TEZ_COMPONENT=AM` + to the `docker run` command. + + **Running the Tez Child:** + + The child container requires specific arguments (`<am-host> <am-port> <container-id> + <token-id> <attempt-number>`) to connect back to the + Application Master. 
+ + Assuming your AM is running on `localhost` port `10001`, and the AM + assigned the container ID `container_1703023223000_0001_01_000001`: + + ```bash + docker run --rm \ + --network host \ + --env-file tez-dist/src/docker/child.env \ + --name tez-child \ + --hostname localhost \ + apache/tez:1.0.0-SNAPSHOT \ + localhost 10001 container_1703023223000_0001_01_000001 dummy_token_abc 1 + ``` + +4. Debugging the Tez containers: + Uncomment the `JAVA_TOOL_OPTIONS` in `am.env` (or in `child.env`, which uses + port 5006) and expose the debug port using the `-p` flag: + + ```bash + docker run --rm \ + -p 10001:10001 -p 5005:5005 \ + --env-file tez-dist/src/docker/am.env \ + --name tez-am \ + --hostname localhost \ + apache/tez:$TEZ_VERSION + ``` + +5. To override the `tez-site.xml` in the Docker image: + + * Set the `TEZ_CUSTOM_CONF_DIR` environment variable in `am.env` / + `child.env` or via the `docker run` command (e.g., + `/opt/tez/custom-conf`). + + ```bash + export TEZ_SITE_PATH=$(pwd)/tez-dist/src/docker/conf/tez-site.xml + + docker run --rm \ + -p 10001:10001 \ + --env-file tez-dist/src/docker/am.env \ + -v "$TEZ_SITE_PATH:/opt/tez/custom-conf/tez-site.xml" \ + --name tez-am \ + --hostname localhost \ + apache/tez:$TEZ_VERSION + ``` + +6. To add plugin jars to the Docker image: + + * The plugin directory path inside the Docker container is fixed at + `/opt/tez/plugins`. + + ```bash + docker run --rm \ + -p 10001:10001 \ + --env-file tez-dist/src/docker/am.env \ + -v "/path/to/your/local/plugins:/opt/tez/plugins" \ + --name tez-am \ + --hostname localhost \ + apache/tez:$TEZ_VERSION + ``` + +7. Using Docker Compose (Local Testing Cluster): + + The provided `docker-compose.yml` offers a complete, minimal Hadoop + ecosystem to test Tez in a distributed manner locally without setting + up a real cluster. 
+ + **Services Included:** + + * **namenode & datanode:** A minimal Apache Hadoop HDFS cluster + (lean image) + + * **zookeeper:** Required by the Tez AM for standalone session + discovery + + * **tez-am:** The Tez Application Master; it automatically waits for ZooKeeper and HDFS to + be healthy before starting up. + + * **tez-child-1 & tez-child-2:** Two TezChild containers (configured via `child.env`) that start after `tez-am` + + **To start the full cluster:** + + ```bash + docker-compose -f tez-dist/src/docker/docker-compose.yml up -d + ``` + + **To monitor the Application Master logs:** + + ```bash + docker-compose -f tez-dist/src/docker/docker-compose.yml logs -f tez-am + ``` + + **To shut down the cluster and clean up volumes (HDFS/Zookeeper data):** + + ```bash + docker-compose -f tez-dist/src/docker/docker-compose.yml down -v + ``` + +8. To mount custom plugins or JARs required by the Tez AM (e.g., for split + generation — typically the hive-exec jar, but in general, any UDFs or + dependencies previously managed via YARN localization): + + * Create a directory `tez-plugins` and add all required jars. + + * Uncomment the following lines in docker compose under the `tez-am` + and `tez-child` services to mount this directory as a volume to + `/opt/tez/plugins` in the docker container. + + ```yaml + volumes: + - ./tez-plugins:/opt/tez/plugins + ``` diff --git a/tez-dist/src/docker/tez-am/am-entrypoint.sh b/tez-dist/src/docker/am-entrypoint.sh similarity index 91% rename from tez-dist/src/docker/tez-am/am-entrypoint.sh rename to tez-dist/src/docker/am-entrypoint.sh index a6128419ce..53f52821a7 100644 --- a/tez-dist/src/docker/tez-am/am-entrypoint.sh +++ b/tez-dist/src/docker/am-entrypoint.sh @@ -18,9 +18,9 @@ set -xeou pipefail -################################################ -# 1. 
Mocking DAGAppMaster#main() env variables # -################################################ +############################################# +# Mocking DAGAppMaster#main() env variables # +############################################# : "${USER:="tez"}" : "${LOCAL_DIRS:="/tmp"}" @@ -70,12 +70,6 @@ CLASSPATH="${CLASSPATH}:${TEZ_HOME}/*:${TEZ_HOME}/lib/*" ############# # Execution # ############# -TEZ_DAG_JAR=$(find "$TEZ_HOME" -maxdepth 1 -name "tez-dag-*.jar" ! -name "*-tests.jar" | head -n 1) - -if [ -z "$TEZ_DAG_JAR" ]; then - echo "Error: Could not find tez-dag-*.jar in $TEZ_HOME" - exit 1 -fi echo "--> Starting DAGAppMaster..." diff --git a/tez-dist/src/docker/tez-am/am.env b/tez-dist/src/docker/am.env similarity index 98% rename from tez-dist/src/docker/tez-am/am.env rename to tez-dist/src/docker/am.env index 93cabeea32..2feb3548d2 100644 --- a/tez-dist/src/docker/tez-am/am.env +++ b/tez-dist/src/docker/am.env @@ -19,6 +19,7 @@ USER=tez LOG_DIRS=/opt/tez/logs +TEZ_COMPONENT=AM TEZ_FRAMEWORK_MODE=STANDALONE_ZOOKEEPER TEZ_CUSTOM_CONF_DIR=/opt/tez/custom-conf # TEZ_AM_HEAP_OPTS configures the maximum heap size (Xmx) for the Tez AM. 
diff --git a/tez-dist/src/docker/tez-am/build-am-docker.sh b/tez-dist/src/docker/build.sh similarity index 84% rename from tez-dist/src/docker/tez-am/build-am-docker.sh rename to tez-dist/src/docker/build.sh index 66bf7fc738..17cc85e376 100755 --- a/tez-dist/src/docker/tez-am/build-am-docker.sh +++ b/tez-dist/src/docker/build.sh @@ -25,7 +25,7 @@ REPO= usage() { cat <&2 Usage: $0 [-h] [-tez ] [-repo ] -Build the Apache Tez AM Docker image +Build the Apache Tez (AM and CHILD) Docker image -help Display help -tez Build image with the specified Tez version -repo Docker repository @@ -59,8 +59,8 @@ SCRIPT_DIR=$( pwd ) -DIST_DIR=${DIST_DIR:-"$SCRIPT_DIR/../../.."} -PROJECT_ROOT=${PROJECT_ROOT:-"$SCRIPT_DIR/../../../.."} +DIST_DIR=${DIST_DIR:-"$SCRIPT_DIR/../../"} +PROJECT_ROOT=${PROJECT_ROOT:-"$SCRIPT_DIR/../../../"} REPO=${REPO:-apache} WORK_DIR="$(mktemp -d)" @@ -86,17 +86,19 @@ fi # ------------------------------------------------------------------------- # BUILD CONTEXT PREPARATION # ------------------------------------------------------------------------- -cp -R "$SCRIPT_DIR/conf" "$WORK_DIR/" 2>/dev/null || mkdir -p "$WORK_DIR/conf" +cp -R "$SCRIPT_DIR/conf" "$WORK_DIR/" +cp "$SCRIPT_DIR/entrypoint.sh" "$WORK_DIR/" cp "$SCRIPT_DIR/am-entrypoint.sh" "$WORK_DIR/" -cp "$SCRIPT_DIR/Dockerfile.am" "$WORK_DIR/" +cp "$SCRIPT_DIR/child-entrypoint.sh" "$WORK_DIR/" +cp "$SCRIPT_DIR/Dockerfile" "$WORK_DIR/" echo "Building Docker image..." docker build \ "$WORK_DIR" \ - -f "$WORK_DIR/Dockerfile.am" \ - -t "$REPO/tez-am:$TEZ_VERSION" \ + -f "$WORK_DIR/Dockerfile" \ + -t "$REPO/tez:$TEZ_VERSION" \ --build-arg "BUILD_ENV=unarchive" \ --build-arg "TEZ_VERSION=$TEZ_VERSION" rm -r "${WORK_DIR}" -echo "Docker image $REPO/tez-am:$TEZ_VERSION built successfully." +echo "Docker image $REPO/tez:$TEZ_VERSION built successfully." 
diff --git a/tez-dist/src/docker/child-entrypoint.sh b/tez-dist/src/docker/child-entrypoint.sh new file mode 100644 index 0000000000..b8fb0dd576 --- /dev/null +++ b/tez-dist/src/docker/child-entrypoint.sh @@ -0,0 +1,104 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +set -xeou pipefail + +######################################### +# Mocking TezChild#main() env variables # +######################################### + +: "${USER:="tez"}" +: "${LOCAL_DIRS:="/tmp"}" +: "${LOG_DIRS:="/opt/tez/logs"}" + +export USER LOCAL_DIRS LOG_DIRS + +mkdir -p "$LOG_DIRS" + +########################### +# Custom Config directory # +########################### +if [[ -n "${TEZ_CUSTOM_CONF_DIR:-}" ]] && [[ -d "$TEZ_CUSTOM_CONF_DIR" ]]; then + echo "--> Using custom configuration directory: $TEZ_CUSTOM_CONF_DIR" + find "${TEZ_CUSTOM_CONF_DIR}" -type f -exec \ + ln -sf {} "${TEZ_CONF_DIR}"/ \; + + # Remove template keyword if it exists + if [[ -f "$TEZ_CONF_DIR/tez-site.xml.template" ]]; then + envsubst < "$TEZ_CONF_DIR/tez-site.xml.template" > "$TEZ_CONF_DIR/tez-site.xml" + fi +fi + +############# +# CLASSPATH # +############# + +# Order is: conf -> plugins -> tez jars +CLASSPATH="${TEZ_CONF_DIR}" + +# Custom Plugins +# This allows mounting a volume at /opt/tez/plugins containing aux jars +PLUGIN_DIR="/opt/tez/plugins" +if [[ -d "$PLUGIN_DIR" ]]; then + count=$(find "$PLUGIN_DIR" -maxdepth 1 -name "*.jar" 2>/dev/null | wc -l) + if [ "$count" != "0" ]; then + echo "--> Found $count plugin jars. Prepending to classpath." + CLASSPATH="${CLASSPATH}:${PLUGIN_DIR}/*" + fi +fi + +# Tez Jars +CLASSPATH="${CLASSPATH}:${TEZ_HOME}/*:${TEZ_HOME}/lib/*" + +############# +# Execution # +############# + +echo "--> Starting TezChild..." 
+ +: "${TEZ_CHILD_HEAP_OPTS:="-Xmx1024m"}" + +JAVA_ADD_OPENS=( + "--add-opens=java.base/java.lang=ALL-UNNAMED" + "--add-opens=java.base/java.util=ALL-UNNAMED" + "--add-opens=java.base/java.io=ALL-UNNAMED" + "--add-opens=java.base/java.net=ALL-UNNAMED" + "--add-opens=java.base/java.nio=ALL-UNNAMED" + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED" + "--add-opens=java.base/java.util.regex=ALL-UNNAMED" + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED" + "--add-opens=java.sql/java.sql=ALL-UNNAMED" + "--add-opens=java.base/java.text=ALL-UNNAMED" + "-Dnet.bytebuddy.experimental=true" +) + +read -r -a JAVA_OPTS_ARR <<< "${JAVA_OPTS:-}" +read -r -a HEAP_OPTS_ARR <<< "${TEZ_CHILD_HEAP_OPTS}" + +exec java "${HEAP_OPTS_ARR[@]}" "${JAVA_OPTS_ARR[@]}" "${JAVA_ADD_OPENS[@]}" \ + -Djava.net.preferIPv4Stack=true \ + -Djava.io.tmpdir="$PWD/tmp" \ + -Dtez.root.logger=INFO,CLA,console \ + -Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator \ + -Dlog4j.configuration=tez-container-log4j.properties \ + -Dyarn.app.container.log.dir="$LOG_DIRS" \ + -Dtez.conf.dir="$TEZ_CONF_DIR" \ + -cp "$CLASSPATH" \ + org.apache.tez.runtime.task.TezChild \ + "$@" diff --git a/tez-dist/src/docker/child.env b/tez-dist/src/docker/child.env new file mode 100644 index 0000000000..6a499121e9 --- /dev/null +++ b/tez-dist/src/docker/child.env @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Tez Child Container Environment Configuration + +USER=tez +LOG_DIRS=/opt/tez/logs +TEZ_COMPONENT=CHILD +TEZ_FRAMEWORK_MODE=STANDALONE_ZOOKEEPER +TEZ_CUSTOM_CONF_DIR=/opt/tez/custom-conf +TEZ_CHILD_HEAP_OPTS=-Xmx1024m +# JAVA_TOOL_OPTIONS='-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5006' diff --git a/tez-dist/src/docker/tez-am/conf/core-site.xml b/tez-dist/src/docker/conf/core-site.xml similarity index 100% rename from tez-dist/src/docker/tez-am/conf/core-site.xml rename to tez-dist/src/docker/conf/core-site.xml diff --git a/tez-dist/src/docker/tez-am/conf/hdfs-site.xml b/tez-dist/src/docker/conf/hdfs-site.xml similarity index 100% rename from tez-dist/src/docker/tez-am/conf/hdfs-site.xml rename to tez-dist/src/docker/conf/hdfs-site.xml diff --git a/tez-dist/src/docker/conf/tez-site.xml b/tez-dist/src/docker/conf/tez-site.xml new file mode 100644 index 0000000000..da36a6dcba --- /dev/null +++ b/tez-dist/src/docker/conf/tez-site.xml @@ -0,0 +1,119 @@ + + + + + + tez.am.client.am.port-range + 10001-10003 + + + + tez.am.tez-ui.webservice.enable + false + + + + + tez.am.zookeeper.quorum + zookeeper:2181 + + + + tez.am.log.level + INFO + + + + tez.local.mode + false + + + + + tez.session.am.dag.submit.timeout.secs + -1 + + + + + dfs.client.use.datanode.hostname + true + + + + + + tez.am.container.idle.release-timeout-min.millis + 21600000 + + + + tez.am.container.idle.release-timeout-max.millis + 21600000 + + + + + + tez.am.container.reuse.enabled + true + + + + + + tez.am.container.reuse.non-local-fallback.enabled + true + + + + 
tez.am.container.reuse.rack-fallback.enabled + true + + + + tez.task.get-task.sleep.interval-ms.max + 900000 + + + + tez.am.task.am.port-range + 12000-12000 + + + + ipc.client.connect.max.retries + 50 + + + + ipc.client.connect.retry.interval + 20000 + + + + tez.am.cluster-id + tez-standalone-cluster + + + + tez.runtime.optimize.local.fetch + true + + + diff --git a/tez-dist/src/docker/tez-am/docker-compose.yml b/tez-dist/src/docker/docker-compose.yml similarity index 82% rename from tez-dist/src/docker/tez-am/docker-compose.yml rename to tez-dist/src/docker/docker-compose.yml index 3740fabe8a..6a7edb55cc 100644 --- a/tez-dist/src/docker/tez-am/docker-compose.yml +++ b/tez-dist/src/docker/docker-compose.yml @@ -96,14 +96,18 @@ services: " tez-am: - image: apache/tez-am:${TEZ_VERSION:-1.0.0-SNAPSHOT} + image: apache/tez:${TEZ_VERSION:-1.0.0-SNAPSHOT} container_name: tez-am hostname: tez-am networks: - hadoop-network ports: - "10001:10001" + - "12000:12000" # - "5005:5005" # Uncomment for remote debugging + volumes: + - tez_shared_data:/data + - tez_shared_tmp:/tmp env_file: - ./am.env # Already define TEZ_CUSTOM_CONF_DIR in the env file, @@ -126,6 +130,36 @@ services: datanode: condition: service_started + tez-child-1: + image: apache/tez:${TEZ_VERSION:-1.0.0-SNAPSHOT} + container_name: tez-child-1 + hostname: tez-child-1 + networks: + - hadoop-network + volumes: + - tez_shared_data:/data + - tez_shared_tmp:/tmp + env_file: + - ./child.env + depends_on: + tez-am: + condition: service_started + + tez-child-2: + image: apache/tez:${TEZ_VERSION:-1.0.0-SNAPSHOT} + container_name: tez-child-2 + hostname: tez-child-2 + networks: + - hadoop-network + volumes: + - tez_shared_data:/data + - tez_shared_tmp:/tmp + env_file: + - ./child.env + depends_on: + tez-am: + condition: service_started + networks: hadoop-network: name: hadoop-network @@ -142,3 +176,7 @@ volumes: name: zookeeper_datalog zookeeper_logs: name: zookeeper_logs + tez_shared_data: + name: tez_shared_data + 
tez_shared_tmp: + name: tez_shared_tmp diff --git a/tez-dist/src/docker/entrypoint.sh b/tez-dist/src/docker/entrypoint.sh new file mode 100644 index 0000000000..b80dbef4f8 --- /dev/null +++ b/tez-dist/src/docker/entrypoint.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -xeou pipefail + +: "${TEZ_COMPONENT:="AM"}" + +echo "--> Starting Tez Component: $TEZ_COMPONENT" + +if [[ "$TEZ_COMPONENT" == "AM" ]]; then + echo "--> Routing to Tez AM Entrypoint" + exec /am-entrypoint.sh "$@" +elif [[ "$TEZ_COMPONENT" == "CHILD" ]]; then + echo "--> Routing to Tez Child Entrypoint" + exec /child-entrypoint.sh "$@" +else + echo "Error: Unknown TEZ_COMPONENT '$TEZ_COMPONENT'. Must be 'AM' or 'CHILD'." + exit 1 +fi diff --git a/tez-dist/src/docker/tez-am/README.md b/tez-dist/src/docker/tez-am/README.md deleted file mode 100644 index 987f381853..0000000000 --- a/tez-dist/src/docker/tez-am/README.md +++ /dev/null @@ -1,136 +0,0 @@ - - -# Tez AM Docker - -1. Building the docker image: - - ```bash - mvn clean install -DskipTests -Pdocker - ``` - -2. Install zookeeper in mac: - - a. 
Via brew: set the `tez.am.zookeeper.quorum` value as - `host.docker.internal:2181` in `tez-site.xml` - - ```bash - brew install zookeeper - zkServer start - ``` - - b. Use Zookeeper docker image (Refer to docker compose yml): - - ```bash - docker pull zookeeper:3.8.4 - - docker run -d \ - --name zookeeper-server \ - -p 2181:2181 \ - -p 8080:8080 \ - -e ZOO_MY_ID=1 \ - zookeeper:3.8.4 - ``` - -3. Running the Tez AM container explicitly: - - ```bash - export TEZ_VERSION=1.0.0-SNAPSHOT - - docker run --rm \ - -p 10001:10001 \ - --env-file tez-dist/src/docker/tez-am/am.env \ - --name tez-am \ - --hostname localhost \ - apache/tez-am:$TEZ_VERSION - ``` - - * `TEZ_VERSION` corresponds to the Maven `${project.version}`. - Set this environment variable in your shell before running the commands. - * Expose ports using the `-p` flag based on the - `tez.am.client.am.port-range` property in `tez-site.xml`. - * The `--hostname` flag configures the container's hostname, allowing - services on the host (e.g., macOS) to connect to it. - * Ensure the `--env-file` flag is included, or at a minimum, pass - `-e TEZ_FRAMEWORK_MODE=STANDALONE_ZOOKEEPER` to the `docker run` command. - -4. Debugging the Tez AM container: -Uncomment the `JAVA_TOOL_OPTIONS` in `am.env` and expose 5005 port -using `-p` flag - - ```bash - docker run --rm \ - -p 10001:10001 -p 5005:5005 \ - --env-file tez-dist/src/docker/tez-am/am.env \ - --name tez-am \ - --hostname localhost \ - apache/tez-am:$TEZ_VERSION - ``` - -5. To override the tez-site.xml in docker image use: - * Set the `TEZ_CUSTOM_CONF_DIR` environment variable in `am.env` - or via the `docker run` command (e.g., `/opt/tez/custom-conf`). - - ```bash - export TEZ_SITE_PATH=$(pwd)/tez-dist/src/docker/conf/tez-site.xml - - docker run --rm \ - -p 10001:10001 \ - --env-file tez-dist/src/docker/tez-am/am.env \ - -v "$TEZ_SITE_PATH:/opt/tez/custom-conf/tez-site.xml" \ - --name tez-am \ - --hostname localhost \ - apache/tez-am:$TEZ_VERSION - ``` - -6. 
To add plugin jars in docker image use: - * The plugin directory path inside the Docker container is fixed at `/opt/tez/plugins`. - - ```bash - docker run --rm \ - -p 10001:10001 \ - --env-file tez-dist/src/docker/tez-am/am.env \ - -v "/path/to/your/local/plugins:/opt/tez/plugins" \ - --name tez-am \ - --hostname localhost \ - apache/tez-am:$TEZ_VERSION - ``` - -7. Using Docker Compose: - * Refer to the `docker-compose.yml` file in this directory for - an example of how to run both the Tez AM and Zookeeper containers - together using Docker Compose. - - ```bash - docker-compose -f tez-dist/src/docker/tez-am/docker-compose.yml up -d --build - ``` - - * This command will start both the Tez AM, Zookeeper, Minimal - Hadoop containers as defined in the `docker-compose.yml` file. - -8. To mount custom plugins or JARs required by Tez AM (e.g., for split generation - — typically the hive-exec jar, but in general, any UDFs or dependencies - previously managed via YARN localization: - * Create a directory tez-plugins and add all required jars. - * Uncomment the following lines in docker compose under the tez-am service - to mount this directory as a volume to `/opt/tez/plugins` in the docker container. 
- - ```yml - volumes: - - ./tez-plugins:/opt/tez/plugins - ``` diff --git a/tez-dist/src/docker/tez-am/conf/tez-site.xml b/tez-dist/src/docker/tez-am/conf/tez-site.xml deleted file mode 100644 index 81add40eb1..0000000000 --- a/tez-dist/src/docker/tez-am/conf/tez-site.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - - - tez.am.client.am.port-range - 10001-10003 - - - - tez.am.tez-ui.webservice.enable - false - - - - - tez.am.zookeeper.quorum - zookeeper:2181 - - - - tez.am.log.level - INFO - - - - tez.local.mode - true - - - - - tez.session.am.dag.submit.timeout.secs - -1 - - - - - dfs.client.use.datanode.hostname - true - - - diff --git a/tez-examples/src/main/java/org/apache/tez/examples/ExternalAmWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/ExternalAmWordCount.java index 2d77436ab4..606c104f58 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/ExternalAmWordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/ExternalAmWordCount.java @@ -53,6 +53,10 @@ import org.apache.tez.runtime.library.api.KeyValuesReader; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; @@ -79,8 +83,26 @@ public static void main(String[] args) throws Exception { LOG.info( "Initializing TezClient to connect to External AM via ZooKeeper quorum at {}", ZK_ADDRESS); + ServicePluginsDescriptor servicePluginsDescriptor = + ServicePluginsDescriptor.create( + new TaskSchedulerDescriptor[] { + TaskSchedulerDescriptor.create( + "zk_scheduler", "org.apache.tez.dag.app.rm.ZookeeperTaskScheduler") + }, + new 
ContainerLauncherDescriptor[] { + ContainerLauncherDescriptor.create( + "zk_launcher", "org.apache.tez.dag.app.launcher.NoOpContainerLauncher") + }, + new TaskCommunicatorDescriptor[] { + TaskCommunicatorDescriptor.create( + "zk_comm", "org.apache.tez.dag.app.TezTaskCommunicatorImpl") + }); + final TezClient tezClient = - TezClient.newBuilder("ExternalAmWordCount", tezConf).setIsSession(true).build(); + TezClient.newBuilder("ExternalAmWordCount", tezConf) + .setIsSession(true) + .setServicePluginDescriptor(servicePluginsDescriptor) + .build(); try { LOG.info("Querying ZooKeeper quorum to discover an active Tez AM session"); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java index a7192f4051..75c016f0a4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java @@ -59,7 +59,7 @@ protected ContainerTask callInternal() throws Exception { long getTaskPollStartTime = System.currentTimeMillis(); nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL; for (int idle = 1; containerTask == null; idle++) { - long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime); + long sleepTimeMilliSecs = Math.min(idle * 10000, getTaskMaxSleepTime); maybeLogSleepMessage(sleepTimeMilliSecs); TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs); containerTask = umbilical.getTask(containerContext); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 60faec56fa..b1d48872c6 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -18,6 +18,9 @@ package 
org.apache.tez.runtime.task; + +import static org.apache.tez.frameworkplugins.FrameworkMode.STANDALONE_ZOOKEEPER; + import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; @@ -33,13 +36,17 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -51,6 +58,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.log4j.helpers.ThreadLocalMap; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.client.registry.zookeeper.ZkAMRegistryClient; +import org.apache.tez.client.registry.zookeeper.ZkConfig; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.Preconditions; @@ -66,6 +76,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -80,6 +91,7 @@ import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezRuntimeShutdownHandler; +import org.apache.zookeeper.CreateMode; import com.google.common.base.Function; 
import com.google.common.collect.HashMultimap; @@ -133,6 +145,8 @@ public class TezChild { private final TezExecutors sharedExecutor; private ThreadLocalMap mdcContext; + private static CuratorFramework zkWorkerClient; + public TezChild(Configuration conf, String host, int port, String containerIdentifier, String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs, Map serviceProviderEnvMap, @@ -517,45 +531,112 @@ public static TezChild newTezChild(Configuration conf, String host, int port, St hadoopShim); } - public static void main(String[] args) throws IOException, InterruptedException, TezException { + public static void main(String[] args) throws Exception { TezClassLoader.setupTezClassLoader(); final Configuration defaultConf = new Configuration(); + String frameworkMode = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); + + String host, appId, tokenIdentifier, containerIdentifier; + int port, attemptNumber; + Credentials credentials = new Credentials(); + + if (STANDALONE_ZOOKEEPER.name().equalsIgnoreCase(frameworkMode)) { + DAGProtos.ConfigurationProto confProtoBefore = TezUtilsInternal.loadConfProtoFromText(); + TezUtilsInternal.addUserSpecifiedTezConfiguration( + defaultConf, confProtoBefore.getConfKeyValuesList()); + + ZkAMRegistryClient registry = ZkAMRegistryClient.getClient(defaultConf); + registry.start(); + + // TODO: Expose retry counter and sleep time as config — in ZKconfig or TezConfig? + while (!registry.isInitialized()) { + TimeUnit.SECONDS.sleep(5); + } + + List records = registry.getAllRecords(); + if (records.isEmpty()) { + throw new RuntimeException("No AM found in ZooKeeper registry"); + } + // TODO: Should we always get the first or there should be some sophisticated logic? 
+ AMRecord amRecord = records.getFirst(); + + host = amRecord.getHostName(); + String portRange = + defaultConf.getTrimmed(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, "12000-12000"); + port = Integer.parseInt(portRange.split("[-,]")[0]); + + appId = amRecord.getApplicationId().toString(); + tokenIdentifier = appId; + attemptNumber = 1; + + String baseContainerId = appId.replace("application_", "container_"); + int randomSeq = (int) (Math.random() * 900000) + 100000; + containerIdentifier = baseContainerId + "_01_" + randomSeq; + + String zkQuorum = defaultConf.get(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM); + ZkConfig zkconfig = new ZkConfig(defaultConf); + zkWorkerClient = CuratorFrameworkFactory.newClient(zkQuorum, zkconfig.getRetryPolicy()); + zkWorkerClient.start(); + + // Create Ephemeral node representing this worker + String workerPath = zkconfig.getZkTaskNameSpace() + "/" + appId + "/" + containerIdentifier; + zkWorkerClient + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL) + .forPath(workerPath, host.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + + LOG.info("Registered TezChild Worker in ZK at path: {}", workerPath); + + // FIX: Deterministic token + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(appId)); + Token sessionToken = + new Token<>(identifier, TezCommonUtils.createJobTokenSecretManager(defaultConf)); + credentials.addToken(new Text("SessionToken"), sessionToken); + + LOG.info("ZK Mode: Discovered AM {} at {}:{}", appId, host, port); + + } else { + assert args.length == 5; + host = args[0]; + port = Integer.parseInt(args[1]); + containerIdentifier = args[2]; + tokenIdentifier = args[3]; + attemptNumber = Integer.parseInt(args[4]); + + DAGProtos.ConfigurationProto confProto = + TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name())); + TezUtilsInternal.addUserSpecifiedTezConfiguration( + defaultConf, confProto.getConfKeyValuesList()); + } 
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); final String pid = System.getenv().get("JVM_PID"); + String[] localDirs = + TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())); - - assert args.length == 5; - String host = args[0]; - int port = Integer.parseInt(args[1]); - final String containerIdentifier = args[2]; - final String tokenIdentifier = args[3]; - final int attemptNumber = Integer.parseInt(args[4]); - final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS - .name())); - CallerContext.setCurrent(new CallerContext.Builder("tez_"+tokenIdentifier).build()); - LOG.info("TezChild starting with PID=" + pid + ", containerIdentifier=" + containerIdentifier); + CallerContext.setCurrent(new CallerContext.Builder("tez_" + tokenIdentifier).build()); + LOG.info("TezChild starting with PID={}, containerIdentifier={}", pid, containerIdentifier); if (LOG.isDebugEnabled()) { - LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port - + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber - + " tokenIdentifier: " + tokenIdentifier); + LOG.debug( + "Info from cmd line: AM-host: {} AM-port: {} containerIdentifier: {} appAttemptNumber: {} tokenIdentifier: {}", + host, + port, + containerIdentifier, + attemptNumber, + tokenIdentifier); + credentials = UserGroupInformation.getCurrentUser().getCredentials(); } // Security framework already loaded the tokens into current ugi - DAGProtos.ConfigurationProto confProto = - TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name())); - TezUtilsInternal.addUserSpecifiedTezConfiguration(defaultConf, confProto.getConfKeyValuesList()); UserGroupInformation.setConfiguration(defaultConf); - Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); HadoopShim hadoopShim = new HadoopShimsLoader(defaultConf).getHadoopShim(); // log the system 
properties if (LOG.isInfoEnabled()) { String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(defaultConf); - if (systemPropsToLog != null) { - LOG.info(systemPropsToLog); - } + LOG.info(systemPropsToLog); } TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier, diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index 44a753bf6c..a0237b573e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -271,6 +271,9 @@ public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url, public static int deserializeShuffleProviderMetaData(ByteBuffer meta) throws IOException { + if (meta == null) { + return 0; + } try (DataInputByteBuffer in = new DataInputByteBuffer()) { in.reset(meta); return in.readInt(); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index fdc15dd85e..d74bea9682 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -316,6 +316,9 @@ static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails, if (!sendEmptyPartitionDetails || outputGenerated) { String host = context.getExecutionContext().getHostName(); + if (host == null) { + host = "localhost"; + } ByteBuffer shuffleMetadata = context .getServiceProviderMetaData(auxiliaryService); int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata); diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index e863533cd1..22d88d58a8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -180,7 +180,7 @@ public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams, @VisibleForTesting protected void fetchNext() throws InterruptedException, IOException { try { - if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) { + if (localDiskFetchEnabled) { setupLocalDiskFetch(mapHost); } else { // Shuffle