1194 lines
45 KiB
Python
1194 lines
45 KiB
Python
|
|
"""
|
|
LLM Benchmarking Tool
|
|
|
|
Benchmarks LLM deployments via OpenAI-compatible endpoints with async concurrency.
|
|
Supports YAML config files and CLI arguments.
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
import argparse
|
|
import json
|
|
import yaml
|
|
import logging
|
|
import httpx
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from dataclasses import dataclass, asdict
|
|
from typing import List, Dict, Optional, Tuple
|
|
from openai import AsyncOpenAI, RateLimitError
|
|
|
|
# ============================================================================
|
|
# CONSTANTS
|
|
# ============================================================================
|
|
|
|
# Characters assumed per token when sizing synthetic prompts (chars = tokens * ratio).
TOKEN_TO_CHAR_RATIO: int = 7
# z-score used for the 95% confidence interval on mean latency.
CONFIDENCE_95_Z_SCORE: float = 1.96
# HTTP pool sizing: at least MIN_HTTP_CONNECTIONS, otherwise
# CONNECTION_MULTIPLIER connections per concurrent request.
MIN_HTTP_CONNECTIONS: int = 200
CONNECTION_MULTIPLIER: int = 2
# Warn when the server-reported prompt length differs from the target by more
# than this many tokens.
TOKEN_VALIDATION_THRESHOLD: int = 100
# Unique identity marker; presumably meant to signal worker shutdown — it is not
# referenced in this part of the file (TODO confirm usage).
SHUTDOWN_SENTINEL = object()
|
|
|
|
# ============================================================================
|
|
# DATA MODELS
|
|
# ============================================================================
|
|
|
|
@dataclass()
class BenchmarkConfig:
    """Parameters describing one benchmark run (one row of the benchmark matrix)."""

    # Target prompt length in tokens (used for generated prompts / dataset bucket lookup).
    input_tokens: int
    # Number of requests sent concurrently in each batch.
    batch_size: int
    # Number of batches executed sequentially.
    num_batches: int
    # max_tokens passed to the completion request.
    output_tokens: int
|
|
|
|
|
|
@dataclass()
class RequestResult:
    """Timing and token accounting for one completed (or failed) request."""

    # Completion tokens reported by the server (content + reasoning).
    total_tokens: int
    content_tokens: int
    reasoning_tokens: int
    # Wall-clock seconds from send to completion.
    elapsed_time: float
    time_to_first_token: float
    # Actual input tokens from API response
    prompt_tokens: int
    # time.time() stamps bracketing the request.
    start_time: float
    end_time: float
    success: bool = True
    error_message: Optional[str] = None
    # Batch identifier if using batching
    batch_id: Optional[int] = None
    # Number of requests in this batch
    requests_in_batch: Optional[int] = None

    @property
    def tokens_per_second(self) -> float:
        """Completion tokens per second for this request (0 when elapsed_time <= 0)."""
        if self.elapsed_time <= 0:
            return 0
        return self.total_tokens / self.elapsed_time

    @property
    def content_tokens_per_second(self) -> float:
        """Content-only tokens per second for this request (0 when elapsed_time <= 0)."""
        if self.elapsed_time <= 0:
            return 0
        return self.content_tokens / self.elapsed_time
|
|
|
|
|
|
# ============================================================================
|
|
# LOGGING SETUP
|
|
# ============================================================================
|
|
|
|
def setup_logging(results_dir: Path, log_io: bool = False) -> Optional[logging.Logger]:
    """
    Configure console logging and, optionally, a dedicated request/response log file.

    Parameters:
        results_dir: Directory that receives ``benchmark_io.log`` when ``log_io``
            is True. It is created if it does not exist yet.
        log_io: Enable the separate I/O logger.

    Returns:
        The ``'io_logger'`` logger writing to ``results_dir / 'benchmark_io.log'``,
        or None when ``log_io`` is False.
    """
    # Root logger -> console. Note: basicConfig does nothing if the root logger
    # already has handlers.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler()]
    )

    if not log_io:
        return None

    io_logger = logging.getLogger('io_logger')
    # Close (not just drop) handlers from a previous call so their files are released.
    for old_handler in list(io_logger.handlers):
        io_logger.removeHandler(old_handler)
        old_handler.close()

    results_dir.mkdir(parents=True, exist_ok=True)
    io_log_path = results_dir / 'benchmark_io.log'
    # Prompts and responses may contain arbitrary Unicode; don't depend on the locale.
    io_handler = logging.FileHandler(io_log_path, encoding='utf-8')
    io_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    io_logger.addHandler(io_handler)
    io_logger.setLevel(logging.INFO)
    # Keep verbose request/response dumps out of the console output.
    io_logger.propagate = False

    logging.info(f"I/O logging enabled: {io_log_path}")
    return io_logger
|
|
|
|
|
|
# ============================================================================
|
|
# PROMPT GENERATION
|
|
# ============================================================================
|
|
|
|
def generate_prompt(target_tokens: int, *, chars_per_token: Optional[int] = None) -> str:
    """
    Generate a prompt with approximately the target number of tokens.

    A fixed paragraph is repeated and truncated to exactly
    ``target_tokens * chars_per_token`` characters, then a short question is appended.

    Parameters:
        target_tokens: Target number of tokens for the generated prompt. Values
            <= 0 yield only the trailing question.
        chars_per_token: Characters assumed per token. Defaults to the module
            constant TOKEN_TO_CHAR_RATIO.

    Returns:
        Generated prompt string
    """
    ratio = TOKEN_TO_CHAR_RATIO if chars_per_token is None else chars_per_token
    # Clamp so negative inputs don't turn floor-division/modulo into a bogus prefix.
    chars_needed = max(0, target_tokens * ratio)

    base_text = (
        "You are an AI assistant analyzing complex systems. Consider the following "
        "context about modern computing, AI architectures, distributed systems, "
        "cloud infrastructure, ML model deployment, data pipelines, scalability, "
        "microservices, containerization, Kubernetes, monitoring, observability, "
        "performance optimization, security, cost optimization, and emerging AI/ML trends. "
    )

    full_reps, remainder = divmod(chars_needed, len(base_text))
    expanded_text = base_text * full_reps + base_text[:remainder]
    question = "\n\nBased on the context above, provide a comprehensive analysis."

    return expanded_text + question
|
|
|
|
|
|
def load_conversation_dataset(dataset_path: str) -> dict:
    """
    Load a pre-generated conversation dataset from a JSON file.

    The file must hold a JSON object mapping bucket keys (token counts, as
    strings) to lists of conversations; each bucket's size is logged.

    Raises:
        FileNotFoundError: the file does not exist.
        json.JSONDecodeError: the file is not valid JSON.
        ValueError: the top-level JSON value is not an object.
    """
    try:
        with open(dataset_path, 'r', encoding='utf-8') as f:
            dataset = json.load(f)
    except FileNotFoundError:
        logging.error(f"Dataset file not found: {dataset_path}")
        logging.error("Generate dataset first using: python create_test_dataset.py")
        raise
    except json.JSONDecodeError as e:
        logging.error(f"Invalid JSON in dataset file: {e}")
        raise

    # Validate before reporting success so a list/scalar file fails with a clear message.
    if not isinstance(dataset, dict):
        message = (f"Dataset file must contain a JSON object of token buckets, "
                   f"got {type(dataset).__name__}: {dataset_path}")
        logging.error(message)
        raise ValueError(message)

    logging.info(f"Loaded conversation dataset from {dataset_path}")

    # Log bucket information
    for bucket, conversations in dataset.items():
        logging.info(f" Bucket {bucket}: {len(conversations)} conversations")

    return dataset
|
|
|
|
|
|
def get_conversation_for_tokens(
    dataset: dict,
    target_tokens: int,
    request_index: Optional[int]
) -> list[dict]:
    """
    Get a conversation from the dataset for the given token count.

    Uses the bucket whose key equals ``str(target_tokens)``; otherwise the numeric
    bucket closest to ``target_tokens`` (non-numeric keys are ignored). Within the
    bucket, ``request_index`` cycles through the conversations (None is treated
    as 0). If the conversation ends with an assistant turn, a "Please continue."
    user turn is appended to a copy of the message list so the API has something
    to respond to.

    Raises:
        ValueError: the dataset has no numeric buckets, or the chosen bucket is empty.
        KeyError: the chosen conversation has no "messages" entry.
    """
    bucket_key = str(target_tokens)

    if bucket_key not in dataset:
        # Find closest bucket; keep the original key string so e.g. "0100" still resolves.
        numeric_buckets = []
        for key in dataset:
            try:
                numeric_buckets.append((int(key), key))
            except (TypeError, ValueError):
                continue
        if not numeric_buckets:
            raise ValueError("Dataset has no numeric token buckets")
        closest_bucket, bucket_key = min(numeric_buckets, key=lambda pair: abs(pair[0] - target_tokens))
        logging.debug(f"No exact bucket for {target_tokens} tokens, using {closest_bucket}")

    conversations = dataset[bucket_key]

    if not conversations:
        raise ValueError(f"No conversations available for bucket {bucket_key}")

    # Cycle through conversations using request index
    index = 0 if request_index is None else request_index
    conversation = conversations[index % len(conversations)]

    # Copy the list so appending below never mutates the shared dataset.
    messages = list(conversation["messages"])

    # Ensure the conversation ends with a user message so the API can generate a response
    if messages and messages[-1].get("role") == "assistant":
        messages.append({"role": "user", "content": "Please continue."})

    return messages
|
|
|
|
|
|
# ============================================================================
|
|
# REQUEST PROCESSING
|
|
# ============================================================================
|
|
|
|
async def process_stream(stream, log_io: bool, io_logger, request_id: int) -> Tuple:
    """
    Consume a streaming chat completion and collect timing and token metrics.

    Parameters:
        stream: Async iterable of completion chunks (as returned by
            ``client.chat.completions.create(..., stream=True)``).
        log_io: When True, ``reasoning_content`` deltas are accumulated into
            ``reasoning_text``.
        io_logger: Not used here; kept for signature compatibility.
        request_id: Not used here; kept for signature compatibility.

    Returns:
        ``(first_token_time, completion_tokens, prompt_tokens, content_tokens,
        reasoning_tokens, output_text, reasoning_text)``. ``first_token_time`` is
        the ``time.time()`` of the first non-empty *content* delta (reasoning
        deltas do not count), or None if no content arrived. Token counts come
        from the usage chunk; missing or null fields count as 0.

    Raises:
        Any exception raised while iterating the stream is logged and re-raised.
    """
    first_token_time = None
    output_parts = []
    reasoning_parts = []

    completion_tokens = 0
    prompt_tokens = 0
    content_tokens = 0
    reasoning_tokens = 0

    try:
        async for chunk in stream:
            choices = getattr(chunk, 'choices', None)
            if choices:
                delta = choices[0].delta

                reasoning_piece = getattr(delta, 'reasoning_content', None)
                if reasoning_piece and log_io:
                    reasoning_parts.append(reasoning_piece)

                content_piece = getattr(delta, 'content', None)
                if content_piece:
                    if first_token_time is None:
                        first_token_time = time.time()
                    output_parts.append(content_piece)

            # Get token counts from API. Servers/SDK models may report None for
            # optional fields; normalise to 0 so the arithmetic below never
            # compares None with an int.
            usage = getattr(chunk, 'usage', None)
            if usage:
                completion_tokens = getattr(usage, 'completion_tokens', completion_tokens) or 0
                prompt_tokens = getattr(usage, 'prompt_tokens', prompt_tokens) or 0

                details = getattr(usage, 'completion_tokens_details', None)
                if details:
                    reasoning_tokens = getattr(details, 'reasoning_tokens', reasoning_tokens) or 0
                    content_tokens = getattr(details, 'content_tokens', content_tokens) or 0

    except Exception as e:
        logging.error(f"Error processing stream: {e}")
        raise

    # Fallback calculation: derive whichever split the server did not report.
    if content_tokens == 0 and reasoning_tokens == 0 and completion_tokens > 0:
        content_tokens = completion_tokens
    elif reasoning_tokens == 0 and content_tokens > 0 and content_tokens < completion_tokens:
        reasoning_tokens = completion_tokens - content_tokens
    elif content_tokens == 0 and reasoning_tokens > 0 and completion_tokens > reasoning_tokens:
        content_tokens = completion_tokens - reasoning_tokens

    return (
        first_token_time,
        completion_tokens,
        prompt_tokens,
        content_tokens,
        reasoning_tokens,
        "".join(output_parts),
        "".join(reasoning_parts)
    )
|
|
|
|
|
|
def _build_request_messages(
    config: BenchmarkConfig,
    request_id: Optional[int],
    dataset: Optional[dict],
    text_content: Optional[str],
) -> list[dict]:
    """Choose the chat messages for one request (priority: text_content > dataset > generated)."""
    if text_content is not None:
        # Use provided text directly (ignores input_tokens, uses actual text length)
        return [{'role': 'user', 'content': text_content}]
    if dataset is not None:
        try:
            return get_conversation_for_tokens(dataset, config.input_tokens, request_id)
        except (ValueError, KeyError) as e:
            logging.warning(f"Failed to get conversation from dataset: {e}. Falling back to generated prompt.")
    # Generate synthetic prompt
    return [{'role': 'user', 'content': generate_prompt(config.input_tokens)}]


async def _stream_completion(
    client: AsyncOpenAI,
    model_name: str,
    messages: list,
    max_tokens: int,
    log_io: bool,
    io_logger: Optional[logging.Logger],
    request_id: Optional[int],
) -> Tuple:
    """Open a streaming completion, consume it with process_stream, and always release the response."""
    stream_obj = await client.chat.completions.create(
        model=model_name,
        messages=messages,
        max_tokens=max_tokens,
        stream=True,
        stream_options={"include_usage": True}
    )
    try:
        return await process_stream(stream_obj, log_io, io_logger, request_id)
    finally:
        # On timeout/cancellation the body is not fully read; close it so the
        # HTTP connection is not left dangling.
        close = getattr(stream_obj, "close", None)
        if close is not None:
            await close()


def _failed_request_result(
    config: BenchmarkConfig,
    start_time: float,
    end_time: float,
    elapsed_time: float,
    ttft: float,
    error_message: str,
) -> RequestResult:
    """Build the zero-token failure record used for timeouts, 429s and other errors."""
    return RequestResult(
        total_tokens=0,
        content_tokens=0,
        reasoning_tokens=0,
        elapsed_time=elapsed_time,
        time_to_first_token=ttft,
        prompt_tokens=config.input_tokens,
        start_time=start_time,
        end_time=end_time,
        success=False,
        error_message=error_message
    )


async def make_request(
    client: AsyncOpenAI,
    config: BenchmarkConfig,
    model_name: str,
    request_timeout: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    request_id: Optional[int] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> Optional[RequestResult]:
    """
    Send one chat-completion request and measure it.

    Parameters:
        client: OpenAI-compatible async client.
        config: Benchmark configuration (input/output token targets, batch size).
        model_name: Model identifier sent with the request.
        request_timeout: Seconds allowed for the whole request (opening the
            stream included).
        log_io: Write request/response details to ``io_logger``.
        io_logger: Logger used when ``log_io`` is True.
        request_id: Identifier used in logs and to pick a dataset conversation.
        dataset: Optional conversation dataset (see get_conversation_for_tokens).
        text_content: Optional fixed prompt; takes priority over ``dataset``.
        stream: Use a streaming request (real TTFT) when True; otherwise TTFT
            equals total latency.

    Returns:
        A RequestResult. Timeouts, 429s, other exceptions raised by the request,
        and responses without content tokens are reported with ``success=False``
        instead of being raised.
    """
    start_time = time.time()

    messages = _build_request_messages(config, request_id, dataset, text_content)

    if log_io and io_logger:
        io_logger.info(f"\n{'='*80}")
        io_logger.info(f"REQUEST {request_id} - Target: {config.input_tokens} tokens")
        io_logger.info(f"Model: {model_name}, Batch size: {config.batch_size}")
        if text_content:
            io_logger.info("Source: Custom text")
        elif dataset:
            io_logger.info("Source: Conversation dataset")
        else:
            io_logger.info("Source: Generated prompt")
        io_logger.info(f"{'='*80}")
        io_logger.info(f"MESSAGES:\n{json.dumps(messages, indent=2)}")
        io_logger.info(f"{'-'*80}")

    try:
        if stream:
            (
                first_token_time,
                completion_tokens,
                prompt_tokens,
                content_tokens,
                reasoning_tokens,
                output_text,
                reasoning_text
            ) = await asyncio.wait_for(
                _stream_completion(
                    client, model_name, messages, config.output_tokens,
                    log_io, io_logger, request_id
                ),
                timeout=request_timeout
            )

            end_time = time.time()
            elapsed_time = end_time - start_time
            # TTFT is the first content delta; fall back to full latency if none arrived.
            ttft = first_token_time - start_time if first_token_time else elapsed_time
        else:
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model=model_name,
                    messages=messages,
                    max_tokens=config.output_tokens,
                    stream=False
                ),
                timeout=request_timeout
            )

            end_time = time.time()
            elapsed_time = end_time - start_time
            ttft = elapsed_time  # No first-token event in non-streaming mode

            usage = response.usage
            completion_tokens = usage.completion_tokens if usage else 0
            prompt_tokens = usage.prompt_tokens if usage else 0
            content_tokens = completion_tokens
            reasoning_tokens = 0
            output_text = (response.choices[0].message.content or "") if response.choices else ""
            reasoning_text = ""

            details = getattr(usage, 'completion_tokens_details', None)
            if details and getattr(details, 'reasoning_tokens', None):
                reasoning_tokens = details.reasoning_tokens
                content_tokens = completion_tokens - reasoning_tokens

        if log_io and io_logger:
            if reasoning_text:
                io_logger.info(f"REASONING ({reasoning_tokens} tokens):\n{reasoning_text}")
                io_logger.info(f"{'-'*80}")
            io_logger.info(f"CONTENT ({content_tokens} tokens):\n{output_text}")
            io_logger.info(f"{'-'*80}")
            io_logger.info(f"TTFT: {ttft:.3f}s | Latency: {elapsed_time:.3f}s")
            throughput = completion_tokens / elapsed_time if elapsed_time > 0 else 0.0
            io_logger.info(f"Throughput: {throughput:.2f} tok/s")
            io_logger.info(f"Token accuracy: target={config.input_tokens}, actual={prompt_tokens}")
            io_logger.info(f"{'='*80}\n")

        # Validate token counts
        if content_tokens == 0:
            logging.warning("Request completed but got no content tokens")
            return RequestResult(
                total_tokens=completion_tokens,
                content_tokens=content_tokens,
                reasoning_tokens=reasoning_tokens,
                elapsed_time=elapsed_time,
                time_to_first_token=ttft,
                prompt_tokens=prompt_tokens,
                start_time=start_time,
                end_time=end_time,
                success=False,
                error_message="No content tokens generated"
            )

        token_diff = abs(prompt_tokens - config.input_tokens)
        if token_diff > TOKEN_VALIDATION_THRESHOLD:
            logging.warning(
                f"Token count difference: target={config.input_tokens}, "
                f"actual={prompt_tokens}, diff={token_diff}"
            )

        return RequestResult(
            total_tokens=completion_tokens,
            content_tokens=content_tokens,
            reasoning_tokens=reasoning_tokens,
            elapsed_time=elapsed_time,
            time_to_first_token=ttft,
            prompt_tokens=prompt_tokens,
            start_time=start_time,
            end_time=end_time,
            success=True
        )

    except asyncio.TimeoutError:
        end_time = time.time()
        logging.warning(f"Request {request_id} timed out after {request_timeout}s")
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - TIMEOUT after {request_timeout}s\n")
        return _failed_request_result(
            config, start_time, end_time,
            elapsed_time=request_timeout,
            ttft=request_timeout,
            error_message=f"Timeout after {request_timeout}s"
        )

    except RateLimitError as e:
        end_time = time.time()
        elapsed = end_time - start_time
        error_msg = f"429 Rate Limit: {str(e)}"
        logging.warning(f"Request {request_id} got 429 (rate limited) after {elapsed:.3f}s")
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - 429 RATE LIMITED after {elapsed:.3f}s\n")
        return _failed_request_result(config, start_time, end_time, elapsed, 0, error_msg)

    except Exception as e:
        end_time = time.time()
        elapsed = end_time - start_time
        error_msg = f"{type(e).__name__}: {str(e)}"
        logging.error(f"Request {request_id} error after {elapsed:.3f}s: {error_msg}", exc_info=True)
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - ERROR after {elapsed:.3f}s: {error_msg}\n")
        return _failed_request_result(config, start_time, end_time, elapsed, 0, error_msg)
|
|
|
|
|
|
async def make_batch_request(
    client: AsyncOpenAI,
    config: BenchmarkConfig,
    model_name: str,
    request_timeout: int,
    batch_size: int,
    batch_id: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> List[RequestResult]:
    """
    Fire ``batch_size`` independent requests concurrently and collect their results.

    Request ids are ``batch_id * batch_size + offset``, so ids are unique across
    batches of equal size. Each returned result is tagged with ``batch_id`` and
    ``requests_in_batch``. The batch's throughput (successful completion tokens
    divided by the batch's wall time) is logged.
    """
    write_io = log_io and io_logger
    first_request_id = batch_id * batch_size
    started_at = time.time()

    if write_io:
        io_logger.info(f"\n{'#'*80}")
        io_logger.info(f"BATCH {batch_id} - Sending {batch_size} requests")
        io_logger.info(f"Model: {model_name}, Input: {config.input_tokens} tokens")
        io_logger.info(f"{'#'*80}")

    # Build every coroutine first, then run them together so they start at once.
    pending = [
        make_request(
            client=client,
            config=config,
            model_name=model_name,
            request_timeout=request_timeout,
            log_io=log_io,
            io_logger=io_logger,
            request_id=first_request_id + offset,
            dataset=dataset,
            text_content=text_content,
            stream=stream
        )
        for offset in range(batch_size)
    ]
    raw_results = await asyncio.gather(*pending)

    duration = time.time() - started_at

    # Drop empty results and stamp the rest with batch metadata.
    tagged = [outcome for outcome in raw_results if outcome]
    for outcome in tagged:
        outcome.batch_id = batch_id
        outcome.requests_in_batch = batch_size

    ok_results = [outcome for outcome in tagged if outcome.success]
    ok_count = len(ok_results)
    generated = sum(outcome.total_tokens for outcome in ok_results)
    batch_throughput = 0 if duration <= 0 else generated / duration

    if write_io:
        io_logger.info(f"\n{'#'*80}")
        io_logger.info(f"BATCH {batch_id} COMPLETE")
        io_logger.info(f" Duration: {duration:.3f}s")
        io_logger.info(f" Successful: {ok_count}/{batch_size}")
        io_logger.info(f" Batch Throughput: {batch_throughput:.2f} tokens/s")
        io_logger.info(f"{'#'*80}\n")

    logging.info(
        f"Batch {batch_id}: {ok_count}/{batch_size} successful, "
        f"{duration:.2f}s, {batch_throughput:.2f} tok/s"
    )

    return tagged
|
|
|
|
|
|
# ============================================================================
|
|
# STATISTICS CALCULATION
|
|
# ============================================================================
|
|
|
|
def calculate_statistics(results: List[RequestResult], config: BenchmarkConfig) -> Dict:
|
|
"""Calculate aggregate statistics from benchmark results."""
|
|
import numpy as np
|
|
|
|
successful = [r for r in results if r.success]
|
|
failed = [r for r in results if not r.success]
|
|
|
|
if not successful:
|
|
return {'success_rate': 0, 'error': 'No successful requests'}
|
|
|
|
success_rate = len(successful) / len(results) * 100
|
|
|
|
# Latency stats
|
|
latencies = [r.elapsed_time for r in successful]
|
|
avg_lat = float(np.mean(latencies))
|
|
std_lat = float(np.std(latencies))
|
|
margin = CONFIDENCE_95_Z_SCORE * std_lat / np.sqrt(len(successful))
|
|
|
|
# TTFT stats
|
|
ttft_values = [r.time_to_first_token for r in successful if r.time_to_first_token]
|
|
|
|
# Throughput calculation
|
|
actual_start = min(r.start_time for r in successful)
|
|
actual_end = max(r.end_time for r in successful)
|
|
wall_time = actual_end - actual_start
|
|
|
|
total_output_tokens = sum(r.total_tokens for r in successful)
|
|
total_content_tokens = sum(r.content_tokens for r in successful)
|
|
|
|
concurrent_throughput = total_output_tokens / wall_time if wall_time > 0 else 0
|
|
content_throughput = total_content_tokens / wall_time if wall_time > 0 else 0
|
|
|
|
# Efficiency
|
|
per_request_throughputs = [r.tokens_per_second for r in successful]
|
|
avg_per_request = float(np.mean(per_request_throughputs))
|
|
theoretical_max = config.batch_size * avg_per_request
|
|
efficiency = min((concurrent_throughput / theoretical_max * 100) if theoretical_max > 0 else 0, 100)
|
|
|
|
# Batch metrics
|
|
batch_metrics = None
|
|
batch_ids = [r.batch_id for r in successful if r.batch_id is not None]
|
|
if batch_ids:
|
|
unique_batches = set(batch_ids)
|
|
batch_sizes = {}
|
|
batch_throughputs = {}
|
|
|
|
for batch_id in unique_batches:
|
|
batch_results = [r for r in successful if r.batch_id == batch_id]
|
|
if batch_results:
|
|
batch_start = min(r.start_time for r in batch_results)
|
|
batch_end = max(r.end_time for r in batch_results)
|
|
batch_time = batch_end - batch_start
|
|
batch_tokens = sum(r.total_tokens for r in batch_results)
|
|
|
|
batch_sizes[batch_id] = len(batch_results)
|
|
batch_throughputs[batch_id] = batch_tokens / batch_time if batch_time > 0 else 0
|
|
|
|
throughput_values = list(batch_throughputs.values())
|
|
batch_metrics = {
|
|
'num_batches': len(unique_batches),
|
|
'avg_batch_size': round(float(np.mean(list(batch_sizes.values()))), 2),
|
|
'avg_batch_throughput': round(float(np.mean(throughput_values)), 2),
|
|
'min_batch_throughput': round(float(np.min(throughput_values)), 2),
|
|
'max_batch_throughput': round(float(np.max(throughput_values)), 2),
|
|
}
|
|
|
|
stats_dict = {
|
|
'config': {
|
|
'input_tokens': config.input_tokens,
|
|
'output_tokens': config.output_tokens,
|
|
'batch_size': config.batch_size,
|
|
'num_batches': config.num_batches,
|
|
'total_requests': len(results),
|
|
'actual_input_tokens': round(float(np.mean([r.prompt_tokens for r in successful])))
|
|
},
|
|
'success_metrics': {
|
|
'success_rate': round(success_rate, 2),
|
|
'successful_requests': len(successful),
|
|
'failed_requests': len(failed)
|
|
},
|
|
'latency': {
|
|
'mean': round(avg_lat, 3),
|
|
'std': round(std_lat, 3),
|
|
'min': round(float(np.min(latencies)), 3),
|
|
'max': round(float(np.max(latencies)), 3),
|
|
'p50': round(float(np.percentile(latencies, 50)), 3),
|
|
'p95': round(float(np.percentile(latencies, 95)), 3),
|
|
'p99': round(float(np.percentile(latencies, 99)), 3),
|
|
'ci_95_lower': round(avg_lat - margin, 3),
|
|
'ci_95_upper': round(avg_lat + margin, 3)
|
|
},
|
|
'ttft': {
|
|
'mean': round(float(np.mean(ttft_values)), 3) if ttft_values else None,
|
|
'std': round(float(np.std(ttft_values)), 3) if ttft_values else None,
|
|
'p50': round(float(np.percentile(ttft_values, 50)), 3) if ttft_values else None,
|
|
'p90': round(float(np.percentile(ttft_values, 90)), 3) if ttft_values else None
|
|
},
|
|
'tokens': {
|
|
'total_generated': total_output_tokens,
|
|
'content_tokens': total_content_tokens,
|
|
'reasoning_tokens': sum(r.reasoning_tokens for r in successful),
|
|
'avg_per_request': round(total_output_tokens / len(successful), 2)
|
|
},
|
|
'throughput': {
|
|
'concurrent_total_tps': round(concurrent_throughput, 2),
|
|
'concurrent_content_tps': round(content_throughput, 2),
|
|
'requests_per_second': round(len(successful) / wall_time, 2) if wall_time > 0 else 0,
|
|
'actual_wall_time': round(wall_time, 3),
|
|
'efficiency_percent': round(efficiency, 2)
|
|
}
|
|
}
|
|
|
|
# Add batch metrics if available
|
|
if batch_metrics:
|
|
stats_dict['batch_metrics'] = batch_metrics
|
|
|
|
return stats_dict
|
|
|
|
|
|
# ============================================================================
|
|
# HTTP CLIENT
|
|
# ============================================================================
|
|
|
|
def create_http_client(concurrent_requests: int, request_timeout: int) -> httpx.AsyncClient:
    """
    Build an httpx.AsyncClient sized for ``concurrent_requests`` in-flight requests.

    The pool holds CONNECTION_MULTIPLIER connections per concurrent request, but
    never fewer than MIN_HTTP_CONNECTIONS. The overall and read timeouts use
    ``request_timeout``; connect/write allow 1800s; waiting for a free pool slot
    is capped at 5s. Idle keep-alive connections are kept for 1800s. HTTP/2 is
    not enabled.
    """
    pool_size = max(MIN_HTTP_CONNECTIONS, CONNECTION_MULTIPLIER * concurrent_requests)

    logging.info(f"Creating HTTP client with max_connections={pool_size}")

    pool_limits = httpx.Limits(
        max_connections=pool_size,
        max_keepalive_connections=pool_size,
        keepalive_expiry=1800
    )
    timeouts = httpx.Timeout(
        timeout=request_timeout,
        connect=1800.0,
        read=request_timeout,
        write=1800.0,
        pool=5.0
    )
    return httpx.AsyncClient(limits=pool_limits, timeout=timeouts)
|
|
|
|
|
|
# ============================================================================
|
|
# MODEL INITIALIZATION
|
|
# ============================================================================
|
|
|
|
async def wait_for_model(
    endpoint_url: str,
    api_key: str,
    model_name: str,
    max_retries: int = 10,
    retry_delay: int = 30
) -> bool:
    """
    Poll the endpoint with a tiny completion until the model answers.

    The check is retried (after ``retry_delay`` seconds) only when the error text
    contains one of the warm-up markers below. Any other error, or running out
    of attempts, returns False. The dedicated HTTP client is always closed.

    Returns:
        True once a probe request succeeds, otherwise False.
    """
    logging.info(f"Waiting for model '{model_name}' to initialize...")

    warmup_markers = ('initializing', 'loading', 'starting', 'not ready', 'unavailable')
    http_client = create_http_client(1, 60)
    client = AsyncOpenAI(base_url=endpoint_url, api_key=api_key, http_client=http_client, max_retries=0)

    try:
        attempt = 0
        while attempt < max_retries:
            attempt += 1
            try:
                logging.info(f"Initialization check {attempt}/{max_retries}...")
                await client.chat.completions.create(
                    model=model_name,
                    messages=[{'role': 'user', 'content': 'Hello'}],
                    max_tokens=10,
                    stream=False
                )
            except Exception as e:
                lowered = str(e).lower()
                if not any(marker in lowered for marker in warmup_markers):
                    logging.error(f"Error: {e}")
                    return False
                if attempt >= max_retries:
                    logging.error(f"Model failed to initialize after {max_retries} attempts")
                    return False
                logging.info(f"Model initializing... waiting {retry_delay}s")
                await asyncio.sleep(retry_delay)
            else:
                logging.info("Model is ready!")
                return True
    finally:
        await http_client.aclose()

    # Only reached when max_retries < 1 (no attempt was made).
    return False
|
|
|
|
|
|
# ============================================================================
|
|
# BENCHMARK EXECUTION
|
|
# ============================================================================
|
|
|
|
def _log_batch_outcome(
    batch_id: int,
    num_batches: int,
    batch_results: List[RequestResult],
    batch_time: float,
) -> None:
    """Log one batch's success count, throughput, mean TTFT and mean latency."""
    ok = [r for r in batch_results if r.success]
    successful = len(ok)
    failed = len(batch_results) - successful
    batch_throughput = sum(r.total_tokens for r in ok) / batch_time if batch_time > 0 else 0

    # Average TTFT / latency over successful requests (zero TTFTs excluded)
    ttfts = [r.time_to_first_token for r in ok if r.time_to_first_token]
    avg_ttft = sum(ttfts) / len(ttfts) if ttfts else 0
    latencies = [r.elapsed_time for r in ok]
    avg_latency = sum(latencies) / len(latencies) if latencies else 0

    status_icon = "✓" if failed == 0 else "⚠"
    logging.info(f"{status_icon} Batch {batch_id + 1}/{num_batches} complete: "
                 f"{successful}/{len(batch_results)} successful "
                 f"| {batch_time:.2f}s total "
                 f"| {batch_throughput:.0f} tok/s "
                 f"| TTFT: {avg_ttft:.3f}s "
                 f"| Latency: {avg_latency:.2f}s")

    if failed > 0:
        logging.warning(f" {failed} request(s) failed in batch {batch_id + 1}")


async def run_benchmark(
    config: BenchmarkConfig,
    endpoint_url: str,
    api_key: str,
    model_name: str,
    request_timeout: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> Optional[Dict]:
    """
    Run a single benchmark configuration in batch mode.

    ``config.num_batches`` batches are executed one after another (0.1s pause
    between them); each batch sends ``config.batch_size`` concurrent requests
    through one shared HTTP client, which is closed afterwards.

    Returns:
        The dict from calculate_statistics (which may be its "no successful
        requests" form), or None if no results were collected at all.
    """
    total_requests = config.num_batches * config.batch_size

    logging.info(f"\n{'='*60}")
    logging.info(f"Starting benchmark: {config.num_batches} batches x "
                 f"{config.batch_size} requests/batch = {total_requests} total")
    logging.info(f"Target: {config.input_tokens} tokens, Output: {config.output_tokens} tokens")
    if text_content:
        logging.info("Using custom text input")
    elif dataset:
        logging.info("Using conversation dataset")
    logging.info(f"{'='*60}")

    http_client = create_http_client(config.batch_size, request_timeout)

    try:
        client = AsyncOpenAI(base_url=endpoint_url, api_key=api_key, http_client=http_client, max_retries=0)
        results = []

        start_time = time.time()

        # Send batches of requests
        for batch_id in range(config.num_batches):
            batch_start = time.time()

            logging.info(f" Starting batch {batch_id + 1}/{config.num_batches} "
                         f"({config.batch_size} simultaneous requests)...")

            batch_results = await make_batch_request(
                client=client,
                config=config,
                model_name=model_name,
                request_timeout=request_timeout,
                batch_size=config.batch_size,
                batch_id=batch_id,
                log_io=log_io,
                io_logger=io_logger,
                dataset=dataset,
                text_content=text_content,
                stream=stream
            )
            results.extend(batch_results)

            _log_batch_outcome(batch_id, config.num_batches, batch_results, time.time() - batch_start)

            # Small delay between batches to avoid overwhelming the system
            if batch_id < config.num_batches - 1:
                await asyncio.sleep(0.1)

        end_time = time.time()

        if not results:
            logging.error("No results collected!")
            return None

        stats = calculate_statistics(results, config)

        logging.info(f"✓ Benchmark complete in {end_time - start_time:.2f}s")
        if 'success_metrics' in stats:
            logging.info(f" Success: {stats['success_metrics']['success_rate']}% "
                         f"({stats['success_metrics']['successful_requests']}/{total_requests})")
            logging.info(f" P95 Latency: {stats['latency']['p95']}s")
            logging.info(f" Throughput: {stats['throughput']['concurrent_total_tps']} tokens/s")

            if 'batch_metrics' in stats:
                logging.info(f" Avg Batch Throughput: {stats['batch_metrics']['avg_batch_throughput']} tokens/s")
        else:
            logging.error(f" All {total_requests} requests failed. Error: {stats.get('error', 'unknown')}")

        return stats

    finally:
        await http_client.aclose()
|
|
|
|
|
|
async def run_all_benchmarks(
    configs: List[BenchmarkConfig],
    endpoint_url: str,
    api_key: str,
    model_name: str,
    request_timeout: int,
    delay_between_runs: int = 5,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    wait_for_ready: bool = True,
    max_init_retries: int = 10,
    init_retry_delay: int = 30,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> List[Dict]:
    """
    Run every benchmark configuration in order and log an overall summary.

    When ``wait_for_ready`` is True the model is probed first; if it never
    becomes ready an empty list is returned. Runs are separated by
    ``delay_between_runs`` seconds.

    Returns:
        The stats dicts of all runs that produced results (including runs in
        which every request failed).
    """
    if wait_for_ready:
        if not await wait_for_model(endpoint_url, api_key, model_name, max_init_retries, init_retry_delay):
            logging.error("Model initialization failed. Aborting.")
            return []

    all_results = []
    total_requests = 0
    total_successful = 0

    for i, config in enumerate(configs, 1):
        logging.info(f"\n--- Benchmark {i}/{len(configs)} ---")
        result = await run_benchmark(
            config, endpoint_url, api_key, model_name, request_timeout,
            log_io, io_logger, dataset, text_content, stream
        )

        if result:
            all_results.append(result)
            # A run in which every request failed has no 'success_metrics' (and
            # possibly no 'config'); count its requests as failed rather than
            # crashing the summary with a KeyError.
            run_config = result.get('config') or {}
            total_requests += run_config.get('total_requests', config.num_batches * config.batch_size)
            total_successful += result.get('success_metrics', {}).get('successful_requests', 0)
        else:
            logging.error(f"Benchmark {i} failed")

        if i < len(configs):
            logging.info(f"Waiting {delay_between_runs}s before next run...")
            await asyncio.sleep(delay_between_runs)

    # Log summary
    if all_results:
        successful = len(all_results)
        total = len(configs)
        logging.info(f"\n{'='*60}")
        logging.info("ALL BENCHMARKS COMPLETE")
        logging.info(f"{'='*60}")
        logging.info(f"Completed: {successful}/{total} benchmarks")

        overall_success_rate = (total_successful / total_requests * 100) if total_requests > 0 else 0

        logging.info(f"Total requests: {total_requests}")
        logging.info(f"Successful: {total_successful} ({overall_success_rate:.1f}%)")
        logging.info(f"{'='*60}")

    return all_results
|
|
|
|
|
|
# ============================================================================
|
|
# FILE I/O
|
|
# ============================================================================
|
|
|
|
def create_results_directory(model_name: str, base_dir: Path = Path('results')) -> Tuple[Path, str]:
    """Create (if needed) the per-model results directory.

    Model identifiers containing ``'ubiops-deployment/'`` with at least four
    ``/``-separated parts are shortened to their last non-empty component.
    The remaining name is sanitized: alphanumerics, ``-`` and ``_`` are kept,
    every other character becomes ``_``.

    Args:
        model_name: Model identifier as sent to the endpoint.
        base_dir: Parent directory for all results (defaults to ``results``).

    Returns:
        ``(results_dir, safe_name)`` where ``results_dir`` is
        ``base_dir / f'results_{safe_name}'``; the directory exists on return.
    """
    if 'ubiops-deployment/' in model_name:
        parts = model_name.split('/')
        if len(parts) >= 4:
            # Skip empty components: with a trailing '/' the last part is '',
            # which would collapse every such model into the same 'results_' dir.
            model_name = next((part for part in reversed(parts) if part), model_name)

    safe_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in model_name)
    results_dir = Path(base_dir) / f'results_{safe_name}'
    results_dir.mkdir(parents=True, exist_ok=True)

    logging.info(f"Results directory: {results_dir}")
    return results_dir, safe_name
|
|
|
|
|
|
def save_results(results: List[Dict], results_dir: Path, model_name: str) -> Path:
    """Write all benchmark results to ``benchmark_results.json`` in *results_dir*.

    The file contains one JSON object with a local-time ISO-8601
    ``timestamp``, the ``model_name`` and the list of ``results``, indented by
    two spaces. An existing file at that path is overwritten.

    Returns:
        Path of the written JSON file.
    """
    destination = results_dir / 'benchmark_results.json'
    payload = dict(
        timestamp=datetime.now().isoformat(),
        model_name=model_name,
        results=results,
    )

    with open(destination, 'w') as handle:
        json.dump(payload, handle, indent=2)

    logging.info(f"Results saved: {destination}")
    return destination
|
|
|
|
|
|
def load_config_file(config_path: str) -> Dict:
    """Load the benchmark configuration from a YAML file.

    ``yaml.safe_load`` is used so the file cannot construct arbitrary Python
    objects.

    Args:
        config_path: Path to the YAML config file (read as UTF-8).

    Returns:
        The parsed top-level mapping, or an empty dict for an empty file.

    Raises:
        OSError: if the file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
        ValueError: if the top-level YAML value is not a mapping.
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if data is None:
        # safe_load yields None for an empty document; honour the Dict return type.
        return {}
    if not isinstance(data, dict):
        raise ValueError(
            f"Config file {config_path} must contain a YAML mapping, "
            f"got {type(data).__name__}"
        )
    return data
|
|
|
|
|
|
def save_config_copy(config_data: Dict, results_dir: Path) -> None:
    """Write the configuration to ``config_used.yaml`` with the API key redacted.

    Only the written copy has ``endpoint.api_key`` replaced by
    ``'<REDACTED>'``; the caller's ``config_data`` is not modified. Key order
    is preserved in the output (``sort_keys=False``).

    Args:
        config_data: Parsed configuration mapping.
        results_dir: Directory that receives ``config_used.yaml``.
    """
    sanitized = dict(config_data)
    endpoint = config_data.get('endpoint')
    if isinstance(endpoint, dict) and 'api_key' in endpoint:
        # Build a new endpoint section instead of mutating the caller's dict;
        # updating an existing key keeps its original position in the mapping.
        sanitized['endpoint'] = {**endpoint, 'api_key': '<REDACTED>'}

    config_path = results_dir / 'config_used.yaml'
    with open(config_path, 'w', encoding='utf-8') as f:
        yaml.dump(sanitized, f, default_flow_style=False, sort_keys=False)

    logging.info(f"Config saved: {config_path}")
|
|
|
|
|
|
# ============================================================================
|
|
# CLI INTERFACE
|
|
# ============================================================================
|
|
|
|
def _positive_int_arg(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command-line arguments.

    Args:
        argv: Argument list to parse; ``None`` (the default) reads ``sys.argv``.

    Returns:
        The parsed namespace. Token counts, batch sizes and the batch count
        must be positive integers; invalid values make argparse exit with
        status 2.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark LLM deployments via OpenAI-compatible endpoints",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Using config file
  python benchmark_llm.py --config benchmark_config.yaml

  # Using CLI arguments
  python benchmark_llm.py --endpoint_url https://api.example.com/v1 \\
    --api_key YOUR_KEY \\
    --model_name gpt-4 \\
    --input_tokens 1000 5000 10000 \\
    --batch_sizes 1 8 16 32
"""
    )

    parser.add_argument('--config', type=str, help="Path to YAML config file")
    parser.add_argument('--endpoint_url', type=str, help="OpenAI-compatible endpoint URL")
    parser.add_argument('--api_key', type=str, help="API key")
    parser.add_argument('--model_name', type=str, help="Model name")
    parser.add_argument('--input_tokens', type=_positive_int_arg, nargs='+', default=[1000, 5000, 10000],
                        help="Input token counts to test")
    parser.add_argument('--batch_sizes', type=_positive_int_arg, nargs='+', default=[16, 32, 64, 128],
                        help="Batch sizes to test (number of simultaneous requests)")
    parser.add_argument('--num_batches', type=_positive_int_arg, default=10,
                        help="Number of batches to send per configuration")
    parser.add_argument('--output_tokens', type=_positive_int_arg, default=256,
                        help="Output tokens per request")
    parser.add_argument('--dataset', type=str, default=None,
                        help="Path to conversation dataset JSON file (optional)")
    parser.add_argument('--text', type=str, default=None,
                        help="Custom text to use as input for all requests (same text repeated)")
    parser.add_argument('--request_timeout', type=int, default=900,
                        help="Request timeout (seconds)")
    parser.add_argument('--delay_between_runs', type=int, default=5,
                        help="Delay between runs (seconds)")
    parser.add_argument('--no_stream', action='store_true',
                        help="Use non-streaming requests instead of streaming")
    parser.add_argument('--log_io', action='store_true',
                        help="Log all input/output")
    parser.add_argument('--skip_init_wait', action='store_true',
                        help="Skip model initialization wait")
    parser.add_argument('--max_init_retries', type=int, default=10,
                        help="Max initialization retries")
    parser.add_argument('--init_retry_delay', type=int, default=30,
                        help="Delay between init retries (seconds)")

    return parser.parse_args(argv)
|
|
|
|
|
|
def generate_configs(
    input_tokens: List[int],
    batch_sizes: List[int],
    num_batches: int,
    output_tokens: int
) -> List[BenchmarkConfig]:
    """Build the full grid of benchmark configurations.

    One ``BenchmarkConfig`` is produced for every (input token count, batch
    size) pair. ``input_tokens`` is the outer dimension and ``batch_sizes``
    the inner one. ``num_batches`` and ``output_tokens`` are shared by all
    entries.
    """
    return [
        BenchmarkConfig(
            input_tokens=prompt_len,
            batch_size=size,
            num_batches=num_batches,
            output_tokens=output_tokens,
        )
        for prompt_len in input_tokens
        for size in batch_sizes
    ]
|
|
|
|
|
|
# ============================================================================
|
|
# MAIN
|
|
# ============================================================================
|
|
|
|
async def main() -> int:
    """Command-line entry point: resolve settings, run all benchmarks, save results.

    Settings come either entirely from the YAML file given by ``--config``
    (the other CLI flags are then not consulted) or entirely from the CLI
    arguments.

    Returns:
        Process exit status: 0 when results were produced and saved, 1 when no
        benchmark produced results, 2 when required CLI arguments are missing.
    """
    args = parse_args()

    # Load configuration
    if args.config:
        config_data = load_config_file(args.config)

        endpoint = config_data['endpoint']
        endpoint_url = endpoint['url']
        api_key = endpoint['api_key']
        model_name = endpoint['model_name']

        bench_config = config_data['benchmark']
        input_tokens = bench_config['input_tokens']
        batch_sizes = bench_config['batch_sizes']
        num_batches = bench_config['num_batches']
        output_tokens = bench_config['output_tokens']
        dataset_path = bench_config.get('dataset', None)
        custom_text = bench_config.get('text', None)

        # A bare ``runtime:`` key parses as None; treat it like a missing section.
        runtime = config_data.get('runtime') or {}
        request_timeout = runtime.get('request_timeout', 900)
        delay_between_runs = runtime.get('delay_between_runs', 5)
        log_io = runtime.get('log_io', False)
        stream = runtime.get('stream', True)
        wait_for_ready = runtime.get('wait_for_ready', True)
        max_init_retries = runtime.get('max_init_retries', 10)
        init_retry_delay = runtime.get('init_retry_delay', 30)
    else:
        # Use CLI arguments
        if not all([args.endpoint_url, args.api_key, args.model_name]):
            logging.error("Must provide --config or all of --endpoint_url, --api_key, --model_name")
            return 2

        config_data = None
        endpoint_url = args.endpoint_url
        api_key = args.api_key
        model_name = args.model_name
        input_tokens = args.input_tokens
        batch_sizes = args.batch_sizes
        num_batches = args.num_batches
        output_tokens = args.output_tokens
        dataset_path = args.dataset
        custom_text = args.text
        request_timeout = args.request_timeout
        delay_between_runs = args.delay_between_runs
        log_io = args.log_io
        stream = not args.no_stream
        wait_for_ready = not args.skip_init_wait
        max_init_retries = args.max_init_retries
        init_retry_delay = args.init_retry_delay

    # Load conversation dataset if provided; a load failure is non-fatal.
    dataset = None
    if dataset_path:
        try:
            dataset = load_conversation_dataset(dataset_path)
        except Exception as e:
            logging.error(f"Failed to load dataset: {e}")
            logging.error("Continuing with generated prompts...")

    # Both the custom text and the dataset are forwarded; the summary below
    # reports custom text as taking priority over the dataset.
    text_content = custom_text

    # Create results directory
    results_dir, _ = create_results_directory(model_name)
    io_logger = setup_logging(results_dir, log_io)

    if config_data:
        save_config_copy(config_data, results_dir)

    # Generate benchmark configurations
    configs = generate_configs(input_tokens, batch_sizes, num_batches, output_tokens)

    logging.info(f"\n{'='*60}")
    logging.info("BENCHMARK CONFIGURATION")
    logging.info(f"{'='*60}")
    logging.info(f"Model: {model_name}")
    logging.info(f"Endpoint: {endpoint_url}")
    logging.info(f"Mode: BATCH ({'streaming' if stream else 'non-streaming'})")
    if text_content:
        logging.info(f"Input: Custom text ({len(text_content)} chars)")
    elif dataset:
        logging.info(f"Input: {dataset_path} (real conversations)")
    else:
        logging.info("Input: Generated prompts")
    logging.info(f"Total configurations: {len(configs)}")
    logging.info(f"Input tokens: {input_tokens}")
    logging.info(f"Batch sizes: {batch_sizes}")
    logging.info(f"Batches per config: {num_batches}")
    logging.info(f"Output tokens: {output_tokens}")
    logging.info(f"{'='*60}\n")

    # Run benchmarks
    results = await run_all_benchmarks(
        configs=configs,
        endpoint_url=endpoint_url,
        api_key=api_key,
        model_name=model_name,
        request_timeout=request_timeout,
        delay_between_runs=delay_between_runs,
        log_io=log_io,
        io_logger=io_logger,
        wait_for_ready=wait_for_ready,
        max_init_retries=max_init_retries,
        init_retry_delay=init_retry_delay,
        dataset=dataset,
        text_content=text_content,
        stream=stream
    )

    if not results:
        logging.error("No results generated")
        return 1

    results_path = save_results(results, results_dir, model_name)

    logging.info(f"\n{'='*60}")
    logging.info("BENCHMARK COMPLETE!")
    logging.info(f"{'='*60}")
    logging.info(f"Results: {results_path}")
    if log_io:
        logging.info(f"I/O log: {results_dir / 'benchmark_io.log'}")
    logging.info(f"\nVisualize: python visualize_results.py --input {results_path}")
    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status so callers
    # (shell scripts, CI) can detect failed runs; a None result exits with 0.
    raise SystemExit(asyncio.run(main()))
|