CLEANUP: Remove legacy web application components and streamline for CLI-first architecture

This commit completes the transformation to a CLI-first SIGMA rule generator by removing all legacy web application components:

REMOVED COMPONENTS:
- Frontend React application (frontend/ directory)
- Docker Compose web orchestration (docker-compose.yml, Dockerfiles)
- FastAPI web backend (main.py, celery_config.py, bulk_seeder.py)
- Web-specific task schedulers and executors
- Initialization scripts for web deployment (start.sh, init.sql, Makefile)

SIMPLIFIED ARCHITECTURE:
- Created backend/database_models.py for migration-only database access
- Updated CLI commands to use simplified database models
- Retained core processing modules (sigma generator, PoC clients, NVD processor)
- Fixed import paths in CLI migration and process commands

The application now operates as a streamlined CLI tool with file-based SIGMA rule storage,
eliminating web application complexity while maintaining all core CVE processing capabilities.
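For orientation, a minimal sketch of consuming the file-based store described above (directory layout per the project docs: `cves/<year>/<CVE-ID>/` with `rule_*.sigma` variants and `poc_analysis.json`; anything beyond that layout is an assumption):

```python
# Minimal sketch: iterate the file-based CVE store produced by the CLI.
# Assumes the documented cves/<year>/<CVE-ID>/ layout with rule_*.sigma
# variants and an optional poc_analysis.json per CVE; nothing else is assumed.
import json
from pathlib import Path

def iter_cve_entries(root: str = "cves"):
    for cve_dir in sorted(Path(root).glob("*/CVE-*")):
        rules = sorted(p.name for p in cve_dir.glob("rule_*.sigma"))
        poc_file = cve_dir / "poc_analysis.json"
        yield {
            "cve_id": cve_dir.name,
            "rule_variants": rules,
            "poc_analysis": json.loads(poc_file.read_text()) if poc_file.exists() else None,
        }

if __name__ == "__main__":
    for entry in iter_cve_entries():
        print(f"{entry['cve_id']}: {len(entry['rule_variants'])} rule variant(s)")
```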

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: Brendan McDevitt
Date:   2025-07-21 13:24:38 -05:00
Parent: e579c91b5e
Commit: de30d4ce99
34 changed files with 434 additions and 7774 deletions


@@ -29,12 +29,10 @@ This is an enhanced CVE-SIGMA Auto Generator that has been **transformed from a
- `rule_*.sigma`: Multiple SIGMA rule variants (template, LLM, hybrid)
- `poc_analysis.json`: Extracted exploit indicators and analysis
### **Legacy Web Architecture (Optional, for Migration)**
- **Backend**: FastAPI with SQLAlchemy ORM (`backend/main.py`)
- **Frontend**: React with Tailwind CSS (`frontend/src/App.js`)
- **Database**: PostgreSQL (used only for migration to file-based system)
- **Cache**: Redis (optional)
- **Deployment**: Docker Compose (maintained for migration purposes)
### **Database Components (For Migration Only)**
- **Database Models**: `backend/database_models.py` - SQLAlchemy models for data migration
- **Legacy Support**: Core data processors maintained for CLI integration
- **Migration Tools**: Complete CLI-based migration utilities from legacy database
## Common Development Commands
@@ -91,16 +89,13 @@ chmod +x cli/sigma_cli.py
./cli/sigma_cli.py stats overview
```
### **Legacy Web Interface (Optional)**
### **Database Migration Support**
```bash
# Start legacy web interface (for migration only)
docker-compose up -d db redis backend frontend
# If you have an existing PostgreSQL database with CVE data
export DATABASE_URL="postgresql://user:pass@localhost:5432/cve_sigma_db"
# Access points:
# - Frontend: http://localhost:3000
# - API: http://localhost:8000
# - API Docs: http://localhost:8000/docs
# - Flower (Celery): http://localhost:5555
# Migrate database to CLI file structure
./cli/sigma_cli.py migrate from-database --database-url $DATABASE_URL
```
### **Development and Testing**
@@ -138,12 +133,9 @@ ls -la cves/2024/CVE-2024-0001/ # View individual CVE files
- `reports/`: Generated statistics and exports
- `cli/`: Command-line tool and modules
### Legacy Service URLs (If Using Web Interface)
- Frontend: http://localhost:3000
- Backend API: http://localhost:8000
- API Documentation: http://localhost:8000/docs
- Database: localhost:5432
- Redis: localhost:6379
### Database Connection (For Migration Only)
- **PostgreSQL**: localhost:5432 (if migrating from legacy database)
- **Connection String**: Set via DATABASE_URL environment variable
### Enhanced API Endpoints
@@ -187,13 +179,14 @@ ls -la cves/2024/CVE-2024-0001/ # View individual CVE files
- **Metadata Format**: JSON files with processing history and PoC data
- **Reports**: Generated statistics and export outputs
### **Legacy Backend Structure (For Migration)**
- **main.py**: Core FastAPI application (maintained for migration)
- **Data Processors**: Reused by CLI for CVE fetching and analysis
### **Backend Data Processors (Reused by CLI)**
- **database_models.py**: SQLAlchemy models for data migration
- **Data Processors**: Core processing logic reused by CLI
- `nvd_bulk_processor.py`: NVD JSON dataset processing
- `nomi_sec_client.py`: nomi-sec PoC integration
- `enhanced_sigma_generator.py`: SIGMA rule generation
- `llm_client.py`: Multi-provider LLM integration
- `poc_analyzer.py`: PoC content analysis
### **CLI-Based Data Processing Flow**
1. **CVE Processing**: NVD data fetch → File storage → PoC analysis → Metadata generation
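A schematic sketch of that flow (the helper functions below are hypothetical stand-ins for the real processors — `nvd_bulk_processor`, `poc_analyzer`, `enhanced_sigma_generator` — only the per-CVE file layout follows the documentation):

```python
# Schematic only: mirrors the CVE Processing steps above with hypothetical
# stand-in helpers; the real CLI wires in the backend processors instead.
import json
from pathlib import Path

def fetch_nvd_record(cve_id: str) -> dict:       # stand-in for NVD data fetch
    return {"cve_id": cve_id, "description": "placeholder"}

def analyze_pocs(record: dict) -> dict:          # stand-in for PoC analysis
    return {"cve_id": record["cve_id"], "indicators": []}

def generate_rule(record: dict, analysis: dict) -> str:  # stand-in for SIGMA generation
    return f"title: Potential exploitation of {record['cve_id']}\n"

def process_cve(cve_id: str, out_root: Path = Path("cves")) -> Path:
    year = cve_id.split("-")[1]
    cve_dir = out_root / year / cve_id           # cves/<year>/<CVE-ID>/
    cve_dir.mkdir(parents=True, exist_ok=True)
    record = fetch_nvd_record(cve_id)                                      # NVD data fetch
    (cve_dir / "metadata.json").write_text(json.dumps(record, indent=2))   # file storage (file name assumed)
    analysis = analyze_pocs(record)                                        # PoC analysis
    (cve_dir / "poc_analysis.json").write_text(json.dumps(analysis, indent=2))
    (cve_dir / "rule_template.sigma").write_text(generate_rule(record, analysis))
    return cve_dir
```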


@@ -1,70 +0,0 @@
.PHONY: help start stop restart build logs clean dev setup
# Default target
help:
@echo "CVE-SIGMA Auto Generator - Available Commands:"
@echo "=============================================="
@echo " make start - Start the application"
@echo " make stop - Stop the application"
@echo " make restart - Restart the application"
@echo " make build - Build and start with fresh images"
@echo " make logs - Show application logs"
@echo " make clean - Stop and remove all containers/volumes"
@echo " make dev - Start in development mode"
@echo " make setup - Initial setup (copy .env, etc.)"
@echo " make help - Show this help message"
# Initial setup
setup:
@echo "🔧 Setting up CVE-SIGMA Auto Generator..."
@if [ ! -f .env ]; then \
cp .env.example .env; \
echo "✅ .env file created from .env.example"; \
echo "💡 Edit .env to add your NVD API key for better rate limits"; \
else \
echo "✅ .env file already exists"; \
fi
# Start the application
start: setup
@echo "🚀 Starting CVE-SIGMA Auto Generator..."
docker-compose up -d
@echo "✅ Application started!"
@echo "🌐 Frontend: http://localhost:3000"
@echo "🔧 Backend: http://localhost:8000"
@echo "📚 API Docs: http://localhost:8000/docs"
# Stop the application
stop:
@echo "🛑 Stopping CVE-SIGMA Auto Generator..."
docker-compose down
@echo "✅ Application stopped!"
# Restart the application
restart: stop start
# Build and start with fresh images
build: setup
@echo "🔨 Building and starting CVE-SIGMA Auto Generator..."
docker-compose up -d --build
@echo "✅ Application built and started!"
# Show logs
logs:
@echo "📋 Application logs (press Ctrl+C to exit):"
docker-compose logs -f
# Clean everything
clean:
@echo "🧹 Cleaning up CVE-SIGMA Auto Generator..."
docker-compose down -v --remove-orphans
docker system prune -f
@echo "✅ Cleanup complete!"
# Development mode (with hot reload)
dev: setup
@echo "🔧 Starting in development mode..."
docker-compose -f docker-compose.yml up -d db redis
@echo "💡 Database and Redis started. Run backend and frontend locally for development."
@echo " Backend: cd backend && pip install -r requirements.txt && uvicorn main:app --reload"
@echo " Frontend: cd frontend && npm install && npm start"


@@ -1,26 +0,0 @@
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]


@@ -1,463 +0,0 @@
"""
Bulk Data Seeding Coordinator
Orchestrates the complete bulk seeding process using NVD JSON feeds and nomi-sec PoC data
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.orm import Session
from nvd_bulk_processor import NVDBulkProcessor
from nomi_sec_client import NomiSecClient
from exploitdb_client_local import ExploitDBLocalClient
from cisa_kev_client import CISAKEVClient
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BulkSeeder:
"""Coordinates bulk seeding operations"""
def __init__(self, db_session: Session):
self.db_session = db_session
self.nvd_processor = NVDBulkProcessor(db_session)
self.nomi_sec_client = NomiSecClient(db_session)
self.exploitdb_client = ExploitDBLocalClient(db_session)
self.cisa_kev_client = CISAKEVClient(db_session)
async def full_bulk_seed(self, start_year: int = 2002,
end_year: Optional[int] = None,
skip_nvd: bool = False,
skip_nomi_sec: bool = False,
skip_exploitdb: bool = False,
skip_cisa_kev: bool = False,
progress_callback: Optional[callable] = None) -> dict:
"""
Perform complete bulk seeding operation
Args:
start_year: Starting year for NVD data (default: 2002)
end_year: Ending year for NVD data (default: current year)
skip_nvd: Skip NVD bulk processing (default: False)
skip_nomi_sec: Skip nomi-sec PoC synchronization (default: False)
skip_exploitdb: Skip ExploitDB synchronization (default: False)
skip_cisa_kev: Skip CISA KEV synchronization (default: False)
Returns:
Dictionary containing operation results
"""
if end_year is None:
end_year = datetime.now().year
results = {
'start_time': datetime.utcnow(),
'nvd_results': None,
'nomi_sec_results': None,
'exploitdb_results': None,
'cisa_kev_results': None,
'sigma_results': None,
'total_time': None,
'status': 'running'
}
logger.info(f"Starting full bulk seed operation ({start_year}-{end_year})")
try:
# Phase 1: NVD Bulk Processing
if not skip_nvd:
if progress_callback:
progress_callback("nvd_processing", 10, "Starting NVD bulk processing...")
logger.info("Phase 1: Starting NVD bulk processing...")
nvd_results = await self.nvd_processor.bulk_seed_database(
start_year=start_year,
end_year=end_year
)
results['nvd_results'] = nvd_results
if progress_callback:
progress_callback("nvd_processing", 25, f"NVD processing complete: {nvd_results['total_processed']} CVEs processed")
logger.info(f"Phase 1 complete: {nvd_results['total_processed']} CVEs processed")
else:
logger.info("Phase 1: Skipping NVD bulk processing")
if progress_callback:
progress_callback("nvd_processing", 25, "Skipping NVD bulk processing")
# Phase 2: nomi-sec PoC Synchronization
if not skip_nomi_sec:
if progress_callback:
progress_callback("nomi_sec_sync", 30, "Starting nomi-sec PoC synchronization...")
logger.info("Phase 2: Starting nomi-sec PoC synchronization...")
nomi_sec_results = await self.nomi_sec_client.bulk_sync_all_cves(
batch_size=50 # Smaller batches for API stability
)
results['nomi_sec_results'] = nomi_sec_results
if progress_callback:
progress_callback("nomi_sec_sync", 50, f"Nomi-sec sync complete: {nomi_sec_results['total_pocs_found']} PoCs found")
logger.info(f"Phase 2 complete: {nomi_sec_results['total_pocs_found']} PoCs found")
else:
logger.info("Phase 2: Skipping nomi-sec PoC synchronization")
if progress_callback:
progress_callback("nomi_sec_sync", 50, "Skipping nomi-sec PoC synchronization")
# Phase 3: ExploitDB Synchronization
if not skip_exploitdb:
if progress_callback:
progress_callback("exploitdb_sync", 55, "Starting ExploitDB synchronization...")
logger.info("Phase 3: Starting ExploitDB synchronization...")
exploitdb_results = await self.exploitdb_client.bulk_sync_exploitdb(
batch_size=30 # Smaller batches for git API stability
)
results['exploitdb_results'] = exploitdb_results
if progress_callback:
progress_callback("exploitdb_sync", 70, f"ExploitDB sync complete: {exploitdb_results['total_exploits_found']} exploits found")
logger.info(f"Phase 3 complete: {exploitdb_results['total_exploits_found']} exploits found")
else:
logger.info("Phase 3: Skipping ExploitDB synchronization")
if progress_callback:
progress_callback("exploitdb_sync", 70, "Skipping ExploitDB synchronization")
# Phase 4: CISA KEV Synchronization
if not skip_cisa_kev:
if progress_callback:
progress_callback("cisa_kev_sync", 75, "Starting CISA KEV synchronization...")
logger.info("Phase 4: Starting CISA KEV synchronization...")
cisa_kev_results = await self.cisa_kev_client.bulk_sync_kev_data(
batch_size=100 # Can handle larger batches since data is already filtered
)
results['cisa_kev_results'] = cisa_kev_results
if progress_callback:
progress_callback("cisa_kev_sync", 85, f"CISA KEV sync complete: {cisa_kev_results['total_kev_found']} KEV entries found")
logger.info(f"Phase 4 complete: {cisa_kev_results['total_kev_found']} KEV entries found")
else:
logger.info("Phase 4: Skipping CISA KEV synchronization")
if progress_callback:
progress_callback("cisa_kev_sync", 85, "Skipping CISA KEV synchronization")
# Phase 5: Generate Enhanced SIGMA Rules
if progress_callback:
progress_callback("sigma_rules", 90, "Generating enhanced SIGMA rules...")
logger.info("Phase 5: Generating enhanced SIGMA rules...")
sigma_results = await self.generate_enhanced_sigma_rules()
results['sigma_results'] = sigma_results
if progress_callback:
progress_callback("sigma_rules", 95, f"SIGMA rule generation complete: {sigma_results['rules_generated']} rules generated")
logger.info(f"Phase 5 complete: {sigma_results['rules_generated']} rules generated")
results['status'] = 'completed'
results['end_time'] = datetime.utcnow()
results['total_time'] = (results['end_time'] - results['start_time']).total_seconds()
logger.info(f"Full bulk seed operation completed in {results['total_time']:.2f} seconds")
except Exception as e:
logger.error(f"Bulk seed operation failed: {e}")
results['status'] = 'failed'
results['error'] = str(e)
results['end_time'] = datetime.utcnow()
return results
async def incremental_update(self) -> dict:
"""
Perform incremental update operation
Returns:
Dictionary containing update results
"""
results = {
'start_time': datetime.utcnow(),
'nvd_update': None,
'nomi_sec_update': None,
'exploitdb_update': None,
'cisa_kev_update': None,
'status': 'running'
}
logger.info("Starting incremental update...")
try:
# Update NVD data using modified/recent feeds
logger.info("Updating NVD data...")
nvd_update = await self.nvd_processor.incremental_update()
results['nvd_update'] = nvd_update
# Update PoC data for newly added/modified CVEs
if nvd_update['total_processed'] > 0:
logger.info("Updating PoC data for modified CVEs...")
# Get recently modified CVEs and sync their PoCs
recent_cves = await self._get_recently_modified_cves()
nomi_sec_update = await self._sync_specific_cves(recent_cves)
results['nomi_sec_update'] = nomi_sec_update
# Update ExploitDB data for modified CVEs
logger.info("Updating ExploitDB data for modified CVEs...")
exploitdb_update = await self._sync_specific_cves_exploitdb(recent_cves)
results['exploitdb_update'] = exploitdb_update
# Update CISA KEV data for modified CVEs
logger.info("Updating CISA KEV data for modified CVEs...")
cisa_kev_update = await self._sync_specific_cves_cisa_kev(recent_cves)
results['cisa_kev_update'] = cisa_kev_update
results['status'] = 'completed'
results['end_time'] = datetime.utcnow()
except Exception as e:
logger.error(f"Incremental update failed: {e}")
results['status'] = 'failed'
results['error'] = str(e)
results['end_time'] = datetime.utcnow()
return results
async def generate_enhanced_sigma_rules(self) -> dict:
"""Generate enhanced SIGMA rules using nomi-sec PoC data"""
from main import CVE, SigmaRule
# Import the enhanced rule generator
from enhanced_sigma_generator import EnhancedSigmaGenerator
generator = EnhancedSigmaGenerator(self.db_session)
# Get all CVEs that have PoC data but no enhanced rules
cves_with_pocs = self.db_session.query(CVE).filter(
CVE.poc_count > 0
).all()
rules_generated = 0
rules_updated = 0
for cve in cves_with_pocs:
try:
# Check if we need to generate/update the rule
existing_rule = self.db_session.query(SigmaRule).filter(
SigmaRule.cve_id == cve.cve_id
).first()
if existing_rule and existing_rule.poc_source == 'nomi_sec':
# Rule already exists and is up to date
continue
# Generate enhanced rule
rule_result = await generator.generate_enhanced_rule(cve)
if rule_result['success']:
if existing_rule:
rules_updated += 1
else:
rules_generated += 1
except Exception as e:
logger.error(f"Error generating rule for {cve.cve_id}: {e}")
continue
self.db_session.commit()
return {
'rules_generated': rules_generated,
'rules_updated': rules_updated,
'total_processed': len(cves_with_pocs)
}
async def _get_recently_modified_cves(self, hours: int = 24) -> list:
"""Get CVEs modified within the last N hours"""
from main import CVE
cutoff_time = datetime.utcnow() - timedelta(hours=hours)
recent_cves = self.db_session.query(CVE).filter(
CVE.updated_at >= cutoff_time
).all()
return [cve.cve_id for cve in recent_cves]
async def _sync_specific_cves(self, cve_ids: list) -> dict:
"""Sync PoC data for specific CVEs"""
total_processed = 0
total_pocs_found = 0
for cve_id in cve_ids:
try:
result = await self.nomi_sec_client.sync_cve_pocs(cve_id)
total_processed += 1
total_pocs_found += result.get('pocs_found', 0)
# Small delay to avoid overwhelming the API
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Error syncing PoCs for {cve_id}: {e}")
continue
return {
'total_processed': total_processed,
'total_pocs_found': total_pocs_found
}
async def _sync_specific_cves_exploitdb(self, cve_ids: list) -> dict:
"""Sync ExploitDB data for specific CVEs"""
total_processed = 0
total_exploits_found = 0
for cve_id in cve_ids:
try:
result = await self.exploitdb_client.sync_cve_exploits(cve_id)
total_processed += 1
total_exploits_found += result.get('exploits_found', 0)
# Small delay to avoid overwhelming the API
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Error syncing ExploitDB for {cve_id}: {e}")
continue
return {
'total_processed': total_processed,
'total_exploits_found': total_exploits_found
}
async def _sync_specific_cves_cisa_kev(self, cve_ids: list) -> dict:
"""Sync CISA KEV data for specific CVEs"""
total_processed = 0
total_kev_found = 0
for cve_id in cve_ids:
try:
result = await self.cisa_kev_client.sync_cve_kev_data(cve_id)
total_processed += 1
if result.get('kev_found', False):
total_kev_found += 1
# Small delay to be respectful to CISA
await asyncio.sleep(0.2)
except Exception as e:
logger.error(f"Error syncing CISA KEV for {cve_id}: {e}")
continue
return {
'total_processed': total_processed,
'total_kev_found': total_kev_found
}
async def get_seeding_status(self) -> dict:
"""Get current seeding status and statistics"""
from main import CVE, SigmaRule, BulkProcessingJob
# Get database statistics
total_cves = self.db_session.query(CVE).count()
bulk_processed_cves = self.db_session.query(CVE).filter(
CVE.bulk_processed == True
).count()
cves_with_pocs = self.db_session.query(CVE).filter(
CVE.poc_count > 0
).count()
total_rules = self.db_session.query(SigmaRule).count()
nomi_sec_rules = self.db_session.query(SigmaRule).filter(
SigmaRule.poc_source == 'nomi_sec'
).count()
# Get recent job status
recent_jobs = self.db_session.query(BulkProcessingJob).order_by(
BulkProcessingJob.created_at.desc()
).limit(5).all()
job_status = []
for job in recent_jobs:
job_status.append({
'id': str(job.id),
'job_type': job.job_type,
'status': job.status,
'created_at': job.created_at,
'completed_at': job.completed_at,
'processed_items': job.processed_items,
'total_items': job.total_items,
'failed_items': job.failed_items
})
return {
'database_stats': {
'total_cves': total_cves,
'bulk_processed_cves': bulk_processed_cves,
'cves_with_pocs': cves_with_pocs,
'total_rules': total_rules,
'nomi_sec_rules': nomi_sec_rules,
'poc_coverage': (cves_with_pocs / total_cves * 100) if total_cves > 0 else 0,
'nomi_sec_coverage': (nomi_sec_rules / total_rules * 100) if total_rules > 0 else 0
},
'recent_jobs': job_status,
'nvd_data_status': await self._get_nvd_data_status(),
'nomi_sec_status': await self.nomi_sec_client.get_sync_status(),
'exploitdb_status': await self.exploitdb_client.get_exploitdb_sync_status(),
'cisa_kev_status': await self.cisa_kev_client.get_kev_sync_status()
}
async def _get_nvd_data_status(self) -> dict:
"""Get NVD data status"""
from main import CVE
# Get year distribution
year_counts = {}
cves = self.db_session.query(CVE).all()
for cve in cves:
if cve.published_date:
year = cve.published_date.year
year_counts[year] = year_counts.get(year, 0) + 1
# Get source distribution
source_counts = {}
for cve in cves:
source = cve.data_source or 'unknown'
source_counts[source] = source_counts.get(source, 0) + 1
return {
'year_distribution': year_counts,
'source_distribution': source_counts,
'total_cves': len(cves),
'date_range': {
'earliest': min(cve.published_date for cve in cves if cve.published_date),
'latest': max(cve.published_date for cve in cves if cve.published_date)
} if cves else None
}
# Standalone script functionality
async def main():
"""Main function for standalone execution"""
from main import SessionLocal, engine, Base
# Create tables
Base.metadata.create_all(bind=engine)
# Create database session
db_session = SessionLocal()
try:
# Create bulk seeder
seeder = BulkSeeder(db_session)
# Get current status
status = await seeder.get_seeding_status()
print(f"Current Status: {status['database_stats']['total_cves']} CVEs in database")
# Perform full bulk seed if database is empty
if status['database_stats']['total_cves'] == 0:
print("Database is empty. Starting full bulk seed...")
results = await seeder.full_bulk_seed(start_year=2020) # Start from 2020 for faster testing
print(f"Bulk seed completed: {results}")
else:
print("Database contains data. Running incremental update...")
results = await seeder.incremental_update()
print(f"Incremental update completed: {results}")
finally:
db_session.close()
if __name__ == "__main__":
asyncio.run(main())


@@ -1,222 +0,0 @@
"""
Celery configuration for the Auto SIGMA Rule Generator
"""
import os
from celery import Celery
from celery.schedules import crontab
from kombu import Queue
# Celery configuration
broker_url = os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0')
result_backend = os.getenv('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')
# Create Celery app
celery_app = Celery(
'sigma_generator',
broker=broker_url,
backend=result_backend,
include=[
'tasks.bulk_tasks',
'tasks.sigma_tasks',
'tasks.data_sync_tasks',
'tasks.maintenance_tasks'
]
)
# Celery configuration
celery_app.conf.update(
# Serialization
task_serializer='json',
accept_content=['json'],
result_serializer='json',
# Timezone
timezone='UTC',
enable_utc=True,
# Task tracking
task_track_started=True,
task_send_sent_event=True,
# Result backend settings
result_expires=3600, # Results expire after 1 hour
result_backend_transport_options={
'master_name': 'mymaster',
'visibility_timeout': 3600,
},
# Worker settings
worker_prefetch_multiplier=1,
task_acks_late=True,
worker_max_tasks_per_child=1000,
# Task routes - different queues for different types of tasks
task_routes={
'tasks.bulk_tasks.*': {'queue': 'bulk_processing'},
'tasks.sigma_tasks.*': {'queue': 'sigma_generation'},
'tasks.data_sync_tasks.*': {'queue': 'data_sync'},
},
# Queue definitions
task_default_queue='default',
task_queues=(
Queue('default', routing_key='default'),
Queue('bulk_processing', routing_key='bulk_processing'),
Queue('sigma_generation', routing_key='sigma_generation'),
Queue('data_sync', routing_key='data_sync'),
),
# Retry settings
task_default_retry_delay=60, # 1 minute
task_max_retries=3,
# Monitoring
worker_send_task_events=True,
# Optimized Beat schedule for daily workflow
# WORKFLOW: NVD incremental -> Exploit syncs -> Reference sync -> SIGMA rules
beat_schedule={
# STEP 1: NVD Incremental Update - Daily at 2:00 AM
# This runs first to get the latest CVE data from NVD
'daily-nvd-incremental-update': {
'task': 'bulk_tasks.incremental_update_task',
'schedule': crontab(minute=0, hour=2), # Daily at 2:00 AM
'options': {'queue': 'bulk_processing'},
'kwargs': {'batch_size': 100, 'skip_nvd': False, 'skip_nomi_sec': True}
},
# STEP 2: Exploit Data Syncing - Daily starting at 3:00 AM
# These run in parallel but start at different times to avoid conflicts
# CISA KEV Sync - Daily at 3:00 AM (15 minutes after NVD)
'daily-cisa-kev-sync': {
'task': 'data_sync_tasks.sync_cisa_kev',
'schedule': crontab(minute=0, hour=3), # Daily at 3:00 AM
'options': {'queue': 'data_sync'},
'kwargs': {'batch_size': 100}
},
# Nomi-sec PoC Sync - Daily at 3:15 AM
'daily-nomi-sec-sync': {
'task': 'data_sync_tasks.sync_nomi_sec',
'schedule': crontab(minute=15, hour=3), # Daily at 3:15 AM
'options': {'queue': 'data_sync'},
'kwargs': {'batch_size': 100}
},
# GitHub PoC Sync - Daily at 3:30 AM
'daily-github-poc-sync': {
'task': 'data_sync_tasks.sync_github_poc',
'schedule': crontab(minute=30, hour=3), # Daily at 3:30 AM
'options': {'queue': 'data_sync'},
'kwargs': {'batch_size': 50}
},
# ExploitDB Sync - Daily at 3:45 AM
'daily-exploitdb-sync': {
'task': 'data_sync_tasks.sync_exploitdb',
'schedule': crontab(minute=45, hour=3), # Daily at 3:45 AM
'options': {'queue': 'data_sync'},
'kwargs': {'batch_size': 30}
},
# CVE2CAPEC MITRE ATT&CK Mapping Sync - Daily at 4:00 AM
'daily-cve2capec-sync': {
'task': 'data_sync_tasks.sync_cve2capec',
'schedule': crontab(minute=0, hour=4), # Daily at 4:00 AM
'options': {'queue': 'data_sync'},
'kwargs': {'force_refresh': False} # Only refresh if cache is stale
},
# ExploitDB Index Rebuild - Daily at 4:15 AM
'daily-exploitdb-index-build': {
'task': 'data_sync_tasks.build_exploitdb_index',
'schedule': crontab(minute=15, hour=4), # Daily at 4:15 AM
'options': {'queue': 'data_sync'}
},
# STEP 3: Reference Content Sync - Daily at 5:00 AM
# This is the longest-running task, starts after exploit syncs have time to complete
'daily-reference-content-sync': {
'task': 'data_sync_tasks.sync_reference_content',
'schedule': crontab(minute=0, hour=5), # Daily at 5:00 AM
'options': {'queue': 'data_sync'},
'kwargs': {'batch_size': 30, 'max_cves': 200, 'force_resync': False}
},
# STEP 4: SIGMA Rule Generation - Daily at 8:00 AM
# This runs LAST after all other daily data sync jobs
'daily-sigma-rule-generation': {
'task': 'bulk_tasks.generate_enhanced_sigma_rules',
'schedule': crontab(minute=0, hour=8), # Daily at 8:00 AM
'options': {'queue': 'sigma_generation'}
},
# LLM-Enhanced SIGMA Rule Generation - Daily at 9:00 AM
# Additional LLM-based rule generation after standard rules
'daily-llm-sigma-generation': {
'task': 'sigma_tasks.generate_enhanced_rules',
'schedule': crontab(minute=0, hour=9), # Daily at 9:00 AM
'options': {'queue': 'sigma_generation'},
'kwargs': {'cve_ids': None} # Process all CVEs with PoCs
},
# MAINTENANCE TASKS
# Database Cleanup - Weekly on Sunday at 1:00 AM (before daily workflow)
'weekly-database-cleanup': {
'task': 'tasks.maintenance_tasks.database_cleanup_comprehensive',
'schedule': crontab(minute=0, hour=1, day_of_week=0), # Sunday at 1:00 AM
'options': {'queue': 'default'},
'kwargs': {'days_to_keep': 30, 'cleanup_failed_jobs': True, 'cleanup_logs': True}
},
# Health Check - Every 15 minutes
'health-check-detailed': {
'task': 'tasks.maintenance_tasks.health_check_detailed',
'schedule': crontab(minute='*/15'), # Every 15 minutes
'options': {'queue': 'default'}
},
# Celery result cleanup - Daily at 1:30 AM
'daily-cleanup-old-results': {
'task': 'tasks.maintenance_tasks.cleanup_old_results',
'schedule': crontab(minute=30, hour=1), # Daily at 1:30 AM
'options': {'queue': 'default'}
},
},
)
# Configure logging
celery_app.conf.update(
worker_log_format='[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
worker_task_log_format='[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s',
)
# Database session configuration for tasks
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Database configuration
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://cve_user:cve_password@db:5432/cve_sigma_db')
# Create engine and session factory
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db_session():
"""Get database session for tasks"""
return SessionLocal()
# Import all task modules to register them
def register_tasks():
"""Register all task modules"""
try:
from tasks import bulk_tasks, sigma_tasks, data_sync_tasks, maintenance_tasks
print("All task modules registered successfully")
except ImportError as e:
print(f"Warning: Could not import some task modules: {e}")
# Auto-register tasks when module is imported
if __name__ != "__main__":
register_tasks()


@@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""
Convert LoRA adapter to GGUF format for better Ollama integration
"""
import os
import sys
import subprocess
import tempfile
import shutil
from pathlib import Path
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def check_llama_cpp_tools():
"""Check if llama.cpp tools are available"""
tools_needed = [
'convert_hf_to_gguf.py',
'convert_lora_to_gguf.py',
'llama-export-lora'
]
missing_tools = []
for tool in tools_needed:
if not shutil.which(tool):
missing_tools.append(tool)
if missing_tools:
logger.error(f"Missing required tools: {missing_tools}")
logger.error("Please install llama.cpp and ensure tools are in PATH")
logger.error("See: https://github.com/ggerganov/llama.cpp")
return False
return True
def convert_lora_to_gguf(lora_path: str, base_model_name: str = "meta-llama/Llama-3.2-3B-Instruct"):
"""Convert LoRA adapter to GGUF format"""
if not check_llama_cpp_tools():
logger.error("Cannot convert LoRA - missing llama.cpp tools")
return None
try:
# Create temporary directory for conversion
with tempfile.TemporaryDirectory() as temp_dir:
logger.info(f"Converting LoRA adapter from {lora_path}")
# Step 1: Convert LoRA to GGUF
lora_gguf_path = os.path.join(temp_dir, "adapter.gguf")
cmd = [
'convert_lora_to_gguf.py',
'--base', base_model_name,
'--outtype', 'q8_0', # High quality quantization
'--outfile', lora_gguf_path,
lora_path
]
logger.info(f"Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"LoRA conversion failed: {result.stderr}")
return None
# Step 2: Download/prepare base model in GGUF format
base_model_path = os.path.join(temp_dir, "base_model.gguf")
# This would need to download the base model in GGUF format
# For now, we'll assume it's available or use Ollama's version
# Step 3: Merge LoRA with base model
merged_model_path = os.path.join(temp_dir, "merged_model.gguf")
merge_cmd = [
'llama-export-lora',
'-m', base_model_path,
'-o', merged_model_path,
'--lora', lora_gguf_path
]
logger.info(f"Running: {' '.join(merge_cmd)}")
merge_result = subprocess.run(merge_cmd, capture_output=True, text=True)
if merge_result.returncode != 0:
logger.error(f"LoRA merge failed: {merge_result.stderr}")
return None
# Copy merged model to output location
output_path = "/app/models/sigma_llama_merged.gguf"
shutil.copy2(merged_model_path, output_path)
logger.info(f"✅ Successfully converted and merged LoRA to {output_path}")
return output_path
except Exception as e:
logger.error(f"Error converting LoRA: {e}")
return None
def create_gguf_modelfile(gguf_path: str) -> str:
"""Create Modelfile for GGUF merged model"""
modelfile_content = f"""FROM {gguf_path}
TEMPLATE \"\"\"<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a cybersecurity expert specializing in SIGMA rule creation. You have been fine-tuned specifically for generating high-quality SIGMA detection rules.
Generate valid SIGMA rules in YAML format based on the provided CVE and exploit information. Output ONLY valid YAML starting with 'title:' and ending with the last YAML line.
Focus on:
- Accurate logsource identification
- Precise detection logic
- Relevant fields and values
- Proper YAML formatting
- Contextual understanding from CVE details<|eot_id|><|start_header_id|>user<|end_header_id|>
{{{{ .Prompt }}}}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
\"\"\"
# Optimized parameters for SIGMA rule generation
PARAMETER temperature 0.1
PARAMETER top_p 0.9
PARAMETER top_k 40
PARAMETER repeat_penalty 1.1
PARAMETER num_ctx 4096
PARAMETER stop "<|eot_id|>"
PARAMETER stop "<|end_of_text|>"
SYSTEM \"\"\"You are a specialized SIGMA rule generation model. Your training has optimized you for creating accurate, contextual SIGMA detection rules. Generate only valid YAML format rules based on the provided context.\"\"\"
"""
return modelfile_content
if __name__ == "__main__":
# Test conversion
lora_path = "/app/models/sigma_llama_finetuned/checkpoint-2268"
if os.path.exists(lora_path):
converted_path = convert_lora_to_gguf(lora_path)
if converted_path:
print(f"✅ Conversion successful: {converted_path}")
print("Use this path in your Ollama Modelfile:")
print(create_gguf_modelfile(converted_path))
else:
print("❌ Conversion failed")
else:
print(f"❌ LoRA path not found: {lora_path}")


@@ -0,0 +1,91 @@
"""
Database Models for CVE-SIGMA Auto Generator CLI
Maintains database model definitions for migration and data processing purposes.
Used by CLI migration tools to export data from legacy web application database.
"""
import uuid
from datetime import datetime
from sqlalchemy import create_engine, Column, String, Text, DECIMAL, TIMESTAMP, Boolean, ARRAY, Integer, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import UUID
import os
# Database setup
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://cve_user:cve_password@localhost:5432/cve_sigma_db")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Database Models (for migration purposes)
class CVE(Base):
__tablename__ = "cves"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
cve_id = Column(String(20), unique=True, nullable=False)
description = Column(Text)
cvss_score = Column(DECIMAL(3, 1))
severity = Column(String(20))
published_date = Column(TIMESTAMP)
modified_date = Column(TIMESTAMP)
affected_products = Column(ARRAY(String))
reference_urls = Column(ARRAY(String))
# Bulk processing fields
data_source = Column(String(20), default='nvd_api') # 'nvd_api', 'nvd_bulk', 'manual'
nvd_json_version = Column(String(10), default='2.0')
bulk_processed = Column(Boolean, default=False)
# nomi-sec PoC fields
poc_count = Column(Integer, default=0)
poc_data = Column(JSON) # Store nomi-sec PoC metadata
# Reference data fields
reference_data = Column(JSON) # Store extracted reference content and analysis
reference_sync_status = Column(String(20), default='pending') # 'pending', 'processing', 'completed', 'failed'
reference_last_synced = Column(TIMESTAMP)
created_at = Column(TIMESTAMP, default=datetime.utcnow)
updated_at = Column(TIMESTAMP, default=datetime.utcnow)
class SigmaRule(Base):
__tablename__ = "sigma_rules"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
cve_id = Column(String(20))
rule_name = Column(String(255), nullable=False)
rule_content = Column(Text, nullable=False)
detection_type = Column(String(50))
log_source = Column(String(100))
confidence_level = Column(String(20))
auto_generated = Column(Boolean, default=True)
exploit_based = Column(Boolean, default=False)
github_repos = Column(ARRAY(String))
exploit_indicators = Column(Text) # JSON string of extracted indicators
# Enhanced fields for new data sources
poc_source = Column(String(20), default='github_search') # 'github_search', 'nomi_sec', 'manual'
poc_quality_score = Column(Integer, default=0) # Based on star count, activity, etc.
nomi_sec_data = Column(JSON) # Store nomi-sec PoC metadata
created_at = Column(TIMESTAMP, default=datetime.utcnow)
updated_at = Column(TIMESTAMP, default=datetime.utcnow)
class RuleTemplate(Base):
__tablename__ = "rule_templates"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
template_name = Column(String(255), nullable=False)
template_content = Column(Text, nullable=False)
applicable_product_patterns = Column(ARRAY(String))
description = Column(Text)
created_at = Column(TIMESTAMP, default=datetime.utcnow)
def get_db():
"""Get database session for migration purposes"""
db = SessionLocal()
try:
yield db
finally:
db.close()
# Create all tables (for migration purposes)
def create_tables():
"""Create database tables"""
Base.metadata.create_all(bind=engine)
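For reference, a minimal sketch of a file-based export using these models (assumes `backend/` is on the import path and `DATABASE_URL` points at the legacy database; the actual `migrate from-database` command may differ):

```python
# Minimal sketch: dump CVEs and their SIGMA rules from the legacy database
# into the documented cves/<year>/<CVE-ID>/ layout. Illustrative only.
import json
from pathlib import Path

from database_models import CVE, SessionLocal, SigmaRule  # assumes backend/ on sys.path

def export_cves(out_root: str = "cves", limit: int = 100) -> int:
    db = SessionLocal()
    exported = 0
    try:
        for cve in db.query(CVE).limit(limit).all():
            cve_dir = Path(out_root) / cve.cve_id.split("-")[1] / cve.cve_id
            cve_dir.mkdir(parents=True, exist_ok=True)
            (cve_dir / "metadata.json").write_text(json.dumps({
                "cve_id": cve.cve_id,
                "severity": cve.severity,
                "poc_count": cve.poc_count,
            }, indent=2))
            for rule in db.query(SigmaRule).filter(SigmaRule.cve_id == cve.cve_id).all():
                name = f"rule_{(rule.detection_type or 'generated')}.sigma"  # naming convention assumed
                (cve_dir / name).write_text(rule.rule_content)
            exported += 1
    finally:
        db.close()
    return exported

if __name__ == "__main__":
    print(f"Exported {export_cves()} CVE directories")
```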


@@ -1,58 +0,0 @@
#!/usr/bin/env python3
"""
Script to delete all SIGMA rules from the database
This will clear existing rules so they can be regenerated with the improved LLM client
"""
from main import SigmaRule, SessionLocal
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def delete_all_sigma_rules():
"""Delete all SIGMA rules from the database"""
db = SessionLocal()
try:
# Count existing rules
total_rules = db.query(SigmaRule).count()
logger.info(f"Found {total_rules} SIGMA rules in database")
if total_rules == 0:
logger.info("No SIGMA rules to delete")
return 0
# Delete all SIGMA rules
logger.info("Deleting all SIGMA rules...")
deleted_count = db.query(SigmaRule).delete()
db.commit()
logger.info(f"✅ Successfully deleted {deleted_count} SIGMA rules")
# Verify deletion
remaining_rules = db.query(SigmaRule).count()
logger.info(f"Remaining rules in database: {remaining_rules}")
return deleted_count
except Exception as e:
logger.error(f"Error deleting SIGMA rules: {e}")
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
print("🗑️ Deleting all SIGMA rules from database...")
print("This will allow regeneration with the improved LLM client.")
deleted_count = delete_all_sigma_rules()
if deleted_count > 0:
print(f"\n🎉 Successfully deleted {deleted_count} SIGMA rules!")
print("You can now regenerate them with the fixed LLM prompts.")
else:
print("\n✅ No SIGMA rules were found to delete.")


@@ -1,171 +0,0 @@
#!/usr/bin/env python3
"""
Initial setup script that runs once on first boot to populate the database.
This script checks if initial data seeding is needed and triggers it via Celery.
"""
import os
import sys
import time
import logging
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import OperationalError
# Add the current directory to path so we can import our modules
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://cve_user:cve_password@db:5432/cve_sigma_db')
def wait_for_database(max_retries: int = 30, delay: int = 5) -> bool:
"""Wait for database to be ready"""
logger.info("Waiting for database to be ready...")
for attempt in range(max_retries):
try:
engine = create_engine(DATABASE_URL)
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
logger.info("✅ Database is ready!")
return True
except OperationalError as e:
logger.info(f"Attempt {attempt + 1}/{max_retries}: Database not ready yet ({e})")
except Exception as e:
logger.error(f"Unexpected error connecting to database: {e}")
if attempt < max_retries - 1:
time.sleep(delay)
logger.error("❌ Database failed to become ready")
return False
def check_initial_setup_needed() -> bool:
"""Check if initial setup is needed by examining the database state"""
try:
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
with SessionLocal() as session:
# Check if we have any CVEs in the database
result = session.execute(text("SELECT COUNT(*) FROM cves")).fetchone()
cve_count = result[0] if result else 0
logger.info(f"Current CVE count in database: {cve_count}")
# Check if we have any bulk processing jobs that completed successfully
bulk_jobs_result = session.execute(text("""
SELECT COUNT(*) FROM bulk_processing_jobs
WHERE job_type = 'nvd_bulk_seed'
AND status = 'completed'
AND created_at > NOW() - INTERVAL '30 days'
""")).fetchone()
recent_bulk_jobs = bulk_jobs_result[0] if bulk_jobs_result else 0
logger.info(f"Recent successful bulk seed jobs: {recent_bulk_jobs}")
# Initial setup needed if:
# 1. Very few CVEs (less than 1000) AND
# 2. No recent successful bulk seed jobs
initial_setup_needed = cve_count < 1000 and recent_bulk_jobs == 0
if initial_setup_needed:
logger.info("🔄 Initial setup is needed - will trigger full NVD sync")
else:
logger.info("✅ Initial setup already completed - database has sufficient data")
return initial_setup_needed
except Exception as e:
logger.error(f"Error checking initial setup status: {e}")
# If we can't check, assume setup is needed
return True
def trigger_initial_bulk_seed():
"""Trigger initial bulk seed via Celery"""
try:
# Import here to avoid circular dependencies
from celery_config import celery_app
from tasks.bulk_tasks import full_bulk_seed_task
logger.info("🚀 Triggering initial full NVD bulk seed...")
# Start a comprehensive bulk seed job
# Start from 2020 for faster initial setup, can be adjusted
task_result = full_bulk_seed_task.delay(
start_year=2020, # Start from 2020 for faster initial setup
end_year=None, # Current year
skip_nvd=False,
skip_nomi_sec=True, # Skip nomi-sec initially, will be done daily
skip_exploitdb=True, # Skip exploitdb initially, will be done daily
skip_cisa_kev=True # Skip CISA KEV initially, will be done daily
)
logger.info(f"✅ Initial bulk seed task started with ID: {task_result.id}")
logger.info(f"Monitor progress at: http://localhost:5555/task/{task_result.id}")
return task_result.id
except Exception as e:
logger.error(f"❌ Failed to trigger initial bulk seed: {e}")
return None
def create_initial_setup_marker():
"""Create a marker to indicate initial setup was attempted"""
try:
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
with SessionLocal() as session:
# Insert a marker record
session.execute(text("""
INSERT INTO bulk_processing_jobs (job_type, status, job_metadata, created_at, started_at)
VALUES ('initial_setup_marker', 'completed', '{"purpose": "initial_setup_marker"}', NOW(), NOW())
ON CONFLICT DO NOTHING
"""))
session.commit()
logger.info("✅ Created initial setup marker")
except Exception as e:
logger.error(f"Error creating initial setup marker: {e}")
def main():
"""Main initial setup function"""
logger.info("🚀 Starting initial setup check...")
# Step 1: Wait for database
if not wait_for_database():
logger.error("❌ Initial setup failed: Database not available")
sys.exit(1)
# Step 2: Check if initial setup is needed
if not check_initial_setup_needed():
logger.info("🎉 Initial setup not needed - database already populated")
sys.exit(0)
# Step 3: Wait for Celery to be ready
logger.info("Waiting for Celery workers to be ready...")
time.sleep(10) # Give Celery workers time to start
# Step 4: Trigger initial bulk seed
task_id = trigger_initial_bulk_seed()
if task_id:
# Step 5: Create marker
create_initial_setup_marker()
logger.info("🎉 Initial setup triggered successfully!")
logger.info(f"Task ID: {task_id}")
logger.info("The system will begin daily scheduled tasks once initial setup completes.")
sys.exit(0)
else:
logger.error("❌ Initial setup failed")
sys.exit(1)
if __name__ == "__main__":
main()


@@ -1,390 +0,0 @@
"""
Job Executors for Scheduled Tasks
Integrates existing job functions with the scheduler
"""
import asyncio
import logging
from typing import Dict, Any
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class JobExecutors:
"""Collection of job executor functions for the scheduler"""
@staticmethod
async def incremental_update(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute NVD incremental update job"""
try:
from bulk_seeder import BulkSeeder
seeder = BulkSeeder(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 100)
skip_nvd = parameters.get('skip_nvd', False)
skip_nomi_sec = parameters.get('skip_nomi_sec', True)
logger.info(f"Starting incremental update - batch_size: {batch_size}")
result = await seeder.incremental_update()
return {
'status': 'completed',
'result': result,
'job_type': 'incremental_update',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Incremental update job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'incremental_update',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def cisa_kev_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute CISA KEV sync job"""
try:
from cisa_kev_client import CISAKEVClient
client = CISAKEVClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 100)
logger.info(f"Starting CISA KEV sync - batch_size: {batch_size}")
result = await client.bulk_sync_kev_data(batch_size=batch_size)
return {
'status': 'completed',
'result': result,
'job_type': 'cisa_kev_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"CISA KEV sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'cisa_kev_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def nomi_sec_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute optimized nomi-sec PoC sync job"""
try:
from nomi_sec_client import NomiSecClient
client = NomiSecClient(db_session)
# Extract parameters with optimized defaults
batch_size = parameters.get('batch_size', 100)
max_cves = parameters.get('max_cves', 1000)
force_resync = parameters.get('force_resync', False)
logger.info(f"Starting optimized nomi-sec sync - batch_size: {batch_size}, max_cves: {max_cves}")
result = await client.bulk_sync_poc_data(
batch_size=batch_size,
max_cves=max_cves,
force_resync=force_resync
)
return {
'status': 'completed',
'result': result,
'job_type': 'nomi_sec_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Nomi-sec sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'nomi_sec_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def github_poc_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute GitHub PoC sync job"""
try:
from mcdevitt_poc_client import GitHubPoCClient
client = GitHubPoCClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 50)
logger.info(f"Starting GitHub PoC sync - batch_size: {batch_size}")
result = await client.bulk_sync_poc_data(batch_size=batch_size)
return {
'status': 'completed',
'result': result,
'job_type': 'github_poc_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"GitHub PoC sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'github_poc_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def exploitdb_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute ExploitDB sync job"""
try:
from exploitdb_client_local import ExploitDBLocalClient
client = ExploitDBLocalClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 30)
logger.info(f"Starting ExploitDB sync - batch_size: {batch_size}")
result = await client.bulk_sync_exploitdb_data(batch_size=batch_size)
return {
'status': 'completed',
'result': result,
'job_type': 'exploitdb_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"ExploitDB sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'exploitdb_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def reference_sync(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute reference data sync job"""
try:
from reference_client import ReferenceClient
client = ReferenceClient(db_session)
# Extract parameters
batch_size = parameters.get('batch_size', 30)
max_cves = parameters.get('max_cves', 200)
force_resync = parameters.get('force_resync', False)
logger.info(f"Starting reference sync - batch_size: {batch_size}, max_cves: {max_cves}")
result = await client.bulk_sync_references(
batch_size=batch_size,
max_cves=max_cves,
force_resync=force_resync
)
return {
'status': 'completed',
'result': result,
'job_type': 'reference_sync',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Reference sync job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'reference_sync',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def rule_regeneration(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute SIGMA rule regeneration job"""
try:
from enhanced_sigma_generator import EnhancedSigmaGenerator
generator = EnhancedSigmaGenerator(db_session)
# Extract parameters
force = parameters.get('force', False)
logger.info(f"Starting rule regeneration - force: {force}")
# Get CVEs that need rule regeneration
from main import CVE
if force:
# Regenerate all rules
cves = db_session.query(CVE).all()
else:
# Only regenerate for CVEs with new data
cves = db_session.query(CVE).filter(
CVE.updated_at > CVE.created_at
).all()
total_processed = 0
total_generated = 0
for cve in cves:
try:
# Generate enhanced rule
rule_content = await generator.generate_enhanced_sigma_rule(cve.cve_id)
if rule_content:
total_generated += 1
total_processed += 1
# Small delay to prevent overwhelming the system
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"Error regenerating rule for {cve.cve_id}: {e}")
result = {
'total_processed': total_processed,
'total_generated': total_generated,
'generation_rate': total_generated / total_processed if total_processed > 0 else 0
}
return {
'status': 'completed',
'result': result,
'job_type': 'rule_regeneration',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Rule regeneration job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'rule_regeneration',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def bulk_seed(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute full bulk seed job"""
try:
from bulk_seeder import BulkSeeder
seeder = BulkSeeder(db_session)
# Extract parameters
start_year = parameters.get('start_year', 2020)
end_year = parameters.get('end_year', 2025)
batch_size = parameters.get('batch_size', 100)
skip_nvd = parameters.get('skip_nvd', False)
skip_nomi_sec = parameters.get('skip_nomi_sec', False)
logger.info(f"Starting full bulk seed - years: {start_year}-{end_year}")
result = await seeder.full_bulk_seed(
start_year=start_year,
end_year=end_year
)
return {
'status': 'completed',
'result': result,
'job_type': 'bulk_seed',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Bulk seed job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'bulk_seed',
'completed_at': datetime.utcnow().isoformat()
}
@staticmethod
async def database_cleanup(db_session: Session, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute database cleanup job"""
try:
from main import BulkProcessingJob
# Extract parameters
days_to_keep = parameters.get('days_to_keep', 30)
cleanup_failed_jobs = parameters.get('cleanup_failed_jobs', True)
cleanup_logs = parameters.get('cleanup_logs', True)
logger.info(f"Starting database cleanup - keep {days_to_keep} days")
cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
deleted_jobs = 0
# Clean up old bulk processing jobs
if cleanup_failed_jobs:
# Delete failed jobs older than cutoff
deleted = db_session.query(BulkProcessingJob).filter(
BulkProcessingJob.status == 'failed',
BulkProcessingJob.created_at < cutoff_date
).delete()
deleted_jobs += deleted
# Delete completed jobs older than cutoff (keep some recent ones)
very_old_cutoff = datetime.utcnow() - timedelta(days=days_to_keep * 2)
deleted = db_session.query(BulkProcessingJob).filter(
BulkProcessingJob.status == 'completed',
BulkProcessingJob.created_at < very_old_cutoff
).delete()
deleted_jobs += deleted
db_session.commit()
result = {
'deleted_jobs': deleted_jobs,
'cutoff_date': cutoff_date.isoformat(),
'cleanup_type': 'bulk_processing_jobs'
}
return {
'status': 'completed',
'result': result,
'job_type': 'database_cleanup',
'completed_at': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Database cleanup job failed: {e}")
return {
'status': 'failed',
'error': str(e),
'job_type': 'database_cleanup',
'completed_at': datetime.utcnow().isoformat()
}
def register_all_executors(scheduler):
"""Register all job executors with the scheduler"""
executors = JobExecutors()
scheduler.register_job_executor('incremental_update', executors.incremental_update)
scheduler.register_job_executor('cisa_kev_sync', executors.cisa_kev_sync)
scheduler.register_job_executor('nomi_sec_sync', executors.nomi_sec_sync)
scheduler.register_job_executor('github_poc_sync', executors.github_poc_sync)
scheduler.register_job_executor('exploitdb_sync', executors.exploitdb_sync)
scheduler.register_job_executor('reference_sync', executors.reference_sync)
scheduler.register_job_executor('rule_regeneration', executors.rule_regeneration)
scheduler.register_job_executor('bulk_seed', executors.bulk_seed)
scheduler.register_job_executor('database_cleanup', executors.database_cleanup)
logger.info("All job executors registered successfully")


@@ -1,449 +0,0 @@
"""
CVE-SIGMA Auto Generator - Cron-like Job Scheduler
Automated scheduling and execution of data processing jobs
"""
import asyncio
import yaml
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from croniter import croniter
from sqlalchemy.orm import Session
import pytz
import uuid
import json
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ScheduledJob:
"""Represents a scheduled job configuration"""
job_id: str
name: str
enabled: bool
schedule: str # Cron expression
description: str
job_type: str
parameters: Dict[str, Any]
priority: str
timeout_minutes: int
retry_on_failure: bool
last_run: Optional[datetime] = None
next_run: Optional[datetime] = None
run_count: int = 0
failure_count: int = 0
is_running: bool = False
max_retries: int = 2
def __post_init__(self):
if self.next_run is None:
self.calculate_next_run()
def calculate_next_run(self, base_time: datetime = None) -> datetime:
"""Calculate the next run time based on cron schedule"""
if base_time is None:
base_time = datetime.now(pytz.UTC)
try:
cron = croniter(self.schedule, base_time)
self.next_run = cron.get_next(datetime)
return self.next_run
except Exception as e:
logger.error(f"Error calculating next run for job {self.name}: {e}")
# Fallback to 1 hour from now
self.next_run = base_time + timedelta(hours=1)
return self.next_run
def should_run(self, current_time: datetime = None) -> bool:
"""Check if job should run now"""
if not self.enabled or self.is_running:
return False
if current_time is None:
current_time = datetime.now(pytz.UTC)
return self.next_run and current_time >= self.next_run
def mark_started(self):
"""Mark job as started"""
self.is_running = True
self.last_run = datetime.now(pytz.UTC)
self.run_count += 1
def mark_completed(self, success: bool = True):
"""Mark job as completed"""
self.is_running = False
if not success:
self.failure_count += 1
self.calculate_next_run()
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization"""
return {
'job_id': self.job_id,
'name': self.name,
'enabled': self.enabled,
'schedule': self.schedule,
'description': self.description,
'job_type': self.job_type,
'parameters': self.parameters,
'priority': self.priority,
'timeout_minutes': self.timeout_minutes,
'retry_on_failure': self.retry_on_failure,
'last_run': self.last_run.isoformat() if self.last_run else None,
'next_run': self.next_run.isoformat() if self.next_run else None,
'run_count': self.run_count,
'failure_count': self.failure_count,
'is_running': self.is_running,
'max_retries': self.max_retries
}
class JobRegistry:
"""Registry of available job executors"""
def __init__(self):
self.executors: Dict[str, Callable] = {}
self.db_session_factory = None
def register_executor(self, job_type: str, executor_func: Callable):
"""Register a job executor function"""
self.executors[job_type] = executor_func
logger.info(f"Registered executor for job type: {job_type}")
def set_db_session_factory(self, session_factory):
"""Set database session factory"""
self.db_session_factory = session_factory
async def execute_job(self, job: ScheduledJob) -> bool:
"""Execute a scheduled job"""
if job.job_type not in self.executors:
logger.error(f"No executor found for job type: {job.job_type}")
return False
try:
logger.info(f"Executing scheduled job: {job.name} (type: {job.job_type})")
# Get database session
if self.db_session_factory:
db_session = self.db_session_factory()
else:
logger.error("No database session factory available")
return False
try:
# Execute the job
executor = self.executors[job.job_type]
result = await executor(db_session, job.parameters)
# Check result
if isinstance(result, dict):
success = result.get('status') in ['completed', 'success']
if not success:
logger.warning(f"Job {job.name} completed with status: {result.get('status')}")
else:
success = bool(result)
logger.info(f"Job {job.name} completed successfully: {success}")
return success
finally:
db_session.close()
except Exception as e:
logger.error(f"Error executing job {job.name}: {e}")
return False
class JobScheduler:
"""Main job scheduler with cron-like functionality"""
def __init__(self, config_path: str = "scheduler_config.yaml"):
self.config_path = config_path
self.config: Dict[str, Any] = {}
self.jobs: Dict[str, ScheduledJob] = {}
self.registry = JobRegistry()
self.is_running = False
self.scheduler_thread: Optional[threading.Thread] = None
self.stop_event = threading.Event()
self.timezone = pytz.UTC
self.max_concurrent_jobs = 3
self.current_jobs = 0
self.job_lock = threading.Lock()
# Load configuration
self.load_config()
# Setup logging
self.setup_logging()
def load_config(self):
"""Load scheduler configuration from YAML file"""
try:
with open(self.config_path, 'r') as f:
self.config = yaml.safe_load(f)
# Extract scheduler settings
scheduler_config = self.config.get('scheduler', {})
self.timezone = pytz.timezone(scheduler_config.get('timezone', 'UTC'))
self.max_concurrent_jobs = scheduler_config.get('max_concurrent_jobs', 3)
# Load job configurations
self.load_jobs()
logger.info(f"Loaded scheduler configuration with {len(self.jobs)} jobs")
except Exception as e:
logger.error(f"Error loading scheduler config: {e}")
self.config = {}
def load_jobs(self):
"""Load job configurations from config"""
jobs_config = self.config.get('jobs', {})
for job_name, job_config in jobs_config.items():
try:
job = ScheduledJob(
job_id=str(uuid.uuid4()),
name=job_name,
enabled=job_config.get('enabled', True),
schedule=job_config.get('schedule', '0 0 * * *'),
description=job_config.get('description', ''),
job_type=job_config.get('job_type', job_name),
parameters=job_config.get('parameters', {}),
priority=job_config.get('priority', 'medium'),
timeout_minutes=job_config.get('timeout_minutes', 60),
retry_on_failure=job_config.get('retry_on_failure', True),
max_retries=job_config.get('max_retries', 2)
)
self.jobs[job_name] = job
logger.info(f"Loaded job: {job_name} - Next run: {job.next_run}")
except Exception as e:
logger.error(f"Error loading job {job_name}: {e}")
def setup_logging(self):
"""Setup scheduler-specific logging"""
log_config = self.config.get('logging', {})
if log_config.get('enabled', True):
log_level = getattr(logging, log_config.get('level', 'INFO'))
logger.setLevel(log_level)
def register_job_executor(self, job_type: str, executor_func: Callable):
"""Register a job executor"""
self.registry.register_executor(job_type, executor_func)
def set_db_session_factory(self, session_factory):
"""Set database session factory"""
self.registry.set_db_session_factory(session_factory)
def start(self):
"""Start the job scheduler"""
if self.is_running:
logger.warning("Scheduler is already running")
return
if not self.config.get('scheduler', {}).get('enabled', True):
logger.info("Scheduler is disabled in configuration")
return
self.is_running = True
self.stop_event.clear()
self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
self.scheduler_thread.start()
logger.info("Job scheduler started")
def stop(self):
"""Stop the job scheduler"""
if not self.is_running:
return
self.is_running = False
self.stop_event.set()
if self.scheduler_thread:
self.scheduler_thread.join(timeout=5)
logger.info("Job scheduler stopped")
def _scheduler_loop(self):
"""Main scheduler loop"""
logger.info("Scheduler loop started")
while self.is_running and not self.stop_event.is_set():
try:
current_time = datetime.now(self.timezone)
# Check each job
for job_name, job in self.jobs.items():
if job.should_run(current_time) and self.current_jobs < self.max_concurrent_jobs:
# Execute job in background
threading.Thread(
target=self._execute_job_wrapper,
args=(job,),
daemon=True
).start()
# Sleep for 60 seconds (check every minute)
self.stop_event.wait(60)
except Exception as e:
logger.error(f"Error in scheduler loop: {e}")
self.stop_event.wait(60)
logger.info("Scheduler loop stopped")
def _execute_job_wrapper(self, job: ScheduledJob):
"""Wrapper for job execution with proper error handling"""
with self.job_lock:
self.current_jobs += 1
try:
job.mark_started()
# Create asyncio event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Execute job with timeout
success = loop.run_until_complete(
asyncio.wait_for(
self.registry.execute_job(job),
timeout=job.timeout_minutes * 60
)
)
job.mark_completed(success)
if not success and job.retry_on_failure and job.failure_count < job.max_retries:
logger.info(f"Job {job.name} failed, will retry later")
# Schedule retry (next run will be calculated)
job.calculate_next_run(datetime.now(self.timezone) + timedelta(minutes=30))
except asyncio.TimeoutError:
logger.error(f"Job {job.name} timed out after {job.timeout_minutes} minutes")
job.mark_completed(False)
finally:
loop.close()
except Exception as e:
logger.error(f"Error executing job {job.name}: {e}")
job.mark_completed(False)
finally:
with self.job_lock:
self.current_jobs -= 1
def get_job_status(self, job_name: str = None) -> Dict[str, Any]:
"""Get status of jobs"""
if job_name:
job = self.jobs.get(job_name)
if job:
return job.to_dict()
else:
return {"error": f"Job {job_name} not found"}
return {
"scheduler_running": self.is_running,
"total_jobs": len(self.jobs),
"enabled_jobs": sum(1 for job in self.jobs.values() if job.enabled),
"running_jobs": sum(1 for job in self.jobs.values() if job.is_running),
"jobs": {name: job.to_dict() for name, job in self.jobs.items()}
}
def enable_job(self, job_name: str) -> bool:
"""Enable a job"""
if job_name in self.jobs:
self.jobs[job_name].enabled = True
self.jobs[job_name].calculate_next_run()
logger.info(f"Enabled job: {job_name}")
return True
return False
def disable_job(self, job_name: str) -> bool:
"""Disable a job"""
if job_name in self.jobs:
self.jobs[job_name].enabled = False
logger.info(f"Disabled job: {job_name}")
return True
return False
def trigger_job(self, job_name: str) -> bool:
"""Manually trigger a job"""
if job_name not in self.jobs:
return False
job = self.jobs[job_name]
if job.is_running:
logger.warning(f"Job {job_name} is already running")
return False
if self.current_jobs >= self.max_concurrent_jobs:
logger.warning(f"Maximum concurrent jobs reached, cannot start {job_name}")
return False
# Execute job immediately
threading.Thread(
target=self._execute_job_wrapper,
args=(job,),
daemon=True
).start()
logger.info(f"Manually triggered job: {job_name}")
return True
def update_job_schedule(self, job_name: str, new_schedule: str) -> bool:
"""Update job schedule"""
if job_name not in self.jobs:
return False
try:
# Validate cron expression
croniter(new_schedule)
job = self.jobs[job_name]
job.schedule = new_schedule
job.calculate_next_run()
logger.info(f"Updated schedule for job {job_name}: {new_schedule}")
return True
except Exception as e:
logger.error(f"Invalid cron expression {new_schedule}: {e}")
return False
def reload_config(self) -> bool:
"""Reload configuration from file"""
try:
self.load_config()
logger.info("Configuration reloaded successfully")
return True
except Exception as e:
logger.error(f"Error reloading configuration: {e}")
return False
# Global scheduler instance
scheduler_instance: Optional[JobScheduler] = None
def get_scheduler() -> JobScheduler:
"""Get the global scheduler instance"""
global scheduler_instance
if scheduler_instance is None:
scheduler_instance = JobScheduler()
return scheduler_instance
def initialize_scheduler(config_path: str = None) -> JobScheduler:
"""Initialize the global scheduler"""
global scheduler_instance
if config_path:
scheduler_instance = JobScheduler(config_path)
else:
scheduler_instance = JobScheduler()
return scheduler_instance
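For reference, a minimal wiring sketch for this module. The module name, executor body, and connection string below are assumptions for illustration, not code from the repository; only the scheduler API calls come from the class above.

```python
# Wire up the scheduler defined above: register an async executor for the
# "incremental_update" job type from scheduler_config.yaml, give it a
# SQLAlchemy session factory, and start the background loop.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from job_scheduler import initialize_scheduler  # module name assumed

async def incremental_update_executor(db_session, parameters):
    # Executors receive a DB session and the job's parameters dict and
    # return a bool or a dict whose 'status' is 'completed'/'success'.
    batch_size = parameters.get('batch_size', 100)
    # ... fetch and process newly modified CVEs here (placeholder) ...
    return {'status': 'completed', 'batch_size': batch_size}

engine = create_engine("postgresql://cve_user:cve_password@localhost:5432/cve_sigma_db")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

scheduler = initialize_scheduler("scheduler_config.yaml")
scheduler.set_db_session_factory(SessionLocal)
scheduler.register_job_executor("incremental_update", incremental_update_executor)
scheduler.start()
```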

File diff suppressed because it is too large


@@ -514,7 +514,7 @@ class GitHubPoCClient:
async def bulk_sync_all_cves(self, batch_size: int = 50) -> dict:
"""Bulk synchronize all CVEs with GitHub PoC data"""
- from main import CVE
+ from database_models import CVE
# Load all GitHub PoC data first
github_poc_data = self.load_github_poc_data()


@@ -1,182 +0,0 @@
# CVE-SIGMA Auto Generator - Job Scheduler Configuration
# Cron-like scheduling for automated jobs
#
# Cron Format: minute hour day_of_month month day_of_week
# Special values:
# * = any value
# */N = every N units
# N-M = range from N to M
# N,M,O = specific values N, M, and O
#
# Examples:
# "0 */6 * * *" = Every 6 hours
# "0 2 * * *" = Daily at 2 AM
# "0 2 * * 1" = Weekly on Monday at 2 AM
# "0 0 1 * *" = Monthly on the 1st at midnight
# "*/30 * * * *" = Every 30 minutes
scheduler:
enabled: true
timezone: "UTC"
max_concurrent_jobs: 3
job_timeout_hours: 4
retry_failed_jobs: true
max_retries: 2
jobs:
# NVD Incremental Updates - Fetch new CVEs regularly
nvd_incremental_update:
enabled: true
schedule: "0 */6 * * *" # Every 6 hours
description: "Fetch new CVEs from NVD modified feeds"
job_type: "incremental_update"
parameters:
batch_size: 100
skip_nvd: false
skip_nomi_sec: true
priority: "high"
timeout_minutes: 60
retry_on_failure: true
# CISA KEV Sync - Update known exploited vulnerabilities
cisa_kev_sync:
enabled: true
schedule: "0 3 * * *" # Daily at 3 AM
description: "Sync CISA Known Exploited Vulnerabilities"
job_type: "cisa_kev_sync"
parameters:
batch_size: 100
priority: "high"
timeout_minutes: 30
retry_on_failure: true
# Nomi-sec PoC Sync - Update proof-of-concept data (OPTIMIZED)
nomi_sec_sync:
enabled: true
schedule: "0 4 * * 1" # Weekly on Monday at 4 AM
description: "Sync nomi-sec Proof-of-Concept data (optimized)"
job_type: "nomi_sec_sync"
parameters:
batch_size: 100 # Increased batch size
max_cves: 1000 # Limit to recent/important CVEs
force_resync: false # Skip recently synced CVEs
priority: "high" # Increased priority
timeout_minutes: 60 # Reduced timeout due to optimizations
retry_on_failure: true
# GitHub PoC Sync - Update GitHub proof-of-concept data
github_poc_sync:
enabled: true
schedule: "0 5 * * 1" # Weekly on Monday at 5 AM
description: "Sync GitHub Proof-of-Concept data"
job_type: "github_poc_sync"
parameters:
batch_size: 50
priority: "medium"
timeout_minutes: 120
retry_on_failure: true
# ExploitDB Sync - Update exploit database
exploitdb_sync:
enabled: true
schedule: "0 6 * * 2" # Weekly on Tuesday at 6 AM
description: "Sync ExploitDB data"
job_type: "exploitdb_sync"
parameters:
batch_size: 30
priority: "medium"
timeout_minutes: 90
retry_on_failure: true
# Reference Data Sync - Extract content from CVE references
reference_sync:
enabled: true
schedule: "0 2 * * 3" # Weekly on Wednesday at 2 AM
description: "Extract and analyze CVE reference content"
job_type: "reference_sync"
parameters:
batch_size: 30
max_cves: 200
force_resync: false
priority: "medium"
timeout_minutes: 180
retry_on_failure: true
# Rule Regeneration - Regenerate SIGMA rules with latest data
rule_regeneration:
enabled: true
schedule: "0 7 * * 4" # Weekly on Thursday at 7 AM
description: "Regenerate SIGMA rules with enhanced data"
job_type: "rule_regeneration"
parameters:
force: false
priority: "low"
timeout_minutes: 240
retry_on_failure: false
# Full Bulk Seed - Complete data refresh (monthly)
full_bulk_seed:
enabled: false # Disabled by default due to resource intensity
schedule: "0 1 1 * *" # Monthly on the 1st at 1 AM
description: "Complete bulk seed of all data sources"
job_type: "bulk_seed"
parameters:
start_year: 2020
end_year: 2025
batch_size: 100
skip_nvd: false
skip_nomi_sec: false
priority: "low"
timeout_minutes: 1440 # 24 hours
retry_on_failure: false
# Database Cleanup - Clean old job records and logs
database_cleanup:
enabled: true
schedule: "0 0 * * 0" # Weekly on Sunday at midnight
description: "Clean up old job records and temporary data"
job_type: "database_cleanup"
parameters:
days_to_keep: 30
cleanup_failed_jobs: true
cleanup_logs: true
priority: "low"
timeout_minutes: 30
retry_on_failure: false
# Job execution policies
policies:
# Prevent overlapping jobs of the same type
prevent_overlap: true
# Maximum job execution time before forced termination
max_execution_time_hours: 6
# Retry policy for failed jobs
retry_policy:
enabled: true
max_retries: 2
retry_delay_minutes: 30
exponential_backoff: true
# Resource management
resource_limits:
max_memory_mb: 2048
max_cpu_percent: 80
# Notification settings (future enhancement)
notifications:
enabled: false
on_success: false
on_failure: true
webhook_url: ""
email_recipients: []
# Logging configuration for scheduler
logging:
enabled: true
level: "INFO"
log_file: "/app/logs/scheduler.log"
max_log_size_mb: 100
backup_count: 5
log_format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
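To sanity-check any cron expression in this configuration, croniter — the library the scheduler uses to compute run times — can preview the upcoming firings. A small illustrative sketch:

```python
# Preview the next three firing times of the nvd_incremental_update schedule.
from datetime import datetime
from croniter import croniter
import pytz

now = datetime.now(pytz.UTC)
cron = croniter("0 */6 * * *", now)  # "every 6 hours", as configured above
for _ in range(3):
    print(cron.get_next(datetime))
```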


@@ -1,3 +0,0 @@
"""
Celery tasks for the Auto SIGMA Rule Generator
"""


@@ -1,235 +0,0 @@
"""
Bulk processing tasks for Celery
"""
import asyncio
import logging
from typing import Optional, Dict, Any
from celery import current_task
from celery_config import celery_app, get_db_session
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from bulk_seeder import BulkSeeder
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name='bulk_tasks.full_bulk_seed')
def full_bulk_seed_task(self, start_year: int = 2002, end_year: Optional[int] = None,
skip_nvd: bool = False, skip_nomi_sec: bool = False,
skip_exploitdb: bool = False, skip_cisa_kev: bool = False) -> Dict[str, Any]:
"""
Celery task for full bulk seeding operation
Args:
start_year: Starting year for NVD data
end_year: Ending year for NVD data
skip_nvd: Skip NVD bulk processing
skip_nomi_sec: Skip nomi-sec PoC synchronization
skip_exploitdb: Skip ExploitDB synchronization
skip_cisa_kev: Skip CISA KEV synchronization
Returns:
Dictionary containing operation results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'initializing',
'progress': 0,
'message': 'Starting bulk seeding operation'
}
)
logger.info(f"Starting full bulk seed task: {start_year}-{end_year}")
# Create seeder instance
seeder = BulkSeeder(db_session)
# Create progress callback
def update_progress(stage: str, progress: int, message: str = None):
self.update_state(
state='PROGRESS',
meta={
'stage': stage,
'progress': progress,
'message': message or f'Processing {stage}'
}
)
# Run the bulk seeding operation
# Note: We need to handle the async nature of bulk_seeder
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
seeder.full_bulk_seed(
start_year=start_year,
end_year=end_year,
skip_nvd=skip_nvd,
skip_nomi_sec=skip_nomi_sec,
skip_exploitdb=skip_exploitdb,
skip_cisa_kev=skip_cisa_kev,
progress_callback=update_progress
)
)
finally:
loop.close()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': 'Bulk seeding completed successfully'
}
)
logger.info(f"Full bulk seed task completed: {result}")
return result
except Exception as e:
logger.error(f"Full bulk seed task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='bulk_tasks.incremental_update_task')
def incremental_update_task(self) -> Dict[str, Any]:
"""
Celery task for incremental updates
Returns:
Dictionary containing update results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'incremental_update',
'progress': 0,
'message': 'Starting incremental update'
}
)
logger.info("Starting incremental update task")
# Create seeder instance
seeder = BulkSeeder(db_session)
# Run the incremental update
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(seeder.incremental_update())
finally:
loop.close()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': 'Incremental update completed successfully'
}
)
logger.info(f"Incremental update task completed: {result}")
return result
except Exception as e:
logger.error(f"Incremental update task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='bulk_tasks.generate_enhanced_sigma_rules')
def generate_enhanced_sigma_rules_task(self) -> Dict[str, Any]:
"""
Celery task for generating enhanced SIGMA rules
Returns:
Dictionary containing generation results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'generating_rules',
'progress': 0,
'message': 'Starting enhanced SIGMA rule generation'
}
)
logger.info("Starting enhanced SIGMA rule generation task")
# Create seeder instance
seeder = BulkSeeder(db_session)
# Run the rule generation
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(seeder.generate_enhanced_sigma_rules())
finally:
loop.close()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': 'Enhanced SIGMA rule generation completed successfully'
}
)
logger.info(f"Enhanced SIGMA rule generation task completed: {result}")
return result
except Exception as e:
logger.error(f"Enhanced SIGMA rule generation task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
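For context, a hedged sketch of one way a caller could dispatch this task by its registered name and poll the progress metadata it publishes. It assumes a running Redis broker and Celery worker; the call site is illustrative, not taken from the removed backend.

```python
# Dispatch the bulk seed task by its registered name and poll the custom
# progress metadata it publishes via update_state(state='PROGRESS', meta=...).
from celery_config import celery_app

async_result = celery_app.send_task(
    'bulk_tasks.full_bulk_seed',
    kwargs={'start_year': 2020, 'end_year': 2024, 'skip_nomi_sec': True},
)

print(async_result.state)  # 'PENDING', 'PROGRESS', 'SUCCESS', or 'FAILURE'
print(async_result.info)   # e.g. {'stage': 'initializing', 'progress': 0, ...}
```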


@@ -1,504 +0,0 @@
"""
Data synchronization tasks for Celery
"""
import asyncio
import logging
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from typing import Dict, Any
from celery import current_task
from celery_config import celery_app, get_db_session
from nomi_sec_client import NomiSecClient
from exploitdb_client_local import ExploitDBLocalClient
from cisa_kev_client import CISAKEVClient
from mcdevitt_poc_client import GitHubPoCClient
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name='data_sync_tasks.sync_nomi_sec')
def sync_nomi_sec_task(self, batch_size: int = 50) -> Dict[str, Any]:
"""
Celery task for nomi-sec PoC synchronization
Args:
batch_size: Number of CVEs to process in each batch
Returns:
Dictionary containing sync results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'sync_nomi_sec',
'progress': 0,
'message': 'Starting nomi-sec PoC synchronization'
}
)
logger.info(f"Starting nomi-sec sync task with batch size: {batch_size}")
# Create client instance
client = NomiSecClient(db_session)
# Run the synchronization
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
client.bulk_sync_all_cves(batch_size=batch_size)
)
finally:
loop.close()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': 'Nomi-sec synchronization completed successfully'
}
)
logger.info(f"Nomi-sec sync task completed: {result}")
return result
except Exception as e:
logger.error(f"Nomi-sec sync task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='data_sync_tasks.sync_github_poc')
def sync_github_poc_task(self, batch_size: int = 50) -> Dict[str, Any]:
"""
Celery task for GitHub PoC synchronization
Args:
batch_size: Number of CVEs to process in each batch
Returns:
Dictionary containing sync results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'sync_github_poc',
'progress': 0,
'message': 'Starting GitHub PoC synchronization'
}
)
logger.info(f"Starting GitHub PoC sync task with batch size: {batch_size}")
# Create client instance
client = GitHubPoCClient(db_session)
# Run the synchronization
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
client.bulk_sync_all_cves(batch_size=batch_size)
)
finally:
loop.close()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': 'GitHub PoC synchronization completed successfully'
}
)
logger.info(f"GitHub PoC sync task completed: {result}")
return result
except Exception as e:
logger.error(f"GitHub PoC sync task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='data_sync_tasks.sync_reference_content')
def sync_reference_content_task(self, batch_size: int = 30, max_cves: int = 200,
force_resync: bool = False) -> Dict[str, Any]:
"""
Celery task for CVE reference content extraction and analysis
Args:
batch_size: Number of CVEs to process in each batch
max_cves: Maximum number of CVEs to process
force_resync: Force re-sync of recently processed CVEs
Returns:
Dictionary containing sync results
"""
db_session = get_db_session()
try:
# Import here to avoid circular imports
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import CVE
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'sync_reference_content',
'progress': 0,
'message': 'Starting CVE reference content extraction'
}
)
logger.info(f"Starting reference content sync task - batch_size: {batch_size}, max_cves: {max_cves}")
# Get CVEs to process (prioritize those with references but no extracted content)
query = db_session.query(CVE)
if not force_resync:
# Skip CVEs that were recently processed
from datetime import datetime, timedelta
cutoff_date = datetime.utcnow() - timedelta(days=7)
query = query.filter(
(CVE.reference_content_extracted_at.is_(None)) |
(CVE.reference_content_extracted_at < cutoff_date)
)
# Prioritize CVEs with references
cves = query.filter(CVE.references.isnot(None)).limit(max_cves).all()
if not cves:
logger.info("No CVEs found for reference content extraction")
return {'total_processed': 0, 'successful_extractions': 0, 'failed_extractions': 0}
total_processed = 0
successful_extractions = 0
failed_extractions = 0
# Process CVEs in batches
for i in range(0, len(cves), batch_size):
batch = cves[i:i + batch_size]
for j, cve in enumerate(batch):
try:
# Update progress
overall_progress = int(((i + j) / len(cves)) * 100)
self.update_state(
state='PROGRESS',
meta={
'stage': 'sync_reference_content',
'progress': overall_progress,
'message': f'Processing CVE {cve.cve_id} ({i + j + 1}/{len(cves)})',
'current_cve': cve.cve_id,
'processed': i + j,
'total': len(cves)
}
)
# For now, simulate reference content extraction
# In a real implementation, you would create a ReferenceContentExtractor
# and extract content from CVE references
# Mark CVE as processed
from datetime import datetime
cve.reference_content_extracted_at = datetime.utcnow()
successful_extractions += 1
total_processed += 1
# Small delay between requests
import time
time.sleep(2)
except Exception as e:
logger.error(f"Error processing reference content for CVE {cve.cve_id}: {e}")
failed_extractions += 1
total_processed += 1
# Commit after each batch
db_session.commit()
logger.info(f"Processed batch {i//batch_size + 1}/{(len(cves) + batch_size - 1)//batch_size}")
# Final results
result = {
'total_processed': total_processed,
'successful_extractions': successful_extractions,
'failed_extractions': failed_extractions,
'extraction_rate': (successful_extractions / total_processed * 100) if total_processed > 0 else 0
}
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': f'Reference content extraction completed: {successful_extractions} successful, {failed_extractions} failed',
'results': result
}
)
logger.info(f"Reference content sync task completed: {result}")
return result
except Exception as e:
logger.error(f"Reference content sync task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='data_sync_tasks.sync_exploitdb')
def sync_exploitdb_task(self, batch_size: int = 30) -> Dict[str, Any]:
"""
Celery task for ExploitDB synchronization
Args:
batch_size: Number of CVEs to process in each batch
Returns:
Dictionary containing sync results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'sync_exploitdb',
'progress': 0,
'message': 'Starting ExploitDB synchronization'
}
)
logger.info(f"Starting ExploitDB sync task with batch size: {batch_size}")
# Create client instance
client = ExploitDBLocalClient(db_session)
# Run the synchronization
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
client.bulk_sync_exploitdb(batch_size=batch_size)
)
finally:
loop.close()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': 'ExploitDB synchronization completed successfully'
}
)
logger.info(f"ExploitDB sync task completed: {result}")
return result
except Exception as e:
logger.error(f"ExploitDB sync task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='data_sync_tasks.sync_cisa_kev')
def sync_cisa_kev_task(self, batch_size: int = 100) -> Dict[str, Any]:
"""
Celery task for CISA KEV synchronization
Args:
batch_size: Number of CVEs to process in each batch
Returns:
Dictionary containing sync results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'sync_cisa_kev',
'progress': 0,
'message': 'Starting CISA KEV synchronization'
}
)
logger.info(f"Starting CISA KEV sync task with batch size: {batch_size}")
# Create client instance
client = CISAKEVClient(db_session)
# Run the synchronization
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
client.bulk_sync_kev_data(batch_size=batch_size)
)
finally:
loop.close()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': 'CISA KEV synchronization completed successfully'
}
)
logger.info(f"CISA KEV sync task completed: {result}")
return result
except Exception as e:
logger.error(f"CISA KEV sync task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='data_sync_tasks.build_exploitdb_index')
def build_exploitdb_index_task(self) -> Dict[str, Any]:
"""
Celery task for building/rebuilding ExploitDB file index
Returns:
Dictionary containing build results
"""
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'build_exploitdb_index',
'progress': 0,
'message': 'Starting ExploitDB file index building'
}
)
logger.info("Starting ExploitDB index build task")
# Import here to avoid circular dependencies
from exploitdb_client_local import ExploitDBLocalClient
# Create client instance with lazy_load=False to force index building
client = ExploitDBLocalClient(None, lazy_load=False)
# Update progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'build_exploitdb_index',
'progress': 50,
'message': 'Building file index...'
}
)
# Force index rebuild
client._build_file_index()
# Update progress to completion
self.update_state(
state='PROGRESS',
meta={
'stage': 'build_exploitdb_index',
'progress': 100,
'message': 'ExploitDB index building completed successfully'
}
)
result = {
'status': 'completed',
'total_exploits_indexed': len(client.file_index),
'index_updated': True
}
logger.info(f"ExploitDB index build task completed: {result}")
return result
except Exception as e:
logger.error(f"ExploitDB index build task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise


@@ -1,444 +0,0 @@
"""
Maintenance tasks for Celery
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, Any
from celery_config import celery_app, get_db_session
logger = logging.getLogger(__name__)
@celery_app.task(name='tasks.maintenance_tasks.cleanup_old_results')
def cleanup_old_results():
"""
Periodic task to clean up old Celery results and logs
"""
try:
logger.info("Starting cleanup of old Celery results")
# This would clean up old results from Redis
# For now, we'll just log the action
cutoff_date = datetime.utcnow() - timedelta(days=7)
# Clean up old task results (this would be Redis cleanup)
# celery_app.backend.cleanup()
logger.info(f"Cleanup completed for results older than {cutoff_date}")
return {
'status': 'completed',
'cutoff_date': cutoff_date.isoformat(),
'message': 'Old results cleanup completed'
}
except Exception as e:
logger.error(f"Cleanup task failed: {e}")
raise
@celery_app.task(name='tasks.maintenance_tasks.health_check')
def health_check():
"""
Health check task to verify system components
"""
try:
db_session = get_db_session()
# Check database connectivity
try:
from sqlalchemy import text
db_session.execute(text("SELECT 1"))
db_status = "healthy"
except Exception as e:
db_status = f"unhealthy: {e}"
finally:
db_session.close()
# Check Redis connectivity
try:
import redis
redis_client = redis.Redis.from_url(celery_app.conf.broker_url)
redis_client.ping()
redis_status = "healthy"
except Exception as e:
redis_status = f"unhealthy: {e}"
result = {
'timestamp': datetime.utcnow().isoformat(),
'database': db_status,
'redis': redis_status,
'celery': 'healthy'
}
logger.info(f"Health check completed: {result}")
return result
except Exception as e:
logger.error(f"Health check failed: {e}")
raise
@celery_app.task(bind=True, name='tasks.maintenance_tasks.database_cleanup_comprehensive')
def database_cleanup_comprehensive(self, days_to_keep: int = 30, cleanup_failed_jobs: bool = True,
cleanup_logs: bool = True) -> Dict[str, Any]:
"""
Comprehensive database cleanup task
Args:
days_to_keep: Number of days to keep old records
cleanup_failed_jobs: Whether to clean up failed job records
cleanup_logs: Whether to clean up old log entries
Returns:
Dictionary containing cleanup results
"""
try:
from datetime import datetime, timedelta
from typing import Dict, Any
db_session = get_db_session()
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'database_cleanup',
'progress': 0,
'message': 'Starting comprehensive database cleanup'
}
)
logger.info(f"Starting comprehensive database cleanup - keeping {days_to_keep} days")
cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
cleanup_results = {
'cutoff_date': cutoff_date.isoformat(),
'cleaned_tables': {},
'total_records_cleaned': 0
}
try:
# Import models here to avoid circular imports
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import BulkProcessingJob
# Clean up old bulk processing jobs
self.update_state(
state='PROGRESS',
meta={
'stage': 'database_cleanup',
'progress': 20,
'message': 'Cleaning up old bulk processing jobs'
}
)
old_jobs_query = db_session.query(BulkProcessingJob).filter(
BulkProcessingJob.created_at < cutoff_date
)
if cleanup_failed_jobs:
# Clean all old jobs
old_jobs_count = old_jobs_query.count()
old_jobs_query.delete()
else:
# Only clean completed jobs
old_jobs_query = old_jobs_query.filter(
BulkProcessingJob.status.in_(['completed', 'cancelled'])
)
old_jobs_count = old_jobs_query.count()
old_jobs_query.delete()
cleanup_results['cleaned_tables']['bulk_processing_jobs'] = old_jobs_count
cleanup_results['total_records_cleaned'] += old_jobs_count
# Clean up old Celery task results from Redis
self.update_state(
state='PROGRESS',
meta={
'stage': 'database_cleanup',
'progress': 40,
'message': 'Cleaning up old Celery task results'
}
)
try:
# This would clean up old results from Redis backend
# For now, we'll simulate this
celery_cleanup_count = 0
# celery_app.backend.cleanup()
cleanup_results['cleaned_tables']['celery_results'] = celery_cleanup_count
except Exception as e:
logger.warning(f"Could not clean Celery results: {e}")
cleanup_results['cleaned_tables']['celery_results'] = 0
# Clean up old temporary data (if any)
self.update_state(
state='PROGRESS',
meta={
'stage': 'database_cleanup',
'progress': 60,
'message': 'Cleaning up temporary data'
}
)
# Add any custom temporary table cleanup here
# Example: Clean up old session data, temporary files, etc.
temp_cleanup_count = 0
cleanup_results['cleaned_tables']['temporary_data'] = temp_cleanup_count
# Vacuum/optimize database (PostgreSQL)
self.update_state(
state='PROGRESS',
meta={
'stage': 'database_cleanup',
'progress': 80,
'message': 'Optimizing database'
}
)
try:
# Run VACUUM on PostgreSQL to reclaim space
from sqlalchemy import text
db_session.execute(text("VACUUM;"))
cleanup_results['database_optimized'] = True
except Exception as e:
logger.warning(f"Could not vacuum database: {e}")
cleanup_results['database_optimized'] = False
# Commit all changes
db_session.commit()
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': f'Database cleanup completed - removed {cleanup_results["total_records_cleaned"]} records',
'results': cleanup_results
}
)
logger.info(f"Database cleanup completed: {cleanup_results}")
return cleanup_results
finally:
db_session.close()
except Exception as e:
logger.error(f"Database cleanup failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Cleanup failed: {str(e)}',
'error': str(e)
}
)
raise
@celery_app.task(bind=True, name='tasks.maintenance_tasks.health_check_detailed')
def health_check_detailed(self) -> Dict[str, Any]:
"""
Detailed health check task for all system components
Returns:
Dictionary containing detailed health status
"""
try:
from datetime import datetime
import psutil
import redis
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'health_check',
'progress': 0,
'message': 'Starting detailed health check'
}
)
logger.info("Starting detailed health check")
health_status = {
'timestamp': datetime.utcnow().isoformat(),
'overall_status': 'healthy',
'components': {}
}
# Check database connectivity and performance
self.update_state(
state='PROGRESS',
meta={
'stage': 'health_check',
'progress': 20,
'message': 'Checking database health'
}
)
db_session = get_db_session()
try:
from sqlalchemy import text
start_time = datetime.utcnow()
db_session.execute(text("SELECT 1"))
db_response_time = (datetime.utcnow() - start_time).total_seconds()
# Check database size and connections
db_size_result = db_session.execute(text("SELECT pg_size_pretty(pg_database_size(current_database()));")).fetchone()
db_connections_result = db_session.execute(text("SELECT count(*) FROM pg_stat_activity;")).fetchone()
health_status['components']['database'] = {
'status': 'healthy',
'response_time_seconds': db_response_time,
'database_size': db_size_result[0] if db_size_result else 'unknown',
'active_connections': db_connections_result[0] if db_connections_result else 0,
'details': 'Database responsive and accessible'
}
except Exception as e:
health_status['components']['database'] = {
'status': 'unhealthy',
'error': str(e),
'details': 'Database connection failed'
}
health_status['overall_status'] = 'degraded'
finally:
db_session.close()
# Check Redis connectivity and performance
self.update_state(
state='PROGRESS',
meta={
'stage': 'health_check',
'progress': 40,
'message': 'Checking Redis health'
}
)
try:
import redis
start_time = datetime.utcnow()
redis_client = redis.Redis.from_url(celery_app.conf.broker_url)
redis_client.ping()
redis_response_time = (datetime.utcnow() - start_time).total_seconds()
# Get Redis info
redis_client = redis.Redis.from_url(celery_app.conf.broker_url)
redis_info = redis_client.info()
health_status['components']['redis'] = {
'status': 'healthy',
'response_time_seconds': redis_response_time,
'memory_usage_mb': redis_info.get('used_memory', 0) / (1024 * 1024),
'connected_clients': redis_info.get('connected_clients', 0),
'uptime_seconds': redis_info.get('uptime_in_seconds', 0),
'details': 'Redis responsive and accessible'
}
except Exception as e:
health_status['components']['redis'] = {
'status': 'unhealthy',
'error': str(e),
'details': 'Redis connection failed'
}
health_status['overall_status'] = 'degraded'
# Check system resources
self.update_state(
state='PROGRESS',
meta={
'stage': 'health_check',
'progress': 60,
'message': 'Checking system resources'
}
)
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
health_status['components']['system'] = {
'status': 'healthy',
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'memory_available_gb': memory.available / (1024**3),
'disk_percent': disk.percent,
'disk_free_gb': disk.free / (1024**3),
'details': 'System resources within normal ranges'
}
# Mark as degraded if resources are high
if cpu_percent > 80 or memory.percent > 85 or disk.percent > 90:
health_status['components']['system']['status'] = 'degraded'
health_status['overall_status'] = 'degraded'
health_status['components']['system']['details'] = 'High resource usage detected'
except Exception as e:
health_status['components']['system'] = {
'status': 'unknown',
'error': str(e),
'details': 'Could not check system resources'
}
# Check Celery worker status
self.update_state(
state='PROGRESS',
meta={
'stage': 'health_check',
'progress': 80,
'message': 'Checking Celery workers'
}
)
try:
inspect = celery_app.control.inspect()
active_workers = inspect.active()
stats = inspect.stats()
health_status['components']['celery'] = {
'status': 'healthy',
'active_workers': len(active_workers) if active_workers else 0,
'worker_stats': stats,
'details': 'Celery workers responding'
}
if not active_workers:
health_status['components']['celery']['status'] = 'degraded'
health_status['components']['celery']['details'] = 'No active workers found'
health_status['overall_status'] = 'degraded'
except Exception as e:
health_status['components']['celery'] = {
'status': 'unknown',
'error': str(e),
'details': 'Could not check Celery workers'
}
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': f'Health check completed - overall status: {health_status["overall_status"]}',
'results': health_status
}
)
logger.info(f"Detailed health check completed: {health_status['overall_status']}")
return health_status
except Exception as e:
logger.error(f"Detailed health check failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Health check failed: {str(e)}',
'error': str(e)
}
)
raise


@@ -1,409 +0,0 @@
"""
SIGMA rule generation tasks for Celery
"""
import asyncio
import logging
from typing import Dict, Any, List, Optional
from celery import current_task
from celery_config import celery_app, get_db_session
from enhanced_sigma_generator import EnhancedSigmaGenerator
from llm_client import LLMClient
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name='sigma_tasks.generate_enhanced_rules')
def generate_enhanced_rules_task(self, cve_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Celery task for enhanced SIGMA rule generation
Args:
cve_ids: Optional list of specific CVE IDs to process
Returns:
Dictionary containing generation results
"""
db_session = get_db_session()
try:
# Import here to avoid circular imports
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import CVE
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'generating_rules',
'progress': 0,
'message': 'Starting enhanced SIGMA rule generation'
}
)
logger.info(f"Starting enhanced rule generation task for CVEs: {cve_ids}")
# Create generator instance
generator = EnhancedSigmaGenerator(db_session)
# Get CVEs to process
if cve_ids:
cves = db_session.query(CVE).filter(CVE.cve_id.in_(cve_ids)).all()
else:
cves = db_session.query(CVE).filter(CVE.poc_count > 0).all()
total_cves = len(cves)
processed_cves = 0
successful_rules = 0
failed_rules = 0
results = []
# Process each CVE
for i, cve in enumerate(cves):
try:
# Update progress
progress = int((i / total_cves) * 100)
self.update_state(
state='PROGRESS',
meta={
'stage': 'generating_rules',
'progress': progress,
'message': f'Processing CVE {cve.cve_id}',
'current_cve': cve.cve_id,
'processed': processed_cves,
'total': total_cves
}
)
# Generate rule using asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
generator.generate_enhanced_rule(cve)
)
if result.get('success', False):
successful_rules += 1
else:
failed_rules += 1
results.append({
'cve_id': cve.cve_id,
'success': result.get('success', False),
'message': result.get('message', 'No message'),
'rule_id': result.get('rule_id')
})
finally:
loop.close()
processed_cves += 1
except Exception as e:
logger.error(f"Error processing CVE {cve.cve_id}: {e}")
failed_rules += 1
results.append({
'cve_id': cve.cve_id,
'success': False,
'message': f'Error: {str(e)}',
'rule_id': None
})
# Final results
final_result = {
'total_processed': processed_cves,
'successful_rules': successful_rules,
'failed_rules': failed_rules,
'results': results
}
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': f'Generated {successful_rules} rules from {processed_cves} CVEs',
'results': final_result
}
)
logger.info(f"Enhanced rule generation task completed: {final_result}")
return final_result
except Exception as e:
logger.error(f"Enhanced rule generation task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='sigma_tasks.llm_enhanced_generation')
def llm_enhanced_generation_task(self, cve_id: str, provider: str = 'ollama',
model: Optional[str] = None) -> Dict[str, Any]:
"""
Celery task for LLM-enhanced rule generation
Args:
cve_id: CVE identifier
provider: LLM provider (openai, anthropic, ollama, finetuned)
model: Specific model to use
Returns:
Dictionary containing generation result
"""
db_session = get_db_session()
try:
# Import here to avoid circular imports
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import CVE
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'llm_generation',
'progress': 10,
'message': f'Starting LLM rule generation for {cve_id}',
'cve_id': cve_id,
'provider': provider,
'model': model
}
)
logger.info(f"Starting LLM rule generation for {cve_id} using {provider}")
# Get CVE from database
cve = db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
if not cve:
raise ValueError(f"CVE {cve_id} not found in database")
# Update progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'llm_generation',
'progress': 25,
'message': f'Initializing LLM client ({provider})',
'cve_id': cve_id
}
)
# Create LLM client
llm_client = LLMClient(provider=provider, model=model)
if not llm_client.is_available():
raise ValueError(f"LLM client not available for provider: {provider}")
# Update progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'llm_generation',
'progress': 50,
'message': f'Generating rule with LLM for {cve_id}',
'cve_id': cve_id
}
)
# Generate rule using asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
rule_content = loop.run_until_complete(
llm_client.generate_sigma_rule(
cve_id=cve.cve_id,
poc_content=cve.poc_data or '',
cve_description=cve.description or ''
)
)
finally:
loop.close()
# Update progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'llm_generation',
'progress': 75,
'message': f'Validating generated rule for {cve_id}',
'cve_id': cve_id
}
)
# Validate the generated rule
is_valid = False
if rule_content:
is_valid = llm_client.validate_sigma_rule(rule_content, cve_id)
# Prepare result
result = {
'cve_id': cve_id,
'rule_content': rule_content,
'is_valid': is_valid,
'provider': provider,
'model': model or llm_client.model,
'success': bool(rule_content and is_valid)
}
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': f'LLM rule generation completed for {cve_id}',
'cve_id': cve_id,
'success': result['success'],
'result': result
}
)
logger.info(f"LLM rule generation task completed for {cve_id}: {result['success']}")
return result
except Exception as e:
logger.error(f"LLM rule generation task failed for {cve_id}: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Task failed for {cve_id}: {str(e)}',
'cve_id': cve_id,
'error': str(e)
}
)
raise
finally:
db_session.close()
@celery_app.task(bind=True, name='sigma_tasks.batch_llm_generation')
def batch_llm_generation_task(self, cve_ids: List[str], provider: str = 'ollama',
model: Optional[str] = None) -> Dict[str, Any]:
"""
Celery task for batch LLM rule generation
Args:
cve_ids: List of CVE identifiers
provider: LLM provider (openai, anthropic, ollama, finetuned)
model: Specific model to use
Returns:
Dictionary containing batch generation results
"""
db_session = get_db_session()
try:
# Update task progress
self.update_state(
state='PROGRESS',
meta={
'stage': 'batch_llm_generation',
'progress': 0,
'message': f'Starting batch LLM generation for {len(cve_ids)} CVEs',
'total_cves': len(cve_ids),
'provider': provider,
'model': model
}
)
logger.info(f"Starting batch LLM generation for {len(cve_ids)} CVEs using {provider}")
# Initialize results
results = []
successful_rules = 0
failed_rules = 0
# Process each CVE
for i, cve_id in enumerate(cve_ids):
try:
# Update progress
progress = int((i / len(cve_ids)) * 100)
self.update_state(
state='PROGRESS',
meta={
'stage': 'batch_llm_generation',
'progress': progress,
'message': f'Processing CVE {cve_id} ({i+1}/{len(cve_ids)})',
'current_cve': cve_id,
'processed': i,
'total': len(cve_ids)
}
)
# Generate rule for this CVE
result = llm_enhanced_generation_task.apply(
args=[cve_id, provider, model]
).get()
if result.get('success', False):
successful_rules += 1
else:
failed_rules += 1
results.append(result)
except Exception as e:
logger.error(f"Error processing CVE {cve_id} in batch: {e}")
failed_rules += 1
results.append({
'cve_id': cve_id,
'success': False,
'error': str(e),
'provider': provider,
'model': model
})
# Final results
final_result = {
'total_processed': len(cve_ids),
'successful_rules': successful_rules,
'failed_rules': failed_rules,
'provider': provider,
'model': model,
'results': results
}
# Update final progress
self.update_state(
state='SUCCESS',
meta={
'stage': 'completed',
'progress': 100,
'message': f'Batch generation completed: {successful_rules} successful, {failed_rules} failed',
'results': final_result
}
)
logger.info(f"Batch LLM generation task completed: {final_result}")
return final_result
except Exception as e:
logger.error(f"Batch LLM generation task failed: {e}")
self.update_state(
state='FAILURE',
meta={
'stage': 'error',
'progress': 0,
'message': f'Batch task failed: {str(e)}',
'error': str(e)
}
)
raise
finally:
db_session.close()


@@ -1,211 +0,0 @@
#!/usr/bin/env python3
"""
Test script for enhanced SIGMA rule generation
"""
import asyncio
import json
from datetime import datetime
from main import SessionLocal, CVE, SigmaRule, Base, engine
from enhanced_sigma_generator import EnhancedSigmaGenerator
from nomi_sec_client import NomiSecClient
from initialize_templates import initialize_templates
# Create tables if they don't exist
Base.metadata.create_all(bind=engine)
async def test_enhanced_rule_generation():
"""Test the enhanced rule generation with mock data"""
# Initialize templates
print("Initializing templates...")
initialize_templates()
db = SessionLocal()
try:
# Check if CVE already exists, if not create it
test_cve = db.query(CVE).filter(CVE.cve_id == "CVE-2014-7236").first()
if not test_cve:
# Create a test CVE with mock PoC data
test_cve = CVE(
cve_id="CVE-2014-7236",
description="Remote code execution vulnerability in Microsoft Office",
cvss_score=8.5,
severity="high",
published_date=datetime(2014, 10, 15),
affected_products=["Microsoft Office", "Windows"],
poc_count=2,
poc_data=[
{
"id": "test1",
"name": "CVE-2014-7236-exploit",
"owner": "security-researcher",
"full_name": "security-researcher/CVE-2014-7236-exploit",
"html_url": "https://github.com/security-researcher/CVE-2014-7236-exploit",
"description": "PowerShell exploit for CVE-2014-7236 using cmd.exe and powershell.exe",
"stargazers_count": 15,
"created_at": "2014-11-01T00:00:00Z",
"updated_at": "2014-11-15T00:00:00Z",
"quality_analysis": {
"quality_score": 75,
"quality_tier": "good",
"factors": {
"star_score": 30,
"recency_score": 10,
"description_score": 15,
"vuln_description_score": 15,
"name_relevance_score": 10
}
},
"exploit_indicators": {
"processes": ["powershell.exe", "cmd.exe"],
"files": ["exploit.ps1", "payload.exe"],
"commands": ["Invoke-Expression", "DownloadString", "whoami"],
"network": ["192.168.1.100", "8080"],
"urls": ["http://malicious.com/payload"],
"registry": ["HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft"]
}
},
{
"id": "test2",
"name": "office-exploit-poc",
"owner": "hacker",
"full_name": "hacker/office-exploit-poc",
"html_url": "https://github.com/hacker/office-exploit-poc",
"description": "Office document exploit with malicious macro",
"stargazers_count": 8,
"created_at": "2014-12-01T00:00:00Z",
"updated_at": "2014-12-10T00:00:00Z",
"quality_analysis": {
"quality_score": 45,
"quality_tier": "fair",
"factors": {
"star_score": 16,
"recency_score": 8,
"description_score": 12,
"vuln_description_score": 0,
"name_relevance_score": 5
}
},
"exploit_indicators": {
"processes": ["winword.exe", "excel.exe"],
"files": ["document.docx", "malicious.xlsm"],
"commands": ["CreateObject", "Shell.Application"],
"network": ["10.0.0.1"],
"urls": ["http://evil.com/download"],
"registry": ["HKEY_CURRENT_USER\\Software\\Microsoft\\Office"]
}
}
]
)
# Add to database
db.add(test_cve)
db.commit()
else:
# Update existing CVE with our mock PoC data
test_cve.poc_count = 2
test_cve.poc_data = [
{
"id": "test1",
"name": "CVE-2014-7236-exploit",
"owner": "security-researcher",
"full_name": "security-researcher/CVE-2014-7236-exploit",
"html_url": "https://github.com/security-researcher/CVE-2014-7236-exploit",
"description": "PowerShell exploit for CVE-2014-7236 using cmd.exe and powershell.exe",
"stargazers_count": 15,
"created_at": "2014-11-01T00:00:00Z",
"updated_at": "2014-11-15T00:00:00Z",
"quality_analysis": {
"quality_score": 75,
"quality_tier": "good",
"factors": {
"star_score": 30,
"recency_score": 10,
"description_score": 15,
"vuln_description_score": 15,
"name_relevance_score": 10
}
},
"exploit_indicators": {
"processes": ["powershell.exe", "cmd.exe"],
"files": ["exploit.ps1", "payload.exe"],
"commands": ["Invoke-Expression", "DownloadString", "whoami"],
"network": ["192.168.1.100", "8080"],
"urls": ["http://malicious.com/payload"],
"registry": ["HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft"]
}
},
{
"id": "test2",
"name": "office-exploit-poc",
"owner": "hacker",
"full_name": "hacker/office-exploit-poc",
"html_url": "https://github.com/hacker/office-exploit-poc",
"description": "Office document exploit with malicious macro",
"stargazers_count": 8,
"created_at": "2014-12-01T00:00:00Z",
"updated_at": "2014-12-10T00:00:00Z",
"quality_analysis": {
"quality_score": 45,
"quality_tier": "fair",
"factors": {
"star_score": 16,
"recency_score": 8,
"description_score": 12,
"vuln_description_score": 0,
"name_relevance_score": 5
}
},
"exploit_indicators": {
"processes": ["winword.exe", "excel.exe"],
"files": ["document.docx", "malicious.xlsm"],
"commands": ["CreateObject", "Shell.Application"],
"network": ["10.0.0.1"],
"urls": ["http://evil.com/download"],
"registry": ["HKEY_CURRENT_USER\\Software\\Microsoft\\Office"]
}
}
]
db.commit()
print(f"Using CVE: {test_cve.cve_id} with {test_cve.poc_count} PoCs")
# Generate enhanced rule
print("Generating enhanced SIGMA rule...")
generator = EnhancedSigmaGenerator(db)
result = await generator.generate_enhanced_rule(test_cve)
print(f"Generation result: {result}")
if result.get('success'):
# Fetch the generated rule
sigma_rule = db.query(SigmaRule).filter(SigmaRule.cve_id == test_cve.cve_id).first()
if sigma_rule:
print("\n" + "="*60)
print("GENERATED SIGMA RULE:")
print("="*60)
print(sigma_rule.rule_content)
print("="*60)
print(f"Detection Type: {sigma_rule.detection_type}")
print(f"Log Source: {sigma_rule.log_source}")
print(f"Confidence Level: {sigma_rule.confidence_level}")
print(f"PoC Quality Score: {sigma_rule.poc_quality_score}")
print(f"Exploit Indicators: {sigma_rule.exploit_indicators}")
print("="*60)
else:
print("No SIGMA rule found in database")
else:
print(f"Rule generation failed: {result.get('error')}")
except Exception as e:
print(f"Error during test: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
asyncio.run(test_enhanced_rule_generation())


@@ -0,0 +1,155 @@
"""
YAML Metadata Generator for SIGMA Rules
Generates YAML metadata sections for SIGMA rules based on CVE and PoC data
"""
import logging
from typing import Dict, List, Optional, Any
from sqlalchemy.orm import Session
from datetime import datetime
logger = logging.getLogger(__name__)
class YAMLMetadataGenerator:
"""Generates YAML metadata sections for SIGMA rules"""
def __init__(self, db_session: Session):
self.db_session = db_session
def generate_metadata(self, cve, poc_data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Generate YAML metadata for a SIGMA rule based on CVE and PoC data
Args:
cve: CVE database object
poc_data: List of PoC data dictionaries
Returns:
Dictionary containing YAML metadata sections
"""
try:
# Extract CVE information
cve_id = cve.cve_id
description = cve.description or ""
cvss_score = getattr(cve, 'cvss_score', None)
published_date = getattr(cve, 'published_date', None)
# Determine attack techniques from PoC data
attack_techniques = self._extract_attack_techniques(poc_data)
# Generate basic metadata
metadata = {
'title': f"Potential {cve_id} Exploitation",
'id': f"sigma-{cve_id.lower().replace('-', '_')}",
'description': self._generate_description(cve_id, description),
'references': [
f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}",
f"https://nvd.nist.gov/vuln/detail/{cve_id}"
],
'author': "Auto-generated SIGMA Rule",
'date': datetime.now().strftime("%Y/%m/%d"),
'tags': self._generate_tags(cve_id, attack_techniques),
'level': self._determine_level(cvss_score),
'status': "experimental"
}
# Add PoC-specific references
if poc_data:
for poc in poc_data[:3]: # Add up to 3 PoC references
if 'html_url' in poc:
metadata['references'].append(poc['html_url'])
# Add MITRE ATT&CK techniques if available
if attack_techniques:
metadata['falsepositives'] = [
"Legitimate use of affected software",
"Administrative activities"
]
metadata['fields'] = ["CommandLine", "ProcessName", "ParentProcessName"]
return metadata
except Exception as e:
logger.error(f"Error generating metadata for {cve.cve_id}: {e}")
return self._generate_fallback_metadata(cve.cve_id)
def _extract_attack_techniques(self, poc_data: List[Dict[str, Any]]) -> List[str]:
"""Extract MITRE ATT&CK techniques from PoC data"""
techniques = []
for poc in poc_data:
# Look for common attack patterns in PoC descriptions
description = poc.get('description', '').lower()
if 'remote code execution' in description or 'rce' in description:
techniques.append('T1203') # Exploitation for Client Execution
if 'privilege escalation' in description:
techniques.append('T1068') # Exploitation for Privilege Escalation
if 'sql injection' in description:
techniques.append('T1190') # Exploit Public-Facing Application
if 'xss' in description or 'cross-site scripting' in description:
techniques.append('T1185') # Browser Session Hijacking
if 'buffer overflow' in description:
techniques.append('T1203') # Exploitation for Client Execution
if 'deserialization' in description:
techniques.append('T1190') # Exploit Public-Facing Application
return list(set(techniques))
def _generate_description(self, cve_id: str, description: str) -> str:
"""Generate a concise description for the SIGMA rule"""
if description:
# Take first sentence or first 200 characters
first_sentence = description.split('.')[0]
if len(first_sentence) > 200:
return first_sentence[:200] + "..."
return first_sentence + "."
else:
return f"Detects potential exploitation of {cve_id}"
def _generate_tags(self, cve_id: str, attack_techniques: List[str]) -> List[str]:
"""Generate tags for the SIGMA rule"""
tags = [
"attack.t1203", # Default to exploitation technique
"cve." + cve_id.lower().replace('-', '_')
]
# Add specific technique tags
for technique in attack_techniques:
tags.append(f"attack.{technique.lower()}")
return tags
def _determine_level(self, cvss_score: Optional[float]) -> str:
"""Determine the severity level based on CVSS score"""
if cvss_score is None:
return "medium"
if cvss_score >= 9.0:
return "critical"
elif cvss_score >= 7.0:
return "high"
elif cvss_score >= 4.0:
return "medium"
else:
return "low"
def _generate_fallback_metadata(self, cve_id: str) -> Dict[str, Any]:
"""Generate minimal fallback metadata when primary generation fails"""
return {
'title': f"Potential {cve_id} Exploitation",
'id': f"sigma-{cve_id.lower().replace('-', '_')}",
'description': f"Detects potential exploitation of {cve_id}",
'references': [
f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}",
f"https://nvd.nist.gov/vuln/detail/{cve_id}"
],
'author': "Auto-generated SIGMA Rule",
'date': datetime.now().strftime("%Y/%m/%d"),
'tags': [
"attack.t1203",
f"cve.{cve_id.lower().replace('-', '_')}"
],
'level': "medium",
'status': "experimental"
}
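
A minimal usage sketch of the generator above (illustrative only: the CVE record is stubbed with `SimpleNamespace`, the import path is assumed, and `None` is passed for the session since `generate_metadata` never touches it):

```python
# Exercise YAMLMetadataGenerator with a stubbed CVE record (illustrative).
import yaml  # PyYAML, used only to print the returned dict as a YAML rule header
from types import SimpleNamespace

from yaml_metadata_generator import YAMLMetadataGenerator  # import path assumed

cve = SimpleNamespace(
    cve_id="CVE-2024-0001",
    description="Remote code execution in ExampleApp via crafted request.",
    cvss_score=9.8,
    published_date=None,
)
poc_data = [{"html_url": "https://github.com/example/poc", "description": "RCE PoC"}]

metadata = YAMLMetadataGenerator(db_session=None).generate_metadata(cve, poc_data)
print(yaml.dump(metadata, sort_keys=False))  # level: critical, PoC URL appended to references
```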

@@ -27,21 +27,21 @@ class MigrateCommands(BaseCommand):
"""Migrate data from existing database to file structure"""
try:
# Import database components
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from main import CVE, SigmaRule, RuleTemplate # Import from existing main.py
# Import database components
from database_models import CVE, SigmaRule, RuleTemplate, SessionLocal
# Use provided database URL or default
if not database_url:
database_url = os.getenv("DATABASE_URL", "postgresql://cve_user:cve_password@localhost:5432/cve_sigma_db")
self.info(f"Connecting to database: {database_url.split('@')[1] if '@' in database_url else database_url}")
# Create database session
engine = create_engine(database_url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db = SessionLocal()
# Use existing database session factory
if database_url:
self.info(f"Using provided database URL")
# Create new engine with provided URL
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(database_url)
SessionFactory = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db = SessionFactory()
else:
# Use default session factory
db = SessionLocal()
# Get total counts
cve_count = db.query(CVE).count()
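
The new import path assumes `backend/database_models.py` exposes the ORM models plus a `SessionLocal` factory. That module is not part of this hunk; a rough sketch of the expected shape, with column lists trimmed and names inferred from the imports here and the legacy schema further below:

```python
# backend/database_models.py -- illustrative sketch, not the committed file.
import os
import uuid

from sqlalchemy import Column, String, Text, create_engine
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base, sessionmaker

DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://cve_user:cve_password@localhost:5432/cve_sigma_db",
)
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class CVE(Base):
    __tablename__ = "cves"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    cve_id = Column(String(20), unique=True, nullable=False)
    description = Column(Text)


class SigmaRule(Base):
    __tablename__ = "sigma_rules"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    cve_id = Column(String(20))
    rule_content = Column(Text, nullable=False)


class RuleTemplate(Base):
    __tablename__ = "rule_templates"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    template_name = Column(String(255), nullable=False)
    template_content = Column(Text, nullable=False)
```

With a module of that shape, both the default `SessionLocal()` path and the explicitly provided `database_url` branch above end up querying the same models.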

@@ -208,7 +208,7 @@ class ProcessCommands(BaseCommand):
try:
# Use the existing NVD bulk processor
from main import SessionLocal # Import session factory
from database_models import SessionLocal # Import session factory
db_session = SessionLocal()
try:
@@ -242,7 +242,7 @@ class ProcessCommands(BaseCommand):
self.info(f"Fetching data for {cve_id}...")
try:
from main import SessionLocal
from database_models import SessionLocal
db_session = SessionLocal()
try:
@@ -267,7 +267,7 @@ class ProcessCommands(BaseCommand):
async def _sync_database_to_files(self, db_session, year: int):
"""Sync database records to file structure for a specific year"""
try:
from main import CVE
from database_models import CVE
# Get all CVEs for the year from database
year_pattern = f"CVE-{year}-%"
@@ -282,7 +282,7 @@ class ProcessCommands(BaseCommand):
async def _sync_single_cve_to_files(self, db_session, cve_id: str):
"""Sync a single CVE from database to file structure"""
try:
from main import CVE
from database_models import CVE
cve = db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
if cve:
@@ -368,7 +368,7 @@ class ProcessCommands(BaseCommand):
async def _generate_template_rule(self, cve_id: str, metadata: Dict) -> bool:
"""Generate template-based SIGMA rule"""
try:
from main import SessionLocal
from database_models import SessionLocal
db_session = SessionLocal()
try:
@@ -407,7 +407,7 @@ class ProcessCommands(BaseCommand):
async def _generate_llm_rule(self, cve_id: str, metadata: Dict, provider: str = 'openai') -> bool:
"""Generate LLM-based SIGMA rule"""
try:
from main import SessionLocal
from database_models import SessionLocal
db_session = SessionLocal()
try:

@@ -1,203 +0,0 @@
services:
db:
image: postgres:15
environment:
POSTGRES_DB: cve_sigma_db
POSTGRES_USER: cve_user
POSTGRES_PASSWORD: cve_password
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U cve_user -d cve_sigma_db"]
interval: 30s
timeout: 10s
retries: 3
backend:
build: ./backend
ports:
- "8000:8000"
environment:
DATABASE_URL: postgresql://cve_user:cve_password@db:5432/cve_sigma_db
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/0
NVD_API_KEY: ${NVD_API_KEY:-}
GITHUB_TOKEN: ${GITHUB_TOKEN}
OPENAI_API_KEY: ${OPENAI_API_KEY:-}
ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY:-}
OLLAMA_BASE_URL: ${OLLAMA_BASE_URL:-http://ollama:11434}
LLM_PROVIDER: ${LLM_PROVIDER:-ollama}
LLM_MODEL: ${LLM_MODEL:-llama3.2}
LLM_ENABLED: ${LLM_ENABLED:-true}
FINETUNED_MODEL_PATH: ${FINETUNED_MODEL_PATH:-/app/models/sigma_llama_finetuned}
HUGGING_FACE_TOKEN: ${HUGGING_FACE_TOKEN}
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
ollama-setup:
condition: service_completed_successfully
volumes:
- ./backend:/app
- ./github_poc_collector:/github_poc_collector
- ./exploit-db-mirror:/app/exploit-db-mirror
- ./models:/app/models
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
frontend:
build: ./frontend
ports:
- "3000:3000"
environment:
REACT_APP_API_URL: http://localhost:8000
volumes:
- ./frontend:/app
- /app/node_modules
command: npm start
redis:
image: redis:7-alpine
ports:
- "6379:6379"
command: redis-server --appendonly yes
volumes:
- redis_data:/data
ollama:
image: ollama/ollama:latest
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
environment:
- OLLAMA_HOST=0.0.0.0
restart: unless-stopped
deploy:
resources:
limits:
memory: 5G
reservations:
memory: 3G
ollama-setup:
build: ./backend
depends_on:
- ollama
environment:
OLLAMA_BASE_URL: http://ollama:11434
LLM_MODEL: llama3.2
volumes:
- ./backend:/app
- ./models:/app/models
command: python setup_ollama_with_sigma.py
restart: "no"
user: root
initial-setup:
build: ./backend
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
celery-worker:
condition: service_healthy
environment:
DATABASE_URL: postgresql://cve_user:cve_password@db:5432/cve_sigma_db
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/0
volumes:
- ./backend:/app
command: python initial_setup.py
restart: "no"
celery-worker:
build: ./backend
command: celery -A celery_config worker --loglevel=info --concurrency=4
environment:
DATABASE_URL: postgresql://cve_user:cve_password@db:5432/cve_sigma_db
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/0
NVD_API_KEY: ${NVD_API_KEY:-}
GITHUB_TOKEN: ${GITHUB_TOKEN}
OPENAI_API_KEY: ${OPENAI_API_KEY:-}
ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY:-}
OLLAMA_BASE_URL: ${OLLAMA_BASE_URL:-http://ollama:11434}
LLM_PROVIDER: ${LLM_PROVIDER:-ollama}
LLM_MODEL: ${LLM_MODEL:-llama3.2}
LLM_ENABLED: ${LLM_ENABLED:-true}
FINETUNED_MODEL_PATH: ${FINETUNED_MODEL_PATH:-/app/models/sigma_llama_finetuned}
HUGGING_FACE_TOKEN: ${HUGGING_FACE_TOKEN}
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
ollama-setup:
condition: service_completed_successfully
volumes:
- ./backend:/app
- ./github_poc_collector:/github_poc_collector
- ./exploit-db-mirror:/app/exploit-db-mirror
- ./models:/app/models
restart: unless-stopped
healthcheck:
test: ["CMD", "celery", "-A", "celery_config", "inspect", "ping"]
interval: 30s
timeout: 10s
retries: 3
celery-beat:
build: ./backend
command: celery -A celery_config beat --loglevel=info --pidfile=/tmp/celerybeat.pid
environment:
DATABASE_URL: postgresql://cve_user:cve_password@db:5432/cve_sigma_db
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/0
NVD_API_KEY: ${NVD_API_KEY:-}
GITHUB_TOKEN: ${GITHUB_TOKEN}
OPENAI_API_KEY: ${OPENAI_API_KEY:-}
ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY:-}
OLLAMA_BASE_URL: ${OLLAMA_BASE_URL:-http://ollama:11434}
LLM_PROVIDER: ${LLM_PROVIDER:-ollama}
LLM_MODEL: ${LLM_MODEL:-llama3.2}
LLM_ENABLED: ${LLM_ENABLED:-true}
FINETUNED_MODEL_PATH: ${FINETUNED_MODEL_PATH:-/app/models/sigma_llama_finetuned}
HUGGING_FACE_TOKEN: ${HUGGING_FACE_TOKEN}
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
celery-worker:
condition: service_healthy
volumes:
- ./backend:/app
- ./github_poc_collector:/github_poc_collector
- ./exploit-db-mirror:/app/exploit-db-mirror
- ./models:/app/models
restart: unless-stopped
flower:
build: ./backend
command: celery -A celery_config flower --port=5555
ports:
- "5555:5555"
environment:
CELERY_BROKER_URL: redis://redis:6379/0
CELERY_RESULT_BACKEND: redis://redis:6379/0
depends_on:
redis:
condition: service_started
celery-worker:
condition: service_healthy
restart: unless-stopped
volumes:
postgres_data:
redis_data:
ollama_data:

@@ -1,24 +0,0 @@
FROM node:18-alpine
WORKDIR /app
# Copy package files
COPY package*.json ./
# Install dependencies
RUN npm install
# Copy source code
COPY . .
# Create non-root user
RUN addgroup -g 1001 -S nodejs
RUN adduser -S reactuser -u 1001
# Change ownership
RUN chown -R reactuser:nodejs /app
USER reactuser
EXPOSE 3000
CMD ["npm", "start"]

@@ -1,47 +0,0 @@
{
"name": "cve-sigma-frontend",
"version": "0.1.0",
"private": true,
"dependencies": {
"@testing-library/jest-dom": "^5.16.4",
"@testing-library/react": "^13.3.0",
"@testing-library/user-event": "^13.5.0",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-scripts": "5.0.1",
"axios": "^1.6.0",
"react-router-dom": "^6.8.0",
"react-syntax-highlighter": "^15.5.0",
"web-vitals": "^2.1.4"
},
"devDependencies": {
"tailwindcss": "^3.3.0",
"autoprefixer": "^10.4.14",
"postcss": "^8.4.24"
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
},
"eslintConfig": {
"extends": [
"react-app",
"react-app/jest"
]
},
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
},
"proxy": "http://backend:8000"
}

@@ -1,6 +0,0 @@
module.exports = {
plugins: {
tailwindcss: {},
autoprefixer: {},
},
}

@@ -1,18 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="theme-color" content="#000000" />
<meta
name="description"
content="CVE-SIGMA Auto Generator - Automatically generate SIGMA rules from CVE data"
/>
<title>CVE-SIGMA Auto Generator</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>
<div id="root"></div>
</body>
</html>

@@ -1,126 +0,0 @@
@tailwind base;
@tailwind components;
@tailwind utilities;
.line-clamp-2 {
display: -webkit-box;
-webkit-line-clamp: 2;
-webkit-box-orient: vertical;
overflow: hidden;
}
.animate-spin {
animation: spin 1s linear infinite;
}
@keyframes spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}
/* Custom scrollbar for syntax highlighter */
.react-syntax-highlighter-line-number {
color: #6b7280 !important;
}
/* Responsive table improvements */
@media (max-width: 768px) {
.overflow-x-auto {
-webkit-overflow-scrolling: touch;
}
table {
font-size: 0.875rem;
}
.px-6 {
padding-left: 1rem;
padding-right: 1rem;
}
}
/* Modal backdrop blur effect */
.fixed.inset-0.bg-gray-600 {
backdrop-filter: blur(4px);
}
/* Syntax highlighter theme overrides */
.language-yaml {
border-radius: 0.375rem;
max-height: 400px;
overflow-y: auto;
}
/* Loading spinner improvements */
.animate-spin {
border-top-color: transparent;
}
/* Badge hover effects */
.inline-flex.px-2.py-1 {
transition: all 0.2s ease-in-out;
}
.inline-flex.px-2.py-1:hover {
transform: translateY(-1px);
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
}
/* Button hover effects */
button {
transition: all 0.2s ease-in-out;
}
button:hover {
transform: translateY(-1px);
}
/* Card hover effects */
.hover\:bg-gray-50:hover {
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.05);
transition: all 0.2s ease-in-out;
}
/* Smooth transitions for tab switching */
.border-b-2 {
transition: border-color 0.2s ease-in-out;
}
/* Custom focus styles for accessibility */
button:focus,
.focus\:outline-none:focus {
outline: 2px solid #3b82f6;
outline-offset: 2px;
}
/* Table row hover effects */
tbody tr:hover {
background-color: #f9fafb;
transition: background-color 0.15s ease-in-out;
}
/* Responsive grid improvements */
@media (max-width: 640px) {
.grid-cols-1.md\:grid-cols-3 {
gap: 1rem;
}
}
/* Loading state styles */
.loading-pulse {
animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite;
}
@keyframes pulse {
0%, 100% {
opacity: 1;
}
50% {
opacity: .5;
}
}

File diff suppressed because it is too large

@@ -1,11 +0,0 @@
import React from 'react';
import ReactDOM from 'react-dom/client';
import './App.css';
import App from './App';
const root = ReactDOM.createRoot(document.getElementById('root'));
root.render(
<React.StrictMode>
<App />
</React.StrictMode>
);

@@ -1,33 +0,0 @@
/** @type {import('tailwindcss').Config} */
module.exports = {
content: [
"./src/**/*.{js,jsx,ts,tsx}",
"./public/index.html"
],
theme: {
extend: {
colors: {
'cve-blue': '#3b82f6',
'cve-green': '#10b981',
'cve-red': '#ef4444',
'cve-orange': '#f97316',
'cve-yellow': '#eab308',
},
animation: {
'fade-in': 'fadeIn 0.5s ease-in-out',
'slide-up': 'slideUp 0.3s ease-out',
},
keyframes: {
fadeIn: {
'0%': { opacity: '0' },
'100%': { opacity: '1' },
},
slideUp: {
'0%': { transform: 'translateY(10px)', opacity: '0' },
'100%': { transform: 'translateY(0)', opacity: '1' },
},
},
},
},
plugins: [],
}

init.sql

@@ -1,191 +0,0 @@
-- Database initialization script
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- CVEs table
CREATE TABLE cves (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
cve_id VARCHAR(20) UNIQUE NOT NULL,
description TEXT,
cvss_score DECIMAL(3,1),
severity VARCHAR(20),
published_date TIMESTAMP,
modified_date TIMESTAMP,
affected_products TEXT[],
reference_urls TEXT[],
-- Bulk processing fields
data_source VARCHAR(20) DEFAULT 'nvd_api',
nvd_json_version VARCHAR(10) DEFAULT '2.0',
bulk_processed BOOLEAN DEFAULT FALSE,
-- nomi-sec PoC fields
poc_count INTEGER DEFAULT 0,
poc_data JSON,
-- Reference data fields
reference_data JSON,
reference_sync_status VARCHAR(20) DEFAULT 'pending',
reference_last_synced TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- SIGMA rules table
CREATE TABLE sigma_rules (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
cve_id VARCHAR(20) REFERENCES cves(cve_id),
rule_name VARCHAR(255) NOT NULL,
rule_content TEXT NOT NULL,
detection_type VARCHAR(50),
log_source VARCHAR(100),
confidence_level VARCHAR(20),
auto_generated BOOLEAN DEFAULT TRUE,
exploit_based BOOLEAN DEFAULT FALSE,
github_repos TEXT[],
exploit_indicators TEXT,
-- Enhanced fields for new data sources
poc_source VARCHAR(20) DEFAULT 'github_search',
poc_quality_score INTEGER DEFAULT 0,
nomi_sec_data JSON,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Rule templates table
CREATE TABLE rule_templates (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
template_name VARCHAR(255) NOT NULL,
template_content TEXT NOT NULL,
applicable_product_patterns TEXT[],
description TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
-- Bulk processing jobs table
CREATE TABLE bulk_processing_jobs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
job_type VARCHAR(50) NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
year INTEGER,
total_items INTEGER DEFAULT 0,
processed_items INTEGER DEFAULT 0,
failed_items INTEGER DEFAULT 0,
error_message TEXT,
job_metadata JSON,
started_at TIMESTAMP,
completed_at TIMESTAMP,
cancelled_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW()
);
-- Insert some basic rule templates
INSERT INTO rule_templates (template_name, template_content, applicable_product_patterns, description) VALUES
(
'Windows Process Execution',
'title: {title}
description: {description}
id: {rule_id}
status: experimental
author: CVE-SIGMA Auto Generator
date: {date}
references:
- {cve_url}
tags:
- {tags}
logsource:
category: process_creation
product: windows
detection:
selection:
Image|contains: {suspicious_processes}
condition: selection
falsepositives:
- Legitimate use of the software
level: {level}',
ARRAY['windows', 'microsoft'],
'Template for Windows process execution detection'
),
(
'Network Connection',
'title: {title}
description: {description}
id: {rule_id}
status: experimental
author: CVE-SIGMA Auto Generator
date: {date}
references:
- {cve_url}
tags:
- {tags}
logsource:
category: network_connection
product: windows
detection:
selection:
Initiated: true
DestinationPort: {suspicious_ports}
condition: selection
falsepositives:
- Legitimate network connections
level: {level}',
ARRAY['network', 'connection', 'remote'],
'Template for network connection detection'
),
(
'File Modification',
'title: {title}
description: {description}
id: {rule_id}
status: experimental
author: CVE-SIGMA Auto Generator
date: {date}
references:
- {cve_url}
tags:
- {tags}
logsource:
category: file_event
product: windows
detection:
selection:
EventType: creation
TargetFilename|contains: {file_patterns}
condition: selection
falsepositives:
- Legitimate file operations
level: {level}',
ARRAY['file', 'filesystem', 'modification'],
'Template for file modification detection'
),
(
'PowerShell Execution',
'title: {title}
description: {description}
id: {rule_id}
status: experimental
author: CVE-SIGMA Auto Generator
date: {date}
references:
- {cve_url}
tags:
- {tags}
logsource:
product: windows
category: ps_script
detection:
selection:
ScriptBlockText|contains: {suspicious_processes}
condition: selection
falsepositives:
- Legitimate PowerShell scripts
level: {level}',
ARRAY['powershell', 'script', 'ps1'],
'Template for PowerShell script execution detection'
);
-- Create indexes
CREATE INDEX idx_cves_cve_id ON cves(cve_id);
CREATE INDEX idx_cves_published_date ON cves(published_date);
CREATE INDEX idx_cves_severity ON cves(severity);
CREATE INDEX idx_cves_reference_sync_status ON cves(reference_sync_status);
CREATE INDEX idx_cves_reference_last_synced ON cves(reference_last_synced);
CREATE INDEX idx_sigma_rules_cve_id ON sigma_rules(cve_id);
CREATE INDEX idx_sigma_rules_detection_type ON sigma_rules(detection_type);
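
For reference, the `{placeholder}` tokens in these templates are plain Python format fields; a rough sketch of how one of the rows above could be rendered (the helper and its arguments are illustrative, not code from this repository):

```python
# Render the "Windows Process Execution" template above (illustrative sketch).
import uuid
from datetime import datetime


def render_template(template_content: str, cve_id: str, processes: list) -> str:
    """Fill the {placeholder} fields of a rule_templates row."""
    return template_content.format(
        title=f"Potential {cve_id} Exploitation",
        description=f"Detects potential exploitation of {cve_id}",
        rule_id=str(uuid.uuid4()),
        date=datetime.now().strftime("%Y/%m/%d"),
        cve_url=f"https://nvd.nist.gov/vuln/detail/{cve_id}",
        tags=f"cve.{cve_id.lower().replace('-', '_')}",
        suspicious_processes=str(processes),  # naive YAML flow list, acceptable for a sketch
        level="high",
    )
```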

@@ -1,63 +0,0 @@
#!/bin/bash
# CVE-SIGMA Auto Generator Startup Script
echo "🚀 Starting CVE-SIGMA Auto Generator..."
echo "==============================================="
# Check if Docker and Docker Compose are installed
if ! command -v docker &> /dev/null; then
echo "❌ Docker is not installed. Please install Docker first."
exit 1
fi
if ! command -v docker-compose &> /dev/null; then
echo "❌ Docker Compose is not installed. Please install Docker Compose first."
exit 1
fi
# Check if .env file exists, if not create from example
if [ ! -f .env ]; then
echo "📝 Creating .env file from .env.example..."
cp .env.example .env
echo "✅ .env file created. Please edit it to add your NVD API key for better rate limits."
fi
# Stop any existing containers
echo "🛑 Stopping any existing containers..."
docker-compose down
# Build and start the application
echo "🔨 Building and starting the application..."
docker-compose up -d --build
# Wait for services to be ready
echo "⏳ Waiting for services to start..."
sleep 10
# Check if services are running
echo "🔍 Checking service status..."
if docker-compose ps | grep -q "Up"; then
echo "✅ Services are running!"
echo ""
echo "🌐 Access the application at:"
echo " Frontend: http://localhost:3000"
echo " Backend API: http://localhost:8000"
echo " API Documentation: http://localhost:8000/docs"
echo ""
echo "📊 The application will automatically:"
echo " - Fetch recent CVEs from NVD"
echo " - Generate SIGMA rules"
echo " - Update every hour"
echo ""
echo "💡 Tip: Add your NVD API key to .env for higher rate limits"
echo " Get one free at: https://nvd.nist.gov/developers/request-an-api-key"
else
echo "❌ Some services failed to start. Check logs with:"
echo " docker-compose logs"
fi
# Show logs
echo ""
echo "📋 Recent logs (press Ctrl+C to exit):"
docker-compose logs -f --tail=50