# Pre-Flight Probe Gate for Pipeline Efficiency

Insert a cheap inspection step before expensive processing in a pipeline. Probe the input (ffprobe, mediainfo, file headers, checksums) to skip work that will be wasted: wrong format, already optimized, below quality threshold, or too large to process safely. Log every decision for an audit trail. Keep a post-processing safety net as backup.

Use this when your pipeline processes files in bulk and a significant percentage of inputs don't need the expensive step, or when processing the wrong input would waste time, storage, or GPU cycles.

---

## Prerequisites

- A pipeline with at least one expensive processing step (transcoding, inference, embedding, etc.)
- A probe tool that can inspect inputs cheaply (< 1 second per file)
- Clear criteria for what constitutes a "skip" vs "process" decision

---

## Inputs

Prompt the user for all of these before executing:

```
PIPELINE_NAME=   # Human-readable name (e.g., "video-transcoder", "pdf-extractor")
PROBE_TOOL=      # Inspection tool (e.g., "ffprobe", "mediainfo", "file", "pdfinfo")
INPUT_DIR=       # Where the pipeline reads inputs (e.g., "/opt/pipeline/incoming")
OUTPUT_DIR=      # Where processed outputs go (e.g., "/opt/pipeline/processed")
SKIP_DIR=        # Where skipped inputs go (e.g., "/opt/pipeline/skipped")
FAIL_DIR=        # Where failed inputs go (e.g., "/opt/pipeline/failed")
LOG_FILE=        # Decision log path (e.g., "/opt/pipeline/logs/probe-gate.log")
```

---

## Step 1: Define Skip Criteria

Enumerate the conditions under which a file should skip the expensive step. Be specific: vague criteria lead to false positives.
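One way to keep criteria specific is to express each one as a named rule evaluated in a fixed order. The following is a minimal sketch in Python, not taken from any existing pipeline. It assumes the probe metadata has already been parsed into a dict; the names `RULES` and `decide`, the dict keys, and the thresholds are illustrative assumptions.

```python
from typing import Callable, Optional

# Each rule is (verdict, reason, predicate). Rules are checked in order and
# the first match wins. Keys, thresholds, and reason strings are illustrative.
Rule = tuple[str, str, Callable[[dict], bool]]

RULES: list[Rule] = [
    ("fail", "probe_failed", lambda m: m.get("probe_exit", 0) != 0),
    ("skip", "zero_bytes", lambda m: m.get("size", 0) == 0),
    ("skip", "already_hevc", lambda m: m.get("codec") == "hevc"),
    ("skip", "below_min_resolution", lambda m: (m.get("height") or 0) < 240),
]


def decide(meta: dict) -> tuple[str, Optional[str]]:
    """Return (verdict, reason); reason is None when the file should be processed."""
    for verdict, reason, matches in RULES:
        if matches(meta):
            return verdict, reason
    return "process", None


print(decide({"size": 1024, "codec": "hevc", "height": 1080}))
print(decide({"size": 1024, "codec": "h264", "height": 1080}))
```

Keeping the rules in one ordered list makes it easy to see which criterion fires first and to attach a distinct reason string to every skip.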
### Common probe checks

| Check | Probe Command | Skip When |
|-------|---------------|-----------|
| Video codec | `ffprobe -show_entries stream=codec_name` | Already target codec (e.g., already HEVC) |
| Audio bitrate | `ffprobe -show_entries stream=bit_rate` | Below minimum quality threshold |
| Resolution | `ffprobe -show_entries stream=width,height` | Below minimum (e.g., < 360p) |
| Duration | `ffprobe -show_entries format=duration` | Exceeds safe processing limit |
| File size | `stat -c%s` | Zero bytes, or exceeds storage budget |
| PDF pages | `pdfinfo file.pdf \| grep Pages` | Too many pages for OCR budget |
| Image format | `file --mime-type` | Already target format |
| Container format | `ffprobe -show_entries format=format_name` | Unsupported container |
| Corruption | `ffprobe -v error` exit code | Non-zero = corrupt file |
| Existing output | `test -f "$OUTPUT_DIR/$(basename "$INPUT")"` | Output already exists (dedup) |

### Gate

Write your criteria as a decision table:

```
Criterion 1: <condition> → SKIP (reason: "<reason>")
Criterion 2: <condition> → SKIP (reason: "<reason>")
Criterion 3: <field> not available → SKIP (reason: "probe failed")
Default: → PROCESS
```

---

## Step 2: Write the Probe Gate Function

```bash
#!/bin/bash
# Pre-flight probe gate for $PIPELINE_NAME
# Returns: 0 = process, 1 = skip, 2 = fail (corrupt/unreadable)

LOGFILE="$LOG_FILE"

probe_gate() {
    local INPUT="$1"
    local BASENAME TIMESTAMP
    BASENAME=$(basename "$INPUT")
    TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')

    # ──── Existence check ────
    if [[ ! -f "$INPUT" ]]; then
        echo "[$TIMESTAMP] FAIL $BASENAME reason=file_not_found" >> "$LOGFILE"
        return 2
    fi

    # ──── Size check ────
    # stat -c%s is the GNU form; an unreadable file is a FAIL, not a zero-byte SKIP
    local SIZE
    if ! SIZE=$(stat -c%s "$INPUT" 2>/dev/null); then
        echo "[$TIMESTAMP] FAIL $BASENAME reason=stat_failed" >> "$LOGFILE"
        return 2
    fi
    if (( SIZE == 0 )); then
        echo "[$TIMESTAMP] SKIP $BASENAME reason=zero_bytes size=0" >> "$LOGFILE"
        return 1
    fi

    # ──── Probe the input ────
    # Adapt this section to your probe tool and criteria.
    # The example checks below expect key=value lines on stdout, e.g.
    # PROBE_TOOL="ffprobe -v error -show_entries stream=codec_name,height".
    # $PROBE_TOOL is deliberately unquoted so that such arguments are split.
    local PROBE_OUTPUT PROBE_EXIT
    PROBE_OUTPUT=$($PROBE_TOOL "$INPUT" 2>/dev/null)
    PROBE_EXIT=$?

    if (( PROBE_EXIT != 0 )); then
        echo "[$TIMESTAMP] FAIL $BASENAME reason=probe_failed exit=$PROBE_EXIT" >> "$LOGFILE"
        return 2
    fi

    # ──── Apply skip criteria ────
    # Example: check if already target codec
    local CODEC
    CODEC=$(echo "$PROBE_OUTPUT" | grep '^codec_name=' | head -1 | cut -d= -f2)
    if [[ "$CODEC" == "hevc" ]]; then
        echo "[$TIMESTAMP] SKIP $BASENAME reason=already_hevc codec=$CODEC" >> "$LOGFILE"
        return 1
    fi

    # Example: check if below minimum resolution.
    # An empty HEIGHT (no video stream) evaluates to 0 here and is skipped.
    local HEIGHT
    HEIGHT=$(echo "$PROBE_OUTPUT" | grep '^height=' | head -1 | cut -d= -f2)
    if (( HEIGHT < 240 )); then
        echo "[$TIMESTAMP] SKIP $BASENAME reason=below_min_resolution height=$HEIGHT" >> "$LOGFILE"
        return 1
    fi

    # ──── Passed all checks ────
    echo "[$TIMESTAMP] PASS $BASENAME codec=$CODEC height=${HEIGHT} size=$SIZE" >> "$LOGFILE"
    return 0
}
```

### Key design decisions

- **Return codes**: 0 = process (matches shell "success" convention), 1 = skip, 2 = fail. Callers use `$?` to branch.
- **Structured log lines**: Every decision is logged with timestamp, verdict, filename, and reason. Parseable by grep/awk for reporting.
- **Probe errors = FAIL, not SKIP**: If the probe itself fails, the file might be corrupt, so route it to the fail directory for manual inspection rather than silently skipping it.

---

## Step 3: Integrate into the Pipeline

### Option A: Inline in processing loop

```bash
for INPUT in "$INPUT_DIR"/*; do
    probe_gate "$INPUT"
    case $? in
        0)
            # Expensive step; route the input to the fail directory if it errors
            if process_file "$INPUT"; then
                mv "$INPUT" "$OUTPUT_DIR/"
            else
                mv "$INPUT" "$FAIL_DIR/"
            fi
            ;;
        1)
            mv "$INPUT" "$SKIP_DIR/"   # Skipped: preserve for audit
            ;;
        2)
            mv "$INPUT" "$FAIL_DIR/"   # Failed probe: needs investigation
            ;;
    esac
done
```

### Option B: As a pre-filter in a wrapper script

If the expensive step is a binary called by a service (see `binary-wrapper-interception.md`), add the probe gate to the wrapper:

```bash
# In the wrapper script, before exec:
probe_gate "$INPUT_FILE"
GATE_RESULT=$?

if (( GATE_RESULT == 1 )); then
    echo "[WRAPPER] $(date) SKIPPED: $INPUT_FILE" >> "$LOGFILE"
    exit 0  # Success: nothing to do
fi

if (( GATE_RESULT == 2 )); then
    echo "[WRAPPER] $(date) PROBE FAILED: $INPUT_FILE" >> "$LOGFILE"
    exit 1  # Error: caller should retry or alert
fi

# Gate passed: proceed with expensive processing
exec "$REAL_BINARY" "$@"
```

### Option C: In a Python pipeline script

```python
import json
import os
import subprocess


def probe_gate(input_path: str) -> tuple[str, dict]:
    """Returns (verdict, metadata) where verdict is 'process', 'skip', or 'fail'."""
    if not os.path.exists(input_path):
        return 'fail', {'reason': 'file_not_found'}

    size = os.path.getsize(input_path)
    if size == 0:
        return 'skip', {'reason': 'zero_bytes', 'size': 0}

    # JSON output of the first video stream is easier to parse than flat output
    result = subprocess.run(
        ['ffprobe', '-v', 'quiet', '-select_streams', 'v:0',
         '-show_entries', 'stream=codec_name,height',
         '-of', 'json', input_path],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        return 'fail', {'reason': 'probe_failed', 'exit': result.returncode}

    # Parse and apply criteria (same example thresholds as the bash version)
    streams = json.loads(result.stdout).get('streams', [])
    stream = streams[0] if streams else {}
    codec = stream.get('codec_name')
    height = stream.get('height', 0)  # 0 when there is no video stream

    if codec == 'hevc':
        return 'skip', {'reason': 'already_hevc', 'codec': codec}
    if height < 240:
        return 'skip', {'reason': 'below_min_resolution', 'height': height}

    return 'process', {'codec': codec, 'height': height, 'size': size}
```

---

## Step 4: Add Post-Processing Safety Net

The probe gate is the primary filter, but add a post-processing check as backup. This catches cases where the probe was wrong (e.g., file reported as H.264 but was actually corrupt).

```bash
post_process_check() {
    local OUTPUT="$1"
    local INPUT_SIZE="$2"
    local TIMESTAMP OUTPUT_SIZE
    TIMESTAMP=$(date '+%Y-%m-%d %H:%M:%S')

    # Size gate: output should be at least 10% of input size.
    # A missing output leaves OUTPUT_SIZE empty, which evaluates to 0 below.
    OUTPUT_SIZE=$(stat -c%s "$OUTPUT" 2>/dev/null)
    if (( OUTPUT_SIZE < INPUT_SIZE / 10 )); then
        echo "[$TIMESTAMP] POST-FAIL $OUTPUT reason=output_too_small input=${INPUT_SIZE} output=${OUTPUT_SIZE}" >> "$LOGFILE"
        return 1
    fi

    # Integrity check: verify output is valid
    # ("-v error" is an ffprobe flag; adapt it for other probe tools)
    if ! $PROBE_TOOL -v error "$OUTPUT" >/dev/null 2>&1; then
        echo "[$TIMESTAMP] POST-FAIL $OUTPUT reason=output_corrupt" >> "$LOGFILE"
        return 1
    fi

    echo "[$TIMESTAMP] POST-PASS $OUTPUT size=$OUTPUT_SIZE" >> "$LOGFILE"
    return 0
}
```

---

## Step 5: Reporting

Use the structured log to generate reports:

```bash
# Decision breakdown
echo "=== Probe Gate Report ==="
echo "Processed: $(grep -c ' PASS ' "$LOG_FILE")"
echo "Skipped:   $(grep -c ' SKIP ' "$LOG_FILE")"
echo "Failed:    $(grep -c ' FAIL ' "$LOG_FILE")"
echo ""

# Top skip reasons
echo "Skip reasons:"
grep ' SKIP ' "$LOG_FILE" | grep -oE 'reason=[^ ]+' | sort | uniq -c | sort -rn

# Failed files needing attention
echo ""
echo "Failed files:"
grep ' FAIL ' "$LOG_FILE" | tail -10
```

---

## Troubleshooting

### Probe is slow (> 1 second per file)

Some probe tools read more of the file than necessary. For ffprobe, use `-analyzeduration 1000000 -probesize 1000000` (microseconds and bytes, respectively) to limit how much of the file it reads. For large PDFs, `pdfinfo` is faster than opening the file in Python.

### Probe reports wrong codec/format

Some files have mismatched container and stream codecs. Probe the stream level, not the container:

```bash
ffprobe -v quiet -select_streams v:0 -show_entries stream=codec_name -of csv=p=0 "$INPUT"
```

### Skipped files that should have been processed

Review the skip log. Lower the threshold or add exceptions for edge cases. The skip directory preserves files for re-processing if criteria change.

### Post-processing catches failures the probe missed

This is the safety net working as intended. Investigate why the probe didn't catch the failure; the input may have unusual characteristics. Add a new probe criterion if the pattern is common.
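When reviewing skips or safety-net failures, it can help to tally verdicts and reasons straight from the decision log. The following is a rough sketch in Python, assuming the `[timestamp] VERDICT filename key=value ...` line layout written by the Step 2 and Step 4 functions. The function name `summarize` and the sample lines are hypothetical, and filenames that themselves contain `reason=` would confuse it.

```python
import re
from collections import Counter

# Assumed layout: "[YYYY-MM-DD HH:MM:SS] VERDICT filename key=value ..."
LINE_RE = re.compile(r"^\[(?P<ts>[^\]]+)\]\s+(?P<verdict>[A-Z-]+)\s+(?P<rest>.*)$")
REASON_RE = re.compile(r"\breason=(\S+)")


def summarize(lines):
    """Count verdicts and (verdict, reason) pairs in probe-gate log lines."""
    verdicts = Counter()
    reasons = Counter()
    for line in lines:
        match = LINE_RE.match(line.strip())
        if not match:
            continue  # ignore lines that do not follow the assumed layout
        verdict = match.group("verdict")
        verdicts[verdict] += 1
        reason = REASON_RE.search(match.group("rest"))
        if reason:
            reasons[(verdict, reason.group(1))] += 1
    return verdicts, reasons


# Hypothetical sample lines; in practice, pass an open log file instead.
sample = [
    "[2026-02-17 10:00:00] SKIP a.mp4 reason=already_hevc codec=hevc",
    "[2026-02-17 10:00:01] PASS b.mp4 codec=h264 height=1080 size=1048576",
    "[2026-02-17 10:00:02] SKIP c.mp4 reason=already_hevc codec=hevc",
    "[2026-02-17 10:05:00] POST-FAIL /out/b.mp4 reason=output_too_small input=1048576 output=512",
]
verdicts, reasons = summarize(sample)
for (verdict, reason), count in reasons.most_common():
    print(f"{count:4d}  {verdict:<9}  {reason}")
```

A reason that dominates the `SKIP` or `POST-FAIL` counts is a good candidate for a threshold review or a new probe criterion.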
---

## Usage Examples

### PeerTube H.265 transcoding pipeline (cortex)

```
PIPELINE_NAME=video-transcoder
PROBE_TOOL=ffprobe
INPUT_DIR=/opt/bulk-import/completed
OUTPUT_DIR=/opt/bulk-import/transcoded
SKIP_DIR=/opt/bulk-import/skipped
FAIL_DIR=/opt/bulk-import/failed

Probe criteria:
  - codec_name == "hevc" → SKIP (already H.265)
  - height < 240 → SKIP (too low quality to bother)
  - duration == 0 → FAIL (corrupt or audio-only)
  - probe exit != 0 → FAIL (unreadable)

Post-processing safety net:
  - output size < 10% of input → FAIL (transcode produced garbage)
  - ffprobe on output fails → FAIL (corrupt output)

Result: Saved ~15% of GPU cycles by skipping already-optimized files.
```

### PDF extraction pipeline (RECON on CT 130)

```
PIPELINE_NAME=pdf-extractor
PROBE_TOOL=pdfinfo
INPUT_DIR=/mnt/library/incoming
OUTPUT_DIR=/opt/recon/extracted

Probe criteria:
  - Pages > 500 → route to Gemini Vision (OCR too slow)
  - Pages == 0 → FAIL (corrupt PDF)
  - File size < 1KB → SKIP (empty/placeholder)
  - Encrypted: yes → SKIP (can't extract without password)
  - Already in SQLite status table → SKIP (dedup)
```

---

*Last updated: 2026-02-17*