Compare commits

..

50 commits

Author SHA1 Message Date
e840a119dd
cleanup: drop dead deployment_config references + orphaned deleted_contacts template
Tidies stale references left behind by the navi extraction + decoupling work.

- lib/deployment_config.py: the consumer-catalog docstring listed four in-process
  consumers that were all extracted/removed across cleanups #4/#5/#6/#27
  (/api/landclass gate, google_places.py, place_detail.py, offroute/router.py).
  Replaced the stale 4-bullet list with an accurate note: recon has no remaining
  caller of get_deployment_config() today; the module is retained per cleanup #1.
- lib/api.py: removed the now-dead `from .deployment_config import
  get_deployment_config` import (its only caller was the /api/landclass handler
  removed in #5 — zero call sites remain).
- templates/knowledge/deleted_contacts.html: deleted — orphaned since cleanup #3
  removed the contacts/dashboard routes; zero callers in recon.

No functional change (the removed import was unused; the template unrendered).

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 23:09:49 -06:00
6365fe6756
decouple: remove /api/wiki-rewrite (migrated to navi-places)
PR-B of decouple #4-REWRITE — the LAST recon→navi decoupling step. navi-places
now owns the Kiwix link-rewrite logic in-process (navi-backend PR-A 7103c27,
deployed + verified: Twin Falls live route returns wiki_rewrites local/public
from navi's own wiki_cache.db; zero outbound calls to recon /api/wiki-rewrite).

- DELETE lib/wiki_rewrite.py (the Kiwix rewrite logic — ported to navi-places).
- DELETE lib/wiki_rewrite_api.py (the /api/wiki-rewrite blueprint).
- DELETE lib/wiki_rewrite_api_test.py (tests the deleted endpoint).
- api.py: drop the wiki_rewrite_bp import + register_blueprint + section comment.

Verified zero recon consumers: nothing in recon imports wiki_rewrite — it was
purely an HTTP endpoint for navi-places. After this, recon services make and
receive zero navi-ecosystem runtime calls; recon is a fully separate product.

Out-of-band (post-deploy): DROP TABLE wiki_cache from /opt/recon/data/place_cache.db
(table only — place_cache + google_api_calls stay).

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 21:21:22 -06:00
ac99723e51
decouple: remove /api/wiki-enrich + wiki_index read path (migrated to navi-places)
PR-B of decouple #4-READ. navi-places now reads its own wiki_index.db directly
(navi-backend a8f9520, deployed + verified: Horseshoe Falls enrichment served
from /var/lib/navi-backend/wiki_index.db; admin-info dropped the recon-wiki-enrich
dependency). recon's endpoint is edge-unreachable-unused, safe to remove.

- DELETE lib/wiki_enrich_api.py (the /api/wiki-enrich blueprint).
- DELETE lib/place_detail.py (97-line survivor: lookup_wiki_index +
  _get_wiki_index_db) — its only consumer was wiki_enrich_api.py (verified zero
  non-test code consumers). Fully orphaned.
- DELETE lib/wiki_enrich_api_test.py (tests the deleted endpoint).
- api.py: drop the wiki_enrich_bp import + register_blueprint.

Untouched (separate decouple): /api/wiki-rewrite (wiki_rewrite_api.py +
wiki_rewrite.py), still navi-consumed. /opt/recon/data/wiki_index.db left in
place (data; now a harmless dead file). Internal localhost migration — no nginx.

Flag (doc follow-up, not fixed): deployment_config.py:10 + wiki_rewrite_api.py:6
both have stale in-prose references to the deleted place_detail.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 19:25:53 -06:00
21c0f11eff
decouple: remove scripts/overture_import.py (migrated to navi-backend)
PR-B of the Overture-import relocation. The ETL now lives in
zvx-echo6/navi-backend at the same scripts/ path (PR-A, navi-backend 475739d:
script ported verbatim + duckdb dep + docs; verified live — imports cleanly,
overture PG reachable with ~20.9M rows). recon no longer produces overture data
it doesn't consume.

- DELETE scripts/overture_import.py.

Context: cleanup #29 removed lib/overture.py (recon's only overture *reader*),
leaving this ETL as recon's last orphan overture code path. PR-A moved the
writer to the navi side; this removes recon's now-orphan copy. The `overture`
PG database is unchanged — only the writer moved.

OVERTURE_DB_* vars in /opt/recon/.env are now dead in recon (zero overture code
paths remain) — flagged for out-of-band post-merge prune, same pattern as
PADUS_DB_* (cleanup #5).

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 13:57:14 -06:00
879df84b7a
decouple: remove /api/auth/whoami handler (migrated to navi-admin)
PR-B of the 2-PR whoami migration. The route is now served by navi-admin
:8427 via nginx (`^~ /api/auth/whoami` cutover verified live — edge responses
carry navi-admin's X-Cache-Status: BYPASS), so recon's handler is
edge-unreachable and safe to remove.

- lib/api.py: delete the @app.route('/api/auth/whoami') api_auth_whoami handler
  + its dedicated section comment. It was the file tail (post-cleanup-#6), so
  api.py now ends on the metrics-history handler.

Sequenced after PR-A (navi-backend, merged + deployed) and the nginx edge
cutover, so the route never 404s. recon serves zero navi-facing auth-state
endpoints now.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 13:34:06 -06:00
aa6e972260
cleanup: remove orphaned lib/overture.py + lib/osm_categories.py (post-#27 dead code)
Both modules were flagged in cleanup #27 (PR #16) as fully orphaned once the
place_detail orchestrator cluster was deleted; Matt confirmed scope in chat.

- lib/overture.py (170L): only consumer was place_detail._enrich_with_overture
  (deleted in #27).
- lib/osm_categories.py (143L): humanize_category's only callers were
  place_detail._parse_nominatim / _parse_overpass (both deleted in #27).

Re-probed against master 79d7b2b: zero import/usage references anywhere outside
the modules themselves, zero template/JS refs, no test files. compileall lib/
passes.

Note: scripts/overture_import.py (the Overture-Maps→PostGIS ETL script) is
independent — imports nothing from lib/ — and is left untouched. After this PR
the `overture` PostGIS DB it populates has no remaining recon reader; that's a
data-ops follow-up, not code touched here.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 10:35:15 -06:00
79d7b2b343
cleanup: remove orphaned lib/address_book.py (post-cleanup-4 dead code)
After cleanup #4 deleted lib/geocode.py, the only remaining address_book
references in recon were lib/address_book_test.py (test of the dying SUT) and
a dead `from . import address_book` import at the top of lib/netsyms_api.py
(never referenced in the body). This PR removes all three.

- DELETE lib/address_book.py + lib/address_book_test.py
- netsyms_api.py: drop the dead `from . import address_book` import

config/address_book.yaml stays — vendored data, navi-contacts (:8423) consumes
its own copy via NAVI_ADDRESS_BOOK_YAML.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 10:29:39 -06:00
adee6d5a69
cleanup: remove dead place_detail orchestrator cluster + lib/google_places.py (post-PR-11 dead code)
PR #11 (cleanup #2) deleted the /api/place* HTTP handlers but left their
orchestrator functions in lib/place_detail.py as dead code. Pre-flight for the
original Task #27 (delete google_places.py) surfaced that _enrich_with_google
is NOT a no-caller leaf — it's called by the unreachable get_place_detail. A
full caller-graph trace showed ~90% of place_detail.py is dead orchestration.

Scope expanded (Matt confirmed in chat) to remove the whole dead cluster:
- lib/google_places.py (entire file)
- place_detail.py: get_place_detail, get_place_by_wikidata, _enrich_with_google,
  _apply_google_data, _enrich_with_overture, _enrich_with_wiki_index,
  _enrich_wiki_links, _parse_nominatim, _parse_nominatim_address, _parse_overpass,
  _build_overpass_query, cache_get, cache_put, _get_db + their now-unused
  imports/constants (json, time, requests, osm_categories, NOMINATIM_URL, etc.)

KEEP only lookup_wiki_index + _get_wiki_index_db (the wiki_enrich_api survivor
path) — preserved byte-exact. Module docstring refreshed.

Flagged separately (not touched): overture.py + osm_categories.py are now
orphaned (only consumers were the deleted cluster); stale docstrings; the
deployment_config.py:9 catalog comment.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 10:21:31 -06:00
86c902f7b5
cleanup: remove /api/offroute + /api/mvum handlers + lib/offroute/ package (extraction #8 shadow)
/api/offroute (POST) and /api/mvum (GET) are edge-shadowed since extraction #8
— navi-offroute :8428 serves both via nginx. Cleanup #4 removed the last
in-process consumer of lib/offroute/dem.py (netsyms_api._reverse_elevation +
the module-level _DEM = DEMReader()), so the entire 9-file lib/offroute/
package is now orphaned and goes with this PR.

- api.py: drop both handlers (api_offroute, api_mvum) + their section comments.
  Both used in-function lazy imports of offroute, so no top-of-file import
  survives.
- DELETE lib/offroute/ wholesale (__init__, router, mvum, cost, barriers, dem,
  friction, trails, prototype). prototype.py was already dead at runtime.

Closes the recon->navi navi-shadow cleanup loop: recon now serves zero navi-*
shadow routes.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 04:25:54 -06:00
1f05d4b4d6
cleanup: remove /api/landclass handler + lib/landclass.py (extraction #4 shadow)
/api/landclass is edge-shadowed since extraction #4 — navi-landclass :8424
serves the route via nginx. Cleanup #4 removed the last in-process consumer
(netsyms_api._reverse_landclass), so lib/landclass.py is now fully orphaned.

- api.py: drop the @app.route('/api/landclass') handler + the
  `from .landclass import lookup_landclass, format_summary` import.
- DELETE lib/landclass.py (only consumer was the deleted handler).
- DELETE lib/landclass_test.py (SUT gone).

PADUS_DB_* vars in /opt/recon/.env are now dead in recon — flagged for an
out-of-band post-merge cleanup, not touched here (data, not code).

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 04:14:47 -06:00
d7292c4cc7
cleanup: remove /api/geocode + /api/reverse handlers (extraction #6 shadow)
All three routes (/api/geocode, /api/reverse, /api/reverse/<lat>/<lon>) are
edge-shadowed since extraction #6 — navi-geo :8426 serves them via nginx.

- netsyms_api.py: drop geocode_bp + its three handlers, the bundle-private
  helpers, and module state (TTLCache/lock/_TZ_DB_PATH/_DEM). netsyms_bp
  (/api/netsyms/lookup + /health) survives.
- api.py: drop the geocode_bp import + register_blueprint line.
- DELETE lib/geocode.py, lib/nav_tools.py (both orphaned once the handlers go).
- DELETE reverse_bundle_test.py, geocode_test.py, nav_tools_test.py.

Decouples netsyms_api.py from landclass.py and offroute/dem.py — prerequisite
for cleanups #5 and #6.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 04:04:45 -06:00
d56b1d5f92
cleanup: remove /api/contacts + /api/address_book handlers + pull entire /nav-i/* subtree (extraction #3 shadow) (#12)
* cleanup: remove /api/address_book handlers (extraction #3 shadow)

Removes address_book_bp (lib/address_book_api.py: /api/address_book/lookup +
/api/address_book/list) + its registration in lib/api.py. Edge-shadowed since
extraction #3 — navi-contacts (:8423) serves /api/address_book/* on
navi.echo6.co; no recon-side consumer (no template/JS reference).

lib/address_book.py is KEPT — geocode.py (nickname short-circuit + annotation)
and netsyms_api.py import it.

NOT removed this PR: contacts_bp. The recon dashboard at /deleted-contacts
(recon-product, stays) calls /api/contacts/<id>/{restore,restore-as,purge} via
XHR, and recon.echo6.co proxies straight to recon:8420 (verified the Caddy
block — no navi-contacts routing there). Removing contacts_bp would break those
dashboard actions. Flagged for a decision; lib/contacts.py also stays (dashboard
ContactsDB reads). See PR body.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* cleanup: deprecate /nav-i + /deleted-contacts; remove contacts_bp + lib/contacts.py

Probe found recon's /deleted-contacts dashboard reads /opt/recon/data/contacts.db
— frozen since extraction #3 moved write ownership to navi-contacts
(/var/lib/navi-backend/contacts.db). The page has been silently rendering ~25-day
stale data, and its restore/restore-as/purge XHRs hit recon's contacts_bp (the
recon.echo6.co Caddy block proxies straight to recon:8420 — no navi-contacts
routing there). Per Matt's decision, deprecate the pages entirely; they'll be
re-surfaced later as a proper admin page consuming navi-contacts via API.

Removed:
- contacts_bp (lib/contacts_api.py, all 10 /api/contacts* routes) + its
  registration in lib/api.py — edge-shadowed by navi-contacts :8423 since #3,
  and now free of recon-product consumers once the dashboard goes.
- /nav-i (navi_landing_page) + /deleted-contacts (deleted_contacts_page) route
  handlers; templates/navi/landing.html + templates/navi/deleted_contacts.html.
- lib/contacts.py (ContactsDB) — the dashboard was its only non-contacts_bp
  consumer; both gone.
- The two dead NAVI_SUBNAV entries (Overview→/nav-i, Deleted Contacts→
  /deleted-contacts).

Kept / adapted:
- /nav-i/api-keys page (recon-product key management) stays. NAVI_SUBNAV reduced
  to just its API Keys entry; the base.html top-nav "Nav-I" link repointed
  /nav-i -> /nav-i/api-keys so the surviving section page stays reachable
  (minimal href change, not a nav restructure — flagged in PR).
- lib/address_book.py — geocode.py + netsyms_api.py still consume it (untouched).

Out-of-band follow-up after merge: delete the stale /opt/recon/data/contacts.db
(frozen 2026-04-28; data, not code).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* cleanup: pull the entire /nav-i/* subtree (api-keys page is a weaker dup of /settings/keys)

Completes the contacts cleanup by removing the rest of /nav-i/. The
/nav-i/api-keys page was (a) a weaker duplicate of /settings/keys for Gemini
(it lacked remove + reload-from-.env), and (b) a write-only-to-dead-files
surface for TomTom + Google Places: it wrote /opt/recon/.env, but the live
navi-traffic (:8421) and navi-places (:8425) services read their own
/etc/navi-backend/<svc>.env and have ignored recon's copy since extractions
#1 + #5. End state: no /nav-i/* URLs in recon.

Removed:
- /nav-i/api-keys route + template (templates/navi/api_keys.html)
- all /api/nav-i/api-keys/* endpoints (list/update/test/restart-recon)
- lib/api_keys_admin.py (its only importers were those 4 endpoints; _KEY_DEFS/
  _read_env/_write_env were private to it)
- the now-orphaned NAVI_SUBNAV
- the "Nav-I" top-nav entry in base.html (reverses the /nav-i->/nav-i/api-keys
  repoint from the previous commit, now that the page itself is gone)

Kept (Gemini's real home, recon-product):
- /settings/keys + /api/keys/* + lib/key_manager.py (KeyManager) — they import
  key_manager directly, never api_keys_admin, so untouched.

Note: TOMTOM_API_KEY now has zero recon .py references. GOOGLE_PLACES_API_KEY
still has one (lib/google_places.py), kept in the prior /api/place cleanup as
place_detail's dep; its only caller (_enrich_with_google) is unreachable since
the /api/place handlers were removed — left in place pending /api/wiki-enrich
retirement (out of scope here).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: zvx-echo6 <mj@k7zvx.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 03:34:22 -06:00
c968497b94
cleanup: remove /api/place handlers (extraction #5 shadow) (#11)
/api/place/<osm_type>/<int:osm_id> and /api/place/wikidata/<id> are
edge-shadowed since extraction #5 — navi-places (:8425) serves both via
nginx. Removes the two recon-side handlers + the now-unused
`from .place_detail import get_place_detail, get_place_by_wikidata` import.

NO modules deleted. place_detail.py is KEPT — wiki_enrich_api.py (the
/api/wiki-enrich endpoint, which stays; navi-places HTTP-consumes it) imports
`lookup_wiki_index` from it. That transitively keeps its deps google_places.py,
overture.py, osm_categories.py (all imported only by place_detail). This
corrects Phase A #5 §3's "only lib/api.py imports place_detail" — the
wiki-enrich endpoint (added post-#5) is a second consumer.

Co-authored-by: zvx-echo6 <mj@k7zvx.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 00:27:44 -06:00
ed36eec85e
cleanup: remove /api/config handler (extraction #2 shadow) (#10)
* cleanup: remove /api/config handler (extraction #2 shadow)

recon's /api/config Flask handler (lib/api.py) is edge-shadowed since
extraction #2 — navi-config (:8422) serves the route via nginx on
navi.echo6.co. The recon-side handler is dead at the edge; remove it.

lib/deployment_config.py is KEPT: get_deployment_config() still has many
in-process consumers (lib/api.py:1237 /api/landclass has_landclass gate,
google_places.py, place_detail.py x4, offroute/router.py). Only the
/api/config HTTP handler is removed; the import at api.py:27 stays.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* cleanup: refresh deployment_config docstring (drop /api/config reference)

The module docstring still said get_deployment_config() was "for use by the
/api/config endpoint" — that handler was removed in the parent commit. Rewrite
to reflect the actual 5 in-process consumers (landclass gate, google_places,
place_detail ×4, offroute/router.py profile.offroute.*).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: zvx-echo6 <mj@k7zvx.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 00:08:48 -06:00
14ad2cd34a
recon: add /api/wiki-rewrite endpoint (extraction #5 prep, additive) (#9)
Per-tag HTTP wrapper over wiki_rewrite.rewrite_wiki_link so the (future)
navi-places service can rewrite OSM wiki tags to local Kiwix URLs over HTTP
instead of importing recon's wiki_rewrite module (which talks to Kiwix on
localhost:8430 and the wiki_cache table in /opt/recon/data/place_cache.db).
Companion to PR #8 (/api/wiki-enrich) — Matt picked option B (HTTP-couple the
Kiwix offline-wiki rewriting too, since it matters in prod).

  GET /api/wiki-rewrite?tag=<wikipedia|wikidata|wikivoyage|appropedia>&value=<raw>
  -> 200 {url, status}  where status is "local" | "public" | "original"
  -> 400 on missing value or unknown tag
  -> no 404 (unclassifiable value echoes back with status "original",
     mirroring rewrite_wiki_link)
  Public (no auth), like /api/place/* and /api/wiki-enrich.

Changes (additive only):
  - lib/wiki_rewrite_api.py: new wiki_rewrite_bp blueprint. Thin route directly
    over the existing rewrite_wiki_link(tag, value) — no extraction needed
    (it's already a clean standalone function, unlike wiki-enrich's lookup).
  - lib/api.py: register the blueprint (one block).
  - lib/wiki_rewrite_api_test.py: 5 tests (local Kiwix hit, public fallback,
    unclassifiable -> original, missing value -> 400, unknown tag -> 400),
    stubbing check_kiwix_has_article (no Kiwix/DB), plain-assert + __main__
    runner. Verified green against recon's venv (flask 3.1.2).

Does NOT touch place_detail's in-process _enrich_wiki_links — that gets removed
in a later PR once navi-places is live (same as PR #8). wiki_cache stays in
recon's own place_cache.db post-cutover (harmless positive-cache duplication).

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 14:08:18 -06:00
f42b1fef3b
recon: add /api/wiki-enrich endpoint (extraction #5 prep, additive) (#8)
HTTP wrapper over the wiki_index lookup so the (future) navi-places service can
fetch wiki enrichment over HTTP instead of reading recon's 2.1 GB
data/wiki_index.db directly (Phase A option B — HTTP coupling).

  GET /api/wiki-enrich?wikidata=<Qid>           (primary key)
  GET /api/wiki-enrich?name=<name>&country=<cc>  (fallback key)
  -> 200 {wiki_summary?, wiki_population?, wiki_url?, wikivoyage_url?}
  -> 400 if no usable key; 404 on no match. Public (no auth, like /api/place/*).

Route keys are wikidata_id / name+country — NOT osm_type/osm_id — because that
is how wiki_index is actually queried (the in-process _enrich_with_wiki_index
looks up by result['wikidata_id'] then name+country_code, never by OSM id; see
extraction-5-wiki-enrich-investigation.md). An osm-keyed route would have forced
a redundant in-recon place lookup.

Changes (additive only):
  - lib/place_detail.py: new standalone lookup_wiki_index(wikidata_id, name,
    country_code) doing the same two SELECTs + field/URL mapping as the
    in-process path, returning a dict or None. Pure DB read, never raises.
    `_enrich_with_wiki_index` is LEFT UNTOUCHED — it can be DRY-refactored to
    delegate to this in a later PR; the in-process enrichment path is unchanged.
  - lib/wiki_enrich_api.py: new wiki_enrich_bp blueprint with the route.
  - lib/api.py: register the blueprint (one block).
  - lib/wiki_enrich_api_test.py: 4 tests (hit-by-wikidata + decoded fields,
    no-match -> 404, name+country fallback, no-key -> 400) over an in-memory
    fixture DB; plain-assert style + __main__ runner (recon venv has no pytest).
    Verified green against recon's venv (flask 3.1.2).

Does NOT remove the in-process _enrich_with_wiki_index call from place_detail —
that happens in a later PR once navi-places is live and serving.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 13:23:08 -06:00
cf74f1840b
Merge pull request #5 from zvx-echo6/extraction-1-remove-traffic-handler
Remove /api/traffic/flow handler (now served by navi-traffic)
2026-05-22 09:57:05 -06:00
25cf5ac16a
Merge pull request #6 from zvx-echo6/extraction-2-add-auth-config
Add auth.login_url/logout_url to deployment profiles (extraction #2 prep)
2026-05-22 09:18:06 -06:00
bb220b7ba3 recon: add auth.login_url/logout_url to deployment profiles (extraction #2)
Additive prep for the Navi Panel.jsx login/logout cutover. Adds an `auth`
block (login_url, logout_url) to each deployment profile, placed after the
existing `services` block:

  - home.yaml        login=/outpost.goauthentik.io/start?rd=%2F
                     logout=auth.echo6.co invalidation flow, next=navi.echo6.co
  - minimal_pi.yaml  same, with TODO(matt) to confirm logout next= host
  - regional_pi.yaml same, with TODO(matt) to confirm logout next= host

No Python change. /api/config returns the whole profile dict, so these keys
flow through automatically; existing consumers ignore unknown keys, making
this backward-safe (the frontend fallback path is simply never needed once
this is live).

Next steps (separate PRs): the navi-config service (:8422) mirroring this
handler, and the Panel.jsx fix to read cfg.auth.login_url/logout_url with the
current literals as fallback.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:10:33 -06:00
75664c7d02 recon: remove /api/traffic/flow handler (now served by navi-traffic, extraction #1)
The /api/traffic/flow/<z>/<x>/<y>.png handler is dead code in recon. As of
extraction #1 of the recon<->Navi decoupling, this path is served by the
standalone navi-traffic service. Live request flow is now:

    Caddy (CT 101, navi.echo6.co @authed_api, forward_auth)
      -> nginx :8440 (location ^~ /api/traffic/ -> proxy_cache traffic_cache)
        -> navi-traffic gunicorn :8421 (services/navi_traffic)

Cutover verified live: authenticated browser fetch to
https://navi.echo6.co/api/traffic/flow/... returns 200 image/png with
X-Cache-Status MISS then HIT (120s cache), Server: gunicorn.

navi-backend (github.com/zvx-echo6/navi-backend):
  - dae54f3  Initial scaffold: navi-backend + navi-traffic
  - 311cb8f  nginx: use ^~ prefix on /api/traffic/ to beat .png regex catch-all

Caddy cutover (@authed_api upstream 8420 -> nginx 8440) applied on Utility
CT 101. Also drops the now-unused make_response flask import (no other uses
in lib/api.py). os and http_requests remain (used elsewhere).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 01:12:20 -06:00
f7a501b4d7
Merge pull request #4 from zvx-echo6/feat/orbis-flow-migration
Migrate TomTom flow proxy from classic to Orbis Maps API
2026-05-21 16:24:13 -06:00
dcd4ddd358 Migrate TomTom flow proxy from classic to Orbis Maps API 2026-05-21 16:07:54 -06:00
f67f4ec9e3 Add wiki_index enrichment for place details
Enriches place API responses with wiki_summary, wiki_url, wiki_population,
and wikivoyage_url from wiki_index.db. Lookups by wikidata_id first,
then falls back to name + country_code.

Called from Nominatim, Overpass, and Wikidata endpoints.
Gated by has_kiwix_wiki feature flag.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-21 21:47:52 +00:00
dc7591b101
Merge PR #3: landclass: filter antimeridian-wrapping PAD-US records
landclass: filter antimeridian-wrapping PAD-US records
2026-05-20 10:41:46 -06:00
484dfbd1e0 landclass: filter antimeridian-wrapping PAD-US records
47 PAD-US units (Aleutian/Bering-Sea BOEM marine features, all is_valid=False)
are stored as antimeridian-wrapping polygons whose bbox spans ~360 deg of
longitude. Their invalid planar geometry forms latitude bands that ST_Intersects
false-matches for non-US points (e.g. London/Germany at ~51N matched
"Rat Islands" ogc_fid 3974).

Fix: add `AND (ST_XMax(geom) - ST_XMin(geom)) < 60` to the lookup_landclass
SELECT. No DB writes; two cheap ST_XMax/XMin evals on the already
spatial-index-filtered result set. Verified live: total 651088 rows,
filtered 651041 (exactly 47 excluded); Yosemite/Grand Canyon retained,
London/Germany now empty.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 16:34:14 +00:00
573347a2ee
Merge PR #2: Switch /api/reverse/<lat>/<lon> elevation source from Valhalla to planet-DEM
Switch /api/reverse/<lat>/<lon> elevation source from Valhalla to planet-DEM
2026-05-20 09:34:15 -06:00
3d2d69cd56 Switch /api/reverse/<lat>/<lon> elevation source from Valhalla to planet-DEM
Per OFFROUTE-ARCHITECTURE.md §9 ("planet-dem.pmtiles as single elevation
source"). The bundle endpoint previously called Valhalla /height, which only
has 48 Idaho HGT tiles; it now reads the planet-scale Terrarium PMTiles that
already back the frontend hillshade and contours.

- dem.py: add DEMReader.sample_point(lat, lon) — one z12 tile (LRU-cached),
  Web-Mercator pixel index, None outside the +/-85.05 pole cap or when untiled.
- netsyms_api.py: module-level DEMReader singleton (lazy mmap, None if init
  fails); _reverse_elevation now calls _DEM.sample_point; drop the Valhalla
  HTTP call and _VALHALLA_HEIGHT_URL.
- tests: DEM-mock and DEM-unavailable cases; EXPECTED_KEYS derives from
  _BUNDLE_KEYS. All 9 tests pass.

Verified live: Boise 824m, London 8m, Tokyo 35m, Yosemite 2804m, pole -> None.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 15:20:35 +00:00
a80bb6e848
Merge PR #1: Add /api/reverse/<lat>/<lon> localhost-sourced enrichment bundle
Add /api/reverse/<lat>/<lon> localhost-sourced enrichment bundle
2026-05-20 00:16:11 -06:00
f276b95753 Add /api/reverse/<lat>/<lon> localhost-sourced enrichment bundle
New geocode_bp sibling to the existing /api/reverse?lat=&lon= route (which
is unchanged). Returns a flat 9-field bundle for the Central enrichment
framework: name, city, county, state, country, postal_code (Photon),
timezone (timezones.sqlite via R-tree + shapely), landclass (in-process
lookup_landclass), elevation_m (Valhalla /height).

- Each component lookup is independent and wrapped in try/except: a failure
  logs a warning and yields null, never a 5xx. 400 only on unparseable /
  out-of-range coordinates.
- lat/lon parsed manually rather than via Flask <float:>, which rejects
  negative and integer coordinates and would 404 instead of 400.
- 10k-entry / 24h TTLCache keyed on coords rounded to 4 decimals.
- Tests mock Photon/Valhalla/landclass; one test exercises the real
  timezones.sqlite. cachetools pinned in requirements.txt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 05:33:45 +00:00
c1ba1f8dc7 Merge feature/offroute: effort-based wilderness routing, PostGIS entry points, BLM trail filtering 2026-05-09 15:45:27 +00:00
a04c10ad55 offroute: wilderness maneuvers with bearing, elevation, grade
- Segment breaks on: bearing change >30°, grade category change, distance >0.5mi
- Grade categories: flat (0-2°), gentle (2-5°), moderate (5-10°), steep (10-15°), very steep (15°+)
- Distance formatting: feet with commas <1mi, miles with decimal ≥1mi
- Instruction format: Head {cardinal}, gaining/descending X ft ({grade} uphill/downhill) — {dist}

Co-Authored-By: Claude <noreply@anthropic.com>
2026-05-09 05:05:00 +00:00
d8f84ab55a offroute: revert off-network threshold to 10m
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-09 03:34:37 +00:00
b4e33eb048 offroute: PostGIS entry points with 100m densification and land_status tagging
- Migrate EntryPointIndex from SQLite to PostGIS (padus database)
- Densify highway LineStrings at 100m intervals via Shapely interpolate
- 2.94M entry points from 476k lines (4x more coverage)
- Tag each entry point with land_status via ST_Intersects against padus_sub
  - 1.64M public (56%), 1.30M unknown (44%)
- Add geography GIST index for fast radius queries (~25ms)
- Increase OFF_NETWORK_THRESHOLD_M from 10m to 50m for GPS accuracy
- PBF path and PostGIS DSN configurable via home.yaml

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-09 03:28:58 +00:00
05c24f95f6 offroute: tighten off-network threshold to 10m
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 23:27:06 +00:00
686b35710a api: add auto mode to offroute endpoint validation
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 22:37:49 +00:00
cf758476b4 offroute: add auto mode for standard driving routes
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:55:31 +00:00
87a4741b8d offroute: raise bbox limit to 2.0° (~220km coverage)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:19:04 +00:00
58347415bc offroute: bidirectional wilderness routing (all 4 scenarios)
Support all four routing scenarios:
  A: off-network → on-network (wilderness then Valhalla)
  B: off-network → off-network (wilderness, Valhalla, wilderness)
  C: on-network → off-network (Valhalla then wilderness)
  D: on-network → on-network (pure Valhalla passthrough)

Off-network detection via Valhalla /locate endpoint:
  - Snap distance > 500m = off-network

Key implementation details:
  - _locate_on_network() helper for network detection
  - route() dispatches to scenario-specific handlers
  - _pathfind_wilderness() extracted for reuse (runs MCP)
  - _valhalla_route() helper for network segments
  - _build_response() unifies GeoJSON output format

Memory management:
  - Sequential MCP runs for scenario B (not parallel)
  - gc.collect() after each MCP run
  - Bbox centered on wilderness origin, not distant destination

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 21:11:53 +00:00
ff0721c23e offroute: wilderness always uses foot mode for pathfinding
The wilderness segment now ALWAYS uses foot mode for MCP pathfinding.
The user's selected mode only affects:
1. Entry point selection (MODE_TO_VALID_HIGHWAYS filtering)
2. Valhalla costing for the network segment

This ensures vehicles can navigate through wilderness (on foot) to
reach roads, rather than failing when no vehicle-accessible path exists.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 19:03:31 +00:00
2252905986 feat(offroute): MVUM legal access — pathfinder integration + places panel API + boundary_mode control
MVUM Data Import:
- Downloaded USFS MVUM Roads (150,636 features) and Trails (28,741 features)
- Imported to navi.db as mvum_roads and mvum_trails tables
- Idaho coverage: ~8,994 roads and ~4,504 trails across 7 national forests
- Preserved all vehicle-class fields (ATV, MOTORCYCLE, HIGHCLEARANCEVEHICLE, etc.)
- Preserved seasonal date ranges (*_DATESOPEN fields)

New mvum.py module:
- MVUMReader class for querying MVUM data by bbox and nearest point
- parse_date_range() for seasonal date string parsing (MM/DD-MM/DD format)
- check_access() for determining open/closed status with date checking
- symbol_to_access() fallback when per-vehicle fields are null
- get_mvum_access_grid() for rasterizing MVUM to pathfinder grid

Cost function integration:
- Added mvum parameter to compute_cost_grid()
- MVUM closures respond to boundary_mode:
  * strict = impassable (np.inf)
  * pragmatic = 5x friction penalty
  * emergency = ignored entirely
- Foot mode skips MVUM (motor-vehicle specific)

Router integration:
- Loads MVUM access grid for motorized modes (mtb, atv, vehicle)
- Tracks mvum_closed_crossings in path summary

Places Panel API:
- GET /api/mvum?lat=XX&lon=XX&radius=50
- Returns MVUM feature with access status for all vehicle classes
- Includes seasonal date ranges, maintenance level, forest/district info
- GeoJSON geometry for map display

Validation:
- MVUM places endpoint tested with Sawtooth NF road
- All four modes validated with strict/pragmatic/emergency boundary modes
- Foot mode correctly ignores MVUM restrictions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 14:26:18 +00:00
bc463188d5 feat(offroute): Phase O4 — multi-mode cost functions (foot/mtb/atv/vehicle)
- Add ModeProfile dataclass for data-driven mode configuration
- Implement three speed functions:
  * Tobler off-path hiking (foot)
  * Herzog wheeled-transport polynomial (mtb/atv)
  * Linear speed degradation (vehicle)
- Add WildernessReader for PAD-US Des_Tp=WA wilderness areas
- Mode-specific terrain friction overrides:
  * Forest impassable for ATV/vehicle, high friction for MTB
  * Wetland/mangrove impassable for all wheeled modes
- Trail access rules:
  * Foot trails (value 25) impassable for ATV/vehicle
- Wilderness blocking for mtb/atv/vehicle modes
- Vehicle mode allows flat grassland/cropland traversal
- Memory optimization: limit entry points, constrain bbox size
- Update router to pass mode and wilderness to cost function
- Add vehicle to API mode validation

Validated all four modes with test route:
- foot: 0.46km off-network, 12.11km network, 89% on trail
- mtb: 0.47km off-network, 13.13km network, 90% on trail
- atv: 0.47km off-network, 12.81km network, 90% on trail
- vehicle: 0.46km off-network, 12.81km network, 89% on trail

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 14:11:56 +00:00
1a9dfc8f8d feat(offroute): Phase O3b — trail entry index, Valhalla stitching, /api/offroute endpoint
Phase A: Trail Entry Point Index
- Extract highway endpoints from idaho-latest.osm.pbf using osmium + ogr2ogr
- Store 740,430 entry points in /mnt/nav/navi.db (SQLite with spatial index)
- Entry points by class: service (271k), footway (152k), residential (146k),
  track (111k), path (26k), unclassified (16k), tertiary (9k), secondary (4k),
  primary (4k), bridleway (15)

Phase B: Pathfinder → Valhalla Stitching (router.py)
- OffrouteRouter orchestrates wilderness pathfinding + Valhalla on-network routing
- Queries entry points within 50km (expanding to 100km if needed)
- MCP pathfinder routes to nearest reachable entry point
- Calls Valhalla pedestrian/bicycle/auto costing for on-network segment
- Returns GeoJSON FeatureCollection with wilderness + network + combined segments

Phase C: Flask Endpoint
- POST /api/offroute with start/end coordinates, mode, boundary_mode
- Returns GeoJSON route with per-segment metadata and turn-by-turn maneuvers

Validated: 42.35,-114.30 → Twin Falls downtown
- Wilderness: 0.5km, 9min | Network: 36km, 413min | Total: ~421min
- 21 turn-by-turn instructions, segments connect at entry point

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 13:44:34 +00:00
3293cb4238 feat(offroute): Phase O3a — trail burn-in, pathfinder seeks trail corridors
Trail friction REPLACES land cover friction where trails exist:
- Road (value 5): 0.1× friction
- Track (value 15): 0.3× friction
- Foot trail (value 25): 0.5× friction

TrailReader loads /mnt/nav/worldcover/trails.tif rasterized from OSM highways.

Validation shows trail-seeking behavior:
- On-trail travel: 17.3% → 98.7%
- Effort time: 1047 min → 155 min (-85.2%)
- Path travels farther but stays on roads for speed

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 07:26:25 +00:00
e0eedcedfd feat(offroute): Phase O2c — PAD-US barriers with three-mode boundary respect
- Add barriers.py: PAD-US raster reader + build_barriers_raster() function
- Rasterize PAD-US Pub_Access=XA (Closed) polygons to CONUS GeoTIFF
- Modify cost.py: boundary_mode parameter (strict/pragmatic/emergency)
  - strict: private land = impassable (np.inf)
  - pragmatic: private land = 5x friction penalty (default)
  - emergency: private land barriers ignored
- Modify prototype.py: three-way comparison output
- Output: padus_barriers.tif at /mnt/nav/worldcover/ (144MB, ~33m resolution)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 06:56:36 +00:00
26d4bc7478 feat(offroute): Phase O2b — WorldCover friction integration, lake avoidance validated
- New friction.py: reads WorldCover friction VRT, resamples to match
  elevation grid, provides point sampling for validation
- Modified cost.py: accepts optional friction array, multiplies Tobler
  time cost by friction multiplier, inf for water/nodata (255/0)
- Modified prototype.py: loads friction layer, passes to cost function,
  validates path avoids water cells (friction=255)

Validated on Idaho test bbox:
- Path avoids Murtaugh Lake (no water cells on path)
- Friction along path: min=10, max=20, mean=10.2
- Effort increased 3.4% vs Phase O1 due to friction multipliers

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-08 06:33:45 +00:00
f2a0f81580 feat(offroute): Phase O1 foundation — PMTiles decoder, Tobler cost, MCP pathfinder prototype
- dem.py: Terrarium-encoded PMTiles tile reader with LRU cache
  - Decodes WebP tiles from planet-dem.pmtiles
  - Stitches tiles into numpy elevation grids for arbitrary bboxes
  - Provides pixel-to-latlon coordinate conversion

- cost.py: Tobler off-path hiking cost function
  - speed = 0.6 * 6.0 * exp(-3.5 * |grade + 0.05|) km/h
  - Max slope cutoff: 40 degrees → impassable
  - Returns time-to-traverse (seconds/cell) as cost metric

- prototype.py: Standalone validation on Idaho test bbox
  - 43km × 80km bbox (~17M cells at 14m resolution)
  - scikit-image MCP_Geometric Dijkstra pathfinder
  - Outputs GeoJSON LineString with path metadata
  - Validated: 61.6km path, 21.3 hours effort time

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-05-07 23:43:56 +00:00
227affca9d Merge fix/pdf-extraction-quality into master 2026-05-07 01:51:05 +00:00
fa456fecb1 Merge fix/zim-table-extraction into master 2026-05-07 01:51:05 +00:00
83a21854c3 fix: PDF extraction quality — word-boundary checks and layout mode
Adds _text_quality_ok() gate that replaces the bare 50-char length
check at each stage of the extraction fallback chain. Checks:
- Word-boundary ratio (≥60% of tokens must be real words)
- Concatenation ratio (lc→UC transitions must be <10% of word count)

When PyPDF2 default extraction fails quality check, retries with
space_width=100 for tighter word-boundary detection. This fixes
Haynes/workshop manuals where tight kerning produces concatenated
words like 'byMike' and 'oftheGuild'.

Also adds -layout flag to pdftotext subprocess calls for better
spatial awareness in the poppler fallback stage.

Note: PyPDF2 3.0.1 does not support layout=True parameter.
The space_width parameter serves the same purpose.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-07 01:36:23 +00:00
b741e217f6 fix: ZIM table extraction — pipe-delimited cells instead of concatenation
Pre-processes HTML tree before lxml .text_content() to prevent
element concatenation:
- <table> cells joined with ' | ' delimiter, rows with newlines
- <br> tags produce newlines
- <li> items get '- ' prefix and newline separation
- <dt>/<dd> definition list items get newline separation

Fixes ~868 mangled Qdrant points where table content was jammed
together (e.g. 'Freq51Primary1A==' instead of 'Freq51 | Primary | 1A==').

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-07 01:32:25 +00:00
48 changed files with 186 additions and 7386 deletions

View file

@ -408,14 +408,9 @@ service:
peertube:
api_base: http://192.168.1.170 # Internal PeerTube API (CT 110 nginx)
api_url: http://192.168.1.170:9000 # Direct PeerTube API (bypasses nginx, for writer)
host_header: stream.echo6.co # Host header for PeerTube API requests
username: root # PeerTube admin username
password_env: PEERTUBE_PASSWORD # Env var holding PeerTube admin password
public_url: https://stream.echo6.co # Public URL for video links
fetch_timeout: 30 # HTTP timeout for API/VTT requests
rate_limit_delay: 0.5 # Delay between video ingestions (seconds)
writer_rate_limit: 0.1 # Delay between category push API calls (seconds)
poll_interval: 1800 # Seconds between PeerTube acquisition polls (30 min)
scraper:

View file

@ -6,13 +6,13 @@ profile: home
region_name: "North America"
tileset:
url: "/tiles/na.pmtiles"
url: "/tiles/planet/current.pmtiles"
bounds: [-168, 14, -52, 72]
max_zoom: 15
attribution: "Protomaps © OSM"
tileset_hillshade:
url: "/tiles/hillshade-na.pmtiles"
url: "/tiles/planet-dem.pmtiles"
encoding: "terrarium"
max_zoom: 12
@ -31,16 +31,20 @@ services:
address_book: "/api/address_book"
valhalla: "/valhalla"
auth:
login_url: "/outpost.goauthentik.io/start?rd=%2F"
logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/"
features:
has_nominatim_details: true
has_kiwix_wiki: false
has_kiwix_wiki: true
has_hillshade: true
has_3d_terrain: false
has_traffic_overlay: true
has_landclass: true
has_public_lands_layer: true
has_contours: true
has_contours_test: true
has_contours_test: false
has_contours_test_10ft: false
has_address_book_write: false
has_overture_enrichment: true
@ -48,7 +52,16 @@ features:
has_contacts: true
has_wiki_rewriting: true
has_wiki_discovery: false
has_usfs_trails: true
has_blm_trails: true
defaults:
center: [42.5736, -114.6066]
zoom: 10
# Offroute wilderness routing
offroute:
osm_pbf_path: "/mnt/nav/sources/idaho-latest.osm.pbf"
densify_interval_m: 100
postgis_dsn: "dbname=padus"

View file

@ -26,6 +26,11 @@ services:
address_book: "/api/address_book"
valhalla: "/valhalla"
# TODO(matt): confirm logout next= host for this profile
auth:
login_url: "/outpost.goauthentik.io/start?rd=%2F"
logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/"
features:
has_nominatim_details: false
has_kiwix_wiki: false

View file

@ -31,6 +31,11 @@ services:
address_book: "/api/address_book"
valhalla: "/valhalla"
# TODO(matt): confirm logout next= host for this profile
auth:
login_url: "/outpost.goauthentik.io/start?rd=%2F"
logout_url: "https://auth.echo6.co/if/flow/default-invalidation-flow/?next=https://navi.echo6.co/"
features:
has_nominatim_details: true
has_kiwix_wiki: false

View file

@ -1,14 +0,0 @@
# RECON Backlog — Technical Debt
## Qdrant Migration Follow-ups (2026-04-28)
From the Qdrant source-of-truth migration pre-commit review. All nice-to-haves, zero production impact.
1. **domain_assigner.py — list-type domain handling**
`_count_domains_from_qdrant` counts every element in a list-type `domain` payload. The embedder's `_validate_classification` normalizes lists to `payload['domain'] = valid[0]` before upsert, so multi-element lists never exist in production Qdrant data. For spec consistency, could match the embedder's first-only normalization. Zero impact since the embedder guarantees bare strings.
2. **recon.py --tiebreaker-pass — Qdrant client threading**
The `--tiebreaker-pass` CLI branch calls `run_tiebreaker_pass(db, config)` without creating and passing a `QdrantClient`. The function handles this via lazy construction in `_get_qdrant_client`, which creates one client for the entire batch. Could thread a client from the CLI entry point for consistency with `--backfill`. Functionally fine as-is.
3. **_get_qdrant_client — debug log caller identification**
The debug log `"Creating new QdrantClient (caller did not pass one)"` doesn't identify which function triggered the lazy construction. Could include caller info (e.g., `inspect.stack()[1].function`) for easier debug session triage. Low priority since it only fires for lazy construction paths.

View file

@ -1,20 +0,0 @@
# Deploy Blast Radius Reference
Quick-reference for operators during deployment of domain categorization.
| Step | What Changes | Worst Case (Partial Failure) | Detection Signal | Rollback | Est. Rollback Time |
|------|-------------|------------------------------|-----------------|----------|-------------------|
| **Plugin install** | PeerTube plugin dir on CT 110 | PeerTube fails to start | `systemctl status peertube` shows failed | Move plugin dir to `.disabled`, restart PeerTube | 2 min |
| **PeerTube restart** | PeerTube service state | PeerTube crash loop | `journalctl -u peertube` shows repeated failures | Disable plugin, restart | 2 min |
| **Schema migration** (RECON restart) | 4 new nullable columns + 1 index in recon.db | Migration SQL error leaves partial columns | Python PRAGMA check fails | DROP COLUMN for each added column | 5 min |
| **--backfill** | `recon_domain` + `recon_domain_status` on ~22K rows | Wrong domain assignments | Spot-check 20 random docs | `UPDATE documents SET recon_domain = NULL, recon_domain_status = NULL ...` | 1 min |
| **--tiebreaker-pass** | ~1,100 rows: `tied_pass_1` to `tied_pass_2`/`tied_manual` | Wrong tiebreaker resolution | Spot-check 5 resolved items | Reset `tied_pass_2`/`tied_manual` back to `tied_pass_1` | 1 min |
| **--push-pending** | PeerTube `video.category` column on ~22K rows | Wrong categories visible to all PeerTube users | PeerTube UI shows wrong labels | `UPDATE video SET category = NULL WHERE category >= 100` + clear push timestamps | 2 min |
| **--reprocess-missing** | **DELETES** concept directories (irreversible locally) | Concepts deleted, re-enrichment fails (Gemini API down, quota hit) | `recon.py status` shows stuck `queued` items, concept dirs missing | Restore from Contabo backup (`rsync`) | 10-60 min depending on count |
## Risk Tiers
- **Low risk (read-only):** `--dry-run` on any command, status display
- **Medium risk (DB-only, reversible):** `--backfill`, `--tiebreaker-pass`, schema migration
- **High risk (external writes):** `--push-pending` (writes to PeerTube, visible to users)
- **Critical risk (destructive):** `--reprocess-missing` (deletes concept files, $130+ Gemini work at risk)

View file

@ -1,117 +0,0 @@
# Domain Assignment — Algorithm & Operations Guide
## Overview
RECON's domain assignment feature maps each PeerTube video to one of 18 knowledge domains by analyzing the concept vectors stored in Qdrant. Assignments are pushed to PeerTube as category metadata via a custom plugin.
## Data Source
Domain counts are read from the `domain` payload field on concept vectors in Qdrant (`recon_knowledge_hybrid` collection on cortex:6333). Each concept vector has a `domain` string in its payload, set during enrichment and validated at embed time. This provides 100% coverage for all embedded documents with zero legacy domain residue.
Previously, domain counts were read from on-disk concept JSON files (`data/concepts/{hash}/window_*.json`). This was replaced with Qdrant queries on 2026-04-28 because ~10,000 items had missing or legacy-only concept files on disk while Qdrant had the correct data.
## Algorithm
### Pass 1: Concept Domain Count (inline, per-document)
Runs automatically via post-embed hook when a video completes the pipeline, or in bulk via `--backfill`.
1. Query Qdrant for all points with `doc_hash` matching the document
2. Count `domain` payload occurrences, filtering to `VALID_DOMAINS` only
3. If zero concept vectors → `no_concepts` (terminal)
4. If single top domain → `assigned`
5. If tied → `tied_pass_1` (deferred to tiebreaker)
### Pass 2: Channel Tiebreaker (batch)
Runs via `assign-categories --tiebreaker-pass`.
For each `tied_pass_1` document:
1. Identify the tied domains from Qdrant
2. Look up the document's channel (`catalogue.category`)
3. **Skip-list check:** If channel is in `MEGA_CHANNEL_SKIP_LIST` (known non-topical catch-alls), skip tiebreaking → `tied_manual`
4. Query Qdrant for domain counts across all other videos in the same channel (single batch query with `MatchAny` filter)
5. Among the tied domains only, pick the one with the highest channel-wide concept count
6. If resolved → `tied_pass_2`
7. If still tied → `tied_manual` (alphabetical fallback assigned, flagged for review)
### Channel Skip List
Certain channels are known non-topical catch-alls where channel-wide concept aggregation produces meaningless noise. These are listed explicitly in `MEGA_CHANNEL_SKIP_LIST` (defined in `lib/recon_domains.py`) and skip tiebreaking entirely — their tied items go straight to `tied_manual` for dashboard review.
Current skip list:
- `Transcript` — Legacy catch-all (~9,200 videos), no topical coherence
This is intentionally an explicit list, not a size threshold. Legitimate large channels (e.g., 1a-auto, forgotten-weapons) run the tiebreaker normally because their content is topically coherent. Adding a channel to the skip list requires a code change and a documented reason.
## Status Values
| Status | Meaning | Terminal? | Next Action |
|--------|---------|-----------|-------------|
| `assigned` | Clear winner from pass 1 | No | Push to PeerTube |
| `tied_pass_1` | Concept tie, awaiting tiebreaker | No | Run `--tiebreaker-pass` |
| `tied_pass_2` | Resolved by channel tiebreaker | No | Push to PeerTube |
| `tied_manual` | Needs human review | No | Review at `/peertube/review` |
| `no_concepts` | Zero concept vectors in Qdrant | **Yes** | None — typically non-topical content (vlogs, giveaways, announcements) |
| `needs_reprocess` | Transient failure (Qdrant error) | No | Run `--reprocess-missing` |
| `manual_assigned` | Human override from dashboard | No | Already pushed |
**"Categorized" filter** = `{'assigned', 'tied_pass_2', 'manual_assigned'}`
## CLI Commands
```bash
cd /opt/recon && source venv/bin/activate
# Show current assignment status
python3 recon.py assign-categories
# Pass 1: backfill all unassigned complete stream documents
python3 recon.py assign-categories --backfill --dry-run
python3 recon.py assign-categories --backfill
# Pass 2: resolve ties via channel analysis
python3 recon.py assign-categories --tiebreaker-pass
# Push all assigned-but-unpushed categories to PeerTube API
python3 recon.py assign-categories --push-pending
# Re-queue items with transient failures for full re-processing
python3 recon.py assign-categories --reprocess-missing
# Limit processing count
python3 recon.py assign-categories --backfill --limit 100
```
## Dashboard Review
The review UI at `recon.echo6.co/peertube/review` shows only `tied_manual` items. Each row displays:
- Video title and channel
- Top concept domains with counts
- Dropdown to select the correct domain
- Assign button (pushes to PeerTube immediately)
Items with `no_concepts` or `needs_reprocess` status do NOT appear in the review UI.
## Pipeline Integration
New videos ingested via the PeerTube collector are automatically assigned a domain when they complete the embed stage. The post-embed hook in `embedder.py`:
1. Runs `compute_assignment()` (pass 1 only), reusing the embedder's existing Qdrant client
2. If clear winner: pushes category to PeerTube immediately
3. If tied: marks as `tied_pass_1` for the next tiebreaker batch run
4. If no concepts: marks as `no_concepts` (terminal)
5. On Qdrant error: logs warning and continues — does not block the pipeline
## Source Files
| File | Purpose |
|------|---------|
| `lib/recon_domains.py` | Domain↔Category ID mapping, VALID_DOMAINS |
| `lib/domain_assigner.py` | `compute_assignment()` + `run_tiebreaker_pass()` + Qdrant helpers |
| `lib/peertube_writer.py` | OAuth2 client, `push_category()`, `push_pending()` |
| `lib/embedder.py` | Post-embed hook (passes qdrant client) |
| `lib/status.py` | DB columns + helper methods |
| `lib/api.py` | Dashboard review routes |
| `recon.py` | CLI `assign-categories` command |

View file

@ -1,426 +0,0 @@
# Domain Categorization Migration Runbook
Step-by-step procedure to deploy the PeerTube domain categorization feature.
## Prerequisites
- Feature branch `feature/peertube-domain-categorization` merged to master (or checked out)
- SSH access to recon-vm (192.168.1.130) and CT 110 (192.168.1.170)
- PeerTube admin credentials (`root` / password in `.env`)
## Pre-Deploy Backups
These backups MUST be completed before any state-changing step.
### 1. Snapshot RECON database
```bash
ssh zvx@192.168.1.130
cp /opt/recon/data/recon.db "/opt/recon/data/recon.db.pre-domain-feature.$(date +%Y%m%d_%H%M%S).bak"
ls -la /opt/recon/data/recon.db.pre-domain-feature.*.bak # Confirm
```
### 2. Snapshot PeerTube PostgreSQL
```bash
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres pg_dump peertube_prod' > "/tmp/peertube_prod.pre-domain-feature.$(date +%Y%m%d_%H%M%S).sql"
ls -la /tmp/peertube_prod.pre-domain-feature.*.sql # Confirm non-zero
```
### 3. Verify off-site concept backup
```bash
# Check last rsync to Contabo
ssh zvx@192.168.1.130 'ls -la /opt/recon/data/concepts/ | tail -5'
ssh root@100.64.0.1 'ls -la /opt/backups/recon/concepts/ | tail -5'
# Confirm timestamps match within 6 hours
```
### 4. Confirm RECON service state
```bash
ssh zvx@192.168.1.130 'sudo systemctl status recon --no-pager'
# Note: do NOT restart until Step 3. If currently running, confirm no active
# enrichment/embedding workers before proceeding.
```
---
## Step 1: Deploy PeerTube Plugin to CT 110
```bash
# From recon-vm, copy plugin to CT 110
ssh zvx@192.168.1.130
cd /opt/recon/peertube-plugin/
scp -r peertube-plugin-recon-domains root@192.168.1.241:'pct exec 110 -- mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
# Or via the Proxmox host:
ssh root@192.168.1.243 # media host
pct exec 110 -- bash -c 'mkdir -p /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains'
# Copy files into the container (scp from recon-vm or use pct push)
```
Alternative: Install via PeerTube admin UI (Admin > Plugins > Install).
```bash
# Restart PeerTube to register plugin
ssh root@192.168.1.243 'pct exec 110 -- systemctl restart peertube'
```
**STOP.** Check PeerTube logs for plugin registration errors:
```bash
ssh root@192.168.1.243 'pct exec 110 -- journalctl -u peertube --since=-5min' | grep -i plugin
```
If any errors reference `peertube-plugin-recon-domains`, do NOT proceed. Diagnose
and fix the plugin before continuing. See Rollback: "Plugin install fails" below.
## Step 2: Verify Plugin
```bash
# From recon-vm
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | grep -E '"1[0-1][0-9]"'
```
Should show all 18 categories (IDs 100-117). If any are missing, do NOT proceed.
Run the parity test:
```bash
cd /opt/recon && source venv/bin/activate
python3 tests/test_constants_parity.py
```
## Step 3: Apply Schema Migration
**Requires RECON restart (ask user first).**
```bash
sudo systemctl restart recon
```
The migration runs automatically on startup via `StatusDB._init_db()`. Verify:
```bash
cd /opt/recon && source venv/bin/activate
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
cols = [r[1] for r in conn.execute('PRAGMA table_info(documents)').fetchall()]
for c in ['recon_domain', 'recon_domain_status', 'recon_domain_assigned_at', 'peertube_category_pushed_at']:
assert c in cols, f'Missing: {c}'
print(f' {c}: OK')
# Verify index exists
indexes = [r[1] for r in conn.execute('PRAGMA index_list(documents)').fetchall()]
assert 'idx_documents_recon_domain_status' in indexes, 'Missing index'
print(' idx_documents_recon_domain_status: OK')
# Verify no columns were dropped
expected_existing = ['hash', 'status', 'filename', 'discovered_at']
for c in expected_existing:
assert c in cols, f'ALERT: existing column {c} is missing!'
print('Migration verified — all columns present, no existing columns dropped')
"
```
## Step 4: Run Backfill
```bash
cd /opt/recon && source venv/bin/activate
# Dry run first
python3 recon.py assign-categories --backfill --dry-run
```
**STOP.** Verify dry-run output distribution roughly matches investigation benchmarks:
- ~94.8% `assigned` (clear winners)
- ~5.2% `tied_pass_1` (ties)
- ~19.5% `needs_reprocess` (missing/legacy concepts)
If the distribution deviates more than 5 percentage points from these benchmarks,
halt and investigate. Do not proceed until the deviation is explained.
```bash
# Execute pass 1
python3 recon.py assign-categories --backfill
```
**STOP.** Spot-check 20 random assigned documents:
```bash
python3 -c "
from lib.status import StatusDB
db = StatusDB()
rows = db._get_conn().execute(
\"SELECT d.hash, d.recon_domain FROM documents d WHERE d.recon_domain_status = 'assigned' ORDER BY RANDOM() LIMIT 20\"
).fetchall()
for r in rows:
print(r['hash'][:12], r['recon_domain'])
"
```
For each, visually verify against concept files: `ls data/concepts/{hash}/` and
spot-check one `window_*.json` to confirm the assigned domain is plausible.
Halt if any are wildly wrong. See Rollback: "Clear wrong backfill assignments" below.
```bash
# Run tiebreaker pass
python3 recon.py assign-categories --tiebreaker-pass
```
**STOP.** Verify tiebreaker results:
```bash
python3 -c "
from lib.status import StatusDB
db = StatusDB()
c = db.get_domain_status_counts()
print('Status breakdown:', c)
print()
print('tied_pass_2 (resolved):', c.get('tied_pass_2', 0))
print('tied_manual (needs review):', c.get('tied_manual', 0))
"
```
Spot-check 5 `tied_pass_2` items — verify the resolved domain is plausible given
the channel's other content.
```bash
# Check overall status
python3 recon.py assign-categories
```
## Step 5: Push to PeerTube
Push in stages. Do NOT push all at once.
```bash
# Dry run: confirm count
python3 recon.py assign-categories --push-pending --dry-run
# Stage 1: push 100 items
python3 recon.py assign-categories --push-pending --limit 100
```
**STOP.** Verify in PeerTube UI (stream.echo6.co admin, or via API) that 100 videos
now show RECON domain categories. Spot-check 5 videos.
```bash
# Verify via API: pick a random pushed video
python3 -c "
from lib.status import StatusDB
db = StatusDB()
row = db._get_conn().execute(
\"SELECT d.recon_domain, c.path FROM documents d LEFT JOIN catalogue c ON d.hash = c.hash WHERE d.peertube_category_pushed_at IS NOT NULL ORDER BY RANDOM() LIMIT 1\"
).fetchone()
if row:
uuid = row['path'].rsplit('/w/', 1)[-1] if row['path'] and '/w/' in row['path'] else '?'
print(f'Domain: {row[\"recon_domain\"]} UUID: {uuid}')
print(f'Check: curl -s http://192.168.1.170:9000/api/v1/videos/{uuid} -H \"Host: stream.echo6.co\" | python3 -m json.tool | grep category')
"
```
```bash
# Stage 2: push 1000 items
python3 recon.py assign-categories --push-pending --limit 1000
```
**STOP.** Verify via PeerTube database:
```bash
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
```
```bash
# Stage 3: push remaining
python3 recon.py assign-categories --push-pending
```
## Step 6: Verify
```bash
# Check PeerTube database directly
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT category, count(*) FROM video WHERE category >= 100 GROUP BY category ORDER BY count DESC"'
# Check uncategorized
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod -c "SELECT count(*) FROM video WHERE category IS NULL"'
# Check RECON status
python3 recon.py assign-categories
```
## Step 7: Reprocess Missing Items (SEPARATE POST-DEPLOY OPERATION)
**WARNING:** This step deletes concept directories. It is the only destructive
operation in the entire feature. Run it separately from the initial deploy,
after all other steps are verified and stable.
```bash
# Dry run first — review what would be deleted
python3 recon.py assign-categories --reprocess-missing --dry-run --limit 10
```
**STOP.** Review output. Verify concept dirs listed are genuinely stale (legacy
domains only, or missing concept files). The dry-run reports file counts for
each directory that would be deleted.
```bash
# Small batch
python3 recon.py assign-categories --reprocess-missing --limit 10
```
**STOP.** Verify: check that 10 items re-entered the pipeline.
```bash
python3 recon.py status # queued count should increase by ~10
```
Wait for pipeline to process them. Verify domain assignment on completion:
```bash
# Check these specific items got re-enriched and assigned
python3 recon.py assign-categories
```
```bash
# Scale up
python3 recon.py assign-categories --reprocess-missing --limit 100
# Then unbounded
python3 recon.py assign-categories --reprocess-missing
```
**Note on interrupts:** If `--reprocess-missing` is interrupted mid-run, re-running
it is safe. Any documents stranded at `status='catalogued'` without being re-queued
can be recovered with `recon.py queue --source stream.echo6.co`.
## Step 8: Dashboard Review
Navigate to `https://recon.echo6.co/peertube/review` to review `tied_manual` items.
Each row shows the video, channel, tied domains, and concept counts. Select the
correct domain and click Assign.
---
## Rollback Procedures
### Plugin install fails or breaks PeerTube
```bash
# Disable plugin without uninstalling
ssh root@192.168.1.243 'pct exec 110 -- bash -c "
mv /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains \
/var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains.disabled
systemctl restart peertube
"'
# Verify PeerTube is healthy
curl -s http://192.168.1.170:9000/api/v1/videos/categories -H "Host: stream.echo6.co" | python3 -m json.tool | head
# To fully remove: use PeerTube admin UI → Plugins → Uninstall
```
### Schema migration revert (drop new columns)
Only needed if the columns cause problems. The columns are nullable and have no
constraints, so they should be inert.
```bash
ssh zvx@192.168.1.130 'cd /opt/recon && source venv/bin/activate && python3 -c "
import sqlite3
conn = sqlite3.connect(\"data/recon.db\")
for col in [\"recon_domain\", \"recon_domain_status\", \"recon_domain_assigned_at\", \"peertube_category_pushed_at\"]:
try:
conn.execute(f\"ALTER TABLE documents DROP COLUMN {col}\")
print(f\"Dropped: {col}\")
except Exception as e:
print(f\"Skip {col}: {e}\")
conn.execute(\"DROP INDEX IF EXISTS idx_documents_recon_domain_status\")
conn.commit()
print(\"Index dropped\")
"'
```
Note: SQLite ALTER TABLE DROP COLUMN requires SQLite 3.35.0+ (2021-03-12).
Ubuntu 24.04 ships 3.45.1 — this is fine.
### Clear wrong backfill assignments (selective or full)
```bash
cd /opt/recon && source venv/bin/activate
# Clear ALL domain assignments
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('''UPDATE documents SET
recon_domain = NULL, recon_domain_status = NULL,
recon_domain_assigned_at = NULL, peertube_category_pushed_at = NULL''')
conn.commit()
print('Cleared all domain assignments')
"
# Clear only tiebreaker results (reset to tied_pass_1 for re-run)
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('''UPDATE documents SET
recon_domain = NULL, recon_domain_status = 'tied_pass_1',
recon_domain_assigned_at = NULL
WHERE recon_domain_status IN ('tied_pass_2', 'tied_manual')''')
conn.commit()
"
```
### Clear wrong PeerTube categories
```bash
# Reset ALL RECON categories (100+) to NULL in PeerTube
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
-c "UPDATE video SET category = NULL WHERE category >= 100"'
# Verify
ssh root@192.168.1.243 'pct exec 110 -- sudo -u postgres psql -d peertube_prod \
-c "SELECT count(*) FROM video WHERE category >= 100"'
# Should return 0
# Also clear RECON pushed timestamps so --push-pending can retry
cd /opt/recon && source venv/bin/activate
python3 -c "
from lib.status import StatusDB
db = StatusDB()
conn = db._get_conn()
conn.execute('UPDATE documents SET peertube_category_pushed_at = NULL WHERE peertube_category_pushed_at IS NOT NULL')
conn.commit()
print('Cleared push timestamps')
"
```
### Restore concepts after failed --reprocess-missing
```bash
# Concept backups are on Contabo at /opt/backups/recon/concepts/
# Identify which hashes were deleted (check RECON logs)
ssh zvx@192.168.1.130 'grep "Deleting concept dir" /opt/recon/logs/recon.log | tail -20'
# Restore specific hash from Contabo
HASH=<hash_from_log>
ssh root@100.64.0.1 "tar -cf - -C /opt/backups/recon/concepts/ $HASH" | \
ssh zvx@192.168.1.130 "tar -xf - -C /opt/recon/data/concepts/"
# Restore ALL concepts (nuclear option)
ssh root@100.64.0.1 'rsync -av /opt/backups/recon/concepts/ zvx@192.168.1.130:/opt/recon/data/concepts/'
```
### Fully remove feature
1. Uninstall plugin from PeerTube admin UI
2. Restart PeerTube
3. Revert RECON code changes (`git checkout master`)
4. Restart RECON
5. Drop schema columns (see above)
6. Reset PeerTube categories (see above)

View file

@ -1,160 +0,0 @@
"""
RECON Address Book YAML-backed saved-location lookup.
Provides named locations (home, work, etc.) that short-circuit Photon
geocoding when an exact alias match is found.
Config: /opt/recon/config/address_book.yaml
"""
import os
import re
import threading
import yaml
from .utils import setup_logging
logger = setup_logging('recon.address_book')
_CONFIG_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'config', 'address_book.yaml',
)
_lock = threading.Lock()
_entries: list[dict] = []
_mtime: float = 0.0
def _reload_if_changed():
"""Reload the YAML file if its mtime has changed."""
global _entries, _mtime
try:
st = os.stat(_CONFIG_PATH)
except FileNotFoundError:
logger.warning("Address book not found: %s", _CONFIG_PATH)
_entries = []
_mtime = 0.0
return
if st.st_mtime == _mtime:
return
with _lock:
# Double-check after acquiring lock
try:
st = os.stat(_CONFIG_PATH)
except FileNotFoundError:
_entries = []
_mtime = 0.0
return
if st.st_mtime == _mtime:
return
with open(_CONFIG_PATH, 'r') as f:
data = yaml.safe_load(f) or {}
raw = data.get('entries', [])
loaded = []
for entry in raw:
# Normalise aliases to lowercase for matching
aliases = [a.lower() for a in entry.get('aliases', [])]
loaded.append({
'id': entry.get('id', ''),
'name': entry.get('name', ''),
'aliases': aliases,
'address': entry.get('address', ''),
'lat': entry.get('lat'),
'lon': entry.get('lon'),
'tags': entry.get('tags', []),
})
_entries = loaded
_mtime = st.st_mtime
logger.info("Address book loaded: %d entries from %s", len(_entries), _CONFIG_PATH)
def load():
"""Ensure the address book is loaded (and refreshed if the file changed)."""
_reload_if_changed()
return _entries
def _normalize(text: str) -> str:
"""Lowercase, strip, remove commas, collapse whitespace."""
t = text.strip().lower()
t = t.replace(',', ' ')
return ' '.join(t.split())
def lookup(query: str):
"""
Look up a query against name and aliases.
Returns dict with the matching entry plus a 'confidence' field:
- "exact": full name/alias match, OR query starts with alias + word boundary
- "partial": alias starts with query + word boundary, or alias appears
as a contiguous token sequence inside the query
- None if no match
Matching order (first exact wins, else first partial):
1. normalized(query) == normalized(name or alias) exact
2. normalized(query) starts with normalized(alias) + " " exact
3. normalized(alias) starts with normalized(query) + " " partial
4. normalized(alias) is a contiguous token sub-sequence partial
"""
_reload_if_changed()
q = _normalize(query)
if not q:
return None
first_exact = None
first_partial = None
for entry in _entries:
norm_name = _normalize(entry['name'])
check_aliases = [_normalize(a) for a in entry.get('aliases', [])]
all_forms = [norm_name] + check_aliases
for form in all_forms:
if not form:
continue
# Rule 1: exact match
if q == form:
return {**entry, 'confidence': 'exact'}
# Rule 2: query starts with alias + word boundary
if q.startswith(form + ' '):
if first_exact is None:
first_exact = entry
continue
# Rule 3: alias starts with query (user still typing)
if form.startswith(q) and len(q) < len(form):
if first_partial is None:
first_partial = entry
continue
# Rule 4: alias is contiguous token sub-sequence in query
# Build regex: token1\s+token2\s+...tokenN
tokens = form.split()
if len(tokens) >= 1:
pattern = r'(?:^|\s)' + r'\s+'.join(re.escape(t) for t in tokens) + r'(?:\s|$)'
if re.search(pattern, q):
if first_partial is None:
first_partial = entry
if first_exact is not None:
return {**first_exact, 'confidence': 'exact'}
if first_partial is not None:
return {**first_partial, 'confidence': 'partial'}
return None
def list_all():
"""Return all address book entries."""
_reload_if_changed()
return list(_entries)

View file

@ -1,31 +0,0 @@
"""
RECON Address Book API Flask Blueprint.
GET /api/address_book/lookup?q=<query> best match or 404
GET /api/address_book/list all entries
"""
from flask import Blueprint, request, jsonify
from . import address_book
address_book_bp = Blueprint('address_book', __name__)
@address_book_bp.route('/api/address_book/lookup')
def api_address_book_lookup():
q = request.args.get('q', '').strip()
if not q:
return jsonify({'error': 'Missing q parameter'}), 400
result = address_book.lookup(q)
if result is None:
return '', 404
return jsonify(result)
@address_book_bp.route('/api/address_book/list')
def api_address_book_list():
entries = address_book.list_all()
return jsonify(entries)

View file

@ -1,91 +0,0 @@
#!/usr/bin/env python3
"""Tests for RECON address book module."""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib import address_book
TESTS = [
# ── Existing tests ──
("lookup('home') → exact",
lambda: address_book.lookup("home"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('Home') → exact (case-insensitive)",
lambda: address_book.lookup("Home"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('214 north st') → exact via alias",
lambda: address_book.lookup("214 north st"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('214 North Street') → exact via alias",
lambda: address_book.lookup("214 North Street"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('nonexistent place') → None",
lambda: address_book.lookup("nonexistent place"),
lambda r: r is None),
("list_all() → 1 entry",
lambda: address_book.list_all(),
lambda r: isinstance(r, list) and len(r) == 1 and r[0]['id'] == 'home'),
# ── New prefix+boundary tests ──
("lookup('214 north st filer') → exact (query starts with alias)",
lambda: address_book.lookup("214 north st filer"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('214 North St Filer ID') → exact (case + trailing state)",
lambda: address_book.lookup("214 North St Filer ID"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('214 north st, filer, id') → exact (commas stripped)",
lambda: address_book.lookup("214 north st, filer, id"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('home today') → exact (short alias + trailing text)",
lambda: address_book.lookup("home today"),
lambda r: r is not None and r['confidence'] == 'exact' and r['id'] == 'home'),
("lookup('214') → partial (query is prefix of alias)",
lambda: address_book.lookup("214"),
lambda r: r is not None and r['confidence'] == 'partial'),
("lookup('214 n') → partial (partial prefix of alias)",
lambda: address_book.lookup("214 n"),
lambda r: r is not None and r['confidence'] == 'partial'),
("lookup('completely unrelated query') → None",
lambda: address_book.lookup("completely unrelated query"),
lambda r: r is None),
("lookup('214 north streets of filer') → None (no word boundary after st)",
lambda: address_book.lookup("214 north streets of filer"),
lambda r: r is None),
]
passed = 0
failed = 0
for name, fn, check in TESTS:
try:
result = fn()
ok = check(result)
except Exception as e:
ok = False
result = f"EXCEPTION: {e}"
status = "PASS" if ok else "FAIL"
if ok:
passed += 1
else:
failed += 1
print(f" [{status}] {name}")
if not ok:
print(f" got: {result}")
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View file

@ -17,16 +17,13 @@ import shutil
import tempfile
import requests as http_requests
from flask import Flask, request, jsonify, redirect, render_template, make_response
from flask import Flask, request, jsonify, redirect, render_template
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
from werkzeug.utils import secure_filename
from .utils import get_config, content_hash, clean_filename_to_title, derive_source_and_category, generate_download_url, setup_logging
from .status import StatusDB
from .deployment_config import get_deployment_config
from .place_detail import get_place_detail, get_place_by_wikidata
from .landclass import lookup_landclass, format_summary
logger = setup_logging('recon.api')
@ -60,19 +57,9 @@ class _LargeZimRequest(_FlaskRequest):
return super()._get_file_stream(total_content_length, content_type, filename, content_length)
app.request_class = _LargeZimRequest
# ── Address Book Blueprint ──
from .address_book_api import address_book_bp
app.register_blueprint(address_book_bp)
# ── Contacts Blueprint ──
from .contacts_api import contacts_bp
app.register_blueprint(contacts_bp)
# ── Netsyms + Geocode Blueprints ──
from .netsyms_api import netsyms_bp, geocode_bp
# ── Netsyms Blueprint ──
from .netsyms_api import netsyms_bp
app.register_blueprint(netsyms_bp)
app.register_blueprint(geocode_bp)
# ── Navigation Constants ──
@ -88,7 +75,6 @@ KNOWLEDGE_SUBNAV = [
PEERTUBE_SUBNAV = [
{'href': '/peertube', 'label': 'Dashboard'},
{'href': '/peertube/channels', 'label': 'Channels'},
{'href': '/peertube/review', 'label': 'Review'},
]
@ -103,12 +89,6 @@ SETTINGS_SUBNAV = [
{'href': '/settings/health', 'label': 'Service Health'},
]
NAVI_SUBNAV = [
{'href': '/nav-i', 'label': 'Overview'},
{'href': '/deleted-contacts', 'label': 'Deleted Contacts'},
{'href': '/nav-i/api-keys', 'label': 'API Keys'},
]
def _format_source_citation(payload):
"""Format a human-readable citation from a search result payload."""
@ -335,36 +315,6 @@ def failures_page():
failures=failures)
@app.route("/deleted-contacts")
def deleted_contacts_page():
from .auth import get_user_id
from .contacts import ContactsDB
user_id = get_user_id() or "anonymous"
db = ContactsDB()
contacts = db.list_deleted(user_id)
return render_template("navi/deleted_contacts.html",
domain="navi", subnav=NAVI_SUBNAV, active_page="/deleted-contacts",
contacts=contacts)
@app.route("/nav-i")
def navi_landing_page():
from .auth import get_user_id
from .contacts import ContactsDB
user_id = get_user_id() or "anonymous"
db = ContactsDB()
deleted_count = len(db.list_deleted(user_id))
return render_template("navi/landing.html",
domain="navi", subnav=NAVI_SUBNAV, active_page="/nav-i",
deleted_count=deleted_count)
@app.route("/nav-i/api-keys")
def navi_api_keys_page():
return render_template("navi/api_keys.html",
domain="navi", subnav=NAVI_SUBNAV, active_page="/nav-i/api-keys")
@app.route('/peertube')
def peertube_dashboard():
return render_template('peertube/dashboard.html',
@ -377,86 +327,6 @@ def peertube_channels():
domain='peertube', subnav=PEERTUBE_SUBNAV, active_page='/peertube/channels')
@app.route('/peertube/review')
def peertube_review():
from .recon_domains import VALID_DOMAINS
return render_template('peertube/review.html',
domain='peertube', subnav=PEERTUBE_SUBNAV,
active_page='/peertube/review',
valid_domains=sorted(VALID_DOMAINS))
@app.route('/api/peertube/review/stats')
def api_peertube_review_stats():
db = StatusDB()
counts = db.get_domain_status_counts()
return jsonify(counts)
@app.route('/api/peertube/review/items')
def api_peertube_review_items():
from .domain_assigner import _count_domains_from_qdrant, _get_qdrant_client
db = StatusDB()
config = get_config()
items = db.get_items_by_domain_status('tied_manual', limit=200)
qdrant = _get_qdrant_client(config)
collection = config['vector_db']['collection']
result = []
for item in items:
file_hash = item['hash']
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
top_domains = [{'domain': d, 'count': cnt}
for d, cnt in domain_counter.most_common(5)]
result.append({
'hash': file_hash,
'filename': item.get('filename', ''),
'category': item.get('category', ''),
'recon_domain': item.get('recon_domain'),
'recon_domain_status': item.get('recon_domain_status'),
'top_domains': top_domains,
})
return jsonify(result)
@app.route('/api/peertube/review/assign', methods=['POST'])
def api_peertube_review_assign():
from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP
from .peertube_writer import push_category, extract_uuid
data = request.get_json()
file_hash = data.get('hash')
domain = data.get('domain')
if not file_hash or not domain:
return jsonify({'ok': False, 'error': 'Missing hash or domain'}), 400
if domain not in VALID_DOMAINS:
return jsonify({'ok': False, 'error': f'Invalid domain: {domain}'}), 400
db = StatusDB()
config = get_config()
db.set_domain_assignment(file_hash, domain, 'manual_assigned')
# Push to PeerTube
conn = db._get_conn()
cat_row = conn.execute(
"SELECT path FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if cat_row:
uuid = extract_uuid(dict(cat_row)['path'])
if uuid:
cat_id = DOMAIN_CATEGORY_MAP[domain]
try:
push_category(uuid, cat_id, config)
db.set_peertube_pushed(file_hash)
except Exception as e:
return jsonify({'ok': True, 'warning': f'Assigned but PeerTube push failed: {e}'})
return jsonify({'ok': True, 'domain': domain})
@app.route('/settings/keys')
def settings_keys():
from lib.key_manager import get_key_manager
@ -1289,82 +1159,6 @@ def api_knowledge_stats():
return jsonify(_cache['knowledge_stats'])
@app.route('/api/traffic/flow/<int:z>/<int:x>/<int:y>.png')
def api_traffic_flow(z, x, y):
"""Proxy TomTom traffic flow tiles to hide API key from frontend."""
key = os.environ.get('TOMTOM_API_KEY')
if not key:
return 'Traffic service not configured', 503
url = f'https://api.tomtom.com/traffic/map/4/tile/flow/relative/{z}/{x}/{y}.png?key={key}'
try:
resp = http_requests.get(url, timeout=10)
if resp.status_code != 200:
return 'Upstream error', 502
r = make_response(resp.content)
r.headers['Content-Type'] = 'image/png'
r.headers['Cache-Control'] = 'public, max-age=120'
return r
except Exception:
return 'Upstream timeout', 504
@app.route('/api/place/<osm_type>/<int:osm_id>')
def api_place_detail(osm_type, osm_id):
"""Proxy place details from local Nominatim or Overpass API."""
result, status = get_place_detail(osm_type, osm_id)
return jsonify(result), status
@app.route("/api/place/wikidata/<wikidata_id>")
def api_place_wikidata(wikidata_id):
"""Fetch place details from Wikidata entity."""
result, status = get_place_by_wikidata(wikidata_id)
return jsonify(result), status
@app.route('/api/landclass')
def api_landclass():
"""PAD-US land classification lookup for a point."""
config = get_deployment_config()
if not config.get('features', {}).get('has_landclass'):
return jsonify({'error': 'Land classification not available'}), 404
try:
lat = float(request.args.get('lat', ''))
lon = float(request.args.get('lon', ''))
except (ValueError, TypeError):
return jsonify({'error': 'lat and lon required as numbers'}), 400
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
return jsonify({'error': 'lat must be -90..90, lon must be -180..180'}), 400
classifications = lookup_landclass(lat, lon)
is_public = len(classifications) > 0
is_private = len(classifications) == 0
summary = format_summary(classifications)
return jsonify({
'lat': lat,
'lon': lon,
'classifications': classifications,
'count': len(classifications),
'is_public': is_public,
'is_private': is_private,
'summary': summary,
})
@app.route('/api/config')
def api_config():
"""Return deployment profile config for frontend consumption."""
config = get_deployment_config()
resp = jsonify(config)
resp.headers['Cache-Control'] = 'public, max-age=300'
return resp
@app.route('/api/health')
def api_health():
"""Health check endpoint for monitoring."""
@ -1526,60 +1320,6 @@ def api_keys_reload():
# ── Nav-I API Key Admin ──
@app.route('/api/nav-i/api-keys/list', methods=['GET'])
def navi_api_keys_list():
from .api_keys_admin import list_keys
return jsonify({'keys': list_keys()})
@app.route('/api/nav-i/api-keys/update', methods=['POST'])
def navi_api_keys_update():
from .auth import require_auth
from .api_keys_admin import update_key, update_gemini_key
data = request.get_json(force=True)
name = data.get('name', '')
new_value = data.get('new_value', '')
index = data.get('index') # optional, for Gemini key replacement
if not name or not new_value:
return jsonify({'error': 'name and new_value required'}), 400
if name == 'GEMINI_KEY' and index is not None:
result = update_gemini_key(int(index), new_value)
else:
result = update_key(name, new_value)
if result.get('success'):
return jsonify(result)
return jsonify(result), 400
@app.route('/api/nav-i/api-keys/test', methods=['POST'])
def navi_api_keys_test():
from .api_keys_admin import test_key
data = request.get_json(force=True)
name = data.get('name', '')
index = data.get('index') # optional, for testing specific Gemini key
if not name:
return jsonify({'error': 'name required'}), 400
result = test_key(name, index=int(index) if index is not None else None)
return jsonify(result)
@app.route('/api/nav-i/api-keys/restart-recon', methods=['POST'])
def navi_api_keys_restart():
import subprocess
try:
result = subprocess.run(
['sudo', 'systemctl', 'restart', 'recon'],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
return jsonify({'success': True, 'note': 'RECON service restarted'})
return jsonify({'success': False, 'error': result.stderr.strip()}), 500
except subprocess.TimeoutExpired:
return jsonify({'success': False, 'error': 'Restart timed out'}), 500
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ── YouTube Cookie Management ──
@ -2785,21 +2525,3 @@ def api_metrics_history():
return jsonify({'type': metric_type, 'hours': hours, 'points': points})
except Exception as e:
return jsonify({'type': metric_type, 'hours': hours, 'points': [], 'error': str(e)})
# ── Auth state endpoint ─────────────────────────────────────────────────────
# Returns current auth state for frontend consumption.
# This endpoint must be behind Caddy forward_auth to receive X-Authentik-* headers.
@app.route('/api/auth/whoami')
def api_auth_whoami():
"""Return auth state for frontend. Behind forward_auth, so headers are present when authenticated."""
username = request.headers.get('X-Authentik-Username')
if username:
return jsonify({
'authenticated': True,
'username': username,
})
return jsonify({
'authenticated': False,
'username': None,
})

View file

@ -1,358 +0,0 @@
"""
Nav-I API Keys Admin unified view/update/test for third-party API keys.
Manages three provider categories:
- Gemini (multiple keys via KeyManager singleton)
- TomTom (single key in .env)
- Google Places (single key in .env)
All key values are masked in responses. Full values never leave the server
except as user-supplied input on update.
"""
import os
import re
import shutil
import tempfile
import time
import requests as http_requests
from .utils import setup_logging
logger = setup_logging('recon.api_keys_admin')
ENV_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')
# Key definitions: env_name → display metadata
_KEY_DEFS = {
'TOMTOM_API_KEY': {
'display_name': 'TomTom',
'provider': 'tomtom',
},
'GOOGLE_PLACES_API_KEY': {
'display_name': 'Google Places',
'provider': 'google_places',
},
}
# ── .env read/write helpers ─────────────────────────────────────────────
def _read_env():
"""Read .env file into a dict of key=value pairs, preserving order."""
entries = [] # list of (key, value, raw_line) — preserves order and comments
if not os.path.exists(ENV_PATH):
return entries
with open(ENV_PATH, 'r') as f:
for line in f:
raw = line.rstrip('\n')
stripped = raw.strip()
if not stripped or stripped.startswith('#'):
entries.append((None, None, raw))
continue
m = re.match(r'^([A-Za-z_][A-Za-z0-9_]*)=(.*)$', stripped)
if m:
entries.append((m.group(1), m.group(2).strip().strip('"').strip("'"), raw))
else:
entries.append((None, None, raw))
return entries
def _write_env(entries):
"""Atomically write .env from entries list. Backs up to .env.bak first."""
# Backup current .env
if os.path.exists(ENV_PATH):
bak_path = ENV_PATH + '.bak'
shutil.copy2(ENV_PATH, bak_path)
# Write to temp file, then rename (atomic on same filesystem)
fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(ENV_PATH), prefix='.env.', suffix='.tmp')
try:
with os.fdopen(fd, 'w') as f:
for key, value, raw in entries:
if key is not None:
f.write(f'{key}={value}\n')
else:
f.write(raw + '\n')
os.rename(tmp_path, ENV_PATH)
except Exception:
# Clean up temp file on failure
try:
os.unlink(tmp_path)
except OSError:
pass
raise
logger.info(f"Wrote .env atomically ({len([e for e in entries if e[0]])} keys)")
def _get_env_value(name):
"""Get a single value from .env by key name."""
for key, value, _ in _read_env():
if key == name:
return value
return None
def _set_env_value(name, new_value):
"""Set a single value in .env. Adds if not present."""
entries = _read_env()
found = False
for i, (key, value, raw) in enumerate(entries):
if key == name:
entries[i] = (name, new_value, f'{name}={new_value}')
found = True
break
if not found:
entries.append((name, new_value, f'{name}={new_value}'))
_write_env(entries)
# ── Masking ─────────────────────────────────────────────────────────────
def _mask_key(value):
"""Mask a key: first 4 chars + '...' + last 4 chars. Never return full value."""
if not value:
return None
if len(value) <= 8:
return '****'
return value[:4] + '...' + value[-4:]
# ── List ────────────────────────────────────────────────────────────────
def list_keys():
"""
Return masked status of all managed API keys.
Returns list of dicts with: name, display_name, provider, masked_value,
is_set, count (for multi-key providers like Gemini).
"""
result = []
env_mtime = None
if os.path.exists(ENV_PATH):
env_mtime = time.strftime('%Y-%m-%dT%H:%M:%SZ',
time.gmtime(os.path.getmtime(ENV_PATH)))
# Gemini keys (via KeyManager)
from .key_manager import get_key_manager
km = get_key_manager()
gemini_keys = km.get_masked_keys()
gemini_count = len(gemini_keys)
# Show a single summary entry for Gemini with count
first_masked = gemini_keys[0]['masked'] if gemini_keys else None
result.append({
'name': 'GEMINI_KEY',
'display_name': 'Gemini',
'provider': 'gemini',
'masked_value': first_masked,
'is_set': gemini_count > 0,
'count': gemini_count,
'last_modified': env_mtime,
'keys': gemini_keys, # full list with per-key stats
})
# Single-value keys
for env_name, meta in _KEY_DEFS.items():
value = _get_env_value(env_name)
result.append({
'name': env_name,
'display_name': meta['display_name'],
'provider': meta['provider'],
'masked_value': _mask_key(value),
'is_set': bool(value),
'count': 1 if value else 0,
'last_modified': env_mtime,
})
return result
# ── Update ──────────────────────────────────────────────────────────────
def update_key(name, new_value):
"""
Update a key value. For Gemini, name should be 'GEMINI_KEY' with an
optional 'index' for replacing a specific key, or use the KeyManager API.
For TomTom/Google Places, writes directly to .env.
Returns dict with success status and masked value.
"""
new_value = new_value.strip()
if not new_value:
return {'success': False, 'error': 'Key value cannot be empty'}
if name == 'GEMINI_KEY':
# Use KeyManager for Gemini
from .key_manager import get_key_manager
km = get_key_manager()
try:
idx = km.add_gemini_key(new_value)
return {
'success': True,
'name': name,
'masked_value': _mask_key(new_value),
'action': 'added',
'index': idx,
}
except ValueError as e:
return {'success': False, 'error': str(e)}
if name in _KEY_DEFS:
_set_env_value(name, new_value)
return {
'success': True,
'name': name,
'masked_value': _mask_key(new_value),
'action': 'updated',
}
return {'success': False, 'error': f'Unknown key: {name}'}
def update_gemini_key(index, new_value):
"""Replace a specific Gemini key by index."""
new_value = new_value.strip()
if not new_value:
return {'success': False, 'error': 'Key value cannot be empty'}
from .key_manager import get_key_manager
km = get_key_manager()
try:
km.replace_gemini_key(index, new_value)
return {
'success': True,
'name': 'GEMINI_KEY',
'index': index,
'masked_value': _mask_key(new_value),
'action': 'replaced',
}
except (ValueError, IndexError) as e:
return {'success': False, 'error': str(e)}
# ── Test ────────────────────────────────────────────────────────────────
def test_key(name, index=None):
"""
Test a key against its provider API using the current .env value.
Returns dict with: success, latency_ms, error, note.
"""
if name == 'GEMINI_KEY':
return _test_gemini(index)
elif name == 'TOMTOM_API_KEY':
return _test_tomtom()
elif name == 'GOOGLE_PLACES_API_KEY':
return _test_google_places()
else:
return {'success': False, 'error': f'Unknown key: {name}', 'latency_ms': 0}
def _test_gemini(index=None):
"""Test Gemini key by listing models."""
from .key_manager import get_key_manager
km = get_key_manager()
if index is not None:
key = km.get_gemini_key(index)
if not key:
return {'success': False, 'error': f'Gemini key index {index} not found', 'latency_ms': 0}
else:
key = km.get_gemini_key(0)
if not key:
return {'success': False, 'error': 'No Gemini keys configured', 'latency_ms': 0}
t0 = time.time()
try:
resp = http_requests.get(
f"https://generativelanguage.googleapis.com/v1beta/models?key={key}",
timeout=10
)
latency = int((time.time() - t0) * 1000)
if resp.status_code == 200 and 'models' in resp.text:
return {'success': True, 'latency_ms': latency, 'error': None,
'note': 'Models list returned successfully'}
elif resp.status_code == 403:
return {'success': False, 'latency_ms': latency,
'error': 'Key disabled or quota exhausted'}
elif resp.status_code == 429:
return {'success': True, 'latency_ms': latency, 'error': None,
'note': 'Valid key — currently rate-limited'}
else:
return {'success': False, 'latency_ms': latency,
'error': f'HTTP {resp.status_code}'}
except Exception as e:
latency = int((time.time() - t0) * 1000)
return {'success': False, 'latency_ms': latency, 'error': str(e)}
def _test_tomtom():
"""Test TomTom key with a minimal geocode request."""
key = _get_env_value('TOMTOM_API_KEY')
if not key:
return {'success': False, 'error': 'TOMTOM_API_KEY not set', 'latency_ms': 0}
t0 = time.time()
try:
resp = http_requests.get(
f"https://api.tomtom.com/search/2/geocode/Boise.json",
params={'key': key, 'limit': 1},
timeout=10
)
latency = int((time.time() - t0) * 1000)
if resp.status_code == 200:
data = resp.json()
count = data.get('summary', {}).get('totalResults', 0)
return {'success': True, 'latency_ms': latency, 'error': None,
'note': f'Geocode returned {count} result(s)'}
elif resp.status_code == 403:
return {'success': False, 'latency_ms': latency,
'error': 'Invalid or expired key'}
else:
return {'success': False, 'latency_ms': latency,
'error': f'HTTP {resp.status_code}'}
except Exception as e:
latency = int((time.time() - t0) * 1000)
return {'success': False, 'latency_ms': latency, 'error': str(e)}
def _test_google_places():
"""Test Google Places (New) API key with a minimal searchText request."""
key = _get_env_value('GOOGLE_PLACES_API_KEY')
if not key:
return {'success': False, 'error': 'GOOGLE_PLACES_API_KEY not set', 'latency_ms': 0}
t0 = time.time()
try:
resp = http_requests.post(
"https://places.googleapis.com/v1/places:searchText",
json={'textQuery': 'Boise Idaho', 'maxResultCount': 1},
headers={
'X-Goog-Api-Key': key,
'X-Goog-FieldMask': 'places.displayName',
},
timeout=10
)
latency = int((time.time() - t0) * 1000)
if resp.status_code == 200:
data = resp.json()
count = len(data.get('places', []))
return {'success': True, 'latency_ms': latency, 'error': None,
'note': f'searchText returned {count} place(s)'}
elif resp.status_code == 403:
return {'success': False, 'latency_ms': latency,
'error': 'Key not authorized for Places API (New)'}
elif resp.status_code == 429:
return {'success': True, 'latency_ms': latency, 'error': None,
'note': 'Valid key — quota exceeded'}
else:
body = resp.text[:200]
return {'success': False, 'latency_ms': latency,
'error': f'HTTP {resp.status_code}: {body}'}
except Exception as e:
latency = int((time.time() - t0) * 1000)
return {'success': False, 'latency_ms': latency, 'error': str(e)}

View file

@ -1,230 +0,0 @@
"""
RECON Contacts Database per-user phone book with soft delete and proximity queries.
Separate DB at data/contacts.db. Thread-local connections with WAL mode (StatusDB pattern).
"""
import math
import os
import sqlite3
import threading
from datetime import datetime, timezone
_local = threading.local()
_SCHEMA = """
CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
label TEXT NOT NULL,
name TEXT,
call_sign TEXT,
phone TEXT,
email TEXT,
category TEXT,
notes TEXT,
lat REAL,
lon REAL,
osm_type TEXT,
osm_id INTEGER,
address TEXT,
show_proximity INTEGER DEFAULT 0,
created_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
updated_at TEXT DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
deleted_at TEXT,
deleted_by TEXT
);
CREATE INDEX IF NOT EXISTS idx_contacts_user ON contacts(user_id);
CREATE INDEX IF NOT EXISTS idx_contacts_user_category ON contacts(user_id, category);
CREATE INDEX IF NOT EXISTS idx_contacts_user_deleted ON contacts(user_id, deleted_at);
CREATE INDEX IF NOT EXISTS idx_contacts_geo ON contacts(lat, lon);
CREATE UNIQUE INDEX IF NOT EXISTS idx_contacts_home_work
ON contacts(user_id, label)
WHERE label IN ('Home', 'Work') AND deleted_at IS NULL;
"""
def _haversine_m(lat1, lon1, lat2, lon2):
"""Haversine distance in meters."""
R = 6_371_000
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _row_to_dict(row):
"""Convert sqlite3.Row to dict, casting show_proximity to bool."""
d = dict(row)
d['show_proximity'] = bool(d.get('show_proximity', 0))
return d
class ContactsDB:
def __init__(self, db_path=None):
if db_path is None:
db_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data', 'contacts.db')
self.db_path = db_path
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self._init_db()
def _get_conn(self):
if not hasattr(_local, 'contacts_conn') or _local.contacts_conn is None:
_local.contacts_conn = sqlite3.connect(self.db_path, timeout=30)
_local.contacts_conn.row_factory = sqlite3.Row
_local.contacts_conn.execute("PRAGMA journal_mode=WAL")
_local.contacts_conn.execute("PRAGMA busy_timeout=5000")
return _local.contacts_conn
def _init_db(self):
conn = self._get_conn()
conn.executescript(_SCHEMA)
conn.commit()
def list_all(self, user_id, category=None, search=None):
conn = self._get_conn()
sql = "SELECT * FROM contacts WHERE user_id = ? AND deleted_at IS NULL"
params = [user_id]
if category:
sql += " AND category = ?"
params.append(category)
if search:
sql += " AND (label LIKE ? OR name LIKE ? OR call_sign LIKE ? OR phone LIKE ?)"
like = f"%{search}%"
params.extend([like, like, like, like])
sql += " ORDER BY label"
return [_row_to_dict(r) for r in conn.execute(sql, params).fetchall()]
def list_deleted(self, user_id):
conn = self._get_conn()
rows = conn.execute(
"SELECT * FROM contacts WHERE user_id = ? AND deleted_at IS NOT NULL ORDER BY deleted_at DESC",
(user_id,)
).fetchall()
return [_row_to_dict(r) for r in rows]
def get(self, user_id, contact_id, include_deleted=False):
conn = self._get_conn()
sql = "SELECT * FROM contacts WHERE id = ? AND user_id = ?"
if not include_deleted:
sql += " AND deleted_at IS NULL"
row = conn.execute(sql, (contact_id, user_id)).fetchone()
return _row_to_dict(row) if row else None
def create(self, user_id, **fields):
conn = self._get_conn()
fields.pop('id', None)
fields.pop('user_id', None)
fields.pop('created_at', None)
fields.pop('updated_at', None)
fields.pop('deleted_at', None)
fields.pop('deleted_by', None)
if 'show_proximity' in fields:
fields['show_proximity'] = 1 if fields['show_proximity'] else 0
columns = ['user_id'] + list(fields.keys())
placeholders = ', '.join(['?'] * len(columns))
col_str = ', '.join(columns)
values = [user_id] + list(fields.values())
try:
cur = conn.execute(f"INSERT INTO contacts ({col_str}) VALUES ({placeholders})", values)
conn.commit()
return self.get(user_id, cur.lastrowid), None
except sqlite3.IntegrityError:
return None, 'conflict'
def update(self, user_id, contact_id, **fields):
conn = self._get_conn()
fields.pop('id', None)
fields.pop('user_id', None)
fields.pop('created_at', None)
fields.pop('deleted_at', None)
fields.pop('deleted_by', None)
if 'show_proximity' in fields:
fields['show_proximity'] = 1 if fields['show_proximity'] else 0
fields['updated_at'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
sets = ', '.join(f"{k} = ?" for k in fields)
values = list(fields.values()) + [contact_id, user_id]
conn.execute(f"UPDATE contacts SET {sets} WHERE id = ? AND user_id = ? AND deleted_at IS NULL", values)
conn.commit()
return self.get(user_id, contact_id)
def soft_delete(self, user_id, contact_id):
conn = self._get_conn()
now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
conn.execute(
"UPDATE contacts SET deleted_at = ?, deleted_by = ? WHERE id = ? AND user_id = ? AND deleted_at IS NULL",
(now, user_id, contact_id, user_id)
)
conn.commit()
return self.get(user_id, contact_id, include_deleted=True)
def restore(self, user_id, contact_id):
conn = self._get_conn()
row = self.get(user_id, contact_id, include_deleted=True)
if not row or not row.get('deleted_at'):
return None, 'not_found'
if row.get('label') in ('Home', 'Work'):
existing = conn.execute(
"SELECT id FROM contacts WHERE user_id = ? AND label = ? AND deleted_at IS NULL AND id != ?",
(user_id, row['label'], contact_id)
).fetchone()
if existing:
return None, 'conflict'
conn.execute(
"UPDATE contacts SET deleted_at = NULL, deleted_by = NULL WHERE id = ? AND user_id = ?",
(contact_id, user_id)
)
conn.commit()
return self.get(user_id, contact_id), None
def restore_as(self, user_id, contact_id, new_label):
"""Restore a soft-deleted contact with a new label (for Home/Work conflict resolution)."""
conn = self._get_conn()
row = self.get(user_id, contact_id, include_deleted=True)
if not row or not row.get('deleted_at'):
return None, 'not_found'
if not new_label or not new_label.strip():
return None, 'invalid_label'
now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%fZ')
try:
conn.execute(
"UPDATE contacts SET deleted_at = NULL, deleted_by = NULL, label = ?, updated_at = ? WHERE id = ? AND user_id = ?",
(new_label.strip(), now, contact_id, user_id)
)
conn.commit()
except sqlite3.IntegrityError:
return None, 'conflict'
return self.get(user_id, contact_id), None
def purge(self, user_id, contact_id):
conn = self._get_conn()
row = self.get(user_id, contact_id, include_deleted=True)
if not row:
return False, 'not_found'
if not row.get('deleted_at'):
return False, 'not_deleted'
conn.execute("DELETE FROM contacts WHERE id = ? AND user_id = ?", (contact_id, user_id))
conn.commit()
return True, None
def find_nearby(self, user_id, lat, lon, radius_m=75):
conn = self._get_conn()
# Bounding box pre-filter (~111km per degree lat)
dlat = radius_m / 111_000
dlon = radius_m / (111_000 * math.cos(math.radians(lat)))
rows = conn.execute(
"""SELECT * FROM contacts
WHERE user_id = ? AND deleted_at IS NULL AND show_proximity = 1
AND lat BETWEEN ? AND ? AND lon BETWEEN ? AND ?""",
(user_id, lat - dlat, lat + dlat, lon - dlon, lon + dlon)
).fetchall()
results = []
for r in rows:
dist = _haversine_m(lat, lon, r['lat'], r['lon'])
if dist <= radius_m:
d = _row_to_dict(r)
d['distance_m'] = round(dist, 1)
results.append(d)
results.sort(key=lambda x: x['distance_m'])
return results

View file

@ -1,132 +0,0 @@
"""
RECON Contacts API Flask Blueprint.
Per-user phone book with soft delete, restore, purge, and proximity queries.
All endpoints require Authentik forward-auth (X-Authentik-Username header).
"""
from flask import Blueprint, request, jsonify
from .auth import require_auth
from .contacts import ContactsDB
contacts_bp = Blueprint('contacts', __name__)
_db = None
def _get_db():
global _db
if _db is None:
_db = ContactsDB()
return _db
@contacts_bp.route('/api/contacts', methods=['GET'])
@require_auth
def list_contacts():
db = _get_db()
category = request.args.get('category')
search = request.args.get('search')
return jsonify(db.list_all(request.user_id, category=category, search=search))
@contacts_bp.route('/api/contacts', methods=['POST'])
@require_auth
def create_contact():
db = _get_db()
data = request.get_json(force=True)
contact, err = db.create(request.user_id, **data)
if err == 'conflict':
return jsonify({'error': 'You already have a Home/Work contact'}), 409
return jsonify(contact), 201
@contacts_bp.route('/api/contacts/nearby', methods=['GET'])
@require_auth
def nearby_contacts():
db = _get_db()
lat = request.args.get('lat', type=float)
lon = request.args.get('lon', type=float)
radius_m = request.args.get('radius_m', 75, type=float)
if lat is None or lon is None:
return jsonify({'error': 'lat and lon required'}), 400
return jsonify(db.find_nearby(request.user_id, lat, lon, radius_m))
@contacts_bp.route('/api/contacts/deleted', methods=['GET'])
@require_auth
def list_deleted():
db = _get_db()
return jsonify(db.list_deleted(request.user_id))
@contacts_bp.route('/api/contacts/<int:contact_id>', methods=['GET'])
@require_auth
def get_contact(contact_id):
db = _get_db()
contact = db.get(request.user_id, contact_id)
if not contact:
return jsonify({'error': 'Not found'}), 404
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>', methods=['PATCH'])
@require_auth
def update_contact(contact_id):
db = _get_db()
data = request.get_json(force=True)
contact = db.update(request.user_id, contact_id, **data)
if not contact:
return jsonify({'error': 'Not found'}), 404
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>', methods=['DELETE'])
@require_auth
def delete_contact(contact_id):
db = _get_db()
contact = db.soft_delete(request.user_id, contact_id)
if not contact:
return jsonify({'error': 'Not found'}), 404
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>/restore', methods=['POST'])
@require_auth
def restore_contact(contact_id):
db = _get_db()
contact, err = db.restore(request.user_id, contact_id)
if err == 'not_found':
return jsonify({'error': 'Not found'}), 404
if err == 'conflict':
return jsonify({'error': 'You already have a Home/Work contact'}), 409
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>/restore-as', methods=['POST'])
@require_auth
def restore_as_contact(contact_id):
db = _get_db()
data = request.get_json(force=True)
new_label = data.get('label', '').strip()
if not new_label:
return jsonify({'error': 'label is required'}), 400
contact, err = db.restore_as(request.user_id, contact_id, new_label)
if err == 'not_found':
return jsonify({'error': 'Not found'}), 404
if err == 'invalid_label':
return jsonify({'error': 'Invalid label'}), 400
if err == 'conflict':
return jsonify({'error': 'Label conflict'}), 409
return jsonify(contact)
@contacts_bp.route('/api/contacts/<int:contact_id>/purge', methods=['DELETE'])
@require_auth
def purge_contact(contact_id):
db = _get_db()
ok, err = db.purge(request.user_id, contact_id)
if err == 'not_found':
return jsonify({'error': 'Not found'}), 404
if err == 'not_deleted':
return jsonify({'error': 'Contact must be deleted before purging'}), 400
return jsonify({'ok': True})

View file

@ -3,7 +3,15 @@ Deployment profile loader.
Reads RECON_PROFILE env var (default: "home"), loads the matching YAML
from config/profiles/<profile>.yaml, and caches the parsed dict in memory.
Provides get_deployment_config() for use by the /api/config endpoint.
Exposes get_deployment_config() as the in-process accessor for the profile.
Note: its former consumers (the /api/landclass gate, google_places,
place_detail, offroute/router) were all extracted to navi-* services or removed
across cleanups #4#6/#27 — recon has no remaining caller of
get_deployment_config() today; the module is retained per cleanup #1.
(The former /api/config HTTP endpoint that served this dict to the frontend was
removed once navi-config (:8422) took over that route.)
"""
import os
import yaml

View file

@ -1,319 +0,0 @@
"""
RECON Domain Assigner
Computes per-video domain assignments from Qdrant vector payloads.
Two functions, two execution modes:
compute_assignment() pass 1, inline from post-embed hook
run_tiebreaker_pass() batch, resolves ties via channel concept scan
Data source: Qdrant `domain` payload field on concept vectors.
Previously read on-disk concept JSON files; migrated to Qdrant as
single source of truth (2026-04-28).
Status values written to documents.recon_domain_status:
assigned clear winner from pass 1 concept count
tied_pass_1 concept tie, awaiting channel tiebreaker
tied_pass_2 resolved by channel tiebreaker
tied_manual needs human review (dashboard)
no_concepts terminal, zero concept vectors in Qdrant
needs_reprocess transient failure (Qdrant error, etc.)
manual_assigned human override from dashboard
"""
from collections import Counter
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, MatchAny
from .recon_domains import VALID_DOMAINS, DOMAIN_CATEGORY_MAP, MEGA_CHANNEL_SKIP_LIST
from .utils import setup_logging
logger = setup_logging('recon.domain_assigner')
def _get_qdrant_client(config):
"""Create a QdrantClient from RECON config.
Callers should create one client and pass it through rather than
calling this repeatedly.
"""
logger.debug("Creating new QdrantClient (caller did not pass one)")
return QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
def _count_domains_from_qdrant(qdrant, collection, doc_hash):
"""Count valid domain occurrences for a single document from Qdrant.
Scrolls all points matching doc_hash and counts domain values.
Args:
qdrant: QdrantClient instance
collection: Qdrant collection name
doc_hash: Document hash to query
Returns:
Counter of {domain_name: count} for valid domains.
Empty Counter if no points found (never None).
"""
domain_counter = Counter()
offset = None
while True:
results, next_offset = qdrant.scroll(
collection_name=collection,
scroll_filter=Filter(must=[
FieldCondition(key="doc_hash", match=MatchValue(value=doc_hash))
]),
with_payload=["domain"],
with_vectors=False,
limit=200,
offset=offset,
)
for point in results:
dom = point.payload.get('domain')
if isinstance(dom, str) and dom in VALID_DOMAINS:
domain_counter[dom] += 1
elif isinstance(dom, list):
for d in dom:
if isinstance(d, str) and d in VALID_DOMAINS:
domain_counter[d] += 1
if next_offset is None:
break
offset = next_offset
return domain_counter
def _count_domains_from_qdrant_batch(qdrant, collection, doc_hashes):
"""Count valid domain occurrences across multiple documents from Qdrant.
Single scroll with MatchAny filter, with offset pagination for large
result sets.
Args:
qdrant: QdrantClient instance
collection: Qdrant collection name
doc_hashes: List of document hashes to query
Returns:
Counter of {domain_name: count} aggregated across all matching points.
"""
if not doc_hashes:
return Counter()
domain_counter = Counter()
offset = None
while True:
results, next_offset = qdrant.scroll(
collection_name=collection,
scroll_filter=Filter(must=[
FieldCondition(key="doc_hash", match=MatchAny(any=doc_hashes))
]),
with_payload=["domain"],
with_vectors=False,
limit=10000,
offset=offset,
)
for point in results:
dom = point.payload.get('domain')
if isinstance(dom, str) and dom in VALID_DOMAINS:
domain_counter[dom] += 1
elif isinstance(dom, list):
for d in dom:
if isinstance(d, str) and d in VALID_DOMAINS:
domain_counter[d] += 1
if next_offset is None:
break
offset = next_offset
return domain_counter
def compute_assignment(file_hash, db, config, qdrant=None):
"""Compute domain assignment for a single document (pass 1).
Counts domain occurrences across all concept vectors in Qdrant.
If a single domain wins, assigns it. If tied, defers to batch
tiebreaker.
Args:
file_hash: Document hash
db: StatusDB instance
config: RECON config dict
qdrant: Optional QdrantClient (created if not provided)
Returns:
(domain, status) tuple where domain is a string or None,
and status is one of: 'assigned', 'tied_pass_1', 'no_concepts',
'needs_reprocess'
"""
owns_client = False
if qdrant is None:
qdrant = _get_qdrant_client(config)
owns_client = True
collection = config['vector_db']['collection']
try:
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
except Exception as e:
logger.warning(f"Qdrant query failed for {file_hash[:12]}: {e}")
return (None, 'needs_reprocess')
if len(domain_counter) == 0:
return (None, 'no_concepts')
top = domain_counter.most_common(2)
top_domain = top[0][0]
top_count = top[0][1]
if len(top) == 1 or top[1][1] < top_count:
return (top_domain, 'assigned')
# Tie — defer to tiebreaker pass
return (None, 'tied_pass_1')
def _get_tied_domains(qdrant, collection, file_hash):
"""Get the set of domains tied for first place in a document's concepts."""
domain_counter = _count_domains_from_qdrant(qdrant, collection, file_hash)
if not domain_counter:
return []
top = domain_counter.most_common()
if not top:
return []
max_count = top[0][1]
return [dom for dom, cnt in top if cnt == max_count]
def _channel_video_hashes(db, channel_name, exclude_hash=None):
"""Get all document hashes belonging to a PeerTube channel.
Args:
db: StatusDB instance
channel_name: catalogue.category (channel actor name)
exclude_hash: Hash to exclude (the document being resolved)
Returns:
List of document hashes
"""
conn = db._get_conn()
rows = conn.execute(
"SELECT hash FROM catalogue WHERE category = ? AND source = 'stream.echo6.co'",
(channel_name,)
).fetchall()
hashes = [r['hash'] for r in rows]
if exclude_hash:
hashes = [h for h in hashes if h != exclude_hash]
return hashes
def run_tiebreaker_pass(db, config, qdrant=None):
"""Resolve tied domain assignments using channel-level Qdrant analysis.
Processes all documents where recon_domain_status = 'tied_pass_1'.
For each tied document, queries Qdrant for domain counts from all
other videos in the same channel and picks the tied domain with the
highest channel-wide count.
Channels in MEGA_CHANNEL_SKIP_LIST (known non-topical catch-alls) skip
tiebreaking and go straight to 'tied_manual' for dashboard review.
Args:
db: StatusDB instance
config: RECON config dict
qdrant: Optional QdrantClient (created if not provided)
Returns:
Dict with counts: resolved, manual, skipped, errors
"""
owns_client = False
if qdrant is None:
qdrant = _get_qdrant_client(config)
owns_client = True
collection = config['vector_db']['collection']
tied_items = db.get_items_by_domain_status('tied_pass_1')
stats = {'resolved': 0, 'manual': 0, 'skipped': 0, 'errors': 0, 'total': len(tied_items)}
logger.info(f"Tiebreaker pass: {len(tied_items)} items to resolve")
for item in tied_items:
file_hash = item['hash']
channel = item.get('category', '')
try:
tied_domains = _get_tied_domains(qdrant, collection, file_hash)
if not tied_domains:
db.set_domain_assignment(file_hash, None, 'no_concepts')
stats['skipped'] += 1
continue
if len(tied_domains) == 1:
# No longer tied (possibly re-enriched since pass 1)
db.set_domain_assignment(file_hash, tied_domains[0], 'assigned')
stats['resolved'] += 1
continue
# Skip-list check: known non-topical catch-all channels
if channel in MEGA_CHANNEL_SKIP_LIST:
fallback = sorted(tied_domains)[0]
db.set_domain_assignment(file_hash, fallback, 'tied_manual')
stats['manual'] += 1
logger.debug(f" {file_hash[:12]}: skip-list channel '{channel}' → tied_manual")
continue
# Channel tiebreaker: count domains across all other videos in channel
other_hashes = _channel_video_hashes(db, channel, exclude_hash=file_hash)
channel_domain_counts = _count_domains_from_qdrant_batch(
qdrant, collection, other_hashes
)
# Among tied domains only, pick highest channel-wide count
best_domain = None
best_count = -1
for dom in tied_domains:
c = channel_domain_counts.get(dom, 0)
if c > best_count:
best_count = c
best_domain = dom
# Check if channel tiebreaker resolved it
tied_at_channel = [d for d in tied_domains
if channel_domain_counts.get(d, 0) == best_count]
if len(tied_at_channel) == 1:
db.set_domain_assignment(file_hash, best_domain, 'tied_pass_2')
stats['resolved'] += 1
logger.debug(f" {file_hash[:12]}: resolved → {best_domain} (channel tiebreaker)")
continue
# Still tied after channel scan — mark for manual review
fallback = sorted(tied_domains)[0]
db.set_domain_assignment(file_hash, fallback, 'tied_manual')
stats['manual'] += 1
logger.debug(f" {file_hash[:12]}: still tied after channel scan, → tied_manual")
except Exception as e:
logger.warning(f" Tiebreaker error for {file_hash[:12]}: {e}")
stats['errors'] += 1
logger.info(f"Tiebreaker complete: {stats['resolved']} resolved, "
f"{stats['manual']} manual, {stats['skipped']} skipped, "
f"{stats['errors']} errors")
return stats

View file

@ -27,7 +27,13 @@ from .utils import resolve_text_dir
logger = setup_logging('recon.embedder')
# ── Classification allowlists ───────────────────────────────────────────────
from .recon_domains import VALID_DOMAINS
VALID_DOMAINS = {
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}
@ -255,22 +261,15 @@ def embed_single(file_hash, db, config):
if not all_concepts:
db.update_status(file_hash, 'complete', vectors_inserted=0)
# Tag stream docs with no concepts for reprocessing
_cat = db._get_conn().execute(
"SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
if _cat and dict(_cat)['source'] == 'stream.echo6.co':
db.set_domain_assignment(file_hash, None, 'needs_reprocess')
logger.info(f"No concepts to embed for {doc['filename']}")
return True
# Look up source and path from catalogue once per doc
# Look up source from catalogue once per doc
cat_conn = db._get_conn()
cat_row = cat_conn.execute(
"SELECT source, path FROM catalogue WHERE hash = ?", (file_hash,)
"SELECT source FROM catalogue WHERE hash = ?", (file_hash,)
).fetchone()
source = dict(cat_row)['source'] if cat_row else ''
catalogue_path = dict(cat_row)['path'] if cat_row else ''
download_url = ''
is_web = doc.get('path', '').startswith(('http://', 'https://'))
@ -322,8 +321,6 @@ def embed_single(file_hash, db, config):
if not valid:
db.update_status(file_hash, 'complete', vectors_inserted=0)
if source == 'stream.echo6.co':
db.set_domain_assignment(file_hash, None, 'needs_reprocess')
logger.info(f"No valid concepts to embed for {doc['filename']}")
return True
@ -404,28 +401,6 @@ def embed_single(file_hash, db, config):
db.update_status(file_hash, 'complete', vectors_inserted=embedded_count)
logger.info(f"Embedded {doc['filename']}: {embedded_count} vectors ({skipped} skipped)")
# Post-embed hook: assign domain for PeerTube videos
if source == 'stream.echo6.co':
try:
from .domain_assigner import compute_assignment
from .peertube_writer import push_category, extract_uuid
from .recon_domains import DOMAIN_CATEGORY_MAP
domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
db.set_domain_assignment(file_hash, domain, status)
if domain and status == 'assigned':
cat_id = DOMAIN_CATEGORY_MAP[domain]
uuid = extract_uuid(catalogue_path)
if uuid:
pushed, _token = push_category(uuid, cat_id, config)
if pushed:
db.set_peertube_pushed(file_hash)
logger.info(f" Domain assigned: {domain} (category {cat_id}) → PeerTube")
else:
logger.warning(f" Domain assigned ({domain}) but PeerTube push failed for {file_hash[:12]}, will retry via --push-pending")
except Exception as e:
logger.warning(f"Domain assignment failed for {file_hash}: {e}")
return True
except Exception as e:

View file

@ -42,7 +42,13 @@ logger = setup_logging('recon.enricher')
STALE_ENRICHING_HOURS = 2
# ── Classification allowlists ───────────────────────────────────────────────
from .recon_domains import VALID_DOMAINS
VALID_DOMAINS = {
'Agriculture & Livestock', 'Civil Organization', 'Communications',
'Food Systems', 'Foundational Skills', 'Logistics', 'Medical',
'Navigation', 'Operations', 'Power Systems', 'Preservation & Storage',
'Security', 'Shelter & Construction', 'Technology', 'Tools & Equipment',
'Vehicles', 'Water Systems', 'Wilderness Skills',
}
VALID_KNOWLEDGE_TYPES = {'foundational', 'procedural', 'operational'}
VALID_COMPLEXITIES = {'basic', 'intermediate', 'advanced'}

View file

@ -21,6 +21,7 @@ Config: processing.extract_workers, processing.max_pdf_size_mb,
processing.extract_timeout, processing.page_timeout
"""
import base64
import re
import json
import os
import random
@ -99,6 +100,40 @@ def _is_transient(error_str):
return any(sig in s for sig in transient_signals)
def _text_quality_ok(text, min_length=50):
"""Check if extracted text meets quality thresholds.
Beyond the basic length check, validates:
- Word-boundary ratio: at least 60% of tokens should be real words (2+ alpha chars)
- Concatenation ratio: lowercase-immediately-followed-by-uppercase shouldn't exceed 10% of word count
Returns True if text passes all checks.
"""
text = text.strip()
if len(text) < min_length:
return False
words = text.split()
if not words:
return False
# Word-like ratio: tokens with 2+ alphabetic characters
word_like = sum(1 for w in words if len(re.findall(r'[a-zA-Z]', w)) >= 2)
word_ratio = word_like / len(words)
if word_ratio < 0.60:
return False
# Concatenation detector: lowercase immediately followed by uppercase
# Filter out common camelCase patterns in code (short tokens)
concat_hits = len(re.findall(r'[a-z][A-Z]', text))
concat_ratio = concat_hits / len(words) if words else 0
if concat_ratio > 0.10:
return False
return True
def _render_page_to_png(pdf_path, page_num_1indexed, dpi=200, timeout=30):
"""Render a single PDF page to PNG bytes using pdftoppm.
@ -224,7 +259,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# Method 1: pdftotext (poppler)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num_0indexed + 1),
['pdftotext', '-layout', '-f', str(page_num_0indexed + 1),
'-l', str(page_num_0indexed + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
@ -233,7 +268,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pdftotext'
# Method 2: pdftoppm + Tesseract OCR
@ -258,7 +293,7 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'tesseract'
# Method 3: Gemini Vision (last resort)
@ -276,8 +311,26 @@ def _extract_page_without_reader(pdf_path, page_num_0indexed, page_timeout=30):
# ── Core extraction functions ──
def _pypdf2_extract(reader, page_num):
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout."""
return reader.pages[page_num].extract_text() or ''
"""Extract text from a PyPDF2 page object. Runs inside a thread for timeout.
Tries default extraction first (space_width=200). If quality check fails,
retries with space_width=100 which better detects word boundaries in
tightly-kerned PDFs (common in Haynes/workshop manuals).
Note: PyPDF2 3.0.1 does not support layout=True. The space_width parameter
controls word-boundary detection tolerance. Lower values = more aggressive
space insertion between characters.
"""
text = reader.pages[page_num].extract_text() or ''
if _text_quality_ok(text):
return text
# Retry with tighter word-boundary detection
text_tight = reader.pages[page_num].extract_text(space_width=100.0) or ''
if len(text_tight.strip()) >= len(text.strip()):
return text_tight
return text
def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
@ -302,13 +355,13 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
text = ''
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pypdf2'
# Method 2: pdftotext via subprocess (inherently timeout-safe)
try:
result = subprocess.run(
['pdftotext', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
['pdftotext', '-layout', '-f', str(page_num + 1), '-l', str(page_num + 1), pdf_path, '-'],
capture_output=True, text=True, timeout=page_timeout
)
if result.returncode == 0 and len(result.stdout.strip()) > len(text.strip()):
@ -316,7 +369,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'pdftotext'
# Method 3: pdftoppm + Tesseract OCR
@ -340,7 +393,7 @@ def extract_text_from_page(reader, page_num, pdf_path, page_timeout=30):
except Exception:
pass
if len(text.strip()) >= 50:
if _text_quality_ok(text):
return text, 'tesseract'
# Method 4: Gemini Vision (last resort — costs API calls but handles scanned docs)

View file

@ -1,774 +0,0 @@
"""
RECON geocode structured preprocessing, multi-source retrieval, reranking.
Replaces the naive Photon-only search with:
1. usaddress parsing + intent classification (ADDRESS / POI / LOCALITY / COORD / POSTCODE)
2. Multi-source retrieval: ADDRESS Netsyms + Photon; POI/LOCALITY Photon /api
3. Python reranker with weighted signals
Public entry point: geocode(query, limit) {query, results, count}
"""
import math
import re
import logging
import requests
import usaddress
from rapidfuzz import fuzz
from .utils import setup_logging
logger = setup_logging('recon.geocode')
# ── Trace logger for reranking audit ──
_trace_logger = logging.getLogger('recon.geocode.trace')
_trace_handler = logging.FileHandler('/tmp/geocode_rerank_trace.log')
_trace_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
_trace_logger.addHandler(_trace_handler)
_trace_logger.setLevel(logging.DEBUG)
# ── Config constants ──
PHOTON_URL = "http://localhost:2322"
GEOCODE_BIAS_LAT = 42.5736
GEOCODE_BIAS_LON = -114.6066
GEOCODE_BIAS_ZOOM = 10
ADDRESS_BOOK_ANNOTATION_RADIUS_M = 75
# ── Reranker weights ──
# Derived from research analysis of failure modes:
# housenumber_exact is the strongest signal because Photon's soft-boost
# lets wrong-number results bubble up. street_name_fuzz and locality_fuzz
# handle abbreviation/case variation. source_authority gives Netsyms a
# boost for US addresses since it has USPS-verified data.
W_HOUSENUMBER_EXACT = 6.0 # exact housenumber match
W_HOUSENUMBER_MISMATCH = -5.0 # housenumber present but wrong
W_STREET_NAME_FUZZ = 3.0 # fuzzy street name similarity [0..1] * weight
W_TOKEN_COVERAGE = 2.0 # fraction of query tokens found in result
W_STREET_TYPE_MATCH = 1.5 # "st" matches "street", etc.
W_LOCALITY_FUZZ = 2.0 # city/state fuzzy match
W_SOURCE_AUTHORITY = 2.0 # Netsyms for US addresses
W_LAYER_RANK = 1.0 # type-appropriate results ranked higher
W_PHOTON_POSITION_NORM = 1.0 # Photon's native ranking (normalized by position)
W_STATE_EXACT = 1.0 # exact state code match
W_POI_CLASS_BOOST = 3.0 # amenity/shop/etc boost for business-name queries
W_HIGHWAY_CLASS_PENALTY = -4.0 # highway/route penalty for business-name queries
# ── US abbreviation expansions ──
# Applied ONLY to parsed StreetName/StreetNamePostType tokens, NOT to ordinals.
_STREET_TYPE_ABBREVS = {
'st': 'street', 'ave': 'avenue', 'blvd': 'boulevard', 'dr': 'drive',
'rd': 'road', 'ln': 'lane', 'ct': 'court', 'cir': 'circle',
'pl': 'place', 'way': 'way', 'pkwy': 'parkway', 'hwy': 'highway',
'trl': 'trail', 'ter': 'terrace', 'sq': 'square',
}
_DIRECTIONAL_ABBREVS = {
'n': 'north', 's': 'south', 'e': 'east', 'w': 'west',
'ne': 'northeast', 'nw': 'northwest', 'se': 'southeast', 'sw': 'southwest',
}
_ORDINAL_RE = re.compile(r'^\d+(st|nd|rd|th)$', re.IGNORECASE)
# ── Road keywords (for detecting when query is about a road vs a business) ──
_ROAD_KEYWORDS = (
set(_STREET_TYPE_ABBREVS.keys())
| set(_STREET_TYPE_ABBREVS.values())
| {'route', 'rte', 'pass'}
)
# ── US state codes ──
_STATE_CODES = {
'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY', 'DC',
}
# ── Full state name → code (for intent classifier) ──
_STATE_NAME_TO_CODE = {
'alabama': 'AL', 'alaska': 'AK', 'arizona': 'AZ', 'arkansas': 'AR',
'california': 'CA', 'colorado': 'CO', 'connecticut': 'CT', 'delaware': 'DE',
'florida': 'FL', 'georgia': 'GA', 'hawaii': 'HI', 'idaho': 'ID',
'illinois': 'IL', 'indiana': 'IN', 'iowa': 'IA', 'kansas': 'KS',
'kentucky': 'KY', 'louisiana': 'LA', 'maine': 'ME', 'maryland': 'MD',
'massachusetts': 'MA', 'michigan': 'MI', 'minnesota': 'MN',
'mississippi': 'MS', 'missouri': 'MO', 'montana': 'MT', 'nebraska': 'NE',
'nevada': 'NV', 'new hampshire': 'NH', 'new jersey': 'NJ',
'new mexico': 'NM', 'new york': 'NY', 'north carolina': 'NC',
'north dakota': 'ND', 'ohio': 'OH', 'oklahoma': 'OK', 'oregon': 'OR',
'pennsylvania': 'PA', 'rhode island': 'RI', 'south carolina': 'SC',
'south dakota': 'SD', 'tennessee': 'TN', 'texas': 'TX', 'utah': 'UT',
'vermont': 'VT', 'virginia': 'VA', 'washington': 'WA',
'west virginia': 'WV', 'wisconsin': 'WI', 'wyoming': 'WY',
}
# Coordinate regex
_COORD_RE = re.compile(r'^\s*(-?\d+\.?\d*)\s*[,\s]\s*(-?\d+\.?\d*)\s*$')
# ═══════════════════════════════════════════════════════════════════
# STEP 1: PREPROCESSING
# ═══════════════════════════════════════════════════════════════════
def _parse_coords(text):
"""Return (lat, lon) if text looks like coordinates with valid bounds, else None."""
m = _COORD_RE.match(text.strip())
if not m:
return None
lat, lon = float(m.group(1)), float(m.group(2))
if -90 <= lat <= 90 and -180 <= lon <= 180:
return lat, lon
return None
def _classify_and_parse(query):
"""
Parse query with usaddress, classify intent, expand abbreviations.
Returns (intent, parsed_dict) where:
intent: 'ADDRESS' | 'POI' | 'LOCALITY' | 'POSTCODE' | 'COORD' | 'UNKNOWN'
parsed_dict: {number, street, city, state, zipcode, raw_query, expanded_query}
"""
q = query.strip()
parsed = {
'number': None, 'street': None, 'street_raw': None,
'city': None, 'state': None,
'zipcode': None, 'raw_query': q, 'expanded_query': q,
}
# Coordinate check first
if _parse_coords(q):
return 'COORD', parsed
# Try usaddress
try:
tagged, addr_type = usaddress.tag(q)
except usaddress.RepeatedLabelError:
# Ambiguous input — fall back to free-text Photon
return 'UNKNOWN', parsed
# Extract components
number = tagged.get('AddressNumber', '').strip()
street_name = tagged.get('StreetName', '').strip()
street_pre_dir = tagged.get('StreetNamePreDirectional', '').strip()
street_post_type = tagged.get('StreetNamePostType', '').strip()
place = tagged.get('PlaceName', '').strip()
state = tagged.get('StateName', '').strip()
zipcode = tagged.get('ZipCode', '').strip()
# ── Fix usaddress edge case: "214 N St Filer" ──
# usaddress reads single-letter directional + "St" as PreDirectional + empty,
# mashing "St Filer" into StreetName. Detect: PreDirectional is single letter,
# StreetName has 2+ tokens where the first is a street type.
if (street_pre_dir and len(street_pre_dir) <= 2
and not street_name.strip().startswith(street_pre_dir)
and ' ' in street_name):
name_tokens = street_name.split()
first_lower = name_tokens[0].lower()
if first_lower in _STREET_TYPE_ABBREVS or first_lower in _STREET_TYPE_ABBREVS.values():
# "N" is actually the street name, "St" is the post-type
street_name = street_pre_dir
street_post_type = name_tokens[0]
if len(name_tokens) > 1:
place = ' '.join(name_tokens[1:])
street_pre_dir = ''
# ── Expand abbreviations (guard ordinals) ──
expanded_parts = []
if number:
parsed['number'] = number
expanded_parts.append(number)
if street_pre_dir:
exp = _DIRECTIONAL_ABBREVS.get(street_pre_dir.lower(), street_pre_dir)
expanded_parts.append(exp)
if street_name:
# Don't expand ordinals: "21st" stays "21st"
if _ORDINAL_RE.match(street_name):
expanded_parts.append(street_name)
else:
# Expand directional abbreviation if it IS the street name
exp = _DIRECTIONAL_ABBREVS.get(street_name.lower(), street_name)
expanded_parts.append(exp)
parsed['street'] = street_name
if street_post_type:
if _ORDINAL_RE.match(street_post_type):
expanded_parts.append(street_post_type)
else:
exp = _STREET_TYPE_ABBREVS.get(street_post_type.lower(), street_post_type)
expanded_parts.append(exp)
# Build raw street (original abbreviations, for Netsyms) and expanded (for Photon)
raw_street_parts = []
if street_pre_dir:
raw_street_parts.append(street_pre_dir)
if street_name:
raw_street_parts.append(street_name)
if street_post_type:
raw_street_parts.append(street_post_type)
parsed['street_raw'] = ' '.join(raw_street_parts)
# Build the full expanded street
if expanded_parts:
# The street is everything after the number
street_full = ' '.join(expanded_parts[1:] if number else expanded_parts)
parsed['street'] = street_full
if place:
parsed['city'] = place
expanded_parts.append(place)
if state:
parsed['state'] = state.upper()
expanded_parts.append(state)
if zipcode:
parsed['zipcode'] = zipcode
expanded_parts.append(zipcode)
parsed['expanded_query'] = ' '.join(expanded_parts)
# ── Intent classification ──
if addr_type == 'Street Address' and number:
return 'ADDRESS', parsed
elif zipcode and not number and not street_name:
return 'POSTCODE', parsed
elif addr_type == 'Ambiguous':
# Check if it looks like a locality: last token(s) are a state code or name
tokens = q.replace(',', ' ').split()
if len(tokens) >= 2:
last_upper = tokens[-1].upper()
if last_upper in _STATE_CODES:
parsed['city'] = ' '.join(tokens[:-1])
parsed['state'] = last_upper
return 'LOCALITY', parsed
# Check full state names (single-word like "idaho" or two-word like "new york")
last_lower = tokens[-1].lower()
if last_lower in _STATE_NAME_TO_CODE:
parsed['city'] = ' '.join(tokens[:-1])
parsed['state'] = _STATE_NAME_TO_CODE[last_lower]
return 'LOCALITY', parsed
if len(tokens) >= 3:
two_word = f"{tokens[-2].lower()} {last_lower}"
if two_word in _STATE_NAME_TO_CODE:
parsed['city'] = ' '.join(tokens[:-2])
parsed['state'] = _STATE_NAME_TO_CODE[two_word]
return 'LOCALITY', parsed
return 'UNKNOWN', parsed
else:
return 'UNKNOWN', parsed
# ═══════════════════════════════════════════════════════════════════
# STEP 2: RETRIEVAL
# ═══════════════════════════════════════════════════════════════════
def _retrieve_netsyms(parsed, limit=10, lat=None, lon=None):
"""Query Netsyms for structured address lookup. Returns list of candidate dicts."""
try:
from . import netsyms
except Exception:
return []
results = []
number = parsed.get('number', '')
street = parsed.get('street_raw') or parsed.get('street', '')
city = parsed.get('city', '')
state = parsed.get('state', '')
zipcode = parsed.get('zipcode', '')
# When viewport provided, fetch more results to sort from
fetch_limit = 200 if (lat is not None and lon is not None) else limit
if number and street:
rows = netsyms.lookup_by_street(
number, street, city=city, state=state, zipcode=zipcode, limit=fetch_limit
)
elif zipcode:
rows = netsyms.lookup_by_zipcode(zipcode, limit=fetch_limit)
else:
return []
for row in rows:
addr_parts = [row['number'], row['street']]
if row.get('street2'):
addr_parts.append(row['street2'])
addr_parts.extend([row['city'], row['state'], row['zipcode']])
display = ' '.join(p for p in addr_parts if p)
results.append({
'name': display,
'lat': row['lat'],
'lon': row['lon'],
'source': 'netsyms',
'type': 'street_address',
'raw': row,
'_number': row.get('number', ''),
'_street': row.get('street', ''),
'_city': row.get('city', ''),
'_state': row.get('state', ''),
})
# Sort by viewport distance if lat/lon provided, then limit
if lat is not None and lon is not None and results:
results.sort(key=lambda r: (r["lat"] - lat)**2 + (r["lon"] - lon)**2)
results = results[:limit]
return results
def _retrieve_photon_structured(parsed, limit=10):
"""Query Photon /structured endpoint for address lookup."""
params = {'limit': limit, 'countrycode': 'US'}
if parsed.get('street'):
params['street'] = parsed['street']
if parsed.get('number'):
params['housenumber'] = parsed['number']
if parsed.get('city'):
params['city'] = parsed['city']
if parsed.get('state'):
params['state'] = parsed['state']
if 'street' not in params:
return []
try:
resp = requests.get(f"{PHOTON_URL}/structured", params=params, timeout=5)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.debug("Photon /structured failed: %s", e)
return []
return _parse_photon_features(data.get('features', []), 'photon')
def _retrieve_photon_freetext(query, limit=10, lat=None, lon=None, zoom=None):
"""Query Photon /api for free-text search with location bias."""
try:
params = {
'q': query,
'limit': limit,
'lat': lat if lat is not None else GEOCODE_BIAS_LAT,
'lon': lon if lon is not None else GEOCODE_BIAS_LON,
'zoom': int(zoom) if zoom is not None else GEOCODE_BIAS_ZOOM,
}
resp = requests.get(f"{PHOTON_URL}/api", params=params, timeout=5)
resp.raise_for_status()
data = resp.json()
except Exception as e:
return []
return _parse_photon_features(data.get('features', []), 'photon')
def _parse_photon_features(features, source):
"""Convert Photon GeoJSON features to candidate dicts."""
results = []
for i, feature in enumerate(features):
props = feature.get('properties', {})
coords = feature.get('geometry', {}).get('coordinates', [0, 0])
osm_key = props.get('osm_key', '')
osm_value = props.get('osm_value', '')
feat_type = props.get('type', '')
has_hn = bool(props.get('housenumber'))
if osm_key in ('amenity', 'shop', 'tourism', 'leisure', 'office'):
rtype = 'poi'
elif has_hn or osm_value in ('house', 'residential'):
rtype = 'street_address'
elif feat_type in ('city', 'town', 'village', 'hamlet', 'county', 'state', 'country'):
rtype = 'locality'
else:
rtype = 'poi'
# Build display name
parts = []
hn = props.get('housenumber')
street = props.get('street')
name = props.get('name', '')
if hn and street:
parts.append(f"{hn} {street}")
if name and name != street:
parts.append(name)
elif name:
parts.append(name)
elif street:
parts.append(street)
for key in ('city', 'county', 'state', 'country'):
v = props.get(key)
if v and (not parts or v != parts[-1]):
parts.append(v)
display = ', '.join(p for p in parts if p) or 'Unknown'
results.append({
'name': display,
'lat': coords[1],
'lon': coords[0],
'source': source,
'type': rtype,
'raw': props,
'_photon_rank': i,
'_number': props.get('housenumber', ''),
'_street': props.get('street', ''),
# For locality results, the name IS the city (Photon omits 'city' on city-type features)
'_city': props.get('city', '') or (props.get('name', '') if rtype == 'locality' else ''),
'_state': props.get('state', ''),
})
return results
# ═══════════════════════════════════════════════════════════════════
# STEP 3: RERANKER
# ═══════════════════════════════════════════════════════════════════
def _expand_street_type(s):
"""Expand a street type abbreviation for comparison."""
return _STREET_TYPE_ABBREVS.get(s.lower(), s.lower())
def _score_candidate(candidate, parsed, intent):
"""
Score a candidate against the parsed query.
Returns (total_score, signal_breakdown_dict).
"""
signals = {}
total = 0.0
query_number = (parsed.get('number') or '').strip().upper()
query_street = (parsed.get('street') or '').strip().upper()
query_city = (parsed.get('city') or '').strip().upper()
query_state = (parsed.get('state') or '').strip().upper()
cand_number = (candidate.get('_number') or '').strip().upper()
cand_street = (candidate.get('_street') or '').strip().upper()
cand_city = (candidate.get('_city') or '').strip().upper()
cand_state = (candidate.get('_state') or '').strip().upper()
# ── Housenumber ──
if intent == 'ADDRESS' and query_number:
if cand_number == query_number:
signals['housenumber_exact'] = W_HOUSENUMBER_EXACT
total += W_HOUSENUMBER_EXACT
elif cand_number and cand_number != query_number:
signals['housenumber_mismatch'] = W_HOUSENUMBER_MISMATCH
total += W_HOUSENUMBER_MISMATCH
# ── Street name fuzz ──
if query_street and cand_street:
# Expand both for comparison
q_expanded = ' '.join(_expand_street_type(t) for t in query_street.split())
c_expanded = ' '.join(_expand_street_type(t) for t in cand_street.split())
ratio = fuzz.token_sort_ratio(q_expanded, c_expanded) / 100.0
score = ratio * W_STREET_NAME_FUZZ
signals['street_name_fuzz'] = round(score, 2)
total += score
# ── Street type match ──
if query_street and cand_street:
q_tokens = set(_expand_street_type(t) for t in query_street.split())
c_tokens = set(_expand_street_type(t) for t in cand_street.split())
# Check if the street type words overlap
street_types = set(_STREET_TYPE_ABBREVS.values())
q_types = q_tokens & street_types
c_types = c_tokens & street_types
if q_types and q_types & c_types:
signals['street_type_match'] = W_STREET_TYPE_MATCH
total += W_STREET_TYPE_MATCH
# ── Token coverage ──
raw_q = parsed.get('raw_query', '').upper()
q_tokens = set(raw_q.replace(',', ' ').split())
if q_tokens:
cand_text = candidate.get('name', '').upper()
matched = sum(1 for t in q_tokens if t in cand_text)
coverage = matched / len(q_tokens)
score = coverage * W_TOKEN_COVERAGE
signals['token_coverage'] = round(score, 2)
total += score
# ── Locality fuzz ──
if query_city and cand_city:
ratio = fuzz.ratio(query_city, cand_city) / 100.0
score = ratio * W_LOCALITY_FUZZ
signals['locality_fuzz'] = round(score, 2)
total += score
# ── State exact ──
if query_state and cand_state:
if cand_state == query_state:
signals['state_exact'] = W_STATE_EXACT
total += W_STATE_EXACT
# ── Source authority ──
if candidate.get('source') == 'netsyms' and intent == 'ADDRESS':
signals['source_authority'] = W_SOURCE_AUTHORITY
total += W_SOURCE_AUTHORITY
# ── Layer rank (type-appropriate bonus) ──
cand_type = candidate.get('type', '')
if intent == 'ADDRESS' and cand_type == 'street_address':
signals['layer_rank'] = W_LAYER_RANK
total += W_LAYER_RANK
elif intent == 'LOCALITY' and cand_type == 'locality':
signals['layer_rank'] = W_LAYER_RANK
total += W_LAYER_RANK
elif intent == 'POI' and cand_type == 'poi':
signals['layer_rank'] = W_LAYER_RANK
total += W_LAYER_RANK
# ── Photon position normalization ──
photon_rank = candidate.get('_photon_rank')
if photon_rank is not None:
# Top result gets full bonus, decays linearly
score = max(0, (1.0 - photon_rank / 10.0)) * W_PHOTON_POSITION_NORM
signals['photon_position'] = round(score, 2)
total += score
# ── Business intent POI boost ──
# When the query has no road keywords (likely a business/POI search),
# boost amenity/shop/etc results and penalize highway/route results.
# Skipped for LOCALITY, POSTCODE, COORD queries where class is irrelevant.
if intent not in ('LOCALITY', 'POSTCODE', 'COORD'):
q_tokens_lower = set(parsed.get('raw_query', '').lower().replace(',', ' ').split())
if not (q_tokens_lower & _ROAD_KEYWORDS):
osm_key = (candidate.get('raw') or {}).get('osm_key', '')
if osm_key in ('amenity', 'shop', 'tourism', 'leisure', 'office', 'craft'):
signals['poi_class_boost'] = W_POI_CLASS_BOOST
total += W_POI_CLASS_BOOST
elif osm_key in ('highway', 'route'):
signals['highway_class_penalty'] = W_HIGHWAY_CLASS_PENALTY
total += W_HIGHWAY_CLASS_PENALTY
return round(total, 2), signals
def _build_match_code(candidate, parsed, intent):
"""Build a match_code dict indicating match quality for each field."""
mc = {}
if intent == 'ADDRESS':
q_num = (parsed.get('number') or '').strip().upper()
c_num = (candidate.get('_number') or '').strip().upper()
if q_num and c_num == q_num:
mc['housenumber'] = 'matched'
elif q_num and c_num:
mc['housenumber'] = 'unmatched'
elif q_num and not c_num:
mc['housenumber'] = 'inferred'
q_street = (parsed.get('street') or '').strip().upper()
c_street = (candidate.get('_street') or '').strip().upper()
if q_street and c_street:
q_exp = ' '.join(_expand_street_type(t) for t in q_street.split())
c_exp = ' '.join(_expand_street_type(t) for t in c_street.split())
ratio = fuzz.token_sort_ratio(q_exp, c_exp) / 100.0
mc['street'] = 'matched' if ratio > 0.8 else 'unmatched'
elif q_street:
mc['street'] = 'inferred'
q_city = (parsed.get('city') or '').strip().upper()
c_city = (candidate.get('_city') or '').strip().upper()
if q_city and c_city:
ratio = fuzz.ratio(q_city, c_city) / 100.0
mc['city'] = 'matched' if ratio > 0.8 else 'unmatched'
elif q_city:
mc['city'] = 'inferred'
return mc
def _rerank(candidates, parsed, intent, query, limit):
"""Score, sort, and trim candidates. Trace-log top 3."""
scored = []
for c in candidates:
total, signals = _score_candidate(c, parsed, intent)
c['_score'] = total
c['_signals'] = signals
scored.append(c)
scored.sort(key=lambda c: c['_score'], reverse=True)
# Trace log for audit
_trace_logger.debug("─── Query: %r intent=%s ───", query, intent)
for i, c in enumerate(scored):
osm_key = (c.get('raw') or {}).get('osm_key', '')
osm_val = (c.get('raw') or {}).get('osm_value', '')
_trace_logger.debug(
" #%d score=%.2f src=%s key=%s/%s name=%s",
i, c['_score'], c.get('source', '?'), osm_key, osm_val,
c.get('name', '?')[:60]
)
_trace_logger.debug(" signals=%s", c.get('_signals', {}))
# Clean internal fields and add match_code
result = []
for c in scored[:limit]:
mc = _build_match_code(c, parsed, intent)
# Assign confidence from score
score = c.get('_score', 0)
if score >= 10:
confidence = 'exact'
elif score >= 5:
confidence = 'high'
elif score >= 2:
confidence = 'medium'
else:
confidence = 'low'
entry = {
'name': c['name'],
'lat': c['lat'],
'lon': c['lon'],
'source': c['source'],
'confidence': confidence,
'type': c.get('type', 'poi'),
'raw': c.get('raw'),
}
if mc:
entry['match_code'] = mc
result.append(entry)
return result
# ═══════════════════════════════════════════════════════════════════
# STEP 4: ANNOTATION
# ═══════════════════════════════════════════════════════════════════
def _haversine_m(lat1, lon1, lat2, lon2):
"""Haversine distance in meters."""
R = 6_371_000
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _annotate_with_address_book(results):
"""Add labeled_as to results within radius of an address book entry."""
try:
from . import address_book
entries = address_book.load()
except Exception:
return
for result in results:
rlat, rlon = result.get('lat'), result.get('lon')
if rlat is None or rlon is None:
continue
for entry in entries:
elat, elon = entry.get('lat'), entry.get('lon')
if elat is None or elon is None:
continue
if _haversine_m(rlat, rlon, elat, elon) <= ADDRESS_BOOK_ANNOTATION_RADIUS_M:
result['labeled_as'] = entry['name']
break
# ═══════════════════════════════════════════════════════════════════
# PUBLIC API
# ═══════════════════════════════════════════════════════════════════
def geocode(query, limit=10, lat=None, lon=None, zoom=None):
"""
Structured geocoding with multi-source retrieval and reranking.
Returns {query, results: [...], count} always 200-safe.
"""
limit = max(1, min(limit, 20))
q = (query or '').strip()
empty = {'query': q, 'results': [], 'count': 0}
if not q:
return empty
# ── Coordinate detection ──
coords = _parse_coords(q)
if coords:
return {
'query': q,
'results': [{
'name': q,
'lat': coords[0],
'lon': coords[1],
'source': 'coordinates',
'confidence': 'exact',
'type': 'coordinates',
'raw': None,
}],
'count': 1,
}
# ── Address book nickname short-circuit ──
normalized_q = ' '.join(q.lower().replace(',', ' ').split())
is_single_word = ' ' not in normalized_q
try:
from . import address_book
ab_match = address_book.lookup(q)
if (ab_match
and ab_match['confidence'] == 'exact'
and ab_match.get('lat') and ab_match.get('lon')
and is_single_word):
logger.info("geocode: nickname short-circuit %r%s", q, ab_match['name'])
return {
'query': q,
'results': [{
'name': ab_match.get('address') or ab_match['name'],
'lat': ab_match['lat'],
'lon': ab_match['lon'],
'source': 'address_book',
'confidence': 'exact',
'type': 'nickname',
'raw': ab_match,
}],
'count': 1,
}
except Exception as e:
logger.debug("geocode: address_book lookup failed: %s", e)
# ── Classify intent + parse ──
intent, parsed = _classify_and_parse(q)
logger.debug("geocode: intent=%s parsed=%s", intent, parsed)
# ── Retrieve candidates ──
candidates = []
if intent == 'ADDRESS':
# Parallel: Netsyms (structured) + Photon (freetext with expanded query)
netsyms_results = _retrieve_netsyms(parsed, limit=limit, lat=lat, lon=lon)
photon_results = _retrieve_photon_freetext(
parsed.get('expanded_query', q), limit=limit, lat=lat, lon=lon, zoom=zoom
)
# Also try Photon /structured for addresses
photon_struct = _retrieve_photon_structured(parsed, limit=5)
candidates = netsyms_results + photon_results + photon_struct
elif intent == 'POSTCODE':
netsyms_results = _retrieve_netsyms(parsed, limit=limit, lat=lat, lon=lon)
photon_results = _retrieve_photon_freetext(q, limit=limit, lat=lat, lon=lon, zoom=zoom)
candidates = netsyms_results + photon_results
elif intent in ('LOCALITY', 'POI', 'UNKNOWN'):
candidates = _retrieve_photon_freetext(q, limit=limit, lat=lat, lon=lon, zoom=zoom)
# ── Deduplicate by (lat, lon) proximity ──
deduped = []
for c in candidates:
is_dup = False
for existing in deduped:
if (_haversine_m(c['lat'], c['lon'], existing['lat'], existing['lon']) < 50
and c.get('source') == existing.get('source')):
is_dup = True
break
if not is_dup:
deduped.append(c)
candidates = deduped
# ── Rerank ──
results = _rerank(candidates, parsed, intent, q, limit)
# ── Address book annotation ──
_annotate_with_address_book(results)
logger.info("geocode: %r → intent=%s, %d results", q, intent, len(results))
return {'query': q, 'results': results, 'count': len(results)}

View file

@ -1,157 +0,0 @@
#!/usr/bin/env python3
"""Tests for RECON Photon-first geocode chain."""
import sys
import os
import json
import urllib.request
import urllib.parse
BASE = "http://localhost:8420"
TESTS = [
{
"name": "home → nickname short-circuit",
"query": "home",
"check": lambda r: (
r["count"] == 1
and r["results"][0]["source"] == "address_book"
and r["results"][0]["confidence"] == "exact"
and r["results"][0]["type"] == "nickname"
),
},
{
"name": "214 north st filer → netsyms exact match (multi-word, not nickname)",
"query": "214 north st filer",
"check": lambda r: (
r["count"] >= 1
and r["results"][0]["source"] == "netsyms"
and r["results"][0]["confidence"] == "exact"
and r["results"][0]["type"] == "street_address"
),
},
{
"name": "214 North St, Filer, ID → netsyms (case/punctuation)",
"query": "214 North St, Filer, ID",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "netsyms",
},
{
"name": "214 NORTH ST FILER ID → netsyms (uppercase)",
"query": "214 NORTH ST FILER ID",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "netsyms",
},
{
"name": "1600 Pennsylvania Ave Washington DC → White House",
"query": "1600 Pennsylvania Ave Washington DC",
"check": lambda r: (
r["count"] >= 1
and r["results"][0]["source"] == "photon"
),
},
{
"name": "1600 pennsylvania ave washington dc → lowercase",
"query": "1600 pennsylvania ave washington dc",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "starbucks filer → POI result",
"query": "starbucks filer",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "filer idaho → locality",
"query": "filer idaho",
"check": lambda r: (
r["count"] >= 1
and r["results"][0]["source"] == "photon"
and r["results"][0]["type"] == "locality"
),
},
{
"name": "filer → partial query, at least 1 result",
"query": "filer",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "42.5736, -114.6066 → coordinates (with space)",
"query": "42.5736, -114.6066",
"check": lambda r: (
r["count"] == 1
and r["results"][0]["source"] == "coordinates"
and r["results"][0]["confidence"] == "exact"
and r["results"][0]["type"] == "coordinates"
),
},
{
"name": "42.5736,-114.6066 → coordinates (no space)",
"query": "42.5736,-114.6066",
"check": lambda r: (
r["count"] == 1
and r["results"][0]["source"] == "coordinates"
and r["results"][0]["confidence"] == "exact"
),
},
{
"name": "boise → at least 1 result",
"query": "boise",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "toronto → CA canary",
"query": "toronto",
"check": lambda r: r["count"] >= 1 and r["results"][0]["source"] == "photon",
},
{
"name": "asdfghjklqwerty → empty results, 200 OK",
"query": "asdfghjklqwerty",
"check": lambda r: r["count"] == 0 and r["results"] == [],
},
{
"name": "empty query → empty results",
"query": "",
"check": lambda r: r["count"] == 0 and r["results"] == [],
},
]
passed = 0
failed = 0
for t in TESTS:
q = urllib.parse.urlencode({"q": t["query"]}) if t["query"] else "q="
url = f"{BASE}/api/geocode?{q}"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=10) as resp:
status = resp.status
body = json.loads(resp.read())
except urllib.error.HTTPError as e:
status = e.code
try:
body = json.loads(e.read())
except Exception:
body = {}
except Exception as e:
status = 0
body = {}
print(f" [FAIL] {t['name']}")
print(f" EXCEPTION: {e}")
failed += 1
continue
ok = status == 200 and t["check"](body)
tag = "PASS" if ok else "FAIL"
if ok:
passed += 1
else:
failed += 1
top = body.get("results", [{}])[0] if body.get("results") else {}
top_summary = f"source={top.get('source','')} type={top.get('type','')} conf={top.get('confidence','')} name={top.get('name','')[:50]}"
print(f" [{tag}] {t['name']}")
if not ok:
print(f" HTTP {status}, count={body.get('count','?')}, top: {top_summary}")
else:
labeled = f" labeled_as={top.get('labeled_as')}" if top.get('labeled_as') else ""
print(f"{top_summary}{labeled}")
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View file

@ -1,397 +0,0 @@
"""
Google Places (New) API client for tertiary enrichment.
Searches for business POIs and fetches details (opening hours, phone, website)
when OSM + Overture data is incomplete. Uses field masks to minimize cost.
API docs: https://developers.google.com/maps/documentation/places/web-service
"""
import json
import os
import sqlite3
import time
from datetime import date, timezone, datetime
import requests
from .utils import setup_logging
logger = setup_logging('recon.google_places')
API_BASE = 'https://places.googleapis.com/v1'
DEFAULT_DAILY_CAP = 500
REQUEST_TIMEOUT = 3 # seconds
# Google day index → OSM abbreviation
_DAY_ABBR = ['Su', 'Mo', 'Tu', 'We', 'Th', 'Fr', 'Sa']
_db_conn = None
def _get_db():
"""Return a module-level SQLite connection (lazy init)."""
global _db_conn
if _db_conn is not None:
return _db_conn
db_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data')
db_path = os.path.join(db_dir, 'place_cache.db')
_db_conn = sqlite3.connect(db_path, check_same_thread=False)
_db_conn.execute("PRAGMA journal_mode=WAL")
_db_conn.execute("PRAGMA synchronous=NORMAL")
# Ensure google_api_calls table exists
_db_conn.execute("""
CREATE TABLE IF NOT EXISTS google_api_calls (
call_date TEXT PRIMARY KEY,
call_count INTEGER NOT NULL DEFAULT 0
)
""")
_db_conn.commit()
return _db_conn
def _get_api_key():
"""Return the Google Places API key from environment."""
key = os.environ.get('GOOGLE_PLACES_API_KEY')
if not key:
logger.error("GOOGLE_PLACES_API_KEY not set in environment")
return key
def _get_daily_cap():
"""Return the daily API call cap (configurable via deployment config)."""
try:
from .deployment_config import get_deployment_config
config = get_deployment_config()
return config.get('google_places', {}).get('daily_cap', DEFAULT_DAILY_CAP)
except Exception:
return DEFAULT_DAILY_CAP
# ── Daily call counter ──────────────────────────────────────────────────
def check_daily_cap():
"""Return True if under daily cap, False if limit reached."""
db = _get_db()
today = date.today().isoformat()
row = db.execute(
"SELECT call_count FROM google_api_calls WHERE call_date = ?", (today,)
).fetchone()
current = row[0] if row else 0
cap = _get_daily_cap()
if current >= cap:
logger.info(f"google_places: daily_cap_reached count={current} cap={cap}")
return False
return True
def get_daily_count():
"""Return today's API call count."""
db = _get_db()
today = date.today().isoformat()
row = db.execute(
"SELECT call_count FROM google_api_calls WHERE call_date = ?", (today,)
).fetchone()
return row[0] if row else 0
def increment_call_counter():
"""Atomically increment today's API call counter."""
db = _get_db()
today = date.today().isoformat()
db.execute("""
INSERT INTO google_api_calls (call_date, call_count) VALUES (?, 1)
ON CONFLICT(call_date) DO UPDATE SET call_count = call_count + 1
""", (today,))
db.commit()
def _set_daily_count_to_cap():
"""Set today's counter to the cap value (soft-stop on quota error)."""
db = _get_db()
today = date.today().isoformat()
cap = _get_daily_cap()
db.execute("""
INSERT INTO google_api_calls (call_date, call_count) VALUES (?, ?)
ON CONFLICT(call_date) DO UPDATE SET call_count = ?
""", (today, cap, cap))
db.commit()
# ── Google Places cache (on place_cache table) ─────────────────────────
def cache_get_google(osm_type, osm_id):
"""Return (google_place_id, google_data_dict) or (None, None)."""
db = _get_db()
row = db.execute(
"SELECT google_place_id, google_data FROM place_cache WHERE osm_type=? AND osm_id=?",
(osm_type, osm_id)
).fetchone()
if row and row[0]:
data = None
if row[1]:
try:
data = json.loads(row[1])
except (json.JSONDecodeError, TypeError):
pass
return row[0], data
return None, None
def cache_put_google(osm_type, osm_id, place_id, data):
"""Store Google Places data for a cache entry (UPSERT on google columns)."""
db = _get_db()
now = int(time.time())
db.execute("""
INSERT INTO place_cache (osm_type, osm_id, data, source, cached_at, google_place_id, google_data, google_fetched_at)
VALUES (?, ?, '', 'pending', 0, ?, ?, ?)
ON CONFLICT(osm_type, osm_id) DO UPDATE SET
google_place_id = excluded.google_place_id,
google_data = excluded.google_data,
google_fetched_at = excluded.google_fetched_at
""", (osm_type, osm_id, place_id, json.dumps(data) if data else None, now))
db.commit()
# ── API calls ───────────────────────────────────────────────────────────
def search_place(name, lat, lon, radius_m=200):
"""
Search Google Places (New) for a business by name + location.
Returns the Google Place ID of the best match, or None.
"""
key = _get_api_key()
if not key:
return None
if not check_daily_cap():
return None
try:
resp = requests.post(
f'{API_BASE}/places:searchText',
headers={
'Content-Type': 'application/json',
'X-Goog-Api-Key': key,
'X-Goog-FieldMask': 'places.id,places.displayName,places.location',
},
json={
'textQuery': name,
'locationBias': {
'circle': {
'center': {'latitude': lat, 'longitude': lon},
'radius': float(radius_m),
}
},
'maxResultCount': 1,
},
timeout=REQUEST_TIMEOUT,
)
increment_call_counter()
if resp.status_code == 429:
logger.warning("google_places: action=search place=%s result=rate_limited", name)
_set_daily_count_to_cap()
return None
if resp.status_code == 403:
logger.error("google_places: action=search place=%s result=forbidden (invalid key?)", name)
return None
if resp.status_code != 200:
logger.warning("google_places: action=search place=%s result=error status=%d", name, resp.status_code)
return None
data = resp.json()
places = data.get('places', [])
if not places:
logger.info("google_places: action=search place=%s result=miss", name)
return None
place_id = places[0].get('id')
display = places[0].get('displayName', {}).get('text', '?')
logger.info("google_places: action=search place=%s result=hit google_name=%s id=%s", name, display, place_id)
return place_id
except requests.exceptions.Timeout:
logger.warning("google_places: action=search place=%s result=timeout", name)
return None
except Exception as e:
logger.error("google_places: action=search place=%s result=error err=%s", name, e)
return None
def get_place_details(place_id):
"""
Fetch details for a Google Place ID.
Returns dict with {opening_hours, phone_number, website} or None.
"""
key = _get_api_key()
if not key:
return None
if not check_daily_cap():
return None
try:
resp = requests.get(
f'{API_BASE}/places/{place_id}',
headers={
'X-Goog-Api-Key': key,
'X-Goog-FieldMask': 'regularOpeningHours,internationalPhoneNumber,websiteUri',
},
timeout=REQUEST_TIMEOUT,
)
increment_call_counter()
if resp.status_code == 429:
logger.warning("google_places: action=details id=%s result=rate_limited", place_id)
_set_daily_count_to_cap()
return None
if resp.status_code != 200:
logger.warning("google_places: action=details id=%s result=error status=%d", place_id, resp.status_code)
return None
data = resp.json()
result = {
'opening_hours': None,
'opening_hours_raw': None,
'phone_number': None,
'website': None,
}
# Phone
phone = data.get('internationalPhoneNumber')
if phone:
result['phone_number'] = phone.replace(' ', '').replace('-', '')
# Website
result['website'] = data.get('websiteUri')
# Opening hours
hours = data.get('regularOpeningHours')
if hours:
# Try OSM-compatible format from periods
periods = hours.get('periods', [])
if periods:
osm_str = _periods_to_osm(periods)
if osm_str:
result['opening_hours'] = osm_str
# Fallback: weekday descriptions (human-readable)
if not result['opening_hours']:
descriptions = hours.get('weekdayDescriptions')
if descriptions:
result['opening_hours_raw'] = descriptions
logger.info("google_places: action=details id=%s result=hit hours=%s phone=%s website=%s",
place_id,
'yes' if result['opening_hours'] or result['opening_hours_raw'] else 'no',
'yes' if result['phone_number'] else 'no',
'yes' if result['website'] else 'no')
return result
except requests.exceptions.Timeout:
logger.warning("google_places: action=details id=%s result=timeout", place_id)
return None
except Exception as e:
logger.error("google_places: action=details id=%s result=error err=%s", place_id, e)
return None
# ── Opening hours conversion ────────────────────────────────────────────
def _periods_to_osm(periods):
"""
Convert Google Places periods array to OSM opening_hours string.
Google periods: [{"open": {"day": 0-6, "hour": H, "minute": M},
"close": {"day": 0-6, "hour": H, "minute": M}}, ...]
Where day 0 = Sunday.
OSM format: "Mo-Fr 06:00-23:00; Sa-Su 07:00-23:00"
"""
if not periods:
return None
# Check for 24/7: single period with no close, or open 00:00 close 00:00 next day
if len(periods) == 1:
p = periods[0]
o = p.get('open', {})
c = p.get('close')
if c is None and o.get('hour', 0) == 0 and o.get('minute', 0) == 0:
return '24/7'
# Build a map: day_index → "HH:MM-HH:MM"
day_hours = {} # day_index → time_range string
for p in periods:
o = p.get('open', {})
c = p.get('close', {})
day = o.get('day', 0)
open_time = f"{o.get('hour', 0):02d}:{o.get('minute', 0):02d}"
if c:
close_time = f"{c.get('hour', 0):02d}:{c.get('minute', 0):02d}"
# Handle midnight closing (00:00 means end of day)
if close_time == '00:00':
close_time = '24:00'
else:
close_time = '24:00'
time_range = f"{open_time}-{close_time}"
# A day can have multiple periods (e.g., lunch break)
if day in day_hours:
day_hours[day] = day_hours[day] + ',' + time_range
else:
day_hours[day] = time_range
if not day_hours:
return None
# Check if all 7 days have same hours
unique_ranges = set(day_hours.values())
if len(day_hours) == 7 and len(unique_ranges) == 1:
hours = unique_ranges.pop()
if hours == '00:00-24:00':
return '24/7'
return hours # implicit "every day"
# Group consecutive days with same hours
# Reorder to OSM convention: Mo(1) Tu(2) We(3) Th(4) Fr(5) Sa(6) Su(0)
osm_day_order = [1, 2, 3, 4, 5, 6, 0]
groups = []
current_days = []
current_hours = None
for day_idx in osm_day_order:
hours = day_hours.get(day_idx)
if hours == current_hours:
current_days.append(day_idx)
else:
if current_days and current_hours:
groups.append((current_days, current_hours))
current_days = [day_idx]
current_hours = hours
if current_days and current_hours:
groups.append((current_days, current_hours))
if not groups:
return None
# Format each group
parts = []
for days, hours in groups:
if len(days) == 1:
day_str = _DAY_ABBR[days[0]]
elif len(days) == 2:
day_str = f"{_DAY_ABBR[days[0]]},{_DAY_ABBR[days[1]]}"
else:
day_str = f"{_DAY_ABBR[days[0]]}-{_DAY_ABBR[days[-1]]}"
parts.append(f"{day_str} {hours}")
return '; '.join(parts)

View file

@ -1,252 +0,0 @@
"""
PAD-US land classification lookup.
Provides point-in-polygon queries against the USGS Protected Areas Database
(PAD-US) stored in a local PostGIS database. Returns land ownership,
management, and public access information for any lat/lon coordinate.
Connection pool is lazy-initialized on first call. If PostgreSQL is unreachable,
functions return empty results gracefully (feature degrades, doesn't crash).
"""
import os
import psycopg2
import psycopg2.pool
from .utils import setup_logging
logger = setup_logging('recon.landclass')
_pool = None
_pool_failed = False
# ── Label mappings from PAD-US domain tables ────────────────────────────
# Extracted from PADUS4_0_Geodatabase.gdb domain lookup layers.
# ogr2ogr lowercases all column names.
AGENCY_NAME_MAP = {
'TVA': 'Tennessee Valley Authority',
'BLM': 'Bureau of Land Management',
'BOEM': 'Bureau of Ocean Energy Management',
'USBR': 'Bureau of Reclamation',
'FWS': 'U.S. Fish and Wildlife Service',
'USFS': 'Forest Service',
'DOD': 'Department of Defense',
'USACE': 'Army Corps of Engineers',
'DOE': 'Department of Energy',
'NPS': 'National Park Service',
'NRCS': 'Natural Resources Conservation Service',
'ARS': 'Agricultural Research Service',
'BIA': 'Bureau of Indian Affairs',
'NOAA': 'National Oceanic and Atmospheric Administration',
'BPA': 'Bonneville Power Administration',
'OTHF': 'Other or Unknown Federal Land',
'TRIB': 'American Indian Lands',
'SPR': 'State Park and Recreation',
'SDC': 'State Department of Conservation',
'SLB': 'State Land Board',
}
AGENCY_TYPE_MAP = {
'FED': 'Federal',
'TRIB': 'American Indian Lands',
'STAT': 'State',
'DIST': 'Regional Agency Special District',
'LOC': 'Local Government',
'NGO': 'Non-Governmental Organization',
'PVT': 'Private',
'JNT': 'Joint',
'UNK': 'Unknown',
'TERR': 'Territorial',
'DESG': 'Designation',
}
DESIGNATION_TYPE_MAP = {
'NP': 'National Park',
'NM': 'National Monument',
'NCA': 'Conservation Area',
'NF': 'National Forest',
'NG': 'National Grassland',
'PUB': 'National Public Lands',
'NT': 'National Scenic or Historic Trail',
'NWR': 'National Wildlife Refuge',
'WA': 'Wilderness Area',
'WSR': 'Wild and Scenic River',
'WSA': 'Wilderness Study Area',
'MPA': 'Marine Protected Area',
'NRA': 'National Recreation Area',
'NSBV': 'National Scenic, Botanical or Volcanic Area',
'NLS': 'National Lakeshore or Seashore',
'IRA': 'Inventoried Roadless Area',
'ACEC': 'Area of Critical Environmental Concern',
'RNA': 'Research Natural Area',
'REC': 'Recreation Management Area',
'RMA': 'Resource Management Area',
'WPA': 'Watershed Protection Area',
'REA': 'Research or Educational Area',
'HCA': 'Historic or Cultural Area',
'MIT': 'Mitigation Land or Bank',
'MIL': 'Military Land',
'ACC': 'Access Area',
'SDA': 'Special Designation Area',
'PROC': 'Approved or Proclamation Boundary',
'FOTH': 'Federal Other or Unknown',
'ND': 'Not Designated',
}
PUBLIC_ACCESS_MAP = {
'OA': 'Open Access',
'RA': 'Restricted Access',
'XA': 'Closed',
'UK': 'Unknown',
}
GAP_STATUS_MAP = {
'1': 'Managed for biodiversity (disturbance events proceed)',
'2': 'Managed for biodiversity (disturbance suppressed)',
'3': 'Multiple uses (extractive/OHV)',
'4': 'No known mandate for biodiversity protection',
}
CATEGORY_MAP = {
'Fee': 'Fee',
'Easement': 'Easement',
'Other': 'Other',
'Unknown': 'Unknown',
'Designation': 'Designation',
'Marine': 'Marine Area',
'Proclamation': 'Approved, Proclamation or Extent Boundary',
}
STATE_MAP = {
'AL': 'Alabama', 'AK': 'Alaska', 'AZ': 'Arizona', 'AR': 'Arkansas',
'CA': 'California', 'CO': 'Colorado', 'CT': 'Connecticut', 'DE': 'Delaware',
'DC': 'District of Columbia', 'FL': 'Florida', 'GA': 'Georgia', 'HI': 'Hawaii',
'ID': 'Idaho', 'IL': 'Illinois', 'IN': 'Indiana', 'IA': 'Iowa',
'KS': 'Kansas', 'KY': 'Kentucky', 'LA': 'Louisiana', 'ME': 'Maine',
'MD': 'Maryland', 'MA': 'Massachusetts', 'MI': 'Michigan', 'MN': 'Minnesota',
'MS': 'Mississippi', 'MO': 'Missouri', 'MT': 'Montana', 'NE': 'Nebraska',
'NV': 'Nevada', 'NH': 'New Hampshire', 'NJ': 'New Jersey', 'NM': 'New Mexico',
'NY': 'New York', 'NC': 'North Carolina', 'ND': 'North Dakota', 'OH': 'Ohio',
'OK': 'Oklahoma', 'OR': 'Oregon', 'PA': 'Pennsylvania', 'RI': 'Rhode Island',
'SC': 'South Carolina', 'SD': 'South Dakota', 'TN': 'Tennessee', 'TX': 'Texas',
'UT': 'Utah', 'VT': 'Vermont', 'VA': 'Virginia', 'WA': 'Washington',
'WV': 'West Virginia', 'WI': 'Wisconsin', 'WY': 'Wyoming',
}
def _decode(code, label_map):
"""Decode a PAD-US code using a label map. Returns decoded label or the raw code."""
if not code:
return ''
code = str(code).strip()
return label_map.get(code, code)
def _get_pool():
"""Lazy-init the connection pool. Returns None if Postgres is unreachable."""
global _pool, _pool_failed
if _pool is not None:
return _pool
if _pool_failed:
return None
try:
_pool = psycopg2.pool.SimpleConnectionPool(
minconn=1,
maxconn=3,
host=os.environ.get('PADUS_DB_HOST', 'localhost'),
port=int(os.environ.get('PADUS_DB_PORT', '5432')),
dbname=os.environ.get('PADUS_DB_NAME', 'padus'),
user=os.environ.get('PADUS_DB_USER', 'overture'),
password=os.environ.get('PADUS_DB_PASSWORD', ''),
connect_timeout=5,
)
logger.info("PAD-US PostgreSQL connection pool initialized")
return _pool
except Exception as e:
_pool_failed = True
logger.warning(f"PAD-US PostgreSQL unavailable, land classification disabled: {e}")
return None
def _query_all(sql, params):
"""Execute a query and return all rows as a list of dicts, or empty list."""
pool = _get_pool()
if pool is None:
return []
conn = None
try:
conn = pool.getconn()
with conn.cursor() as cur:
cur.execute(sql, params)
rows = cur.fetchall()
if not rows:
return []
cols = [desc[0] for desc in cur.description]
return [dict(zip(cols, row)) for row in rows]
except Exception as e:
logger.warning(f"PAD-US query error: {e}")
if conn:
try:
conn.rollback()
except Exception:
pass
return []
finally:
if conn:
try:
pool.putconn(conn)
except Exception:
pass
def lookup_landclass(lat, lon):
"""
Look up PAD-US land classifications for a point.
Returns a list of classification dicts, ordered by area ascending
(smallest/most specific first). Empty list on error or no results.
"""
rows = _query_all(
"""SELECT unit_nm, mang_name, mang_type, own_name, own_type,
des_tp, gap_sts, pub_access, category, gis_acres, state_nm
FROM pad_units
WHERE ST_Intersects(geom, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
ORDER BY gis_acres ASC
LIMIT 10""",
(lon, lat)
)
results = []
for row in rows:
pa_code = str(row.get('pub_access', '')).strip()
results.append({
'unit_name': (row.get('unit_nm') or '').strip(),
'manager_name': _decode(row.get('mang_name'), AGENCY_NAME_MAP),
'manager_type': _decode(row.get('mang_type'), AGENCY_TYPE_MAP),
'owner_type': _decode(row.get('own_type'), AGENCY_TYPE_MAP),
'designation_type': _decode(row.get('des_tp'), DESIGNATION_TYPE_MAP),
'gap_status': str(row.get('gap_sts', '')).strip(),
'public_access': _decode(pa_code, PUBLIC_ACCESS_MAP),
'public_access_code': pa_code,
'category': _decode(row.get('category'), CATEGORY_MAP),
'acres': row.get('gis_acres'),
'state': _decode(row.get('state_nm'), STATE_MAP),
})
return results
def format_summary(classifications):
"""
Format a human-readable summary from classification results.
Returns the most specific unit name, or None if no results.
"""
if not classifications:
return None
# First result is smallest/most specific (ordered by acres ASC)
return classifications[0].get('unit_name') or None

View file

@ -1,168 +0,0 @@
"""Navigation tools: geocoding via Photon and routing via Valhalla."""
import math
import re
import requests
from .utils import setup_logging
logger = setup_logging('recon.nav_tools')
PHOTON_URL = "http://localhost:2322"
VALHALLA_URL = "http://localhost:8002"
# Regional bias for Photon searches (Idaho-centric for Matt's use case).
# Adjustable — Photon uses these to rank nearby results higher.
GEOCODE_BIAS_LAT = 42.5736
GEOCODE_BIAS_LON = -114.6066
GEOCODE_BIAS_ZOOM = 10
# Distance threshold (meters) for annotating Photon results with address
# book labels. 75m covers GPS jitter + geocoder imprecision.
ADDRESS_BOOK_ANNOTATION_RADIUS_M = 75
# Coordinate regex — handles comma-separated and space-separated forms.
_COORD_RE = re.compile(
r'^\s*(-?\d+\.\d+)\s*[,\s]\s*(-?\d+\.\d+)\s*$'
)
VALID_MODES = {"auto", "pedestrian", "bicycle", "truck"}
def _parse_coords(text: str):
"""Return (lat, lon) if text looks like coordinates with valid bounds, else None."""
m = _COORD_RE.match(text.strip())
if not m:
return None
lat, lon = float(m.group(1)), float(m.group(2))
if -90 <= lat <= 90 and -180 <= lon <= 180:
return lat, lon
return None
def _haversine_m(lat1, lon1, lat2, lon2):
"""Haversine distance in meters between two (lat, lon) points."""
R = 6_371_000 # Earth radius in meters
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def geocode(query: str, limit: int = 10, lat=None, lon=None, zoom=None):
"""Delegate to the structured geocode module. See lib/geocode.py."""
from . import geocode as geocode_mod
return geocode_mod.geocode(query, limit=limit, lat=lat, lon=lon, zoom=zoom)
def _geocode(query: str):
"""Internal: returns (lat, lon, display_name) tuple for route()."""
result = geocode(query, limit=1)
results = result.get('results', [])
if not results:
raise ValueError(f"Could not find location: {query}")
top = results[0]
return top['lat'], top['lon'], top['name']
def reverse_geocode(lat: float, lon: float) -> str:
"""Reverse geocode coordinates via Photon. Returns formatted address string."""
try:
resp = requests.get(
f"{PHOTON_URL}/reverse",
params={"lat": lat, "lon": lon, "limit": 1},
timeout=10,
)
resp.raise_for_status()
except requests.RequestException:
raise RuntimeError("Navigation service unavailable")
data = resp.json()
features = data.get("features", [])
if not features:
return f"{lat}, {lon}"
props = features[0]["properties"]
parts = []
for key in ("name", "housenumber", "street", "city", "state", "country", "postcode"):
v = props.get(key)
if v:
parts.append(v)
return ", ".join(parts) if parts else f"{lat}, {lon}"
def route(origin: str, destination: str, mode: str = "auto") -> dict:
"""
Get a route between two locations.
Args:
origin: Starting location address, place name, or "lat,lon"
destination: Destination address, place name, or "lat,lon"
mode: Travel mode auto, pedestrian, bicycle, truck
Returns:
dict with summary, maneuvers, origin/destination info, and raw shape
"""
if mode not in VALID_MODES:
mode = "auto"
# Geocode both endpoints
orig_lat, orig_lon, orig_name = _geocode(origin)
dest_lat, dest_lon, dest_name = _geocode(destination)
# Query Valhalla
valhalla_req = {
"locations": [
{"lat": orig_lat, "lon": orig_lon},
{"lat": dest_lat, "lon": dest_lon},
],
"costing": mode,
"directions_options": {"units": "miles"},
}
try:
resp = requests.post(
f"{VALHALLA_URL}/route",
json=valhalla_req,
timeout=30,
)
except requests.RequestException:
raise RuntimeError("Navigation service unavailable")
if resp.status_code != 200:
try:
err = resp.json()
msg = err.get("error", "Unknown routing error")
except Exception:
msg = f"Routing error (HTTP {resp.status_code})"
raise RuntimeError(f"No route found between locations: {msg}")
data = resp.json()
trip = data["trip"]
summary = trip["summary"]
leg = trip["legs"][0]
# Build maneuver list
maneuvers = []
for m in leg["maneuvers"]:
streets = m.get("street_names", [])
maneuvers.append({
"instruction": m["instruction"],
"distance_miles": round(m.get("length", 0), 2),
"street_name": streets[0] if streets else "",
"type": m.get("type", 0),
"verbal_succinct": m.get("verbal_succinct_transition_instruction", ""),
})
return {
"origin": {"name": orig_name, "lat": orig_lat, "lon": orig_lon},
"destination": {"name": dest_name, "lat": dest_lat, "lon": dest_lon},
"summary": {
"distance_miles": round(summary["length"], 1),
"time_minutes": round(summary["time"] / 60, 1),
"mode": mode,
},
"maneuvers": maneuvers,
"shape": leg.get("shape", ""),
}

View file

@ -1,77 +0,0 @@
"""Tests for nav_tools — run against live Photon + Valhalla services."""
import sys
import json
from nav_tools import route, reverse_geocode
def test_route_named():
"""route("Buhl Idaho", "Boise Idaho", "auto") returns maneuvers."""
print("TEST 1: route('Buhl Idaho', 'Boise Idaho', 'auto')")
r = route("Buhl Idaho", "Boise Idaho", "auto")
assert r["summary"]["distance_miles"] > 50, f"Expected >50 mi, got {r['summary']['distance_miles']}"
assert r["summary"]["time_minutes"] > 60, f"Expected >60 min, got {r['summary']['time_minutes']}"
assert len(r["maneuvers"]) > 5, f"Expected >5 maneuvers, got {len(r['maneuvers'])}"
assert r["shape"], "Missing polyline shape"
print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min, {len(r['maneuvers'])} maneuvers")
print(f" Origin: {r['origin']['name']}")
print(f" Destination: {r['destination']['name']}")
print(f" First maneuver: {r['maneuvers'][0]['instruction']}")
def test_route_coords():
"""route with raw lat,lon coordinates."""
print("\nTEST 2: route('42.5991,-114.7636', '43.615,-116.2023', 'auto')")
r = route("42.5991,-114.7636", "43.615,-116.2023", "auto")
assert r["summary"]["distance_miles"] > 100, f"Expected >100 mi, got {r['summary']['distance_miles']}"
assert len(r["maneuvers"]) > 3, f"Expected >3 maneuvers"
print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min")
def test_route_pedestrian():
"""route with pedestrian mode."""
print("\nTEST 3: route('Buhl Idaho', 'Boise Idaho', 'pedestrian')")
r = route("Buhl Idaho", "Boise Idaho", "pedestrian")
assert r["summary"]["mode"] == "pedestrian"
assert r["summary"]["time_minutes"] > r["summary"]["distance_miles"], "Walking should take more min than miles"
print(f" OK — {r['summary']['distance_miles']} mi, {r['summary']['time_minutes']} min (pedestrian)")
def test_reverse_geocode():
"""reverse_geocode near Buhl, Idaho."""
print("\nTEST 4: reverse_geocode(42.5991, -114.7636)")
result = reverse_geocode(42.5991, -114.7636)
assert "Buhl" in result or "Twin Falls" in result or "Idaho" in result, f"Expected Buhl/Idaho, got: {result}"
print(f" OK — {result}")
def test_route_bad_origin():
"""route with nonexistent place returns clean error."""
print("\nTEST 5: route('nonexistent place xyz123abc', 'Boise Idaho')")
try:
r = route("nonexistent place xyz123abc", "Boise Idaho")
print(f" FAIL — expected error, got result: {r['summary']}")
return False
except ValueError as e:
print(f" OK — clean error: {e}")
except RuntimeError as e:
print(f" OK — runtime error: {e}")
if __name__ == "__main__":
passed = 0
failed = 0
tests = [test_route_named, test_route_coords, test_route_pedestrian, test_reverse_geocode, test_route_bad_origin]
for test in tests:
try:
test()
passed += 1
except Exception as e:
print(f" FAIL — {e}")
failed += 1
print(f"\n{'='*40}")
print(f"Results: {passed} passed, {failed} failed out of {len(tests)}")
sys.exit(1 if failed else 0)

View file

@ -1,22 +1,18 @@
"""
RECON Netsyms API + Geocode Flask Blueprints.
RECON Netsyms API Flask Blueprint.
GET /api/netsyms/lookup?q=<free text>&country=<optional>
GET /api/netsyms/health
GET /api/geocode?q=<query>&limit=<N> (Photon-first search with ranked results)
"""
from flask import Blueprint, request, jsonify
from . import netsyms
from . import address_book
from . import nav_tools
from .utils import setup_logging
logger = setup_logging('recon.netsyms_api')
netsyms_bp = Blueprint('netsyms', __name__)
geocode_bp = Blueprint('geocode', __name__)
@netsyms_bp.route('/api/netsyms/lookup')
@ -33,94 +29,3 @@ def api_netsyms_lookup():
@netsyms_bp.route('/api/netsyms/health')
def api_netsyms_health():
return jsonify(netsyms.health())
def _safe_float(val, lo, hi):
"""Parse val as float; return None if missing, non-numeric, or out of [lo, hi]."""
if val is None:
return None
try:
f = float(val)
if lo <= f <= hi:
return f
except (ValueError, TypeError):
pass
return None
@geocode_bp.route('/api/geocode')
def api_geocode():
"""
Photon-first geocoding with ranked candidates.
GET /api/geocode?q=<query>&limit=<N>
Always returns 200 OK with:
{query, results: [{name, lat, lon, source, confidence, type, raw, ...}], count}
- source: "address_book" | "coordinates" | "photon"
- confidence: "exact" | "high" | "medium" | "low"
- type: "nickname" | "coordinates" | "street_address" | "poi" | "locality"
- labeled_as: present when result is within 75m of an address book entry
- Empty results array is valid (no match). No 404s.
"""
q = request.args.get('q', '').strip()
limit = request.args.get('limit', '10')
try:
limit = max(1, min(int(limit), 20))
except (ValueError, TypeError):
limit = 10
# Viewport bias parameters (optional)
lat = _safe_float(request.args.get("lat"), -90, 90)
lon = _safe_float(request.args.get("lon"), -180, 180)
zoom = _safe_float(request.args.get("zoom"), 0, 22)
result = nav_tools.geocode(q, limit=limit, lat=lat, lon=lon, zoom=zoom)
return jsonify(result)
@geocode_bp.route('/api/reverse')
def api_reverse():
"""
Reverse geocode coordinates via Photon.
GET /api/reverse?lat=X&lon=Y
Returns same shape as /api/geocode:
{query: "lat,lon", results: [{name, lat, lon, source, type, raw, ...}], count}
Returns 200 OK with empty results on no match. 400 on invalid coords.
"""
try:
lat = float(request.args.get('lat', ''))
lon = float(request.args.get('lon', ''))
except (ValueError, TypeError):
return jsonify({'error': 'Missing or invalid lat/lon parameters'}), 400
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
return jsonify({'error': 'Coordinates out of range'}), 400
query_str = f"{lat},{lon}"
try:
import requests as http_requests
resp = http_requests.get(
"http://localhost:2322/reverse",
params={"lat": lat, "lon": lon, "limit": 1},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
features = data.get("features", [])
except Exception:
logger.warning("Photon reverse geocode failed for %s", query_str)
return jsonify({'query': query_str, 'results': [], 'count': 0})
if not features:
return jsonify({'query': query_str, 'results': [], 'count': 0})
from .geocode import _parse_photon_features
results = _parse_photon_features(features, source='photon_reverse')
return jsonify({'query': query_str, 'results': results, 'count': len(results)})

View file

@ -1,143 +0,0 @@
"""
Human-readable category names for OSM class/type pairs.
Used by the place detail proxy to turn ("amenity", "cafe") into "Coffee shop".
Covers ~50 common categories; unmapped pairs fall back to title-cased class:type.
"""
# Exact (class, type) → label
CATEGORY_MAP = {
# Amenity
("amenity", "cafe"): "Coffee shop",
("amenity", "restaurant"): "Restaurant",
("amenity", "fast_food"): "Fast food restaurant",
("amenity", "bar"): "Bar",
("amenity", "pub"): "Pub",
("amenity", "biergarten"): "Beer garden",
("amenity", "ice_cream"): "Ice cream shop",
("amenity", "fuel"): "Gas station",
("amenity", "charging_station"): "EV charging station",
("amenity", "parking"): "Parking",
("amenity", "bank"): "Bank",
("amenity", "atm"): "ATM",
("amenity", "pharmacy"): "Pharmacy",
("amenity", "hospital"): "Hospital",
("amenity", "clinic"): "Clinic",
("amenity", "dentist"): "Dentist",
("amenity", "doctors"): "Doctor's office",
("amenity", "veterinary"): "Veterinarian",
("amenity", "school"): "School",
("amenity", "university"): "University",
("amenity", "college"): "College",
("amenity", "library"): "Library",
("amenity", "post_office"): "Post office",
("amenity", "fire_station"): "Fire station",
("amenity", "police"): "Police station",
("amenity", "townhall"): "Town hall",
("amenity", "place_of_worship"): "Place of worship",
("amenity", "theatre"): "Theatre",
("amenity", "cinema"): "Cinema",
("amenity", "community_centre"): "Community center",
("amenity", "toilets"): "Restrooms",
("amenity", "drinking_water"): "Drinking water",
("amenity", "shelter"): "Shelter",
("amenity", "camping"): "Campground",
# Shop
("shop", "supermarket"): "Supermarket",
("shop", "convenience"): "Convenience store",
("shop", "hardware"): "Hardware store",
("shop", "clothes"): "Clothing store",
("shop", "car_repair"): "Auto repair",
("shop", "car"): "Car dealership",
("shop", "bakery"): "Bakery",
("shop", "butcher"): "Butcher",
# Leisure
("leisure", "park"): "Park",
("leisure", "playground"): "Playground",
("leisure", "sports_centre"): "Sports center",
("leisure", "swimming_pool"): "Swimming pool",
("leisure", "golf_course"): "Golf course",
("leisure", "nature_reserve"): "Nature reserve",
("leisure", "campsite"): "Campsite",
# Tourism
("tourism", "hotel"): "Hotel",
("tourism", "motel"): "Motel",
("tourism", "guest_house"): "Guest house",
("tourism", "hostel"): "Hostel",
("tourism", "camp_site"): "Campsite",
("tourism", "viewpoint"): "Viewpoint",
("tourism", "museum"): "Museum",
("tourism", "information"): "Information",
("tourism", "attraction"): "Tourist attraction",
("tourism", "picnic_site"): "Picnic site",
# Natural
("natural", "peak"): "Peak",
("natural", "spring"): "Spring",
("natural", "hot_spring"): "Hot spring",
("natural", "lake"): "Lake",
("natural", "water"): "Water body",
("natural", "cliff"): "Cliff",
("natural", "cave_entrance"): "Cave",
# Highway
("highway", "bus_stop"): "Bus stop",
("highway", "rest_area"): "Rest area",
# Boundary
("boundary", "administrative"): "Administrative boundary",
("boundary", "protected_area"): "Protected area",
("boundary", "national_park"): "National park",
# Place
("place", "city"): "City",
("place", "town"): "Town",
("place", "village"): "Village",
("place", "hamlet"): "Hamlet",
("place", "suburb"): "Suburb",
("place", "neighbourhood"): "Neighborhood",
# Building
("building", "yes"): "Building",
# Waterway
("waterway", "river"): "River",
("waterway", "stream"): "Stream",
("waterway", "waterfall"): "Waterfall",
# Landuse
("landuse", "cemetery"): "Cemetery",
("landuse", "forest"): "Forest",
# Historic
("historic", "monument"): "Monument",
("historic", "memorial"): "Memorial",
("historic", "ruins"): "Ruins",
}
# Class-level wildcard fallbacks (when exact type isn't mapped)
CLASS_FALLBACKS = {
"shop": "Shop",
"amenity": "Amenity",
"leisure": "Leisure",
"tourism": "Tourism",
"natural": "Natural feature",
"historic": "Historic site",
}
def humanize_category(osm_class, osm_type):
"""Return a human-readable category string for an OSM class/type pair."""
if not osm_class or not osm_type:
return "Place"
osm_class = osm_class.lower()
osm_type = osm_type.lower()
# Exact match
label = CATEGORY_MAP.get((osm_class, osm_type))
if label:
return label
# Class-level wildcard with formatted type
prefix = CLASS_FALLBACKS.get(osm_class)
if prefix:
nice_type = osm_type.replace("_", " ").title()
return f"{prefix}: {nice_type}" if prefix != nice_type else prefix
# Generic fallback
nice_class = osm_class.replace("_", " ").title()
nice_type = osm_type.replace("_", " ").title()
return f"{nice_class}: {nice_type}"

View file

@ -1,170 +0,0 @@
"""
Overture Maps enrichment layer.
Provides lookup functions against the local PostgreSQL Overture Places database.
Two strategies:
1. find_by_osm_id exact match via OSM cross-reference index
2. find_by_coords_and_name spatial + fuzzy name fallback
Connection pool is lazy-initialized on first call. If PostgreSQL is unreachable,
functions return None gracefully (feature degrades, doesn't crash).
"""
import json
import os
import psycopg2
import psycopg2.pool
from .utils import setup_logging
logger = setup_logging('recon.overture')
_pool = None
_pool_failed = False
# Map full OSM type names to single-letter codes used in Overture sources
OSM_TYPE_MAP = {
'N': 'n', 'W': 'w', 'R': 'r',
'node': 'n', 'way': 'w', 'relation': 'r',
'n': 'n', 'w': 'w', 'r': 'r',
}
def _get_pool():
"""Lazy-init the connection pool. Returns None if Postgres is unreachable."""
global _pool, _pool_failed
if _pool is not None:
return _pool
if _pool_failed:
return None
try:
_pool = psycopg2.pool.SimpleConnectionPool(
minconn=1,
maxconn=3,
host=os.environ.get('OVERTURE_DB_HOST', 'localhost'),
port=int(os.environ.get('OVERTURE_DB_PORT', '5432')),
dbname=os.environ.get('OVERTURE_DB_NAME', 'overture'),
user=os.environ.get('OVERTURE_DB_USER', 'overture'),
password=os.environ.get('OVERTURE_DB_PASSWORD', ''),
connect_timeout=5,
)
logger.info("Overture PostgreSQL connection pool initialized")
return _pool
except Exception as e:
_pool_failed = True
logger.warning(f"Overture PostgreSQL unavailable, enrichment disabled: {e}")
return None
def _query(sql, params):
"""Execute a query and return the first row as a dict, or None."""
pool = _get_pool()
if pool is None:
return None
conn = None
try:
conn = pool.getconn()
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
if row is None:
return None
cols = [desc[0] for desc in cur.description]
return dict(zip(cols, row))
except Exception as e:
logger.warning(f"Overture query error: {e}")
if conn:
try:
conn.rollback()
except Exception:
pass
return None
finally:
if conn:
try:
pool.putconn(conn)
except Exception:
pass
def _format_result(row, match_method):
"""Convert a database row dict to the enrichment result shape."""
if not row:
return None
socials = row.get('socials')
if isinstance(socials, str):
try:
socials = json.loads(socials)
except (json.JSONDecodeError, TypeError):
socials = None
return {
'phone': row.get('phone'),
'website': row.get('website'),
'socials': socials,
'brand_name': row.get('brand_name'),
'brand_wikidata': row.get('brand_wikidata'),
'basic_category': row.get('basic_category'),
'confidence': row.get('confidence'),
'gers_id': row.get('id'),
'match_method': match_method,
}
def find_by_osm_id(osm_type, osm_id):
"""
Look up an Overture place by its OSM cross-reference.
Args:
osm_type: OSM type 'N', 'W', 'R', 'node', 'way', 'relation', or single letter
osm_id: OSM numeric ID
Returns:
Enrichment dict or None
"""
type_letter = OSM_TYPE_MAP.get(osm_type)
if not type_letter:
return None
row = _query(
"""SELECT id, name, basic_category, confidence,
phone, website, socials, brand_name, brand_wikidata
FROM places
WHERE osm_type = %s AND osm_id = %s
LIMIT 1""",
(type_letter, int(osm_id))
)
return _format_result(row, 'osm_xref')
def find_by_coords_and_name(lat, lon, name, radius_m=100):
"""
Look up an Overture place by spatial proximity + fuzzy name match.
Args:
lat: Latitude
lon: Longitude
name: Place name to fuzzy-match
radius_m: Search radius in meters (default 100)
Returns:
Enrichment dict or None
"""
if not name or not lat or not lon:
return None
row = _query(
"""SELECT id, name, basic_category, confidence,
phone, website, socials, brand_name, brand_wikidata,
similarity(name, %s) AS sim
FROM places
WHERE ST_DWithin(geometry::geography, ST_MakePoint(%s, %s)::geography, %s)
AND similarity(name, %s) > 0.4
ORDER BY sim DESC, ST_Distance(geometry::geography, ST_MakePoint(%s, %s)::geography) ASC
LIMIT 1""",
(name, lon, lat, radius_m, name, lon, lat)
)
return _format_result(row, 'coord_name_fuzzy')

View file

@ -1,323 +0,0 @@
"""
RECON PeerTube Writer
Authenticated PeerTube API client for pushing domain category assignments.
Uses OAuth2 password grant, caches tokens, refreshes on 401.
Config keys used:
peertube.api_url internal PeerTube URL (http://192.168.1.170:9000)
peertube.host_header Host header for API requests (stream.echo6.co)
peertube.username PeerTube admin username
peertube.password_env env var name holding the password
peertube.rate_limit_delay delay between API calls (seconds)
"""
import json
import os
import time
import requests as http_requests
from .recon_domains import DOMAIN_CATEGORY_MAP
from .utils import setup_logging
logger = setup_logging('recon.peertube_writer')
TOKEN_CACHE_PATH = '/opt/recon/data/peertube-oauth-token.json'
def _get_peertube_config(config):
"""Extract PeerTube writer config with defaults."""
pt = config.get('peertube', {})
return {
'api_url': pt.get('api_url', pt.get('api_base', 'http://192.168.1.170:9000')),
'host_header': pt.get('host_header', 'stream.echo6.co'),
'username': pt.get('username', 'root'),
'password_env': pt.get('password_env', 'PEERTUBE_PASSWORD'),
'rate_limit_delay': pt.get('writer_rate_limit', 0.1),
}
def _load_cached_token():
"""Load cached OAuth token from disk."""
if os.path.exists(TOKEN_CACHE_PATH):
try:
with open(TOKEN_CACHE_PATH, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return None
def _save_token(token_data):
"""Save OAuth token to disk cache."""
os.makedirs(os.path.dirname(TOKEN_CACHE_PATH), exist_ok=True)
with open(TOKEN_CACHE_PATH, 'w') as f:
json.dump(token_data, f)
def _get_oauth_client(api_url, host_header):
"""Get PeerTube OAuth client credentials.
Args:
api_url: Base API URL
host_header: Host header value
Returns:
(client_id, client_secret) tuple
"""
resp = http_requests.get(
f"{api_url}/api/v1/oauth-clients/local",
headers={'Host': host_header},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
return data['client_id'], data['client_secret']
def _get_token(api_url, host_header, username, password, client_id, client_secret):
"""Obtain OAuth2 access token via password grant.
Args:
api_url: Base API URL
host_header: Host header value
username: PeerTube username
password: PeerTube password
client_id: OAuth client ID
client_secret: OAuth client secret
Returns:
Token data dict with access_token, refresh_token, etc.
"""
resp = http_requests.post(
f"{api_url}/api/v1/users/token",
headers={'Host': host_header},
data={
'client_id': client_id,
'client_secret': client_secret,
'grant_type': 'password',
'username': username,
'password': password,
},
timeout=30,
)
resp.raise_for_status()
token_data = resp.json()
token_data['client_id'] = client_id
token_data['client_secret'] = client_secret
_save_token(token_data)
return token_data
def _refresh_token(api_url, host_header, token_data):
"""Refresh an expired access token.
Returns:
New token data dict, or None on failure.
"""
try:
resp = http_requests.post(
f"{api_url}/api/v1/users/token",
headers={'Host': host_header},
data={
'client_id': token_data['client_id'],
'client_secret': token_data['client_secret'],
'grant_type': 'refresh_token',
'refresh_token': token_data['refresh_token'],
},
timeout=30,
)
resp.raise_for_status()
new_data = resp.json()
new_data['client_id'] = token_data['client_id']
new_data['client_secret'] = token_data['client_secret']
_save_token(new_data)
return new_data
except Exception as e:
logger.warning(f"Token refresh failed: {e}")
return None
def _ensure_token(config):
"""Ensure we have a valid OAuth token. Returns token data dict.
Tries cached token first, then obtains a new one.
"""
pt = _get_peertube_config(config)
password = os.environ.get(pt['password_env'], '')
if not password:
raise ValueError(f"PeerTube password not set in env var {pt['password_env']}")
# Try cached token
token_data = _load_cached_token()
if token_data and 'access_token' in token_data:
return token_data
# Get fresh token
client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
return _get_token(
pt['api_url'], pt['host_header'],
pt['username'], password,
client_id, client_secret,
)
def _api_request(method, path, config, token_data, **kwargs):
"""Make an authenticated PeerTube API request with auto-refresh on 401.
Args:
method: HTTP method ('GET', 'PUT', etc.)
path: API path (e.g. '/api/v1/videos/{uuid}')
config: RECON config dict
token_data: Current token data dict
**kwargs: Additional requests kwargs (json, data, etc.)
Returns:
(response, token_data) tuple token_data may be refreshed.
"""
pt = _get_peertube_config(config)
url = f"{pt['api_url']}{path}"
headers = {
'Host': pt['host_header'],
'Authorization': f"Bearer {token_data['access_token']}",
}
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
if resp.status_code == 401:
# Try refresh
new_token = _refresh_token(pt['api_url'], pt['host_header'], token_data)
if new_token:
headers['Authorization'] = f"Bearer {new_token['access_token']}"
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
return resp, new_token
else:
# Full re-auth
password = os.environ.get(pt['password_env'], '')
client_id, client_secret = _get_oauth_client(pt['api_url'], pt['host_header'])
new_token = _get_token(
pt['api_url'], pt['host_header'],
pt['username'], password,
client_id, client_secret,
)
headers['Authorization'] = f"Bearer {new_token['access_token']}"
resp = http_requests.request(method, url, headers=headers, timeout=30, **kwargs)
return resp, new_token
return resp, token_data
def push_category(video_uuid, category_id, config, token_data=None):
"""Push a category assignment to a single PeerTube video.
Args:
video_uuid: PeerTube video UUID
category_id: Category ID (100-117)
config: RECON config dict
token_data: Optional pre-fetched token data
Returns:
(success: bool, token_data: dict) tuple
"""
if token_data is None:
token_data = _ensure_token(config)
resp, token_data = _api_request(
'PUT',
f'/api/v1/videos/{video_uuid}',
config,
token_data,
json={'category': category_id},
)
if resp.status_code in (200, 204):
return True, token_data
else:
logger.warning(f"Failed to push category for {video_uuid}: "
f"HTTP {resp.status_code}{resp.text[:200]}")
return False, token_data
def extract_uuid(catalogue_path):
"""Extract PeerTube video UUID from catalogue path.
Catalogue paths for PeerTube videos look like:
https://stream.echo6.co/w/UUID
Args:
catalogue_path: catalogue.path value
Returns:
UUID string or None
"""
if not catalogue_path:
return None
if '/w/' in catalogue_path:
return catalogue_path.rsplit('/w/', 1)[-1]
return None
def push_pending(db, config, limit=None):
"""Push all assigned-but-unpushed domain categories to PeerTube.
Args:
db: StatusDB instance
config: RECON config dict
limit: Optional max number of items to push
Returns:
(success_count, fail_count) tuple
"""
items = db.get_unpushed_assignments()
if limit:
items = items[:limit]
if not items:
logger.info("No unpushed assignments to push")
return (0, 0)
pt = _get_peertube_config(config)
delay = pt['rate_limit_delay']
SYSTEMIC_FAIL_THRESHOLD = 5 # abort if first N items all fail
logger.info(f"Pushing {len(items)} category assignments to PeerTube")
token_data = _ensure_token(config)
success = 0
failed = 0
for item in items:
file_hash = item['hash']
domain = item.get('recon_domain')
catalogue_path = item.get('catalogue_path', '')
if not domain or domain not in DOMAIN_CATEGORY_MAP:
logger.warning(f" {file_hash[:12]}: invalid domain '{domain}', skipping")
failed += 1
continue
uuid = extract_uuid(catalogue_path)
if not uuid:
logger.warning(f" {file_hash[:12]}: could not extract UUID from '{catalogue_path}'")
failed += 1
continue
category_id = DOMAIN_CATEGORY_MAP[domain]
ok, token_data = push_category(uuid, category_id, config, token_data)
if ok:
db.set_peertube_pushed(file_hash)
success += 1
else:
failed += 1
# Abort on systemic failure (e.g. plugin not installed, auth broken)
if success == 0 and failed >= SYSTEMIC_FAIL_THRESHOLD:
logger.error(f"Aborting push: first {failed} items all failed — "
f"check plugin installation and PeerTube API config")
break
time.sleep(delay)
logger.info(f"Push complete: {success} succeeded, {failed} failed")
return (success, failed)

View file

@ -1,817 +0,0 @@
"""
Place detail proxy local Nominatim first, Overpass API fallback, SQLite cache.
Overture Maps enrichment layer fills sparse extratags (phone, website, brand).
Provides get_place_detail(osm_type, osm_id) which returns a cleaned dict
matching the response shape for /api/place/<osm_type>/<osm_id>.
"""
import json
import os
import sqlite3
import time
import requests as http_requests
from .osm_categories import humanize_category
from .utils import setup_logging
logger = setup_logging('recon.place_detail')
NOMINATIM_URL = "http://localhost:8010/details.php"
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
OVERPASS_UA = "Navi/1.0 (forge.echo6.co/matt/recon)"
VALID_OSM_TYPES = {"N", "W", "R"}
_db_conn = None
# ── SQLite cache ────────────────────────────────────────────────────────
def _get_db():
"""Return a module-level SQLite connection (lazy init)."""
global _db_conn
if _db_conn is not None:
return _db_conn
db_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data')
os.makedirs(db_dir, exist_ok=True)
db_path = os.path.join(db_dir, 'place_cache.db')
_db_conn = sqlite3.connect(db_path, check_same_thread=False)
_db_conn.execute("PRAGMA journal_mode=WAL")
_db_conn.execute("PRAGMA synchronous=NORMAL")
_db_conn.execute("""
CREATE TABLE IF NOT EXISTS place_cache (
osm_type TEXT NOT NULL,
osm_id INTEGER NOT NULL,
data TEXT NOT NULL,
source TEXT NOT NULL,
cached_at INTEGER NOT NULL,
PRIMARY KEY (osm_type, osm_id)
)
""")
_db_conn.commit()
logger.info(f"Place cache DB ready at {db_path}")
return _db_conn
def cache_get(osm_type, osm_id):
"""Return cached place dict or None."""
db = _get_db()
row = db.execute(
"SELECT data FROM place_cache WHERE osm_type=? AND osm_id=?",
(osm_type, osm_id)
).fetchone()
if row:
try:
result = json.loads(row[0])
result['source'] = 'cache'
return result
except (json.JSONDecodeError, TypeError):
pass
return None
def cache_put(osm_type, osm_id, data, source):
"""Store a place detail result in the cache (preserves google columns)."""
db = _get_db()
now = int(time.time())
db.execute("""
INSERT INTO place_cache (osm_type, osm_id, data, source, cached_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(osm_type, osm_id) DO UPDATE SET
data = excluded.data,
source = excluded.source,
cached_at = excluded.cached_at
""", (osm_type, osm_id, json.dumps(data), source, now))
db.commit()
# ── Overture enrichment ─────────────────────────────────────────────────
def _enrich_with_overture(result, osm_type, osm_id):
"""
Attempt to enrich a place result with Overture Maps data.
Fills sparse extratags (phone, website, brand) without overwriting existing values.
Returns the (possibly enriched) result dict.
"""
try:
from .deployment_config import get_deployment_config
deploy_config = get_deployment_config()
features = deploy_config.get('features', {})
if not features.get('has_overture_enrichment', False):
return result
except Exception:
return result
try:
from .overture import find_by_osm_id, find_by_coords_and_name
except ImportError:
logger.debug("Overture module not available")
return result
enrichment = None
match_method = None
# Strategy 1: OSM cross-reference (exact)
enrichment = find_by_osm_id(osm_type, osm_id)
if enrichment:
match_method = 'osm_xref'
# Strategy 2: Coordinate + name fuzzy (fallback)
if not enrichment and result.get('centroid') and result.get('name'):
centroid = result['centroid']
if centroid.get('lat') and centroid.get('lon'):
enrichment = find_by_coords_and_name(
centroid['lat'], centroid['lon'], result['name']
)
if enrichment:
match_method = 'coord_name_fuzzy'
if not enrichment:
return result
# Fill sparse extratags (never overwrite existing non-null values)
extratags = result.get('extratags', {})
fill_map = [
('phone', 'phone'),
('website', 'website'),
('brand', 'brand_name'),
('brand:wikidata', 'brand_wikidata'),
]
for osm_key, overture_key in fill_map:
if not extratags.get(osm_key) and enrichment.get(overture_key):
extratags[osm_key] = enrichment[overture_key]
result['extratags'] = extratags
# Add source metadata
result['sources'] = {
'primary': result.get('source', 'unknown'),
'enrichment': 'overture',
'overture_match_method': match_method,
'overture_gers_id': enrichment.get('gers_id'),
'overture_confidence': enrichment.get('confidence'),
'overture_basic_category': enrichment.get('basic_category'),
}
logger.debug(f"Overture enrichment for {osm_type}/{osm_id}: {match_method}")
return result
# ── Google Places enrichment (tertiary, gap-fill only) ──────────────
# Business POI classes eligible for Google enrichment
_BUSINESS_CLASSES = {'amenity', 'shop', 'tourism', 'leisure', 'office', 'craft'}
# Fields Google can fill
_GOOGLE_GAP_FIELDS = ('opening_hours', 'phone', 'website')
def _enrich_with_google(result, osm_type, osm_id):
"""
Tertiary enrichment via Google Places (New) API.
Only fires for business-type POIs when opening_hours, phone, or website
are still missing after OSM + Overture enrichment.
Fills only empty fields never overwrites existing values.
"""
# Check feature flag
try:
from .deployment_config import get_deployment_config
deploy_config = get_deployment_config()
features = deploy_config.get('features', {})
if not features.get('has_google_places_enrichment', False):
return result
except Exception:
return result
# Only enrich business-type POIs
poi_class = result.get('class', '')
if poi_class not in _BUSINESS_CLASSES:
return result
# Check if any gap fields are missing
extratags = result.get('extratags', {})
gaps = [f for f in _GOOGLE_GAP_FIELDS if not extratags.get(f)]
if not gaps:
logger.debug(f"google_places: skip {osm_type}/{osm_id} — no gaps")
return result
try:
from . import google_places
except ImportError:
logger.debug("google_places module not available")
return result
# Check Google cache first
cached_pid, cached_data = google_places.cache_get_google(osm_type, osm_id)
if cached_pid and cached_data:
_apply_google_data(result, cached_data, gaps)
result.setdefault('sources', {})['google_places'] = {
'place_id': cached_pid,
'source': 'cache',
}
logger.debug(f"google_places: cache hit for {osm_type}/{osm_id}")
return result
# Skip if already looked up and found nothing (cached_pid is None)
if cached_pid is not None:
return result
# Skip new Google API calls for guest users (cached data already returned above)
from .auth import get_user_id
if not get_user_id():
logger.debug(f"google_places: skip API call for {osm_type}/{osm_id} — guest user")
return result
# Daily cap check
if not google_places.check_daily_cap():
return result
# Search for the place
name = result.get('name', '')
centroid = result.get('centroid', {})
lat = centroid.get('lat')
lon = centroid.get('lon')
if not name or not lat or not lon:
return result
place_id = google_places.search_place(name, lat, lon)
if not place_id:
# Cache the miss to avoid repeated lookups
google_places.cache_put_google(osm_type, osm_id, '__miss__', None)
return result
# Get details
details = google_places.get_place_details(place_id)
if not details:
google_places.cache_put_google(osm_type, osm_id, place_id, None)
return result
# Cache the result
google_places.cache_put_google(osm_type, osm_id, place_id, details)
# Apply to result
_apply_google_data(result, details, gaps)
result.setdefault('sources', {})['google_places'] = {
'place_id': place_id,
'source': 'api',
'daily_count': google_places.get_daily_count(),
}
return result
def _apply_google_data(result, google_data, gaps):
"""Apply Google Places data to fill gap fields only."""
extratags = result.get('extratags', {})
if 'opening_hours' in gaps:
osm_hours = google_data.get('opening_hours')
if osm_hours:
extratags['opening_hours'] = osm_hours
elif google_data.get('opening_hours_raw'):
extratags['opening_hours_raw'] = google_data['opening_hours_raw']
if 'phone' in gaps and google_data.get('phone_number'):
extratags['phone'] = google_data['phone_number']
if 'website' in gaps and google_data.get('website'):
extratags['website'] = google_data['website']
result['extratags'] = extratags
# ── Wiki link rewriting ─────────────────────────────────────────────────
# Extratag keys that may contain wiki references
_WIKI_TAGS = ('wikipedia', 'wikidata', 'wikivoyage', 'appropedia')
def _enrich_wiki_links(result):
"""
Rewrite wiki-related extratags to local Kiwix URLs where available.
Falls back to public URLs. Only runs when has_wiki_rewriting is enabled.
Returns the (possibly enriched) result dict.
"""
try:
from .deployment_config import get_deployment_config
deploy_config = get_deployment_config()
features = deploy_config.get('features', {})
if not features.get('has_wiki_rewriting', False):
return result
except Exception:
return result
try:
from .wiki_rewrite import rewrite_wiki_link
except ImportError:
logger.debug("wiki_rewrite module not available")
return result
extratags = result.get('extratags', {})
if not extratags:
return result
rewrites = {}
for tag in _WIKI_TAGS:
value = extratags.get(tag)
if not value:
continue
url, status = rewrite_wiki_link(tag, value)
if status != 'original':
extratags[tag] = url
rewrites[tag] = status
if rewrites:
result['extratags'] = extratags
result.setdefault('sources', {})['wiki_rewrites'] = rewrites
logger.debug(f"Wiki rewrites for {result.get('osm_type')}/{result.get('osm_id')}: {rewrites}")
return result
# ── Nominatim parsing ───────────────────────────────────────────────────
# Nominatim address array uses rank_address to indicate what each entry is.
# We map rank ranges to our flat address fields.
RANK_TO_FIELD = {
4: 'country',
5: 'postcode',
6: 'state', # rank 6 = county in US, but we try name matching
8: 'state',
12: 'county',
16: 'city',
20: 'neighbourhood',
22: 'neighbourhood',
26: 'road',
28: 'house_number',
}
def _parse_nominatim_address(address_array, country_code=None):
"""Parse Nominatim's ranked address array into a flat address dict."""
addr = {
'house_number': None,
'road': None,
'neighbourhood': None,
'city': None,
'county': None,
'state': None,
'postcode': None,
'country': None,
'country_code': country_code,
}
if not address_array:
return addr
for entry in address_array:
if not entry.get('isaddress', False):
continue
name = entry.get('localname', '')
rank = entry.get('rank_address', 0)
etype = entry.get('type', '')
eclass = entry.get('class', '')
# Explicit type-based assignments (more reliable than rank alone)
if etype == 'country' and eclass == 'place':
addr['country'] = name
elif etype == 'state' or (eclass == 'boundary' and etype == 'administrative' and rank == 8):
if not addr['state']:
addr['state'] = name
elif etype == 'county' or (eclass == 'boundary' and etype == 'administrative' and rank in (10, 12)):
if not addr['county']:
addr['county'] = name
elif etype in ('city', 'town', 'village', 'hamlet') and eclass == 'place':
if not addr['city']:
addr['city'] = name
elif eclass == 'boundary' and etype == 'administrative' and rank == 16:
# City-level admin boundary (common in US)
if not addr['city']:
addr['city'] = name
elif etype == 'postcode':
addr['postcode'] = name
elif eclass == 'highway' or rank == 26:
if not addr['road']:
addr['road'] = name
elif etype == 'house_number' or rank == 28:
addr['house_number'] = name
elif rank in (20, 22) and not addr['neighbourhood']:
addr['neighbourhood'] = name
# Remove county from output (not in spec)
addr.pop('county', None)
return addr
def _parse_nominatim(data):
"""Parse a Nominatim /details response into our canonical shape."""
osm_type = data.get('osm_type', '')
osm_id = data.get('osm_id', 0)
osm_class = data.get('category', '')
osm_type_tag = data.get('type', '')
# Centroid
centroid_geom = data.get('centroid', {})
coords = centroid_geom.get('coordinates', [0, 0])
centroid = {'lat': coords[1], 'lon': coords[0]} if len(coords) >= 2 else {'lat': 0, 'lon': 0}
# Names
names = data.get('names', {})
display_name = data.get('localname', '') or names.get('name', '')
# Address
address = _parse_nominatim_address(
data.get('address', []),
country_code=data.get('country_code')
)
# Use calculated_postcode if address parse didn't find one
if not address.get('postcode') and data.get('calculated_postcode'):
address['postcode'] = data['calculated_postcode']
# Extratags
raw_extra = data.get('extratags', {})
extratags = {
'opening_hours': raw_extra.get('opening_hours'),
'phone': raw_extra.get('phone') or raw_extra.get('contact:phone'),
'website': raw_extra.get('website') or raw_extra.get('contact:website') or raw_extra.get('url'),
'email': raw_extra.get('email') or raw_extra.get('contact:email'),
'wikipedia': raw_extra.get('wikipedia'),
'wikidata': raw_extra.get('wikidata'),
'cuisine': raw_extra.get('cuisine'),
'operator': raw_extra.get('operator'),
'wheelchair': raw_extra.get('wheelchair'),
'fee': raw_extra.get('fee'),
'takeaway': raw_extra.get('takeaway'),
}
# Category: use extratags.place for boundaries (e.g. "city"), else class/type
effective_class = osm_class
effective_type = osm_type_tag
if osm_class == 'boundary' and osm_type_tag == 'administrative':
place_tag = raw_extra.get('place') or raw_extra.get('linked_place')
if place_tag:
effective_class = 'place'
effective_type = place_tag
category = humanize_category(effective_class, effective_type)
# Filter names: only include extra name tags, not the bare "name"
extra_names = {k: v for k, v in names.items() if k != 'name'} if names else {}
# Boundary geometry (polygon/multipolygon from Nominatim)
boundary = None
geom = data.get('geometry')
if geom and geom.get('type') in ('Polygon', 'MultiPolygon'):
boundary = geom
return {
'osm_type': osm_type,
'osm_id': osm_id,
'name': display_name,
'category': category,
'class': osm_class,
'type': osm_type_tag,
'address': address,
'centroid': centroid,
'extratags': extratags,
'names': extra_names if extra_names else None,
'source': 'nominatim_local',
'boundary': boundary,
}
# ── Overpass parsing ────────────────────────────────────────────────────
OVERPASS_TYPE_MAP = {'N': 'node', 'W': 'way', 'R': 'relation'}
def _build_overpass_query(osm_type, osm_id):
"""Build an Overpass QL query for a single element."""
elem = OVERPASS_TYPE_MAP.get(osm_type)
if not elem:
return None
return f"[out:json][timeout:10];{elem}({osm_id});out tags center;"
def _parse_overpass(data, osm_type, osm_id):
"""Parse an Overpass API response into our canonical shape."""
elements = data.get('elements', [])
if not elements:
return None
elem = elements[0]
tags = elem.get('tags', {})
# Centroid: Overpass returns lat/lon for nodes, center for ways/relations
lat = elem.get('lat') or (elem.get('center', {}).get('lat'))
lon = elem.get('lon') or (elem.get('center', {}).get('lon'))
centroid = {'lat': lat, 'lon': lon} if lat and lon else {'lat': 0, 'lon': 0}
# Determine class/type from tags — Overpass doesn't have a canonical class field
# Use the first recognized class tag
osm_class = ''
osm_type_tag = ''
for cls in ('amenity', 'shop', 'leisure', 'tourism', 'natural', 'highway',
'boundary', 'place', 'building', 'waterway', 'landuse', 'historic'):
if cls in tags:
osm_class = cls
osm_type_tag = tags[cls]
break
category = humanize_category(osm_class, osm_type_tag)
# Address from addr:* tags
address = {
'house_number': tags.get('addr:housenumber'),
'road': tags.get('addr:street'),
'neighbourhood': tags.get('addr:suburb') or tags.get('addr:neighbourhood'),
'city': tags.get('addr:city'),
'state': tags.get('addr:state'),
'postcode': tags.get('addr:postcode'),
'country': tags.get('addr:country'),
'country_code': tags.get('addr:country_code',
tags.get('addr:country', '')).lower()[:2] or None,
}
# Extratags
extratags = {
'opening_hours': tags.get('opening_hours'),
'phone': tags.get('phone') or tags.get('contact:phone'),
'website': tags.get('website') or tags.get('contact:website') or tags.get('url'),
'email': tags.get('email') or tags.get('contact:email'),
'wikipedia': tags.get('wikipedia'),
'wikidata': tags.get('wikidata'),
'cuisine': tags.get('cuisine'),
'operator': tags.get('operator'),
'wheelchair': tags.get('wheelchair'),
'fee': tags.get('fee'),
'takeaway': tags.get('takeaway'),
}
# Names
name = tags.get('name', '')
extra_names = {}
for k, v in tags.items():
if k.startswith('name:') or k in ('alt_name', 'old_name', 'short_name', 'official_name'):
extra_names[k] = v
return {
'osm_type': osm_type,
'osm_id': osm_id,
'name': name,
'category': category,
'class': osm_class,
'type': osm_type_tag,
'address': address,
'centroid': centroid,
'extratags': extratags,
'names': extra_names if extra_names else None,
'source': 'overpass',
}
# ── Public API ──────────────────────────────────────────────────────────
def get_place_detail(osm_type, osm_id):
"""
Fetch place details for an OSM element.
Returns (dict, status_code):
- (data, 200) on success
- (error_dict, 404) if not found in any source
- (error_dict, 502) if both sources error
"""
osm_type = osm_type.upper()
if osm_type not in VALID_OSM_TYPES:
return {'error': f'Invalid osm_type: {osm_type}. Must be N, W, or R.'}, 400
if osm_id <= 0:
return {'error': 'osm_id must be a positive integer'}, 400
# 1. Check cache
cached = cache_get(osm_type, osm_id)
if cached:
logger.debug(f"Cache hit: {osm_type}/{osm_id}")
return cached, 200
# 2. Try local Nominatim first
nominatim_result = None
nominatim_error = None
try:
resp = http_requests.get(NOMINATIM_URL, params={
'osmtype': osm_type,
'osmid': osm_id,
'format': 'json',
'addressdetails': 1,
'hierarchy': 0,
'keywords': 0,
'polygon_geojson': 1,
}, timeout=5)
if resp.status_code == 200:
data = resp.json()
# Nominatim returns a result even for IDs not in its DB,
# but they'll have empty/minimal data. Check for osm_id match.
if data.get('osm_id') == osm_id:
nominatim_result = _parse_nominatim(data)
logger.debug(f"Nominatim hit: {osm_type}/{osm_id}")
except Exception as e:
nominatim_error = str(e)
logger.warning(f"Nominatim error for {osm_type}/{osm_id}: {e}")
if nominatim_result:
nominatim_result = _enrich_with_overture(nominatim_result, osm_type, osm_id)
nominatim_result = _enrich_with_google(nominatim_result, osm_type, osm_id)
nominatim_result = _enrich_wiki_links(nominatim_result)
cache_put(osm_type, osm_id, nominatim_result, 'nominatim_local')
return nominatim_result, 200
# 3. Fallback to Overpass
overpass_result = None
overpass_error = None
try:
query = _build_overpass_query(osm_type, osm_id)
if query:
resp = http_requests.post(
OVERPASS_URL,
data={'data': query},
headers={'User-Agent': OVERPASS_UA},
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
overpass_result = _parse_overpass(data, osm_type, osm_id)
if overpass_result:
logger.debug(f"Overpass hit: {osm_type}/{osm_id}")
elif resp.status_code == 429:
overpass_error = "Overpass rate limited"
logger.warning(f"Overpass 429 for {osm_type}/{osm_id}")
else:
overpass_error = f"Overpass HTTP {resp.status_code}"
except Exception as e:
overpass_error = str(e)
logger.warning(f"Overpass error for {osm_type}/{osm_id}: {e}")
if overpass_result:
overpass_result = _enrich_with_overture(overpass_result, osm_type, osm_id)
overpass_result = _enrich_with_google(overpass_result, osm_type, osm_id)
overpass_result = _enrich_wiki_links(overpass_result)
cache_put(osm_type, osm_id, overpass_result, 'overpass')
return overpass_result, 200
# 4. Both failed
if nominatim_error and overpass_error:
logger.error(f"Both sources failed for {osm_type}/{osm_id}: "
f"Nominatim={nominatim_error}, Overpass={overpass_error}")
return {'error': 'Both data sources unavailable'}, 502
# Not found in either source (no errors, just empty results)
return {'error': f'{osm_type}/{osm_id} not found'}, 404
# ── Wikidata lookup ─────────────────────────────────────────────────────
WIKIDATA_API_URL = "https://www.wikidata.org/w/api.php"
def get_place_by_wikidata(wikidata_id):
"""
Fetch place details from Wikidata entity.
Returns (dict, status_code):
- (data, 200) on success
- (error_dict, 404) if entity not found
- (error_dict, 400) if invalid ID format
- (error_dict, 502) on API error
"""
# Validate wikidata ID format (Q followed by digits)
wikidata_id = wikidata_id.upper().strip()
if not wikidata_id.startswith("Q") or not wikidata_id[1:].isdigit():
return {"error": f"Invalid wikidata ID: {wikidata_id}. Must be Q followed by digits."}, 400
try:
resp = http_requests.get(WIKIDATA_API_URL, params={
"action": "wbgetentities",
"ids": wikidata_id,
"format": "json",
"languages": "en",
"props": "labels|descriptions|claims|sitelinks",
}, timeout=10, headers={"User-Agent": "Navi/1.0 (forge.echo6.co/matt/recon)"})
if resp.status_code != 200:
logger.warning(f"Wikidata API error for {wikidata_id}: HTTP {resp.status_code}")
return {"error": "Wikidata API error"}, 502
data = resp.json()
entities = data.get("entities", {})
entity = entities.get(wikidata_id)
if not entity or entity.get("missing"):
return {"error": f"Wikidata entity {wikidata_id} not found"}, 404
# Extract basic info
labels = entity.get("labels", {})
descriptions = entity.get("descriptions", {})
claims = entity.get("claims", {})
name = labels.get("en", {}).get("value", wikidata_id)
description = descriptions.get("en", {}).get("value", "")
# Extract coordinates from P625 (coordinate location)
lat, lon = None, None
if "P625" in claims:
coord_claim = claims["P625"]
if coord_claim and coord_claim[0].get("mainsnak", {}).get("datavalue"):
coord_val = coord_claim[0]["mainsnak"]["datavalue"]["value"]
lat = coord_val.get("latitude")
lon = coord_val.get("longitude")
# Extract population from P1082
population = None
if "P1082" in claims:
pop_claims = claims["P1082"]
if pop_claims:
# Get the most recent population value
for claim in pop_claims:
if claim.get("mainsnak", {}).get("datavalue"):
try:
population = int(claim["mainsnak"]["datavalue"]["value"]["amount"].lstrip("+"))
break
except (KeyError, ValueError):
pass
# Extract country from P17
country = None
if "P17" in claims:
country_claims = claims["P17"]
if country_claims and country_claims[0].get("mainsnak", {}).get("datavalue"):
country_id = country_claims[0]["mainsnak"]["datavalue"]["value"]["id"]
# Could resolve this to a name, but for now just store the ID
# Extract instance of (P31) for type classification
instance_of = []
if "P31" in claims:
for claim in claims["P31"]:
if claim.get("mainsnak", {}).get("datavalue"):
instance_of.append(claim["mainsnak"]["datavalue"]["value"]["id"])
# Extract OSM relation ID if available (P402)
osm_relation_id = None
if "P402" in claims:
osm_claims = claims["P402"]
if osm_claims and osm_claims[0].get("mainsnak", {}).get("datavalue"):
osm_relation_id = osm_claims[0]["mainsnak"]["datavalue"]["value"]
# Extract Wikipedia sitelink
sitelinks = entity.get("sitelinks", {})
wikipedia = None
if "enwiki" in sitelinks:
wiki_title = sitelinks["enwiki"].get("title", "")
if wiki_title:
wikipedia = f"en:{wiki_title}"
result = {
"wikidata_id": wikidata_id,
"name": name,
"description": description,
"centroid": {"lat": lat, "lon": lon} if lat and lon else None,
"population": population,
"instance_of": instance_of,
"osm_relation_id": osm_relation_id,
"source": "wikidata",
"extratags": {
"wikidata": wikidata_id,
},
}
if wikipedia:
result["extratags"]["wikipedia"] = wikipedia
# Fetch boundary polygon from Nominatim if we have an OSM relation ID
boundary = None
if osm_relation_id:
try:
nom_resp = http_requests.get(NOMINATIM_URL, params={
'osmtype': 'R',
'osmid': osm_relation_id,
'format': 'json',
'polygon_geojson': 1,
}, timeout=5)
if nom_resp.status_code == 200:
nom_data = nom_resp.json()
geom = nom_data.get('geometry')
if geom and geom.get('type') in ('Polygon', 'MultiPolygon'):
boundary = geom
logger.debug(f"Wikidata boundary hit for {wikidata_id}")
except Exception as e:
logger.debug(f"Wikidata boundary fetch failed: {e}")
result["boundary"] = boundary
logger.debug(f"Wikidata hit: {wikidata_id} -> {name}")
return result, 200
except Exception as e:
logger.warning(f"Wikidata error for {wikidata_id}: {e}")
return {"error": "Wikidata lookup failed"}, 502

View file

@ -77,10 +77,73 @@ def _text_hash(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()
def _flatten_table(table_el):
"""Convert a <table> element to pipe-delimited text.
Each <tr> becomes a row with cells joined by ' | '.
Returns the formatted table as a string with blank lines around it.
"""
rows = []
for tr in table_el.iter('tr'):
cells = []
for cell in tr:
if cell.tag in ('td', 'th'):
cell_text = (cell.text_content() or '').strip()
# Collapse internal whitespace in each cell
cell_text = re.sub(r'\s+', ' ', cell_text)
if cell_text:
cells.append(cell_text)
if cells:
rows.append(' | '.join(cells))
if not rows:
return ''
return '\n'.join(rows)
def _preprocess_tree(doc):
"""Pre-process HTML tree to add delimiters before text_content() flattens it.
Handles: <table>, <br>, <li>, <dt>, <dd> -- elements that lxml's
text_content() would concatenate without any separators.
"""
from lxml import etree
# 1. Replace <table> elements with their pipe-delimited text
for table in list(doc.iter('table')):
formatted = _flatten_table(table)
if formatted:
replacement = etree.Element('div')
replacement.text = '\n\n' + formatted + '\n\n'
parent = table.getparent()
if parent is not None:
parent.replace(table, replacement)
else:
table.drop_tree()
# 2. <br> -> inject newline
for br in list(doc.iter('br')):
br.tail = '\n' + (br.tail or '')
# 3. <li> -> inject newline + "- " prefix
for li in list(doc.iter('li')):
li.text = '- ' + (li.text or '')
li.tail = '\n' + (li.tail or '')
# 4. <dt> -> inject newline before
for dt in list(doc.iter('dt')):
dt.tail = '\n' + (dt.tail or '')
# 5. <dd> -> inject newline + indent
for dd in list(doc.iter('dd')):
dd.text = ' ' + (dd.text or '')
dd.tail = '\n' + (dd.tail or '')
def _html_to_text(html_bytes):
"""Convert HTML bytes to clean text via lxml.
Strips nav, footer, script, style elements. Decodes entities.
Pre-processes tables, lists, and line breaks for proper delimiters.
Normalizes whitespace.
"""
try:
@ -93,6 +156,9 @@ def _html_to_text(html_bytes):
for el in doc.iter(tag):
el.drop_tree()
# Pre-process tree: tables -> pipe-delimited, br -> newlines, li -> dashes
_preprocess_tree(doc)
# Extract text
text = doc.text_content()

View file

@ -1,46 +0,0 @@
"""
RECON Domain Taxonomy
Single source of truth for the 18 knowledge domains and their PeerTube
category ID mappings. IDs 100-117 are registered via the
peertube-plugin-recon-domains plugin.
Import VALID_DOMAINS from here instead of defining local sets.
"""
DOMAIN_CATEGORY_MAP = {
'Agriculture & Livestock': 100,
'Civil Organization': 101,
'Communications': 102,
'Food Systems': 103,
'Foundational Skills': 104,
'Logistics': 105,
'Medical': 106,
'Navigation': 107,
'Operations': 108,
'Power Systems': 109,
'Preservation & Storage': 110,
'Security': 111,
'Shelter & Construction': 112,
'Technology': 113,
'Tools & Equipment': 114,
'Vehicles': 115,
'Water Systems': 116,
'Wilderness Skills': 117,
}
VALID_DOMAINS = set(DOMAIN_CATEGORY_MAP.keys())
CATEGORY_DOMAIN_MAP = {v: k for k, v in DOMAIN_CATEGORY_MAP.items()}
# Channels whose tiebreaker is skipped because their content is non-topical
# (catch-alls, miscellany dumps, etc.). Items in these channels with tied
# domain counts go straight to tied_manual without channel-context tiebreaker.
#
# This is intentionally a hardcoded explicit list, not a size threshold.
# Adding a channel here requires an explicit decision — only add channels
# that are genuinely non-topical catch-alls where channel-wide concept
# aggregation would produce meaningless noise.
MEGA_CHANNEL_SKIP_LIST = {
'Transcript', # Legacy catch-all, ~9,200 videos, no topical coherence
}

View file

@ -124,22 +124,6 @@ class StatusDB:
except Exception:
pass # column already exists
# Migration: domain assignment columns for PeerTube categorization
for col, coltype in [
('recon_domain', 'TEXT'),
('recon_domain_status', 'TEXT'),
('recon_domain_assigned_at', 'TEXT'),
('peertube_category_pushed_at', 'TEXT'),
]:
try:
conn.execute(f"ALTER TABLE documents ADD COLUMN {col} {coltype}")
except Exception:
pass # column already exists
try:
conn.execute("CREATE INDEX idx_documents_recon_domain_status ON documents(recon_domain_status)")
except Exception:
pass # index already exists
# Stream B: file_operations + duplicate_review tables
conn.executescript("""
CREATE TABLE IF NOT EXISTS file_operations (
@ -464,90 +448,6 @@ class StatusDB:
conn.commit()
# ── Domain Assignment Helpers ──────────────────<E29480><E29480>─────────────
def get_domain_assignment(self, file_hash):
"""Get domain assignment for a document.
Returns:
(recon_domain, recon_domain_status) tuple, or (None, None) if not set.
"""
conn = self._get_conn()
row = conn.execute(
"SELECT recon_domain, recon_domain_status FROM documents WHERE hash = ?",
(file_hash,)
).fetchone()
if row:
return (row['recon_domain'], row['recon_domain_status'])
return (None, None)
def set_domain_assignment(self, file_hash, domain, status):
"""Set domain assignment and status for a document."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET recon_domain = ?, recon_domain_status = ?, "
"recon_domain_assigned_at = ? WHERE hash = ?",
(domain, status, datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def set_peertube_pushed(self, file_hash):
"""Mark a document's category as pushed to PeerTube."""
conn = self._get_conn()
conn.execute(
"UPDATE documents SET peertube_category_pushed_at = ? WHERE hash = ?",
(datetime.now(timezone.utc).isoformat(), file_hash)
)
conn.commit()
def get_unpushed_assignments(self):
"""Get documents with domain assignments not yet pushed to PeerTube.
Only returns stream.echo6.co source documents.
"""
conn = self._get_conn()
return [dict(r) for r in conn.execute(
"""SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain IS NOT NULL
AND d.peertube_category_pushed_at IS NULL
AND c.source = 'stream.echo6.co'
ORDER BY d.recon_domain_assigned_at""",
).fetchall()]
def get_items_by_domain_status(self, status, limit=None):
"""Get documents by domain assignment status."""
conn = self._get_conn()
q = """SELECT d.*, c.source, c.category, c.path as catalogue_path
FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.recon_domain_status = ?
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
return [dict(r) for r in conn.execute(q, (status,)).fetchall()]
def get_domain_status_counts(self):
"""Get counts of documents by domain assignment status."""
conn = self._get_conn()
return {row['recon_domain_status']: row['cnt']
for row in conn.execute(
"SELECT recon_domain_status, COUNT(*) as cnt "
"FROM documents WHERE recon_domain_status IS NOT NULL "
"GROUP BY recon_domain_status"
).fetchall()}
def get_domain_distribution(self):
"""Get counts of documents per assigned domain."""
conn = self._get_conn()
return {row['recon_domain']: row['cnt']
for row in conn.execute(
"SELECT recon_domain, COUNT(*) as cnt "
"FROM documents WHERE recon_domain IS NOT NULL "
"GROUP BY recon_domain ORDER BY cnt DESC"
).fetchall()}
# ── Scraper Job Helpers ─────────────────────────────────────
def get_pending_scrape_job(self):

View file

@ -1,324 +0,0 @@
"""
Wiki link rewriter rewrites OSM wikipedia/wikidata/wikivoyage/appropedia
links to local Kiwix URLs where the article exists in a loaded ZIM.
Falls back silently to public URLs when article is unavailable locally.
Caches positive results only in place_cache.db.
Kiwix catalog is parsed from the OPDS Atom feed at startup and refreshed
hourly to pick up newly loaded ZIMs without a restart.
Operations note:
- After loading a new ZIM, either restart RECON (forces fresh catalog
fetch) or wait up to 1 hour for automatic refresh.
- To invalidate the wiki cache (e.g. after ZIM update):
sqlite3 /opt/recon/data/place_cache.db "DELETE FROM wiki_cache;"
"""
import os
import re
import sqlite3
import time
import xml.etree.ElementTree as ET
from urllib.parse import unquote, quote
import requests as http_requests
from .utils import setup_logging
logger = setup_logging('recon.wiki_rewrite')
# ── Configuration ───────────────────────────────────────────────────────
KIWIX_BASE = "http://localhost:8430"
KIWIX_PUBLIC_BASE = "https://wiki.echo6.co"
KIWIX_CATALOG_URL = f"{KIWIX_BASE}/catalog/v2/entries"
HEAD_TIMEOUT = 1.5 # seconds
CATALOG_REFRESH_INTERVAL = 3600 # 1 hour
# OPDS Atom namespace
_ATOM_NS = "http://www.w3.org/2005/Atom"
# ── ZIM catalog map ─────────────────────────────────────────────────────
_zim_map = {} # source_type → content_path e.g. 'wikipedia' → 'wikipedia_en_all_maxi_2026-02'
_zim_map_ts = 0.0 # last refresh timestamp
# Prefix-to-source-type mapping (order matters: longest prefix first)
_ZIM_PREFIX_MAP = [
('wikipedia_en_all', 'wikipedia'),
('appropedia_en_all', 'appropedia'),
('wikivoyage_en', 'wikivoyage'),
('wikidata_en', 'wikidata'),
]
def _discover_zims():
"""Parse Kiwix OPDS Atom catalog to map source types to content paths."""
global _zim_map, _zim_map_ts
try:
resp = http_requests.get(KIWIX_CATALOG_URL, timeout=5)
if resp.status_code != 200:
logger.warning(f"Kiwix catalog returned HTTP {resp.status_code}")
return
root = ET.fromstring(resp.content)
new_map = {}
for entry in root.findall(f"{{{_ATOM_NS}}}entry"):
name_el = entry.find(f"{{{_ATOM_NS}}}name")
if name_el is None:
continue
book_name = name_el.text or ""
# <link type="text/html" href="/content/..."/>
content_path = None
for link in entry.findall(f"{{{_ATOM_NS}}}link"):
if link.get("type") == "text/html":
href = link.get("href", "")
if href.startswith("/content/"):
content_path = href[len("/content/"):]
break
if not content_path:
continue
# Match book name against known prefixes
for prefix, source_type in _ZIM_PREFIX_MAP:
if book_name.startswith(prefix):
new_map[source_type] = content_path
break
_zim_map = new_map
_zim_map_ts = time.time()
logger.info(f"ZIM catalog refreshed: {new_map}")
except Exception as e:
logger.warning(f"Failed to discover ZIMs from Kiwix catalog: {e}")
def _ensure_zim_map():
"""Lazy-load and refresh ZIM map if stale."""
if not _zim_map or (time.time() - _zim_map_ts) > CATALOG_REFRESH_INTERVAL:
_discover_zims()
# ── Database (wiki_cache in place_cache.db) ─────────────────────────────
_db_conn = None
def _get_db():
"""Return a module-level SQLite connection to place_cache.db (lazy init)."""
global _db_conn
if _db_conn is not None:
return _db_conn
db_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data')
os.makedirs(db_dir, exist_ok=True)
db_path = os.path.join(db_dir, 'place_cache.db')
_db_conn = sqlite3.connect(db_path, check_same_thread=False)
_db_conn.execute("PRAGMA journal_mode=WAL")
_db_conn.execute("PRAGMA synchronous=NORMAL")
_db_conn.execute("""
CREATE TABLE IF NOT EXISTS wiki_cache (
source_type TEXT NOT NULL,
article_id TEXT NOT NULL,
kiwix_url TEXT NOT NULL,
cached_at INTEGER NOT NULL,
PRIMARY KEY (source_type, article_id)
)
""")
_db_conn.commit()
logger.info(f"Wiki cache table ready in {db_path}")
return _db_conn
# ── URL classification ──────────────────────────────────────────────────
# Patterns for OSM wikipedia/wikidata tag values
_WIKI_TAG_RE = re.compile(r'^(?:en:)?(.+)$') # "en:Title" or just "Title"
_WIKI_URL_RE = re.compile(r'https?://en\.wikipedia\.org/wiki/(.+)')
_WIKIDATA_TAG_RE = re.compile(r'^(Q\d+)$')
_WIKIDATA_URL_RE = re.compile(r'https?://(?:www\.)?wikidata\.org/wiki/(Q\d+)')
_WIKIVOYAGE_URL_RE = re.compile(r'https?://en\.wikivoyage\.org/wiki/(.+)')
_APPROPEDIA_URL_RE = re.compile(r'https?://(?:www\.)?appropedia\.org/(?:wiki/)?(.+)')
def _normalize_article_id(article_id):
"""Normalize article ID to MediaWiki/Kiwix convention: spaces → underscores."""
return article_id.replace(' ', '_')
def classify_wiki_link(tag_name, value):
"""
Classify an OSM extratag value into (source_type, article_id) or None.
tag_name: the extratags key ('wikipedia', 'wikidata', etc.)
value: the raw tag value from OSM
Article IDs are normalized to MediaWiki convention (spaces underscores).
"""
if not value or not isinstance(value, str):
return None
value = value.strip()
if tag_name == 'wikidata':
m = _WIKIDATA_TAG_RE.match(value)
if m:
return ('wikidata', m.group(1))
m = _WIKIDATA_URL_RE.match(value)
if m:
return ('wikidata', m.group(1))
return None
if tag_name == 'wikipedia':
# URL form: https://en.wikipedia.org/wiki/Title
m = _WIKI_URL_RE.match(value)
if m:
return ('wikipedia', _normalize_article_id(unquote(m.group(1))))
# Tag form: "en:Title" or "Title"
m = _WIKI_TAG_RE.match(value)
if m:
return ('wikipedia', _normalize_article_id(m.group(1)))
return None
if tag_name == 'wikivoyage':
m = _WIKIVOYAGE_URL_RE.match(value)
if m:
return ('wikivoyage', _normalize_article_id(unquote(m.group(1))))
# Plain tag: "en:Title" or "Title"
m = _WIKI_TAG_RE.match(value)
if m:
return ('wikivoyage', _normalize_article_id(m.group(1)))
return None
if tag_name == 'appropedia':
m = _APPROPEDIA_URL_RE.match(value)
if m:
return ('appropedia', _normalize_article_id(unquote(m.group(1))))
return ('appropedia', _normalize_article_id(value))
return None
# ── URL builders ────────────────────────────────────────────────────────
def build_kiwix_url(source_type, article_id):
"""Build a public Kiwix URL. Returns None if source_type not in ZIM map."""
_ensure_zim_map()
content_path = _zim_map.get(source_type)
if not content_path:
return None
return f"{KIWIX_PUBLIC_BASE}/content/{content_path}/{quote(article_id, safe='/:@!$&\'()*+,;=')}"
_PUBLIC_URL_TEMPLATES = {
'wikipedia': "https://en.wikipedia.org/wiki/{id}",
'wikidata': "https://www.wikidata.org/wiki/{id}",
'wikivoyage': "https://en.wikivoyage.org/wiki/{id}",
'appropedia': "https://www.appropedia.org/wiki/{id}",
}
def build_public_url(source_type, article_id):
"""Build the canonical public URL for a wiki article."""
tmpl = _PUBLIC_URL_TEMPLATES.get(source_type)
if not tmpl:
return None
return tmpl.format(id=quote(article_id, safe='/:@!$&\'()*+,;='))
# ── Kiwix availability check ───────────────────────────────────────────
def check_kiwix_has_article(source_type, article_id):
"""
Check if an article exists in local Kiwix.
Returns (bool, url):
- (True, kiwix_public_url) if article exists locally
- (False, None) if not found or Kiwix unavailable
Only positive results are cached.
"""
# Check cache first
db = _get_db()
row = db.execute(
"SELECT kiwix_url FROM wiki_cache WHERE source_type=? AND article_id=?",
(source_type, article_id)
).fetchone()
if row:
return (True, row[0])
# Build local HEAD URL
_ensure_zim_map()
content_path = _zim_map.get(source_type)
if not content_path:
return (False, None)
head_url = f"{KIWIX_BASE}/content/{content_path}/{quote(article_id, safe='/:@!$&\'()*+,;=')}"
try:
resp = http_requests.head(head_url, timeout=HEAD_TIMEOUT, allow_redirects=True)
if resp.status_code == 200:
kiwix_url = build_kiwix_url(source_type, article_id)
# Cache positive result
now = int(time.time())
db.execute("""
INSERT OR REPLACE INTO wiki_cache (source_type, article_id, kiwix_url, cached_at)
VALUES (?, ?, ?, ?)
""", (source_type, article_id, kiwix_url, now))
db.commit()
return (True, kiwix_url)
else:
return (False, None)
except Exception as e:
logger.debug(f"Kiwix HEAD failed for {source_type}/{article_id}: {e}")
return (False, None)
# ── Primary entry point ────────────────────────────────────────────────
def rewrite_wiki_link(tag_name, value):
"""
Rewrite an OSM wiki tag value to a local Kiwix URL if available.
Returns (url, 'local'|'public') or (None, None) if unrecognized.
"""
classified = classify_wiki_link(tag_name, value)
if not classified:
return (value, 'original')
source_type, article_id = classified
# Try local Kiwix
found, kiwix_url = check_kiwix_has_article(source_type, article_id)
if found and kiwix_url:
return (kiwix_url, 'local')
# Fall back to public URL
public_url = build_public_url(source_type, article_id)
if public_url:
return (public_url, 'public')
return (value, 'original')
# ── Discovery stubs (disabled, for future activation) ───────────────────
def discover_wikivoyage_article(name, category, lat, lon):
"""
Discover a related Wikivoyage article for a place.
Enabled by has_wiki_discovery. Currently returns None.
"""
return None
def discover_appropedia_article(name, category):
"""
Discover a related Appropedia article for a place.
Enabled by has_wiki_discovery. Currently returns None.
"""
return None

View file

@ -1,52 +0,0 @@
# peertube-plugin-recon-domains
Registers 18 RECON knowledge domains as PeerTube video categories using IDs 100-117. These categories are assigned automatically by RECON's domain assignment pipeline based on concept extraction analysis.
## Category Mapping
| ID | Domain |
|-----|---------------------------|
| 100 | Agriculture & Livestock |
| 101 | Civil Organization |
| 102 | Communications |
| 103 | Food Systems |
| 104 | Foundational Skills |
| 105 | Logistics |
| 106 | Medical |
| 107 | Navigation |
| 108 | Operations |
| 109 | Power Systems |
| 110 | Preservation & Storage |
| 111 | Security |
| 112 | Shelter & Construction |
| 113 | Technology |
| 114 | Tools & Equipment |
| 115 | Vehicles |
| 116 | Water Systems |
| 117 | Wilderness Skills |
Built-in PeerTube categories (IDs 1-18) are not modified.
## Install
```bash
# Copy plugin to PeerTube storage
cp -r peertube-plugin-recon-domains /var/www/peertube/storage/plugins/node_modules/
# Register plugin via API or admin UI
# Admin > Plugins > Install > peertube-plugin-recon-domains
# Restart PeerTube
sudo systemctl restart peertube
```
## Uninstall
Remove the plugin via PeerTube admin UI or:
```bash
rm -rf /var/www/peertube/storage/plugins/node_modules/peertube-plugin-recon-domains
sudo systemctl restart peertube
```
Videos with RECON categories will revert to showing the raw category ID until the plugin is reinstalled.

View file

@ -1,32 +0,0 @@
async function register ({ videoCategoryManager }) {
const reconDomains = {
100: 'Agriculture & Livestock',
101: 'Civil Organization',
102: 'Communications',
103: 'Food Systems',
104: 'Foundational Skills',
105: 'Logistics',
106: 'Medical',
107: 'Navigation',
108: 'Operations',
109: 'Power Systems',
110: 'Preservation & Storage',
111: 'Security',
112: 'Shelter & Construction',
113: 'Technology',
114: 'Tools & Equipment',
115: 'Vehicles',
116: 'Water Systems',
117: 'Wilderness Skills'
}
for (const [id, label] of Object.entries(reconDomains)) {
videoCategoryManager.addConstant(parseInt(id), label)
}
}
async function unregister () {
return
}
module.exports = { register, unregister }

View file

@ -1,21 +0,0 @@
{
"name": "peertube-plugin-recon-domains",
"version": "1.0.0",
"description": "Registers 18 RECON knowledge domains as PeerTube video categories (IDs 100-117)",
"engine": {
"peertube": ">=6.0.0"
},
"keywords": [
"peertube",
"plugin"
],
"homepage": "https://forge.echo6.co/matt/recon",
"author": "Echo6",
"license": "MIT",
"library": "./main.js",
"staticDirs": {},
"css": [],
"clientScripts": [],
"translations": {},
"bugs": "https://forge.echo6.co/matt/recon/issues"
}

170
recon.py
View file

@ -863,166 +863,6 @@ def cmd_ingest(args):
return 0
def cmd_assign_categories(args):
"""Assign RECON domains to PeerTube videos and push categories."""
from qdrant_client import QdrantClient
from lib.domain_assigner import compute_assignment, run_tiebreaker_pass
from lib.peertube_writer import push_pending, extract_uuid
from lib.recon_domains import DOMAIN_CATEGORY_MAP
config = get_config()
db = StatusDB()
dry_run = args.dry_run
limit = args.limit
if args.backfill:
# Pass 1: assign domains to all complete stream docs with no assignment
# or that previously got needs_reprocess
conn = db._get_conn()
q = """SELECT d.hash FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE d.status = 'complete'
AND (d.recon_domain IS NULL
OR d.recon_domain_status = 'needs_reprocess')
AND c.source = 'stream.echo6.co'
ORDER BY d.discovered_at"""
if limit:
q += f" LIMIT {int(limit)}"
rows = conn.execute(q).fetchall()
hashes = [r['hash'] for r in rows]
if not hashes:
print("No unassigned complete stream documents found")
return 0
print(f"Backfill: processing {len(hashes)} documents" +
(" [DRY RUN]" if dry_run else ""))
# Create one Qdrant client for the entire backfill
qdrant = QdrantClient(
host=config['vector_db']['host'],
port=config['vector_db']['port'],
timeout=60
)
stats = {'assigned': 0, 'tied_pass_1': 0, 'no_concepts': 0, 'needs_reprocess': 0, 'errors': 0}
for i, file_hash in enumerate(hashes):
try:
domain, status = compute_assignment(file_hash, db, config, qdrant=qdrant)
stats[status] = stats.get(status, 0) + 1
if not dry_run:
db.set_domain_assignment(file_hash, domain, status)
if (i + 1) % 1000 == 0:
print(f" Progress: {i + 1}/{len(hashes)}")
except Exception as e:
stats['errors'] += 1
logger.warning(f" Assignment error for {file_hash[:12]}: {e}")
print(f"\nBackfill results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.tiebreaker_pass:
if dry_run:
items = db.get_items_by_domain_status('tied_pass_1')
print(f"Tiebreaker pass: {len(items)} items would be processed [DRY RUN]")
return 0
stats = run_tiebreaker_pass(db, config)
print(f"\nTiebreaker results:")
for k, v in sorted(stats.items()):
print(f" {k}: {v}")
return 0
elif args.push_pending:
if dry_run:
items = db.get_unpushed_assignments()
if limit:
items = items[:limit]
print(f"Push pending: {len(items)} items would be pushed [DRY RUN]")
return 0
success, failed = push_pending(db, config, limit=limit)
print(f"\nPush results: {success} succeeded, {failed} failed")
return 0
elif args.reprocess_missing:
items = db.get_items_by_domain_status('needs_reprocess', limit=limit)
if not items:
print("No items with needs_reprocess status")
return 0
print(f"Reprocess: {len(items)} items" + (" [DRY RUN]" if dry_run else ""))
requeued = 0
for item in items:
file_hash = item['hash']
if dry_run:
print(f" Would reprocess: {file_hash[:12]}{item.get('filename', '?')}")
requeued += 1
continue
# Reset document status to allow re-processing
conn = db._get_conn()
conn.execute(
"""UPDATE documents SET
status = 'catalogued',
concepts_extracted = 0,
vectors_inserted = 0,
recon_domain = NULL,
recon_domain_status = NULL,
recon_domain_assigned_at = NULL,
peertube_category_pushed_at = NULL,
error_message = NULL,
extracted_at = NULL,
enriched_at = NULL,
embedded_at = NULL
WHERE hash = ?""",
(file_hash,)
)
conn.commit()
# Re-queue for pipeline processing
db.queue_document(file_hash)
requeued += 1
print(f"Requeued {requeued} items for reprocessing")
return 0
else:
# Default: show domain assignment status
status_counts = db.get_domain_status_counts()
domain_dist = db.get_domain_distribution()
conn = db._get_conn()
total_stream = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'"""
).fetchone()['cnt']
unassigned = conn.execute(
"""SELECT COUNT(*) as cnt FROM documents d
LEFT JOIN catalogue c ON d.hash = c.hash
WHERE c.source = 'stream.echo6.co' AND d.status = 'complete'
AND d.recon_domain IS NULL"""
).fetchone()['cnt']
unpushed = len(db.get_unpushed_assignments())
print("=== Domain Assignment Status ===\n")
print(f"Total complete stream docs: {total_stream}")
print(f"Unassigned: {unassigned}")
print(f"Unpushed to PeerTube: {unpushed}")
if status_counts:
print(f"\nAssignment status breakdown:")
for status, cnt in sorted(status_counts.items()):
print(f" {status:<20s} {cnt:>6d}")
if domain_dist:
print(f"\nDomain distribution:")
for domain, cnt in sorted(domain_dist.items(), key=lambda x: -x[1]):
print(f" {domain:<35s} {cnt:>6d}")
return 0
def cmd_pipeline(args):
"""Stream B library pipeline: status, migrate, reverse, watch, sweep."""
from lib.new_pipeline import (
@ -1318,16 +1158,6 @@ def main():
p.set_defaults(func=cmd_ingest)
# assign-categories
p = sub.add_parser('assign-categories', help='Assign RECON domains to PeerTube videos')
p.add_argument('--backfill', action='store_true', help='Assign domains to all complete stream docs')
p.add_argument('--tiebreaker-pass', action='store_true', help='Resolve tied assignments via channel analysis')
p.add_argument('--push-pending', action='store_true', help='Push assigned categories to PeerTube API')
p.add_argument('--reprocess-missing', action='store_true', help='Re-queue needs_reprocess items')
p.add_argument('--dry-run', action='store_true', help='Show what would happen without writing')
p.add_argument('--limit', type=int, help='Limit number of items to process')
p.set_defaults(func=cmd_assign_categories)
# pipeline (Stream B)
p = sub.add_parser('pipeline', help='Stream B library pipeline (status, migrate, reverse, watch, sweep)')
p.add_argument('pipeline_action', nargs='?', default='status',

View file

@ -3,6 +3,7 @@ anyio==4.12.1
babel==2.18.0
beautifulsoup4==4.14.3
blinker==1.9.0
cachetools==7.1.3
certifi==2026.1.4
cffi==2.0.0
charset-normalizer==3.4.4

View file

@ -1,350 +0,0 @@
#!/usr/bin/env python3
"""Overture Maps Places → PostgreSQL import script (v2).
Downloads Overture Places Parquet from S3 via DuckDB (public bucket, no credentials),
filters to North America bounding box, and inserts into local PostgreSQL with PostGIS.
Usage:
cd /opt/recon && venv/bin/python scripts/overture_import.py
Re-runnable (idempotent via UPSERT).
"""
import json
import logging
import os
import re
import sys
import time
import duckdb
import psycopg2
import psycopg2.extras
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%H:%M:%S'
)
log = logging.getLogger('overture_import')
# --- Config ---
OVERTURE_RELEASE = '2026-04-15.0'
S3_PATH = f's3://overturemaps-us-west-2/release/{OVERTURE_RELEASE}/theme=places/type=place/*'
# North America bounding box (generous — includes Hawaii, Puerto Rico, Canada)
BBOX = {
'xmin': -170.0,
'xmax': -50.0,
'ymin': 15.0,
'ymax': 85.0,
}
BATCH_SIZE = 50_000
OSM_RECORD_RE = re.compile(r'^([nwr])(\d+)@\d+$')
DB_CONFIG = {
'host': os.environ.get('OVERTURE_DB_HOST', 'localhost'),
'port': int(os.environ.get('OVERTURE_DB_PORT', '5432')),
'dbname': os.environ.get('OVERTURE_DB_NAME', 'overture'),
'user': os.environ.get('OVERTURE_DB_USER', 'overture'),
'password': os.environ.get('OVERTURE_DB_PASSWORD', ''),
}
def create_table(conn):
"""Create places table and indexes if they don't exist."""
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS places (
id TEXT PRIMARY KEY,
geometry GEOMETRY(Point, 4326),
name TEXT,
basic_category TEXT,
confidence REAL,
phone TEXT,
website TEXT,
socials JSONB,
brand_name TEXT,
brand_wikidata TEXT,
osm_type CHAR(1),
osm_id BIGINT,
source_record_id TEXT,
raw_sources JSONB
);
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_places_osm
ON places(osm_type, osm_id) WHERE osm_type IS NOT NULL;
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_places_geom
ON places USING GIST(geometry);
""")
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_places_name_trgm
ON places USING GIN(name gin_trgm_ops);
""")
conn.commit()
log.info('Table and indexes ready')
def parse_osm_ref(sources):
"""Extract OSM type letter and ID from Overture sources array."""
if not sources:
return None, None, None
for src in sources:
record_id = None
if isinstance(src, dict):
record_id = src.get('record_id', '')
elif hasattr(src, '__getitem__'):
# DuckDB struct — try attribute access
try:
record_id = src['record_id']
except (KeyError, TypeError, IndexError):
pass
if not record_id:
continue
m = OSM_RECORD_RE.match(str(record_id))
if m:
return m.group(1), int(m.group(2)), str(record_id)
return None, None, None
def run_import():
"""Main import: DuckDB reads S3 Parquet → PostgreSQL via chunked OFFSET/LIMIT."""
log.info(f'Overture release: {OVERTURE_RELEASE}')
log.info(f'S3 path: {S3_PATH}')
log.info(f'Bounding box: {BBOX}')
# Connect to PostgreSQL
conn = psycopg2.connect(**DB_CONFIG)
conn.autocommit = False
create_table(conn)
# Set up DuckDB with httpfs and spatial for S3 access
duck = duckdb.connect()
duck.execute("INSTALL httpfs; LOAD httpfs;")
duck.execute("INSTALL spatial; LOAD spatial;")
duck.execute("SET s3_region='us-west-2';")
# Use a materialized approach: DuckDB query → Arrow → iterate in Python
query = f"""
SELECT
id,
ST_X(geometry) AS lon,
ST_Y(geometry) AS lat,
names.primary AS name,
basic_category,
confidence,
phones,
websites,
socials,
brand,
sources
FROM read_parquet('{S3_PATH}', hive_partitioning=true)
WHERE bbox.xmin >= {BBOX['xmin']}
AND bbox.xmax <= {BBOX['xmax']}
AND bbox.ymin >= {BBOX['ymin']}
AND bbox.ymax <= {BBOX['ymax']}
"""
log.info('Starting DuckDB query against S3 (this will take several minutes)...')
t_start = time.time()
# Execute and fetch all as Arrow for efficient iteration
result_rel = duck.sql(query)
upsert_sql = """
INSERT INTO places (id, geometry, name, basic_category, confidence,
phone, website, socials, brand_name, brand_wikidata,
osm_type, osm_id, source_record_id, raw_sources)
VALUES %s
ON CONFLICT (id) DO UPDATE SET
geometry = EXCLUDED.geometry,
name = EXCLUDED.name,
basic_category = EXCLUDED.basic_category,
confidence = EXCLUDED.confidence,
phone = EXCLUDED.phone,
website = EXCLUDED.website,
socials = EXCLUDED.socials,
brand_name = EXCLUDED.brand_name,
brand_wikidata = EXCLUDED.brand_wikidata,
osm_type = EXCLUDED.osm_type,
osm_id = EXCLUDED.osm_id,
source_record_id = EXCLUDED.source_record_id,
raw_sources = EXCLUDED.raw_sources
"""
template = """(
%(id)s,
ST_SetSRID(ST_MakePoint(%(lon)s, %(lat)s), 4326),
%(name)s,
%(basic_category)s,
%(confidence)s,
%(phone)s,
%(website)s,
%(socials)s::jsonb,
%(brand_name)s,
%(brand_wikidata)s,
%(osm_type)s,
%(osm_id)s,
%(source_record_id)s,
%(raw_sources)s::jsonb
)"""
total = 0
osm_refs = 0
batch = []
log.info('DuckDB query executing, fetching results in chunks...')
# Fetch in chunks using fetchmany on the relation
chunk_size = BATCH_SIZE
while True:
chunk = result_rel.fetchmany(chunk_size)
if not chunk:
break
for row in chunk:
row_id = row[0]
lon = row[1]
lat = row[2]
name = row[3]
basic_cat = row[4]
conf = row[5]
phones = row[6]
websites = row[7]
socials_raw = row[8]
brand_raw = row[9]
sources_raw = row[10]
if lon is None or lat is None:
continue
# Phone: first element of VARCHAR[]
phone = None
if phones and len(phones) > 0:
phone = str(phones[0]) if phones[0] else None
# Website: first element of VARCHAR[]
website = None
if websites and len(websites) > 0:
website = str(websites[0]) if websites[0] else None
# Socials: VARCHAR[] → JSON array of strings
socials_json = None
if socials_raw and len(socials_raw) > 0:
socials_json = json.dumps([str(s) for s in socials_raw if s])
# Brand: struct with wikidata and names.primary
brand_name = None
brand_wikidata = None
if brand_raw:
try:
if isinstance(brand_raw, dict):
brand_wikidata = brand_raw.get('wikidata')
names_struct = brand_raw.get('names')
if names_struct and isinstance(names_struct, dict):
brand_name = names_struct.get('primary')
else:
# DuckDB struct — access by key
brand_wikidata = brand_raw['wikidata'] if 'wikidata' in dir(brand_raw) else None
try:
brand_wikidata = brand_raw[0] # wikidata is first field
names_struct = brand_raw[1] # names is second field
if names_struct:
brand_name = names_struct[0] # primary is first field
except (IndexError, TypeError):
pass
except Exception:
pass
# Sources: parse OSM cross-reference
sources_list = None
if sources_raw:
if isinstance(sources_raw, (list, tuple)):
sources_list = []
for s in sources_raw:
if isinstance(s, dict):
sources_list.append(s)
else:
# DuckDB struct tuple — convert
try:
sources_list.append({
'dataset': s[1] if len(s) > 1 else None,
'record_id': s[3] if len(s) > 3 else None,
})
except (TypeError, IndexError):
pass
osm_type_letter, osm_id_val, source_record_id = parse_osm_ref(sources_list)
if osm_type_letter:
osm_refs += 1
raw_sources_json = json.dumps(sources_list) if sources_list else None
batch.append({
'id': row_id,
'lon': float(lon),
'lat': float(lat),
'name': name,
'basic_category': basic_cat,
'confidence': float(conf) if conf is not None else None,
'phone': phone,
'website': website,
'socials': socials_json,
'brand_name': brand_name,
'brand_wikidata': brand_wikidata,
'osm_type': osm_type_letter,
'osm_id': osm_id_val,
'source_record_id': source_record_id,
'raw_sources': raw_sources_json,
})
if len(batch) >= BATCH_SIZE:
with conn.cursor() as cur:
psycopg2.extras.execute_values(
cur, upsert_sql, batch,
template=template,
page_size=BATCH_SIZE
)
conn.commit()
total += len(batch)
elapsed = time.time() - t_start
rate = total / elapsed if elapsed > 0 else 0
log.info(f'Inserted {total:,} rows ({osm_refs:,} OSM xrefs) '
f'[{rate:.0f} rows/sec, {elapsed:.0f}s elapsed]')
batch = []
# Flush remaining
if batch:
with conn.cursor() as cur:
psycopg2.extras.execute_values(
cur, upsert_sql, batch,
template=template,
page_size=BATCH_SIZE
)
conn.commit()
total += len(batch)
duck.close()
# Final stats
elapsed = time.time() - t_start
log.info(f'Import complete: {total:,} rows, {osm_refs:,} OSM cross-refs, '
f'{elapsed:.0f}s total ({total/elapsed:.0f} rows/sec)')
# Verify
with conn.cursor() as cur:
cur.execute("SELECT count(*) FROM places")
count = cur.fetchone()[0]
cur.execute("SELECT count(*) FROM places WHERE osm_type IS NOT NULL")
osm_count = cur.fetchone()[0]
log.info(f'Final table: {count:,} total rows, {osm_count:,} with OSM cross-references')
conn.close()
if __name__ == '__main__':
run_import()

View file

@ -21,7 +21,6 @@
<a href="/peertube"{% if domain == 'peertube' %} class="active"{% endif %}>PeerTube</a>
<a href="/kiwix"{% if domain == 'kiwix' %} class="active"{% endif %}>Kiwix</a>
<a href="/search"{% if domain == 'search' %} class="active"{% endif %}>Search</a>
<a href="/nav-i"{% if domain == 'navi' %} class="active"{% endif %}>Nav-I</a>
<a href="/settings/keys"{% if domain == 'settings' %} class="active"{% endif %}>Settings</a>
</div>
{% if subnav %}

View file

@ -1,56 +0,0 @@
{% extends "base.html" %}
{% block content %}
<h3 style="color:#ffa500;margin-bottom:16px;">Deleted Contacts</h3>
{% if not contacts %}
<p class="text-dim">No deleted contacts.</p>
{% else %}
<table>
<tr><th>Label</th><th>Name</th><th>Category</th><th>Phone</th><th>Deleted At</th><th>Actions</th></tr>
{% for c in contacts %}
<tr id="row-{{ c.id }}">
<td>{{ c.label }}</td>
<td>{{ c.name or '' }}</td>
<td class="text-dim">{{ c.category or '' }}</td>
<td class="text-dim text-xs">{{ c.phone or '' }}</td>
<td class="text-dim text-xs">{{ c.deleted_at or '' }}</td>
<td>
<button class="btn" onclick="restoreContact({{ c.id }})">Restore</button>
<button class="btn" style="margin-left:4px;color:#ff4444;" onclick="purgeContact({{ c.id }})">Purge</button>
</td>
</tr>
{% endfor %}
</table>
{% endif %}
{% endblock %}
{% block scripts %}
<script>
async function restoreContact(id) {
try {
var resp = await fetch('/api/contacts/' + id + '/restore', {method: 'POST'});
if (resp.ok) {
location.reload();
} else {
var data = await resp.json();
alert(data.error || 'Restore failed');
}
} catch(e) {
alert('Error: ' + e.message);
}
}
async function purgeContact(id) {
if (!confirm('Permanently delete this contact? This cannot be undone.')) return;
try {
var resp = await fetch('/api/contacts/' + id + '/purge', {method: 'DELETE'});
if (resp.ok) {
location.reload();
} else {
var data = await resp.json();
alert(data.error || 'Purge failed');
}
} catch(e) {
alert('Error: ' + e.message);
}
}
</script>
{% endblock %}

View file

@ -1,269 +0,0 @@
{% extends "base.html" %}
{% block content %}
<h3 style="color:var(--orange);margin-bottom:16px;">API Keys</h3>
<div class="panel" style="margin-bottom:16px;padding:10px 14px;border-left:3px solid var(--orange);">
<p class="text-dim" style="font-size:12px;margin:0;">Updating keys does not restart RECON. After updates, click <strong style="color:var(--text-primary);">Restart RECON</strong> below or restart manually from terminal.</p>
</div>
<div id="keys-loading" class="text-dim" style="padding:20px;">Loading keys...</div>
<div id="keys-error" style="display:none;padding:12px;color:#ff4444;"></div>
<table id="keys-table" style="display:none;">
<thead>
<tr><th>Provider</th><th>Masked Value</th><th>Count</th><th>Last Modified</th><th style="width:200px;">Actions</th></tr>
</thead>
<tbody id="keys-tbody"></tbody>
</table>
<div id="gemini-detail" style="display:none;margin-top:16px;">
<h4 style="color:var(--text-primary);margin-bottom:8px;font-size:13px;">Gemini Keys</h4>
<table style="font-size:12px;">
<thead>
<tr><th>#</th><th>Masked Key</th><th>Calls</th><th>Errors</th><th>Last Used</th><th style="width:200px;">Actions</th></tr>
</thead>
<tbody id="gemini-tbody"></tbody>
</table>
</div>
<div style="margin-top:20px;padding-top:16px;border-top:1px solid var(--border-light);">
<button class="btn" onclick="restartRecon(this)" style="border-color:var(--orange);color:var(--orange);">Restart RECON</button>
<span id="restart-status" class="text-dim text-xs" style="margin-left:8px;"></span>
</div>
<!-- Update modal -->
<div id="update-modal" style="display:none;position:fixed;inset:0;z-index:50;background:rgba(0,0,0,0.6);align-items:center;justify-content:center;">
<div style="background:var(--bg-secondary);border:1px solid var(--border-light);padding:24px;max-width:440px;width:90%;">
<h4 style="color:var(--orange);margin-bottom:12px;">Update Key</h4>
<p class="text-dim" style="margin-bottom:4px;font-size:12px;">Provider: <span id="modal-provider" style="color:var(--text-primary);"></span></p>
<p class="text-dim" style="margin-bottom:12px;font-size:12px;">Key: <span id="modal-key-name" style="color:var(--text-primary);font-family:var(--font-mono);"></span></p>
<div style="position:relative;">
<input id="modal-new-value" type="password" placeholder="Paste new key value..." autocomplete="off" style="width:100%;padding:6px 36px 6px 10px;background:var(--bg-tertiary);border:1px solid var(--border-light);color:var(--text-primary);font-family:var(--font-mono);font-size:13px;">
<button onclick="toggleKeyVisibility()" style="position:absolute;right:4px;top:50%;transform:translateY(-50%);background:none;border:none;color:var(--text-dim);cursor:pointer;font-size:11px;padding:4px;" title="Toggle visibility" id="modal-toggle-vis">show</button>
</div>
<div style="display:flex;gap:8px;justify-content:flex-end;margin-top:16px;">
<button class="btn" onclick="closeUpdateModal()">Cancel</button>
<button class="btn" id="modal-save" onclick="saveKey()" style="border-color:var(--green);color:var(--green);">Save</button>
</div>
<p id="modal-error" style="display:none;color:#ff4444;font-size:12px;margin-top:8px;"></p>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
var pendingUpdate = null; // {name, index, provider}
async function loadKeys() {
try {
var resp = await fetch('/api/nav-i/api-keys/list');
if (!resp.ok) throw new Error('HTTP ' + resp.status);
var data = await resp.json();
renderKeys(data.keys);
} catch(e) {
document.getElementById('keys-loading').style.display = 'none';
var errEl = document.getElementById('keys-error');
errEl.textContent = 'Failed to load keys: ' + e.message;
errEl.style.display = 'block';
}
}
function renderKeys(keys) {
document.getElementById('keys-loading').style.display = 'none';
document.getElementById('keys-table').style.display = '';
var tbody = document.getElementById('keys-tbody');
tbody.innerHTML = '';
keys.forEach(function(k) {
var tr = document.createElement('tr');
tr.id = 'row-' + k.name;
var masked = k.masked_value || '<span class="text-dim">not set</span>';
var countStr = k.count.toString();
var mtime = k.last_modified ? k.last_modified.replace('T', ' ').replace('Z', '') : '—';
tr.innerHTML =
'<td style="font-weight:600;">' + k.display_name + '</td>' +
'<td><code style="font-size:12px;">' + masked + '</code></td>' +
'<td style="text-align:center;">' + countStr + '</td>' +
'<td class="text-dim text-xs">' + mtime + '</td>' +
'<td>' +
(k.provider === 'gemini'
? '<button class="btn" onclick="toggleGeminiDetail()">Details</button> '
: '<button class="btn" onclick="openUpdateModal(\'' + k.name + '\', null, \'' + k.display_name + '\')">Update</button> ') +
'<button class="btn" onclick="testKey(\'' + k.name + '\', null, this)">Test</button>' +
'<span class="test-result text-xs" style="margin-left:6px;"></span>' +
'</td>';
tbody.appendChild(tr);
// Render Gemini sub-table
if (k.provider === 'gemini' && k.keys) {
renderGeminiKeys(k.keys);
}
});
}
function renderGeminiKeys(keys) {
var tbody = document.getElementById('gemini-tbody');
tbody.innerHTML = '';
keys.forEach(function(k) {
var tr = document.createElement('tr');
var lastUsed = k.last_used ? k.last_used.replace('T', ' ').replace('Z', '') : '—';
tr.innerHTML =
'<td>' + k.index + '</td>' +
'<td><code style="font-size:11px;">' + k.masked + '</code></td>' +
'<td style="text-align:center;">' + k.calls + '</td>' +
'<td style="text-align:center;">' + (k.errors || 0) + '</td>' +
'<td class="text-dim text-xs">' + lastUsed + '</td>' +
'<td>' +
'<button class="btn" onclick="openUpdateModal(\'GEMINI_KEY\', ' + k.index + ', \'Gemini #' + k.index + '\')">Update</button> ' +
'<button class="btn" onclick="testKey(\'GEMINI_KEY\', ' + k.index + ', this)">Test</button>' +
'<span class="test-result text-xs" style="margin-left:6px;"></span>' +
'</td>';
tbody.appendChild(tr);
});
}
function toggleGeminiDetail() {
var el = document.getElementById('gemini-detail');
el.style.display = el.style.display === 'none' ? '' : 'none';
}
function openUpdateModal(name, index, displayName) {
pendingUpdate = {name: name, index: index};
document.getElementById('modal-provider').textContent = displayName;
document.getElementById('modal-key-name').textContent = name + (index !== null ? ' [' + index + ']' : '');
document.getElementById('modal-new-value').value = '';
document.getElementById('modal-new-value').type = 'password';
document.getElementById('modal-toggle-vis').textContent = 'show';
document.getElementById('modal-error').style.display = 'none';
document.getElementById('update-modal').style.display = 'flex';
document.getElementById('modal-new-value').focus();
}
function closeUpdateModal() {
document.getElementById('update-modal').style.display = 'none';
pendingUpdate = null;
}
function toggleKeyVisibility() {
var inp = document.getElementById('modal-new-value');
var btn = document.getElementById('modal-toggle-vis');
if (inp.type === 'password') {
inp.type = 'text';
btn.textContent = 'hide';
} else {
inp.type = 'password';
btn.textContent = 'show';
}
}
async function saveKey() {
if (!pendingUpdate) return;
var newValue = document.getElementById('modal-new-value').value.trim();
if (!newValue) {
var errEl = document.getElementById('modal-error');
errEl.textContent = 'Key value cannot be empty.';
errEl.style.display = 'block';
return;
}
var saveBtn = document.getElementById('modal-save');
saveBtn.disabled = true;
saveBtn.textContent = 'Saving...';
try {
var body = {name: pendingUpdate.name, new_value: newValue};
if (pendingUpdate.index !== null) body.index = pendingUpdate.index;
var resp = await fetch('/api/nav-i/api-keys/update', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(body)
});
var data = await resp.json();
if (data.success) {
closeUpdateModal();
loadKeys(); // refresh table
} else {
var errEl = document.getElementById('modal-error');
errEl.textContent = data.error || 'Update failed';
errEl.style.display = 'block';
}
} catch(e) {
var errEl = document.getElementById('modal-error');
errEl.textContent = 'Error: ' + e.message;
errEl.style.display = 'block';
} finally {
saveBtn.disabled = false;
saveBtn.textContent = 'Save';
}
}
async function testKey(name, index, btn) {
var resultSpan = btn.nextElementSibling;
resultSpan.textContent = 'testing...';
resultSpan.style.color = 'var(--text-dim)';
btn.disabled = true;
try {
var body = {name: name};
if (index !== null) body.index = index;
var resp = await fetch('/api/nav-i/api-keys/test', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(body)
});
var data = await resp.json();
if (data.success) {
resultSpan.innerHTML = '<span style="color:var(--green);">&#10003;</span> Pass — ' + data.latency_ms + 'ms';
if (data.note) resultSpan.innerHTML += ' <span class="text-dim">(' + data.note + ')</span>';
} else {
resultSpan.innerHTML = '<span style="color:#ff4444;">&#10007;</span> Failed: ' + (data.error || 'unknown');
}
} catch(e) {
resultSpan.innerHTML = '<span style="color:#ff4444;">&#10007;</span> Error: ' + e.message;
} finally {
btn.disabled = false;
}
}
async function restartRecon(btn) {
if (!confirm('Restart RECON service? Active enrichment/embedding workers will be interrupted.')) return;
var statusEl = document.getElementById('restart-status');
btn.disabled = true;
statusEl.textContent = 'Restarting...';
statusEl.style.color = 'var(--text-dim)';
try {
var resp = await fetch('/api/nav-i/api-keys/restart-recon', {method: 'POST'});
var data = await resp.json();
if (data.success) {
statusEl.innerHTML = '<span style="color:var(--green);">&#10003;</span> Restarted successfully';
} else {
statusEl.innerHTML = '<span style="color:#ff4444;">&#10007;</span> ' + (data.error || 'Failed');
}
} catch(e) {
statusEl.innerHTML = '<span style="color:#ff4444;">&#10007;</span> ' + e.message;
} finally {
btn.disabled = false;
}
}
// Close modal on Escape key
document.addEventListener('keydown', function(e) {
if (e.key === 'Escape') closeUpdateModal();
});
// Close modal on backdrop click
document.getElementById('update-modal').addEventListener('click', function(e) {
if (e.target === this) closeUpdateModal();
});
// Load on page init
loadKeys();
</script>
{% endblock %}

View file

@ -1,116 +0,0 @@
{% extends "base.html" %}
{% block content %}
<h3 style="color:var(--orange);margin-bottom:16px;">Deleted Contacts</h3>
{% if not contacts %}
<p class="text-dim">No deleted contacts.</p>
{% else %}
<table>
<tr><th>Label</th><th>Name</th><th>Category</th><th>Phone</th><th>Deleted At</th><th>Actions</th></tr>
{% for c in contacts %}
<tr id="row-{{ c.id }}">
<td>{{ c.label }}</td>
<td>{{ c.name or '' }}</td>
<td class="text-dim">{{ c.category or '' }}</td>
<td class="text-dim text-xs">{{ c.phone or '' }}</td>
<td class="text-dim text-xs">{{ c.deleted_at or '' }}</td>
<td>
<button class="btn" onclick="restoreContact({{ c.id }}, '{{ c.label }}')">Restore</button>
<button class="btn" style="margin-left:4px;color:#ff4444;" onclick="purgeContact({{ c.id }})">Purge</button>
</td>
</tr>
{% endfor %}
</table>
{% endif %}
<!-- Conflict resolution modal -->
<div id="conflict-modal" style="display:none;position:fixed;inset:0;z-index:50;background:rgba(0,0,0,0.6);align-items:center;justify-content:center;">
<div style="background:var(--bg-secondary);border:1px solid var(--border-light);padding:24px;max-width:400px;width:90%;">
<h4 style="color:var(--orange);margin-bottom:12px;">Label Conflict</h4>
<p class="text-dim" style="margin-bottom:16px;">An active contact with the label "<span id="conflict-label" style="color:var(--text-primary);"></span>" already exists. Choose a new label to restore this contact:</p>
<input id="conflict-new-label" type="text" placeholder="New label..." style="width:100%;padding:6px 10px;background:var(--bg-tertiary);border:1px solid var(--border-light);color:var(--text-primary);font-family:var(--font-mono);font-size:13px;margin-bottom:16px;">
<div style="display:flex;gap:8px;justify-content:flex-end;">
<button class="btn" onclick="closeConflictModal()">Cancel</button>
<button class="btn" id="conflict-submit" onclick="submitRestoreAs()" style="border-color:var(--green);color:var(--green);">Restore As</button>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
var pendingRestoreId = null;
async function restoreContact(id, label) {
try {
var resp = await fetch('/api/contacts/' + id + '/restore', {method: 'POST'});
if (resp.ok) {
location.reload();
} else if (resp.status === 409) {
// Home/Work conflict — show modal
pendingRestoreId = id;
document.getElementById('conflict-label').textContent = label;
document.getElementById('conflict-new-label').value = '';
var modal = document.getElementById('conflict-modal');
modal.style.display = 'flex';
document.getElementById('conflict-new-label').focus();
} else {
var data = await resp.json();
alert(data.error || 'Restore failed');
}
} catch(e) {
alert('Error: ' + e.message);
}
}
function closeConflictModal() {
document.getElementById('conflict-modal').style.display = 'none';
pendingRestoreId = null;
}
async function submitRestoreAs() {
var newLabel = document.getElementById('conflict-new-label').value.trim();
if (!newLabel) {
document.getElementById('conflict-new-label').style.borderColor = 'var(--red)';
return;
}
try {
var resp = await fetch('/api/contacts/' + pendingRestoreId + '/restore-as', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({label: newLabel})
});
if (resp.ok) {
location.reload();
} else {
var data = await resp.json();
alert(data.error || 'Restore failed');
}
} catch(e) {
alert('Error: ' + e.message);
}
}
async function purgeContact(id) {
if (!confirm('Permanently delete this contact? This cannot be undone.')) return;
try {
var resp = await fetch('/api/contacts/' + id + '/purge', {method: 'DELETE'});
if (resp.ok) {
location.reload();
} else {
var data = await resp.json();
alert(data.error || 'Purge failed');
}
} catch(e) {
alert('Error: ' + e.message);
}
}
// Close modal on Escape key
document.addEventListener('keydown', function(e) {
if (e.key === 'Escape') closeConflictModal();
});
// Close modal on backdrop click
document.getElementById('conflict-modal').addEventListener('click', function(e) {
if (e.target === this) closeConflictModal();
});
</script>
{% endblock %}

View file

@ -1,22 +0,0 @@
{% extends "base.html" %}
{% block content %}
<h3 style="color:var(--green);margin-bottom:16px;">Nav-I</h3>
<p class="text-dim" style="margin-bottom:24px;">Navi frontend management — contacts, API keys, and configuration.</p>
<div class="stat-grid">
<a href="/deleted-contacts" style="text-decoration:none;">
<div class="stat-card" style="cursor:pointer;transition:border-color 0.15s;">
<div class="label">Deleted Contacts</div>
<div class="value">{{ deleted_count }}</div>
<div class="sublabel">awaiting restore or purge</div>
</div>
</a>
<a href="/nav-i/api-keys" style="text-decoration:none;">
<div class="stat-card" style="cursor:pointer;transition:border-color 0.15s;">
<div class="label">API Keys</div>
<div class="value" style="font-size:14px;color:var(--text-dim);margin-top:12px;">Coming soon</div>
<div class="sublabel">per-user key management</div>
</div>
</a>
</div>
{% endblock %}

View file

@ -1,148 +0,0 @@
{% extends "base.html" %}
{% block content %}
<div id="pt-review">
<div class="stat-grid" style="grid-template-columns:repeat(4, 1fr);">
<div class="stat-card"><div class="label">Manual Review</div><div class="value" id="rv-manual"></div></div>
<div class="stat-card"><div class="label">Assigned</div><div class="value" id="rv-assigned"></div></div>
<div class="stat-card"><div class="label">Tied (Pass 1)</div><div class="value" id="rv-tied1"></div></div>
<div class="stat-card"><div class="label">Needs Reprocess</div><div class="value" id="rv-reprocess"></div></div>
</div>
<div class="panel" style="margin-top:16px;">
<h3 class="section-title" style="margin-bottom:12px;">Manual Review Queue</h3>
<div id="rv-items-container">
<table class="data-table" id="rv-table" style="display:none;">
<thead>
<tr>
<th>Video</th>
<th>Channel</th>
<th>Current Domain</th>
<th>Top Domains</th>
<th>Assign</th>
</tr>
</thead>
<tbody id="rv-tbody"></tbody>
</table>
<div id="rv-empty" class="text-muted" style="padding:24px;text-align:center;">Loading...</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
const VALID_DOMAINS = {{ valid_domains | tojson }};
async function loadStats() {
try {
const resp = await fetch('/api/peertube/review/stats');
const data = await resp.json();
document.getElementById('rv-manual').textContent = data.tied_manual || 0;
document.getElementById('rv-assigned').textContent =
(data.assigned || 0) + (data.tied_pass_2 || 0) + (data.manual_assigned || 0);
document.getElementById('rv-tied1').textContent = data.tied_pass_1 || 0;
document.getElementById('rv-reprocess').textContent = data.needs_reprocess || 0;
} catch (e) {
console.error('Failed to load stats:', e);
}
}
async function loadItems() {
try {
const resp = await fetch('/api/peertube/review/items');
const items = await resp.json();
const tbody = document.getElementById('rv-tbody');
const table = document.getElementById('rv-table');
const empty = document.getElementById('rv-empty');
if (!items.length) {
empty.textContent = 'No items pending manual review.';
table.style.display = 'none';
return;
}
tbody.innerHTML = '';
table.style.display = '';
empty.style.display = 'none';
items.forEach(item => {
const tr = document.createElement('tr');
tr.id = 'row-' + item.hash;
// Video title/filename
const tdVideo = document.createElement('td');
tdVideo.textContent = item.filename || item.hash.slice(0, 12);
tdVideo.title = item.hash;
tr.appendChild(tdVideo);
// Channel
const tdChannel = document.createElement('td');
tdChannel.textContent = item.category || '—';
tr.appendChild(tdChannel);
// Current domain
const tdCurrent = document.createElement('td');
tdCurrent.textContent = item.recon_domain || '—';
tr.appendChild(tdCurrent);
// Top domains (from concept counts)
const tdTop = document.createElement('td');
if (item.top_domains) {
tdTop.innerHTML = item.top_domains.map(d =>
'<span class="badge">' + d.domain + ' (' + d.count + ')</span>'
).join(' ');
}
tr.appendChild(tdTop);
// Assign dropdown + button
const tdAssign = document.createElement('td');
const sel = document.createElement('select');
sel.className = 'input-sm';
sel.innerHTML = '<option value="">Select...</option>' +
VALID_DOMAINS.map(d => '<option value="' + d + '">' + d + '</option>').join('');
if (item.recon_domain) {
sel.value = item.recon_domain;
}
const btn = document.createElement('button');
btn.className = 'btn btn-sm btn-primary';
btn.textContent = 'Assign';
btn.onclick = () => assignDomain(item.hash, sel.value, tr);
tdAssign.appendChild(sel);
tdAssign.appendChild(btn);
tr.appendChild(tdAssign);
tbody.appendChild(tr);
});
} catch (e) {
document.getElementById('rv-empty').textContent = 'Error loading items: ' + e.message;
}
}
async function assignDomain(hash, domain, row) {
if (!domain) {
alert('Select a domain first');
return;
}
try {
const resp = await fetch('/api/peertube/review/assign', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({hash: hash, domain: domain})
});
const result = await resp.json();
if (result.ok) {
row.style.opacity = '0.4';
row.querySelector('button').disabled = true;
row.querySelector('button').textContent = 'Done';
loadStats();
} else {
alert('Assignment failed: ' + (result.error || 'unknown error'));
}
} catch (e) {
alert('Error: ' + e.message);
}
}
loadStats();
loadItems();
</script>
{% endblock %}

View file

@ -1,71 +0,0 @@
#!/usr/bin/env python3
"""
Parity test: verify RECON domain taxonomy matches PeerTube categories.
Tests:
1. DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS exactly
2. PeerTube API returns all 18 RECON categories (IDs 100-117) with correct labels
Usage:
cd /opt/recon && source venv/bin/activate
python3 tests/test_constants_parity.py
"""
import json
import sys
import requests
# Add parent dir to path for imports
sys.path.insert(0, '/opt/recon')
from lib.recon_domains import DOMAIN_CATEGORY_MAP, VALID_DOMAINS, CATEGORY_DOMAIN_MAP
def test_local_parity():
"""Verify DOMAIN_CATEGORY_MAP keys match VALID_DOMAINS."""
map_keys = set(DOMAIN_CATEGORY_MAP.keys())
assert map_keys == VALID_DOMAINS, (
f"Mismatch: in map but not VALID_DOMAINS: {map_keys - VALID_DOMAINS}, "
f"in VALID_DOMAINS but not map: {VALID_DOMAINS - map_keys}"
)
assert len(CATEGORY_DOMAIN_MAP) == len(DOMAIN_CATEGORY_MAP), "Reverse map size mismatch"
print(f"[OK] Local parity: {len(VALID_DOMAINS)} domains, map keys match VALID_DOMAINS")
def test_peertube_categories():
"""Verify PeerTube API returns all 18 RECON categories."""
url = "http://192.168.1.170:9000/api/v1/videos/categories"
headers = {"Host": "stream.echo6.co"}
try:
resp = requests.get(url, headers=headers, timeout=10)
resp.raise_for_status()
except Exception as e:
print(f"[SKIP] PeerTube API unreachable: {e}")
return
categories = resp.json() # dict of {id_str: label}
missing = []
wrong_label = []
for domain, cat_id in DOMAIN_CATEGORY_MAP.items():
cat_str = str(cat_id)
if cat_str not in categories:
missing.append((cat_id, domain))
elif categories[cat_str] != domain:
wrong_label.append((cat_id, domain, categories[cat_str]))
if missing:
print(f"[FAIL] Missing categories in PeerTube: {missing}")
print(" Deploy peertube-plugin-recon-domains to CT 110 first")
sys.exit(1)
if wrong_label:
print(f"[FAIL] Wrong labels in PeerTube: {wrong_label}")
sys.exit(1)
print(f"[OK] PeerTube parity: all {len(DOMAIN_CATEGORY_MAP)} categories present with correct labels")
if __name__ == '__main__':
test_local_parity()
test_peertube_categories()
print("\nAll parity tests passed.")