Skip to content

[KYUUBI #6832] Initial impl Spark DSv2 YARN Connector that supports reading YARN aggregated logs#7455

Open
pan3793 wants to merge 3 commits into
apache:masterfrom
pan3793:yarn-agg-log
Open

[KYUUBI #6832] Initial impl Spark DSv2 YARN Connector that supports reading YARN aggregated logs#7455
pan3793 wants to merge 3 commits into
apache:masterfrom
pan3793:yarn-agg-log

Conversation

@pan3793
Copy link
Copy Markdown
Member

@pan3793 pan3793 commented May 17, 2026

Why are the changes needed?

Close #6832. This connector gives the Hadoop administrator a chance to analyze YARN aggregated logs at the cluster level, for example, aggregate container logs across applications by host to diagnose potential host hardware issues

The current initial implementation has several limitations:

  • feat: only support TFile, but not IFile
  • perf: only supports pushing down app_id, user, host filters, does not support pushing down container_id, log_type, though they are supposed to be selective
  • perf: listing aggregated log files runs in a single thread during the planning phase - for a large cluster, it should run in parallel on the driver side or launch a job to do that on the executor side.
  • etc.

TODO: unit tests and user docs

How was this patch tested?

UT is not added yet, as this requires a real YARN cluster with history agg logs.

$ kyuubi-beeline -u 'jdbc:kyuubi://spark-dev1.foo.bar:10009/default' \
  --conf spark.jars=/tmp/kyuubi-spark-connector-yarn_2.12-1.12.0-SNAPSHOT.jar \
  --conf spark.sql.catalog.yarn=org.apache.kyuubi.spark.connector.yarn.YarnCatalog
0: > select
. .>   mtime, app_id, container_id, host, log_type, message
. .> from yarn.app_logs
. .> where
. .>   user = 'hadoop'
. .>   and host = 'spark-dev2.foo.bar'
. .>   and message like '%ERROR%'
. .>   and message not like '%RECEIVED SIGNAL TERM%'
. .>   and message not like '%Aborting task%'
. .> limit 2;
...
+--------------------------+---------------------------------+-----------------------------------------+---------------------+-----------+----------------------------------------------------+
|          mtime           |             app_id              |              container_id               |        host         | log_type  |                      message                       |
+--------------------------+---------------------------------+-----------------------------------------+---------------------+-----------+----------------------------------------------------+
| 2025-04-03 18:07:18.893  | application_1743671377509_0001  | container_1743671377509_0001_01_000001  | spark-dev2.foo.bar  | stdout    | 25/04/03 18:07:15 ERROR ApplicationMaster$AMEndpoint: Driver terminated with exit code 1! Shutting down. spark-dev1.foo.bar:16601 |
| 2025-04-03 18:07:18.893  | application_1743671377509_0001  | container_1743671377509_0001_01_000001  | spark-dev2.foo.bar  | stdout    | 25/04/03 18:07:15 ERROR ApplicationMaster$AMEndpoint: Driver terminated with exit code 1! Shutting down. spark-dev1.foo.bar:16601 |
+--------------------------+---------------------------------+-----------------------------------------+---------------------+-----------+----------------------------------------------------+
2 rows selected (0.648 seconds)

Was this patch authored or co-authored using generative AI tooling?

Assisted-by: Claude Opus 4.7.


<artifactId>kyuubi-spark-connector-yarn_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Spark Hadoop YARN Connector</name>
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a generic name - we may extend it to include other interactions with YARN, for example, use YarnClient to retrieve app list from RM, implement STORE PROCEDUREs which are equivalent to the yarn commands

@pan3793
Copy link
Copy Markdown
Member Author

pan3793 commented May 17, 2026

This is a longstanding missing feature for Hadoop. YARN-1440 was raised in 2013 - Yarn aggregated logs are difficult for external tools to understand. Large-scale log processing is a typical use case for Hadoop, I'm surprised there is not an out-of-the-box solution in 2026 to analyze logs of applications run in Hadoop YARN in batch.

@aajisaka @wForget @cxzl25, could you please take a look? and also would like to know if you have any better ideas to process yarn aggregate logs across the application

@aajisaka
Copy link
Copy Markdown
Member

I agreed it's a long standing missing feature. I'll take a look further.

private def listFilesWithFilters(): Array[FileStatus] = {
val baseDir = remoteAppLogDir
val bucketDir = s"bucket-$remoteAppLogDirSuffix-tfile"
var path = s"$baseDir/{{USER}}/$bucketDir/{{BUCKET}}/{{APP_ID}}/{{HOST}}_*"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, we should be able to support paths without YARN-6929, which does not have buckets.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for pointing this out! YARN-6929 landed in Hadoop 3.3.0, I'm reading the branch-3.3 code, so missing this part, will add a TODO here for now.

logWarning(s"Unsupported filter: $f")
}
val globPath = path
.replace("{{BUCKET}}", "*") // TODO parallize bucket listing
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an app_id is entered, the bucket can also be calculated in advance to improve list performance.

org.apache.hadoop.yarn.logaggregation.LogAggregationUtils#getRemoteBucketDir

    int bucket = appId.getId() % 10000;
    String bucketDir = String.format("%04d", bucket);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, thanks for pointing this out

Copy link
Copy Markdown
Member

@aajisaka aajisaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a sample YARN aggregated log and create a unit test to verify?

val fs = path.getFileSystem(hadoopConf)

val start = System.currentTimeMillis()
val files = fs.globStatus(path)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of Observer NameNode setup, would you call fs.msync() before listing files to avoid stale reads?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haven't experienced Observer NameNode ...

according to the API javadocs

Synchronize client metadata state.
In some FileSystem implementations such as HDFS metadata synchronization is essential to guarantee consistency of read requests particularly in HA setting.

and seems non-HDFS will throw UnsupportedOperationException?

BTW, is fs.msync() also expected to be called for listing data files on scanning Hive tables? I think we don't do that ...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-HDFS will throw UnsupportedOperationException?

Yes. We need to ignore if it's unsupported. Example in Delta Lake: https://github.com/delta-io/delta/blob/b5e5aecb69cfba6d5a1f4b1ecce81681414051ac/storage/src/main/java/io/delta/storage/HDFSLogStore.java#L171

BTW, is fs.msync() also expected to be called for listing data files on scanning Hive tables? I think we don't do that ...

We don't usually do because it's new and minor setup, but yes, fs.msync() is also expected to be called.

Comment on lines +40 to +42
override def listTables(namespace: Array[String]): Array[Identifier] = {
Array(Identifier.of(namespace, "app_logs"))
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This catalog is not aware of namespace and it makes catalog.any_namespace.app_logs resolve to the same table. We need to at least document the limitation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not intended, will limit it to catalog.app_logs

@pan3793
Copy link
Copy Markdown
Member Author

pan3793 commented May 18, 2026

Can we create a sample YARN aggregated log and create a unit test to verify?

yeah, I plan to do that.

Copy link
Copy Markdown
Member

@wForget wForget left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pan3793 , LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Impl Spark DSv2 YARN Connector that supports reading YARN aggregation logs

4 participants