Introduce monitoring module: added EndpointMonitor for API endpoint checks and SchemaRegistry for dynamic schema tracking. Centralized endpoint definitions and implemented schema change detection.

This commit is contained in:
Yûki VACHOT 2025-11-29 00:04:16 +01:00
parent 4f74343efc
commit a988aaa04f
3 changed files with 857 additions and 0 deletions

View file

@ -0,0 +1,21 @@
"""Monitoring module for API endpoint tracking and schema detection."""
from .endpoint_monitor import EndpointMonitor, endpoint_monitor, MONITORED_ENDPOINTS
from .schema_manager import (
SchemaDetector,
SchemaRegistry,
EndpointSchema,
SchemaField,
schema_registry,
)
__all__ = [
"EndpointMonitor",
"endpoint_monitor",
"MONITORED_ENDPOINTS",
"SchemaDetector",
"SchemaRegistry",
"EndpointSchema",
"SchemaField",
"schema_registry",
]

View file

@ -0,0 +1,397 @@
"""
API Endpoint Monitor.
This module provides automated monitoring of GeoGuessr API endpoints,
checking their availability and detecting response format changes.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Optional
import httpx
from ..config import settings
from .schema_manager import SchemaRegistry, schema_registry
logger = logging.getLogger(__name__)
@dataclass
class EndpointDefinition:
"""Definition of an API endpoint to monitor."""
path: str
method: str = "GET"
requires_auth: bool = True
use_game_server: bool = False
params: dict = field(default_factory=dict)
description: str = ""
# Known GeoGuessr API endpoints to monitor
MONITORED_ENDPOINTS = [
# Profile endpoints
EndpointDefinition(
path="/v3/profiles",
description="Current user profile",
),
EndpointDefinition(
path="/v3/profiles/stats",
description="User statistics",
),
EndpointDefinition(
path="/v4/stats/me",
description="Extended user statistics",
),
EndpointDefinition(
path="/v3/profiles/achievements",
description="User achievements",
),
EndpointDefinition(
path="/v3/profiles/maps",
description="User's custom maps",
),
# Game endpoints
EndpointDefinition(
path="/v3/social/events/unfinishedgames",
description="Unfinished games",
),
# Social endpoints
EndpointDefinition(
path="/v4/feed/private",
params={"count": 10, "page": 0},
description="Private activity feed",
),
EndpointDefinition(
path="/v3/social/friends/summary",
description="Friends summary",
),
EndpointDefinition(
path="/v3/social/badges/unclaimed",
description="Unclaimed badges",
),
EndpointDefinition(
path="/v3/social/maps/browse/personalized",
description="Personalized map recommendations",
),
# Competitive endpoints
EndpointDefinition(
path="/v4/seasons/active/stats",
description="Active season statistics",
),
# Explorer endpoints
EndpointDefinition(
path="/v3/explorer",
description="Explorer mode progress",
),
# Objectives endpoints
EndpointDefinition(
path="/v4/objectives",
description="Current objectives",
),
EndpointDefinition(
path="/v4/objectives/unclaimed",
description="Unclaimed objective rewards",
),
# Subscription endpoints
EndpointDefinition(
path="/v3/subscriptions",
description="Subscription information",
),
# Challenge endpoints
EndpointDefinition(
path="/v3/challenges/daily-challenges/today",
description="Today's daily challenge",
),
# Game server endpoints
EndpointDefinition(
path="/tournaments",
use_game_server=True,
description="Tournament information",
),
]
@dataclass
class MonitoringResult:
"""Result of monitoring an endpoint."""
endpoint: str
is_available: bool
response_code: int
response_time_ms: float
schema_changed: bool
error_message: Optional[str] = None
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
class EndpointMonitor:
"""
Monitors API endpoints for availability and schema changes.
This class runs periodic checks on all known endpoints, updating the
schema registry with any changes detected.
"""
def __init__(
self,
registry: Optional[SchemaRegistry] = None,
ncfa_cookie: Optional[str] = None,
):
self.registry = registry or schema_registry
self.ncfa_cookie = ncfa_cookie or settings.DEFAULT_NCFA_COOKIE
self.results: list[MonitoringResult] = []
self._running = False
self._task: Optional[asyncio.Task] = None
async def check_endpoint(
self,
endpoint: EndpointDefinition,
client: httpx.AsyncClient,
) -> MonitoringResult:
"""
Check a single endpoint and update its schema.
Args:
endpoint: The endpoint definition to check
client: HTTP client to use
Returns:
MonitoringResult with check details
"""
base_url = (
settings.GAME_SERVER_URL
if endpoint.use_game_server
else settings.GEOGUESSR_API_URL
)
url = f"{base_url}{endpoint.path}"
start_time = datetime.now(UTC)
try:
response = await client.request(
endpoint.method,
url,
params=endpoint.params if endpoint.params else None,
timeout=settings.REQUEST_TIMEOUT,
)
response_time = (datetime.now(UTC) - start_time).total_seconds() * 1000
if response.status_code == 200:
try:
data = response.json()
schema, changed = self.registry.update_schema(
endpoint.path,
data,
response.status_code,
endpoint.method,
)
return MonitoringResult(
endpoint=endpoint.path,
is_available=True,
response_code=response.status_code,
response_time_ms=response_time,
schema_changed=changed,
)
except Exception as e:
logger.warning(f"Failed to parse response from {endpoint.path}: {e}")
return MonitoringResult(
endpoint=endpoint.path,
is_available=True,
response_code=response.status_code,
response_time_ms=response_time,
schema_changed=False,
error_message=f"Parse error: {str(e)}",
)
else:
self.registry.mark_unavailable(
endpoint.path,
f"HTTP {response.status_code}",
response.status_code,
)
return MonitoringResult(
endpoint=endpoint.path,
is_available=False,
response_code=response.status_code,
response_time_ms=response_time,
schema_changed=False,
error_message=f"HTTP {response.status_code}",
)
except httpx.TimeoutException:
self.registry.mark_unavailable(endpoint.path, "Timeout")
return MonitoringResult(
endpoint=endpoint.path,
is_available=False,
response_code=0,
response_time_ms=settings.REQUEST_TIMEOUT * 1000,
schema_changed=False,
error_message="Request timeout",
)
except Exception as e:
self.registry.mark_unavailable(endpoint.path, str(e))
return MonitoringResult(
endpoint=endpoint.path,
is_available=False,
response_code=0,
response_time_ms=0,
schema_changed=False,
error_message=str(e),
)
async def run_full_check(self) -> list[MonitoringResult]:
"""
Run a full check of all monitored endpoints.
Returns:
List of monitoring results for all endpoints
"""
if not self.ncfa_cookie:
logger.warning("No authentication cookie available for monitoring")
return []
results = []
async with httpx.AsyncClient() as client:
client.cookies.set("_ncfa", self.ncfa_cookie, domain="www.geoguessr.com")
for endpoint in MONITORED_ENDPOINTS:
try:
result = await self.check_endpoint(endpoint, client)
results.append(result)
status = "" if result.is_available else ""
changed = " [SCHEMA CHANGED]" if result.schema_changed else ""
logger.info(
f"{status} {endpoint.path}: "
f"{result.response_code} ({result.response_time_ms:.0f}ms){changed}"
)
# Small delay between requests to avoid rate limiting
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Error checking {endpoint.path}: {e}")
results.append(MonitoringResult(
endpoint=endpoint.path,
is_available=False,
response_code=0,
response_time_ms=0,
schema_changed=False,
error_message=str(e),
))
self.results = results
return results
async def start_periodic_monitoring(self) -> None:
"""Start the periodic monitoring background task."""
if self._running:
logger.warning("Monitoring already running")
return
self._running = True
self._task = asyncio.create_task(self._monitoring_loop())
logger.info(
f"Started periodic monitoring (interval: {settings.MONITORING_INTERVAL_HOURS}h)"
)
async def stop_monitoring(self) -> None:
"""Stop the periodic monitoring background task."""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("Stopped periodic monitoring")
async def _monitoring_loop(self) -> None:
"""Background loop for periodic monitoring."""
while self._running:
try:
logger.info("Running scheduled endpoint check...")
await self.run_full_check()
# Wait for next check interval
await asyncio.sleep(settings.MONITORING_INTERVAL_HOURS * 3600)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in monitoring loop: {e}")
# Wait a bit before retrying on error
await asyncio.sleep(60)
def get_monitoring_report(self) -> dict:
"""
Generate a monitoring report for all endpoints.
Returns:
Dictionary with monitoring summary and details
"""
if not self.results:
return {
"status": "no_data",
"message": "No monitoring data available. Run a check first.",
}
available = [r for r in self.results if r.is_available]
unavailable = [r for r in self.results if not r.is_available]
changed = [r for r in self.results if r.schema_changed]
avg_response_time = (
sum(r.response_time_ms for r in available) / len(available)
if available else 0
)
return {
"status": "ok" if len(unavailable) == 0 else "degraded",
"summary": {
"total_endpoints": len(self.results),
"available": len(available),
"unavailable": len(unavailable),
"schema_changes": len(changed),
"average_response_time_ms": round(avg_response_time, 2),
"last_check": self.results[0].timestamp.isoformat() if self.results else None,
},
"available_endpoints": [
{
"endpoint": r.endpoint,
"response_code": r.response_code,
"response_time_ms": round(r.response_time_ms, 2),
"schema_changed": r.schema_changed,
}
for r in available
],
"unavailable_endpoints": [
{
"endpoint": r.endpoint,
"error": r.error_message,
"response_code": r.response_code,
}
for r in unavailable
],
"schema_changes": [
{
"endpoint": r.endpoint,
"timestamp": r.timestamp.isoformat(),
}
for r in changed
],
}
# Global monitor instance
endpoint_monitor = EndpointMonitor()

View file

@ -0,0 +1,439 @@
"""
Dynamic Schema Detection and Management.
This module automatically detects, tracks, and adapts to changes in API response formats.
It maintains a versioned history of schemas and provides tools for the LLM to understand
the current data structure.
"""
import hashlib
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, UTC
from pathlib import Path
from typing import Any, Optional
from ..config import settings
logger = logging.getLogger(__name__)
@dataclass
class SchemaField:
"""Represents a single field in a schema."""
name: str
field_type: str
nullable: bool = False
nested_schema: Optional[dict] = None
example_value: Any = None
description: str = ""
@dataclass
class EndpointSchema:
"""Schema definition for an API endpoint."""
endpoint: str
method: str
fields: dict[str, SchemaField] = field(default_factory=dict)
last_updated: datetime = field(default_factory=lambda: datetime.now(UTC))
schema_hash: str = ""
response_code: int = 200
is_available: bool = True
error_message: Optional[str] = None
sample_response: Optional[dict] = None
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"endpoint": self.endpoint,
"method": self.method,
"fields": {
name: {
"name": f.name,
"field_type": f.field_type,
"nullable": f.nullable,
"nested_schema": f.nested_schema,
"example_value": self._serialize_example(f.example_value),
"description": f.description,
}
for name, f in self.fields.items()
},
"last_updated": self.last_updated.isoformat(),
"schema_hash": self.schema_hash,
"response_code": self.response_code,
"is_available": self.is_available,
"error_message": self.error_message,
"sample_response": self.sample_response,
}
@staticmethod
def _serialize_example(value: Any) -> Any:
"""Safely serialize example values."""
if isinstance(value, (str, int, float, bool, type(None))):
return value
if isinstance(value, (list, dict)):
return str(value)[:100] + "..." if len(str(value)) > 100 else value
return str(value)
@classmethod
def from_dict(cls, data: dict) -> "EndpointSchema":
"""Create from dictionary."""
fields = {}
for name, f_data in data.get("fields", {}).items():
fields[name] = SchemaField(
name=f_data["name"],
field_type=f_data["field_type"],
nullable=f_data.get("nullable", False),
nested_schema=f_data.get("nested_schema"),
example_value=f_data.get("example_value"),
description=f_data.get("description", ""),
)
last_updated = data.get("last_updated")
if isinstance(last_updated, str):
last_updated = datetime.fromisoformat(last_updated)
else:
last_updated = datetime.now(UTC)
return cls(
endpoint=data["endpoint"],
method=data.get("method", "GET"),
fields=fields,
last_updated=last_updated,
schema_hash=data.get("schema_hash", ""),
response_code=data.get("response_code", 200),
is_available=data.get("is_available", True),
error_message=data.get("error_message"),
sample_response=data.get("sample_response"),
)
class SchemaDetector:
"""Detects and analyzes JSON response schemas dynamically."""
@staticmethod
def detect_type(value: Any) -> str:
"""Detect the type of value."""
if value is None:
return "null"
if isinstance(value, bool):
return "boolean"
if isinstance(value, int):
return "integer"
if isinstance(value, float):
return "number"
if isinstance(value, str):
# Try to detect special string types
if SchemaDetector._is_iso_datetime(value):
return "datetime"
if SchemaDetector._is_uuid(value):
return "uuid"
if SchemaDetector._is_url(value):
return "url"
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
return "object"
return "unknown"
@staticmethod
def _is_iso_datetime(value: str) -> bool:
"""Check if string is ISO datetime format."""
try:
datetime.fromisoformat(value.replace("Z", "+00:00"))
return True
except (ValueError, AttributeError):
return False
@staticmethod
def _is_uuid(value: str) -> bool:
"""Check if string is UUID format."""
import re
uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
return bool(re.match(uuid_pattern, value.lower()))
@staticmethod
def _is_url(value: str) -> bool:
"""Check if string is URL format."""
return value.startswith(("http://", "https://"))
def analyze_response(self, data: Any, max_depth: int = 5) -> dict[str, SchemaField]:
"""
Analyze a JSON response and extract its schema.
Args:
data: The JSON response data
max_depth: Maximum depth for nested object analysis
Returns:
Dictionary mapping field names to SchemaField objects
"""
if not isinstance(data, dict):
return {}
fields = {}
self._analyze_object(data, fields, "", max_depth)
return fields
def _analyze_object(
self,
obj: dict,
fields: dict,
prefix: str,
remaining_depth: int
) -> None:
"""Recursively analyze an object and extract field information."""
if remaining_depth <= 0:
return
for key, value in obj.items():
field_name = f"{prefix}.{key}" if prefix else key
field_type = self.detect_type(value)
nested_schema = None
if field_type == "object" and isinstance(value, dict):
nested_schema = {}
self._analyze_object(value, nested_schema, "", remaining_depth - 1)
elif field_type == "array" and value and isinstance(value[0], dict):
nested_schema = {}
self._analyze_object(value[0], nested_schema, "", remaining_depth - 1)
fields[field_name] = SchemaField(
name=field_name,
field_type=field_type,
nullable=value is None,
nested_schema=nested_schema if nested_schema else None,
example_value=value,
)
@staticmethod
def compute_schema_hash(fields: dict[str, SchemaField]) -> str:
"""Compute a hash of the schema for change detection."""
schema_repr = json.dumps(
{name: (f.field_type, f.nullable) for name, f in sorted(fields.items())},
sort_keys=True
)
return hashlib.sha256(schema_repr.encode()).hexdigest()[:16]
class SchemaRegistry:
"""
Manages schema storage, versioning, and change detection.
Schemas are persisted to disk and loaded on startup, allowing the system
to track changes over time and adapt automatically.
"""
def __init__(self, cache_dir: Optional[str] = None):
self.cache_dir = Path(cache_dir or settings.SCHEMA_CACHE_DIR)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.schemas: dict[str, EndpointSchema] = {}
self.schema_history: dict[str, list[EndpointSchema]] = {}
self.detector = SchemaDetector()
self._load_cached_schemas()
def _get_schema_file(self) -> Path:
"""Get the path to the schema cache file."""
return self.cache_dir / "schemas.json"
def _get_history_file(self) -> Path:
"""Get the path to the schema history file."""
return self.cache_dir / "schema_history.json"
def _load_cached_schemas(self) -> None:
"""Load schemas from disk cache."""
schema_file = self._get_schema_file()
if schema_file.exists():
try:
with open(schema_file) as f:
data = json.load(f)
for endpoint, schema_data in data.items():
self.schemas[endpoint] = EndpointSchema.from_dict(schema_data)
logger.info(f"Loaded {len(self.schemas)} cached schemas")
except Exception as e:
logger.warning(f"Failed to load cached schemas: {e}")
history_file = self._get_history_file()
if history_file.exists():
try:
with open(history_file) as f:
data = json.load(f)
for endpoint, history in data.items():
self.schema_history[endpoint] = [
EndpointSchema.from_dict(h) for h in history
]
except Exception as e:
logger.warning(f"Failed to load schema history: {e}")
def _save_schemas(self) -> None:
"""Save schemas to disk cache."""
try:
with open(self._get_schema_file(), "w") as f:
json.dump(
{ep: schema.to_dict() for ep, schema in self.schemas.items()},
f,
indent=2
)
with open(self._get_history_file(), "w") as f:
json.dump(
{
ep: [h.to_dict() for h in history[-10:]] # Keep last 10 versions
for ep, history in self.schema_history.items()
},
f,
indent=2
)
except Exception as e:
logger.error(f"Failed to save schemas: {e}")
def update_schema(
self,
endpoint: str,
response_data: Any,
response_code: int = 200,
method: str = "GET"
) -> tuple[EndpointSchema, bool]:
"""
Update schema for an endpoint based on response data.
Args:
endpoint: The API endpoint
response_data: The JSON response data
response_code: HTTP response code
method: HTTP method
Returns:
Tuple of (updated schema, whether schema changed)
"""
fields = self.detector.analyze_response(response_data)
new_hash = self.detector.compute_schema_hash(fields)
existing_schema = self.schemas.get(endpoint)
schema_changed = existing_schema is None or existing_schema.schema_hash != new_hash
new_schema = EndpointSchema(
endpoint=endpoint,
method=method,
fields=fields,
last_updated=datetime.now(UTC),
schema_hash=new_hash,
response_code=response_code,
is_available=True,
sample_response=self._truncate_sample(response_data),
)
if schema_changed:
if endpoint not in self.schema_history:
self.schema_history[endpoint] = []
if existing_schema:
self.schema_history[endpoint].append(existing_schema)
logger.info(f"Schema changed for {endpoint}: {new_hash}")
self.schemas[endpoint] = new_schema
self._save_schemas()
return new_schema, schema_changed
def mark_unavailable(
self,
endpoint: str,
error_message: str,
response_code: int = 0
) -> None:
"""Mark an endpoint as unavailable."""
if endpoint in self.schemas:
self.schemas[endpoint].is_available = False
self.schemas[endpoint].error_message = error_message
self.schemas[endpoint].response_code = response_code
self.schemas[endpoint].last_updated = datetime.now(UTC)
else:
self.schemas[endpoint] = EndpointSchema(
endpoint=endpoint,
method="GET",
is_available=False,
error_message=error_message,
response_code=response_code,
)
self._save_schemas()
def get_schema(self, endpoint: str) -> Optional[EndpointSchema]:
"""Get the current schema for an endpoint."""
return self.schemas.get(endpoint)
def get_all_schemas(self) -> dict[str, EndpointSchema]:
"""Get all registered schemas."""
return self.schemas.copy()
def get_available_endpoints(self) -> list[str]:
"""Get list of currently available endpoints."""
return [ep for ep, schema in self.schemas.items() if schema.is_available]
def get_schema_summary(self) -> dict:
"""Get a summary of all schemas for LLM context."""
return {
"total_endpoints": len(self.schemas),
"available_endpoints": len(self.get_available_endpoints()),
"endpoints": {
endpoint: {
"available": schema.is_available,
"last_updated": schema.last_updated.isoformat(),
"field_count": len(schema.fields),
"fields": list(schema.fields.keys())[:20], # Limit for context
"response_code": schema.response_code,
}
for endpoint, schema in self.schemas.items()
}
}
def generate_dynamic_description(self, endpoint: str) -> str:
"""
Generate a dynamic description of an endpoint's response format.
This is used to provide context to the LLM about what data is available.
"""
schema = self.get_schema(endpoint)
if not schema:
return f"No schema information available for {endpoint}"
if not schema.is_available:
return f"Endpoint {endpoint} is currently unavailable: {schema.error_message}"
lines = [
f"Endpoint: {endpoint}",
f"Method: {schema.method}",
f"Last Updated: {schema.last_updated.isoformat()}",
f"Status: {'Available' if schema.is_available else 'Unavailable'}",
"",
"Response Fields:",
]
for name, item in sorted(schema.fields.items()):
nullable_str = " (nullable)" if item.nullable else ""
lines.append(f" - {name}: {item.field_type}{nullable_str}")
if item.nested_schema:
lines.append(f" Nested fields: {list(item.nested_schema.keys())}")
return "\n".join(lines)
@staticmethod
def _truncate_sample(data: Any, max_items: int = 3) -> Any:
"""Truncate sample response for storage."""
if isinstance(data, dict):
return {
k: SchemaRegistry._truncate_sample(v, max_items)
for k, v in list(data.items())[:20]
}
if isinstance(data, list):
return [
SchemaRegistry._truncate_sample(item, max_items)
for item in data[:max_items]
]
if isinstance(data, str) and len(data) > 200:
return data[:200] + "..."
return data
# Global registry instance
schema_registry = SchemaRegistry()