"""Legacy benchmark using the timeit library instead of pytest-benchmark.

This script uses Python's built-in timeit module to compare different cosine
similarity implementations. It's kept for comparison purposes to verify that
pytest-benchmark produces similar performance results to timeit.

First run: python 01-generate_embeddings.py
Then run: python using_timeit.py
"""

import os
import timeit

import numpy as np

# Number of timed calls per implementation.
NUM_RUNS = 100


# Original cos_sim function adapted for self-similarity
def cos_sim_original_self(a):
    """Return the pairwise cosine similarity matrix of the rows of ``a``.

    Original implementation specialized for self-similarity: the raw dot
    products are divided first row-wise, then column-wise, by the row norms.

    Args:
        a: 2-D array with one vector per row.

    Returns:
        Square array whose ``[i, j]`` entry is the cosine similarity between
        ``a[i]`` and ``a[j]``. Rows with zero norm are not special-cased.
    """
    sims = a @ a.T
    norms = np.linalg.norm(a, axis=-1)
    # Transpose, divide, transpose back: divides row i by norms[i].
    a_normalized = (sims.T / norms.T).T
    # Plain broadcasting divides column j by norms[j].
    sims = a_normalized / norms
    return sims


# Nested for loop version - PROPERLY IMPLEMENTED (norms calculated once)
def cos_sim_nested_loop_self(a):
    """Return the pairwise cosine similarity matrix of the rows of ``a``.

    Naive nested Python loop over every (i, j) pair, but with the norms
    calculated once using numpy.

    Args:
        a: 2-D array with one vector per row.

    Returns:
        Square array (allocated with ``np.zeros``) of cosine similarities.
        Rows with zero norm are not special-cased.
    """
    n = a.shape[0]
    sims = np.zeros((n, n))

    # Calculate ALL norms once using vectorized numpy (not in the loop!)
    norms = np.linalg.norm(a, axis=-1)

    for i in range(n):
        for j in range(n):
            dot_product = np.dot(a[i], a[j])
            sims[i, j] = dot_product / (norms[i] * norms[j])

    return sims


# E*E^T with manual in-place normalization
def cos_sim_inplace_norm_self(a):
    """Return the pairwise cosine similarity matrix of the rows of ``a``.

    Computes all dot products with one matrix product, then normalizes the
    result element by element in a Python loop.

    Args:
        a: 2-D array with one vector per row.

    Returns:
        Square array of cosine similarities. Rows with zero norm are not
        special-cased.
    """
    # Compute raw dot products
    sims = a @ a.T

    # Compute norms ONCE (not separate a_norms and b_norms)
    norms = np.linalg.norm(a, axis=-1)

    # Normalize in place
    for i in range(sims.shape[0]):
        for j in range(sims.shape[1]):
            sims[i, j] = sims[i, j] / (norms[i] * norms[j])

    return sims


# Broadcast division with in-place operations
def cos_sim_broadcast_inplace_self(a):
    """Return the pairwise cosine similarity matrix of the rows of ``a``.

    Computes all dot products with one matrix product, then normalizes with
    two in-place broadcast divisions. ``a`` itself is not modified; only the
    freshly computed product is divided in place.

    Args:
        a: 2-D array with one vector per row.

    Returns:
        Square array of cosine similarities. Rows with zero norm are not
        special-cased.
    """
    # Compute raw dot products
    sims = a @ a.T

    # Compute norms ONCE with keepdims for broadcasting
    norms = np.linalg.norm(a, axis=-1, keepdims=True)  # shape (n, 1)

    # Normalize in-place using broadcasting
    # Dividing by the (n, 1) column scales row i by norms[i]
    sims /= norms
    # Dividing by the (1, n) row scales column j by norms[j]
    sims /= norms.T

    return sims


# Optimized: normalize vectors first, then just do dot product
def cos_sim_prenormalize_self(a):
    """Return the pairwise cosine similarity matrix of the rows of ``a``.

    Pre-normalizes every row to unit length, then computes dot products.

    Args:
        a: 2-D array with one vector per row.

    Returns:
        Square array of cosine similarities. Rows with zero norm are not
        special-cased.
    """
    # Normalize all vectors once
    norms = np.linalg.norm(a, axis=-1, keepdims=True)
    a_normalized = a / norms

    # For normalized vectors, dot product = cosine similarity
    sims = a_normalized @ a_normalized.T

    return sims


# One row per benchmarked implementation, in the order they are timed:
#   (function,
#    heading printed above its timings,
#    name used on the "Fastest implementation" line,
#    name used in the performance ranking,
#    comparison against the original: "Slowdown", "Speedup" or None)
_IMPLEMENTATIONS = (
    (
        cos_sim_original_self,
        "Original implementation (self-similarity):",
        "Original (self-similarity)",
        "Original (self-similarity)",
        None,
    ),
    (
        cos_sim_nested_loop_self,
        "Nested loop (norms calculated once):",
        "Nested loop",
        "Nested loop",
        "Slowdown",
    ),
    (
        cos_sim_inplace_norm_self,
        "E*E^T with in-place normalization:",
        "E*E^T with in-place normalization",
        "E*E^T with in-place norm",
        "Slowdown",
    ),
    (
        cos_sim_broadcast_inplace_self,
        "Broadcast with in-place operations:",
        "Broadcast with in-place operations",
        "Broadcast in-place",
        "Speedup",
    ),
    (
        cos_sim_prenormalize_self,
        "Pre-normalize vectors:",
        "Pre-normalize vectors",
        "Pre-normalize",
        "Speedup",
    ),
)


def _verify_implementations(vectors):
    """Print the max absolute difference of each variant vs. the original."""
    print("Verifying implementations produce identical results...")
    reference = cos_sim_original_self(vectors)
    # Compute every result before printing any comparison line.
    candidates = [
        ("Nested Loop", cos_sim_nested_loop_self(vectors)),
        ("In-place Norm", cos_sim_inplace_norm_self(vectors)),
        ("Broadcast In-place", cos_sim_broadcast_inplace_self(vectors)),
        ("Pre-normalize", cos_sim_prenormalize_self(vectors)),
    ]
    for label, result in candidates:
        max_diff = np.max(np.abs(reference - result))
        print(f"Original vs {label} - Max difference: {max_diff}")
    print()


def _run_benchmarks(vectors, num_runs=NUM_RUNS):
    """Time every implementation and print its report.

    Args:
        vectors: 2-D array passed to each implementation.
        num_runs: Number of calls timed per implementation.

    Returns:
        List of total elapsed seconds, in ``_IMPLEMENTATIONS`` order.
    """
    print("=" * 60)
    print("PERFORMANCE BENCHMARK")
    print("=" * 60)
    print()
    print(f"Running each implementation {num_runs} times...")
    print()

    timings = []
    for func, heading, _, _, comparison in _IMPLEMENTATIONS:
        # Bind func as a default so the lambda does not depend on the loop
        # variable's later values.
        elapsed = timeit.timeit(lambda func=func: func(vectors), number=num_runs)
        timings.append(elapsed)
        baseline = timings[0]  # the original implementation is timed first

        print(heading)
        print(f" Total time: {elapsed:.4f} seconds")
        print(f" Average per run: {elapsed/num_runs*1000:.4f} ms")
        if comparison == "Slowdown":
            print(f" Slowdown vs original: {elapsed/baseline:.2f}x")
        elif comparison == "Speedup":
            print(f" Speedup vs original: {baseline/elapsed:.2f}x")
        print()

    return timings


def _print_summary(timings, num_runs=NUM_RUNS):
    """Print the fastest implementation and the full ranking.

    Args:
        timings: Total elapsed seconds, in ``_IMPLEMENTATIONS`` order.
        num_runs: Number of calls each total covers (for per-run averages).
    """
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)

    fastest = min(timings)
    # index() returns the first match, so ties resolve in declaration order.
    fastest_name = _IMPLEMENTATIONS[timings.index(fastest)][2]
    print(f"Fastest implementation: {fastest_name}")

    print()
    print("Performance ranking:")
    ranking = sorted(
        ((spec[3], elapsed) for spec, elapsed in zip(_IMPLEMENTATIONS, timings)),
        key=lambda entry: entry[1],
    )
    for position, (name, elapsed) in enumerate(ranking, 1):
        print(
            f" {position}. {name}: {elapsed/num_runs*1000:.4f} ms per run "
            f"({elapsed/fastest:.2f}x vs fastest)"
        )


def main():
    """Load the pre-generated embeddings, verify, benchmark and summarize."""
    # Load pre-generated embeddings
    script_dir = os.path.dirname(os.path.abspath(__file__))
    embeddings_path = os.path.join(script_dir, 'genfiles', 'embeddings.npy')
    vectors = np.load(embeddings_path)

    print(f"Loaded embeddings with shape: {vectors.shape}")
    print()

    # Verify all implementations produce the same results
    _verify_implementations(vectors)

    # Benchmark each implementation
    timings = _run_benchmarks(vectors)

    # Summary
    _print_summary(timings)


if __name__ == "__main__":
    main()