Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit 0275d41

Browse files
committed
Add join v2 rule
1 parent 661df17 commit 0275d41

13 files changed

Lines changed: 1428 additions & 338 deletions

src/main/scala/com/microsoft/hyperspace/index/IndexConstants.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ object IndexConstants {
5252
val INDEX_FILTER_RULE_USE_BUCKET_SPEC = "spark.hyperspace.index.filterRule.useBucketSpec"
5353
val INDEX_FILTER_RULE_USE_BUCKET_SPEC_DEFAULT = "false"
5454

55+
// Config used to enable join v2 rule.
56+
val INDEX_JOIN_V2_RULE_ENABLED = "spark.hyperspace.index.joinv2.enabled"
57+
val INDEX_JOIN_V2_RULE_ENABLED_DEFAULT = "false"
58+
5559
// TODO: Remove dev config when nested column is fully supported.
5660
val DEV_NESTED_COLUMN_ENABLED = "spark.hyperspace.dev.index.nestedColumn.enabled"
5761
val DEV_NESTED_COLUMN_ENABLED_DEFAULT = "false"

src/main/scala/com/microsoft/hyperspace/index/covering/JoinIndexRule.scala

Lines changed: 11 additions & 316 deletions
Large diffs are not rendered by default.
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
/*
2+
* Copyright (2021) The Hyperspace Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.microsoft.hyperspace.index.covering
18+
19+
import org.apache.spark.sql.catalyst.expressions.Expression
20+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
21+
22+
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
23+
import com.microsoft.hyperspace.index.plananalysis.FilterReasons
24+
import com.microsoft.hyperspace.index.rules.{HyperspaceRule, IndexRankFilter, IndexTypeFilter, QueryPlanIndexFilter, RuleUtils}
25+
import com.microsoft.hyperspace.index.rules.ApplyHyperspace.{PlanToIndexesMap, PlanToSelectedIndexMap}
26+
import com.microsoft.hyperspace.index.sources.FileBasedRelation
27+
import com.microsoft.hyperspace.shim.JoinWithoutHint
28+
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEventLogging, HyperspaceIndexUsageEvent}
29+
import com.microsoft.hyperspace.util.HyperspaceConf
30+
import com.microsoft.hyperspace.util.ResolverUtils.resolve
31+
32+
/**
 * JoinV2PlanNodeFilter filters candidate indexes out if
 * 1) the given plan is not an eligible join plan:
 *    1-1) the join has no condition,
 *    1-2) the join is not a SortMergeJoin,
 *    1-3) the join condition is not eligible - only equi-joins in simple CNF
 *         form are supported;
 * 2) the source plan of an index is not part of the join (neither left nor right).
 */
object JoinV2PlanNodeFilter extends QueryPlanIndexFilter with JoinQueryPlanIndexFilter {
  override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
    if (candidateIndexes.isEmpty) {
      return Map.empty
    }

    plan match {
      case JoinWithoutHint(l, r, _, Some(condition)) =>
        val left = RuleUtils.getRelation(spark, l)
        val right = RuleUtils.getRelation(spark, r)

        // Both children must be backed by a supported relation, and neither may
        // already have an index applied to it.
        if (left.isEmpty || right.isEmpty ||
          RuleUtils.isIndexApplied(left.get) || RuleUtils.isIndexApplied(right.get)) {
          return Map.empty
        }

        val leftAndRightIndexes =
          candidateIndexes.getOrElse(left.get.plan, Nil) ++
            candidateIndexes.getOrElse(right.get.plan, Nil)

        val sortMergeJoinCond = withFilterReasonTag(
          plan,
          leftAndRightIndexes,
          FilterReasons.NotEligibleJoin("Not SortMergeJoin")) {
          isSortMergeJoin(spark, plan)
        }

        val joinConditionCond = withFilterReasonTag(
          plan,
          leftAndRightIndexes,
          FilterReasons.NotEligibleJoin("Non equi-join or has literal")) {
          isJoinConditionSupported(condition)
        }

        val leftPlanLinearCond = isPlanLinear(l)
        val rightPlanLinearCond = isPlanLinear(r)

        if (sortMergeJoinCond && joinConditionCond &&
          (leftPlanLinearCond || rightPlanLinearCond)) {
          // Record the relation(s) on the linear side(s) for downstream filters.
          // NOTE(review): when only one side is linear, the ThreadLocal for the
          // other side keeps whatever a previous invocation stored (or null) —
          // confirm downstream filters never read the unset side in that case.
          if (leftPlanLinearCond) {
            JoinIndexV2Rule.leftRelation.set(left.get)
          }
          if (rightPlanLinearCond) {
            JoinIndexV2Rule.rightRelation.set(right.get)
          }

          JoinIndexV2Rule.joinCondition.set(condition)
          (candidateIndexes.get(left.get.plan).map(lIndexes => left.get.plan -> lIndexes) ++
            candidateIndexes
              .get(right.get.plan)
              .map(rIndexes => right.get.plan -> rIndexes)).toMap
        } else {
          Map.empty
        }
      case JoinWithoutHint(_, _, _, None) =>
        setFilterReasonTag(
          plan,
          candidateIndexes.values.flatten.toSeq,
          FilterReasons.NotEligibleJoin("No join condition"))
        Map.empty
      case _ =>
        Map.empty
    }
  }
}
106+
107+
/**
 * JoinV2AttributeFilter filters indexes out if
 * 1) any join condition column does not come from a relation directly, or
 * 2) attributes from the left plan do not have an exclusive one-to-one
 *    mapping with attributes from the right plan.
 */
object JoinV2AttributeFilter extends JoinQueryPlanIndexFilter {
  override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
    if (candidateIndexes.isEmpty) {
      return Map.empty
    }

    // Tag every candidate index with the filter reason if the join attribute
    // requirements do not hold between the two sides.
    val attributesEligible = withFilterReasonTag(
      plan,
      candidateIndexes.flatMap(_._2).toSeq,
      FilterReasons.NotEligibleJoin("incompatible left and right join columns")) {
      ensureAttributeRequirements(
        JoinIndexV2Rule.leftRelation.get,
        JoinIndexV2Rule.rightRelation.get,
        JoinIndexV2Rule.joinCondition.get)
    }

    if (attributesEligible) candidateIndexes else Map.empty
  }
}
135+
136+
/**
 * JoinV2ColumnFilter filters indexes out if
 * 1) an index does not contain all required columns, or
 * 2) the join columns are not the indexed columns of the index.
 */
object JoinV2ColumnFilter extends JoinQueryPlanIndexFilter {
  override def apply(plan: LogicalPlan, candidateIndexes: PlanToIndexesMap): PlanToIndexesMap = {
    // Candidate indexes are required for both sides of the join.
    // (size != 2 also covers the empty map, so no separate isEmpty check.)
    if (candidateIndexes.size != 2) {
      return Map.empty
    }

    val leftRelation = JoinIndexV2Rule.leftRelation.get
    val rightRelation = JoinIndexV2Rule.rightRelation.get

    val lBaseAttrs = leftRelation.plan.output.map(_.name)
    val rBaseAttrs = rightRelation.plan.output.map(_.name)

    // Map of left resolved columns with their corresponding right resolved
    // columns from the join condition.
    val lRMap = getLRColumnMapping(lBaseAttrs, rBaseAttrs, JoinIndexV2Rule.joinCondition.get)
    JoinIndexV2Rule.leftToRightColumnMap.set(lRMap)
    val lRequiredIndexedCols = lRMap.keys.toSeq
    val rRequiredIndexedCols = lRMap.values.toSeq

    plan match {
      case JoinWithoutHint(l, r, _, _) =>
        // All required columns, resolved against the base relations.
        val lRequiredAllCols = resolve(spark, allRequiredCols(l), lBaseAttrs).get
        val rRequiredAllCols = resolve(spark, allRequiredCols(r), rBaseAttrs).get

        // Make sure required indexed columns are a subset of all required columns.
        assert(
          resolve(spark, lRequiredIndexedCols, lRequiredAllCols).isDefined &&
            resolve(spark, rRequiredIndexedCols, rRequiredAllCols).isDefined)

        val lIndexes =
          getUsableIndexes(
            plan,
            candidateIndexes.getOrElse(leftRelation.plan, Nil),
            lRequiredIndexedCols,
            lRequiredAllCols,
            "left")
        val rIndexes =
          getUsableIndexes(
            plan,
            candidateIndexes.getOrElse(rightRelation.plan, Nil),
            rRequiredIndexedCols,
            rRequiredAllCols,
            "right")

        Map(leftRelation.plan -> lIndexes, rightRelation.plan -> rIndexes)
      case _ =>
        // Defensive: JoinV2PlanNodeFilter only passes join plans through, but
        // return empty instead of throwing a MatchError on anything else.
        Map.empty
    }
  }
}
190+
191+
/**
 * JoinV2RankFilter selects the single best applicable index.
 */
object JoinV2RankFilter extends IndexRankFilter {
  override def apply(plan: LogicalPlan, indexes: PlanToIndexesMap): PlanToSelectedIndexMap = {
    if (indexes.isEmpty) {
      return Map.empty
    }

    val lPlan = JoinIndexV2Rule.leftRelation.get.plan
    val rPlan = JoinIndexV2Rule.rightRelation.get.plan

    // Pick one index with the largest coverage.
    val lIndexes = indexes.get(lPlan)
    val rIndexes = indexes.get(rPlan)

    if (lIndexes.isEmpty) {
      // No applicable index on the left side: select the largest right index.
      val candidate = largestIndex(rPlan, rIndexes.get)
      setFilterReasonTagForRank(plan, rIndexes.get, candidate)
      Map(rPlan -> candidate)
    } else if (rIndexes.isEmpty) {
      // No applicable index on the right side: select the largest left index.
      val candidate = largestIndex(lPlan, lIndexes.get)
      setFilterReasonTagForRank(plan, lIndexes.get, candidate)
      Map(lPlan -> candidate)
    } else {
      // Apply one index on the side with the larger data to reduce shuffle/sort
      // time. If we applied indexes on both sides with different indexed columns
      // and bucket numbers, Spark's optimizer might remove the exchange of the
      // small side and add a shuffle for the large dataset.
      val lSize = JoinIndexV2Rule.leftRelation.get.allFileSizeInBytes
      val rSize = JoinIndexV2Rule.rightRelation.get.allFileSizeInBytes
      if (lSize > rSize) {
        val candidate = largestIndex(lPlan, lIndexes.get)
        // NOTE(review): the rank reason is tagged on the opposite side's indexes
        // only; same-side non-candidates get no tag — confirm this is intended.
        setFilterReasonTagForRank(plan, rIndexes.get, candidate)
        Map(lPlan -> candidate)
      } else {
        val candidate = largestIndex(rPlan, rIndexes.get)
        setFilterReasonTagForRank(plan, lIndexes.get, candidate)
        Map(rPlan -> candidate)
      }
    }
  }

  /** Returns the candidate index covering the largest amount of data. */
  private def largestIndex(
      plan: LogicalPlan,
      candidates: Seq[IndexLogEntry]): IndexLogEntry = {
    if (HyperspaceConf.hybridScanEnabled(spark)) {
      // With hybrid scan, rank by the bytes of source data common to the index.
      candidates.maxBy(
        _.getTagValue(plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES).getOrElse(0L))
    } else {
      candidates.maxBy(_.indexFilesSizeInBytes)
    }
  }
}
245+
246+
/**
 * Rule to optimize a join between two indexed dataframes.
 *
 * This rule improves a SortMergeJoin performance by replacing data files with
 * index files. The index files, being bucketed and sorted, will eliminate a
 * full shuffle of the data during a sort-merge-join operation.
 *
 * For e.g.
 * SELECT T1.A, T1.B, T2.C, T2.D FROM T1, T2 WHERE T1.A = T2.C
 * The above query can be optimized to use indexes if indexes of the following
 * configs exist:
 *   Index1: indexedColumns: T1.A, includedColumns: T1.B
 *   Index2: indexedColumns: T2.C, includedColumns: T2.D
 *
 * These indexes are indexed by the join columns and can improve the query
 * performance by avoiding full shuffling of T1 and T2.
 */
object JoinIndexV2Rule extends HyperspaceRule with HyperspaceEventLogging {

  override val filtersOnQueryPlan: Seq[QueryPlanIndexFilter] =
    IndexTypeFilter[CoveringIndex]() ::
      JoinV2PlanNodeFilter ::
      JoinV2AttributeFilter ::
      JoinV2ColumnFilter ::
      Nil

  override val indexRanker: IndexRankFilter = JoinV2RankFilter

  // Execution context shared between the filters above. The ThreadLocal
  // references themselves are never reassigned, so declare them as vals.
  val leftRelation: ThreadLocal[FileBasedRelation] = new ThreadLocal[FileBasedRelation]
  val rightRelation: ThreadLocal[FileBasedRelation] = new ThreadLocal[FileBasedRelation]
  val joinCondition: ThreadLocal[Expression] = new ThreadLocal[Expression]
  val leftToRightColumnMap: ThreadLocal[Map[String, String]] =
    new ThreadLocal[Map[String, String]]

  /**
   * Rewrites the join to read the selected index instead of the original data
   * files on whichever side the index was selected for. Exactly one selected
   * index is expected; otherwise the plan is returned unchanged.
   */
  override def applyIndex(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): LogicalPlan = {
    if (indexes.size != 1) {
      return plan
    }

    plan match {
      case join @ JoinWithoutHint(l, r, _, _) =>
        val updatedPlan = if (indexes.contains(leftRelation.get.plan)) {
          val lIndex = indexes(leftRelation.get.plan)
          join
            .copy(left = CoveringIndexRuleUtils.transformPlanToUseIndex(
              spark,
              lIndex,
              l,
              useBucketSpec = true,
              useBucketUnionForAppended = true))
        } else {
          val rIndex = indexes(rightRelation.get.plan)
          join
            .copy(right = CoveringIndexRuleUtils.transformPlanToUseIndex(
              spark,
              rIndex,
              r,
              useBucketSpec = true,
              useBucketUnionForAppended = true))
        }

        logEvent(
          HyperspaceIndexUsageEvent(
            AppInfo(sparkContext.sparkUser, sparkContext.applicationId, sparkContext.appName),
            Seq(indexes.values.head),
            join.toString,
            updatedPlan.toString,
            "Join index v2 rule applied."))
        updatedPlan
      case _ =>
        // Defensive: the plan filters only hand join plans to this rule.
        plan
    }
  }

  /**
   * Scores the benefit of applying the selected index: 0..60, proportional to
   * the fraction of the target relation's bytes covered by the index.
   */
  override def score(plan: LogicalPlan, indexes: PlanToSelectedIndexMap): Int = {
    if (indexes.size != 1) {
      return 0
    }

    val targetRel = if (indexes.contains(leftRelation.get.plan)) {
      leftRelation.get
    } else {
      rightRelation.get
    }

    // Total bytes of the relation's source files that the index covers.
    def getCommonBytes(index: IndexLogEntry, relation: FileBasedRelation): Long = {
      index
        .getTagValue(relation.plan, IndexLogEntryTags.COMMON_SOURCE_SIZE_IN_BYTES)
        .getOrElse {
          relation.allFileInfos.foldLeft(0L) { (res, f) =>
            if (index.sourceFileInfoSet.contains(f)) {
              res + f.size // accumulate total bytes of covered files
            } else {
              res
            }
          }
        }
    }

    val commonBytes = getCommonBytes(indexes.head._2, targetRel)

    // TODO: Enhance scoring function.
    // See https://github.com/microsoft/hyperspace/issues/444
    (60 * (commonBytes.toFloat / targetRel.allFileSizeInBytes)).round
  }
}

0 commit comments

Comments
 (0)