Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Commit ace3189

Browse files
author
Chungmin Lee
committed
Data Skipping Index Part 5: ValueListSketch
1 parent f8202d9 commit ace3189

10 files changed

Lines changed: 950 additions & 4 deletions

File tree

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (2021) The Hyperspace Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.microsoft.hyperspace.index.dataskipping.sketch
18+
19+
import org.apache.spark.sql.catalyst.expressions._
20+
import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet
21+
import org.apache.spark.sql.catalyst.util.TypeUtils
22+
import org.apache.spark.sql.types.{ArrayType, DataType}
23+
24+
import com.microsoft.hyperspace.index.dataskipping.util._
25+
import com.microsoft.hyperspace.index.dataskipping.util.ArrayUtils.toArray
26+
27+
/**
 * Sketch storing the complete set of distinct values for a given expression.
 *
 * Strictly speaking this is not a sketch, because every distinct value of the
 * expression is retained. It works well when the number of distinct values is
 * expected to be small and each file tends to contain only a subset of them.
 */
case class ValueListSketch(
    override val expr: String,
    override val dataType: Option[DataType] = None)
    extends SingleExprSketch[ValueListSketch](expr, dataType) {
  override def name: String = "ValueList"

  override def withNewExpression(newExpr: (String, Option[DataType])): ValueListSketch =
    copy(expr = newExpr._1, dataType = newExpr._2)

  // Collect the distinct values and keep them sorted, so that lookups can use
  // binary search and min/max can be read from the ends of the array.
  override def aggregateFunctions: Seq[Expression] =
    new ArraySort(CollectSet(parsedExpr).toAggregateExpression()) :: Nil

  override def convertPredicate(
      predicate: Expression,
      sketchValues: Seq[Expression],
      nameMap: Map[ExprId, String],
      resolvedExprs: Seq[Expression]): Option[Expression] = {
    val sortedValues = sketchValues.head
    // The value list is sorted ascending: the first element is the minimum
    // and the last element (index -1) is the maximum.
    val minValue = ElementAt(sortedValues, Literal(1))
    val maxValue = ElementAt(sortedValues, Literal(-1))
    // TODO: Consider shared sketches
    //   HasNullSketch as described in MinMaxSketch.convertPredicate
    //   can be useful for ValueListSketch too, as it can be used
    //   to optimize Not(EqualTo) as well as IsNull.
    val resolvedExpr = resolvedExprs.head
    val dataType = resolvedExpr.dataType
    val ordering = TypeUtils.getInterpretedOrdering(dataType)
    val matcher = NormalizedExprMatcher(resolvedExpr, nameMap)
    val IsTrue = IsTrueExtractor(matcher)
    val IsFalse = IsFalseExtractor(matcher)
    val IsNotNull = IsNotNullExtractor(matcher)
    val EqualToExpr = EqualToExtractor(matcher)
    val LessThanExpr = LessThanExtractor(matcher)
    val LessThanOrEqualExpr = LessThanOrEqualToExtractor(matcher)
    val GreaterThanExpr = GreaterThanExtractor(matcher)
    val GreaterThanOrEqualExpr = GreaterThanOrEqualToExtractor(matcher)
    val InExpr = InExtractor(matcher)
    val InSetExpr = InSetExtractor(matcher)
    def isEmpty(arr: Expression) = EqualTo(Size(arr), Literal(0))
    Option(predicate).collect {
      case IsTrue() => ArrayContains(sortedValues, Literal(true))
      case IsFalse() => ArrayContains(sortedValues, Literal(false))
      // Nulls are never collected into the value list, so a non-empty list
      // means the file has at least one non-null value for the expression.
      case IsNotNull() => Not(isEmpty(sortedValues))
      case EqualToExpr(v) => SortedArrayContains(sortedValues, v)
      // A file can be skipped for Not(EqualTo(v)) only when it contains
      // exactly one distinct value and that value equals v.
      case Not(EqualToExpr(v)) =>
        Or(
          GreaterThan(Size(sortedValues), Literal(1)),
          Not(EqualTo(ElementAt(sortedValues, Literal(1)), v)))
      case LessThanExpr(v) => LessThan(minValue, v)
      case LessThanOrEqualExpr(v) => LessThanOrEqual(minValue, v)
      case GreaterThanExpr(v) => GreaterThan(maxValue, v)
      case GreaterThanOrEqualExpr(v) => GreaterThanOrEqual(maxValue, v)
      // The candidate values are evaluated and pre-sorted so that
      // SortedArrayContainsAny can do a linear merge with binary search.
      case InExpr(vs) =>
        SortedArrayContainsAny(sortedValues, toArray(vs.map(_.eval()).sorted(ordering), dataType))
      case InSetExpr(vs) =>
        SortedArrayContainsAny(
          sortedValues,
          toArray(vs.filter(_ != null).toArray.sorted(ordering), dataType))
      // TODO: StartsWith, Like with constant prefix
    }
  }
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (2021) The Hyperspace Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.microsoft.hyperspace.index.dataskipping.util
18+
19+
import org.apache.spark.sql.catalyst.InternalRow
20+
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, Predicate}
21+
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
22+
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
23+
import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
24+
import org.apache.spark.sql.types.BooleanType
25+
26+
/**
 * Returns true if the sorted array (left) might contain the value (right).
 *
 * Preconditions (assumed, not checked at runtime):
 *  - the array is not null, sorted ascending, and contains no nulls and no
 *    duplicate elements;
 *  - the value is not null.
 */
case class SortedArrayContains(left: Expression, right: Expression)
    extends BinaryExpression
    with Predicate {

  override def prettyName: String = "sorted_array_contains"

  @transient private lazy val ordering: Ordering[Any] =
    TypeUtils.getInterpretedOrdering(right.dataType)

  // The preconditions rule out null inputs, so the result is never null.
  override def nullable: Boolean = false

  override def eval(input: InternalRow): Boolean = {
    val array = left.eval(input).asInstanceOf[ArrayData]
    val value = right.eval(input)
    val elemType = right.dataType
    val numElems = array.numElements()
    // Cheap range check against the first/last elements before falling back
    // to a binary search over the whole array.
    numElems > 0 &&
    ordering.lteq(array.get(0, elemType), value) &&
    ordering.lteq(value, array.get(numElems - 1, elemType)) &&
    SortedArrayUtils.binarySearch(array, elemType, ordering, 0, numElems, value)._1
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val arrayGen = left.genCode(ctx)
    val valueGen = right.genCode(ctx)
    val array = arrayGen.value
    val value = valueGen.value
    val elemType = right.dataType
    val count = ctx.freshName("numElements")
    val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, elemType)
    // Mirrors eval: bounds check first, then binary search.
    val resultCode =
      s"""
         |int $count = $array.numElements();
         |if ($count > 0 &&
         |    !(${ctx.genGreater(elemType, CodeGenerator.getValue(array, elemType, "0"), value)}) &&
         |    !(${ctx.genGreater(
           elemType,
           value,
           CodeGenerator.getValue(array, elemType, s"$count - 1"))})) {
         |  ${ev.value} = $binarySearch($array, 0, $count, $value).found();
         |}
       """.stripMargin
    ev.copy(
      code = code"""
        ${arrayGen.code}
        ${valueGen.code}
        boolean ${ev.value} = false;
        $resultCode""",
      isNull = FalseLiteral)
  }
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright (2021) The Hyperspace Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.microsoft.hyperspace.index.dataskipping.util
18+
19+
import org.apache.spark.sql.catalyst.InternalRow
20+
import org.apache.spark.sql.catalyst.expressions.{Expression, Predicate, UnaryExpression}
21+
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral}
22+
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
23+
import org.apache.spark.sql.catalyst.util.{ArrayData, TypeUtils}
24+
import org.apache.spark.sql.types.{ArrayType, BooleanType}
25+
26+
/**
 * Returns true if the sorted array (child) contains any of the values.
 *
 * If either array is empty, false is returned.
 *
 * Preconditions (assumed, not checked at runtime):
 *  - both arrays are not null, sorted ascending, and contain no nulls;
 *  - the child array contains no duplicate elements.
 *
 * If the element type can be represented as a primitive type in Scala,
 * then the values array must be an array of that primitive type.
 */
case class SortedArrayContainsAny(child: Expression, values: Any)
    extends UnaryExpression
    with Predicate {

  override def prettyName: String = "sorted_array_contains_any"

  @transient private lazy val ordering: Ordering[Any] =
    TypeUtils.getInterpretedOrdering(child.dataType.asInstanceOf[ArrayType].elementType)

  // Preconditions rule out null inputs, so the result is never null.
  override def nullable: Boolean = false

  override def eval(input: InternalRow): Boolean = {
    val sortedArray = child.eval(input).asInstanceOf[ArrayData]
    val candidates = values.asInstanceOf[Array[_]]
    val elemType = child.dataType.asInstanceOf[ArrayType].elementType
    val n = sortedArray.numElements()
    val m = candidates.length
    // Quick check that the two value ranges overlap at all before scanning.
    if (n > 0 && m > 0 &&
      ordering.lteq(sortedArray.get(0, elemType), candidates(m - 1)) &&
      ordering.lteq(candidates(0), sortedArray.get(n - 1, elemType))) {
      // Merge-like scan: advance through candidates linearly and through the
      // sorted array by binary search from the current position.
      var i = 0
      var j = 0
      do {
        val v = sortedArray.get(i, elemType)
        // Skip candidates strictly smaller than the current array element.
        while (j < m && ordering.lt(candidates(j), v)) j += 1
        if (j == m) return false
        val u = candidates(j)
        j += 1
        val (found, k) = SortedArrayUtils.binarySearch(sortedArray, elemType, ordering, i, n, u)
        if (found) return true
        // k is the insertion point of u; no remaining candidate can match
        // once it falls past the end of the array.
        if (k == n) return false
        i = k
      } while (j < m)
    }
    false
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    val childGen = child.genCode(ctx)
    val sortedArray = childGen.value
    val candidates = ctx.freshName("candidates")
    val elemType = child.dataType.asInstanceOf[ArrayType].elementType
    val javaType = CodeGenerator.javaType(elemType)
    // Boxed values are carried as Object[]; primitives as a primitive array.
    val candidatesType =
      if (values.isInstanceOf[Array[Any]]) "java.lang.Object[]" else s"$javaType[]"
    val candidatesRef = ctx.addReferenceObj("values", values, candidatesType)
    val n = ctx.freshName("n")
    val m = ctx.freshName("m")
    val i = ctx.freshName("i")
    val j = ctx.freshName("j")
    val v = ctx.freshName("v")
    val u = ctx.freshName("u")
    val searchResult = ctx.freshName("searchResult")
    val binarySearchResultType =
      SortedArrayUtils.BinarySearchResult.getClass.getCanonicalName.stripSuffix("$")
    val binarySearch = SortedArrayUtils.binarySearchCodeGen(ctx, elemType)
    import CodeGenerator.getValue
    // Mirrors eval: range-overlap check, then the merge-like scan.
    val resultCode =
      s"""
         |int $n = $sortedArray.numElements();
         |int $m = $candidates.length;
         |if ($n > 0 && $m > 0 &&
         |    !(${ctx.genGreater(
           elemType,
           getValue(sortedArray, elemType, "0"),
           s"(($javaType) $candidates[$m - 1])")}) &&
         |    !(${ctx.genGreater(
           elemType,
           s"(($javaType) $candidates[0])",
           getValue(sortedArray, elemType, s"$n - 1"))})) {
         |  int $i = 0;
         |  int $j = 0;
         |  do {
         |    $javaType $v = ${getValue(sortedArray, elemType, i)};
         |    while ($j < $m && ${ctx.genGreater(elemType, v, s"(($javaType) $candidates[$j])")}) $j += 1;
         |    if ($j == $m) break;
         |    $javaType $u = ($javaType) $candidates[$j];
         |    $j += 1;
         |    $binarySearchResultType $searchResult = $binarySearch($sortedArray, $i, $n, $u);
         |    if ($searchResult.found()) {
         |      ${ev.value} = true;
         |      break;
         |    }
         |    if ($searchResult.index() == $n) break;
         |    $i = $searchResult.index();
         |  } while ($j < $m);
         |}
       """.stripMargin
    ev.copy(
      code = code"""
        ${childGen.code}
        $candidatesType $candidates = $candidatesRef;
        boolean ${ev.value} = false;
        $resultCode""",
      isNull = FalseLiteral)
  }

  // values is an Array, whose default equals/hashCode is reference-based;
  // compare and hash by content so that equal sketv expressions are equal.
  override def equals(that: Any): Boolean = {
    that match {
      case SortedArrayContainsAny(thatChild, thatValues) =>
        child == thatChild &&
          values.asInstanceOf[Array[_]].sameElements(thatValues.asInstanceOf[Array[_]])
      case _ => false
    }
  }

  override def hashCode: Int = {
    (child, values.asInstanceOf[Array[_]].toSeq).hashCode
  }
}

src/test/scala/com/microsoft/hyperspace/index/dataskipping/DataSkippingIndexConfigTest.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package com.microsoft.hyperspace.index.dataskipping
1818

1919
import org.apache.hadoop.fs.Path
20-
import org.apache.spark.sql.functions.{input_file_name, max, min}
21-
import org.apache.spark.sql.types.{LongType, StringType}
20+
import org.apache.spark.sql.functions.{array_sort, collect_set, input_file_name, max, min}
21+
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
2222
import org.apache.spark.util.sketch.BloomFilter
2323

2424
import com.microsoft.hyperspace.HyperspaceException
@@ -86,6 +86,19 @@ class DataSkippingIndexConfigTest extends DataSkippingSuite with BloomFilterTest
8686
checkAnswer(indexData, withFileId(expectedSketchValues))
8787
}
8888

89+
test("createIndex works correctly with a ValueListSketch.") {
  // Ten distinct values (0..9), each repeated over ten consecutive rows.
  val df = spark.range(100).selectExpr("cast(id / 10 as int) as A").toDF
  val sourceData = createSourceData(df)
  val indexConfig = DataSkippingIndexConfig("MyIndex", ValueListSketch("A"))
  val (index, indexData) = indexConfig.createIndex(ctx, sourceData, Map())
  // The sketch's data type is resolved from the source schema during creation.
  assert(index.sketches === Seq(ValueListSketch("A", Some(IntegerType))))
  // The index should hold, per file, the sorted set of distinct values of A.
  val expected = sourceData
    .groupBy(input_file_name().as(fileNameCol))
    .agg(array_sort(collect_set("A")))
  checkAnswer(indexData, withFileId(expected))
  assert(indexData.columns === Seq(IndexConstants.DATA_FILE_NAME_ID, "ValueList_A__0"))
}
101+
89102
test("createIndex works correctly with a BloomFilterSketch.") {
90103
val sourceData = createSourceData(spark.range(100).toDF("A"))
91104
val indexConfig = DataSkippingIndexConfig("MyIndex", BloomFilterSketch("A", 0.001, 20))

0 commit comments

Comments
 (0)