auto_sigma_rule_generator/backend/job_executors.py
Commit a6fb367ed4 by bpmcdevitt: refactor: modularize backend architecture for improved maintainability
- Extract database models from monolithic main.py (2,373 lines) into organized modules
- Implement service layer pattern with dedicated business logic classes
- Split API endpoints into modular FastAPI routers by functionality
- Add centralized configuration management with environment variable handling
- Create proper separation of concerns across data, service, and presentation layers

**Architecture Changes:**
- models/: SQLAlchemy database models (CVE, SigmaRule, RuleTemplate, BulkProcessingJob)
- config/: Centralized settings and database configuration
- services/: Business logic (CVEService, SigmaRuleService, GitHubExploitAnalyzer)
- routers/: Modular API endpoints (cves, sigma_rules, bulk_operations, llm_operations); a wiring sketch follows this list
- schemas/: Pydantic request/response models
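
A minimal sketch of how the slimmed-down main.py might mount these routers, assuming each module under routers/ exposes an APIRouter instance named `router` (the module names come from the list above; the rest of the wiring is illustrative, not the actual file):

```python
# Hypothetical wiring for the new modular main.py (illustrative only).
from fastapi import FastAPI

# Router modules introduced in this refactor; each is assumed to expose `router`.
from routers import cves, sigma_rules, bulk_operations, llm_operations

app = FastAPI(title="Auto SIGMA Rule Generator")

# Mounting the routers keeps endpoint URLs identical to the old monolithic
# main.py (see "Backward Compatibility" below).
app.include_router(cves.router)
app.include_router(sigma_rules.router)
app.include_router(bulk_operations.router)
app.include_router(llm_operations.router)
```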

**Key Improvements:**
- 95% reduction in main.py size (2,373 → 120 lines)
- Updated 15+ backend files with proper import structure
- Eliminated circular dependencies and tight coupling
- Enhanced testability with isolated service components
- Better code organization for team collaboration

**Backward Compatibility:**
- All API endpoints maintain same URLs and behavior
- Zero breaking changes to existing functionality
- Database schema unchanged
- Environment variables preserved

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Committed 2025-07-14 17:51:23 -05:00

"""
Job Executors for Scheduled Tasks
Integrates existing job functions with the scheduler
"""
import asyncio
import logging
from typing import Dict, Any
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class JobExecutors:
"""Collection of job executor functions for the scheduler"""
@staticmethod
async def incremental_update(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute NVD incremental update job"""
try:
from bulk_seeder import BulkSeeder
seeder = BulkSeeder(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 100)
skip_nvd = parameters.get('skip_nvd', False)
skip_nomi_sec = parameters.get('skip_nomi_sec', True)
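# NOTE: batch_size is only logged, and skip_nvd / skip_nomi_sec are extracted
# but not currently forwarded to seeder.incremental_update() below.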
logger.info(f"Starting incremental update - batch_size: {batch_size}")
result = await seeder.incremental_update()
return {
'status': 'completed',
'result': result,
'job_type': 'incremental_update',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Incremental update job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'incremental_update',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def cisa_kev_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute CISA KEV sync job"""
try:
from cisa_kev_client import CISAKEVClient
client = CISAKEVClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 100)
logger.info(f"Starting CISA KEV sync - batch_size: {batch_size}")
result = await client.bulk_sync_kev_data(batch_size=batch_size)
return {
'status': 'completed',
'result': result,
'job_type': 'cisa_kev_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"CISA KEV sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'cisa_kev_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def nomi_sec_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute optimized nomi-sec PoC sync job"""
try:
from nomi_sec_client import NomiSecClient
client = NomiSecClient(db_session)
# Extract parameters with optimized defaults
batch_size = parameters.get('batch_size', 100)
max_cves = parameters.get('max_cves', 1000)
force_resync = parameters.get('force_resync', False)
logger.info(f"Starting optimized nomi-sec sync - batch_size: {batch_size}, max_cves: {max_cves}")
result = await client.bulk_sync_poc_data(
batch_size=batch_size,
max_cves=max_cves,
force_resync=force_resync
)
return {
'status': 'completed',
'result': result,
'job_type': 'nomi_sec_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Nomi-sec sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'nomi_sec_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def github_poc_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute GitHub PoC sync job"""
try:
from mcdevitt_poc_client import GitHubPoCClient
client = GitHubPoCClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 50)
logger.info(f"Starting GitHub PoC sync - batch_size: {batch_size}")
result = await client.bulk_sync_poc_data(batch_size=batch_size)
return {
'status': 'completed',
'result': result,
'job_type': 'github_poc_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"GitHub PoC sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'github_poc_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def exploitdb_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute ExploitDB sync job"""
try:
from exploitdb_client_local import ExploitDBLocalClient
client = ExploitDBLocalClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 30)
logger.info(f"Starting ExploitDB sync - batch_size: {batch_size}")
result = await client.bulk_sync_exploitdb_data(batch_size=batch_size)
return {
'status': 'completed',
'result': result,
'job_type': 'exploitdb_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"ExploitDB sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'exploitdb_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def reference_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute reference data sync job"""
try:
from reference_client import ReferenceClient
client = ReferenceClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 30)
max_cves = parameters.get('max_cves', 200)
force_resync = parameters.get('force_resync', False)
logger.info(f"Starting reference sync - batch_size: {batch_size}, max_cves: {max_cves}")
result = await client.bulk_sync_references(
batch_size=batch_size,
max_cves=max_cves,
force_resync=force_resync
)
return {
'status': 'completed',
'result': result,
'job_type': 'reference_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Reference sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'reference_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def rule_regeneration(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute SIGMA rule regeneration job"""
try:
from enhanced_sigma_generator import EnhancedSigmaGenerator
generator = EnhancedSigmaGenerator(db_session)
# Extract parameters
force = parameters.get('force', False)
logger.info(f"Starting rule regeneration - force: {force}")
# Get CVEs that need rule regeneration
from models import CVE
if force:
# Regenerate all rules
cves = db_session.query(CVE).all()
else:
# Only regenerate for CVEs with new data
cves = db_session.query(CVE).filter(
CVE.updated_at > CVE.created_at
).all()
total_processed = 0
total_generated = 0
for cve in cves:
try:
# Generate enhanced rule
rule_content = await generator.generate_enhanced_sigma_rule(cve.cve_id)
if rule_content:
total_generated += 1
total_processed += 1
# Small delay to prevent overwhelming the system
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"Error regenerating rule for {cve.cve_id}: {e}")
result = {
'total_processed': total_processed,
'total_generated': total_generated,
'generation_rate': total_generated / total_processed if total_processed > 0 else 0
}
return {
'status': 'completed',
'result': result,
'job_type': 'rule_regeneration',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Rule regeneration job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'rule_regeneration',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def bulk_seed(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute full bulk seed job"""
try:
from bulk_seeder import BulkSeeder
seeder = BulkSeeder(db_session)
# Extract parameters
start_year = parameters.get('start_year', 2020)
end_year = parameters.get('end_year', 2025)
batch_size = parameters.get('batch_size', 100)
skip_nvd = parameters.get('skip_nvd', False)
skip_nomi_sec = parameters.get('skip_nomi_sec', False)
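# NOTE: only start_year and end_year are forwarded to full_bulk_seed() below;
# batch_size, skip_nvd and skip_nomi_sec are extracted but not currently used.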
logger.info(f"Starting full bulk seed - years: {start_year}-{end_year}")
result = await seeder.full_bulk_seed(
start_year=start_year,
end_year=end_year
)
return {
'status': 'completed',
'result': result,
'job_type': 'bulk_seed',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Bulk seed job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'bulk_seed',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def database_cleanup(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute database cleanup job"""
try:
from models import BulkProcessingJob
# Extract parameters
days_to_keep = parameters.get('days_to_keep', 30)
cleanup_failed_jobs = parameters.get('cleanup_failed_jobs', True)
cleanup_logs = parameters.get('cleanup_logs', True)
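# NOTE: cleanup_logs is extracted but no log cleanup is implemented below;
# only old BulkProcessingJob rows are pruned.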
logger.info(f"Starting database cleanup - keep {days_to_keep} days")
cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
deleted_jobs = 0
# Clean up old bulk processing jobs
if cleanup_failed_jobs:
# Delete failed jobs older than cutoff
deleted = db_session.query(BulkProcessingJob).filter(
BulkProcessingJob.status == 'failed',
BulkProcessingJob.created_at < cutoff_date
).delete()
deleted_jobs += deleted
# Delete completed jobs older than cutoff (keep some recent ones)
very_old_cutoff = datetime.utcnow() - timedelta(days=days_to_keep * 2)
deleted = db_session.query(BulkProcessingJob).filter(
BulkProcessingJob.status == 'completed',
BulkProcessingJob.created_at < very_old_cutoff
).delete()
deleted_jobs += deleted
db_session.commit()
result = {
'deleted_jobs': deleted_jobs,
'cutoff_date': cutoff_date.isoformat(),
'cleanup_type': 'bulk_processing_jobs'
}
return {
'status': 'completed',
'result': result,
'job_type': 'database_cleanup',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Database cleanup job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'database_cleanup',
'completed_at': datetime.utcnow().isoformat()
}
def register_all_executors(scheduler):
"""Register all job executors with the scheduler"""
executors = JobExecutors()
scheduler.register_job_executor('incremental_update', executors.incremental_update)
scheduler.register_job_executor('cisa_kev_sync', executors.cisa_kev_sync)
scheduler.register_job_executor('nomi_sec_sync', executors.nomi_sec_sync)
scheduler.register_job_executor('github_poc_sync', executors.github_poc_sync)
scheduler.register_job_executor('exploitdb_sync', executors.exploitdb_sync)
scheduler.register_job_executor('reference_sync', executors.reference_sync)
scheduler.register_job_executor('rule_regeneration', executors.rule_regeneration)
scheduler.register_job_executor('bulk_seed', executors.bulk_seed)
scheduler.register_job_executor('database_cleanup', executors.database_cleanup)
logger.info("All job executors registered successfully")