""" LLM Benchmarking Tool Benchmarks LLM deployments via OpenAI-compatible endpoints with async concurrency. Supports YAML config files and CLI arguments. """ import asyncio import time import argparse import json import yaml import logging import httpx from pathlib import Path from datetime import datetime from dataclasses import dataclass, asdict from typing import List, Dict, Optional, Tuple from openai import AsyncOpenAI, RateLimitError # ============================================================================ # CONSTANTS # ============================================================================ TOKEN_TO_CHAR_RATIO = 7 CONFIDENCE_95_Z_SCORE = 1.96 MIN_HTTP_CONNECTIONS = 200 CONNECTION_MULTIPLIER = 2 TOKEN_VALIDATION_THRESHOLD = 100 SHUTDOWN_SENTINEL = object() # ============================================================================ # DATA MODELS # ============================================================================ @dataclass class BenchmarkConfig: """Configuration for a single benchmark run.""" input_tokens: int batch_size: int num_batches: int output_tokens: int @dataclass class RequestResult: """Results from a single request.""" total_tokens: int content_tokens: int reasoning_tokens: int elapsed_time: float time_to_first_token: float prompt_tokens: int # Actual input tokens from API response start_time: float end_time: float success: bool = True error_message: Optional[str] = None batch_id: Optional[int] = None # Batch identifier if using batching requests_in_batch: Optional[int] = None # Number of requests in this batch @property def tokens_per_second(self) -> float: """Calculate tokens per second for this request.""" return self.total_tokens / self.elapsed_time if self.elapsed_time > 0 else 0 @property def content_tokens_per_second(self) -> float: return self.content_tokens / self.elapsed_time if self.elapsed_time > 0 else 0 # ============================================================================ # LOGGING SETUP # 
# ============================================================================


def setup_logging(results_dir: Path, log_io: bool = False) -> Optional[logging.Logger]:
    """
    Configure console logging and, optionally, a separate I/O log file.

    Parameters:
        results_dir: Directory in which 'benchmark_io.log' is created
        log_io: When True, create and return the dedicated I/O logger

    Returns:
        The 'io_logger' logger when log_io is True, otherwise None
    """
    # Configure root logger for console output
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler()  # Ensure console output
        ]
    )
    # logging.getLogger("httpx").setLevel(logging.DEBUG)
    # logging.getLogger("httpcore").setLevel(logging.DEBUG)

    if not log_io:
        return None

    # Setup separate I/O logger for detailed request/response logging
    io_logger = logging.getLogger('io_logger')
    # Close handlers left over from an earlier call instead of merely
    # dropping them, so their log files are not left open.
    for stale_handler in list(io_logger.handlers):
        io_logger.removeHandler(stale_handler)
        stale_handler.close()

    io_log_path = results_dir / 'benchmark_io.log'
    # Prompts and responses may contain non-ASCII text; do not depend on the
    # platform default encoding.
    io_handler = logging.FileHandler(io_log_path, encoding='utf-8')
    io_handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    io_logger.addHandler(io_handler)
    io_logger.setLevel(logging.INFO)
    io_logger.propagate = False  # Keep request/response dumps off the console

    logging.info(f"I/O logging enabled: {io_log_path}")
    return io_logger


# ============================================================================
# PROMPT GENERATION
# ============================================================================


def generate_prompt(target_tokens: int) -> str:
    """
    Generate a prompt with approximately the target number of tokens.

    The filler text is repeated/truncated to target_tokens * TOKEN_TO_CHAR_RATIO
    characters and a fixed question is appended.

    Parameters:
        target_tokens: Target number of tokens for the generated prompt
            (negative values are treated as 0)

    Returns:
        Generated prompt string
    """
    chars_needed = max(target_tokens, 0) * TOKEN_TO_CHAR_RATIO
    base_text = (
        "You are an AI assistant analyzing complex systems. Consider the following "
        "context about modern computing, AI architectures, distributed systems, "
        "cloud infrastructure, ML model deployment, data pipelines, scalability, "
        "microservices, containerization, Kubernetes, monitoring, observability, "
        "performance optimization, security, cost optimization, and emerging AI/ML trends. "
    )
    full_reps, remainder = divmod(chars_needed, len(base_text))
    expanded_text = base_text * full_reps + base_text[:remainder]
    question = "\n\nBased on the context above, provide a comprehensive analysis."
    return expanded_text + question


def load_conversation_dataset(dataset_path: str) -> dict:
    """
    Load pre-generated conversation dataset from JSON file.

    Parameters:
        dataset_path: Path to the JSON dataset file

    Returns:
        The parsed JSON object; the code iterates it as a mapping of bucket
        name to a list of conversations

    Raises:
        FileNotFoundError: If the file does not exist (logged, then re-raised)
        json.JSONDecodeError: If the file is not valid JSON (logged, then re-raised)
    """
    try:
        with open(dataset_path, 'r', encoding='utf-8') as f:
            dataset = json.load(f)
    except FileNotFoundError:
        logging.error(f"Dataset file not found: {dataset_path}")
        logging.error("Generate dataset first using: python create_test_dataset.py")
        raise
    except json.JSONDecodeError as e:
        logging.error(f"Invalid JSON in dataset file: {e}")
        raise

    logging.info(f"Loaded conversation dataset from {dataset_path}")
    # Log bucket information
    for bucket, conversations in dataset.items():
        logging.info(f" Bucket {bucket}: {len(conversations)} conversations")
    return dataset


def get_conversation_for_tokens(
    dataset: dict,
    target_tokens: int,
    request_index: int
) -> List[Dict]:
    """
    Get a conversation from the dataset for the given token count.

    Uses request_index to cycle through available conversations. When there is
    no bucket keyed exactly by target_tokens, the numerically closest bucket
    is used.

    Parameters:
        dataset: Mapping of bucket key (token count as a string) to conversations
        target_tokens: Desired prompt size in tokens
        request_index: Index used to pick a conversation (modulo bucket length)

    Returns:
        A new list of message dicts that does not end on an assistant turn

    Raises:
        ValueError: If the dataset or the selected bucket is empty, or a
            bucket key is not an integer string
    """
    bucket_key = str(target_tokens)
    if bucket_key not in dataset:
        if not dataset:
            # min() on an empty sequence would raise a far less helpful error
            raise ValueError("Conversation dataset contains no buckets")
        # Find closest bucket
        closest_bucket = min((int(k) for k in dataset), key=lambda x: abs(x - target_tokens))
        bucket_key = str(closest_bucket)
        logging.debug(f"No exact bucket for {target_tokens} tokens, using {closest_bucket}")

    conversations = dataset[bucket_key]
    if not conversations:
        raise ValueError(f"No conversations available for bucket {bucket_key}")

    # Cycle through conversations using request index
    conversation = conversations[request_index % len(conversations)]
    messages = list(conversation["messages"])  # copy: the dataset is shared

    # Ensure the conversation ends with a user message so the API can generate a response
    if messages and messages[-1].get("role") == "assistant":
        messages.append({"role": "user", "content": "Please continue."})
    return messages


# ============================================================================
# REQUEST PROCESSING
# ============================================================================


async def process_stream(stream, log_io: bool, io_logger, request_id: int) -> Tuple:
    """
    Process streaming response and capture metrics.

    Parameters:
        stream: Async iterable of streamed chat-completion chunks
        log_io: When True, reasoning text is accumulated for later logging
        io_logger: Unused here; accepted for signature compatibility
        request_id: Unused here; accepted for signature compatibility

    Returns:
        Tuple of (first_token_time, completion_tokens, prompt_tokens,
        content_tokens, reasoning_tokens, output_text, reasoning_text).
        first_token_time is the time.time() of the first non-empty content
        delta, or None if no content was received.

    Raises:
        Exception: Any error raised while iterating the stream is logged and
            re-raised
    """
    first_token_time = None
    content_parts: List[str] = []
    reasoning_parts: List[str] = []
    completion_tokens = 0
    prompt_tokens = 0
    content_tokens = 0
    reasoning_tokens = 0

    try:
        async for chunk in stream:
            choices = getattr(chunk, 'choices', None)
            if choices:
                delta = choices[0].delta
                reasoning_piece = getattr(delta, 'reasoning_content', None)
                if reasoning_piece and log_io:
                    # Reasoning text is only needed for the I/O log
                    reasoning_parts.append(reasoning_piece)
                content_piece = getattr(delta, 'content', None)
                if content_piece:
                    if first_token_time is None:
                        first_token_time = time.time()
                    content_parts.append(content_piece)

            # Get token counts from API
            usage = getattr(chunk, 'usage', None)
            if not usage:
                continue
            # A field that is present but None must not overwrite a count,
            # otherwise the integer comparisons below would fail.
            if getattr(usage, 'completion_tokens', None) is not None:
                completion_tokens = usage.completion_tokens
            if getattr(usage, 'prompt_tokens', None) is not None:
                prompt_tokens = usage.prompt_tokens
            details = getattr(usage, 'completion_tokens_details', None)
            if details:
                if getattr(details, 'reasoning_tokens', None) is not None:
                    reasoning_tokens = details.reasoning_tokens
                if getattr(details, 'content_tokens', None) is not None:
                    content_tokens = details.content_tokens
    except Exception as e:
        logging.error(f"Error processing stream: {e}")
        raise

    # Fallback calculation: derive whichever of content/reasoning was not reported
    if content_tokens == 0 and reasoning_tokens == 0 and completion_tokens > 0:
        content_tokens = completion_tokens
    elif reasoning_tokens == 0 and 0 < content_tokens < completion_tokens:
        reasoning_tokens = completion_tokens - content_tokens
    elif content_tokens == 0 and reasoning_tokens > 0 and completion_tokens > reasoning_tokens:
        content_tokens = completion_tokens - reasoning_tokens

    return (
        first_token_time,
        completion_tokens,
        prompt_tokens,
        content_tokens,
        reasoning_tokens,
        "".join(content_parts),
        "".join(reasoning_parts)
    )


def _build_messages(
    config: BenchmarkConfig,
    request_id: Optional[int],
    dataset: Optional[dict],
    text_content: Optional[str]
) -> List[Dict]:
    """Choose the request messages; priority is text_content > dataset > generated."""
    if text_content is not None:
        # Use provided text directly (ignores input_tokens, uses actual text length)
        return [{'role': 'user', 'content': text_content}]

    if dataset is not None:
        try:
            # request_id may be None; fall back to index 0 rather than failing on `%`
            return get_conversation_for_tokens(dataset, config.input_tokens, request_id or 0)
        except (ValueError, KeyError) as e:
            logging.warning(f"Failed to get conversation from dataset: {e}. Falling back to generated prompt.")

    # Generate synthetic prompt
    return [{'role': 'user', 'content': generate_prompt(config.input_tokens)}]


def _failed_result(
    config: BenchmarkConfig,
    start_time: float,
    end_time: float,
    elapsed_time: float,
    time_to_first_token: float,
    error_message: str
) -> RequestResult:
    """Build the RequestResult reported for a request that produced no tokens."""
    return RequestResult(
        total_tokens=0,
        content_tokens=0,
        reasoning_tokens=0,
        elapsed_time=elapsed_time,
        time_to_first_token=time_to_first_token,
        prompt_tokens=config.input_tokens,
        start_time=start_time,
        end_time=end_time,
        success=False,
        error_message=error_message
    )


async def make_request(
    client: AsyncOpenAI,
    config: BenchmarkConfig,
    model_name: str,
    request_timeout: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    request_id: Optional[int] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> Optional[RequestResult]:
    """
    Make a single request to the model.

    Parameters:
        client: OpenAI-compatible async client
        config: Benchmark configuration (input/output token targets)
        model_name: Model identifier sent with the request
        request_timeout: Timeout in seconds for the whole request
        log_io: When True (and io_logger is given), log messages and output
        io_logger: Logger that receives the request/response dump
        request_id: Identifier used in logs and to pick a dataset conversation
        dataset: Optional conversation dataset (see get_conversation_for_tokens)
        text_content: Optional literal prompt; takes priority over dataset
        stream: Use a streaming request when True, a single response otherwise

    Returns:
        A RequestResult. Timeouts, rate limits, other exceptions and responses
        without content tokens are reported with success=False rather than
        raised.
    """
    start_time = time.time()
    messages = _build_messages(config, request_id, dataset, text_content)

    if log_io and io_logger:
        io_logger.info(f"\n{'='*80}")
        io_logger.info(f"REQUEST {request_id} - Target: {config.input_tokens} tokens")
        io_logger.info(f"Model: {model_name}, Batch size: {config.batch_size}")
        if text_content:
            io_logger.info(f"Source: Custom text")
        elif dataset:
            io_logger.info(f"Source: Conversation dataset")
        else:
            io_logger.info(f"Source: Generated prompt")
        io_logger.info(f"{'='*80}")
        io_logger.info(f"MESSAGES:\n{json.dumps(messages, indent=2)}")
        io_logger.info(f"{'-'*80}")

    try:
        if stream:
            async def _stream_once() -> Tuple:
                # Opening the stream and consuming it share one timeout budget,
                # mirroring the non-streaming branch.
                stream_obj = await client.chat.completions.create(
                    model=model_name,
                    messages=messages,
                    max_tokens=config.output_tokens,
                    stream=True,
                    stream_options={"include_usage": True}
                )
                return await process_stream(stream_obj, log_io, io_logger, request_id)

            (
                first_token_time,
                completion_tokens,
                prompt_tokens,
                content_tokens,
                reasoning_tokens,
                output_text,
                reasoning_text
            ) = await asyncio.wait_for(_stream_once(), timeout=request_timeout)
            end_time = time.time()
            elapsed_time = end_time - start_time
            ttft = first_token_time - start_time if first_token_time else elapsed_time
        else:
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model=model_name,
                    messages=messages,
                    max_tokens=config.output_tokens,
                    stream=False
                ),
                timeout=request_timeout
            )
            end_time = time.time()
            elapsed_time = end_time - start_time
            ttft = elapsed_time  # No first-token event in non-streaming mode

            usage = response.usage
            completion_tokens = usage.completion_tokens if usage else 0
            prompt_tokens = usage.prompt_tokens if usage else 0
            content_tokens = completion_tokens
            reasoning_tokens = 0
            output_text = (response.choices[0].message.content or "") if response.choices else ""
            reasoning_text = ""
            details = getattr(usage, 'completion_tokens_details', None)
            if details and getattr(details, 'reasoning_tokens', None):
                reasoning_tokens = details.reasoning_tokens
                content_tokens = completion_tokens - reasoning_tokens

        if log_io and io_logger:
            if reasoning_text:
                io_logger.info(f"REASONING ({reasoning_tokens} tokens):\n{reasoning_text}")
                io_logger.info(f"{'-'*80}")
            io_logger.info(f"CONTENT ({content_tokens} tokens):\n{output_text}")
            io_logger.info(f"{'-'*80}")
            io_logger.info(f"TTFT: {ttft:.3f}s | Latency: {elapsed_time:.3f}s")
            # Guard the division: a zero elapsed time must not fail the request
            throughput = completion_tokens / elapsed_time if elapsed_time > 0 else 0
            io_logger.info(f"Throughput: {throughput:.2f} tok/s")
            io_logger.info(f"Token accuracy: target={config.input_tokens}, actual={prompt_tokens}")
            io_logger.info(f"{'='*80}\n")

        # Validate token counts
        if content_tokens == 0:
            logging.warning("Request completed but got no content tokens")
            return RequestResult(
                total_tokens=completion_tokens,
                content_tokens=content_tokens,
                reasoning_tokens=reasoning_tokens,
                elapsed_time=elapsed_time,
                time_to_first_token=ttft,
                prompt_tokens=prompt_tokens,
                start_time=start_time,
                end_time=end_time,
                success=False,
                error_message="No content tokens generated"
            )

        token_diff = abs(prompt_tokens - config.input_tokens)
        if token_diff > TOKEN_VALIDATION_THRESHOLD:
            logging.warning(
                f"Token count difference: target={config.input_tokens}, "
                f"actual={prompt_tokens}, diff={token_diff}"
            )

        return RequestResult(
            total_tokens=completion_tokens,
            content_tokens=content_tokens,
            reasoning_tokens=reasoning_tokens,
            elapsed_time=elapsed_time,
            time_to_first_token=ttft,
            prompt_tokens=prompt_tokens,
            start_time=start_time,
            end_time=end_time,
            success=True
        )

    except asyncio.TimeoutError:
        end_time = time.time()
        logging.warning(f"Request {request_id} timed out after {request_timeout}s")
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - TIMEOUT after {request_timeout}s\n")
        # A timed-out request is charged the full timeout as latency and TTFT
        return _failed_result(
            config, start_time, end_time,
            elapsed_time=request_timeout,
            time_to_first_token=request_timeout,
            error_message=f"Timeout after {request_timeout}s"
        )

    except RateLimitError as e:
        end_time = time.time()
        elapsed = end_time - start_time
        error_msg = f"429 Rate Limit: {str(e)}"
        logging.warning(f"Request {request_id} got 429 (rate limited) after {elapsed:.3f}s")
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - 429 RATE LIMITED after {elapsed:.3f}s\n")
        return _failed_result(
            config, start_time, end_time,
            elapsed_time=elapsed,
            time_to_first_token=0,
            error_message=error_msg
        )

    except Exception as e:
        # Boundary handler: one failed request must not abort the whole batch
        end_time = time.time()
        elapsed = end_time - start_time
        error_msg = f"{type(e).__name__}: {str(e)}"
        logging.error(f"Request {request_id} error after {elapsed:.3f}s: {error_msg}", exc_info=True)
        if log_io and io_logger:
            io_logger.info(f"REQUEST {request_id} - ERROR after {elapsed:.3f}s: {error_msg}\n")
        return _failed_result(
            config, start_time, end_time,
            elapsed_time=elapsed,
            time_to_first_token=0,
            error_message=error_msg
        )


async def make_batch_request(
    client: AsyncOpenAI,
    config: BenchmarkConfig,
    model_name: str,
    request_timeout: int,
    batch_size: int,
    batch_id: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> List[RequestResult]:
    """
    Make a batch of requests concurrently and return results.

    batch_size independent make_request coroutines are awaited together with
    asyncio.gather and their collective performance is logged.

    Parameters:
        client: OpenAI-compatible async client
        config: Benchmark configuration forwarded to every request
        model_name: Model identifier
        request_timeout: Per-request timeout in seconds
        batch_size: Number of requests in this batch
        batch_id: Index of this batch; request ids are batch_id * batch_size + i
        log_io: Enable request/response logging
        io_logger: Logger for the I/O dump
        dataset: Optional conversation dataset
        text_content: Optional literal prompt
        stream: Whether requests use streaming

    Returns:
        The request results, each tagged with batch_id and requests_in_batch
    """
    batch_start_time = time.time()

    if log_io and io_logger:
        io_logger.info(f"\n{'#'*80}")
        io_logger.info(f"BATCH {batch_id} - Sending {batch_size} requests")
        io_logger.info(f"Model: {model_name}, Input: {config.input_tokens} tokens")
        io_logger.info(f"{'#'*80}")

    # Create all requests up front, then run them concurrently
    tasks = [
        make_request(
            client=client,
            config=config,
            model_name=model_name,
            request_timeout=request_timeout,
            log_io=log_io,
            io_logger=io_logger,
            request_id=batch_id * batch_size + i,
            dataset=dataset,
            text_content=text_content,
            stream=stream
        )
        for i in range(batch_size)
    ]
    results = await asyncio.gather(*tasks)

    batch_elapsed = time.time() - batch_start_time

    # Add batch metadata to results
    enhanced_results = []
    for result in results:
        if result:
            result.batch_id = batch_id
            result.requests_in_batch = batch_size
            enhanced_results.append(result)

    successful = sum(1 for r in enhanced_results if r.success)
    total_tokens = sum(r.total_tokens for r in enhanced_results if r.success)
    batch_throughput = total_tokens / batch_elapsed if batch_elapsed > 0 else 0

    if log_io and io_logger:
        io_logger.info(f"\n{'#'*80}")
        io_logger.info(f"BATCH {batch_id} COMPLETE")
        io_logger.info(f" Duration: {batch_elapsed:.3f}s")
        io_logger.info(f" Successful: {successful}/{batch_size}")
        io_logger.info(f" Batch Throughput: {batch_throughput:.2f} tokens/s")
        io_logger.info(f"{'#'*80}\n")

    logging.info(
        f"Batch {batch_id}: {successful}/{batch_size} successful, "
        f"{batch_elapsed:.2f}s, {batch_throughput:.2f} tok/s"
    )
    return enhanced_results


# ============================================================================
# STATISTICS CALCULATION
# ============================================================================


def calculate_statistics(results: List[RequestResult], config: BenchmarkConfig) -> Dict:
    """
    Calculate aggregate statistics from benchmark results.

    Parameters:
        results: All request results of one benchmark configuration
        config: The configuration that produced them

    Returns:
        A dict with 'config', 'success_metrics', 'latency', 'ttft', 'tokens'
        and 'throughput' sections (plus 'batch_metrics' when results carry a
        batch_id). When no request succeeded, only
        {'success_rate': 0, 'error': 'No successful requests'} is returned.
    """
    import numpy as np

    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    if not successful:
        return {'success_rate': 0, 'error': 'No successful requests'}

    success_rate = len(successful) / len(results) * 100

    # Latency stats
    latencies = [r.elapsed_time for r in successful]
    avg_lat = float(np.mean(latencies))
    std_lat = float(np.std(latencies))
    margin = float(CONFIDENCE_95_Z_SCORE * std_lat / np.sqrt(len(successful)))

    # TTFT stats (zero/None values are excluded)
    ttft_values = [r.time_to_first_token for r in successful if r.time_to_first_token]

    # Throughput calculation over the wall-clock span of successful requests
    actual_start = min(r.start_time for r in successful)
    actual_end = max(r.end_time for r in successful)
    wall_time = actual_end - actual_start
    total_output_tokens = sum(r.total_tokens for r in successful)
    total_content_tokens = sum(r.content_tokens for r in successful)
    concurrent_throughput = total_output_tokens / wall_time if wall_time > 0 else 0
    content_throughput = total_content_tokens / wall_time if wall_time > 0 else 0

    # Efficiency: achieved throughput relative to batch_size * mean per-request rate
    avg_per_request = float(np.mean([r.tokens_per_second for r in successful]))
    theoretical_max = config.batch_size * avg_per_request
    efficiency = min((concurrent_throughput / theoretical_max * 100) if theoretical_max > 0 else 0, 100)

    # Batch metrics
    batch_metrics = None
    unique_batches = {r.batch_id for r in successful if r.batch_id is not None}
    if unique_batches:
        batch_sizes = {}
        batch_throughputs = {}
        for batch_id in unique_batches:
            batch_results = [r for r in successful if r.batch_id == batch_id]
            if batch_results:
                batch_start = min(r.start_time for r in batch_results)
                batch_end = max(r.end_time for r in batch_results)
                batch_time = batch_end - batch_start
                batch_tokens = sum(r.total_tokens for r in batch_results)
                batch_sizes[batch_id] = len(batch_results)
                batch_throughputs[batch_id] = batch_tokens / batch_time if batch_time > 0 else 0

        throughput_values = list(batch_throughputs.values())
        batch_metrics = {
            'num_batches': len(unique_batches),
            'avg_batch_size': round(float(np.mean(list(batch_sizes.values()))), 2),
            'avg_batch_throughput': round(float(np.mean(throughput_values)), 2),
            'min_batch_throughput': round(float(np.min(throughput_values)), 2),
            'max_batch_throughput': round(float(np.max(throughput_values)), 2),
        }

    stats_dict = {
        'config': {
            'input_tokens': config.input_tokens,
            'output_tokens': config.output_tokens,
            'batch_size': config.batch_size,
            'num_batches': config.num_batches,
            'total_requests': len(results),
            'actual_input_tokens': round(float(np.mean([r.prompt_tokens for r in successful])))
        },
        'success_metrics': {
            'success_rate': round(success_rate, 2),
            'successful_requests': len(successful),
            'failed_requests': len(failed)
        },
        'latency': {
            'mean': round(avg_lat, 3),
            'std': round(std_lat, 3),
            'min': round(float(np.min(latencies)), 3),
            'max': round(float(np.max(latencies)), 3),
            'p50': round(float(np.percentile(latencies, 50)), 3),
            'p95': round(float(np.percentile(latencies, 95)), 3),
            'p99': round(float(np.percentile(latencies, 99)), 3),
            'ci_95_lower': round(avg_lat - margin, 3),
            'ci_95_upper': round(avg_lat + margin, 3)
        },
        'ttft': {
            'mean': round(float(np.mean(ttft_values)), 3) if ttft_values else None,
            'std': round(float(np.std(ttft_values)), 3) if ttft_values else None,
            'p50': round(float(np.percentile(ttft_values, 50)), 3) if ttft_values else None,
            'p90': round(float(np.percentile(ttft_values, 90)), 3) if ttft_values else None
        },
        'tokens': {
            'total_generated': total_output_tokens,
            'content_tokens': total_content_tokens,
            'reasoning_tokens': sum(r.reasoning_tokens for r in successful),
            'avg_per_request': round(total_output_tokens / len(successful), 2)
        },
        'throughput': {
            'concurrent_total_tps': round(concurrent_throughput, 2),
            'concurrent_content_tps': round(content_throughput, 2),
            'requests_per_second': round(len(successful) / wall_time, 2) if wall_time > 0 else 0,
            'actual_wall_time': round(wall_time, 3),
            'efficiency_percent': round(efficiency, 2)
        }
    }

    # Add batch metrics if available
    if batch_metrics:
        stats_dict['batch_metrics'] = batch_metrics

    return stats_dict


# ============================================================================
# HTTP CLIENT
# ============================================================================


def create_http_client(concurrent_requests: int, request_timeout: int) -> httpx.AsyncClient:
    """
    Create HTTP client configured for high concurrency.

    Parameters:
        concurrent_requests: Number of requests expected to be in flight
        request_timeout: Default and read timeout in seconds

    Returns:
        An httpx.AsyncClient whose pool allows
        max(concurrent_requests * CONNECTION_MULTIPLIER, MIN_HTTP_CONNECTIONS)
        connections. The caller is responsible for closing it.
    """
    max_connections = max(concurrent_requests * CONNECTION_MULTIPLIER, MIN_HTTP_CONNECTIONS)
    logging.info(f"Creating HTTP client with max_connections={max_connections}")

    pool_limits = httpx.Limits(
        max_keepalive_connections=max_connections,
        max_connections=max_connections,
        keepalive_expiry=1800
    )
    timeouts = httpx.Timeout(
        timeout=request_timeout,
        connect=1800.0,
        read=request_timeout,
        write=1800.0,
        pool=5.0
    )
    return httpx.AsyncClient(
        # http2=True,
        limits=pool_limits,
        timeout=timeouts
    )


# ============================================================================
# MODEL INITIALIZATION
# ============================================================================


async def wait_for_model(
    endpoint_url: str,
    api_key: str,
    model_name: str,
    max_retries: int = 10,
    retry_delay: int = 30
) -> bool:
    """
    Wait for model to be ready.

    Sends a tiny non-streaming chat completion up to max_retries times. An
    error whose message contains one of the "still starting" keywords is
    retried after retry_delay seconds; any other error aborts immediately.

    Parameters:
        endpoint_url: Base URL of the OpenAI-compatible endpoint
        api_key: API key for the endpoint
        model_name: Model identifier to probe
        max_retries: Maximum number of probe attempts
        retry_delay: Seconds to sleep between attempts

    Returns:
        True once a probe succeeds, False otherwise
    """
    logging.info(f"Waiting for model '{model_name}' to initialize...")
    startup_keywords = ('initializing', 'loading', 'starting', 'not ready', 'unavailable')

    http_client = create_http_client(1, 60)
    client = AsyncOpenAI(base_url=endpoint_url, api_key=api_key, http_client=http_client, max_retries=0)

    try:
        for attempt in range(1, max_retries + 1):
            try:
                logging.info(f"Initialization check {attempt}/{max_retries}...")
                await client.chat.completions.create(
                    model=model_name,
                    messages=[{'role': 'user', 'content': 'Hello'}],
                    max_tokens=10,
                    stream=False
                )
            except Exception as e:
                error_msg = str(e).lower()
                if not any(kw in error_msg for kw in startup_keywords):
                    # Not a start-up condition: retrying would not help
                    logging.error(f"Error: {e}")
                    return False
                if attempt >= max_retries:
                    logging.error(f"Model failed to initialize after {max_retries} attempts")
                    return False
                logging.info(f"Model initializing... waiting {retry_delay}s")
                await asyncio.sleep(retry_delay)
            else:
                logging.info("Model is ready!")
                return True
    finally:
        await http_client.aclose()

    # Reached only when max_retries < 1 and no probe was attempted
    return False


# ============================================================================
# BENCHMARK EXECUTION
# ============================================================================


async def run_benchmark(
    config: BenchmarkConfig,
    endpoint_url: str,
    api_key: str,
    model_name: str,
    request_timeout: int,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> Optional[Dict]:
    """
    Run a single benchmark configuration in batch mode.

    Sends config.num_batches batches of config.batch_size concurrent requests,
    one batch after another, then aggregates the results.

    Parameters:
        config: The benchmark configuration to run
        endpoint_url: Base URL of the OpenAI-compatible endpoint
        api_key: API key for the endpoint
        model_name: Model identifier
        request_timeout: Per-request timeout in seconds
        log_io: Enable request/response logging
        io_logger: Logger for the I/O dump
        dataset: Optional conversation dataset
        text_content: Optional literal prompt
        stream: Whether requests use streaming

    Returns:
        The dict produced by calculate_statistics, or None when no results
        were collected
    """
    total_requests = config.num_batches * config.batch_size

    logging.info(f"\n{'='*60}")
    logging.info(f"Starting benchmark: {config.num_batches} batches x "
                 f"{config.batch_size} requests/batch = {total_requests} total")
    logging.info(f"Target: {config.input_tokens} tokens, Output: {config.output_tokens} tokens")
    if text_content:
        logging.info(f"Using custom text input")
    elif dataset:
        logging.info(f"Using conversation dataset")
    logging.info(f"{'='*60}")

    http_client = create_http_client(config.batch_size, request_timeout)

    try:
        client = AsyncOpenAI(base_url=endpoint_url, api_key=api_key, http_client=http_client, max_retries=0)
        results = []
        start_time = time.time()

        # Send batches of requests
        for batch_id in range(config.num_batches):
            batch_start = time.time()
            logging.info(f" Starting batch {batch_id + 1}/{config.num_batches} "
                         f"({config.batch_size} simultaneous requests)...")

            batch_results = await make_batch_request(
                client=client,
                config=config,
                model_name=model_name,
                request_timeout=request_timeout,
                batch_size=config.batch_size,
                batch_id=batch_id,
                log_io=log_io,
                io_logger=io_logger,
                dataset=dataset,
                text_content=text_content,
                stream=stream
            )
            results.extend(batch_results)

            # Log batch completion with detailed metrics
            batch_time = time.time() - batch_start
            ok_results = [r for r in batch_results if r.success]
            successful = len(ok_results)
            failed = len(batch_results) - successful
            batch_throughput = sum(r.total_tokens for r in ok_results) / batch_time if batch_time > 0 else 0

            # Calculate average TTFT for this batch
            ttfts = [r.time_to_first_token for r in ok_results if r.time_to_first_token]
            avg_ttft = sum(ttfts) / len(ttfts) if ttfts else 0

            # Calculate average latency for this batch
            latencies = [r.elapsed_time for r in ok_results]
            avg_latency = sum(latencies) / len(latencies) if latencies else 0

            status_icon = "✓" if failed == 0 else "⚠"
            logging.info(f"{status_icon} Batch {batch_id + 1}/{config.num_batches} complete: "
                         f"{successful}/{len(batch_results)} successful "
                         f"| {batch_time:.2f}s total "
                         f"| {batch_throughput:.0f} tok/s "
                         f"| TTFT: {avg_ttft:.3f}s "
                         f"| Latency: {avg_latency:.2f}s")

            if failed > 0:
                logging.warning(f" {failed} request(s) failed in batch {batch_id + 1}")

            # Small delay between batches to avoid overwhelming the system
            if batch_id < config.num_batches - 1:
                await asyncio.sleep(0.1)

        end_time = time.time()

        if not results:
            logging.error("No results collected!")
            return None

        stats = calculate_statistics(results, config)

        logging.info(f"✓ Benchmark complete in {end_time - start_time:.2f}s")
        if 'success_metrics' in stats:
            logging.info(f" Success: {stats['success_metrics']['success_rate']}% "
                         f"({stats['success_metrics']['successful_requests']}/{total_requests})")
            logging.info(f" P95 Latency: {stats['latency']['p95']}s")
            logging.info(f" Throughput: {stats['throughput']['concurrent_total_tps']} tokens/s")
            if 'batch_metrics' in stats:
                logging.info(f" Avg Batch Throughput: {stats['batch_metrics']['avg_batch_throughput']} tokens/s")
        else:
            logging.error(f" All {total_requests} requests failed. Error: {stats.get('error', 'unknown')}")

        return stats

    finally:
        await http_client.aclose()


async def run_all_benchmarks(
    configs: List[BenchmarkConfig],
    endpoint_url: str,
    api_key: str,
    model_name: str,
    request_timeout: int,
    delay_between_runs: int = 5,
    log_io: bool = False,
    io_logger: Optional[logging.Logger] = None,
    wait_for_ready: bool = True,
    max_init_retries: int = 10,
    init_retry_delay: int = 30,
    dataset: Optional[dict] = None,
    text_content: Optional[str] = None,
    stream: bool = True
) -> List[Dict]:
    """
    Run all benchmark configurations.

    Parameters:
        configs: Configurations to run, in order
        endpoint_url: Base URL of the OpenAI-compatible endpoint
        api_key: API key for the endpoint
        model_name: Model identifier
        request_timeout: Per-request timeout in seconds
        delay_between_runs: Seconds to sleep between configurations
        log_io: Enable request/response logging
        io_logger: Logger for the I/O dump
        wait_for_ready: Probe the model with wait_for_model before starting
        max_init_retries: Maximum readiness probes
        init_retry_delay: Seconds between readiness probes
        dataset: Optional conversation dataset
        text_content: Optional literal prompt
        stream: Whether requests use streaming

    Returns:
        The statistics dict of every configuration that returned one; an
        empty list when the readiness check fails
    """
    if wait_for_ready:
        if not await wait_for_model(endpoint_url, api_key, model_name, max_init_retries, init_retry_delay):
            logging.error("Model initialization failed. Aborting.")
            return []

    all_results = []
    # Planned request count for each entry of all_results. Needed because a
    # run in which every request failed yields a stats dict without 'config'.
    planned_requests = []

    for i, config in enumerate(configs, 1):
        logging.info(f"\n--- Benchmark {i}/{len(configs)} ---")

        result = await run_benchmark(
            config, endpoint_url, api_key, model_name, request_timeout,
            log_io, io_logger, dataset, text_content, stream
        )

        if result:
            all_results.append(result)
            planned_requests.append(config.num_batches * config.batch_size)
        else:
            logging.error(f"Benchmark {i} failed")

        if i < len(configs):
            logging.info(f"Waiting {delay_between_runs}s before next run...")
            await asyncio.sleep(delay_between_runs)

    # Log summary
    if all_results:
        successful = len(all_results)
        total = len(configs)

        logging.info(f"\n{'='*60}")
        logging.info(f"ALL BENCHMARKS COMPLETE")
        logging.info(f"{'='*60}")
        logging.info(f"Completed: {successful}/{total} benchmarks")

        # Overall stats. All-failed runs carry neither 'config' nor
        # 'success_metrics', so index defensively instead of raising KeyError.
        total_requests = sum(
            r['config']['total_requests'] if 'config' in r else planned
            for r, planned in zip(all_results, planned_requests)
        )
        total_successful = sum(
            r.get('success_metrics', {}).get('successful_requests', 0)
            for r in all_results
        )
        overall_success_rate = (total_successful / total_requests * 100) if total_requests > 0 else 0

        logging.info(f"Total requests: {total_requests}")
        logging.info(f"Successful: {total_successful} ({overall_success_rate:.1f}%)")
        logging.info(f"{'='*60}")

    return all_results


# ============================================================================
# FILE I/O
# ============================================================================


def create_results_directory(model_name: str) -> Tuple[Path, str]:
    """
    Create results directory.

    For names containing 'ubiops-deployment/' with at least four '/'-separated
    parts, only the last part is used. Every character that is not
    alphanumeric, '-' or '_' is replaced by '_'.

    Parameters:
        model_name: Model identifier as passed to the API

    Returns:
        (results_dir, safe_name) where results_dir is
        'results/results_<safe_name>' relative to the working directory
    """
    if 'ubiops-deployment/' in model_name:
        parts = model_name.split('/')
        if len(parts) >= 4:
            model_name = parts[-1]

    safe_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in model_name)
    results_dir = Path('results') / f'results_{safe_name}'
    results_dir.mkdir(parents=True, exist_ok=True)

    logging.info(f"Results directory: {results_dir}")
    return results_dir, safe_name


def save_results(results: List[Dict], results_dir: Path, model_name: str) -> Path:
    """
    Save benchmark results to JSON.

    Parameters:
        results: Statistics dicts, one per benchmark configuration
        results_dir: Directory in which 'benchmark_results.json' is written
        model_name: Model identifier stored alongside the results

    Returns:
        Path of the written file
    """
    output_path = results_dir / 'benchmark_results.json'
    output_data = {
        'timestamp': datetime.now().isoformat(),
        'model_name': model_name,
        'results': results
    }

    # Explicit encoding: do not depend on the platform default
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2)

    logging.info(f"Results saved: {output_path}")
    return output_path


def load_config_file(config_path: str) -> Dict:
    """
    Load configuration from YAML file.

    Parameters:
        config_path: Path to the YAML file

    Returns:
        The parsed document, or an empty dict when the file is empty
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty document
        return yaml.safe_load(f) or {}


def save_config_copy(config_data: Dict, results_dir: Path) -> None:
    """
    Save sanitized config copy.

    Writes 'config_used.yaml' into results_dir with endpoint.api_key blanked.
    The caller's config_data is left unmodified.

    Parameters:
        config_data: Configuration as loaded from the YAML file
        results_dir: Directory in which the copy is written
    """
    import copy

    # Sanitize a deep copy so the API key in the caller's dict survives
    sanitized = copy.deepcopy(config_data)
    endpoint = sanitized.get('endpoint')
    if isinstance(endpoint, dict) and 'api_key' in endpoint:
        endpoint['api_key'] = ''

    config_path = results_dir / 'config_used.yaml'
    with open(config_path, 'w', encoding='utf-8') as f:
        yaml.dump(sanitized, f, default_flow_style=False, sort_keys=False)

    logging.info(f"Config saved: {config_path}")


# ============================================================================
# CLI INTERFACE
# ============================================================================


def parse_args(argv: Optional[List[str]] = None):
    """
    Parse command-line arguments.

    Parameters:
        argv: Argument list to parse; None (the default) parses sys.argv[1:]

    Returns:
        The argparse.Namespace of parsed options
    """
    parser = argparse.ArgumentParser(
        description="Benchmark LLM deployments via OpenAI-compatible endpoints",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Using config file
  python benchmark_llm.py --config benchmark_config.yaml

  # Using CLI arguments
  python benchmark_llm.py --endpoint_url https://api.example.com/v1 \\
      --api_key YOUR_KEY \\
      --model_name gpt-4 \\
      --input_tokens 1000 5000 10000 \\
      --batch_sizes 1 8 16 32
        """
    )

    parser.add_argument('--config', type=str, help="Path to YAML config file")
    parser.add_argument('--endpoint_url', type=str, help="OpenAI-compatible endpoint URL")
    parser.add_argument('--api_key', type=str, help="API key")
    parser.add_argument('--model_name', type=str, help="Model name")
    parser.add_argument('--input_tokens', type=int, nargs='+', default=[1000, 5000, 10000],
                        help="Input token counts to test")
    parser.add_argument('--batch_sizes', type=int, nargs='+', default=[16, 32, 64, 128],
                        help="Batch sizes to test (number of simultaneous requests)")
    parser.add_argument('--num_batches', type=int, default=10,
                        help="Number of batches to send per configuration")
    parser.add_argument('--output_tokens', type=int, default=256, help="Output tokens per request")
    parser.add_argument('--dataset', type=str, default=None,
                        help="Path to conversation dataset JSON file (optional)")
    parser.add_argument('--text', type=str, default=None,
                        help="Custom text to use as input for all requests (same text repeated)")
    parser.add_argument('--request_timeout', type=int, default=900, help="Request timeout (seconds)")
    parser.add_argument('--delay_between_runs', type=int, default=5, help="Delay between runs (seconds)")
    parser.add_argument('--no_stream', action='store_true',
                        help="Use non-streaming requests instead of streaming")
    parser.add_argument('--log_io', action='store_true', help="Log all input/output")
    parser.add_argument('--skip_init_wait', action='store_true', help="Skip model initialization wait")
    parser.add_argument('--max_init_retries', type=int, default=10, help="Max initialization retries")
    parser.add_argument('--init_retry_delay', type=int, default=30, help="Delay between init retries")

    return parser.parse_args(argv)


def generate_configs(
    input_tokens: List[int],
    batch_sizes: List[int],
    num_batches: int,
    output_tokens: int
) -> "List[BenchmarkConfig]":
    """
    Generate benchmark configurations.

    Parameters:
        input_tokens: Input token counts to test
        batch_sizes: Batch sizes to test
        num_batches: Number of batches per configuration
        output_tokens: Output tokens per request

    Returns:
        One BenchmarkConfig per (input_tokens, batch_size) pair, with input
        token counts varying slowest
    """
    return [
        BenchmarkConfig(
            input_tokens=input_tok,
            batch_size=batch_size,
            num_batches=num_batches,
            output_tokens=output_tokens
        )
        for input_tok in input_tokens
        for batch_size in batch_sizes
    ]


# ============================================================================
# MAIN
# ============================================================================


async def main():
    """Main entry point."""
    args = parse_args()

    # Load configuration
    if args.config:
        config_data = load_config_file(args.config)
        endpoint_url = config_data['endpoint']['url']
        api_key = config_data['endpoint']['api_key']
        model_name = config_data['endpoint']['model_name']

        bench_config = config_data['benchmark']
        input_tokens = bench_config['input_tokens']
        batch_sizes = bench_config['batch_sizes']
        num_batches = bench_config['num_batches']
        output_tokens = bench_config['output_tokens']
        dataset_path = bench_config.get('dataset', None)
        custom_text = bench_config.get('text', None)

        runtime = config_data.get('runtime', {})
        request_timeout = runtime.get('request_timeout', 900)
        delay_between_runs = runtime.get('delay_between_runs', 5)
        log_io = runtime.get('log_io', False)
        stream = runtime.get('stream', True)
        wait_for_ready = runtime.get('wait_for_ready', True)
        max_init_retries = runtime.get('max_init_retries', 10)
        init_retry_delay = runtime.get('init_retry_delay', 30)
    else:
        # Use CLI arguments
        if not all([args.endpoint_url, args.api_key, args.model_name]):
            logging.error("Must provide --config or all of --endpoint_url, --api_key, --model_name")
            return

        config_data = None
        endpoint_url = args.endpoint_url
        api_key = args.api_key
        model_name = args.model_name
        input_tokens = args.input_tokens
        batch_sizes = args.batch_sizes
        num_batches = args.num_batches
        output_tokens = args.output_tokens
        dataset_path = args.dataset
        custom_text = args.text
request_timeout = args.request_timeout delay_between_runs = args.delay_between_runs log_io = args.log_io stream = not args.no_stream wait_for_ready = not args.skip_init_wait max_init_retries = args.max_init_retries init_retry_delay = args.init_retry_delay # Load conversation dataset if provided dataset = None if dataset_path: try: dataset = load_conversation_dataset(dataset_path) except Exception as e: logging.error(f"Failed to load dataset: {e}") logging.error("Continuing with generated prompts...") # Use custom text if provided (takes priority over everything) text_content = custom_text # Create results directory results_dir, safe_name = create_results_directory(model_name) io_logger = setup_logging(results_dir, log_io) if config_data: save_config_copy(config_data, results_dir) # Generate benchmark configurations configs = generate_configs(input_tokens, batch_sizes, num_batches, output_tokens) logging.info(f"\n{'='*60}") logging.info(f"BENCHMARK CONFIGURATION") logging.info(f"{'='*60}") logging.info(f"Model: {model_name}") logging.info(f"Endpoint: {endpoint_url}") logging.info(f"Mode: BATCH ({'streaming' if stream else 'non-streaming'})") if text_content: logging.info(f"Input: Custom text ({len(text_content)} chars)") elif dataset: logging.info(f"Input: {dataset_path} (real conversations)") else: logging.info(f"Input: Generated prompts") logging.info(f"Total configurations: {len(configs)}") logging.info(f"Input tokens: {input_tokens}") logging.info(f"Batch sizes: {batch_sizes}") logging.info(f"Batches per config: {num_batches}") logging.info(f"Output tokens: {output_tokens}") logging.info(f"{'='*60}\n") # Run benchmarks results = await run_all_benchmarks( configs=configs, endpoint_url=endpoint_url, api_key=api_key, model_name=model_name, request_timeout=request_timeout, delay_between_runs=delay_between_runs, log_io=log_io, io_logger=io_logger, wait_for_ready=wait_for_ready, max_init_retries=max_init_retries, init_retry_delay=init_retry_delay, dataset=dataset, 
text_content=text_content, stream=stream ) if results: results_path = save_results(results, results_dir, model_name) logging.info(f"\n{'='*60}") logging.info("BENCHMARK COMPLETE!") logging.info(f"{'='*60}") logging.info(f"Results: {results_path}") if log_io: logging.info(f"I/O log: {results_dir / 'benchmark_io.log'}") logging.info(f"\nVisualize: python visualize_results.py --input {results_path}") else: logging.error("No results generated") if __name__ == '__main__': asyncio.run(main())