- Extract database models from monolithic main.py (2,373 lines) into organized modules
- Implement service layer pattern with dedicated business logic classes
- Split API endpoints into modular FastAPI routers by functionality
- Add centralized configuration management with environment variable handling
- Create proper separation of concerns across data, service, and presentation layers

**Architecture Changes:**
- models/: SQLAlchemy database models (CVE, SigmaRule, RuleTemplate, BulkProcessingJob)
- config/: Centralized settings and database configuration
- services/: Business logic (CVEService, SigmaRuleService, GitHubExploitAnalyzer)
- routers/: Modular API endpoints (cves, sigma_rules, bulk_operations, llm_operations)
- schemas/: Pydantic request/response models

**Key Improvements:**
- 95% reduction in main.py size (2,373 → 120 lines)
- Updated 15+ backend files with proper import structure
- Eliminated circular dependencies and tight coupling
- Enhanced testability with isolated service components
- Better code organization for team collaboration

**Backward Compatibility:**
- All API endpoints maintain same URLs and behavior
- Zero breaking changes to existing functionality
- Database schema unchanged
- Environment variables preserved

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
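As a rough illustration of the router split described above, a slimmed-down main.py might wire the modules together as sketched below. The module names mirror the directories listed under Architecture Changes, but the exact import paths, router variable names, and settings class are assumptions, not the repository's actual code.

```python
# Hypothetical sketch only: names mirror the layout listed above, but the
# real import paths and settings class in this repo may differ.
from fastapi import FastAPI

from config.settings import Settings  # assumed settings module/class
from routers import bulk_operations, cves, llm_operations, sigma_rules

settings = Settings()
app = FastAPI(title="CVE-SIGMA service")

# Each functional area contributes its own APIRouter (assumed to be
# exported as `router` from each module).
app.include_router(cves.router)
app.include_router(sigma_rules.router)
app.include_router(bulk_operations.router)
app.include_router(llm_operations.router)
```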
"""
|
|
Job Executors for Scheduled Tasks
|
|
Integrates existing job functions with the scheduler
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict, Any
|
|
from sqlalchemy.orm import Session
|
|
from datetime import datetime, timedelta
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class JobExecutors:
    """Collection of job executor functions for the scheduler"""
    @staticmethod
    async def incremental_update(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute NVD incremental update job"""
        try:
            from bulk_seeder import BulkSeeder

            seeder = BulkSeeder(db_session)

            # Extract parameters (batch_size is used only for logging here;
            # skip_nvd and skip_nomi_sec are read but not forwarded to
            # incremental_update() below)
            batch_size = parameters.get('batch_size', 100)
            skip_nvd = parameters.get('skip_nvd', False)
            skip_nomi_sec = parameters.get('skip_nomi_sec', True)

            logger.info(f"Starting incremental update - batch_size: {batch_size}")

            result = await seeder.incremental_update()

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'incremental_update',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Incremental update job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'incremental_update',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def cisa_kev_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute CISA KEV sync job"""
        try:
            from cisa_kev_client import CISAKEVClient

            client = CISAKEVClient(db_session)

            # Extract parameters
            batch_size = parameters.get('batch_size', 100)

            logger.info(f"Starting CISA KEV sync - batch_size: {batch_size}")

            result = await client.bulk_sync_kev_data(batch_size=batch_size)

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'cisa_kev_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"CISA KEV sync job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'cisa_kev_sync',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def nomi_sec_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute optimized nomi-sec PoC sync job"""
        try:
            from nomi_sec_client import NomiSecClient

            client = NomiSecClient(db_session)

            # Extract parameters with optimized defaults
            batch_size = parameters.get('batch_size', 100)
            max_cves = parameters.get('max_cves', 1000)
            force_resync = parameters.get('force_resync', False)

            logger.info(f"Starting optimized nomi-sec sync - batch_size: {batch_size}, max_cves: {max_cves}")

            result = await client.bulk_sync_poc_data(
                batch_size=batch_size,
                max_cves=max_cves,
                force_resync=force_resync
            )

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'nomi_sec_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Nomi-sec sync job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'nomi_sec_sync',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def github_poc_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute GitHub PoC sync job"""
        try:
            from mcdevitt_poc_client import GitHubPoCClient

            client = GitHubPoCClient(db_session)

            # Extract parameters
            batch_size = parameters.get('batch_size', 50)

            logger.info(f"Starting GitHub PoC sync - batch_size: {batch_size}")

            result = await client.bulk_sync_poc_data(batch_size=batch_size)

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'github_poc_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"GitHub PoC sync job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'github_poc_sync',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def exploitdb_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute ExploitDB sync job"""
        try:
            from exploitdb_client_local import ExploitDBLocalClient

            client = ExploitDBLocalClient(db_session)

            # Extract parameters
            batch_size = parameters.get('batch_size', 30)

            logger.info(f"Starting ExploitDB sync - batch_size: {batch_size}")

            result = await client.bulk_sync_exploitdb_data(batch_size=batch_size)

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'exploitdb_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"ExploitDB sync job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'exploitdb_sync',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def reference_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute reference data sync job"""
        try:
            from reference_client import ReferenceClient

            client = ReferenceClient(db_session)

            # Extract parameters
            batch_size = parameters.get('batch_size', 30)
            max_cves = parameters.get('max_cves', 200)
            force_resync = parameters.get('force_resync', False)

            logger.info(f"Starting reference sync - batch_size: {batch_size}, max_cves: {max_cves}")

            result = await client.bulk_sync_references(
                batch_size=batch_size,
                max_cves=max_cves,
                force_resync=force_resync
            )

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'reference_sync',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Reference sync job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'reference_sync',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def rule_regeneration(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute SIGMA rule regeneration job"""
        try:
            from enhanced_sigma_generator import EnhancedSigmaGenerator

            generator = EnhancedSigmaGenerator(db_session)

            # Extract parameters
            force = parameters.get('force', False)

            logger.info(f"Starting rule regeneration - force: {force}")

            # Get CVEs that need rule regeneration
            from models import CVE
            if force:
                # Regenerate all rules
                cves = db_session.query(CVE).all()
            else:
                # Only regenerate for CVEs with new data
                cves = db_session.query(CVE).filter(
                    CVE.updated_at > CVE.created_at
                ).all()

            total_processed = 0
            total_generated = 0

            for cve in cves:
                try:
                    # Generate enhanced rule
                    rule_content = await generator.generate_enhanced_sigma_rule(cve.cve_id)
                    if rule_content:
                        total_generated += 1
                    total_processed += 1

                    # Small delay to prevent overwhelming the system
                    await asyncio.sleep(0.1)

                except Exception as e:
                    logger.error(f"Error regenerating rule for {cve.cve_id}: {e}")

            result = {
                'total_processed': total_processed,
                'total_generated': total_generated,
                'generation_rate': total_generated / total_processed if total_processed > 0 else 0
            }

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'rule_regeneration',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Rule regeneration job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'rule_regeneration',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def bulk_seed(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute full bulk seed job"""
        try:
            from bulk_seeder import BulkSeeder

            seeder = BulkSeeder(db_session)

            # Extract parameters (batch_size, skip_nvd, and skip_nomi_sec are
            # read here but not forwarded to full_bulk_seed() below)
            start_year = parameters.get('start_year', 2020)
            end_year = parameters.get('end_year', 2025)
            batch_size = parameters.get('batch_size', 100)
            skip_nvd = parameters.get('skip_nvd', False)
            skip_nomi_sec = parameters.get('skip_nomi_sec', False)

            logger.info(f"Starting full bulk seed - years: {start_year}-{end_year}")

            result = await seeder.full_bulk_seed(
                start_year=start_year,
                end_year=end_year
            )

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'bulk_seed',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Bulk seed job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'bulk_seed',
                'completed_at': datetime.utcnow().isoformat()
            }
    @staticmethod
    async def database_cleanup(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute database cleanup job"""
        try:
            from models import BulkProcessingJob

            # Extract parameters (cleanup_logs is read but no log cleanup is
            # performed in this implementation)
            days_to_keep = parameters.get('days_to_keep', 30)
            cleanup_failed_jobs = parameters.get('cleanup_failed_jobs', True)
            cleanup_logs = parameters.get('cleanup_logs', True)

            logger.info(f"Starting database cleanup - keep {days_to_keep} days")

            cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)

            deleted_jobs = 0

            # Clean up old bulk processing jobs
            if cleanup_failed_jobs:
                # Delete failed jobs older than cutoff
                deleted = db_session.query(BulkProcessingJob).filter(
                    BulkProcessingJob.status == 'failed',
                    BulkProcessingJob.created_at < cutoff_date
                ).delete()
                deleted_jobs += deleted

            # Delete completed jobs older than cutoff (keep some recent ones)
            very_old_cutoff = datetime.utcnow() - timedelta(days=days_to_keep * 2)
            deleted = db_session.query(BulkProcessingJob).filter(
                BulkProcessingJob.status == 'completed',
                BulkProcessingJob.created_at < very_old_cutoff
            ).delete()
            deleted_jobs += deleted

            db_session.commit()

            result = {
                'deleted_jobs': deleted_jobs,
                'cutoff_date': cutoff_date.isoformat(),
                'cleanup_type': 'bulk_processing_jobs'
            }

            return {
                'status': 'completed',
                'result': result,
                'job_type': 'database_cleanup',
                'completed_at': datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"Database cleanup job failed: {e}")
            return {
                'status': 'failed',
                'error': str(e),
                'job_type': 'database_cleanup',
                'completed_at': datetime.utcnow().isoformat()
            }
def register_all_executors(scheduler):
    """Register all job executors with the scheduler"""
    executors = JobExecutors()

    scheduler.register_job_executor('incremental_update', executors.incremental_update)
    scheduler.register_job_executor('cisa_kev_sync', executors.cisa_kev_sync)
    scheduler.register_job_executor('nomi_sec_sync', executors.nomi_sec_sync)
    scheduler.register_job_executor('github_poc_sync', executors.github_poc_sync)
    scheduler.register_job_executor('exploitdb_sync', executors.exploitdb_sync)
    scheduler.register_job_executor('reference_sync', executors.reference_sync)
    scheduler.register_job_executor('rule_regeneration', executors.rule_regeneration)
    scheduler.register_job_executor('bulk_seed', executors.bulk_seed)
    scheduler.register_job_executor('database_cleanup', executors.database_cleanup)

    logger.info("All job executors registered successfully")