# auto_sigma_rule_generator/backend/job_scheduler.py
# Snapshot metadata: 2025-07-11 09:16:57 -05:00 | 449 lines | 16 KiB | Python | no trailing newline (No EOL)
"""
CVE-SIGMA Auto Generator - Cron-like Job Scheduler
Automated scheduling and execution of data processing jobs
"""
import asyncio
import yaml
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from croniter import croniter
from sqlalchemy.orm import Session
import pytz
import uuid
import json
# Module-level logger for the scheduler. basicConfig() installs a root
# handler at INFO level on import (it does nothing if the root logger
# already has handlers configured).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
@dataclass
class ScheduledJob:
    """A scheduled job's configuration together with its runtime state.

    ``schedule`` is a cron expression evaluated with ``croniter``. When
    ``next_run`` is not supplied it is computed on construction from the
    current UTC time.
    """
    job_id: str
    name: str
    enabled: bool
    schedule: str  # Cron expression
    description: str
    job_type: str
    parameters: Dict[str, Any]
    priority: str
    timeout_minutes: int
    retry_on_failure: bool
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    failure_count: int = 0  # lifetime number of failed runs (never reset)
    is_running: bool = False
    max_retries: int = 2

    def __post_init__(self):
        if self.next_run is None:
            self.calculate_next_run()

    def calculate_next_run(self, base_time: Optional[datetime] = None) -> datetime:
        """Compute, store and return the first cron occurrence after ``base_time``.

        Args:
            base_time: Reference time; defaults to the current time in UTC.
                The cron expression is evaluated in ``base_time``'s timezone.

        Returns:
            The new ``next_run``. If the cron expression cannot be evaluated,
            the error is logged and ``base_time + 1 hour`` is used instead.
        """
        if base_time is None:
            base_time = datetime.now(pytz.UTC)
        try:
            cron = croniter(self.schedule, base_time)
            self.next_run = cron.get_next(datetime)
        except Exception as e:
            logger.error(f"Error calculating next run for job {self.name}: {e}")
            # Fallback: one hour after the reference time
            self.next_run = base_time + timedelta(hours=1)
        return self.next_run

    def should_run(self, current_time: Optional[datetime] = None) -> bool:
        """Return True if the job is enabled, idle, and its ``next_run`` has passed.

        Args:
            current_time: Time to compare against; defaults to now in UTC.
        """
        if not self.enabled or self.is_running or self.next_run is None:
            return False
        if current_time is None:
            current_time = datetime.now(pytz.UTC)
        return current_time >= self.next_run

    def mark_started(self):
        """Flag the job as running and record the start time and run count."""
        self.is_running = True
        self.last_run = datetime.now(pytz.UTC)
        self.run_count += 1

    def mark_completed(self, success: bool = True):
        """Clear the running flag, count a failure if any, and reschedule from now (UTC)."""
        self.is_running = False
        if not success:
            self.failure_count += 1
        self.calculate_next_run()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict; datetimes are ISO-8601 strings or None."""
        return {
            'job_id': self.job_id,
            'name': self.name,
            'enabled': self.enabled,
            'schedule': self.schedule,
            'description': self.description,
            'job_type': self.job_type,
            'parameters': self.parameters,
            'priority': self.priority,
            'timeout_minutes': self.timeout_minutes,
            'retry_on_failure': self.retry_on_failure,
            'last_run': self.last_run.isoformat() if self.last_run else None,
            'next_run': self.next_run.isoformat() if self.next_run else None,
            'run_count': self.run_count,
            'failure_count': self.failure_count,
            'is_running': self.is_running,
            'max_retries': self.max_retries,
        }
class JobRegistry:
    """Registry mapping job types to executor coroutine functions."""

    def __init__(self):
        self.executors: Dict[str, Callable] = {}
        self.db_session_factory = None

    def register_executor(self, job_type: str, executor_func: Callable):
        """Register ``executor_func`` for ``job_type``, replacing any previous one.

        The executor is invoked as ``await executor_func(db_session, parameters)``,
        so it must return an awaitable.
        """
        self.executors[job_type] = executor_func
        logger.info(f"Registered executor for job type: {job_type}")

    def set_db_session_factory(self, session_factory):
        """Set the zero-argument callable used to open a database session per job."""
        self.db_session_factory = session_factory

    async def execute_job(self, job: ScheduledJob) -> bool:
        """Run ``job`` with its registered executor and a fresh database session.

        A dict result counts as success when its ``status`` is ``'completed'``
        or ``'success'``; any other result is judged by its truthiness.

        Returns:
            True on success. Returns False (after logging) when no executor is
            registered, no session factory is set, the result signals failure,
            or the executor raises.
        """
        if job.job_type not in self.executors:
            logger.error(f"No executor found for job type: {job.job_type}")
            return False
        try:
            logger.info(f"Executing scheduled job: {job.name} (type: {job.job_type})")
            if not self.db_session_factory:
                logger.error("No database session factory available")
                return False
            db_session = self.db_session_factory()
            try:
                executor = self.executors[job.job_type]
                result = await executor(db_session, job.parameters)
            finally:
                self._close_session(db_session)

            if isinstance(result, dict):
                success = result.get('status') in ['completed', 'success']
                if not success:
                    logger.warning(f"Job {job.name} completed with status: {result.get('status')}")
            else:
                success = bool(result)
            logger.info(f"Job {job.name} finished (success={success})")
            return success
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(f"Error executing job {job.name}: {e}")
            return False

    @staticmethod
    def _close_session(db_session):
        """Close ``db_session``, logging rather than raising any error.

        A failing close must not turn an already-finished job into a failure,
        because that would trigger a spurious retry. It must also not mask the
        executor's own exception.
        """
        try:
            db_session.close()
        except Exception:
            logger.exception("Error closing database session")
class JobScheduler:
    """Main job scheduler with cron-like functionality.

    Configuration is read from a YAML file whose sections are all optional:

    * ``scheduler``: ``enabled``, ``timezone`` (pytz name, default ``UTC``),
      ``max_concurrent_jobs`` (default 3)
    * ``logging``: ``enabled``, ``level``
    * ``jobs``: mapping of job name -> ``enabled``, ``schedule``,
      ``description``, ``job_type``, ``parameters``, ``priority``,
      ``timeout_minutes``, ``retry_on_failure``, ``max_retries``

    Cron schedules are evaluated in the configured timezone. A background
    thread checks for due jobs about once a minute. Each job runs in its own
    daemon thread on a private asyncio event loop.
    """

    # How long to wait before retrying a failed job.
    RETRY_DELAY = timedelta(minutes=30)

    def __init__(self, config_path: str = "scheduler_config.yaml"):
        """Create the scheduler and load ``config_path``; the scheduler is not started."""
        self.config_path = config_path
        self.config: Dict[str, Any] = {}
        self.jobs: Dict[str, ScheduledJob] = {}
        self.registry = JobRegistry()
        self.is_running = False
        self.scheduler_thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()
        self.timezone = pytz.UTC
        self.max_concurrent_jobs = 3
        self.current_jobs = 0
        self.job_lock = threading.Lock()
        # Consecutive retries used per job name; reset after a success or
        # once the retries are exhausted.
        self._retry_attempts: Dict[str, int] = {}
        # Load configuration
        self.load_config()
        # Setup logging
        self.setup_logging()

    def load_config(self) -> bool:
        """Load and apply the YAML configuration at ``config_path``.

        Parses the file, applies the ``scheduler`` settings and (re)loads the
        jobs. Errors are logged rather than raised. On failure, the previously
        applied configuration is kept untouched.

        Returns:
            True if the file was loaded and applied, False otherwise.
        """
        try:
            with open(self.config_path, 'r') as f:
                # An empty YAML file parses to None.
                config = yaml.safe_load(f) or {}
            if not isinstance(config, dict):
                raise ValueError("top-level YAML value must be a mapping")
            scheduler_config = config.get('scheduler') or {}
            timezone = pytz.timezone(scheduler_config.get('timezone', 'UTC'))
            max_concurrent_jobs = scheduler_config.get('max_concurrent_jobs', 3)
        except Exception as e:
            logger.error(f"Error loading scheduler config: {e}")
            return False

        # Everything parsed: commit atomically from the caller's point of view.
        self.config = config
        self.timezone = timezone
        self.max_concurrent_jobs = max_concurrent_jobs
        self.load_jobs()
        logger.info(f"Loaded scheduler configuration with {len(self.jobs)} jobs")
        return True

    def load_jobs(self):
        """(Re)build ``self.jobs`` from the ``jobs`` section of the config.

        Jobs already known by name are updated in place, so their runtime
        state survives a reload: id, counters, and ``is_running``. This means a
        job that is currently executing is not started a second time. Jobs no
        longer in the config are dropped. A job whose entry cannot be read is
        logged, and its previous definition, if any, is kept.
        """
        jobs_config = self.config.get('jobs') or {}
        if not isinstance(jobs_config, dict):
            logger.error("Error loading jobs: 'jobs' section must be a mapping")
            return
        now = datetime.now(self.timezone)
        loaded: Dict[str, ScheduledJob] = {}
        for job_name, job_config in jobs_config.items():
            try:
                settings = dict(
                    enabled=job_config.get('enabled', True),
                    schedule=job_config.get('schedule', '0 0 * * *'),
                    description=job_config.get('description', ''),
                    job_type=job_config.get('job_type', job_name),
                    parameters=job_config.get('parameters', {}),
                    priority=job_config.get('priority', 'medium'),
                    timeout_minutes=job_config.get('timeout_minutes', 60),
                    retry_on_failure=job_config.get('retry_on_failure', True),
                    max_retries=job_config.get('max_retries', 2),
                )
                job = self.jobs.get(job_name)
                if job is None:
                    job = ScheduledJob(job_id=str(uuid.uuid4()), name=job_name, **settings)
                else:
                    for attr, value in settings.items():
                        setattr(job, attr, value)
                if not job.is_running:
                    # Evaluate the cron expression in the configured timezone.
                    job.calculate_next_run(now)
                loaded[job_name] = job
                logger.info(f"Loaded job: {job_name} - Next run: {job.next_run}")
            except Exception as e:
                logger.error(f"Error loading job {job_name}: {e}")
                if job_name in self.jobs:
                    loaded[job_name] = self.jobs[job_name]
        # Rebind rather than mutate, so the scheduler loop's snapshot stays valid.
        self.jobs = loaded

    def setup_logging(self):
        """Apply the ``logging`` config section to this module's logger.

        ``level`` may be a level name (case-insensitive) or a numeric level.
        Unknown names fall back to INFO with a warning rather than aborting
        scheduler construction.
        """
        log_config = self.config.get('logging') or {}
        if not log_config.get('enabled', True):
            return
        level = log_config.get('level', 'INFO')
        if isinstance(level, int):
            log_level = level
        else:
            log_level = getattr(logging, str(level).upper(), None)
            if not isinstance(log_level, int):
                logger.warning(f"Unknown log level {level!r}; using INFO")
                log_level = logging.INFO
        logger.setLevel(log_level)

    def register_job_executor(self, job_type: str, executor_func: Callable):
        """Register a job executor"""
        self.registry.register_executor(job_type, executor_func)

    def set_db_session_factory(self, session_factory):
        """Set database session factory"""
        self.registry.set_db_session_factory(session_factory)

    def start(self):
        """Start the background scheduler thread.

        Does nothing if it is already running or disabled in config.
        """
        if self.is_running:
            logger.warning("Scheduler is already running")
            return
        if not (self.config.get('scheduler') or {}).get('enabled', True):
            logger.info("Scheduler is disabled in configuration")
            return
        self.is_running = True
        self.stop_event.clear()
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self.scheduler_thread.start()
        logger.info("Job scheduler started")

    def stop(self):
        """Signal the scheduler loop to exit and wait up to 5 seconds for it.

        Jobs that are already running are not interrupted.
        """
        if not self.is_running:
            return
        self.is_running = False
        self.stop_event.set()
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)
        logger.info("Job scheduler stopped")

    def _scheduler_loop(self):
        """Background loop: roughly once a minute, launch every due job."""
        logger.info("Scheduler loop started")
        while self.is_running and not self.stop_event.is_set():
            try:
                current_time = datetime.now(self.timezone)
                # Iterate a snapshot: reload_config() may replace self.jobs concurrently.
                for job in list(self.jobs.values()):
                    if job.should_run(current_time):
                        # Silently skipped when no slot is free; retried next tick.
                        self._launch_job(job)
            except Exception as e:
                logger.error(f"Error in scheduler loop: {e}")
            # Sleep for 60 seconds (returns early when stop() is called)
            self.stop_event.wait(60)
        logger.info("Scheduler loop stopped")

    def _launch_job(self, job: ScheduledJob) -> bool:
        """Reserve a concurrency slot for ``job`` and run it in a daemon thread.

        The check, the counter increment and ``mark_started()`` happen
        together under ``job_lock`` before the thread is spawned. This prevents
        several jobs due in the same tick, or a manual trigger racing the loop,
        from exceeding ``max_concurrent_jobs`` or starting one job twice.

        Returns:
            True if a worker thread was started, False if the job is already
            running or no slot is free.
        """
        with self.job_lock:
            if job.is_running or self.current_jobs >= self.max_concurrent_jobs:
                return False
            self.current_jobs += 1
            job.mark_started()
        worker = threading.Thread(target=self._execute_job_wrapper, args=(job,), daemon=True)
        try:
            worker.start()
        except RuntimeError:
            # The thread could not be spawned: undo the reservation made above.
            with self.job_lock:
                self.current_jobs -= 1
            job.mark_completed(False)
            raise
        return True

    def _execute_job_wrapper(self, job: ScheduledJob):
        """Run ``job`` to completion in the current worker thread.

        Expects ``_launch_job`` to have reserved a slot and marked the job as
        started. The slot is always released here. The job's coroutine runs on
        a fresh event loop, limited to ``timeout_minutes``. Afterwards
        ``_schedule_next_run`` decides when it runs again.
        """
        success = False
        try:
            timeout_seconds = job.timeout_minutes * 60
            success = asyncio.run(
                asyncio.wait_for(self.registry.execute_job(job), timeout=timeout_seconds)
            )
        except asyncio.TimeoutError:
            logger.error(f"Job {job.name} timed out after {job.timeout_minutes} minutes")
        except Exception as e:
            logger.exception(f"Error executing job {job.name}: {e}")
        finally:
            try:
                job.mark_completed(success)
                self._schedule_next_run(job, success)
            finally:
                with self.job_lock:
                    self.current_jobs -= 1

    def _schedule_next_run(self, job: ScheduledJob, success: bool):
        """Set ``job.next_run`` after a run has finished.

        If the run failed, ``retry_on_failure`` is set, and fewer than
        ``max_retries`` consecutive retries have been used, the job is retried
        ``RETRY_DELAY`` from now. Otherwise the next cron occurrence in the
        scheduler's timezone is used and the retry counter is reset.
        """
        now = datetime.now(self.timezone)
        attempts = self._retry_attempts.get(job.name, 0)
        if not success and job.retry_on_failure and attempts < job.max_retries:
            self._retry_attempts[job.name] = attempts + 1
            job.next_run = now + self.RETRY_DELAY
            logger.info(f"Job {job.name} failed, will retry later")
        else:
            self._retry_attempts.pop(job.name, None)
            job.calculate_next_run(now)

    def get_job_status(self, job_name: str = None) -> Dict[str, Any]:
        """Return one job's ``to_dict()``, or a summary of all jobs if no name is given.

        An unknown ``job_name`` yields ``{"error": ...}``.
        """
        if job_name:
            job = self.jobs.get(job_name)
            if job:
                return job.to_dict()
            else:
                return {"error": f"Job {job_name} not found"}
        return {
            "scheduler_running": self.is_running,
            "total_jobs": len(self.jobs),
            "enabled_jobs": sum(1 for job in self.jobs.values() if job.enabled),
            "running_jobs": sum(1 for job in self.jobs.values() if job.is_running),
            "jobs": {name: job.to_dict() for name, job in self.jobs.items()}
        }

    def enable_job(self, job_name: str) -> bool:
        """Enable a job and recompute its next run; returns False if it is unknown."""
        job = self.jobs.get(job_name)
        if job is None:
            return False
        job.enabled = True
        job.calculate_next_run(datetime.now(self.timezone))
        logger.info(f"Enabled job: {job_name}")
        return True

    def disable_job(self, job_name: str) -> bool:
        """Disable a job (a run in progress is not stopped); returns False if it is unknown."""
        if job_name in self.jobs:
            self.jobs[job_name].enabled = False
            logger.info(f"Disabled job: {job_name}")
            return True
        return False

    def trigger_job(self, job_name: str) -> bool:
        """Start a job immediately, even if it is disabled.

        Returns False if the job is unknown, already running, or no
        concurrency slot is free.
        """
        job = self.jobs.get(job_name)
        if job is None:
            return False
        if job.is_running:
            logger.warning(f"Job {job_name} is already running")
            return False
        if self.current_jobs >= self.max_concurrent_jobs:
            logger.warning(f"Maximum concurrent jobs reached, cannot start {job_name}")
            return False
        if not self._launch_job(job):
            # Lost a race with the scheduler loop or another trigger.
            logger.warning(f"Job {job_name} could not be started (already running or no free slot)")
            return False
        logger.info(f"Manually triggered job: {job_name}")
        return True

    def update_job_schedule(self, job_name: str, new_schedule: str) -> bool:
        """Replace a job's cron schedule after validating it.

        Returns False if the job is unknown or croniter rejects
        ``new_schedule``. The next run is recomputed in the scheduler's timezone.
        """
        job = self.jobs.get(job_name)
        if job is None:
            return False
        try:
            # Validate cron expression before touching the job
            croniter(new_schedule)
        except Exception as e:
            logger.error(f"Invalid cron expression {new_schedule}: {e}")
            return False
        job.schedule = new_schedule
        job.calculate_next_run(datetime.now(self.timezone))
        logger.info(f"Updated schedule for job {job_name}: {new_schedule}")
        return True

    def reload_config(self) -> bool:
        """Re-read the configuration file and re-apply logging settings.

        Returns:
            True on success. False if the file could not be loaded, in which
            case the previous configuration stays in effect.
        """
        try:
            if not self.load_config():
                return False
            self.setup_logging()
        except Exception as e:
            logger.error(f"Error reloading configuration: {e}")
            return False
        logger.info("Configuration reloaded successfully")
        return True
# Process-wide scheduler singleton. It is created lazily by get_scheduler()
# or replaced explicitly by initialize_scheduler().
scheduler_instance: Optional[JobScheduler] = None


def get_scheduler() -> JobScheduler:
    """Return the shared scheduler, building one with the default config on first use."""
    global scheduler_instance
    if scheduler_instance is not None:
        return scheduler_instance
    scheduler_instance = JobScheduler()
    return scheduler_instance
def initialize_scheduler(config_path: Optional[str] = None) -> JobScheduler:
    """(Re)create the global scheduler and return it.

    Args:
        config_path: YAML config path. A falsy value (None or "") uses
            JobScheduler's default path.

    Any previously created global scheduler is stopped first. Without this,
    its loop thread would keep running alongside the new one and execute
    the same jobs twice. The new scheduler is not started.
    """
    global scheduler_instance
    if scheduler_instance is not None:
        # stop() is a no-op when that scheduler was never started.
        scheduler_instance.stop()
    scheduler_instance = JobScheduler(config_path) if config_path else JobScheduler()
    return scheduler_instance