[DISCUSS][WIP]TEZ-4665: [Cloud] Unmanaged task containers - task container discovery#473
[DISCUSS][WIP]TEZ-4665: [Cloud] Unmanaged task containers - task container discovery#473Aggarwal-Raghav wants to merge 3 commits into
Conversation
|
With the docker-compose in TEZ-4700, the |
|
💔 -1 overall
This message was automatically generated. |
ec55ca4 to
4443ed5
Compare
| addIfService(containerHeartbeatHandler, true); | ||
|
|
||
| jobTokenSecretManager = new JobTokenSecretManager(amConf); | ||
| jobTokenSecretManager = TezCommonUtils.createJobTokenSecretManager(amConf); |
There was a problem hiding this comment.
this is temporary change, I believe for each AM we need to wirte the token in a new zkPath and then the TezChild need to read that path to get the token and start the TezChild container.
| amPluginDescriptorProto = confProto.getAmPluginDescriptor(); | ||
| } | ||
|
|
||
| String frameworkMode = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); |
There was a problem hiding this comment.
Use Zookeeper based scheduler,launcher
| } | ||
| } | ||
|
|
||
| if (!isLocal) { |
There was a problem hiding this comment.
2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,139 [ERROR] [IPC Server handler 1 on default port 10001] |impl.DAGImpl|: Uncaught Exception when handling event DAG_INIT on Dag dag_1775213951724_0000_1 at currentState=NEW
2026-04-03 16:29:41.140 | java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.apache.tez.dag.app.AppContext.getTaskScheduerIdentifier(String)" is null
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.VertexImpl.<init>(VertexImpl.java:1133)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.createVertex(DAGImpl.java:1742)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.initializeDAG(DAGImpl.java:1606)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1878)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1855)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory$MultipleInternalArc.doTransition(StateMachineFactory.java:385)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory.access$500(StateMachineFactory.java:46)
2026-04-03 16:29:41.140 | at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:493)
2026-04-03 16:29:41.140 | at org.apache.tez.state.StateMachineTez.doTransition(StateMachineTez.java:63)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:1219)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:160)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster$DagEventDispatcher.handle(DAGAppMaster.java:2292)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster.startDAGExecution(DAGAppMaster.java:2735)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster.startDAG(DAGAppMaster.java:2687)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.app.DAGAppMaster.submitDAGToAppMaster(DAGAppMaster.java:1389)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.api.client.DAGClientHandler.submitDAG(DAGClientHandler.java:143)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl.submitDAG(DAGClientAMProtocolBlockingPBServerImpl.java:187)
2026-04-03 16:29:41.140 | at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC$DAGClientAMProtocol$2.callBlockingMethod(DAGClientAMProtocolRPC.java:9713)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.ProtobufRpcEngine$Server.processCall(ProtobufRpcEngine.java:484)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:595)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1228)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1246)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1169)
2026-04-03 16:29:41.140 | at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
2026-04-03 16:29:41.140 | at java.base/javax.security.auth.Subject.doAs(Subject.java:525)
2026-04-03 16:29:41.140 | at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953)
2026-04-03 16:29:41.140 | at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3198)
2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,140 [INFO] [Dispatcher thread {Central}] |impl.DAGImpl|: dag_1775213951724_0000_1 terminating due to internal error.
| fullZkNamespace = appendNamespace(fullZkNamespace, namespace); | ||
| fullZkAMNamespace = appendNamespace(fullZkAMNamespace, amNamespace); | ||
|
|
||
| String fullZkTaskNameSpace = ZK_NAMESPACE_PREFIX; |
|
This PR is incomplete still lot of work is required, the tez am is not able to assign work to tez child but pushing it for feedback about implementation/design |
|
💔 -1 overall
This message was automatically generated. |
|
CC @abstractdog |
| LOG.info("Container with id: " + containerId | ||
| + " is invalid and will be killed"); | ||
| + " is not yet registered, asking it to wait"); | ||
| task = null; |
There was a problem hiding this comment.
this is done because org.apache.tez.dag.app.rm.TaskSchedulerContextImpl#containerAllocated() used in ZookeeperTaskScheduler.java is just incrementing the counter, for externally created containers it is sending shoulddie response but instead it should send back null. this will help ContainerReporter.java#callInteral() for loop condition to wait until ZookeeperTaskScheduler creates the container, and registers it in the AM.
| @VisibleForTesting | ||
| protected void fetchNext() throws InterruptedException, IOException { | ||
| try { | ||
| if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) { |
There was a problem hiding this comment.
Shuffling is forced to read from disk i.e. shared path.
There was a problem hiding this comment.
we should clearly distinguish between local disk fetch and the containerized approach in the first implementation
how local disk fetch optimization works on a classic yarn cluster is that containers on the same host can copy data quickly: in docker containers we cannot assume that the local folder used for shuffle intermediate data is shared in any way, so to make this clear in the code, fetchers should be able to know from the config framework mode, that we're in standalone zookeeper mode and should not set up local fetch I believe
later, this same code path could still be optimized: e.g. fetchers to know that even if they are in TezChild containers, they are on the same node + the folder is shared, so they can leverage local fetch, but in the meantime, for simplicity's sake, we can just disable it (TODO + clarifying code comment can tell that this area needs further optimization)
however, a http fetch assumes a shuffle handler on the producer side, which might be an additional burden on this POC, because another design decision has to be made: how to run a ShuffleHandler, this is up to your fantasy
I think by a docker-compose you cannot co-locate tezchild-1 + shufflehandler-1, tez-child2 + shufflehandler-2 (to make them share the same hostname, you need to find a way to run a shuffle handler parallely for each tezchild, sharing the same local folders, so shuffle handlers can serve them)
note that if shufflehandler somehow runs in the same container/jvm, they see the same hostname, so you're sorted out with the local fetch optimization
this is obviously not the perfect production setup, but this ticket is the ground POC of running tez DAGs in a distributed way without yarn
|
|
||
| if (!sendEmptyPartitionDetails || outputGenerated) { | ||
| String host = context.getExecutionContext().getHostName(); | ||
| if (host == null) { |
There was a problem hiding this comment.
was causing NPE
There was a problem hiding this comment.
I believe we should not simply hardcode "localhost" here: if this caused an NPE, it means ExecutionContext needs to be improved and prepared for the unmanaged sessions scenario
let's assume that we're going to run TezChild containers as part of a docker-compose and later as k8s pods in a deployment or statefulset: so how we create an ExecutionContext now should be in line with that kind
e.g. currently we get NM_HOST:
this is Yarn-specific, but fortunately you worked on TEZ-4689, so whatever you did there for node context, it should be followed here for execution context as well
| nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL; | ||
| for (int idle = 1; containerTask == null; idle++) { | ||
| long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime); | ||
| long sleepTimeMilliSecs = Math.min(idle * 10000, getTaskMaxSleepTime); |
There was a problem hiding this comment.
should be configurable from TezConfiguration i guess
|
💔 -1 overall
This message was automatically generated. |
|
@Aggarwal-Raghav : this is amazing, and I'm glad to see that you made it work! |
89039db to
d837ca9
Compare
abstractdog
left a comment
There was a problem hiding this comment.
@Aggarwal-Raghav : I left a lot of comments that maybe clarified some vague parts
I'm not over this one yet, but I submitted the review to make you able to think about it in the meantime
| @ConfigurationScope(Scope.VERTEX) | ||
| @ConfigurationProperty | ||
| public static final String TEZ_TASK_REGISTRY_NAMESPACE = TEZ_TASK_PREFIX + "registry.namespace"; | ||
| public static final String TEZ_TASK_REGISTRY_NAMESPACE_DEFAULT = "/tez_am/workers"; |
There was a problem hiding this comment.
/tez_am/workers looks a bit strange, a component is an am or a worker
as the zookeeper registry hasn't been released yet, we're free to modify the default values, what about:
for AM: /tez-am
for tasks: /tez-worker
so if we consider the root /tez-external-sessions prefix, they'll end up in:
/tez-external-sessions/tez-am
/tez-external-sessions/tez-worker
this way we can stick to hyphen for the static prefixes, and underscore for the dynamic container ids
additional changes needed: moving this below to TezConstants class and making it public:
ZK_NAMESPACE_PREFIX = "/tez-external-sessions";
| /** This method is no-op it's just informing AM to change container state */ | ||
| @Override | ||
| public void stopContainer(ContainerStopRequest stopRequest) { | ||
| LOG.info("Lifecycle is externally managed for container: {}", stopRequest.getContainerId()); |
There was a problem hiding this comment.
message should contain the actual fact also that this is a "stop" call
| /** This method is no-op it's just informing AM to change container state */ | ||
| @Override | ||
| public void launchContainer(ContainerLaunchRequest launchRequest) { | ||
| LOG.info("Container is externally managed: {}", launchRequest.getContainerId()); |
There was a problem hiding this comment.
message should contain the actual fact also that this is a "launch" call
| @VisibleForTesting | ||
| protected void fetchNext() throws InterruptedException, IOException { | ||
| try { | ||
| if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) { |
There was a problem hiding this comment.
we should clearly distinguish between local disk fetch and the containerized approach in the first implementation
how local disk fetch optimization works on a classic yarn cluster is that containers on the same host can copy data quickly: in docker containers we cannot assume that the local folder used for shuffle intermediate data is shared in any way, so to make this clear in the code, fetchers should be able to know from the config framework mode, that we're in standalone zookeeper mode and should not set up local fetch I believe
later, this same code path could still be optimized: e.g. fetchers to know that even if they are in TezChild containers, they are on the same node + the folder is shared, so they can leverage local fetch, but in the meantime, for simplicity's sake, we can just disable it (TODO + clarifying code comment can tell that this area needs further optimization)
however, a http fetch assumes a shuffle handler on the producer side, which might be an additional burden on this POC, because another design decision has to be made: how to run a ShuffleHandler, this is up to your fantasy
I think by a docker-compose you cannot co-locate tezchild-1 + shufflehandler-1, tez-child2 + shufflehandler-2 (to make them share the same hostname, you need to find a way to run a shuffle handler parallely for each tezchild, sharing the same local folders, so shuffle handlers can serve them)
note that if shufflehandler somehow runs in the same container/jvm, they see the same hostname, so you're sorted out with the local fetch optimization
this is obviously not the perfect production setup, but this ticket is the ground POC of running tez DAGs in a distributed way without yarn
| if (meta == null) { | ||
| return 0; | ||
| } |
There was a problem hiding this comment.
hm, need to know what is this a workaround for
| * **zookeeper:** Required by the Tez AM for standalone session | ||
| discovery | ||
|
|
||
| * **tez-am:** It automatically waits for Zookeeper and HDFS to | ||
| be healthy before starting up. | ||
|
|
||
| * **tez-child:** TBD |
There was a problem hiding this comment.
this list of containers is not a must-have here, docker-compose must be clean and self-descriptive
| 8. To mount custom plugins or JARs required by Tez AM (e.g., for split | ||
| generation — typically the hive-exec jar, but in general, any UDFs or | ||
| dependencies previously managed via YARN localization: | ||
|
|
||
| * Create a directory `tez-plugins` and add all required jars. | ||
|
|
||
| * Uncomment the following lines in docker compose under the `tez-am` | ||
| and `tez-child` services to mount this directory as a volume to | ||
| `/opt/tez/plugins` in the docker container. | ||
|
|
||
| ```yaml | ||
| volumes: | ||
| - ./tez-plugins:/opt/tez/plugins | ||
| ``` |
There was a problem hiding this comment.
this should be taken care of by the same as above in 6)
| defaultConf.getTrimmed(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, "12000-12000"); | ||
| port = Integer.parseInt(portRange.split("[-,]")[0]); | ||
|
|
||
| appId = amRecord.getApplicationId().toString(); |
There was a problem hiding this comment.
this part is crucial from design perspective: the current implementation here assumes the following behavior:
- AM starts first
- AM registers its application id in zookeeper
- Task container starts
- Task starts a registry client, and picks the first discoverable AM
let's assume a kubernetes cluster later, multiple tenants, so multiple applications, and therefore concurrent DAGs
I'm afraid that with the current implementation, we simply lose our control to start an application with the desired resources (so the desired amount of task containers), because task containers just start and pick the first AM from the zookeeper registry
instead, we should have a clear connection between them, which might be the application_id/container_id (which is similar to yarn world tez container mode), and when the task container starts, it already knows which application it belongs to: I guess here this logical connection is around because you already pass a container id for the TezChild
| int randomSeq = (int) (Math.random() * 900000) + 100000; | ||
| containerIdentifier = baseContainerId + "_01_" + randomSeq; | ||
|
|
||
| String zkQuorum = defaultConf.get(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM); | ||
| ZkConfig zkconfig = new ZkConfig(defaultConf); | ||
| zkWorkerClient = CuratorFrameworkFactory.newClient(zkQuorum, zkconfig.getRetryPolicy()); | ||
| zkWorkerClient.start(); | ||
|
|
||
| // Create Ephemeral node representing this worker | ||
| String workerPath = zkconfig.getZkTaskNameSpace() + "/" + appId + "/" + containerIdentifier; | ||
| zkWorkerClient | ||
| .create() | ||
| .creatingParentsIfNeeded() | ||
| .withMode(CreateMode.EPHEMERAL) | ||
| .forPath(workerPath, host.getBytes(java.nio.charset.StandardCharsets.UTF_8)); |
There was a problem hiding this comment.
instead of a random + create(), an incremental + retry logic should be implemented here:
- try to acquire
n, pass - if fails with
n, tryn+1
| assert args.length == 5; | ||
| host = args[0]; | ||
| port = Integer.parseInt(args[1]); | ||
| containerIdentifier = args[2]; | ||
| tokenIdentifier = args[3]; | ||
| attemptNumber = Integer.parseInt(args[4]); |
There was a problem hiding this comment.
instead of if else, acquiring these could be refactored to separate methods or even classes
also, if we feel that TezChild has changed so badly, I would also consider introducing class hierarchy for that (not sure at the moment if composition or inheritance), because another type of TezChild could also take care of special things in the same JVM (e.g. shuffle handler)
|
Thank you very much, @abstractdog . I will address those matters in due course, possibly by tomorrow or the day after. I truly appreciate your efforts. |




No description provided.