diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 06b6045cccd99..acc649085a0f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -26,8 +26,7 @@ import scala.xml.{Node, XML} import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.internal.LogKeys +import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode @@ -227,7 +226,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") } parentPool.addSchedulable(manager) - logInfo(log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + - log"${MDC(LogKeys.POOL_NAME, poolName)}") + + logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( + properties, + log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} to pool " + + log"${MDC(LogKeys.POOL_NAME, poolName)}" + ) + ) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala new file mode 100644 index 0000000000000..38df3afe43344 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.{Locale, Properties}
+
+import org.apache.spark.internal.{LogEntry, Logging, LogKeys, MDC, MessageWithContext}
+
+/**
+ * A logging trait for scheduler components where log messages should include
+ * structured streaming identifiers (query ID and batch ID).
+ *
+ * Streaming execution sets these identifiers via
+ * [[org.apache.spark.SparkContext#setLocalProperty]], which is thread-local.
+ * Scheduler code typically runs on a different thread (e.g. the
+ * task-scheduler-event-loop-worker), so `getLocalProperty` would not have
+ * the streaming context. This trait instead reads the identifiers from the
+ * task's [[java.util.Properties]], which are propagated with the
+ * [[org.apache.spark.scheduler.TaskSet]] across thread boundaries.
+ *
+ * Mix this trait into any scheduler component that has access to task
+ * properties and needs streaming-aware log output.
+ */ +private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { + // we gather the query and batch Id from the properties of a given TaskSet + protected def properties: Properties + + override protected def logInfo(msg: => String): Unit = + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logInfo(entry: LogEntry): Unit = { + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override protected def logWarning(msg: => String): Unit = + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logWarning(entry: LogEntry): Unit = { + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override protected def logWarning(msg: => String, t: Throwable): Unit = + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + + override protected def logWarning(entry: LogEntry, t: Throwable): Unit = { + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + } + + override protected def logDebug(msg: => String): Unit = + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logDebug(entry: LogEntry): Unit = { + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override protected def logError(msg: => String): Unit = + super.logError( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logError(entry: LogEntry): Unit = { + super.logError( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override 
protected def logTrace(msg: => String): Unit = + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logTrace(entry: LogEntry): Unit = { + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } +} + +/** + * Helpers for constructing log entries enriched with structured streaming + * identifiers extracted from task properties. + */ +private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging { + val QUERY_ID_KEY = "sql.streaming.queryId" + val BATCH_ID_KEY = "streaming.sql.batchId" + + private[scheduler] def constructStreamingLogEntry( + properties: Properties, + entry: LogEntry): LogEntry = { + if (properties == null) { + return entry + } + + val queryId = Option(properties.getProperty(QUERY_ID_KEY)) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + + // formatMessage truncates the queryId for readability + // so we use a blank messageWithContext to overwrite the full query Id to the context + formatMessage( + queryId, + batchId, + entry + ) + MessageWithContext("", constructStreamingContext(queryId, batchId)) + } + + private[scheduler] def constructStreamingLogEntry( + properties: Properties, + msg: => String): LogEntry = { + if (properties == null) { + return new LogEntry( + MessageWithContext(msg, java.util.Collections.emptyMap()) + ) + } + + val queryId = Option(properties.getProperty(QUERY_ID_KEY)) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + new LogEntry({ + MessageWithContext( + formatMessage( + queryId, + batchId, + msg + ), + constructStreamingContext(queryId, batchId) + ) + }) + } + + private def constructStreamingContext( + queryId: Option[String], + batchId: Option[String]): + java.util.HashMap[String, String] = { + val streamingContext = new java.util.HashMap[String, String]() + // MDC places the log key in the context as all lowercase, so we do the same here + 
queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _)) + batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _)) + streamingContext + } + + private def formatMessage( + queryId: Option[String], + batchId: Option[String], + msg: => String): String = { + val msgWithBatchId = batchId.map(bid => s"[batchId = $bid] $msg").getOrElse(msg) + queryId.map(qId => s"[queryId = ${qId.take(5)}] $msgWithBatchId").getOrElse(msgWithBatchId) + } + + private def formatMessage( + queryId: Option[String], + batchId: Option[String], + msg: => LogEntry): MessageWithContext = { + val msgWithBatchId: MessageWithContext = batchId.map( + bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg) + ).getOrElse(toMessageWithContext(msg)) + queryId.map( + qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId.take(5))}] " + msgWithBatchId + ).getOrElse(msgWithBatchId) + } + + private def toMessageWithContext(entry: LogEntry): MessageWithContext = { + MessageWithContext(entry.message, entry.context) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c077a7a3bbb8..1f1d67d020812 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.nio.ByteBuffer +import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -30,7 +31,7 @@ import org.apache.spark.InternalAccumulator import org.apache.spark.InternalAccumulator.{input, shuffleRead} import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.{config, Logging, LogKeys} +import 
org.apache.spark.internal.{config, LogKeys} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.scheduler.SchedulingMode._ @@ -58,10 +59,12 @@ private[spark] class TaskSetManager( val taskSet: TaskSet, val maxTaskFailures: Int, healthTracker: Option[HealthTracker] = None, - clock: Clock = new SystemClock()) extends Schedulable with Logging { + clock: Clock = new SystemClock()) + extends Schedulable with StructuredStreamingIdAwareSchedulerLogging { - private val conf = sched.sc.conf + override def properties: Properties = taskSet.properties + private val conf = sched.sc.conf val maxResultSize = conf.get(config.MAX_RESULT_SIZE) // Serializer for closures and tasks. diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 30ed80dbe848d..873ec4023c9cd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -367,6 +367,44 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } + test("SPARK-56326: Fair Scheduler addTaskSetManager logs include " + + "streaming query Id and batch Id") { + val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) + sc = new SparkContext(LOCAL, APP_NAME, conf) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc) + schedulableBuilder.buildPools() + + val testQueryId = "test-query-id-5678" + val testBatchId = "99" + val properties = new Properties() + properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1") + properties.setProperty("sql.streaming.queryId", testQueryId) + properties.setProperty("streaming.sql.batchId", testBatchId) + + val taskSetManager = createTaskSetManager(0, 1, 
taskScheduler) + + val logAppender = new LogAppender("pool streaming logs", maxEvents = 1000) + val loggerName = classOf[FairSchedulableBuilder].getName + + withLogAppender(logAppender, loggerNames = Seq(loggerName)) { + schedulableBuilder.addTaskSetManager(taskSetManager, properties) + } + + val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]" + val expectedBatchPrefix = s"[batchId = $testBatchId]" + val addedLogs = logs.filter(msg => + msg.contains("Added task set") && + msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)) + assert(addedLogs.nonEmpty, + s"Expected 'Added task set' log to contain '$expectedQueryPrefix' " + + s"and '$expectedBatchPrefix'.\nCaptured logs:\n${logs.mkString("\n")}") + } + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala new file mode 100644 index 0000000000000..d314ce29aef5e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.{Locale, Properties}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.{Logging, LogKey, LogKeys, MDC}
+import org.apache.spark.scheduler.StructuredStreamingIdAwareSchedulerLogging.{
+  BATCH_ID_KEY,
+  QUERY_ID_KEY
+}
+
+class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    Logging.enableStructuredLogging()
+  }
+
+  override def afterAll(): Unit = {
+    Logging.disableStructuredLogging()
+    super.afterAll()
+  }
+
+  private def assertContextValue(
+      context: java.util.Map[String, String],
+      key: LogKey,
+      expected: String): Unit = {
+    assert(context.get(key.name.toLowerCase(Locale.ROOT)) === expected)
+  }
+
+  private def assertContextAbsent(
+      context: java.util.Map[String, String],
+      key: LogKey): Unit = {
+    assert(!context.containsKey(key.name.toLowerCase(Locale.ROOT)))
+  }
+
+  private val testQueryId = "abc-query-id"
+  private val testBatchId = "42"
+
+  private def propsWithBothIds(): Properties = {
+    val props = new Properties()
+    props.setProperty(QUERY_ID_KEY, testQueryId)
+    props.setProperty(BATCH_ID_KEY, testBatchId)
+    props
+  }
+
+  private def propsWithQueryIdOnly(): Properties = {
+    val props = new Properties()
+    props.setProperty(QUERY_ID_KEY, testQueryId)
+    props
+  }
+
+  test("SPARK-56326: constructStreamingLogEntry with String - both queryId and batchId") {
+    val result = StructuredStreamingIdAwareSchedulerLogging
+      .constructStreamingLogEntry(propsWithBothIds(), "test message")
+
+ assertResult(s"[queryId = ${testQueryId.take(5)}] [batchId = $testBatchId] test message")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + } + + test("SPARK-56326: constructStreamingLogEntry with String - only queryId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithQueryIdOnly(), "test message") + + assertResult(s"[queryId = ${testQueryId.take(5)}] test message")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + } + + test("SPARK-56326: constructStreamingLogEntry with String - no streaming properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(new Properties(), "test message") + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with String - null properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(null, "test message") + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - both queryId and batchId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithBothIds(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult(s"[queryId = ${testQueryId.take(5)}] " + + s"[batchId = $testBatchId] test message Dummy Context")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - only queryId") { + val result = 
StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithQueryIdOnly(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult(s"[queryId = ${testQueryId.take(5)}] test message Dummy Context")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - no streaming properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(new Properties(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - null properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(null, + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index adcb57a0187a4..3b66945d89a90 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2867,6 +2867,57 @@ class TaskSetManagerSuite assert(taskSetManager.taskSetExcludelistHelperOpt.get.isDryRun) } + test("SPARK-56326: Streaming query Id and batch Id are 
included in scheduling log " + + "messages") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc) + val testQueryId = "test-query-id-1234" + val testBatchId = "42" + // Create a TaskSet with a non-null Properties containing the streaming metadata. + val properties = new Properties() + properties.setProperty("sql.streaming.queryId", testQueryId) + properties.setProperty("streaming.sql.batchId", testBatchId) + val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)), + 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) + + val clock = new ManualClock + val logAppender = new LogAppender("streaming scheduling logs", maxEvents = 1000) + val loggerName = classOf[TaskSetManager].getName + + withLogAppender(logAppender, loggerNames = Seq(loggerName)) { + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + + // resourceOffer triggers prepareLaunchingTask which logs "Starting ..." + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 + assert(taskOption.isDefined) + + clock.advance(1) + // handleSuccessfulTask logs "Finished ..." + manager.handleSuccessfulTask(0, createTaskResult(0)) + } + + val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + + val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]" + val expectedBatchPrefix = s"[batchId = $testBatchId]" + + // Verify the "Starting" log line includes query Id and batch Id + val startingLogs = logs.filter(msg => + msg.contains("Starting") && + msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)) + assert(startingLogs.nonEmpty, + s"Expected 'Starting' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." 
+ + s"\nCaptured logs:\n${logs.mkString("\n")}") + + // Verify the "Finished" log line includes query Id and batch Id + val finishedLogs = logs.filter(msg => + msg.contains("Finished") && + msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)) + assert(finishedLogs.nonEmpty, + s"Expected 'Finished' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." + + s"\nCaptured logs:\n${logs.mkString("\n")}") + } + } class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {