-
Notifications
You must be signed in to change notification settings - Fork 440
[DISCUSS][WIP]TEZ-4665: [Cloud] Unmanaged task containers - task container discovery #473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,8 @@ | |
|
|
||
|
|
||
|
|
||
| import static org.apache.tez.frameworkplugins.FrameworkMode.STANDALONE_ZOOKEEPER; | ||
|
|
||
| import java.io.File; | ||
| import java.io.FileInputStream; | ||
| import java.io.FileNotFoundException; | ||
|
|
@@ -130,6 +132,8 @@ | |
| import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; | ||
| import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; | ||
| import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; | ||
| import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; | ||
| import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; | ||
| import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; | ||
| import org.apache.tez.dag.app.RecoveryParser.DAGRecoveryData; | ||
| import org.apache.tez.dag.app.dag.DAG; | ||
|
|
@@ -514,7 +518,7 @@ protected void serviceInit(final Configuration conf) throws Exception { | |
| containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf); | ||
| addIfService(containerHeartbeatHandler, true); | ||
|
|
||
| jobTokenSecretManager = new JobTokenSecretManager(amConf); | ||
| jobTokenSecretManager = TezCommonUtils.createJobTokenSecretManager(amConf); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is temporary change, I believe for each AM we need to wirte the token in a new zkPath and then the TezChild need to read that path to get the token and start the TezChild container. |
||
|
|
||
| sessionToken = frameworkService.getAMExtensions().getSessionToken( | ||
| appAttemptID, jobTokenSecretManager, amCredentials); | ||
|
|
@@ -2469,6 +2473,49 @@ public static void main(String[] args) { | |
| amPluginDescriptorProto = confProto.getAmPluginDescriptor(); | ||
| } | ||
|
|
||
| String frameworkMode = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use Zookeeper based scheduler,launcher |
||
| if (STANDALONE_ZOOKEEPER.name().equalsIgnoreCase(frameworkMode)) { | ||
| LOG.info("External AM: Injecting Standalone ZK Service Plugins dynamically"); | ||
|
|
||
| TezEntityDescriptorProto schedulerEntity = | ||
| TezEntityDescriptorProto.newBuilder() | ||
| .setClassName("org.apache.tez.dag.app.rm.ZookeeperTaskScheduler") | ||
| .build(); | ||
| TezNamedEntityDescriptorProto schedulerNamed = | ||
| TezNamedEntityDescriptorProto.newBuilder() | ||
| .setName("zk_scheduler") | ||
| .setEntityDescriptor(schedulerEntity) | ||
| .build(); | ||
|
|
||
| TezEntityDescriptorProto launcherEntity = | ||
| TezEntityDescriptorProto.newBuilder() | ||
| .setClassName("org.apache.tez.dag.app.launcher.NoOpContainerLauncher") | ||
| .build(); | ||
| TezNamedEntityDescriptorProto launcherNamed = | ||
| TezNamedEntityDescriptorProto.newBuilder() | ||
| .setName("zk_launcher") | ||
| .setEntityDescriptor(launcherEntity) | ||
| .build(); | ||
|
|
||
| TezEntityDescriptorProto commEntity = | ||
| TezEntityDescriptorProto.newBuilder() | ||
| .setClassName("org.apache.tez.dag.app.TezTaskCommunicatorImpl") | ||
| .build(); | ||
| TezNamedEntityDescriptorProto commNamed = | ||
| TezNamedEntityDescriptorProto.newBuilder() | ||
| .setName("zk_communicator") | ||
| .setEntityDescriptor(commEntity) | ||
| .build(); | ||
|
|
||
| amPluginDescriptorProto = | ||
| AMPluginDescriptorProto.newBuilder() | ||
| .setContainersEnabled(false) | ||
| .addTaskSchedulers(schedulerNamed) | ||
| .addContainerLaunchers(launcherNamed) | ||
| .addTaskCommunicators(commNamed) | ||
| .build(); | ||
| } | ||
|
|
||
| UserGroupInformation.setConfiguration(conf); | ||
| Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -411,11 +411,12 @@ private ContainerTask getContainerTask(ContainerId containerId) throws IOExcepti | |
| if (getContext().isKnownContainer(containerId)) { | ||
| LOG.info("Container with id: " + containerId | ||
| + " is valid, but no longer registered, and will be killed"); | ||
| task = TASK_FOR_INVALID_JVM; | ||
| } else { | ||
| LOG.info("Container with id: " + containerId | ||
| + " is invalid and will be killed"); | ||
| + " is not yet registered, asking it to wait"); | ||
| task = null; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is done because |
||
| } | ||
| task = TASK_FOR_INVALID_JVM; | ||
| } else { | ||
| synchronized (containerInfo) { | ||
| getContext().containerAlive(containerId); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,8 @@ | |
| import java.util.concurrent.locks.Lock; | ||
| import java.util.concurrent.locks.ReadWriteLock; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
| import java.util.function.Function; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
|
|
@@ -1129,6 +1131,32 @@ public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, | |
| } | ||
| } | ||
|
|
||
| if (!isLocal) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| taskSchedulerName = | ||
| resolveFallbackName( | ||
| taskSchedulerName, | ||
| "Task scheduler", | ||
| "scheduler", | ||
| appContext::getTaskScheduerIdentifier, | ||
| () -> appContext.getTaskSchedulerName(0)); | ||
|
|
||
| containerLauncherName = | ||
| resolveFallbackName( | ||
| containerLauncherName, | ||
| "Container launcher", | ||
| "launcher", | ||
| appContext::getContainerLauncherIdentifier, | ||
| () -> appContext.getContainerLauncherName(0)); | ||
|
|
||
| taskCommName = | ||
| resolveFallbackName( | ||
| taskCommName, | ||
| "Task communicator", | ||
| "communicator", | ||
| appContext::getTaskCommunicatorIdentifier, | ||
| () -> appContext.getTaskCommunicatorName(0)); | ||
| } | ||
|
|
||
| try { | ||
| taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName); | ||
| } catch (Exception e) { | ||
|
|
@@ -5022,4 +5050,27 @@ public void initShuffleDeletionContext(int deletionHeight) { | |
| vShuffleDeletionContext.setSpannedVertices(this); | ||
| this.vShuffleDeletionContext = vShuffleDeletionContext; | ||
| } | ||
|
|
||
| /** Helper method to resolve plugin fallback configurations. */ | ||
| private String resolveFallbackName( | ||
| String currentName, | ||
| String logPrefix, | ||
| String logSuffix, | ||
| Function<String, Integer> identifierProvider, | ||
| Supplier<String> fallbackProvider) { | ||
|
|
||
| if (identifierProvider.apply(currentName) == null) { | ||
| String fallback = fallbackProvider.get(); | ||
| if (fallback != null) { | ||
| LOG.info( | ||
| "{} {} not found, using default {} {} at index 0", | ||
| logPrefix, | ||
| currentName, | ||
| logSuffix, | ||
| fallback); | ||
| return fallback; | ||
| } | ||
| } | ||
| return currentName; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TezChild will write its path in zk, the multiple container is because of multiple restart for tez child in docker compose, please ignore it