| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- #!/usr/bin/env python3
- """Normalized media inspection via ffprobe — the probe-first doctrine's tool.
- Wraps ffprobe's verbose, build-varying JSON into one stable, compact envelope:
- container, duration, per-stream codec/dimensions/fps/pix_fmt/color/rotation,
- and (on request) the keyframes nearest a timestamp so the agent can decide
- whether a stream-copy cut is safe.
- --doctor turns the probe into triage: each detected processing hazard (VFR,
- HDR transfer, rotation metadata, interlacing, non-yuv420p delivery, moov at
- EOF) is reported WITH the exact fix command, and the exit code becomes a
- branchable signal.
- Usage: probe-media.py [--json] [--keyframes-near SECONDS] [--doctor] <file>
- Input: one media file path as positional
- Output: stdout = human summary, or envelope {"data":...,"meta":...} with --json
- (schema claude-mods.ffmpeg-ops.probe/v1)
- Stderr: warnings, errors
- Exit: 0 ok, 2 usage, 3 file not found, 4 not parseable media,
- 5 ffprobe missing, 10 --doctor found at least one issue
- Examples:
- probe-media.py input.mp4
- probe-media.py --json input.mp4 | jq '.data.video.fps'
- probe-media.py --keyframes-near 92.5 input.mp4
- probe-media.py --doctor input.mp4 || echo "fix before processing"
- probe-media.py --doctor --json input.mp4 | jq -r '.data.doctor.findings[].fix'
- """
- import argparse
- import json
- import shutil
- import subprocess
- import sys
- from fractions import Fraction
- from pathlib import Path
- from typing import NoReturn
- SCHEMA = "claude-mods.ffmpeg-ops.probe/v1"
- EXIT_OK, EXIT_USAGE, EXIT_NOT_FOUND, EXIT_VALIDATION, EXIT_MISSING_DEP = 0, 2, 3, 4, 5
- EXIT_FINDINGS = 10
- def err(args_json: bool, code: str, message: str, exit_code: int) -> NoReturn:
- if args_json:
- print(json.dumps({"error": {"code": code, "message": message, "details": {}}}))
- print(f"ERROR: {message}", file=sys.stderr)
- sys.exit(exit_code)
- def parse_rate(rate: str) -> float:
- """ffprobe rates arrive as '30000/1001' or '25/1'; '0/0' means unknown."""
- try:
- f = Fraction(rate)
- return round(float(f), 3) if f else 0.0
- except (ValueError, ZeroDivisionError):
- return 0.0
- def stream_rotation(stream: dict) -> int:
- # Modern ffprobe: displaymatrix side data; legacy: tags.rotate.
- for sd in stream.get("side_data_list", []) or []:
- if "rotation" in sd:
- try:
- return int(sd["rotation"]) % 360
- except (TypeError, ValueError):
- pass
- try:
- return int(stream.get("tags", {}).get("rotate", 0)) % 360
- except (TypeError, ValueError):
- return 0
- def normalize(raw: dict, path: Path) -> dict:
- fmt = raw.get("format", {})
- out = {
- "file": str(path),
- "container": fmt.get("format_name", ""),
- "duration_s": round(float(fmt.get("duration", 0) or 0), 3),
- "size_bytes": int(fmt.get("size", 0) or 0),
- "bitrate_bps": int(fmt.get("bit_rate", 0) or 0),
- "stream_count": int(fmt.get("nb_streams", 0) or 0),
- "video": None,
- "audio": [],
- "subtitles": [],
- "streams": [],
- }
- for s in raw.get("streams", []):
- kind = s.get("codec_type", "unknown")
- entry = {
- "index": s.get("index"),
- "type": kind,
- "codec": s.get("codec_name", ""),
- "profile": s.get("profile", ""),
- "language": (s.get("tags", {}) or {}).get("language", ""),
- "default": bool((s.get("disposition", {}) or {}).get("default", 0)),
- }
- if kind == "video":
- avg = parse_rate(s.get("avg_frame_rate", "0/0"))
- real = parse_rate(s.get("r_frame_rate", "0/0"))
- entry.update({
- "width": s.get("width", 0),
- "height": s.get("height", 0),
- "fps": avg or real,
- # avg != r is the cheap variable-frame-rate tell.
- "vfr_suspect": bool(avg and real and abs(avg - real) > 0.01),
- "pix_fmt": s.get("pix_fmt", ""),
- "field_order": s.get("field_order", ""),
- "color_space": s.get("color_space", ""),
- "color_transfer": s.get("color_transfer", ""),
- "color_primaries": s.get("color_primaries", ""),
- "rotation_deg": stream_rotation(s),
- "bitrate_bps": int(s.get("bit_rate", 0) or 0),
- })
- if out["video"] is None and not s.get("disposition", {}).get("attached_pic"):
- out["video"] = entry
- elif kind == "audio":
- entry.update({
- "sample_rate": int(s.get("sample_rate", 0) or 0),
- "channels": s.get("channels", 0),
- "channel_layout": s.get("channel_layout", ""),
- "bitrate_bps": int(s.get("bit_rate", 0) or 0),
- })
- out["audio"].append(entry)
- elif kind == "subtitle":
- out["subtitles"].append(entry)
- out["streams"].append(entry)
- return out
- def moov_after_mdat(path: Path) -> bool:
- """Walk top-level MP4/MOV atoms: True if moov sits after mdat (no faststart)."""
- try:
- with path.open("rb") as f:
- pos, size = 0, path.stat().st_size
- seen_mdat = False
- while pos + 8 <= size:
- f.seek(pos)
- header = f.read(16)
- if len(header) < 8:
- break
- box_len = int.from_bytes(header[0:4], "big")
- box_type = header[4:8]
- if box_len == 1 and len(header) >= 16: # 64-bit largesize
- box_len = int.from_bytes(header[8:16], "big")
- elif box_len == 0: # box runs to EOF
- box_len = size - pos
- if box_len < 8:
- break
- if box_type == b"mdat":
- seen_mdat = True
- elif box_type == b"moov":
- return seen_mdat
- pos += box_len
- except OSError:
- pass
- return False
- def doctor(data: dict, path: Path) -> list:
- """Triage: each finding pairs the hazard with the exact fix command."""
- findings = []
- q = f'"{path}"'
- v = data["video"]
- def add(severity: str, issue: str, why: str, fix: str) -> None:
- findings.append({"severity": severity, "issue": issue, "why": why, "fix": fix})
- if v:
- if v["vfr_suspect"]:
- add("warn", "variable frame rate (VFR) suspected",
- "cut math drifts, concat desyncs, players/editors stutter",
- f"ffmpeg -i {q} -c:v libx264 -crf 18 -preset fast -pix_fmt yuv420p "
- f"-fps_mode cfr -r {round(v['fps']) or 30} -c:a aac -b:a 192k normalized.mp4")
- if v["color_transfer"] in ("smpte2084", "arib-std-b67"):
- kind = "PQ/HDR10" if v["color_transfer"] == "smpte2084" else "HLG"
- add("warn", f"HDR transfer ({kind})",
- "re-encoding without tonemapping produces grey, washed-out SDR",
- f"ffmpeg -i {q} -vf \"zscale=t=linear:npl=100,format=gbrpf32le,"
- f"zscale=p=bt709,tonemap=tonemap=hable:desat=0,"
- f"zscale=t=bt709:m=bt709:r=tv,format=yuv420p\" "
- f"-c:v libx264 -crf 20 -c:a copy sdr.mp4")
- if v["rotation_deg"]:
- add("warn", f"rotation metadata ({v['rotation_deg']} deg)",
- "filters/thumbnails operate on unrotated pixels; some pipelines drop the flag",
- f"ffmpeg -display_rotation 0 -i {q} -c copy upright.mp4 "
- f"# or bake: -vf transpose + re-encode")
- if v["field_order"] not in ("", "progressive", "unknown"):
- add("warn", f"interlaced (field_order={v['field_order']})",
- "combing artifacts on motion after any scale/re-encode",
- f"ffmpeg -i {q} -vf bwdif=mode=send_field -c:v libx264 -crf 19 "
- f"-c:a copy deinterlaced.mp4")
- # H.264 delivery must be 8-bit 4:2:0; HEVC Main10 (yuv420p10le) is a
- # legitimate delivery profile (and mandatory for HDR10) — don't flag it.
- ok_pix = ("", "yuv420p") if v["codec"] == "h264" else \
- ("", "yuv420p", "yuv420p10le")
- if v["codec"] in ("h264", "hevc") and v["pix_fmt"] not in ok_pix:
- add("warn", f"pix_fmt {v['pix_fmt']} on a delivery codec",
- "Safari/QuickTime/TVs show black or refuse playback on >4:2:0",
- f"ffmpeg -i {q} -c:v libx264 -crf 18 -pix_fmt yuv420p -c:a copy "
- f"-movflags +faststart compatible.mp4")
- elif data["audio"]:
- add("info", "no video stream (audio-only)",
- "video operations will fail; audio/STT workflows are fine", "")
- if "mp4" in data["container"] or "mov" in data["container"]:
- if moov_after_mdat(path):
- add("warn", "moov atom after mdat (no faststart)",
- "browsers must download the whole file before playback starts",
- f"ffmpeg -i {q} -c copy -movflags +faststart faststart.mp4")
- if data["duration_s"] <= 0:
- add("warn", "container reports no duration",
- "truncated/still-recording file, or a stream needing -fflags +genpts",
- f"ffmpeg -v error -i {q} -f null - # decode check; then remux -c copy")
- return findings
- def keyframes_near(ffprobe: str, path: Path, ts: float, window: float = 30.0) -> dict:
- start = max(0.0, ts - window)
- proc = subprocess.run(
- [ffprobe, "-v", "error", "-select_streams", "v:0",
- "-show_entries", "packet=pts_time,flags", "-of", "csv=p=0",
- "-read_intervals", f"{start}%{ts + window}", str(path)],
- capture_output=True, text=True)
- keys = []
- for line in proc.stdout.splitlines():
- parts = line.strip().split(",")
- if len(parts) >= 2 and "K" in parts[1]:
- try:
- keys.append(float(parts[0]))
- except ValueError:
- continue
- keys.sort()
- prev = max((k for k in keys if k <= ts), default=None)
- nxt = min((k for k in keys if k > ts), default=None)
- return {
- "target_s": ts,
- "prev_keyframe_s": prev,
- "next_keyframe_s": nxt,
- "copy_cut_drift_s": round(ts - prev, 3) if prev is not None else None,
- "window_scanned_s": [round(start, 3), round(ts + window, 3)],
- }
- def main() -> int:
- ap = argparse.ArgumentParser(
- description="Normalized media inspection via ffprobe.",
- epilog="Examples:\n"
- " probe-media.py input.mp4\n"
- " probe-media.py --json input.mp4 | jq '.data.video.fps'\n"
- " probe-media.py --keyframes-near 92.5 input.mp4\n",
- formatter_class=argparse.RawDescriptionHelpFormatter)
- ap.add_argument("file", help="media file to probe")
- ap.add_argument("--json", action="store_true", help="emit JSON envelope on stdout")
- ap.add_argument("--keyframes-near", type=float, metavar="SECONDS", default=None,
- help="also report nearest keyframes to this timestamp")
- ap.add_argument("--doctor", action="store_true",
- help="triage mode: report processing hazards with exact fix "
- "commands; exit 10 if any found")
- args = ap.parse_args()
- ffprobe = shutil.which("ffprobe")
- if not ffprobe:
- err(args.json, "MISSING_DEPENDENCY",
- "ffprobe not found on PATH (install ffmpeg)", EXIT_MISSING_DEP)
- path = Path(args.file)
- if not path.is_file():
- err(args.json, "NOT_FOUND", f"file not found: {path}", EXIT_NOT_FOUND)
- proc = subprocess.run(
- [ffprobe, "-v", "error", "-print_format", "json",
- "-show_format", "-show_streams", str(path)],
- capture_output=True, text=True)
- if proc.returncode != 0 or not proc.stdout.strip():
- err(args.json, "VALIDATION",
- f"ffprobe could not parse '{path.name}' as media: "
- f"{proc.stderr.strip().splitlines()[-1] if proc.stderr.strip() else 'no detail'}",
- EXIT_VALIDATION)
- data = normalize(json.loads(proc.stdout), path)
- if args.keyframes_near is not None:
- if data["video"] is None:
- err(args.json, "VALIDATION", "no video stream; --keyframes-near needs one",
- EXIT_VALIDATION)
- data["keyframes"] = keyframes_near(ffprobe, path, args.keyframes_near)
- findings = []
- if args.doctor:
- findings = doctor(data, path)
- has_warn = any(f["severity"] != "info" for f in findings)
- data["doctor"] = {"findings": findings, "clean": not has_warn}
- if args.json:
- print(json.dumps({"data": data, "meta": {"schema": SCHEMA}}, indent=2))
- if args.doctor and not data["doctor"]["clean"]:
- return EXIT_FINDINGS
- return EXIT_OK
- # Human summary (stdout is still the data product — keep it grep-friendly).
- v = data["video"]
- print(f"file {data['file']}")
- print(f"container {data['container']} "
- f"{data['duration_s']}s {data['size_bytes']} bytes "
- f"{data['bitrate_bps'] // 1000} kb/s {data['stream_count']} streams")
- if v:
- vfr = " VFR-SUSPECT" if v["vfr_suspect"] else ""
- rot = f" rotation={v['rotation_deg']}" if v["rotation_deg"] else ""
- print(f"video {v['codec']} {v['width']}x{v['height']} "
- f"{v['fps']}fps {v['pix_fmt']}{rot}{vfr}")
- if v["color_space"] or v["color_transfer"]:
- print(f"color space={v['color_space'] or '?'} "
- f"transfer={v['color_transfer'] or '?'} "
- f"primaries={v['color_primaries'] or '?'}")
- for a in data["audio"]:
- print(f"audio #{a['index']} {a['codec']} {a['sample_rate']}Hz "
- f"{a['channels']}ch {a['channel_layout']} lang={a['language'] or '-'}")
- for s in data["subtitles"]:
- print(f"subs #{s['index']} {s['codec']} lang={s['language'] or '-'}")
- if "keyframes" in data:
- k = data["keyframes"]
- print(f"keyframes target={k['target_s']}s "
- f"prev={k['prev_keyframe_s']}s next={k['next_keyframe_s']}s "
- f"copy-cut-drift={k['copy_cut_drift_s']}s")
- if args.doctor:
- if not findings:
- print("doctor clean — no processing hazards detected")
- for f in findings:
- print(f"doctor [{f['severity']}] {f['issue']} — {f['why']}")
- if f["fix"]:
- print(f" fix: {f['fix']}")
- if not data["doctor"]["clean"]:
- return EXIT_FINDINGS
- return EXIT_OK
- if __name__ == "__main__":
- sys.exit(main())
|