""" Job Executors for Scheduled Tasks Integrates existing job functions with the scheduler """ import asyncio import logging from typing import Dict, Any from sqlalchemy.orm import Session from datetime import datetime, timedelta logger = logging.getLogger(__name__) class JobExecutors: """Collection of job executor functions for the scheduler""" @staticmethod async def incremental_update(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute NVD incremental update job""" try: from bulk_seeder import BulkSeeder seeder = BulkSeeder(db_session) # Extract parameters batch_size = parameters.get('batch_size', 100) skip_nvd = parameters.get('skip_nvd', False) skip_nomi_sec = parameters.get('skip_nomi_sec', True) logger.info(f"Starting incremental update - batch_size: {batch_size}") result = await seeder.incremental_update() return { 'status': 'completed', 'result': result, 'job_type': 'incremental_update', 'completed_at': datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Incremental update job failed: {e}") return { 'status': 'failed', 'error': str(e), 'job_type': 'incremental_update', 'completed_at': datetime.utcnow().isoformat() } @staticmethod async def cisa_kev_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute CISA KEV sync job""" try: from cisa_kev_client import CISAKEVClient client = CISAKEVClient(db_session) # Extract parameters batch_size = parameters.get('batch_size', 100) logger.info(f"Starting CISA KEV sync - batch_size: {batch_size}") result = await client.bulk_sync_kev_data(batch_size=batch_size) return { 'status': 'completed', 'result': result, 'job_type': 'cisa_kev_sync', 'completed_at': datetime.utcnow().isoformat() } except Exception as e: logger.error(f"CISA KEV sync job failed: {e}") return { 'status': 'failed', 'error': str(e), 'job_type': 'cisa_kev_sync', 'completed_at': datetime.utcnow().isoformat() } @staticmethod async def nomi_sec_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute optimized nomi-sec PoC sync job""" try: from nomi_sec_client import NomiSecClient client = NomiSecClient(db_session) # Extract parameters with optimized defaults batch_size = parameters.get('batch_size', 100) max_cves = parameters.get('max_cves', 1000) force_resync = parameters.get('force_resync', False) logger.info(f"Starting optimized nomi-sec sync - batch_size: {batch_size}, max_cves: {max_cves}") result = await client.bulk_sync_poc_data( batch_size=batch_size, max_cves=max_cves, force_resync=force_resync ) return { 'status': 'completed', 'result': result, 'job_type': 'nomi_sec_sync', 'completed_at': datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Nomi-sec sync job failed: {e}") return { 'status': 'failed', 'error': str(e), 'job_type': 'nomi_sec_sync', 'completed_at': datetime.utcnow().isoformat() } @staticmethod async def github_poc_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]: """Execute GitHub PoC sync job""" try: from mcdevitt_poc_client import GitHubPoCClient client = GitHubPoCClient(db_session) # Extract parameters batch_size = parameters.get('batch_size', 50) logger.info(f"Starting GitHub PoC sync - batch_size: {batch_size}") result = await client.bulk_sync_poc_data(batch_size=batch_size) return { 'status': 'completed', 'result': result, 'job_type': 'github_poc_sync', 'completed_at': datetime.utcnow().isoformat() } except Exception as e: logger.error(f"GitHub PoC sync job failed: {e}") return { 'status': 'failed', 'error': 
                'job_type': 'github_poc_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

    @staticmethod
    async def exploitdb_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute ExploitDB sync job"""
        try:
            from exploitdb_client_local import ExploitDBLocalClient
            client = ExploitDBLocalClient(db_session)

            # Extract parameters
            batch_size = parameters.get('batch_size', 30)

            logger.info(f"Starting ExploitDB sync - batch_size: {batch_size}")

            result = await client.bulk_sync_exploitdb_data(batch_size=batch_size)

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'exploitdb_sync',
                'completed_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"ExploitDB sync job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'exploitdb_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

    @staticmethod
    async def reference_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute reference data sync job"""
        try:
            from reference_client import ReferenceClient
            client = ReferenceClient(db_session)

            # Extract parameters
            batch_size = parameters.get('batch_size', 30)
            max_cves = parameters.get('max_cves', 200)
            force_resync = parameters.get('force_resync', False)

            logger.info(f"Starting reference sync - batch_size: {batch_size}, max_cves: {max_cves}")

            result = await client.bulk_sync_references(
                batch_size=batch_size,
                max_cves=max_cves,
                force_resync=force_resync
            )

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'reference_sync',
                'completed_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"Reference sync job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'reference_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

    @staticmethod
    async def rule_regeneration(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute SIGMA rule regeneration job"""
        try:
            from enhanced_sigma_generator import EnhancedSigmaGenerator
            generator = EnhancedSigmaGenerator(db_session)

            # Extract parameters
            force = parameters.get('force', False)

            logger.info(f"Starting rule regeneration - force: {force}")

            # Get CVEs that need rule regeneration
            from main import CVE
            if force:
                # Regenerate all rules
                cves = db_session.query(CVE).all()
            else:
                # Only regenerate for CVEs with new data
                cves = db_session.query(CVE).filter(
                    CVE.updated_at > CVE.created_at
                ).all()

            total_processed = 0
            total_generated = 0

            for cve in cves:
                try:
                    # Generate enhanced rule
                    rule_content = await generator.generate_enhanced_sigma_rule(cve.cve_id)
                    if rule_content:
                        total_generated += 1
                    total_processed += 1

                    # Small delay to prevent overwhelming the system
                    await asyncio.sleep(0.1)
                except Exception as e:
                    logger.error(f"Error regenerating rule for {cve.cve_id}: {e}")

            result = {
                'total_processed': total_processed,
                'total_generated': total_generated,
                'generation_rate': total_generated / total_processed if total_processed > 0 else 0
            }

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'rule_regeneration',
                'completed_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"Rule regeneration job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'rule_regeneration',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def bulk_seed(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute full bulk seed job"""
        try:
            from bulk_seeder import BulkSeeder
            seeder = BulkSeeder(db_session)

            # Extract parameters (only start_year/end_year are forwarded to
            # full_bulk_seed(); the remaining values are read for compatibility)
            start_year = parameters.get('start_year', 2020)
            end_year = parameters.get('end_year', 2025)
            batch_size = parameters.get('batch_size', 100)
            skip_nvd = parameters.get('skip_nvd', False)
            skip_nomi_sec = parameters.get('skip_nomi_sec', False)

            logger.info(f"Starting full bulk seed - years: {start_year}-{end_year}")

            result = await seeder.full_bulk_seed(
                start_year=start_year,
                end_year=end_year
            )

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'bulk_seed',
                'completed_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"Bulk seed job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'bulk_seed',
                'completed_at': datetime.utcnow().isoformat()
            }

    @staticmethod
    async def database_cleanup(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute database cleanup job"""
        try:
            from main import BulkProcessingJob

            # Extract parameters
            days_to_keep = parameters.get('days_to_keep', 30)
            cleanup_failed_jobs = parameters.get('cleanup_failed_jobs', True)
            cleanup_logs = parameters.get('cleanup_logs', True)  # accepted but not acted on yet

            logger.info(f"Starting database cleanup - keep {days_to_keep} days")

            cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
            deleted_jobs = 0

            # Clean up old bulk processing jobs
            if cleanup_failed_jobs:
                # Delete failed jobs older than cutoff
                deleted = db_session.query(BulkProcessingJob).filter(
                    BulkProcessingJob.status == 'failed',
                    BulkProcessingJob.created_at < cutoff_date
                ).delete()
                deleted_jobs += deleted

            # Delete completed jobs older than cutoff (keep some recent ones)
            very_old_cutoff = datetime.utcnow() - timedelta(days=days_to_keep * 2)
            deleted = db_session.query(BulkProcessingJob).filter(
                BulkProcessingJob.status == 'completed',
                BulkProcessingJob.created_at < very_old_cutoff
            ).delete()
            deleted_jobs += deleted

            db_session.commit()

            result = {
                'deleted_jobs': deleted_jobs,
                'cutoff_date': cutoff_date.isoformat(),
                'cleanup_type': 'bulk_processing_jobs'
            }

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'database_cleanup',
                'completed_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"Database cleanup job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'database_cleanup',
                'completed_at': datetime.utcnow().isoformat()
            }


def register_all_executors(scheduler):
    """Register all job executors with the scheduler"""
    executors = JobExecutors()

    scheduler.register_job_executor('incremental_update', executors.incremental_update)
    scheduler.register_job_executor('cisa_kev_sync', executors.cisa_kev_sync)
    scheduler.register_job_executor('nomi_sec_sync', executors.nomi_sec_sync)
    scheduler.register_job_executor('github_poc_sync', executors.github_poc_sync)
    scheduler.register_job_executor('exploitdb_sync', executors.exploitdb_sync)
    scheduler.register_job_executor('reference_sync', executors.reference_sync)
    scheduler.register_job_executor('rule_regeneration', executors.rule_regeneration)
    scheduler.register_job_executor('bulk_seed', executors.bulk_seed)
    scheduler.register_job_executor('database_cleanup', executors.database_cleanup)

    logger.info("All job executors registered successfully")
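

# Example wiring (a minimal sketch, not executed by the application): the
# DummyScheduler below is an illustrative stand-in that only mirrors the
# register_job_executor(name, coroutine) hook used in register_all_executors();
# the real scheduler and a database session are assumed to be provided
# elsewhere at runtime.
if __name__ == "__main__":
    class DummyScheduler:
        """Illustrative stand-in exposing the registration hook used above."""

        def __init__(self):
            self.executors = {}

        def register_job_executor(self, name, func):
            # Record the name -> coroutine mapping exactly as registered above
            self.executors[name] = func

    demo_scheduler = DummyScheduler()
    register_all_executors(demo_scheduler)
    print(f"Registered executors: {sorted(demo_scheduler.executors)}")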