Compare commits

..

21 commits

Author SHA1 Message Date
c1ba1f8dc7 Merge feature/offroute: effort-based wilderness routing, PostGIS entry points, BLM trail filtering 2026-05-09 15:45:27 +00:00
a04c10ad55 offroute: wilderness maneuvers with bearing, elevation, grade
- Segment breaks on: bearing change >30°, grade category change, distance >0.5mi
- Grade categories: flat (0-2°), gentle (2-5°), moderate (5-10°), steep (10-15°), very steep (15°+)
- Distance formatting: feet with commas <1mi, miles with decimal ≥1mi
- Instruction format: Head {cardinal}, gaining/descending X ft ({grade} uphill/downhill) — {dist}

Co-Authored-By: Claude <noreply@anthropic.com>
2026-05-09 05:05:00 +00:00
d8f84ab55a offroute: revert off-network threshold to 10m
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-09 03:34:37 +00:00
b4e33eb048 offroute: PostGIS entry points with 100m densification and land_status tagging
- Migrate EntryPointIndex from SQLite to PostGIS (padus database)
- Densify highway LineStrings at 100m intervals via Shapely interpolate
- 2.94M entry points from 476k lines (4x more coverage)
- Tag each entry point with land_status via ST_Intersects against padus_sub
  - 1.64M public (56%), 1.30M unknown (44%)
- Add geography GIST index for fast radius queries (~25ms)
- Increase OFF_NETWORK_THRESHOLD_M from 10m to 50m for GPS accuracy
- PBF path and PostGIS DSN configurable via home.yaml

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-09 03:28:58 +00:00
05c24f95f6 offroute: tighten off-network threshold to 10m
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 23:27:06 +00:00
686b35710a api: add auto mode to offroute endpoint validation
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 22:37:49 +00:00
cf758476b4 offroute: add auto mode for standard driving routes
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:55:31 +00:00
87a4741b8d offroute: raise bbox limit to 2.0° (~220km coverage)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:19:04 +00:00
58347415bc offroute: bidirectional wilderness routing (all 4 scenarios)
Support all four routing scenarios:
  A: off-network → on-network (wilderness then Valhalla)
  B: off-network → off-network (wilderness, Valhalla, wilderness)
  C: on-network → off-network (Valhalla then wilderness)
  D: on-network → on-network (pure Valhalla passthrough)

Off-network detection via Valhalla /locate endpoint:
  - Snap distance > 500m = off-network

Key implementation details:
  - _locate_on_network() helper for network detection
  - route() dispatches to scenario-specific handlers
  - _pathfind_wilderness() extracted for reuse (runs MCP)
  - _valhalla_route() helper for network segments
  - _build_response() unifies GeoJSON output format

Memory management:
  - Sequential MCP runs for scenario B (not parallel)
  - gc.collect() after each MCP run
  - Bbox centered on wilderness origin, not distant destination

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:11:53 +00:00
ff0721c23e offroute: wilderness always uses foot mode for pathfinding
The wilderness segment now ALWAYS uses foot mode for MCP pathfinding.
The user's selected mode only affects:
1. Entry point selection (MODE_TO_VALID_HIGHWAYS filtering)
2. Valhalla costing for the network segment

This ensures vehicles can navigate through wilderness (on foot) to
reach roads, rather than failing when no vehicle-accessible path exists.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 19:03:31 +00:00
2252905986 feat(offroute): MVUM legal access — pathfinder integration + places panel API + boundary_mode control
MVUM Data Import:
- Downloaded USFS MVUM Roads (150,636 features) and Trails (28,741 features)
- Imported to navi.db as mvum_roads and mvum_trails tables
- Idaho coverage: ~8,994 roads and ~4,504 trails across 7 national forests
- Preserved all vehicle-class fields (ATV, MOTORCYCLE, HIGHCLEARANCEVEHICLE, etc.)
- Preserved seasonal date ranges (*_DATESOPEN fields)

New mvum.py module:
- MVUMReader class for querying MVUM data by bbox and nearest point
- parse_date_range() for seasonal date string parsing (MM/DD-MM/DD format)
- check_access() for determining open/closed status with date checking
- symbol_to_access() fallback when per-vehicle fields are null
- get_mvum_access_grid() for rasterizing MVUM to pathfinder grid

Cost function integration:
- Added mvum parameter to compute_cost_grid()
- MVUM closures respond to boundary_mode:
  * strict = impassable (np.inf)
  * pragmatic = 5x friction penalty
  * emergency = ignored entirely
- Foot mode skips MVUM (motor-vehicle specific)

Router integration:
- Loads MVUM access grid for motorized modes (mtb, atv, vehicle)
- Tracks mvum_closed_crossings in path summary

Places Panel API:
- GET /api/mvum?lat=XX&lon=XX&radius=50
- Returns MVUM feature with access status for all vehicle classes
- Includes seasonal date ranges, maintenance level, forest/district info
- GeoJSON geometry for map display

Validation:
- MVUM places endpoint tested with Sawtooth NF road
- All four modes validated with strict/pragmatic/emergency boundary modes
- Foot mode correctly ignores MVUM restrictions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 14:26:18 +00:00
bc463188d5 feat(offroute): Phase O4 — multi-mode cost functions (foot/mtb/atv/vehicle)
- Add ModeProfile dataclass for data-driven mode configuration
- Implement three speed functions:
  * Tobler off-path hiking (foot)
  * Herzog wheeled-transport polynomial (mtb/atv)
  * Linear speed degradation (vehicle)
- Add WildernessReader for PAD-US Des_Tp=WA wilderness areas
- Mode-specific terrain friction overrides:
  * Forest impassable for ATV/vehicle, high friction for MTB
  * Wetland/mangrove impassable for all wheeled modes
- Trail access rules:
  * Foot trails (value 25) impassable for ATV/vehicle
- Wilderness blocking for mtb/atv/vehicle modes
- Vehicle mode allows flat grassland/cropland traversal
- Memory optimization: limit entry points, constrain bbox size
- Update router to pass mode and wilderness to cost function
- Add vehicle to API mode validation

Validated all four modes with test route:
- foot: 0.46km off-network, 12.11km network, 89% on trail
- mtb: 0.47km off-network, 13.13km network, 90% on trail
- atv: 0.47km off-network, 12.81km network, 90% on trail
- vehicle: 0.46km off-network, 12.81km network, 89% on trail

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 14:11:56 +00:00
1a9dfc8f8d feat(offroute): Phase O3b — trail entry index, Valhalla stitching, /api/offroute endpoint
Phase A: Trail Entry Point Index
- Extract highway endpoints from idaho-latest.osm.pbf using osmium + ogr2ogr
- Store 740,430 entry points in /mnt/nav/navi.db (SQLite with spatial index)
- Entry points by class: service (271k), footway (152k), residential (146k),
  track (111k), path (26k), unclassified (16k), tertiary (9k), secondary (4k),
  primary (4k), bridleway (15)

Phase B: Pathfinder → Valhalla Stitching (router.py)
- OffrouteRouter orchestrates wilderness pathfinding + Valhalla on-network routing
- Queries entry points within 50km (expanding to 100km if needed)
- MCP pathfinder routes to nearest reachable entry point
- Calls Valhalla pedestrian/bicycle/auto costing for on-network segment
- Returns GeoJSON FeatureCollection with wilderness + network + combined segments

Phase C: Flask Endpoint
- POST /api/offroute with start/end coordinates, mode, boundary_mode
- Returns GeoJSON route with per-segment metadata and turn-by-turn maneuvers

Validated: 42.35,-114.30 → Twin Falls downtown
- Wilderness: 0.5km, 9min | Network: 36km, 413min | Total: ~421min
- 21 turn-by-turn instructions, segments connect at entry point

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 13:44:34 +00:00
3293cb4238 feat(offroute): Phase O3a — trail burn-in, pathfinder seeks trail corridors
Trail friction REPLACES land cover friction where trails exist:
- Road (value 5): 0.1× friction
- Track (value 15): 0.3× friction
- Foot trail (value 25): 0.5× friction

TrailReader loads /mnt/nav/worldcover/trails.tif rasterized from OSM highways.

Validation shows trail-seeking behavior:
- On-trail travel: 17.3% → 98.7%
- Effort time: 1047 min → 155 min (-85.2%)
- Path travels farther but stays on roads for speed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 07:26:25 +00:00
e0eedcedfd feat(offroute): Phase O2c — PAD-US barriers with three-mode boundary respect
- Add barriers.py: PAD-US raster reader + build_barriers_raster() function
- Rasterize PAD-US Pub_Access=XA (Closed) polygons to CONUS GeoTIFF
- Modify cost.py: boundary_mode parameter (strict/pragmatic/emergency)
  - strict: private land = impassable (np.inf)
  - pragmatic: private land = 5x friction penalty (default)
  - emergency: private land barriers ignored
- Modify prototype.py: three-way comparison output
- Output: padus_barriers.tif at /mnt/nav/worldcover/ (144MB, ~33m resolution)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 06:56:36 +00:00
26d4bc7478 feat(offroute): Phase O2b — WorldCover friction integration, lake avoidance validated
- New friction.py: reads WorldCover friction VRT, resamples to match
  elevation grid, provides point sampling for validation
- Modified cost.py: accepts optional friction array, multiplies Tobler
  time cost by friction multiplier, inf for water/nodata (255/0)
- Modified prototype.py: loads friction layer, passes to cost function,
  validates path avoids water cells (friction=255)

Validated on Idaho test bbox:
- Path avoids Murtaugh Lake (no water cells on path)
- Friction along path: min=10, max=20, mean=10.2
- Effort increased 3.4% vs Phase O1 due to friction multipliers

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 06:33:45 +00:00
f2a0f81580 feat(offroute): Phase O1 foundation — PMTiles decoder, Tobler cost, MCP pathfinder prototype
- dem.py: Terrarium-encoded PMTiles tile reader with LRU cache
  - Decodes WebP tiles from planet-dem.pmtiles
  - Stitches tiles into numpy elevation grids for arbitrary bboxes
  - Provides pixel-to-latlon coordinate conversion

- cost.py: Tobler off-path hiking cost function
  - speed = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) km/h
  - Max slope cutoff: 40 degrees → impassable
  - Returns time-to-traverse (seconds/cell) as cost metric

- prototype.py: Standalone validation on Idaho test bbox
  - 43km × 80km bbox (~17M cells at 14m resolution)
  - scikit-image MCP_Geometric Dijkstra pathfinder
  - Outputs GeoJSON LineString with path metadata
  - Validated: 61.6km path, 21.3 hours effort time

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-07 23:43:56 +00:00
227affca9d Merge fix/pdf-extraction-quality into master 2026-05-07 01:51:05 +00:00
fa456fecb1 Merge fix/zim-table-extraction into master 2026-05-07 01:51:05 +00:00
83a21854c3 fix: PDF extraction quality — word-boundary checks and layout mode
Adds _text_quality_ok() gate that replaces the bare 50-char length
check at each stage of the extraction fallback chain. Checks:
- Word-boundary ratio (≥60% of tokens must be real words)
- Concatenation ratio (lc→UC transitions must be <10% of word count)

When PyPDF2 default extraction fails quality check, retries with
space_width=100 for tighter word-boundary detection. This fixes
Haynes/workshop manuals where tight kerning produces concatenated
words like 'byMike' and 'oftheGuild'.

Also adds -layout flag to pdftotext subprocess calls for better
spatial awareness in the poppler fallback stage.

Note: PyPDF2 3.0.1 does not support layout=True parameter.
The space_width parameter serves the same purpose.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-07 01:36:23 +00:00
b741e217f6 fix: ZIM table extraction — pipe-delimited cells instead of concatenation
Pre-processes HTML tree before lxml .text_content() to prevent
element concatenation:
- <table> cells joined with ' | ' delimiter, rows with newlines
- <br> tags produce newlines
- <li> items get '- ' prefix and newline separation
- <dt>/<dd> definition list items get newline separation

Fixes ~868 mangled Qdrant points where table content was jammed
together (e.g. 'Freq51Primary1A==' instead of 'Freq51 | Primary | 1A==').

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-07 01:32:25 +00:00
30 changed files with 4524 additions and 1994 deletions

View file

@ -408,14 +408,9 @@ service:
peertube:
api_base: http://192.168.1.170 # Internal PeerTube API (CT 110 nginx)
api_url: http://192.168.1.170:9000 # Direct PeerTube API (bypasses nginx, for writer)
host_header: stream.echo6.co # Host header for PeerTube API requests
username: root # PeerTube admin username
password_env: PEERTUBE_PASSWORD # Env var holding PeerTube admin password
public_url: https://stream.echo6.co # Public URL for video links
fetch_timeout: 30 # HTTP timeout for API/VTT requests
rate_limit_delay: 0.5 # Delay between video ingestions (seconds)
writer_rate_limit: 0.1 # Delay between category push API calls (seconds)
poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min)
scraper:

View file

@ -6,13 +6,13 @@ profile: home
region_name: "North America"
tileset:
url: "/tiles/na.pmtiles"
url: "/tiles/planet/current.pmtiles"
bounds: [-168, 14, -52, 72]
max_zoom: 15
attribution: "Protomaps © OSM"
tileset_hillshade:
url: "/tiles/hillshade-na.pmtiles"
url: "/tiles/planet-dem.pmtiles"
encoding: "terrarium"
max_zoom: 12
@ -33,14 +33,14 @@ services:
features:
has_nominatim_details: true
has_kiwix_wiki: false
has_kiwix_wiki: true
has_hillshade: true
has_3d_terrain: false
has_traffic_overlay: true
has_landclass: true
has_public_lands_layer: true
has_contours: true
has_contours_test: true
has_contours_test: false
has_contours_test_10ft: false
has_address_book_write: false
has_overture_enrichment: true
@ -48,7 +48,16 @@ features:
has_contacts: true
has_wiki_rewriting: true
has_wiki_discovery: false
has_usfs_trails: true
has_blm_trails: true
defaults:
center: [42.5736, -114.6066]
zoom: 10
# Offroute wilderness routing
offroute:
osm_pbf_path: "/mnt/nav/sources/idaho-latest.osm.pbf"
densify_interval_m: 100
postgis_dsn: "dbname=padus"

View file

@ -1,14 +0,0 @@
# RECON Backlog — Technical Debt
## Qdrant Migration Follow-ups (2026-04-28)
From the Qdrant source-of-truth migration pre-commit review. All nice-to-haves, zero production impact.
1. **domain_assigner.py — list-type domain handling**
`_count_domains_from_qdrant` counts every element in a list-type `domain` payload. The embedder's `_validate_classification` normalizes lists to `payload['domain'] = valid[0]` before upsert, so multi-element lists never exist in production Qdrant data. For spec consistency, could match the embedder's first-only normalization. Zero impact since the embedder guarantees bare strings.
2. **recon.py --tiebreaker-pass — Qdrant client threading**
The `--tiebreaker-pass` CLI branch calls `run_tiebreaker_pass(db, config)` without creating and passing a `QdrantClient`. The function handles this via lazy construction in `_get_qdrant_client`, which creates one client for the entire batch. Could thread a client from the CLI entry point for consistency with `--backfill`. Functionally fine as-is.
3. **_get_qdrant_client — debug log caller identification**
The debug log `"Creating new QdrantClient (caller did not pass one)"` doesn't identify which function triggered the lazy construction. Could include caller info (e.g., `inspect.stack()[1].function`) for easier debug session triage. Low priority since it only fires for lazy construction paths.

View file

@ -1,20 +0,0 @@
# Deploy Blast Radius Reference
Quick-reference for operators during deployment of domain categorization.
| Step | What Changes | Worst Case (Partial Failure) | Detection Signal | Rollback | Est. Rollback Time |
|------|-------------|------------------------------|-----------------|----------|-------------------|
| **Plugin install** | PeerTube plugin dir on CT 110 | PeerTube fails to start | `systemctl status peertube` shows failed | Move plugin dir to `.disabled`, restart PeerTube | 2 min |
| **PeerTube restart** | PeerTube service state | PeerTube crash loop | `journalctl -u peertube` shows repeated failures | Disable plugin, restart | 2 min |
| **Schema migration** (RECON restart) | 4 new nullable columns + 1 index in recon.db | Migration SQL error leaves partial columns | Python PRAGMA check fails | DROP COLUMN for each added column | 5 min |
| **--backfill** | `recon_domain` + `recon_domain_status` on ~22K rows | Wrong domain assignments | Spot-check 20 random docs | `UPDATE documents SET recon_domain = NULL, recon_domain_status = NULL ...` | 1 min |
| **--tiebreaker-pass** | ~1,100 rows: `tied_pass_1` to `tied_pass_2`/`tied_manual` | Wrong tiebreaker resolution | Spot-check 5 resolved items | Reset `tied_pass_2`/`tied_manual` back to `tied_pass_1` | 1 min |
| **--push-pending** | PeerTube `video.category` column on ~22K rows | Wrong categories visible to all PeerTube users | PeerTube UI shows wrong labels | `UPDATE video SET category = NULL WHERE category >= 100` + clear push timestamps | 2 min |
| **--reprocess-missing** | **DELETES** concept directories (irreversible locally) | Concepts deleted, re-enrichment fails (Gemini API down, quota hit) | `recon.py status` shows stuck `queued` items, concept dirs missing | Restore from Contabo backup (`rsync`) | 10-60 min depending on count |
## Risk Tiers
- **Low risk (read-only):** `--dry-run` on any command, status display
- **Medium risk (DB-only, reversible):** `--backfill`, `--tiebreaker-pass`, schema migration
- **High risk (external writes):** `--push-pending` (writes to PeerTube, visible to users)
- **Critical risk (destructive):** `--reprocess-missing` (deletes concept files, $130+ Gemini work at risk)

View file

@ -1,117 +0,0 @@
# Domain Assignment — Algorithm & Operations Guide
## Overview
RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concept vectors stored in Qdrant. Assignments are pushed to PeerTube as category metadata via a custom plugin.
## Data Source
Domain counts are read from the `domain` payload field on concept vectors in Qdrant (`recon_knowledge_hybrid` collection on cortex:6333). Each concept vector has a `domain` string in its payload, set during enrichment and validated at embed time. This provides 100% coverage for all embedded documents with zero legacy domain residue.
Previously, domain counts were read from on-disk concept JSON files (`data/concepts/{hash}/window_*.json`). This was replaced with Qdrant queries on 2026-04-28 because ~10,000 items had missing or legacy-only concept files on disk while Qdrant had the correct data.
## Algorithm
### Pass 1: Concept Domain Count (inline, per-document)
Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`.
1. Query Qdrant for all points with `doc_hash` matching the document
2. Count `domain` payload occurrences, filtering to `VALID_DOMAINS` only
3. If zero concept vectors → `no_concepts` (terminal)
4. If single top domain → `assigned`
5. If tied → `tied_pass_1` (deferred to tiebreaker)
### Pass 2: Channel Tiebreaker (batch)
Runs via `assign-categories --tiebreaker-pass`.
For each `tied_pass_1` document:
1. Identify the tied domains from Qdrant
2. Look up the document's channel (`catalogue.category`)
3. **Skip-list check:** If channel is in `MEGA_CHANNEL_SKIP_LIST` (known non-topical catch-alls), skip tiebreaking → `tied_manual`
4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter)
5. Among the tied domains only, pick the one with the highest channel-wide concept count
6. If resolved → `tied_pass_2`
7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review)
### Channel Skip List
Certain channels are known non-topical catch-alls where channel-wide concept aggregation produces meaningless noise. These are listed explicitly in `MEGA_CHANNEL_SKIP_LIST` (defined in `lib/recon_domains.py`) and skip tiebreaking entirely — their tied items go straight to `tied_manual` for dashboard review.
Current skip list:
- `Transcript` — Legacy catch-all (~9,200 videos), no topical coherence
This is intentionally an explicit list, not a size threshold. Legitimate large channels (e.g., 1a-auto, forgotten-weapons) run the tiebreaker normally because their content is topically coherent. Adding a channel to the skip list requires a code change and a documented reason.
## Status Values
| Status | Meaning | Terminal? | Next Action |
|--------|---------|-----------|-------------|
| `assigned` | Clear winner from pass 1 | No | Push to PeerTube |
| `tied_pass_1` | Concept tie, awaiting tiebreaker | No | Run `--tiebreaker-pass` |
| `tied_pass_2` | Resolved by channel tiebreaker | No | Push to PeerTube |
| `tied_manual` | Needs human review | No | Review at `/peertube/review` |
| `no_concepts` | Zero concept vectors in Qdrant | **Yes** | None — typically non-topical content (vlogs, giveaways, announcements) |
| `needs_reprocess` | Transient failure (Qdrant error) | No | Run `--reprocess-missing` |
| `manual_assigned` | Human override from dashboard | No | Already pushed |
**"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}`
## CLI Commands
```bash
cd /opt/recon && source venv/bin/activate
# Show current assignment status
python3 recon.py assign-categories
# Pass 1: backfill all unassigned complete stream documents
python3 recon.py assign-categories --backfill --dry-run
python3 recon.py assign-categories --backfill
# Pass 2: resolve ties via channel analysis
python3 recon.py assign-categories --tiebreaker-pass
# Push all assigned-but-unpushed categories to PeerTube API
python3 recon.py assign-categories --push-pending
# Re-queue items with transient failures for full re-processing
python3 recon.py assign-categories --reprocess-missing
# Limit processing count
python3 recon.py assign-categories --backfill --limit 100
```
## Dashboard Review
The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items. Each row displays:
- Video title and channel
- Top concept domains with counts
- Dropdown to select the correct domain
- Assign button (pushes to PeerTube immediately)
Items with `no_concepts` or `needs_reprocess` status do NOT appear in the review UI.
## Pipeline Integration
New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`:
1. Runs `compute_assignment()` (pass 1 only), reusing the embedder's existing Qdrant client
2. If clear winner: pushes category to PeerTube immediately
3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run
4. If no concepts: marks as `no_concepts` (terminal)
5. On Qdrant error: logs warning and continues — does not block the pipeline
## Source Files
| File | Purpose |
|------|---------|
| `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS |
| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` + Qdrant helpers |
| `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` |
| `lib/embedder.py` | Post-embed hook (passes qdrant client) |
| `lib/status.py` | DB columns + helper methods |
| `lib/api.py` | Dashboard review routes |
| `recon.py` | CLI `assign-categories` command |

View file

@ -1,426 +0,0 @@
# Domain Categorization Migration Runbook
Step-by-step procedure to deploy the PeerTube domain categorization feature.
## Prerequisites
- Feature branch `feature/peertube-domain-categorization` merged to master (or checked out)
- SSH access to recon-vm (192.168.1.130) and CT 110 (192.168.1.170)
- PeerTube admin credentials (`root` / password in `.env`)
## Pre-Deploy Backups
These backups MUST be completed before any state-changing step.
### 1. Snapshot RECON database
```bash
ssh zvx@192.168.1.130
cp /opt/recon/data/recon.db "/opt/recon/data/recon.db.pre-domain-feature.$(date +%Y%m%d_%H%M%S).bak"
ls -la /opt/recon/data/recon.db.pre-domain-feature.*.bak # Confirm
```
### 2. Snapshot PeerTube PostgreSQL
```bash
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres pg_dump peertube_prod' > "/tmp/peertube_prod.pre-domain-feature.$(date +%Y%m%d_%H%M%S).sql"
ls -la /tmp/peertube_prod.pre-domain-feature.*.sql # Confirm non-zero
```
### 3. Verify off-site concept backup
```bash
# Check last rsync to Contabo
ssh zvx@192.168.1.130 'ls -la /opt/recon/data/concepts/ | tail -5'
ssh root@100.64.0.1 'ls -la /opt/backups/recon/concepts/ | tail -5'
# Confirm timestamps match within 6 hours
```
### 4. Confirm RECON service state
```bash
ssh zvx@192.168.1.130 'sudo systemctl status recon --no-pager'
# Note: do NOT restart until Step 3. If currently running, confirm no active
# enrichment/embedding workers before proceeding.
```
---
## Step 1: Deploy PeerTube Plugin to CT 110
```bash
# From recon-vm, copy plugin to CT 110
ssh zvx@192.168.1.130
cd /opt/recon/peertube-plugin/
scp -r peertube-plugin-recon-domains root@192.168.1.241:'pct exec 110 -- mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
# Or via the Proxmox host:
ssh root@192.168.1.243 # media host
pct exec 110 -- bash -c 'mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
# Copy files into the container (scp from recon-vm or use pct push)
```
Alternative: Install via PeerTube admin UI (Admin > Plugins > Install).
```bash
# Restart PeerTube to register plugin
ssh root@192.168.1.243 'pct exec 110 -- systemctl restart peertube'
```
**STOP.** Check PeerTube logs for plugin registration errors:
```bash
ssh root@192.168.1.243 'pct exec 110 -- journalctl -u peertube --since=-5min' | grep -i plugin
```
If any errors reference `peertube-plugin-recon-domains`, do NOT proceed. Diagnose
and fix the plugin before continuing. See Rollback: "Plugin install fails" below.
## Step 2: Verify Plugin
```bash
# From recon-vm
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | grep -E '"1[0-1][0-9]"'
```
Should show all 18 categories (IDs 100-117). If any are missing, do NOT proceed.
Run the parity test:
```bash
cd /opt/recon && source venv/bin/activate
python3 tests/test_constants_parity.py
```
## Step 3: Apply Schema Migration
**Requires RECON restart (ask user first).**
```bash
sudo systemctl restart recon
```
The migration runs automatically on startup via `StatusDB._init_db()`. Verify:
```bash
cd /opt/recon && source venv/bin/activate
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
cols = [r[1] for r in conn.execute('PRAGMA table_info(documents)').fetchall()]
for c in ['recon_domain', 'recon_domain_status', 'recon_domain_assigned_at', 'peertube_category_pushed_at']:
assert c in cols, f'Missing: {c}'
print(f' {c}: OK')
# Verify index exists
indexes = [r[1] for r in conn.execute('PRAGMA index_list(documents)').fetchall()]
assert 'idx_documents_recon_domain_status' in indexes, 'Missing index'
print(' idx_documents_recon_domain_status: OK')
# Verify no columns were dropped
expected_existing = ['hash', 'status', 'filename', 'discovered_at']
for c in expected_existing:
assert c in cols, f'ALERT: existing column {c} is missing!'
print('Migration verified — all columns present, no existing columns dropped')
"
```
## Step 4: Run Backfill
```bash
cd /opt/recon && source venv/bin/activate
# Dry run first
python3 recon.py assign-categories --backfill --dry-run
```
**STOP.** Verify dry-run output distribution roughly matches investigation benchmarks:
- ~94.8% `assigned` (clear winners)
- ~5.2% `tied_pass_1` (ties)
- ~19.5% `needs_reprocess` (missing/legacy concepts)
If the distribution deviates more than 5 percentage points from these benchmarks,
halt and investigate. Do not proceed until the deviation is explained.
```bash
# Execute pass 1
python3 recon.py assign-categories --backfill
```
**STOP.** Spot-check 20 random assigned documents:
```bash
python3 -c "
from lib.status import StatusDB
db = StatusDB()
rows = db._get_conn().execute(
\"SELECT d.hash, d.recon_domain FROM documents d WHERE d.recon_domain_status = 'assigned' ORDER BY RANDOM() LIMIT 20\"
).fetchall()
for r in rows:
print(r['hash'][:12], r['recon_domain'])
"
```
For each, visually verify against concept files: `ls data/concepts/{hash}/` and
spot-check one `window_*.json` to confirm the assigned domain is plausible.
Halt if any are wildly wrong. See Rollback: "Clear wrong backfill assignments" below.
```bash
# Run tiebreaker pass
python3 recon.py assign-categories --tiebreaker-pass
```
**STOP.** Verify tiebreaker results:
```bash
python3 -c "
from lib.status import StatusDB
db = StatusDB()
c = db.get_domain_status_counts()
print('Status breakdown:', c)
print()
print('tied_pass_2 (resolved):', c.get('tied_pass_2', 0))
print('tied_manual (needs review):', c.get('tied_manual', 0))
"
```
Spot-check 5 `tied_pass_2` items — verify the resolved domain is plausible given
the channel's other content.
```bash
# Check overall status
python3 recon.py assign-categories
```
## Step 5: Push to PeerTube
Push in stages. Do NOT push all at once.
```bash
# Dry run: confirm count
python3 recon.py assign-categories --push-pending --dry-run
# Stage 1: push 100 items
python3 recon.py assign-categories --push-pending --limit 100
```
**STOP.** Verify in PeerTube UI (stream.echo6.co admin, or via API) that 100 videos
now show RECON domain categories. Spot-check 5 videos.
```bash
# Verify via API: pick a random pushed video
python3 -c "
from lib.status import StatusDB
db = StatusDB()
row = db._get_conn().execute(
\"SELECT d.recon_domain, c.path FROM documents d LEFT JOIN catalogue c ON d.hash = c.hash WHERE d.peertube_category_pushed_at IS NOT NULL ORDER BY RANDOM() LIMIT 1\"
).fetchone()
if row:
uuid = row['path'].rsplit('/w/', 1)[-1] if row['path'] and '/w/' in row['path'] else '?'
print(f'Domain: {row[\"recon_domain\"]} UUID: {uuid}')
print(f'Check: curl -s http://192.168.1.170:9000/api/v1/videos/{uuid} -H \"Host: stream.echo6.co\" | python3 -m json.tool | grep category')
"
```
```bash
# Stage 2: push 1000 items
python3 recon.py assign-categories --push-pending --limit 1000
```
**STOP.** Verify via PeerTube database:
```bash
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
```
```bash
# Stage 3: push remaining
python3 recon.py assign-categories --push-pending
```
## Step 6: Verify
```bash
# Check PeerTube database directly
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
# Check uncategorized
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT count(*) FROM video WHERE category IS NULL"'
# Check RECON status
python3 recon.py assign-categories
```
## Step 7: Reprocess Missing Items (SEPARATE POST-DEPLOY OPERATION)
**WARNING:** This step deletes concept directories. It is the only destructive
operation in the entire feature. Run it separately from the initial deploy,
after all other steps are verified and stable.
```bash
# Dry run first — review what would be deleted
python3 recon.py assign-categories --reprocess-missing --dry-run --limit 10
```
**STOP.** Review output. Verify concept dirs listed are genuinely stale (legacy
domains only, or missing concept files). The dry-run reports file counts for
each directory that would be deleted.
```bash
# Small batch
python3 recon.py assign-categories --reprocess-missing --limit 10
```
**STOP.** Verify: check that 10 items re-entered the pipeline.
```bash
python3 recon.py status # queued count should increase by ~10
```
Wait for pipeline to process them. Verify domain assignment on completion:
```bash
# Check these specific items got re-enriched and assigned
python3 recon.py assign-categories
```
```bash
# Scale up
python3 recon.py assign-categories --reprocess-missing --limit 100
# Then unbounded
python3 recon.py assign-categories --reprocess-missing
```
**Note on interrupts:** If `--reprocess-missing` is interrupted mid-run, re-running
it is safe. Any documents stranded at `status='catalogued'` without being re-queued
can be recovered with `recon.py queue --source stream.echo6.co`.
## Step 8: Dashboard Review
Navigate to `https://recon.echo6.co/peertube/review` to review `tied_manual` items.
Each row shows the video, channel, tied domains, and concept counts. Select the
correct domain and click Assign.
---
## Rollback Procedures
### Plugin install fails or breaks PeerTube
```bash
# Disable plugin without uninstalling
ssh root@192.168.1.243 'pct exec 110 -- bash -c "
mv /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains \
/var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains.disabled
systemctl restart peertube
"'
# Verify PeerTube is healthy
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | head
# To fully remove: use PeerTube admin UI → Plugins → Uninstall
```
### Schema migration revert (drop new columns)
Only needed if the columns cause problems. The columns are nullable and have no
constraints, so they should be inert.
```bash
ssh zvx@192.168.1.130 'cd /opt/recon && source venv/bin/activate && python3 -c "
import sqlite3
conn = sqlite3.connect(\"data/recon.db\")
for col in [\"recon_domain\", \"recon_domain_status\", \"recon_domain_assigned_at\", \"peertube_category_pushed_at\"]:
try:
conn.execute(f\"ALTER TABLE documents DROP COLUMN {col}\")
print(f\"Dropped: {col}\")
except Exception as e:
print(f\"Skip {col}: {e}\")
conn.execute(\"DROP INDEX IF EXISTS idx_documents_recon_domain_status\")
conn.commit()
print(\"Index dropped\")
"'
```
Note: SQLite ALTER TABLE DROP COLUMN requires SQLite 3.35.0+ (2021-03-12).
Ubuntu 24.04 ships 3.45.1 — this is fine.
### Clear wrong backfill assignments (selective or full)
```bash
cd /opt/recon && source venv/bin/activate
# Clear ALL domain assignments
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('''UPDATE documents SET
recon_domain = NULL, recon_domain_status = NULL,
recon_domain_assigned_at = NULL, peertube_category_pushed_at = NULL''')
conn.commit()
print('Cleared all domain assignments')
"
# Clear only tiebreaker results (reset to tied_pass_1 for re-run)
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('''UPDATE documents SET
recon_domain = NULL, recon_domain_status = 'tied_pass_1',
recon_domain_assigned_at = NULL
WHERE recon_domain_status IN ('tied_pass_2', 'tied_manual')''')
conn.commit()
"
```
### Clear wrong PeerTube categories
```bash
# Reset ALL RECON categories (100+) to NULL in PeerTube
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
-c "UPDATE video SET category = NULL WHERE category >= 100"'
# Verify
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
-c "SELECT count(*) FROM video WHERE category >= 100"'
# Should return 0
# Also clear RECON pushed timestamps so --push-pending can retry
cd /opt/recon && source venv/bin/activate
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('UPDATE documents SET peertube_category_pushed_at = NULL WHERE peertube_category_pushed_at IS NOT NULL')
conn.commit()
print('Cleared push timestamps')
"
```
### Restore concepts after failed --reprocess-missing
```bash
# Concept backups are on Contabo at /opt/backups/recon/concepts/
# Identify which hashes were deleted (check RECON logs)
ssh zvx@192.168.1.130 'grep "Deleting concept dir" /opt/recon/logs/recon.log | tail -20'
# Restore specific hash from Contabo
HASH=<hash_from_log>
ssh root@100.64.0.1 "tar -cf - -C /opt/backups/recon/concepts/ $HASH" | \
ssh zvx@192.168.1.130 "tar -xf - -C /opt/recon/data/concepts/"
# Restore ALL concepts (nuclear option)
ssh root@100.64.0.1 'rsync -av /opt/backups/recon/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/'
```
### Fully remove feature
1. Uninstall plugin from PeerTube admin UI
2. Restart PeerTube
3. Revert RECON code changes (`git checkout master`)
4. Restart RECON
5. Drop schema columns (see above)
6. Reset PeerTube categories (see above)

View file

@ -88,7 +88,6 @@ KNOWLEDGE_SUBNAV = [
PEERTUBE_SUBNAV = [
{'href': '/peertube', 'label': 'Dashboard'},
{'href': '/peertube/channels', 'label': 'Channels'},
{'href': '/peertube/review', 'label': 'Review'},
]
@ -377,86 +376,6 @@ def peertube_channels():
domain='peertube', subnav=PEERTUBE_SUBNAV, active_page='/peertube/channels')
@app.route('/peertube/review')
def peertube_review():
from .recon_domains import VALID_DOMAINS
return render_template('peertube/review.html',
domain='peertube', subnav=PEERTUBE_SUBNAV,
active_page='/peertube/review',
valid_domains=sorted(VALID_DOMAINS))
@app.route('/api/peertube/review/stats')
def api_peertube_review_stats():
db = StatusDB()
counts = db.get_domain_status_counts()
return jsonify(counts)
@app.route('/api/peertube/review/items')
def api_peertube_review_items():
from .domain_assigner import _count_domains_from_qdrant, _get_qdrant_client
db = StatusDB()
config = get_config()
items = db.get_items_by_domain_status('tied_manual', limit=200)
qdrant = _get_qdrant_client(config)
collection = config['vector_db']['collection']
result = []
for item in items:
file_hash = item['hash']
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
top_domains = [{'domain': d, 'count': cnt}
for d, cnt in domain_counter.most_common(5)]
result.append({
'hash': file_hash,
'filename': item.get('filename', ''),
'category': item.get('category', ''),
'recon_domain': item.get('recon_domain'),
'recon_domain_status': item.get('recon_domain_status'),
'top_domains': top_domains,
})
return jsonify(result)
@app.route('/api/peertube/review/assign', methods=['POST'])
def api_peertube_review_assign():
from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP
from .peertube_writer import push_category, extract_uuid
data = request.get_json()
file_hash = data.get('hash')
domain = data.get('domain')
if not file_hash or not domain:
return jsonify({'ok': False, 'error': 'Missing hash or domain'}), 400
if domain not in VALID_DOMAINS:
return jsonify({'ok': False, 'error': f'Invalid domain: {domain}'}), 400
db = StatusDB()
config = get_config()
db.set_domain_assignment(file_hash, domain, 'manual_assigned')
# Push to PeerTube
conn = db._get_conn()
cat_row = conn.execute(
"SELECT path FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if cat_row:
uuid = extract_uuid(dict(cat_row)['path'])
if uuid:
cat_id = DOMAIN_CATEGORY_MAP[domain]
try:
push_category(uuid, cat_id, config)
db.set_peertube_pushed(file_hash)
except Exception as e:
return jsonify({'ok': True, 'warning': f'Assigned but PeerTube push failed: {e}'})
return jsonify({'ok': True, 'domain': domain})
@app.route('/settings/keys')
def settings_keys():
from lib.key_manager import get_key_manager
@ -2803,3 +2722,214 @@ def api_auth_whoami():
'authenticated': False,
'username': None,
})
# ── OFFROUTE API ──
@app.route("/api/offroute", methods=["POST"])
def api_offroute():
"""
Off-network routing from wilderness to destination.
Request body:
{
"start": [lat, lon],
"end": [lat, lon],
"mode": "foot" | "mtb" | "atv", (default: "foot")
"boundary_mode": "strict" | "pragmatic" | "emergency" (default: "pragmatic")
}
Response:
{
"status": "ok",
"route": { GeoJSON FeatureCollection with wilderness + network segments },
"summary": { total_distance_km, total_effort_minutes, ... }
}
"""
try:
data = request.get_json()
if not data:
return jsonify({"status": "error", "message": "No JSON body provided"}), 400
# Parse coordinates
start = data.get("start")
end = data.get("end")
if not start or not end:
return jsonify({"status": "error", "message": "Missing start or end coordinates"}), 400
if not isinstance(start, (list, tuple)) or len(start) != 2:
return jsonify({"status": "error", "message": "start must be [lat, lon]"}), 400
if not isinstance(end, (list, tuple)) or len(end) != 2:
return jsonify({"status": "error", "message": "end must be [lat, lon]"}), 400
start_lat, start_lon = float(start[0]), float(start[1])
end_lat, end_lon = float(end[0]), float(end[1])
# Parse options
mode = data.get("mode", "foot")
if mode not in ("auto", "foot", "mtb", "atv", "vehicle"):
return jsonify({"status": "error", "message": "mode must be auto, foot, mtb, atv, or vehicle"}), 400
boundary_mode = data.get("boundary_mode", "pragmatic")
if boundary_mode not in ("strict", "pragmatic", "emergency"):
return jsonify({"status": "error", "message": "boundary_mode must be strict, pragmatic, or emergency"}), 400
# Import and run router
from .offroute.router import OffrouteRouter
router = OffrouteRouter()
try:
result = router.route(
start_lat=start_lat,
start_lon=start_lon,
end_lat=end_lat,
end_lon=end_lon,
mode=mode,
boundary_mode=boundary_mode
)
finally:
router.close()
if result.get("status") == "error":
return jsonify(result), 400
return jsonify(result)
except Exception as e:
logger.exception("Offroute error")
return jsonify({"status": "error", "message": str(e)}), 500
# ── MVUM Places Panel API ──
@app.route("/api/mvum", methods=["GET"])
def api_mvum():
"""
Query MVUM (Motor Vehicle Use Map) features near a point.
Used by the Navi frontend places panel when a user taps near a road/trail.
Query params:
lat: Latitude
lon: Longitude
radius: Search radius in meters (default: 50)
Response:
{
"status": "ok",
"feature": {
"id": "FR 123",
"name": "Some Forest Road",
"forest": "Sawtooth National Forest",
"district": "Ketchum Ranger District",
"surface": "NAT",
"maintenance_level": 2,
"seasonal": "Seasonal",
"symbol": 2,
"access": {
"passenger_vehicle": { "status": "Open", "dates": "06/15-10/15" },
"high_clearance": { "status": "Open", "dates": "06/15-10/15" },
"atv": { "status": "Open", "dates": "06/15-10/15" },
...
},
"geometry": { GeoJSON LineString }
}
}
If no MVUM feature within radius:
{ "status": "ok", "feature": null }
"""
try:
lat = request.args.get("lat", type=float)
lon = request.args.get("lon", type=float)
radius = request.args.get("radius", 50, type=float)
if lat is None or lon is None:
return jsonify({"status": "error", "message": "lat and lon required"}), 400
from .offroute.mvum import MVUMReader
reader = MVUMReader()
try:
# Try roads first, then trails
feature = reader.query_nearest(lat, lon, radius, "mvum_roads")
if feature is None:
feature = reader.query_nearest(lat, lon, radius, "mvum_trails")
if feature is None:
return jsonify({"status": "ok", "feature": None})
# Format access info
access = {
"passenger_vehicle": {
"status": feature.get("passengervehicle"),
"dates": feature.get("passengervehicle_datesopen")
},
"high_clearance": {
"status": feature.get("highclearancevehicle"),
"dates": feature.get("highclearancevehicle_datesopen")
},
"atv": {
"status": feature.get("atv"),
"dates": feature.get("atv_datesopen")
},
"motorcycle": {
"status": feature.get("motorcycle"),
"dates": feature.get("motorcycle_datesopen")
},
"4wd_gt50": {
"status": feature.get("fourwd_gt50inches"),
"dates": feature.get("fourwd_gt50_datesopen")
},
"2wd_gt50": {
"status": feature.get("twowd_gt50inches"),
"dates": feature.get("twowd_gt50_datesopen")
},
"e_bike_class1": {
"status": feature.get("e_bike_class1"),
"dates": feature.get("e_bike_class1_dur")
},
"e_bike_class2": {
"status": feature.get("e_bike_class2"),
"dates": feature.get("e_bike_class2_dur")
},
"e_bike_class3": {
"status": feature.get("e_bike_class3"),
"dates": feature.get("e_bike_class3_dur")
},
}
# Parse maintenance level
maint_level = feature.get("operationalmaintlevel", "")
maint_num = None
if maint_level:
# Extract first digit: "2 - HIGH CLEARANCE VEHICLES" -> 2
import re
match = re.match(r"(\d+)", maint_level)
if match:
maint_num = int(match.group(1))
result = {
"id": feature.get("id"),
"name": feature.get("name"),
"forest": feature.get("forestname"),
"district": feature.get("districtname"),
"surface": feature.get("surfacetype"),
"maintenance_level": maint_num,
"seasonal": feature.get("seasonal"),
"symbol": feature.get("symbol"),
"trail_class": feature.get("trailclass"),
"trail_system": feature.get("trailsystem"),
"access": access,
"geometry": feature.get("geojson")
}
return jsonify({"status": "ok", "feature": result})
finally:
reader.close()
except Exception as e:
logger.exception("MVUM query error")
return jsonify({"status": "error", "message": str(e)}), 500

View file

@ -1,319 +0,0 @@
"""
RECON Domain Assigner
Computes per-video domain assignments from Qdrant vector payloads.
Two functions, two execution modes:
compute_assignment() pass 1, inline from post-embed hook
run_tiebreaker_pass() batch, resolves ties via channel concept scan
Data source: Qdrant `domain` payload field on concept vectors.
Previously read on-disk concept JSON files; migrated to Qdrant as
single source of truth (2026-04-28).
Status values written to documents.recon_domain_status:
assigned clear winner from pass 1 concept count
tied_pass_1 concept tie, awaiting channel tiebreaker
tied_pass_2 resolved by channel tiebreaker
tied_manual needs human review (dashboard)
no_concepts terminal, zero concept vectors in Qdrant
needs_reprocess transient failure (Qdrant error, etc.)
manual_assigned human override from dashboard
"""
from collections import Counter
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny
from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP, MEGA_CHANNEL_SKIP_LIST
from .utils import setup_logging
logger = setup_logging('recon.domain_assigner')
def _get_qdrant_client(config):
"""Create a QdrantClient from RECON config.
Callers should create one client and pass it through rather than
calling this repeatedly.
"""
logger.debug("Creating new QdrantClient (caller did not pass one)")
return QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
def _count_domains_from_qdrant(qdrant, collection, doc_hash):
"""Count valid domain occurrences for a single document from Qdrant.
Scrolls all points matching doc_hash and counts domain values.
Args:
qdrant: QdrantClient instance
collection: Qdrant collection name
doc_hash: Document hash to query
Returns:
Counter of {domain_name: count} for valid domains.
Empty Counter if no points found (never None).
"""
domain_counter = Counter()
offset = None
while True:
results, next_offset = qdrant.scroll(
collection_name=collection,
scroll_filter=Filter(must=[
FieldCondition(key="doc_hash", match=MatchValue(value=doc_hash))
]),
with_payload=["domain"],
with_vectors=False,
limit=200,
offset=offset,
)
for point in results:
dom = point.payload.get('domain')
if isinstance(dom, str) and dom in VALID_DOMAINS:
domain_counter[dom] += 1
elif isinstance(dom, list):
for d in dom:
if isinstance(d, str) and d in VALID_DOMAINS:
domain_counter[d] += 1
if next_offset is None:
break
offset = next_offset
return domain_counter
def _count_domains_from_qdrant_batch(qdrant, collection, doc_hashes):
"""Count valid domain occurrences across multiple documents from Qdrant.
Single scroll with MatchAny filter, with offset pagination for large
result sets.
Args:
qdrant: QdrantClient instance
collection: Qdrant collection name
doc_hashes: List of document hashes to query
Returns:
Counter of {domain_name: count} aggregated across all matching points.
"""
if not doc_hashes:
return Counter()
domain_counter = Counter()
offset = None
while True:
results, next_offset = qdrant.scroll(
collection_name=collection,
scroll_filter=Filter(must=[
FieldCondition(key="doc_hash", match=MatchAny(any=doc_hashes))
]),
with_payload=["domain"],
with_vectors=False,
limit=10000,
offset=offset,
)
for point in results:
dom = point.payload.get('domain')
if isinstance(dom, str) and dom in VALID_DOMAINS:
domain_counter[dom] += 1
elif isinstance(dom, list):
for d in dom:
if isinstance(d, str) and d in VALID_DOMAINS:
domain_counter[d] += 1
if next_offset is None:
break
offset = next_offset
return domain_counter
def compute_assignment(file_hash, db, config, qdrant=None):
"""Compute domain assignment for a single document (pass 1).
Counts domain occurrences across all concept vectors in Qdrant.
If a single domain wins, assigns it. If tied, defers to batch
tiebreaker.
Args:
file_hash: Document hash
db: StatusDB instance
config: RECON config dict
qdrant: Optional QdrantClient (created if not provided)
Returns:
(domain, status) tuple where domain is a string or None,
and status is one of: 'assigned', 'tied_pass_1', 'no_concepts',
'needs_reprocess'
"""
owns_client = False
if qdrant is None:
qdrant = _get_qdrant_client(config)
owns_client = True
collection = config['vector_db']['collection']
try:
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
except Exception as e:
logger.warning(f"Qdrant query failed for {file_hash[:12]}: {e}")
return (None, 'needs_reprocess')
if len(domain_counter) == 0:
return (None, 'no_concepts')
top = domain_counter.most_common(2)
top_domain = top[0][0]
top_count = top[0][1]
if len(top) == 1 or top[1][1] < top_count:
return (top_domain, 'assigned')
# Tie — defer to tiebreaker pass
return (None, 'tied_pass_1')
def _get_tied_domains(qdrant, collection, file_hash):
"""Get the set of domains tied for first place in a document's concepts."""
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
if not domain_counter:
return []
top = domain_counter.most_common()
if not top:
return []
max_count = top[0][1]
return [dom for dom, cnt in top if cnt == max_count]
def _channel_video_hashes(db, channel_name, exclude_hash=None):
"""Get all document hashes belonging to a PeerTube channel.
Args:
db: StatusDB instance
channel_name: catalogue.category (channel actor name)
exclude_hash: Hash to exclude (the document being resolved)
Returns:
List of document hashes
"""
conn = db._get_conn()
rows = conn.execute(
"SELECT hash FROM catalogue WHERE category = ? AND source = 'stream.echo6.co'",
(channel_name,)
).fetchall()
hashes = [r['hash'] for r in rows]
if exclude_hash:
hashes = [h for h in hashes if h != exclude_hash]
return hashes
def run_tiebreaker_pass(db, config, qdrant=None):
"""Resolve tied domain assignments using channel-level Qdrant analysis.
Processes all documents where recon_domain_status = 'tied_pass_1'.
For each tied document, queries Qdrant for domain counts from all
other videos in the same channel and picks the tied domain with the
highest channel-wide count.
Channels in MEGA_CHANNEL_SKIP_LIST (known non-topical catch-alls) skip
tiebreaking and go straight to 'tied_manual' for dashboard review.
Args:
db: StatusDB instance
config: RECON config dict
qdrant: Optional QdrantClient (created if not provided)
Returns:
Dict with counts: resolved, manual, skipped, errors
"""
owns_client = False
if qdrant is None:
qdrant = _get_qdrant_client(config)
owns_client = True
collection = config['vector_db']['collection']
tied_items = db.get_items_by_domain_status('tied_pass_1')
stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)}
logger.info(f"Tiebreaker pass: {len(tied_items)} items to resolve")
for item in tied_items:
file_hash = item['hash']
channel = item.get('category', '')
try:
tied_domains = _get_tied_domains(qdrant, collection, file_hash)
if not tied_domains:
db.set_domain_assignment(file_hash, None, 'no_concepts')
stats['skipped'] += 1
continue
if len(tied_domains) == 1:
# No longer tied (possibly re-enriched since pass 1)
db.set_domain_assignment(file_hash, tied_domains[0], 'assigned')
stats['resolved'] += 1
continue
# Skip-list check: known non-topical catch-all channels
if channel in MEGA_CHANNEL_SKIP_LIST:
fallback = sorted(tied_domains)[0]
db.set_domain_assignment(file_hash, fallback, 'tied_manual')
stats['manual'] += 1
logger.debug(f" {file_hash[:12]}: skip-list channel '{channel}' → tied_manual")
continue
# Channel tiebreaker: count domains across all other videos in channel
other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash)
channel_domain_counts = _count_domains_from_qdrant_batch(
qdrant, collection, other_hashes
)
# Among tied domains only, pick highest channel-wide count
best_domain = None
best_count = -1
for dom in tied_domains:
c = channel_domain_counts.get(dom, 0)
if c > best_count:
best_count = c
best_domain = dom
# Check if channel tiebreaker resolved it
tied_at_channel = [d for d in tied_domains
if channel_domain_counts.get(d, 0) == best_count]
if len(tied_at_channel) == 1:
db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2')
stats['resolved'] += 1
logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (channel tiebreaker)")
continue
# Still tied after channel scan — mark for manual review
fallback = sorted(tied_domains)[0]
db.set_domain_assignment(file_hash, fallback, 'tied_manual')
stats['manual'] += 1
logger.debug(f" {file_hash[:12]}: still tied after channel scan, → tied_manual")
except Exception as e:
logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}")
stats['errors'] += 1
logger.info(f"Tiebreaker complete: {stats['resolved']} resolved, "
f"{stats['manual']} manual, {stats['skipped']} skipped, "
f"{stats['errors']} errors")
return stats

View file

@ -27,7 +27,13 @@ from .utils import resolve_text_dir
logger = setup_logging('recon.embedder')
# ── Classification allowlists ───────────────────────────────────────────────
from .recon_domains import VALID_DOMAINS
VALID_DOMAINS = {
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}
@ -255,22 +261,15 @@ def embed_single(file_hash, db, config):
if not all_concepts:
db.update_status(file_hash, 'complete', vectors_inserted=0)
# Tag stream docs with no concepts for reprocessing
_cat = db._get_conn().execute(
"SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if _cat and dict(_cat)['source'] == 'stream.echo6.co':
db.set_domain_assignment(file_hash, None, 'needs_reprocess')
logger.info(f"No concepts to embed for {doc['filename']}")
return True
# Look up source and path from catalogue once per doc
# Look up source from catalogue once per doc
cat_conn = db._get_conn()
cat_row = cat_conn.execute(
"SELECT source, path FROM catalogue WHERE hash = ?", (file_hash,)
"SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
source = dict(cat_row)['source'] if cat_row else ''
catalogue_path = dict(cat_row)['path'] if cat_row else ''
download_url = ''
is_web = doc.get('path', '').startswith(('http://', 'https://'))
@ -322,8 +321,6 @@ def embed_single(file_hash, db, config):
if not valid:
db.update_status(file_hash, 'complete', vectors_inserted=0)
if source == 'stream.echo6.co':
db.set_domain_assignment(file_hash, None, 'needs_reprocess')
logger.info(f"No valid concepts to embed for {doc['filename']}")
return True
@ -404,28 +401,6 @@ def embed_single(file_hash, db, config):
db.update_status(file_hash, 'complete', vectors_inserted=embedded_count)
logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)")
# Post-embed hook: assign domain for PeerTube videos
if source == 'stream.echo6.co':
try:
from .domain_assigner import compute_assignment
from .peertube_writer import push_category, extract_uuid
from .recon_domains import DOMAIN_CATEGORY_MAP
domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
db.set_domain_assignment(file_hash, domain, status)
if domain and status == 'assigned':
cat_id = DOMAIN_CATEGORY_MAP[domain]
uuid = extract_uuid(catalogue_path)
if uuid:
pushed, _token = push_category(uuid, cat_id, config)
if pushed:
db.set_peertube_pushed(file_hash)
logger.info(f" Domain assigned: {domain} (category {cat_id}) → PeerTube")
else:
logger.warning(f" Domain assigned ({domain}) but PeerTube push failed for {file_hash[:12]}, will retry via --push-pending")
except Exception as e:
logger.warning(f"Domain assignment failed for {file_hash}: {e}")
return True
except Exception as e:

View file

@ -42,7 +42,13 @@ logger = setup_logging('recon.enricher')
STALE_ENRICHING_HOURS = 2
# ── Classification allowlists ───────────────────────────────────────────────
from .recon_domains import VALID_DOMAINS
VALID_DOMAINS = {
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}

View file

@ -21,6 +21,7 @@ Config: processing.extract_workers, processing.max_pdf_size_mb,
processing.extract_timeout, processing.page_timeout
"""
import base64
import re
import json
import os
import random
@ -99,6 +100,40 @@ def _is_transient(error_str):
return any(sig in s for sig in transient_signals)
def _text_quality_ok(text, min_length=50):
"""Check if extracted text meets quality thresholds.
Beyond the basic length check, validates:
- Word-boundary ratio: at least 60% of tokens should be real words (2+ alpha chars)
- Concatenation ratio: lowercase-immediately-followed-by-uppercase shouldn't exceed 10% of word count
Returns True if text passes all checks.
"""
text = text.strip()
if len(text) < min_length:
return False
words = text.split()
if not words:
return False
# Word-like ratio: tokens with 2+ alphabetic characters
word_like = sum(1 for w in words if len(re.findall(r'[a-zA-Z]', w)) >= 2)
word_ratio = word_like / len(words)
if word_ratio < 0.60:
return False
# Concatenation detector: lowercase immediately followed by uppercase
# Filter out common camelCase patterns in code (short tokens)
concat_hits = len(re.findall(r'[a-z][A-Z]', text))
concat_ratio = concat_hits / len(words) if words else 0
if concat_ratio > 0.10:
return False
return True
def _render_page_to_png(pdf_path, page_num_1indexed, dpi=200, timeout=30):
"""Render a single PDF page to PNG bytes using pdftoppm.
@ -224,7 +259,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# Method 1: pdftotext (poppler)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num_0indexed + 1),
['pdftotext', '-layout', '-f', str(page_num_0indexed + 1),
'-l', str(page_num_0indexed + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
@ -233,7 +268,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pdftotext'
# Method 2: pdftoppm + Tesseract OCR
@ -258,7 +293,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'tesseract'
# Method 3: Gemini Vision (last resort)
@ -276,8 +311,26 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# ── Core extraction functions ──
def _pypdf2_extract(reader, page_num):
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout."""
return reader.pages[page_num].extract_text() or ''
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout.
Tries default extraction first (space_width=200). If quality check fails,
retries with space_width=100 which better detects word boundaries in
tightly-kerned PDFs (common in Haynes/workshop manuals).
Note: PyPDF2 3.0.1 does not support layout=True. The space_width parameter
controls word-boundary detection tolerance. Lower values = more aggressive
space insertion between characters.
"""
text = reader.pages[page_num].extract_text() or ''
if _text_quality_ok(text):
return text
# Retry with tighter word-boundary detection
text_tight = reader.pages[page_num].extract_text(space_width=100.0) or ''
if len(text_tight.strip()) >= len(text.strip()):
return text_tight
return text
def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
@ -302,13 +355,13 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
text = ''
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pypdf2'
# Method 2: pdftotext via subprocess (inherently timeout-safe)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
['pdftotext', '-layout', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
if result.returncode == 0 and len(result.stdout.strip()) > len(text.strip()):
@ -316,7 +369,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pdftotext'
# Method 3: pdftoppm + Tesseract OCR
@ -340,7 +393,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'tesseract'
# Method 4: Gemini Vision (last resort — costs API calls but handles scanned docs)

1
lib/offroute/__init__.py Normal file
View file

@ -0,0 +1 @@
"""OFFROUTE: Off-network effort-based routing module."""

440
lib/offroute/barriers.py Normal file
View file

@ -0,0 +1,440 @@
"""
PAD-US barrier and wilderness layers for OFFROUTE.
Provides access to:
1. Barrier raster (Pub_Access = 'XA' - closed/restricted areas)
2. Wilderness raster (Des_Tp = 'WA' - designated wilderness areas)
Build functions rasterize PAD-US geodatabase to aligned GeoTIFFs.
Runtime functions read the rasters and resample to match elevation grids.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
import subprocess
import tempfile
import os
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for barriers layer support")
# Paths
DEFAULT_BARRIERS_PATH = Path("/mnt/nav/worldcover/padus_barriers.tif")
DEFAULT_WILDERNESS_PATH = Path("/mnt/nav/worldcover/wilderness.tif")
PADUS_GDB_PATH = Path("/mnt/nav/padus/PADUS4_0_Geodatabase.gdb")
PADUS_LAYER = "PADUS4_0Combined_Proclamation_Marine_Fee_Designation_Easement"
# CONUS bounding box in WGS84
CONUS_BOUNDS = {
"west": -125.0,
"east": -66.0,
"south": 24.0,
"north": 50.0,
}
# Resolution in degrees (~30m at mid-latitudes)
PIXEL_SIZE = 0.0003 # ~33m
class BarrierReader:
"""Reader for PAD-US barrier raster (closed/restricted areas)."""
def __init__(self, barrier_path: Path = DEFAULT_BARRIERS_PATH):
self.barrier_path = barrier_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.barrier_path.exists():
raise FileNotFoundError(
f"Barrier raster not found at {self.barrier_path}. "
f"Run build_barriers_raster() first."
)
self._dataset = rasterio.open(self.barrier_path)
return self._dataset
def get_barrier_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get barrier values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 barrier values:
255 = closed/restricted (impassable when respect_boundaries=True)
0 = public/accessible
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
barriers = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return barriers
def sample_point(self, lat: float, lon: float) -> int:
"""Sample barrier value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
class WildernessReader:
"""Reader for PAD-US wilderness raster (designated wilderness areas)."""
def __init__(self, wilderness_path: Path = DEFAULT_WILDERNESS_PATH):
self.wilderness_path = wilderness_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.wilderness_path.exists():
raise FileNotFoundError(
f"Wilderness raster not found at {self.wilderness_path}. "
f"Run build_wilderness_raster() first."
)
self._dataset = rasterio.open(self.wilderness_path)
return self._dataset
def get_wilderness_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get wilderness values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 wilderness values:
255 = designated wilderness area
0 = not wilderness
"""
ds = self._open()
window = from_bounds(west, south, east, north, ds.transform)
wilderness = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return wilderness
def sample_point(self, lat: float, lon: float) -> int:
"""Sample wilderness value at a single point."""
ds = self._open()
row, col = ds.index(lon, lat)
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def build_barriers_raster(
output_path: Path = DEFAULT_BARRIERS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US barriers raster from the source geodatabase.
Extracts polygons where Pub_Access = 'XA' (Closed) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US barriers raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
closed_gpkg = Path(tmpdir) / "closed_areas.gpkg"
print(f"\n[1/3] Extracting closed areas (Pub_Access = 'XA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(closed_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Pub_Access = 'XA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "closed_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(closed_gpkg), "closed_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing closed areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "closed_areas",
str(closed_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
closed_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {closed_count} closed cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
def build_wilderness_raster(
output_path: Path = DEFAULT_WILDERNESS_PATH,
gdb_path: Path = PADUS_GDB_PATH,
pixel_size: float = PIXEL_SIZE,
bounds: dict = CONUS_BOUNDS,
) -> Path:
"""
Build the PAD-US wilderness raster from the source geodatabase.
Extracts polygons where Des_Tp = 'WA' (Wilderness Area) and rasterizes them.
"""
import shutil
if not gdb_path.exists():
raise FileNotFoundError(f"PAD-US geodatabase not found at {gdb_path}")
if not shutil.which('ogr2ogr'):
raise RuntimeError("ogr2ogr not found. Install GDAL.")
if not shutil.which('gdal_rasterize'):
raise RuntimeError("gdal_rasterize not found. Install GDAL.")
output_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Building PAD-US wilderness raster...")
print(f" Source: {gdb_path}")
print(f" Output: {output_path}")
print(f" Pixel size: {pixel_size} degrees (~{pixel_size * 111000:.0f}m)")
print(f" Bounds: {bounds}")
with tempfile.TemporaryDirectory() as tmpdir:
wilderness_gpkg = Path(tmpdir) / "wilderness_areas.gpkg"
print(f"\n[1/3] Extracting wilderness areas (Des_Tp = 'WA')...")
ogr_cmd = [
"ogr2ogr",
"-f", "GPKG",
str(wilderness_gpkg),
str(gdb_path),
PADUS_LAYER,
"-where", "Des_Tp = 'WA'",
"-t_srs", "EPSG:4326",
"-nlt", "MULTIPOLYGON",
"-nln", "wilderness_areas",
]
result = subprocess.run(ogr_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"ogr2ogr failed: {result.stderr}")
info_cmd = ["ogrinfo", "-so", str(wilderness_gpkg), "wilderness_areas"]
info_result = subprocess.run(info_cmd, capture_output=True, text=True)
print(f" Extraction result:\n{info_result.stdout}")
print(f"\n[2/3] Creating raster grid...")
width = int((bounds['east'] - bounds['west']) / pixel_size)
height = int((bounds['north'] - bounds['south']) / pixel_size)
print(f" Grid size: {width} x {height} pixels")
print(f"\n[3/3] Rasterizing wilderness areas...")
rasterize_cmd = [
"gdal_rasterize",
"-burn", "255",
"-init", "0",
"-a_nodata", "0",
"-te", str(bounds['west']), str(bounds['south']),
str(bounds['east']), str(bounds['north']),
"-tr", str(pixel_size), str(pixel_size),
"-ot", "Byte",
"-co", "COMPRESS=LZW",
"-co", "TILED=YES",
"-l", "wilderness_areas",
str(wilderness_gpkg),
str(output_path),
]
result = subprocess.run(rasterize_cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"STDERR: {result.stderr}")
raise RuntimeError(f"gdal_rasterize failed: {result.stderr}")
print(f"\n[Done] Verifying output...")
with rasterio.open(output_path) as ds:
print(f" Size: {ds.width} x {ds.height}")
print(f" CRS: {ds.crs}")
sample = ds.read(1, window=rasterio.windows.Window(0, 0, 1000, 1000))
wilderness_count = np.sum(sample == 255)
print(f" Sample (1000x1000): {wilderness_count} wilderness cells")
file_size = output_path.stat().st_size / (1024**2)
print(f" File size: {file_size:.1f} MB")
return output_path
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
cmd = sys.argv[1]
if cmd == "build":
print("=" * 60)
print("PAD-US Barriers Raster Build")
print("=" * 60)
build_barriers_raster()
elif cmd == "build-wilderness":
print("=" * 60)
print("PAD-US Wilderness Raster Build")
print("=" * 60)
build_wilderness_raster()
elif cmd == "build-all":
print("=" * 60)
print("Building all PAD-US rasters")
print("=" * 60)
build_barriers_raster()
print("\n")
build_wilderness_raster()
else:
print(f"Unknown command: {cmd}")
print("Usage:")
print(" python barriers.py build # Build barriers raster")
print(" python barriers.py build-wilderness # Build wilderness raster")
print(" python barriers.py build-all # Build both rasters")
sys.exit(1)
else:
# Test readers
print("Testing BarrierReader...")
if not DEFAULT_BARRIERS_PATH.exists():
print(f"Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
print(f"Run: python barriers.py build")
sys.exit(1)
reader = BarrierReader()
barriers = reader.get_barrier_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"\nBarrier grid shape: {barriers.shape}")
print(f"Unique values: {np.unique(barriers)}")
closed_cells = np.sum(barriers == 255)
print(f"Closed cells: {closed_cells} ({100*closed_cells/barriers.size:.2f}%)")
reader.close()
print("\nTesting WildernessReader...")
if not DEFAULT_WILDERNESS_PATH.exists():
print(f"Wilderness raster not found at {DEFAULT_WILDERNESS_PATH}")
print(f"Run: python barriers.py build-wilderness")
else:
wilderness_reader = WildernessReader()
wilderness = wilderness_reader.get_wilderness_grid(
south=42.2, north=42.6, west=-114.8, east=-113.8,
target_shape=(400, 1000)
)
print(f"Wilderness grid shape: {wilderness.shape}")
print(f"Unique values: {np.unique(wilderness)}")
wilderness_cells = np.sum(wilderness == 255)
print(f"Wilderness cells: {wilderness_cells} ({100*wilderness_cells/wilderness.size:.2f}%)")
wilderness_reader.close()
print("\nDone.")

494
lib/offroute/cost.py Normal file
View file

@ -0,0 +1,494 @@
"""
Multi-mode travel cost functions for OFFROUTE.
Supports four travel modes: foot, mtb, atv, vehicle.
Each mode has its own speed function, max slope, trail access rules,
and terrain friction overrides.
Mode profiles are data-driven adding a new mode means adding a profile entry.
"""
import math
import numpy as np
from dataclasses import dataclass, field
from typing import Optional, Literal, Dict, Callable
# ═══════════════════════════════════════════════════════════════════════════════
# SPEED FUNCTIONS
# ═══════════════════════════════════════════════════════════════════════════════
def tobler_off_path_speed(grade: np.ndarray, base_speed: float = 6.0) -> np.ndarray:
"""
Tobler off-path hiking function.
W = 0.6 * base_speed * exp(-3.5 * |S + 0.05|)
Peak ~3.6 km/h at grade = -0.05 (slight downhill).
The 0.6 multiplier is the off-trail penalty.
"""
return 0.6 * base_speed * np.exp(-3.5 * np.abs(grade + 0.05))
def herzog_wheeled_speed(grade: np.ndarray, base_speed: float = 12.0) -> np.ndarray:
"""
Herzog wheeled-transport polynomial.
Relative speed factor:
1 / (1337.8·S^6 + 278.19·S^5 517.39·S^4 78.199·S^3 + 93.419·S^2 + 19.825·|S| + 1.64)
Multiply by base_speed to get km/h.
"""
S = grade
S_abs = np.abs(S)
# Herzog polynomial (returns relative speed factor 0-1)
denom = (1337.8 * S**6 + 278.19 * S**5 - 517.39 * S**4
- 78.199 * S**3 + 93.419 * S**2 + 19.825 * S_abs + 1.64)
# Avoid division by zero and negative speeds
denom = np.maximum(denom, 0.1)
rel_speed = 1.0 / denom
# Clamp relative speed to reasonable bounds (0.05 to 1.5)
rel_speed = np.clip(rel_speed, 0.05, 1.5)
return base_speed * rel_speed
def linear_degrade_speed(grade: np.ndarray, base_speed: float = 40.0, max_grade: float = 0.364) -> np.ndarray:
"""
Linear speed degradation with slope.
speed = base_speed * max(0, 1 - |grade| / max_grade)
max_grade = tan(20°) 0.364 for 20° max slope.
"""
speed = base_speed * np.maximum(0, 1.0 - np.abs(grade) / max_grade)
return np.maximum(speed, 0.1) # Minimum crawl speed
# ═══════════════════════════════════════════════════════════════════════════════
# MODE PROFILES (Data-driven configuration)
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class ModeProfile:
"""Configuration for a travel mode."""
name: str
description: str
# Speed function parameters
speed_function: str # "tobler", "herzog", "linear"
base_speed_kmh: float
max_slope_deg: float
# Trail access: trail_value -> friction multiplier (None = impassable)
# Trail values: 5=road, 15=track, 25=foot trail
trail_friction: Dict[int, Optional[float]] = field(default_factory=dict)
# Off-trail terrain friction overrides (by WorldCover class)
# These MULTIPLY the base WorldCover friction
# None = use default, np.inf = impassable
# WorldCover values: 10=tree, 20=shrub, 30=grass, 40=crop, 50=urban,
# 60=bare, 80=water, 90=wetland, 95=mangrove, 100=moss
terrain_friction_override: Dict[int, Optional[float]] = field(default_factory=dict)
# Should wilderness areas be impassable?
wilderness_impassable: bool = False
# For vehicle mode: can traverse off-trail flat terrain?
off_trail_flat_threshold_deg: float = 0.0 # 0 = no off-trail allowed
off_trail_flat_friction: float = np.inf # friction if allowed
# Define all mode profiles
MODE_PROFILES: Dict[str, ModeProfile] = {
"foot": ModeProfile(
name="foot",
description="Hiking on foot (Tobler off-path model)",
speed_function="tobler",
base_speed_kmh=6.0,
max_slope_deg=40.0,
trail_friction={
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
},
terrain_friction_override={
# Use default WorldCover friction for foot mode
},
wilderness_impassable=False,
),
"mtb": ModeProfile(
name="mtb",
description="Mountain bike / dirt bike (Herzog wheeled model)",
speed_function="herzog",
base_speed_kmh=12.0,
max_slope_deg=25.0,
trail_friction={
5: 0.1, # road
15: 0.2, # track
25: 0.5, # foot trail (rideable but slow)
},
terrain_friction_override={
30: 2.0, # Grassland: rideable but slow
20: 4.0, # Shrubland: barely rideable
10: 8.0, # Tree cover/forest: effectively impassable
60: 3.0, # Bare/rocky
90: np.inf, # Wetland: impassable
95: np.inf, # Mangrove: impassable
80: np.inf, # Water: impassable
},
wilderness_impassable=True,
),
"atv": ModeProfile(
name="atv",
description="ATV / side-by-side (Herzog wheeled model, higher base speed)",
speed_function="herzog",
base_speed_kmh=25.0,
max_slope_deg=30.0,
trail_friction={
5: 0.1, # road
15: 0.3, # track
25: None, # foot trail: impassable (too narrow)
},
terrain_friction_override={
30: 1.5, # Grassland: passable
20: 3.0, # Shrubland: rough
10: np.inf, # Forest: impassable
60: 2.0, # Bare/rocky
90: np.inf, # Wetland: impassable
95: np.inf, # Mangrove: impassable
80: np.inf, # Water: impassable
},
wilderness_impassable=True,
),
"vehicle": ModeProfile(
name="vehicle",
description="4x4 truck / jeep (linear speed degradation)",
speed_function="linear",
base_speed_kmh=40.0,
max_slope_deg=20.0,
trail_friction={
5: 0.1, # road
15: 0.5, # track (rough but passable)
25: None, # foot trail: impassable
},
terrain_friction_override={
# All off-trail terrain is impassable by default
10: np.inf, # Forest
20: np.inf, # Shrubland
30: np.inf, # Grassland (except flat - see below)
40: np.inf, # Cropland (except flat - see below)
60: np.inf, # Bare
90: np.inf, # Wetland
95: np.inf, # Mangrove
80: np.inf, # Water
},
wilderness_impassable=True,
off_trail_flat_threshold_deg=5.0, # Can drive on flat fields
off_trail_flat_friction=5.0, # But very slow
),
}
# Pragmatic mode friction multiplier for private land
PRAGMATIC_BARRIER_MULTIPLIER = 5.0
# ═══════════════════════════════════════════════════════════════════════════════
# COST GRID COMPUTATION
# ═══════════════════════════════════════════════════════════════════════════════
def compute_cost_grid(
elevation: np.ndarray,
cell_size_m: float,
cell_size_lat_m: float = None,
cell_size_lon_m: float = None,
friction: Optional[np.ndarray] = None,
friction_raw: Optional[np.ndarray] = None,
trails: Optional[np.ndarray] = None,
barriers: Optional[np.ndarray] = None,
wilderness: Optional[np.ndarray] = None,
mvum: Optional[np.ndarray] = None,
boundary_mode: Literal["strict", "pragmatic", "emergency"] = "pragmatic",
mode: Literal["foot", "mtb", "atv", "vehicle"] = "foot"
) -> np.ndarray:
"""
Compute isotropic travel cost grid from elevation data.
Args:
elevation: 2D array of elevation values in meters
cell_size_m: Average cell size in meters
cell_size_lat_m: Cell size in latitude direction (optional)
cell_size_lon_m: Cell size in longitude direction (optional)
friction: Optional 2D array of friction multipliers (WorldCover).
Values should be float (1.0 = baseline, 2.0 = 2x slower).
np.inf marks impassable cells.
friction_raw: Optional 2D array of raw WorldCover class values (uint8).
Used for mode-specific terrain overrides.
Values: 10=tree, 20=shrub, 30=grass, etc.
trails: Optional 2D array of trail values (uint8).
0 = no trail, 5 = road, 15 = track, 25 = foot trail
barriers: Optional 2D array of barrier values (uint8).
255 = closed/restricted area (PAD-US Pub_Access = XA).
wilderness: Optional[np.ndarray] of wilderness values (uint8).
255 = designated wilderness area.
mvum: Optional[np.ndarray] of MVUM access values (uint8).
0 = no MVUM data, 1 = open, 255 = closed to this mode.
MVUM closures respond to boundary_mode (strict/pragmatic/emergency).
Foot mode should pass None (MVUM is motor-vehicle specific).
boundary_mode: How to handle barriers ("strict", "pragmatic", "emergency")
mode: Travel mode ("foot", "mtb", "atv", "vehicle")
Returns:
2D array of travel cost in seconds per cell.
np.inf for impassable cells.
"""
if boundary_mode not in ("strict", "pragmatic", "emergency"):
raise ValueError(f"boundary_mode must be 'strict', 'pragmatic', or 'emergency'")
if mode not in MODE_PROFILES:
raise ValueError(f"mode must be one of {list(MODE_PROFILES.keys())}")
profile = MODE_PROFILES[mode]
if cell_size_lat_m is None:
cell_size_lat_m = cell_size_m
if cell_size_lon_m is None:
cell_size_lon_m = cell_size_m
rows, cols = elevation.shape
# ─── Compute gradients (in-place where possible) ─────────────────────────
# Use float32 to reduce memory footprint
grade = np.zeros(elevation.shape, dtype=np.float32)
# Compute dy contribution to grade squared
dy_contrib = np.zeros(elevation.shape, dtype=np.float32)
dy_contrib[1:-1, :] = ((elevation[:-2, :] - elevation[2:, :]) / (2 * cell_size_lat_m)) ** 2
dy_contrib[0, :] = ((elevation[0, :] - elevation[1, :]) / cell_size_lat_m) ** 2
dy_contrib[-1, :] = ((elevation[-2, :] - elevation[-1, :]) / cell_size_lat_m) ** 2
# Compute dx contribution and add to dy_contrib in-place
dy_contrib[:, 1:-1] += ((elevation[:, 2:] - elevation[:, :-2]) / (2 * cell_size_lon_m)) ** 2
dy_contrib[:, 0] += ((elevation[:, 1] - elevation[:, 0]) / cell_size_lon_m) ** 2
dy_contrib[:, -1] += ((elevation[:, -1] - elevation[:, -2]) / cell_size_lon_m) ** 2
# grade = sqrt(dx^2 + dy^2)
np.sqrt(dy_contrib, out=grade)
del dy_contrib # Free memory immediately
# ─── Compute speed based on mode ─────────────────────────────────────────
max_grade_val = np.tan(np.radians(profile.max_slope_deg))
if profile.speed_function == "tobler":
speed_kmh = tobler_off_path_speed(grade, profile.base_speed_kmh)
elif profile.speed_function == "herzog":
speed_kmh = herzog_wheeled_speed(grade, profile.base_speed_kmh)
elif profile.speed_function == "linear":
speed_kmh = linear_degrade_speed(grade, profile.base_speed_kmh, max_grade_val)
else:
raise ValueError(f"Unknown speed function: {profile.speed_function}")
# ─── Base cost (seconds per cell) ─────────────────────────────────────────
avg_cell_size = (cell_size_lat_m + cell_size_lon_m) / 2
cost = (avg_cell_size * 3.6) / speed_kmh
del speed_kmh
# ─── Max slope limit ──────────────────────────────────────────────────────
cost[grade > max_grade_val] = np.inf
# ─── NaN elevations ──────────────────────────────────────────────────────
cost[np.isnan(elevation)] = np.inf
# ─── Apply friction in-place ─────────────────────────────────────────────
# Instead of creating effective_friction copy, apply directly to cost
# Start with base friction
if friction is not None:
if friction.shape != elevation.shape:
raise ValueError(f"Friction shape mismatch")
np.multiply(cost, friction, out=cost)
# ─── Mode-specific terrain friction overrides (memory-efficient) ─────────
if friction_raw is not None and profile.terrain_friction_override:
if friction_raw.shape != elevation.shape:
raise ValueError(f"Friction_raw shape mismatch")
# Process all overrides without creating large intermediate masks
for wc_class, override in profile.terrain_friction_override.items():
if override is not None:
if override == np.inf:
# Use np.where for in-place-like behavior
np.putmask(cost, friction_raw == wc_class, np.inf)
else:
# Multiply cost where friction_raw matches
# Using a loop with putmask is more memory efficient
mask = friction_raw == wc_class
cost[mask] *= override
del mask
# ─── Vehicle mode: allow flat grassland/cropland ─────────────────────────
if mode == "vehicle" and profile.off_trail_flat_threshold_deg > 0:
if friction_raw is not None:
# Compute slope in degrees for flat terrain check
slope_deg = np.degrees(np.arctan(grade))
# Flat grassland or cropland - recompute cost for these cells
flat_field_mask = (
(slope_deg <= profile.off_trail_flat_threshold_deg) &
((friction_raw == 30) | (friction_raw == 40))
)
del slope_deg
# Recalculate cost for these cells with flat field friction
if np.any(flat_field_mask):
base_time = avg_cell_size * 3.6 / linear_degrade_speed(
grade[flat_field_mask], profile.base_speed_kmh, max_grade_val
)
cost[flat_field_mask] = base_time * profile.off_trail_flat_friction
del base_time
del flat_field_mask
# ─── Trail friction (mode-specific) ──────────────────────────────────────
if trails is not None:
if trails.shape != elevation.shape:
raise ValueError(f"Trails shape mismatch")
for trail_value, trail_friction in profile.trail_friction.items():
if trail_friction is None:
# Impassable for this mode
np.putmask(cost, trails == trail_value, np.inf)
else:
# Trail friction REPLACES terrain friction
# Recalculate cost = base_time * trail_friction
trail_mask = trails == trail_value
if np.any(trail_mask):
# Get base travel time (without friction)
if profile.speed_function == "tobler":
trail_speed = tobler_off_path_speed(grade[trail_mask], profile.base_speed_kmh)
elif profile.speed_function == "herzog":
trail_speed = herzog_wheeled_speed(grade[trail_mask], profile.base_speed_kmh)
else:
trail_speed = linear_degrade_speed(
grade[trail_mask], profile.base_speed_kmh, max_grade_val
)
cost[trail_mask] = (avg_cell_size * 3.6 / trail_speed) * trail_friction
del trail_speed
del trail_mask
# ─── Wilderness areas (mode-specific) ────────────────────────────────────
if wilderness is not None and profile.wilderness_impassable:
if wilderness.shape != elevation.shape:
raise ValueError(f"Wilderness shape mismatch")
np.putmask(cost, wilderness == 255, np.inf)
# ─── Barriers (private land) ─────────────────────────────────────────────
if barriers is not None and boundary_mode != "emergency":
if barriers.shape != elevation.shape:
raise ValueError(f"Barriers shape mismatch")
if boundary_mode == "strict":
np.putmask(cost, barriers == 255, np.inf)
elif boundary_mode == "pragmatic":
barrier_mask = barriers == 255
cost[barrier_mask] *= PRAGMATIC_BARRIER_MULTIPLIER
del barrier_mask
# ─── MVUM closures (motor vehicle restrictions) ──────────────────────────
# MVUM only applies to motorized modes, not foot. Foot mode should pass mvum=None.
# MVUM closures respond to the same boundary_mode as PAD-US barriers:
# "strict" = MVUM-closed road/trail is impassable
# "pragmatic" = MVUM-closed road/trail gets 5× friction penalty
# "emergency" = MVUM closures ignored entirely
if mvum is not None and mode != "foot" and boundary_mode != "emergency":
if mvum.shape != elevation.shape:
raise ValueError(f"MVUM shape mismatch")
# Value 255 = road/trail exists but is closed to this mode
mvum_closed_mask = mvum == 255
if boundary_mode == "strict":
np.putmask(cost, mvum_closed_mask, np.inf)
elif boundary_mode == "pragmatic":
cost[mvum_closed_mask] *= PRAGMATIC_BARRIER_MULTIPLIER
del mvum_closed_mask
return cost
# ═══════════════════════════════════════════════════════════════════════════════
# LEGACY API (backward compatibility)
# ═══════════════════════════════════════════════════════════════════════════════
def tobler_speed(grade: float) -> float:
"""Legacy single-value Tobler speed function."""
return 0.6 * 6.0 * math.exp(-3.5 * abs(grade + 0.05))
# ═══════════════════════════════════════════════════════════════════════════════
# TESTING
# ═══════════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
print("=" * 70)
print("OFFROUTE Multi-Mode Cost Function Tests")
print("=" * 70)
print("\n[1] Speed functions at various grades:")
print(f"{'Grade':<10} {'Foot':<12} {'MTB':<12} {'ATV':<12} {'Vehicle':<12}")
print("-" * 60)
for grade_val in [-0.3, -0.1, 0.0, 0.1, 0.2, 0.3]:
grade_arr = np.array([grade_val])
foot = tobler_off_path_speed(grade_arr, 6.0)[0]
mtb = herzog_wheeled_speed(grade_arr, 12.0)[0]
atv = herzog_wheeled_speed(grade_arr, 25.0)[0]
veh = linear_degrade_speed(grade_arr, 40.0, np.tan(np.radians(20)))[0]
print(f"{grade_val:+.2f} {foot:>6.2f} km/h {mtb:>6.2f} km/h {atv:>6.2f} km/h {veh:>6.2f} km/h")
print("\n[2] Mode profiles:")
for name, profile in MODE_PROFILES.items():
print(f"\n {name.upper()}: {profile.description}")
print(f" Max slope: {profile.max_slope_deg}°")
print(f" Trail access: {profile.trail_friction}")
print(f" Wilderness blocked: {profile.wilderness_impassable}")
print("\n[3] Cost grid test (flat terrain, forest):")
elev = np.ones((10, 10), dtype=np.float32) * 1000
friction = np.ones((10, 10), dtype=np.float32) * 2.0 # Forest friction
friction_raw = np.ones((10, 10), dtype=np.uint8) * 10 # Tree cover class
trails = np.zeros((10, 10), dtype=np.uint8)
trails[5, :] = 5 # Road across middle
for mode_name in ["foot", "mtb", "atv", "vehicle"]:
cost = compute_cost_grid(
elev, cell_size_m=30.0,
friction=friction,
friction_raw=friction_raw,
trails=trails,
mode=mode_name
)
off_trail_cost = cost[0, 0]
road_cost = cost[5, 0]
impassable = np.sum(np.isinf(cost))
print(f" {mode_name:8s}: off-trail={off_trail_cost:>8.1f}s, road={road_cost:>6.1f}s, impassable={impassable}")
print("\n[4] Wilderness blocking test:")
wilderness = np.zeros((10, 10), dtype=np.uint8)
wilderness[3:7, 3:7] = 255
for mode_name in ["foot", "mtb", "atv", "vehicle"]:
cost = compute_cost_grid(
elev, cell_size_m=30.0,
wilderness=wilderness,
mode=mode_name
)
wilderness_impassable = np.sum(np.isinf(cost[3:7, 3:7]))
print(f" {mode_name:8s}: wilderness cells impassable = {wilderness_impassable}/16")
print("\nDone.")

190
lib/offroute/dem.py Normal file
View file

@ -0,0 +1,190 @@
"""
DEM tile reader for OFFROUTE.
Reads elevation tiles from planet-dem.pmtiles (Terrarium-encoded WebP),
decodes them into numpy arrays, and provides a stitched elevation grid
for a given bounding box.
"""
import math
from functools import lru_cache
from io import BytesIO
from pathlib import Path
from typing import Tuple, Optional
import numpy as np
from PIL import Image
from pmtiles.reader import MmapSource, Reader as PMTilesReader
# Default path to the planet DEM PMTiles file
DEFAULT_DEM_PATH = Path("/mnt/nas/nav/planet-dem.pmtiles")
# Tile size in pixels (z12 tiles are 512x512 in this tileset)
TILE_SIZE = 512
# Zoom level to use for elevation data
ZOOM_LEVEL = 12
def terrarium_decode(rgb_array: np.ndarray) -> np.ndarray:
"""
Decode Terrarium-encoded RGB values to elevation in meters.
Formula: elevation = (R * 256 + G + B/256) - 32768
"""
r = rgb_array[:, :, 0].astype(np.float32)
g = rgb_array[:, :, 1].astype(np.float32)
b = rgb_array[:, :, 2].astype(np.float32)
elevation = (r * 256.0 + g + b / 256.0) - 32768.0
return elevation
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
"""Convert lat/lon to tile coordinates at given zoom level."""
n = 2 ** zoom
x = int((lon + 180.0) / 360.0 * n)
lat_rad = math.radians(lat)
y = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return x, y
def tile_to_lat_lon(x: int, y: int, zoom: int) -> Tuple[float, float, float, float]:
"""Convert tile coordinates to bounding box (north, south, west, east)."""
n = 2 ** zoom
lon_west = x / n * 360.0 - 180.0
lon_east = (x + 1) / n * 360.0 - 180.0
lat_north = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * y / n))))
lat_south = math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * (y + 1) / n))))
return lat_north, lat_south, lon_west, lon_east
class DEMReader:
"""Reader for Terrarium-encoded DEM tiles from PMTiles."""
def __init__(self, pmtiles_path: Path = DEFAULT_DEM_PATH, tile_cache_size: int = 128):
self.pmtiles_path = pmtiles_path
self._source = MmapSource(open(pmtiles_path, "rb"))
self._reader = PMTilesReader(self._source)
self._header = self._reader.header()
self._decode_tile = lru_cache(maxsize=tile_cache_size)(self._decode_tile_impl)
def _decode_tile_impl(self, z: int, x: int, y: int) -> Optional[np.ndarray]:
"""Fetch and decode a single tile."""
tile_data = self._reader.get(z, x, y)
if tile_data is None:
return None
img = Image.open(BytesIO(tile_data))
rgb_array = np.array(img)
if rgb_array.shape[2] == 4:
rgb_array = rgb_array[:, :, :3]
elevation = terrarium_decode(rgb_array)
return elevation
def get_elevation_grid(
self,
south: float,
north: float,
west: float,
east: float,
zoom: int = ZOOM_LEVEL
) -> Tuple[np.ndarray, dict]:
"""Get a stitched elevation grid for the given bounding box."""
x_min, y_max = lat_lon_to_tile(south, west, zoom)
x_max, y_min = lat_lon_to_tile(north, east, zoom)
n = 2 ** zoom
x_min = max(0, x_min)
x_max = min(n - 1, x_max)
y_min = max(0, y_min)
y_max = min(n - 1, y_max)
n_tiles_x = x_max - x_min + 1
n_tiles_y = y_max - y_min + 1
out_height = n_tiles_y * TILE_SIZE
out_width = n_tiles_x * TILE_SIZE
elevation = np.full((out_height, out_width), np.nan, dtype=np.float32)
for ty in range(y_min, y_max + 1):
for tx in range(x_min, x_max + 1):
tile_elev = self._decode_tile(zoom, tx, ty)
if tile_elev is not None:
out_y = (ty - y_min) * TILE_SIZE
out_x = (tx - x_min) * TILE_SIZE
elevation[out_y:out_y + TILE_SIZE, out_x:out_x + TILE_SIZE] = tile_elev
grid_north, _, grid_west, _ = tile_to_lat_lon(x_min, y_min, zoom)
_, grid_south, _, grid_east = tile_to_lat_lon(x_max, y_max, zoom)
pixel_size_lat = (grid_north - grid_south) / out_height
pixel_size_lon = (grid_east - grid_west) / out_width
origin_lat = grid_north - pixel_size_lat / 2
origin_lon = grid_west + pixel_size_lon / 2
center_lat = (south + north) / 2
lat_m = 111320.0
lon_m = 111320.0 * math.cos(math.radians(center_lat))
cell_size_lat_m = abs(pixel_size_lat) * lat_m
cell_size_lon_m = abs(pixel_size_lon) * lon_m
cell_size_m = (cell_size_lat_m + cell_size_lon_m) / 2
row_start = int((grid_north - north) / abs(pixel_size_lat))
row_end = int((grid_north - south) / abs(pixel_size_lat))
col_start = int((west - grid_west) / pixel_size_lon)
col_end = int((east - grid_west) / pixel_size_lon)
row_start = max(0, row_start)
row_end = min(out_height, row_end)
col_start = max(0, col_start)
col_end = min(out_width, col_end)
elevation = elevation[row_start:row_end, col_start:col_end]
origin_lat = grid_north - (row_start + 0.5) * abs(pixel_size_lat)
origin_lon = grid_west + (col_start + 0.5) * pixel_size_lon
metadata = {
"bounds": (south, north, west, east),
"pixel_size_lat": -abs(pixel_size_lat),
"pixel_size_lon": pixel_size_lon,
"origin_lat": origin_lat,
"origin_lon": origin_lon,
"cell_size_m": cell_size_m,
"shape": elevation.shape,
}
return elevation, metadata
def pixel_to_latlon(self, row: int, col: int, metadata: dict) -> Tuple[float, float]:
"""Convert pixel coordinates to lat/lon."""
lat = metadata["origin_lat"] + row * metadata["pixel_size_lat"]
lon = metadata["origin_lon"] + col * metadata["pixel_size_lon"]
return lat, lon
def latlon_to_pixel(self, lat: float, lon: float, metadata: dict) -> Tuple[int, int]:
"""Convert lat/lon to pixel coordinates."""
row = int((metadata["origin_lat"] - lat) / abs(metadata["pixel_size_lat"]))
col = int((lon - metadata["origin_lon"]) / metadata["pixel_size_lon"])
return row, col
def close(self):
"""Close the PMTiles file."""
pass # MmapSource handles cleanup
if __name__ == "__main__":
reader = DEMReader()
elevation, meta = reader.get_elevation_grid(
south=42.4, north=42.6, west=-114.5, east=-114.3
)
print(f"Elevation grid shape: {elevation.shape}")
print(f"Cell size: {meta['cell_size_m']:.1f} m")
print(f"Elevation range: {np.nanmin(elevation):.1f} - {np.nanmax(elevation):.1f} m")
center_row, center_col = elevation.shape[0] // 2, elevation.shape[1] // 2
lat, lon = reader.pixel_to_latlon(center_row, center_col, meta)
print(f"Center pixel lat/lon: {lat:.4f}, {lon:.4f}")
reader.close()

137
lib/offroute/friction.py Normal file
View file

@ -0,0 +1,137 @@
"""
Friction layer reader for OFFROUTE.
Reads friction values from the WorldCover friction VRT and resamples
to match the elevation grid dimensions.
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for friction layer support")
# Default path to the friction VRT
DEFAULT_FRICTION_PATH = Path("/mnt/nav/worldcover/friction/friction_conus.vrt")
class FrictionReader:
"""Reader for WorldCover friction raster."""
def __init__(self, friction_path: Path = DEFAULT_FRICTION_PATH):
self.friction_path = friction_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
self._dataset = rasterio.open(self.friction_path)
return self._dataset
def get_friction_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get friction values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 friction values, same shape as target_shape.
Values: 10-40 = friction multiplier (divide by 10)
255 = impassable
0 = nodata (treat as impassable)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor for categorical data
friction = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return friction
def sample_point(self, lat: float, lon: float) -> int:
"""Sample friction value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = nodata
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def friction_to_multiplier(friction: np.ndarray) -> np.ndarray:
"""
Convert friction values to cost multipliers.
Args:
friction: uint8 array of friction values
Returns:
float32 array of multipliers.
Values 10-40 become 1.0-4.0 (divide by 10).
Values 0 or 255 become np.inf (impassable).
"""
multiplier = friction.astype(np.float32) / 10.0
# Mark impassable cells
multiplier[friction == 0] = np.inf # nodata
multiplier[friction == 255] = np.inf # water/impassable
return multiplier
if __name__ == "__main__":
print("Testing FrictionReader...")
reader = FrictionReader()
# Test point sampling - Murtaugh Lake (should be water = 255)
lake_lat, lake_lon = 42.47, -114.15
lake_friction = reader.sample_point(lake_lat, lake_lon)
print(f"Murtaugh Lake ({lake_lat}, {lake_lon}): friction = {lake_friction}")
print(f" Expected: 255 (water/impassable)")
# Test grid read for small bbox
friction = reader.get_friction_grid(
south=42.4, north=42.5, west=-114.2, east=-114.1,
target_shape=(100, 100)
)
print(f"\nGrid test shape: {friction.shape}")
print(f"Unique values: {np.unique(friction)}")
print(f"Water cells (255): {np.sum(friction == 255)}")
reader.close()
print("\nFrictionReader test complete.")

623
lib/offroute/mvum.py Normal file
View file

@ -0,0 +1,623 @@
"""
MVUM (Motor Vehicle Use Map) legal access layer for OFFROUTE.
Queries USFS MVUM data from navi.db and provides rasterized access grids
indicating which roads/trails are open or closed to specific vehicle modes.
MVUM is motor-vehicle specific foot mode should skip this layer entirely.
"""
import re
import sqlite3
import warnings
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Literal
import numpy as np
# Path to navi.db
NAVI_DB_PATH = Path("/mnt/nav/navi.db")
def parse_date_range(date_str: str) -> List[Tuple[int, int, int, int]]:
"""
Parse MVUM date range strings like "05/01-11/30" or "06/15-10/15,12/01-03/31".
Returns list of (start_month, start_day, end_month, end_day) tuples.
Returns empty list if unparseable.
"""
if not date_str or date_str.strip() == "":
return []
ranges = []
# Split by comma for multi-period strings
for part in date_str.split(","):
part = part.strip()
# Match MM/DD-MM/DD pattern
match = re.match(r"(\d{1,2})/(\d{1,2})-(\d{1,2})/(\d{1,2})", part)
if match:
try:
sm, sd, em, ed = int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4))
if 1 <= sm <= 12 and 1 <= sd <= 31 and 1 <= em <= 12 and 1 <= ed <= 31:
ranges.append((sm, sd, em, ed))
except ValueError:
pass
return ranges
def is_date_in_range(month: int, day: int, ranges: List[Tuple[int, int, int, int]]) -> bool:
"""
Check if a given month/day falls within any of the date ranges.
Handles ranges that wrap around year end (e.g., 12/01-03/31).
"""
if not ranges:
return True # No ranges = assume open
date_num = month * 100 + day # Simple numeric comparison
for sm, sd, em, ed in ranges:
start_num = sm * 100 + sd
end_num = em * 100 + ed
if start_num <= end_num:
# Normal range (e.g., 05/01-11/30)
if start_num <= date_num <= end_num:
return True
else:
# Wrapping range (e.g., 12/01-03/31)
if date_num >= start_num or date_num <= end_num:
return True
return False
def check_access(
status_field: Optional[str],
dates_field: Optional[str],
seasonal: Optional[str],
check_date: Optional[Tuple[int, int]] = None
) -> Optional[bool]:
"""
Determine if a road/trail is open to a vehicle type.
Args:
status_field: Value of vehicle-class field (e.g., "open", null)
dates_field: Value of *_DATESOPEN field (e.g., "05/01-11/30")
seasonal: Value of SEASONAL field ("yearlong", "seasonal")
check_date: Optional (month, day) tuple to check against date ranges
Returns:
True = open
False = closed
None = no data (field not populated, defer to SYMBOL)
"""
if status_field is None or status_field.strip() == "":
return None # No data
status = status_field.strip().lower()
if status != "open":
return False # Explicitly closed or restricted
# Status is "open" - check seasonal restrictions
if check_date is not None:
month, day = check_date
# Parse date ranges
if dates_field:
ranges = parse_date_range(dates_field)
if ranges:
return is_date_in_range(month, day, ranges)
# No date field but seasonal = "yearlong" means always open
if seasonal and seasonal.strip().lower() == "yearlong":
return True
# Seasonal with no dates - assume open (data quality issue)
if seasonal and seasonal.strip().lower() == "seasonal":
warnings.warn(f"Seasonal road/trail with no DATESOPEN, assuming open")
return True
return True # Open with no date check
def get_mode_field(mode: str) -> Tuple[str, str]:
"""
Get the MVUM field names for a given travel mode.
Returns (status_field, dates_field) tuple.
"""
mode_mapping = {
"atv": ("atv", "atv_datesopen"),
"motorcycle": ("motorcycle", "motorcycle_datesopen"),
"mtb": ("e_bike_class1", "e_bike_class1_dur"), # Closest analog for e-bikes
"vehicle": ("highclearancevehicle", "highclearancevehicle_datesopen"),
"passenger": ("passengervehicle", "passengervehicle_datesopen"),
}
return mode_mapping.get(mode, ("highclearancevehicle", "highclearancevehicle_datesopen"))
def symbol_to_access(symbol: str, mode: str, maint_level: Optional[str] = None) -> Optional[bool]:
"""
Fallback: interpret SYMBOL field when per-vehicle-class fields are null.
MVUM SYMBOL meanings (roads):
1 = Open to all vehicles
2 = Open to highway legal vehicles only
3 = Road closed to motorized
4 = Road open seasonally
11 = Administrative use only
12 = Decommissioned
For trails, similar logic applies based on TRAILCLASS.
"""
if symbol is None:
return None
sym = str(symbol).strip()
# Symbol 1: Open to all
if sym == "1":
return True
# Symbol 2: Highway legal only
if sym == "2":
# ATVs/motorcycles typically not highway legal
if mode in ("atv", "motorcycle"):
return False
return True
# Symbol 3: Closed to motorized
if sym == "3":
return False
# Symbol 4: Seasonally open (assume open if no date check)
if sym == "4":
return True
# Symbol 11/12: Administrative/decommissioned = closed
if sym in ("11", "12"):
return False
# Unknown symbol - defer
return None
class MVUMReader:
"""
Reader for MVUM data from navi.db.
Queries roads and trails by bounding box and returns access grids.
"""
def __init__(self, db_path: Path = NAVI_DB_PATH):
self.db_path = db_path
self._conn = None
def _get_conn(self) -> sqlite3.Connection:
if self._conn is None:
if not self.db_path.exists():
raise FileNotFoundError(f"navi.db not found at {self.db_path}")
self._conn = sqlite3.connect(str(self.db_path))
self._conn.row_factory = sqlite3.Row
# Load Spatialite extension if available
try:
self._conn.enable_load_extension(True)
self._conn.load_extension("mod_spatialite")
except Exception:
pass # Spatialite not available, will use manual bbox queries
return self._conn
def table_exists(self, table_name: str) -> bool:
"""Check if an MVUM table exists."""
conn = self._get_conn()
cur = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
return cur.fetchone() is not None
def query_roads_bbox(
self,
south: float, north: float, west: float, east: float,
mode: str = "atv",
check_date: Optional[Tuple[int, int]] = None
) -> List[Dict]:
"""
Query MVUM roads within a bounding box.
Returns list of dicts with access info for the given mode.
"""
if not self.table_exists("mvum_roads"):
return []
conn = self._get_conn()
# Query using bbox on geometry
# Since we don't have spatialite, we'll query all and filter in Python
# For production, consider pre-computing bbox columns
cur = conn.execute("""
SELECT ogc_fid, id, name, symbol, operationalmaintlevel, seasonal,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
passengervehicle, passengervehicle_datesopen,
e_bike_class1, e_bike_class1_dur,
shape
FROM mvum_roads
""")
status_field, dates_field = get_mode_field(mode)
results = []
for row in cur:
# Parse geometry to check bbox intersection
# The shape is stored as WKB blob
shape = row["shape"]
if shape is None:
continue
# Quick bbox check using geometry extent
# Since we don't have Spatialite functions, we'll include all
# and let the rasterization handle it
access = check_access(
row[status_field] if status_field in row.keys() else None,
row[dates_field] if dates_field in row.keys() else None,
row["seasonal"],
check_date
)
# Fallback to SYMBOL if no per-vehicle data
if access is None:
access = symbol_to_access(row["symbol"], mode, row["operationalmaintlevel"])
if access is not None:
results.append({
"id": row["id"],
"name": row["name"],
"access": access,
"symbol": row["symbol"],
"maint_level": row["operationalmaintlevel"],
"shape": shape,
})
return results
def query_trails_bbox(
self,
south: float, north: float, west: float, east: float,
mode: str = "atv",
check_date: Optional[Tuple[int, int]] = None
) -> List[Dict]:
"""
Query MVUM trails within a bounding box.
"""
if not self.table_exists("mvum_trails"):
return []
conn = self._get_conn()
cur = conn.execute("""
SELECT ogc_fid, id, name, symbol, seasonal, trailclass,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
passengervehicle, passengervehicle_datesopen,
e_bike_class1, e_bike_class1_dur,
shape
FROM mvum_trails
""")
status_field, dates_field = get_mode_field(mode)
results = []
for row in cur:
shape = row["shape"]
if shape is None:
continue
access = check_access(
row[status_field] if status_field in row.keys() else None,
row[dates_field] if dates_field in row.keys() else None,
row["seasonal"],
check_date
)
if access is None:
access = symbol_to_access(row["symbol"], mode)
if access is not None:
results.append({
"id": row["id"],
"name": row["name"],
"access": access,
"symbol": row["symbol"],
"trail_class": row["trailclass"],
"shape": shape,
})
return results
def query_nearest(
self,
lat: float, lon: float,
radius_m: float = 50,
table: str = "mvum_roads"
) -> Optional[Dict]:
"""
Query the nearest MVUM feature to a point.
Used for the places panel API.
"""
if not self.table_exists(table):
return None
conn = self._get_conn()
# Convert radius to degrees (approximate)
radius_deg = radius_m / 111000
# Query features in bbox around point
if table == "mvum_roads":
cur = conn.execute("""
SELECT ogc_fid, id, name, forestname, districtname, symbol,
operationalmaintlevel, surfacetype, seasonal, jurisdiction,
passengervehicle, passengervehicle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
fourwd_gt50inches, fourwd_gt50_datesopen,
twowd_gt50inches, twowd_gt50_datesopen,
e_bike_class1, e_bike_class1_dur,
e_bike_class2, e_bike_class2_dur,
e_bike_class3, e_bike_class3_dur,
shape
FROM mvum_roads
LIMIT 1000
""")
else:
cur = conn.execute("""
SELECT ogc_fid, id, name, forestname, districtname, symbol,
seasonal, jurisdiction, trailclass, trailsystem,
passengervehicle, passengervehicle_datesopen,
highclearancevehicle, highclearancevehicle_datesopen,
atv, atv_datesopen, motorcycle, motorcycle_datesopen,
fourwd_gt50inches, fourwd_gt50_datesopen,
twowd_gt50inches, twowd_gt50_datesopen,
e_bike_class1, e_bike_class1_dur,
e_bike_class2, e_bike_class2_dur,
e_bike_class3, e_bike_class3_dur,
shape
FROM mvum_trails
LIMIT 1000
""")
# Find nearest feature
# This is a simplified approach - for production, use spatial index
try:
from shapely import wkb
from shapely.geometry import Point
query_point = Point(lon, lat)
nearest = None
min_dist = float('inf')
for row in cur:
try:
geom = wkb.loads(row["shape"])
dist = query_point.distance(geom)
if dist < min_dist and dist < radius_deg:
min_dist = dist
nearest = dict(row)
nearest["geometry"] = geom
except Exception:
continue
if nearest:
# Convert geometry to GeoJSON
nearest["geojson"] = nearest["geometry"].__geo_interface__
del nearest["geometry"]
del nearest["shape"]
return nearest
except ImportError:
warnings.warn("shapely not available for nearest query")
return None
def close(self):
if self._conn:
self._conn.close()
self._conn = None
def get_mvum_access_grid(
south: float, north: float, west: float, east: float,
target_shape: Tuple[int, int],
mode: Literal["foot", "mtb", "atv", "vehicle"] = "atv",
check_date: Optional[str] = None,
db_path: Path = NAVI_DB_PATH
) -> np.ndarray:
"""
Get MVUM access grid for pathfinding.
Args:
south, north, west, east: Bounding box (WGS84)
target_shape: (rows, cols) to match elevation grid
mode: Travel mode (foot skips MVUM entirely)
check_date: Optional "MM/DD" string for seasonal checking
db_path: Path to navi.db
Returns:
np.ndarray of uint8:
0 = no MVUM data (defer to existing trail/friction logic)
1 = road/trail is OPEN to this vehicle mode
255 = road/trail EXISTS but is CLOSED to this mode
"""
# Foot mode bypasses MVUM entirely
if mode == "foot":
return np.zeros(target_shape, dtype=np.uint8)
# Parse check_date if provided
parsed_date = None
if check_date:
match = re.match(r"(\d{1,2})/(\d{1,2})", check_date)
if match:
parsed_date = (int(match.group(1)), int(match.group(2)))
# Initialize output grid
grid = np.zeros(target_shape, dtype=np.uint8)
rows, cols = target_shape
# Pixel size
pixel_lat = (north - south) / rows
pixel_lon = (east - west) / cols
reader = MVUMReader(db_path)
try:
# Query roads and trails
roads = reader.query_roads_bbox(south, north, west, east, mode, parsed_date)
trails = reader.query_trails_bbox(south, north, west, east, mode, parsed_date)
# Rasterize features
try:
from shapely import wkb
for features in [roads, trails]:
for feat in features:
try:
geom = wkb.loads(feat["shape"])
# Get geometry bounds
minx, miny, maxx, maxy = geom.bounds
# Check if intersects our bbox
if maxx < west or minx > east or maxy < south or miny > north:
continue
# Rasterize line
value = 1 if feat["access"] else 255
# Simple line rasterization
if geom.geom_type in ("LineString", "MultiLineString"):
if geom.geom_type == "MultiLineString":
coords_list = [list(line.coords) for line in geom.geoms]
else:
coords_list = [list(geom.coords)]
for coords in coords_list:
for i in range(len(coords) - 1):
x1, y1 = coords[i]
x2, y2 = coords[i + 1]
# Convert to pixel coordinates
col1 = int((x1 - west) / pixel_lon)
row1 = int((north - y1) / pixel_lat)
col2 = int((x2 - west) / pixel_lon)
row2 = int((north - y2) / pixel_lat)
# Bresenham's line algorithm
_draw_line(grid, row1, col1, row2, col2, value)
except Exception as e:
continue
except ImportError:
warnings.warn("shapely not available, MVUM rasterization skipped")
finally:
reader.close()
return grid
def _draw_line(grid: np.ndarray, r1: int, c1: int, r2: int, c2: int, value: int):
"""Draw a line on the grid using Bresenham's algorithm."""
rows, cols = grid.shape
dr = abs(r2 - r1)
dc = abs(c2 - c1)
sr = 1 if r1 < r2 else -1
sc = 1 if c1 < c2 else -1
err = dr - dc
r, c = r1, c1
while True:
if 0 <= r < rows and 0 <= c < cols:
# Only overwrite if current value is 0 (no data) or we're marking closed
if grid[r, c] == 0 or value == 255:
grid[r, c] = value
if r == r2 and c == c2:
break
e2 = 2 * err
if e2 > -dc:
err -= dc
r += sr
if e2 < dr:
err += dr
c += sc
if __name__ == "__main__":
import sys
print("=" * 60)
print("MVUM Reader Test")
print("=" * 60)
reader = MVUMReader()
if not reader.table_exists("mvum_roads"):
print("ERROR: mvum_roads table not found in navi.db")
sys.exit(1)
# Test bbox query (Sawtooth NF area)
print("\n[1] Testing bbox query (Sawtooth NF area)...")
roads = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv"
)
print(f" Found {len(roads)} roads")
open_count = sum(1 for r in roads if r["access"])
closed_count = sum(1 for r in roads if not r["access"])
print(f" Open to ATV: {open_count}")
print(f" Closed to ATV: {closed_count}")
# Test with seasonal date
print("\n[2] Testing with date check (July 15)...")
roads_summer = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv",
check_date=(7, 15)
)
open_summer = sum(1 for r in roads_summer if r["access"])
print(f" Open to ATV on 07/15: {open_summer}")
print("\n[3] Testing with date check (January 15)...")
roads_winter = reader.query_roads_bbox(
south=43.5, north=44.0, west=-115.0, east=-114.0,
mode="atv",
check_date=(1, 15)
)
open_winter = sum(1 for r in roads_winter if r["access"])
print(f" Open to ATV on 01/15: {open_winter}")
# Test grid generation
print("\n[4] Testing grid generation...")
grid = get_mvum_access_grid(
south=43.5, north=44.0, west=-115.0, east=-114.0,
target_shape=(500, 1000),
mode="atv"
)
print(f" Grid shape: {grid.shape}")
print(f" No data (0): {np.sum(grid == 0)}")
print(f" Open (1): {np.sum(grid == 1)}")
print(f" Closed (255): {np.sum(grid == 255)}")
reader.close()
print("\nDone.")

414
lib/offroute/prototype.py Executable file
View file

@ -0,0 +1,414 @@
#!/usr/bin/env python3
"""
OFFROUTE Phase O3a Prototype
Validates trail burn-in integration with the MCP pathfinder.
The path should actively seek out trails and roads when nearby.
Compares paths with and without trail burn-in to show the benefit
of trail-seeking behavior.
"""
import json
import time
import sys
from pathlib import Path
import numpy as np
from skimage.graph import MCP_Geometric
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from lib.offroute.dem import DEMReader
from lib.offroute.cost import compute_cost_grid
from lib.offroute.friction import FrictionReader, friction_to_multiplier
from lib.offroute.barriers import BarrierReader, DEFAULT_BARRIERS_PATH
from lib.offroute.trails import TrailReader, DEFAULT_TRAILS_PATH
# Test bounding box - Idaho area
BBOX = {
"south": 42.21,
"north": 42.60,
"west": -114.76,
"east": -113.79,
}
# Start point: wilderness area away from roads
START_LAT = 42.35
START_LON = -114.60
# End point: near Twin Falls (has roads/trails)
END_LAT = 42.55
END_LON = -114.20
# Output files
OUTPUT_PATH_WITH_TRAILS = Path("/opt/recon/data/offroute-test-trails.geojson")
OUTPUT_PATH_NO_TRAILS = Path("/opt/recon/data/offroute-test-no-trails.geojson")
# Memory limit in GB
MEMORY_LIMIT_GB = 12
def check_memory_usage():
"""Check current memory usage and abort if over limit."""
try:
import psutil
process = psutil.Process()
mem_gb = process.memory_info().rss / (1024**3)
if mem_gb > MEMORY_LIMIT_GB:
print(f"ERROR: Memory usage {mem_gb:.1f}GB exceeds {MEMORY_LIMIT_GB}GB limit")
sys.exit(1)
return mem_gb
except ImportError:
return 0
def run_pathfinder(
elevation: np.ndarray,
meta: dict,
friction_mult: np.ndarray,
trails: np.ndarray,
barriers: np.ndarray,
use_trails: bool,
start_row: int,
start_col: int,
end_row: int,
end_col: int,
dem_reader: DEMReader,
) -> dict:
"""Run the MCP pathfinder with given parameters."""
# Compute cost grid
cost = compute_cost_grid(
elevation,
cell_size_m=meta["cell_size_m"],
friction=friction_mult,
trails=trails if use_trails else None,
barriers=barriers,
boundary_mode="pragmatic",
)
# Run MCP
mcp = MCP_Geometric(cost, fully_connected=True)
cumulative_costs, traceback = mcp.find_costs([(start_row, start_col)])
end_cost = cumulative_costs[end_row, end_col]
if np.isinf(end_cost):
return {
"success": False,
"reason": "No path found (blocked by impassable terrain)",
}
# Traceback path
path_indices = mcp.traceback((end_row, end_col))
# Convert to coordinates and collect stats
coordinates = []
elevations = []
trail_values = []
for row, col in path_indices:
lat, lon = dem_reader.pixel_to_latlon(row, col, meta)
elev = elevation[row, col]
trail_val = trails[row, col] if trails is not None else 0
coordinates.append([lon, lat])
elevations.append(elev)
trail_values.append(trail_val)
# Compute distance
total_distance_m = 0
for i in range(1, len(coordinates)):
lon1, lat1 = coordinates[i-1]
lon2, lat2 = coordinates[i]
R = 6371000
dlat = np.radians(lat2 - lat1)
dlon = np.radians(lon2 - lon1)
a = np.sin(dlat/2)**2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
total_distance_m += R * c
# Elevation stats
elev_arr = np.array(elevations)
elev_diff = np.diff(elev_arr)
elev_gain = np.sum(elev_diff[elev_diff > 0])
elev_loss = np.sum(np.abs(elev_diff[elev_diff < 0]))
# Trail stats
trail_arr = np.array(trail_values)
road_cells = np.sum(trail_arr == 5)
track_cells = np.sum(trail_arr == 15)
trail_cells = np.sum(trail_arr == 25)
off_trail_cells = np.sum(trail_arr == 0)
on_trail_cells = road_cells + track_cells + trail_cells
total_cells = len(trail_arr)
return {
"success": True,
"coordinates": coordinates,
"total_time_seconds": float(end_cost),
"total_time_minutes": float(end_cost / 60),
"total_distance_m": float(total_distance_m),
"total_distance_km": float(total_distance_m / 1000),
"elevation_gain_m": float(elev_gain),
"elevation_loss_m": float(elev_loss),
"min_elevation_m": float(np.min(elev_arr)),
"max_elevation_m": float(np.max(elev_arr)),
"cell_count": total_cells,
"road_cells": int(road_cells),
"track_cells": int(track_cells),
"trail_cells": int(trail_cells),
"off_trail_cells": int(off_trail_cells),
"on_trail_pct": float(100 * on_trail_cells / total_cells) if total_cells > 0 else 0,
}
def main():
print("=" * 80)
print("OFFROUTE Phase O3a Prototype (Trail Burn-In)")
print("=" * 80)
t0 = time.time()
# Check for required rasters
if not DEFAULT_BARRIERS_PATH.exists():
print(f"\nERROR: Barrier raster not found at {DEFAULT_BARRIERS_PATH}")
sys.exit(1)
if not DEFAULT_TRAILS_PATH.exists():
print(f"\nERROR: Trails raster not found at {DEFAULT_TRAILS_PATH}")
sys.exit(1)
# Step 1: Load elevation data
print(f"\n[1] Loading DEM for bbox: {BBOX}")
dem_reader = DEMReader()
elevation, meta = dem_reader.get_elevation_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
)
print(f" Elevation grid shape: {elevation.shape}")
print(f" Cell count: {elevation.size:,}")
print(f" Cell size: {meta['cell_size_m']:.1f} m")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 2: Load friction data
print(f"\n[2] Loading WorldCover friction layer...")
friction_reader = FrictionReader()
friction_raw = friction_reader.get_friction_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
friction_mult = friction_to_multiplier(friction_raw)
print(f" Friction grid shape: {friction_raw.shape}")
print(f" Water/impassable cells: {np.sum(np.isinf(friction_mult)):,}")
# Step 3: Load barrier data
print(f"\n[3] Loading PAD-US barrier layer...")
barrier_reader = BarrierReader()
barriers = barrier_reader.get_barrier_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
closed_cells = np.sum(barriers == 255)
print(f" Barrier grid shape: {barriers.shape}")
print(f" Closed/restricted cells: {closed_cells:,}")
# Step 4: Load trails data
print(f"\n[4] Loading OSM trails layer...")
trail_reader = TrailReader()
trails = trail_reader.get_trails_grid(
south=BBOX["south"],
north=BBOX["north"],
west=BBOX["west"],
east=BBOX["east"],
target_shape=elevation.shape
)
road_cells = np.sum(trails == 5)
track_cells = np.sum(trails == 15)
trail_cells = np.sum(trails == 25)
print(f" Trails grid shape: {trails.shape}")
print(f" Road cells: {road_cells:,}")
print(f" Track cells: {track_cells:,}")
print(f" Trail cells: {trail_cells:,}")
print(f" Total trail coverage: {100*(road_cells+track_cells+trail_cells)/trails.size:.2f}%")
mem = check_memory_usage()
if mem > 0:
print(f" Memory usage: {mem:.1f} GB")
# Step 5: Convert start/end to pixel coordinates
print(f"\n[5] Converting coordinates...")
start_row, start_col = dem_reader.latlon_to_pixel(START_LAT, START_LON, meta)
end_row, end_col = dem_reader.latlon_to_pixel(END_LAT, END_LON, meta)
print(f" Start: ({START_LAT}, {START_LON}) -> pixel ({start_row}, {start_col})")
print(f" End: ({END_LAT}, {END_LON}) -> pixel ({end_row}, {end_col})")
# Validate coordinates
rows, cols = elevation.shape
if not (0 <= start_row < rows and 0 <= start_col < cols):
print(f"ERROR: Start point outside grid bounds")
sys.exit(1)
if not (0 <= end_row < rows and 0 <= end_col < cols):
print(f"ERROR: End point outside grid bounds")
sys.exit(1)
# Step 6: Run pathfinder WITH trails
print(f"\n[6] Running pathfinder WITH trail burn-in...")
t6a = time.time()
result_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=True,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t6b = time.time()
print(f" Completed in {t6b - t6a:.1f}s")
# Step 7: Run pathfinder WITHOUT trails
print(f"\n[7] Running pathfinder WITHOUT trail burn-in...")
t7a = time.time()
result_no_trails = run_pathfinder(
elevation, meta, friction_mult, trails, barriers,
use_trails=False,
start_row=start_row, start_col=start_col,
end_row=end_row, end_col=end_col,
dem_reader=dem_reader,
)
t7b = time.time()
print(f" Completed in {t7b - t7a:.1f}s")
# Step 8: Save GeoJSON outputs
print(f"\n[8] Saving GeoJSON outputs...")
OUTPUT_PATH_WITH_TRAILS.parent.mkdir(parents=True, exist_ok=True)
if result_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_with_trails",
"phase": "O3a",
"trail_burn_in": True,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_trails["coordinates"],
}
}
with open(OUTPUT_PATH_WITH_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_WITH_TRAILS}")
if result_no_trails["success"]:
geojson = {
"type": "Feature",
"properties": {
"type": "offroute_no_trails",
"phase": "O3a",
"trail_burn_in": False,
"start": {"lat": START_LAT, "lon": START_LON},
"end": {"lat": END_LAT, "lon": END_LON},
**{k: v for k, v in result_no_trails.items() if k not in ["success", "coordinates"]},
},
"geometry": {
"type": "LineString",
"coordinates": result_no_trails["coordinates"],
}
}
with open(OUTPUT_PATH_NO_TRAILS, "w") as f:
json.dump(geojson, f, indent=2)
print(f" Saved: {OUTPUT_PATH_NO_TRAILS}")
t_total = time.time()
# Final report
print(f"\n" + "=" * 80)
print("SIDE-BY-SIDE COMPARISON: Trail Burn-In Effect")
print("=" * 80)
if result_trails["success"] and result_no_trails["success"]:
print(f"{'Metric':<25} {'WITH TRAILS':<20} {'WITHOUT TRAILS':<20} {'Delta':<15}")
print("-" * 80)
metrics = [
("Distance (km)", "total_distance_km", ".2f"),
("Effort time (min)", "total_time_minutes", ".1f"),
("Cell count", "cell_count", "d"),
("Elevation gain (m)", "elevation_gain_m", ".0f"),
("On-trail %", "on_trail_pct", ".1f"),
("Road cells", "road_cells", "d"),
("Track cells", "track_cells", "d"),
("Trail cells", "trail_cells", "d"),
]
for label, key, fmt in metrics:
val_with = result_trails[key]
val_without = result_no_trails[key]
if isinstance(val_with, int):
delta = val_with - val_without
delta_str = f"{delta:+d}"
else:
delta = val_with - val_without
delta_str = f"{delta:+.2f}"
print(f"{label:<25} {val_with:<20{fmt}} {val_without:<20{fmt}} {delta_str:<15}")
# Analysis
print(f"\n" + "-" * 80)
print("ANALYSIS")
print("-" * 80)
time_saved = result_no_trails["total_time_minutes"] - result_trails["total_time_minutes"]
if time_saved > 0:
print(f"Trail burn-in saves {time_saved:.1f} minutes ({100*time_saved/result_no_trails['total_time_minutes']:.1f}% faster)")
elif time_saved < 0:
print(f"Trail burn-in adds {-time_saved:.1f} minutes (path seeks trails even if longer)")
on_trail_with = result_trails["on_trail_pct"]
on_trail_without = result_no_trails["on_trail_pct"]
if on_trail_with > on_trail_without:
print(f"Trail burn-in increases on-trail travel: {on_trail_without:.1f}% → {on_trail_with:.1f}%")
else:
print(f"Both paths have similar on-trail percentage")
else:
if not result_trails["success"]:
print(f"WITH TRAILS: FAILED - {result_trails.get('reason', 'unknown')}")
if not result_no_trails["success"]:
print(f"WITHOUT TRAILS: FAILED - {result_no_trails.get('reason', 'unknown')}")
print(f"\n" + "-" * 80)
print(f"Total wall time: {t_total - t0:.1f}s")
# Cleanup
dem_reader.close()
friction_reader.close()
barrier_reader.close()
trail_reader.close()
print("\nPrototype completed.")
if __name__ == "__main__":
main()

1682
lib/offroute/router.py Normal file

File diff suppressed because it is too large Load diff

174
lib/offroute/trails.py Normal file
View file

@ -0,0 +1,174 @@
"""
Trail corridor reader for OFFROUTE.
Provides access to the OSM-derived trail raster for pathfinding.
Trail values replace WorldCover friction where trails exist.
Raster values:
0 = no trail (use WorldCover friction)
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
import numpy as np
from pathlib import Path
from typing import Tuple, Optional
try:
import rasterio
from rasterio.windows import from_bounds
from rasterio.enums import Resampling
except ImportError:
raise ImportError("rasterio is required for trails layer support")
# Default path to the trails raster
DEFAULT_TRAILS_PATH = Path("/mnt/nav/worldcover/trails.tif")
# Trail value to friction multiplier mapping
TRAIL_FRICTION_MAP = {
5: 0.1, # road
15: 0.3, # track
25: 0.5, # foot trail
}
class TrailReader:
"""Reader for OSM-derived trail corridor raster."""
def __init__(self, trails_path: Path = DEFAULT_TRAILS_PATH):
self.trails_path = trails_path
self._dataset = None
def _open(self):
"""Lazy open the dataset."""
if self._dataset is None:
if not self.trails_path.exists():
raise FileNotFoundError(
f"Trails raster not found at {self.trails_path}. "
f"Run the Phase B rasterization script first."
)
self._dataset = rasterio.open(self.trails_path)
return self._dataset
def get_trails_grid(
self,
south: float,
north: float,
west: float,
east: float,
target_shape: Tuple[int, int]
) -> np.ndarray:
"""
Get trail values for a bounding box, resampled to target shape.
Args:
south, north, west, east: Bounding box coordinates (WGS84)
target_shape: (rows, cols) to resample to (matches elevation grid)
Returns:
np.ndarray of uint8 trail values:
0 = no trail
5 = road (0.1× friction)
15 = track (0.3× friction)
25 = foot trail (0.5× friction)
"""
ds = self._open()
# Create a window from the bounding box
window = from_bounds(west, south, east, north, ds.transform)
# Read with resampling to target shape
# Use nearest neighbor to preserve discrete values
trails = ds.read(
1,
window=window,
out_shape=target_shape,
resampling=Resampling.nearest
)
return trails
def sample_point(self, lat: float, lon: float) -> int:
"""Sample trail value at a single point."""
ds = self._open()
# Get pixel coordinates
row, col = ds.index(lon, lat)
# Check bounds
if row < 0 or row >= ds.height or col < 0 or col >= ds.width:
return 0 # Out of bounds = no trail
# Read single pixel
window = rasterio.windows.Window(col, row, 1, 1)
value = ds.read(1, window=window)
return int(value[0, 0])
def close(self):
"""Close the dataset."""
if self._dataset is not None:
self._dataset.close()
self._dataset = None
def trails_to_friction(trails: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert trail values to friction multipliers.
Args:
trails: uint8 array of trail values (0, 5, 15, or 25)
Returns:
Tuple of:
- friction: float32 array of friction multipliers
- has_trail: bool array indicating where trails exist
"""
friction = np.ones_like(trails, dtype=np.float32)
has_trail = trails > 0
# Apply friction values where trails exist
friction[trails == 5] = 0.1 # road
friction[trails == 15] = 0.3 # track
friction[trails == 25] = 0.5 # foot trail
return friction, has_trail
if __name__ == "__main__":
print("Testing TrailReader...")
if not DEFAULT_TRAILS_PATH.exists():
print(f"Trails raster not found at {DEFAULT_TRAILS_PATH}")
print("Run Phase B rasterization first.")
exit(1)
reader = TrailReader()
# Test point sampling - Twin Falls downtown (should have roads)
test_lat, test_lon = 42.563, -114.461
trail_value = reader.sample_point(test_lat, test_lon)
print(f"\nTwin Falls ({test_lat}, {test_lon}): trail value = {trail_value}")
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(trail_value, "unknown")
print(f" Type: {label}")
# Test grid read for test bbox
trails = reader.get_trails_grid(
south=42.21, north=42.60, west=-114.76, east=-113.79,
target_shape=(400, 1000)
)
print(f"\nGrid test shape: {trails.shape}")
unique, counts = np.unique(trails, return_counts=True)
print("Value distribution:")
for v, c in zip(unique, counts):
pct = 100 * c / trails.size
label = {0: "no trail", 5: "road", 15: "track", 25: "trail"}.get(v, f"unknown({v})")
print(f" {label}: {c:,} pixels ({pct:.2f}%)")
# Test conversion to friction
friction, has_trail = trails_to_friction(trails)
print(f"\nTrail coverage: {100 * np.sum(has_trail) / trails.size:.2f}%")
print(f"Friction range (on trails): {friction[has_trail].min():.1f} - {friction[has_trail].max():.1f}")
reader.close()
print("\nTrailReader test complete.")

View file

@ -1,323 +0,0 @@
"""
RECON PeerTube Writer
Authenticated PeerTube API client for pushing domain category assignments.
Uses OAuth2 password grant, caches tokens, refreshes on 401.
Config keys used:
peertube.api_url internal PeerTube URL (http://192.168.1.170:9000)
peertube.host_header Host header for API requests (stream.echo6.co)
peertube.username PeerTube admin username
peertube.password_env env var name holding the password
peertube.rate_limit_delay delay between API calls (seconds)
"""
import json
import os
import time
import requests as http_requests
from .recon_domains import DOMAIN_CATEGORY_MAP
from .utils import setup_logging
logger = setup_logging('recon.peertube_writer')
TOKEN_CACHE_PATH = '/opt/recon/data/peertube-oauth-token.json'
def _get_peertube_config(config):
"""Extract PeerTube writer config with defaults."""
pt = config.get('peertube', {})
return {
'api_url': pt.get('api_url', pt.get('api_base', 'http://192.168.1.170:9000')),
'host_header': pt.get('host_header', 'stream.echo6.co'),
'username': pt.get('username', 'root'),
'password_env': pt.get('password_env', 'PEERTUBE_PASSWORD'),
'rate_limit_delay': pt.get('writer_rate_limit', 0.1),
}
def _load_cached_token():
"""Load cached OAuth token from disk."""
if os.path.exists(TOKEN_CACHE_PATH):
try:
with open(TOKEN_CACHE_PATH, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return None
def _save_token(token_data):
"""Save OAuth token to disk cache."""
os.makedirs(os.path.dirname(TOKEN_CACHE_PATH), exist_ok=True)
with open(TOKEN_CACHE_PATH, 'w') as f:
json.dump(token_data, f)
def _get_oauth_client(api_url, host_header):
"""Get PeerTube OAuth client credentials.
Args:
api_url: Base API URL
host_header: Host header value
Returns:
(client_id, client_secret) tuple
"""
resp = http_requests.get(
f"{api_url}/api/v1/oauth-clients/local",
headers={'Host': host_header},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
return data['client_id'], data['client_secret']
def _get_token(api_url, host_header, username, password, client_id, client_secret):
"""Obtain OAuth2 access token via password grant.
Args:
api_url: Base API URL
host_header: Host header value
username: PeerTube username
password: PeerTube password
client_id: OAuth client ID
client_secret: OAuth client secret
Returns:
Token data dict with access_token, refresh_token, etc.
"""
resp = http_requests.post(
f"{api_url}/api/v1/users/token",
headers={'Host': host_header},
data={
'client_id': client_id,
'client_secret': client_secret,
'grant_type': 'password',
'username': username,
'password': password,
},
timeout=30,
)
resp.raise_for_status()
token_data = resp.json()
token_data['client_id'] = client_id
token_data['client_secret'] = client_secret
_save_token(token_data)
return token_data
def _refresh_token(api_url, host_header, token_data):
"""Refresh an expired access token.
Returns:
New token data dict, or None on failure.
"""
try:
resp = http_requests.post(
f"{api_url}/api/v1/users/token",
headers={'Host': host_header},
data={
'client_id': token_data['client_id'],
'client_secret': token_data['client_secret'],
'grant_type': 'refresh_token',
'refresh_token': token_data['refresh_token'],
},
timeout=30,
)
resp.raise_for_status()
new_data = resp.json()
new_data['client_id'] = token_data['client_id']
new_data['client_secret'] = token_data['client_secret']
_save_token(new_data)
return new_data
except Exception as e:
logger.warning(f"Token refresh failed: {e}")
return None
def _ensure_token(config):
"""Ensure we have a valid OAuth token. Returns token data dict.
Tries cached token first, then obtains a new one.
"""
pt = _get_peertube_config(config)
password = os.environ.get(pt['password_env'], '')
if not password:
raise ValueError(f"PeerTube password not set in env var {pt['password_env']}")
# Try cached token
token_data = _load_cached_token()
if token_data and 'access_token' in token_data:
return token_data
# Get fresh token
client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
return _get_token(
pt['api_url'], pt['host_header'],
pt['username'], password,
client_id, client_secret,
)
def _api_request(method, path, config, token_data, **kwargs):
"""Make an authenticated PeerTube API request with auto-refresh on 401.
Args:
method: HTTP method ('GET', 'PUT', etc.)
path: API path (e.g. '/api/v1/videos/{uuid}')
config: RECON config dict
token_data: Current token data dict
**kwargs: Additional requests kwargs (json, data, etc.)
Returns:
(response, token_data) tuple token_data may be refreshed.
"""
pt = _get_peertube_config(config)
url = f"{pt['api_url']}{path}"
headers = {
'Host': pt['host_header'],
'Authorization': f"Bearer {token_data['access_token']}",
}
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
if resp.status_code == 401:
# Try refresh
new_token = _refresh_token(pt['api_url'], pt['host_header'], token_data)
if new_token:
headers['Authorization'] = f"Bearer {new_token['access_token']}"
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
return resp, new_token
else:
# Full re-auth
password = os.environ.get(pt['password_env'], '')
client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
new_token = _get_token(
pt['api_url'], pt['host_header'],
pt['username'], password,
client_id, client_secret,
)
headers['Authorization'] = f"Bearer {new_token['access_token']}"
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
return resp, new_token
return resp, token_data
def push_category(video_uuid, category_id, config, token_data=None):
"""Push a category assignment to a single PeerTube video.
Args:
video_uuid: PeerTube video UUID
category_id: Category ID (100-117)
config: RECON config dict
token_data: Optional pre-fetched token data
Returns:
(success: bool, token_data: dict) tuple
"""
if token_data is None:
token_data = _ensure_token(config)
resp, token_data = _api_request(
'PUT',
f'/api/v1/videos/{video_uuid}',
config,
token_data,
json={'category': category_id},
)
if resp.status_code in (200, 204):
return True, token_data
else:
logger.warning(f"Failed to push category for {video_uuid}: "
f"HTTP {resp.status_code}{resp.text[:200]}")
return False, token_data
def extract_uuid(catalogue_path):
"""Extract PeerTube video UUID from catalogue path.
Catalogue paths for PeerTube videos look like:
https://stream.echo6.co/w/UUID
Args:
catalogue_path: catalogue.path value
Returns:
UUID string or None
"""
if not catalogue_path:
return None
if '/w/' in catalogue_path:
return catalogue_path.rsplit('/w/', 1)[-1]
return None
def push_pending(db, config, limit=None):
"""Push all assigned-but-unpushed domain categories to PeerTube.
Args:
db: StatusDB instance
config: RECON config dict
limit: Optional max number of items to push
Returns:
(success_count, fail_count) tuple
"""
items = db.get_unpushed_assignments()
if limit:
items = items[:limit]
if not items:
logger.info("No unpushed assignments to push")
return (0, 0)
pt = _get_peertube_config(config)
delay = pt['rate_limit_delay']
SYSTEMIC_FAIL_THRESHOLD = 5 # abort if first N items all fail
logger.info(f"Pushing {len(items)} category assignments to PeerTube")
token_data = _ensure_token(config)
success = 0
failed = 0
for item in items:
file_hash = item['hash']
domain = item.get('recon_domain')
catalogue_path = item.get('catalogue_path', '')
if not domain or domain not in DOMAIN_CATEGORY_MAP:
logger.warning(f" {file_hash[:12]}: invalid domain '{domain}', skipping")
failed += 1
continue
uuid = extract_uuid(catalogue_path)
if not uuid:
logger.warning(f" {file_hash[:12]}: could not extract UUID from '{catalogue_path}'")
failed += 1
continue
category_id = DOMAIN_CATEGORY_MAP[domain]
ok, token_data = push_category(uuid, category_id, config, token_data)
if ok:
db.set_peertube_pushed(file_hash)
success += 1
else:
failed += 1
# Abort on systemic failure (e.g. plugin not installed, auth broken)
if success == 0 and failed >= SYSTEMIC_FAIL_THRESHOLD:
logger.error(f"Aborting push: first {failed} items all failed — "
f"check plugin installation and PeerTube API config")
break
time.sleep(delay)
logger.info(f"Push complete: {success} succeeded, {failed} failed")
return (success, failed)

View file

@ -77,10 +77,73 @@ def _text_hash(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()
def _flatten_table(table_el):
"""Convert a <table> element to pipe-delimited text.
Each <tr> becomes a row with cells joined by ' | '.
Returns the formatted table as a string with blank lines around it.
"""
rows = []
for tr in table_el.iter('tr'):
cells = []
for cell in tr:
if cell.tag in ('td', 'th'):
cell_text = (cell.text_content() or '').strip()
# Collapse internal whitespace in each cell
cell_text = re.sub(r'\s+', ' ', cell_text)
if cell_text:
cells.append(cell_text)
if cells:
rows.append(' | '.join(cells))
if not rows:
return ''
return '\n'.join(rows)
def _preprocess_tree(doc):
"""Pre-process HTML tree to add delimiters before text_content() flattens it.
Handles: <table>, <br>, <li>, <dt>, <dd> -- elements that lxml's
text_content() would concatenate without any separators.
"""
from lxml import etree
# 1. Replace <table> elements with their pipe-delimited text
for table in list(doc.iter('table')):
formatted = _flatten_table(table)
if formatted:
replacement = etree.Element('div')
replacement.text = '\n\n' + formatted + '\n\n'
parent = table.getparent()
if parent is not None:
parent.replace(table, replacement)
else:
table.drop_tree()
# 2. <br> -> inject newline
for br in list(doc.iter('br')):
br.tail = '\n' + (br.tail or '')
# 3. <li> -> inject newline + "- " prefix
for li in list(doc.iter('li')):
li.text = '- ' + (li.text or '')
li.tail = '\n' + (li.tail or '')
# 4. <dt> -> inject newline before
for dt in list(doc.iter('dt')):
dt.tail = '\n' + (dt.tail or '')
# 5. <dd> -> inject newline + indent
for dd in list(doc.iter('dd')):
dd.text = ' ' + (dd.text or '')
dd.tail = '\n' + (dd.tail or '')
def _html_to_text(html_bytes):
"""Convert HTML bytes to clean text via lxml.
Strips nav, footer, script, style elements. Decodes entities.
Pre-processes tables, lists, and line breaks for proper delimiters.
Normalizes whitespace.
"""
try:
@ -93,6 +156,9 @@ def _html_to_text(html_bytes):
for el in doc.iter(tag):
el.drop_tree()
# Pre-process tree: tables -> pipe-delimited, br -> newlines, li -> dashes
_preprocess_tree(doc)
# Extract text
text = doc.text_content()

View file

@ -1,46 +0,0 @@
"""
RECON Domain Taxonomy
Single source of truth for the 18 knowledge domains and their PeerTube
category ID mappings. IDs 100-117 are registered via the
peertube-plugin-recon-domains plugin.
Import VALID_DOMAINS from here instead of defining local sets.
"""
DOMAIN_CATEGORY_MAP = {
'Agriculture & Livestock': 100,
'Civil Organization': 101,
'Communications': 102,
'Food Systems': 103,
'Foundational Skills': 104,
'Logistics': 105,
'Medical': 106,
'Navigation': 107,
'Operations': 108,
'Power Systems': 109,
'Preservation & Storage': 110,
'Security': 111,
'Shelter & Construction': 112,
'Technology': 113,
'Tools & Equipment': 114,
'Vehicles': 115,
'Water Systems': 116,
'Wilderness Skills': 117,
}
VALID_DOMAINS = set(DOMAIN_CATEGORY_MAP.keys())
CATEGORY_DOMAIN_MAP = {v: k for k, v in DOMAIN_CATEGORY_MAP.items()}
# Channels whose tiebreaker is skipped because their content is non-topical
# (catch-alls, miscellany dumps, etc.). Items in these channels with tied
# domain counts go straight to tied_manual without channel-context tiebreaker.
#
# This is intentionally a hardcoded explicit list, not a size threshold.
# Adding a channel here requires an explicit decision — only add channels
# that are genuinely non-topical catch-alls where channel-wide concept
# aggregation would produce meaningless noise.
MEGA_CHANNEL_SKIP_LIST = {
'Transcript', # Legacy catch-all, ~9,200 videos, no topical coherence
}

View file

@ -124,22 +124,6 @@ class StatusDB:
except Exception:
pass # column already exists
# Migration: domain assignment columns for PeerTube categorization
for col, coltype in [
('recon_domain', 'TEXT'),
('recon_domain_status', 'TEXT'),
('recon_domain_assigned_at', 'TEXT'),
('peertube_category_pushed_at', 'TEXT'),
]:
try:
conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}")
except Exception:
pass # column already exists
try:
conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)")
except Exception:
pass # index already exists
# Stream B: file_operations + duplicate_review tables
conn.executescript("""
CREATE TABLE IF NOT EXISTS file_operations (
@ -464,91 +448,7 @@ class StatusDB:
conn.commit()
# ── Domain Assignment Helpers ──────────────────<E29480><E29480>─────────────
def get_domain_assignment(self, file_hash):
"""Get domain assignment for a document.
Returns:
(recon_domain, recon_domain_status) tuple, or (None, None) if not set.
"""
conn = self._get_conn()
row = conn.execute(
"SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?",
(file_hash,)
).fetchone()
if row:
return (row['recon_domain'], row['recon_domain_status'])
return (None, None)
def set_domain_assignment(self, file_hash, domain, status):
"""Set domain assignment and status for a document."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET recon_domain = ?, recon_domain_status = ?, "
"recon_domain_assigned_at = ? WHERE hash = ?",
(domain, status, datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def set_peertube_pushed(self, file_hash):
"""Mark a document's category as pushed to PeerTube."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?",
(datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def get_unpushed_assignments(self):
"""Get documents with domain assignments not yet pushed to PeerTube.
Only returns stream.echo6.co source documents.
"""
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"""SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain IS NOT NULL
AND d.peertube_category_pushed_at IS NULL
AND c.source = 'stream.echo6.co'
ORDER BY d.recon_domain_assigned_at""",
).fetchall()]
def get_items_by_domain_status(self, status, limit=None):
"""Get documents by domain assignment status."""
conn = self._get_conn()
q = """SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain_status = ?
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
return [dict(r) for r in conn.execute(q, (status,)).fetchall()]
def get_domain_status_counts(self):
"""Get counts of documents by domain assignment status."""
conn = self._get_conn()
return {row['recon_domain_status']: row['cnt']
for row in conn.execute(
"SELECT recon_domain_status, COUNT(*) as cnt "
"FROM documents WHERE recon_domain_status IS NOT NULL "
"GROUP BY recon_domain_status"
).fetchall()}
def get_domain_distribution(self):
"""Get counts of documents per assigned domain."""
conn = self._get_conn()
return {row['recon_domain']: row['cnt']
for row in conn.execute(
"SELECT recon_domain, COUNT(*) as cnt "
"FROM documents WHERE recon_domain IS NOT NULL "
"GROUP BY recon_domain ORDER BY cnt DESC"
).fetchall()}
# ── Scraper Job Helpers ─────────────────────────────────────
# ── Scraper Job Helpers ─────────────────────────────────────
def get_pending_scrape_job(self):
"""Fetch the oldest pending scrape job."""

View file

@ -1,52 +0,0 @@
# peertube-plugin-recon-domains
Registers 18 RECON knowledge domains as PeerTube video categories using IDs 100-117. These categories are assigned automatically by RECON's domain assignment pipeline based on concept extraction analysis.
## Category Mapping
| ID | Domain |
|-----|---------------------------|
| 100 | Agriculture & Livestock |
| 101 | Civil Organization |
| 102 | Communications |
| 103 | Food Systems |
| 104 | Foundational Skills |
| 105 | Logistics |
| 106 | Medical |
| 107 | Navigation |
| 108 | Operations |
| 109 | Power Systems |
| 110 | Preservation & Storage |
| 111 | Security |
| 112 | Shelter & Construction |
| 113 | Technology |
| 114 | Tools & Equipment |
| 115 | Vehicles |
| 116 | Water Systems |
| 117 | Wilderness Skills |
Built-in PeerTube categories (IDs 1-18) are not modified.
## Install
```bash
# Copy plugin to PeerTube storage
cp -r peertube-plugin-recon-domains /var/www/peertube/storage/plugins/node_modules/
# Register plugin via API or admin UI
# Admin > Plugins > Install > peertube-plugin-recon-domains
# Restart PeerTube
sudo systemctl restart peertube
```
## Uninstall
Remove the plugin via PeerTube admin UI or:
```bash
rm -rf /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains
sudo systemctl restart peertube
```
Videos with RECON categories will revert to showing the raw category ID until the plugin is reinstalled.

View file

@ -1,32 +0,0 @@
async function register ({ videoCategoryManager }) {
const reconDomains = {
100: 'Agriculture & Livestock',
101: 'Civil Organization',
102: 'Communications',
103: 'Food Systems',
104: 'Foundational Skills',
105: 'Logistics',
106: 'Medical',
107: 'Navigation',
108: 'Operations',
109: 'Power Systems',
110: 'Preservation & Storage',
111: 'Security',
112: 'Shelter & Construction',
113: 'Technology',
114: 'Tools & Equipment',
115: 'Vehicles',
116: 'Water Systems',
117: 'Wilderness Skills'
}
for (const [id, label] of Object.entries(reconDomains)) {
videoCategoryManager.addConstant(parseInt(id), label)
}
}
async function unregister () {
return
}
module.exports = { register, unregister }

View file

@ -1,21 +0,0 @@
{
"name": "peertube-plugin-recon-domains",
"version": "1.0.0",
"description": "Registers 18 RECON knowledge domains as PeerTube video categories (IDs 100-117)",
"engine": {
"peertube": ">=6.0.0"
},
"keywords": [
"peertube",
"plugin"
],
"homepage": "https://forge.echo6.co/matt/recon",
"author": "Echo6",
"license": "MIT",
"library": "./main.js",
"staticDirs": {},
"css": [],
"clientScripts": [],
"translations": {},
"bugs": "https://forge.echo6.co/matt/recon/issues"
}

170
recon.py
View file

@ -863,166 +863,6 @@ def cmd_ingest(args):
return 0
def cmd_assign_categories(args):
"""Assign RECON domains to PeerTube videos and push categories."""
from qdrant_client import QdrantClient
from lib.domain_assigner import compute_assignment, run_tiebreaker_pass
from lib.peertube_writer import push_pending, extract_uuid
from lib.recon_domains import DOMAIN_CATEGORY_MAP
config = get_config()
db = StatusDB()
dry_run = args.dry_run
limit = args.limit
if args.backfill:
# Pass 1: assign domains to all complete stream docs with no assignment
# or that previously got needs_reprocess
conn = db._get_conn()
q = """SELECT d.hash FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.status = 'complete'
AND (d.recon_domain IS NULL
OR d.recon_domain_status = 'needs_reprocess')
AND c.source = 'stream.echo6.co'
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
rows = conn.execute(q).fetchall()
hashes = [r['hash'] for r in rows]
if not hashes:
print("No unassigned complete stream documents found")
return 0
print(f"Backfill: processing {len(hashes)} documents" +
(" [DRY RUN]" if dry_run else ""))
# Create one Qdrant client for the entire backfill
qdrant = QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
stats = {'assigned': 0, 'tied_pass_1': 0, 'no_concepts': 0, 'needs_reprocess': 0, 'errors': 0}
for i, file_hash in enumerate(hashes):
try:
domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
stats[status] = stats.get(status, 0) + 1
if not dry_run:
db.set_domain_assignment(file_hash, domain, status)
if (i + 1) % 1000 == 0:
print(f" Progress: {i + 1}/{len(hashes)}")
except Exception as e:
stats['errors'] += 1
logger.warning(f" Assignment error for {file_hash[:12]}: {e}")
print(f"\nBackfill results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.tiebreaker_pass:
if dry_run:
items = db.get_items_by_domain_status('tied_pass_1')
print(f"Tiebreaker pass: {len(items)} items would be processed [DRY RUN]")
return 0
stats = run_tiebreaker_pass(db, config)
print(f"\nTiebreaker results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.push_pending:
if dry_run:
items = db.get_unpushed_assignments()
if limit:
items = items[:limit]
print(f"Push pending: {len(items)} items would be pushed [DRY RUN]")
return 0
success, failed = push_pending(db, config, limit=limit)
print(f"\nPush results: {success} succeeded, {failed} failed")
return 0
elif args.reprocess_missing:
items = db.get_items_by_domain_status('needs_reprocess', limit=limit)
if not items:
print("No items with needs_reprocess status")
return 0
print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else ""))
requeued = 0
for item in items:
file_hash = item['hash']
if dry_run:
print(f" Would reprocess: {file_hash[:12]}{item.get('filename', '?')}")
requeued += 1
continue
# Reset document status to allow re-processing
conn = db._get_conn()
conn.execute(
"""UPDATE documents SET
status = 'catalogued',
concepts_extracted = 0,
vectors_inserted = 0,
recon_domain = NULL,
recon_domain_status = NULL,
recon_domain_assigned_at = NULL,
peertube_category_pushed_at = NULL,
error_message = NULL,
extracted_at = NULL,
enriched_at = NULL,
embedded_at = NULL
WHERE hash = ?""",
(file_hash,)
)
conn.commit()
# Re-queue for pipeline processing
db.queue_document(file_hash)
requeued += 1
print(f"Requeued {requeued} items for reprocessing")
return 0
else:
# Default: show domain assignment status
status_counts = db.get_domain_status_counts()
domain_dist = db.get_domain_distribution()
conn = db._get_conn()
total_stream = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'"""
).fetchone()['cnt']
unassigned = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'
AND d.recon_domain IS NULL"""
).fetchone()['cnt']
unpushed = len(db.get_unpushed_assignments())
print("=== Domain Assignment Status ===\n")
print(f"Total complete stream docs: {total_stream}")
print(f"Unassigned: {unassigned}")
print(f"Unpushed to PeerTube: {unpushed}")
if status_counts:
print(f"\nAssignment status breakdown:")
for status, cnt in sorted(status_counts.items()):
print(f" {status:<20s} {cnt:>6d}")
if domain_dist:
print(f"\nDomain distribution:")
for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]):
print(f" {domain:<35s} {cnt:>6d}")
return 0
def cmd_pipeline(args):
"""Stream B library pipeline: status, migrate, reverse, watch, sweep."""
from lib.new_pipeline import (
@ -1318,16 +1158,6 @@ def main():
p.set_defaults(func=cmd_ingest)
# assign-categories
p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos')
p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs')
p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis')
p.add_argument('--push-pending', action='store_true', help='Push assigned categories to PeerTube API')
p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items')
p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing')
p.add_argument('--limit', type=int, help='Limit number of items to process')
p.set_defaults(func=cmd_assign_categories)
# pipeline (Stream B)
p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)')
p.add_argument('pipeline_action', nargs='?', default='status',

View file

@ -1,148 +0,0 @@
{% extends "base.html" %}
{% block content %}
<div id="pt-review">
<div class="stat-grid" style="grid-template-columns:repeat(4, 1fr);">
<div class="stat-card"><div class="label">Manual Review</div><div class="value" id="rv-manual"></div></div>
<div class="stat-card"><div class="label">Assigned</div><div class="value" id="rv-assigned"></div></div>
<div class="stat-card"><div class="label">Tied (Pass 1)</div><div class="value" id="rv-tied1"></div></div>
<div class="stat-card"><div class="label">Needs Reprocess</div><div class="value" id="rv-reprocess"></div></div>
</div>
<div class="panel" style="margin-top:16px;">
<h3 class="section-title" style="margin-bottom:12px;">Manual Review Queue</h3>
<div id="rv-items-container">
<table class="data-table" id="rv-table" style="display:none;">
<thead>
<tr>
<th>Video</th>
<th>Channel</th>
<th>Current Domain</th>
<th>Top Domains</th>
<th>Assign</th>
</tr>
</thead>
<tbody id="rv-tbody"></tbody>
</table>
<div id="rv-empty" class="text-muted" style="padding:24px;text-align:center;">Loading...</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
const VALID_DOMAINS = {{ valid_domains | tojson }};
async function loadStats() {
try {
const resp = await fetch('/api/peertube/review/stats');
const data = await resp.json();
document.getElementById('rv-manual').textContent = data.tied_manual || 0;
document.getElementById('rv-assigned').textContent =
(data.assigned || 0) + (data.tied_pass_2 || 0) + (data.manual_assigned || 0);
document.getElementById('rv-tied1').textContent = data.tied_pass_1 || 0;
document.getElementById('rv-reprocess').textContent = data.needs_reprocess || 0;
} catch (e) {
console.error('Failed to load stats:', e);
}
}
async function loadItems() {
try {
const resp = await fetch('/api/peertube/review/items');
const items = await resp.json();
const tbody = document.getElementById('rv-tbody');
const table = document.getElementById('rv-table');
const empty = document.getElementById('rv-empty');
if (!items.length) {
empty.textContent = 'No items pending manual review.';
table.style.display = 'none';
return;
}
tbody.innerHTML = '';
table.style.display = '';
empty.style.display = 'none';
items.forEach(item => {
const tr = document.createElement('tr');
tr.id = 'row-' + item.hash;
// Video title/filename
const tdVideo = document.createElement('td');
tdVideo.textContent = item.filename || item.hash.slice(0, 12);
tdVideo.title = item.hash;
tr.appendChild(tdVideo);
// Channel
const tdChannel = document.createElement('td');
tdChannel.textContent = item.category || '—';
tr.appendChild(tdChannel);
// Current domain
const tdCurrent = document.createElement('td');
tdCurrent.textContent = item.recon_domain || '—';
tr.appendChild(tdCurrent);
// Top domains (from concept counts)
const tdTop = document.createElement('td');
if (item.top_domains) {
tdTop.innerHTML = item.top_domains.map(d =>
'<span class="badge">' + d.domain + ' (' + d.count + ')</span>'
).join(' ');
}
tr.appendChild(tdTop);
// Assign dropdown + button
const tdAssign = document.createElement('td');
const sel = document.createElement('select');
sel.className = 'input-sm';
sel.innerHTML = '<option value="">Select...</option>' +
VALID_DOMAINS.map(d => '<option value="' + d + '">' + d + '</option>').join('');
if (item.recon_domain) {
sel.value = item.recon_domain;
}
const btn = document.createElement('button');
btn.className = 'btn btn-sm btn-primary';
btn.textContent = 'Assign';
btn.onclick = () => assignDomain(item.hash, sel.value, tr);
tdAssign.appendChild(sel);
tdAssign.appendChild(btn);
tr.appendChild(tdAssign);
tbody.appendChild(tr);
});
} catch (e) {
document.getElementById('rv-empty').textContent = 'Error loading items: ' + e.message;
}
}
async function assignDomain(hash, domain, row) {
if (!domain) {
alert('Select a domain first');
return;
}
try {
const resp = await fetch('/api/peertube/review/assign', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({hash: hash, domain: domain})
});
const result = await resp.json();
if (result.ok) {
row.style.opacity = '0.4';
row.querySelector('button').disabled = true;
row.querySelector('button').textContent = 'Done';
loadStats();
} else {
alert('Assignment failed: ' + (result.error || 'unknown error'));
}
} catch (e) {
alert('Error: ' + e.message);
}
}
loadStats();
loadItems();
</script>
{% endblock %}

View file

@ -1,71 +0,0 @@
#!/usr/bin/env python3
"""
Parity test: verify RECON domain taxonomy matches PeerTube categories.
Tests:
1. DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS exactly
2. PeerTube API returns all 18 RECON categories (IDs 100-117) with correct labels
Usage:
cd /opt/recon && source venv/bin/activate
python3 tests/test_constants_parity.py
"""
import json
import sys
import requests
# Add parent dir to path for imports
sys.path.insert(0, '/opt/recon')
from lib.recon_domains import DOMAIN_CATEGORY_MAP, VALID_DOMAINS, CATEGORY_DOMAIN_MAP
def test_local_parity():
"""Verify DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS."""
map_keys = set(DOMAIN_CATEGORY_MAP.keys())
assert map_keys == VALID_DOMAINS, (
f"Mismatch: in map but not VALID_DOMAINS: {map_keys - VALID_DOMAINS}, "
f"in VALID_DOMAINS but not map: {VALID_DOMAINS - map_keys}"
)
assert len(CATEGORY_DOMAIN_MAP) == len(DOMAIN_CATEGORY_MAP), "Reverse map size mismatch"
print(f"[OK] Local parity: {len(VALID_DOMAINS)} domains, map keys match VALID_DOMAINS")
def test_peertube_categories():
"""Verify PeerTube API returns all 18 RECON categories."""
url = "http://192.168.1.170:9000/api/v1/videos/categories"
headers = {"Host": "stream.echo6.co"}
try:
resp = requests.get(url, headers=headers, timeout=10)
resp.raise_for_status()
except Exception as e:
print(f"[SKIP] PeerTube API unreachable: {e}")
return
categories = resp.json() # dict of {id_str: label}
missing = []
wrong_label = []
for domain, cat_id in DOMAIN_CATEGORY_MAP.items():
cat_str = str(cat_id)
if cat_str not in categories:
missing.append((cat_id, domain))
elif categories[cat_str] != domain:
wrong_label.append((cat_id, domain, categories[cat_str]))
if missing:
print(f"[FAIL] Missing categories in PeerTube: {missing}")
print(" Deploy peertube-plugin-recon-domains to CT 110 first")
sys.exit(1)
if wrong_label:
print(f"[FAIL] Wrong labels in PeerTube: {wrong_label}")
sys.exit(1)
print(f"[OK] PeerTube parity: all {len(DOMAIN_CATEGORY_MAP)} categories present with correct labels")
if __name__ == '__main__':
test_local_parity()
test_peertube_categories()
print("\nAll parity tests passed.")