Source code for scitex_container.apptainer._lockgen

#!/usr/bin/env python3
# Timestamp: "2026-05-24"
# File: src/scitex_container/apptainer/_lockgen.py
"""Lock capture, locked-def generation, and version-set comparison.

Three pure pieces of the reproducible round-trip:

1. ``capture_lock`` — introspect a built SIF for the *actually installed*
   versions (pip / dpkg / node) and write a single combined ``.lock``
   file. The lock is only knowable post-build; that's why up-front
   pinning is impossible and the round-trip is needed.
2. ``generate_locked_def`` — emit a new ``.def`` from a rough ``.def``
   plus a captured lock, pinning every pip package to its frozen
   version (``pip install pkg==ver``). apt/node pinning is captured in
   the lock for the round-trip comparison but apt-version injection is
   deferred (operator step 3) — the version-set comparison still proves
   reproducibility of the pip surface, the dominant drift source.
3. ``compare_locks`` — the round-trip gate: are two locks' version sets
   identical? Returns a structured diff naming what drifted.

The on-disk ``.lock`` format is a single text file with section headers
so a human can read it and ``compare_locks`` can parse it back::

    # scitex-container lock
    [pip]
    numpy==2.1.0
    ...
    [dpkg]
    libc6=2.39-0ubuntu8
    ...
    [node]
    {"dependencies": {...}}
"""

from __future__ import annotations

import json
import logging
import re
import subprocess
from dataclasses import dataclass, field
from pathlib import Path

from scitex_container._compat import supports_return_as

from ._utils import detect_container_cmd

logger = logging.getLogger(__name__)

_SECTIONS = ("pip", "dpkg", "node")


[docs] @dataclass class Lock: """Parsed version sets captured from a SIF. pip / dpkg are ``{name: version}`` maps; node is the raw ``npm list -g --json`` text (kept verbatim — the JSON shape is the version set for node globals). """ pip: dict[str, str] = field(default_factory=dict) dpkg: dict[str, str] = field(default_factory=dict) node: str = ""
[docs] def version_set(self) -> dict[str, str]: """Flat ``{qualified_name: version}`` map for set comparison. node packages are flattened from the npm JSON into ``node:<name> -> <version>`` entries so a node-global drift is caught by the same comparison as pip/dpkg. """ merged: dict[str, str] = {} for name, ver in self.pip.items(): merged[f"pip:{name}"] = ver for name, ver in self.dpkg.items(): merged[f"dpkg:{name}"] = ver for name, ver in _parse_node_versions(self.node).items(): merged[f"node:{name}"] = ver return merged
def _parse_node_versions(node_json: str) -> dict[str, str]: """Extract ``{name: version}`` from ``npm list -g --json`` output.""" if not node_json.strip(): return {} try: data = json.loads(node_json) except (json.JSONDecodeError, ValueError): return {} deps = data.get("dependencies", {}) if isinstance(data, dict) else {} out: dict[str, str] = {} for name, info in deps.items(): if isinstance(info, dict) and "version" in info: out[name] = str(info["version"]) return out def _parse_pip_freeze(text: str) -> dict[str, str]: """Parse ``pip freeze`` output into ``{name: version}``. Handles the common ``name==ver`` form; lines without ``==`` (editable installs, VCS pins, ``-e`` URLs) are kept under their raw key so a drift in them is still caught. """ out: dict[str, str] = {} for line in text.splitlines(): line = line.strip() if not line or line.startswith("#"): continue if "==" in line: name, ver = line.split("==", 1) out[name.strip().lower()] = ver.strip() else: # editable / VCS / url pin — key on the whole line, no version out[line] = "" return out def _parse_dpkg(text: str) -> dict[str, str]: """Parse ``dpkg-query -W -f=${Package}=${Version}`` into a map.""" out: dict[str, str] = {} for line in text.splitlines(): line = line.strip() if not line or "=" not in line: continue name, ver = line.split("=", 1) out[name.strip()] = ver.strip() return out
[docs] @supports_return_as def capture_lock( sif_path: str | Path, lock_path: str | Path, ) -> Lock: """Introspect a built SIF and write a combined ``.lock`` file. Parameters ---------- sif_path : str or Path Path to the built ``.sif``. lock_path : str or Path Destination for the combined lock file (``<layer>-<ts>.lock``). Returns ------- Lock The captured version sets. Raises ------ FileNotFoundError If the SIF or the container command is missing. """ sif_path = Path(sif_path) if not sif_path.exists(): raise FileNotFoundError(f"SIF not found: {sif_path}") cmd = detect_container_cmd() lock = Lock() # pip may be `pip` or `pip3` depending on the base image; try both and # take the first that yields output. for pip_bin in ("pip", "pip3"): pip_out = _exec_capture(cmd, sif_path, [pip_bin, "freeze"]) if pip_out.strip(): lock.pip = _parse_pip_freeze(pip_out) break dpkg_out = _exec_capture( cmd, sif_path, ["dpkg-query", "-W", "-f=${Package}=${Version}\n"] ) lock.dpkg = _parse_dpkg(dpkg_out) node_out = _exec_capture( cmd, sif_path, ["npm", "list", "-g", "--depth=0", "--json"] ) lock.node = node_out write_lock(lock, lock_path) logger.info( "Captured lock: %d pip, %d dpkg, %d node packages", len(lock.pip), len(lock.dpkg), len(_parse_node_versions(lock.node)), ) return lock
def _exec_capture(cmd: str, sif_path: Path, argv: list[str]) -> str: """Run an introspection command inside the SIF, isolated from the host. Uses ``--cleanenv --no-home --containall`` so the *container's* own package managers and site-packages are introspected — NOT the host's. Without these, apptainer auto-mounts ``$HOME`` and inherits the host ``PATH`` / ``PYTHONPATH`` / ``~/.local``, so ``pip freeze`` would capture the host environment and silently defeat reproducibility. Returns the command's stdout, or ``""`` when it fails (e.g. the tool is absent from this base image). """ result = subprocess.run( [cmd, "exec", "--cleanenv", "--no-home", str(sif_path), *argv], capture_output=True, text=True, ) return result.stdout if result.returncode == 0 else ""
[docs] @supports_return_as def write_lock(lock: Lock, lock_path: str | Path) -> Path: """Serialize a ``Lock`` to the combined ``.lock`` text format.""" lock_path = Path(lock_path) lock_path.parent.mkdir(parents=True, exist_ok=True) lines = ["# scitex-container lock", "[pip]"] for name in sorted(lock.pip): ver = lock.pip[name] lines.append(f"{name}=={ver}" if ver else name) lines.append("[dpkg]") for name in sorted(lock.dpkg): lines.append(f"{name}={lock.dpkg[name]}") lines.append("[node]") lines.append(lock.node.strip()) lock_path.write_text("\n".join(lines) + "\n") return lock_path
[docs] @supports_return_as def read_lock(lock_path: str | Path) -> Lock: """Parse a combined ``.lock`` file back into a ``Lock``.""" lock_path = Path(lock_path) text = lock_path.read_text() lock = Lock() section: str | None = None node_lines: list[str] = [] for raw in text.splitlines(): line = raw.rstrip("\n") stripped = line.strip() if stripped.startswith("#"): continue m = re.match(r"^\[(\w+)\]$", stripped) if m and m.group(1) in _SECTIONS: section = m.group(1) continue if section == "pip": if not stripped: continue if "==" in stripped: name, ver = stripped.split("==", 1) lock.pip[name.strip().lower()] = ver.strip() else: lock.pip[stripped] = "" elif section == "dpkg": if not stripped or "=" not in stripped: continue name, ver = stripped.split("=", 1) lock.dpkg[name.strip()] = ver.strip() elif section == "node": node_lines.append(line) lock.node = "\n".join(node_lines).strip() return lock
[docs] @supports_return_as def generate_locked_def( rough_def: str | Path, lock: Lock, out_def: str | Path, ) -> Path: """Emit a version-pinned ``.def`` from a rough ``.def`` + a lock. The locked def is the rough def with a pinned-pip ``%post`` stanza appended: ``pip install pkg==ver ...`` for every frozen pip package. Re-installing already-installed packages at their exact versions is a no-op in the happy path and *forces* the pin in the drift path — so a rebuild from this def reproduces the captured pip version set. apt/node version sets are recorded in the lock and compared by the round-trip, but apt-version injection into the def is deferred (operator step 3 — ``pkg=ver`` from dpkg or ``snapshot.ubuntu.com``). Parameters ---------- rough_def : str or Path The loosely-pinned recipe the rough build used. lock : Lock Captured versions from the rough build. out_def : str or Path Destination for the generated locked ``.def``. Returns ------- Path Path to the written locked ``.def``. """ rough_def = Path(rough_def) out_def = Path(out_def) base = rough_def.read_text() pins = [f"{name}=={ver}" for name, ver in sorted(lock.pip.items()) if ver] stanza_lines = [ "", "# --- scitex-container: pinned pip versions (generated) ---------------", "# Re-pin every pip package to the version frozen from the rough build.", "# A no-op when the resolver already lands these versions; forces the", "# pin when it would otherwise drift. Reproduces the captured pip set.", "%post", " set -e", ] if pins: # Heredoc a requirements file rather than a long argv — robust to # any pin count and shell-quoting-safe. stanza_lines.append(" cat > /tmp/scitex-pins.txt <<'SCITEX_PINS'") for pin in pins: stanza_lines.append(f"{pin}") stanza_lines.append("SCITEX_PINS") # Pick the pip the base ships (pip / pip3) and add # --break-system-packages only when this pip understands it # (PEP 668 environments: Ubuntu 24.04, Debian 12, alpine 3.19+). # --no-deps: pin exactly the captured set without re-resolving. stanza_lines += [ ' _pip="$(command -v pip3 || command -v pip)"', ' _bsp=""', ' if "$_pip" install --help 2>/dev/null | grep -q -- ' "--break-system-packages; then", ' _bsp="--break-system-packages"', " fi", ' "$_pip" install --no-deps $_bsp -r /tmp/scitex-pins.txt', " rm -f /tmp/scitex-pins.txt", ] else: stanza_lines.append(" : # no pip packages captured") stanza_lines.append( "# --- end scitex-container pins ---------------------------------------" ) stanza_lines.append("") out_def.parent.mkdir(parents=True, exist_ok=True) out_def.write_text(base.rstrip("\n") + "\n" + "\n".join(stanza_lines)) logger.info( "Generated locked def with %d pinned pip packages: %s", len(pins), out_def ) return out_def
[docs] @dataclass(frozen=True) class LockDiff: """Result of comparing two locks' version sets.""" identical: bool changed: dict[str, tuple[str, str]] = field(default_factory=dict) # name -> (a, b) only_in_a: dict[str, str] = field(default_factory=dict) only_in_b: dict[str, str] = field(default_factory=dict)
[docs] def summary(self) -> str: """One-paragraph human summary of the drift, or 'identical'.""" if self.identical: return "version sets identical" parts: list[str] = [] if self.changed: sample = ", ".join( f"{n}: {a} -> {b}" for n, (a, b) in sorted(self.changed.items())[:10] ) parts.append(f"{len(self.changed)} changed ({sample})") if self.only_in_a: parts.append(f"{len(self.only_in_a)} only in rough") if self.only_in_b: parts.append(f"{len(self.only_in_b)} only in rebuild") return "; ".join(parts)
[docs] @supports_return_as def compare_locks(lock_a: Lock, lock_b: Lock) -> LockDiff: """Compare two locks' flat version sets — the round-trip gate. Parameters ---------- lock_a, lock_b : Lock The rough-build lock and the rebuild lock. Returns ------- LockDiff ``identical=True`` when every package name maps to the same version in both; otherwise the structured diff names every changed / added / missing package. """ set_a = lock_a.version_set() set_b = lock_b.version_set() names_a = set(set_a) names_b = set(set_b) only_in_a = {n: set_a[n] for n in names_a - names_b} only_in_b = {n: set_b[n] for n in names_b - names_a} changed = { n: (set_a[n], set_b[n]) for n in names_a & names_b if set_a[n] != set_b[n] } identical = not (changed or only_in_a or only_in_b) return LockDiff( identical=identical, changed=changed, only_in_a=only_in_a, only_in_b=only_in_b, )
# EOF