diff --git a/src/geoguessr_mcp/monitoring/__init__.py b/src/geoguessr_mcp/monitoring/__init__.py index 585e2a7..029210b 100644 --- a/src/geoguessr_mcp/monitoring/__init__.py +++ b/src/geoguessr_mcp/monitoring/__init__.py @@ -1,13 +1,9 @@ """Monitoring module for API endpoint tracking and schema detection.""" -from .endpoint_monitor import EndpointMonitor, endpoint_monitor, MONITORED_ENDPOINTS -from .schema_manager import ( - SchemaDetector, - SchemaRegistry, - EndpointSchema, - SchemaField, - schema_registry, -) +from .endpoint.EndpointMonitor import EndpointMonitor, endpoint_monitor, MONITORED_ENDPOINTS +from schema.EndpointSchema import EndpointSchema +from schema.SchemaRegistry import SchemaRegistry, schema_registry +from schema.SchemaDetector import SchemaDetector, SchemaField __all__ = [ "EndpointMonitor", diff --git a/src/geoguessr_mcp/monitoring/endpoint/EndpointDefinition.py b/src/geoguessr_mcp/monitoring/endpoint/EndpointDefinition.py new file mode 100644 index 0000000..2af8eac --- /dev/null +++ b/src/geoguessr_mcp/monitoring/endpoint/EndpointDefinition.py @@ -0,0 +1,19 @@ +""" +Definition of API endpoint data structure to monitor. + +This module provides the `EndpointDefinition` class to encapsulate necessary +details about an API endpoint, such as its path, request method, authentication +requirement, and additional parameters. +""" + +from dataclasses import dataclass, field + +@dataclass +class EndpointDefinition: + """Definition of an API endpoint to monitor.""" + path: str + method: str = "GET" + requires_auth: bool = True + use_game_server: bool = False + params: dict = field(default_factory=dict) + description: str = "" diff --git a/src/geoguessr_mcp/monitoring/endpoint_monitor.py b/src/geoguessr_mcp/monitoring/endpoint/EndpointMonitor.py similarity index 93% rename from src/geoguessr_mcp/monitoring/endpoint_monitor.py rename to src/geoguessr_mcp/monitoring/endpoint/EndpointMonitor.py index bd103c0..b31f2e0 100644 --- a/src/geoguessr_mcp/monitoring/endpoint_monitor.py +++ b/src/geoguessr_mcp/monitoring/endpoint/EndpointMonitor.py @@ -1,35 +1,30 @@ """ -API Endpoint Monitor. +Monitors GeoGuessr API endpoints for availability and schema changes. -This module provides automated monitoring of GeoGuessr API endpoints, -checking their availability and detecting response format changes. +This module is responsible for monitoring a set of known GeoGuessr API endpoints +to ensure their availability and detect schema changes. It periodically checks +each endpoint, logs relevant details about its status, and updates a schema +registry when necessary. + +Classes: + EndpointMonitor: Manages periodic checks of API endpoints, updates schema + registry, and logs activity. """ import asyncio import logging -from dataclasses import dataclass, field from datetime import datetime, UTC from typing import Optional import httpx -from ..config import settings -from .schema_manager import SchemaRegistry, schema_registry +from ...config import settings +from .EndpointDefinition import EndpointDefinition +from .EndpointMonitoringResult import MonitoringResult +from ..schema.SchemaRegistry import SchemaRegistry, schema_registry logger = logging.getLogger(__name__) - -@dataclass -class EndpointDefinition: - """Definition of an API endpoint to monitor.""" - path: str - method: str = "GET" - requires_auth: bool = True - use_game_server: bool = False - params: dict = field(default_factory=dict) - description: str = "" - - # Known GeoGuessr API endpoints to monitor MONITORED_ENDPOINTS = [ # Profile endpoints @@ -121,19 +116,6 @@ MONITORED_ENDPOINTS = [ ), ] - -@dataclass -class MonitoringResult: - """Result of monitoring an endpoint.""" - endpoint: str - is_available: bool - response_code: int - response_time_ms: float - schema_changed: bool - error_message: Optional[str] = None - timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) - - class EndpointMonitor: """ Monitors API endpoints for availability and schema changes. diff --git a/src/geoguessr_mcp/monitoring/endpoint/EndpointMonitoringResult.py b/src/geoguessr_mcp/monitoring/endpoint/EndpointMonitoringResult.py new file mode 100644 index 0000000..0d25fd7 --- /dev/null +++ b/src/geoguessr_mcp/monitoring/endpoint/EndpointMonitoringResult.py @@ -0,0 +1,23 @@ +""" +This module defines a data structure representing the result of monitoring a +network endpoint. + +It contains the necessary details to capture the status and performance of +an endpoint, including its availability, response time, and any errors encountered. +""" + +from dataclasses import dataclass, field +from datetime import datetime, UTC +from typing import Optional + + +@dataclass +class MonitoringResult: + """Result of monitoring an endpoint.""" + endpoint: str + is_available: bool + response_code: int + response_time_ms: float + schema_changed: bool + error_message: Optional[str] = None + timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) diff --git a/src/tests/e2e/__init__.py b/src/geoguessr_mcp/monitoring/endpoint/__init__.py similarity index 100% rename from src/tests/e2e/__init__.py rename to src/geoguessr_mcp/monitoring/endpoint/__init__.py diff --git a/src/geoguessr_mcp/monitoring/schema/EndpointSchema.py b/src/geoguessr_mcp/monitoring/schema/EndpointSchema.py new file mode 100644 index 0000000..8545a27 --- /dev/null +++ b/src/geoguessr_mcp/monitoring/schema/EndpointSchema.py @@ -0,0 +1,97 @@ +""" +Schema definitions and utility methods for managing API endpoints. + +This module provides the `EndpointSchema` class, which offers the ability +to define the schema of an API endpoint, serialize and deserialize data, and +manage metadata such as response codes and availability. The class also +includes helper utilities for handling data transformations and validating +schema information. +""" + +import logging +from dataclasses import dataclass, field +from datetime import datetime, UTC +from typing import Any, Optional + +from .SchemaField import SchemaField + +logger = logging.getLogger(__name__) + + +@dataclass +class EndpointSchema: + """Schema definition for an API endpoint.""" + endpoint: str + method: str + fields: dict[str, SchemaField] = field(default_factory=dict) + last_updated: datetime = field(default_factory=lambda: datetime.now(UTC)) + schema_hash: str = "" + response_code: int = 200 + is_available: bool = True + error_message: Optional[str] = None + sample_response: Optional[dict] = None + + def to_dict(self) -> dict: + """Convert to dictionary for serialization.""" + return { + "endpoint": self.endpoint, + "method": self.method, + "fields": { + name: { + "name": f.name, + "field_type": f.field_type, + "nullable": f.nullable, + "nested_schema": f.nested_schema, + "example_value": self._serialize_example(f.example_value), + "description": f.description, + } + for name, f in self.fields.items() + }, + "last_updated": self.last_updated.isoformat(), + "schema_hash": self.schema_hash, + "response_code": self.response_code, + "is_available": self.is_available, + "error_message": self.error_message, + "sample_response": self.sample_response, + } + + @staticmethod + def _serialize_example(value: Any) -> Any: + """Safely serialize example values.""" + if isinstance(value, (str, int, float, bool, type(None))): + return value + if isinstance(value, (list, dict)): + return str(value)[:100] + "..." if len(str(value)) > 100 else value + return str(value) + + @classmethod + def from_dict(cls, data: dict) -> "EndpointSchema": + """Create from dictionary.""" + fields = {} + for name, f_data in data.get("fields", {}).items(): + fields[name] = SchemaField( + name=f_data["name"], + field_type=f_data["field_type"], + nullable=f_data.get("nullable", False), + nested_schema=f_data.get("nested_schema"), + example_value=f_data.get("example_value"), + description=f_data.get("description", ""), + ) + + last_updated = data.get("last_updated") + if isinstance(last_updated, str): + last_updated = datetime.fromisoformat(last_updated) + else: + last_updated = datetime.now(UTC) + + return cls( + endpoint=data["endpoint"], + method=data.get("method", "GET"), + fields=fields, + last_updated=last_updated, + schema_hash=data.get("schema_hash", ""), + response_code=data.get("response_code", 200), + is_available=data.get("is_available", True), + error_message=data.get("error_message"), + sample_response=data.get("sample_response"), + ) diff --git a/src/geoguessr_mcp/monitoring/schema/SchemaDetector.py b/src/geoguessr_mcp/monitoring/schema/SchemaDetector.py new file mode 100644 index 0000000..b242a22 --- /dev/null +++ b/src/geoguessr_mcp/monitoring/schema/SchemaDetector.py @@ -0,0 +1,127 @@ +""" +This module provides a dynamic JSON schema detection and analysis utility. + +It includes functionality to infer types of various data values, analyze JSON +response structures, compute schema hashes, and identify nested schemas. +The module is particularly useful for understanding and working with +dynamic JSON datasets and detecting changes in schema over time. +""" + +import hashlib +import json +import logging +from datetime import datetime +from typing import Any + +from .SchemaField import SchemaField + +logger = logging.getLogger(__name__) + + +class SchemaDetector: + """Detects and analyzes JSON response schemas dynamically.""" + + @staticmethod + def detect_type(value: Any) -> str: + """Detect the type of value.""" + if value is None: + return "null" + if isinstance(value, bool): + return "boolean" + if isinstance(value, int): + return "integer" + if isinstance(value, float): + return "number" + if isinstance(value, str): + # Try to detect special string types + if SchemaDetector._is_iso_datetime(value): + return "datetime" + if SchemaDetector._is_uuid(value): + return "uuid" + if SchemaDetector._is_url(value): + return "url" + return "string" + if isinstance(value, list): + return "array" + if isinstance(value, dict): + return "object" + return "unknown" + + @staticmethod + def _is_iso_datetime(value: str) -> bool: + """Check if string is ISO datetime format.""" + try: + datetime.fromisoformat(value.replace("Z", "+00:00")) + return True + except (ValueError, AttributeError): + return False + + @staticmethod + def _is_uuid(value: str) -> bool: + """Check if string is UUID format.""" + import re + uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$' + return bool(re.match(uuid_pattern, value.lower())) + + @staticmethod + def _is_url(value: str) -> bool: + """Check if string is URL format.""" + return value.startswith(("http://", "https://")) + + def analyze_response(self, data: Any, max_depth: int = 5) -> dict[str, SchemaField]: + """ + Analyze a JSON response and extract its schema. + + Args: + data: The JSON response data + max_depth: Maximum depth for nested object analysis + + Returns: + Dictionary mapping field names to SchemaField objects + """ + if not isinstance(data, dict): + return {} + + fields = {} + self._analyze_object(data, fields, "", max_depth) + return fields + + def _analyze_object( + self, + obj: dict, + fields: dict, + prefix: str, + remaining_depth: int + ) -> None: + """Recursively analyze an object and extract field information.""" + if remaining_depth <= 0: + return + + for key, value in obj.items(): + field_name = f"{prefix}.{key}" if prefix else key + field_type = self.detect_type(value) + + nested_schema = None + if field_type == "object" and isinstance(value, dict): + nested_schema = {} + self._analyze_object(value, nested_schema, "", remaining_depth - 1) + elif field_type == "array" and value and isinstance(value[0], dict): + nested_schema = {} + self._analyze_object(value[0], nested_schema, "", remaining_depth - 1) + + fields[field_name] = SchemaField( + name=field_name, + field_type=field_type, + nullable=value is None, + nested_schema=nested_schema if nested_schema else None, + example_value=value, + ) + + @staticmethod + def compute_schema_hash(fields: dict[str, SchemaField]) -> str: + """Compute a hash of the schema for change detection.""" + schema_repr = json.dumps( + {name: (f.field_type, f.nullable) for name, f in sorted(fields.items())}, + sort_keys=True + ) + return hashlib.sha256(schema_repr.encode()).hexdigest()[:16] diff --git a/src/geoguessr_mcp/monitoring/schema/SchemaField.py b/src/geoguessr_mcp/monitoring/schema/SchemaField.py new file mode 100644 index 0000000..e50b18c --- /dev/null +++ b/src/geoguessr_mcp/monitoring/schema/SchemaField.py @@ -0,0 +1,20 @@ +""" +Represents a single field in a schema. + +This module defines a dataclass that encapsulates the attributes of +a schema field, including its name, type, and other relevant metadata. +""" + +from dataclasses import dataclass +from typing import Any, Optional + + +@dataclass +class SchemaField: + """Represents a single field in a schema.""" + name: str + field_type: str + nullable: bool = False + nested_schema: Optional[dict] = None + example_value: Any = None + description: str = "" diff --git a/src/geoguessr_mcp/monitoring/schema_manager.py b/src/geoguessr_mcp/monitoring/schema/SchemaRegistry.py similarity index 53% rename from src/geoguessr_mcp/monitoring/schema_manager.py rename to src/geoguessr_mcp/monitoring/schema/SchemaRegistry.py index 909ab18..91d7cee 100644 --- a/src/geoguessr_mcp/monitoring/schema_manager.py +++ b/src/geoguessr_mcp/monitoring/schema/SchemaRegistry.py @@ -1,223 +1,29 @@ """ -Dynamic Schema Detection and Management. +Handles schema versioning, storage, and updates for API endpoints. -This module automatically detects, tracks, and adapts to changes in API response formats. -It maintains a versioned history of schemas and provides tools for the LLM to understand -the current data structure. +The module provides functionality to detect and manage changes in API response +schemas, maintain history of schema versions, and persist schema data to disk. +It supports use cases such as tracking endpoint availability, generating schema +summaries, and providing dynamic endpoint descriptions. + +Classes: + SchemaRegistry """ -import hashlib import json import logging -from dataclasses import dataclass, field from datetime import datetime, UTC from pathlib import Path from typing import Any, Optional -from ..config import settings + +from ...config import settings +from .EndpointSchema import EndpointSchema +from .SchemaDetector import SchemaDetector logger = logging.getLogger(__name__) -@dataclass -class SchemaField: - """Represents a single field in a schema.""" - name: str - field_type: str - nullable: bool = False - nested_schema: Optional[dict] = None - example_value: Any = None - description: str = "" - - -@dataclass -class EndpointSchema: - """Schema definition for an API endpoint.""" - endpoint: str - method: str - fields: dict[str, SchemaField] = field(default_factory=dict) - last_updated: datetime = field(default_factory=lambda: datetime.now(UTC)) - schema_hash: str = "" - response_code: int = 200 - is_available: bool = True - error_message: Optional[str] = None - sample_response: Optional[dict] = None - - def to_dict(self) -> dict: - """Convert to dictionary for serialization.""" - return { - "endpoint": self.endpoint, - "method": self.method, - "fields": { - name: { - "name": f.name, - "field_type": f.field_type, - "nullable": f.nullable, - "nested_schema": f.nested_schema, - "example_value": self._serialize_example(f.example_value), - "description": f.description, - } - for name, f in self.fields.items() - }, - "last_updated": self.last_updated.isoformat(), - "schema_hash": self.schema_hash, - "response_code": self.response_code, - "is_available": self.is_available, - "error_message": self.error_message, - "sample_response": self.sample_response, - } - - @staticmethod - def _serialize_example(value: Any) -> Any: - """Safely serialize example values.""" - if isinstance(value, (str, int, float, bool, type(None))): - return value - if isinstance(value, (list, dict)): - return str(value)[:100] + "..." if len(str(value)) > 100 else value - return str(value) - - @classmethod - def from_dict(cls, data: dict) -> "EndpointSchema": - """Create from dictionary.""" - fields = {} - for name, f_data in data.get("fields", {}).items(): - fields[name] = SchemaField( - name=f_data["name"], - field_type=f_data["field_type"], - nullable=f_data.get("nullable", False), - nested_schema=f_data.get("nested_schema"), - example_value=f_data.get("example_value"), - description=f_data.get("description", ""), - ) - - last_updated = data.get("last_updated") - if isinstance(last_updated, str): - last_updated = datetime.fromisoformat(last_updated) - else: - last_updated = datetime.now(UTC) - - return cls( - endpoint=data["endpoint"], - method=data.get("method", "GET"), - fields=fields, - last_updated=last_updated, - schema_hash=data.get("schema_hash", ""), - response_code=data.get("response_code", 200), - is_available=data.get("is_available", True), - error_message=data.get("error_message"), - sample_response=data.get("sample_response"), - ) - - -class SchemaDetector: - """Detects and analyzes JSON response schemas dynamically.""" - - @staticmethod - def detect_type(value: Any) -> str: - """Detect the type of value.""" - if value is None: - return "null" - if isinstance(value, bool): - return "boolean" - if isinstance(value, int): - return "integer" - if isinstance(value, float): - return "number" - if isinstance(value, str): - # Try to detect special string types - if SchemaDetector._is_iso_datetime(value): - return "datetime" - if SchemaDetector._is_uuid(value): - return "uuid" - if SchemaDetector._is_url(value): - return "url" - return "string" - if isinstance(value, list): - return "array" - if isinstance(value, dict): - return "object" - return "unknown" - - @staticmethod - def _is_iso_datetime(value: str) -> bool: - """Check if string is ISO datetime format.""" - try: - datetime.fromisoformat(value.replace("Z", "+00:00")) - return True - except (ValueError, AttributeError): - return False - - @staticmethod - def _is_uuid(value: str) -> bool: - """Check if string is UUID format.""" - import re - uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$' - return bool(re.match(uuid_pattern, value.lower())) - - @staticmethod - def _is_url(value: str) -> bool: - """Check if string is URL format.""" - return value.startswith(("http://", "https://")) - - def analyze_response(self, data: Any, max_depth: int = 5) -> dict[str, SchemaField]: - """ - Analyze a JSON response and extract its schema. - - Args: - data: The JSON response data - max_depth: Maximum depth for nested object analysis - - Returns: - Dictionary mapping field names to SchemaField objects - """ - if not isinstance(data, dict): - return {} - - fields = {} - self._analyze_object(data, fields, "", max_depth) - return fields - - def _analyze_object( - self, - obj: dict, - fields: dict, - prefix: str, - remaining_depth: int - ) -> None: - """Recursively analyze an object and extract field information.""" - if remaining_depth <= 0: - return - - for key, value in obj.items(): - field_name = f"{prefix}.{key}" if prefix else key - field_type = self.detect_type(value) - - nested_schema = None - if field_type == "object" and isinstance(value, dict): - nested_schema = {} - self._analyze_object(value, nested_schema, "", remaining_depth - 1) - elif field_type == "array" and value and isinstance(value[0], dict): - nested_schema = {} - self._analyze_object(value[0], nested_schema, "", remaining_depth - 1) - - fields[field_name] = SchemaField( - name=field_name, - field_type=field_type, - nullable=value is None, - nested_schema=nested_schema if nested_schema else None, - example_value=value, - ) - - @staticmethod - def compute_schema_hash(fields: dict[str, SchemaField]) -> str: - """Compute a hash of the schema for change detection.""" - schema_repr = json.dumps( - {name: (f.field_type, f.nullable) for name, f in sorted(fields.items())}, - sort_keys=True - ) - return hashlib.sha256(schema_repr.encode()).hexdigest()[:16] - - class SchemaRegistry: """ Manages schema storage, versioning, and change detection. diff --git a/src/tests/unit/test_analysis_service.py b/src/geoguessr_mcp/monitoring/schema/__init__.py similarity index 100% rename from src/tests/unit/test_analysis_service.py rename to src/geoguessr_mcp/monitoring/schema/__init__.py diff --git a/src/tests/unit/test_game_service.py b/src/tests/unit/test_game_service.py deleted file mode 100644 index e69de29..0000000