mindef-overdracht/llm-throughput-tests-mindef-metadateren/benchmark_llm.py
2026-06-02 11:46:20 +02:00

1194 lines
45 KiB
Python

"""
LLM Benchmarking Tool
Benchmarks LLM deployments via OpenAI-compatible endpoints with async concurrency.
Supports YAML config files and CLI arguments.
"""
import asyncio
import time
import argparse
import json
import yaml
import logging
import httpx
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Tuple
from openai import AsyncOpenAI, RateLimitError
# ============================================================================
# CONSTANTS
# ============================================================================
# Characters of filler text generate_prompt emits per requested input token.
# NOTE: despite the name, this is used as chars-per-token (target_tokens * ratio).
TOKEN_TO_CHAR_RATIO = 7
# z-score multiplied with the latency standard error to get the 95% CI margin.
CONFIDENCE_95_Z_SCORE = 1.96
# Lower bound on the HTTP connection pool size, whatever the batch size is.
MIN_HTTP_CONNECTIONS = 200
# Pool size is batch size times this factor (then clamped to the minimum above).
CONNECTION_MULTIPLIER = 2
# A warning is logged when actual prompt tokens differ from the target by more than this.
TOKEN_VALIDATION_THRESHOLD = 100
# Unique marker object; not referenced in the visible part of this file —
# presumably a shutdown signal for a worker/queue, TODO confirm against the rest of the file.
SHUTDOWN_SENTINEL = object()
# ============================================================================
# DATA MODELS
# ============================================================================
@dataclass
class BenchmarkConfig:
    """
    Parameters describing one benchmark run.
    A run sends `num_batches` batches, each made of `batch_size` simultaneous
    requests, so it issues num_batches * batch_size requests in total.
    """
    # Target prompt size in tokens (used to size or select the input).
    input_tokens: int
    # Number of requests fired at the same time within one batch.
    batch_size: int
    # How many batches are sent one after another.
    num_batches: int
    # Value passed as max_tokens for every request.
    output_tokens: int
@dataclass
class RequestResult:
    """Outcome and measurements of one request (successful or failed)."""
    # All completion tokens reported for the request (content + reasoning).
    total_tokens: int
    content_tokens: int
    reasoning_tokens: int
    # Seconds between start_time and end_time as recorded by the caller.
    elapsed_time: float
    time_to_first_token: float
    prompt_tokens: int  # Actual input tokens from API response
    # Timestamps taken with time.time() around the request.
    start_time: float
    end_time: float
    success: bool = True
    error_message: Optional[str] = None
    batch_id: Optional[int] = None  # Batch identifier if using batching
    requests_in_batch: Optional[int] = None  # Number of requests in this batch

    @property
    def tokens_per_second(self) -> float:
        """Total completion tokens per second of elapsed time (0 when no time elapsed)."""
        if self.elapsed_time > 0:
            return self.total_tokens / self.elapsed_time
        return 0

    @property
    def content_tokens_per_second(self) -> float:
        """Content tokens per second of elapsed time (0 when no time elapsed)."""
        if self.elapsed_time > 0:
            return self.content_tokens / self.elapsed_time
        return 0
# ============================================================================
# LOGGING SETUP
# ============================================================================
def setup_logging(results_dir: Path, log_io: bool = False) -> Optional[logging.Logger]:
    """
    Configure console logging and, optionally, a separate request/response log file.
    Parameters:
        results_dir: Directory in which 'benchmark_io.log' is created when log_io is set
        log_io: When True, create and return the dedicated I/O logger
    Returns:
        The 'io_logger' logger when log_io is True, otherwise None
    """
    # Configure root logger for console output
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler()  # Ensure console output
        ]
    )
    # To debug the HTTP layer, raise the "httpx" / "httpcore" loggers to DEBUG here.
    if not log_io:
        return None
    # Setup separate I/O logger for detailed request/response logging
    io_logger = logging.getLogger('io_logger')
    # Close handlers left over from an earlier call before dropping them;
    # merely clearing the list would leave their log files open.
    for stale_handler in list(io_logger.handlers):
        io_logger.removeHandler(stale_handler)
        stale_handler.close()
    io_log_path = results_dir / 'benchmark_io.log'
    # Prompts and model output can contain any Unicode text, so do not rely on
    # the platform default encoding for the log file.
    io_handler = logging.FileHandler(io_log_path, encoding='utf-8')
    io_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    io_logger.addHandler(io_handler)
    io_logger.setLevel(logging.INFO)
    # Keep the verbose I/O records out of the console handler on the root logger.
    io_logger.propagate = False
    logging.info(f"I/O logging enabled: {io_log_path}")
    return io_logger
# ============================================================================
# PROMPT GENERATION
# ============================================================================
def generate_prompt(target_tokens: int) -> str:
    """
    Build a synthetic prompt sized for roughly `target_tokens` tokens.
    The filler paragraph is repeated (and finally truncated) until it spans
    target_tokens * TOKEN_TO_CHAR_RATIO characters; a closing instruction is
    then appended on top of that budget.
    Parameters:
        target_tokens: Target number of tokens for the generated prompt
    Returns:
        Generated prompt string
    """
    filler = (
        "You are an AI assistant analyzing complex systems. Consider the following "
        "context about modern computing, AI architectures, distributed systems, "
        "cloud infrastructure, ML model deployment, data pipelines, scalability, "
        "microservices, containerization, Kubernetes, monitoring, observability, "
        "performance optimization, security, cost optimization, and emerging AI/ML trends. "
    )
    char_budget = target_tokens * TOKEN_TO_CHAR_RATIO
    # Whole copies of the filler plus a partial copy to hit the budget exactly.
    whole_copies, leftover = divmod(char_budget, len(filler))
    context = filler * whole_copies + filler[:leftover]
    return context + "\n\nBased on the context above, provide a comprehensive analysis."
def load_conversation_dataset(dataset_path: str) -> dict:
    """
    Read the pre-generated conversation dataset from a JSON file.
    Parameters:
        dataset_path: Path of the JSON file to load
    Returns:
        The parsed JSON object (its items are logged as bucket -> conversations)
    Raises:
        FileNotFoundError: the file does not exist (logged, then re-raised)
        json.JSONDecodeError: the file is not valid JSON (logged, then re-raised)
    """
    try:
        with open(dataset_path, 'r', encoding='utf-8') as handle:
            dataset = json.load(handle)
    except FileNotFoundError:
        logging.error(f"Dataset file not found: {dataset_path}")
        logging.error("Generate dataset first using: python create_test_dataset.py")
        raise
    except json.JSONDecodeError as e:
        logging.error(f"Invalid JSON in dataset file: {e}")
        raise
    logging.info(f"Loaded conversation dataset from {dataset_path}")
    # Report how many conversations each bucket holds
    for bucket in dataset:
        logging.info(f"  Bucket {bucket}: {len(dataset[bucket])} conversations")
    return dataset
def get_conversation_for_tokens(
    dataset: dict,
    target_tokens: int,
    request_index: int
) -> list[dict]:
    """
    Pick the messages of one dataset conversation for a target token count.
    The bucket whose key equals str(target_tokens) is used; when it is missing,
    the numerically closest bucket is used instead. Within the bucket,
    request_index selects the conversation round-robin.
    Parameters:
        dataset: Mapping of bucket key (token count as string) to conversations
        target_tokens: Desired input token count
        request_index: Index used to cycle through the bucket's conversations
    Returns:
        A new list of message dicts that does not end with an assistant turn
    Raises:
        ValueError: the selected bucket contains no conversations
    """
    bucket_key = str(target_tokens)
    if bucket_key not in dataset:
        # No exact match: fall back to the bucket nearest to the target
        closest_bucket = min(
            (int(key) for key in dataset.keys()),
            key=lambda size: abs(size - target_tokens),
        )
        bucket_key = str(closest_bucket)
        logging.debug(f"No exact bucket for {target_tokens} tokens, using {closest_bucket}")
    conversations = dataset[bucket_key]
    if not conversations:
        raise ValueError(f"No conversations available for bucket {bucket_key}")
    # Round-robin over the bucket so concurrent requests get different inputs
    chosen = conversations[request_index % len(conversations)]
    # Copy the list so the appended turn below never leaks into the dataset
    messages = list(chosen["messages"])
    # The API needs a trailing user turn to respond to
    ends_with_assistant = bool(messages) and messages[-1].get("role") == "assistant"
    if ends_with_assistant:
        messages.append({"role": "user", "content": "Please continue."})
    return messages
# ============================================================================
# REQUEST PROCESSING
# ============================================================================
async def process_stream(stream, log_io: bool, io_logger, request_id: int) -> Tuple:
    """
    Drain a streaming chat completion and collect text, timing and token usage.
    Parameters:
        stream: Async iterable of completion chunks
        log_io: When True, reasoning text is accumulated for later logging
        io_logger: Unused here; accepted for call-site symmetry
        request_id: Unused here; accepted for call-site symmetry
    Returns:
        Tuple of (first_token_time, completion_tokens, prompt_tokens,
        content_tokens, reasoning_tokens, output_text, reasoning_text).
        first_token_time is a time.time() timestamp of the first content delta,
        or None when no content was received.
    Raises:
        Any exception raised while iterating the stream (logged, then re-raised)
    """
    first_token_time = None
    # Collect pieces in lists and join once; repeated += on str is quadratic.
    content_parts = []
    reasoning_parts = []
    completion_tokens = 0
    prompt_tokens = 0
    content_tokens = 0
    reasoning_tokens = 0
    try:
        async for chunk in stream:
            choices = getattr(chunk, 'choices', None)
            if choices:
                delta = choices[0].delta
                reasoning_piece = getattr(delta, 'reasoning_content', None)
                if reasoning_piece and log_io:
                    reasoning_parts.append(reasoning_piece)
                content_piece = getattr(delta, 'content', None)
                if content_piece:
                    if first_token_time is None:
                        first_token_time = time.time()
                    content_parts.append(content_piece)
            # Get token counts from API
            usage = getattr(chunk, 'usage', None)
            if usage:
                # Usage fields can be present but None; "or 0" keeps the
                # arithmetic and comparisons below from raising TypeError.
                if hasattr(usage, 'completion_tokens'):
                    completion_tokens = usage.completion_tokens or 0
                if hasattr(usage, 'prompt_tokens'):
                    prompt_tokens = usage.prompt_tokens or 0
                details = getattr(usage, 'completion_tokens_details', None)
                if details:
                    if hasattr(details, 'reasoning_tokens'):
                        reasoning_tokens = details.reasoning_tokens or 0
                    if hasattr(details, 'content_tokens'):
                        content_tokens = details.content_tokens or 0
    except Exception as e:
        logging.error(f"Error processing stream: {e}")
        raise
    # Fallback calculation: derive whichever of content/reasoning the API omitted
    if content_tokens == 0 and reasoning_tokens == 0 and completion_tokens > 0:
        content_tokens = completion_tokens
    elif reasoning_tokens == 0 and content_tokens > 0 and content_tokens < completion_tokens:
        reasoning_tokens = completion_tokens - content_tokens
    elif content_tokens == 0 and reasoning_tokens > 0 and completion_tokens > reasoning_tokens:
        content_tokens = completion_tokens - reasoning_tokens
    return (
        first_token_time,
        completion_tokens,
        prompt_tokens,
        content_tokens,
        reasoning_tokens,
        "".join(content_parts),
        "".join(reasoning_parts)
    )
def _build_messages(config, request_id, dataset, text_content):
    """Choose the chat messages for one request: custom text > dataset > synthetic prompt."""
    if text_content is not None:
        # Use provided text directly (ignores input_tokens, uses actual text length)
        return [{'role': 'user', 'content': text_content}]
    if dataset is not None:
        try:
            # request_id is optional; index 0 keeps the round-robin modulo valid
            # instead of failing with "None % int".
            return get_conversation_for_tokens(dataset, config.input_tokens, request_id or 0)
        except (ValueError, KeyError) as e:
            logging.warning(f"Failed to get conversation from dataset: {e}. Falling back to generated prompt.")
    # Generate synthetic prompt
    return [{'role': 'user', 'content': generate_prompt(config.input_tokens)}]


def _failed_result(config, start_time, end_time, elapsed, ttft, message):
    """Build the RequestResult recorded for a request that raised or timed out."""
    return RequestResult(
        total_tokens=0,
        content_tokens=0,
        reasoning_tokens=0,
        elapsed_time=elapsed,
        time_to_first_token=ttft,
        prompt_tokens=config.input_tokens,
        start_time=start_time,
        end_time=end_time,
        success=False,
        error_message=message
    )


async def make_request(
    client: AsyncOpenAI,
    config: BenchmarkConfig,
    model_name: str,
    request_timeout: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    request_id: Optional[int] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> Optional[RequestResult]:
    """
    Send one chat completion request and measure it.
    Parameters:
        client: OpenAI-compatible async client used for the call
        config: Benchmark settings (input/output token targets, batch size)
        model_name: Model identifier sent with the request
        request_timeout: Deadline in seconds for the whole request
        log_io: When True (and io_logger is given), log messages and output
        io_logger: Logger receiving the detailed request/response records
        request_id: Identifier used in logs and to pick a dataset conversation
        dataset: Optional conversation dataset to draw the input from
        text_content: Optional literal text to send instead of any other input
        stream: Use the streaming API (enables a real time-to-first-token) when
            True, a single non-streaming call when False
    Returns:
        A RequestResult; failures (timeout, rate limit, other errors, or an
        empty completion) are reported with success=False instead of raising
    """
    start_time = time.time()
    # Determine input source priority: text_content > dataset > generated
    messages = _build_messages(config, request_id, dataset, text_content)
    if log_io and io_logger:
        io_logger.info(f"\n{'='*80}")
        io_logger.info(f"REQUEST {request_id} - Target: {config.input_tokens} tokens")
        io_logger.info(f"Model: {model_name}, Batch size: {config.batch_size}")
        if text_content:
            io_logger.info(f"Source: Custom text")
        elif dataset:
            io_logger.info(f"Source: Conversation dataset")
        else:
            io_logger.info(f"Source: Generated prompt")
        io_logger.info(f"{'='*80}")
        io_logger.info(f"MESSAGES:\n{json.dumps(messages, indent=2)}")
        io_logger.info(f"{'-'*80}")
    try:
        # The caller's `stream` flag is honoured as passed; it must not be
        # overridden here or --no_stream / runtime.stream have no effect.
        if stream:
            async def _stream_once():
                # Opening the stream and draining it share one deadline, like
                # the non-streaming branch below.
                stream_obj = await client.chat.completions.create(
                    model=model_name,
                    messages=messages,
                    max_tokens=config.output_tokens,
                    stream=True,
                    stream_options={"include_usage": True}
                )
                return await process_stream(stream_obj, log_io, io_logger, request_id)

            (
                first_token_time,
                completion_tokens,
                prompt_tokens,
                content_tokens,
                reasoning_tokens,
                output_text,
                reasoning_text
            ) = await asyncio.wait_for(_stream_once(), timeout=request_timeout)
            end_time = time.time()
            elapsed_time = end_time - start_time
            # Without any content delta, fall back to the full latency
            ttft = first_token_time - start_time if first_token_time else elapsed_time
        else:
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model=model_name,
                    messages=messages,
                    max_tokens=config.output_tokens,
                    stream=False
                ),
                timeout=request_timeout
            )
            end_time = time.time()
            elapsed_time = end_time - start_time
            ttft = elapsed_time  # No first-token event in non-streaming mode
            usage = response.usage
            completion_tokens = usage.completion_tokens if usage else 0
            prompt_tokens = usage.prompt_tokens if usage else 0
            content_tokens = completion_tokens
            reasoning_tokens = 0
            output_text = (response.choices[0].message.content or "") if response.choices else ""
            reasoning_text = ""
            details = getattr(usage, 'completion_tokens_details', None)
            if details and getattr(details, 'reasoning_tokens', None):
                reasoning_tokens = details.reasoning_tokens
                content_tokens = completion_tokens - reasoning_tokens
        if log_io and io_logger:
            if reasoning_text:
                io_logger.info(f"REASONING ({reasoning_tokens} tokens):\n{reasoning_text}")
                io_logger.info(f"{'-'*80}")
            io_logger.info(f"CONTENT ({content_tokens} tokens):\n{output_text}")
            io_logger.info(f"{'-'*80}")
            io_logger.info(f"TTFT: {ttft:.3f}s | Latency: {elapsed_time:.3f}s")
            # Guard the division so a zero-length interval cannot abort the request
            tokens_per_sec = completion_tokens / elapsed_time if elapsed_time > 0 else 0
            io_logger.info(f"Throughput: {tokens_per_sec:.2f} tok/s")
            io_logger.info(f"Token accuracy: target={config.input_tokens}, actual={prompt_tokens}")
            io_logger.info(f"{'='*80}\n")
        # Validate token counts
        if content_tokens == 0:
            logging.warning("Request completed but got no content tokens")
            return RequestResult(
                total_tokens=completion_tokens,
                content_tokens=content_tokens,
                reasoning_tokens=reasoning_tokens,
                elapsed_time=elapsed_time,
                time_to_first_token=ttft,
                prompt_tokens=prompt_tokens,
                start_time=start_time,
                end_time=end_time,
                success=False,
                error_message="No content tokens generated"
            )
        token_diff = abs(prompt_tokens - config.input_tokens)
        if token_diff > TOKEN_VALIDATION_THRESHOLD:
            logging.warning(
                f"Token count difference: target={config.input_tokens}, "
                f"actual={prompt_tokens}, diff={token_diff}"
            )
        return RequestResult(
            total_tokens=completion_tokens,
            content_tokens=content_tokens,
            reasoning_tokens=reasoning_tokens,
            elapsed_time=elapsed_time,
            time_to_first_token=ttft,
            prompt_tokens=prompt_tokens,
            start_time=start_time,
            end_time=end_time,
            success=True
        )
    except asyncio.TimeoutError:
        end_time = time.time()
        logging.warning(f"Request {request_id} timed out after {request_timeout}s")
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - TIMEOUT after {request_timeout}s\n")
        # A timed-out request is recorded with the full timeout as its latency
        return _failed_result(
            config, start_time, end_time, request_timeout, request_timeout,
            f"Timeout after {request_timeout}s"
        )
    except RateLimitError as e:
        end_time = time.time()
        elapsed = end_time - start_time
        error_msg = f"429 Rate Limit: {str(e)}"
        logging.warning(f"Request {request_id} got 429 (rate limited) after {elapsed:.3f}s")
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - 429 RATE LIMITED after {elapsed:.3f}s\n")
        return _failed_result(config, start_time, end_time, elapsed, 0, error_msg)
    except Exception as e:
        # Boundary handler: one failing request must not abort the whole batch
        end_time = time.time()
        elapsed = end_time - start_time
        error_msg = f"{type(e).__name__}: {str(e)}"
        logging.error(f"Request {request_id} error after {elapsed:.3f}s: {error_msg}", exc_info=True)
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - ERROR after {elapsed:.3f}s: {error_msg}\n")
        return _failed_result(config, start_time, end_time, elapsed, 0, error_msg)
async def make_batch_request(
    client: AsyncOpenAI,
    config: BenchmarkConfig,
    model_name: str,
    request_timeout: int,
    batch_size: int,
    batch_id: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> List[RequestResult]:
    """
    Fire `batch_size` independent requests concurrently and collect their results.
    Request ids are batch_id * batch_size + position, so they are unique across
    batches of equal size. Every returned result is tagged with batch_id and
    requests_in_batch.
    Returns:
        The per-request results of this batch
    """
    io_enabled = log_io and io_logger
    banner = '#' * 80
    batch_start_time = time.time()
    if io_enabled:
        io_logger.info(f"\n{banner}")
        io_logger.info(f"BATCH {batch_id} - Sending {batch_size} requests")
        io_logger.info(f"Model: {model_name}, Input: {config.input_tokens} tokens")
        io_logger.info(f"{banner}")
    # Build every coroutine first so gather can start them together
    first_request_id = batch_id * batch_size
    pending = [
        make_request(
            client=client,
            config=config,
            model_name=model_name,
            request_timeout=request_timeout,
            log_io=log_io,
            io_logger=io_logger,
            request_id=first_request_id + position,
            dataset=dataset,
            text_content=text_content,
            stream=stream
        )
        for position in range(batch_size)
    ]
    results = await asyncio.gather(*pending)
    batch_elapsed = time.time() - batch_start_time
    # Tag each result with the batch it belongs to
    enhanced_results = []
    for outcome in results:
        if not outcome:
            continue
        outcome.batch_id = batch_id
        outcome.requests_in_batch = batch_size
        enhanced_results.append(outcome)
    succeeded = [r for r in enhanced_results if r.success]
    successful = len(succeeded)
    total_tokens = sum(r.total_tokens for r in succeeded)
    if batch_elapsed > 0:
        batch_throughput = total_tokens / batch_elapsed
    else:
        batch_throughput = 0
    if io_enabled:
        io_logger.info(f"\n{banner}")
        io_logger.info(f"BATCH {batch_id} COMPLETE")
        io_logger.info(f"  Duration: {batch_elapsed:.3f}s")
        io_logger.info(f"  Successful: {successful}/{batch_size}")
        io_logger.info(f"  Batch Throughput: {batch_throughput:.2f} tokens/s")
        io_logger.info(f"{banner}\n")
    logging.info(
        f"Batch {batch_id}: {successful}/{batch_size} successful, "
        f"{batch_elapsed:.2f}s, {batch_throughput:.2f} tok/s"
    )
    return enhanced_results
# ============================================================================
# STATISTICS CALCULATION
# ============================================================================
def calculate_statistics(results: List[RequestResult], config: BenchmarkConfig) -> Dict:
    """
    Aggregate per-request results into latency, TTFT, token and throughput figures.
    Only successful requests contribute to the measurements; failed ones affect
    the success rate alone.
    Parameters:
        results: Results of every request in the run
        config: Configuration the run was executed with
    Returns:
        Nested dict with 'config', 'success_metrics', 'latency', 'ttft',
        'tokens', 'throughput' and, when results carry batch ids,
        'batch_metrics'. When no request succeeded, only
        {'success_rate': 0, 'error': ...} is returned.
    """
    import numpy as np

    ok_runs = [res for res in results if res.success]
    if not ok_runs:
        return {'success_rate': 0, 'error': 'No successful requests'}
    n_ok = len(ok_runs)
    n_failed = len(results) - n_ok
    success_rate = n_ok / len(results) * 100

    # Latency stats (95% CI uses the normal approximation of the mean)
    latencies = [res.elapsed_time for res in ok_runs]
    avg_lat = float(np.mean(latencies))
    std_lat = float(np.std(latencies))
    margin = CONFIDENCE_95_Z_SCORE * std_lat / np.sqrt(n_ok)

    # TTFT stats; zero/empty values are left out
    ttft_values = [res.time_to_first_token for res in ok_runs if res.time_to_first_token]

    def ttft_stat(reducer, *extra):
        # None signals "no TTFT samples" in the report
        if not ttft_values:
            return None
        return round(float(reducer(ttft_values, *extra)), 3)

    # Throughput is measured over the wall-clock span covered by the successful requests
    wall_time = max(res.end_time for res in ok_runs) - min(res.start_time for res in ok_runs)

    def per_wall_second(amount):
        return amount / wall_time if wall_time > 0 else 0

    total_output_tokens = sum(res.total_tokens for res in ok_runs)
    total_content_tokens = sum(res.content_tokens for res in ok_runs)
    concurrent_throughput = per_wall_second(total_output_tokens)
    content_throughput = per_wall_second(total_content_tokens)

    # Efficiency: achieved throughput vs batch_size times the mean single-request rate
    avg_per_request = float(np.mean([res.tokens_per_second for res in ok_runs]))
    theoretical_max = config.batch_size * avg_per_request
    if theoretical_max > 0:
        efficiency = min(concurrent_throughput / theoretical_max * 100, 100)
    else:
        efficiency = 0

    # Batch metrics
    batch_metrics = None
    batch_ids = [res.batch_id for res in ok_runs if res.batch_id is not None]
    if batch_ids:
        members_by_batch = {}
        for res in ok_runs:
            if res.batch_id is not None:
                members_by_batch.setdefault(res.batch_id, []).append(res)
        unique_batches = set(batch_ids)
        sizes = []
        throughput_values = []
        for batch_id in unique_batches:
            members = members_by_batch[batch_id]
            span = max(m.end_time for m in members) - min(m.start_time for m in members)
            tokens = sum(m.total_tokens for m in members)
            sizes.append(len(members))
            throughput_values.append(tokens / span if span > 0 else 0)
        batch_metrics = {
            'num_batches': len(unique_batches),
            'avg_batch_size': round(float(np.mean(sizes)), 2),
            'avg_batch_throughput': round(float(np.mean(throughput_values)), 2),
            'min_batch_throughput': round(float(np.min(throughput_values)), 2),
            'max_batch_throughput': round(float(np.max(throughput_values)), 2),
        }

    config_section = {
        'input_tokens': config.input_tokens,
        'output_tokens': config.output_tokens,
        'batch_size': config.batch_size,
        'num_batches': config.num_batches,
        'total_requests': len(results),
        'actual_input_tokens': round(float(np.mean([res.prompt_tokens for res in ok_runs])))
    }
    success_section = {
        'success_rate': round(success_rate, 2),
        'successful_requests': n_ok,
        'failed_requests': n_failed
    }
    latency_section = {
        'mean': round(avg_lat, 3),
        'std': round(std_lat, 3),
        'min': round(float(np.min(latencies)), 3),
        'max': round(float(np.max(latencies)), 3),
        'p50': round(float(np.percentile(latencies, 50)), 3),
        'p95': round(float(np.percentile(latencies, 95)), 3),
        'p99': round(float(np.percentile(latencies, 99)), 3),
        'ci_95_lower': round(avg_lat - margin, 3),
        'ci_95_upper': round(avg_lat + margin, 3)
    }
    ttft_section = {
        'mean': ttft_stat(np.mean),
        'std': ttft_stat(np.std),
        'p50': ttft_stat(np.percentile, 50),
        'p90': ttft_stat(np.percentile, 90)
    }
    tokens_section = {
        'total_generated': total_output_tokens,
        'content_tokens': total_content_tokens,
        'reasoning_tokens': sum(res.reasoning_tokens for res in ok_runs),
        'avg_per_request': round(total_output_tokens / n_ok, 2)
    }
    throughput_section = {
        'concurrent_total_tps': round(concurrent_throughput, 2),
        'concurrent_content_tps': round(content_throughput, 2),
        'requests_per_second': round(per_wall_second(n_ok), 2),
        'actual_wall_time': round(wall_time, 3),
        'efficiency_percent': round(efficiency, 2)
    }
    stats_dict = {
        'config': config_section,
        'success_metrics': success_section,
        'latency': latency_section,
        'ttft': ttft_section,
        'tokens': tokens_section,
        'throughput': throughput_section
    }
    # Add batch metrics if available
    if batch_metrics:
        stats_dict['batch_metrics'] = batch_metrics
    return stats_dict
# ============================================================================
# HTTP CLIENT
# ============================================================================
def create_http_client(concurrent_requests: int, request_timeout: int) -> httpx.AsyncClient:
    """
    Build an httpx.AsyncClient sized for many simultaneous requests.
    The pool holds CONNECTION_MULTIPLIER connections per concurrent request,
    but never fewer than MIN_HTTP_CONNECTIONS.
    Parameters:
        concurrent_requests: Number of requests expected to be in flight at once
        request_timeout: Seconds used for the default and the read timeout
    Returns:
        The configured client; the caller is responsible for closing it
    """
    max_connections = max(concurrent_requests * CONNECTION_MULTIPLIER, MIN_HTTP_CONNECTIONS)
    logging.info(f"Creating HTTP client with max_connections={max_connections}")
    # Every pooled connection may be kept alive, for up to 30 minutes
    pool_limits = httpx.Limits(
        max_keepalive_connections=max_connections,
        max_connections=max_connections,
        keepalive_expiry=1800
    )
    timeouts = httpx.Timeout(
        timeout=request_timeout,
        connect=1800.0,
        read=request_timeout,
        write=1800.0,
        pool=5.0
    )
    return httpx.AsyncClient(limits=pool_limits, timeout=timeouts)
# ============================================================================
# MODEL INITIALIZATION
# ============================================================================
async def wait_for_model(
    endpoint_url: str,
    api_key: str,
    model_name: str,
    max_retries: int = 10,
    retry_delay: int = 30
) -> bool:
    """
    Probe the endpoint with a tiny request until the model answers.
    Parameters:
        endpoint_url: Base URL of the OpenAI-compatible endpoint
        api_key: API key used for the probe
        model_name: Model to probe
        max_retries: Maximum number of probe attempts
        retry_delay: Seconds to wait between attempts while the model starts
    Returns:
        True once a probe succeeds; False when a probe fails with an error that
        does not look like "still starting", or when the attempts are exhausted
    """
    logging.info(f"Waiting for model '{model_name}' to initialize...")
    http_client = create_http_client(1, 60)
    try:
        # Built inside the try so the HTTP client is closed even if the
        # constructor itself raises.
        client = AsyncOpenAI(base_url=endpoint_url, api_key=api_key, http_client=http_client, max_retries=0)
        for attempt in range(1, max_retries + 1):
            try:
                logging.info(f"Initialization check {attempt}/{max_retries}...")
                await client.chat.completions.create(
                    model=model_name,
                    messages=[{'role': 'user', 'content': 'Hello'}],
                    max_tokens=10,
                    stream=False
                )
                logging.info("Model is ready!")
                return True
            except Exception as e:
                # Only errors whose text hints at start-up are worth retrying
                error_msg = str(e).lower()
                if any(kw in error_msg for kw in ['initializing', 'loading', 'starting', 'not ready', 'unavailable']):
                    if attempt < max_retries:
                        logging.info(f"Model initializing... waiting {retry_delay}s")
                        await asyncio.sleep(retry_delay)
                    else:
                        logging.error(f"Model failed to initialize after {max_retries} attempts")
                        return False
                else:
                    logging.error(f"Error: {e}")
                    return False
    finally:
        await http_client.aclose()
    # Reached only when max_retries < 1, i.e. no probe was attempted
    return False
# ============================================================================
# BENCHMARK EXECUTION
# ============================================================================
async def run_benchmark(
    config: BenchmarkConfig,
    endpoint_url: str,
    api_key: str,
    model_name: str,
    request_timeout: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> Optional[Dict]:
    """
    Run a single benchmark configuration in batch mode.
    Batches are sent one after another; the requests inside a batch run
    concurrently. A dedicated HTTP client is created for the run and closed
    when it ends.
    Returns:
        The statistics dict from calculate_statistics, or None when no result
        was collected at all
    """
    total_requests = config.num_batches * config.batch_size
    logging.info(f"\n{'='*60}")
    # The separator between the two fragments was missing, which rendered as
    # e.g. "10 batches16 requests/batch".
    logging.info(f"Starting benchmark: {config.num_batches} batches x "
                 f"{config.batch_size} requests/batch = {total_requests} total")
    logging.info(f"Target: {config.input_tokens} tokens, Output: {config.output_tokens} tokens")
    if text_content:
        logging.info(f"Using custom text input")
    elif dataset:
        logging.info(f"Using conversation dataset")
    logging.info(f"{'='*60}")
    http_client = create_http_client(config.batch_size, request_timeout)
    try:
        client = AsyncOpenAI(base_url=endpoint_url, api_key=api_key, http_client=http_client, max_retries=0)
        results = []
        start_time = time.time()
        # Send batches of requests
        for batch_id in range(config.num_batches):
            batch_start = time.time()
            logging.info(f"  Starting batch {batch_id + 1}/{config.num_batches} "
                         f"({config.batch_size} simultaneous requests)...")
            batch_results = await make_batch_request(
                client=client,
                config=config,
                model_name=model_name,
                request_timeout=request_timeout,
                batch_size=config.batch_size,
                batch_id=batch_id,
                log_io=log_io,
                io_logger=io_logger,
                dataset=dataset,
                text_content=text_content,
                stream=stream
            )
            results.extend(batch_results)
            # Log batch completion with detailed metrics
            batch_time = time.time() - batch_start
            succeeded = [r for r in batch_results if r.success]
            successful = len(succeeded)
            failed = len(batch_results) - successful
            batch_throughput = sum(r.total_tokens for r in succeeded) / batch_time if batch_time > 0 else 0
            # Calculate average TTFT for this batch
            ttfts = [r.time_to_first_token for r in succeeded if r.time_to_first_token]
            avg_ttft = sum(ttfts) / len(ttfts) if ttfts else 0
            # Calculate average latency for this batch
            latencies = [r.elapsed_time for r in succeeded]
            avg_latency = sum(latencies) / len(latencies) if latencies else 0
            # Both branches used to be empty strings, so the marker carried no information
            status_icon = "✓" if failed == 0 else "✗"
            logging.info(f"{status_icon} Batch {batch_id + 1}/{config.num_batches} complete: "
                         f"{successful}/{len(batch_results)} successful "
                         f"| {batch_time:.2f}s total "
                         f"| {batch_throughput:.0f} tok/s "
                         f"| TTFT: {avg_ttft:.3f}s "
                         f"| Latency: {avg_latency:.2f}s")
            if failed > 0:
                logging.warning(f"  {failed} request(s) failed in batch {batch_id + 1}")
            # Small delay between batches to avoid overwhelming the system
            if batch_id < config.num_batches - 1:
                await asyncio.sleep(0.1)
        end_time = time.time()
        if not results:
            logging.error("No results collected!")
            return None
        stats = calculate_statistics(results, config)
        logging.info(f"✓ Benchmark complete in {end_time - start_time:.2f}s")
        # 'success_metrics' is only present when at least one request succeeded
        if 'success_metrics' in stats:
            logging.info(f"  Success: {stats['success_metrics']['success_rate']}% "
                         f"({stats['success_metrics']['successful_requests']}/{total_requests})")
            logging.info(f"  P95 Latency: {stats['latency']['p95']}s")
            logging.info(f"  Throughput: {stats['throughput']['concurrent_total_tps']} tokens/s")
            if 'batch_metrics' in stats:
                logging.info(f"  Avg Batch Throughput: {stats['batch_metrics']['avg_batch_throughput']} tokens/s")
        else:
            logging.error(f"  All {total_requests} requests failed. Error: {stats.get('error', 'unknown')}")
        return stats
    finally:
        await http_client.aclose()
async def run_all_benchmarks(
    configs: List[BenchmarkConfig],
    endpoint_url: str,
    api_key: str,
    model_name: str,
    request_timeout: int,
    delay_between_runs: int = 5,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    wait_for_ready: bool = True,
    max_init_retries: int = 10,
    init_retry_delay: int = 30,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> List[Dict]:
    """
    Run every benchmark configuration in order and log an overall summary.
    Parameters:
        configs: Configurations to run, one benchmark each
        delay_between_runs: Seconds to pause between consecutive benchmarks
        wait_for_ready: Probe the model first and abort if it never responds
        max_init_retries / init_retry_delay: Passed to the readiness probe
        (remaining parameters are forwarded to run_benchmark unchanged)
    Returns:
        The statistics dict of each benchmark that produced one; an empty list
        when the readiness probe fails
    """
    if wait_for_ready:
        ready = await wait_for_model(endpoint_url, api_key, model_name, max_init_retries, init_retry_delay)
        if not ready:
            logging.error("Model initialization failed. Aborting.")
            return []
    all_results = []
    # Requests attempted by the runs in all_results, tracked here because a run
    # in which every request failed has no 'config' section to read it from.
    total_requests = 0
    for i, config in enumerate(configs, 1):
        logging.info(f"\n--- Benchmark {i}/{len(configs)} ---")
        result = await run_benchmark(
            config, endpoint_url, api_key, model_name, request_timeout,
            log_io, io_logger, dataset, text_content, stream
        )
        if result:
            all_results.append(result)
            total_requests += result.get('config', {}).get(
                'total_requests', config.num_batches * config.batch_size
            )
        else:
            logging.error(f"Benchmark {i} failed")
        if i < len(configs):
            logging.info(f"Waiting {delay_between_runs}s before next run...")
            await asyncio.sleep(delay_between_runs)
    # Log summary
    if all_results:
        successful = len(all_results)
        total = len(configs)
        logging.info(f"\n{'='*60}")
        logging.info(f"ALL BENCHMARKS COMPLETE")
        logging.info(f"{'='*60}")
        logging.info(f"Completed: {successful}/{total} benchmarks")
        # Overall stats; all-failed runs only carry {'success_rate', 'error'},
        # so missing sections count as zero successes instead of raising KeyError.
        total_successful = sum(
            r.get('success_metrics', {}).get('successful_requests', 0) for r in all_results
        )
        overall_success_rate = (total_successful / total_requests * 100) if total_requests > 0 else 0
        logging.info(f"Total requests: {total_requests}")
        logging.info(f"Successful: {total_successful} ({overall_success_rate:.1f}%)")
        logging.info(f"{'='*60}")
    return all_results
# ============================================================================
# FILE I/O
# ============================================================================
def create_results_directory(model_name: str) -> Tuple[Path, str]:
    """
    Create (if needed) the results directory for a model.
    Names containing 'ubiops-deployment/' with at least four '/'-separated
    parts are reduced to their last part. Every character other than letters,
    digits, '-' and '_' is replaced by '_'.
    Parameters:
        model_name: Model identifier as passed to the API
    Returns:
        (results directory path, sanitized model name)
    """
    if 'ubiops-deployment/' in model_name:
        segments = model_name.split('/')
        if len(segments) >= 4:
            model_name = segments[-1]
    allowed_punctuation = ('-', '_')
    sanitized = [
        ch if (ch.isalnum() or ch in allowed_punctuation) else '_'
        for ch in model_name
    ]
    safe_name = "".join(sanitized)
    results_dir = Path('results') / f'results_{safe_name}'
    results_dir.mkdir(parents=True, exist_ok=True)
    logging.info(f"Results directory: {results_dir}")
    return results_dir, safe_name
def save_results(results: List[Dict], results_dir: Path, model_name: str) -> Path:
    """
    Write the benchmark results to 'benchmark_results.json' in results_dir.
    The file holds the current local timestamp, the model name and the list
    of per-benchmark statistics.
    Returns:
        Path of the written file
    """
    output_path = results_dir / 'benchmark_results.json'
    payload = {}
    payload['timestamp'] = datetime.now().isoformat()
    payload['model_name'] = model_name
    payload['results'] = results
    with open(output_path, 'w') as sink:
        json.dump(payload, sink, indent=2)
    logging.info(f"Results saved: {output_path}")
    return output_path
def load_config_file(config_path: str) -> Dict:
    """
    Load configuration from YAML file.
    Parameters:
        config_path: Path of the YAML file
    Returns:
        The parsed document as returned by yaml.safe_load
    """
    # Read as UTF-8 explicitly (as the dataset loader does) so parsing does not
    # depend on the platform's default encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
def save_config_copy(config_data: Dict, results_dir: Path) -> None:
    """
    Write the configuration to 'config_used.yaml' with the API key redacted.
    NOTE: the redaction is done in place, so after this call the caller's
    config_data['endpoint']['api_key'] is '<REDACTED>' as well.
    Parameters:
        config_data: Parsed configuration (modified in place)
        results_dir: Directory receiving the copy
    """
    has_api_key = 'endpoint' in config_data and 'api_key' in config_data['endpoint']
    if has_api_key:
        config_data['endpoint']['api_key'] = '<REDACTED>'
    config_path = results_dir / 'config_used.yaml'
    with open(config_path, 'w') as sink:
        # Block style and original key order keep the copy close to the input file
        yaml.dump(config_data, sink, default_flow_style=False, sort_keys=False)
    logging.info(f"Config saved: {config_path}")
# ============================================================================
# CLI INTERFACE
# ============================================================================
def parse_args():
    """
    Parse command-line arguments.
    Returns:
        argparse.Namespace with the options read from sys.argv
    """
    # The example uses --batch_sizes; the previously shown --concurrent_requests
    # is not an option of this parser and would be rejected.
    parser = argparse.ArgumentParser(
        description="Benchmark LLM deployments via OpenAI-compatible endpoints",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Using config file
  python benchmark_llm.py --config benchmark_config.yaml

  # Using CLI arguments
  python benchmark_llm.py --endpoint_url https://api.example.com/v1 \\
      --api_key YOUR_KEY \\
      --model_name gpt-4 \\
      --input_tokens 1000 5000 10000 \\
      --batch_sizes 1 8 16 32
"""
    )
    parser.add_argument('--config', type=str, help="Path to YAML config file")
    parser.add_argument('--endpoint_url', type=str, help="OpenAI-compatible endpoint URL")
    parser.add_argument('--api_key', type=str, help="API key")
    parser.add_argument('--model_name', type=str, help="Model name")
    parser.add_argument('--input_tokens', type=int, nargs='+', default=[1000, 5000, 10000],
                        help="Input token counts to test")
    parser.add_argument('--batch_sizes', type=int, nargs='+', default=[16, 32, 64, 128],
                        help="Batch sizes to test (number of simultaneous requests)")
    parser.add_argument('--num_batches', type=int, default=10,
                        help="Number of batches to send per configuration")
    parser.add_argument('--output_tokens', type=int, default=256,
                        help="Output tokens per request")
    parser.add_argument('--dataset', type=str, default=None,
                        help="Path to conversation dataset JSON file (optional)")
    parser.add_argument('--text', type=str, default=None,
                        help="Custom text to use as input for all requests (same text repeated)")
    parser.add_argument('--request_timeout', type=int, default=900,
                        help="Request timeout (seconds)")
    parser.add_argument('--delay_between_runs', type=int, default=5,
                        help="Delay between runs (seconds)")
    parser.add_argument('--no_stream', action='store_true',
                        help="Use non-streaming requests instead of streaming")
    parser.add_argument('--log_io', action='store_true',
                        help="Log all input/output")
    parser.add_argument('--skip_init_wait', action='store_true',
                        help="Skip model initialization wait")
    parser.add_argument('--max_init_retries', type=int, default=10,
                        help="Max initialization retries")
    parser.add_argument('--init_retry_delay', type=int, default=30,
                        help="Delay between init retries")
    return parser.parse_args()
def generate_configs(
    input_tokens: List[int],
    batch_sizes: List[int],
    num_batches: int,
    output_tokens: int
) -> List[BenchmarkConfig]:
    """Build one BenchmarkConfig per (input token count, batch size) pair.

    The returned list is ordered with ``input_tokens`` as the outer dimension
    and ``batch_sizes`` as the inner one. ``num_batches`` and
    ``output_tokens`` are copied unchanged into every config.
    """
    return [
        BenchmarkConfig(
            input_tokens=token_count,
            batch_size=size,
            num_batches=num_batches,
            output_tokens=output_tokens,
        )
        for token_count in input_tokens
        for size in batch_sizes
    ]
# ============================================================================
# MAIN
# ============================================================================
async def main() -> None:
    """Run the benchmark suite end to end.

    Steps:
      1. Parse CLI arguments and resolve the run settings, either from the
         YAML file given with ``--config`` or from the individual CLI flags.
         When ``--config`` is given, the other CLI flags are not consulted.
      2. Load the conversation dataset if a path was supplied; a load failure
         is logged and the run continues without a dataset.
      3. Create the results directory, set up logging and, for config-file
         runs, hand the parsed config to ``save_config_copy``.
      4. Build the input-token x batch-size grid with ``generate_configs``,
         run it through ``run_all_benchmarks`` and save whatever comes back.

    Returns:
        None. Returns early, after logging an error, when neither ``--config``
        nor all of ``--endpoint_url``, ``--api_key`` and ``--model_name`` were
        supplied.

    Raises:
        KeyError: If the config file lacks a required ``endpoint`` or
            ``benchmark`` entry (the lookups below are not guarded).
    """
    args = parse_args()

    # Load configuration
    if args.config:
        config_data = load_config_file(args.config)

        endpoint = config_data['endpoint']
        endpoint_url = endpoint['url']
        api_key = endpoint['api_key']
        model_name = endpoint['model_name']

        bench_config = config_data['benchmark']
        input_tokens = bench_config['input_tokens']
        batch_sizes = bench_config['batch_sizes']
        num_batches = bench_config['num_batches']
        output_tokens = bench_config['output_tokens']
        dataset_path = bench_config.get('dataset', None)
        custom_text = bench_config.get('text', None)

        # A `runtime:` key with no children is None once parsed from YAML
        # (assuming load_config_file returns the parsed mapping as-is), and
        # `.get('runtime', {})` would hand that None straight back. Fall back
        # to {} so the defaults below apply instead of raising AttributeError.
        runtime = config_data.get('runtime') or {}
        request_timeout = runtime.get('request_timeout', 900)
        delay_between_runs = runtime.get('delay_between_runs', 5)
        log_io = runtime.get('log_io', False)
        stream = runtime.get('stream', True)
        wait_for_ready = runtime.get('wait_for_ready', True)
        max_init_retries = runtime.get('max_init_retries', 10)
        init_retry_delay = runtime.get('init_retry_delay', 30)
    else:
        # Use CLI arguments
        if not all([args.endpoint_url, args.api_key, args.model_name]):
            # NOTE(review): setup_logging() has not run yet at this point, so
            # this message goes through logging's default configuration.
            # NOTE(review): returning here ends the coroutine normally, so the
            # failure is not reflected in the process exit status - confirm
            # that is acceptable for automated callers.
            logging.error("Must provide --config or all of --endpoint_url, --api_key, --model_name")
            return

        config_data = None
        endpoint_url = args.endpoint_url
        api_key = args.api_key
        model_name = args.model_name
        input_tokens = args.input_tokens
        batch_sizes = args.batch_sizes
        num_batches = args.num_batches
        output_tokens = args.output_tokens
        dataset_path = args.dataset
        custom_text = args.text
        request_timeout = args.request_timeout
        delay_between_runs = args.delay_between_runs
        log_io = args.log_io
        # The CLI exposes the negated forms of these two switches.
        stream = not args.no_stream
        wait_for_ready = not args.skip_init_wait
        max_init_retries = args.max_init_retries
        init_retry_delay = args.init_retry_delay

    # Load conversation dataset if provided
    dataset = None
    if dataset_path:
        try:
            dataset = load_conversation_dataset(dataset_path)
        except Exception as e:
            # Deliberate best-effort: a bad dataset must not abort the run.
            # NOTE(review): this also fires before setup_logging(); if that
            # helper relies on logging.basicConfig, an earlier implicit
            # configuration here could make it a no-op - verify.
            logging.error(f"Failed to load dataset: {e}")
            logging.error("Continuing with generated prompts...")

    # Custom text, when given, is reported ahead of the dataset in the banner
    # below; both are forwarded to run_all_benchmarks, which decides how they
    # are actually used.
    text_content = custom_text

    # Create results directory (the second element of the returned pair is
    # not needed here).
    results_dir, _ = create_results_directory(model_name)
    io_logger = setup_logging(results_dir, log_io)

    if config_data:
        save_config_copy(config_data, results_dir)

    # Generate benchmark configurations
    configs = generate_configs(input_tokens, batch_sizes, num_batches, output_tokens)

    rule = '=' * 60
    logging.info(f"\n{rule}")
    logging.info("BENCHMARK CONFIGURATION")
    logging.info(rule)
    logging.info(f"Model: {model_name}")
    logging.info(f"Endpoint: {endpoint_url}")
    logging.info(f"Mode: BATCH ({'streaming' if stream else 'non-streaming'})")
    if text_content:
        logging.info(f"Input: Custom text ({len(text_content)} chars)")
    elif dataset:
        logging.info(f"Input: {dataset_path} (real conversations)")
    else:
        logging.info("Input: Generated prompts")
    logging.info(f"Total configurations: {len(configs)}")
    logging.info(f"Input tokens: {input_tokens}")
    logging.info(f"Batch sizes: {batch_sizes}")
    logging.info(f"Batches per config: {num_batches}")
    logging.info(f"Output tokens: {output_tokens}")
    logging.info(f"{rule}\n")

    # Run benchmarks
    results = await run_all_benchmarks(
        configs=configs,
        endpoint_url=endpoint_url,
        api_key=api_key,
        model_name=model_name,
        request_timeout=request_timeout,
        delay_between_runs=delay_between_runs,
        log_io=log_io,
        io_logger=io_logger,
        wait_for_ready=wait_for_ready,
        max_init_retries=max_init_retries,
        init_retry_delay=init_retry_delay,
        dataset=dataset,
        text_content=text_content,
        stream=stream
    )

    if results:
        results_path = save_results(results, results_dir, model_name)
        logging.info(f"\n{rule}")
        logging.info("BENCHMARK COMPLETE!")
        logging.info(rule)
        logging.info(f"Results: {results_path}")
        if log_io:
            logging.info(f"I/O log: {results_dir / 'benchmark_io.log'}")
        logging.info(f"\nVisualize: python visualize_results.py --input {results_path}")
    else:
        logging.error("No results generated")
# Script entry point: only when executed directly (not imported), drive the
# async main() coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())