feat: create deployment scripts

nobody 2025-11-02 13:09:23 -08:00
commit 8d5bce4bfb
Signed by: GrocerPublishAgent
GPG key ID: D460CD54A9E3AB86
22 changed files with 2697 additions and 74 deletions

generate_embeddings.py

@@ -0,0 +1,34 @@
import sys
import os
import numpy as np

# Add the parent directory to the path so we can import salience
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(script_dir)
sys.path.insert(0, parent_dir)
from salience.salience import models, get_sentences

# Load the transcript
transcript_path = os.path.join(parent_dir, 'transcript-1.txt')
with open(transcript_path, 'r') as f:
source_text = f.read()

# Get sentences and encode them
print("Loading transcript and encoding sentences...")
sentences, sentence_ranges = get_sentences(source_text)
print(f"Number of sentences: {len(sentences)}")

# Use the default model for comparison
model_name = 'all-mpnet-base-v2'
model = models[model_name]
vectors = model.encode(sentences)
print(f"Vector shape: {vectors.shape}")

# Save the embeddings to genfiles directory
genfiles_dir = os.path.join(script_dir, 'genfiles')
os.makedirs(genfiles_dir, exist_ok=True)
output_path = os.path.join(genfiles_dir, 'embeddings.npy')
np.save(output_path, vectors)
print(f"\nEmbeddings saved to: {output_path}")
print(f"File size: {os.path.getsize(output_path) / 1024 / 1024:.2f} MB")

test_bench_cosine_sim.py

@@ -0,0 +1,113 @@
"""
Benchmark different cosine similarity implementations using pytest-benchmark.
First run: python generate_embeddings.py
Then run: pytest test_bench_cosine_sim.py --benchmark-save-data --benchmark-json=genfiles/benchmark_results.json
To visualize: python visualize_benchmarks.py genfiles/benchmark_results.json
"""
import os
import numpy as np
import pytest

# Load pre-generated embeddings once for all tests
script_dir = os.path.dirname(os.path.abspath(__file__))
embeddings_path = os.path.join(script_dir, 'genfiles', 'embeddings.npy')
vectors = np.load(embeddings_path)

# Original cos_sim function from salience.py
def cos_sim_original(a, b):
sims = a @ b.T
a_norm = np.linalg.norm(a, axis=-1)
b_norm = np.linalg.norm(b, axis=-1)
a_normalized = (sims.T / a_norm.T).T
sims = a_normalized / b_norm
return sims
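
# Editor's note: entry (i, j) of the result above is
#   sims[i, j] = (a[i] . b[j]) / (||a[i]|| * ||b[j]||),
# the cosine of the angle between a[i] and b[j]; the transposes exist only
# to broadcast the row norms of `a` across the correct axis.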

# Nested for loop version
def cos_sim_nested_loop(a, b):
n = a.shape[0]
m = b.shape[0]
sims = np.zeros((n, m))
for i in range(n):
for j in range(m):
dot_product = np.dot(a[i], b[j])
norm_a = np.linalg.norm(a[i])
norm_b = np.linalg.norm(b[j])
sims[i, j] = dot_product / (norm_a * norm_b)
return sims

# E*E^T with manual in-place normalization
def cos_sim_inplace_norm(a, b):
# Compute raw dot products
sims = a @ b.T
# Compute norms once
a_norms = np.linalg.norm(a, axis=-1)
b_norms = np.linalg.norm(b, axis=-1)
# Normalize in place
for i in range(sims.shape[0]):
for j in range(sims.shape[1]):
sims[i, j] = sims[i, j] / (a_norms[i] * b_norms[j])
return sims

# Broadcast division with in-place operations
def cos_sim_broadcast_inplace(a, b):
# Compute raw dot products
sims = a @ b.T
# Compute norms once
a_norms = np.linalg.norm(a, axis=-1, keepdims=True) # shape (n, 1)
b_norms = np.linalg.norm(b, axis=-1, keepdims=True) # shape (m, 1)
# Divide by a_norms (broadcasting across columns)
sims /= a_norms
# Divide by b_norms.T (broadcasting across rows)
sims /= b_norms.T
return sims
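
# Editor's sketch (hypothetical variant, not part of the benchmark suite):
# the two in-place divisions above collapse into a single division by the
# outer product of the norms, at the cost of one extra (n, m) matrix.
def cos_sim_outer_norm(a, b):
    sims = a @ b.T
    a_norms = np.linalg.norm(a, axis=-1, keepdims=True)  # shape (n, 1)
    b_norms = np.linalg.norm(b, axis=-1, keepdims=True)  # shape (m, 1)
    # (n, 1) @ (1, m) -> (n, m) matrix of pairwise norm products
    return sims / (a_norms @ b_norms.T)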

# Verify all implementations produce the same results
def test_correctness():
"""Verify all implementations produce identical results"""
result_original = cos_sim_original(vectors, vectors)
result_nested = cos_sim_nested_loop(vectors, vectors)
result_inplace = cos_sim_inplace_norm(vectors, vectors)
result_broadcast = cos_sim_broadcast_inplace(vectors, vectors)
assert np.allclose(result_original, result_nested, atol=1e-6)
assert np.allclose(result_original, result_inplace, atol=1e-6)
assert np.allclose(result_original, result_broadcast, atol=1e-6)

# Benchmark tests
def test_bench_original(benchmark):
"""Original vectorized implementation"""
result = benchmark(cos_sim_original, vectors, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_nested_loop(benchmark):
"""Nested loop implementation"""
result = benchmark(cos_sim_nested_loop, vectors, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_inplace_norm(benchmark):
"""E*E^T with in-place normalization"""
result = benchmark(cos_sim_inplace_norm, vectors, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_broadcast_inplace(benchmark):
"""Broadcast with in-place operations"""
result = benchmark(cos_sim_broadcast_inplace, vectors, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])
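
# Editor's addition (illustrative): run this file directly for a quick
# agreement check on small random inputs, without invoking pytest.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    a_demo, b_demo = rng.normal(size=(4, 8)), rng.normal(size=(5, 8))
    assert np.allclose(cos_sim_original(a_demo, b_demo),
                       cos_sim_broadcast_inplace(a_demo, b_demo), atol=1e-12)
    print("Small-input agreement check passed")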

test_bench_self_cosine_sim.py

@@ -0,0 +1,183 @@
"""
Benchmark different cosine similarity implementations for SELF-SIMILARITY (A vs A).
This specialized version only computes norms once since we're comparing A with itself.
First run: python generate_embeddings.py
Then run: pytest test_bench_self_cosine_sim.py --benchmark-save-data --benchmark-json=genfiles/benchmark_self_results.json
To visualize: python visualize_benchmarks.py genfiles/benchmark_self_results.json
"""
import os
import numpy as np
import pytest

# Load pre-generated embeddings once for all tests
script_dir = os.path.dirname(os.path.abspath(__file__))
embeddings_path = os.path.join(script_dir, 'genfiles', 'embeddings.npy')
vectors = np.load(embeddings_path)

# Original cos_sim function adapted for self-similarity
def cos_sim_original_self(a):
"""Original implementation specialized for self-similarity"""
sims = a @ a.T
norms = np.linalg.norm(a, axis=-1)
a_normalized = (sims.T / norms.T).T
sims = a_normalized / norms
return sims

# Nested for loop version - PROPERLY IMPLEMENTED (norms calculated once)
def cos_sim_nested_loop_self(a):
"""Naive nested loop but with norms calculated once using numpy"""
n = a.shape[0]
sims = np.zeros((n, n))
# Calculate ALL norms once using vectorized numpy (not in the loop!)
norms = np.linalg.norm(a, axis=-1)
for i in range(n):
for j in range(n):
dot_product = np.dot(a[i], a[j])
sims[i, j] = dot_product / (norms[i] * norms[j])
return sims

# E*E^T with manual in-place normalization
def cos_sim_inplace_norm_self(a):
"""In-place normalization specialized for self-similarity"""
# Compute raw dot products
sims = a @ a.T
# Compute norms ONCE (not separate a_norms and b_norms)
norms = np.linalg.norm(a, axis=-1)
# Normalize in place
for i in range(sims.shape[0]):
for j in range(sims.shape[1]):
sims[i, j] = sims[i, j] / (norms[i] * norms[j])
return sims

# Broadcast division with in-place operations
def cos_sim_broadcast_inplace_self(a):
"""Broadcast in-place specialized for self-similarity"""
# Compute raw dot products
sims = a @ a.T
# Compute norms ONCE with keepdims for broadcasting
norms = np.linalg.norm(a, axis=-1, keepdims=True) # shape (n, 1)
# Normalize in-place using broadcasting
# Divide by norms (broadcasting across columns)
sims /= norms
# Divide by norms.T (broadcasting across rows)
sims /= norms.T
return sims

# Broadcast division without in-place operations
def cos_sim_broadcast_self(a):
"""Broadcast without in-place operations - allocates new matrices"""
# Compute raw dot products
sims = a @ a.T
# Compute norms ONCE with keepdims for broadcasting
norms = np.linalg.norm(a, axis=-1, keepdims=True) # shape (n, 1)
# Normalize using broadcasting (creates new matrices)
sims = sims / norms
sims = sims / norms.T
return sims

# Optimized: normalize vectors first, then just do dot product
def cos_sim_prenormalize_self(a):
"""Pre-normalize vectors, then just compute dot products"""
# Normalize all vectors once
norms = np.linalg.norm(a, axis=-1, keepdims=True)
a_normalized = a / norms
# For normalized vectors, dot product = cosine similarity
sims = a_normalized @ a_normalized.T
return sims
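
# Editor's note on why pre-normalization tends to win: the divisions touch the
# (n, d) input once instead of the (n, n) output, so normalization work drops
# from ~n*n to ~n*d element-wise operations (for illustration, n=5000 sentences
# at d=768 dims is ~25M vs ~3.8M divisions), while the matmul that dominates
# the runtime is unchanged.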

# Symmetry variant: kept as a full-matrix baseline (see the note in the body)
def cos_sim_symmetric_self(a):
    """Symmetry baseline - computes the full matrix; manual triangle tricks are usually slower"""
# Normalize all vectors once
norms = np.linalg.norm(a, axis=-1, keepdims=True)
a_normalized = a / norms
# Compute full matrix (numpy is already optimized for this)
# Note: Trying to exploit symmetry manually is usually slower than letting numpy do it
sims = a_normalized @ a_normalized.T
return sims
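
# Editor's sketch of what an actual triangle computation could look like
# (hypothetical; deliberately kept out of the benchmarks). It halves the FLOPs
# by computing one upper-triangle row per matvec and mirroring it, but the
# Python-level loop and the loss of a single big BLAS matmul usually make it
# slower in practice, as the note above suggests.
def cos_sim_upper_triangle_self(a):
    norms = np.linalg.norm(a, axis=-1, keepdims=True)
    a_normalized = a / norms
    n = a.shape[0]
    sims = np.empty((n, n))
    for i in range(n):
        row = a_normalized[i:] @ a_normalized[i]  # row i, columns i..n-1
        sims[i, i:] = row
        sims[i:, i] = row  # mirror into the lower triangle
    return sims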

# Verify all implementations produce the same results
def test_correctness():
"""Verify all implementations produce identical results"""
result_original = cos_sim_original_self(vectors)
result_nested = cos_sim_nested_loop_self(vectors)
result_inplace = cos_sim_inplace_norm_self(vectors)
result_broadcast_inplace = cos_sim_broadcast_inplace_self(vectors)
result_broadcast = cos_sim_broadcast_self(vectors)
result_prenorm = cos_sim_prenormalize_self(vectors)
result_symmetric = cos_sim_symmetric_self(vectors)
assert np.allclose(result_original, result_nested, atol=1e-6), "Nested loop mismatch"
assert np.allclose(result_original, result_inplace, atol=1e-6), "In-place mismatch"
assert np.allclose(result_original, result_broadcast_inplace, atol=1e-6), "Broadcast inplace mismatch"
assert np.allclose(result_original, result_broadcast, atol=1e-6), "Broadcast mismatch"
assert np.allclose(result_original, result_prenorm, atol=1e-6), "Pre-normalize mismatch"
assert np.allclose(result_original, result_symmetric, atol=1e-6), "Symmetric mismatch"

# Benchmark tests
def test_bench_original_self(benchmark):
"""Original implementation (self-similarity)"""
result = benchmark(cos_sim_original_self, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_nested_loop_self(benchmark):
"""Nested loop (properly implemented with norms calculated once)"""
result = benchmark(cos_sim_nested_loop_self, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_inplace_norm_self(benchmark):
"""E*E^T with in-place normalization (self-similarity)"""
result = benchmark(cos_sim_inplace_norm_self, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_broadcast_inplace_self(benchmark):
"""Broadcast with in-place operations (self-similarity)"""
result = benchmark(cos_sim_broadcast_inplace_self, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_broadcast_self(benchmark):
"""Broadcast without in-place operations (self-similarity)"""
result = benchmark(cos_sim_broadcast_self, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_prenormalize_self(benchmark):
"""Pre-normalize vectors first (self-similarity)"""
result = benchmark(cos_sim_prenormalize_self, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

def test_bench_symmetric_self(benchmark):
    """Symmetry baseline (self-similarity)"""
result = benchmark(cos_sim_symmetric_self, vectors)
assert result.shape == (vectors.shape[0], vectors.shape[0])

using_timeit.py

@@ -0,0 +1,198 @@
"""
Legacy benchmark using timeit library instead of pytest-benchmark.
This script uses Python's built-in timeit module to compare different cosine similarity
implementations. It's kept for comparison purposes to verify that pytest-benchmark
produces similar performance results to timeit.
First run: python generate_embeddings.py
Then run: python using_timeit.py
"""
import os
import timeit
import numpy as np

# Load pre-generated embeddings
script_dir = os.path.dirname(os.path.abspath(__file__))
embeddings_path = os.path.join(script_dir, 'genfiles', 'embeddings.npy')
vectors = np.load(embeddings_path)
print(f"Loaded embeddings with shape: {vectors.shape}")
print()

# Original cos_sim function adapted for self-similarity
def cos_sim_original_self(a):
"""Original implementation specialized for self-similarity"""
sims = a @ a.T
norms = np.linalg.norm(a, axis=-1)
a_normalized = (sims.T / norms.T).T
sims = a_normalized / norms
return sims

# Nested for loop version - PROPERLY IMPLEMENTED (norms calculated once)
def cos_sim_nested_loop_self(a):
"""Naive nested loop but with norms calculated once using numpy"""
n = a.shape[0]
sims = np.zeros((n, n))
# Calculate ALL norms once using vectorized numpy (not in the loop!)
norms = np.linalg.norm(a, axis=-1)
for i in range(n):
for j in range(n):
dot_product = np.dot(a[i], a[j])
sims[i, j] = dot_product / (norms[i] * norms[j])
return sims

# E*E^T with manual in-place normalization
def cos_sim_inplace_norm_self(a):
"""In-place normalization specialized for self-similarity"""
# Compute raw dot products
sims = a @ a.T
# Compute norms ONCE (not separate a_norms and b_norms)
norms = np.linalg.norm(a, axis=-1)
# Normalize in place
for i in range(sims.shape[0]):
for j in range(sims.shape[1]):
sims[i, j] = sims[i, j] / (norms[i] * norms[j])
return sims

# Broadcast division with in-place operations
def cos_sim_broadcast_inplace_self(a):
"""Broadcast in-place specialized for self-similarity"""
# Compute raw dot products
sims = a @ a.T
# Compute norms ONCE with keepdims for broadcasting
norms = np.linalg.norm(a, axis=-1, keepdims=True) # shape (n, 1)
# Normalize in-place using broadcasting
# Divide by norms (broadcasting across columns)
sims /= norms
# Divide by norms.T (broadcasting across rows)
sims /= norms.T
return sims

# Optimized: normalize vectors first, then just do dot product
def cos_sim_prenormalize_self(a):
"""Pre-normalize vectors, then just compute dot products"""
# Normalize all vectors once
norms = np.linalg.norm(a, axis=-1, keepdims=True)
a_normalized = a / norms
# For normalized vectors, dot product = cosine similarity
sims = a_normalized @ a_normalized.T
return sims

# Verify all implementations produce the same results
print("Verifying implementations produce identical results...")
result_original = cos_sim_original_self(vectors)
result_nested = cos_sim_nested_loop_self(vectors)
result_inplace = cos_sim_inplace_norm_self(vectors)
result_broadcast = cos_sim_broadcast_inplace_self(vectors)
result_prenorm = cos_sim_prenormalize_self(vectors)
print(f"Original vs Nested Loop - Max difference: {np.max(np.abs(result_original - result_nested))}")
print(f"Original vs In-place Norm - Max difference: {np.max(np.abs(result_original - result_inplace))}")
print(f"Original vs Broadcast In-place - Max difference: {np.max(np.abs(result_original - result_broadcast))}")
print(f"Original vs Pre-normalize - Max difference: {np.max(np.abs(result_original - result_prenorm))}")
print()

# Benchmark each implementation
print("=" * 60)
print("PERFORMANCE BENCHMARK")
print("=" * 60)
print()
num_runs = 100
print(f"Running each implementation {num_runs} times...")
print()

# Benchmark original implementation
time_original = timeit.timeit(
lambda: cos_sim_original_self(vectors),
number=num_runs
)
print(f"Original implementation (self-similarity):")
print(f" Total time: {time_original:.4f} seconds")
print(f" Average per run: {time_original/num_runs*1000:.4f} ms")
print()

# Benchmark nested loop implementation
time_nested = timeit.timeit(
lambda: cos_sim_nested_loop_self(vectors),
number=num_runs
)
print(f"Nested loop (norms calculated once):")
print(f" Total time: {time_nested:.4f} seconds")
print(f" Average per run: {time_nested/num_runs*1000:.4f} ms")
print(f" Slowdown vs original: {time_nested/time_original:.2f}x")
print()

# Benchmark in-place normalization implementation
time_inplace = timeit.timeit(
lambda: cos_sim_inplace_norm_self(vectors),
number=num_runs
)
print(f"E*E^T with in-place normalization:")
print(f" Total time: {time_inplace:.4f} seconds")
print(f" Average per run: {time_inplace/num_runs*1000:.4f} ms")
print(f" Slowdown vs original: {time_inplace/time_original:.2f}x")
print()

# Benchmark broadcast in-place implementation
time_broadcast = timeit.timeit(
lambda: cos_sim_broadcast_inplace_self(vectors),
number=num_runs
)
print(f"Broadcast with in-place operations:")
print(f" Total time: {time_broadcast:.4f} seconds")
print(f" Average per run: {time_broadcast/num_runs*1000:.4f} ms")
print(f" Speedup vs original: {time_original/time_broadcast:.2f}x")
print()

# Benchmark pre-normalize implementation
time_prenorm = timeit.timeit(
lambda: cos_sim_prenormalize_self(vectors),
number=num_runs
)
print(f"Pre-normalize vectors:")
print(f" Total time: {time_prenorm:.4f} seconds")
print(f" Average per run: {time_prenorm/num_runs*1000:.4f} ms")
print(f" Speedup vs original: {time_original/time_prenorm:.2f}x")
print()

# Summary
print("=" * 60)
print("SUMMARY")
print("=" * 60)
fastest = min(time_original, time_nested, time_inplace, time_broadcast, time_prenorm)
print(f"Fastest implementation: ", end="")
if fastest == time_original:
print("Original (self-similarity)")
elif fastest == time_nested:
print("Nested loop")
elif fastest == time_inplace:
print("E*E^T with in-place normalization")
elif fastest == time_broadcast:
print("Broadcast with in-place operations")
else:
print("Pre-normalize vectors")
print()
print(f"Performance ranking:")
times = [
("Original (self-similarity)", time_original),
("Nested loop", time_nested),
("E*E^T with in-place norm", time_inplace),
("Broadcast in-place", time_broadcast),
("Pre-normalize", time_prenorm)
]
times.sort(key=lambda x: x[1])
for i, (name, time) in enumerate(times, 1):
print(f" {i}. {name}: {time/num_runs*1000:.4f} ms per run ({time/fastest:.2f}x vs fastest)")

visualize_benchmarks.py

@@ -0,0 +1,150 @@
"""
Visualize pytest-benchmark results with violin plots.
Usage: python visualize_benchmarks.py benchmark_results.json
"""
import sys
import json
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

if len(sys.argv) < 2:
print("Usage: python visualize_benchmarks.py benchmark_results.json")
sys.exit(1)

# Load benchmark results
with open(sys.argv[1], 'r') as f:
data = json.load(f)

# Extract benchmark data
benchmarks = data['benchmarks']

# Create a list to store all timing data
timing_data = []
for bench in benchmarks:
name = bench['name'].replace('test_bench_', '').replace('_', ' ').title()
stats = bench['stats']
# Require actual timing data
if 'data' not in stats:
print(f"ERROR: No raw timing data found for {name}", file=sys.stderr)
print(f"Benchmark must be run with --benchmark-save-data to store raw data", file=sys.stderr)
sys.exit(1)
times = np.array(stats['data']) * 1000 # Convert to ms
for iteration, time in enumerate(times):
timing_data.append({
'Implementation': name,
'Iteration': iteration,
'Time (ms)': time
})
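
# Editor's note: the rows above are accumulated in long ("tidy") form, one row
# per timed iteration, because seaborn's violinplot expects tidy data rather
# than one wide column per implementation.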

# Create DataFrame
df = pd.DataFrame(timing_data)

# Calculate summary statistics for ranking
summary = df.groupby('Implementation')['Time (ms)'].agg(['mean', 'median', 'std', 'min', 'max'])
slowest_mean = summary['mean'].max()
summary['Speedup vs Slowest'] = slowest_mean / summary['mean']
summary_sorted = summary.sort_values('mean')

# Get unique implementations
implementations = df['Implementation'].unique()
num_impls = len(implementations)

# Define color palette for consistency - generate enough colors dynamically
colors = sns.color_palette("husl", num_impls)
impl_colors = {impl: colors[idx] for idx, impl in enumerate(implementations)}

# Create individual violin plots for each implementation
# Dynamically determine grid size
cols = min(3, num_impls)
rows = (num_impls + cols - 1) // cols # Ceiling division
fig, axes = plt.subplots(rows, cols, figsize=(7*cols, 5*rows))
if num_impls == 1:
axes = [axes]
else:
axes = axes.flatten()
for idx, impl in enumerate(implementations):
impl_data = df[df['Implementation'] == impl]
sns.violinplot(data=impl_data, y='Time (ms)', ax=axes[idx], inner='box', color=impl_colors[impl])
axes[idx].set_title(f'{impl}', fontsize=12, fontweight='bold')
axes[idx].set_ylabel('Time (ms)', fontsize=10)
axes[idx].grid(True, alpha=0.3, axis='y')
# Add mean line
mean_val = impl_data['Time (ms)'].mean()
axes[idx].axhline(mean_val, color='red', linestyle='--', linewidth=1, alpha=0.7, label=f'Mean: {mean_val:.4f} ms')
axes[idx].legend(fontsize=8)
# Hide any extra empty subplots
for idx in range(num_impls, len(axes)):
axes[idx].set_visible(False)
plt.tight_layout()
output_file_individual = sys.argv[1].replace('.json', '_individual.png')
plt.savefig(output_file_individual, dpi=300, bbox_inches='tight')
print(f"Individual plots saved to: {output_file_individual}")

# Create combined plot for the fastest implementations
fig2, ax = plt.subplots(1, 1, figsize=(10, 6))
# Pick the top 3 fastest implementations (or fewer if there aren't that many)
num_fast = min(3, num_impls)
fast_implementations = list(summary_sorted.head(num_fast).index)
df_fast = df[df['Implementation'].isin(fast_implementations)]
# Use the same colors as in individual plots
palette = [impl_colors[impl] for impl in fast_implementations]
sns.violinplot(data=df_fast, x='Implementation', y='Time (ms)', ax=ax, inner='box', palette=palette)
ax.set_title(f'Cosine Similarity: Top {num_fast} Fastest Implementations', fontsize=14, fontweight='bold')
ax.set_xlabel('Implementation', fontsize=12)
ax.set_ylabel('Time (ms)', fontsize=12)
ax.grid(True, alpha=0.3, axis='y')
# Add mean values as text
for impl in fast_implementations:
impl_data = df_fast[df_fast['Implementation'] == impl]
mean_val = impl_data['Time (ms)'].mean()
x_pos = list(fast_implementations).index(impl)
ax.text(x_pos, mean_val, f'{mean_val:.4f} ms', ha='center', va='bottom', fontsize=10, fontweight='bold')
plt.tight_layout()
output_file_combined = sys.argv[1].replace('.json', '_fast_comparison.png')
plt.savefig(output_file_combined, dpi=300, bbox_inches='tight')
print(f"Fast implementations comparison saved to: {output_file_combined}")

# Create time series scatter plots
fig3, axes3 = plt.subplots(rows, cols, figsize=(7*cols, 5*rows))
if num_impls == 1:
axes3 = [axes3]
else:
axes3 = axes3.flatten()
for idx, impl in enumerate(implementations):
impl_data = df[df['Implementation'] == impl].sort_values('Iteration')
axes3[idx].scatter(impl_data['Iteration'], impl_data['Time (ms)'], alpha=0.5, s=10, color=impl_colors[impl])
axes3[idx].set_title(f'{impl}', fontsize=12, fontweight='bold')
axes3[idx].set_xlabel('Iteration', fontsize=10)
axes3[idx].set_ylabel('Time (ms)', fontsize=10)
axes3[idx].grid(True, alpha=0.3)
# Add mean line
mean_val = impl_data['Time (ms)'].mean()
axes3[idx].axhline(mean_val, color='red', linestyle='--', linewidth=1, alpha=0.7, label=f'Mean: {mean_val:.4f} ms')
axes3[idx].legend(fontsize=8)
# Hide any extra empty subplots
for idx in range(num_impls, len(axes3)):
axes3[idx].set_visible(False)
plt.tight_layout()
output_file_timeseries = sys.argv[1].replace('.json', '_timeseries.png')
plt.savefig(output_file_timeseries, dpi=300, bbox_inches='tight')
print(f"Time series scatter plots saved to: {output_file_timeseries}")
print("\nAll plots generated successfully!")