# System Design

This guide explains how AtlasML is architected, how its components interact, and how requests and responses flow through the system.

## High-Level Architecture

### Key Components

- FastAPI Application (`app.py`): Entry point, middleware, and router registration
- Routers: Handle specific API endpoints (health, competency)
- ML Pipelines: Orchestrate machine learning workflows
- Weaviate Client: Interface to the vector database
- Configuration: Environment-based settings management
- Authentication: API key validation
## Request Flow Diagram

Here's what happens when a client makes a request to AtlasML:

### Flow Breakdown

1. Request Reception: Client sends an HTTP request to FastAPI
2. Middleware Processing: `RequestLoggingMiddleware` logs request details
3. Authentication: `TokenValidator` checks the `Authorization` header
4. Routing: FastAPI routes to the appropriate endpoint handler
5. Business Logic: Router calls the ML pipeline or service layer
6. Database Operations: Weaviate client performs vector operations
7. External API Calls: OpenAI generates embeddings (if configured)
8. Response Building: Pydantic models serialize the response
9. Middleware Logging: Duration and status are logged
10. Response Return: Client receives the JSON response
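The flow above can be condensed into a single pure-Python sketch. Every name here (`authenticate`, `handler`, `handle_request`, the key value) is an illustrative stand-in, not the actual AtlasML code:

```python
import json
import time

# Hypothetical stand-ins for the real components, used only to show
# the order of the stages in the flow breakdown above.
API_KEYS = {"secret-key"}

def authenticate(headers: dict) -> None:
    # Authentication step: validate the Authorization header
    if headers.get("Authorization") not in API_KEYS:
        raise PermissionError("Invalid API key")

def handler(body: dict) -> dict:
    # Routing, business logic, and DB/external calls (stubbed out here)
    return {"suggestions": ["competency-1"], "echo": body["text"]}

def handle_request(headers: dict, body: dict) -> tuple[int, str]:
    start = time.time()                   # middleware starts timing
    try:
        authenticate(headers)
        result = handler(body)
        status, payload = 200, json.dumps(result)   # response serialization
    except PermissionError as exc:
        status, payload = 401, json.dumps({"detail": str(exc)})
    duration = time.time() - start        # middleware logs status + duration
    print(f"<- {status} ({duration:.3f}s)")
    return status, payload                # response returned to the client

status, payload = handle_request({"Authorization": "secret-key"}, {"text": "hi"})
```

The real system differs in the details (async handlers, Pydantic serialization), but the ordering of the stages is the same.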
## Application Initialization

### Startup Sequence

When you run `uvicorn atlasml.app:app`, here's what happens:

### Lifespan Events

The `lifespan` context manager in `app.py` handles startup and shutdown:
```python
@asynccontextmanager
async def lifespan(app):
    # Startup
    logger.info("🚀 Starting AtlasML API...")
    logger.info(f"🔌 Weaviate client status: {'Connected' if get_weaviate_client().is_alive() else 'Disconnected'}")
    logger.info("✅ Weaviate collections initialized")

    yield  # Application is running

    # Shutdown
    logger.info("👋 Shutting down AtlasML API...")
    get_weaviate_client().close()
```
Startup tasks:
- Check Weaviate connectivity
- Initialize collections if they don't exist
- Log system status
Shutdown tasks:
- Gracefully close Weaviate connection
- Release resources
## Middleware Stack

Middleware processes all requests and responses. AtlasML uses:

### 1. RequestLoggingMiddleware

Located in `app.py`, this middleware:
```python
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()

        # Log request
        logger.info(f"→ {request.method} {request.url.path}")
        if request.method == "POST":
            body = await request.body()
            logger.info(f"📦 Request body: {body.decode()[:200]}")

        # Process request
        response = await call_next(request)

        # Log response
        duration = time.time() - start_time
        logger.info(f"← {response.status_code} ({duration:.3f}s)")

        return response
```
What it does:
- Logs incoming request method and path
- Logs POST request bodies (first 200 chars)
- Measures request processing time
- Logs response status and duration
Why it's useful:
- Debugging: See all API activity
- Performance: Identify slow endpoints
- Monitoring: Track API usage patterns
### Execution Order
Middleware wraps the entire request/response cycle.
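The wrapping can be illustrated with plain closures (a generic sketch, not the actual Starlette `BaseHTTPMiddleware` machinery): each middleware sees the request on the way in and the response on the way out, in reverse order.

```python
# Generic sketch of middleware nesting; all names are illustrative.
events = []

def endpoint(request):
    events.append("endpoint")
    return {"status": 200}

def logging_middleware(call_next):
    def dispatch(request):
        events.append("logging: before")   # runs first on the way in
        response = call_next(request)
        events.append("logging: after")    # runs last on the way out
        return response
    return dispatch

def auth_middleware(call_next):
    def dispatch(request):
        events.append("auth: before")
        response = call_next(request)
        events.append("auth: after")
        return response
    return dispatch

# The outermost middleware wraps everything inside it
app = logging_middleware(auth_middleware(endpoint))
app({"path": "/api/v1/health"})
# events is now:
# ["logging: before", "auth: before", "endpoint", "auth: after", "logging: after"]
```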
## Dependency Injection

AtlasML uses FastAPI's dependency injection for:

### 1. Authentication (`TokenValidator`)
```python
class TokenValidator:
    def __init__(self, api_keys: List[APIKeyConfig] = Depends(get_api_keys)):
        self.api_keys = api_keys

    async def __call__(self, api_key: str = Depends(_get_api_key)) -> APIKeyConfig:
        for key in self.api_keys:
            if key.token == api_key:
                return key
        raise HTTPException(status_code=401, detail="Invalid API key")
```
How it works:

- `Depends(get_api_keys)` injects the configured API keys from settings
- `Depends(_get_api_key)` extracts the `Authorization` header
- The validator checks the presented key against the configured keys
- Raises 401 if invalid, continues if valid
Usage in routers:

```python
@router.post("/suggest", dependencies=[Depends(TokenValidator)])
async def suggest_competencies(request: SuggestCompetencyRequest):
    # Only runs if authentication succeeds
    ...
```
### 2. Weaviate Client (`get_weaviate_client`)

```python
def get_weaviate_client(weaviate_settings: WeaviateSettings = None) -> WeaviateClient:
    return WeaviateClientSingleton.get_instance(weaviate_settings)
```
Singleton Pattern:
- Only one Weaviate client instance is created
- Reused across all requests
- Connection pooling handled by the SDK
Why singleton?
- Efficient: Avoid reconnection overhead
- Safe: Weaviate SDK is thread-safe
- Simple: No need to manage connections
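The pattern itself can be sketched in a few lines (a simplified stand-in for `WeaviateClientSingleton`; `FakeWeaviateClient` is a placeholder, not the real SDK client):

```python
class FakeWeaviateClient:
    """Placeholder for the real SDK client; illustrates the pattern only."""
    def __init__(self, settings=None):
        self.settings = settings

class WeaviateClientSingleton:
    _instance = None

    @classmethod
    def get_instance(cls, settings=None):
        # Create the client exactly once; every later call reuses it
        if cls._instance is None:
            cls._instance = FakeWeaviateClient(settings)
        return cls._instance

def get_weaviate_client(settings=None):
    return WeaviateClientSingleton.get_instance(settings)

a = get_weaviate_client()
b = get_weaviate_client()
assert a is b  # one connection, shared across all requests
```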
## Component Interaction

### Competency Suggestion Flow

Here's a detailed look at how the `/api/v1/competency/suggest` endpoint works:

### File Locations
| Component | File |
|---|---|
| Router | `atlasml/routers/competency.py` |
| ML Pipeline | `atlasml/ml/pipeline_workflows.py` |
| Embedding Generator | `atlasml/ml/embeddings.py` |
| Weaviate Client | `atlasml/clients/weaviate.py` |
| Similarity | `atlasml/ml/similarity_measures.py` |
## Configuration Management

### Settings Hierarchy

### Settings Model
```python
class Settings(BaseModel):
    api_keys: list[APIKeyConfig]    # API authentication keys
    weaviate: WeaviateSettings      # Weaviate connection config
    sentry_dsn: str | None = None   # Optional Sentry DSN
    env: str = "development"        # Environment name
```
### Configuration Sources

1. Environment variables (`.env` file):

   ```
   ATLAS_API_KEYS=key1,key2
   WEAVIATE_HOST=localhost
   WEAVIATE_PORT=8085
   ```

2. Default settings (for tests): `Settings._get_default_settings()`

3. `SettingsProxy` (global access):

   ```python
   from atlasml.config import settings
   print(settings.weaviate.host)  # "localhost"
   ```
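Parsing the comma-separated `ATLAS_API_KEYS` value might look like this sketch (the `load_api_keys` helper is hypothetical; the real settings model is pydantic-based):

```python
import os
from dataclasses import dataclass

@dataclass
class APIKeyConfig:
    token: str

def load_api_keys(env=os.environ) -> list[APIKeyConfig]:
    # "key1,key2" -> [APIKeyConfig("key1"), APIKeyConfig("key2")]
    raw = env.get("ATLAS_API_KEYS", "")
    return [APIKeyConfig(token=t.strip()) for t in raw.split(",") if t.strip()]

keys = load_api_keys({"ATLAS_API_KEYS": "key1,key2"})
```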
## Error Handling

### Exception Flow

### Custom Exception Handler
```python
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    logger.error(f"❌ Validation error for {request.method} {request.url.path}")
    logger.error(f"❌ Validation details: {exc.errors()}")
    logger.error(f"❌ Request body was: {await request.body()}")
    return JSONResponse(
        status_code=422,
        content={
            "detail": exc.errors(),
            "body": (await request.body()).decode()
        }
    )
```
What it does:
- Logs validation failures for debugging
- Returns detailed error information
- Includes the invalid request body
## Data Flow Architecture

### Write Operations

### Read Operations

## Scalability Considerations

### Current Architecture
- Single Instance: One FastAPI process
- Singleton Client: One Weaviate connection per instance
- Synchronous ML: Embeddings generated on request
### Scaling Options

1. Horizontal scaling:
   - Run multiple FastAPI instances
   - Use a load balancer to distribute requests
   - Weaviate handles concurrent connections

2. Async operations:
   - Use the async OpenAI client
   - Run long operations as background tasks
   - Use Celery for a distributed task queue

3. Caching:
   - Use Redis as an embedding cache
   - Reduce API calls to OpenAI
   - Get faster response times
## Security Architecture

### Authentication Flow

### Security Layers

1. API key authentication:
   - Simple token-based auth
   - Keys configured in the environment
   - No user sessions or cookies

2. Input validation:
   - Pydantic models validate all inputs
   - Type checking at runtime
   - Prevents injection attacks

3. CORS (if needed):
   - Configure allowed origins
   - Restrict cross-origin requests

API keys are transmitted in plaintext headers. Always use HTTPS in production to encrypt transmission.
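On top of HTTPS, comparing keys with `hmac.compare_digest` is a common hardening step (shown here as a general sketch, not necessarily what AtlasML's validator does): unlike `==`, it runs in time independent of where the strings first differ, which prevents timing-based key guessing.

```python
import hmac

CONFIGURED_KEYS = ["key1", "key2"]  # illustrative values

def is_valid_key(presented: str) -> bool:
    # compare_digest performs a constant-time comparison
    return any(hmac.compare_digest(presented, k) for k in CONFIGURED_KEYS)

is_valid_key("key1")   # True
is_valid_key("wrong")  # False
```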
## Monitoring & Observability

### Logging Strategy

```python
# Different log levels
logger.info("Normal operations")     # General info
logger.warning("Potential issues")   # Warnings
logger.error("Errors occurred")      # Errors
logger.debug("Detailed debugging")   # Debug mode only
```
### What Gets Logged
- Startup/Shutdown: Application lifecycle
- Requests: Method, path, body (POST)
- Responses: Status code, duration
- Errors: Exception details, stack traces
- Weaviate: Connection status, query info
- ML: Embedding generation, similarity scores
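The "debug mode only" behavior falls out of stdlib level filtering, as this sketch shows (logger name and handler are illustrative):

```python
import logging

logger = logging.getLogger("atlasml.sketch")
logger.setLevel(logging.INFO)  # DEBUG messages are dropped unless debug mode

captured = []

class ListHandler(logging.Handler):
    """Collects log records in a list so we can inspect them."""
    def emit(self, record):
        captured.append((record.levelname, record.getMessage()))

logger.addHandler(ListHandler())
logger.info("-> POST /api/v1/competency/suggest")  # passes the INFO filter
logger.debug("similarity scores computed")         # filtered out at INFO level
```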
### Sentry Integration

When `ENV=production` and `SENTRY_DSN` is set:
```python
sentry_sdk.init(
    dsn=settings.sentry_dsn,
    environment=settings.env,
    traces_sample_rate=1.0,
    profiles_sample_rate=1.0,
)
```
What Sentry captures:
- Unhandled exceptions
- Error traces
- Performance data
- Request context
## Next Steps
Now that you understand the architecture:
- Modules Reference: Dive deep into each code module
- REST API Framework: Learn about FastAPI patterns
- Middleware: Understand request processing
- Weaviate Integration: Master the vector database
Use the FastAPI documentation to explore the live API while reading these docs!