Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -408,4 +408,50 @@ trait CommonStringExprs {
None
}
}

/**
 * Serializes an expression into a Comet `Minute` protobuf expression.
 *
 * The first child of `expr` is serialized with `exprToProtoInternal` and becomes the child of
 * the `Minute` message. The timezone is read reflectively from a `timeZoneId` member of `expr`
 * when one exists (presumably because the expression class is not available at compile time on
 * every supported Spark version -- TODO confirm).
 *
 * NOTE(review): when no timezone can be resolved the message is built with "UTC"; confirm that
 * this default matches the semantics of the expression being converted.
 *
 * @param expr
 *   the expression to convert; only its first child is used
 * @param inputs
 *   attributes forwarded unchanged to the child serialization
 * @param binding
 *   flag forwarded unchanged to the child serialization
 * @return
 *   the serialized expression, or `None` when `expr` has no child or the child cannot be
 *   serialized (in both cases `withInfo` is called on `expr` first)
 */
def minutesOfTimeToProto(
    expr: Expression,
    inputs: Seq[Attribute],
    binding: Boolean): Option[Expr] = {
  expr.children.headOption match {
    case None =>
      withInfo(expr, "MinutesOfTime has no child expression")
      None

    case Some(child) =>
      exprToProtoInternal(child, inputs, binding) match {
        case Some(childExpr) =>
          // The timezone is resolved only once the child is known to be serializable, so the
          // reflective lookup is skipped when its result would be thrown away.
          val minute = ExprOuterClass.Minute
            .newBuilder()
            .setChild(childExpr)
            .setTimezone(reflectiveTimeZoneId(expr).getOrElse("UTC"))

          Some(
            ExprOuterClass.Expr
              .newBuilder()
              .setMinute(minute)
              .build())

        case None =>
          withInfo(expr, child)
          None
      }
  }
}

/**
 * Looks up a `timeZoneId` member on `expr` via reflection: first a public zero-argument method,
 * then a public field of the same name.
 *
 * The raw value is validated with a pattern match instead of an erased
 * `asInstanceOf[Option[String]]`, so a `null`, a non-`Option` value, or an `Option` holding
 * something other than a `String` yields `None` rather than failing later with a
 * `NullPointerException` or `ClassCastException`.
 *
 * @param expr
 *   the expression to inspect
 * @return
 *   the timezone id when a `Some` containing a `String` was found, `None` otherwise
 */
private def reflectiveTimeZoneId(expr: Expression): Option[String] = {
  val exprClass = expr.getClass
  val rawValue: Any =
    try {
      exprClass.getMethod("timeZoneId").invoke(expr)
    } catch {
      // SecurityException is handled here as well so that the method lookup and the field
      // lookup below treat the same failures the same way.
      case _: NoSuchMethodException | _: SecurityException =>
        try {
          exprClass.getField("timeZoneId").get(expr)
        } catch {
          case _: NoSuchFieldException | _: SecurityException => None
        }
    }

  rawValue match {
    case Some(tz: String) => Some(tz)
    case _ => None
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.comet.serde.QueryPlanSerde

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
Expand All @@ -43,6 +44,9 @@ trait CometExprShim extends CommonStringExprs {
// Right child is the encoding expression.
stringDecode(expr, s.charset, s.bin, inputs, binding)

case _ if expr.getClass.getSimpleName == "MinutesOfTime" =>
minutesOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ trait CometExprShim extends CommonStringExprs {
// val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
// optExprWithInfo(optExpr, wb, wb.children: _*)

case _ if expr.getClass.getSimpleName == "MinutesOfTime" =>
minutesOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ trait CometExprShim extends CommonStringExprs {
// val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
// optExprWithInfo(optExpr, wb, wb.children: _*)

case _ if expr.getClass.getSimpleName == "MinutesOfTime" =>
minutesOfTimeToProto(expr, inputs, binding)

case _ => None
}
}
Expand Down
17 changes: 17 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("MinutesOfTime expression support") {
  // Runs minute() over column _1 of a Parquet file written by makeRawTimeParquetFile, once
  // with dictionary encoding enabled and once with it disabled, and hands the resulting query
  // to checkSparkAnswerAndOperator.
  // When Spark plans this as a MinutesOfTime expression, the version-specific shim is expected
  // to route it to the Minute protobuf conversion.
  for (useDictionary <- Seq(true, false)) {
    withTempDir { tmpDir =>
      val parquetPath = new Path(tmpDir.toURI.toString, "part-r-0.parquet")
      makeRawTimeParquetFile(parquetPath, dictionaryEnabled = useDictionary, 10000)

      readParquetFile(parquetPath.toString) { df =>
        checkSparkAnswerAndOperator(df.select(expr("minute(_1)")))
      }
    }
  }
}

test("hour on int96 timestamp column") {
import testImplicits._

Expand Down