mirror of
https://github.com/zvx-echo6/central.git
synced 2026-05-21 18:14:44 +02:00
- Add stream_name, subject_for(), and settings_schema() to SourceAdapter ABC
- Implement all three methods in NWSAdapter, FIRMSAdapter, USGSQuakeAdapter
- Replace manual _ADAPTER_REGISTRY with pkgutil.iter_modules auto-discovery
- Remove subject_for_event from models.py (each adapter owns its subject logic)
- Update supervisor to use adapter.subject_for(event) instead of helper
- Fix quake events going to wrong stream (was publishing to CENTRAL_WX)
- Update test files to use adapter methods

This fixes the quake stream bug where events were published to central.wx.alert.us.unknown instead of central.quake.event.<tier>.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
115 lines
3.9 KiB
Python
115 lines
3.9 KiB
Python
"""Smoke tests for Central models and CloudEvents wire format."""
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
|
|
from central.models import Event, Geo
|
|
from central.config import NWSAdapterConfig, CloudEventsConfig, NATSConfig, PostgresConfig, Config
|
|
from central.cloudevents_wire import wrap_event
|
|
|
|
|
|
@pytest.fixture
def sample_geo() -> Geo:
    """Provide a Geo with a centroid, a bounding box, and two region codes.

    The first region code is also used as the primary region.
    """
    region_codes = ["US-ID-Ada", "US-ID-Canyon"]
    center = (-116.2, 43.6)
    bounds = (-116.5, 43.4, -115.9, 43.8)
    return Geo(
        centroid=center,
        bbox=bounds,
        regions=region_codes,
        primary_region=region_codes[0],
    )
|
|
|
|
|
|
@pytest.fixture
def sample_event(sample_geo: Geo) -> Event:
    """Provide an NWS severe-thunderstorm-warning Event built on ``sample_geo``.

    The event has severity 3 and expires one hour after its timestamp.
    """
    issued_at = datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc)
    # Same calendar day, one hour later: 13:00:00 UTC.
    expires_at = issued_at.replace(hour=13)
    payload = {
        "headline": "Severe Thunderstorm Warning",
        "urgency": "Immediate",
    }
    return Event(
        id="urn:central:nws:alert:KBOI-202401151200-SVR",
        adapter="nws",
        category="wx.alert.severe_thunderstorm_warning",
        time=issued_at,
        expires=expires_at,
        severity=3,
        geo=sample_geo,
        data=payload,
    )
|
|
|
|
|
|
@pytest.fixture
def sample_config() -> Config:
    """Provide a Config with one enabled NWS adapter and local endpoints.

    The CloudEvents section uses the ``central`` type prefix, the
    ``central.local`` source, and schema version ``1.0``; the tests in this
    module compare envelope fields against these values.
    """
    nws_adapter = NWSAdapterConfig(
        enabled=True,
        cadence_s=60,
        states=["ID", "MT"],
        contact_email="test@example.com",
    )
    cloudevents_section = CloudEventsConfig(
        type_prefix="central",
        source="central.local",
        schema_version="1.0",
    )
    nats_section = NATSConfig(url="nats://localhost:4222")
    postgres_section = PostgresConfig(dsn="postgresql://user:pass@localhost/db")
    return Config(
        adapters={"nws": nws_adapter},
        cloudevents=cloudevents_section,
        nats=nats_section,
        postgres=postgres_section,
    )
|
|
|
|
|
|
|
|
class TestCloudEventsWire:
    """Tests for the CloudEvents envelope returned by ``wrap_event``.

    Each test calls ``wrap_event(event, config)``, which returns an
    ``(envelope, msg_id)`` pair, and inspects the envelope mapping.
    """

    def test_required_fields_present(
        self, sample_event: Event, sample_config: Config
    ) -> None:
        """Required CloudEvents fields are present."""
        envelope, msg_id = wrap_event(sample_event, sample_config)

        # The message id returned alongside the envelope mirrors the event id.
        assert msg_id == sample_event.id
        assert envelope["id"] == sample_event.id
        assert envelope["source"] == sample_config.cloudevents.source
        assert envelope["type"] == "central.wx.alert.severe_thunderstorm_warning.v1"
        assert envelope["specversion"] == "1.0"
        assert "time" in envelope
        assert envelope["datacontenttype"] == "application/json"
        assert "data" in envelope

    def test_extension_attributes_lowercase(
        self, sample_event: Event, sample_config: Config
    ) -> None:
        """Extension attributes are lowercase with no underscores."""
        envelope, _ = wrap_event(sample_event, sample_config)

        # Check that the expected extension attributes exist with their values.
        expected_extensions = {
            "centralschemaversion": "1.0",
            "centralcategory": "wx.alert.severe_thunderstorm_warning",
            "centralseverity": 3,
        }
        for name, value in expected_extensions.items():
            assert envelope[name] == value

        # Validate the keys the envelope actually carries rather than string
        # literals written in this test: checking literals can never fail,
        # whatever wrap_event emits. Extension attributes are identified here
        # by the "central" prefix shared by the three names asserted above.
        extension_keys = [key for key in envelope if key.startswith("central")]
        for key in extension_keys:
            assert key.islower(), key
            assert "_" not in key, key

    def test_severity_none_omits_centralseverity(
        self, sample_geo: Geo, sample_config: Config
    ) -> None:
        """When severity is None, centralseverity is omitted entirely."""
        event = Event(
            id="test-no-severity",
            adapter="nws",
            category="wx.alert.test",
            time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
            severity=None,  # Explicitly None
            geo=sample_geo,
            data={},
        )

        envelope, _ = wrap_event(event, sample_config)

        # centralseverity should not be present at all
        assert "centralseverity" not in envelope
        # Other extensions should still be present
        assert "centralschemaversion" in envelope
        assert "centralcategory" in envelope