diff --git a/CHANGELOG.md b/CHANGELOG.md index b2f759c..060d486 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # Changelog +## 2.4.7 + +### Changed: pin @coana-tech/cli version; auto-update is now opt-in + +- Reachability analysis now runs a fixed `@coana-tech/cli` version pinned to this CLI release + (`15.3.24`) via `npx`, instead of silently pulling the latest published version on every run. + Engine version changes now ride with the Socket Python CLI release (standard `pip` upgrade), + giving advance notice of analysis-engine changes. +- The CLI no longer runs `npm install -g @coana-tech/cli`; an existing global install is left + untouched (never auto-updated or downgraded). +- Opt into always-newest with `--reach-version latest`; pin an explicit version with + `--reach-version ` (unchanged). +- Runs the engine via `npx --yes --force` (the same flags the Socket Node CLI passes for + coana); `--yes` skips npx's interactive install prompt so non-interactive/CI runs don't hang. +- Added an `npm install` + `node` fallback for when the `npx` launcher is missing or fails + before the engine starts. The installed engine is cached per version for the process + lifetime (installs once). Tunable via `SOCKET_CLI_COANA_FORCE_NPM_INSTALL` (use the fallback + as the primary path) and `SOCKET_CLI_COANA_DISABLE_NPM_FALLBACK` (never fall back). `node` is + now part of the up-front prerequisite check. Also strips `npm_package_*` env vars before + spawning the engine to avoid `E2BIG` in large monorepos. + ## 2.4.6 ### Docs: reachability reference corrections diff --git a/docs/cli-reference.md b/docs/cli-reference.md index cdbf003..fd5875c 100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -240,7 +240,7 @@ If you don't want to provide the Socket API Token every time then you can use th | Parameter | Required | Default | Description | |:---------------------------------|:---------|:--------|:---------------------------------------------------------------------------------------------------------------------------| | `--reach` | False | False | Enable reachability analysis to identify which vulnerable functions are actually called by your code. Creates a tier-1 full-application reachability scan (`scan_type=socket_tier1`). | -| `--reach-version` | False | latest | Version of @coana-tech/cli to use for analysis | +| `--reach-version` | False | *pinned* | Version of @coana-tech/cli to use. Defaults to the version pinned to this CLI release (currently `15.3.24`), so the engine only changes when you upgrade the Socket CLI. Pass `latest` to always use the newest published version (opt-in auto-update), or an explicit version (e.g. `1.2.3`) to pin it. | | `--reach-analysis-timeout` | False | *coana* | Timeout in seconds for the reachability analysis. Omitted by default, so coana applies its own (currently 600s). Alias: `--reach-timeout` | | `--reach-analysis-memory-limit` | False | *coana* | Memory limit in MB for the reachability analysis. Omitted by default, so coana applies its own (currently 8192). Alias: `--reach-memory-limit` | | `--reach-concurrency` | False | *coana* | Control parallel analysis execution (must be >= 1). Omitted by default, so coana applies its own (currently 1) | @@ -262,8 +262,9 @@ If you don't want to provide the Socket API Token every time then you can use th **Reachability Analysis Requirements:** The Python CLI verifies the following **up front** (before invoking the analysis engine) and exits with code **3** if any are unmet: -- `npm` - Required to install and run `@coana-tech/cli` (the analysis engine) -- `npx` - Required to execute `@coana-tech/cli` +- `npm` - Required (verified up front; ships alongside `npx`) +- `npx` - Required to fetch (on first use) and run `@coana-tech/cli` (the analysis engine) +- `node` - Required to run the engine (used directly by the `npm install` fallback) - `uv` - Required by the analysis engine - An **Enterprise** Socket organization plan (any `enterprise*` plan, including Enterprise trials) @@ -313,7 +314,11 @@ Sample config files: For CI-specific examples and guidance, see [`ci-cd.md`](ci-cd.md). -The CLI will automatically install `@coana-tech/cli` if not present. Use `--reach` to enable reachability analysis during a full scan, or add `--only-facts-file` (with `--reach`) to submit only the reachability facts file (`.socket.facts.json`) when creating the full scan. +The CLI runs a pinned `@coana-tech/cli` version via `npx --yes --force` (the same flags the Socket Node CLI passes for coana); it does **not** auto-update the engine or install it globally. `--yes` skips npx's interactive install prompt so non-interactive/CI runs don't hang. If the `npx` launcher is unavailable or fails before the engine starts, the CLI falls back to `npm install`-ing the pinned version into a temp directory and running it via `node`. Pass `--reach-version latest` to opt into the newest published version. Use `--reach` to enable reachability analysis during a full scan, or add `--only-facts-file` (with `--reach`) to submit only the reachability facts file (`.socket.facts.json`) when creating the full scan. + +The launcher fallback can be tuned via environment variables: +- `SOCKET_CLI_COANA_FORCE_NPM_INSTALL` — skip `npx` entirely and always use the `npm install` + `node` path (useful where `npx` is known-broken). +- `SOCKET_CLI_COANA_DISABLE_NPM_FALLBACK` — never fall back; surface the `npx` failure directly. #### Advanced Configuration | Parameter | Required | Default | Description | diff --git a/pyproject.toml b/pyproject.toml index 54cdb82..6bed831 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" [project] name = "socketsecurity" -version = "2.4.6" +version = "2.4.7" requires-python = ">= 3.11" license = {"file" = "LICENSE"} dependencies = [ diff --git a/socketsecurity/__init__.py b/socketsecurity/__init__.py index 7ac4760..91c7faf 100644 --- a/socketsecurity/__init__.py +++ b/socketsecurity/__init__.py @@ -1,3 +1,3 @@ __author__ = 'socket.dev' -__version__ = '2.4.6' +__version__ = '2.4.7' USER_AGENT = f'SocketPythonCLI/{__version__}' diff --git a/socketsecurity/config.py b/socketsecurity/config.py index 3c258a3..9d9f62d 100644 --- a/socketsecurity/config.py +++ b/socketsecurity/config.py @@ -943,8 +943,10 @@ def create_argument_parser() -> argparse.ArgumentParser: reachability_group.add_argument( "--reach-version", dest="reach_version", - metavar="", - help="Specific version of @coana-tech/cli to use (e.g., '1.2.3')" + metavar="", + help="Version of @coana-tech/cli to use. Defaults to the version pinned to this CLI " + "release; pass 'latest' to always use the newest published version (opt-in " + "auto-update), or an explicit version (e.g. '1.2.3') to pin it." ) reachability_group.add_argument( "--reach-analysis-timeout", diff --git a/socketsecurity/core/tools/reachability.py b/socketsecurity/core/tools/reachability.py index 008bd65..013b9f1 100644 --- a/socketsecurity/core/tools/reachability.py +++ b/socketsecurity/core/tools/reachability.py @@ -7,11 +7,22 @@ import pathlib import logging import sys +import tempfile from socketsecurity import __version__ log = logging.getLogger(__name__) +# Pinned @coana-tech/cli version. Bumped deliberately per Python CLI release so the +# reachability engine version only changes through a standard pip upgrade (advance notice). +# Pass --reach-version latest to opt into the newest published version instead. +DEFAULT_COANA_CLI_VERSION = "15.3.24" + +# Resolved @coana-tech/cli script paths from the npm-install fallback, keyed by version. +# Lives for the process lifetime so repeated fallback invocations install only once +# (mirrors the Node CLI's installedCoanaScriptPathsByVersion). +_INSTALLED_COANA_SCRIPT_PATHS: Dict[str, str] = {} + def _build_caller_user_agent() -> str: """Build the SOCKET_CALLER_USER_AGENT string forwarded to the coana CLI. @@ -31,70 +42,31 @@ def __init__(self, sdk: socketdev, api_token: str): self.sdk = sdk self.api_token = api_token - def _ensure_coana_cli_installed(self, version: Optional[str] = None) -> str: + def _resolve_coana_package_spec(self, version: Optional[str] = None) -> str: """ - Check if @coana-tech/cli is installed, and install/update it if needed. - + Resolve the @coana-tech/cli package spec to run with npx. + + We pass an exact, versioned spec to npx so it runs a deterministic version from its + own cache (fetching once if absent). We intentionally do NOT ``npm install -g`` here: + that would silently auto-update the engine on every run and mutate the user's global + install. The pinned version rides with the Python CLI release instead, so engine + changes only happen through a standard pip upgrade (advance notice). + Args: - version: Specific version to install (e.g., '1.2.3'). If None, always updates to latest. - + version: Coana CLI version to use. + - None: the pinned ``DEFAULT_COANA_CLI_VERSION`` (no auto-update). + - 'latest': always the newest published version (opt-in to auto-update). + - '': that exact version. + Returns: - str: The package specifier to use with npx + str: The package specifier to use with npx (e.g. '@coana-tech/cli@15.3.24'). """ - # Determine the package specifier - package_spec = f"@coana-tech/cli@{version}" if version else "@coana-tech/cli" - - # If a specific version is requested, check if it's already installed - if version: - try: - check_cmd = ["npm", "list", "-g", "@coana-tech/cli", "--depth=0"] - result = subprocess.run( - check_cmd, - capture_output=True, - text=True, - timeout=10 - ) - - # If npm list succeeds and mentions the specific version, it's installed - if result.returncode == 0 and f"@coana-tech/cli@{version}" in result.stdout: - log.debug(f"@coana-tech/cli@{version} is already installed globally") - return package_spec - - except Exception as e: - log.debug(f"Could not check for existing @coana-tech/cli installation: {e}") - - # Install or update the package - # When no version is specified, always try to update to latest - if version: - log.info(f"Installing reachability analysis plugin (@coana-tech/cli@{version})...") - else: - log.info("Updating reachability analysis plugin (@coana-tech/cli) to latest version...") - log.info("This may take a moment...") - - try: - install_cmd = ["npm", "install", "-g", package_spec] - log.debug(f"Installing with command: {' '.join(install_cmd)}") - - result = subprocess.run( - install_cmd, - capture_output=True, - text=True, - timeout=300 # 5 minute timeout for installation - ) - - if result.returncode != 0: - log.warning(f"Global installation failed, npx will download on demand") - log.debug(f"Install stderr: {result.stderr}") - else: - log.info("Reachability analysis plugin installed successfully") - - except subprocess.TimeoutExpired: - log.warning("Installation timed out, npx will download on demand") - except Exception as e: - log.warning(f"Could not install globally: {e}, npx will download on demand") - - return package_spec - + return f"@coana-tech/cli@{self._resolve_coana_version(version)}" + + def _resolve_coana_version(self, version: Optional[str] = None) -> str: + """Resolve the effective @coana-tech/cli version string (see _resolve_coana_package_spec).""" + return (version or DEFAULT_COANA_CLI_VERSION).strip() + def run_reachability_analysis( self, @@ -147,7 +119,9 @@ def run_reachability_analysis( lazy_mode: Enable lazy mode for analysis repo_name: Repository name branch_name: Branch name - version: Specific version of @coana-tech/cli to use + version: @coana-tech/cli version to use. None uses the pinned + DEFAULT_COANA_CLI_VERSION (no auto-update); 'latest' opts into the newest + published version; '' pins an explicit version. concurrency: Concurrency level for analysis (must be >= 1) additional_params: Additional parameters to pass to coana CLI allow_unverified: Disable SSL certificate verification (sets NODE_TLS_REJECT_UNAUTHORIZED=0) @@ -157,16 +131,14 @@ def run_reachability_analysis( Returns: Dict containing scan_id and report_path """ - # Ensure @coana-tech/cli is installed - cli_package = self._ensure_coana_cli_installed(version) - - # Build CLI command arguments - cmd = ["npx", cli_package, "run", "."] - + # Build the coana CLI arguments (everything after the package spec). The launcher + # (npx, or the npm-install + node fallback) is chosen in _spawn_coana() below. + coana_args = ["run", "."] + # Add required arguments output_dir = str(pathlib.Path(output_path).parent) log.debug(f"output_dir: {output_dir}, output_path: {output_path}") - cmd.extend([ + coana_args.extend([ "--output-dir", output_dir, "--socket-mode", output_path, "--disable-report-submission" @@ -174,70 +146,70 @@ def run_reachability_analysis( # Add conditional arguments if timeout: - cmd.extend(["--analysis-timeout", str(timeout)]) + coana_args.extend(["--analysis-timeout", str(timeout)]) if memory_limit: - cmd.extend(["--memory-limit", str(memory_limit)]) + coana_args.extend(["--memory-limit", str(memory_limit)]) if disable_analytics: - cmd.append("--disable-analytics-sharing") + coana_args.append("--disable-analytics-sharing") # Analysis splitting is disabled by default; only omit the flag if explicitly enabled if not enable_analysis_splitting: - cmd.append("--disable-analysis-splitting") + coana_args.append("--disable-analysis-splitting") if detailed_analysis_log_file: - cmd.append("--print-analysis-log-file") + coana_args.append("--print-analysis-log-file") if lazy_mode: - cmd.append("--lazy-mode") + coana_args.append("--lazy-mode") # KEY POINT: Only add manifest tar hash if we have one if tar_hash: - cmd.extend(["--run-without-docker", "--manifests-tar-hash", tar_hash]) + coana_args.extend(["--run-without-docker", "--manifests-tar-hash", tar_hash]) if ecosystems: - cmd.extend(["--purl-types"] + ecosystems) + coana_args.extend(["--purl-types"] + ecosystems) if exclude_paths: - cmd.extend(["--exclude-dirs"] + exclude_paths) + coana_args.extend(["--exclude-dirs"] + exclude_paths) if min_severity: - cmd.extend(["--min-severity", min_severity]) + coana_args.extend(["--min-severity", min_severity]) if skip_cache: - cmd.append("--skip-cache-usage") + coana_args.append("--skip-cache-usage") if concurrency: - cmd.extend(["--concurrency", str(concurrency)]) + coana_args.extend(["--concurrency", str(concurrency)]) if enable_debug: - cmd.append("-d") + coana_args.append("-d") if reach_debug: - cmd.append("--debug") + coana_args.append("--debug") if disable_external_tool_checks: - cmd.append("--disable-external-tool-checks") + coana_args.append("--disable-external-tool-checks") if use_only_pregenerated_sboms: - cmd.append("--use-only-pregenerated-sboms") + coana_args.append("--use-only-pregenerated-sboms") if continue_on_analysis_errors: - cmd.append("--reach-continue-on-analysis-errors") + coana_args.append("--reach-continue-on-analysis-errors") if continue_on_install_errors: - cmd.append("--reach-continue-on-install-errors") + coana_args.append("--reach-continue-on-install-errors") if continue_on_missing_lock_files: - cmd.append("--reach-continue-on-missing-lock-files") + coana_args.append("--reach-continue-on-missing-lock-files") if continue_on_no_source_files: - cmd.append("--reach-continue-on-no-source-files") + coana_args.append("--reach-continue-on-no-source-files") # Add any additional parameters provided by the user if additional_params: - cmd.extend(additional_params) + coana_args.extend(additional_params) # Set up environment variables env = os.environ.copy() @@ -268,24 +240,18 @@ def run_reachability_analysis( if allow_unverified: env["NODE_TLS_REJECT_UNAUTHORIZED"] = "0" - # Execute CLI + # Execute coana log.info("Running reachability analysis...") - log.debug(f"Reachability command: {' '.join(cmd)}") log.debug(f"Environment: SOCKET_ORG_SLUG={org_slug}, SOCKET_REPO_NAME={repo_name or 'not set'}, SOCKET_BRANCH_NAME={branch_name or 'not set'}") - + try: - # Run with output streaming to stderr (don't capture output) - result = subprocess.run( - cmd, - env=env, - cwd=target_directory, - stdout=sys.stderr, # Send stdout to stderr so user sees it - stderr=sys.stderr, # Send stderr to stderr - ) - - if result.returncode != 0: - log.error(f"Reachability analysis failed with exit code {result.returncode}") - raise Exception(f"Reachability analysis failed with exit code {result.returncode}") + # Prefer npx (with caching disabled); fall back to `npm install` + `node` + # if the npx launcher fails before coana starts (parity with the Node CLI). + returncode = self._spawn_coana(coana_args, version, env, target_directory) + + if returncode != 0: + log.error(f"Reachability analysis failed with exit code {returncode}") + raise Exception(f"Reachability analysis failed with exit code {returncode}") # Extract scan ID from output file scan_id = self._extract_scan_id(output_path) @@ -303,7 +269,173 @@ def run_reachability_analysis( except Exception as e: log.error(f"Failed to run reachability analysis: {str(e)}") raise Exception(f"Failed to run reachability analysis: {str(e)}") - + + @staticmethod + def _sanitize_coana_env(env: Dict[str, str]) -> Dict[str, str]: + """Drop npm-injected ``npm_package_*`` vars before spawning coana. + + npm/pnpm/yarn populate one env var per leaf of the cwd's package.json + (``npm_package_dependencies_*`` etc.). In large monorepos this can be tens of KB + and push argv+env past the OS ARG_MAX, making the spawn fail with E2BIG before + coana even starts. coana doesn't read these, so dropping them is safe; we keep + ``npm_config_*`` (registry/cache/proxy) untouched. Mirrors the Node CLI. + """ + return {k: v for k, v in env.items() if not k.startswith("npm_package_")} + + @staticmethod + def _npx_launcher_failed_before_coana(returncode: int) -> bool: + """Heuristic: did npx fail *before* coana started (so retrying is worthwhile)? + + We stream coana's output (no capture), so we classify by exit code alone, like the + Node CLI does with inherited stdio: signal kills (negative codes) and codes >= 128 + are conventionally launcher/signal failures -> retry. Small positive codes (1..127) + are ambiguous (coana's own exit codes are small ints), so we do NOT retry. + """ + return returncode < 0 or returncode >= 128 + + def _spawn_coana( + self, + coana_args: List[str], + version: Optional[str], + env: Dict[str, str], + cwd: str, + ) -> int: + """Run coana for the given args, returning the process exit code. + + Primary path: ``npx --yes --force @coana-tech/cli@ ...`` — the exact flags + the Socket Node CLI passes for coana. ``--yes`` skips npx's interactive install + confirmation so non-interactive/CI runs don't hang. ``--force`` matches the Node CLI + (it opts out of npm's prompts/protections and refreshes within-range specs); note it + does NOT force a re-download of an already-cached pinned version — npx still reuses a + cached pinned package, so this is parity with the Node CLI, not a cache bypass. + + Fallback path: if npx is missing, or its launcher dies before coana starts, install + @coana-tech/cli into a temp dir via ``npm install`` and run it directly via ``node``. + Toggle with ``SOCKET_CLI_COANA_FORCE_NPM_INSTALL`` (use the fallback as the primary + path) and ``SOCKET_CLI_COANA_DISABLE_NPM_FALLBACK`` (never fall back). + """ + effective_version = self._resolve_coana_version(version) + coana_env = self._sanitize_coana_env(env) + disable_fallback = bool(os.environ.get("SOCKET_CLI_COANA_DISABLE_NPM_FALLBACK")) + + if os.environ.get("SOCKET_CLI_COANA_FORCE_NPM_INSTALL"): + return self._spawn_coana_via_npm_install(coana_args, effective_version, coana_env, cwd) + + package_spec = f"@coana-tech/cli@{effective_version}" + # --yes skips npx's install confirmation; --force matches the Node CLI's coana flags. + npx_cmd = ["npx", "--yes", "--force", package_spec, *coana_args] + log.debug(f"Reachability command: {' '.join(npx_cmd)}") + try: + result = subprocess.run( + npx_cmd, + env=coana_env, + cwd=cwd, + stdout=sys.stderr, # Send stdout to stderr so the user sees it + stderr=sys.stderr, + ) + except FileNotFoundError: + # npx is not on PATH: the launcher provably never started coana. + if disable_fallback: + raise + log.warning("npx not found on PATH; retrying reachability analysis via `npm install` + `node`.") + return self._spawn_coana_via_npm_install(coana_args, effective_version, coana_env, cwd) + + if result.returncode == 0: + return 0 + + if not disable_fallback and self._npx_launcher_failed_before_coana(result.returncode): + log.warning( + f"npx launcher failed (exit {result.returncode}) before coana started; " + "retrying reachability analysis via `npm install` + `node`." + ) + return self._spawn_coana_via_npm_install(coana_args, effective_version, coana_env, cwd) + + return result.returncode + + def _spawn_coana_via_npm_install( + self, + coana_args: List[str], + version: str, + env: Dict[str, str], + cwd: str, + ) -> int: + """Fallback launcher: ``npm install`` @coana-tech/cli into a temp dir, run via ``node``. + + Used when npx is unavailable or its launcher fails before coana boots. Mirrors the + Node CLI's npm-install fallback. Returns coana's exit code; raises if the install + itself fails or if ``node`` is unavailable. + """ + script_path = self._install_coana_to_tmpdir(version, env) + node_cmd = self._build_coana_node_cmd(script_path, coana_args) + log.debug(f"Reachability fallback command: {' '.join(node_cmd)}") + try: + result = subprocess.run(node_cmd, env=env, cwd=cwd, stdout=sys.stderr, stderr=sys.stderr) + except FileNotFoundError as e: + # The fallback exists for broken-launcher environments, but it still needs node. + raise Exception( + "`node` was not found on PATH; it is required to run the reachability engine " + "via the npm-install fallback." + ) from e + return result.returncode + + def _install_coana_to_tmpdir(self, version: str, env: Dict[str, str]) -> str: + """``npm install`` @coana-tech/cli@ into a temp dir; return its executable JS path. + + Caches the resolved path per version for the process lifetime so repeated fallback + invocations install only once (mirrors the Node CLI's installCoanaToTmpdir). Raises if + the install fails. + """ + cached = _INSTALLED_COANA_SCRIPT_PATHS.get(version) + if cached and os.path.exists(cached): + return cached + + install_dir = tempfile.mkdtemp(prefix="socket-coana-") + npm_cmd = [ + "npm", "install", + "--no-save", "--no-package-lock", "--no-audit", "--no-fund", + "--prefix", install_dir, + f"@coana-tech/cli@{version}", + ] + log.info("Installing reachability analysis engine via npm fallback...") + log.debug(f"npm install fallback command: {' '.join(npm_cmd)}") + install = subprocess.run(npm_cmd, env=env, stdout=sys.stderr, stderr=sys.stderr) + if install.returncode != 0: + raise Exception( + f"npm install fallback for @coana-tech/cli@{version} failed with exit code {install.returncode}" + ) + + script_path = self._resolve_coana_bin(install_dir) + _INSTALLED_COANA_SCRIPT_PATHS[version] = script_path + return script_path + + @staticmethod + def _resolve_coana_bin(install_dir: str) -> str: + """Resolve @coana-tech/cli's executable JS from its installed package.json ``bin`` field.""" + package_json_path = os.path.join( + install_dir, "node_modules", "@coana-tech", "cli", "package.json" + ) + with open(package_json_path, "r") as f: + pkg = json.load(f) + bin_field = pkg.get("bin") + relative_bin = None + if isinstance(bin_field, str): + relative_bin = bin_field + elif isinstance(bin_field, dict): + # Prefer an entry named "coana"; otherwise take the first. + relative_bin = bin_field.get("coana") or next(iter(bin_field.values()), None) + if not relative_bin: + raise Exception( + f"@coana-tech/cli package.json at {package_json_path} is missing a usable bin entry" + ) + return os.path.abspath(os.path.join(os.path.dirname(package_json_path), relative_bin)) + + @staticmethod + def _build_coana_node_cmd(script_path: str, coana_args: List[str]) -> List[str]: + """Run a .js/.mjs entry via ``node``; invoke a native binary directly (Node CLI parity).""" + if script_path.endswith(".js") or script_path.endswith(".mjs"): + return ["node", script_path, *coana_args] + return [script_path, *coana_args] + def _extract_scan_id(self, facts_file_path: str) -> Optional[str]: """ Extract tier1ReachabilityScanId from the socket facts JSON file. diff --git a/socketsecurity/socketcli.py b/socketsecurity/socketcli.py index 1e339ae..1ab9e90 100644 --- a/socketsecurity/socketcli.py +++ b/socketsecurity/socketcli.py @@ -189,7 +189,7 @@ def main_code(): # Check for required dependencies if reachability analysis is enabled if config.reach: log.info("Reachability analysis enabled, checking for required dependencies...") - required_deps = ["npm", "uv", "npx"] + required_deps = ["npm", "node", "uv", "npx"] missing_deps = [] found_deps = [] diff --git a/tests/unit/test_reachability.py b/tests/unit/test_reachability.py index 10b6d54..32a887a 100644 --- a/tests/unit/test_reachability.py +++ b/tests/unit/test_reachability.py @@ -1,8 +1,10 @@ """Tests for the reachability coana-CLI command/env construction (Node alignment). -These cover the arg-builder and environment wiring in -``socketsecurity.core.tools.reachability.ReachabilityAnalyzer`` without actually -invoking npm/npx/coana: ``_ensure_coana_cli_installed`` and ``subprocess.run`` are mocked. +These cover the arg-builder, the npx launcher (caching disabled via --yes --force), the +npm-install + node fallback, and the environment wiring in +``socketsecurity.core.tools.reachability.ReachabilityAnalyzer`` without actually invoking +npx/npm/node/coana: ``subprocess.run`` (and, for the fallback, ``tempfile.mkdtemp`` / +``_resolve_coana_bin``) are mocked. """ from unittest.mock import MagicMock @@ -11,6 +13,7 @@ from socketsecurity import __version__ from socketsecurity.core.tools import reachability from socketsecurity.core.tools.reachability import ( + DEFAULT_COANA_CLI_VERSION, ReachabilityAnalyzer, _build_caller_user_agent, ) @@ -21,19 +24,32 @@ def analyzer(): return ReachabilityAnalyzer(MagicMock(), "test-api-token") -def _run(analyzer, mocker, **kwargs): - """Invoke run_reachability_analysis with npm/npx/coana mocked; return (cmd, env).""" - mocker.patch.object(analyzer, "_ensure_coana_cli_installed", return_value="@coana-tech/cli") +@pytest.fixture(autouse=True) +def _clear_coana_install_cache(): + """The npm-install fallback caches resolved script paths in a module-level dict; isolate tests.""" + reachability._INSTALLED_COANA_SCRIPT_PATHS.clear() + yield + reachability._INSTALLED_COANA_SCRIPT_PATHS.clear() + + +def _spawn_mock(analyzer, mocker, returncode=0, **kwargs): + """Run run_reachability_analysis with subprocess.run mocked to a fixed exit code. + + Uses the real resolver / _spawn_coana; returns the run mock for inspection. + """ mocker.patch.object(analyzer, "_extract_scan_id", return_value="scan-123") completed = MagicMock() - completed.returncode = 0 + completed.returncode = returncode run_mock = mocker.patch.object(reachability.subprocess, "run", return_value=completed) analyzer.run_reachability_analysis(org_slug="my-org", target_directory=".", **kwargs) + return run_mock + - cmd = run_mock.call_args.args[0] - env = run_mock.call_args.kwargs["env"] - return cmd, env +def _run(analyzer, mocker, **kwargs): + """Invoke run_reachability_analysis on the happy npx path; return (npx argv, env).""" + run_mock = _spawn_mock(analyzer, mocker, **kwargs) + return run_mock.call_args.args[0], run_mock.call_args.kwargs["env"] def test_build_caller_user_agent_shape(): @@ -104,3 +120,265 @@ def test_repo_branch_env_absent_when_none(analyzer, mocker): _, env = _run(analyzer, mocker, repo_name=None, branch_name=None) assert "SOCKET_REPO_NAME" not in env assert "SOCKET_BRANCH_NAME" not in env + + +# --- Coana package-spec resolution (pinned by default, latest is opt-in) --- + + +def test_resolve_spec_defaults_to_pinned_version(analyzer): + """No --reach-version -> pinned DEFAULT_COANA_CLI_VERSION (no auto-update).""" + assert ( + analyzer._resolve_coana_package_spec(None) + == f"@coana-tech/cli@{DEFAULT_COANA_CLI_VERSION}" + ) + + +def test_resolve_spec_pins_explicit_version(analyzer): + assert analyzer._resolve_coana_package_spec("1.2.3") == "@coana-tech/cli@1.2.3" + + +def test_resolve_spec_latest_opt_in(analyzer): + """'latest' opts into the newest published version.""" + assert analyzer._resolve_coana_package_spec("latest") == "@coana-tech/cli@latest" + + +def test_resolve_spec_is_always_versioned(analyzer): + """Never the bare '@coana-tech/cli' (which would let npx pick a stray global version).""" + for version in (None, "latest", "1.2.3", " 1.2.3 "): + assert analyzer._resolve_coana_package_spec(version).startswith("@coana-tech/cli@") + + +def _spec_in(cmd): + """The @coana-tech/cli@ spec from an npx argv (it follows the npx flags).""" + return next(a for a in cmd if a.startswith("@coana-tech/cli@")) + + +def test_npx_uses_yes_and_force_flags(analyzer, mocker): + """npx is invoked with --yes --force — the exact flags the Node CLI passes for coana.""" + cmd, _ = _run(analyzer, mocker) + assert cmd[0] == "npx" + assert "--yes" in cmd + assert "--force" in cmd + + +def test_npx_runs_pinned_version_by_default(analyzer, mocker): + cmd, _ = _run(analyzer, mocker) + assert _spec_in(cmd) == f"@coana-tech/cli@{DEFAULT_COANA_CLI_VERSION}" + + +def test_npx_runs_explicit_version(analyzer, mocker): + cmd, _ = _run(analyzer, mocker, version="9.9.9") + assert _spec_in(cmd) == "@coana-tech/cli@9.9.9" + + +def test_npx_runs_latest_when_opted_in(analyzer, mocker): + cmd, _ = _run(analyzer, mocker, version="latest") + assert _spec_in(cmd) == "@coana-tech/cli@latest" + + +def test_default_path_never_runs_npm_install(analyzer, mocker): + """On the happy path we use npx only — no `npm install` (no global mutation).""" + run_mock = _spawn_mock(analyzer, mocker) + for call in run_mock.call_args_list: + assert call.args[0][:2] != ["npm", "install"] + + +def test_env_strips_npm_package_vars(analyzer, mocker, monkeypatch): + """npm_package_* dropped (E2BIG guard); npm_config_* kept. Parity with the Node CLI.""" + monkeypatch.setenv("npm_package_dependencies_foo", "1.0.0") + monkeypatch.setenv("npm_config_registry", "https://example.test") + _, env = _run(analyzer, mocker) + assert "npm_package_dependencies_foo" not in env + assert env.get("npm_config_registry") == "https://example.test" + + +# --- npm-install + node fallback (when the npx launcher fails before coana starts) --- + + +def test_launcher_failure_heuristic(): + f = ReachabilityAnalyzer._npx_launcher_failed_before_coana + # Signal kills / >=128 -> launcher failure -> retry. + assert f(-9) is True + assert f(137) is True + assert f(249) is True + # Small positive exit codes are ambiguous (coana's own codes) -> do NOT retry. + assert f(1) is False + assert f(2) is False + assert f(127) is False + + +def _capture_spawns(analyzer, mocker, npx_behavior, **kwargs): + """Drive run_reachability_analysis capturing each spawned argv. + + ``npx_behavior`` is applied when argv[0] == 'npx': an int return code, or a + callable raising (e.g. FileNotFoundError). npm/node spawns always succeed. + """ + mocker.patch.object(analyzer, "_extract_scan_id", return_value="scan-123") + mocker.patch.object(reachability.tempfile, "mkdtemp", return_value="/tmp/socket-coana-x") + mocker.patch.object( + analyzer, + "_resolve_coana_bin", + return_value="/tmp/socket-coana-x/node_modules/@coana-tech/cli/coana.js", + ) + calls = [] + + def fake_run(argv, **_kw): + calls.append(argv) + if argv[0] == "npx" and callable(npx_behavior): + npx_behavior() + m = MagicMock() + m.returncode = npx_behavior if (argv[0] == "npx" and isinstance(npx_behavior, int)) else 0 + return m + + mocker.patch.object(reachability.subprocess, "run", side_effect=fake_run) + analyzer.run_reachability_analysis(org_slug="my-org", target_directory=".", **kwargs) + return calls + + +def test_falls_back_to_npm_install_when_npx_launcher_fails(analyzer, mocker): + """npx exits >=128 (launcher died) -> npm install + node run coana directly.""" + calls = _capture_spawns(analyzer, mocker, npx_behavior=137) + assert calls[0][0] == "npx" + assert calls[1][:2] == ["npm", "install"] + assert f"@coana-tech/cli@{DEFAULT_COANA_CLI_VERSION}" in calls[1] + assert calls[2][0] == "node" + assert calls[2][1] == "/tmp/socket-coana-x/node_modules/@coana-tech/cli/coana.js" + assert calls[2][2:4] == ["run", "."] + + +def test_falls_back_when_npx_missing(analyzer, mocker): + """npx not on PATH (FileNotFoundError) -> npm install + node fallback.""" + def raise_enoent(): + raise FileNotFoundError("npx") + + calls = _capture_spawns(analyzer, mocker, npx_behavior=raise_enoent) + assert calls[0][0] == "npx" + assert calls[1][:2] == ["npm", "install"] + assert calls[2][0] == "node" + + +def test_no_fallback_on_ambiguous_exit_code(analyzer, mocker): + """A small positive npx exit (coana's own failure) does NOT trigger the npm fallback.""" + mocker.patch.object(analyzer, "_extract_scan_id", return_value=None) + calls = [] + + def fake_run(argv, **_kw): + calls.append(argv) + m = MagicMock() + m.returncode = 1 + return m + + mocker.patch.object(reachability.subprocess, "run", side_effect=fake_run) + with pytest.raises(Exception): + analyzer.run_reachability_analysis(org_slug="my-org", target_directory=".") + assert calls[0][0] == "npx" + assert all(c[:2] != ["npm", "install"] for c in calls) + + +def test_force_npm_install_skips_npx(analyzer, mocker, monkeypatch): + """SOCKET_CLI_COANA_FORCE_NPM_INSTALL routes straight to npm install + node.""" + monkeypatch.setenv("SOCKET_CLI_COANA_FORCE_NPM_INSTALL", "1") + calls = _capture_spawns(analyzer, mocker, npx_behavior=0) + assert all(c[0] != "npx" for c in calls) + assert calls[0][:2] == ["npm", "install"] + assert calls[1][0] == "node" + + +def test_disable_fallback_propagates_npx_failure(analyzer, mocker, monkeypatch): + """SOCKET_CLI_COANA_DISABLE_NPM_FALLBACK: a launcher failure is NOT retried via npm.""" + monkeypatch.setenv("SOCKET_CLI_COANA_DISABLE_NPM_FALLBACK", "1") + mocker.patch.object(analyzer, "_extract_scan_id", return_value=None) + calls = [] + + def fake_run(argv, **_kw): + calls.append(argv) + m = MagicMock() + m.returncode = 137 + return m + + mocker.patch.object(reachability.subprocess, "run", side_effect=fake_run) + with pytest.raises(Exception): + analyzer.run_reachability_analysis(org_slug="my-org", target_directory=".") + assert all(c[:2] != ["npm", "install"] for c in calls) + + +def test_fallback_installs_once_per_version(analyzer, mocker): + """A second in-process fallback for the same version reuses the install (no re-install).""" + mocker.patch.object(analyzer, "_extract_scan_id", return_value="scan-123") + mocker.patch.object(reachability.tempfile, "mkdtemp", return_value="/tmp/socket-coana-cache") + mocker.patch.object( + analyzer, + "_resolve_coana_bin", + return_value="/tmp/socket-coana-cache/node_modules/@coana-tech/cli/coana.js", + ) + # The cached script path must "exist" for the 2nd run to reuse it. + mocker.patch.object(reachability.os.path, "exists", return_value=True) + calls = [] + + def fake_run(argv, **_kw): + calls.append(argv) + m = MagicMock() + m.returncode = 137 if argv[0] == "npx" else 0 + return m + + mocker.patch.object(reachability.subprocess, "run", side_effect=fake_run) + analyzer.run_reachability_analysis(org_slug="my-org", target_directory=".") + analyzer.run_reachability_analysis(org_slug="my-org", target_directory=".") + + npm_installs = [c for c in calls if c[:2] == ["npm", "install"]] + assert len(npm_installs) == 1 # installed once, reused on the second fallback + + +def test_fallback_node_missing_raises_clear_error(analyzer, mocker): + """If `node` is missing in the fallback, surface a clear error (not opaque FileNotFoundError).""" + mocker.patch.object(analyzer, "_extract_scan_id", return_value=None) + mocker.patch.object(reachability.tempfile, "mkdtemp", return_value="/tmp/socket-coana-n") + mocker.patch.object( + analyzer, + "_resolve_coana_bin", + return_value="/tmp/socket-coana-n/node_modules/@coana-tech/cli/coana.js", + ) + + def fake_run(argv, **_kw): + if argv[0] == "npx": + m = MagicMock() + m.returncode = 137 + return m + if argv[0] == "node": + raise FileNotFoundError("node") + m = MagicMock() # npm install succeeds + m.returncode = 0 + return m + + mocker.patch.object(reachability.subprocess, "run", side_effect=fake_run) + with pytest.raises(Exception, match="node"): + analyzer.run_reachability_analysis(org_slug="my-org", target_directory=".") + + +def test_build_coana_node_cmd_js_vs_binary(): + f = ReachabilityAnalyzer._build_coana_node_cmd + assert f("/x/coana.js", ["run", "."]) == ["node", "/x/coana.js", "run", "."] + assert f("/x/coana.mjs", ["run"]) == ["node", "/x/coana.mjs", "run"] + assert f("/x/coana", ["run", "."]) == ["/x/coana", "run", "."] + + +def test_resolve_coana_bin_parses_package_json(analyzer, tmp_path): + pkg_dir = tmp_path / "node_modules" / "@coana-tech" / "cli" + pkg_dir.mkdir(parents=True) + + # string bin + (pkg_dir / "package.json").write_text('{"bin": "dist/coana.js"}') + assert analyzer._resolve_coana_bin(str(tmp_path)) == str(pkg_dir / "dist" / "coana.js") + + # dict bin, prefer the "coana" entry + (pkg_dir / "package.json").write_text('{"bin": {"coana": "dist/c.js", "other": "x.js"}}') + assert analyzer._resolve_coana_bin(str(tmp_path)) == str(pkg_dir / "dist" / "c.js") + + # dict bin without "coana" -> first value + (pkg_dir / "package.json").write_text('{"bin": {"other": "x.js"}}') + assert analyzer._resolve_coana_bin(str(tmp_path)) == str(pkg_dir / "x.js") + + # missing bin -> raises + (pkg_dir / "package.json").write_text("{}") + with pytest.raises(Exception, match="bin"): + analyzer._resolve_coana_bin(str(tmp_path)) diff --git a/uv.lock b/uv.lock index d8ebc6b..29cd641 100644 --- a/uv.lock +++ b/uv.lock @@ -1283,7 +1283,7 @@ wheels = [ [[package]] name = "socketsecurity" -version = "2.4.6" +version = "2.4.7" source = { editable = "." } dependencies = [ { name = "brotli", marker = "platform_python_implementation == 'CPython'" },