salience-editor/api/benchmarks/using_timeit.py

198 lines
6.5 KiB
Python

"""
Legacy benchmark using timeit library instead of pytest-benchmark.
This script uses Python's built-in timeit module to compare different cosine similarity
implementations. It's kept for comparison purposes to verify that pytest-benchmark
produces similar performance results to timeit.
First run: python 01-generate_embeddings.py
Then run: python using_timeit.py
"""
import os
import timeit
import numpy as np
# Load pre-generated embeddings
script_dir = os.path.dirname(os.path.abspath(__file__))
embeddings_path = os.path.join(script_dir, 'genfiles', 'embeddings.npy')
vectors = np.load(embeddings_path)
print(f"Loaded embeddings with shape: {vectors.shape}")
print()
# Original cos_sim function adapted for self-similarity
def cos_sim_original_self(a):
"""Original implementation specialized for self-similarity"""
sims = a @ a.T
norms = np.linalg.norm(a, axis=-1)
a_normalized = (sims.T / norms.T).T
sims = a_normalized / norms
return sims
# Nested for loop version - PROPERLY IMPLEMENTED (norms calculated once)
def cos_sim_nested_loop_self(a):
"""Naive nested loop but with norms calculated once using numpy"""
n = a.shape[0]
sims = np.zeros((n, n))
# Calculate ALL norms once using vectorized numpy (not in the loop!)
norms = np.linalg.norm(a, axis=-1)
for i in range(n):
for j in range(n):
dot_product = np.dot(a[i], a[j])
sims[i, j] = dot_product / (norms[i] * norms[j])
return sims
# E*E^T with manual in-place normalization
def cos_sim_inplace_norm_self(a):
"""In-place normalization specialized for self-similarity"""
# Compute raw dot products
sims = a @ a.T
# Compute norms ONCE (not separate a_norms and b_norms)
norms = np.linalg.norm(a, axis=-1)
# Normalize in place
for i in range(sims.shape[0]):
for j in range(sims.shape[1]):
sims[i, j] = sims[i, j] / (norms[i] * norms[j])
return sims
# Broadcast division with in-place operations
def cos_sim_broadcast_inplace_self(a):
"""Broadcast in-place specialized for self-similarity"""
# Compute raw dot products
sims = a @ a.T
# Compute norms ONCE with keepdims for broadcasting
norms = np.linalg.norm(a, axis=-1, keepdims=True) # shape (n, 1)
# Normalize in-place using broadcasting
# Divide by norms (broadcasting across columns)
sims /= norms
# Divide by norms.T (broadcasting across rows)
sims /= norms.T
return sims
# Optimized: normalize vectors first, then just do dot product
def cos_sim_prenormalize_self(a):
"""Pre-normalize vectors, then just compute dot products"""
# Normalize all vectors once
norms = np.linalg.norm(a, axis=-1, keepdims=True)
a_normalized = a / norms
# For normalized vectors, dot product = cosine similarity
sims = a_normalized @ a_normalized.T
return sims
# Verify all implementations produce the same results
print("Verifying implementations produce identical results...")
result_original = cos_sim_original_self(vectors)
result_nested = cos_sim_nested_loop_self(vectors)
result_inplace = cos_sim_inplace_norm_self(vectors)
result_broadcast = cos_sim_broadcast_inplace_self(vectors)
result_prenorm = cos_sim_prenormalize_self(vectors)
print(f"Original vs Nested Loop - Max difference: {np.max(np.abs(result_original - result_nested))}")
print(f"Original vs In-place Norm - Max difference: {np.max(np.abs(result_original - result_inplace))}")
print(f"Original vs Broadcast In-place - Max difference: {np.max(np.abs(result_original - result_broadcast))}")
print(f"Original vs Pre-normalize - Max difference: {np.max(np.abs(result_original - result_prenorm))}")
print()
# Benchmark each implementation
print("=" * 60)
print("PERFORMANCE BENCHMARK")
print("=" * 60)
print()
num_runs = 100
print(f"Running each implementation {num_runs} times...")
print()
# Benchmark original implementation
time_original = timeit.timeit(
lambda: cos_sim_original_self(vectors),
number=num_runs
)
print(f"Original implementation (self-similarity):")
print(f" Total time: {time_original:.4f} seconds")
print(f" Average per run: {time_original/num_runs*1000:.4f} ms")
print()
# Benchmark nested loop implementation
time_nested = timeit.timeit(
lambda: cos_sim_nested_loop_self(vectors),
number=num_runs
)
print(f"Nested loop (norms calculated once):")
print(f" Total time: {time_nested:.4f} seconds")
print(f" Average per run: {time_nested/num_runs*1000:.4f} ms")
print(f" Slowdown vs original: {time_nested/time_original:.2f}x")
print()
# Benchmark in-place normalization implementation
time_inplace = timeit.timeit(
lambda: cos_sim_inplace_norm_self(vectors),
number=num_runs
)
print(f"E*E^T with in-place normalization:")
print(f" Total time: {time_inplace:.4f} seconds")
print(f" Average per run: {time_inplace/num_runs*1000:.4f} ms")
print(f" Slowdown vs original: {time_inplace/time_original:.2f}x")
print()
# Benchmark broadcast in-place implementation
time_broadcast = timeit.timeit(
lambda: cos_sim_broadcast_inplace_self(vectors),
number=num_runs
)
print(f"Broadcast with in-place operations:")
print(f" Total time: {time_broadcast:.4f} seconds")
print(f" Average per run: {time_broadcast/num_runs*1000:.4f} ms")
print(f" Speedup vs original: {time_original/time_broadcast:.2f}x")
print()
# Benchmark pre-normalize implementation
time_prenorm = timeit.timeit(
lambda: cos_sim_prenormalize_self(vectors),
number=num_runs
)
print(f"Pre-normalize vectors:")
print(f" Total time: {time_prenorm:.4f} seconds")
print(f" Average per run: {time_prenorm/num_runs*1000:.4f} ms")
print(f" Speedup vs original: {time_original/time_prenorm:.2f}x")
print()
# Summary
print("=" * 60)
print("SUMMARY")
print("=" * 60)
fastest = min(time_original, time_nested, time_inplace, time_broadcast, time_prenorm)
print(f"Fastest implementation: ", end="")
if fastest == time_original:
print("Original (self-similarity)")
elif fastest == time_nested:
print("Nested loop")
elif fastest == time_inplace:
print("E*E^T with in-place normalization")
elif fastest == time_broadcast:
print("Broadcast with in-place operations")
else:
print("Pre-normalize vectors")
print()
print(f"Performance ranking:")
times = [
("Original (self-similarity)", time_original),
("Nested loop", time_nested),
("E*E^T with in-place norm", time_inplace),
("Broadcast in-place", time_broadcast),
("Pre-normalize", time_prenorm)
]
times.sort(key=lambda x: x[1])
for i, (name, time) in enumerate(times, 1):
print(f" {i}. {name}: {time/num_runs*1000:.4f} ms per run ({time/fastest:.2f}x vs fastest)")