add cve2capec client to map mitre attack data to cves

Brendan McDevitt 2025-07-14 15:48:10 -05:00
parent d38edff1cd
commit 06c4ed74b8
5 changed files with 1308 additions and 395 deletions

README.md (542 changed lines)

@@ -1,236 +1,139 @@
# CVE-SIGMA Auto Generator
Automated platform that generates SIGMA detection rules from CVE data using AI-enhanced exploit analysis.
## ✨ Key Features
- **Bulk CVE Processing**: Complete NVD datasets (2002-2025) with nomi-sec PoC integration
- **AI-Powered Rule Generation**: Multi-provider LLM support (OpenAI, Anthropic, local Ollama)
- **Quality-Based PoC Analysis**: 5-tier quality scoring system for exploit reliability
- **Real-time Monitoring**: Live job tracking and progress dashboard
- **Advanced Indicators**: Extract processes, files, network patterns from actual exploits
## 🚀 Quick Start
### Prerequisites
- Docker and Docker Compose
- (Optional) API keys for enhanced features
### Installation
```bash
# Clone and start
git clone <repository-url>
cd cve-sigma-generator
chmod +x start.sh
./start.sh
```
**Access Points:**
- Frontend: http://localhost:3000
- API: http://localhost:8000
- API Docs: http://localhost:8000/docs
### First Run
The application automatically:
1. Initializes database with rule templates
2. Fetches recent CVEs from NVD
3. Generates SIGMA rules with AI enhancement
4. Polls for new CVEs hourly
## 🎯 Usage
### Web Interface
- **Dashboard**: Statistics and system overview
- **CVEs**: Complete CVE listing with PoC data
- **SIGMA Rules**: Generated detection rules
- **Bulk Jobs**: Processing status and controls
### API Endpoints
#### Core Operations
```bash
# Fetch CVEs
curl -X POST http://localhost:8000/api/fetch-cves
# Bulk processing
curl -X POST http://localhost:8000/api/bulk-seed
curl -X POST http://localhost:8000/api/incremental-update
# LLM-enhanced rules
curl -X POST http://localhost:8000/api/llm-enhanced-rules
```
#### Data Access
- `GET /api/cves` - List CVEs
- `GET /api/sigma-rules` - List rules
- `GET /api/stats` - Statistics
- `GET /api/llm-status` - LLM provider status
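For a quick smoke test of these endpoints, a minimal Python client works too (the base URL matches the defaults above; the printed fields depend on the actual response shape):

```python
import requests

BASE = "http://localhost:8000"

# Overall statistics (CVE counts, rule counts, PoC coverage)
stats = requests.get(f"{BASE}/api/stats", timeout=10)
print(stats.json())

# LLM provider health before requesting enhanced rules
llm_status = requests.get(f"{BASE}/api/llm-status", timeout=10)
print(llm_status.json())
```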
## ⚙️ Configuration
### Environment Variables
**Core Settings**
```bash
DATABASE_URL=postgresql://user:pass@db:5432/dbname
NVD_API_KEY=your_nvd_key # Optional: 5→50 req/30s
GITHUB_TOKEN=your_github_token # Optional: Enhanced PoC analysis
```
**LLM Configuration**
```bash
LLM_PROVIDER=ollama # Default: ollama (local)
LLM_MODEL=llama3.2 # Provider-specific model
OLLAMA_BASE_URL=http://ollama:11434
# External providers (optional)
OPENAI_API_KEY=your_openai_key
ANTHROPIC_API_KEY=your_anthropic_key
```
### API Keys Setup
**NVD API** (Recommended)
1. Get key: https://nvd.nist.gov/developers/request-an-api-key
2. Add to `.env`: `NVD_API_KEY=your_key`
3. Benefit: 10x rate limit increase
**GitHub Token** (Optional)
1. Create: https://github.com/settings/tokens (public_repo scope)
2. Add to `.env`: `GITHUB_TOKEN=your_token`
3. Benefit: Enhanced exploit-based rules
**LLM APIs** (Optional)
- **Local Ollama**: No setup required (default)
- **OpenAI**: Get key from https://platform.openai.com/api-keys
- **Anthropic**: Get key from https://console.anthropic.com/
## 🧠 Rule Generation
### AI-Enhanced Generation
1. **PoC Analysis**: LLM analyzes actual exploit code
2. **Intelligent Detection**: Creates sophisticated SIGMA rules
3. **Context Awareness**: Maps CVE descriptions to detection patterns
4. **Validation**: Automatic SIGMA syntax verification
5. **Fallback**: Template-based generation if LLM unavailable
### Quality Tiers
- **Excellent** (80+ pts): High-quality PoCs with recent updates
- **Good** (60-79 pts): Moderate quality indicators
- **Fair** (40-59 pts): Basic PoCs with some validation
- **Poor** (20-39 pts): Minimal quality indicators
- **Very Poor** (<20 pts): Low-quality PoCs
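These thresholds amount to a simple lookup; a minimal sketch (the function name is illustrative, not the backend's actual API):

```python
def quality_tier(score: int) -> str:
    """Map a PoC quality score to the tiers listed above."""
    if score >= 80:
        return "Excellent"
    if score >= 60:
        return "Good"
    if score >= 40:
        return "Fair"
    if score >= 20:
        return "Poor"
    return "Very Poor"
```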
### Rule Types
- 🤖 **AI-Enhanced**: LLM-generated with PoC analysis
- 🔍 **Exploit-Based**: Template + GitHub exploit indicators
- ⚡ **Basic**: CVE description only
### Example Output
```yaml
title: CVE-2025-1234 AI-Enhanced Detection
description: Detection for CVE-2025-1234 RCE [AI-Enhanced with PoC analysis]
tags:
- attack.t1059.001
- attack.t1071.001
- cve-2025-1234
- ai.enhanced
detection:
selection_process:
@@ -238,207 +141,126 @@ detection:
CommandLine|contains:
- '-EncodedCommand'
- 'bypass'
- 'downloadstring'
selection_network:
Initiated: true
DestinationPort: [443, 80]
condition: selection_process and selection_network
level: high
```
## 🛠️ Development
### Local Development
```bash
# Start dependencies
docker-compose up -d db redis ollama
# Backend
cd backend && pip install -r requirements.txt
uvicorn main:app --reload
# Frontend
cd frontend && npm install && npm start
```
### Testing LLM Integration
```bash
# Check Ollama
curl http://localhost:11434/api/tags
# Test LLM status
curl http://localhost:8000/api/llm-status
# Switch providers
curl -X POST http://localhost:8000/api/llm-switch \
-H "Content-Type: application/json" \
-d '{"provider": "ollama", "model": "llama3.2"}'
```
## 📊 Architecture
- **Backend**: FastAPI + SQLAlchemy ORM
- **Frontend**: React + Tailwind CSS
- **Database**: PostgreSQL with enhanced schema
- **Cache**: Redis (optional)
- **LLM**: Ollama container + multi-provider support
- **Deployment**: Docker Compose
### Enhanced Database Schema
- **CVEs**: PoC metadata, bulk processing fields
- **SIGMA Rules**: Quality scoring, nomi-sec data
- **Rule Templates**: Pattern templates for generation
- **Bulk Jobs**: Job tracking and status
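As a rough sketch of what these tables imply in SQLAlchemy terms (model and column names are illustrative, not the actual schema):

```python
from sqlalchemy import Column, Integer, String, Float, JSON
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class CVE(Base):
    """Illustrative subset: CVE row with PoC metadata for bulk processing."""
    __tablename__ = "cves"
    id = Column(Integer, primary_key=True)
    cve_id = Column(String, unique=True, index=True)
    poc_count = Column(Integer, default=0)
    poc_data = Column(JSON)  # nomi-sec PoC metadata

class SigmaRule(Base):
    """Illustrative subset: generated rule plus its quality score."""
    __tablename__ = "sigma_rules"
    id = Column(Integer, primary_key=True)
    cve_id = Column(String, index=True)
    rule_content = Column(String)
    poc_quality_score = Column(Float)
```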
## 🔧 Troubleshooting
### Common Issues
**CVE Fetch Issues**
- Verify NVD API key in `.env`
- Check API connectivity: Use "Test NVD API" button
- Review logs: `docker-compose logs -f backend`
**No Rules Generated**
- Ensure LLM provider is accessible
- Check `/api/llm-status` for provider health
- Verify PoC data quality in CVE details
**Performance Issues**
- Start with recent years (2020+) for faster initial setup
- Use smaller batch sizes for bulk operations
- Monitor system resources during processing
**Port Conflicts**
- Default ports: 3000 (frontend), 8000 (backend), 5432 (db)
- Modify `docker-compose.yml` if ports are in use
### Rate Limits
- **NVD API**: 5/30s (no key) → 50/30s (with key)
- **nomi-sec API**: 1/second (built-in limiting)
- **GitHub API**: 60/hour (no token) → 5000/hour (with token)
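Client-side, the simplest way to respect these budgets is fixed spacing between calls; a sketch assuming the keyless NVD budget of 5 requests per 30 seconds:

```python
import time
import requests

def fetch_paced(urls, delay_seconds=6.0):
    """Fetch URLs sequentially, sleeping between calls (5 req/30s -> one every 6s)."""
    responses = []
    for url in urls:
        responses.append(requests.get(url, timeout=30))
        time.sleep(delay_seconds)
    return responses
```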
## 🛡️ Security
- Store API keys in environment variables
- Validate generated rules before production deployment
- Generated rules are marked "experimental" and require analyst review
- Use strong database passwords in production
## 📈 Monitoring
```bash
# View logs
docker-compose logs -f backend
docker-compose logs -f frontend
# Check service health
docker-compose ps
# Monitor bulk jobs
curl http://localhost:8000/api/bulk-status
```
## 🗺️ Roadmap
- [ ] Custom rule template editor
- [ ] Advanced MITRE ATT&CK mapping
- [ ] SIEM platform export
- [ ] ML-based rule optimization
- [ ] Threat intelligence integration
## 📝 License
MIT License - see LICENSE file for details.
## 🤝 Contributing
1. Fork repository
2. Create feature branch
3. Add tests and documentation
4. Submit pull request
## 📞 Support
- Check troubleshooting section
- Review application logs
- Open GitHub issue for bugs/questions

backend/cve2capec_client.py (new file, 447 lines)

@@ -0,0 +1,447 @@
"""
CVE2CAPEC client for retrieving MITRE ATT&CK technique mappings.
Integrates with the CVE2CAPEC repository: https://github.com/Galeax/CVE2CAPEC
"""
import json
import logging
import requests
from typing import Dict, List, Optional
import time
from datetime import datetime, timedelta
import os
logger = logging.getLogger(__name__)
class CVE2CAPECClient:
"""Client for accessing CVE to MITRE ATT&CK technique mappings."""
def __init__(self):
self.base_url = "https://raw.githubusercontent.com/Galeax/CVE2CAPEC/main"
self.cache_file = "/tmp/cve2capec_cache.json"
self.cache_expiry_hours = 24 # Cache for 24 hours
self.cve_mappings = {}
self.technique_names = {} # Map technique IDs to names
# Load cached data if available
self._load_cache()
# Load MITRE ATT&CK technique names
self._load_technique_names()
def _load_cache(self):
"""Load cached CVE mappings if they exist and are fresh."""
try:
if os.path.exists(self.cache_file):
with open(self.cache_file, 'r') as f:
cache_data = json.load(f)
# Check if cache is still fresh
cache_time = datetime.fromisoformat(cache_data.get('timestamp', '2000-01-01'))
if datetime.now() - cache_time < timedelta(hours=self.cache_expiry_hours):
self.cve_mappings = cache_data.get('mappings', {})
logger.info(f"Loaded {len(self.cve_mappings)} CVE mappings from cache")
return
# Cache is stale or doesn't exist, fetch fresh data
self._fetch_fresh_data()
except Exception as e:
logger.error(f"Error loading CVE2CAPEC cache: {e}")
self._fetch_fresh_data()
def _fetch_fresh_data(self):
"""Fetch fresh CVE mappings from the repository."""
try:
logger.info("Fetching fresh CVE2CAPEC data from all database files...")
# Define year range to fetch (focusing on recent years first for better performance)
# Start with recent years that are most likely to be relevant
years_to_fetch = list(range(2018, 2026)) # 2018-2025
all_mappings = {}
for year in years_to_fetch:
try:
url = f"{self.base_url}/database/CVE-{year}.jsonl"
logger.info(f"Fetching CVE mappings for year {year}...")
response = requests.get(url, timeout=30)
response.raise_for_status()
# Parse JSONL format
year_mappings = {}
for line in response.text.strip().split('\n'):
if line.strip():
try:
data = json.loads(line)
year_mappings.update(data)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse line in {year} data: {e}")
continue
all_mappings.update(year_mappings)
logger.info(f"Loaded {len(year_mappings)} CVE mappings from {year}")
# Add a small delay to be respectful to the server
time.sleep(0.5)
except requests.RequestException as e:
logger.warning(f"Failed to fetch CVE-{year}.jsonl: {e}")
continue
except Exception as e:
logger.warning(f"Error processing CVE-{year}.jsonl: {e}")
continue
# Also try to fetch the new_cves.jsonl for the latest data
try:
logger.info("Fetching latest CVE mappings from new_cves.jsonl...")
url = f"{self.base_url}/results/new_cves.jsonl"
response = requests.get(url, timeout=30)
response.raise_for_status()
latest_mappings = {}
for line in response.text.strip().split('\n'):
if line.strip():
try:
data = json.loads(line)
latest_mappings.update(data)
except json.JSONDecodeError:
continue
all_mappings.update(latest_mappings)
logger.info(f"Added {len(latest_mappings)} latest CVE mappings")
except Exception as e:
logger.warning(f"Failed to fetch new_cves.jsonl: {e}")
self.cve_mappings = all_mappings
# Save to cache
cache_data = {
'timestamp': datetime.now().isoformat(),
'mappings': all_mappings,
'years_fetched': years_to_fetch
}
with open(self.cache_file, 'w') as f:
json.dump(cache_data, f)
logger.info(f"Successfully fetched and cached {len(all_mappings)} total CVE mappings")
except Exception as e:
logger.error(f"Error fetching CVE2CAPEC data: {e}")
# Continue with empty mappings if fetch fails
self.cve_mappings = {}
def _load_technique_names(self):
"""Load MITRE ATT&CK technique names for better rule descriptions."""
# Common MITRE ATT&CK techniques and their names
self.technique_names = {
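# NOTE: several technique IDs repeat across the tactic groups below (e.g. 1078, 1053);
# duplicate keys in a dict literal collapse to a single entry, which is harmless here
# because the repeated values are identical.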
# Initial Access
"1189": "Drive-by Compromise",
"1190": "Exploit Public-Facing Application",
"1133": "External Remote Services",
"1200": "Hardware Additions",
"1566": "Phishing",
"1091": "Replication Through Removable Media",
"1195": "Supply Chain Compromise",
"1199": "Trusted Relationship",
"1078": "Valid Accounts",
# Execution
"1059": "Command and Scripting Interpreter",
"1059.001": "PowerShell",
"1059.003": "Windows Command Shell",
"1059.005": "Visual Basic",
"1059.006": "Python",
"1203": "Exploitation for Client Execution",
"1559": "Inter-Process Communication",
"1106": "Execution through Module Load",
"1053": "Scheduled Task/Job",
"1129": "Shared Modules",
"1204": "User Execution",
"1047": "Windows Management Instrumentation",
# Persistence
"1098": "Account Manipulation",
"1197": "BITS Jobs",
"1547": "Boot or Logon Autostart Execution",
"1037": "Boot or Logon Initialization Scripts",
"1176": "Browser Extensions",
"1554": "Compromise Client Software Binary",
"1136": "Create Account",
"1543": "Create or Modify System Process",
"1546": "Event Triggered Execution",
"1133": "External Remote Services",
"1574": "Hijack Execution Flow",
"1525": "Implant Internal Image",
"1556": "Modify Authentication Process",
"1137": "Office Application Startup",
"1542": "Pre-OS Boot",
"1053": "Scheduled Task/Job",
"1505": "Server Software Component",
"1205": "Traffic Signaling",
"1078": "Valid Accounts",
# Privilege Escalation
"1548": "Abuse Elevation Control Mechanism",
"1134": "Access Token Manipulation",
"1547": "Boot or Logon Autostart Execution",
"1037": "Boot or Logon Initialization Scripts",
"1543": "Create or Modify System Process",
"1484": "Domain Policy Modification",
"1546": "Event Triggered Execution",
"1068": "Exploitation for Privilege Escalation",
"1574": "Hijack Execution Flow",
"1055": "Process Injection",
"1053": "Scheduled Task/Job",
"1078": "Valid Accounts",
# Defense Evasion
"1548": "Abuse Elevation Control Mechanism",
"1134": "Access Token Manipulation",
"1197": "BITS Jobs",
"1610": "Deploy Container",
"1140": "Deobfuscate/Decode Files or Information",
"1006": "Direct Volume Access",
"1484": "Domain Policy Modification",
"1480": "Execution Guardrails",
"1211": "Exploitation for Defense Evasion",
"1222": "File and Directory Permissions Modification",
"1564": "Hide Artifacts",
"1574": "Hijack Execution Flow",
"1562": "Impair Defenses",
"1070": "Indicator Removal on Host",
"1202": "Indirect Command Execution",
"1036": "Masquerading",
"1556": "Modify Authentication Process",
"1112": "Modify Registry",
"1207": "Rogue Domain Controller",
"1014": "Rootkit",
"1218": "Signed Binary Proxy Execution",
"1216": "Signed Script Proxy Execution",
"1553": "Subvert Trust Controls",
"1221": "Template Injection",
"1205": "Traffic Signaling",
"1535": "Unused/Unsupported Cloud Regions",
"1078": "Valid Accounts",
"1497": "Virtualization/Sandbox Evasion",
"1220": "XSL Script Processing",
# Credential Access
"1557": "Adversary-in-the-Middle",
"1110": "Brute Force",
"1555": "Credentials from Password Stores",
"1212": "Exploitation for Credential Access",
"1187": "Forced Authentication",
"1606": "Forge Web Credentials",
"1056": "Input Capture",
"1556": "Modify Authentication Process",
"1040": "Network Sniffing",
"1003": "OS Credential Dumping",
"1528": "Steal Application Access Token",
"1558": "Steal or Forge Kerberos Tickets",
"1111": "Two-Factor Authentication Interception",
"1552": "Unsecured Credentials",
# Discovery
"1087": "Account Discovery",
"1010": "Application Window Discovery",
"1217": "Browser Bookmark Discovery",
"1580": "Cloud Infrastructure Discovery",
"1538": "Cloud Service Dashboard",
"1526": "Cloud Service Discovery",
"1613": "Container and Resource Discovery",
"1482": "Domain Trust Discovery",
"1083": "File and Directory Discovery",
"1615": "Group Policy Discovery",
"1046": "Network Service Scanning",
"1135": "Network Share Discovery",
"1201": "Password Policy Discovery",
"1069": "Permission Groups Discovery",
"1057": "Process Discovery",
"1012": "Query Registry",
"1018": "Remote System Discovery",
"1518": "Software Discovery",
"1082": "System Information Discovery",
"1614": "System Location Discovery",
"1016": "System Network Configuration Discovery",
"1049": "System Network Connections Discovery",
"1033": "System Owner/User Discovery",
"1007": "System Service Discovery",
"1124": "System Time Discovery",
"1497": "Virtualization/Sandbox Evasion",
# Lateral Movement
"1210": "Exploitation of Remote Services",
"1534": "Internal Spearphishing",
"1570": "Lateral Tool Transfer",
"1021": "Remote Service Session Hijacking",
"1021.001": "RDP Hijacking",
"1021.002": "SSH Hijacking",
"1021.004": "Tty Shell Hijacking",
"1021.005": "VNC Hijacking",
"1080": "Taint Shared Content",
"1550": "Use Alternate Authentication Material",
# Collection
"1557": "Adversary-in-the-Middle",
"1560": "Archive Collected Data",
"1123": "Audio Capture",
"1119": "Automated Collection",
"1185": "Browser Session Hijacking",
"1115": "Clipboard Data",
"1530": "Data from Cloud Storage Object",
"1602": "Data from Configuration Repository",
"1213": "Data from Information Repositories",
"1005": "Data from Local System",
"1039": "Data from Network Shared Drive",
"1025": "Data from Removable Media",
"1074": "Data Staged",
"1114": "Email Collection",
"1056": "Input Capture",
"1113": "Screen Capture",
"1125": "Video Capture",
# Command and Control
"1071": "Application Layer Protocol",
"1092": "Communication Through Removable Media",
"1132": "Data Encoding",
"1001": "Data Obfuscation",
"1568": "Dynamic Resolution",
"1573": "Encrypted Channel",
"1008": "Fallback Channels",
"1105": "Ingress Tool Transfer",
"1104": "Multi-Stage Channels",
"1095": "Non-Application Layer Protocol",
"1571": "Non-Standard Port",
"1572": "Protocol Tunneling",
"1090": "Proxy",
"1219": "Remote Access Software",
"1102": "Web Service",
# Exfiltration
"1020": "Automated Exfiltration",
"1030": "Data Transfer Size Limits",
"1048": "Exfiltration Over Alternative Protocol",
"1041": "Exfiltration Over C2 Channel",
"1011": "Exfiltration Over Other Network Medium",
"1052": "Exfiltration Over Physical Medium",
"1567": "Exfiltration Over Web Service",
"1029": "Scheduled Transfer",
"1537": "Transfer Data to Cloud Account",
# Impact
"1531": "Account Access Removal",
"1485": "Data Destruction",
"1486": "Data Encrypted for Impact",
"1565": "Data Manipulation",
"1491": "Defacement",
"1561": "Disk Wipe",
"1499": "Endpoint Denial of Service",
"1495": "Firmware Corruption",
"1490": "Inhibit System Recovery",
"1498": "Network Denial of Service",
"1496": "Resource Hijacking",
"1489": "Service Stop",
"1529": "System Shutdown/Reboot"
}
def get_mitre_techniques_for_cve(self, cve_id: str) -> List[str]:
"""Get MITRE ATT&CK techniques for a given CVE ID."""
try:
cve_data = self.cve_mappings.get(cve_id, {})
techniques = cve_data.get('TECHNIQUES', [])
# Convert technique IDs to T-prefixed format
formatted_techniques = []
for tech in techniques:
if isinstance(tech, (int, str)):
formatted_techniques.append(f"T{tech}")
return formatted_techniques
except Exception as e:
logger.error(f"Error getting MITRE techniques for {cve_id}: {e}")
return []
def get_technique_name(self, technique_id: str) -> str:
"""Get the name for a MITRE ATT&CK technique ID."""
# Remove T prefix if present
clean_id = technique_id.replace('T', '')
return self.technique_names.get(clean_id, f"Technique {technique_id}")
def get_cwe_for_cve(self, cve_id: str) -> List[str]:
"""Get CWE codes for a given CVE ID."""
try:
cve_data = self.cve_mappings.get(cve_id, {})
cwes = cve_data.get('CWE', [])
# Format CWE IDs
formatted_cwes = []
for cwe in cwes:
if isinstance(cwe, (int, str)):
formatted_cwes.append(f"CWE-{cwe}")
return formatted_cwes
except Exception as e:
logger.error(f"Error getting CWEs for {cve_id}: {e}")
return []
def get_capec_for_cve(self, cve_id: str) -> List[str]:
"""Get CAPEC codes for a given CVE ID."""
try:
cve_data = self.cve_mappings.get(cve_id, {})
capecs = cve_data.get('CAPEC', [])
# Format CAPEC IDs
formatted_capecs = []
for capec in capecs:
if isinstance(capec, (int, str)):
formatted_capecs.append(f"CAPEC-{capec}")
return formatted_capecs
except Exception as e:
logger.error(f"Error getting CAPECs for {cve_id}: {e}")
return []
def get_full_mapping_for_cve(self, cve_id: str) -> Dict:
"""Get complete CVE mapping including CWE, CAPEC, and MITRE techniques."""
try:
return {
'cve_id': cve_id,
'mitre_techniques': self.get_mitre_techniques_for_cve(cve_id),
'cwe_codes': self.get_cwe_for_cve(cve_id),
'capec_codes': self.get_capec_for_cve(cve_id),
'has_mappings': bool(self.cve_mappings.get(cve_id, {}))
}
except Exception as e:
logger.error(f"Error getting full mapping for {cve_id}: {e}")
return {
'cve_id': cve_id,
'mitre_techniques': [],
'cwe_codes': [],
'capec_codes': [],
'has_mappings': False
}
def get_stats(self) -> Dict:
"""Get statistics about the CVE2CAPEC dataset."""
total_cves = len(self.cve_mappings)
cves_with_techniques = len([cve for cve, data in self.cve_mappings.items()
if data.get('TECHNIQUES')])
cves_with_cwe = len([cve for cve, data in self.cve_mappings.items()
if data.get('CWE')])
cves_with_capec = len([cve for cve, data in self.cve_mappings.items()
if data.get('CAPEC')])
return {
'total_cves': total_cves,
'cves_with_mitre_techniques': cves_with_techniques,
'cves_with_cwe': cves_with_cwe,
'cves_with_capec': cves_with_capec,
'coverage_percentage': (cves_with_techniques / total_cves * 100) if total_cves > 0 else 0
}
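A quick usage sketch for the client above (not part of the committed file; the example CVE and printed values are illustrative):

```python
from cve2capec_client import CVE2CAPECClient

client = CVE2CAPECClient()  # loads the 24h cache or fetches the JSONL datasets

mapping = client.get_full_mapping_for_cve("CVE-2021-44228")
print(mapping["mitre_techniques"])  # e.g. ["T1190", ...] when a mapping exists
print(mapping["cwe_codes"])         # e.g. ["CWE-502", ...]
print(f"{client.get_stats()['coverage_percentage']:.1f}% of cached CVEs have technique mappings")
```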

backend/enhanced_sigma_generator.py

@@ -10,6 +10,7 @@ from typing import Dict, List, Optional, Tuple
from sqlalchemy.orm import Session
import re
from llm_client import LLMClient
from cve2capec_client import CVE2CAPECClient
# Configure logging
logging.basicConfig(level=logging.INFO)
@@ -21,6 +22,7 @@ class EnhancedSigmaGenerator:
def __init__(self, db_session: Session, llm_provider: str = None, llm_model: str = None):
self.db_session = db_session
self.llm_client = LLMClient(provider=llm_provider, model=llm_model)
self.cve2capec_client = CVE2CAPECClient()
async def generate_enhanced_rule(self, cve, use_llm: bool = True) -> dict:
"""Generate enhanced SIGMA rule for a CVE using PoC data"""
@@ -141,8 +143,8 @@
)
if rule_content:
# Validate the generated rule with CVE ID check
if self.llm_client.validate_sigma_rule(rule_content, cve.cve_id):
logger.info(f"Successfully generated LLM-enhanced rule for {cve.cve_id}")
return rule_content
else:
@@ -468,26 +470,46 @@
return '\\n'.join(f" - {ref}" for ref in refs)
def _generate_tags(self, cve, poc_data: list) -> str:
"""Generate MITRE ATT&CK tags and other tags"""
"""Generate MITRE ATT&CK tags and other tags using CVE2CAPEC mappings"""
tags = []
# CVE tag
tags.append(cve.cve_id.lower())
# Get MITRE ATT&CK techniques from CVE2CAPEC mapping
mitre_techniques = self.cve2capec_client.get_mitre_techniques_for_cve(cve.cve_id)
if mitre_techniques:
logger.info(f"Found {len(mitre_techniques)} MITRE techniques for {cve.cve_id}: {mitre_techniques}")
# Add all mapped MITRE techniques
for technique in mitre_techniques:
# Convert to attack.t format (lowercase)
attack_tag = f"attack.{technique.lower()}"
if attack_tag not in tags:
tags.append(attack_tag)
else:
# Fallback to indicator-based technique detection
logger.info(f"No CVE2CAPEC mapping found for {cve.cve_id}, using indicator-based detection")
combined_indicators = self._combine_exploit_indicators(poc_data)
if combined_indicators.get('processes'):
tags.append('attack.t1059') # Command and Scripting Interpreter
if combined_indicators.get('network'):
tags.append('attack.t1071') # Application Layer Protocol
if combined_indicators.get('files'):
tags.append('attack.t1105') # Ingress Tool Transfer
if any('powershell' in p.lower() for p in combined_indicators.get('processes', [])):
tags.append('attack.t1059.001') # PowerShell
# Get CWE codes for additional context
cwe_codes = self.cve2capec_client.get_cwe_for_cve(cve.cve_id)
if cwe_codes:
# Add the primary CWE as a tag
primary_cwe = cwe_codes[0].lower().replace('-', '.')
tags.append(primary_cwe)
# Add PoC quality tags
if poc_data:

backend/llm_client.py

@@ -12,6 +12,7 @@ from langchain_anthropic import ChatAnthropic
from langchain_community.llms import Ollama
from langchain_core.output_parsers import StrOutputParser
import yaml
from cve2capec_client import CVE2CAPECClient
logger = logging.getLogger(__name__)
@@ -42,6 +43,7 @@ class LLMClient:
self.model = model or self._get_default_model(self.provider)
self.llm = None
self.output_parser = StrOutputParser()
self.cve2capec_client = CVE2CAPECClient()
self._initialize_llm()
@@ -181,8 +183,11 @@ class LLMClient:
"existing_rule": existing_rule or "None"
}
logger.info(f"Sending to LLM for {cve_id}: CVE={cve_id}, Description length={len(cve_description)}, PoC length={len(poc_content)}")
logger.info(f"CVE Description for {cve_id}: {cve_description[:200]}...")
logger.info(f"PoC Content sample for {cve_id}: {poc_content[:200]}...")
# Generate the response
logger.info(f"Final prompt variables for {cve_id}: {list(input_data.keys())}")
response = await chain.ainvoke(input_data)
# Debug: Log raw LLM response
@@ -194,6 +199,16 @@
# Post-process to ensure clean YAML
sigma_rule = self._post_process_sigma_rule(sigma_rule)
# Fix common YAML syntax errors
sigma_rule = self._fix_yaml_syntax_errors(sigma_rule)
# CRITICAL: Validate and fix CVE ID hallucination
sigma_rule = self._fix_hallucinated_cve_id(sigma_rule, cve_id)
# Additional fallback: If no CVE ID found, inject it into the rule
if not sigma_rule or 'CVE-' not in sigma_rule:
sigma_rule = self._inject_cve_id_into_rule(sigma_rule, cve_id)
# Debug: Log final processed rule
logger.info(f"Final processed rule for {cve_id}: {sigma_rule[:200]}...")
@@ -263,18 +278,24 @@
- status: experimental
- description: Specific description based on CVE and PoC analysis
- author: 'AI Generated'
- date: Current date (2025/01/14)
- references: Include the EXACT CVE URL with the CVE ID provided by the user
- tags: Relevant MITRE ATT&CK techniques based on PoC analysis
- logsource: Appropriate category based on exploit type
- detection: Specific indicators from PoC analysis (NOT generic examples)
- condition: Logic connecting the detection selections
**CRITICAL ANTI-HALLUCINATION RULES:**
1. You MUST use the EXACT CVE ID provided in the user input - NEVER generate a different CVE ID
2. NEVER use example CVE IDs like CVE-2022-1234, CVE-2023-5678, or CVE-2024-1234
3. NEVER use placeholder CVE IDs from your training data
4. Analyze the provided CVE description and PoC content to create SPECIFIC detection patterns
5. DO NOT hallucinate or invent CVE IDs from your training data
6. Use the CVE ID exactly as provided in the title and references
7. Generate rules based ONLY on the provided CVE description and PoC code analysis
8. Do not reference vulnerabilities or techniques not present in the provided content
9. CVE-2022-1234 is a FORBIDDEN example CVE ID - NEVER use it
10. The user will provide the EXACT CVE ID to use - use that and ONLY that"""
if existing_rule:
user_template = """CVE ID: {cve_id}
@@ -288,31 +309,67 @@ Existing SIGMA Rule:
Enhance this rule with PoC insights. Output only valid SIGMA YAML starting with 'title:'."""
else:
user_template = """CREATE A SPECIFIC SIGMA RULE FOR THIS EXACT CVE:
# Get MITRE ATT&CK mappings for the CVE
mitre_mappings = self.cve2capec_client.get_full_mapping_for_cve(cve_id)
mitre_suggestions = ""
if mitre_mappings['mitre_techniques']:
technique_details = []
for tech in mitre_mappings['mitre_techniques']:
tech_name = self.cve2capec_client.get_technique_name(tech)
technique_details.append(f" - {tech}: {tech_name}")
mitre_suggestions = f"""
**MITRE ATT&CK TECHNIQUE MAPPINGS FOR {cve_id}:**
{chr(10).join(technique_details)}
**IMPORTANT:** Use these exact MITRE ATT&CK techniques in your tags section. Convert them to lowercase attack.t format (e.g., T1059 becomes attack.t1059)."""
if mitre_mappings['cwe_codes']:
mitre_suggestions += f"""
**CWE MAPPINGS:** {', '.join(mitre_mappings['cwe_codes'])}"""
user_template = f"""CREATE A SPECIFIC SIGMA RULE FOR THIS EXACT CVE:
**MANDATORY CVE ID TO USE: {{cve_id}}**
**CVE Description: {{cve_description}}**
**Proof-of-Concept Code Analysis:**
{{poc_content}}
{mitre_suggestions}
**CRITICAL REQUIREMENTS:**
1. Use EXACTLY this CVE ID in the title: {{cve_id}}
2. Use EXACTLY this CVE URL in references: https://nvd.nist.gov/vuln/detail/{{cve_id}}
3. Analyze the CVE description to understand the vulnerability type
4. Extract specific indicators from the PoC code (files, processes, commands, network patterns)
5. Create detection logic based on the actual exploit behavior
6. Use relevant logsource category (process_creation, file_event, network_connection, etc.)
7. Include the MITRE ATT&CK tags listed above in your tags section (convert to attack.t format)
**CRITICAL ANTI-HALLUCINATION REQUIREMENTS:**
- THE CVE ID IS: {{cve_id}}
- DO NOT use CVE-2022-1234, CVE-2023-1234, CVE-2024-1234, or any other example CVE ID
- DO NOT generate a different CVE ID from your training data
- You MUST use the exact CVE ID "{{cve_id}}" - this is the ONLY acceptable CVE ID for this rule
- Base your analysis ONLY on the provided CVE description and PoC code above
- Do not reference other vulnerabilities or exploits not mentioned in the provided content
- NEVER use placeholder CVE IDs like CVE-YYYY-NNNN or CVE-2022-1234
**ABSOLUTE REQUIREMENT: THE EXACT CVE ID TO USE IS: {{cve_id}}**
**FORBIDDEN: Do not use CVE-2022-1234, CVE-2023-5678, or any other example CVE ID**
Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE ID {{cve_id}}."""
# Create the prompt template with proper variable definitions
prompt_template = ChatPromptTemplate.from_messages([
("system", system_message),
("human", user_template)
])
return prompt_template
def _extract_sigma_rule(self, response_text: str) -> str:
"""Extract and clean SIGMA rule YAML from LLM response."""
@@ -351,6 +408,10 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
if '{' in stripped and '}' in stripped:
continue
# Skip lines that contain template placeholder text
if 'cve_id' in stripped.lower() or 'cve description' in stripped.lower():
continue
# Skip lines that are clearly not YAML structure
if stripped and not ':' in stripped and len(stripped) > 20:
continue
@@ -407,10 +468,17 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
]):
continue
# Skip template variables and placeholder text
if '{' in stripped and '}' in stripped:
continue
# Skip lines that contain template placeholder patterns
if any(placeholder in stripped.lower() for placeholder in [
'cve_id', 'cve description', 'poc_content', 'existing_rule',
'{cve_id}', '{cve_description}', '{poc_content}'
]):
continue
# Skip lines that look like explanations
if stripped and not ':' in stripped and not stripped.startswith('-') and not stripped.startswith(' '):
# This might be explanatory text, skip it
@@ -425,6 +493,520 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE
return '\n'.join(cleaned_lines).strip()
def _fix_yaml_syntax_errors(self, rule_content: str) -> str:
"""Fix common YAML syntax errors in LLM-generated rules."""
import re
if not rule_content:
return rule_content
lines = rule_content.split('\n')
fixed_lines = []
fixes_applied = []
for line in lines:
fixed_line = line
# Fix invalid YAML alias syntax: - *image* -> - '*image*'
# YAML aliases must be alphanumeric, but LLM uses *word* or *multiple words* for wildcards
if '- *' in line and '*' in line:
# Match patterns like "- *image*" or "- *process*" or "- *unpatched system*"
pattern = r'(\s*-\s*)(\*[^*]+\*)'
if re.search(pattern, line):
fixed_line = re.sub(pattern, r"\1'\2'", line)
fixes_applied.append(f"Fixed invalid YAML alias syntax: {line.strip()} -> {fixed_line.strip()}")
# Also fix similar patterns in values: key: *value* -> key: '*value*'
elif re.search(r':\s*\*[^*]+\*\s*$', line) and not re.search(r'[\'"]', line):
pattern = r'(:\s*)(\*[^*]+\*)'
fixed_line = re.sub(pattern, r"\1'\2'", line)
fixes_applied.append(f"Fixed invalid YAML alias in value: {line.strip()} -> {fixed_line.strip()}")
# Fix unquoted strings that start with special characters
elif re.match(r'^\s*-\s*[*&|>]', line):
# If line starts with -, followed by special YAML chars, quote it
parts = line.split('-', 1)
if len(parts) == 2:
indent = parts[0]
content = parts[1].strip()
if content and not content.startswith(("'", '"')):
fixed_line = f"{indent}- '{content}'"
fixes_applied.append(f"Quoted special character value: {line.strip()} -> {fixed_line.strip()}")
# Fix invalid boolean values
elif ': *' in line and not line.strip().startswith('#'):
# Replace ": *something*" with ": '*something*'" if not already quoted
pattern = r'(:\s*)(\*[^*]+\*)'
if not re.search(r'[\'"]', line) and re.search(pattern, line):
fixed_line = re.sub(pattern, r"\1'\2'", line)
fixes_applied.append(f"Fixed unquoted wildcard value: {line.strip()} -> {fixed_line.strip()}")
# Fix missing quotes around values with special characters (but not YAML indicators)
elif re.search(r':\s*[*&]', line) and not re.search(r'[\'"]', line):
# Don't quote YAML multiline indicators (|, >)
if not re.search(r':\s*[|>]\s*$', line):
parts = line.split(':', 1)
if len(parts) == 2:
key = parts[0]
value = parts[1].strip()
if value and not value.startswith(("'", '"', '[', '{')):
fixed_line = f"{key}: '{value}'"
fixes_applied.append(f"Quoted special character value: {line.strip()} -> {fixed_line.strip()}")
# Fix invalid array syntax
elif re.search(r'^\s*\*[^*]+\*\s*$', line):
# Standalone *word* or *multiple words* lines should be quoted
indent = len(line) - len(line.lstrip())
content = line.strip()
fixed_line = f"{' ' * indent}'{content}'"
fixes_applied.append(f"Fixed standalone wildcard: {line.strip()} -> {fixed_line.strip()}")
fixed_lines.append(fixed_line)
result = '\n'.join(fixed_lines)
# Additional fixes for common YAML issues
# Fix missing spaces after colons
colon_fix = re.sub(r':([^\s])', r': \1', result)
if colon_fix != result:
fixes_applied.append("Added missing spaces after colons")
result = colon_fix
# Fix multiple spaces after colons
space_fix = re.sub(r':\s{2,}', ': ', result)
if space_fix != result:
fixes_applied.append("Fixed multiple spaces after colons")
result = space_fix
# Fix incorrect reference format: references: - https://... -> references:\n - https://...
ref_fix = re.sub(r'references:\s*-\s*', 'references:\n - ', result)
if ref_fix != result:
fixes_applied.append("Fixed references array format")
result = ref_fix
# Fix broken URLs in references (spaces in URLs)
url_fix = re.sub(r'https:\s*//nvd\.nist\.gov', 'https://nvd.nist.gov', result)
if url_fix != result:
fixes_applied.append("Fixed broken URLs in references")
result = url_fix
# Fix incorrect logsource format: logsource: category: X -> logsource:\n category: X
logsource_fix = re.sub(r'logsource:\s*(category|product|service):\s*', r'logsource:\n \1: ', result)
if logsource_fix != result:
fixes_applied.append("Fixed logsource structure format")
result = logsource_fix
# Fix incorrect detection format: detection: selection: key: value -> detection:\n selection:\n key: value
detection_fix = re.sub(r'detection:\s*(\w+):\s*(\w+):\s*', r'detection:\n \1:\n \2: ', result)
if detection_fix != result:
fixes_applied.append("Fixed detection structure format")
result = detection_fix
# Fix detection lines with == operators: detection: selection1: image == *value* -> detection:\n selection1:\n image: '*value*'
# This handles compressed syntax with equality operators
# Make the pattern more flexible to catch various formats
detection_eq_patterns = [
(r'detection:\s*(\w+):\s*(\w+)\s*==\s*(\*[^*\s]+\*)', r'detection:\n \1:\n \2: \'\3\''),
(r'detection:\s*(\w+):\s*(\w+)\s*==\s*([^\s]+)', r'detection:\n \1:\n \2: \'\3\''),
]
for pattern, replacement in detection_eq_patterns:
detection_eq_fix = re.sub(pattern, replacement, result)
if detection_eq_fix != result:
fixes_applied.append("Fixed detection equality operator syntax")
result = detection_eq_fix
break
# Fix standalone equality operators in detection sections: key == *value* -> key: '*value*'
# Also handle lines with multiple keys/values separated by colons and ==
lines = result.split('\n')
eq_fixed_lines = []
for line in lines:
original_line = line
# Look for pattern: whitespace + key == *value* or key == value
if ' == ' in line:
# Handle complex patterns like "detection: selection1: image == *value*"
if line.strip().startswith('detection:') and ' == ' in line:
# Split by colons to handle nested structure
parts = line.split(':')
if len(parts) >= 3:
# This looks like "detection: selection1: image == *value*"
base_indent = len(line) - len(line.lstrip())
# Extract the parts
detection_part = parts[0].strip() # "detection"
selection_part = parts[1].strip() # "selection1"
key_value_part = ':'.join(parts[2:]).strip() # "image == *value*"
# Parse the key == value part
if ' == ' in key_value_part:
eq_parts = key_value_part.split(' == ', 1)
key = eq_parts[0].strip()
value = eq_parts[1].strip()
# Quote the value if needed
if value.startswith('*') and value.endswith('*') and not value.startswith("'"):
value = f"'{value}'"
elif not value.startswith(("'", '"', '[', '{')):
value = f"'{value}'"
# Reconstruct as proper YAML
eq_fixed_lines.append(f"{' ' * base_indent}detection:")
eq_fixed_lines.append(f"{' ' * (base_indent + 4)}{selection_part}:")
eq_fixed_lines.append(f"{' ' * (base_indent + 8)}{key}: {value}")
fixes_applied.append(f"Fixed complex detection equality: {selection_part}: {key} == {value}")
continue
# Handle simpler patterns: " key == value"
elif re.match(r'^(\s+)(\w+)\s*==\s*(.+)$', line):
match = re.match(r'^(\s+)(\w+)\s*==\s*(.+)$', line)
indent = match.group(1)
key = match.group(2)
value = match.group(3).strip()
# Ensure wildcards are quoted
if value.startswith('*') and value.endswith('*') and not value.startswith("'"):
value = f"'{value}'"
elif not value.startswith(("'", '"', '[', '{')):
value = f"'{value}'"
eq_fixed_lines.append(f"{indent}{key}: {value}")
fixes_applied.append(f"Fixed equality operator: {key} == {value}")
continue
eq_fixed_lines.append(original_line)
# Join unconditionally: the simple equality fixes replace lines one-for-one,
# so comparing lengths here would silently discard those fixes
result = '\n'.join(eq_fixed_lines)
# Fix invalid array-as-value syntax: key: - value -> key:\n - value
# This handles cases like "CommandLine: - '*image*'" which should be "CommandLine:\n - '*image*'"
lines = result.split('\n')
fixed_lines = []
for line in lines:
# Look for pattern: whitespace + key: - value
if re.match(r'^(\s+)(\w+):\s*-\s*(.+)$', line):
match = re.match(r'^(\s+)(\w+):\s*-\s*(.+)$', line)
indent = match.group(1)
key = match.group(2)
value = match.group(3)
# Convert to proper array format
fixed_lines.append(f"{indent}{key}:")
fixed_lines.append(f"{indent} - {value}")
fixes_applied.append(f"Fixed array-as-value syntax: {key}: - {value}")
else:
fixed_lines.append(line)
if len(fixed_lines) != len(lines):
result = '\n'.join(fixed_lines)
# Fix complex nested syntax errors like "selection1: Image: - '*path*': value"
# This should be "selection1:\n Image:\n - '*path*': value"
complex_fix = re.sub(r'^(\s+)(\w+):\s*(\w+):\s*-\s*(.+)$',
r'\1\2:\n\1 \3:\n\1 - \4',
result, flags=re.MULTILINE)
if complex_fix != result:
fixes_applied.append("Fixed complex nested structure syntax")
result = complex_fix
# Fix incorrect tags format: tags: - T1059.001 -> tags:\n - T1059.001
tags_fix = re.sub(r'tags:\s*-\s*', 'tags:\n - ', result)
if tags_fix != result:
fixes_applied.append("Fixed tags array format")
result = tags_fix
# Fix other common single-line array formats
for field in ['falsepositives', 'level', 'related']:
field_pattern = f'{field}:\\s*-\\s*'
field_replacement = f'{field}:\n - '
field_fix = re.sub(field_pattern, field_replacement, result)
if field_fix != result:
fixes_applied.append(f"Fixed {field} array format")
result = field_fix
# Fix placeholder UUID if LLM used the example one
import uuid
placeholder_uuid = '12345678-1234-1234-1234-123456789012'
if placeholder_uuid in result:
new_uuid = str(uuid.uuid4())
result = result.replace(placeholder_uuid, new_uuid)
fixes_applied.append(f"Replaced placeholder UUID with {new_uuid[:8]}...")
# Fix orphaned list items (standalone lines starting with -)
lines = result.split('\n')
fixed_lines = []
for i, line in enumerate(lines):
stripped = line.strip()
# Check for orphaned list items (lines starting with - but not part of an array)
if (stripped.startswith('- ') and
i > 0 and
not lines[i-1].strip().endswith(':') and
':' not in stripped and
not stripped.startswith('- https://')): # Don't remove reference URLs
# Check if this looks like a MITRE ATT&CK tag
if re.match(r'- T\d{4}', stripped):
# Try to find the tags section and add it there
tags_line_found = False
for j in range(len(fixed_lines)-1, -1, -1):
if fixed_lines[j].strip().startswith('tags:'):
# This is an orphaned tag, add it to the tags array
fixed_lines.append(f" {stripped}")
fixes_applied.append(f"Fixed orphaned MITRE tag: {stripped}")
tags_line_found = True
break
if not tags_line_found:
# No tags section found, remove the orphaned item
fixes_applied.append(f"Removed orphaned tag (no tags section): {stripped}")
continue
else:
# Other orphaned list items, remove them
fixes_applied.append(f"Removed orphaned list item: {stripped}")
continue
fixed_lines.append(line)
result = '\n'.join(fixed_lines)
# Final pass: Remove lines that are still malformed and would cause YAML parsing errors
lines = result.split('\n')
final_lines = []
for line in lines:
stripped = line.strip()
# Skip lines that have multiple colons in problematic patterns
if re.search(r':\s*\w+:\s*-\s*[\'"][^\'":]*[\'"]:\s*', line):
# This looks like "key: subkey: - 'value': more_stuff" which is malformed
fixes_applied.append(f"Removed malformed nested line: {stripped[:50]}...")
continue
# Skip lines with invalid YAML mapping structures
if re.search(r'^\s*\w+:\s*\w+:\s*-\s*[\'"][^\'":]*[\'"]:\s*\w+', line):
fixes_applied.append(f"Removed invalid mapping structure: {stripped[:50]}...")
continue
final_lines.append(line)
if len(final_lines) != len(lines):
result = '\n'.join(final_lines)
# Log if we made any fixes
if fixes_applied:
logger.info(f"Applied YAML syntax fixes: {', '.join(fixes_applied)}")
# Final YAML structure validation and repair
result = self._validate_and_repair_yaml_structure(result, fixes_applied)
return result
def _validate_and_repair_yaml_structure(self, content: str, fixes_applied: list) -> str:
"""Use YAML library to validate and repair structural issues."""
try:
# First, try to parse the YAML to see if it's valid
yaml.safe_load(content)
# If we get here, the YAML is valid
return content
except yaml.YAMLError as e:
logger.warning(f"YAML structure validation failed: {e}")
# Try to repair common structural issues
repaired_content = self._repair_yaml_structure(content, str(e))
# Test if the repair worked
try:
yaml.safe_load(repaired_content)
fixes_applied.append("Repaired YAML document structure")
logger.info("Successfully repaired YAML structure")
return repaired_content
except yaml.YAMLError as e2:
logger.warning(f"YAML repair attempt failed: {e2}")
# Last resort: try to build a minimal valid SIGMA rule
return self._build_minimal_valid_rule(content, fixes_applied)
    def _repair_yaml_structure(self, content: str, error_msg: str) -> str:
        """Attempt to repair common YAML structural issues."""
        lines = content.split('\n')
        repaired_lines = []

        # Track indentation levels to detect issues
        expected_indent = 0
        in_detection = False
        detection_indent = 0

        for i, line in enumerate(lines):
            stripped = line.strip()
            current_indent = len(line) - len(line.lstrip())

            # Skip empty lines
            if not stripped:
                repaired_lines.append(line)
                continue

            # Track if we're in the detection section
            if stripped.startswith('detection:'):
                in_detection = True
                detection_indent = current_indent
                repaired_lines.append(line)
                continue
            elif in_detection and current_indent <= detection_indent and not stripped.startswith(('condition:', 'timeframe:')):
                # We've left the detection section
                in_detection = False

            # Fix indentation issues in the detection section
            if in_detection:
                # Ensure proper indentation for detection subsections
                if stripped.startswith(('selection', 'filter', 'condition')):
                    # This should be indented under detection
                    if current_indent <= detection_indent:
                        corrected_line = ' ' * (detection_indent + 4) + stripped
                        repaired_lines.append(corrected_line)
                        continue
                elif current_indent > detection_indent + 4:
                    # This might be a detection field that needs proper indentation
                    if ':' in stripped and not stripped.startswith('-'):
                        # This looks like a field under a selection
                        if i > 0 and 'selection' in lines[i-1]:
                            corrected_line = ' ' * (detection_indent + 8) + stripped
                            repaired_lines.append(corrected_line)
                            continue

            # Fix lines that start with wrong indentation
            if ':' in stripped and not stripped.startswith('-'):
                # This is a key-value pair
                key = stripped.split(':')[0].strip()

                # Top-level keys should not be indented
                if key in ['title', 'id', 'status', 'description', 'author', 'date', 'references', 'tags', 'logsource', 'detection', 'falsepositives', 'level']:
                    if current_indent > 0:
                        corrected_line = stripped
                        repaired_lines.append(corrected_line)
                        continue

            repaired_lines.append(line)

        return '\n'.join(repaired_lines)
    def _build_minimal_valid_rule(self, content: str, fixes_applied: list) -> str:
        """Build a minimal valid SIGMA rule from the content."""
        lines = content.split('\n')

        # Extract key components
        title = "Unknown SIGMA Rule"
        rule_id = "00000000-0000-0000-0000-000000000000"
        description = "Generated SIGMA rule"

        for line in lines:
            stripped = line.strip()
            if stripped.startswith('title:'):
                title = stripped.split(':', 1)[1].strip().strip('"\'')
            elif stripped.startswith('id:'):
                rule_id = stripped.split(':', 1)[1].strip().strip('"\'')
            elif stripped.startswith('description:'):
                description = stripped.split(':', 1)[1].strip().strip('"\'')

        # Build minimal valid rule
        minimal_rule = f"""title: '{title}'
id: {rule_id}
status: experimental
description: '{description}'
author: 'AI Generated'
date: 2025/01/14
references:
    - https://example.com
logsource:
    category: process_creation
detection:
    selection:
        Image: '*'
    condition: selection
level: medium"""

        fixes_applied.append("Built minimal valid SIGMA rule structure")
        logger.warning("Generated minimal valid SIGMA rule as fallback")
        return minimal_rule
    def _fix_hallucinated_cve_id(self, rule_content: str, correct_cve_id: str) -> str:
        """Detect and fix hallucinated CVE IDs in the generated rule."""
        import re

        # Pattern to match CVE IDs (CVE-YYYY-NNNNN format)
        cve_pattern = r'CVE-\d{4}-\d{4,7}'

        # Find all CVE IDs in the rule content
        found_cves = re.findall(cve_pattern, rule_content, re.IGNORECASE)

        if found_cves:
            # Check if any found CVE differs from the correct one
            hallucinated_cves = [cve for cve in found_cves if cve.upper() != correct_cve_id.upper()]

            if hallucinated_cves:
                logger.error(f"CRITICAL: LLM hallucinated CVE IDs: {hallucinated_cves}, expected: {correct_cve_id}")
                logger.error("This indicates the LLM is not following the prompt correctly!")

                # Replace all hallucinated CVE IDs with the correct one
                corrected_content = rule_content
                for hallucinated_cve in set(hallucinated_cves):  # Use a set to avoid duplicate replacements
                    corrected_content = re.sub(
                        re.escape(hallucinated_cve),
                        correct_cve_id,
                        corrected_content,
                        flags=re.IGNORECASE
                    )

                logger.info(f"Successfully corrected hallucinated CVE IDs to {correct_cve_id}")
                return corrected_content
            else:
                logger.info(f"CVE ID validation passed: found correct {correct_cve_id}")
        else:
            # No CVE ID found in the rule; flag it for review rather than failing
            logger.warning(f"No CVE ID found in generated rule for {correct_cve_id}, this might need manual review")

        return rule_content
    def _inject_cve_id_into_rule(self, rule_content: str, cve_id: str) -> str:
        """Inject CVE ID into a rule that lacks it."""
        if not rule_content:
            logger.warning(f"Empty rule content for {cve_id}, cannot inject CVE ID")
            return rule_content

        lines = rule_content.split('\n')
        modified_lines = []

        for i, line in enumerate(lines):
            stripped = line.strip()

            # Fix the title line if it has placeholders
            if stripped.startswith('title:'):
                if '{cve_id}' in line.lower() or '{cve_description}' in line.lower():
                    # Replace with a proper title
                    modified_lines.append(f"title: 'Detection of {cve_id} exploitation'")
                elif cve_id not in line:
                    # Add the CVE ID to the existing title
                    title_text = line.split(':', 1)[1].strip(' \'"')
                    modified_lines.append(f"title: '{cve_id}: {title_text}'")
                else:
                    modified_lines.append(line)
            # Fix the references section if it has placeholders
            elif stripped.startswith('- https://nvd.nist.gov/vuln/detail/') and '{cve_id}' in line:
                modified_lines.append(f" - https://nvd.nist.gov/vuln/detail/{cve_id}")
            # Skip lines with template placeholders
            elif any(placeholder in line.lower() for placeholder in ['{cve_id}', '{cve_description}', '{poc_content}']):
                continue
            else:
                modified_lines.append(line)

        result = '\n'.join(modified_lines)
        logger.info(f"Injected CVE ID {cve_id} into rule")
        return result
    async def enhance_existing_rule(self,
                                    existing_rule: str,
                                    poc_content: str,
@@ -488,7 +1070,7 @@ Output ONLY the enhanced SIGMA rule in valid YAML format."""
logger.error(f"Failed to enhance SIGMA rule for {cve_id}: {e}")
return None
    def validate_sigma_rule(self, rule_content: str, expected_cve_id: str = None) -> bool:
        """Validate that the generated rule follows SIGMA specification."""
        try:
            # Parse as YAML
@@ -542,11 +1124,33 @@ Output ONLY the enhanced SIGMA rule in valid YAML format."""
logger.warning(f"Invalid status: {status}")
return False
# Additional validation: Check for correct CVE ID if provided
if expected_cve_id:
import re
cve_pattern = r'CVE-\d{4}-\d{4,7}'
found_cves = re.findall(cve_pattern, rule_content, re.IGNORECASE)
if found_cves:
# Check if all found CVE IDs match the expected one
wrong_cves = [cve for cve in found_cves if cve.upper() != expected_cve_id.upper()]
if wrong_cves:
logger.warning(f"Rule contains wrong CVE IDs: {wrong_cves}, expected {expected_cve_id}")
return False
else:
logger.warning(f"Rule does not contain expected CVE ID: {expected_cve_id}")
# Don't fail validation for missing CVE ID, just warn
logger.info("SIGMA rule validation passed")
return True
        except yaml.YAMLError as e:
logger.warning(f"YAML parsing error: {e}")
            error_msg = str(e)
            if "alias" in error_msg.lower() and "*" in error_msg:
                logger.warning(f"YAML alias syntax error (likely unquoted wildcard): {e}")
            elif "expected" in error_msg.lower():
                logger.warning(f"YAML structure error: {e}")
            else:
                logger.warning(f"YAML parsing error: {e}")
            return False
        except Exception as e:
            logger.warning(f"Rule validation error: {e}")

View file

@@ -22,6 +22,7 @@ import hashlib
import logging
import threading
from mcdevitt_poc_client import GitHubPoCClient
from cve2capec_client import CVE2CAPECClient
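# CVE2CAPECClient serves the CVE -> MITRE ATT&CK technique mappings exposed
# by the /api/cve2capec-stats endpoint below (sourced from the CVE2CAPEC
# repository); get_stats() is the only method assumed here.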
# Setup logging
logging.basicConfig(level=logging.INFO)
@@ -1849,6 +1850,23 @@ async def get_poc_stats(db: Session = Depends(get_db)):
logger.error(f"Error getting PoC stats: {e}")
return {"error": str(e)}
@app.get("/api/cve2capec-stats")
async def get_cve2capec_stats():
"""Get CVE2CAPEC MITRE ATT&CK mapping statistics"""
try:
client = CVE2CAPECClient()
stats = client.get_stats()
return {
"status": "success",
"data": stats,
"description": "CVE to MITRE ATT&CK technique mappings from CVE2CAPEC repository"
}
except Exception as e:
logger.error(f"Error getting CVE2CAPEC stats: {e}")
return {"error": str(e)}
@app.post("/api/regenerate-rules")
async def regenerate_sigma_rules(background_tasks: BackgroundTasks,
request: RuleRegenRequest,