Implementing Pydantic Models for AFC Event Streams

Automated fare collection (AFC) systems generate high-velocity event streams that directly dictate daily revenue reconciliation, audit compliance, and operational reporting. When transit agencies modernize their data infrastructure, transitioning from brittle ETL scripts to deterministic, type-safe ingestion layers becomes non-negotiable. Embedding Pydantic models into your Fare Data Ingestion & GTFS-RT Sync architecture establishes a strict contract for every tap, board, fare adjustment, and penalty event. This approach rejects malformed records at the ingestion boundary instead of letting them corrupt data silently, leaves an auditable trail for compliance reviews, and gives revenue analysts immediate visibility into bad payloads before they contaminate downstream settlement ledgers.

Deterministic Schema Definition

Transit validators, open-loop payment gateways, and mobile wallet SDKs rarely emit perfectly uniform JSON. Legacy firmware often serializes timestamps as naive strings, represents cents as floats, or appends undocumented metadata fields. The goal is selective strictness: coerce noisy enum and timestamp inputs into canonical forms through mode="before" validators, while marking accounting fields like fare_amount_cents as strict=True so that silent float-to-int coercion—a historic source of revenue leakage—is rejected outright. Pairing this with extra="forbid" ensures undocumented vendor fields never slip into the ledger unnoticed. The following schema enforces those guarantees while normalizing heterogeneous inputs before they enter the ledger.

import logging
import json
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from uuid import uuid4
from pydantic import BaseModel, Field, field_validator, ConfigDict, ValidationError

# Structured audit logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("afc_reconciliation")

class TapDirection(str, Enum):
    ENTRY = "entry"
    EXIT = "exit"
    TRANSFER = "transfer"

class FareMediaType(str, Enum):
    SMART_CARD = "smart_card"
    MOBILE_WALLET = "mobile_wallet"
    CONTACTLESS_BANK = "contactless_bank"
    CASH = "cash"

class AFCTapEvent(BaseModel):
    model_config = ConfigDict(extra="forbid")
    event_id: str = Field(..., min_length=1, max_length=64)
    device_id: str = Field(..., pattern=r"^[A-Z0-9_-]+$")
    route_id: str = Field(..., pattern=r"^[A-Z0-9_-]+$")
    stop_id: str = Field(..., min_length=1)
    direction: TapDirection
    media_type: FareMediaType
    tap_timestamp: datetime
    fare_amount_cents: int = Field(ge=0, strict=True)
    agency_id: str = Field(..., min_length=2, max_length=8)
    gtfs_trip_id: Optional[str] = None
    validator_skew_ms: Optional[int] = None

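    @field_validator("direction", "media_type", mode="before")
    @classmethod
    def normalize_enum(cls, v: object) -> object:
        """Canonicalize enum spellings such as ' ENTRY ' or 'Smart-Card'."""
        if isinstance(v, str):
            return v.strip().lower().replace("-", "_")
        return v  # non-strings fall through to normal enum validation

    @field_validator("tap_timestamp", mode="before")
    @classmethod
    def normalize_utc(cls, v: object) -> object:
        """Sanitize legacy timestamps and enforce UTC-aware datetimes."""
        if isinstance(v, str):
            # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
            # A malformed string raises ValueError, which Pydantic reports
            # as a ValidationError instead of crashing the ingestion loop.
            v = datetime.fromisoformat(v.strip().replace("Z", "+00:00"))
        if isinstance(v, datetime):
            if v.tzinfo is None:
                # Assumption: naive firmware timestamps are UTC. Calling
                # astimezone() on a naive value would silently apply the
                # ingestion server's local offset instead.
                v = v.replace(tzinfo=timezone.utc)
            return v.astimezone(timezone.utc)
        return v  # other inputs (e.g. epoch seconds) go to Pydantic's parser

    @field_validator("fare_amount_cents", mode="before")
    @classmethod
    def coerce_fare(cls, v: object) -> object:
        """Absorb whole-cent firmware quirks (12.0, "12.00"); reject lossy values."""
        if isinstance(v, bool):
            return v  # let strict int validation reject booleans
        if isinstance(v, str):
            # Commas are deliberately not stripped: "12,00" (decimal comma)
            # would otherwise become 1200 cents. float() raises ValueError
            # on "12,00" or "$12", so such payloads are quarantined.
            v = float(v.strip())
        if isinstance(v, float):
            if not v.is_integer():
                # int(12.7) would silently truncate to 12: revenue leakage.
                raise ValueError(f"fractional cent value {v!r} would be truncated")
            return int(v)
        return v  # ints pass through; other types fail strict int validation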
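Because the schema pairs a mode="before" converter with strict=True, it helps to see what strict mode alone does to fare inputs. The sketch below uses two hypothetical single-field models (LaxFare and StrictFare are illustrative names, not part of the schema) and assumes Pydantic v2 error type names. Lax int validation already refuses floats with a fractional part, while strict=True also refuses whole-number floats and numeric strings, which is why coerce_fare has to convert legitimate firmware quirks such as 12.0 explicitly before strict validation runs.

```python
from pydantic import BaseModel, Field, ValidationError


class LaxFare(BaseModel):
    # Default (lax) mode: Pydantic may coerce compatible inputs to int.
    fare_amount_cents: int = Field(ge=0)


class StrictFare(BaseModel):
    # strict=True: only genuine int values are accepted for this field.
    fare_amount_cents: int = Field(ge=0, strict=True)


def error_types(model: type[BaseModel], value: object) -> list[str]:
    """Return the Pydantic error types raised for a fare value ([] if it validates)."""
    try:
        model.model_validate({"fare_amount_cents": value})
    except ValidationError as exc:
        return [err["type"] for err in exc.errors()]
    return []


print(LaxFare.model_validate({"fare_amount_cents": 275.0}).fare_amount_cents)  # 275
print(error_types(LaxFare, 275.5))     # ['int_from_float']
print(error_types(StrictFare, 275.0))  # ['int_type']
print(error_types(StrictFare, "275"))  # ['int_type']
print(error_types(StrictFare, 275))    # []
```

The takeaway for the schema: strict=True is a backstop, not a converter, so any representation you decide to tolerate must be normalized by a before-validator that refuses lossy inputs.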

Ingestion Pipeline & Audit Trail

Revenue reconciliation requires deterministic routing: valid events proceed to settlement, while invalid payloads are quarantined with full forensic context. The ingestion function below catches ValidationError explicitly, builds a self-contained audit record (correlation ID, raw payload, structured errors, rejection time) for every rejected payload, and returns the valid and rejected channels as separate lists so that Schema Validation Pipelines can handle the downstream ledger writes.

The flow below shows how each raw payload is coerced, validated against the strict contract, and routed to exactly one channel:

flowchart TD
    R["Raw event dict"] --> B["before-validators<br/>(normalize UTC, coerce cents)"]
    B --> M["model_validate<br/>(extra=forbid, strict fields)"]
    M -->|"ok"| V["Valid AFCTapEvent<br/>(settlement)"]
    M -->|"ValidationError"| A["Build audit record<br/>(correlation_id, raw, errors)"]
    A --> Q["Rejected / quarantine<br/>(forensic context)"]

def ingest_afc_stream(raw_events: list[dict]) -> tuple[list[AFCTapEvent], list[dict]]:
    """
    Validates AFC tap events and routes them to valid/rejected channels.
    Returns a tuple of (validated_models, audit_records).
    """
    valid_events: list[AFCTapEvent] = []
    rejected_audits: list[dict] = []

    for idx, payload in enumerate(raw_events):
        try:
            # model_validate raises ValidationError on type mismatch, missing fields, or extra keys
            event = AFCTapEvent.model_validate(payload)
            valid_events.append(event)
        except ValidationError as e:
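            # Extract structured error context for compliance auditing. Nested
            # locs are dot-joined; an empty loc (e.g. a payload that is not a
            # dict) is reported as "__root__" instead of raising IndexError.
            error_details = [
                {
                    "field": ".".join(str(part) for part in err["loc"]) or "__root__",
                    "type": err["type"],
                    "message": err["msg"],
                    "input": err.get("input"),
                }
                for err in e.errors()
            ]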
            audit_record = {
                "ingestion_correlation_id": str(uuid4()),
                "raw_index": idx,
                "raw_payload": payload,
                "validation_errors": error_details,
                "rejection_timestamp_utc": datetime.now(timezone.utc).isoformat()
            }
            logger.warning("AFC payload quarantined: %s", json.dumps(audit_record, default=str))
            rejected_audits.append(audit_record)

    logger.info("Stream ingestion complete. Valid: %d | Rejected: %d", len(valid_events), len(rejected_audits))
    return valid_events, rejected_audits
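To see the routing and error-flattening pattern above handle edge cases without a full AFC payload, the sketch below runs it against a hypothetical two-field stand-in model (StubTap, flatten_errors, and route are illustrative names only). It dot-joins each error's loc tuple and falls back to "__root__" when Pydantic reports a model-level error with an empty loc, as it does when a payload is not a dict at all.

```python
from pydantic import BaseModel, ConfigDict, ValidationError


class StubTap(BaseModel):
    """Hypothetical stand-in for AFCTapEvent, used only for this demo."""
    model_config = ConfigDict(extra="forbid")
    event_id: str
    fare_amount_cents: int


def flatten_errors(exc: ValidationError) -> list[dict]:
    """Turn Pydantic error dicts into flat audit entries; empty locs become '__root__'."""
    return [
        {
            "field": ".".join(str(part) for part in err["loc"]) or "__root__",
            "type": err["type"],
            "message": err["msg"],
        }
        for err in exc.errors()
    ]


def route(payloads: list[object]) -> tuple[list[StubTap], list[list[dict]]]:
    """Send each payload to exactly one channel: valid models or flattened errors."""
    valid: list[StubTap] = []
    rejected: list[list[dict]] = []
    for payload in payloads:
        try:
            valid.append(StubTap.model_validate(payload))
        except ValidationError as exc:
            rejected.append(flatten_errors(exc))
    return valid, rejected


valid, rejected = route([
    {"event_id": "E1", "fare_amount_cents": 275},
    {"event_id": "E2", "fare_amount_cents": 275, "battery_pct": 85},  # rogue vendor key
    None,  # not a dict: model-level error with an empty loc
])
for errors in rejected:
    print(errors)
```

Recording the error type alongside the message gives downstream tooling a stable key to filter on, since message wording can change between Pydantic releases.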

Transit-Specific Debugging Steps

When reconciliation discrepancies surface, isolate the failure vector using these targeted diagnostics:

  1. Clock Skew & Timezone Drift: Fare validators deployed in the field frequently operate on isolated industrial networks where NTP synchronization can lapse. If tap_timestamp values fail validation or fall outside the expected service window, verify the device's NTP sync status. Use the validator_skew_ms field to apply deterministic offsets during batch reconciliation rather than mutating raw timestamps.
  2. Legacy Firmware Float Leakage: Older validators transmit fare_amount_cents as 12.0 or "12.00". The coerce_fare validator handles this, but if the audit records show "Value error" entries for fare_amount_cents (Pydantic wraps a ValueError raised inside a validator into the ValidationError), check the raw payload for currency symbols or localized decimal separators.
  3. GTFS Trip ID Mismatches: When gtfs_trip_id is None but required for fare capping logic, cross-reference the device_id and tap_timestamp against your GTFS Realtime feed. Missing trip assignments often indicate schedule drift or unlinked vehicle telemetry.
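  4. Extra-Field Rejections: extra="forbid" (a separate setting from Pydantic's strict mode, which governs type coercion) rejects payloads containing vendor-specific telemetry (e.g., {"battery_pct": 85}). If your ingestion pipeline drops 100% of events, start with the quarantined audit records: each undocumented key appears as its own "Extra inputs are not permitted" error naming that key. To inspect the values as well, validate staging traffic against a copy of the model configured with extra="allow" and read model_extra; extra="ignore" would silently discard the keys and hide them. Strip the rogue keys upstream before production deployment. For deeper type coercion mechanics, reference the official Pydantic documentation and Python's datetime module.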
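Two of the diagnostics above lend themselves to small scripts. The sketch below is illustrative: reconciled_tap_time and tally_rogue_keys are hypothetical helper names, and the sign convention for validator_skew_ms (device clock minus reference clock, so a positive value means the device runs ahead) is an assumption to confirm against your vendor's specification. The skew helper returns a corrected copy and leaves the raw timestamp untouched; the tally helper reads audit records shaped like those from ingest_afc_stream and counts each undocumented key rejected by extra="forbid".

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


def reconciled_tap_time(tap_timestamp: datetime, validator_skew_ms: Optional[int]) -> datetime:
    """Return a skew-corrected copy of a tap time; the raw value is never mutated.

    Assumed convention (hypothetical): validator_skew_ms = device clock minus
    reference clock, so a positive skew means the device clock runs ahead.
    """
    if validator_skew_ms is None:
        return tap_timestamp
    return tap_timestamp - timedelta(milliseconds=validator_skew_ms)


def _is_extra_key_error(err: dict) -> bool:
    # Prefer a recorded error type; fall back to Pydantic v2's message text.
    return (
        err.get("type") == "extra_forbidden"
        or err.get("message") == "Extra inputs are not permitted"
    )


def tally_rogue_keys(audit_records: Iterable[dict]) -> Counter:
    """Count undocumented vendor keys across quarantined audit records."""
    counts: Counter = Counter()
    for record in audit_records:
        for err in record.get("validation_errors", []):
            if _is_extra_key_error(err):
                counts[err["field"]] += 1
    return counts


raw_tap = datetime(2024, 5, 1, 8, 0, 0, tzinfo=timezone.utc)
print(reconciled_tap_time(raw_tap, 1500))  # 2024-05-01 07:59:58.500000+00:00

sample_audits = [  # illustrative records shaped like ingest_afc_stream output
    {"validation_errors": [{"field": "battery_pct", "message": "Extra inputs are not permitted"}]},
    {"validation_errors": [
        {"field": "battery_pct", "message": "Extra inputs are not permitted"},
        {"field": "fare_amount_cents", "message": "Input should be a valid integer"},
    ]},
]
print(tally_rogue_keys(sample_audits))  # Counter({'battery_pct': 2})
```

Keeping the correction as a derived value means the original tap_timestamp remains available as audit evidence if the skew estimate is later revised.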