449 lines
No EOL
16 KiB
Python
449 lines
No EOL
16 KiB
Python
"""
|
|
CVE-SIGMA Auto Generator - Cron-like Job Scheduler
|
|
Automated scheduling and execution of data processing jobs
|
|
"""
|
|
|
|
import asyncio
import inspect
import json
import logging
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

import pytz
import yaml
from croniter import croniter
from sqlalchemy.orm import Session
|
|
|
|
# Module logger, plus a default INFO-level configuration for the root logger.
# logging.basicConfig is a no-op when the root logger already has handlers,
# so an application that configured logging first keeps its own setup.
logger = logging.getLogger(name=__name__)
logging.basicConfig(level=logging.INFO)
|
|
|
|
@dataclass
class ScheduledJob:
    """Configuration plus mutable runtime state for one cron-scheduled job.

    ``schedule`` is a cron expression evaluated with croniter. ``next_run`` is
    computed on construction unless the caller supplies one, and is
    recomputed every time a run completes (see ``mark_completed``).
    """

    job_id: str
    name: str
    enabled: bool
    schedule: str  # Cron expression
    description: str
    job_type: str
    parameters: Dict[str, Any]
    priority: str
    timeout_minutes: int
    retry_on_failure: bool
    last_run: Optional[datetime] = None  # Set by mark_started()
    next_run: Optional[datetime] = None  # Next due time; computed when None
    run_count: int = 0  # Number of runs started
    failure_count: int = 0  # Lifetime count of failed runs; never reset here
    is_running: bool = False
    max_retries: int = 2

    def __post_init__(self) -> None:
        # Derive the first due time from the cron schedule unless one was given.
        if self.next_run is None:
            self.calculate_next_run()

    def calculate_next_run(self, base_time: Optional[datetime] = None) -> datetime:
        """Set and return the next cron tick after ``base_time``.

        Args:
            base_time: Reference time. Defaults to the current time as a
                timezone-aware UTC datetime, so cron expressions are evaluated
                in UTC unless the caller passes a time in another zone.

        Returns:
            The new ``next_run``. If the cron expression cannot be evaluated,
            the error is logged and ``base_time + 1 hour`` is used instead.
        """
        if base_time is None:
            base_time = datetime.now(pytz.UTC)

        try:
            cron = croniter(self.schedule, base_time)
            self.next_run = cron.get_next(datetime)
        except Exception as e:
            logger.error(f"Error calculating next run for job {self.name}: {e}")
            # Fallback to 1 hour from the reference time
            self.next_run = base_time + timedelta(hours=1)
        return self.next_run

    def should_run(self, current_time: Optional[datetime] = None) -> bool:
        """Return True when the job is enabled, not already running, and due.

        Args:
            current_time: Time compared against ``next_run``; defaults to the
                current UTC time.

        Returns:
            Always a real ``bool``. A job with no ``next_run`` is never due.
        """
        if not self.enabled or self.is_running or self.next_run is None:
            return False

        if current_time is None:
            current_time = datetime.now(pytz.UTC)

        return current_time >= self.next_run

    def mark_started(self) -> None:
        """Flag the job as running, stamp ``last_run`` (UTC) and bump ``run_count``."""
        self.is_running = True
        self.last_run = datetime.now(pytz.UTC)
        self.run_count += 1

    def mark_completed(self, success: bool = True) -> None:
        """Clear the running flag, count a failure if any, and reschedule.

        ``next_run`` is moved to the next cron tick after the current UTC time.
        """
        self.is_running = False
        if not success:
            self.failure_count += 1
        self.calculate_next_run()

    def to_dict(self) -> Dict[str, Any]:
        """Return a serialization-friendly snapshot of the job.

        Datetimes are rendered with ``isoformat()``; unset ones become None.
        """
        def _iso(value: Optional[datetime]) -> Optional[str]:
            return value.isoformat() if value else None

        return {
            'job_id': self.job_id,
            'name': self.name,
            'enabled': self.enabled,
            'schedule': self.schedule,
            'description': self.description,
            'job_type': self.job_type,
            'parameters': self.parameters,
            'priority': self.priority,
            'timeout_minutes': self.timeout_minutes,
            'retry_on_failure': self.retry_on_failure,
            'last_run': _iso(self.last_run),
            'next_run': _iso(self.next_run),
            'run_count': self.run_count,
            'failure_count': self.failure_count,
            'is_running': self.is_running,
            'max_retries': self.max_retries,
        }
|
|
|
|
class JobRegistry:
    """Registry mapping job types to executor callables.

    Executors are invoked as ``executor(db_session, parameters)`` and may be
    either coroutine functions or plain functions.
    """

    def __init__(self):
        # job_type -> executor callable
        self.executors: Dict[str, Callable] = {}
        # Zero-argument callable returning a session with a close() method;
        # jobs cannot execute until this is set.
        self.db_session_factory: Optional[Callable[[], Any]] = None

    def register_executor(self, job_type: str, executor_func: Callable):
        """Register (or replace) the executor used for ``job_type``."""
        self.executors[job_type] = executor_func
        logger.info(f"Registered executor for job type: {job_type}")

    def set_db_session_factory(self, session_factory):
        """Set the factory used to open one session per job run."""
        self.db_session_factory = session_factory

    async def execute_job(self, job: ScheduledJob) -> bool:
        """Run ``job`` with its registered executor and report success.

        A fresh session is obtained from ``db_session_factory`` for the run
        and closed afterwards, even if the executor raises. The executor
        receives a new dict copied from ``job.parameters`` (an empty dict when
        that is None), so mutations it makes do not leak into later runs.

        A dict result counts as success when its ``'status'`` is
        ``'completed'`` or ``'success'``. Any other result is judged by its
        truthiness.

        Returns:
            True on success. False when no executor or session factory is
            available, when the executor reports failure, or when it raises.

        Raises:
            asyncio.CancelledError: re-raised so that cancellation (e.g. a
                timeout enforced by the caller) is not treated as an ordinary
                job failure.
        """
        executor = self.executors.get(job.job_type)
        if executor is None:
            logger.error(f"No executor found for job type: {job.job_type}")
            return False

        try:
            logger.info(f"Executing scheduled job: {job.name} (type: {job.job_type})")

            if not self.db_session_factory:
                logger.error("No database session factory available")
                return False
            db_session = self.db_session_factory()

            try:
                params = dict(job.parameters or {})
                result = executor(db_session, params)
                # Support both coroutine executors and synchronous ones.
                if inspect.isawaitable(result):
                    result = await result

                if isinstance(result, dict):
                    success = result.get('status') in ['completed', 'success']
                    if not success:
                        logger.warning(f"Job {job.name} completed with status: {result.get('status')}")
                else:
                    success = bool(result)

                logger.info(f"Job {job.name} completed successfully: {success}")
                return success
            finally:
                db_session.close()

        except asyncio.CancelledError:
            raise
        except Exception as e:
            # logger.exception keeps the traceback for diagnosis.
            logger.exception(f"Error executing job {job.name}: {e}")
            return False
|
|
|
|
class JobScheduler:
    """Cron-like scheduler that polls configured jobs and runs the due ones.

    A daemon thread wakes every ``_POLL_INTERVAL_SECONDS`` and starts each
    enabled job whose ``next_run`` has passed, bounded by
    ``max_concurrent_jobs``. Each run happens on its own daemon thread with a
    private asyncio event loop and a per-job timeout.

    NOTE(review): ``self.timezone`` drives the poll clock and retry
    timestamps. However, ScheduledJob computes cron ticks from UTC when no
    base time is given, so cron expressions appear to be evaluated in UTC.
    Confirm this is intended.
    """

    # Seconds between checks for due jobs.
    _POLL_INTERVAL_SECONDS = 60
    # Delay before a failed run is retried (when retries are allowed).
    _RETRY_DELAY = timedelta(minutes=30)

    def __init__(self, config_path: str = "scheduler_config.yaml"):
        self.config_path = config_path
        self.config: Dict[str, Any] = {}
        self.jobs: Dict[str, ScheduledJob] = {}
        self.registry = JobRegistry()
        self.is_running = False
        self.scheduler_thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()
        self.timezone = pytz.UTC
        self.max_concurrent_jobs = 3
        self.current_jobs = 0
        # Guards current_jobs, the check-and-set of job.is_running in
        # _try_start_job, and _consecutive_failures.
        self.job_lock = threading.Lock()
        # Consecutive failed runs per job name; reset on success. This bounds
        # retries (job.failure_count is a lifetime total and never resets).
        self._consecutive_failures: Dict[str, int] = {}

        # Load configuration
        self.load_config()

        # Setup logging
        self.setup_logging()

    def load_config(self) -> bool:
        """Load scheduler settings and job definitions from ``config_path``.

        Returns:
            True on success. On any error the problem is logged,
            ``self.config`` is reset to ``{}`` and False is returned. Jobs
            loaded earlier are left in place.
        """
        try:
            with open(self.config_path, 'r') as f:
                # An empty YAML document parses to None; treat it as "no settings".
                self.config = yaml.safe_load(f) or {}

            scheduler_config = self.config.get('scheduler') or {}
            self.timezone = pytz.timezone(scheduler_config.get('timezone', 'UTC'))
            self.max_concurrent_jobs = scheduler_config.get('max_concurrent_jobs', 3)

            self.load_jobs()

            logger.info(f"Loaded scheduler configuration with {len(self.jobs)} jobs")
            return True

        except Exception as e:
            logger.error(f"Error loading scheduler config: {e}")
            self.config = {}
            return False

    def load_jobs(self):
        """Create a ScheduledJob for every entry under the ``jobs`` config key.

        Entries that fail to load are logged and skipped. Jobs are keyed by
        name; existing entries with the same name are replaced.

        NOTE(review): replacing a job mid-run yields a fresh, not-running
        object that the poll loop may start again, and its counters reset.
        """
        jobs_config = self.config.get('jobs') or {}

        for job_name, job_config in jobs_config.items():
            try:
                job = ScheduledJob(
                    job_id=str(uuid.uuid4()),
                    name=job_name,
                    enabled=job_config.get('enabled', True),
                    schedule=job_config.get('schedule', '0 0 * * *'),
                    description=job_config.get('description', ''),
                    job_type=job_config.get('job_type', job_name),
                    parameters=job_config.get('parameters', {}),
                    priority=job_config.get('priority', 'medium'),
                    timeout_minutes=job_config.get('timeout_minutes', 60),
                    retry_on_failure=job_config.get('retry_on_failure', True),
                    max_retries=job_config.get('max_retries', 2)
                )

                self.jobs[job_name] = job
                logger.info(f"Loaded job: {job_name} - Next run: {job.next_run}")

            except Exception as e:
                logger.error(f"Error loading job {job_name}: {e}")

    def setup_logging(self):
        """Apply the ``logging.level`` config (case-insensitive) to this module's logger.

        An unknown level name is reported and replaced by INFO instead of
        raising.
        """
        log_config = self.config.get('logging') or {}
        if not log_config.get('enabled', True):
            return

        level_name = str(log_config.get('level', 'INFO')).upper()
        log_level = getattr(logging, level_name, None)
        if not isinstance(log_level, int):
            logger.warning(f"Unknown log level {level_name!r} in scheduler config; using INFO")
            log_level = logging.INFO
        logger.setLevel(log_level)

    def register_job_executor(self, job_type: str, executor_func: Callable):
        """Register a job executor"""
        self.registry.register_executor(job_type, executor_func)

    def set_db_session_factory(self, session_factory):
        """Set database session factory"""
        self.registry.set_db_session_factory(session_factory)

    def start(self):
        """Start the polling thread unless already running or disabled in config."""
        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        scheduler_config = self.config.get('scheduler') or {}
        if not scheduler_config.get('enabled', True):
            logger.info("Scheduler is disabled in configuration")
            return

        self.is_running = True
        self.stop_event.clear()

        self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self.scheduler_thread.start()

        logger.info("Job scheduler started")

    def stop(self):
        """Signal the polling thread to exit and wait up to 5 s for it.

        Job threads already running are not interrupted.
        """
        if not self.is_running:
            return

        self.is_running = False
        self.stop_event.set()

        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)

        logger.info("Job scheduler stopped")

    def _scheduler_loop(self):
        """Poll for due jobs until stopped."""
        logger.info("Scheduler loop started")

        while self.is_running and not self.stop_event.is_set():
            try:
                current_time = datetime.now(self.timezone)

                # Iterate a snapshot: reload_config() may mutate self.jobs
                # from another thread.
                for job in list(self.jobs.values()):
                    if job.should_run(current_time):
                        # Silently skipped when no slot is free; retried next tick.
                        self._try_start_job(job)

            except Exception as e:
                logger.error(f"Error in scheduler loop: {e}")

            # Sleep until the next poll, waking early if stop() is called.
            self.stop_event.wait(self._POLL_INTERVAL_SECONDS)

        logger.info("Scheduler loop stopped")

    def _try_start_job(self, job: ScheduledJob) -> Optional[str]:
        """Atomically claim a concurrency slot for ``job`` and launch it.

        Returns:
            None when the job was started. Otherwise a reason string:
            ``"running"`` if the job is already running, or ``"busy"`` if
            ``max_concurrent_jobs`` slots are taken.
        """
        with self.job_lock:
            if job.is_running:
                return "running"
            if self.current_jobs >= self.max_concurrent_jobs:
                return "busy"
            self.current_jobs += 1
            # Mark started before the worker exists, so neither the poll loop
            # nor trigger_job() can launch a second copy in the meantime.
            job.mark_started()

        try:
            threading.Thread(
                target=self._execute_job_wrapper,
                args=(job,),
                daemon=True
            ).start()
        except Exception:
            # Could not spawn a worker: give back the slot and the job.
            with self.job_lock:
                self.current_jobs -= 1
            job.mark_completed(False)
            raise
        return None

    async def _run_with_timeout(self, job: ScheduledJob) -> bool:
        """Await the registry execution of ``job``, bounded by its timeout."""
        return await asyncio.wait_for(
            self.registry.execute_job(job),
            timeout=job.timeout_minutes * 60
        )

    def _execute_job_wrapper(self, job: ScheduledJob):
        """Run ``job`` on a private event loop and record the outcome.

        Expects ``_try_start_job`` to have already reserved a concurrency slot
        and called ``job.mark_started()``. The slot is always released here,
        and the job is always marked completed, even if a BaseException
        escapes.
        """
        success = False
        try:
            success = asyncio.run(self._run_with_timeout(job))
        except asyncio.TimeoutError:
            logger.error(f"Job {job.name} timed out after {job.timeout_minutes} minutes")
        except Exception as e:
            logger.error(f"Error executing job {job.name}: {e}")
        finally:
            try:
                self._record_outcome(job, bool(success))
            finally:
                with self.job_lock:
                    self.current_jobs -= 1

    def _record_outcome(self, job: ScheduledJob, success: bool) -> None:
        """Update job state after a run and schedule a retry when allowed.

        ``job.mark_completed`` clears ``is_running`` and moves ``next_run`` to
        the next cron tick. After a failure (including a timeout), if
        ``retry_on_failure`` is set and at most ``max_retries`` consecutive
        failures have occurred, ``next_run`` is pulled in to
        ``_RETRY_DELAY`` from now. It is only moved earlier, never later.
        """
        job.mark_completed(success)

        with self.job_lock:
            if success:
                self._consecutive_failures.pop(job.name, None)
                return
            failures = self._consecutive_failures.get(job.name, 0) + 1
            self._consecutive_failures[job.name] = failures

        if job.retry_on_failure and failures <= job.max_retries:
            logger.info(f"Job {job.name} failed, will retry later")
            retry_at = datetime.now(self.timezone) + self._RETRY_DELAY
            if job.next_run is None or retry_at < job.next_run:
                job.next_run = retry_at

    def get_job_status(self, job_name: str = None) -> Dict[str, Any]:
        """Return one job's ``to_dict()`` or an overview of all jobs."""
        if job_name:
            job = self.jobs.get(job_name)
            if job:
                return job.to_dict()
            return {"error": f"Job {job_name} not found"}

        # Snapshot so a concurrent reload cannot change the dict mid-iteration.
        jobs = dict(self.jobs)
        return {
            "scheduler_running": self.is_running,
            "total_jobs": len(jobs),
            "enabled_jobs": sum(1 for job in jobs.values() if job.enabled),
            "running_jobs": sum(1 for job in jobs.values() if job.is_running),
            "jobs": {name: job.to_dict() for name, job in jobs.items()}
        }

    def enable_job(self, job_name: str) -> bool:
        """Enable a job and recompute its next run; False if unknown."""
        job = self.jobs.get(job_name)
        if job is None:
            return False
        job.enabled = True
        job.calculate_next_run()
        logger.info(f"Enabled job: {job_name}")
        return True

    def disable_job(self, job_name: str) -> bool:
        """Disable a job (a run in progress is not interrupted); False if unknown."""
        job = self.jobs.get(job_name)
        if job is None:
            return False
        job.enabled = False
        logger.info(f"Disabled job: {job_name}")
        return True

    def trigger_job(self, job_name: str) -> bool:
        """Start a job immediately, regardless of its schedule or enabled flag.

        Returns:
            False if the job is unknown, already running, or no concurrency
            slot is free. True once the job's worker thread has been started.
        """
        job = self.jobs.get(job_name)
        if job is None:
            return False

        refusal = self._try_start_job(job)
        if refusal == "running":
            logger.warning(f"Job {job_name} is already running")
            return False
        if refusal == "busy":
            logger.warning(f"Maximum concurrent jobs reached, cannot start {job_name}")
            return False

        logger.info(f"Manually triggered job: {job_name}")
        return True

    def update_job_schedule(self, job_name: str, new_schedule: str) -> bool:
        """Validate and apply a new cron expression, then recompute next run."""
        job = self.jobs.get(job_name)
        if job is None:
            return False

        try:
            # Validate cron expression (croniter raises on a bad one)
            croniter(new_schedule)

            job.schedule = new_schedule
            job.calculate_next_run()

            logger.info(f"Updated schedule for job {job_name}: {new_schedule}")
            return True

        except Exception as e:
            logger.error(f"Invalid cron expression {new_schedule}: {e}")
            return False

    def reload_config(self) -> bool:
        """Reload configuration from file; True only if loading succeeded."""
        try:
            loaded = self.load_config()
        except Exception as e:
            logger.error(f"Error reloading configuration: {e}")
            return False

        if loaded:
            logger.info("Configuration reloaded successfully")
        return loaded
|
|
|
|
# Process-wide scheduler singleton. It is created lazily by get_scheduler()
# or replaced explicitly by initialize_scheduler().
scheduler_instance: Optional[JobScheduler] = None


def get_scheduler() -> JobScheduler:
    """Return the process-wide scheduler, building it on first access.

    When no scheduler exists yet, one is constructed with JobScheduler's
    default configuration path and stored for subsequent calls.
    """
    global scheduler_instance
    if scheduler_instance is not None:
        return scheduler_instance
    scheduler_instance = JobScheduler()
    return scheduler_instance
|
|
|
|
def initialize_scheduler(config_path: Optional[str] = None) -> JobScheduler:
    """Create a new global scheduler, replacing any existing one.

    Args:
        config_path: YAML configuration file to load. When falsy, the
            JobScheduler default path is used.

    Returns:
        The newly created scheduler. It is not started.

    If a global scheduler already exists, it is stopped after the new one has
    been constructed. Otherwise its polling thread would keep firing jobs
    alongside the replacement. If construction raises, the previous instance
    is left untouched.
    """
    global scheduler_instance
    new_instance = JobScheduler(config_path) if config_path else JobScheduler()

    previous = scheduler_instance
    if previous is not None:
        previous.stop()

    scheduler_instance = new_instance
    return scheduler_instance