Implementing Pydantic Models for AFC Event Streams
Automated fare collection (AFC) systems generate high-velocity event streams that directly dictate daily revenue reconciliation, audit compliance, and operational reporting. When transit agencies modernize their data infrastructure, transitioning from brittle ETL scripts to deterministic, type-safe ingestion layers becomes non-negotiable. Embedding Pydantic models into your Fare Data Ingestion & GTFS-RT Sync architecture establishes a strict contract for every tap, board, fare adjustment, and penalty event. This approach eliminates silent data corruption, enforces regulatory compliance, and provides revenue analysts with immediate visibility into malformed payloads before they contaminate downstream settlement ledgers.
Deterministic Schema Definition
Transit validators, open-loop payment gateways, and mobile wallet SDKs rarely emit perfectly uniform JSON. Legacy firmware often serializes timestamps as naive strings, represents cents as floats, or appends undocumented metadata fields. The goal is selective strictness: normalize noisy timestamp and fare inputs into canonical forms through mode="before" validators, while marking accounting fields like fare_amount_cents as strict=True so that any value the normalizer does not explicitly convert is rejected rather than silently coerced, a historic source of revenue leakage. Pairing this with extra="forbid" ensures undocumented vendor fields never slip into the ledger unnoticed. The following schema enforces those guarantees while normalizing heterogeneous inputs before they enter the ledger.
import json
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_validator

# Structured audit logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("afc_reconciliation")


class TapDirection(str, Enum):
    ENTRY = "entry"
    EXIT = "exit"
    TRANSFER = "transfer"


class FareMediaType(str, Enum):
    SMART_CARD = "smart_card"
    MOBILE_WALLET = "mobile_wallet"
    CONTACTLESS_BANK = "contactless_bank"
    CASH = "cash"


class AFCTapEvent(BaseModel):
    # Reject any key that is not declared below
    model_config = ConfigDict(extra="forbid")

    event_id: str = Field(..., min_length=1, max_length=64)
    device_id: str = Field(..., pattern=r"^[A-Z0-9_-]+$")
    route_id: str = Field(..., pattern=r"^[A-Z0-9_-]+$")
    stop_id: str = Field(..., min_length=1)
    direction: TapDirection
    media_type: FareMediaType
    tap_timestamp: datetime
    fare_amount_cents: int = Field(ge=0, strict=True)
    agency_id: str = Field(..., min_length=2, max_length=8)
    gtfs_trip_id: Optional[str] = None
    validator_skew_ms: Optional[int] = None

    @field_validator("tap_timestamp", mode="before")
    @classmethod
    def normalize_utc(cls, v: object) -> object:
        """Sanitize legacy timestamps and enforce UTC compliance."""
        if isinstance(v, str):
            # fromisoformat raises ValueError on malformed input, which
            # Pydantic reports as a validation error
            v = datetime.fromisoformat(v.replace("Z", "+00:00"))
        if isinstance(v, datetime):
            if v.tzinfo is None:
                # Assumption: naive timestamps are already UTC. Without this,
                # astimezone() would interpret them in the host's local zone.
                v = v.replace(tzinfo=timezone.utc)
            return v.astimezone(timezone.utc)
        return v  # any other type falls through to Pydantic's own datetime parsing

    @field_validator("fare_amount_cents", mode="before")
    @classmethod
    def coerce_fare(cls, v: object) -> object:
        """Handle firmware quirks where whole cents arrive as floats or strings."""
        if isinstance(v, str):
            v = float(v.replace(",", ""))
        if isinstance(v, float):
            if not v.is_integer():
                # int(12.5) would silently drop half a cent, so reject instead
                raise ValueError("fare_amount_cents must be a whole number of cents")
            return int(v)
        return v  # ints pass through; strict=True rejects bools and other types
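To make the whole-cent rule concrete outside of Pydantic, here is a minimal standard-library sketch of the same check using decimal.Decimal. The function name parse_whole_cents is hypothetical and chosen for illustration; it is not part of Pydantic or of the schema above.

```python
from decimal import Decimal, InvalidOperation


def parse_whole_cents(raw: object) -> int:
    """Return an integer cent amount, refusing values that would lose precision.

    Hypothetical helper for illustration only.
    """
    if isinstance(raw, bool):
        # bool is a subclass of int, so reject it explicitly
        raise ValueError("boolean is not a fare amount")
    if isinstance(raw, int):
        return raw
    # Strip thousands separators from strings; floats go through str()
    text = raw.replace(",", "") if isinstance(raw, str) else str(raw)
    try:
        amount = Decimal(text)
    except InvalidOperation as exc:
        # Currency symbols, localized separators, and non-numeric input land here
        raise ValueError(f"not a numeric fare amount: {raw!r}") from exc
    if not amount.is_finite() or amount != amount.to_integral_value():
        raise ValueError(f"fractional or non-finite cents: {raw!r}")
    return int(amount)


print(parse_whole_cents("12.00"))  # whole cents sent as a string
print(parse_whole_cents(12.0))     # whole cents sent as a float
```

Values such as "12.50" or "$12" raise ValueError instead of being truncated, which is the behavior a settlement ledger generally wants.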
Ingestion Pipeline & Audit Trail
Revenue reconciliation requires deterministic routing: valid events proceed to settlement, while invalid payloads are quarantined with full forensic context. The ingestion function below implements explicit error handling, generates immutable audit records, and integrates seamlessly with Schema Validation Pipelines for downstream ledger writes.
The flow below shows how each raw payload is coerced, validated against the strict contract, and routed to exactly one channel:
def ingest_afc_stream(raw_events: list[dict]) -> tuple[list[AFCTapEvent], list[dict]]:
    """
    Validates AFC tap events and routes them to valid/rejected channels.
    Returns a tuple of (validated_models, audit_records).
    """
    valid_events: list[AFCTapEvent] = []
    rejected_audits: list[dict] = []

    for idx, payload in enumerate(raw_events):
        try:
            # model_validate raises ValidationError on type mismatch, missing fields, or extra keys
            event = AFCTapEvent.model_validate(payload)
            valid_events.append(event)
        except ValidationError as e:
            # Extract structured error context for compliance auditing.
            # "loc" is empty for model-level errors (e.g. a non-dict payload),
            # so fall back to a placeholder instead of indexing into it.
            error_details = [
                {
                    "field": ".".join(str(part) for part in err["loc"]) or "__root__",
                    "message": err["msg"],
                    "input": err.get("input"),
                }
                for err in e.errors()
            ]
            audit_record = {
                "ingestion_correlation_id": str(uuid4()),
                "raw_index": idx,
                "raw_payload": payload,
                "validation_errors": error_details,
                "rejection_timestamp_utc": datetime.now(timezone.utc).isoformat(),
            }
            logger.warning("AFC payload quarantined: %s", json.dumps(audit_record, default=str))
            rejected_audits.append(audit_record)

    logger.info("Stream ingestion complete. Valid: %d | Rejected: %d", len(valid_events), len(rejected_audits))
    return valid_events, rejected_audits
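Once payloads are quarantined, it often helps to see which fields fail most frequently. The sketch below aggregates audit records by field and message using only the standard library. The helper name summarize_rejections and the sample records are illustrative assumptions; the records simply mirror the validation_errors shape built by the ingestion function.

```python
from collections import Counter


def summarize_rejections(audit_records: list) -> Counter:
    """Count validation failures per (field, message) pair across audit records."""
    counts: Counter = Counter()
    for record in audit_records:
        for err in record.get("validation_errors", []):
            counts[(err["field"], err["message"])] += 1
    return counts


# Illustrative audit records (made-up data, same shape as the quarantine output)
sample_audits = [
    {
        "raw_index": 0,
        "validation_errors": [
            {"field": "battery_pct", "message": "Extra inputs are not permitted"},
        ],
    },
    {
        "raw_index": 3,
        "validation_errors": [
            {"field": "battery_pct", "message": "Extra inputs are not permitted"},
            {"field": "fare_amount_cents", "message": "Input should be a valid integer"},
        ],
    },
]

# Most frequent failure first
for (field, message), n in summarize_rejections(sample_audits).most_common():
    print(f"{n:>3}  {field}: {message}")
```

A single dominant (field, message) pair usually points at one firmware version or vendor integration rather than at random corruption.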
Transit-Specific Debugging Steps
When reconciliation discrepancies surface, isolate the failure vector using these targeted diagnostics:
- Clock Skew & Timezone Drift: Fare validators in the field frequently operate on isolated industrial networks. If tap_timestamp validation fails, verify the device's NTP sync status. Use the validator_skew_ms field to apply deterministic offsets during batch reconciliation rather than mutating raw timestamps.
- Legacy Firmware Float Leakage: Older validators transmit fare_amount_cents as 12.0 or "12.00". The coerce_fare validator handles this, but if the quarantine log shows value_error entries for this field, check for currency symbols or localized decimal separators in the raw payload.
- GTFS Trip ID Mismatches: When gtfs_trip_id is None but required for fare capping logic, cross-reference the device_id and tap_timestamp against your GTFS Realtime feed. Missing trip assignments often indicate schedule drift or unlinked vehicle telemetry.
- Strict Mode Violations: Pydantic's extra="forbid" will reject payloads containing vendor-specific telemetry (e.g., {"battery_pct": 85}). If your ingestion pipeline drops 100% of events, temporarily switch to extra="ignore" in staging to identify rogue keys, then strip them upstream before production deployment.

For deeper type coercion mechanics, reference the official Pydantic documentation and Python's datetime module.
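Two of the diagnostics above can be sketched with the standard library alone: finding rogue keys in a batch of raw payloads, and applying a skew offset without touching the raw timestamp. The names ALLOWED_FIELDS, find_rogue_keys, and skew_corrected are hypothetical. The sign convention for validator_skew_ms is also an assumption (a positive value is taken to mean the device clock ran ahead), so confirm it against your device documentation before relying on it.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed allow-list: the field names declared on AFCTapEvent
ALLOWED_FIELDS = frozenset({
    "event_id", "device_id", "route_id", "stop_id", "direction", "media_type",
    "tap_timestamp", "fare_amount_cents", "agency_id", "gtfs_trip_id",
    "validator_skew_ms",
})


def find_rogue_keys(payloads: list) -> Counter:
    """Count keys outside ALLOWED_FIELDS across a batch of raw payloads."""
    rogue: Counter = Counter()
    for payload in payloads:
        rogue.update(key for key in payload if key not in ALLOWED_FIELDS)
    return rogue


def skew_corrected(tap_timestamp: datetime, validator_skew_ms: Optional[int]) -> datetime:
    """Return a corrected copy of the timestamp; the original value is left as-is.

    ASSUMPTION: positive skew means the device clock ran ahead of the
    reference clock, so the offset is subtracted.
    """
    if validator_skew_ms is None:
        return tap_timestamp
    return tap_timestamp - timedelta(milliseconds=validator_skew_ms)


# Made-up payload fragments for illustration
batch = [
    {"event_id": "a1", "battery_pct": 85},
    {"event_id": "a2", "battery_pct": 80, "fw_build": "1.2"},
]
print(find_rogue_keys(batch).most_common())

raw_tap = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(skew_corrected(raw_tap, 1500).isoformat())
```

Keeping the corrected value separate from the stored tap_timestamp preserves the raw record for audit while still letting reconciliation jobs work with adjusted times.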