Source code for scitex_container._snapshot

#!/usr/bin/env python3
# Timestamp: "2026-02-25"
# File: src/scitex_container/_snapshot.py
"""Environment snapshot for Clew reproducibility tracking.

Captures container version, SIF hash, host package versions, dev repo git
commits, and lock file hashes into a single JSON-serializable dict.
"""

from __future__ import annotations

import hashlib
import shutil
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from scitex_container._compat import supports_return_as


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


[docs] @supports_return_as def env_snapshot( containers_dir: str | Path | None = None, dev_repos: list[str | Path] | None = None, ) -> dict[str, Any]: """Capture a lightweight JSON-serializable environment snapshot. Gracefully degrades — never raises, just omits fields that cannot be determined. Parameters ---------- containers_dir : str or Path, optional Path to the containers directory. Auto-detected via ``find_containers_dir()`` when *None*. dev_repos : list of str or Path, optional Paths to git repositories to include in ``dev_repos`` section. Returns ------- dict JSON-serializable snapshot with keys: ``schema_version``, ``timestamp``, ``container``, ``host``, ``dev_repos``, ``lock_files``. """ snap: dict[str, Any] = { "schema_version": "1.0", "timestamp": datetime.now(tz=timezone.utc).isoformat(), } snap["container"] = _capture_container(containers_dir) snap["host"] = _capture_host() snap["dev_repos"] = _capture_dev_repos(dev_repos or []) snap["lock_files"] = _capture_lock_files(containers_dir) return snap
# --------------------------------------------------------------------------- # Container section # --------------------------------------------------------------------------- def _capture_container(containers_dir: str | Path | None) -> dict[str, Any]: """Capture container version and SIF hash.""" result: dict[str, Any] = {} try: from .apptainer import find_containers_dir, get_active_version cdir: Path if containers_dir is not None: cdir = Path(containers_dir) else: cdir = find_containers_dir() version = get_active_version(cdir) result["version"] = version # Resolve the current.sif symlink to get the actual SIF path link = cdir / "current.sif" if link.is_symlink(): sif_path = link.resolve() result["sif_path"] = str(sif_path) if sif_path.is_file(): result["sif_sha256"] = _sha256_file(sif_path) # Look for a .def-hash sidecar file def_hash_file = sif_path.with_suffix(".def-hash") if def_hash_file.is_file(): result["def_hash"] = def_hash_file.read_text().strip() elif link.is_file(): # Plain file (not a symlink) — unusual but handle it result["sif_path"] = str(link) result["sif_sha256"] = _sha256_file(link) except Exception: # Gracefully degrade — return whatever was gathered pass return result # --------------------------------------------------------------------------- # Host section # --------------------------------------------------------------------------- def _capture_host() -> dict[str, Any]: """Capture host package installation status.""" result: dict[str, Any] = {} try: from .host import check_packages packages = check_packages() for pkg_name, info in packages.items(): result[pkg_name] = { "installed": info.get("installed", False), "version": info.get("version", ""), } except Exception: pass return result # --------------------------------------------------------------------------- # Dev repos section # --------------------------------------------------------------------------- def _capture_dev_repos(repo_paths: list[str | Path]) -> list[dict[str, Any]]: """Capture git metadata for each dev repo path.""" repos: list[dict[str, Any]] = [] for raw_path in repo_paths: entry = _capture_one_repo(Path(raw_path)) repos.append(entry) return repos def _capture_one_repo(repo_path: Path) -> dict[str, Any]: """Capture git metadata for a single repository path.""" entry: dict[str, Any] = { "name": repo_path.name, "path": str(repo_path.resolve()), } if not repo_path.exists(): entry["error"] = "path does not exist" return entry git = shutil.which("git") if git is None: entry["error"] = "git not found" return entry def _run(args: list[str]) -> str: """Run a git command and return stripped stdout, or '' on failure.""" try: proc = subprocess.run( [git, "-C", str(repo_path)] + args, capture_output=True, text=True, timeout=10, ) return proc.stdout.strip() if proc.returncode == 0 else "" except Exception: return "" commit = _run(["rev-parse", "HEAD"]) if commit: entry["commit"] = commit branch = _run(["branch", "--show-current"]) if branch: entry["branch"] = branch porcelain = _run(["status", "--porcelain"]) entry["dirty"] = bool(porcelain) return entry # --------------------------------------------------------------------------- # Lock files section # --------------------------------------------------------------------------- def _capture_lock_files(containers_dir: str | Path | None) -> dict[str, Any]: """Capture SHA256 hashes of lock files in the containers directory.""" result: dict[str, Any] = {} try: from .apptainer import find_containers_dir cdir: Path if containers_dir is not None: cdir = Path(containers_dir) else: cdir = find_containers_dir() # pip lock (requirements_lock.txt or requirements.lock) for candidate in ("requirements_lock.txt", "requirements.lock"): lock_path = cdir / candidate if lock_path.is_file(): result["pip"] = _sha256_file(lock_path) break # dpkg lock for candidate in ("dpkg_lock.txt", "dpkg.lock"): lock_path = cdir / candidate if lock_path.is_file(): result["dpkg"] = _sha256_file(lock_path) break except Exception: pass return result # --------------------------------------------------------------------------- # Utilities # --------------------------------------------------------------------------- def _sha256_file(path: Path, chunk_size: int = 1 << 20) -> str: """Compute SHA256 hex digest of a file, reading in chunks.""" h = hashlib.sha256() try: with path.open("rb") as fh: while True: chunk = fh.read(chunk_size) if not chunk: break h.update(chunk) return h.hexdigest() except OSError: return "" # EOF