Refactor monitoring module: modularized EndpointMonitor and SchemaRegistry into separate submodules under endpoint and schema respectively. Centralized endpoint definitions, improved structure, and updated imports accordingly.

This commit is contained in:
Yûki VACHOT 2025-11-29 00:37:27 +01:00
parent aad2bc93ea
commit 80631f6f44
11 changed files with 315 additions and 245 deletions

View file

@ -1,13 +1,9 @@
"""Monitoring module for API endpoint tracking and schema detection."""
from .endpoint_monitor import EndpointMonitor, endpoint_monitor, MONITORED_ENDPOINTS
from .schema_manager import (
SchemaDetector,
SchemaRegistry,
EndpointSchema,
SchemaField,
schema_registry,
)
from .endpoint.EndpointMonitor import EndpointMonitor, endpoint_monitor, MONITORED_ENDPOINTS
from schema.EndpointSchema import EndpointSchema
from schema.SchemaRegistry import SchemaRegistry, schema_registry
from schema.SchemaDetector import SchemaDetector, SchemaField
__all__ = [
"EndpointMonitor",

View file

@ -0,0 +1,19 @@
"""
Definition of API endpoint data structure to monitor.
This module provides the `EndpointDefinition` class to encapsulate necessary
details about an API endpoint, such as its path, request method, authentication
requirement, and additional parameters.
"""
from dataclasses import dataclass, field
@dataclass
class EndpointDefinition:
"""Definition of an API endpoint to monitor."""
path: str
method: str = "GET"
requires_auth: bool = True
use_game_server: bool = False
params: dict = field(default_factory=dict)
description: str = ""

View file

@ -1,35 +1,30 @@
"""
API Endpoint Monitor.
Monitors GeoGuessr API endpoints for availability and schema changes.
This module provides automated monitoring of GeoGuessr API endpoints,
checking their availability and detecting response format changes.
This module is responsible for monitoring a set of known GeoGuessr API endpoints
to ensure their availability and detect schema changes. It periodically checks
each endpoint, logs relevant details about its status, and updates a schema
registry when necessary.
Classes:
EndpointMonitor: Manages periodic checks of API endpoints, updates schema
registry, and logs activity.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Optional
import httpx
from ..config import settings
from .schema_manager import SchemaRegistry, schema_registry
from ...config import settings
from .EndpointDefinition import EndpointDefinition
from .EndpointMonitoringResult import MonitoringResult
from ..schema.SchemaRegistry import SchemaRegistry, schema_registry
logger = logging.getLogger(__name__)
@dataclass
class EndpointDefinition:
"""Definition of an API endpoint to monitor."""
path: str
method: str = "GET"
requires_auth: bool = True
use_game_server: bool = False
params: dict = field(default_factory=dict)
description: str = ""
# Known GeoGuessr API endpoints to monitor
MONITORED_ENDPOINTS = [
# Profile endpoints
@ -121,19 +116,6 @@ MONITORED_ENDPOINTS = [
),
]
@dataclass
class MonitoringResult:
"""Result of monitoring an endpoint."""
endpoint: str
is_available: bool
response_code: int
response_time_ms: float
schema_changed: bool
error_message: Optional[str] = None
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
class EndpointMonitor:
"""
Monitors API endpoints for availability and schema changes.

View file

@ -0,0 +1,23 @@
"""
This module defines a data structure representing the result of monitoring a
network endpoint.
It contains the necessary details to capture the status and performance of
an endpoint, including its availability, response time, and any errors encountered.
"""
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Optional
@dataclass
class MonitoringResult:
"""Result of monitoring an endpoint."""
endpoint: str
is_available: bool
response_code: int
response_time_ms: float
schema_changed: bool
error_message: Optional[str] = None
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))

View file

@ -0,0 +1,97 @@
"""
Schema definitions and utility methods for managing API endpoints.
This module provides the `EndpointSchema` class, which offers the ability
to define the schema of an API endpoint, serialize and deserialize data, and
manage metadata such as response codes and availability. The class also
includes helper utilities for handling data transformations and validating
schema information.
"""
import logging
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Any, Optional
from .SchemaField import SchemaField
logger = logging.getLogger(__name__)
@dataclass
class EndpointSchema:
"""Schema definition for an API endpoint."""
endpoint: str
method: str
fields: dict[str, SchemaField] = field(default_factory=dict)
last_updated: datetime = field(default_factory=lambda: datetime.now(UTC))
schema_hash: str = ""
response_code: int = 200
is_available: bool = True
error_message: Optional[str] = None
sample_response: Optional[dict] = None
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"endpoint": self.endpoint,
"method": self.method,
"fields": {
name: {
"name": f.name,
"field_type": f.field_type,
"nullable": f.nullable,
"nested_schema": f.nested_schema,
"example_value": self._serialize_example(f.example_value),
"description": f.description,
}
for name, f in self.fields.items()
},
"last_updated": self.last_updated.isoformat(),
"schema_hash": self.schema_hash,
"response_code": self.response_code,
"is_available": self.is_available,
"error_message": self.error_message,
"sample_response": self.sample_response,
}
@staticmethod
def _serialize_example(value: Any) -> Any:
"""Safely serialize example values."""
if isinstance(value, (str, int, float, bool, type(None))):
return value
if isinstance(value, (list, dict)):
return str(value)[:100] + "..." if len(str(value)) > 100 else value
return str(value)
@classmethod
def from_dict(cls, data: dict) -> "EndpointSchema":
"""Create from dictionary."""
fields = {}
for name, f_data in data.get("fields", {}).items():
fields[name] = SchemaField(
name=f_data["name"],
field_type=f_data["field_type"],
nullable=f_data.get("nullable", False),
nested_schema=f_data.get("nested_schema"),
example_value=f_data.get("example_value"),
description=f_data.get("description", ""),
)
last_updated = data.get("last_updated")
if isinstance(last_updated, str):
last_updated = datetime.fromisoformat(last_updated)
else:
last_updated = datetime.now(UTC)
return cls(
endpoint=data["endpoint"],
method=data.get("method", "GET"),
fields=fields,
last_updated=last_updated,
schema_hash=data.get("schema_hash", ""),
response_code=data.get("response_code", 200),
is_available=data.get("is_available", True),
error_message=data.get("error_message"),
sample_response=data.get("sample_response"),
)

View file

@ -0,0 +1,127 @@
"""
This module provides a dynamic JSON schema detection and analysis utility.
It includes functionality to infer types of various data values, analyze JSON
response structures, compute schema hashes, and identify nested schemas.
The module is particularly useful for understanding and working with
dynamic JSON datasets and detecting changes in schema over time.
"""
import hashlib
import json
import logging
from datetime import datetime
from typing import Any
from .SchemaField import SchemaField
logger = logging.getLogger(__name__)
class SchemaDetector:
"""Detects and analyzes JSON response schemas dynamically."""
@staticmethod
def detect_type(value: Any) -> str:
"""Detect the type of value."""
if value is None:
return "null"
if isinstance(value, bool):
return "boolean"
if isinstance(value, int):
return "integer"
if isinstance(value, float):
return "number"
if isinstance(value, str):
# Try to detect special string types
if SchemaDetector._is_iso_datetime(value):
return "datetime"
if SchemaDetector._is_uuid(value):
return "uuid"
if SchemaDetector._is_url(value):
return "url"
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
return "object"
return "unknown"
@staticmethod
def _is_iso_datetime(value: str) -> bool:
"""Check if string is ISO datetime format."""
try:
datetime.fromisoformat(value.replace("Z", "+00:00"))
return True
except (ValueError, AttributeError):
return False
@staticmethod
def _is_uuid(value: str) -> bool:
"""Check if string is UUID format."""
import re
uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
return bool(re.match(uuid_pattern, value.lower()))
@staticmethod
def _is_url(value: str) -> bool:
"""Check if string is URL format."""
return value.startswith(("http://", "https://"))
def analyze_response(self, data: Any, max_depth: int = 5) -> dict[str, SchemaField]:
"""
Analyze a JSON response and extract its schema.
Args:
data: The JSON response data
max_depth: Maximum depth for nested object analysis
Returns:
Dictionary mapping field names to SchemaField objects
"""
if not isinstance(data, dict):
return {}
fields = {}
self._analyze_object(data, fields, "", max_depth)
return fields
def _analyze_object(
self,
obj: dict,
fields: dict,
prefix: str,
remaining_depth: int
) -> None:
"""Recursively analyze an object and extract field information."""
if remaining_depth <= 0:
return
for key, value in obj.items():
field_name = f"{prefix}.{key}" if prefix else key
field_type = self.detect_type(value)
nested_schema = None
if field_type == "object" and isinstance(value, dict):
nested_schema = {}
self._analyze_object(value, nested_schema, "", remaining_depth - 1)
elif field_type == "array" and value and isinstance(value[0], dict):
nested_schema = {}
self._analyze_object(value[0], nested_schema, "", remaining_depth - 1)
fields[field_name] = SchemaField(
name=field_name,
field_type=field_type,
nullable=value is None,
nested_schema=nested_schema if nested_schema else None,
example_value=value,
)
@staticmethod
def compute_schema_hash(fields: dict[str, SchemaField]) -> str:
"""Compute a hash of the schema for change detection."""
schema_repr = json.dumps(
{name: (f.field_type, f.nullable) for name, f in sorted(fields.items())},
sort_keys=True
)
return hashlib.sha256(schema_repr.encode()).hexdigest()[:16]

View file

@ -0,0 +1,20 @@
"""
Represents a single field in a schema.
This module defines a dataclass that encapsulates the attributes of
a schema field, including its name, type, and other relevant metadata.
"""
from dataclasses import dataclass
from typing import Any, Optional
@dataclass
class SchemaField:
"""Represents a single field in a schema."""
name: str
field_type: str
nullable: bool = False
nested_schema: Optional[dict] = None
example_value: Any = None
description: str = ""

View file

@ -1,223 +1,29 @@
"""
Dynamic Schema Detection and Management.
Handles schema versioning, storage, and updates for API endpoints.
This module automatically detects, tracks, and adapts to changes in API response formats.
It maintains a versioned history of schemas and provides tools for the LLM to understand
the current data structure.
The module provides functionality to detect and manage changes in API response
schemas, maintain history of schema versions, and persist schema data to disk.
It supports use cases such as tracking endpoint availability, generating schema
summaries, and providing dynamic endpoint descriptions.
Classes:
SchemaRegistry
"""
import hashlib
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, UTC
from pathlib import Path
from typing import Any, Optional
from ..config import settings
from ...config import settings
from .EndpointSchema import EndpointSchema
from .SchemaDetector import SchemaDetector
logger = logging.getLogger(__name__)
@dataclass
class SchemaField:
"""Represents a single field in a schema."""
name: str
field_type: str
nullable: bool = False
nested_schema: Optional[dict] = None
example_value: Any = None
description: str = ""
@dataclass
class EndpointSchema:
"""Schema definition for an API endpoint."""
endpoint: str
method: str
fields: dict[str, SchemaField] = field(default_factory=dict)
last_updated: datetime = field(default_factory=lambda: datetime.now(UTC))
schema_hash: str = ""
response_code: int = 200
is_available: bool = True
error_message: Optional[str] = None
sample_response: Optional[dict] = None
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"endpoint": self.endpoint,
"method": self.method,
"fields": {
name: {
"name": f.name,
"field_type": f.field_type,
"nullable": f.nullable,
"nested_schema": f.nested_schema,
"example_value": self._serialize_example(f.example_value),
"description": f.description,
}
for name, f in self.fields.items()
},
"last_updated": self.last_updated.isoformat(),
"schema_hash": self.schema_hash,
"response_code": self.response_code,
"is_available": self.is_available,
"error_message": self.error_message,
"sample_response": self.sample_response,
}
@staticmethod
def _serialize_example(value: Any) -> Any:
"""Safely serialize example values."""
if isinstance(value, (str, int, float, bool, type(None))):
return value
if isinstance(value, (list, dict)):
return str(value)[:100] + "..." if len(str(value)) > 100 else value
return str(value)
@classmethod
def from_dict(cls, data: dict) -> "EndpointSchema":
"""Create from dictionary."""
fields = {}
for name, f_data in data.get("fields", {}).items():
fields[name] = SchemaField(
name=f_data["name"],
field_type=f_data["field_type"],
nullable=f_data.get("nullable", False),
nested_schema=f_data.get("nested_schema"),
example_value=f_data.get("example_value"),
description=f_data.get("description", ""),
)
last_updated = data.get("last_updated")
if isinstance(last_updated, str):
last_updated = datetime.fromisoformat(last_updated)
else:
last_updated = datetime.now(UTC)
return cls(
endpoint=data["endpoint"],
method=data.get("method", "GET"),
fields=fields,
last_updated=last_updated,
schema_hash=data.get("schema_hash", ""),
response_code=data.get("response_code", 200),
is_available=data.get("is_available", True),
error_message=data.get("error_message"),
sample_response=data.get("sample_response"),
)
class SchemaDetector:
"""Detects and analyzes JSON response schemas dynamically."""
@staticmethod
def detect_type(value: Any) -> str:
"""Detect the type of value."""
if value is None:
return "null"
if isinstance(value, bool):
return "boolean"
if isinstance(value, int):
return "integer"
if isinstance(value, float):
return "number"
if isinstance(value, str):
# Try to detect special string types
if SchemaDetector._is_iso_datetime(value):
return "datetime"
if SchemaDetector._is_uuid(value):
return "uuid"
if SchemaDetector._is_url(value):
return "url"
return "string"
if isinstance(value, list):
return "array"
if isinstance(value, dict):
return "object"
return "unknown"
@staticmethod
def _is_iso_datetime(value: str) -> bool:
"""Check if string is ISO datetime format."""
try:
datetime.fromisoformat(value.replace("Z", "+00:00"))
return True
except (ValueError, AttributeError):
return False
@staticmethod
def _is_uuid(value: str) -> bool:
"""Check if string is UUID format."""
import re
uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
return bool(re.match(uuid_pattern, value.lower()))
@staticmethod
def _is_url(value: str) -> bool:
"""Check if string is URL format."""
return value.startswith(("http://", "https://"))
def analyze_response(self, data: Any, max_depth: int = 5) -> dict[str, SchemaField]:
"""
Analyze a JSON response and extract its schema.
Args:
data: The JSON response data
max_depth: Maximum depth for nested object analysis
Returns:
Dictionary mapping field names to SchemaField objects
"""
if not isinstance(data, dict):
return {}
fields = {}
self._analyze_object(data, fields, "", max_depth)
return fields
def _analyze_object(
self,
obj: dict,
fields: dict,
prefix: str,
remaining_depth: int
) -> None:
"""Recursively analyze an object and extract field information."""
if remaining_depth <= 0:
return
for key, value in obj.items():
field_name = f"{prefix}.{key}" if prefix else key
field_type = self.detect_type(value)
nested_schema = None
if field_type == "object" and isinstance(value, dict):
nested_schema = {}
self._analyze_object(value, nested_schema, "", remaining_depth - 1)
elif field_type == "array" and value and isinstance(value[0], dict):
nested_schema = {}
self._analyze_object(value[0], nested_schema, "", remaining_depth - 1)
fields[field_name] = SchemaField(
name=field_name,
field_type=field_type,
nullable=value is None,
nested_schema=nested_schema if nested_schema else None,
example_value=value,
)
@staticmethod
def compute_schema_hash(fields: dict[str, SchemaField]) -> str:
"""Compute a hash of the schema for change detection."""
schema_repr = json.dumps(
{name: (f.field_type, f.nullable) for name, f in sorted(fields.items())},
sort_keys=True
)
return hashlib.sha256(schema_repr.encode()).hexdigest()[:16]
class SchemaRegistry:
"""
Manages schema storage, versioning, and change detection.