"""
|
|
Bulk Data Seeding Coordinator
|
|
Orchestrates the complete bulk seeding process using NVD JSON feeds and nomi-sec PoC data
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from sqlalchemy.orm import Session
|
|
from nvd_bulk_processor import NVDBulkProcessor
|
|
from nomi_sec_client import NomiSecClient
|
|
from exploitdb_client_local import ExploitDBLocalClient
|
|
from cisa_kev_client import CISAKEVClient
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class BulkSeeder:
    """Coordinates bulk seeding operations"""

    def __init__(self, db_session: Session):
        self.db_session = db_session
        self.nvd_processor = NVDBulkProcessor(db_session)
        self.nomi_sec_client = NomiSecClient(db_session)
        self.exploitdb_client = ExploitDBLocalClient(db_session)
        self.cisa_kev_client = CISAKEVClient(db_session)

    async def full_bulk_seed(self, start_year: int = 2002,
                             end_year: Optional[int] = None,
                             skip_nvd: bool = False,
                             skip_nomi_sec: bool = False,
                             skip_exploitdb: bool = False,
                             skip_cisa_kev: bool = False) -> dict:
        """
        Perform the complete bulk seeding operation.

        Args:
            start_year: Starting year for NVD data (default: 2002)
            end_year: Ending year for NVD data (default: current year)
            skip_nvd: Skip NVD bulk processing (default: False)
            skip_nomi_sec: Skip nomi-sec PoC synchronization (default: False)
            skip_exploitdb: Skip ExploitDB synchronization (default: False)
            skip_cisa_kev: Skip CISA KEV synchronization (default: False)

        Returns:
            Dictionary containing operation results
        """
        if end_year is None:
            end_year = datetime.now().year

        results = {
            'start_time': datetime.utcnow(),
            'nvd_results': None,
            'nomi_sec_results': None,
            'exploitdb_results': None,
            'cisa_kev_results': None,
            'sigma_results': None,
            'total_time': None,
            'status': 'running'
        }

        logger.info(f"Starting full bulk seed operation ({start_year}-{end_year})")

        try:
            # Phase 1: NVD Bulk Processing
            if not skip_nvd:
                logger.info("Phase 1: Starting NVD bulk processing...")
                nvd_results = await self.nvd_processor.bulk_seed_database(
                    start_year=start_year,
                    end_year=end_year
                )
                results['nvd_results'] = nvd_results
                logger.info(f"Phase 1 complete: {nvd_results['total_processed']} CVEs processed")
            else:
                logger.info("Phase 1: Skipping NVD bulk processing")

            # Phase 2: nomi-sec PoC Synchronization
            if not skip_nomi_sec:
                logger.info("Phase 2: Starting nomi-sec PoC synchronization...")
                nomi_sec_results = await self.nomi_sec_client.bulk_sync_all_cves(
                    batch_size=50  # Smaller batches for API stability
                )
                results['nomi_sec_results'] = nomi_sec_results
                logger.info(f"Phase 2 complete: {nomi_sec_results['total_pocs_found']} PoCs found")
            else:
                logger.info("Phase 2: Skipping nomi-sec PoC synchronization")

            # Phase 3: ExploitDB Synchronization
            if not skip_exploitdb:
                logger.info("Phase 3: Starting ExploitDB synchronization...")
                exploitdb_results = await self.exploitdb_client.bulk_sync_exploitdb(
                    batch_size=30  # Smaller batches for git API stability
                )
                results['exploitdb_results'] = exploitdb_results
                logger.info(f"Phase 3 complete: {exploitdb_results['total_exploits_found']} exploits found")
            else:
                logger.info("Phase 3: Skipping ExploitDB synchronization")

            # Phase 4: CISA KEV Synchronization
            if not skip_cisa_kev:
                logger.info("Phase 4: Starting CISA KEV synchronization...")
                cisa_kev_results = await self.cisa_kev_client.bulk_sync_kev_data(
                    batch_size=100  # Can handle larger batches since data is already filtered
                )
                results['cisa_kev_results'] = cisa_kev_results
                logger.info(f"Phase 4 complete: {cisa_kev_results['total_kev_found']} KEV entries found")
            else:
                logger.info("Phase 4: Skipping CISA KEV synchronization")

            # Phase 5: Generate Enhanced SIGMA Rules
            logger.info("Phase 5: Generating enhanced SIGMA rules...")
            sigma_results = await self.generate_enhanced_sigma_rules()
            results['sigma_results'] = sigma_results
            logger.info(f"Phase 5 complete: {sigma_results['rules_generated']} rules generated")

            results['status'] = 'completed'
            results['end_time'] = datetime.utcnow()
            results['total_time'] = (results['end_time'] - results['start_time']).total_seconds()

            logger.info(f"Full bulk seed operation completed in {results['total_time']:.2f} seconds")

        except Exception as e:
            logger.error(f"Bulk seed operation failed: {e}")
            results['status'] = 'failed'
            results['error'] = str(e)
            results['end_time'] = datetime.utcnow()

        return results
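
    # Minimal usage sketch for full_bulk_seed (illustrative only, not part of the
    # module's behavior). It assumes a SQLAlchemy SessionLocal factory like the
    # one imported in main() below, and must run inside an event loop since
    # full_bulk_seed is a coroutine:
    #
    #     async def seed_recent_years():
    #         session = SessionLocal()
    #         try:
    #             seeder = BulkSeeder(session)
    #             results = await seeder.full_bulk_seed(start_year=2023,
    #                                                   skip_exploitdb=True)
    #             print(results['status'], results.get('total_time'))
    #         finally:
    #             session.close()
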
    async def incremental_update(self) -> dict:
        """
        Perform an incremental update operation.

        Returns:
            Dictionary containing update results
        """
        results = {
            'start_time': datetime.utcnow(),
            'nvd_update': None,
            'nomi_sec_update': None,
            'exploitdb_update': None,
            'cisa_kev_update': None,
            'status': 'running'
        }

        logger.info("Starting incremental update...")

        try:
            # Update NVD data using modified/recent feeds
            logger.info("Updating NVD data...")
            nvd_update = await self.nvd_processor.incremental_update()
            results['nvd_update'] = nvd_update

            # Update PoC data for newly added/modified CVEs
            if nvd_update['total_processed'] > 0:
                logger.info("Updating PoC data for modified CVEs...")
                # Get recently modified CVEs and sync their PoCs
                recent_cves = await self._get_recently_modified_cves()
                nomi_sec_update = await self._sync_specific_cves(recent_cves)
                results['nomi_sec_update'] = nomi_sec_update

                # Update ExploitDB data for modified CVEs
                logger.info("Updating ExploitDB data for modified CVEs...")
                exploitdb_update = await self._sync_specific_cves_exploitdb(recent_cves)
                results['exploitdb_update'] = exploitdb_update

                # Update CISA KEV data for modified CVEs
                logger.info("Updating CISA KEV data for modified CVEs...")
                cisa_kev_update = await self._sync_specific_cves_cisa_kev(recent_cves)
                results['cisa_kev_update'] = cisa_kev_update

            results['status'] = 'completed'
            results['end_time'] = datetime.utcnow()

        except Exception as e:
            logger.error(f"Incremental update failed: {e}")
            results['status'] = 'failed'
            results['error'] = str(e)
            results['end_time'] = datetime.utcnow()

        return results
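
    # incremental_update is well suited to a periodic task. A hedged scheduling
    # sketch follows; the interval, session factory, and error handling are
    # assumptions that would depend on the deployment:
    #
    #     async def run_periodic_updates(session_factory, interval_hours: int = 6):
    #         while True:
    #             session = session_factory()
    #             try:
    #                 await BulkSeeder(session).incremental_update()
    #             finally:
    #                 session.close()
    #             await asyncio.sleep(interval_hours * 3600)
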
    async def generate_enhanced_sigma_rules(self) -> dict:
        """Generate enhanced SIGMA rules using nomi-sec PoC data"""
        from main import CVE, SigmaRule

        # Import the enhanced rule generator
        from enhanced_sigma_generator import EnhancedSigmaGenerator

        generator = EnhancedSigmaGenerator(self.db_session)

        # Get all CVEs that have PoC data but no enhanced rules
        cves_with_pocs = self.db_session.query(CVE).filter(
            CVE.poc_count > 0
        ).all()

        rules_generated = 0
        rules_updated = 0

        for cve in cves_with_pocs:
            try:
                # Check if we need to generate/update the rule
                existing_rule = self.db_session.query(SigmaRule).filter(
                    SigmaRule.cve_id == cve.cve_id
                ).first()

                if existing_rule and existing_rule.poc_source == 'nomi_sec':
                    # Rule already exists and is up to date
                    continue

                # Generate enhanced rule
                rule_result = await generator.generate_enhanced_rule(cve)

                if rule_result['success']:
                    if existing_rule:
                        rules_updated += 1
                    else:
                        rules_generated += 1

            except Exception as e:
                logger.error(f"Error generating rule for {cve.cve_id}: {e}")
                continue

        self.db_session.commit()

        return {
            'rules_generated': rules_generated,
            'rules_updated': rules_updated,
            'total_processed': len(cves_with_pocs)
        }
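
    # generate_enhanced_sigma_rules can also be invoked on its own, e.g. after a
    # manual PoC import. A minimal sketch, assuming an active session and event
    # loop (the field names match the dictionary returned above):
    #
    #     stats = await BulkSeeder(session).generate_enhanced_sigma_rules()
    #     print(f"{stats['rules_generated']} new, {stats['rules_updated']} updated "
    #           f"of {stats['total_processed']} candidate CVEs")
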
    async def _get_recently_modified_cves(self, hours: int = 24) -> list:
        """Get CVEs modified within the last N hours"""
        from main import CVE

        cutoff_time = datetime.utcnow() - timedelta(hours=hours)

        recent_cves = self.db_session.query(CVE).filter(
            CVE.updated_at >= cutoff_time
        ).all()

        return [cve.cve_id for cve in recent_cves]
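
    # The 24-hour default mirrors a daily update cadence; a wider window can be
    # passed when catching up after downtime, e.g.:
    #
    #     cve_ids = await self._get_recently_modified_cves(hours=72)
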
    async def _sync_specific_cves(self, cve_ids: list) -> dict:
        """Sync PoC data for specific CVEs"""
        total_processed = 0
        total_pocs_found = 0

        for cve_id in cve_ids:
            try:
                result = await self.nomi_sec_client.sync_cve_pocs(cve_id)
                total_processed += 1
                total_pocs_found += result.get('pocs_found', 0)

                # Small delay to avoid overwhelming the API
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error syncing PoCs for {cve_id}: {e}")
                continue

        return {
            'total_processed': total_processed,
            'total_pocs_found': total_pocs_found
        }

    async def _sync_specific_cves_exploitdb(self, cve_ids: list) -> dict:
        """Sync ExploitDB data for specific CVEs"""
        total_processed = 0
        total_exploits_found = 0

        for cve_id in cve_ids:
            try:
                result = await self.exploitdb_client.sync_cve_exploits(cve_id)
                total_processed += 1
                total_exploits_found += result.get('exploits_found', 0)

                # Small delay to avoid overwhelming the API
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(f"Error syncing ExploitDB for {cve_id}: {e}")
                continue

        return {
            'total_processed': total_processed,
            'total_exploits_found': total_exploits_found
        }

    async def _sync_specific_cves_cisa_kev(self, cve_ids: list) -> dict:
        """Sync CISA KEV data for specific CVEs"""
        total_processed = 0
        total_kev_found = 0

        for cve_id in cve_ids:
            try:
                result = await self.cisa_kev_client.sync_cve_kev_data(cve_id)
                total_processed += 1
                if result.get('kev_found', False):
                    total_kev_found += 1

                # Small delay to be respectful to CISA
                await asyncio.sleep(0.2)

            except Exception as e:
                logger.error(f"Error syncing CISA KEV for {cve_id}: {e}")
                continue

        return {
            'total_processed': total_processed,
            'total_kev_found': total_kev_found
        }

    async def get_seeding_status(self) -> dict:
        """Get current seeding status and statistics"""
        from main import CVE, SigmaRule, BulkProcessingJob

        # Get database statistics
        total_cves = self.db_session.query(CVE).count()
        bulk_processed_cves = self.db_session.query(CVE).filter(
            CVE.bulk_processed == True
        ).count()

        cves_with_pocs = self.db_session.query(CVE).filter(
            CVE.poc_count > 0
        ).count()

        total_rules = self.db_session.query(SigmaRule).count()
        nomi_sec_rules = self.db_session.query(SigmaRule).filter(
            SigmaRule.poc_source == 'nomi_sec'
        ).count()

        # Get recent job status
        recent_jobs = self.db_session.query(BulkProcessingJob).order_by(
            BulkProcessingJob.created_at.desc()
        ).limit(5).all()

        job_status = []
        for job in recent_jobs:
            job_status.append({
                'id': str(job.id),
                'job_type': job.job_type,
                'status': job.status,
                'created_at': job.created_at,
                'completed_at': job.completed_at,
                'processed_items': job.processed_items,
                'total_items': job.total_items,
                'failed_items': job.failed_items
            })

        return {
            'database_stats': {
                'total_cves': total_cves,
                'bulk_processed_cves': bulk_processed_cves,
                'cves_with_pocs': cves_with_pocs,
                'total_rules': total_rules,
                'nomi_sec_rules': nomi_sec_rules,
                'poc_coverage': (cves_with_pocs / total_cves * 100) if total_cves > 0 else 0,
                'nomi_sec_coverage': (nomi_sec_rules / total_rules * 100) if total_rules > 0 else 0
            },
            'recent_jobs': job_status,
            'nvd_data_status': await self._get_nvd_data_status(),
            'nomi_sec_status': await self.nomi_sec_client.get_sync_status(),
            'exploitdb_status': await self.exploitdb_client.get_exploitdb_sync_status(),
            'cisa_kev_status': await self.cisa_kev_client.get_kev_sync_status()
        }
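
    # Example of consuming the status payload from get_seeding_status
    # (illustrative; `seeder` is assumed to be a BulkSeeder instance, and the
    # field names match the dictionary built above):
    #
    #     status = await seeder.get_seeding_status()
    #     stats = status['database_stats']
    #     print(f"PoC coverage: {stats['poc_coverage']:.1f}% "
    #           f"({stats['cves_with_pocs']}/{stats['total_cves']} CVEs)")
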
    async def _get_nvd_data_status(self) -> dict:
        """Get NVD data status"""
        from main import CVE

        # Get year distribution
        year_counts = {}
        cves = self.db_session.query(CVE).all()

        for cve in cves:
            if cve.published_date:
                year = cve.published_date.year
                year_counts[year] = year_counts.get(year, 0) + 1

        # Get source distribution
        source_counts = {}
        for cve in cves:
            source = cve.data_source or 'unknown'
            source_counts[source] = source_counts.get(source, 0) + 1

        # Guard against CVEs that lack a published date so min()/max() never
        # see an empty sequence
        published_dates = [cve.published_date for cve in cves if cve.published_date]

        return {
            'year_distribution': year_counts,
            'source_distribution': source_counts,
            'total_cves': len(cves),
            'date_range': {
                'earliest': min(published_dates),
                'latest': max(published_dates)
            } if published_dates else None
        }


# Standalone script functionality
async def main():
    """Main function for standalone execution"""
    from main import SessionLocal, engine, Base

    # Create tables
    Base.metadata.create_all(bind=engine)

    # Create database session
    db_session = SessionLocal()

    try:
        # Create bulk seeder
        seeder = BulkSeeder(db_session)

        # Get current status
        status = await seeder.get_seeding_status()
        print(f"Current Status: {status['database_stats']['total_cves']} CVEs in database")

        # Perform full bulk seed if the database is empty
        if status['database_stats']['total_cves'] == 0:
            print("Database is empty. Starting full bulk seed...")
            results = await seeder.full_bulk_seed(start_year=2020)  # Start from 2020 for faster testing
            print(f"Bulk seed completed: {results}")
        else:
            print("Database contains data. Running incremental update...")
            results = await seeder.incremental_update()
            print(f"Incremental update completed: {results}")

    finally:
        db_session.close()


if __name__ == "__main__":
    asyncio.run(main())
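
# When run directly (e.g. `python bulk_seeder.py`, assuming that is this
# module's filename), the script seeds an empty database from 2020 onward or
# falls back to an incremental update; adjust the full_bulk_seed() arguments
# in main() above to change that behavior.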