From ad2cd3373f7d0ef115fc526453494c07756b613a Mon Sep 17 00:00:00 2001 From: Eugene Rakhmatulin Date: Wed, 25 Mar 2026 14:18:00 -0700 Subject: [PATCH] .env configuration support for launch-cluster.sh --- .env.example | 29 +++ launch-cluster.sh | 138 +++++++++++ run-recipe.py | 612 +++++++++++++++++++++++++++++----------------- 3 files changed, 548 insertions(+), 231 deletions(-) create mode 100644 .env.example diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..2a3acbb --- /dev/null +++ b/.env.example @@ -0,0 +1,29 @@ +# Example .env configuration file for spark-vllm-docker +# Copy this file to .env and customize for your environment + +# Cluster configuration +# CLUSTER_NODES: Comma-separated list of node IPs (first node is the head node) +CLUSTER_NODES="192.168.1.1,192.168.1.2,192.168.1.3" + +# ETH_IF: Ethernet interface name (optional, auto-detected if not specified) +ETH_IF="eth0" + +# IB_IF: InfiniBand interface name (optional, auto-detected if not specified) +IB_IF="ib0" + +# MASTER_PORT: Port for cluster coordination (default: 29501) +MASTER_PORT="29501" + +# CONTAINER_NAME: Container name (default: vllm_node) +CONTAINER_NAME="vllm_node" + +# Container environment variables +# Any variable starting with CONTAINER_ will be converted to -e flags +# Example: CONTAINER_NCCL_DEBUG=INFO becomes -e NCCL_DEBUG=INFO +CONTAINER_NCCL_DEBUG="INFO" +CONTAINER_HF_TOKEN="your_huggingface_token_here" +CONTAINER_NCCL_IGNORE_CPU_AFFINITY="1" + +# Additional container environment variables +# CONTAINER_MAX_JOBS="16" +# CONTAINER_CUDA_VISIBLE_DEVICES="0,1" diff --git a/launch-cluster.sh b/launch-cluster.sh index f11ab11..1f6bfb9 100755 --- a/launch-cluster.sh +++ b/launch-cluster.sh @@ -30,6 +30,7 @@ MOD_PATHS=() MOD_TYPES=() LAUNCH_SCRIPT_PATH="" SCRIPT_DIR="$(dirname "$(realpath "$0")")" +CONFIG_FILE="" # Will be set to default after argument parsing ACTIONS_ARG="" SOLO_MODE="false" @@ -67,9 +68,27 @@ usage() { echo " --mem-swap-limit-gb Memory+swap limit in GB (default: mem-limit + 10, only with --non-privileged)" echo " --pids-limit Process limit (default: 4096, only with --non-privileged)" echo " --shm-size-gb Shared memory size in GB (default: 64, only with --non-privileged)" + echo " --config Path to .env configuration file (default: .env in script directory)" echo " action start | stop | status | exec (Default: start). Not compatible with --launch-script." echo " command Command to run (only for 'exec' action). Not compatible with --launch-script." 
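+  echo ""
+  echo "Config File Usage:"
+  echo "  $0 --config /path/to/custom.env start    # Load settings from a specific file"
+  echo "  $0 start                                  # Uses .env next to this script, if present"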
echo "" + echo "Supported .env file variables:" + echo " CLUSTER_NODES Comma-separated list of node IPs" + echo " ETH_IF Ethernet interface name" + echo " IB_IF InfiniBand interface name" + echo " MASTER_PORT Port for cluster coordination (default: 29501)" + echo " CONTAINER_NAME Container name (default: vllm_node)" + echo " CONTAINER_* Any variable starting with CONTAINER_ becomes -e flag" + echo " Example: CONTAINER_NCCL_DEBUG=INFO -> -e NCCL_DEBUG=INFO" + echo "" + echo "Example .env file:" + echo " CLUSTER_NODES=192.168.1.1,192.168.1.2" + echo " ETH_IF=eth0" + echo " IB_IF=ib0" + echo " MASTER_PORT=29501" + echo " CONTAINER_NCCL_DEBUG=INFO" + echo " CONTAINER_HF_TOKEN=abc123" + echo "" echo "Launch Script Usage:" echo " $0 --launch-script examples/my-script.sh # Script copied to container and executed" echo " $0 --launch-script /path/to/script.sh # Uses absolute path to script" @@ -108,6 +127,7 @@ while [[ "$#" -gt 0 ]]; do --shm-size-gb) SHM_SIZE_GB="$2"; shift ;; -d) DAEMON_MODE="true" ;; -h|--help) usage ;; + --config) CONFIG_FILE="$2"; shift ;; start|stop|status) if [[ -n "$LAUNCH_SCRIPT_PATH" ]]; then echo "Error: Action '$1' is not compatible with --launch-script. Please omit the action or not use --launch-script." @@ -133,6 +153,108 @@ while [[ "$#" -gt 0 ]]; do shift done +# Set .env file path (use default if not specified) +if [[ -z "$CONFIG_FILE" ]]; then + CONFIG_FILE="$SCRIPT_DIR/.env" +fi + +# Load .env file if exists +if [[ -f "$CONFIG_FILE" ]]; then + echo "Loading configuration from .env file..." + + # Validate .env file syntax + if ! python3 -c " +import sys +import re + +env_file = '$CONFIG_FILE' +seen_keys = set() + +with open(env_file, 'r') as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + # Skip empty lines and comments + if not line or line.startswith('#'): + continue + + # Check for key=value format + if '=' not in line: + print(f'Error: Invalid syntax at line {line_num}: missing \"=\"') + sys.exit(1) + + key = line.split('=', 1)[0].strip() + + # Validate key format (alphanumeric + underscore) + if not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', key): + print(f'Error: Invalid key format at line {line_num}: {key}') + sys.exit(1) + + # Check for duplicates + if key in seen_keys: + print(f'Error: Duplicate key at line {line_num}: {key}') + sys.exit(1) + + seen_keys.add(key) + +sys.exit(0) +" 2>/dev/null; then + echo "Error: Invalid .env file syntax. Aborting." 
+    exit 1
+  fi
+
+  # Load .env variables with DOTENV_ prefix
+  while IFS='=' read -r key value || [[ -n "$key" ]]; do
+    # Skip comments and empty lines
+    [[ "$key" =~ ^[[:space:]]*# ]] && continue
+    [[ -z "$key" ]] && continue
+
+    # Remove leading/trailing whitespace from key
+    key=$(echo "$key" | xargs)
+
+    # Skip if key is empty after trimming
+    [[ -z "$key" ]] && continue
+
+    # Trim whitespace and strip matching surrounding quotes in pure bash.
+    # (Interpolating $value into an inline Python snippet would break on
+    # embedded quotes or backslashes and invites command injection.)
+    value="${value#"${value%%[![:space:]]*}"}"
+    value="${value%"${value##*[![:space:]]}"}"
+    if [[ ${#value} -ge 2 ]]; then
+      case "$value" in
+        \"*\") value="${value#\"}"; value="${value%\"}" ;;
+        \'*\') value="${value#\'}"; value="${value%\'}" ;;
+      esac
+    fi
+
+    # Export with DOTENV_ prefix
+    export "DOTENV_$key=$value"
+  done < "$CONFIG_FILE"
+
+  echo "Loaded .env variables: $(compgen -v DOTENV_ | tr '\n' ' ')"
+fi
+
+# Apply .env configuration. Explicit CLI args take precedence. For MASTER_PORT
+# and CONTAINER_NAME, the .env value also replaces the built-in default, since
+# a CLI value equal to the default cannot be distinguished from an unset one.
+if [[ -z "$NODES_ARG" && -n "$DOTENV_CLUSTER_NODES" ]]; then
+  NODES_ARG="$DOTENV_CLUSTER_NODES"
+fi
+
+if [[ -z "$ETH_IF" && -n "$DOTENV_ETH_IF" ]]; then
+  ETH_IF="$DOTENV_ETH_IF"
+fi
+
+if [[ -z "$IB_IF" && -n "$DOTENV_IB_IF" ]]; then
+  IB_IF="$DOTENV_IB_IF"
+fi
+
+if [[ -z "$MASTER_PORT" || "$MASTER_PORT" == "29501" ]] && [[ -n "$DOTENV_MASTER_PORT" ]]; then
+  MASTER_PORT="$DOTENV_MASTER_PORT"
+fi
+
+if [[ -z "$CONTAINER_NAME" || "$CONTAINER_NAME" == "vllm_node" ]] && [[ -n "$DOTENV_CONTAINER_NAME" ]]; then
+  CONTAINER_NAME="$DOTENV_CONTAINER_NAME"
+fi
+
 # Validate non-privileged mode flags
 if [[ "$NON_PRIVILEGED_MODE" == "true" ]]; then
   # Set default swap limit if not specified
@@ -163,6 +285,22 @@ if [[ -n "$NCCL_DEBUG_VAL" ]]; then
   esac
 fi
 
+# Add container environment variables from .env (CONTAINER_* pattern)
+for env_var in $(compgen -v DOTENV_CONTAINER_); do
+  # Get the value
+  value="${!env_var}"
+
+  # Extract the actual env var name (remove DOTENV_CONTAINER_ prefix)
+  actual_var="${env_var#DOTENV_CONTAINER_}"
+
+  # Quote the value for shell safety with printf %q (interpolating it into a
+  # python3 shlex.quote one-liner would break on embedded quotes)
+  escaped_value=$(printf '%q' "$value")
+
+  # Add to docker args
+  DOCKER_ARGS="$DOCKER_ARGS -e $actual_var=$escaped_value"
+  echo "Adding container env: $actual_var"
+done
+
 # Add build job parallelization environment variables if BUILD_JOBS is set
 if [[ -n "$BUILD_JOBS" ]]; then
   DOCKER_ARGS="$DOCKER_ARGS -e MAX_JOBS=$BUILD_JOBS"
diff --git a/run-recipe.py b/run-recipe.py
index ba4563b..c2aa072 100755
--- a/run-recipe.py
+++ b/run-recipe.py
@@ -105,21 +105,21 @@ LAUNCH_SCRIPT = SCRIPT_DIR / "launch-cluster.sh"
 BUILD_SCRIPT = SCRIPT_DIR / "build-and-copy.sh"
 DOWNLOAD_SCRIPT = SCRIPT_DIR / "hf-download.sh"
 AUTODISCOVER_SCRIPT = SCRIPT_DIR / "autodiscover.sh"
-ENV_FILE = SCRIPT_DIR / ".env"
+ENV_FILE = None  # Will be set from CLI argument or default
 
 
 def load_recipe(recipe_path: Path) -> dict[str, Any]:
     """
     Load and validate a recipe YAML file.
-    
+
     This function handles recipe resolution from multiple locations and validates
     required fields. Recipes are the core configuration format for deployments.
-    
+
     EXTENSIBILITY:
     - To add new required fields: Add to the 'required' list below
     - To add new optional fields with defaults: Add to the setdefault() calls at the end
     - Recipe search order: exact path -> recipes/ dir -> with .yaml -> with .yml
-    
+
     RECIPE SCHEMA:
         name (str, required): Human-readable name for the recipe
         recipe_version (str, required): Schema version for compatibility checking.
@@ -135,13 +135,13 @@ def load_recipe(recipe_path: Path) -> dict[str, Any]: build_args (list[str], optional): Extra args for build-and-copy.sh (e.g., ['-f', 'Dockerfile.mxfp4']) cluster_only (bool, optional): If True, recipe cannot run in solo mode solo_only (bool, optional): If True, recipe cannot run in cluster mode - + Args: recipe_path: Path object pointing to YAML file or just recipe name - + Returns: Validated recipe dictionary with all fields populated (defaults applied) - + Raises: SystemExit: If recipe not found or validation fails """ @@ -161,17 +161,17 @@ def load_recipe(recipe_path: Path) -> dict[str, Any]: print(f"Error: Recipe not found: {recipe_path}") print(f"Searched in: {recipe_path}, {RECIPES_DIR}") sys.exit(1) - + with open(recipe_path) as f: recipe = yaml.safe_load(f) - + # Validate required fields required = ["name", "recipe_version", "container", "command"] for field in required: if field not in recipe: print(f"Error: Recipe missing required field: {field}") sys.exit(1) - + # Set defaults for optional fields recipe.setdefault("description", "") recipe.setdefault("model", None) @@ -180,26 +180,28 @@ def load_recipe(recipe_path: Path) -> dict[str, Any]: recipe.setdefault("env", {}) recipe.setdefault("cluster_only", False) recipe.setdefault("solo_only", False) - + # Validate recipe version compatibility # EXTENSIBILITY: When adding new schema versions, update SUPPORTED_VERSIONS # and add migration/compatibility logic below SUPPORTED_VERSIONS = ["1"] recipe_ver = str(recipe["recipe_version"]) if recipe_ver not in SUPPORTED_VERSIONS: - print(f"Warning: Recipe uses schema version '{recipe_ver}', but this run-recipe.py supports: {SUPPORTED_VERSIONS}") + print( + f"Warning: Recipe uses schema version '{recipe_ver}', but this run-recipe.py supports: {SUPPORTED_VERSIONS}" + ) print("Some features may not work correctly. Consider updating run-recipe.py.") - + return recipe def list_recipes() -> None: """ List all available recipes with their metadata. - + Scans the recipes/ directory for YAML files and displays key information. Used by the --list CLI option. - + EXTENSIBILITY: - To show additional fields: Add them to the print statements in the loop - To support different output formats (e.g., JSON): Add a format parameter @@ -208,12 +210,12 @@ def list_recipes() -> None: if not RECIPES_DIR.exists(): print("No recipes directory found.") return - + recipes = sorted(RECIPES_DIR.glob("*.yaml")) if not recipes: print("No recipes found in recipes/ directory.") return - + print("Available recipes:\n") for recipe_path in recipes: try: @@ -227,7 +229,7 @@ def list_recipes() -> None: mods = recipe.get("mods", []) cluster_only = recipe.get("cluster_only", False) solo_only = recipe.get("solo_only", False) - + print(f" {recipe_path.name}") print(f" Name: {name}") if desc: @@ -252,77 +254,85 @@ def list_recipes() -> None: def check_image_exists(image: str, host: str | None = None) -> bool: """ Check if a Docker image exists locally or on a remote host. - + Used to avoid redundant builds and to verify cluster nodes have the image. - + EXTENSIBILITY: - To support other container runtimes (podman): Modify the docker command - To add image version/digest checking: Parse 'docker image inspect' JSON output - For custom SSH options: Modify the ssh command array - + Args: image: Docker image tag to check (e.g., 'vllm-node-mxfp4') host: Optional remote hostname/IP. If None, checks locally. 
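+
+    Example (host IP is illustrative):
+        check_image_exists("vllm-node-mxfp4")                      # local daemon
+        check_image_exists("vllm-node-mxfp4", host="192.168.1.2")  # remote via SSH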
- + Returns: True if image exists, False otherwise """ if host: result = subprocess.run( - ["ssh", "-o", "BatchMode=yes", "-o", "StrictHostKeyChecking=no", - host, f"docker image inspect '{image}'"], - capture_output=True + [ + "ssh", + "-o", + "BatchMode=yes", + "-o", + "StrictHostKeyChecking=no", + host, + f"docker image inspect '{image}'", + ], + capture_output=True, ) else: result = subprocess.run( - ["docker", "image", "inspect", image], - capture_output=True + ["docker", "image", "inspect", image], capture_output=True ) return result.returncode == 0 -def build_image(image: str, copy_to: list[str] | None = None, build_args: list[str] | None = None) -> bool: +def build_image( + image: str, copy_to: list[str] | None = None, build_args: list[str] | None = None +) -> bool: """ Build the container image using build-and-copy.sh. - + Delegates to the build-and-copy.sh script which handles multi-stage builds, cache optimization, and distribution to worker nodes. - + EXTENSIBILITY: - To add new build options: Add them to build_args in the recipe's build_args field - To support different Dockerfiles: Use build_args = ['-f', 'Dockerfile.custom'] - To add build-time secrets: Modify cmd array to include --secret flags - To add progress callbacks: Capture subprocess output line-by-line - + BUILD_ARGS EXAMPLES: ['-f', 'Dockerfile.mxfp4'] - Use alternate Dockerfile ['--no-cache'] - Force full rebuild ['--build-arg', 'VAR=value'] - Pass build-time variables - + Args: image: Target image tag copy_to: List of worker hostnames to copy image to after build build_args: Extra arguments passed to build-and-copy.sh - + Returns: True if build (and copy) succeeded, False otherwise """ if not BUILD_SCRIPT.exists(): print(f"Error: Build script not found: {BUILD_SCRIPT}") return False - + cmd = [str(BUILD_SCRIPT), "-t", image] if build_args: cmd.extend(build_args) if copy_to: cmd.extend(["--copy-to", ",".join(copy_to)]) - + print(f"Building image '{image}'...") if build_args: print(f"Build args: {' '.join(build_args)}") if copy_to: print(f"Will copy to: {', '.join(copy_to)}") - + result = subprocess.run(cmd) return result.returncode == 0 @@ -330,35 +340,35 @@ def build_image(image: str, copy_to: list[str] | None = None, build_args: list[s def download_model(model: str, copy_to: list[str] | None = None) -> bool: """ Download model from HuggingFace using hf-download.sh. - + Delegates to hf-download.sh which handles HF authentication, caching, and rsync to worker nodes. - + EXTENSIBILITY: - To support other model sources: Create a new download script and switch based on model URL - To add download progress: Capture subprocess output - To support private models: hf-download.sh uses HF_TOKEN env var - To add model verification: Check sha256 of downloaded files - + Args: model: HuggingFace model ID (e.g., 'Salyut1/GLM-4.7-NVFP4') copy_to: List of worker hostnames to copy model cache to - + Returns: True if download (and copy) succeeded, False otherwise """ if not DOWNLOAD_SCRIPT.exists(): print(f"Error: Download script not found: {DOWNLOAD_SCRIPT}") return False - + cmd = [str(DOWNLOAD_SCRIPT), model] if copy_to: cmd.extend(["--copy-to", ",".join(copy_to)]) - + print(f"Downloading model '{model}'...") if copy_to: print(f"Will copy to: {', '.join(copy_to)}") - + result = subprocess.run(cmd) return result.returncode == 0 @@ -366,17 +376,17 @@ def download_model(model: str, copy_to: list[str] | None = None) -> bool: def check_model_exists(model: str) -> bool: """ Check if a model exists in the HuggingFace cache. 
- + Checks the standard HF cache location for completed downloads. - + EXTENSIBILITY: - To support custom cache locations: Add HF_HOME env var support - To verify model integrity: Check for complete snapshot with config.json - To support other model sources: Add URL/path prefix detection - + Args: model: HuggingFace model ID (e.g., 'org/model-name') - + Returns: True if model appears to be fully downloaded, False otherwise """ @@ -384,7 +394,7 @@ def check_model_exists(model: str) -> bool: # e.g., "Salyut1/GLM-4.7-NVFP4" -> "models--Salyut1--GLM-4.7-NVFP4" cache_name = f"models--{model.replace('/', '--')}" cache_path = Path.home() / ".cache" / "huggingface" / "hub" / cache_name - + if cache_path.exists(): # Check for snapshots directory which indicates complete download snapshots = cache_path / "snapshots" @@ -393,19 +403,25 @@ def check_model_exists(model: str) -> bool: return False -def generate_launch_script(recipe: dict[str, Any], overrides: dict[str, Any], is_solo: bool = False, extra_args: list[str] | None = None, no_ray: bool = False) -> str: +def generate_launch_script( + recipe: dict[str, Any], + overrides: dict[str, Any], + is_solo: bool = False, + extra_args: list[str] | None = None, + no_ray: bool = False, +) -> str: """ Generate a bash launch script from the recipe. - + Creates a self-contained bash script that runs inside the container. Handles template substitution, environment variables, and solo mode adjustments. - + EXTENSIBILITY: - To add new template variables: Add them to recipe['defaults'] or CLI overrides - To add pre/post hooks: Add 'pre_command'/'post_command' fields to recipe schema - To add conditional logic: Use Jinja2 templating instead of str.format() - To support GPU selection: Add CUDA_VISIBLE_DEVICES to env handling - + TEMPLATE VARIABLES (use {variable_name} in recipe command): port: API server port (default from recipe) host: API server bind address @@ -413,42 +429,42 @@ def generate_launch_script(recipe: dict[str, Any], overrides: dict[str, Any], is gpu_memory_utilization: Fraction of GPU memory to use max_model_len: Maximum sequence length (custom variables can be added via recipe defaults) - + SOLO MODE BEHAVIOR: - Removes '--distributed-executor-backend ray' lines - Typically sets tensor_parallel=1 (handled by caller) - + EXTRA ARGS: - Appended verbatim to the end of the vLLM command - Allows passing any vLLM argument not covered by template variables - vLLM uses "last wins" semantics for duplicate arguments - + Args: recipe: Loaded recipe dictionary overrides: CLI-provided parameter overrides (take precedence over defaults) is_solo: If True, strip distributed executor configuration extra_args: Additional arguments to append to vLLM command (after --) - + Returns: Complete bash script content as string - + Raises: SystemExit: If required template variables are missing """ # Merge defaults with overrides params = {**recipe.get("defaults", {}), **overrides} - + # Build the script lines = ["#!/bin/bash", f"# Generated from recipe: {recipe['name']}", ""] - + # Add environment variables env_vars = recipe.get("env", {}) if env_vars: lines.append("# Environment variables") for key, value in env_vars.items(): - lines.append(f"export {key}=\"{value}\"") + lines.append(f'export {key}="{value}"') lines.append("") - + # Format the command with parameters command = recipe["command"] try: @@ -457,49 +473,47 @@ def generate_launch_script(recipe: dict[str, Any], overrides: dict[str, Any], is print(f"Error: Missing parameter in recipe command: {e}") print(f"Available 
parameters: {list(params.keys())}") sys.exit(1) - + # In solo or no-ray mode, remove --distributed-executor-backend # (not needed for solo; no-ray uses PyTorch distributed instead) if is_solo or no_ray: import re + # Remove just the flag and its value, not the whole line - command = re.sub(r'--distributed-executor-backend\s+\S+', '', command) + command = re.sub(r"--distributed-executor-backend\s+\S+", "", command) # Remove lines that are now empty or just a backslash continuation - lines_list = command.split('\n') - filtered_lines = [ - line for line in lines_list - if line.strip() not in ('', '\\') - ] - command = '\n'.join(filtered_lines) + lines_list = command.split("\n") + filtered_lines = [line for line in lines_list if line.strip() not in ("", "\\")] + command = "\n".join(filtered_lines) # Remove trailing backslash if present command = command.rstrip() - if command.endswith('\\'): - command = command.rstrip('\\\n').rstrip() - + if command.endswith("\\"): + command = command.rstrip("\\\n").rstrip() + # Append extra args if provided (after --) if extra_args: # Join extra args and append to command - extra_args_str = ' '.join(shlex.quote(a) for a in extra_args) - command = command + ' ' + extra_args_str - + extra_args_str = " ".join(shlex.quote(a) for a in extra_args) + command = command + " " + extra_args_str + lines.append("# Run the model") lines.append(command.strip()) lines.append("") - + return "\n".join(lines) def parse_nodes(nodes_arg: str | None) -> list[str]: """ Parse comma-separated node list. - + Simple utility to split node specifications. The first node is always treated as the head node for cluster deployments. - + Args: nodes_arg: Comma-separated string like '192.168.1.1,192.168.1.2' - + Returns: List of stripped node identifiers, empty list if input is None/empty """ @@ -511,13 +525,13 @@ def parse_nodes(nodes_arg: str | None) -> list[str]: def get_worker_nodes(nodes: list[str]) -> list[str]: """ Get worker nodes (all nodes except the first/head node). - + In a Ray cluster, the first node runs the head process. Workers are all subsequent nodes that join the cluster. - + Args: nodes: Full list of nodes (head first, then workers) - + Returns: List of worker nodes (excluding head), empty if single node """ @@ -529,20 +543,20 @@ def get_worker_nodes(nodes: list[str]) -> list[str]: def load_env_file() -> dict[str, str]: """ Load environment variables from .env file. - + Reads the .env file created by --discover for persistent cluster configuration. - + EXTENSIBILITY: - To add new persistent settings: Just add them to save_env_file() - To support multiple .env files: Add a --env-file CLI argument - To add validation: Check for required keys after loading - + SUPPORTED KEYS (set by --discover): CLUSTER_NODES: Comma-separated list of node IPs LOCAL_IP: This machine's IP address ETH_IF: Ethernet interface name IB_IF: InfiniBand interface name (if available) - + Returns: Dictionary of key=value pairs from .env file """ @@ -562,15 +576,15 @@ def load_env_file() -> dict[str, str]: def save_env_file(env: dict[str, str]) -> None: """ Save environment variables to .env file. - + Persists cluster configuration discovered by autodiscover.sh. Values are properly quoted if they contain spaces or commas. 
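+    For example, CLUSTER_NODES=192.168.1.1,192.168.1.2 round-trips as
+    CLUSTER_NODES="192.168.1.1,192.168.1.2" because the value contains commas.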
- + EXTENSIBILITY: - To add new persistent settings: Just add them to the env dict before calling - To add timestamps/metadata: Add comment lines to the output - To support append mode: Read existing, merge, then write - + Args: env: Dictionary of key=value pairs to save """ @@ -582,42 +596,42 @@ def save_env_file(env: dict[str, str]) -> None: else: lines.append(f"{key}={value}") lines.append("") - + with open(ENV_FILE, "w") as f: f.write("\n".join(lines)) - + print(f"Saved to {ENV_FILE}") def run_autodiscover() -> dict[str, str] | None: """ Run autodiscover.sh and return discovered configuration. - + Executes the autodiscover.sh script to detect cluster topology, then presents an interactive node selection menu. - + EXTENSIBILITY: - To add new discovery methods: Extend autodiscover.sh or add Python detection here - To add GPU detection: Add nvidia-smi parsing to discovered env - To skip interactive selection: Add a --non-interactive flag - To add node health checks: Ping/SSH test each discovered node - + DISCOVERED VARIABLES: CLUSTER_NODES: Comma-separated list of node IPs (user-selected) LOCAL_IP: This machine's IP address ETH_IF: Ethernet interface name (e.g., 'eth0') IB_IF: InfiniBand interface name (e.g., 'ibp12s0') if available - + Returns: Dictionary with discovered configuration, or None if discovery failed """ if not AUTODISCOVER_SCRIPT.exists(): print(f"Error: Autodiscover script not found: {AUTODISCOVER_SCRIPT}") return None - + print("Running autodiscover...") print() - + # Run autodiscover in a subshell and capture the variables # We source the script and print the variables we care about script = f""" @@ -630,13 +644,9 @@ def run_autodiscover() -> dict[str, str] | None: echo "ETH_IF=$ETH_IF" echo "IB_IF=$IB_IF" """ - - result = subprocess.run( - ["bash", "-c", script], - capture_output=True, - text=True - ) - + + result = subprocess.run(["bash", "-c", script], capture_output=True, text=True) + if result.returncode != 0: print("Autodiscover output:") print(result.stdout) @@ -644,33 +654,36 @@ def run_autodiscover() -> dict[str, str] | None: print(result.stderr) print("Error: Autodiscover failed") return None - + # Print the autodiscover output (excluding the final variable lines) output_lines = result.stdout.strip().split("\n") env = {} for line in output_lines: - if "=" in line and any(line.startswith(k) for k in ["CLUSTER_NODES=", "LOCAL_IP=", "ETH_IF=", "IB_IF="]): + if "=" in line and any( + line.startswith(k) + for k in ["CLUSTER_NODES=", "LOCAL_IP=", "ETH_IF=", "IB_IF="] + ): key, _, value = line.partition("=") env[key] = value else: print(line) - + print() - + # Interactive node selection if env.get("CLUSTER_NODES"): all_nodes = [n.strip() for n in env["CLUSTER_NODES"].split(",") if n.strip()] local_ip = env.get("LOCAL_IP", "") - + if len(all_nodes) > 1: print("Select which nodes to include in the cluster:") print() - + selected_nodes = [] for node in all_nodes: is_local = node == local_ip label = f"{node} (this machine)" if is_local else node - + # Default to yes for all nodes while True: response = input(f" Include {label}? [Y/n]: ").strip().lower() @@ -681,47 +694,49 @@ def run_autodiscover() -> dict[str, str] | None: break else: print(" Please enter 'y' or 'n'") - + print() - + if not selected_nodes: print("No nodes selected. 
Aborting.") return None - + if len(selected_nodes) == 1: print(f"Only one node selected: {selected_nodes[0]}") print("This will run in solo mode (single node).") else: - print(f"Selected {len(selected_nodes)} nodes: {', '.join(selected_nodes)}") - + print( + f"Selected {len(selected_nodes)} nodes: {', '.join(selected_nodes)}" + ) + env["CLUSTER_NODES"] = ",".join(selected_nodes) print() - + return env def main(): """ Main entry point for the recipe runner. - + Orchestrates the full deployment pipeline: 1. Parse CLI arguments and load recipe 2. Resolve cluster nodes (CLI -> .env -> autodiscover) 3. Build phase: Build container if missing, copy to workers - 4. Download phase: Download model if missing, copy to workers + 4. Download phase: Download model if missing, copy to workers 5. Run phase: Generate launch script and execute via launch-cluster.sh - + EXTENSIBILITY: - To add new CLI options: Add to the appropriate argument group - To add new phases: Insert between existing phases with similar pattern - To add pre/post hooks: Add hook execution before/after subprocess calls - To add logging: Replace print() with logging module calls - To add config file support: Load defaults from ~/.config/vllm-recipes.yaml - + EXIT CODES: 0: Success 1: Error (recipe not found, build failed, validation error, etc.) - + Returns: Exit code for sys.exit() """ @@ -757,124 +772,229 @@ Examples: # Show current .env configuration %(prog)s --show-env - """ + """, ) - + parser.add_argument( "recipe", nargs="?", - help="Path to recipe YAML file (or just the name without .yaml)" + help="Path to recipe YAML file (or just the name without .yaml)", ) parser.add_argument( - "--list", "-l", - action="store_true", - help="List available recipes" + "--list", "-l", action="store_true", help="List available recipes" ) - + # Setup options setup_group = parser.add_argument_group("Setup options") setup_group.add_argument( "--setup", action="store_true", - help="Full setup: build container (if missing) + download model (if missing) + run" + help="Full setup: build container (if missing) + download model (if missing) + run", ) setup_group.add_argument( "--build-only", action="store_true", - help="Only build/copy the container image, don't run" + help="Only build/copy the container image, don't run", ) setup_group.add_argument( "--download-only", action="store_true", - help="Only download/copy the model, don't run" + help="Only download/copy the model, don't run", ) setup_group.add_argument( - "--force-build", - action="store_true", - help="Force rebuild even if image exists" + "--force-build", action="store_true", help="Force rebuild even if image exists" ) setup_group.add_argument( "--force-download", action="store_true", - help="Force re-download even if model exists" + help="Force re-download even if model exists", ) - + parser.add_argument( "--dry-run", action="store_true", - help="Show what would be executed without running" + help="Show what would be executed without running", ) - + # Override options override_group = parser.add_argument_group("Recipe overrides") override_group.add_argument("--port", type=int, help="Override port") override_group.add_argument("--host", help="Override host") - override_group.add_argument("--tensor-parallel", "--tp", type=int, dest="tensor_parallel", help="Override tensor parallelism") - override_group.add_argument("--gpu-memory-utilization", "--gpu-mem", type=float, dest="gpu_memory_utilization", help="Override GPU memory utilization") - override_group.add_argument("--max-model-len", 
type=int, dest="max_model_len", help="Override max model length") - + override_group.add_argument( + "--tensor-parallel", + "--tp", + type=int, + dest="tensor_parallel", + help="Override tensor parallelism", + ) + override_group.add_argument( + "--gpu-memory-utilization", + "--gpu-mem", + type=float, + dest="gpu_memory_utilization", + help="Override GPU memory utilization", + ) + override_group.add_argument( + "--max-model-len", + type=int, + dest="max_model_len", + help="Override max model length", + ) + # Launch options (passed to launch-cluster.sh) - launch_group = parser.add_argument_group("Launch options (passed to launch-cluster.sh)") - launch_group.add_argument("--solo", action="store_true", help="Run in solo mode (single node, no Ray)") - launch_group.add_argument("-n", "--nodes", help="Comma-separated list of node IPs (first is head node)") - launch_group.add_argument("-d", "--daemon", action="store_true", help="Run in daemon mode") - launch_group.add_argument("-t", "--container", dest="container_override", help="Override container image from recipe") - launch_group.add_argument("--nccl-debug", choices=["VERSION", "WARN", "INFO", "TRACE"], help="NCCL debug level") - launch_group.add_argument("-e", "--env", action="append", dest="env_vars", default=[], metavar="VAR=VALUE", help="Environment variable to pass to container (e.g. -e HF_TOKEN=xxx). Can be used multiple times.") + launch_group = parser.add_argument_group( + "Launch options (passed to launch-cluster.sh)" + ) + launch_group.add_argument( + "--solo", action="store_true", help="Run in solo mode (single node, no Ray)" + ) + launch_group.add_argument( + "-n", "--nodes", help="Comma-separated list of node IPs (first is head node)" + ) + launch_group.add_argument( + "-d", "--daemon", action="store_true", help="Run in daemon mode" + ) + launch_group.add_argument( + "-t", + "--container", + dest="container_override", + help="Override container image from recipe", + ) + launch_group.add_argument( + "--nccl-debug", + choices=["VERSION", "WARN", "INFO", "TRACE"], + help="NCCL debug level", + ) + launch_group.add_argument( + "-e", + "--env", + action="append", + dest="env_vars", + default=[], + metavar="VAR=VALUE", + help="Environment variable to pass to container (e.g. -e HF_TOKEN=xxx). 
Can be used multiple times.", + ) launch_group.add_argument( "--no-ray", action="store_true", dest="no_ray", - help="No-Ray mode: run multi-node vLLM without Ray (uses PyTorch distributed backend)" + help="No-Ray mode: run multi-node vLLM without Ray (uses PyTorch distributed backend)", + ) + launch_group.add_argument( + "--master-port", + "--head-port", + type=int, + dest="master_port", + help="Port for cluster coordination (Ray head port or PyTorch distributed master port, default: 29501)", + ) + launch_group.add_argument( + "--name", + dest="container_name", + help="Override container name (default: vllm_node)", + ) + launch_group.add_argument( + "--eth-if", + dest="eth_if", + help="Ethernet interface (overrides .env and auto-detection)", + ) + launch_group.add_argument( + "--ib-if", + dest="ib_if", + help="InfiniBand interface (overrides .env and auto-detection)", + ) + launch_group.add_argument( + "-j", + dest="build_jobs", + type=int, + metavar="N", + help="Number of parallel build jobs inside container", + ) + launch_group.add_argument( + "--no-cache-dirs", + action="store_true", + dest="no_cache_dirs", + help="Do not mount ~/.cache/vllm, ~/.cache/flashinfer, ~/.triton", + ) + launch_group.add_argument( + "--non-privileged", + action="store_true", + dest="non_privileged", + help="Run in non-privileged mode (removes --privileged and --ipc=host)", + ) + launch_group.add_argument( + "--mem-limit-gb", + type=int, + dest="mem_limit_gb", + help="Memory limit in GB (only with --non-privileged)", + ) + launch_group.add_argument( + "--mem-swap-limit-gb", + type=int, + dest="mem_swap_limit_gb", + help="Memory+swap limit in GB (only with --non-privileged)", + ) + launch_group.add_argument( + "--pids-limit", + type=int, + dest="pids_limit", + help="Process limit (only with --non-privileged, default: 4096)", + ) + launch_group.add_argument( + "--shm-size-gb", + type=int, + dest="shm_size_gb", + help="Shared memory size in GB (only with --non-privileged, default: 64)", + ) + + # Config file option + parser.add_argument( + "--config", + dest="config_file", + metavar="FILE", + help="Path to .env configuration file (default: .env in script directory)", ) - launch_group.add_argument("--master-port", "--head-port", type=int, dest="master_port", help="Port for cluster coordination (Ray head port or PyTorch distributed master port, default: 29501)") - launch_group.add_argument("--name", dest="container_name", help="Override container name (default: vllm_node)") - launch_group.add_argument("--eth-if", dest="eth_if", help="Ethernet interface (overrides .env and auto-detection)") - launch_group.add_argument("--ib-if", dest="ib_if", help="InfiniBand interface (overrides .env and auto-detection)") - launch_group.add_argument("-j", dest="build_jobs", type=int, metavar="N", help="Number of parallel build jobs inside container") - launch_group.add_argument("--no-cache-dirs", action="store_true", dest="no_cache_dirs", help="Do not mount ~/.cache/vllm, ~/.cache/flashinfer, ~/.triton") - launch_group.add_argument("--non-privileged", action="store_true", dest="non_privileged", help="Run in non-privileged mode (removes --privileged and --ipc=host)") - launch_group.add_argument("--mem-limit-gb", type=int, dest="mem_limit_gb", help="Memory limit in GB (only with --non-privileged)") - launch_group.add_argument("--mem-swap-limit-gb", type=int, dest="mem_swap_limit_gb", help="Memory+swap limit in GB (only with --non-privileged)") - launch_group.add_argument("--pids-limit", type=int, dest="pids_limit", help="Process limit 
(only with --non-privileged, default: 4096)") - launch_group.add_argument("--shm-size-gb", type=int, dest="shm_size_gb", help="Shared memory size in GB (only with --non-privileged, default: 64)") # Cluster discovery options discover_group = parser.add_argument_group("Cluster discovery") discover_group.add_argument( "--discover", action="store_true", - help="Auto-detect cluster nodes and save to .env file" + help="Auto-detect cluster nodes and save to .env file", ) discover_group.add_argument( - "--show-env", - action="store_true", - help="Show current .env configuration" + "--show-env", action="store_true", help="Show current .env configuration" ) - + # Use parse_known_args to allow extra vLLM arguments after -- args, extra_args = parser.parse_known_args() - + + # Set .env file path (use default if not specified) + global ENV_FILE + if args.config_file: + ENV_FILE = Path(args.config_file).resolve() + else: + ENV_FILE = SCRIPT_DIR / ".env" + # Filter out the -- separator if present - if extra_args and extra_args[0] == '--': + if extra_args and extra_args[0] == "--": extra_args = extra_args[1:] - + # Handle --discover (can be run with or without a recipe) if args.discover: env = run_autodiscover() if env is None: return 1 - + print("Discovered configuration:") for key, value in sorted(env.items()): print(f" {key}={value}") print() - + save_env_file(env) - + if not args.recipe: return 0 - + # Handle --show-env if args.show_env: env = load_env_file() @@ -885,39 +1005,39 @@ Examples: else: print(f"No .env file found at {ENV_FILE}") print("Run with --discover to auto-detect cluster nodes.") - + if not args.recipe: return 0 print() - + if args.list: list_recipes() return 0 - + if not args.recipe: parser.print_help() return 1 - + # Load recipe recipe_path = Path(args.recipe) recipe = load_recipe(recipe_path) - + print(f"Recipe: {recipe['name']}") if recipe.get("description"): print(f" {recipe['description']}") print() - + # Determine container image container = args.container_override or recipe["container"] model = recipe.get("model") build_args = recipe.get("build_args", []) - + # Parse nodes - check command line first, then .env file, then autodiscover nodes = parse_nodes(args.nodes) if not args.solo else [] nodes_from_env = False eth_if = None ib_if = None - + if not args.solo: # Try to load from .env file env = load_env_file() @@ -932,16 +1052,22 @@ Examples: # No nodes specified and no .env - run autodiscover print("No cluster nodes configured. Running autodiscover...") print() - + discovered_env = run_autodiscover() if discovered_env and discovered_env.get("CLUSTER_NODES"): nodes = parse_nodes(discovered_env["CLUSTER_NODES"]) nodes_from_env = True - + if nodes: # Ask if user wants to save to .env print() - response = input("Save this configuration to .env for future use? [Y/n]: ").strip().lower() + response = ( + input( + "Save this configuration to .env for future use? [Y/n]: " + ) + .strip() + .lower() + ) if response in ("", "y", "yes"): save_env_file(discovered_env) print() @@ -954,7 +1080,7 @@ Examples: eth_if = env["ETH_IF"] if not ib_if and env.get("IB_IF"): ib_if = env["IB_IF"] - + worker_nodes = get_worker_nodes(nodes) if nodes else [] is_cluster = len(nodes) > 1 @@ -962,9 +1088,11 @@ Examples: cluster_only = recipe.get("cluster_only", False) solo_only = recipe.get("solo_only", False) is_solo = args.solo or not is_cluster - - if getattr(args, 'no_ray', False) and is_solo: - print("Error: --no-ray is incompatible with --solo. 
Solo mode already runs without Ray.") + + if getattr(args, "no_ray", False) and is_solo: + print( + "Error: --no-ray is incompatible with --solo. Solo mode already runs without Ray." + ) return 1 if cluster_only and is_solo: @@ -972,7 +1100,9 @@ Examples: print(f"This model is too large to run on a single node.") print() print("Options:") - print(f" 1. Specify nodes directly: {sys.argv[0]} {args.recipe} -n node1,node2") + print( + f" 1. Specify nodes directly: {sys.argv[0]} {args.recipe} -n node1,node2" + ) print(f" 2. Auto-discover and save: {sys.argv[0]} --discover") print(f" Then run: {sys.argv[0]} {args.recipe}") return 1 @@ -984,10 +1114,10 @@ Examples: print(f" 1. Run solo: {sys.argv[0]} {args.recipe} --solo") print(f" 2. Remove nodes from .env: {sys.argv[0]} --show-env") return 1 - + # Determine copy targets for cluster deployments copy_targets = worker_nodes if is_cluster else None - + if args.dry_run: print("=== Dry Run ===") print(f"Container: {container}") @@ -1007,9 +1137,13 @@ Examples: print(f" Workers: {', '.join(worker_nodes)}") print(f"Solo mode: {is_solo}") if eth_if: - print(f"Ethernet interface: {eth_if}{' (from .env)' if not args.eth_if else ''}") + print( + f"Ethernet interface: {eth_if}{' (from .env)' if not args.eth_if else ''}" + ) if ib_if: - print(f"InfiniBand interface: {ib_if}{' (from .env)' if not args.ib_if else ''}") + print( + f"InfiniBand interface: {ib_if}{' (from .env)' if not args.ib_if else ''}" + ) if args.container_name: print(f"Container name: {args.container_name}") if args.non_privileged: @@ -1031,7 +1165,7 @@ Examples: print() else: image_exists = check_image_exists(container) - + if args.force_build or not image_exists: print("=== Building Container ===") if not build_image(container, copy_targets, build_args): @@ -1053,11 +1187,11 @@ Examples: print("Error: Failed to build/copy container") return 1 print() - + if args.build_only: print("Build complete." if not args.dry_run else "") return 0 - + # --- Download Phase --- if model and (args.download_only or args.setup or args.force_download): if args.dry_run: @@ -1071,7 +1205,7 @@ Examples: print() else: model_exists = check_model_exists(model) - + if args.force_download or not model_exists: print("=== Downloading Model ===") if not download_model(model, copy_targets): @@ -1081,15 +1215,15 @@ Examples: else: print(f"Model '{model}' already exists in cache.") print() - + if args.download_only: print("Download complete." if not args.dry_run else "") return 0 - + # --- Run Phase --- if args.build_only or args.download_only: return 0 - + # Check if image exists (if not using --setup) if not args.dry_run and not args.setup and not check_image_exists(container): print(f"Container image '{container}' not found locally.") @@ -1099,48 +1233,64 @@ Examples: print(f" 2. Build manually: ./build-and-copy.sh -t {container}") print() response = input("Build now? 
[y/N] ").strip().lower() - if response == 'y': + if response == "y": if not build_image(container, copy_targets, build_args): print("Error: Failed to build image") return 1 else: print("Aborting.") return 1 - + # Build overrides from CLI args overrides = {} - for key in ["port", "host", "tensor_parallel", "gpu_memory_utilization", "max_model_len"]: + for key in [ + "port", + "host", + "tensor_parallel", + "gpu_memory_utilization", + "max_model_len", + ]: value = getattr(args, key, None) if value is not None: overrides[key] = value - + # In solo mode, default tensor_parallel to 1 (unless user explicitly set --tp) if is_solo and "tensor_parallel" not in overrides: overrides["tensor_parallel"] = 1 - + # Check for duplicate arguments (warn if extra_args duplicate CLI overrides) if extra_args: # Map vLLM flags to our override keys flag_to_override = { - '--port': 'port', - '--host': 'host', - '--tensor-parallel-size': 'tensor_parallel', - '-tp': 'tensor_parallel', - '--gpu-memory-utilization': 'gpu_memory_utilization', - '--max-model-len': 'max_model_len', + "--port": "port", + "--host": "host", + "--tensor-parallel-size": "tensor_parallel", + "-tp": "tensor_parallel", + "--gpu-memory-utilization": "gpu_memory_utilization", + "--max-model-len": "max_model_len", } for i, arg in enumerate(extra_args): # Check both exact flag and =value syntax - flag = arg.split('=')[0] if '=' in arg else arg + flag = arg.split("=")[0] if "=" in arg else arg if flag in flag_to_override: override_key = flag_to_override[flag] if override_key in overrides: - print(f"Warning: '{arg}' in extra args duplicates --{override_key.replace('_', '-')} override") - print(f" vLLM uses last value; extra args appear after template substitution") - + print( + f"Warning: '{arg}' in extra args duplicates --{override_key.replace('_', '-')} override" + ) + print( + f" vLLM uses last value; extra args appear after template substitution" + ) + # Generate launch script - script_content = generate_launch_script(recipe, overrides, is_solo=is_solo, extra_args=extra_args, no_ray=getattr(args, 'no_ray', False)) - + script_content = generate_launch_script( + recipe, + overrides, + is_solo=is_solo, + extra_args=extra_args, + no_ray=getattr(args, "no_ray", False), + ) + if args.dry_run: print("=== Generated Launch Script ===") print(script_content) @@ -1158,7 +1308,7 @@ Examples: cmd_parts.append("--solo") if args.daemon: cmd_parts.append("-d") - if getattr(args, 'no_ray', False): + if getattr(args, "no_ray", False): cmd_parts.append("--no-ray") if nodes: cmd_parts.extend(["-n", ",".join(nodes)]) @@ -1193,42 +1343,42 @@ Examples: print() print("3. 
The launch script runs inside the container") return 0 - + # Write temporary launch script - with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f: + with tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False) as f: f.write(script_content) temp_script = f.name - + try: os.chmod(temp_script, 0o755) - + # Build launch-cluster.sh command cmd = [str(LAUNCH_SCRIPT), "-t", container] - + # Add mods for mod in recipe.get("mods", []): mod_path = SCRIPT_DIR / mod if not mod_path.exists(): print(f"Warning: Mod path not found: {mod_path}") cmd.extend(["--apply-mod", str(mod_path)]) - + # Add launch options if args.solo: cmd.append("--solo") elif not is_cluster: # Auto-enable solo mode if no cluster nodes specified cmd.append("--solo") - + if args.daemon: cmd.append("-d") - if getattr(args, 'no_ray', False): + if getattr(args, "no_ray", False): cmd.append("--no-ray") # Pass nodes to launch-cluster.sh (from command line, .env, or autodiscover) if nodes: cmd.extend(["-n", ",".join(nodes)]) - + if args.nccl_debug: cmd.extend(["--nccl-debug", args.nccl_debug]) @@ -1260,7 +1410,7 @@ Examples: # Add launch script cmd.extend(["--launch-script", temp_script]) - + print(f"=== Launching ===") print(f"Container: {container}") if recipe.get("mods"): @@ -1270,11 +1420,11 @@ Examples: else: print("Mode: Solo") print() - + # Execute result = subprocess.run(cmd) return result.returncode - + finally: # Cleanup temp script try: