diff --git a/CHANGELOG.md b/CHANGELOG.md index 567d746..628fe70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,16 @@ All notable changes to the KernelCI Staging Control application will be documented in this file. -## [Unreleased] +## [0.2.0] + +### Added + +- **Scheduler**: Added staging scheduler that will run staging at 0:00UTC 8:00UTC,16:00UTC + - Will be run using fastapi-crons + - Condition: If staging has run less than 1 hour ago - skip run + - Appears dashboard as user run, under "virtual" user "scheduler" + +## [0.1.0] ### Added - **Staging Run Cancellation**: Added ability to cancel running staging runs diff --git a/database.py b/database.py index 9dc1bf9..cfa61f2 100644 --- a/database.py +++ b/database.py @@ -12,6 +12,7 @@ DEFAULT_ADMIN_EMAIL, SETTINGS_KEYS, ) +from scheduler_user import ensure_scheduler_user SQLALCHEMY_DATABASE_URL = DATABASE_URL @@ -66,6 +67,13 @@ def run_migrations(): print("Database migration check completed") + # Ensure scheduler user exists for legacy databases migrated without init_db + db = SessionLocal() + try: + ensure_scheduler_user(db) + finally: + db.close() + def init_db(): """Initialize database and create default admin user""" @@ -94,6 +102,8 @@ def init_db(): db.commit() print("Default admin user created") + ensure_scheduler_user(db) + # Create default settings for setting_name, setting_key in SETTINGS_KEYS.items(): setting = db.query(Settings).filter(Settings.key == setting_key).first() diff --git a/deployment_manager.py b/deployment_manager.py index 43d2f65..a474558 100644 --- a/deployment_manager.py +++ b/deployment_manager.py @@ -440,3 +440,62 @@ async def docker_workaround(self) -> Dict[str, Any]: ) return {"success": True, "results": results} + + async def restart_trigger_service(self) -> Dict[str, Any]: + """ + Restart the trigger service in the pipeline + Returns: {"success": bool, "error": str, "details": dict} + """ + result = { + "success": False, + "error": None, + "details": {"start_time": datetime.utcnow().isoformat()}, + } + + try: + # Set environment variable for pipeline settings + env = os.environ.copy() + env["SETTINGS"] = PIPELINE_SETTINGS_PATH + + # Restart the trigger service using docker-compose + cmd = ["docker-compose"] + + # Add compose files if configured + if self.compose_files: + cmd.extend(self.compose_files) + + # Add restart command for trigger service + cmd.extend(["restart", "trigger"]) + + print(f"Restarting trigger service with command: {' '.join(cmd)}") + + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + cwd=self.pipeline_path, + ) + + stdout, stderr = await process.communicate() + + if process.returncode == 0: + result["success"] = True + result["details"]["stdout"] = stdout.decode("utf-8", errors="ignore") + result["details"]["stderr"] = stderr.decode("utf-8", errors="ignore") + result["details"]["end_time"] = datetime.utcnow().isoformat() + print("Successfully restarted trigger service") + else: + result["error"] = ( + f"Failed to restart trigger service: {stderr.decode('utf-8', errors='ignore')}" + ) + result["details"]["stdout"] = stdout.decode("utf-8", errors="ignore") + result["details"]["stderr"] = stderr.decode("utf-8", errors="ignore") + print(f"Failed to restart trigger service: {result['error']}") + + except Exception as e: + result["error"] = f"Exception restarting trigger service: {str(e)}" + result["details"]["exception"] = str(e) + print(f"Exception restarting trigger service: {e}") + + return result diff --git a/main.py b/main.py index 8fb5f56..ca6c0cb 100644 --- a/main.py +++ b/main.py @@ -24,6 +24,7 @@ from sqlalchemy.orm import Session from pydantic import BaseModel import uvicorn +from fastapi_crons import Crons from config import ( APP_TITLE, @@ -63,6 +64,7 @@ validate_single_running_staging, enforce_single_running_staging, ) +from scheduler import register_cron_jobs # Pydantic models for API requests @@ -115,6 +117,8 @@ async def lifespan(app: FastAPI): # Create app with lifespan app = FastAPI(title=APP_TITLE, lifespan=lifespan) +crons = Crons(app) +register_cron_jobs(crons) # Mount static files app.mount("/static", StaticFiles(directory="templates"), name="static") @@ -974,5 +978,63 @@ async def get_staging_status( } +@app.get("/api/staging/check-workflow") +async def check_workflow_status( + current_user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """Check if GitHub workflow is already running""" + # Only allow admin/maintainer to check workflow status + if current_user.role not in [UserRole.ADMIN, UserRole.MAINTAINER]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions" + ) + + # Check if GitHub token is configured + github_token = get_setting(GITHUB_TOKEN) + if not github_token: + return { + "can_trigger": False, + "reason": "GitHub token not configured", + "running_workflows": [], + } + + try: + from github_integration import GitHubWorkflowManager + + github_manager = GitHubWorkflowManager(github_token) + running_workflows = await github_manager.get_running_workflows() + + if running_workflows: + return { + "can_trigger": False, + "reason": f"GitHub workflow is already running ({len(running_workflows)} active)", + "running_workflows": running_workflows, + } + + # Also check if there's a running staging in database + running_staging = validate_single_running_staging(db) + if running_staging: + return { + "can_trigger": False, + "reason": f"Staging run #{running_staging.id} is already running", + "running_workflows": [], + } + + return { + "can_trigger": True, + "reason": "Ready to trigger new staging run", + "running_workflows": [], + } + + except Exception as e: + print(f"Error checking workflow status: {e}") + return { + "can_trigger": False, + "reason": f"Error checking workflow status: {str(e)}", + "running_workflows": [], + } + + if __name__ == "__main__": uvicorn.run(app, host=HOST, port=PORT) diff --git a/models.py b/models.py index a74303f..928b801 100644 --- a/models.py +++ b/models.py @@ -32,6 +32,7 @@ class StagingStepType(str, enum.Enum): KERNEL_TREE_UPDATE = "kernel_tree_update" API_PIPELINE_UPDATE = "api_pipeline_update" MONITORING_SETUP = "monitoring_setup" + TRIGGER_RESTART = "trigger_restart" class StagingRunStatus(str, enum.Enum): diff --git a/orchestrator.py b/orchestrator.py index 9762ad2..221a54e 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -137,11 +137,15 @@ async def initialize_staging_steps(self, staging_run: StagingRun, db: Session): steps.extend( [ - {"type": StagingStepType.KERNEL_TREE_UPDATE, "order": order_counter}, { "type": StagingStepType.API_PIPELINE_UPDATE, + "order": order_counter, + }, + { + "type": StagingStepType.KERNEL_TREE_UPDATE, "order": order_counter + 1, }, + {"type": StagingStepType.TRIGGER_RESTART, "order": order_counter + 2}, ] ) @@ -221,6 +225,8 @@ async def process_step( await self.process_kernel_tree_step(staging_run, step, db) elif step.step_type == StagingStepType.API_PIPELINE_UPDATE: await self.process_api_pipeline_step(staging_run, step, db) + elif step.step_type == StagingStepType.TRIGGER_RESTART: + await self.process_trigger_restart_step(staging_run, step, db) async def process_github_workflow_step( self, staging_run: StagingRun, step: StagingRunStep, db: Session @@ -441,6 +447,36 @@ async def process_api_pipeline_step( step.end_time = datetime.utcnow() db.commit() + async def process_trigger_restart_step( + self, staging_run: StagingRun, step: StagingRunStep, db: Session + ): + """Process trigger restart step - restart the trigger service in pipeline""" + if step.status == StagingStepStatus.PENDING: + step.status = StagingStepStatus.RUNNING + step.start_time = datetime.utcnow() + staging_run.current_step = "trigger_restart" + db.commit() + + try: + # Restart the trigger service in the pipeline + result = await self.deployment_manager.restart_trigger_service() + + step.end_time = datetime.utcnow() + if result["success"]: + step.status = StagingStepStatus.COMPLETED + step.details = json.dumps(result) + else: + step.status = StagingStepStatus.FAILED + step.error_message = result.get("error", "Unknown error") + + db.commit() + + except Exception as e: + step.status = StagingStepStatus.FAILED + step.error_message = str(e) + step.end_time = datetime.utcnow() + db.commit() + async def complete_staging_run( self, staging_run: StagingRun, db: Session, success: bool ): @@ -508,6 +544,8 @@ async def recover_stuck_steps(self, staging_run: StagingRun, db: Session): timeout_minutes = 15 # Git operations elif step.step_type == StagingStepType.SELF_UPDATE: timeout_minutes = 10 # Quick git pull + elif step.step_type == StagingStepType.TRIGGER_RESTART: + timeout_minutes = 5 # Quick docker restart if running_duration.total_seconds() > (timeout_minutes * 60): print( @@ -576,6 +614,7 @@ async def startup_recovery(self): StagingStepType.API_PIPELINE_UPDATE, StagingStepType.KERNEL_TREE_UPDATE, StagingStepType.SELF_UPDATE, + StagingStepType.TRIGGER_RESTART, ]: print( f"Recovering stuck local step: {step.step_type.value}" diff --git a/requirements.txt b/requirements.txt index 06267f8..27acfdd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ passlib[bcrypt]==1.7.4 aiofiles==23.2.1 httpx==0.25.2 toml +fastapi-crons==2.0.1 diff --git a/scheduler.py b/scheduler.py new file mode 100644 index 0000000..97b2c11 --- /dev/null +++ b/scheduler.py @@ -0,0 +1,116 @@ +"""Background scheduler setup for automatic staging runs.""" + +import logging +from datetime import datetime, timedelta + +from fastapi_crons import Crons + +from database import SessionLocal +from db_constraints import ( + validate_single_running_staging, + enforce_single_running_staging, +) +from discord_webhook import discord_webhook +from models import ( + InitiatedVia, + StagingRun, + StagingRunStatus, +) +from scheduler_config import ( + SCHEDULER_AUTO_KERNEL_TREE, + SCHEDULER_SKIP_WINDOW_SECONDS, + SCHEDULER_USERNAME, +) +from scheduler_user import ensure_scheduler_user + +logger = logging.getLogger(__name__) + +CRON_EXPRESSION = "0 0,8,16 * * *" # 00:00, 08:00, 16:00 UTC + + +async def _run_scheduled_staging() -> None: + """Execute a staging run if cooldown and concurrency constraints allow it.""" + db = SessionLocal() + try: + scheduler_user = ensure_scheduler_user(db) + + now = datetime.utcnow() + cooldown_start = now - timedelta(seconds=SCHEDULER_SKIP_WINDOW_SECONDS) + + # Skip if another user triggered a run recently + recent_manual_run = ( + db.query(StagingRun) + .filter(StagingRun.start_time >= cooldown_start) + .filter(StagingRun.user_id != scheduler_user.id) + .order_by(StagingRun.start_time.desc()) + .first() + ) + if recent_manual_run: + logger.info( + "Skipping scheduled staging: recent run #%s by %s at %s", + recent_manual_run.id, + recent_manual_run.user.username, + recent_manual_run.start_time, + ) + return + + # Skip if a run is in progress + running_staging = validate_single_running_staging(db) + if running_staging: + logger.info( + "Skipping scheduled staging: run #%s is currently %s", + running_staging.id, + running_staging.status.value, + ) + return + + staging_run = StagingRun( + user_id=scheduler_user.id, + status=StagingRunStatus.RUNNING, + initiated_via=InitiatedVia.CRON, + kernel_tree=SCHEDULER_AUTO_KERNEL_TREE, + ) + db.add(staging_run) + db.flush() + + if not enforce_single_running_staging(db, staging_run.id): + logger.warning( + "Scheduled staging run #%s cancelled due to concurrency enforcement", + staging_run.id, + ) + db.rollback() + return + + db.commit() + db.refresh(staging_run) + logger.info("Scheduled staging run #%s started", staging_run.id) + + logger.info("Using virtual scheduler user '%s'", SCHEDULER_USERNAME) + + if discord_webhook: + try: + await discord_webhook.send_staging_start( + SCHEDULER_USERNAME, staging_run.id + ) + except Exception as exc: + logger.warning( + "Discord notification failed for scheduler run #%s: %s", + staging_run.id, + exc, + ) + + except Exception as exc: + logger.error("Scheduler job failed: %s", exc) + db.rollback() + finally: + db.close() + + +def register_cron_jobs(crons: Crons) -> None: + """Attach scheduled staging jobs to the provided cron scheduler.""" + + @crons.cron(CRON_EXPRESSION, name="staging_scheduler") + async def scheduled_staging_job(): + await _run_scheduled_staging() + + logger.info("Registered scheduled staging job for expression '%s'", CRON_EXPRESSION) diff --git a/scheduler_config.py b/scheduler_config.py new file mode 100644 index 0000000..c0b5d7e --- /dev/null +++ b/scheduler_config.py @@ -0,0 +1,5 @@ +"""Configuration constants for the built-in staging scheduler.""" + +SCHEDULER_USERNAME = "scheduler" +SCHEDULER_AUTO_KERNEL_TREE = "auto" +SCHEDULER_SKIP_WINDOW_SECONDS = 3600 # 1 hour cool-down window diff --git a/scheduler_user.py b/scheduler_user.py new file mode 100644 index 0000000..ed65469 --- /dev/null +++ b/scheduler_user.py @@ -0,0 +1,28 @@ +"""Helpers for managing the virtual scheduler user account.""" + +import secrets +from sqlalchemy.orm import Session + +from models import User, UserRole +from scheduler_config import SCHEDULER_USERNAME + + +def ensure_scheduler_user(db: Session) -> User: + """Return the scheduler user, creating it with a strong random password if missing.""" + user = db.query(User).filter(User.username == SCHEDULER_USERNAME).first() + if user: + return user + + from auth import get_password_hash # Imported lazily to avoid circular dependency + + random_secret = secrets.token_urlsafe(64) + user = User( + username=SCHEDULER_USERNAME, + password_hash=get_password_hash(random_secret), + role=UserRole.MAINTAINER, + email=None, + ) + db.add(user) + db.commit() + db.refresh(user) + return user diff --git a/templates/dashboard.html b/templates/dashboard.html index 738a8fc..89e4c2f 100644 --- a/templates/dashboard.html +++ b/templates/dashboard.html @@ -177,7 +177,7 @@