[KYUUBI #6832] Initial impl Spark DSv2 YARN Connector that supports reading YARN aggregated logs #7455
pan3793 wants to merge 3 commits into
<artifactId>kyuubi-spark-connector-yarn_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Spark Hadoop YARN Connector</name>
Use a generic name: we may extend it to cover other interactions with YARN, for example, using YarnClient to retrieve the app list from the RM, or implementing stored procedures that are equivalent to the yarn commands.
This is a long-standing missing feature in Hadoop. YARN-1440 was raised in 2013: YARN aggregated logs are difficult for external tools to understand. Large-scale log processing is a typical Hadoop use case, so I'm surprised that in 2026 there is still no out-of-the-box way to batch-analyze the logs of applications run on Hadoop YARN. @aajisaka @wForget @cxzl25, could you please take a look? I'd also like to know if you have any better ideas for processing YARN aggregated logs across applications.
I agree it's a long-standing missing feature. I'll take a closer look.
private def listFilesWithFilters(): Array[FileStatus] = {
  val baseDir = remoteAppLogDir
  val bucketDir = s"bucket-$remoteAppLogDirSuffix-tfile"
  var path = s"$baseDir/{{USER}}/$bucketDir/{{BUCKET}}/{{APP_ID}}/{{HOST}}_*"
In the future, we should also support the path layout used without YARN-6929, which has no bucket directories.
Thank you for pointing this out! YARN-6929 landed in Hadoop 3.3.0, and I was reading the branch-3.3 code, so I missed this part. I will add a TODO here for now.
    logWarning(s"Unsupported filter: $f")
}
val globPath = path
  .replace("{{BUCKET}}", "*") // TODO parallelize bucket listing
If an app_id is given, the bucket can also be computed in advance to improve listing performance. See org.apache.hadoop.yarn.logaggregation.LogAggregationUtils#getRemoteBucketDir:
int bucket = appId.getId() % 10000;
String bucketDir = String.format("%04d", bucket);
good point, thanks for pointing this out
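A minimal sketch of the suggestion above, written in plain Java with no Hadoop dependency. It assumes the usual textual form `application_<clusterTimestamp>_<id>` for the application id; the class and method names (`BucketGlobSketch`, `bucketDir`, `resolveBucket`) are hypothetical and not part of the PR. The arithmetic is the same as in the two lines quoted from `LogAggregationUtils#getRemoteBucketDir`.

```java
import java.util.Locale;

// Sketch only: hypothetical helper, not the connector's actual code.
final class BucketGlobSketch {

    // Extracts the trailing sequence number from "application_<clusterTimestamp>_<id>".
    static int sequenceNumber(String appId) {
        String[] parts = appId.split("_");
        if (parts.length != 3 || !parts[0].equals("application")) {
            throw new IllegalArgumentException("Unexpected application id: " + appId);
        }
        return Integer.parseInt(parts[2]);
    }

    // Same arithmetic as the quoted snippet: id modulo 10000, zero-padded to 4 digits.
    static String bucketDir(String appId) {
        return String.format(Locale.ROOT, "%04d", sequenceNumber(appId) % 10000);
    }

    // Fills the {{BUCKET}} placeholder: the exact bucket when an app id filter
    // is present, "*" (list every bucket) otherwise.
    static String resolveBucket(String pathTemplate, String appIdOrNull) {
        String bucket = appIdOrNull == null ? "*" : bucketDir(appIdOrNull);
        return pathTemplate.replace("{{BUCKET}}", bucket);
    }
}
```

With an app_id filter the glob then touches a single bucket directory instead of expanding `*` over all of them, which is where the listing cost is expected to drop.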
aajisaka
left a comment
Can we create a sample YARN aggregated log and create a unit test to verify?
val fs = path.getFileSystem(hadoopConf)

val start = System.currentTimeMillis()
val files = fs.globStatus(path)
In an Observer NameNode setup, would you call fs.msync() before listing files to avoid stale reads?
I haven't worked with Observer NameNode. According to the API Javadoc:
"Synchronize client metadata state. In some FileSystem implementations such as HDFS metadata synchronization is essential to guarantee consistency of read requests particularly in HA setting."
It also seems that non-HDFS file systems will throw UnsupportedOperationException?
BTW, is fs.msync() also expected to be called when listing data files while scanning Hive tables? I don't think we do that.
Re "non-HDFS will throw UnsupportedOperationException?": Yes. We need to ignore it if it's unsupported. Example in Delta Lake: https://github.com/delta-io/delta/blob/b5e5aecb69cfba6d5a1f4b1ecce81681414051ac/storage/src/main/java/io/delta/storage/HDFSLogStore.java#L171
Re "is fs.msync() also expected to be called for listing data files on scanning Hive tables?": We don't usually do so because Observer NameNode is a new and uncommon setup, but yes, fs.msync() is expected to be called there too.
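The "sync, but ignore if unsupported" pattern discussed above can be sketched as follows. To keep the example self-contained, `MetadataSync` is a hypothetical one-method stand-in for the `msync()` call on Hadoop's `FileSystem`; the helper name `msyncIfSupported` is also an assumption and does not come from the PR or from Delta Lake.

```java
import java.io.IOException;

// Sketch only: hypothetical helper, not the connector's actual code.
final class MsyncSketch {

    // Hypothetical stand-in for the single call needed from the file system.
    interface MetadataSync {
        void msync() throws IOException;
    }

    // Returns true if the sync ran, false if the file system does not support it.
    // IOException from a real sync attempt is deliberately propagated.
    static boolean msyncIfSupported(MetadataSync fs) throws IOException {
        try {
            fs.msync();
            return true;
        } catch (UnsupportedOperationException e) {
            // Non-HDFS file systems: nothing to synchronize, carry on with the listing.
            return false;
        }
    }
}
```

Assuming `fs` is a Hadoop `FileSystem`, a call such as `MsyncSketch.msyncIfSupported(fs::msync)` placed just before `fs.globStatus(path)` would be one way to wire this in.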
override def listTables(namespace: Array[String]): Array[Identifier] = {
  Array(Identifier.of(namespace, "app_logs"))
}
This catalog is not namespace-aware, so catalog.any_namespace.app_logs resolves to the same table for any namespace. We need to at least document the limitation.
That's not intended; I will limit it to catalog.app_logs.
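The restriction to catalog.app_logs could look roughly like the check below. This is a sketch under stated assumptions: `NamespaceGuardSketch` and `resolve` are hypothetical names, and a real Spark catalog would express the rejection through its own table-loading API (for example by throwing a no-such-table error) rather than by returning an `Optional`.

```java
import java.util.Optional;

// Sketch only: hypothetical helper, not the connector's actual code.
final class NamespaceGuardSketch {

    static final String TABLE = "app_logs";

    // Resolves only the root-level table; any non-empty namespace or other
    // table name yields an empty result instead of silently matching.
    static Optional<String> resolve(String[] namespace, String name) {
        if (namespace.length == 0 && TABLE.equals(name)) {
            return Optional.of(TABLE);
        }
        return Optional.empty();
    }
}
```

With a guard like this, catalog.any_namespace.app_logs no longer resolves, which matches the limitation the reviewer asked to remove or document.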
Yeah, I plan to do that.
Why are the changes needed?
Close #6832. This connector lets Hadoop administrators analyze YARN aggregated logs at the cluster level, for example, aggregating container logs across applications by host to diagnose potential host hardware issues.
The current initial implementation has several limitations:
- Supports TFile, but not IFile
- Supports pushing down app_id, user, and host filters; does not support pushing down container_id or log_type, though they are supposed to be selective
- TODO: unit tests and user docs
How was this patch tested?
Unit tests are not added yet, as they require a real YARN cluster with historical aggregated logs.
Was this patch authored or co-authored using generative AI tooling?
Assisted-by: Claude Opus 4.7.