auto_sigma_rule_generator/backend/bulk_seeder.py

"""
Bulk Data Seeding Coordinator
Orchestrates the complete bulk seeding process using NVD JSON feeds and nomi-sec PoC data
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional

from sqlalchemy.orm import Session

from nvd_bulk_processor import NVDBulkProcessor
from nomi_sec_client import NomiSecClient
from exploitdb_client_local import ExploitDBLocalClient
from cisa_kev_client import CISAKEVClient

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BulkSeeder:
    """Coordinates bulk seeding operations"""

    def __init__(self, db_session: Session):
        self.db_session = db_session
        self.nvd_processor = NVDBulkProcessor(db_session)
        self.nomi_sec_client = NomiSecClient(db_session)
        self.exploitdb_client = ExploitDBLocalClient(db_session)
        self.cisa_kev_client = CISAKEVClient(db_session)
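
    # Phase ordering note: full_bulk_seed() runs its phases strictly in
    # sequence (NVD -> nomi-sec -> ExploitDB -> CISA KEV -> SIGMA rule
    # generation) because each later phase enriches CVE rows created or
    # updated by the earlier ones.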
    async def full_bulk_seed(self, start_year: int = 2002,
                             end_year: Optional[int] = None,
                             skip_nvd: bool = False,
                             skip_nomi_sec: bool = False,
                             skip_exploitdb: bool = False,
                             skip_cisa_kev: bool = False) -> dict:
        """
        Perform complete bulk seeding operation

        Args:
            start_year: Starting year for NVD data (default: 2002)
            end_year: Ending year for NVD data (default: current year)
            skip_nvd: Skip NVD bulk processing (default: False)
            skip_nomi_sec: Skip nomi-sec PoC synchronization (default: False)
            skip_exploitdb: Skip ExploitDB synchronization (default: False)
            skip_cisa_kev: Skip CISA KEV synchronization (default: False)

        Returns:
            Dictionary containing operation results
        """
        if end_year is None:
            end_year = datetime.now().year

        results = {
            'start_time': datetime.utcnow(),
            'nvd_results': None,
            'nomi_sec_results': None,
            'exploitdb_results': None,
            'cisa_kev_results': None,
            'sigma_results': None,
            'total_time': None,
            'status': 'running'
        }

        logger.info(f"Starting full bulk seed operation ({start_year}-{end_year})")

        try:
            # Phase 1: NVD Bulk Processing
            if not skip_nvd:
                logger.info("Phase 1: Starting NVD bulk processing...")
                nvd_results = await self.nvd_processor.bulk_seed_database(
                    start_year=start_year,
                    end_year=end_year
                )
                results['nvd_results'] = nvd_results
                logger.info(f"Phase 1 complete: {nvd_results['total_processed']} CVEs processed")
            else:
                logger.info("Phase 1: Skipping NVD bulk processing")

            # Phase 2: nomi-sec PoC Synchronization
            if not skip_nomi_sec:
                logger.info("Phase 2: Starting nomi-sec PoC synchronization...")
                nomi_sec_results = await self.nomi_sec_client.bulk_sync_all_cves(
                    batch_size=50  # Smaller batches for API stability
                )
                results['nomi_sec_results'] = nomi_sec_results
                logger.info(f"Phase 2 complete: {nomi_sec_results['total_pocs_found']} PoCs found")
            else:
                logger.info("Phase 2: Skipping nomi-sec PoC synchronization")

            # Phase 3: ExploitDB Synchronization
            if not skip_exploitdb:
                logger.info("Phase 3: Starting ExploitDB synchronization...")
                exploitdb_results = await self.exploitdb_client.bulk_sync_exploitdb(
                    batch_size=30  # Smaller batches for git API stability
                )
                results['exploitdb_results'] = exploitdb_results
                logger.info(f"Phase 3 complete: {exploitdb_results['total_exploits_found']} exploits found")
            else:
                logger.info("Phase 3: Skipping ExploitDB synchronization")

            # Phase 4: CISA KEV Synchronization
            if not skip_cisa_kev:
                logger.info("Phase 4: Starting CISA KEV synchronization...")
                cisa_kev_results = await self.cisa_kev_client.bulk_sync_kev_data(
                    batch_size=100  # Can handle larger batches since data is already filtered
                )
                results['cisa_kev_results'] = cisa_kev_results
                logger.info(f"Phase 4 complete: {cisa_kev_results['total_kev_found']} KEV entries found")
            else:
                logger.info("Phase 4: Skipping CISA KEV synchronization")

            # Phase 5: Generate Enhanced SIGMA Rules
            logger.info("Phase 5: Generating enhanced SIGMA rules...")
            sigma_results = await self.generate_enhanced_sigma_rules()
            results['sigma_results'] = sigma_results
            logger.info(f"Phase 5 complete: {sigma_results['rules_generated']} rules generated")

            results['status'] = 'completed'
            results['end_time'] = datetime.utcnow()
            results['total_time'] = (results['end_time'] - results['start_time']).total_seconds()
            logger.info(f"Full bulk seed operation completed in {results['total_time']:.2f} seconds")

        except Exception as e:
            logger.error(f"Bulk seed operation failed: {e}")
            results['status'] = 'failed'
            results['error'] = str(e)
            results['end_time'] = datetime.utcnow()

        return results
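
    # Illustrative sketch (not invoked anywhere in this module): the skip_*
    # flags above can resume a partially completed seed by skipping phases
    # that already ran, e.g.
    #
    #     seeder = BulkSeeder(db_session)
    #     results = await seeder.full_bulk_seed(start_year=2015,
    #                                           skip_nvd=True,
    #                                           skip_nomi_sec=True)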
    async def incremental_update(self) -> dict:
        """
        Perform incremental update operation

        Returns:
            Dictionary containing update results
        """
        results = {
            'start_time': datetime.utcnow(),
            'nvd_update': None,
            'nomi_sec_update': None,
            'exploitdb_update': None,
            'cisa_kev_update': None,
            'status': 'running'
        }

        logger.info("Starting incremental update...")

        try:
            # Update NVD data using modified/recent feeds
            logger.info("Updating NVD data...")
            nvd_update = await self.nvd_processor.incremental_update()
            results['nvd_update'] = nvd_update

            # Update PoC, ExploitDB, and CISA KEV data for newly added or
            # modified CVEs (all three depend on the recent-CVE list, so
            # they only run when the NVD update actually changed something)
            if nvd_update['total_processed'] > 0:
                logger.info("Updating PoC data for modified CVEs...")
                recent_cves = await self._get_recently_modified_cves()
                nomi_sec_update = await self._sync_specific_cves(recent_cves)
                results['nomi_sec_update'] = nomi_sec_update

                logger.info("Updating ExploitDB data for modified CVEs...")
                exploitdb_update = await self._sync_specific_cves_exploitdb(recent_cves)
                results['exploitdb_update'] = exploitdb_update

                logger.info("Updating CISA KEV data for modified CVEs...")
                cisa_kev_update = await self._sync_specific_cves_cisa_kev(recent_cves)
                results['cisa_kev_update'] = cisa_kev_update

            results['status'] = 'completed'
            results['end_time'] = datetime.utcnow()

        except Exception as e:
            logger.error(f"Incremental update failed: {e}")
            results['status'] = 'failed'
            results['error'] = str(e)
            results['end_time'] = datetime.utcnow()

        return results
    async def generate_enhanced_sigma_rules(self) -> dict:
        """Generate enhanced SIGMA rules using nomi-sec PoC data"""
        from main import CVE, SigmaRule

        # Import the enhanced rule generator
        from enhanced_sigma_generator import EnhancedSigmaGenerator

        generator = EnhancedSigmaGenerator(self.db_session)

        # Get all CVEs that have PoC data but no enhanced rules
        cves_with_pocs = self.db_session.query(CVE).filter(
            CVE.poc_count > 0
        ).all()

        rules_generated = 0
        rules_updated = 0

        for cve in cves_with_pocs:
            try:
                # Check if we need to generate/update the rule
                existing_rule = self.db_session.query(SigmaRule).filter(
                    SigmaRule.cve_id == cve.cve_id
                ).first()

                if existing_rule and existing_rule.poc_source == 'nomi_sec':
                    # Rule already exists and is up to date
                    continue

                # Generate enhanced rule
                rule_result = await generator.generate_enhanced_rule(cve)
                if rule_result['success']:
                    if existing_rule:
                        rules_updated += 1
                    else:
                        rules_generated += 1

            except Exception as e:
                logger.error(f"Error generating rule for {cve.cve_id}: {e}")
                continue

        self.db_session.commit()

        return {
            'rules_generated': rules_generated,
            'rules_updated': rules_updated,
            'total_processed': len(cves_with_pocs)
        }
    async def _get_recently_modified_cves(self, hours: int = 24) -> list:
        """Get CVEs modified within the last N hours"""
        from main import CVE

        # Naive-UTC comparison; assumes CVE.updated_at is stored as naive
        # UTC, consistent with the datetime.utcnow() timestamps used above
        cutoff_time = datetime.utcnow() - timedelta(hours=hours)
        recent_cves = self.db_session.query(CVE).filter(
            CVE.updated_at >= cutoff_time
        ).all()

        return [cve.cve_id for cve in recent_cves]
    async def _sync_specific_cves(self, cve_ids: list) -> dict:
        """Sync PoC data for specific CVEs"""
        total_processed = 0
        total_pocs_found = 0

        for cve_id in cve_ids:
            try:
                result = await self.nomi_sec_client.sync_cve_pocs(cve_id)
                total_processed += 1
                total_pocs_found += result.get('pocs_found', 0)

                # Small delay to avoid overwhelming the API
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error syncing PoCs for {cve_id}: {e}")
                continue

        return {
            'total_processed': total_processed,
            'total_pocs_found': total_pocs_found
        }
    async def _sync_specific_cves_exploitdb(self, cve_ids: list) -> dict:
        """Sync ExploitDB data for specific CVEs"""
        total_processed = 0
        total_exploits_found = 0

        for cve_id in cve_ids:
            try:
                result = await self.exploitdb_client.sync_cve_exploits(cve_id)
                total_processed += 1
                total_exploits_found += result.get('exploits_found', 0)

                # Small delay to avoid overwhelming the API
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error syncing ExploitDB for {cve_id}: {e}")
                continue

        return {
            'total_processed': total_processed,
            'total_exploits_found': total_exploits_found
        }
    async def _sync_specific_cves_cisa_kev(self, cve_ids: list) -> dict:
        """Sync CISA KEV data for specific CVEs"""
        total_processed = 0
        total_kev_found = 0

        for cve_id in cve_ids:
            try:
                result = await self.cisa_kev_client.sync_cve_kev_data(cve_id)
                total_processed += 1
                if result.get('kev_found', False):
                    total_kev_found += 1

                # Small delay to be respectful to CISA
                await asyncio.sleep(0.2)

            except Exception as e:
                logger.error(f"Error syncing CISA KEV for {cve_id}: {e}")
                continue

        return {
            'total_processed': total_processed,
            'total_kev_found': total_kev_found
        }
    async def get_seeding_status(self) -> dict:
        """Get current seeding status and statistics"""
        from main import CVE, SigmaRule, BulkProcessingJob

        # Get database statistics
        total_cves = self.db_session.query(CVE).count()
        bulk_processed_cves = self.db_session.query(CVE).filter(
            CVE.bulk_processed == True
        ).count()
        cves_with_pocs = self.db_session.query(CVE).filter(
            CVE.poc_count > 0
        ).count()
        total_rules = self.db_session.query(SigmaRule).count()
        nomi_sec_rules = self.db_session.query(SigmaRule).filter(
            SigmaRule.poc_source == 'nomi_sec'
        ).count()

        # Get recent job status
        recent_jobs = self.db_session.query(BulkProcessingJob).order_by(
            BulkProcessingJob.created_at.desc()
        ).limit(5).all()

        job_status = []
        for job in recent_jobs:
            job_status.append({
                'id': str(job.id),
                'job_type': job.job_type,
                'status': job.status,
                'created_at': job.created_at,
                'completed_at': job.completed_at,
                'processed_items': job.processed_items,
                'total_items': job.total_items,
                'failed_items': job.failed_items
            })

        return {
            'database_stats': {
                'total_cves': total_cves,
                'bulk_processed_cves': bulk_processed_cves,
                'cves_with_pocs': cves_with_pocs,
                'total_rules': total_rules,
                'nomi_sec_rules': nomi_sec_rules,
                'poc_coverage': (cves_with_pocs / total_cves * 100) if total_cves > 0 else 0,
                'nomi_sec_coverage': (nomi_sec_rules / total_rules * 100) if total_rules > 0 else 0
            },
            'recent_jobs': job_status,
            'nvd_data_status': await self._get_nvd_data_status(),
            'nomi_sec_status': await self.nomi_sec_client.get_sync_status(),
            'exploitdb_status': await self.exploitdb_client.get_exploitdb_sync_status(),
            'cisa_kev_status': await self.cisa_kev_client.get_kev_sync_status()
        }
    async def _get_nvd_data_status(self) -> dict:
        """Get NVD data status"""
        from main import CVE

        # NOTE: loads every CVE row into memory; fine for status reporting,
        # but consider aggregate queries if the table grows large
        cves = self.db_session.query(CVE).all()

        # Get year distribution
        year_counts = {}
        for cve in cves:
            if cve.published_date:
                year = cve.published_date.year
                year_counts[year] = year_counts.get(year, 0) + 1

        # Get source distribution
        source_counts = {}
        for cve in cves:
            source = cve.data_source or 'unknown'
            source_counts[source] = source_counts.get(source, 0) + 1

        # Guard against CVEs without a published_date: min()/max() on an
        # empty sequence would raise ValueError
        published_dates = [cve.published_date for cve in cves if cve.published_date]

        return {
            'year_distribution': year_counts,
            'source_distribution': source_counts,
            'total_cves': len(cves),
            'date_range': {
                'earliest': min(published_dates),
                'latest': max(published_dates)
            } if published_dates else None
        }
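
# Minimal embedding sketch (assumes SessionLocal from main.py, as used in
# main() below); useful when driving the seeder from another async app:
#
#     async def seed_status_check():
#         db = SessionLocal()
#         try:
#             return await BulkSeeder(db).get_seeding_status()
#         finally:
#             db.close()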
# Standalone script functionality
async def main():
    """Main function for standalone execution"""
    from main import SessionLocal, engine, Base

    # Create tables
    Base.metadata.create_all(bind=engine)

    # Create database session
    db_session = SessionLocal()

    try:
        # Create bulk seeder
        seeder = BulkSeeder(db_session)

        # Get current status
        status = await seeder.get_seeding_status()
        print(f"Current Status: {status['database_stats']['total_cves']} CVEs in database")

        # Perform full bulk seed if database is empty
        if status['database_stats']['total_cves'] == 0:
            print("Database is empty. Starting full bulk seed...")
            results = await seeder.full_bulk_seed(start_year=2020)  # Start from 2020 for faster testing
            print(f"Bulk seed completed: {results}")
        else:
            print("Database contains data. Running incremental update...")
            results = await seeder.incremental_update()
            print(f"Incremental update completed: {results}")

    finally:
        db_session.close()


if __name__ == "__main__":
    asyncio.run(main())
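
# Typical standalone invocation, assuming main.py and the client modules
# imported above are on the import path:
#
#     python bulk_seeder.py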