Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public static AMRecord getAMRecord(final ChildData childData) throws IOException
public void start() throws Exception {
ZkConfig zkConf = new ZkConfig(this.conf);
client = zkConf.createCuratorFramework();
cache = new TreeCache(client, zkConf.getZkNamespace());
cache = new TreeCache(client, zkConf.getZkAMNamespace());
client.start();
client.blockUntilConnected();
cache.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class ZkConfig {
public final static String DEFAULT_COMPUTE_GROUP_NAME = "default-compute";

private final String zkQuorum;
private final String zkNamespace;
private final String zkAMNamespace;
private final String zkTaskNameSpace;
private final int curatorBackoffSleepMs;
private final int curatorMaxRetries;
private final int sessionTimeoutMs;
Expand All @@ -54,27 +55,42 @@ public ZkConfig(Configuration conf) {
zkQuorum = conf.get(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM);
Preconditions.checkArgument(!Strings.isNullOrEmpty(zkQuorum), "zkQuorum cannot be null or empty");

String fullZkNamespace = ZK_NAMESPACE_PREFIX;
String fullZkAMNamespace = ZK_NAMESPACE_PREFIX;

String namespace = conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE,
String amNamespace = conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE,
TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE_DEFAULT);
Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace cannot be null or empty");
Preconditions.checkArgument(!Strings.isNullOrEmpty(amNamespace), "namespace cannot be null or empty");

fullZkNamespace = appendNamespace(fullZkNamespace, namespace);
fullZkAMNamespace = appendNamespace(fullZkAMNamespace, amNamespace);

String fullZkTaskNameSpace = ZK_NAMESPACE_PREFIX;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TezChild writes its path to ZooKeeper. The multiple containers shown are the result of repeated TezChild restarts in Docker Compose; please ignore them.

Image


String taskNamespace = conf.get(TezConfiguration.TEZ_TASK_REGISTRY_NAMESPACE,
TezConfiguration.TEZ_TASK_REGISTRY_NAMESPACE_DEFAULT);
Preconditions.checkArgument(!Strings.isNullOrEmpty(taskNamespace), "taskNamespace cannot be null or empty");

fullZkTaskNameSpace = appendNamespace(fullZkTaskNameSpace, taskNamespace);

boolean enableComputeGroups = conf.getBoolean(TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS,
TezConfiguration.TEZ_AM_REGISTRY_ENABLE_COMPUTE_GROUPS_DEFAULT);
if (enableComputeGroups) {
final String subNamespace = System.getenv(COMPUTE_GROUP_NAME_ENV);
if (subNamespace != null && !subNamespace.isEmpty()) {
fullZkNamespace = appendNamespace(fullZkNamespace, subNamespace);
LOG.info("Compute groups enabled: subNamespace: {} fullZkNamespace: {}", subNamespace, fullZkNamespace);
fullZkAMNamespace = appendNamespace(fullZkAMNamespace, subNamespace);
fullZkTaskNameSpace = appendNamespace(fullZkTaskNameSpace, subNamespace);
LOG.info(
"Compute groups enabled: subNamespace: {} fullZkNamespace: {} fullZkTaskNameSpace: {}",
subNamespace,
fullZkAMNamespace,
fullZkTaskNameSpace);
}
} else {
LOG.info("Compute groups disabled: fullZkNamespace: {}", fullZkNamespace);
LOG.info("Compute groups disabled: fullZkNamespace: {} fullZkTaskNameSpace: {}", fullZkAMNamespace, fullZkTaskNameSpace);
}
zkNamespace = fullZkNamespace;
LOG.info("Using ZK namespace: {}", fullZkNamespace);
zkAMNamespace = fullZkAMNamespace;
zkTaskNameSpace = fullZkTaskNameSpace;
LOG.info("Using ZK namespace: {}", fullZkAMNamespace);
LOG.info("Using ZK task namespace: {}", fullZkTaskNameSpace);

curatorBackoffSleepMs = Math.toIntExact(conf.getTimeDuration(TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP,
TezConfiguration.TEZ_AM_CURATOR_BACKOFF_SLEEP_DEFAULT, TimeUnit.MILLISECONDS));
Expand All @@ -90,8 +106,12 @@ public String getZkQuorum() {
return zkQuorum;
}

public String getZkNamespace() {
return zkNamespace;
/**
 * Returns the ZooKeeper namespace stored in {@code zkAMNamespace}.
 *
 * @return the value of the {@code zkAMNamespace} field
 */
public String getZkAMNamespace() {
  return this.zkAMNamespace;
}

/**
 * Returns the ZooKeeper namespace stored in {@code zkTaskNameSpace}.
 *
 * @return the value of the {@code zkTaskNameSpace} field
 */
public String getZkTaskNameSpace() {
  return this.zkTaskNameSpace;
}

public int getCuratorBackoffSleepMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
Expand All @@ -66,6 +67,14 @@ public final class TezCommonUtils {

public static final String TEZ_SYSTEM_SUB_DIR = ".tez";

/**
 * Creates the {@link JobTokenSecretManager} for the given configuration.
 *
 * <p>When {@link TezConfiguration#TEZ_AM_CLUSTER_ID} is set, the secret key is built from the
 * UTF-8 bytes of that value; otherwise the configuration-only constructor is used.
 *
 * @param conf configuration to read the cluster id from; also passed to the secret manager
 * @return a new secret manager instance
 */
public static JobTokenSecretManager createJobTokenSecretManager(Configuration conf) {
  String clusterId = conf.get(TezConfiguration.TEZ_AM_CLUSTER_ID);
  if (clusterId != null) {
    // Encode with an explicit charset so the key bytes do not depend on the JVM's default charset.
    // NOTE(review): the key is derived directly from a configuration value - confirm this is
    // acceptable for the target deployment.
    byte[] keyBytes = clusterId.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    return new JobTokenSecretManager(JobTokenSecretManager.createSecretKey(keyBytes), conf);
  }
  return new JobTokenSecretManager(conf);
}

private TezCommonUtils() {}

/**
Expand Down
11 changes: 11 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,9 @@ public TezConfiguration(boolean loadDefaults) {
TEZ_TASK_PREFIX + "scale.memory.non-concurrent-inputs.enabled";
public static final boolean TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT = false;

@ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CLUSTER_ID = TEZ_AM_PREFIX + "cluster-id";

@Private
@Unstable
/**
Expand Down Expand Up @@ -2420,4 +2423,12 @@ static Set<String> getPropertySet() {
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_AM_STANDALONE_CONFS = TEZ_AM_PREFIX + "standalone.confs";

/**
* String value. Namespace in ZooKeeper registry for the TezChild
*/
@ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty
public static final String TEZ_TASK_REGISTRY_NAMESPACE = TEZ_TASK_PREFIX + "registry.namespace";
public static final String TEZ_TASK_REGISTRY_NAMESPACE_DEFAULT = "/tez_am/workers";
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void testBasicConfiguration() {
ZkConfig zkConfig = new ZkConfig(conf);

assertEquals("localhost:2181", zkConfig.getZkQuorum());
assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace());
assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkAMNamespace());
}

@Test
Expand Down Expand Up @@ -80,7 +80,7 @@ public void testCustomConfigurationValues() {
ZkConfig zkConfig = new ZkConfig(conf);

assertEquals("zk1:2181,zk2:2181", zkConfig.getZkQuorum());
assertEquals("/tez-external-sessions/custom-namespace", zkConfig.getZkNamespace());
assertEquals("/tez-external-sessions/custom-namespace", zkConfig.getZkAMNamespace());
assertEquals(2000, zkConfig.getCuratorBackoffSleepMs());
assertEquals(5, zkConfig.getCuratorMaxRetries());
assertEquals(200000, zkConfig.getSessionTimeoutMs());
Expand All @@ -95,7 +95,7 @@ public void testNamespaceWithLeadingSlash() {

ZkConfig zkConfig = new ZkConfig(conf);

assertEquals("/tez-external-sessions/namespace-with-slash", zkConfig.getZkNamespace());
assertEquals("/tez-external-sessions/namespace-with-slash", zkConfig.getZkAMNamespace());
}

@Test
Expand All @@ -106,7 +106,7 @@ public void testNamespaceWithoutLeadingSlash() {

ZkConfig zkConfig = new ZkConfig(conf);

assertEquals("/tez-external-sessions/namespace-without-slash", zkConfig.getZkNamespace());
assertEquals("/tez-external-sessions/namespace-without-slash", zkConfig.getZkAMNamespace());
}

@Test
Expand All @@ -118,7 +118,7 @@ public void testComputeGroupsDisabled() {

ZkConfig zkConfig = new ZkConfig(conf);

assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace());
assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkAMNamespace());
}

@Test
Expand All @@ -132,7 +132,7 @@ public void testComputeGroupsEnabledWithoutEnvVar() {
ZkConfig zkConfig = new ZkConfig(conf);

// Namespace should start with base namespace (env var not set, so no sub-namespace added)
assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkNamespace());
assertEquals("/tez-external-sessions/test-namespace", zkConfig.getZkAMNamespace());
}

@Test
Expand Down Expand Up @@ -225,6 +225,6 @@ public void testDefaultNamespace() {
// Don't set namespace, should use default
ZkConfig zkConfig = new ZkConfig(conf);
assertEquals("/tez-external-sessions" + TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE_DEFAULT,
zkConfig.getZkNamespace());
zkConfig.getZkAMNamespace());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private void registerMockAM(TezConfiguration tezConf, ApplicationId appId, Strin
// Wait for connection
curatorClient.blockUntilConnected();

String namespace = zkConfig.getZkNamespace();
String namespace = zkConfig.getZkAMNamespace();
String path = namespace + "/" + appId.toString();

// Create parent directories if needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public ZkAMRegistry(String externalId) {
public void init(Configuration conf) {
zkConfig = new ZkConfig(conf);
this.client = zkConfig.createCuratorFramework();
this.namespace = zkConfig.getZkNamespace();
this.namespace = zkConfig.getZkAMNamespace();
LOG.info("ZkAMRegistry initialized");
}

Expand Down
49 changes: 48 additions & 1 deletion tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@



import static org.apache.tez.frameworkplugins.FrameworkMode.STANDALONE_ZOOKEEPER;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -130,6 +132,8 @@
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData;
import org.apache.tez.dag.app.dag.DAG;
Expand Down Expand Up @@ -514,7 +518,7 @@ protected void serviceInit(final Configuration conf) throws Exception {
containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
addIfService(containerHeartbeatHandler, true);

jobTokenSecretManager = new JobTokenSecretManager(amConf);
jobTokenSecretManager = TezCommonUtils.createJobTokenSecretManager(amConf);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a temporary change. I believe that each AM needs to write the token to a new ZooKeeper path, and the TezChild then needs to read that path to get the token and start the TezChild container.


sessionToken = frameworkService.getAMExtensions().getSessionToken(
appAttemptID, jobTokenSecretManager, amCredentials);
Expand Down Expand Up @@ -2469,6 +2473,49 @@ public static void main(String[] args) {
amPluginDescriptorProto = confProto.getAmPluginDescriptor();
}

String frameworkMode = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the ZooKeeper-based scheduler and launcher.

if (STANDALONE_ZOOKEEPER.name().equalsIgnoreCase(frameworkMode)) {
LOG.info("External AM: Injecting Standalone ZK Service Plugins dynamically");

TezEntityDescriptorProto schedulerEntity =
TezEntityDescriptorProto.newBuilder()
.setClassName("org.apache.tez.dag.app.rm.ZookeeperTaskScheduler")
.build();
TezNamedEntityDescriptorProto schedulerNamed =
TezNamedEntityDescriptorProto.newBuilder()
.setName("zk_scheduler")
.setEntityDescriptor(schedulerEntity)
.build();

TezEntityDescriptorProto launcherEntity =
TezEntityDescriptorProto.newBuilder()
.setClassName("org.apache.tez.dag.app.launcher.NoOpContainerLauncher")
.build();
TezNamedEntityDescriptorProto launcherNamed =
TezNamedEntityDescriptorProto.newBuilder()
.setName("zk_launcher")
.setEntityDescriptor(launcherEntity)
.build();

TezEntityDescriptorProto commEntity =
TezEntityDescriptorProto.newBuilder()
.setClassName("org.apache.tez.dag.app.TezTaskCommunicatorImpl")
.build();
TezNamedEntityDescriptorProto commNamed =
TezNamedEntityDescriptorProto.newBuilder()
.setName("zk_communicator")
.setEntityDescriptor(commEntity)
.build();

amPluginDescriptorProto =
AMPluginDescriptorProto.newBuilder()
.setContainersEnabled(false)
.addTaskSchedulers(schedulerNamed)
.addContainerLaunchers(launcherNamed)
.addTaskCommunicators(commNamed)
.build();
}

UserGroupInformation.setConfiguration(conf);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,12 @@ private ContainerTask getContainerTask(ContainerId containerId) throws IOExcepti
if (getContext().isKnownContainer(containerId)) {
LOG.info("Container with id: " + containerId
+ " is valid, but no longer registered, and will be killed");
task = TASK_FOR_INVALID_JVM;
} else {
LOG.info("Container with id: " + containerId
+ " is invalid and will be killed");
+ " is not yet registered, asking it to wait");
task = null;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done because org.apache.tez.dag.app.rm.TaskSchedulerContextImpl#containerAllocated(), as used in ZookeeperTaskScheduler.java, only increments a counter. For externally created containers the AM currently sends a shouldDie response, but it should send back null instead. Returning null lets the loop condition in ContainerReporter.java#callInternal() keep waiting until ZookeeperTaskScheduler creates the container and registers it with the AM.

}
task = TASK_FOR_INVALID_JVM;
} else {
synchronized (containerInfo) {
getContext().containerAlive(containerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -1129,6 +1131,32 @@ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
}
}

if (!isLocal) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,139 [ERROR] [IPC Server handler 1 on default port 10001] |impl.DAGImpl|: Uncaught Exception when handling event DAG_INIT on Dag dag_1775213951724_0000_1 at currentState=NEW
2026-04-03 16:29:41.140 | java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.apache.tez.dag.app.AppContext.getTaskScheduerIdentifier(String)" is null
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.VertexImpl.<init>(VertexImpl.java:1133)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.createVertex(DAGImpl.java:1742)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.initializeDAG(DAGImpl.java:1606)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1878)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1855)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory$MultipleInternalArc.doTransition(StateMachineFactory.java:385)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory.access$500(StateMachineFactory.java:46)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:493)
2026-04-03 16:29:41.140 | 	at org.apache.tez.state.StateMachineTez.doTransition(StateMachineTez.java:63)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:1219)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:160)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster$DagEventDispatcher.handle(DAGAppMaster.java:2292)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster.startDAGExecution(DAGAppMaster.java:2735)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster.startDAG(DAGAppMaster.java:2687)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster.submitDAGToAppMaster(DAGAppMaster.java:1389)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.api.client.DAGClientHandler.submitDAG(DAGClientHandler.java:143)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl.submitDAG(DAGClientAMProtocolBlockingPBServerImpl.java:187)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC$DAGClientAMProtocol$2.callBlockingMethod(DAGClientAMProtocolRPC.java:9713)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server.processCall(ProtobufRpcEngine.java:484)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:595)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1228)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1246)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1169)
2026-04-03 16:29:41.140 | 	at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
2026-04-03 16:29:41.140 | 	at java.base/javax.security.auth.Subject.doAs(Subject.java:525)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3198)
2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,140 [INFO] [Dispatcher thread {Central}] |impl.DAGImpl|: dag_1775213951724_0000_1 terminating due to internal error. 

taskSchedulerName =
resolveFallbackName(
taskSchedulerName,
"Task scheduler",
"scheduler",
appContext::getTaskScheduerIdentifier,
() -> appContext.getTaskSchedulerName(0));

containerLauncherName =
resolveFallbackName(
containerLauncherName,
"Container launcher",
"launcher",
appContext::getContainerLauncherIdentifier,
() -> appContext.getContainerLauncherName(0));

taskCommName =
resolveFallbackName(
taskCommName,
"Task communicator",
"communicator",
appContext::getTaskCommunicatorIdentifier,
() -> appContext.getTaskCommunicatorName(0));
}

try {
taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
} catch (Exception e) {
Expand Down Expand Up @@ -5022,4 +5050,27 @@ public void initShuffleDeletionContext(int deletionHeight) {
vShuffleDeletionContext.setSpannedVertices(this);
this.vShuffleDeletionContext = vShuffleDeletionContext;
}

/**
 * Picks the plugin name to use, falling back to a default when the requested one is unknown.
 *
 * <p>The requested name is kept when {@code identifierProvider} returns a non-null identifier
 * for it, or when {@code fallbackProvider} yields {@code null}. Otherwise the fallback name is
 * logged at INFO level and returned.
 *
 * @param currentName the plugin name that was requested
 * @param logPrefix label placed at the start of the log message
 * @param logSuffix label placed before the fallback name in the log message
 * @param identifierProvider lookup from plugin name to identifier; {@code null} means unknown
 * @param fallbackProvider supplies the fallback plugin name; may return {@code null}
 * @return {@code currentName}, or the fallback name when the substitution applies
 */
private String resolveFallbackName(
    String currentName,
    String logPrefix,
    String logSuffix,
    Function<String, Integer> identifierProvider,
    Supplier<String> fallbackProvider) {

  // The requested name resolves to an identifier: nothing to substitute.
  final Integer identifier = identifierProvider.apply(currentName);
  if (identifier != null) {
    return currentName;
  }

  // The fallback is only consulted once the requested name is known to be unresolved.
  final String defaultName = fallbackProvider.get();
  if (defaultName == null) {
    return currentName;
  }

  LOG.info(
      "{} {} not found, using default {} {} at index 0",
      logPrefix,
      currentName,
      logSuffix,
      defaultName);
  return defaultName;
}
}
Loading