exposure-check.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. #!/usr/bin/env python3
  2. """Match on-disk installed packages against an IOC exposure catalog.
  3. Answers the post-advisory question: "an advisory named package X@Y — do we
  4. have it installed right now, and where?" Cross-platform (works on Windows,
  5. unlike Perplexity's Bumblebee, whose exposure-catalog JSON format this borrows).
  6. Reads npm lockfiles and Python installed metadata; no package-manager execution,
  7. no network, no source reads.
  8. Usage: exposure-check.py [--catalog PATH] [--root DIR]... [--json] [--findings-only]
  9. Input: --root dirs (default: cwd); --catalog file or dir of *.json
  10. (default: bundled assets/exposure-catalog.json)
  11. Output: stdout = component inventory as TSV (omitted with --findings-only; matches are logged to stderr), or one JSON report object with --json
  12. Stderr: progress, summary, errors
  13. Exit: 0 no exposure, 2 usage, 3 catalog-not-found, 4 invalid-catalog,
  14. 10 EXPOSURE FOUND (>=1 installed package matches the catalog)
  15. Examples:
  16. exposure-check.py --root ~/code
  17. exposure-check.py --root . --json | jq '.data.findings[]'
  18. exposure-check.py --catalog ./my-iocs.json --root /srv/app --findings-only
  19. """
  20. import argparse, json, os, re, sys
  21. from pathlib import Path
  22. from typing import NoReturn
  23. EXIT_OK, EXIT_USAGE, EXIT_NOT_FOUND, EXIT_INVALID, EXIT_EXPOSED = 0, 2, 3, 4, 10
  24. SKIP_DIRS = {".git", ".hg", ".svn", "worktrees"}
  25. DEFAULT_CATALOG = Path(__file__).resolve().parent.parent / "assets" / "exposure-catalog.json"
  26. def log(msg): print(msg, file=sys.stderr)
  27. def die(msg, code) -> NoReturn:
  28. log(f"ERROR: {msg}")
  29. sys.exit(code)
  30. def load_catalog(path: Path):
  31. files = []
  32. if path.is_dir():
  33. files = sorted(path.glob("*.json"))
  34. elif path.is_file():
  35. files = [path]
  36. if not files:
  37. die(f"catalog not found: {path}", EXIT_NOT_FOUND)
  38. entries, ver = [], None
  39. for f in files:
  40. doc = {}
  41. try:
  42. doc = json.loads(f.read_text(encoding="utf-8"))
  43. except (json.JSONDecodeError, OSError) as e:
  44. die(f"invalid catalog {f}: {e}", EXIT_INVALID)
  45. if ver is None:
  46. ver = doc.get("schema_version")
  47. elif doc.get("schema_version") != ver:
  48. die(f"schema_version mismatch across catalogs: {f}", EXIT_INVALID)
  49. entries.extend(doc.get("entries", []))
  50. # index: (ecosystem, lowercased package name) -> {version: entry}
  51. index = {}
  52. for e in entries:
  53. key = (e.get("ecosystem", ""), str(e.get("package", "")).lower())
  54. index.setdefault(key, {})
  55. for v in e.get("versions", []):
  56. index[key][str(v)] = e
  57. return index, ver, len(entries)
  58. def walk(roots):
  59. for root in roots:
  60. base = Path(root).expanduser()
  61. if not base.exists():
  62. log(f"[warn] root does not exist: {base}")
  63. continue
  64. for dirpath, dirnames, filenames in os.walk(base):
  65. dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
  66. yield Path(dirpath), filenames
  67. def add(components, ecosystem, name, version, source):
  68. if name and version:
  69. components.append({"ecosystem": ecosystem, "name": str(name),
  70. "version": str(version), "source": str(source)})
  71. def parse_npm_lock(path: Path, components):
  72. try:
  73. doc = json.loads(path.read_text(encoding="utf-8"))
  74. except (json.JSONDecodeError, OSError):
  75. return
  76. # lockfileVersion 2/3: packages{} keyed by "node_modules/<name>"
  77. for pkgpath, meta in (doc.get("packages") or {}).items():
  78. if not pkgpath:
  79. continue # root package entry ""
  80. name = pkgpath.split("node_modules/")[-1]
  81. add(components, "npm", name, meta.get("version"), path)
  82. # lockfileVersion 1: dependencies{} (recursive)
  83. def walk_deps(deps):
  84. for name, meta in (deps or {}).items():
  85. add(components, "npm", name, meta.get("version"), path)
  86. walk_deps(meta.get("dependencies"))
  87. walk_deps(doc.get("dependencies"))
  88. REQ_RE = re.compile(r"^\s*([A-Za-z0-9_.\-]+)\s*==\s*([A-Za-z0-9_.\-]+)")
  89. def parse_requirements(path: Path, components):
  90. try:
  91. for line in path.read_text(encoding="utf-8").splitlines():
  92. m = REQ_RE.match(line)
  93. if m:
  94. add(components, "pypi", m.group(1), m.group(2), path)
  95. except OSError:
  96. pass
  97. def parse_dist_info(path: Path, components): # *.dist-info/METADATA
  98. name = ver = None
  99. try:
  100. for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
  101. if line.startswith("Name:"):
  102. name = line.split(":", 1)[1].strip()
  103. elif line.startswith("Version:"):
  104. ver = line.split(":", 1)[1].strip()
  105. if name and ver:
  106. break
  107. except OSError:
  108. return
  109. add(components, "pypi", name, ver, path)
  110. def collect(roots):
  111. components = []
  112. for dirpath, filenames in walk(roots):
  113. for fn in filenames:
  114. full = dirpath / fn
  115. if fn in ("package-lock.json", "npm-shrinkwrap.json", ".package-lock.json"):
  116. parse_npm_lock(full, components)
  117. elif fn.startswith("requirements") and fn.endswith(".txt"):
  118. parse_requirements(full, components)
  119. elif fn == "METADATA" and dirpath.name.endswith(".dist-info"):
  120. parse_dist_info(full, components)
  121. return components
  122. def main():
  123. # Force UTF-8 on Windows so help text / output never crash on cp1252
  124. # (the same class of bug GuardDog hits — see references/tooling-landscape.md).
  125. for stream in (sys.stdout, sys.stderr):
  126. try:
  127. stream.reconfigure(encoding="utf-8") # type: ignore[attr-defined]
  128. except (AttributeError, ValueError):
  129. pass
  130. ap = argparse.ArgumentParser(add_help=True, description=__doc__,
  131. formatter_class=argparse.RawDescriptionHelpFormatter)
  132. ap.add_argument("--catalog", default=str(DEFAULT_CATALOG),
  133. help="IOC catalog JSON file or dir of *.json")
  134. ap.add_argument("--root", action="append", default=[],
  135. help="directory to scan (repeatable; default: cwd)")
  136. ap.add_argument("--json", action="store_true", help="machine-readable output")
  137. ap.add_argument("--findings-only", action="store_true",
  138. help="emit only matches, not the full component inventory")
  139. args = ap.parse_args()
  140. roots = args.root or ["."]
  141. index, schema_ver, n_entries = load_catalog(Path(args.catalog).expanduser())
  142. log(f"=== exposure-check: {n_entries} IOC entries (schema {schema_ver}), "
  143. f"roots: {', '.join(roots)} ===")
  144. components = collect(roots)
  145. findings = []
  146. for c in components:
  147. bucket = index.get((c["ecosystem"], c["name"].lower()))
  148. if bucket and c["version"] in bucket:
  149. e = bucket[c["version"]]
  150. findings.append({**c, "ioc_id": e.get("id"),
  151. "severity": e.get("severity", "unknown"),
  152. "note": e.get("note", "")})
  153. if args.json:
  154. data: dict[str, object] = {"findings": findings}
  155. if not args.findings_only:
  156. data["components_scanned"] = len(components)
  157. print(json.dumps({"data": data, "meta": {
  158. "exposed": bool(findings), "findings": len(findings),
  159. "components_scanned": len(components), "ioc_entries": n_entries,
  160. "schema": "axiom.tool.exposure-check.report/v1"}}))
  161. else:
  162. if not args.findings_only:
  163. for c in components:
  164. print(f"{c['ecosystem']}\t{c['name']}\t{c['version']}\t{c['source']}")
  165. for f in findings:
  166. log(f" [EXPOSED] {f['ecosystem']} {f['name']}@{f['version']} "
  167. f"({f['severity']}, {f['ioc_id']}) - {f['source']}")
  168. if findings:
  169. log(f"EXPOSED: {len(findings)} installed package(s) match the IOC catalog. "
  170. f"Treat as incident: isolate, rotate creds, remove the package.")
  171. sys.exit(EXIT_EXPOSED)
  172. log(f"Clean: 0 of {len(components)} scanned components match the catalog.")
  173. sys.exit(EXIT_OK)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()