|
| 1 | +#!/usr/bin/env python |
| 2 | +# Copyright ScyllaDB, Inc. |
| 3 | +# |
| 4 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +# you may not use this file except in compliance with the License. |
| 6 | +# You may obtain a copy of the License at |
| 7 | +# |
| 8 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +# |
| 10 | +# Unless required by applicable law or agreed to in writing, software |
| 11 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +# See the License for the specific language governing permissions and |
| 14 | +# limitations under the License. |
| 15 | + |
| 16 | +""" |
| 17 | +Benchmark for VectorType serialization performance. |
| 18 | +
|
| 19 | +Tests different optimization strategies: |
| 20 | +1. Current implementation (Python io.BytesIO loop) |
| 21 | +2. Python struct.pack batch format string |
| 22 | +3. Cython SerVectorType serializer (when available) |
| 23 | +4. BoundStatement.bind() end-to-end with 1 vector column (when available) |
| 24 | +
|
| 25 | +Run with: python benchmarks/vector_serialize.py |
| 26 | +""" |
| 27 | + |
| 28 | +import sys |
| 29 | +import time |
| 30 | +import struct |
| 31 | + |
| 32 | +# Add parent directory to path |
| 33 | +sys.path.insert(0, '.') |
| 34 | + |
| 35 | +from cassandra.cqltypes import FloatType, DoubleType, Int32Type, lookup_casstype |
| 36 | +from cassandra.marshal import float_pack, double_pack, int32_pack |
| 37 | + |
| 38 | + |
def create_test_values(vector_size, element_type):
    """Build a list of sample element values for serialization benchmarks.

    Int32 vectors get ``0..vector_size-1``; float/double vectors get a ramp
    of fractional values. Raises ValueError for unsupported element types.
    """
    if element_type == Int32Type:
        return list(range(vector_size))
    if element_type in (FloatType, DoubleType):
        # Same ramp for both precisions; the serializer decides the width.
        return [i * 0.1 for i in range(vector_size)]
    raise ValueError(f"Unsupported element type: {element_type}")
| 49 | + |
| 50 | + |
def benchmark_current_implementation(vector_type, values, iterations=10000):
    """Time the current ``VectorType.serialize`` path (io.BytesIO loop).

    Returns a tuple of (total_seconds, microseconds_per_op, last_result).
    """
    protocol_version = 4

    t0 = time.perf_counter()
    for _ in range(iterations):
        result = vector_type.serialize(values, protocol_version)
    t1 = time.perf_counter()

    total = t1 - t0
    # Express the per-call cost in microseconds for readability.
    per_call_us = (total / iterations) * 1_000_000
    return total, per_call_us, result
| 64 | + |
| 65 | + |
def benchmark_struct_pack(vector_type, values, iterations=10000):
    """Time serialization via one pre-compiled batch struct format string.

    Returns (None, None, None) when the vector's element type has no known
    struct format code.
    """
    count = vector_type.vector_size
    subtype = vector_type.subtype

    # Map the element type to its big-endian struct code, accepting either
    # the type itself or a subclass of it.
    code = None
    for cass_type, struct_code in ((FloatType, 'f'), (DoubleType, 'd'), (Int32Type, 'i')):
        if subtype is cass_type or (isinstance(subtype, type) and issubclass(subtype, cass_type)):
            code = struct_code
            break
    if code is None:
        return None, None, None

    # Compile once up front, as the real optimization would.
    packer = struct.Struct(f'>{count}{code}')

    t0 = time.perf_counter()
    for _ in range(iterations):
        result = packer.pack(*values)
    t1 = time.perf_counter()

    total = t1 - t0
    return total, (total / iterations) * 1_000_000, result
| 93 | + |
| 94 | + |
def benchmark_cython_serializer(vector_type, values, iterations=10000):
    """Time the Cython ``SerVectorType`` serializer, when the extension exists.

    Returns (None, None, None) if the Cython serializer module cannot be
    imported, or if the lookup falls back to a generic serializer.
    """
    try:
        from cassandra.serializers import find_serializer
    except ImportError:
        return None, None, None

    serializer = find_serializer(vector_type)
    # Only measure the specialized Cython path, never a generic fallback.
    if type(serializer).__name__ != 'SerVectorType':
        return None, None, None

    protocol_version = 4
    t0 = time.perf_counter()
    for _ in range(iterations):
        result = serializer.serialize(values, protocol_version)
    t1 = time.perf_counter()

    total = t1 - t0
    return total, (total / iterations) * 1_000_000, result
| 120 | + |
| 121 | + |
def benchmark_bind_statement(vector_type, values, iterations=10000):
    """Time the bind path for a prepared statement with one vector column.

    Simulates the full BoundStatement value-binding flow, including column
    metadata, but inlines the core serialization step (no column encryption
    policy). Returns (None, None, None) when the driver query module is not
    importable.
    """
    from unittest.mock import MagicMock

    try:
        from cassandra.query import BoundStatement, PreparedStatement, UNSET_VALUE
    except ImportError:
        return None, None, None

    # Fake metadata describing a single vector column.
    column = MagicMock()
    column.keyspace_name = 'test_ks'
    column.table_name = 'test_table'
    column.name = 'vec_col'
    column.type = vector_type

    # Fake prepared statement with just enough attributes for the bind path.
    prepared = MagicMock(spec=PreparedStatement)
    prepared.protocol_version = 4
    prepared.column_metadata = [column]
    prepared.column_encryption_policy = None
    prepared.routing_key_indexes = None
    prepared.is_idempotent = False
    prepared.result_metadata = None
    prepared.keyspace = 'test_ks'

    t0 = time.perf_counter()
    for _ in range(iterations):
        bound = BoundStatement.__new__(BoundStatement)
        bound.prepared_statement = prepared
        bound.raw_values = [values]
        # Core serialization path, inlined (no CE policy).
        bound.values = [vector_type.serialize(values, 4)]
    t1 = time.perf_counter()

    total = t1 - t0
    return total, (total / iterations) * 1_000_000, bound.values[0]
| 165 | + |
| 166 | + |
def verify_results(reference, *results):
    """Check that every non-None result equals *reference* byte-for-byte.

    ``None`` entries (benchmarks that were skipped or not applicable) are
    ignored. On the first mismatch a short diagnostic is printed — either
    the first differing byte, or, when one result is a strict prefix of the
    other, the offset where the shorter one ends (the original code printed
    nothing in that case, leaving the mismatch unexplained).

    Returns True when all comparable results match, False otherwise.
    """
    for i, result in enumerate(results):
        if result is None:
            continue  # benchmark did not run; nothing to compare
        if result == reference:
            continue
        print(f"  Result {i} mismatch: {len(result)} bytes vs {len(reference)} bytes (reference)")
        # Scan the common prefix for the first differing byte.
        diff_at = next(
            (j for j, (a, b) in enumerate(zip(result, reference)) if a != b),
            None)
        if diff_at is not None:
            print(f"    First difference at byte {diff_at}: {result[diff_at]:#04x} vs {reference[diff_at]:#04x}")
        else:
            # Common prefix matches, so the mismatch is purely in length.
            print(f"    One result is a prefix of the other; divergence at byte {min(len(result), len(reference))}")
        return False
    return True
| 181 | + |
| 182 | + |
def _print_benchmark_result(elapsed, per_op, baseline_us, ratio_label, unavailable_msg):
    """Print one benchmark's timing line, or *unavailable_msg* if it was skipped.

    The ratio against the baseline is labeled with *ratio_label* ("Speedup"
    or "Overhead vs baseline") so the same reporting code serves every
    non-baseline strategy.
    """
    if per_op is None:
        print(f"   {unavailable_msg}")
        return
    ratio = baseline_us / per_op
    print(f"   Total: {elapsed:.4f}s, Per-op: {per_op:.2f} us, {ratio_label}: {ratio:.2f}x")


def run_benchmark_suite(vector_size, element_type, type_name, iterations=10000):
    """Run the complete benchmark suite for one vector configuration.

    Builds a VectorType via ``lookup_casstype``, runs every strategy,
    reports per-strategy timings (via ``_print_benchmark_result`` — the
    original repeated the same report block three times), verifies all
    serializations produce identical bytes, and returns the baseline
    per-op time in microseconds.
    """
    sep = '=' * 80
    print(f"\n{sep}")
    print(f"Benchmark: Vector<{type_name}, {vector_size}>")
    print(f"{sep}")
    print(f"Iterations: {iterations:,}")

    # Construct the driver-side VectorType from its Cassandra class string.
    cass_typename = f'org.apache.cassandra.db.marshal.{element_type.__name__}'
    vector_typename = f'org.apache.cassandra.db.marshal.VectorType({cass_typename}, {vector_size})'
    vector_type = lookup_casstype(vector_typename)

    values = create_test_values(vector_size, element_type)

    # Reference serialization against which every strategy is verified.
    reference_bytes = vector_type.serialize(values, 4)
    print(f"Serialized size: {len(reference_bytes):,} bytes")
    print()

    all_results = []

    # 1. Current implementation (baseline).
    print("1. Current implementation (io.BytesIO loop, baseline)...")
    elapsed, per_op, result = benchmark_current_implementation(
        vector_type, values, iterations)
    all_results.append(result)
    print(f"   Total: {elapsed:.4f}s, Per-op: {per_op:.2f} us")
    baseline_time = per_op

    # 2. struct.pack batch format string.
    print("2. Python struct.pack batch format string...")
    elapsed, per_op, result = benchmark_struct_pack(
        vector_type, values, iterations)
    all_results.append(result)
    _print_benchmark_result(elapsed, per_op, baseline_time,
                            "Speedup", "Not applicable for this type")

    # 3. Cython serializer.
    print("3. Cython SerVectorType serializer...")
    elapsed, per_op, result = benchmark_cython_serializer(
        vector_type, values, iterations)
    all_results.append(result)
    _print_benchmark_result(elapsed, per_op, baseline_time,
                            "Speedup", "Cython serializers not available")

    # 4. BoundStatement.bind() end-to-end.
    print("4. BoundStatement.bind() end-to-end (1 vector column)...")
    elapsed, per_op, result = benchmark_bind_statement(
        vector_type, values, iterations)
    all_results.append(result)
    _print_benchmark_result(elapsed, per_op, baseline_time,
                            "Overhead vs baseline", "BoundStatement benchmark not available")

    # Cross-check that every strategy produced identical bytes.
    print("\nVerifying results...")
    if verify_results(reference_bytes, *all_results):
        print("  All results match!")
    else:
        print("  Result mismatch detected!")

    return baseline_time
| 257 | + |
| 258 | + |
def main():
    """Run all benchmarks."""
    # Pin to a single CPU core so timings are not skewed by core migration.
    try:
        import os
        os.sched_setaffinity(0, {0})  # Pin to CPU core 0
        print("Pinned to CPU core 0 for consistent measurements")
    except (AttributeError, OSError) as exc:
        # sched_setaffinity is Linux-only; continue without pinning elsewhere.
        print(f"Could not pin to single core: {exc}")
        print("Running without CPU affinity...")

    sep = '=' * 80
    print(sep)
    print("VectorType Serialization Performance Benchmark")
    print(sep)

    # Each entry: (vector_size, element_type, type_name, iterations).
    # Iteration counts shrink as vectors grow to keep runtime bounded.
    test_configs = [
        (3, FloatType, "float", 50000),       # small vectors
        (128, FloatType, "float", 10000),     # medium vectors (common in ML)
        (768, FloatType, "float", 5000),      # large vectors (embeddings)
        (1536, FloatType, "float", 2000),
        (128, DoubleType, "double", 10000),   # other element types
        (768, DoubleType, "double", 5000),
        (1536, DoubleType, "double", 2000),
        (128, Int32Type, "int", 10000),
    ]

    summary = []
    for size, elem_type, label, iters in test_configs:
        baseline = run_benchmark_suite(size, elem_type, label, iters)
        summary.append((f"Vector<{label}, {size}>", baseline))

    # Final summary table of baseline per-op times.
    print(f"\n{sep}")
    print("SUMMARY - Serialization Baseline Performance (io.BytesIO loop)")
    print(sep)
    for config, baseline_time in summary:
        print(f"{config:30s}: {baseline_time:8.2f} us")

    print(f"\n{sep}")
    print("Benchmark complete!")
    print(sep)
| 310 | + |
| 311 | + |
# Script entry point.
if __name__ == '__main__':
    main()
0 commit comments