Skip to content

Commit 5172a0d

Browse files
Dynamically skip sharding L0 when SAI Vector index present
This is a partial solution to the IllegalStateException thrown by VectorPostings. It works by using a single shard at L0 when a vector index is present. As noted in the Jira ticket, there are edge cases that may still produce errors, notably the case where there are multiple data directories. The key trade-offs here relate to the time complexity of search. Since searching a single graph of n vectors is O(log n), and searching m such graphs is O(m * log n), we see better search performance by building one bigger graph, whose search cost is essentially O(log(m * n)). We could pre-shard, but that comes at the cost of increased search time complexity. patch by Michael Marshall,Dmitry Konstantinov; reviewed by Caleb Rackliffe,Dmitry Konstantinov,Michael Semb Wever for CASSANDRA-19661 Co-authored-by: Michael Marshall <mmarshall@apache.org> Co-authored-by: Dmitry Konstantinov <netudima@gmail.com>
1 parent 53118ba commit 5172a0d

6 files changed

Lines changed: 56 additions & 1 deletion

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.0.7
2+
* Dynamically skip sharding L0 when SAI Vector index present (CASSANDRA-19661)
23
* Optionally force IndexStatusManager to use the optimized index status format (CASSANDRA-21132)
34
* No need to evict already prepared statements, as it creates a race condition between multiple threads (CASSANDRA-17401)
45
* Upgrade logback version to 1.5.18 and slf4j dependencies to 2.0.17 (CASSANDRA-21137)

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,9 @@ public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
270270
{
271271
ShardManager shardManager = getShardManager();
272272
double flushDensity = cfs.metric.flushSizeOnDisk.get() * shardManager.shardSetCoverage() / shardManager.localSpaceCoverage();
273-
ShardTracker boundaries = shardManager.boundaries(controller.getNumShards(flushDensity));
273+
boolean supportsSharding = sstableLevel > 0 || indexGroups.stream().allMatch(Index.Group::supportsL0Shards);
274+
int numShards = supportsSharding ? controller.getNumShards(flushDensity) : 1;
275+
ShardTracker boundaries = shardManager.boundaries(numShards);
274276
return new ShardedMultiWriter(cfs,
275277
descriptor,
276278
keyCount,

src/java/org/apache/cassandra/index/Index.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,15 @@ default boolean validateSSTableAttachedIndexes(Collection<SSTableReader> sstable
890890
{
891891
return true;
892892
}
893+
894+
/**
895+
* Whether this index group supports sharding when flushing memtables, e.g. level 0 of UCS.
896+
* @return true iff all indexes in the group support L0 sharding.
897+
*/
898+
default boolean supportsL0Shards()
899+
{
900+
return true;
901+
}
893902
}
894903

895904
/**

src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,15 @@ public BooleanSupplier isIndexValid()
703703
return () -> valid;
704704
}
705705

706+
/**
707+
* Vector indexes do not support L0 shards due to the cost associated with resharding at flush time.
708+
* @return true iff the index supports sharding at L0.
709+
*/
710+
public boolean supportsL0Shards()
711+
{
712+
return !indexTermType.isVector();
713+
}
714+
706715
public boolean hasClustering()
707716
{
708717
return baseCfs.getComparator().size() > 0;

src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,17 @@ else if (throwOnIncomplete)
382382
return complete;
383383
}
384384

385+
@Override
386+
public boolean supportsL0Shards()
387+
{
388+
for (StorageAttachedIndex index : indexes)
389+
if (!index.supportsL0Shards())
390+
return false;
391+
392+
// All indexes must support L0 sharding for the flush to shard at L0
393+
return true;
394+
}
395+
385396
/**
386397
* open index files by checking number of {@link SSTableContext} and {@link SSTableIndex},
387398
* so transient open files during validation and files that are still open for in-flight requests will not be tracked.

test/unit/org/apache/cassandra/index/sai/cql/VectorLocalTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,29 @@ public void multipleNonAnnSegmentsTest() throws Throwable
511511
}
512512
}
513513

514+
@Test
515+
public void flushSuccessfullyVectorIndexToShardedSSTable()
516+
{
517+
// UCS is configured to use 2 static shards
518+
createTable(String.format("CREATE TABLE %%s (k int PRIMARY KEY, v vector<float, %d>) WITH compaction = {\n" +
519+
" 'class': 'UnifiedCompactionStrategy',\n" +
520+
" 'base_shard_count': '2',\n" +
521+
" 'min_sstable_size' : '0MiB', \n" +
522+
" 'sstable_growth' : '1'\n" +
523+
"}", word2vec.dimension()));
524+
createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex'");
525+
526+
527+
int vectorCount = 100;
528+
List<float[]> vectors = IntStream.range(0, vectorCount).mapToObj(s -> randomVector()).collect(Collectors.toList());
529+
530+
int pk = 0;
531+
for (float[] vector : vectors)
532+
execute("INSERT INTO %s (k, v) VALUES (?," + vectorString(vector) + ")", pk++);
533+
534+
flush();
535+
}
536+
514537
private UntypedResultSet search(String stringValue, float[] queryVector, int limit)
515538
{
516539
UntypedResultSet result = execute("SELECT * FROM %s WHERE str_val = '" + stringValue + "' ORDER BY val ann of " + Arrays.toString(queryVector) + " LIMIT " + limit);

0 commit comments

Comments
 (0)