[DISCUSS][WIP]TEZ-4665: [Cloud] Unmanaged task containers - task container discovery#473
[DISCUSS][WIP]TEZ-4665: [Cloud] Unmanaged task containers - task container discovery#473Aggarwal-Raghav wants to merge 3 commits intoapache:masterfrom
Conversation
|
With the docker-compose in TEZ-4700, the |
|
💔 -1 overall
This message was automatically generated. |
ec55ca4 to
4443ed5
Compare
| addIfService(containerHeartbeatHandler, true); | ||
|
|
||
| jobTokenSecretManager = new JobTokenSecretManager(amConf); | ||
| jobTokenSecretManager = TezCommonUtils.createJobTokenSecretManager(amConf); |
There was a problem hiding this comment.
this is temporary change, I believe for each AM we need to wirte the token in a new zkPath and then the TezChild need to read that path to get the token and start the TezChild container.
| amPluginDescriptorProto = confProto.getAmPluginDescriptor(); | ||
| } | ||
|
|
||
| String frameworkMode = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); |
There was a problem hiding this comment.
Use Zookeeper based scheduler,launcher
| } | ||
| } | ||
|
|
||
| if (!isLocal) { |
There was a problem hiding this comment.
2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,139 [ERROR] [IPC Server handler 1 on default port 10001] |impl.DAGImpl|: Uncaught Exception when handling event DAG_INIT on Dag dag_1775213951724_0000_1 at currentState=NEW
2026-04-03 16:29:41.140 | java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.apache.tez.dag.app.AppContext.getTaskScheduerIdentifier(String)" is null
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.VertexImpl.<init>(VertexImpl.java:1133)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.createVertex(DAGImpl.java:1742)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.initializeDAG(DAGImpl.java:1606)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1878)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1855)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory$MultipleInternalArc.doTransition(StateMachineFactory.java:385)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory.access$500(StateMachineFactory.java:46)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:493)
2026-04-03 16:29:41.140 | at org.apache.tez.state.StateMachineTez.doTransition(StateMachineTez.java:63)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:1219)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:160)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster$DagEventDispatcher.handle(DAGAppMaster.java:2292)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster.startDAGExecution(DAGAppMaster.java:2735)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster.startDAG(DAGAppMaster.java:2687)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster.submitDAGToAppMaster(DAGAppMaster.java:1389)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.api.client.DAGClientHandler.submitDAG(DAGClientHandler.java:143)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl.submitDAG(DAGClientAMProtocolBlockingPBServerImpl.java:187)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC$DAGClientAMProtocol$2.callBlockingMethod(DAGClientAMProtocolRPC.java:9713)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.ProtobufRpcEngine$Server.processCall(ProtobufRpcEngine.java:484)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:595)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1228)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1246)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1169)
2026-04-03 16:29:41.140 | at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
2026-04-03 16:29:41.140 | at java.base/javax.security.auth.Subject.doAs(Subject.java:525)
2026-04-03 16:29:41.140 | at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3198)
2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,140 [INFO] [Dispatcher thread {Central}] |impl.DAGImpl|: dag_1775213951724_0000_1 terminating due to internal error.
| fullZkNamespace = appendNamespace(fullZkNamespace, namespace); | ||
| fullZkAMNamespace = appendNamespace(fullZkAMNamespace, amNamespace); | ||
|
|
||
| String fullZkTaskNameSpace = ZK_NAMESPACE_PREFIX; |
|
This PR is incomplete still lot of work is required, the tez am is not able to assign work to tez child but pushing it for feedback about implementation/design |
|
💔 -1 overall
This message was automatically generated. |
|
CC @abstractdog |
| LOG.info("Container with id: " + containerId | ||
| + " is invalid and will be killed"); | ||
| + " is not yet registered, asking it to wait"); | ||
| task = null; |
There was a problem hiding this comment.
this is done because org.apache.tez.dag.app.rm.TaskSchedulerContextImpl#containerAllocated() used in ZookeeperTaskScheduler.java is just incrementing the counter, for externally created containers it is sending shoulddie response but instead it should send back null. this will help ContainerReporter.java#callInteral() for loop condition to wait until ZookeeperTaskScheduler creates the container, and registers it in the AM.
| @VisibleForTesting | ||
| protected void fetchNext() throws InterruptedException, IOException { | ||
| try { | ||
| if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) { |
There was a problem hiding this comment.
Shuffling is forced to read from disk i.e. shared path.
|
|
||
| if (!sendEmptyPartitionDetails || outputGenerated) { | ||
| String host = context.getExecutionContext().getHostName(); | ||
| if (host == null) { |
There was a problem hiding this comment.
was causing NPE
| nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL; | ||
| for (int idle = 1; containerTask == null; idle++) { | ||
| long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime); | ||
| long sleepTimeMilliSecs = Math.min(idle * 10000, getTaskMaxSleepTime); |
There was a problem hiding this comment.
should be configurable from TezConfiguration i guess
|
💔 -1 overall
This message was automatically generated. |




No description provided.