-
Notifications
You must be signed in to change notification settings - Fork 0
🔗 Webhook Orchestrator FastAPI Service - Advanced GitHub Event Processing #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
🔗 Webhook Orchestrator FastAPI Service - Advanced GitHub Event Processing #8
Conversation
- Implement robust webhook processing with GitHub signature verification - Add advanced retry logic with exponential backoff and circuit breakers - Include comprehensive monitoring with Prometheus metrics and Jaeger tracing - Provide scalable background task processing with Celery and Redis - Add complete Docker containerization and Kubernetes deployment configs - Include extensive API documentation and deployment guides - Implement security features including rate limiting and replay protection - Add comprehensive test suite with unit and integration tests - Provide production-ready configuration with health checks and monitoring
Reviewer's GuideThis PR delivers a production-grade FastAPI webhook orchestrator that consumes GitHub webhooks, persists events, spawns Celery background workflows for PRs/checks/pushes, integrates Codegen triggers, and provides comprehensive security, retry/circuit-breaker resilience, structured logging, distributed tracing, and Prometheus monitoring, along with full containerization, Kubernetes manifests, documentation, and tests. Sequence Diagram for GitHub Webhook Ingestion and Task CreationsequenceDiagram
actor GitHub
participant Nginx
participant FastAPI_Service as FastAPI Service
participant WebhookValidator as Webhook Validator
participant GitHubWebhookHandler as GitHub Webhook Handler
participant PostgreSQL_DB as PostgreSQL DB
participant Celery_Redis as Celery (Redis)
GitHub->>Nginx: HTTP POST /webhooks/github (Webhook Payload)
Nginx->>FastAPI_Service: HTTP POST /webhooks/github
FastAPI_Service->>WebhookValidator: validate_webhook(request)
WebhookValidator-->>FastAPI_Service: headers, payload
FastAPI_Service->>GitHubWebhookHandler: handle_webhook(headers, payload, session)
GitHubWebhookHandler->>PostgreSQL_DB: _store_webhook_event() INSERT WebhookEvent
PostgreSQL_DB-->>GitHubWebhookHandler: stored WebhookEvent
GitHubWebhookHandler->>GitHubWebhookHandler: _handle_EVENT_TYPE() (e.g., _handle_pull_request)
GitHubWebhookHandler->>PostgreSQL_DB: _create_workflow_task() INSERT WorkflowTask
PostgreSQL_DB-->>GitHubWebhookHandler: created WorkflowTask
GitHubWebhookHandler->>Celery_Redis: EVENT_TASK.delay(task_id, payload)
Celery_Redis-->>GitHubWebhookHandler: Async Task Enqueued
GitHubWebhookHandler-->>FastAPI_Service: result (e.g., {"status": "queued"})
FastAPI_Service-->>Nginx: HTTP 200 OK
Nginx-->>GitHub: HTTP 200 OK
Sequence Diagram for Celery Pull Request Event ProcessingsequenceDiagram
participant CeleryWorker as Celery Worker
participant WorkflowTaskProcessor
participant PostgreSQL_DB as PostgreSQL DB
participant GitHub_API as GitHub API
participant Celery_Redis as Celery (Redis)
CeleryWorker->>WorkflowTaskProcessor: process_pull_request_event(task_id, payload)
WorkflowTaskProcessor->>PostgreSQL_DB: update_task_status(task_id, "running")
WorkflowTaskProcessor->>PostgreSQL_DB: create_task_execution(...)
WorkflowTaskProcessor->>GitHub_API: get_repo(repo_name)
GitHub_API-->>WorkflowTaskProcessor: repo object
WorkflowTaskProcessor->>GitHub_API: repo.get_pull(pr_number)
GitHub_API-->>WorkflowTaskProcessor: pr object
WorkflowTaskProcessor->>WorkflowTaskProcessor: analysis_result = analyze_pr_changes(pr, action)
alt analysis_result.should_trigger_codegen is true
WorkflowTaskProcessor->>Celery_Redis: trigger_codegen_agent.delay(task_id, repo, pr_num, analysis_result)
Celery_Redis-->>WorkflowTaskProcessor: Async Codegen Task Enqueued
WorkflowTaskProcessor->>PostgreSQL_DB: update WorkflowTask (set codegen_task_id from celery result)
end
WorkflowTaskProcessor->>PostgreSQL_DB: update_task_status(task_id, "completed", result)
WorkflowTaskProcessor->>PostgreSQL_DB: update_task_execution(...)
Sequence Diagram for Celery Codegen Agent Trigger TasksequenceDiagram
participant CeleryWorker_Codegen as Celery Worker (Codegen Trigger Task)
participant WorkflowTaskProcessor
participant CodegenAgent_API as Codegen Agent API
participant PostgreSQL_DB as PostgreSQL DB
CeleryWorker_Codegen->>WorkflowTaskProcessor: trigger_codegen_agent(parent_task_id, repo, pr_num, analysis_data)
WorkflowTaskProcessor->>WorkflowTaskProcessor: prompt = generate_codegen_prompt(...)
WorkflowTaskProcessor->>CodegenAgent_API: agent.run(prompt)
CodegenAgent_API-->>WorkflowTaskProcessor: codegen_task_info (id, web_url)
WorkflowTaskProcessor->>PostgreSQL_DB: Update parent WorkflowTask (task_id=parent_task_id) with codegen_task_id, codegen_task_url
PostgreSQL_DB-->>WorkflowTaskProcessor: Success
Entity Relationship Diagram for Webhook Orchestrator DatabaseerDiagram
WebhookEvent {
int id PK
string delivery_id
string event_type
json payload
json headers
bool processed
datetime created_at
datetime updated_at
string error_message
string signature
datetime processing_started_at
datetime processing_completed_at
int retry_count
}
WorkflowTask {
int id PK
string task_id UK
int webhook_event_id FK
string task_type
string status
string repository
int pr_number
datetime created_at
datetime updated_at
datetime started_at
datetime completed_at
string error_message
string codegen_task_id
string codegen_task_url
string branch
string commit_sha
json config
json input_data
json output_data
int retry_count
int max_retries
}
TaskExecution {
int id PK
string task_id FK
string execution_id UK
string status
datetime started_at
datetime completed_at
int duration_ms
string error_message
string worker_id
string queue_name
json result
}
SystemMetrics {
int id PK
string metric_name
json metric_value
json labels
datetime timestamp
}
WebhookEvent ||--o{ WorkflowTask : "triggers"
WorkflowTask ||--o{ TaskExecution : "has"
Class Diagram for GitHubWebhookHandler (app/webhooks/github_handler.py)classDiagram
class GitHubWebhookHandler {
+WebhookLoggerMixin
event_handlers: dict
+handle_webhook(headers: WebhookHeaders, payload: dict, session: AsyncSession) dict
-_store_webhook_event(headers: WebhookHeaders, payload: dict, session: AsyncSession) WebhookEvent
-_handle_pull_request(webhook_event: WebhookEvent, payload: dict, session: AsyncSession) dict
-_handle_check_run(webhook_event: WebhookEvent, payload: dict, session: AsyncSession) dict
-_handle_check_suite(webhook_event: WebhookEvent, payload: dict, session: AsyncSession) dict
-_handle_push(webhook_event: WebhookEvent, payload: dict, session: AsyncSession) dict
-_handle_ping(webhook_event: WebhookEvent, payload: dict, session: AsyncSession) dict
-_handle_installation(webhook_event: WebhookEvent, payload: dict, session: AsyncSession) dict
-_handle_installation_repositories(webhook_event: WebhookEvent, payload: dict, session: AsyncSession) dict
-_create_workflow_task(webhook_event: WebhookEvent, task_type: str, repository: Optional_str_, pr_number: Optional_int_, branch: Optional_str_, commit_sha: Optional_str_, config: Optional_dict_, input_data: Optional_dict_, session: AsyncSession) WorkflowTask
}
WebhookLoggerMixin <|-- GitHubWebhookHandler
Class Diagram for WorkflowTaskProcessor and Background Tasks Module (app/tasks/workflow_tasks.py)classDiagram
class WorkflowTaskProcessor {
+WebhookLoggerMixin
github_client: Github
codegen_client: Agent
+get_github_client() Github
+get_codegen_client() Agent
+get_db_session() Session
+update_task_status(task_id: str, status: str, output_data: Optional_dict_, error_message: Optional_str_)
+create_task_execution(task_id: str, execution_id: str, status: str, result: Optional_dict_, error_message: Optional_str_)
}
WebhookLoggerMixin <|-- WorkflowTaskProcessor
class BackgroundTasksModule {
<<Module>>
+process_pull_request_event(task_id: str, payload: dict) CeleryTask
+process_check_run_event(task_id: str, payload: dict) CeleryTask
+process_check_suite_event(task_id: str, payload: dict) CeleryTask
+process_push_event(task_id: str, payload: dict) CeleryTask
+trigger_codegen_agent(parent_task_id: str, repository: str, pr_number: Optional_int_, analysis_data: dict) CeleryTask
+cleanup_old_tasks() CeleryTask
+health_check() CeleryTask
+collect_metrics() CeleryTask
+analyze_pr_changes(pr: PullRequest, action: str) dict
+analyze_check_failure(repo: Repository, check_run_id: int, check_name: str, conclusion: str) dict
+analyze_check_suite_failure(repo: Repository, check_suite_id: int, conclusion: str) dict
+analyze_push_changes(commits: list, ref: str) dict
+generate_codegen_prompt(repository: str, pr_number: Optional_int_, analysis_data: dict) str
}
Class Diagram for API Endpoint Pydantic Models (app/api/endpoints.py)classDiagram
class WebhookEventResponse {
+id: int
+delivery_id: str
+event_type: str
+processed: bool
+created_at: datetime
+error_message: Optional_str_
}
class WorkflowTaskResponse {
+id: int
+task_id: str
+task_type: str
+status: str
+repository: Optional_str_
+pr_number: Optional_int_
+created_at: datetime
+completed_at: Optional_datetime_
+error_message: Optional_str_
+codegen_task_url: Optional_str_
}
class TaskExecutionResponse {
+id: int
+execution_id: str
+status: str
+started_at: datetime
+completed_at: Optional_datetime_
+duration_ms: Optional_int_
+error_message: Optional_str_
}
class HealthCheckResponse {
+status: str
+timestamp: datetime
+checks: Dict_str_str_
+uptime_seconds: Optional_float_
}
class MetricsResponse {
+webhook_events_total: int
+workflow_tasks_total: int
+task_executions_total: int
+tasks_by_status: Dict_str_int_
+events_by_type: Dict_str_int_
+avg_processing_time_ms: Optional_float_
}
Class Diagram for Monitoring Components (app/api/monitoring.py)classDiagram
class MetricsCollector {
+start_time: float
+_setup_system_info()
+record_webhook_event(event_type: str, source: str)
+record_webhook_processing(event_type: str, duration: float, status: str)
+record_webhook_error(event_type: str, error_type: str)
+record_workflow_task(task_type: str)
+record_workflow_task_completion(task_type: str, duration: float, status: str)
+record_workflow_task_retry(task_type: str)
+update_queue_size(queue_name: str, size: int)
+update_active_workers(count: int)
+update_database_connections(count: int)
+record_github_api_request(endpoint: str, status: str)
+update_github_rate_limit(remaining: int)
+record_codegen_api_request(endpoint: str, status: str)
+record_codegen_agent_task(task_type: str)
+get_uptime() float
}
class HealthChecker {
checks: dict
last_check_time: dict
check_cache_duration: int
+register_check(name: str, check_func: callable, cache_duration: int)
+run_check(name: str) dict
+run_all_checks() dict
}
class MonitoringSetupModule {
<<Module>>
+setup_tracing()
+setup_instrumentation(app: FastAPI)
+trace_operation(operation_name: str, attributes: dict) ContextManager
+get_prometheus_metrics() str
+prometheus_metrics_response() Response
}
class PrometheusMetricDefinitions {
<<Service>>
webhook_events_total: Counter
webhook_processing_duration: Histogram
webhook_errors_total: Counter
workflow_tasks_total: Counter
workflow_task_duration: Histogram
workflow_task_retries_total: Counter
celery_queue_size: Gauge
celery_active_workers: Gauge
system_info: Info
database_connections: Gauge
github_api_requests_total: Counter
github_api_rate_limit_remaining: Gauge
codegen_api_requests_total: Counter
codegen_agent_tasks_total: Counter
}
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Important Review skippedBot user detected. To trigger a single review, invoke the You can disable this status message by setting the 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Join our Discord community for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
🔗 Webhook Orchestrator FastAPI Service
🎯 Overview
This PR implements a comprehensive, production-ready webhook orchestrator built with FastAPI that handles GitHub events, coordinates workflow execution, and manages communication between system components. The service includes advanced features for security, resilience, observability, and scalability.
✨ Key Features
🔒 Advanced Security
🔄 Resilience & Reliability
📊 Observability & Monitoring
⚡ Performance & Scalability
🏗️ Architecture
The service follows a microservices architecture with clear separation of concerns:
📁 Project Structure
🔧 Implementation Highlights
Webhook Processing
Task Management
Monitoring & Observability
Security Features
🚀 Deployment Options
1. Docker Compose (Quick Start)
cd webhook-orchestrator/docker docker-compose up -d2. Kubernetes (Production)
3. Manual Setup (Development)
📊 Monitoring & Dashboards
The service includes comprehensive monitoring with:
🔗 Integration Points
GitHub Integration
Codegen Integration
Database Integration
🧪 Testing
Comprehensive test suite including:
📈 Scalability Recommendations
Horizontal Scaling
Performance Optimization
Load Balancing
🔒 Security Considerations
Production Security
Monitoring Security
📚 Documentation
🎯 Benefits for the Project
🔄 Next Steps
After this PR is merged, the webhook orchestrator can be:
This implementation provides a solid foundation for the AI-powered development workflow system described in the parent issue (ZAM-510), serving as the central event hub that coordinates all workflow execution across multiple agents and services.
🔗 Related Issues
Ready for review and deployment! 🚀
💻 View my work • About Codegen
Summary by Sourcery
Add a full-featured Webhook Orchestrator FastAPI service to handle GitHub events, coordinate background workflows with Celery, integrate Codegen agents for code generation and fixes, and provide robust security, resilience, observability and deployment configurations.
New Features:
Enhancements:
Build:
CI:
Deployment:
Documentation:
Tests: