Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@

All notable changes to the KernelCI Staging Control application will be documented in this file.

## [Unreleased]
## [0.2.0]

### Added

- **Scheduler**: Added a staging scheduler that runs staging at 00:00 UTC, 08:00 UTC, and 16:00 UTC
  - Runs are driven by fastapi-crons
  - Condition: if staging has run less than 1 hour ago, the scheduled run is skipped
  - Appears in the dashboard as a user run, under the "virtual" user "scheduler"

## [0.1.0]

### Added
- **Staging Run Cancellation**: Added ability to cancel running staging runs
Expand Down
10 changes: 10 additions & 0 deletions database.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
DEFAULT_ADMIN_EMAIL,
SETTINGS_KEYS,
)
from scheduler_user import ensure_scheduler_user

SQLALCHEMY_DATABASE_URL = DATABASE_URL

Expand Down Expand Up @@ -66,6 +67,13 @@ def run_migrations():

print("Database migration check completed")

# Ensure scheduler user exists for legacy databases migrated without init_db
db = SessionLocal()
try:
ensure_scheduler_user(db)
finally:
db.close()


def init_db():
"""Initialize database and create default admin user"""
Expand Down Expand Up @@ -94,6 +102,8 @@ def init_db():
db.commit()
print("Default admin user created")

ensure_scheduler_user(db)

# Create default settings
for setting_name, setting_key in SETTINGS_KEYS.items():
setting = db.query(Settings).filter(Settings.key == setting_key).first()
Expand Down
59 changes: 59 additions & 0 deletions deployment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,62 @@ async def docker_workaround(self) -> Dict[str, Any]:
)

return {"success": True, "results": results}

async def restart_trigger_service(self) -> Dict[str, Any]:
    """
    Restart the trigger service in the pipeline via docker-compose.

    Returns:
        dict: {"success": bool, "error": str | None, "details": dict}
        where details carries start/end ISO timestamps and the captured
        stdout/stderr of the docker-compose invocation.
    """
    result = {
        "success": False,
        "error": None,
        "details": {"start_time": datetime.utcnow().isoformat()},
    }

    try:
        # The pipeline's docker-compose config reads its settings file
        # from the SETTINGS environment variable.
        env = os.environ.copy()
        env["SETTINGS"] = PIPELINE_SETTINGS_PATH

        cmd = ["docker-compose"]

        # Add any extra compose files configured for this deployment
        if self.compose_files:
            cmd.extend(self.compose_files)

        # Restart only the "trigger" service
        cmd.extend(["restart", "trigger"])

        print(f"Restarting trigger service with command: {' '.join(cmd)}")

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
            cwd=self.pipeline_path,
        )

        stdout, stderr = await process.communicate()

        # Record output and end time regardless of outcome (previously
        # end_time was only set on success, leaving failed runs without
        # a completion timestamp).
        result["details"]["stdout"] = stdout.decode("utf-8", errors="ignore")
        result["details"]["stderr"] = stderr.decode("utf-8", errors="ignore")
        result["details"]["end_time"] = datetime.utcnow().isoformat()

        if process.returncode == 0:
            result["success"] = True
            print("Successfully restarted trigger service")
        else:
            result["error"] = (
                f"Failed to restart trigger service: {result['details']['stderr']}"
            )
            print(f"Failed to restart trigger service: {result['error']}")

    except Exception as e:
        # Best-effort: surface the failure in the result rather than raising,
        # so the calling staging step can record it.
        result["error"] = f"Exception restarting trigger service: {str(e)}"
        result["details"]["exception"] = str(e)
        print(f"Exception restarting trigger service: {e}")

    return result
62 changes: 62 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from sqlalchemy.orm import Session
from pydantic import BaseModel
import uvicorn
from fastapi_crons import Crons

from config import (
APP_TITLE,
Expand Down Expand Up @@ -63,6 +64,7 @@
validate_single_running_staging,
enforce_single_running_staging,
)
from scheduler import register_cron_jobs


# Pydantic models for API requests
Expand Down Expand Up @@ -115,6 +117,8 @@ async def lifespan(app: FastAPI):

# Create app with lifespan
app = FastAPI(title=APP_TITLE, lifespan=lifespan)
crons = Crons(app)
register_cron_jobs(crons)

# Mount static files
app.mount("/static", StaticFiles(directory="templates"), name="static")
Expand Down Expand Up @@ -974,5 +978,63 @@ async def get_staging_status(
}


@app.get("/api/staging/check-workflow")
async def check_workflow_status(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Report whether a new staging run can be triggered right now.

    Checks, in order: caller permissions, GitHub token presence, active
    GitHub workflows, and any staging run marked running in the database.
    """
    # Restricted to admins and maintainers.
    if current_user.role not in [UserRole.ADMIN, UserRole.MAINTAINER]:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions"
        )

    def verdict(can_trigger, reason, workflows=None):
        # Uniform response payload shared by every branch below.
        return {
            "can_trigger": can_trigger,
            "reason": reason,
            "running_workflows": workflows or [],
        }

    # Without a configured token we cannot query GitHub at all.
    github_token = get_setting(GITHUB_TOKEN)
    if not github_token:
        return verdict(False, "GitHub token not configured")

    try:
        from github_integration import GitHubWorkflowManager

        manager = GitHubWorkflowManager(github_token)
        active = await manager.get_running_workflows()
        if active:
            return verdict(
                False,
                f"GitHub workflow is already running ({len(active)} active)",
                active,
            )

        # A staging run recorded as running in the DB also blocks triggering.
        current_run = validate_single_running_staging(db)
        if current_run:
            return verdict(
                False, f"Staging run #{current_run.id} is already running"
            )

        return verdict(True, "Ready to trigger new staging run")

    except Exception as e:
        # Best-effort endpoint: report the error instead of failing the request.
        print(f"Error checking workflow status: {e}")
        return verdict(False, f"Error checking workflow status: {str(e)}")


if __name__ == "__main__":
uvicorn.run(app, host=HOST, port=PORT)
1 change: 1 addition & 0 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class StagingStepType(str, enum.Enum):
KERNEL_TREE_UPDATE = "kernel_tree_update"
API_PIPELINE_UPDATE = "api_pipeline_update"
MONITORING_SETUP = "monitoring_setup"
TRIGGER_RESTART = "trigger_restart"


class StagingRunStatus(str, enum.Enum):
Expand Down
41 changes: 40 additions & 1 deletion orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,15 @@ async def initialize_staging_steps(self, staging_run: StagingRun, db: Session):

steps.extend(
[
{"type": StagingStepType.KERNEL_TREE_UPDATE, "order": order_counter},
{
"type": StagingStepType.API_PIPELINE_UPDATE,
"order": order_counter,
},
{
"type": StagingStepType.KERNEL_TREE_UPDATE,
"order": order_counter + 1,
},
{"type": StagingStepType.TRIGGER_RESTART, "order": order_counter + 2},
]
)

Expand Down Expand Up @@ -221,6 +225,8 @@ async def process_step(
await self.process_kernel_tree_step(staging_run, step, db)
elif step.step_type == StagingStepType.API_PIPELINE_UPDATE:
await self.process_api_pipeline_step(staging_run, step, db)
elif step.step_type == StagingStepType.TRIGGER_RESTART:
await self.process_trigger_restart_step(staging_run, step, db)

async def process_github_workflow_step(
self, staging_run: StagingRun, step: StagingRunStep, db: Session
Expand Down Expand Up @@ -441,6 +447,36 @@ async def process_api_pipeline_step(
step.end_time = datetime.utcnow()
db.commit()

async def process_trigger_restart_step(
    self, staging_run: StagingRun, step: StagingRunStep, db: Session
):
    """Run the trigger-restart staging step.

    Moves the step from PENDING to RUNNING, delegates the actual restart
    to the deployment manager, then records COMPLETED (with the full
    result as JSON details) or FAILED (with an error message).
    """
    # Transition out of PENDING before doing any work so the run's
    # current_step reflects progress immediately.
    if step.status == StagingStepStatus.PENDING:
        step.status = StagingStepStatus.RUNNING
        step.start_time = datetime.utcnow()
        staging_run.current_step = "trigger_restart"
        db.commit()

    try:
        # Delegate the docker-compose restart to the deployment manager.
        outcome = await self.deployment_manager.restart_trigger_service()

        step.end_time = datetime.utcnow()
        if outcome["success"]:
            step.status = StagingStepStatus.COMPLETED
            step.details = json.dumps(outcome)
        else:
            step.status = StagingStepStatus.FAILED
            step.error_message = outcome.get("error", "Unknown error")

        db.commit()

    except Exception as e:
        # Any unexpected failure marks the step FAILED with the exception text.
        step.status = StagingStepStatus.FAILED
        step.error_message = str(e)
        step.end_time = datetime.utcnow()
        db.commit()

async def complete_staging_run(
self, staging_run: StagingRun, db: Session, success: bool
):
Expand Down Expand Up @@ -508,6 +544,8 @@ async def recover_stuck_steps(self, staging_run: StagingRun, db: Session):
timeout_minutes = 15 # Git operations
elif step.step_type == StagingStepType.SELF_UPDATE:
timeout_minutes = 10 # Quick git pull
elif step.step_type == StagingStepType.TRIGGER_RESTART:
timeout_minutes = 5 # Quick docker restart

if running_duration.total_seconds() > (timeout_minutes * 60):
print(
Expand Down Expand Up @@ -576,6 +614,7 @@ async def startup_recovery(self):
StagingStepType.API_PIPELINE_UPDATE,
StagingStepType.KERNEL_TREE_UPDATE,
StagingStepType.SELF_UPDATE,
StagingStepType.TRIGGER_RESTART,
]:
print(
f"Recovering stuck local step: {step.step_type.value}"
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ passlib[bcrypt]==1.7.4
aiofiles==23.2.1
httpx==0.25.2
toml
fastapi-crons==2.0.1
Loading