exposure-check.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. #!/usr/bin/env python3
  2. """Match on-disk installed packages against an IOC exposure catalog.
  3. Answers the post-advisory question: "an advisory named package X@Y — do we
  4. have it installed right now, and where?" Cross-platform (works on Windows,
  5. unlike Perplexity's Bumblebee, whose exposure-catalog JSON format this borrows).
  6. Reads lockfiles + installed metadata across npm (package-lock / pnpm-lock /
  7. yarn.lock), PyPI, Composer, Cargo, Go, and RubyGems, plus installed editor
  8. extensions; no package-manager execution, no network, no source reads.
  9. Usage: exposure-check.py [--catalog PATH] [--root DIR]... [--json] [--findings-only]
  10. Input: --root dirs (default: cwd); --catalog file or dir of *.json
  11. (default: bundled assets/exposure-catalog.json)
  12. Output: stdout = findings (or all components), NDJSON-ish JSON with --json
  13. Stderr: progress, summary, errors
  14. Exit: 0 no exposure, 2 usage, 3 catalog-not-found, 4 invalid-catalog,
  15. 10 EXPOSURE FOUND (>=1 installed package matches the catalog)
  16. Examples:
  17. exposure-check.py --root ~/code
  18. exposure-check.py --root . --json | jq '.data.findings[]'
  19. exposure-check.py --catalog ./my-iocs.json --root /srv/app --findings-only
  20. """
  21. import argparse, json, os, re, sys
  22. from pathlib import Path
  23. from typing import NoReturn
# Process exit codes, as listed in the module docstring: 0 = no exposure,
# 2 = usage error, 3 = catalog not found, 4 = invalid catalog, 10 = exposure found.
EXIT_OK, EXIT_USAGE, EXIT_NOT_FOUND, EXIT_INVALID, EXIT_EXPOSED = 0, 2, 3, 4, 10
# Directory names pruned from the filesystem walk (VCS metadata dirs and "worktrees").
SKIP_DIRS = {".git", ".hg", ".svn", "worktrees"}
# Bundled catalog: <parent of this script's directory>/assets/exposure-catalog.json.
DEFAULT_CATALOG = Path(__file__).resolve().parent.parent / "assets" / "exposure-catalog.json"
  27. def log(msg): print(msg, file=sys.stderr)
  28. def die(msg, code) -> NoReturn:
  29. log(f"ERROR: {msg}")
  30. sys.exit(code)
  31. def load_catalog(path: Path):
  32. files = []
  33. if path.is_dir():
  34. files = sorted(path.glob("*.json"))
  35. elif path.is_file():
  36. files = [path]
  37. if not files:
  38. die(f"catalog not found: {path}", EXIT_NOT_FOUND)
  39. entries, ver = [], None
  40. for f in files:
  41. doc = {}
  42. try:
  43. doc = json.loads(f.read_text(encoding="utf-8"))
  44. except (json.JSONDecodeError, OSError) as e:
  45. die(f"invalid catalog {f}: {e}", EXIT_INVALID)
  46. if ver is None:
  47. ver = doc.get("schema_version")
  48. elif doc.get("schema_version") != ver:
  49. die(f"schema_version mismatch across catalogs: {f}", EXIT_INVALID)
  50. entries.extend(doc.get("entries", []))
  51. # index: (ecosystem, lowercased package name) -> {version: entry}
  52. index = {}
  53. for e in entries:
  54. key = (e.get("ecosystem", ""), str(e.get("package", "")).lower())
  55. index.setdefault(key, {})
  56. for v in e.get("versions", []):
  57. index[key][str(v)] = e
  58. return index, ver, len(entries)
  59. def walk(roots):
  60. for root in roots:
  61. base = Path(root).expanduser()
  62. if not base.exists():
  63. log(f"[warn] root does not exist: {base}")
  64. continue
  65. for dirpath, dirnames, filenames in os.walk(base):
  66. dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
  67. yield Path(dirpath), filenames
  68. def add(components, ecosystem, name, version, source):
  69. if name and version:
  70. components.append({"ecosystem": ecosystem, "name": str(name),
  71. "version": str(version), "source": str(source)})
  72. def parse_npm_lock(path: Path, components):
  73. try:
  74. doc = json.loads(path.read_text(encoding="utf-8"))
  75. except (json.JSONDecodeError, OSError):
  76. return
  77. # lockfileVersion 2/3: packages{} keyed by "node_modules/<name>"
  78. for pkgpath, meta in (doc.get("packages") or {}).items():
  79. if not pkgpath:
  80. continue # root package entry ""
  81. name = pkgpath.split("node_modules/")[-1]
  82. add(components, "npm", name, meta.get("version"), path)
  83. # lockfileVersion 1: dependencies{} (recursive)
  84. def walk_deps(deps):
  85. for name, meta in (deps or {}).items():
  86. add(components, "npm", name, meta.get("version"), path)
  87. walk_deps(meta.get("dependencies"))
  88. walk_deps(doc.get("dependencies"))
  89. # pnpm-lock.yaml package keys: "/axios@1.14.1:", "axios@1.14.1:", "/@vue/cli@5.0.8(...)"
  90. PNPM_RE = re.compile(r"^\s+'?/?(@?[A-Za-z0-9][\w.-]*(?:/[\w.-]+)?)@([0-9][\w.\-]*)")
  91. def parse_pnpm_lock(path: Path, components): # pnpm-lock.yaml (regex; no YAML dep)
  92. seen = set()
  93. try:
  94. for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
  95. m = PNPM_RE.match(line)
  96. if m and (m.group(1), m.group(2)) not in seen:
  97. seen.add((m.group(1), m.group(2)))
  98. add(components, "npm", m.group(1), m.group(2), path)
  99. except OSError:
  100. pass
  101. BUN_RE = re.compile(r'"(@?[A-Za-z0-9][\w.-]*(?:/[\w.-]+)?)@([0-9][\w.\-+]*)"')
  102. def parse_bun_lock(path: Path, components): # bun.lock (text/JSONC) — regex name@version
  103. seen = set()
  104. try:
  105. for m in BUN_RE.finditer(path.read_text(encoding="utf-8", errors="replace")):
  106. if (m.group(1), m.group(2)) not in seen:
  107. seen.add((m.group(1), m.group(2)))
  108. add(components, "npm", m.group(1), m.group(2), path)
  109. except OSError:
  110. pass
  111. def parse_yarn_lock(path: Path, components): # yarn.lock (classic + Berry)
  112. name = None; seen = set()
  113. try:
  114. for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
  115. if line and not line[0].isspace() and line.rstrip().endswith(":"):
  116. first = line.strip()[:-1].split(",")[0].strip().strip('"')
  117. if first.startswith("__") or "@" not in first:
  118. name = None
  119. elif first.startswith("@"):
  120. name = "@" + first[1:].split("@")[0] # @scope/pkg
  121. else:
  122. name = first.split("@")[0]
  123. elif name:
  124. m = re.match(r'\s+version[:\s]+"?([0-9][^"\s]*)"?', line)
  125. if m and (name, m.group(1)) not in seen:
  126. seen.add((name, m.group(1)))
  127. add(components, "npm", name, m.group(1), path)
  128. name = None
  129. except OSError:
  130. pass
  131. REQ_RE = re.compile(r"^\s*([A-Za-z0-9_.\-]+)\s*==\s*([A-Za-z0-9_.\-]+)")
  132. def parse_requirements(path: Path, components):
  133. try:
  134. for line in path.read_text(encoding="utf-8").splitlines():
  135. m = REQ_RE.match(line)
  136. if m:
  137. add(components, "pypi", m.group(1), m.group(2), path)
  138. except OSError:
  139. pass
  140. def parse_dist_info(path: Path, components): # *.dist-info/METADATA
  141. name = ver = None
  142. try:
  143. for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
  144. if line.startswith("Name:"):
  145. name = line.split(":", 1)[1].strip()
  146. elif line.startswith("Version:"):
  147. ver = line.split(":", 1)[1].strip()
  148. if name and ver:
  149. break
  150. except OSError:
  151. return
  152. add(components, "pypi", name, ver, path)
  153. def parse_composer_lock(path: Path, components): # composer.lock (JSON)
  154. try:
  155. doc = json.loads(path.read_text(encoding="utf-8"))
  156. except (json.JSONDecodeError, OSError):
  157. return
  158. for key in ("packages", "packages-dev"):
  159. for meta in (doc.get(key) or []):
  160. add(components, "composer", meta.get("name"), meta.get("version"), path)
  161. def parse_cargo_lock(path: Path, components): # Cargo.lock (TOML; needs py3.11+ tomllib)
  162. try:
  163. import tomllib
  164. except ImportError:
  165. return # tomllib is 3.11+; skip Cargo on older pythons
  166. try:
  167. doc = tomllib.loads(path.read_text(encoding="utf-8"))
  168. except Exception: # OSError or tomllib.TOMLDecodeError
  169. return
  170. for pkg in doc.get("package", []):
  171. add(components, "cargo", pkg.get("name"), pkg.get("version"), path)
  172. def parse_go_sum(path: Path, components): # go.sum lines: "<module> <version>[/go.mod] <hash>"
  173. seen = set()
  174. try:
  175. for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
  176. parts = line.split()
  177. if len(parts) >= 2 and parts[1].startswith("v"):
  178. mod, ver = parts[0], parts[1].replace("/go.mod", "")
  179. if (mod, ver) not in seen:
  180. seen.add((mod, ver))
  181. add(components, "go", mod, ver, path)
  182. except OSError:
  183. pass
  184. GEM_RE = re.compile(r"^\s{4}([A-Za-z0-9_.\-]+) \(([^)]+)\)\s*$")
  185. def parse_gemfile_lock(path: Path, components): # Gemfile.lock GEM/specs section
  186. try:
  187. for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
  188. m = GEM_RE.match(line)
  189. if m:
  190. add(components, "rubygems", m.group(1), m.group(2), path)
  191. except OSError:
  192. pass
  193. # Installed editor extensions live in fixed HOME dirs, not under --root. Each
  194. # extension is a <publisher>.<name>-<version>/package.json. Covers the Nx Console /
  195. # GitHub-breach vector (malicious VS Code extension) that package scanning misses.
  196. EXT_DIRS = [
  197. "~/.vscode/extensions", "~/.vscode-server/extensions", "~/.vscode-oss/extensions",
  198. "~/.cursor/extensions", "~/.windsurf/extensions",
  199. ]
  200. def collect_editor_extensions():
  201. comps = []
  202. # SC_EXT_DIRS (os.pathsep-separated) overrides the defaults — for tests or
  203. # non-standard install locations.
  204. dirs = os.environ.get("SC_EXT_DIRS", "").split(os.pathsep) if os.environ.get("SC_EXT_DIRS") else EXT_DIRS
  205. for d in dirs:
  206. if not d:
  207. continue
  208. base = Path(d).expanduser()
  209. if not base.is_dir():
  210. continue
  211. for pkg in base.glob("*/package.json"):
  212. try:
  213. doc = json.loads(pkg.read_text(encoding="utf-8", errors="replace"))
  214. except (json.JSONDecodeError, OSError):
  215. continue
  216. pub, name, ver = doc.get("publisher"), doc.get("name"), doc.get("version")
  217. if pub and name:
  218. add(comps, "editor-extension", f"{pub}.{name}", ver, pkg)
  219. return comps
  220. def collect(roots):
  221. components = []
  222. for dirpath, filenames in walk(roots):
  223. for fn in filenames:
  224. full = dirpath / fn
  225. if fn in ("package-lock.json", "npm-shrinkwrap.json", ".package-lock.json"):
  226. parse_npm_lock(full, components)
  227. elif fn == "pnpm-lock.yaml":
  228. parse_pnpm_lock(full, components)
  229. elif fn == "yarn.lock":
  230. parse_yarn_lock(full, components)
  231. elif fn == "bun.lock":
  232. parse_bun_lock(full, components)
  233. elif fn.startswith("requirements") and fn.endswith(".txt"):
  234. parse_requirements(full, components)
  235. elif fn == "METADATA" and dirpath.name.endswith(".dist-info"):
  236. parse_dist_info(full, components)
  237. elif fn == "composer.lock":
  238. parse_composer_lock(full, components)
  239. elif fn == "Cargo.lock":
  240. parse_cargo_lock(full, components)
  241. elif fn == "go.sum":
  242. parse_go_sum(full, components)
  243. elif fn == "Gemfile.lock":
  244. parse_gemfile_lock(full, components)
  245. return components
  246. def main():
  247. # Force UTF-8 on Windows so help text / output never crash on cp1252
  248. # (the same class of bug GuardDog hits — see references/tooling-landscape.md).
  249. for stream in (sys.stdout, sys.stderr):
  250. try:
  251. stream.reconfigure(encoding="utf-8") # type: ignore[attr-defined]
  252. except (AttributeError, ValueError):
  253. pass
  254. ap = argparse.ArgumentParser(add_help=True, description=__doc__,
  255. formatter_class=argparse.RawDescriptionHelpFormatter)
  256. ap.add_argument("--catalog", default=str(DEFAULT_CATALOG),
  257. help="IOC catalog JSON file or dir of *.json")
  258. ap.add_argument("--root", action="append", default=[],
  259. help="directory to scan (repeatable; default: cwd)")
  260. ap.add_argument("--json", action="store_true", help="machine-readable output")
  261. ap.add_argument("--findings-only", action="store_true",
  262. help="emit only matches, not the full component inventory")
  263. ap.add_argument("--no-extensions", action="store_true",
  264. help="skip the installed-editor-extension inventory")
  265. args = ap.parse_args()
  266. roots = args.root or ["."]
  267. index, schema_ver, n_entries = load_catalog(Path(args.catalog).expanduser())
  268. log(f"=== exposure-check: {n_entries} IOC entries (schema {schema_ver}), "
  269. f"roots: {', '.join(roots)} ===")
  270. components = collect(roots)
  271. if not args.no_extensions:
  272. components += collect_editor_extensions()
  273. findings = []
  274. for c in components:
  275. bucket = index.get((c["ecosystem"], c["name"].lower()))
  276. # "*" in a catalog entry's versions flags ANY installed version — the right
  277. # model for tag-rewrite attacks (Laravel-Lang) where every version is poisoned.
  278. if bucket and (c["version"] in bucket or "*" in bucket):
  279. e = bucket.get(c["version"]) or bucket["*"]
  280. findings.append({**c, "ioc_id": e.get("id"),
  281. "severity": e.get("severity", "unknown"),
  282. "note": e.get("note", "")})
  283. if args.json:
  284. data: dict[str, object] = {"findings": findings}
  285. if not args.findings_only:
  286. data["components_scanned"] = len(components)
  287. print(json.dumps({"data": data, "meta": {
  288. "exposed": bool(findings), "findings": len(findings),
  289. "components_scanned": len(components), "ioc_entries": n_entries,
  290. "schema": "axiom.tool.exposure-check.report/v1"}}))
  291. else:
  292. if not args.findings_only:
  293. for c in components:
  294. print(f"{c['ecosystem']}\t{c['name']}\t{c['version']}\t{c['source']}")
  295. for f in findings:
  296. log(f" [EXPOSED] {f['ecosystem']} {f['name']}@{f['version']} "
  297. f"({f['severity']}, {f['ioc_id']}) - {f['source']}")
  298. if findings:
  299. log(f"EXPOSED: {len(findings)} installed package(s) match the IOC catalog. "
  300. f"Treat as incident: isolate, rotate creds, remove the package.")
  301. sys.exit(EXIT_EXPOSED)
  302. log(f"Clean: 0 of {len(components)} scanned components match the catalog.")
  303. sys.exit(EXIT_OK)
  304. if __name__ == "__main__":
  305. main()