Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.MessageWithContext
import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -58,6 +59,40 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
}
}

private[spark] object SchedulableBuilder extends Logging {
  // Task property keys set only for Structured Streaming queries (see the tests below and in
  // TaskSetManagerSuite, which set these exact keys).
  val QUERY_ID_KEY = "sql.streaming.queryId"
  val BATCH_ID_KEY = "streaming.sql.batchId"

  /**
   * Helper method used to generate a logging prefix containing the query Id (truncated to its
   * first five characters) and the batch Id when they are set. These properties are only set
   * for streaming queries and are used to aid in debugging multiple streaming queries running
   * at the same time.
   *
   * @param properties the task properties to check for query Id and batch Id; may be null
   * @return a log prefix of the form "[queryId = xxxxx] " when the query Id is present,
   *         additionally followed by "[batchId = n] " when the batch Id is also present;
   *         an empty prefix when properties is null or the query Id is absent
   */
  def schedulingLogStreamingContext(properties: Properties): MessageWithContext = {
    // Option(properties) folds the null case into the normal absent-key path, avoiding an
    // early `return` (non-idiomatic in Scala and a nonlocal-return hazard inside lambdas).
    val props = Option(properties)
    val queryId = props.flatMap(p => Option(p.getProperty(QUERY_ID_KEY)))
    val batchId = props.flatMap(p => Option(p.getProperty(BATCH_ID_KEY)))

    queryId match {
      case Some(qId) =>
        // Only the first five characters of the query Id are logged to keep the prefix short.
        val prefix = log"[queryId = ${MDC(LogKeys.QUERY_ID, qId.take(5))}] "
        batchId.fold(prefix)(bId =>
          prefix + log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] "
        )
      case _ =>
        // Query id not set; not a streaming query so nothing to add
        log""
    }
  }
}

private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
extends SchedulableBuilder with Logging {

Expand Down Expand Up @@ -227,7 +262,9 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}")
}
parentPool.addSchedulable(manager)
logInfo(log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " +

logInfo(SchedulableBuilder.schedulingLogStreamingContext(properties) +
log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} to pool " +
log"${MDC(LogKeys.POOL_NAME, poolName)}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,8 @@ private[spark] class TaskSetManager(
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val tName = taskName(taskId)
logInfo(log"Starting ${MDC(TASK_NAME, tName)} (${MDC(HOST, host)}," +
logInfo(SchedulableBuilder.schedulingLogStreamingContext(taskSet.properties) +
log"Starting ${MDC(TASK_NAME, tName)} (${MDC(HOST, host)}," +
log"executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}, " +
log"partition ${MDC(PARTITION_ID, task.partitionId)}, " +
log"${MDC(TASK_LOCALITY, taskLocality)}, " +
Expand Down Expand Up @@ -865,7 +866,8 @@ private[spark] class TaskSetManager(
}
if (!successful(index)) {
tasksSuccessful += 1
logInfo(log"Finished ${MDC(TASK_NAME, taskName(info.taskId))} in " +
logInfo(SchedulableBuilder.schedulingLogStreamingContext(taskSet.properties) +
log"Finished ${MDC(TASK_NAME, taskName(info.taskId))} in " +
log"${MDC(DURATION, info.duration)} ms on ${MDC(HOST, info.host)} " +
log"(executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}) " +
log"(${MDC(NUM_SUCCESSFUL_TASKS, tasksSuccessful)}/${MDC(NUM_TASKS, numTasks)})")
Expand Down
38 changes: 38 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,44 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("SPARK-56326: Fair Scheduler addTaskSetManager logs include " +
  "streaming query Id and batch Id") {
  // Stand up a fair scheduler backed by the test fairscheduler.xml allocation file.
  val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
  val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
  sc = new SparkContext(LOCAL, APP_NAME, conf)
  val taskScheduler = new TaskSchedulerImpl(sc)

  val rootPool = new Pool("", FAIR, 0, 0)
  val builder = new FairSchedulableBuilder(rootPool, sc)
  builder.buildPools()

  // Task properties carrying the streaming metadata that should surface in the log prefix.
  val queryId = "test-query-id-5678"
  val batchId = "99"
  val props = new Properties()
  props.setProperty(builder.FAIR_SCHEDULER_PROPERTIES, "1")
  props.setProperty("sql.streaming.queryId", queryId)
  props.setProperty("streaming.sql.batchId", batchId)

  val manager = createTaskSetManager(0, 1, taskScheduler)

  // Capture only the FairSchedulableBuilder logger output while the task set is added.
  val appender = new LogAppender("pool streaming logs", maxEvents = 1000)
  withLogAppender(appender, loggerNames = Seq(classOf[FairSchedulableBuilder].getName)) {
    builder.addTaskSetManager(manager, props)
  }

  val messages = appender.loggingEvents.map(_.getMessage.getFormattedMessage)
  // The query Id is truncated to five characters by schedulingLogStreamingContext.
  val expectedQueryPrefix = s"[queryId = ${queryId.take(5)}]"
  val expectedBatchPrefix = s"[batchId = $batchId]"
  val matched = messages.filter { msg =>
    msg.contains("Added task set") &&
      msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)
  }
  assert(matched.nonEmpty,
    s"Expected 'Added task set' log to contain '$expectedQueryPrefix' " +
      s"and '$expectedBatchPrefix'.\nCaptured logs:\n${messages.mkString("\n")}")
}

private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
val selectedPool = rootPool.getSchedulableByName(poolName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import java.util.Properties

import org.apache.spark.SparkFunSuite
import org.apache.spark.scheduler.SchedulableBuilder.{BATCH_ID_KEY, QUERY_ID_KEY}

class SchedulableBuilderSuite extends SparkFunSuite {

  // Builds a Properties instance populated with whichever streaming keys are supplied.
  private def streamingProps(
      queryId: Option[String] = None,
      batchId: Option[String] = None): Properties = {
    val props = new Properties()
    queryId.foreach(props.setProperty(QUERY_ID_KEY, _))
    batchId.foreach(props.setProperty(BATCH_ID_KEY, _))
    props
  }

  test("schedulingLogStreamingContext - both queryId and batchId present") {
    val props = streamingProps(
      queryId = Some("12345-test-query-id"),
      batchId = Some("23"))

    // Prefix carries the query Id truncated to five characters plus the full batch Id.
    assertResult("[queryId = 12345] [batchId = 23] ") {
      SchedulableBuilder.schedulingLogStreamingContext(props).message
    }
  }

  test("schedulingLogStreamingContext - only queryId and no batchId present") {
    val props = streamingProps(queryId = Some("12345-test-query-id"))

    // Prefix carries the truncated query Id only; the batch segment is omitted.
    assertResult("[queryId = 12345] ") {
      SchedulableBuilder.schedulingLogStreamingContext(props).message
    }
  }

  test("schedulingLogStreamingContext - no queryId or batchId present") {
    // Neither key set: the prefix must be empty.
    assertResult("") {
      SchedulableBuilder.schedulingLogStreamingContext(streamingProps()).message
    }
  }

  test("schedulingLogStreamingContext - handles null properties") {
    // Null properties must not throw and must yield an empty prefix.
    assertResult("") {
      SchedulableBuilder.schedulingLogStreamingContext(null).message
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2867,6 +2867,57 @@ class TaskSetManagerSuite
assert(taskSetManager.taskSetExcludelistHelperOpt.get.isDryRun)
}

test("SPARK-56326: Streaming query Id and batch Id are included in scheduling log " +
  "messages") {
  // Suite-level mutable fixtures: sc must exist before the FakeTaskScheduler is built,
  // and sched must be assigned before constructing the TaskSetManager below.
  sc = new SparkContext("local", "test")
  sched = new FakeTaskScheduler(sc)
  val testQueryId = "test-query-id-1234"
  val testBatchId = "42"
  // Create a TaskSet with a non-null Properties containing the streaming metadata.
  val properties = new Properties()
  properties.setProperty("sql.streaming.queryId", testQueryId)
  properties.setProperty("streaming.sql.batchId", testBatchId)
  val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)),
    0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)

  val clock = new ManualClock
  // Capture only TaskSetManager log output so the assertions below are not polluted
  // by other loggers.
  val logAppender = new LogAppender("streaming scheduling logs", maxEvents = 1000)
  val loggerName = classOf[TaskSetManager].getName

  withLogAppender(logAppender, loggerNames = Seq(loggerName)) {
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

    // resourceOffer triggers prepareLaunchingTask which logs "Starting ..."
    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
    assert(taskOption.isDefined)

    // Advance the manual clock so the finished task reports a non-zero duration.
    clock.advance(1)
    // handleSuccessfulTask logs "Finished ..."
    manager.handleSuccessfulTask(0, createTaskResult(0))
  }

  val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)

  // schedulingLogStreamingContext truncates the query Id to its first five characters.
  val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]"
  val expectedBatchPrefix = s"[batchId = $testBatchId]"

  // Verify the "Starting" log line includes query Id and batch Id
  val startingLogs = logs.filter(msg =>
    msg.contains("Starting") &&
    msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix))
  assert(startingLogs.nonEmpty,
    s"Expected 'Starting' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." +
    s"\nCaptured logs:\n${logs.mkString("\n")}")

  // Verify the "Finished" log line includes query Id and batch Id
  val finishedLogs = logs.filter(msg =>
    msg.contains("Finished") &&
    msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix))
  assert(finishedLogs.nonEmpty,
    s"Expected 'Finished' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." +
    s"\nCaptured logs:\n${logs.mkString("\n")}")
}

}

class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {
Expand Down