
[DISCUSS][WIP] TEZ-4665: [Cloud] Unmanaged task containers - task container discovery #473

Draft
Aggarwal-Raghav wants to merge 3 commits into apache:master from Aggarwal-Raghav:TEZ-4665

Conversation

@Aggarwal-Raghav

Contributor

No description provided.

@Aggarwal-Raghav

Contributor Author

With the docker-compose from TEZ-4700, the TezChild#main() constructor is working in ZK mode; the run() method still needs work.
With these changes, TezChild can find the applicationId written by the Tez AM under the ZK path /tez-external-sessions/tez_am/server/. The following log line shows that the change is working:

ZK Mode: Discovered AM application_1774893768744_0000 at tez-am:10001

tez-child.log

@tez-yetus


💔 -1 overall

| Vote | Subsystem | Runtime | Logfile | Comment |
| --- | --- | --- | --- | --- |
| +0 🆗 | reexec | 7m 28s | | Docker mode activated. |
| | | | | _ Prechecks _ |
| +1 💚 | dupname | 0m 0s | | No case conflicting files found. |
| +0 🆗 | detsecrets | 0m 0s | | detect-secrets was not available. |
| +1 💚 | @author | 0m 0s | | The patch does not contain any @author tags. |
| -1 ❌ | test4tests | 0m 0s | | The patch doesn't appear to include any new or modified tests. Please justify why no new tests are needed for this patch. Also please list what manual steps were performed to verify this patch. |
| | | | | _ master Compile Tests _ |
| +1 💚 | mvninstall | 6m 47s | | master passed |
| +1 💚 | compile | 0m 33s | | master passed |
| +1 💚 | checkstyle | 0m 35s | | master passed |
| +1 💚 | javadoc | 0m 31s | | master passed |
| +0 🆗 | spotbugs | 0m 59s | | tez-runtime-internals in master has 111 extant spotbugs warnings. |
| | | | | _ Patch Compile Tests _ |
| +1 💚 | mvninstall | 0m 20s | | the patch passed |
| +1 💚 | codespell | 0m 51s | | No new issues. |
| +1 💚 | compile | 0m 18s | | the patch passed |
| +1 💚 | javac | 0m 18s | | the patch passed |
| +1 💚 | blanks | 0m 0s | | The patch has no blanks issues. |
| -0 ⚠️ | checkstyle | 0m 11s | /results-checkstyle-tez-runtime-internals.txt | tez-runtime-internals: The patch generated 1 new + 7 unchanged - 0 fixed = 8 total (was 7) |
| +1 💚 | javadoc | 0m 11s | | the patch passed |
| +1 💚 | spotbugs | 0m 54s | | the patch passed |
| | | | | _ Other Tests _ |
| +1 💚 | unit | 0m 51s | | tez-runtime-internals in the patch passed. |
| +1 💚 | asflicense | 0m 9s | | The patch does not generate ASF License warnings. |
| | | 21m 53s | | |

| Subsystem | Report/Notes |
| --- | --- |
| Docker | ClientAPI=1.54 ServerAPI=1.54 base: https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/1/artifact/out/Dockerfile |
| GITHUB PR | #473 |
| Optional Tests | dupname asflicense javac javadoc unit spotbugs checkstyle codespell detsecrets compile |
| uname | Linux 54480487ad69 5.15.0-164-generic #174-Ubuntu SMP Fri Nov 14 20:25:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | /home/jenkins/jenkins-agent/workspace/tez-multibranch_PR-473/src/.yetus/personality.sh |
| git revision | master / ec55ca4 |
| Default Java | Ubuntu-21.0.10+7-Ubuntu-124.04 |
| Test Results | https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/1/testReport/ |
| Max. process+thread count | 251 (vs. ulimit of 5500) |
| modules | C: tez-runtime-internals U: tez-runtime-internals |
| Console output | https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/1/console |
| versions | git=2.43.0 maven=3.8.7 spotbugs=4.9.3 codespell=2.4.1 |
| Powered by | Apache Yetus 0.15.1 https://yetus.apache.org |

This message was automatically generated.

addIfService(containerHeartbeatHandler, true);

- jobTokenSecretManager = new JobTokenSecretManager(amConf);
+ jobTokenSecretManager = TezCommonUtils.createJobTokenSecretManager(amConf);

Contributor Author

This is a temporary change. I believe that for each AM we need to write the token to a new ZK path, and the TezChild then needs to read that path to get the token and start the TezChild container.

amPluginDescriptorProto = confProto.getAmPluginDescriptor();
}

String frameworkMode = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE);

Contributor Author

Use the ZooKeeper-based scheduler and launcher.

}
}

if (!isLocal) {

Contributor Author

2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,139 [ERROR] [IPC Server handler 1 on default port 10001] |impl.DAGImpl|: Uncaught Exception when handling event DAG_INIT on Dag dag_1775213951724_0000_1 at currentState=NEW
2026-04-03 16:29:41.140 | java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.apache.tez.dag.app.AppContext.getTaskScheduerIdentifier(String)" is null
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.VertexImpl.<init>(VertexImpl.java:1133)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.createVertex(DAGImpl.java:1742)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.initializeDAG(DAGImpl.java:1606)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1878)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl$InitTransition.transition(DAGImpl.java:1855)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory$MultipleInternalArc.doTransition(StateMachineFactory.java:385)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory.access$500(StateMachineFactory.java:46)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:493)
2026-04-03 16:29:41.140 | 	at org.apache.tez.state.StateMachineTez.doTransition(StateMachineTez.java:63)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:1219)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.dag.impl.DAGImpl.handle(DAGImpl.java:160)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster$DagEventDispatcher.handle(DAGAppMaster.java:2292)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster.startDAGExecution(DAGAppMaster.java:2735)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster.startDAG(DAGAppMaster.java:2687)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.app.DAGAppMaster.submitDAGToAppMaster(DAGAppMaster.java:1389)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.api.client.DAGClientHandler.submitDAG(DAGClientHandler.java:143)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl.submitDAG(DAGClientAMProtocolBlockingPBServerImpl.java:187)
2026-04-03 16:29:41.140 | 	at org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC$DAGClientAMProtocol$2.callBlockingMethod(DAGClientAMProtocolRPC.java:9713)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server.processCall(ProtobufRpcEngine.java:484)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:595)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1228)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1246)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1169)
2026-04-03 16:29:41.140 | 	at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
2026-04-03 16:29:41.140 | 	at java.base/javax.security.auth.Subject.doAs(Subject.java:525)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953)
2026-04-03 16:29:41.140 | 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3198)
2026-04-03 16:29:41.140 | 2026-04-03 10:59:41,140 [INFO] [Dispatcher thread {Central}] |impl.DAGImpl|: dag_1775213951724_0000_1 terminating due to internal error. 

fullZkNamespace = appendNamespace(fullZkNamespace, namespace);
fullZkAMNamespace = appendNamespace(fullZkAMNamespace, amNamespace);

String fullZkTaskNameSpace = ZK_NAMESPACE_PREFIX;

Contributor Author

TezChild will write its path in ZK. The multiple containers are there because the tez-child was restarted several times in docker compose; please ignore that.

Image

@Aggarwal-Raghav

Contributor Author

This PR is still incomplete and a lot of work is required: the Tez AM is not yet able to assign work to the Tez child. I'm pushing it anyway to get feedback on the implementation/design.

@tez-yetus


💔 -1 overall

| Vote | Subsystem | Runtime | Logfile | Comment |
| --- | --- | --- | --- | --- |
| +0 🆗 | reexec | 7m 14s | | Docker mode activated. |
| | | | | _ Prechecks _ |
| +1 💚 | dupname | 0m 0s | | No case conflicting files found. |
| +0 🆗 | detsecrets | 0m 0s | | detect-secrets was not available. |
| +0 🆗 | xmllint | 0m 1s | | xmllint was not available. |
| +0 🆗 | shelldocs | 0m 1s | | Shelldocs was not available. |
| +1 💚 | @author | 0m 0s | | The patch does not contain any @author tags. |
| +1 💚 | test4tests | 0m 0s | | The patch appears to include 3 new or modified test files. |
| | | | | _ master Compile Tests _ |
| +0 🆗 | mvndep | 1m 25s | | Maven dependency ordering for branch |
| +1 💚 | mvninstall | 6m 12s | | master passed |
| +1 💚 | compile | 2m 53s | | master passed |
| +1 💚 | checkstyle | 2m 29s | | master passed |
| +1 💚 | javadoc | 2m 29s | | master passed |
| +0 🆗 | spotbugs | 1m 35s | | tez-api in master has 545 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 0m 54s | | tez-runtime-internals in master has 111 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 0m 44s | | tez-examples in master has 2 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 1m 44s | | tez-dag in master has 749 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 0m 31s | | branch/tez-dist no spotbugs output file (spotbugsXml.xml) |
| | | | | _ Patch Compile Tests _ |
| +0 🆗 | mvndep | 0m 8s | | Maven dependency ordering for patch |
| +1 💚 | mvninstall | 1m 50s | | the patch passed |
| -1 ❌ | codespell | 0m 52s | /results-codespell.txt | The patch generated 1 new + 5 unchanged - 0 fixed = 6 total (was 5) |
| +1 💚 | compile | 1m 48s | | the patch passed |
| +1 💚 | javac | 1m 48s | | the patch passed |
| +1 💚 | blanks | 0m 0s | | The patch has no blanks issues. |
| -0 ⚠️ | checkstyle | 0m 15s | /results-checkstyle-tez-api.txt | tez-api: The patch generated 1 new + 51 unchanged - 0 fixed = 52 total (was 51) |
| -0 ⚠️ | checkstyle | 0m 11s | /results-checkstyle-tez-runtime-internals.txt | tez-runtime-internals: The patch generated 1 new + 7 unchanged - 0 fixed = 8 total (was 7) |
| -0 ⚠️ | checkstyle | 0m 25s | /results-checkstyle-tez-dag.txt | tez-dag: The patch generated 2 new + 450 unchanged - 0 fixed = 452 total (was 450) |
| +1 💚 | hadolint | 0m 1s | | No new issues. |
| +1 💚 | markdownlint | 0m 3s | | No new issues. |
| +1 💚 | shellcheck | 0m 2s | | No new issues. |
| +1 💚 | yamllint | 0m 0s | | No new issues. |
| +1 💚 | javadoc | 1m 12s | | the patch passed |
| -1 ❌ | spotbugs | 1m 30s | /new-spotbugs-tez-api.html | tez-api generated 1 new + 545 unchanged - 0 fixed = 546 total (was 545) |
| -1 ❌ | spotbugs | 0m 54s | /new-spotbugs-tez-runtime-internals.html | tez-runtime-internals generated 1 new + 111 unchanged - 0 fixed = 112 total (was 111) |
| -1 ❌ | spotbugs | 1m 47s | /new-spotbugs-tez-dag.html | tez-dag generated 1 new + 749 unchanged - 0 fixed = 750 total (was 749) |
| +0 🆗 | spotbugs | 0m 16s | | tez-dist has no data from spotbugs |
| | | | | _ Other Tests _ |
| +1 💚 | unit | 2m 17s | | tez-api in the patch passed. |
| +1 💚 | unit | 0m 49s | | tez-runtime-internals in the patch passed. |
| +1 💚 | unit | 0m 16s | | tez-examples in the patch passed. |
| +1 💚 | unit | 5m 47s | | tez-dag in the patch passed. |
| +1 💚 | unit | 0m 14s | | tez-dist in the patch passed. |
| +1 💚 | asflicense | 0m 41s | | The patch does not generate ASF License warnings. |
| | | 53m 5s | | |

| Reason | Tests |
| --- | --- |
| SpotBugs | module:tez-api |
| | Found reliance on default encoding in org.apache.tez.common.TezCommonUtils.createJobTokenSecretManager(Configuration): String.getBytes() At TezCommonUtils.java:[line 73] |
| SpotBugs | module:tez-runtime-internals |
| | org.apache.tez.runtime.task.TezChild.main(String[]) uses the nextDouble method of Random to generate a random integer; using nextInt is more efficient At TezChild.java:[line 574] |
| SpotBugs | module:tez-dag |
| | Found reliance on default encoding in org.apache.tez.dag.app.rm.ZookeeperTaskScheduler.lambda$initialize$0(String, ChildData): new String(byte[]) At ZookeeperTaskScheduler.java:[line 103] |

| Subsystem | Report/Notes |
| --- | --- |
| Docker | ClientAPI=1.54 ServerAPI=1.54 base: https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/2/artifact/out/Dockerfile |
| GITHUB PR | #473 |
| Optional Tests | dupname asflicense javac javadoc unit spotbugs checkstyle codespell detsecrets compile xmllint shellcheck shelldocs yamllint hadolint markdownlint |
| uname | Linux a40cb1467d18 5.15.0-164-generic #174-Ubuntu SMP Fri Nov 14 20:25:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | /home/jenkins/jenkins-agent/workspace/tez-multibranch_PR-473/src/.yetus/personality.sh |
| git revision | master / 4443ed5 |
| Default Java | Ubuntu-21.0.10+7-Ubuntu-124.04 |
| Test Results | https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/2/testReport/ |
| Max. process+thread count | 691 (vs. ulimit of 5500) |
| modules | C: tez-api tez-runtime-internals tez-examples tez-dag tez-dist U: . |
| Console output | https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/2/console |
| versions | git=2.43.0 maven=3.8.7 hadolint=1.18.0-0-g76eee5c spotbugs=4.9.3 codespell=2.4.1 markdownlint=0.46.0 shellcheck=0.7.1 yamllint=1.38.0 |
| Powered by | Apache Yetus 0.15.1 https://yetus.apache.org |

This message was automatically generated.

@Aggarwal-Raghav

Contributor Author

ok, made it work through some hacks.

docker exec -u root tez-am chmod -R 777 /tmp /data
docker exec -u root tez-child-1 chmod -R 777 /tmp /data
docker exec -u root tez-child-2 chmod -R 777 /tmp /data
docker exec -u root tez-am chmod 777 /data
docker exec -it tez-am /bin/bash
echo "Hello world Hello" > /data/input.txt

java -cp ./*:./lib/*:tez-examples-1.0.0-SNAPSHOT.jar org.apache.tez.examples.ExternalAmWordCount file:///data/input.txt file:///data/output
[Screenshots: 2026-04-06 at 12:15:08 AM, 12:16:24 AM, 12:18:52 AM]

@Aggarwal-Raghav

Contributor Author

CC @abstractdog

LOG.info("Container with id: " + containerId
-     + " is invalid and will be killed");
+     + " is not yet registered, asking it to wait");
task = null;

Contributor Author

This is done because org.apache.tez.dag.app.rm.TaskSchedulerContextImpl#containerAllocated(), as used in ZookeeperTaskScheduler.java, only increments a counter. For externally created containers the AM currently sends a shouldDie response, but it should send back null instead. Returning null lets the for loop in ContainerReporter.java#callInternal() keep waiting until ZookeeperTaskScheduler creates the container and registers it in the AM.
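
A minimal sketch of the decision described above, assuming the AM can tell whether containers are externally managed. `GetTaskDecision`, its `Response` enum and the three flags are hypothetical names for illustration only; the actual change in this PR simply returns null for an unknown container.

```java
/** Hypothetical decision helper; the names are illustrative, not Tez API. */
final class GetTaskDecision {
  enum Response { RUN_TASK, WAIT, SHOULD_DIE }

  private GetTaskDecision() {}

  static Response decide(boolean containerRegistered, boolean hasTask, boolean externallyManaged) {
    if (containerRegistered) {
      // Known container: hand out a task if one is assigned, otherwise keep it polling.
      return hasTask ? Response.RUN_TASK : Response.WAIT;
    }
    // Unknown container: an externally started container may simply be ahead of the
    // scheduler, so it is asked to wait; a managed (YARN-launched) one is invalid.
    return externallyManaged ? Response.WAIT : Response.SHOULD_DIE;
  }
}
```

In this sketch, `WAIT` corresponds to returning null from the getTask call, so the container's polling loop keeps retrying.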

@VisibleForTesting
protected void fetchNext() throws InterruptedException, IOException {
try {
if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) {

Contributor Author

Shuffle is forced to read from disk, i.e. from the shared path.

Contributor

We should clearly distinguish between local disk fetch and the containerized approach in the first implementation.
The local disk fetch optimization works on a classic YARN cluster because containers on the same host can copy data quickly. In Docker containers we cannot assume that the local folder used for intermediate shuffle data is shared in any way. To make this clear in the code, I believe fetchers should be able to tell from the configured framework mode that we're in standalone ZooKeeper mode, and should not set up local fetch.
Later, this same code path could still be optimized: e.g. fetchers could detect that, even though they run in TezChild containers, they are on the same node and the folder is shared, so they can use local fetch. In the meantime, for simplicity's sake, we can just disable it (a TODO plus a clarifying code comment can say that this area needs further optimization).

However, an HTTP fetch assumes a shuffle handler on the producer side, which might be an additional burden for this POC, because another design decision has to be made: how to run a ShuffleHandler. This is up to your imagination.

I think that with docker-compose you cannot co-locate tezchild-1 + shufflehandler-1 and tez-child2 + shufflehandler-2. To make them share the same hostname, you need to find a way to run a shuffle handler in parallel for each TezChild, sharing the same local folders, so that the shuffle handlers can serve them.
Note that if the shuffle handler somehow runs in the same container/JVM, they see the same hostname, so the local fetch optimization is sorted out.

This is obviously not the perfect production setup, but this ticket is the ground-level POC for running Tez DAGs in a distributed way without YARN.
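
The suggestion to disable local fetch in standalone ZooKeeper mode could be sketched roughly as below. `LocalFetchPolicy` and the `"STANDALONE_ZOOKEEPER"` mode string are assumptions made for illustration; how the fetcher actually learns the framework mode is left open.

```java
/** Hypothetical helper; not part of the Tez code base. */
final class LocalFetchPolicy {
  /** Assumed name of the framework mode used for the standalone ZooKeeper setup. */
  static final String STANDALONE_ZOOKEEPER = "STANDALONE_ZOOKEEPER";

  private LocalFetchPolicy() {}

  static boolean useLocalDiskFetch(String frameworkMode, boolean localDiskFetchEnabled,
      String mapHost, int mapPort, String localShuffleHost, int localShufflePort) {
    if (STANDALONE_ZOOKEEPER.equals(frameworkMode)) {
      // TODO: containers on the same node with a shared folder could still use local
      // fetch; until that can be detected reliably, it is disabled in this mode.
      return false;
    }
    // Same condition as the existing check in fetchNext().
    return localDiskFetchEnabled
        && mapHost.equals(localShuffleHost)
        && mapPort == localShufflePort;
  }
}
```

Keeping the check in one place means the later same-node optimization only has to replace the early return.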


if (!sendEmptyPartitionDetails || outputGenerated) {
String host = context.getExecutionContext().getHostName();
if (host == null) {

Contributor Author

This was causing an NPE.

Contributor

I believe we should not simply hardcode "localhost" here: if this caused an NPE, it means ExecutionContext needs to be improved and prepared for the unmanaged sessions scenario.
Let's assume that we're going to run TezChild containers as part of a docker-compose setup, and later as k8s pods in a Deployment or StatefulSet. How we create an ExecutionContext now should be in line with that kind of deployment.
E.g. currently we get NM_HOST:

System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),

This is YARN-specific, but fortunately you worked on TEZ-4689, so whatever you did there for the node context should be followed here for the execution context as well.
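
One possible shape for a host lookup that is not YARN-specific is sketched below. It is only an assumption about how this could look, not what TEZ-4689 does: `ExecutionHostResolver` is a hypothetical helper, and relying on the `HOSTNAME` variable reflects common container runtime behavior rather than anything stated in this thread.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

/** Hypothetical helper; not part of the Tez code base. */
final class ExecutionHostResolver {
  private ExecutionHostResolver() {}

  static String resolveHost(Map<String, String> env) {
    // YARN case: the NodeManager exports NM_HOST for the container.
    String nmHost = env.get("NM_HOST");
    if (nmHost != null && !nmHost.isEmpty()) {
      return nmHost;
    }
    // Container case (assumption): the runtime exports HOSTNAME.
    String hostname = env.get("HOSTNAME");
    if (hostname != null && !hostname.isEmpty()) {
      return hostname;
    }
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException e) {
      // Fail loudly instead of silently falling back to "localhost".
      throw new IllegalStateException("Cannot determine the host name for the execution context", e);
    }
  }
}
```

A call such as `ExecutionHostResolver.resolveHost(System.getenv())` would then replace the direct `NM_HOST` lookup.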

nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
for (int idle = 1; containerTask == null; idle++) {
- long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
+ long sleepTimeMilliSecs = Math.min(idle * 10000, getTaskMaxSleepTime);

Contributor Author

This should be configurable through TezConfiguration, I guess.
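
A sketch of what a configurable step could look like, assuming the step is read from a configuration key that does not exist yet. `GetTaskBackoff` and its parameter names are hypothetical.

```java
/** Hypothetical helper; stepMillis would come from a new, not yet defined TezConfiguration key. */
final class GetTaskBackoff {
  private GetTaskBackoff() {}

  /** Linear backoff: idle * stepMillis, capped at maxSleepMillis. */
  static long sleepMillis(int idle, long stepMillis, long maxSleepMillis) {
    return Math.min((long) idle * stepMillis, maxSleepMillis);
  }
}
```

With a step of 10 the sleep grows gradually towards the cap, while a step of 10000 reaches any cap below 10 seconds on the first idle iteration.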

@tez-yetus


💔 -1 overall

| Vote | Subsystem | Runtime | Logfile | Comment |
| --- | --- | --- | --- | --- |
| +0 🆗 | reexec | 0m 16s | | Docker mode activated. |
| | | | | _ Prechecks _ |
| +1 💚 | dupname | 0m 0s | | No case conflicting files found. |
| +0 🆗 | detsecrets | 0m 0s | | detect-secrets was not available. |
| +0 🆗 | xmllint | 0m 0s | | xmllint was not available. |
| +0 🆗 | shelldocs | 0m 1s | | Shelldocs was not available. |
| +1 💚 | @author | 0m 0s | | The patch does not contain any @author tags. |
| +1 💚 | test4tests | 0m 0s | | The patch appears to include 3 new or modified test files. |
| | | | | _ master Compile Tests _ |
| +0 🆗 | mvndep | 0m 55s | | Maven dependency ordering for branch |
| +1 💚 | mvninstall | 6m 18s | | master passed |
| +1 💚 | compile | 3m 38s | | master passed |
| +1 💚 | checkstyle | 2m 59s | | master passed |
| +1 💚 | javadoc | 3m 5s | | master passed |
| +0 🆗 | spotbugs | 1m 32s | | tez-api in master has 545 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 0m 53s | | tez-runtime-internals in master has 111 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 1m 13s | | tez-runtime-library in master has 235 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 0m 44s | | tez-examples in master has 2 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 1m 44s | | tez-dag in master has 749 extant spotbugs warnings. |
| +0 🆗 | spotbugs | 0m 31s | | branch/tez-dist no spotbugs output file (spotbugsXml.xml) |
| | | | | _ Patch Compile Tests _ |
| +0 🆗 | mvndep | 0m 9s | | Maven dependency ordering for patch |
| +1 💚 | mvninstall | 2m 17s | | the patch passed |
| -1 ❌ | codespell | 0m 51s | /results-codespell.txt | The patch generated 1 new + 9 unchanged - 0 fixed = 10 total (was 9) |
| +1 💚 | compile | 2m 13s | | the patch passed |
| +1 💚 | javac | 2m 13s | | the patch passed |
| +1 💚 | blanks | 0m 0s | | The patch has no blanks issues. |
| -0 ⚠️ | checkstyle | 0m 15s | /results-checkstyle-tez-api.txt | tez-api: The patch generated 1 new + 51 unchanged - 0 fixed = 52 total (was 51) |
| -0 ⚠️ | checkstyle | 0m 12s | /results-checkstyle-tez-runtime-internals.txt | tez-runtime-internals: The patch generated 1 new + 9 unchanged - 0 fixed = 10 total (was 9) |
| +1 💚 | checkstyle | 0m 18s | | tez-runtime-library: The patch generated 0 new + 44 unchanged - 1 fixed = 44 total (was 45) |
| +1 💚 | checkstyle | 0m 10s | | The patch passed checkstyle in tez-examples |
| -0 ⚠️ | checkstyle | 0m 26s | /results-checkstyle-tez-dag.txt | tez-dag: The patch generated 2 new + 470 unchanged - 0 fixed = 472 total (was 470) |
| +1 💚 | checkstyle | 0m 7s | | The patch passed checkstyle in tez-dist |
| +1 💚 | hadolint | 0m 1s | | No new issues. |
| +1 💚 | markdownlint | 0m 2s | | No new issues. |
| +1 💚 | shellcheck | 0m 0s | | No new issues. |
| +1 💚 | yamllint | 0m 0s | | No new issues. |
| +1 💚 | javadoc | 1m 35s | | the patch passed |
| -1 ❌ | spotbugs | 1m 30s | /new-spotbugs-tez-api.html | tez-api generated 1 new + 545 unchanged - 0 fixed = 546 total (was 545) |
| -1 ❌ | spotbugs | 0m 55s | /new-spotbugs-tez-runtime-internals.html | tez-runtime-internals generated 1 new + 111 unchanged - 0 fixed = 112 total (was 111) |
| -1 ❌ | spotbugs | 1m 46s | /new-spotbugs-tez-dag.html | tez-dag generated 1 new + 749 unchanged - 0 fixed = 750 total (was 749) |
| +0 🆗 | spotbugs | 0m 16s | | tez-dist has no data from spotbugs |
| | | | | _ Other Tests _ |
| +1 💚 | unit | 2m 17s | | tez-api in the patch passed. |
| +1 💚 | unit | 0m 50s | | tez-runtime-internals in the patch passed. |
| -1 ❌ | unit | 5m 54s | /patch-unit-tez-runtime-library.txt | tez-runtime-library in the patch passed. |
| +1 💚 | unit | 0m 18s | | tez-examples in the patch passed. |
| +1 💚 | unit | 5m 56s | | tez-dag in the patch passed. |
| +1 💚 | unit | 0m 14s | | tez-dist in the patch passed. |
| +1 💚 | asflicense | 0m 51s | | The patch does not generate ASF License warnings. |
| | | 58m 11s | | |

| Reason | Tests |
| --- | --- |
| SpotBugs | module:tez-api |
| | Found reliance on default encoding in org.apache.tez.common.TezCommonUtils.createJobTokenSecretManager(Configuration): String.getBytes() At TezCommonUtils.java:[line 73] |
| SpotBugs | module:tez-runtime-internals |
| | org.apache.tez.runtime.task.TezChild.main(String[]) uses the nextDouble method of Random to generate a random integer; using nextInt is more efficient At TezChild.java:[line 574] |
| SpotBugs | module:tez-dag |
| | Found reliance on default encoding in org.apache.tez.dag.app.rm.ZookeeperTaskScheduler.lambda$initialize$0(String, ChildData): new String(byte[]) At ZookeeperTaskScheduler.java:[line 103] |
| Failed junit tests | tez.runtime.library.common.shuffle.orderedgrouped.TestFetcher |

| Subsystem | Report/Notes |
| --- | --- |
| Docker | ClientAPI=1.54 ServerAPI=1.54 base: https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/3/artifact/out/Dockerfile |
| GITHUB PR | #473 |
| Optional Tests | dupname asflicense javac javadoc unit spotbugs checkstyle codespell detsecrets compile xmllint shellcheck shelldocs yamllint hadolint markdownlint |
| uname | Linux e384ef1434e9 5.15.0-164-generic #174-Ubuntu SMP Fri Nov 14 20:25:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux |
| Build tool | maven |
| Personality | /home/jenkins/jenkins-agent/workspace/tez-multibranch_PR-473/src/.yetus/personality.sh |
| git revision | master / 89039db |
| Default Java | Ubuntu-21.0.10+7-Ubuntu-124.04 |
| Test Results | https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/3/testReport/ |
| Max. process+thread count | 1109 (vs. ulimit of 5500) |
| modules | C: tez-api tez-runtime-internals tez-runtime-library tez-examples tez-dag tez-dist U: . |
| Console output | https://ci-hadoop.apache.org/job/tez-multibranch/job/PR-473/3/console |
| versions | git=2.43.0 maven=3.8.7 hadolint=1.18.0-0-g76eee5c spotbugs=4.9.3 codespell=2.4.1 markdownlint=0.46.0 shellcheck=0.7.1 yamllint=1.38.0 |
| Powered by | Apache Yetus 0.15.1 https://yetus.apache.org |

This message was automatically generated.

@abstractdog

Contributor

@Aggarwal-Raghav : this is amazing, and I'm glad to see that you made it work!
I'll find some time to review this further next week, promise.
In the meantime, can you extract the TEZ-4700-related changes from this patch to make it a bit smaller? It is already very big.
That would make it much easier to get TEZ-4700 in, I guess.

@abstractdog abstractdog left a comment

Contributor

@Aggarwal-Raghav : I left a lot of comments that may clarify some of the vague parts.
I'm not done with this one yet, but I submitted the review so that you can think about it in the meantime.

@ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty
public static final String TEZ_TASK_REGISTRY_NAMESPACE = TEZ_TASK_PREFIX + "registry.namespace";
public static final String TEZ_TASK_REGISTRY_NAMESPACE_DEFAULT = "/tez_am/workers";

Contributor

/tez_am/workers looks a bit strange: a component is either an AM or a worker.
As the ZooKeeper registry hasn't been released yet, we're free to modify the default values. What about:
for AM: /tez-am
for tasks: /tez-worker

So if we consider the root /tez-external-sessions prefix, they'll end up in:

/tez-external-sessions/tez-am
/tez-external-sessions/tez-worker

This way we can stick to hyphens for the static prefixes and underscores for the dynamic container ids.

Additional change needed: move the constant below to the TezConstants class and make it public:

ZK_NAMESPACE_PREFIX = "/tez-external-sessions";
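
The proposed layout could be expressed roughly as follows. `ZkNamespaces`, the two `*_DEFAULT` constant names and the `append` helper are hypothetical; only the path values come from the comment above.

```java
/** Hypothetical constants holder illustrating the proposed namespace layout. */
final class ZkNamespaces {
  static final String ZK_NAMESPACE_PREFIX = "/tez-external-sessions";
  static final String AM_NAMESPACE_DEFAULT = "/tez-am";
  static final String WORKER_NAMESPACE_DEFAULT = "/tez-worker";

  private ZkNamespaces() {}

  /** Joins two path segments with exactly one slash between them. */
  static String append(String base, String namespace) {
    String trimmedBase = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
    String trimmedNamespace = namespace.startsWith("/") ? namespace.substring(1) : namespace;
    return trimmedBase + "/" + trimmedNamespace;
  }
}
```

For example, `ZkNamespaces.append(ZkNamespaces.ZK_NAMESPACE_PREFIX, ZkNamespaces.WORKER_NAMESPACE_DEFAULT)` yields `/tez-external-sessions/tez-worker`.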

/** This method is no-op it's just informing AM to change container state */
@Override
public void stopContainer(ContainerStopRequest stopRequest) {
LOG.info("Lifecycle is externally managed for container: {}", stopRequest.getContainerId());

Contributor

The message should also state that this is a "stop" call.

/** This method is no-op it's just informing AM to change container state */
@Override
public void launchContainer(ContainerLaunchRequest launchRequest) {
LOG.info("Container is externally managed: {}", launchRequest.getContainerId());

Contributor

The message should also state that this is a "launch" call.

Comment on lines +274 to +276
if (meta == null) {
return 0;
}

Contributor

Hm, I need to know what this is a workaround for.

Comment on lines +157 to +163
* **zookeeper:** Required by the Tez AM for standalone session
discovery

* **tez-am:** It automatically waits for Zookeeper and HDFS to
be healthy before starting up.

* **tez-child:** TBD

Contributor

This list of containers is not a must-have here; the docker-compose file must be clean and self-descriptive.

Comment on lines +183 to +196
8. To mount custom plugins or JARs required by Tez AM (e.g., for split
generation: typically the hive-exec jar, but in general, any UDFs or
dependencies previously managed via YARN localization):

* Create a directory `tez-plugins` and add all required jars.

* Uncomment the following lines in docker compose under the `tez-am`
and `tez-child` services to mount this directory as a volume to
`/opt/tez/plugins` in the docker container.

```yaml
volumes:
- ./tez-plugins:/opt/tez/plugins
```

Contributor

This should be handled the same way as in 6) above.

defaultConf.getTrimmed(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, "12000-12000");
port = Integer.parseInt(portRange.split("[-,]")[0]);

appId = amRecord.getApplicationId().toString();

Contributor

This part is crucial from a design perspective. The current implementation assumes the following behavior:

  1. AM starts first
  2. AM registers its application id in zookeeper
  3. Task container starts
  4. Task starts a registry client, and picks the first discoverable AM

Let's assume a Kubernetes cluster later, with multiple tenants, so multiple applications, and therefore concurrent DAGs.
I'm afraid that with the current implementation we simply lose control over starting an application with the desired resources (i.e. the desired number of task containers), because task containers just start and pick the first AM from the ZooKeeper registry.

Instead, we should have a clear connection between them, which might be the application_id/container_id (similar to Tez container mode in the YARN world), so that when the task container starts, it already knows which application it belongs to. I guess this logical connection is already around here, because you already pass a container id to the TezChild.
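
A sketch of the "task already knows its application" idea: the task container selects the AM whose application id matches the one it was started with, instead of the first discoverable AM. `AmLookup` and `AmEntry` are hypothetical stand-ins for the registry record type, and how the expected application id reaches the container (argument, environment variable, container id) is left as an assumption.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch; AmEntry is an illustrative stand-in for a registry record, not Tez API. */
final class AmLookup {
  static final class AmEntry {
    final String applicationId;
    final String host;
    final int port;

    AmEntry(String applicationId, String host, int port) {
      this.applicationId = applicationId;
      this.host = host;
      this.port = port;
    }
  }

  private AmLookup() {}

  /** Returns the AM registered for the expected application, or empty if none is registered yet. */
  static Optional<AmEntry> findAm(List<AmEntry> registered, String expectedApplicationId) {
    return registered.stream()
        .filter(entry -> entry.applicationId.equals(expectedApplicationId))
        .findFirst();
  }
}
```

An empty result would mean the task container waits and retries rather than attaching to some other tenant's AM.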

Comment on lines +574 to +588
int randomSeq = (int) (Math.random() * 900000) + 100000;
containerIdentifier = baseContainerId + "_01_" + randomSeq;

String zkQuorum = defaultConf.get(TezConfiguration.TEZ_AM_ZOOKEEPER_QUORUM);
ZkConfig zkconfig = new ZkConfig(defaultConf);
zkWorkerClient = CuratorFrameworkFactory.newClient(zkQuorum, zkconfig.getRetryPolicy());
zkWorkerClient.start();

// Create Ephemeral node representing this worker
String workerPath = zkconfig.getZkTaskNameSpace() + "/" + appId + "/" + containerIdentifier;
zkWorkerClient
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(workerPath, host.getBytes(java.nio.charset.StandardCharsets.UTF_8));

Contributor

Instead of a random id + create(), incremental + retry logic should be implemented here:

  1. try to acquire n; if that passes, done
  2. if it fails with n, try n+1
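
The incremental acquisition could be sketched as below. To keep the example self-contained, an in-memory set stands in for the ZooKeeper namespace: `tryCreate` plays the role of the ephemeral `create()` call, which in Curator would signal an existing node with `KeeperException.NodeExistsException`. `ContainerSequenceAllocator`, `maxAttempts` and the zero-padded sequence format are assumptions for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; an in-memory set replaces the ZooKeeper worker namespace. */
final class ContainerSequenceAllocator {
  private final Set<String> registeredPaths = ConcurrentHashMap.newKeySet();

  /** Stand-in for an ephemeral create(): returns false if the path already exists. */
  boolean tryCreate(String path) {
    return registeredPaths.add(path);
  }

  /** Tries sequence numbers 1, 2, 3, ... until one path can be created. */
  String acquire(String basePath, String baseContainerId, int maxAttempts) {
    for (int n = 1; n <= maxAttempts; n++) {
      String containerId = String.format("%s_01_%06d", baseContainerId, n);
      if (tryCreate(basePath + "/" + containerId)) {
        return containerId;
      }
      // Sequence n is taken by another worker: fall through and try n + 1.
    }
    throw new IllegalStateException("No free container sequence after " + maxAttempts + " attempts");
  }
}
```

Because the create call is atomic, two workers racing for the same n cannot both succeed; the loser simply moves on to n + 1.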

Comment on lines +601 to +606
assert args.length == 5;
host = args[0];
port = Integer.parseInt(args[1]);
containerIdentifier = args[2];
tokenIdentifier = args[3];
attemptNumber = Integer.parseInt(args[4]);

Contributor

Instead of an if/else, acquiring these could be refactored into separate methods or even classes.
Also, if we feel that TezChild has changed too much, I would consider introducing a class hierarchy for it (not sure at the moment whether composition or inheritance), because another type of TezChild could also take care of special things in the same JVM (e.g. a shuffle handler).
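
One way the suggested refactoring could look, using composition: each way of obtaining the launch parameters becomes its own class behind a small interface. All names here (`ChildLaunchInfo`, `ChildLaunchInfoSource`, `ArgsLaunchInfoSource`) are hypothetical; a ZooKeeper-backed source would be a second implementation and is not shown.

```java
/** Hypothetical value object for the five launch parameters read in TezChild.main(). */
final class ChildLaunchInfo {
  final String host;
  final int port;
  final String containerIdentifier;
  final String tokenIdentifier;
  final int attemptNumber;

  ChildLaunchInfo(String host, int port, String containerIdentifier,
      String tokenIdentifier, int attemptNumber) {
    this.host = host;
    this.port = port;
    this.containerIdentifier = containerIdentifier;
    this.tokenIdentifier = tokenIdentifier;
    this.attemptNumber = attemptNumber;
  }
}

/** Hypothetical abstraction: one implementation per way of obtaining the parameters. */
interface ChildLaunchInfoSource {
  ChildLaunchInfo resolve();
}

/** Classic path: everything is passed on the command line. */
final class ArgsLaunchInfoSource implements ChildLaunchInfoSource {
  private final String[] args;

  ArgsLaunchInfoSource(String[] args) {
    this.args = args.clone();
  }

  @Override
  public ChildLaunchInfo resolve() {
    // Explicit check: a plain assert is skipped unless the JVM runs with -ea.
    if (args.length != 5) {
      throw new IllegalArgumentException("Expected 5 arguments, got " + args.length);
    }
    return new ChildLaunchInfo(args[0], Integer.parseInt(args[1]), args[2], args[3],
        Integer.parseInt(args[4]));
  }
}
```

`main()` would then pick a source based on the framework mode and call `resolve()` once, keeping the mode-specific logic out of TezChild itself.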

@Aggarwal-Raghav

Contributor Author

Thank you very much, @abstractdog . I will address those matters in due course, possibly by tomorrow or the day after. I truly appreciate your efforts.
