"""
Job Executors for Scheduled Tasks

Integrates the project's existing data-processing entry points (bulk
seeding, CISA KEV / PoC / ExploitDB syncs, reference enrichment, SIGMA
rule regeneration, database cleanup) with the job scheduler.

Executor contract (shared by every method on :class:`JobExecutors`):

    async executor(db_session, parameters) -> dict

The returned dict always contains ``status`` ('completed' or 'failed'),
``job_type`` and ``completed_at``; success adds ``result``, failure adds
``error``.  Executors never raise: failures are reported in-band so the
scheduler can log them and apply its retry policy.
"""

import asyncio
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict

if TYPE_CHECKING:
    # Type-checking-only import: avoids a hard runtime dependency on
    # SQLAlchemy in this module (the session is created by the scheduler).
    from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)


def _success(job_type: str, result: Any) -> Dict[str, Any]:
    """Standard success envelope returned by every executor."""
    return {
        'status': 'completed',
        'result': result,
        'job_type': job_type,
        'completed_at': datetime.utcnow().isoformat(),
    }


def _failure(job_type: str, error: Exception) -> Dict[str, Any]:
    """Standard failure envelope returned by every executor."""
    return {
        'status': 'failed',
        'error': str(error),
        'job_type': job_type,
        'completed_at': datetime.utcnow().isoformat(),
    }


class JobExecutors:
    """Collection of job executor functions for the scheduler."""

    @staticmethod
    async def incremental_update(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute NVD incremental update job."""
        try:
            from bulk_seeder import BulkSeeder

            seeder = BulkSeeder(db_session)

            # NOTE(review): 'skip_nvd'/'skip_nomi_sec' appear in the job
            # config but incremental_update() is called without arguments --
            # confirm whether BulkSeeder.incremental_update should take them.
            batch_size = parameters.get('batch_size', 100)
            logger.info(f"Starting incremental update - batch_size: {batch_size}")

            result = await seeder.incremental_update()
            return _success('incremental_update', result)
        except Exception as e:
            logger.error(f"Incremental update job failed: {e}")
            return _failure('incremental_update', e)

    @staticmethod
    async def cisa_kev_sync(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute CISA KEV sync job."""
        try:
            from cisa_kev_client import CISAKEVClient

            client = CISAKEVClient(db_session)
            batch_size = parameters.get('batch_size', 100)
            logger.info(f"Starting CISA KEV sync - batch_size: {batch_size}")

            result = await client.bulk_sync_kev_data(batch_size=batch_size)
            return _success('cisa_kev_sync', result)
        except Exception as e:
            logger.error(f"CISA KEV sync job failed: {e}")
            return _failure('cisa_kev_sync', e)

    @staticmethod
    async def nomi_sec_sync(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute nomi-sec PoC sync job."""
        try:
            from nomi_sec_client import NomiSecClient

            client = NomiSecClient(db_session)
            batch_size = parameters.get('batch_size', 50)
            logger.info(f"Starting nomi-sec sync - batch_size: {batch_size}")

            result = await client.bulk_sync_poc_data(batch_size=batch_size)
            return _success('nomi_sec_sync', result)
        except Exception as e:
            logger.error(f"Nomi-sec sync job failed: {e}")
            return _failure('nomi_sec_sync', e)

    @staticmethod
    async def github_poc_sync(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute GitHub PoC sync job."""
        try:
            from mcdevitt_poc_client import GitHubPoCClient

            client = GitHubPoCClient(db_session)
            batch_size = parameters.get('batch_size', 50)
            logger.info(f"Starting GitHub PoC sync - batch_size: {batch_size}")

            result = await client.bulk_sync_poc_data(batch_size=batch_size)
            return _success('github_poc_sync', result)
        except Exception as e:
            logger.error(f"GitHub PoC sync job failed: {e}")
            return _failure('github_poc_sync', e)

    @staticmethod
    async def exploitdb_sync(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute ExploitDB sync job."""
        try:
            from exploitdb_client_local import ExploitDBLocalClient

            client = ExploitDBLocalClient(db_session)
            batch_size = parameters.get('batch_size', 30)
            logger.info(f"Starting ExploitDB sync - batch_size: {batch_size}")

            result = await client.bulk_sync_exploitdb_data(batch_size=batch_size)
            return _success('exploitdb_sync', result)
        except Exception as e:
            logger.error(f"ExploitDB sync job failed: {e}")
            return _failure('exploitdb_sync', e)

    @staticmethod
    async def reference_sync(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute reference data sync job."""
        try:
            from reference_client import ReferenceClient

            client = ReferenceClient(db_session)
            batch_size = parameters.get('batch_size', 30)
            max_cves = parameters.get('max_cves', 200)
            force_resync = parameters.get('force_resync', False)
            logger.info(f"Starting reference sync - batch_size: {batch_size}, max_cves: {max_cves}")

            result = await client.bulk_sync_references(
                batch_size=batch_size,
                max_cves=max_cves,
                force_resync=force_resync
            )
            return _success('reference_sync', result)
        except Exception as e:
            logger.error(f"Reference sync job failed: {e}")
            return _failure('reference_sync', e)

    @staticmethod
    async def rule_regeneration(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute SIGMA rule regeneration job.

        With ``force`` regenerates rules for every CVE; otherwise only for
        CVEs whose data changed since creation (updated_at > created_at).
        """
        try:
            from enhanced_sigma_generator import EnhancedSigmaGenerator
            from main import CVE

            generator = EnhancedSigmaGenerator(db_session)
            force = parameters.get('force', False)
            logger.info(f"Starting rule regeneration - force: {force}")

            if force:
                cves = db_session.query(CVE).all()
            else:
                cves = db_session.query(CVE).filter(
                    CVE.updated_at > CVE.created_at
                ).all()

            total_processed = 0
            total_generated = 0

            for cve in cves:
                try:
                    rule_content = await generator.generate_enhanced_sigma_rule(cve.cve_id)
                    if rule_content:
                        total_generated += 1
                    total_processed += 1
                    # Small delay to prevent overwhelming the system
                    await asyncio.sleep(0.1)
                except Exception as e:
                    logger.error(f"Error regenerating rule for {cve.cve_id}: {e}")

            result = {
                'total_processed': total_processed,
                'total_generated': total_generated,
                # Guard against division by zero when no CVEs matched.
                'generation_rate': total_generated / total_processed if total_processed > 0 else 0
            }
            return _success('rule_regeneration', result)
        except Exception as e:
            logger.error(f"Rule regeneration job failed: {e}")
            return _failure('rule_regeneration', e)

    @staticmethod
    async def bulk_seed(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute full bulk seed job."""
        try:
            from bulk_seeder import BulkSeeder

            seeder = BulkSeeder(db_session)

            start_year = parameters.get('start_year', 2020)
            end_year = parameters.get('end_year', 2025)
            # NOTE(review): 'batch_size'/'skip_nvd'/'skip_nomi_sec' appear in
            # the job config but full_bulk_seed() is only given the year
            # range -- confirm whether it should receive them.
            logger.info(f"Starting full bulk seed - years: {start_year}-{end_year}")

            result = await seeder.full_bulk_seed(
                start_year=start_year,
                end_year=end_year
            )
            return _success('bulk_seed', result)
        except Exception as e:
            logger.error(f"Bulk seed job failed: {e}")
            return _failure('bulk_seed', e)

    @staticmethod
    async def database_cleanup(db_session: 'Session', parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute database cleanup job.

        Deletes failed BulkProcessingJob rows older than ``days_to_keep``
        days and completed rows older than twice that.
        """
        try:
            from main import BulkProcessingJob

            days_to_keep = parameters.get('days_to_keep', 30)
            cleanup_failed_jobs = parameters.get('cleanup_failed_jobs', True)
            # NOTE(review): 'cleanup_logs' is accepted in the config but no
            # log cleanup is implemented yet.

            logger.info(f"Starting database cleanup - keep {days_to_keep} days")

            cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
            deleted_jobs = 0

            if cleanup_failed_jobs:
                # Delete failed jobs older than cutoff
                deleted_jobs += db_session.query(BulkProcessingJob).filter(
                    BulkProcessingJob.status == 'failed',
                    BulkProcessingJob.created_at < cutoff_date
                ).delete()

            # Delete completed jobs older than cutoff (keep some recent ones)
            very_old_cutoff = datetime.utcnow() - timedelta(days=days_to_keep * 2)
            deleted_jobs += db_session.query(BulkProcessingJob).filter(
                BulkProcessingJob.status == 'completed',
                BulkProcessingJob.created_at < very_old_cutoff
            ).delete()

            db_session.commit()

            result = {
                'deleted_jobs': deleted_jobs,
                'cutoff_date': cutoff_date.isoformat(),
                'cleanup_type': 'bulk_processing_jobs'
            }
            return _success('database_cleanup', result)
        except Exception as e:
            logger.error(f"Database cleanup job failed: {e}")
            # Fix: roll back so the session is usable after a failed
            # delete/commit instead of staying in a broken transaction.
            try:
                db_session.rollback()
            except Exception:
                pass
            return _failure('database_cleanup', e)


def register_all_executors(scheduler):
    """Register all job executors with the scheduler.

    All executors are static methods, so they are registered straight off
    the class without instantiating it.
    """
    for job_type, executor in (
        ('incremental_update', JobExecutors.incremental_update),
        ('cisa_kev_sync', JobExecutors.cisa_kev_sync),
        ('nomi_sec_sync', JobExecutors.nomi_sec_sync),
        ('github_poc_sync', JobExecutors.github_poc_sync),
        ('exploitdb_sync', JobExecutors.exploitdb_sync),
        ('reference_sync', JobExecutors.reference_sync),
        ('rule_regeneration', JobExecutors.rule_regeneration),
        ('bulk_seed', JobExecutors.bulk_seed),
        ('database_cleanup', JobExecutors.database_cleanup),
    ):
        scheduler.register_job_executor(job_type, executor)

    logger.info("All job executors registered successfully")
"""
CVE-SIGMA Auto Generator - Cron-like Job Scheduler

Automated scheduling and execution of data processing jobs.  Jobs are
declared in a YAML configuration file with cron expressions; a daemon
thread polls once per minute and launches due jobs in worker threads,
each running its executor coroutine in a private asyncio event loop.
"""

import asyncio
import yaml
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from croniter import croniter
from sqlalchemy.orm import Session
import pytz
import uuid
import json

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class ScheduledJob:
    """Represents a scheduled job configuration."""

    job_id: str
    name: str
    enabled: bool
    schedule: str  # Cron expression
    description: str
    job_type: str
    parameters: Dict[str, Any]
    priority: str
    timeout_minutes: int
    retry_on_failure: bool
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    failure_count: int = 0
    is_running: bool = False
    max_retries: int = 2

    def __post_init__(self):
        # Compute the first run time unless the caller supplied one.
        if self.next_run is None:
            self.calculate_next_run()

    def calculate_next_run(self, base_time: datetime = None) -> datetime:
        """Calculate and store the next run time from the cron schedule.

        Falls back to one hour after *base_time* if the expression is
        invalid (or croniter fails for any reason).
        """
        if base_time is None:
            base_time = datetime.now(pytz.UTC)

        try:
            cron = croniter(self.schedule, base_time)
            self.next_run = cron.get_next(datetime)
        except Exception as e:
            logger.error(f"Error calculating next run for job {self.name}: {e}")
            # Fallback to 1 hour from now
            self.next_run = base_time + timedelta(hours=1)
        return self.next_run

    def should_run(self, current_time: datetime = None) -> bool:
        """Return True if the job is due, enabled and not already running."""
        if not self.enabled or self.is_running:
            return False

        if current_time is None:
            current_time = datetime.now(pytz.UTC)

        # bool() so a missing next_run yields False rather than None.
        return bool(self.next_run and current_time >= self.next_run)

    def mark_started(self):
        """Record that a run has started."""
        self.is_running = True
        self.last_run = datetime.now(pytz.UTC)
        self.run_count += 1

    def mark_completed(self, success: bool = True):
        """Record completion and schedule the next regular run."""
        self.is_running = False
        if not success:
            self.failure_count += 1
        self.calculate_next_run()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization."""
        return {
            'job_id': self.job_id,
            'name': self.name,
            'enabled': self.enabled,
            'schedule': self.schedule,
            'description': self.description,
            'job_type': self.job_type,
            'parameters': self.parameters,
            'priority': self.priority,
            'timeout_minutes': self.timeout_minutes,
            'retry_on_failure': self.retry_on_failure,
            'last_run': self.last_run.isoformat() if self.last_run else None,
            'next_run': self.next_run.isoformat() if self.next_run else None,
            'run_count': self.run_count,
            'failure_count': self.failure_count,
            'is_running': self.is_running,
            'max_retries': self.max_retries
        }


class JobRegistry:
    """Registry of available job executors."""

    def __init__(self):
        self.executors: Dict[str, Callable] = {}
        self.db_session_factory = None

    def register_executor(self, job_type: str, executor_func: Callable):
        """Register a job executor function."""
        self.executors[job_type] = executor_func
        logger.info(f"Registered executor for job type: {job_type}")

    def set_db_session_factory(self, session_factory):
        """Set database session factory."""
        self.db_session_factory = session_factory

    async def execute_job(self, job: ScheduledJob) -> bool:
        """Execute a scheduled job; returns True on success."""
        if job.job_type not in self.executors:
            logger.error(f"No executor found for job type: {job.job_type}")
            return False

        try:
            logger.info(f"Executing scheduled job: {job.name} (type: {job.job_type})")

            # Get database session
            if self.db_session_factory:
                db_session = self.db_session_factory()
            else:
                logger.error("No database session factory available")
                return False

            try:
                executor = self.executors[job.job_type]
                result = await executor(db_session, job.parameters)

                # Executors report success in-band via a status dict.
                if isinstance(result, dict):
                    success = result.get('status') in ['completed', 'success']
                    if not success:
                        logger.warning(f"Job {job.name} completed with status: {result.get('status')}")
                else:
                    success = bool(result)

                logger.info(f"Job {job.name} completed successfully: {success}")
                return success
            finally:
                db_session.close()
        except Exception as e:
            logger.error(f"Error executing job {job.name}: {e}")
            return False


class JobScheduler:
    """Main job scheduler with cron-like functionality."""

    def __init__(self, config_path: str = "scheduler_config.yaml"):
        self.config_path = config_path
        self.config: Dict[str, Any] = {}
        self.jobs: Dict[str, ScheduledJob] = {}
        self.registry = JobRegistry()
        self.is_running = False
        self.scheduler_thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()
        self.timezone = pytz.UTC
        self.max_concurrent_jobs = 3
        self.current_jobs = 0
        # Protects is_running/current_jobs bookkeeping across threads.
        self.job_lock = threading.Lock()

        # Load configuration
        self.load_config()

        # Setup logging
        self.setup_logging()

    def load_config(self):
        """Load scheduler configuration from YAML file."""
        try:
            with open(self.config_path, 'r') as f:
                self.config = yaml.safe_load(f)

            # Extract scheduler settings
            scheduler_config = self.config.get('scheduler', {})
            self.timezone = pytz.timezone(scheduler_config.get('timezone', 'UTC'))
            self.max_concurrent_jobs = scheduler_config.get('max_concurrent_jobs', 3)

            # Load job configurations
            self.load_jobs()

            logger.info(f"Loaded scheduler configuration with {len(self.jobs)} jobs")
        except Exception as e:
            logger.error(f"Error loading scheduler config: {e}")
            self.config = {}

    def load_jobs(self):
        """Load job configurations from config."""
        jobs_config = self.config.get('jobs', {})

        for job_name, job_config in jobs_config.items():
            try:
                job = ScheduledJob(
                    job_id=str(uuid.uuid4()),
                    name=job_name,
                    enabled=job_config.get('enabled', True),
                    schedule=job_config.get('schedule', '0 0 * * *'),
                    description=job_config.get('description', ''),
                    job_type=job_config.get('job_type', job_name),
                    parameters=job_config.get('parameters', {}),
                    priority=job_config.get('priority', 'medium'),
                    timeout_minutes=job_config.get('timeout_minutes', 60),
                    retry_on_failure=job_config.get('retry_on_failure', True),
                    max_retries=job_config.get('max_retries', 2)
                )

                self.jobs[job_name] = job
                logger.info(f"Loaded job: {job_name} - Next run: {job.next_run}")
            except Exception as e:
                logger.error(f"Error loading job {job_name}: {e}")

    def setup_logging(self):
        """Setup scheduler-specific logging."""
        log_config = self.config.get('logging', {})
        if log_config.get('enabled', True):
            log_level = getattr(logging, log_config.get('level', 'INFO'))
            logger.setLevel(log_level)

    def register_job_executor(self, job_type: str, executor_func: Callable):
        """Register a job executor."""
        self.registry.register_executor(job_type, executor_func)

    def set_db_session_factory(self, session_factory):
        """Set database session factory."""
        self.registry.set_db_session_factory(session_factory)

    def start(self):
        """Start the job scheduler."""
        if self.is_running:
            logger.warning("Scheduler is already running")
            return

        if not self.config.get('scheduler', {}).get('enabled', True):
            logger.info("Scheduler is disabled in configuration")
            return

        self.is_running = True
        self.stop_event.clear()

        self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self.scheduler_thread.start()

        logger.info("Job scheduler started")

    def stop(self):
        """Stop the job scheduler."""
        if not self.is_running:
            return

        self.is_running = False
        self.stop_event.set()

        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)

        logger.info("Job scheduler stopped")

    def _reserve_slot(self, job: ScheduledJob) -> bool:
        """Atomically claim a concurrency slot for *job*.

        Fix: the previous code checked is_running/current_jobs in one
        thread and only updated them later in the worker thread, so the
        same job could be launched twice (and the concurrency cap
        overshot) within one polling cycle.  Claiming both under the lock
        before spawning the worker closes that race.
        """
        with self.job_lock:
            if job.is_running or self.current_jobs >= self.max_concurrent_jobs:
                return False
            job.is_running = True
            self.current_jobs += 1
            return True

    def _scheduler_loop(self):
        """Main scheduler loop: wake up every minute and launch due jobs."""
        logger.info("Scheduler loop started")

        while self.is_running and not self.stop_event.is_set():
            try:
                current_time = datetime.now(self.timezone)

                # Check each job; snapshot values() so reload_config() in
                # another thread cannot invalidate the iterator.
                for job in list(self.jobs.values()):
                    if job.should_run(current_time) and self._reserve_slot(job):
                        threading.Thread(
                            target=self._execute_job_wrapper,
                            args=(job,),
                            daemon=True
                        ).start()

                # Sleep for 60 seconds (check every minute)
                self.stop_event.wait(60)
            except Exception as e:
                logger.error(f"Error in scheduler loop: {e}")
                self.stop_event.wait(60)

        logger.info("Scheduler loop stopped")

    def _execute_job_wrapper(self, job: ScheduledJob):
        """Run *job* to completion in this thread.

        Assumes _reserve_slot() already marked the job running and claimed
        a concurrency slot; releases the slot in the finally block.
        """
        try:
            job.mark_started()

            # Create asyncio event loop for this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            try:
                # Execute job with timeout
                success = loop.run_until_complete(
                    asyncio.wait_for(
                        self.registry.execute_job(job),
                        timeout=job.timeout_minutes * 60
                    )
                )

                job.mark_completed(success)

                if not success and job.retry_on_failure and job.failure_count < job.max_retries:
                    logger.info(f"Job {job.name} failed, will retry later")
                    # Fix: retry in ~30 minutes.  Calling calculate_next_run()
                    # with a +30m base would instead defer to the next cron
                    # occurrence after that point, not an actual retry.
                    job.next_run = datetime.now(self.timezone) + timedelta(minutes=30)
            except asyncio.TimeoutError:
                logger.error(f"Job {job.name} timed out after {job.timeout_minutes} minutes")
                job.mark_completed(False)
            finally:
                loop.close()
        except Exception as e:
            logger.error(f"Error executing job {job.name}: {e}")
            job.mark_completed(False)
        finally:
            with self.job_lock:
                self.current_jobs -= 1

    def get_job_status(self, job_name: str = None) -> Dict[str, Any]:
        """Get status of one job (by name) or an overview of all jobs."""
        if job_name:
            job = self.jobs.get(job_name)
            if job:
                return job.to_dict()
            return {"error": f"Job {job_name} not found"}

        return {
            "scheduler_running": self.is_running,
            "total_jobs": len(self.jobs),
            "enabled_jobs": sum(1 for job in self.jobs.values() if job.enabled),
            "running_jobs": sum(1 for job in self.jobs.values() if job.is_running),
            "jobs": {name: job.to_dict() for name, job in self.jobs.items()}
        }

    def enable_job(self, job_name: str) -> bool:
        """Enable a job and recompute its next run."""
        if job_name in self.jobs:
            self.jobs[job_name].enabled = True
            self.jobs[job_name].calculate_next_run()
            logger.info(f"Enabled job: {job_name}")
            return True
        return False

    def disable_job(self, job_name: str) -> bool:
        """Disable a job."""
        if job_name in self.jobs:
            self.jobs[job_name].enabled = False
            logger.info(f"Disabled job: {job_name}")
            return True
        return False

    def trigger_job(self, job_name: str) -> bool:
        """Manually trigger a job, subject to the concurrency limit."""
        if job_name not in self.jobs:
            return False

        job = self.jobs[job_name]
        if job.is_running:
            logger.warning(f"Job {job_name} is already running")
            return False

        if not self._reserve_slot(job):
            logger.warning(f"Maximum concurrent jobs reached, cannot start {job_name}")
            return False

        # Execute job immediately
        threading.Thread(
            target=self._execute_job_wrapper,
            args=(job,),
            daemon=True
        ).start()

        logger.info(f"Manually triggered job: {job_name}")
        return True

    def update_job_schedule(self, job_name: str, new_schedule: str) -> bool:
        """Update job schedule after validating the cron expression."""
        if job_name not in self.jobs:
            return False

        try:
            # Validate cron expression
            croniter(new_schedule)

            job = self.jobs[job_name]
            job.schedule = new_schedule
            job.calculate_next_run()

            logger.info(f"Updated schedule for job {job_name}: {new_schedule}")
            return True
        except Exception as e:
            logger.error(f"Invalid cron expression {new_schedule}: {e}")
            return False

    def reload_config(self) -> bool:
        """Reload configuration from file."""
        try:
            self.load_config()
            logger.info("Configuration reloaded successfully")
            return True
        except Exception as e:
            logger.error(f"Error reloading configuration: {e}")
            return False


# Global scheduler instance
scheduler_instance: Optional[JobScheduler] = None


def get_scheduler() -> JobScheduler:
    """Get the global scheduler instance, creating it on first use."""
    global scheduler_instance
    if scheduler_instance is None:
        scheduler_instance = JobScheduler()
    return scheduler_instance


def initialize_scheduler(config_path: str = None) -> JobScheduler:
    """(Re)initialize the global scheduler, optionally from *config_path*."""
    global scheduler_instance
    scheduler_instance = JobScheduler(config_path) if config_path else JobScheduler()
    return scheduler_instance
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: create tables and start the job scheduler on
    startup; stop the scheduler on shutdown.  Scheduler failures are
    logged but never prevent the API from starting."""
    # Initialize database
    Base.metadata.create_all(bind=engine)

    # Check rule templates (creation is handled by DB initialization)
    db = SessionLocal()
    try:
        existing_templates = db.query(RuleTemplate).count()
        if existing_templates == 0:
            logger.info("No rule templates found. Database initialization will handle template creation.")
    except Exception as e:
        logger.error(f"Error checking rule templates: {e}")
    finally:
        db.close()

    # Initialize and start the job scheduler
    try:
        from job_scheduler import initialize_scheduler
        from job_executors import register_all_executors

        scheduler = initialize_scheduler()
        scheduler.set_db_session_factory(SessionLocal)
        register_all_executors(scheduler)
        scheduler.start()

        logger.info("Job scheduler initialized and started")
    except Exception as e:
        logger.error(f"Error initializing job scheduler: {e}")

    yield

    # Shutdown
    try:
        from job_scheduler import get_scheduler
        get_scheduler().stop()
        logger.info("Job scheduler stopped")
    except Exception as e:
        logger.error(f"Error stopping job scheduler: {e}")

# FastAPI app
app = FastAPI(title="CVE-SIGMA Auto Generator", lifespan=lifespan)

# ============================================================================
# SCHEDULER ENDPOINTS
# ============================================================================

class SchedulerControlRequest(BaseModel):
    action: str  # 'start', 'stop', 'restart'

class JobControlRequest(BaseModel):
    job_name: str
    action: str  # 'enable', 'disable', 'trigger'

class UpdateScheduleRequest(BaseModel):
    job_name: str
    schedule: str  # Cron expression

@app.get("/api/scheduler/status")
async def get_scheduler_status():
    """Get scheduler status and job information."""
    try:
        from job_scheduler import get_scheduler

        scheduler = get_scheduler()
        status = scheduler.get_job_status()

        return {
            "scheduler_status": status,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        logger.error(f"Error getting scheduler status: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/scheduler/control")
async def control_scheduler(request: SchedulerControlRequest):
    """Control scheduler (start/stop/restart)."""
    try:
        from job_scheduler import get_scheduler

        scheduler = get_scheduler()

        if request.action == 'start':
            scheduler.start()
            message = "Scheduler started"
        elif request.action == 'stop':
            scheduler.stop()
            message = "Scheduler stopped"
        elif request.action == 'restart':
            scheduler.stop()
            scheduler.start()
            message = "Scheduler restarted"
        else:
            raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")

        return {
            "message": message,
            "action": request.action,
            "timestamp": datetime.utcnow().isoformat()
        }
    except HTTPException:
        # Fix: propagate deliberate 4xx responses instead of letting the
        # generic handler below convert them into 500s.
        raise
    except Exception as e:
        logger.error(f"Error controlling scheduler: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/scheduler/job/control")
async def control_job(request: JobControlRequest):
    """Control individual jobs (enable/disable/trigger)."""
    try:
        from job_scheduler import get_scheduler

        scheduler = get_scheduler()

        if request.action == 'enable':
            success = scheduler.enable_job(request.job_name)
            message = f"Job {request.job_name} enabled" if success else f"Job {request.job_name} not found"
        elif request.action == 'disable':
            success = scheduler.disable_job(request.job_name)
            message = f"Job {request.job_name} disabled" if success else f"Job {request.job_name} not found"
        elif request.action == 'trigger':
            success = scheduler.trigger_job(request.job_name)
            message = f"Job {request.job_name} triggered" if success else f"Failed to trigger job {request.job_name}"
        else:
            raise HTTPException(status_code=400, detail=f"Invalid action: {request.action}")

        return {
            "message": message,
            "job_name": request.job_name,
            "action": request.action,
            "success": success,
            "timestamp": datetime.utcnow().isoformat()
        }
    except HTTPException:
        # Fix: keep the 400 for invalid actions instead of masking it as 500.
        raise
    except Exception as e:
        logger.error(f"Error controlling job: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/scheduler/job/schedule")
async def update_job_schedule(request: UpdateScheduleRequest):
    """Update job schedule."""
    try:
        from job_scheduler import get_scheduler

        scheduler = get_scheduler()
        success = scheduler.update_job_schedule(request.job_name, request.schedule)

        if success:
            # Get updated job info
            job_status = scheduler.get_job_status(request.job_name)
            return {
                "message": f"Schedule updated for job {request.job_name}",
                "job_name": request.job_name,
                "new_schedule": request.schedule,
                "next_run": job_status.get("next_run"),
                "success": True,
                "timestamp": datetime.utcnow().isoformat()
            }
        raise HTTPException(status_code=400, detail=f"Failed to update schedule for job {request.job_name}")
    except HTTPException:
        # Fix: keep the 400 instead of masking it as 500.
        raise
    except Exception as e:
        logger.error(f"Error updating job schedule: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/scheduler/job/{job_name}")
async def get_job_status(job_name: str):
    """Get status of a specific job."""
    try:
        from job_scheduler import get_scheduler

        scheduler = get_scheduler()
        status = scheduler.get_job_status(job_name)

        if "error" in status:
            raise HTTPException(status_code=404, detail=status["error"])

        return {
            "job_status": status,
            "timestamp": datetime.utcnow().isoformat()
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting job status: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/scheduler/reload")
async def reload_scheduler_config():
    """Reload scheduler configuration from file."""
    try:
        from job_scheduler import get_scheduler

        scheduler = get_scheduler()
        success = scheduler.reload_config()

        if success:
            return {
                "message": "Scheduler configuration reloaded successfully",
                "success": True,
                "timestamp": datetime.utcnow().isoformat()
            }
        raise HTTPException(status_code=500, detail="Failed to reload configuration")
    except HTTPException:
        # Fix: re-raise as-is so the detail message is preserved rather
        # than being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error reloading scheduler config: {e}")
        raise HTTPException(status_code=500, detail=str(e))
"timestamp": datetime.utcnow().isoformat() + } + else: + raise HTTPException(status_code=500, detail="Failed to reload configuration") + + except Exception as e: + logger.error(f"Error reloading scheduler config: {e}") + raise HTTPException(status_code=500, detail=str(e)) + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/backend/requirements.txt b/backend/requirements.txt index dc2d6e0..d51e99d 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -23,3 +23,5 @@ langchain-core>=0.2.20 openai>=1.32.0 anthropic==0.40.0 certifi==2024.2.2 +croniter==1.4.1 +pytz==2023.3 diff --git a/backend/scheduler_config.yaml b/backend/scheduler_config.yaml new file mode 100644 index 0000000..bc63c00 --- /dev/null +++ b/backend/scheduler_config.yaml @@ -0,0 +1,180 @@ +# CVE-SIGMA Auto Generator - Job Scheduler Configuration +# Cron-like scheduling for automated jobs +# +# Cron Format: minute hour day_of_month month day_of_week +# Special values: +# * = any value +# */N = every N units +# N-M = range from N to M +# N,M,O = specific values N, M, and O +# +# Examples: +# "0 */6 * * *" = Every 6 hours +# "0 2 * * *" = Daily at 2 AM +# "0 2 * * 1" = Weekly on Monday at 2 AM +# "0 0 1 * *" = Monthly on the 1st at midnight +# "*/30 * * * *" = Every 30 minutes + +scheduler: + enabled: true + timezone: "UTC" + max_concurrent_jobs: 3 + job_timeout_hours: 4 + retry_failed_jobs: true + max_retries: 2 + +jobs: + # NVD Incremental Updates - Fetch new CVEs regularly + nvd_incremental_update: + enabled: true + schedule: "0 */6 * * *" # Every 6 hours + description: "Fetch new CVEs from NVD modified feeds" + job_type: "incremental_update" + parameters: + batch_size: 100 + skip_nvd: false + skip_nomi_sec: true + priority: "high" + timeout_minutes: 60 + retry_on_failure: true + + # CISA KEV Sync - Update known exploited vulnerabilities + cisa_kev_sync: + enabled: true + schedule: "0 3 * * *" # Daily at 3 AM + description: "Sync CISA 
Known Exploited Vulnerabilities" + job_type: "cisa_kev_sync" + parameters: + batch_size: 100 + priority: "high" + timeout_minutes: 30 + retry_on_failure: true + + # Nomi-sec PoC Sync - Update proof-of-concept data + nomi_sec_sync: + enabled: true + schedule: "0 4 * * 1" # Weekly on Monday at 4 AM + description: "Sync nomi-sec Proof-of-Concept data" + job_type: "nomi_sec_sync" + parameters: + batch_size: 50 + priority: "medium" + timeout_minutes: 120 + retry_on_failure: true + + # GitHub PoC Sync - Update GitHub proof-of-concept data + github_poc_sync: + enabled: true + schedule: "0 5 * * 1" # Weekly on Monday at 5 AM + description: "Sync GitHub Proof-of-Concept data" + job_type: "github_poc_sync" + parameters: + batch_size: 50 + priority: "medium" + timeout_minutes: 120 + retry_on_failure: true + + # ExploitDB Sync - Update exploit database + exploitdb_sync: + enabled: true + schedule: "0 6 * * 2" # Weekly on Tuesday at 6 AM + description: "Sync ExploitDB data" + job_type: "exploitdb_sync" + parameters: + batch_size: 30 + priority: "medium" + timeout_minutes: 90 + retry_on_failure: true + + # Reference Data Sync - Extract content from CVE references + reference_sync: + enabled: true + schedule: "0 2 * * 3" # Weekly on Wednesday at 2 AM + description: "Extract and analyze CVE reference content" + job_type: "reference_sync" + parameters: + batch_size: 30 + max_cves: 200 + force_resync: false + priority: "medium" + timeout_minutes: 180 + retry_on_failure: true + + # Rule Regeneration - Regenerate SIGMA rules with latest data + rule_regeneration: + enabled: true + schedule: "0 7 * * 4" # Weekly on Thursday at 7 AM + description: "Regenerate SIGMA rules with enhanced data" + job_type: "rule_regeneration" + parameters: + force: false + priority: "low" + timeout_minutes: 240 + retry_on_failure: false + + # Full Bulk Seed - Complete data refresh (monthly) + full_bulk_seed: + enabled: false # Disabled by default due to resource intensity + schedule: "0 1 1 * *" # Monthly on 
the 1st at 1 AM + description: "Complete bulk seed of all data sources" + job_type: "bulk_seed" + parameters: + start_year: 2020 + end_year: 2025 + batch_size: 100 + skip_nvd: false + skip_nomi_sec: false + priority: "low" + timeout_minutes: 1440 # 24 hours + retry_on_failure: false + + # Database Cleanup - Clean old job records and logs + database_cleanup: + enabled: true + schedule: "0 0 * * 0" # Weekly on Sunday at midnight + description: "Clean up old job records and temporary data" + job_type: "database_cleanup" + parameters: + days_to_keep: 30 + cleanup_failed_jobs: true + cleanup_logs: true + priority: "low" + timeout_minutes: 30 + retry_on_failure: false + +# Job execution policies +policies: + # Prevent overlapping jobs of the same type + prevent_overlap: true + + # Maximum job execution time before forced termination + max_execution_time_hours: 6 + + # Retry policy for failed jobs + retry_policy: + enabled: true + max_retries: 2 + retry_delay_minutes: 30 + exponential_backoff: true + + # Resource management + resource_limits: + max_memory_mb: 2048 + max_cpu_percent: 80 + + # Notification settings (future enhancement) + notifications: + enabled: false + on_success: false + on_failure: true + webhook_url: "" + email_recipients: [] + +# Logging configuration for scheduler +logging: + enabled: true + level: "INFO" + log_file: "/app/logs/scheduler.log" + max_log_size_mb: 100 + backup_count: 5 + log_format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" \ No newline at end of file diff --git a/frontend/src/App.js b/frontend/src/App.js index d98ef26..4f24ccd 100644 --- a/frontend/src/App.js +++ b/frontend/src/App.js @@ -26,6 +26,8 @@ function App() { const [runningJobTypes, setRunningJobTypes] = useState(new Set()); const [llmStatus, setLlmStatus] = useState({}); const [exploitSyncDropdownOpen, setExploitSyncDropdownOpen] = useState(false); + const [schedulerStatus, setSchedulerStatus] = useState({}); + const [schedulerJobs, setSchedulerJobs] = 
useState({}); useEffect(() => { fetchData(); @@ -82,6 +84,16 @@ function App() { return isNomiSecSyncRunning() || isGitHubPocSyncRunning() || isExploitDBSyncRunning() || isCISAKEVSyncRunning(); }; + const fetchSchedulerData = async () => { + try { + const response = await axios.get('http://localhost:8000/api/scheduler/status'); + setSchedulerStatus(response.data.scheduler_status); + setSchedulerJobs(response.data.scheduler_status.jobs || {}); + } catch (error) { + console.error('Error fetching scheduler data:', error); + } + }; + const fetchData = async () => { try { setLoading(true); @@ -1171,6 +1183,269 @@ function App() { ); + const SchedulerManager = () => { + const [selectedJob, setSelectedJob] = useState(null); + const [newSchedule, setNewSchedule] = useState(''); + const [showScheduleEdit, setShowScheduleEdit] = useState(false); + + useEffect(() => { + fetchSchedulerData(); + const interval = setInterval(fetchSchedulerData, 30000); // Refresh every 30 seconds + return () => clearInterval(interval); + }, []); + + const controlScheduler = async (action) => { + try { + const response = await axios.post('http://localhost:8000/api/scheduler/control', { + action: action + }); + console.log('Scheduler control response:', response.data); + fetchSchedulerData(); + } catch (error) { + console.error('Error controlling scheduler:', error); + alert('Error controlling scheduler: ' + (error.response?.data?.detail || error.message)); + } + }; + + const controlJob = async (jobName, action) => { + try { + const response = await axios.post('http://localhost:8000/api/scheduler/job/control', { + job_name: jobName, + action: action + }); + console.log('Job control response:', response.data); + fetchSchedulerData(); + } catch (error) { + console.error('Error controlling job:', error); + alert('Error controlling job: ' + (error.response?.data?.detail || error.message)); + } + }; + + const updateJobSchedule = async (jobName, schedule) => { + try { + const response = await 
axios.post('http://localhost:8000/api/scheduler/job/schedule', { + job_name: jobName, + schedule: schedule + }); + console.log('Schedule update response:', response.data); + setShowScheduleEdit(false); + setNewSchedule(''); + setSelectedJob(null); + fetchSchedulerData(); + } catch (error) { + console.error('Error updating schedule:', error); + alert('Error updating schedule: ' + (error.response?.data?.detail || error.message)); + } + }; + + const formatNextRun = (dateString) => { + if (!dateString) return 'Not scheduled'; + const date = new Date(dateString); + return date.toLocaleString(); + }; + + const getStatusColor = (status) => { + switch (status) { + case true: + case 'enabled': + return 'text-green-600'; + case false: + case 'disabled': + return 'text-red-600'; + case 'running': + return 'text-blue-600'; + default: + return 'text-gray-600'; + } + }; + + return ( +
+ {/* Scheduler Status */} +
+
+

Job Scheduler Status

+
+
+
+
+

Scheduler Status

+

+ {schedulerStatus.scheduler_running ? 'Running' : 'Stopped'} +

+
+
+

Total Jobs

+

{schedulerStatus.total_jobs || 0}

+
+
+

Enabled Jobs

+

{schedulerStatus.enabled_jobs || 0}

+
+
+

Running Jobs

+

{schedulerStatus.running_jobs || 0}

+
+
+ +
+ + + +
+
+
+ + {/* Scheduled Jobs */} +
+
+

Scheduled Jobs

+
+
+ {Object.entries(schedulerJobs).map(([jobName, job]) => ( +
+
+
+
+

{jobName}

+ + {job.enabled ? 'Enabled' : 'Disabled'} + + {job.is_running && ( + + Running + + )} +
+

{job.description}

+
+
+ Schedule: {job.schedule} +
+
+ Next Run: {formatNextRun(job.next_run)} +
+
+ Run Count: {job.run_count} +
+
+ Failures: {job.failure_count} +
+
+
+
+ + + +
+
+
+ ))} +
+
+ + {/* Schedule Edit Modal */} + {showScheduleEdit && selectedJob && ( +
+
+

+ Edit Schedule for {selectedJob} +

+
+ + setNewSchedule(e.target.value)} + className="w-full px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" + placeholder="0 */6 * * *" + /> +

+ Format: minute hour day_of_month month day_of_week +

+
+
+ + +
+
+
+ )} +
+ ); + }; + if (loading) { return (
@@ -1232,6 +1507,16 @@ function App() { > Bulk Jobs +
@@ -1244,6 +1529,7 @@ function App() { {activeTab === 'cves' && } {activeTab === 'rules' && } {activeTab === 'bulk-jobs' && } + {activeTab === 'scheduler' && }