probe-media.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. #!/usr/bin/env python3
  2. """Normalized media inspection via ffprobe — the probe-first doctrine's tool.
  3. Wraps ffprobe's verbose, build-varying JSON into one stable, compact envelope:
  4. container, duration, per-stream codec/dimensions/fps/pix_fmt/color/rotation,
  5. and (on request) the keyframes nearest a timestamp so the agent can decide
  6. whether a stream-copy cut is safe.
  7. --doctor turns the probe into triage: each detected processing hazard (VFR,
  8. HDR transfer, rotation metadata, interlacing, non-yuv420p delivery, moov at
  9. EOF) is reported WITH the exact fix command, and the exit code becomes a
  10. branchable signal.
  11. Usage: probe-media.py [--json] [--keyframes-near SECONDS] [--doctor] <file>
  12. Input: one media file path as positional
  13. Output: stdout = human summary, or envelope {"data":...,"meta":...} with --json
  14. (schema claude-mods.ffmpeg-ops.probe/v1)
  15. Stderr: warnings, errors
  16. Exit: 0 ok, 2 usage, 3 file not found, 4 not parseable media,
  17. 5 ffprobe missing, 10 --doctor found at least one issue
  18. Examples:
  19. probe-media.py input.mp4
  20. probe-media.py --json input.mp4 | jq '.data.video.fps'
  21. probe-media.py --keyframes-near 92.5 input.mp4
  22. probe-media.py --doctor input.mp4 || echo "fix before processing"
  23. probe-media.py --doctor --json input.mp4 | jq -r '.data.doctor.findings[].fix'
  24. """
  25. import argparse
  26. import json
  27. import shutil
  28. import subprocess
  29. import sys
  30. from fractions import Fraction
  31. from pathlib import Path
  32. from typing import NoReturn
  33. SCHEMA = "claude-mods.ffmpeg-ops.probe/v1"
  34. EXIT_OK, EXIT_USAGE, EXIT_NOT_FOUND, EXIT_VALIDATION, EXIT_MISSING_DEP = 0, 2, 3, 4, 5
  35. EXIT_FINDINGS = 10
  36. def err(args_json: bool, code: str, message: str, exit_code: int) -> NoReturn:
  37. if args_json:
  38. print(json.dumps({"error": {"code": code, "message": message, "details": {}}}))
  39. print(f"ERROR: {message}", file=sys.stderr)
  40. sys.exit(exit_code)
  41. def parse_rate(rate: str) -> float:
  42. """ffprobe rates arrive as '30000/1001' or '25/1'; '0/0' means unknown."""
  43. try:
  44. f = Fraction(rate)
  45. return round(float(f), 3) if f else 0.0
  46. except (ValueError, ZeroDivisionError):
  47. return 0.0
  48. def stream_rotation(stream: dict) -> int:
  49. # Modern ffprobe: displaymatrix side data; legacy: tags.rotate.
  50. for sd in stream.get("side_data_list", []) or []:
  51. if "rotation" in sd:
  52. try:
  53. return int(sd["rotation"]) % 360
  54. except (TypeError, ValueError):
  55. pass
  56. try:
  57. return int(stream.get("tags", {}).get("rotate", 0)) % 360
  58. except (TypeError, ValueError):
  59. return 0
  60. def normalize(raw: dict, path: Path) -> dict:
  61. fmt = raw.get("format", {})
  62. out = {
  63. "file": str(path),
  64. "container": fmt.get("format_name", ""),
  65. "duration_s": round(float(fmt.get("duration", 0) or 0), 3),
  66. "size_bytes": int(fmt.get("size", 0) or 0),
  67. "bitrate_bps": int(fmt.get("bit_rate", 0) or 0),
  68. "stream_count": int(fmt.get("nb_streams", 0) or 0),
  69. "video": None,
  70. "audio": [],
  71. "subtitles": [],
  72. "streams": [],
  73. }
  74. for s in raw.get("streams", []):
  75. kind = s.get("codec_type", "unknown")
  76. entry = {
  77. "index": s.get("index"),
  78. "type": kind,
  79. "codec": s.get("codec_name", ""),
  80. "profile": s.get("profile", ""),
  81. "language": (s.get("tags", {}) or {}).get("language", ""),
  82. "default": bool((s.get("disposition", {}) or {}).get("default", 0)),
  83. }
  84. if kind == "video":
  85. avg = parse_rate(s.get("avg_frame_rate", "0/0"))
  86. real = parse_rate(s.get("r_frame_rate", "0/0"))
  87. entry.update({
  88. "width": s.get("width", 0),
  89. "height": s.get("height", 0),
  90. "fps": avg or real,
  91. # avg != r is the cheap variable-frame-rate tell.
  92. "vfr_suspect": bool(avg and real and abs(avg - real) > 0.01),
  93. "pix_fmt": s.get("pix_fmt", ""),
  94. "field_order": s.get("field_order", ""),
  95. "color_space": s.get("color_space", ""),
  96. "color_transfer": s.get("color_transfer", ""),
  97. "color_primaries": s.get("color_primaries", ""),
  98. "rotation_deg": stream_rotation(s),
  99. "bitrate_bps": int(s.get("bit_rate", 0) or 0),
  100. })
  101. if out["video"] is None and not s.get("disposition", {}).get("attached_pic"):
  102. out["video"] = entry
  103. elif kind == "audio":
  104. entry.update({
  105. "sample_rate": int(s.get("sample_rate", 0) or 0),
  106. "channels": s.get("channels", 0),
  107. "channel_layout": s.get("channel_layout", ""),
  108. "bitrate_bps": int(s.get("bit_rate", 0) or 0),
  109. })
  110. out["audio"].append(entry)
  111. elif kind == "subtitle":
  112. out["subtitles"].append(entry)
  113. out["streams"].append(entry)
  114. return out
  115. def moov_after_mdat(path: Path) -> bool:
  116. """Walk top-level MP4/MOV atoms: True if moov sits after mdat (no faststart)."""
  117. try:
  118. with path.open("rb") as f:
  119. pos, size = 0, path.stat().st_size
  120. seen_mdat = False
  121. while pos + 8 <= size:
  122. f.seek(pos)
  123. header = f.read(16)
  124. if len(header) < 8:
  125. break
  126. box_len = int.from_bytes(header[0:4], "big")
  127. box_type = header[4:8]
  128. if box_len == 1 and len(header) >= 16: # 64-bit largesize
  129. box_len = int.from_bytes(header[8:16], "big")
  130. elif box_len == 0: # box runs to EOF
  131. box_len = size - pos
  132. if box_len < 8:
  133. break
  134. if box_type == b"mdat":
  135. seen_mdat = True
  136. elif box_type == b"moov":
  137. return seen_mdat
  138. pos += box_len
  139. except OSError:
  140. pass
  141. return False
  142. def doctor(data: dict, path: Path) -> list:
  143. """Triage: each finding pairs the hazard with the exact fix command."""
  144. findings = []
  145. q = f'"{path}"'
  146. v = data["video"]
  147. def add(severity: str, issue: str, why: str, fix: str) -> None:
  148. findings.append({"severity": severity, "issue": issue, "why": why, "fix": fix})
  149. if v:
  150. if v["vfr_suspect"]:
  151. add("warn", "variable frame rate (VFR) suspected",
  152. "cut math drifts, concat desyncs, players/editors stutter",
  153. f"ffmpeg -i {q} -c:v libx264 -crf 18 -preset fast -pix_fmt yuv420p "
  154. f"-fps_mode cfr -r {round(v['fps']) or 30} -c:a aac -b:a 192k normalized.mp4")
  155. if v["color_transfer"] in ("smpte2084", "arib-std-b67"):
  156. kind = "PQ/HDR10" if v["color_transfer"] == "smpte2084" else "HLG"
  157. add("warn", f"HDR transfer ({kind})",
  158. "re-encoding without tonemapping produces grey, washed-out SDR",
  159. f"ffmpeg -i {q} -vf \"zscale=t=linear:npl=100,format=gbrpf32le,"
  160. f"zscale=p=bt709,tonemap=tonemap=hable:desat=0,"
  161. f"zscale=t=bt709:m=bt709:r=tv,format=yuv420p\" "
  162. f"-c:v libx264 -crf 20 -c:a copy sdr.mp4")
  163. if v["rotation_deg"]:
  164. add("warn", f"rotation metadata ({v['rotation_deg']} deg)",
  165. "filters/thumbnails operate on unrotated pixels; some pipelines drop the flag",
  166. f"ffmpeg -display_rotation 0 -i {q} -c copy upright.mp4 "
  167. f"# or bake: -vf transpose + re-encode")
  168. if v["field_order"] not in ("", "progressive", "unknown"):
  169. add("warn", f"interlaced (field_order={v['field_order']})",
  170. "combing artifacts on motion after any scale/re-encode",
  171. f"ffmpeg -i {q} -vf bwdif=mode=send_field -c:v libx264 -crf 19 "
  172. f"-c:a copy deinterlaced.mp4")
  173. # H.264 delivery must be 8-bit 4:2:0; HEVC Main10 (yuv420p10le) is a
  174. # legitimate delivery profile (and mandatory for HDR10) — don't flag it.
  175. ok_pix = ("", "yuv420p") if v["codec"] == "h264" else \
  176. ("", "yuv420p", "yuv420p10le")
  177. if v["codec"] in ("h264", "hevc") and v["pix_fmt"] not in ok_pix:
  178. add("warn", f"pix_fmt {v['pix_fmt']} on a delivery codec",
  179. "Safari/QuickTime/TVs show black or refuse playback on >4:2:0",
  180. f"ffmpeg -i {q} -c:v libx264 -crf 18 -pix_fmt yuv420p -c:a copy "
  181. f"-movflags +faststart compatible.mp4")
  182. elif data["audio"]:
  183. add("info", "no video stream (audio-only)",
  184. "video operations will fail; audio/STT workflows are fine", "")
  185. if "mp4" in data["container"] or "mov" in data["container"]:
  186. if moov_after_mdat(path):
  187. add("warn", "moov atom after mdat (no faststart)",
  188. "browsers must download the whole file before playback starts",
  189. f"ffmpeg -i {q} -c copy -movflags +faststart faststart.mp4")
  190. if data["duration_s"] <= 0:
  191. add("warn", "container reports no duration",
  192. "truncated/still-recording file, or a stream needing -fflags +genpts",
  193. f"ffmpeg -v error -i {q} -f null - # decode check; then remux -c copy")
  194. return findings
  195. def keyframes_near(ffprobe: str, path: Path, ts: float, window: float = 30.0) -> dict:
  196. start = max(0.0, ts - window)
  197. proc = subprocess.run(
  198. [ffprobe, "-v", "error", "-select_streams", "v:0",
  199. "-show_entries", "packet=pts_time,flags", "-of", "csv=p=0",
  200. "-read_intervals", f"{start}%{ts + window}", str(path)],
  201. capture_output=True, text=True)
  202. keys = []
  203. for line in proc.stdout.splitlines():
  204. parts = line.strip().split(",")
  205. if len(parts) >= 2 and "K" in parts[1]:
  206. try:
  207. keys.append(float(parts[0]))
  208. except ValueError:
  209. continue
  210. keys.sort()
  211. prev = max((k for k in keys if k <= ts), default=None)
  212. nxt = min((k for k in keys if k > ts), default=None)
  213. return {
  214. "target_s": ts,
  215. "prev_keyframe_s": prev,
  216. "next_keyframe_s": nxt,
  217. "copy_cut_drift_s": round(ts - prev, 3) if prev is not None else None,
  218. "window_scanned_s": [round(start, 3), round(ts + window, 3)],
  219. }
  220. def main() -> int:
  221. ap = argparse.ArgumentParser(
  222. description="Normalized media inspection via ffprobe.",
  223. epilog="Examples:\n"
  224. " probe-media.py input.mp4\n"
  225. " probe-media.py --json input.mp4 | jq '.data.video.fps'\n"
  226. " probe-media.py --keyframes-near 92.5 input.mp4\n",
  227. formatter_class=argparse.RawDescriptionHelpFormatter)
  228. ap.add_argument("file", help="media file to probe")
  229. ap.add_argument("--json", action="store_true", help="emit JSON envelope on stdout")
  230. ap.add_argument("--keyframes-near", type=float, metavar="SECONDS", default=None,
  231. help="also report nearest keyframes to this timestamp")
  232. ap.add_argument("--doctor", action="store_true",
  233. help="triage mode: report processing hazards with exact fix "
  234. "commands; exit 10 if any found")
  235. args = ap.parse_args()
  236. ffprobe = shutil.which("ffprobe")
  237. if not ffprobe:
  238. err(args.json, "MISSING_DEPENDENCY",
  239. "ffprobe not found on PATH (install ffmpeg)", EXIT_MISSING_DEP)
  240. path = Path(args.file)
  241. if not path.is_file():
  242. err(args.json, "NOT_FOUND", f"file not found: {path}", EXIT_NOT_FOUND)
  243. proc = subprocess.run(
  244. [ffprobe, "-v", "error", "-print_format", "json",
  245. "-show_format", "-show_streams", str(path)],
  246. capture_output=True, text=True)
  247. if proc.returncode != 0 or not proc.stdout.strip():
  248. err(args.json, "VALIDATION",
  249. f"ffprobe could not parse '{path.name}' as media: "
  250. f"{proc.stderr.strip().splitlines()[-1] if proc.stderr.strip() else 'no detail'}",
  251. EXIT_VALIDATION)
  252. data = normalize(json.loads(proc.stdout), path)
  253. if args.keyframes_near is not None:
  254. if data["video"] is None:
  255. err(args.json, "VALIDATION", "no video stream; --keyframes-near needs one",
  256. EXIT_VALIDATION)
  257. data["keyframes"] = keyframes_near(ffprobe, path, args.keyframes_near)
  258. findings = []
  259. if args.doctor:
  260. findings = doctor(data, path)
  261. has_warn = any(f["severity"] != "info" for f in findings)
  262. data["doctor"] = {"findings": findings, "clean": not has_warn}
  263. if args.json:
  264. print(json.dumps({"data": data, "meta": {"schema": SCHEMA}}, indent=2))
  265. if args.doctor and not data["doctor"]["clean"]:
  266. return EXIT_FINDINGS
  267. return EXIT_OK
  268. # Human summary (stdout is still the data product — keep it grep-friendly).
  269. v = data["video"]
  270. print(f"file {data['file']}")
  271. print(f"container {data['container']} "
  272. f"{data['duration_s']}s {data['size_bytes']} bytes "
  273. f"{data['bitrate_bps'] // 1000} kb/s {data['stream_count']} streams")
  274. if v:
  275. vfr = " VFR-SUSPECT" if v["vfr_suspect"] else ""
  276. rot = f" rotation={v['rotation_deg']}" if v["rotation_deg"] else ""
  277. print(f"video {v['codec']} {v['width']}x{v['height']} "
  278. f"{v['fps']}fps {v['pix_fmt']}{rot}{vfr}")
  279. if v["color_space"] or v["color_transfer"]:
  280. print(f"color space={v['color_space'] or '?'} "
  281. f"transfer={v['color_transfer'] or '?'} "
  282. f"primaries={v['color_primaries'] or '?'}")
  283. for a in data["audio"]:
  284. print(f"audio #{a['index']} {a['codec']} {a['sample_rate']}Hz "
  285. f"{a['channels']}ch {a['channel_layout']} lang={a['language'] or '-'}")
  286. for s in data["subtitles"]:
  287. print(f"subs #{s['index']} {s['codec']} lang={s['language'] or '-'}")
  288. if "keyframes" in data:
  289. k = data["keyframes"]
  290. print(f"keyframes target={k['target_s']}s "
  291. f"prev={k['prev_keyframe_s']}s next={k['next_keyframe_s']}s "
  292. f"copy-cut-drift={k['copy_cut_drift_s']}s")
  293. if args.doctor:
  294. if not findings:
  295. print("doctor clean — no processing hazards detected")
  296. for f in findings:
  297. print(f"doctor [{f['severity']}] {f['issue']} — {f['why']}")
  298. if f["fix"]:
  299. print(f" fix: {f['fix']}")
  300. if not data["doctor"]["clean"]:
  301. return EXIT_FINDINGS
  302. return EXIT_OK
  303. if __name__ == "__main__":
  304. sys.exit(main())