From 06c4ed74b87aa9c52ca24a7521db289d9320763c Mon Sep 17 00:00:00 2001 From: bpmcdevitt Date: Mon, 14 Jul 2025 15:48:10 -0500 Subject: [PATCH] add cve2capec client to map mitre attack data to cves --- README.md | 542 ++++++++--------------- backend/cve2capec_client.py | 447 +++++++++++++++++++ backend/enhanced_sigma_generator.py | 52 ++- backend/llm_client.py | 644 +++++++++++++++++++++++++++- backend/main.py | 18 + 5 files changed, 1308 insertions(+), 395 deletions(-) create mode 100644 backend/cve2capec_client.py diff --git a/README.md b/README.md index d4f1de5..ce1ca57 100644 --- a/README.md +++ b/README.md @@ -1,236 +1,139 @@ -# CVE-SIGMA Auto Generator (Enhanced) +# CVE-SIGMA Auto Generator -An advanced automated platform that processes comprehensive CVE data and generates enhanced SIGMA rules for threat detection using curated exploit intelligence. +Automated platform that generates SIGMA detection rules from CVE data using AI-enhanced exploit analysis. -## πŸš€ Enhanced Features +## ✨ Key Features -### Data Processing -- **Bulk NVD Processing**: Downloads and processes complete NVD JSON datasets (2002-2025) -- **nomi-sec PoC Integration**: Uses curated PoC data from github.com/nomi-sec/PoC-in-GitHub -- **Incremental Updates**: Efficient updates using NVD modified/recent feeds -- **Quality Assessment**: Advanced PoC quality scoring with star count, recency, and relevance analysis +- **Bulk CVE Processing**: Complete NVD datasets (2002-2025) with nomi-sec PoC integration +- **AI-Powered Rule Generation**: Multi-provider LLM support (OpenAI, Anthropic, local Ollama) +- **Quality-Based PoC Analysis**: 5-tier quality scoring system for exploit reliability +- **Real-time Monitoring**: Live job tracking and progress dashboard +- **Advanced Indicators**: Extract processes, files, network patterns from actual exploits -### Intelligence Generation -- **Enhanced SIGMA Rules**: Creates rules using real exploit indicators from curated PoCs -- **AI-Powered Rule Generation**: Multi-provider LLM integration (OpenAI, Anthropic, Ollama) -- **Local LLM Processing**: Built-in Ollama container for offline AI rule generation -- **Quality Tiers**: Excellent, Good, Fair, Poor, Very Poor classification system -- **Smart Template Selection**: AI-driven template matching based on PoC characteristics -- **Advanced Indicator Extraction**: Processes, files, network, registry, and command patterns -- **MITRE ATT&CK Mapping**: Automatic technique identification based on exploit analysis - -### User Experience -- **Modern Web Interface**: React-based UI with enhanced bulk processing controls -- **Real-time Monitoring**: Live job tracking and progress monitoring -- **Comprehensive Statistics**: PoC coverage, quality metrics, and processing status -- **Bulk Operations Dashboard**: Centralized control for all data processing operations - -## Architecture - -- **Backend**: FastAPI with SQLAlchemy ORM -- **Frontend**: React with Tailwind CSS -- **Database**: PostgreSQL -- **Cache**: Redis (optional) -- **LLM Engine**: Ollama (local models) with multi-provider support -- **Containerization**: Docker & Docker Compose - -## Quick Start +## πŸš€ Quick Start ### Prerequisites - - Docker and Docker Compose -- (Optional) NVD API Key for increased rate limits +- (Optional) API keys for enhanced features -### Setup +### Installation -1. Clone the repository: ```bash +# Clone and start git clone -cd cve-sigma-generator -``` - -2. 
**Quick Start** (Recommended): -```bash +cd auto_sigma_rule_generator chmod +x start.sh ./start.sh ``` -3. **Manual Setup**: -```bash -# Copy environment file -cp .env.example .env - -# (Optional) Edit .env and add your NVD API key -nano .env - -# Start the application -docker-compose up -d --build -``` - -4. Wait for services to initialize (about 30-60 seconds) - -5. Access the application: - - Frontend: http://localhost:3000 - - Backend API: http://localhost:8000 - - API Documentation: http://localhost:8000/docs - - Ollama API: http://localhost:11434 +**Access Points:** +- Frontend: http://localhost:3000 +- API: http://localhost:8000 +- API Docs: http://localhost:8000/docs ### First Run +The application automatically: +1. Initializes database with rule templates +2. Fetches recent CVEs from NVD +3. Generates SIGMA rules with AI enhancement +4. Polls for new CVEs hourly -The application will automatically: -1. Initialize the database with rule templates -2. Start fetching recent CVEs from NVD -3. Generate SIGMA rules for each CVE -4. Continue polling for new CVEs every hour - -## Usage +## 🎯 Usage ### Web Interface - -The web interface provides three main sections: - -1. **Dashboard**: Overview statistics and recent CVEs -2. **CVEs**: Complete list of all fetched CVEs with details -3. **SIGMA Rules**: Generated detection rules organized by CVE - -### Manual CVE Fetch - -You can trigger a manual CVE fetch using the "Fetch New CVEs" button in the dashboard or via API: - -```bash -curl -X POST http://localhost:8000/api/fetch-cves -``` +- **Dashboard**: Statistics and system overview +- **CVEs**: Complete CVE listing with PoC data +- **SIGMA Rules**: Generated detection rules +- **Bulk Jobs**: Processing status and controls ### API Endpoints -#### Core Endpoints -- `GET /api/cves` - List all CVEs -- `GET /api/cves/{cve_id}` - Get specific CVE details -- `GET /api/sigma-rules` - List all SIGMA rules -- `GET /api/sigma-rules/{cve_id}` - Get SIGMA rules for specific CVE -- `POST /api/fetch-cves` - Manually trigger CVE fetch -- `GET /api/stats` - Get application statistics +#### Core Operations +```bash +# Fetch CVEs +curl -X POST http://localhost:8000/api/fetch-cves -#### LLM-Enhanced Endpoints -- `POST /api/llm-enhanced-rules` - Generate SIGMA rules using LLM AI analysis -- `GET /api/llm-status` - Check LLM API availability and configuration -- `POST /api/llm-switch` - Switch between LLM providers and models +# Bulk processing +curl -X POST http://localhost:8000/api/bulk-seed +curl -X POST http://localhost:8000/api/incremental-update -## Configuration +# LLM-enhanced rules +curl -X POST http://localhost:8000/api/llm-enhanced-rules +``` + +#### Data Access +- `GET /api/cves` - List CVEs +- `GET /api/sigma-rules` - List rules +- `GET /api/stats` - Statistics +- `GET /api/llm-status` - LLM provider status + +## βš™οΈ Configuration ### Environment Variables -#### Core Configuration -- `DATABASE_URL`: PostgreSQL connection string -- `NVD_API_KEY`: Optional NVD API key for higher rate limits (5β†’50 requests/30s) -- `GITHUB_TOKEN`: Optional GitHub personal access token for exploit analysis -- `REACT_APP_API_URL`: Backend API URL for frontend - -#### LLM Configuration -- `LLM_PROVIDER`: LLM provider selection (openai, anthropic, ollama) - **Default: ollama** -- `LLM_MODEL`: Model selection (provider-specific) - **Default: llama3.2** -- `OLLAMA_BASE_URL`: Ollama service URL - **Default: http://ollama:11434** -- `OPENAI_API_KEY`: Optional OpenAI API key for GPT models -- `ANTHROPIC_API_KEY`: Optional 
Anthropic API key for Claude models - -### GitHub Integration (Optional) - -For enhanced SIGMA rule generation with exploit analysis: - -1. **Create GitHub Token**: Visit https://github.com/settings/tokens -2. **Required Permissions**: Only needs "public_repo" scope for searching public repositories -3. **Add to Environment**: `GITHUB_TOKEN=your_token_here` in `.env` file -4. **Benefits**: - - Automatically searches for CVE-related exploit code - - Extracts real indicators (processes, files, network connections) - - Generates more accurate and specific SIGMA rules - - Higher confidence ratings for exploit-based rules - -**Rate Limits**: 5000 requests/hour with token, 60/hour without - -### Rule Templates - -The application includes pre-configured rule templates for: -- Windows Process Execution -- Network Connections -- File Modifications - -Additional templates can be added to the database via the `rule_templates` table. - -## SIGMA Rule Generation Logic - -The enhanced rule generation process supports multiple generation methods: - -### Traditional Rule Generation -1. **CVE Analysis**: Analyzes CVE description and affected products -2. **GitHub Exploit Search**: Searches GitHub for exploit code using multiple query strategies -3. **Code Analysis**: Extracts specific indicators from exploit code: - - Process names and command lines - - File paths and registry keys - - Network connections and ports - - PowerShell commands and scripts - - Command execution patterns -4. **Template Selection**: Chooses appropriate SIGMA rule template based on exploit analysis -5. **Enhanced Rule Population**: Fills template with real exploit indicators -6. **MITRE ATT&CK Mapping**: Maps to specific MITRE ATT&CK techniques -7. **Confidence Scoring**: Higher confidence for exploit-based rules - -### AI-Enhanced Rule Generation -1. **LLM Provider Selection**: Chooses between OpenAI, Anthropic, or local Ollama -2. **Contextual Analysis**: LLM analyzes CVE description and PoC code -3. **Intelligent Rule Creation**: AI generates sophisticated SIGMA rules with: - - Proper YAML syntax and structure - - Advanced detection logic - - Contextual field selection - - Relevant MITRE ATT&CK mappings -4. **Automatic Validation**: Generated rules are validated for syntax compliance -5. 
**Fallback Mechanism**: Falls back to template-based generation if LLM fails
-
-### Rule Quality Levels
-
-- **Basic Rules**: Generated from CVE description only
-- **Exploit-Based Rules**: Enhanced with GitHub exploit analysis (marked with 🔍)
-- **AI-Enhanced Rules**: Generated using LLM analysis of PoC code (marked with 🤖)
-- **Confidence Ratings**:
-  - **High**: CVSS ≥9.0 + exploit analysis + AI enhancement
-  - **Medium**: CVSS ≥7.0 or exploit analysis or AI enhancement
-  - **Low**: Basic CVE description only
-
-### Template Matching
-
-- **PowerShell Execution**: Exploit contains PowerShell scripts or cmdlets
-- **Process Execution**: Exploit shows process creation or command execution
-- **Network Connection**: Exploit demonstrates network communications
-- **File Modification**: Exploit involves file system operations
-
-### Example Enhanced Rules
-
-**Traditional Exploit-Based Rule:**
-```yaml
-title: CVE-2025-1234 Exploit-Based Detection
-description: Detection for CVE-2025-1234 remote code execution [Enhanced with GitHub exploit analysis]
-tags:
-  - attack.t1059.001
-  - cve-2025-1234
-  - exploit.github
-detection:
-  selection:
-    Image|contains:
-      - "powershell.exe"
-      - "malicious_payload.exe"
-      - "reverse_shell.ps1"
-  condition: selection
-level: high
+**Core Settings**
+```bash
+DATABASE_URL=postgresql://user:pass@db:5432/dbname
+NVD_API_KEY=your_nvd_key # Optional: 5→50 req/30s
+GITHUB_TOKEN=your_github_token # Optional: Enhanced PoC analysis
 ```
 
-**AI-Enhanced Rule (Generated by Ollama):**
+**LLM Configuration**
+```bash
+LLM_PROVIDER=ollama # Default: ollama (local)
+LLM_MODEL=llama3.2 # Provider-specific model
+OLLAMA_BASE_URL=http://ollama:11434
+
+# External providers (optional)
+OPENAI_API_KEY=your_openai_key
+ANTHROPIC_API_KEY=your_anthropic_key
+```
+
+### API Keys Setup
+
+**NVD API** (Recommended)
+1. Get key: https://nvd.nist.gov/developers/request-an-api-key
+2. Add to `.env`: `NVD_API_KEY=your_key`
+3. Benefit: 10x rate limit increase
+
+**GitHub Token** (Optional)
+1. Create: https://github.com/settings/tokens (public_repo scope)
+2. Add to `.env`: `GITHUB_TOKEN=your_token`
+3. Benefit: Enhanced exploit-based rules
+
+**LLM APIs** (Optional)
+- **Local Ollama**: No setup required (default)
+- **OpenAI**: Get key from https://platform.openai.com/api-keys
+- **Anthropic**: Get key from https://console.anthropic.com/
+
+## 🧠 Rule Generation
+
+### AI-Enhanced Generation
+1. **PoC Analysis**: LLM analyzes actual exploit code
+2. **Intelligent Detection**: Creates sophisticated SIGMA rules
+3. **Context Awareness**: Maps CVE descriptions to detection patterns
+4. **Validation**: Automatic SIGMA syntax verification
+5. **Fallback**: Template-based generation if LLM unavailable
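+
+In code, this pipeline reduces to a generate-validate-fallback loop. A minimal
+sketch of that flow (the `generate_sigma_rule` and `build_template_rule` names
+and the CVE field names are illustrative; `validate_sigma_rule(rule, cve_id)`
+matches the backend's `LLMClient`):
+
+```python
+async def generate_rule(llm_client, cve, poc_content: str) -> str:
+    # Steps 1-3: the LLM analyzes the CVE description plus PoC code
+    rule = await llm_client.generate_sigma_rule(
+        cve_id=cve.cve_id,
+        poc_content=poc_content,
+        cve_description=cve.description,
+    )
+    # Step 4: syntax check, including that the rule cites the right CVE ID
+    if rule and llm_client.validate_sigma_rule(rule, cve.cve_id):
+        return rule
+    # Step 5: deterministic template-based fallback
+    return build_template_rule(cve, poc_content)
+```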
+
+### Quality Tiers
+- **Excellent** (80+ pts): High-quality PoCs with recent updates
+- **Good** (60-79 pts): Moderate quality indicators
+- **Fair** (40-59 pts): Basic PoCs with some validation
+- **Poor** (20-39 pts): Minimal quality indicators
+- **Very Poor** (<20 pts): Low-quality PoCs
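+
+A score in the 0-100 range maps straight onto these tiers; a minimal sketch of
+the cutoffs (the function name is illustrative):
+
+```python
+def quality_tier(score: int) -> str:
+    """Map a PoC quality score (0-100) to its tier label."""
+    if score >= 80:
+        return "Excellent"
+    if score >= 60:
+        return "Good"
+    if score >= 40:
+        return "Fair"
+    if score >= 20:
+        return "Poor"
+    return "Very Poor"
+```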
+
+### Rule Types
+- 🤖 **AI-Enhanced**: LLM-generated with PoC analysis
+- 🔍 **Exploit-Based**: Template + GitHub exploit indicators
+- ⚡ **Basic**: CVE description only
+
+### Example Output
 ```yaml
 title: CVE-2025-1234 AI-Enhanced Detection
-description: Detection for CVE-2025-1234 remote code execution [AI-Enhanced with PoC analysis]
+description: Detection for CVE-2025-1234 RCE [AI-Enhanced with PoC analysis]
 tags:
   - attack.t1059.001
-  - attack.t1071.001
-  - cve-2025-1234
+  - cve-2025-1234
+  - ai.enhanced
 detection:
   selection_process:
@@ -238,207 +141,126 @@ detection:
     CommandLine|contains:
       - '-EncodedCommand'
       - 'bypass'
-      - 'downloadstring'
   selection_network:
-    Initiated: true
-    DestinationPort:
-      - 443
-      - 80
+    DestinationPort: [443, 80]
   condition: selection_process and selection_network
 level: high
 ```
 
-## Development
+## 🛠️ Development
 
 ### Local Development
-
-1. Start the database and services:
 ```bash
+# Start dependencies
 docker-compose up -d db redis ollama
-```
 
-2. Run the backend:
-```bash
-cd backend
-pip install -r requirements.txt
+# Backend
+cd backend && pip install -r requirements.txt
 uvicorn main:app --reload
+
+# Frontend
+cd frontend && npm install && npm start
 ```
 
-3. Run the frontend:
-```bash
-cd frontend
-npm install
-npm start
-```
-
-### Testing Ollama Integration
-
-To test the local LLM functionality:
-
-1. **Check Ollama Status**:
+### Testing LLM Integration
 ```bash
+# Check Ollama
 curl http://localhost:11434/api/tags
-```
 
-2. **Test LLM API Status**:
-```bash
+# Test LLM status
 curl http://localhost:8000/api/llm-status
-```
 
-3. **Generate AI-Enhanced Rule**:
-```bash
-curl -X POST http://localhost:8000/api/llm-enhanced-rules \
-  -H "Content-Type: application/json" \
-  -d '{"cve_id": "CVE-2025-1234", "poc_content": "example exploit code"}'
-```
-
-4. **Switch LLM Provider**:
-```bash
+# Switch providers
 curl -X POST http://localhost:8000/api/llm-switch \
   -H "Content-Type: application/json" \
   -d '{"provider": "ollama", "model": "llama3.2"}'
 ```
 
-### Database Migration
+## 📊 Architecture
 
-The application automatically creates tables on startup. For manual schema changes:
+- **Backend**: FastAPI + SQLAlchemy ORM
+- **Frontend**: React + Tailwind CSS
+- **Database**: PostgreSQL with enhanced schema
+- **Cache**: Redis (optional)
+- **LLM**: Ollama container + multi-provider support
+- **Deployment**: Docker Compose
 
-```bash
-# Connect to database
-docker-compose exec db psql -U cve_user -d cve_sigma_db
+### Enhanced Database Schema
+- **CVEs**: PoC metadata, bulk processing fields
+- **SIGMA Rules**: Quality scoring, nomi-sec data
+- **Rule Templates**: Pattern templates for generation
+- **Bulk Jobs**: Job tracking and status
 
-# Run custom SQL
-\i /path/to/migration.sql
-```
-
-## SIGMA Rule Quality
-
-Generated rules are marked as "experimental" and should be:
-- Reviewed by security analysts
-- Tested in a lab environment
-- Tuned to reduce false positives
-- Validated against real attack scenarios
-
-## Monitoring
-
-### Logs
-
-View application logs:
-```bash
-# All services
-docker-compose logs -f
-
-# Specific service
-docker-compose logs -f backend
-```
-
-### Health Checks
-
-The application includes health checks for database connectivity. Monitor with:
-```bash
-docker-compose ps
-```
-
-## ✅ **Recent Fixes (July 2025)**
-
-- **Fixed 404 CVE fetch error**: Corrected NVD API 2.0 endpoint format and parameters
-- **Updated for current dates**: Now properly fetches CVEs from July 2025 (current date)
-- **Improved API integration**: Better error handling, fallback mechanisms, and debugging
-- **Enhanced date handling**: Proper ISO-8601 format with UTC timezone
-- **API key integration**: Correctly passes API keys in headers for higher rate limits
-
-## Troubleshooting
+## 🔧 Troubleshooting
 
 ### Common Issues
 
-1. **Frontend build fails with "npm ci" error**: This is fixed in the current version. The Dockerfile now uses `npm install` instead of `npm ci`.
-2. **CVE Fetch returns 404**: Fixed in latest version. The application now uses proper NVD API 2.0 format with current 2025 dates.
-3. **No CVEs being fetched**:
-   - Check if you have an NVD API key configured in `.env` for better rate limits
-   - Use the "Test NVD API" button to verify connectivity
-   - Check backend logs: `docker-compose logs -f backend`
-4. **Database Connection Error**: Ensure PostgreSQL is running and accessible
-5. **Frontend Not Loading**: Verify backend is running and CORS is configured
-6. **Rule Generation Issues**: Check CVE description quality and template matching
-7. **Port conflicts**: If ports 3000, 8000, or 5432 are in use, stop other services or modify docker-compose.yml
+**CVE Fetch Issues**
+- Verify NVD API key in `.env`
+- Check API connectivity: Use "Test NVD API" button
+- Review logs: `docker-compose logs -f backend`
 
-### API Key Setup
+**No Rules Generated**
+- Ensure LLM provider is accessible
+- Check `/api/llm-status` for provider health
+- Verify PoC data quality in CVE details
 
-**NVD API (Recommended)**
-For optimal CVE fetching performance:
-1. Visit: https://nvd.nist.gov/developers/request-an-api-key
-2. Add to your `.env` file: `NVD_API_KEY=your_key_here`
-3. Restart the application
+**Performance Issues**
+- Start with recent years (2020+) for faster initial setup
+- Use smaller batch sizes for bulk operations
+- Monitor system resources during processing
 
-Without an API key: 5 requests per 30 seconds
-With an API key: 50 requests per 30 seconds
+**Port Conflicts**
+- Default ports: 3000 (frontend), 8000 (backend), 5432 (db)
+- Modify `docker-compose.yml` if ports are in use
 
-**GitHub API (Optional)**
-For enhanced exploit-based SIGMA rules:
-1. Visit: https://github.com/settings/tokens
-2. Create token with "public_repo" scope
-3. Add to your `.env` file: `GITHUB_TOKEN=your_token_here`
-4. Restart the application
-
-Without a GitHub token: Basic rules only
-With a GitHub token: Enhanced rules with exploit analysis (🔍 Exploit-Based)
-
-**LLM API Keys (Optional)**
-For AI-enhanced SIGMA rule generation:
-
-**Local Ollama (Recommended - No API Key Required)**
-- Ollama runs locally in Docker container
-- No external API dependencies
-- Models downloaded automatically on first use
-- Default model: llama3.2 (configurable)
-
-**OpenAI API (Optional)**
-1. Visit: https://platform.openai.com/api-keys
-2. Create API key
-3. Add to your `.env` file: `OPENAI_API_KEY=your_key_here`
-4. Set `LLM_PROVIDER=openai` in `.env`
-
-**Anthropic API (Optional)**
-1. Visit: https://console.anthropic.com/
-2. Create API key
-3. Add to your `.env` file: `ANTHROPIC_API_KEY=your_key_here`
-4. Set `LLM_PROVIDER=anthropic` in `.env`
 
 ### Rate Limits
+- **NVD API**: 5/30s (no key) → 50/30s (with key)
+- **nomi-sec API**: 1/second (built-in limiting)
+- **GitHub API**: 60/hour (no token) → 5000/hour (with token)
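+
+Scripts that call these APIs directly should pace themselves to stay under the
+limits above; a minimal client-side sketch (the helper is illustrative, not
+part of this codebase):
+
+```python
+import time
+
+import requests
+
+def fetch_paced(urls, max_requests=5, window_seconds=30):
+    """GET each URL while staying under a rate limit (e.g. NVD: 5 per 30s)."""
+    sent = []  # timestamps of requests inside the sliding window
+    for url in urls:
+        now = time.monotonic()
+        sent = [t for t in sent if now - t < window_seconds]
+        if len(sent) >= max_requests:
+            # Sleep until the oldest request falls out of the window
+            time.sleep(window_seconds - (now - sent[0]))
+        sent.append(time.monotonic())
+        yield requests.get(url, timeout=30)
+```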
 
-Without an API key, NVD limits requests to 5 per 30 seconds. With an API key, the limit increases to 50 per 30 seconds.
+## 🛡️ Security
 
-## Security Considerations
+- Store API keys in environment variables
+- Validate generated rules before production deployment
+- Rules marked as "experimental" - require analyst review
+- Use strong database passwords in production
 
-- **API Keys**: Store NVD API keys securely using environment variables
-- **Database Access**: Use strong passwords and restrict database access
-- **Network Security**: Deploy behind a reverse proxy in production
-- **Rule Validation**: Always validate generated SIGMA rules before deployment
+## 📈 Monitoring
 
-## Contributing
+```bash
+# View logs
+docker-compose logs -f backend
+docker-compose logs -f frontend
 
-1. Fork the repository
-2. Create a feature branch
-3. Make changes and add tests
-4. Submit a pull request
+# Check service health
+docker-compose ps
 
-## License
+# Monitor bulk jobs
+curl http://localhost:8000/api/bulk-status
+```
 
-This project is licensed under the MIT License - see the LICENSE file for details.
+## 🗺️ Roadmap
 
-## Support
-
-For issues and questions:
-1. Check the troubleshooting section
-2. Review application logs
-3. Open an issue on GitHub
-
-## Roadmap
-
-Planned features:
 - [ ] Custom rule template editor
-- [ ] MITRE ATT&CK mapping
-- [ ] Rule effectiveness scoring
-- [ ] Export to SIEM platforms
-- [ ] Advanced threat intelligence integration
-- [ ] Machine learning-based rule optimization
+- [ ] Advanced MITRE ATT&CK mapping
+- [ ] SIEM platform export
+- [ ] ML-based rule optimization
+- [ ] Threat intelligence integration
+
+## 📝 License
+
+MIT License - see LICENSE file for details.
+
+## 🤝 Contributing
+
+1. Fork repository
+2. Create feature branch
+3. Add tests and documentation
+4. Submit pull request
+
+## 📞 Support
+
+- Check troubleshooting section
+- Review application logs
+- Open GitHub issue for bugs/questions
\ No newline at end of file
diff --git a/backend/cve2capec_client.py b/backend/cve2capec_client.py
new file mode 100644
index 0000000..5848de7
--- /dev/null
+++ b/backend/cve2capec_client.py
@@ -0,0 +1,447 @@
+"""
+CVE2CAPEC client for retrieving MITRE ATT&CK technique mappings. 
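+
+Typical usage (the CVE ID here is illustrative):
+
+    client = CVE2CAPECClient()
+    techniques = client.get_mitre_techniques_for_cve("CVE-2024-12345")
+    mapping = client.get_full_mapping_for_cve("CVE-2024-12345")
+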
+Integrates with the CVE2CAPEC repository: https://github.com/Galeax/CVE2CAPEC +""" +import json +import logging +import requests +from typing import Dict, List, Optional +import time +from datetime import datetime, timedelta +import os + +logger = logging.getLogger(__name__) + +class CVE2CAPECClient: + """Client for accessing CVE to MITRE ATT&CK technique mappings.""" + + def __init__(self): + self.base_url = "https://raw.githubusercontent.com/Galeax/CVE2CAPEC/main" + self.cache_file = "/tmp/cve2capec_cache.json" + self.cache_expiry_hours = 24 # Cache for 24 hours + self.cve_mappings = {} + self.technique_names = {} # Map technique IDs to names + + # Load cached data if available + self._load_cache() + + # Load MITRE ATT&CK technique names + self._load_technique_names() + + def _load_cache(self): + """Load cached CVE mappings if they exist and are fresh.""" + try: + if os.path.exists(self.cache_file): + with open(self.cache_file, 'r') as f: + cache_data = json.load(f) + + # Check if cache is still fresh + cache_time = datetime.fromisoformat(cache_data.get('timestamp', '2000-01-01')) + if datetime.now() - cache_time < timedelta(hours=self.cache_expiry_hours): + self.cve_mappings = cache_data.get('mappings', {}) + logger.info(f"Loaded {len(self.cve_mappings)} CVE mappings from cache") + return + + # Cache is stale or doesn't exist, fetch fresh data + self._fetch_fresh_data() + + except Exception as e: + logger.error(f"Error loading CVE2CAPEC cache: {e}") + self._fetch_fresh_data() + + def _fetch_fresh_data(self): + """Fetch fresh CVE mappings from the repository.""" + try: + logger.info("Fetching fresh CVE2CAPEC data from all database files...") + + # Define year range to fetch (focusing on recent years first for better performance) + # Start with recent years that are most likely to be relevant + years_to_fetch = list(range(2018, 2026)) # 2018-2025 + + all_mappings = {} + + for year in years_to_fetch: + try: + url = f"{self.base_url}/database/CVE-{year}.jsonl" + logger.info(f"Fetching CVE mappings for year {year}...") + + response = requests.get(url, timeout=30) + response.raise_for_status() + + # Parse JSONL format + year_mappings = {} + for line in response.text.strip().split('\n'): + if line.strip(): + try: + data = json.loads(line) + year_mappings.update(data) + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse line in {year} data: {e}") + continue + + all_mappings.update(year_mappings) + logger.info(f"Loaded {len(year_mappings)} CVE mappings from {year}") + + # Add a small delay to be respectful to the server + time.sleep(0.5) + + except requests.RequestException as e: + logger.warning(f"Failed to fetch CVE-{year}.jsonl: {e}") + continue + except Exception as e: + logger.warning(f"Error processing CVE-{year}.jsonl: {e}") + continue + + # Also try to fetch the new_cves.jsonl for the latest data + try: + logger.info("Fetching latest CVE mappings from new_cves.jsonl...") + url = f"{self.base_url}/results/new_cves.jsonl" + response = requests.get(url, timeout=30) + response.raise_for_status() + + latest_mappings = {} + for line in response.text.strip().split('\n'): + if line.strip(): + try: + data = json.loads(line) + latest_mappings.update(data) + except json.JSONDecodeError: + continue + + all_mappings.update(latest_mappings) + logger.info(f"Added {len(latest_mappings)} latest CVE mappings") + + except Exception as e: + logger.warning(f"Failed to fetch new_cves.jsonl: {e}") + + self.cve_mappings = all_mappings + + # Save to cache + cache_data = { + 'timestamp': 
datetime.now().isoformat(),
+                'mappings': all_mappings,
+                'years_fetched': years_to_fetch
+            }
+            
+            with open(self.cache_file, 'w') as f:
+                json.dump(cache_data, f)
+            
+            logger.info(f"Successfully fetched and cached {len(all_mappings)} total CVE mappings")
+            
+        except Exception as e:
+            logger.error(f"Error fetching CVE2CAPEC data: {e}")
+            # Continue with empty mappings if fetch fails
+            self.cve_mappings = {}
+    
+    def _load_technique_names(self):
+        """Load MITRE ATT&CK technique names for better rule descriptions."""
+        # Common MITRE ATT&CK techniques and their names.
+        # Keys are technique IDs without the "T" prefix; sub-techniques use the
+        # dotted form (e.g. 1059.001). Some IDs repeat across the tactic groups
+        # below, which is harmless in a dict literal because the values match.
+        self.technique_names = {
+            # Initial Access
+            "1189": "Drive-by Compromise",
+            "1190": "Exploit Public-Facing Application",
+            "1133": "External Remote Services",
+            "1200": "Hardware Additions",
+            "1566": "Phishing",
+            "1091": "Replication Through Removable Media",
+            "1195": "Supply Chain Compromise",
+            "1199": "Trusted Relationship",
+            "1078": "Valid Accounts",
+            
+            # Execution
+            "1059": "Command and Scripting Interpreter",
+            "1059.001": "PowerShell",
+            "1059.003": "Windows Command Shell",
+            "1059.005": "Visual Basic",
+            "1059.006": "Python",
+            "1203": "Exploitation for Client Execution",
+            "1559": "Inter-Process Communication",
+            "1106": "Native API",
+            "1053": "Scheduled Task/Job",
+            "1129": "Shared Modules",
+            "1204": "User Execution",
+            "1047": "Windows Management Instrumentation",
+            
+            # Persistence
+            "1098": "Account Manipulation",
+            "1197": "BITS Jobs",
+            "1547": "Boot or Logon Autostart Execution",
+            "1037": "Boot or Logon Initialization Scripts",
+            "1176": "Browser Extensions",
+            "1554": "Compromise Client Software Binary",
+            "1136": "Create Account",
+            "1543": "Create or Modify System Process",
+            "1546": "Event Triggered Execution",
+            "1133": "External Remote Services",
+            "1574": "Hijack Execution Flow",
+            "1525": "Implant Internal Image",
+            "1556": "Modify Authentication Process",
+            "1137": "Office Application Startup",
+            "1542": "Pre-OS Boot",
+            "1053": "Scheduled Task/Job",
+            "1505": "Server Software Component",
+            "1205": "Traffic Signaling",
+            "1078": "Valid Accounts",
+            
+            # Privilege Escalation
+            "1548": "Abuse Elevation Control Mechanism",
+            "1134": "Access Token Manipulation",
+            "1547": "Boot or Logon Autostart Execution",
+            "1037": "Boot or Logon Initialization Scripts",
+            "1543": "Create or Modify System Process",
+            "1484": "Domain Policy Modification",
+            "1546": "Event Triggered Execution",
+            "1068": "Exploitation for Privilege Escalation",
+            "1574": "Hijack Execution Flow",
+            "1055": "Process Injection",
+            "1053": "Scheduled Task/Job",
+            "1078": "Valid Accounts",
+            
+            # Defense Evasion
+            "1548": "Abuse Elevation Control Mechanism",
+            "1134": "Access Token Manipulation",
+            "1197": "BITS Jobs",
+            "1610": "Deploy Container",
+            "1140": "Deobfuscate/Decode Files or Information",
+            "1006": "Direct Volume Access",
+            "1484": "Domain Policy Modification",
+            "1480": "Execution Guardrails",
+            "1211": "Exploitation for Defense Evasion",
+            "1222": "File and Directory Permissions Modification",
+            "1564": "Hide Artifacts",
+            "1574": "Hijack Execution Flow",
+            "1562": "Impair Defenses",
+            "1070": "Indicator Removal on Host",
+            "1202": "Indirect Command Execution",
+            "1036": "Masquerading",
+            "1556": "Modify Authentication Process",
+            "1112": "Modify Registry",
+            "1207": "Rogue Domain Controller",
+            "1014": "Rootkit",
+            "1218": "Signed Binary Proxy Execution",
+            "1216": "Signed Script Proxy Execution",
+            "1553": "Subvert Trust Controls",
+            "1221": "Template Injection",
+            "1205": "Traffic Signaling",
+            "1535": 
"Unused/Unsupported Cloud Regions", + "1078": "Valid Accounts", + "1497": "Virtualization/Sandbox Evasion", + "1220": "XSL Script Processing", + + # Credential Access + "1557": "Adversary-in-the-Middle", + "1110": "Brute Force", + "1555": "Credentials from Password Stores", + "1212": "Exploitation for Credential Access", + "1187": "Forced Authentication", + "1606": "Forge Web Credentials", + "1056": "Input Capture", + "1556": "Modify Authentication Process", + "1040": "Network Sniffing", + "1003": "OS Credential Dumping", + "1528": "Steal Application Access Token", + "1558": "Steal or Forge Kerberos Tickets", + "1111": "Two-Factor Authentication Interception", + "1552": "Unsecured Credentials", + + # Discovery + "1087": "Account Discovery", + "1010": "Application Window Discovery", + "1217": "Browser Bookmark Discovery", + "1580": "Cloud Infrastructure Discovery", + "1538": "Cloud Service Dashboard", + "1526": "Cloud Service Discovery", + "1613": "Container and Resource Discovery", + "1482": "Domain Trust Discovery", + "1083": "File and Directory Discovery", + "1615": "Group Policy Discovery", + "1046": "Network Service Scanning", + "1135": "Network Share Discovery", + "1201": "Password Policy Discovery", + "1069": "Permission Groups Discovery", + "1057": "Process Discovery", + "1012": "Query Registry", + "1018": "Remote System Discovery", + "1518": "Software Discovery", + "1082": "System Information Discovery", + "1614": "System Location Discovery", + "1016": "System Network Configuration Discovery", + "1049": "System Network Connections Discovery", + "1033": "System Owner/User Discovery", + "1007": "System Service Discovery", + "1124": "System Time Discovery", + "1497": "Virtualization/Sandbox Evasion", + + # Lateral Movement + "1210": "Exploitation of Remote Services", + "1534": "Internal Spearphishing", + "1570": "Lateral Tool Transfer", + "1021": "Remote Service Session Hijacking", + "1021.001": "RDP Hijacking", + "1021.002": "SSH Hijacking", + "1021.004": "Tty Shell Hijacking", + "1021.005": "VNC Hijacking", + "1080": "Taint Shared Content", + "1550": "Use Alternate Authentication Material", + + # Collection + "1557": "Adversary-in-the-Middle", + "1560": "Archive Collected Data", + "1123": "Audio Capture", + "1119": "Automated Collection", + "1185": "Browser Session Hijacking", + "1115": "Clipboard Data", + "1530": "Data from Cloud Storage Object", + "1602": "Data from Configuration Repository", + "1213": "Data from Information Repositories", + "1005": "Data from Local System", + "1039": "Data from Network Shared Drive", + "1025": "Data from Removable Media", + "1074": "Data Staged", + "1114": "Email Collection", + "1056": "Input Capture", + "1113": "Screen Capture", + "1125": "Video Capture", + + # Command and Control + "1071": "Application Layer Protocol", + "1092": "Communication Through Removable Media", + "1132": "Data Encoding", + "1001": "Data Obfuscation", + "1568": "Dynamic Resolution", + "1573": "Encrypted Channel", + "1008": "Fallback Channels", + "1105": "Ingress Tool Transfer", + "1104": "Multi-Stage Channels", + "1095": "Non-Application Layer Protocol", + "1571": "Non-Standard Port", + "1572": "Protocol Tunneling", + "1090": "Proxy", + "1219": "Remote Access Software", + "1102": "Web Service", + + # Exfiltration + "1020": "Automated Exfiltration", + "1030": "Data Transfer Size Limits", + "1048": "Exfiltration Over Alternative Protocol", + "1041": "Exfiltration Over C2 Channel", + "1011": "Exfiltration Over Other Network Medium", + "1052": "Exfiltration Over Physical 
Medium", + "1567": "Exfiltration Over Web Service", + "1029": "Scheduled Transfer", + "1537": "Transfer Data to Cloud Account", + + # Impact + "1531": "Account Access Removal", + "1485": "Data Destruction", + "1486": "Data Encrypted for Impact", + "1565": "Data Manipulation", + "1491": "Defacement", + "1561": "Disk Wipe", + "1499": "Endpoint Denial of Service", + "1495": "Firmware Corruption", + "1490": "Inhibit System Recovery", + "1498": "Network Denial of Service", + "1496": "Resource Hijacking", + "1489": "Service Stop", + "1529": "System Shutdown/Reboot" + } + + def get_mitre_techniques_for_cve(self, cve_id: str) -> List[str]: + """Get MITRE ATT&CK techniques for a given CVE ID.""" + try: + cve_data = self.cve_mappings.get(cve_id, {}) + techniques = cve_data.get('TECHNIQUES', []) + + # Convert technique IDs to T-prefixed format + formatted_techniques = [] + for tech in techniques: + if isinstance(tech, (int, str)): + formatted_techniques.append(f"T{tech}") + + return formatted_techniques + + except Exception as e: + logger.error(f"Error getting MITRE techniques for {cve_id}: {e}") + return [] + + def get_technique_name(self, technique_id: str) -> str: + """Get the name for a MITRE ATT&CK technique ID.""" + # Remove T prefix if present + clean_id = technique_id.replace('T', '') + return self.technique_names.get(clean_id, f"Technique {technique_id}") + + def get_cwe_for_cve(self, cve_id: str) -> List[str]: + """Get CWE codes for a given CVE ID.""" + try: + cve_data = self.cve_mappings.get(cve_id, {}) + cwes = cve_data.get('CWE', []) + + # Format CWE IDs + formatted_cwes = [] + for cwe in cwes: + if isinstance(cwe, (int, str)): + formatted_cwes.append(f"CWE-{cwe}") + + return formatted_cwes + + except Exception as e: + logger.error(f"Error getting CWEs for {cve_id}: {e}") + return [] + + def get_capec_for_cve(self, cve_id: str) -> List[str]: + """Get CAPEC codes for a given CVE ID.""" + try: + cve_data = self.cve_mappings.get(cve_id, {}) + capecs = cve_data.get('CAPEC', []) + + # Format CAPEC IDs + formatted_capecs = [] + for capec in capecs: + if isinstance(capec, (int, str)): + formatted_capecs.append(f"CAPEC-{capec}") + + return formatted_capecs + + except Exception as e: + logger.error(f"Error getting CAPECs for {cve_id}: {e}") + return [] + + def get_full_mapping_for_cve(self, cve_id: str) -> Dict: + """Get complete CVE mapping including CWE, CAPEC, and MITRE techniques.""" + try: + return { + 'cve_id': cve_id, + 'mitre_techniques': self.get_mitre_techniques_for_cve(cve_id), + 'cwe_codes': self.get_cwe_for_cve(cve_id), + 'capec_codes': self.get_capec_for_cve(cve_id), + 'has_mappings': bool(self.cve_mappings.get(cve_id, {})) + } + + except Exception as e: + logger.error(f"Error getting full mapping for {cve_id}: {e}") + return { + 'cve_id': cve_id, + 'mitre_techniques': [], + 'cwe_codes': [], + 'capec_codes': [], + 'has_mappings': False + } + + def get_stats(self) -> Dict: + """Get statistics about the CVE2CAPEC dataset.""" + total_cves = len(self.cve_mappings) + cves_with_techniques = len([cve for cve, data in self.cve_mappings.items() + if data.get('TECHNIQUES')]) + cves_with_cwe = len([cve for cve, data in self.cve_mappings.items() + if data.get('CWE')]) + cves_with_capec = len([cve for cve, data in self.cve_mappings.items() + if data.get('CAPEC')]) + + return { + 'total_cves': total_cves, + 'cves_with_mitre_techniques': cves_with_techniques, + 'cves_with_cwe': cves_with_cwe, + 'cves_with_capec': cves_with_capec, + 'coverage_percentage': (cves_with_techniques / total_cves * 100) 
if total_cves > 0 else 0 + } \ No newline at end of file diff --git a/backend/enhanced_sigma_generator.py b/backend/enhanced_sigma_generator.py index c67bdc0..cab9472 100644 --- a/backend/enhanced_sigma_generator.py +++ b/backend/enhanced_sigma_generator.py @@ -10,6 +10,7 @@ from typing import Dict, List, Optional, Tuple from sqlalchemy.orm import Session import re from llm_client import LLMClient +from cve2capec_client import CVE2CAPECClient # Configure logging logging.basicConfig(level=logging.INFO) @@ -21,6 +22,7 @@ class EnhancedSigmaGenerator: def __init__(self, db_session: Session, llm_provider: str = None, llm_model: str = None): self.db_session = db_session self.llm_client = LLMClient(provider=llm_provider, model=llm_model) + self.cve2capec_client = CVE2CAPECClient() async def generate_enhanced_rule(self, cve, use_llm: bool = True) -> dict: """Generate enhanced SIGMA rule for a CVE using PoC data""" @@ -141,8 +143,8 @@ class EnhancedSigmaGenerator: ) if rule_content: - # Validate the generated rule - if self.llm_client.validate_sigma_rule(rule_content): + # Validate the generated rule with CVE ID check + if self.llm_client.validate_sigma_rule(rule_content, cve.cve_id): logger.info(f"Successfully generated LLM-enhanced rule for {cve.cve_id}") return rule_content else: @@ -468,26 +470,46 @@ class EnhancedSigmaGenerator: return '\\n'.join(f" - {ref}" for ref in refs) def _generate_tags(self, cve, poc_data: list) -> str: - """Generate MITRE ATT&CK tags and other tags""" + """Generate MITRE ATT&CK tags and other tags using CVE2CAPEC mappings""" tags = [] # CVE tag tags.append(cve.cve_id.lower()) - # Add technique tags based on indicators - combined_indicators = self._combine_exploit_indicators(poc_data) + # Get MITRE ATT&CK techniques from CVE2CAPEC mapping + mitre_techniques = self.cve2capec_client.get_mitre_techniques_for_cve(cve.cve_id) - if combined_indicators.get('processes'): - tags.append('attack.t1059') # Command and Scripting Interpreter + if mitre_techniques: + logger.info(f"Found {len(mitre_techniques)} MITRE techniques for {cve.cve_id}: {mitre_techniques}") + # Add all mapped MITRE techniques + for technique in mitre_techniques: + # Convert to attack.t format (lowercase) + attack_tag = f"attack.{technique.lower()}" + if attack_tag not in tags: + tags.append(attack_tag) + else: + # Fallback to indicator-based technique detection + logger.info(f"No CVE2CAPEC mapping found for {cve.cve_id}, using indicator-based detection") + combined_indicators = self._combine_exploit_indicators(poc_data) + + if combined_indicators.get('processes'): + tags.append('attack.t1059') # Command and Scripting Interpreter + + if combined_indicators.get('network'): + tags.append('attack.t1071') # Application Layer Protocol + + if combined_indicators.get('files'): + tags.append('attack.t1105') # Ingress Tool Transfer + + if any('powershell' in p.lower() for p in combined_indicators.get('processes', [])): + tags.append('attack.t1059.001') # PowerShell - if combined_indicators.get('network'): - tags.append('attack.t1071') # Application Layer Protocol - - if combined_indicators.get('files'): - tags.append('attack.t1105') # Ingress Tool Transfer - - if any('powershell' in p.lower() for p in combined_indicators.get('processes', [])): - tags.append('attack.t1059.001') # PowerShell + # Get CWE codes for additional context + cwe_codes = self.cve2capec_client.get_cwe_for_cve(cve.cve_id) + if cwe_codes: + # Add the primary CWE as a tag + primary_cwe = cwe_codes[0].lower().replace('-', '.') + 
tags.append(primary_cwe) # Add PoC quality tags if poc_data: diff --git a/backend/llm_client.py b/backend/llm_client.py index 959b2ac..f946ab1 100644 --- a/backend/llm_client.py +++ b/backend/llm_client.py @@ -12,6 +12,7 @@ from langchain_anthropic import ChatAnthropic from langchain_community.llms import Ollama from langchain_core.output_parsers import StrOutputParser import yaml +from cve2capec_client import CVE2CAPECClient logger = logging.getLogger(__name__) @@ -42,6 +43,7 @@ class LLMClient: self.model = model or self._get_default_model(self.provider) self.llm = None self.output_parser = StrOutputParser() + self.cve2capec_client = CVE2CAPECClient() self._initialize_llm() @@ -181,8 +183,11 @@ class LLMClient: "existing_rule": existing_rule or "None" } logger.info(f"Sending to LLM for {cve_id}: CVE={cve_id}, Description length={len(cve_description)}, PoC length={len(poc_content)}") + logger.info(f"CVE Description for {cve_id}: {cve_description[:200]}...") + logger.info(f"PoC Content sample for {cve_id}: {poc_content[:200]}...") # Generate the response + logger.info(f"Final prompt variables for {cve_id}: {list(input_data.keys())}") response = await chain.ainvoke(input_data) # Debug: Log raw LLM response @@ -194,6 +199,16 @@ class LLMClient: # Post-process to ensure clean YAML sigma_rule = self._post_process_sigma_rule(sigma_rule) + # Fix common YAML syntax errors + sigma_rule = self._fix_yaml_syntax_errors(sigma_rule) + + # CRITICAL: Validate and fix CVE ID hallucination + sigma_rule = self._fix_hallucinated_cve_id(sigma_rule, cve_id) + + # Additional fallback: If no CVE ID found, inject it into the rule + if not sigma_rule or 'CVE-' not in sigma_rule: + sigma_rule = self._inject_cve_id_into_rule(sigma_rule, cve_id) + # Debug: Log final processed rule logger.info(f"Final processed rule for {cve_id}: {sigma_rule[:200]}...") @@ -263,18 +278,24 @@ class LLMClient: - status: experimental - description: Specific description based on CVE and PoC analysis - author: 'AI Generated' -- date: Current date (2025/01/11) +- date: Current date (2025/01/14) - references: Include the EXACT CVE URL with the CVE ID provided by the user - tags: Relevant MITRE ATT&CK techniques based on PoC analysis - logsource: Appropriate category based on exploit type - detection: Specific indicators from PoC analysis (NOT generic examples) - condition: Logic connecting the detection selections -**CRITICAL RULES:** +**CRITICAL ANTI-HALLUCINATION RULES:** 1. You MUST use the EXACT CVE ID provided in the user input - NEVER generate a different CVE ID -2. Analyze the provided CVE and PoC content to create SPECIFIC detection patterns -3. DO NOT hallucinate or invent CVE IDs from your training data -4. Use the CVE ID exactly as provided in the title and references""" +2. NEVER use example CVE IDs like CVE-2022-1234, CVE-2023-5678, or CVE-2024-1234 +3. NEVER use placeholder CVE IDs from your training data +4. Analyze the provided CVE description and PoC content to create SPECIFIC detection patterns +5. DO NOT hallucinate or invent CVE IDs from your training data +6. Use the CVE ID exactly as provided in the title and references +7. Generate rules based ONLY on the provided CVE description and PoC code analysis +8. Do not reference vulnerabilities or techniques not present in the provided content +9. CVE-2022-1234 is a FORBIDDEN example CVE ID - NEVER use it +10. 
The user will provide the EXACT CVE ID to use - use that and ONLY that""" if existing_rule: user_template = """CVE ID: {cve_id} @@ -288,31 +309,67 @@ Existing SIGMA Rule: Enhance this rule with PoC insights. Output only valid SIGMA YAML starting with 'title:'.""" else: - user_template = """CREATE A SPECIFIC SIGMA RULE FOR THIS EXACT CVE: + # Get MITRE ATT&CK mappings for the CVE + mitre_mappings = self.cve2capec_client.get_full_mapping_for_cve(cve_id) + mitre_suggestions = "" + + if mitre_mappings['mitre_techniques']: + technique_details = [] + for tech in mitre_mappings['mitre_techniques']: + tech_name = self.cve2capec_client.get_technique_name(tech) + technique_details.append(f" - {tech}: {tech_name}") + + mitre_suggestions = f""" +**MITRE ATT&CK TECHNIQUE MAPPINGS FOR {cve_id}:** +{chr(10).join(technique_details)} -**MANDATORY CVE ID TO USE: {cve_id}** -**CVE Description: {cve_description}** +**IMPORTANT:** Use these exact MITRE ATT&CK techniques in your tags section. Convert them to lowercase attack.t format (e.g., T1059 becomes attack.t1059).""" + + if mitre_mappings['cwe_codes']: + mitre_suggestions += f""" + +**CWE MAPPINGS:** {', '.join(mitre_mappings['cwe_codes'])}""" + + user_template = f"""CREATE A SPECIFIC SIGMA RULE FOR THIS EXACT CVE: + +**MANDATORY CVE ID TO USE: {{cve_id}}** +**CVE Description: {{cve_description}}** **Proof-of-Concept Code Analysis:** -{poc_content} +{{poc_content}} + +{mitre_suggestions} **CRITICAL REQUIREMENTS:** -1. Use EXACTLY this CVE ID in the title: {cve_id} -2. Use EXACTLY this CVE URL in references: https://nvd.nist.gov/vuln/detail/{cve_id} +1. Use EXACTLY this CVE ID in the title: {{cve_id}} +2. Use EXACTLY this CVE URL in references: https://nvd.nist.gov/vuln/detail/{{cve_id}} 3. Analyze the CVE description to understand the vulnerability type 4. Extract specific indicators from the PoC code (files, processes, commands, network patterns) 5. Create detection logic based on the actual exploit behavior 6. Use relevant logsource category (process_creation, file_event, network_connection, etc.) -7. Include appropriate MITRE ATT&CK tags based on the exploit techniques +7. 
Include the MITRE ATT&CK tags listed above in your tags section (convert to attack.t format) -**IMPORTANT: You MUST use the exact CVE ID "{cve_id}" - do NOT generate a different CVE ID!** +**CRITICAL ANTI-HALLUCINATION REQUIREMENTS:** +- THE CVE ID IS: {{cve_id}} +- DO NOT use CVE-2022-1234, CVE-2023-1234, CVE-2024-1234, or any other example CVE ID +- DO NOT generate a different CVE ID from your training data +- You MUST use the exact CVE ID "{{cve_id}}" - this is the ONLY acceptable CVE ID for this rule +- Base your analysis ONLY on the provided CVE description and PoC code above +- Do not reference other vulnerabilities or exploits not mentioned in the provided content +- NEVER use placeholder CVE IDs like CVE-YYYY-NNNN or CVE-2022-1234 -Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE ID {cve_id}.""" +**ABSOLUTE REQUIREMENT: THE EXACT CVE ID TO USE IS: {{cve_id}}** +**FORBIDDEN: Do not use CVE-2022-1234, CVE-2023-5678, or any other example CVE ID** + +Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE ID {{cve_id}}.""" - return ChatPromptTemplate.from_messages([ - SystemMessage(content=system_message), - HumanMessage(content=user_template) + # Create the prompt template with proper variable definitions + prompt_template = ChatPromptTemplate.from_messages([ + ("system", system_message), + ("human", user_template) ]) + + return prompt_template def _extract_sigma_rule(self, response_text: str) -> str: """Extract and clean SIGMA rule YAML from LLM response.""" @@ -351,6 +408,10 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE if '{' in stripped and '}' in stripped: continue + # Skip lines that contain template placeholder text + if 'cve_id' in stripped.lower() or 'cve description' in stripped.lower(): + continue + # Skip lines that are clearly not YAML structure if stripped and not ':' in stripped and len(stripped) > 20: continue @@ -407,10 +468,17 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE ]): continue - # Skip template variables + # Skip template variables and placeholder text if '{' in stripped and '}' in stripped: continue + # Skip lines that contain template placeholder patterns + if any(placeholder in stripped.lower() for placeholder in [ + 'cve_id', 'cve description', 'poc_content', 'existing_rule', + '{cve_id}', '{cve_description}', '{poc_content}' + ]): + continue + # Skip lines that look like explanations if stripped and not ':' in stripped and not stripped.startswith('-') and not stripped.startswith(' '): # This might be explanatory text, skip it @@ -425,6 +493,520 @@ Output ONLY valid SIGMA YAML starting with 'title:' that includes the exact CVE return '\n'.join(cleaned_lines).strip() + def _fix_yaml_syntax_errors(self, rule_content: str) -> str: + """Fix common YAML syntax errors in LLM-generated rules.""" + import re + + if not rule_content: + return rule_content + + lines = rule_content.split('\n') + fixed_lines = [] + fixes_applied = [] + + for line in lines: + fixed_line = line + + # Fix invalid YAML alias syntax: - *image* -> - '*image*' + # YAML aliases must be alphanumeric, but LLM uses *word* or *multiple words* for wildcards + if '- *' in line and '*' in line: + # Match patterns like "- *image*" or "- *process*" or "- *unpatched system*" + pattern = r'(\s*-\s*)(\*[^*]+\*)' + if re.search(pattern, line): + fixed_line = re.sub(pattern, r"\1'\2'", line) + fixes_applied.append(f"Fixed invalid YAML alias syntax: {line.strip()} -> 
{fixed_line.strip()}") + + # Also fix similar patterns in values: key: *value* -> key: '*value*' + elif re.search(r':\s*\*[^*]+\*\s*$', line) and not re.search(r'[\'"]', line): + pattern = r'(:\s*)(\*[^*]+\*)' + fixed_line = re.sub(pattern, r"\1'\2'", line) + fixes_applied.append(f"Fixed invalid YAML alias in value: {line.strip()} -> {fixed_line.strip()}") + + # Fix unquoted strings that start with special characters + elif re.match(r'^\s*-\s*[*&|>]', line): + # If line starts with -, followed by special YAML chars, quote it + parts = line.split('-', 1) + if len(parts) == 2: + indent = parts[0] + content = parts[1].strip() + if content and not content.startswith(("'", '"')): + fixed_line = f"{indent}- '{content}'" + fixes_applied.append(f"Quoted special character value: {line.strip()} -> {fixed_line.strip()}") + + # Fix invalid boolean values + elif ': *' in line and not line.strip().startswith('#'): + # Replace ": *something*" with ": '*something*'" if not already quoted + pattern = r'(:\s*)(\*[^*]+\*)' + if not re.search(r'[\'"]', line) and re.search(pattern, line): + fixed_line = re.sub(pattern, r"\1'\2'", line) + fixes_applied.append(f"Fixed unquoted wildcard value: {line.strip()} -> {fixed_line.strip()}") + + # Fix missing quotes around values with special characters (but not YAML indicators) + elif re.search(r':\s*[*&]', line) and not re.search(r'[\'"]', line): + # Don't quote YAML multiline indicators (|, >) + if not re.search(r':\s*[|>]\s*$', line): + parts = line.split(':', 1) + if len(parts) == 2: + key = parts[0] + value = parts[1].strip() + if value and not value.startswith(("'", '"', '[', '{')): + fixed_line = f"{key}: '{value}'" + fixes_applied.append(f"Quoted special character value: {line.strip()} -> {fixed_line.strip()}") + + # Fix invalid array syntax + elif re.search(r'^\s*\*[^*]+\*\s*$', line): + # Standalone *word* or *multiple words* lines should be quoted + indent = len(line) - len(line.lstrip()) + content = line.strip() + fixed_line = f"{' ' * indent}'{content}'" + fixes_applied.append(f"Fixed standalone wildcard: {line.strip()} -> {fixed_line.strip()}") + + fixed_lines.append(fixed_line) + + result = '\n'.join(fixed_lines) + + # Additional fixes for common YAML issues + # Fix missing spaces after colons + colon_fix = re.sub(r':([^\s])', r': \1', result) + if colon_fix != result: + fixes_applied.append("Added missing spaces after colons") + result = colon_fix + + # Fix multiple spaces after colons + space_fix = re.sub(r':\s{2,}', ': ', result) + if space_fix != result: + fixes_applied.append("Fixed multiple spaces after colons") + result = space_fix + + # Fix incorrect reference format: references: - https://... -> references:\n - https://... 
+ ref_fix = re.sub(r'references:\s*-\s*', 'references:\n - ', result) + if ref_fix != result: + fixes_applied.append("Fixed references array format") + result = ref_fix + + # Fix broken URLs in references (spaces in URLs) + url_fix = re.sub(r'https:\s*//nvd\.nist\.gov', 'https://nvd.nist.gov', result) + if url_fix != result: + fixes_applied.append("Fixed broken URLs in references") + result = url_fix + + # Fix incorrect logsource format: logsource: category: X -> logsource:\n category: X + logsource_fix = re.sub(r'logsource:\s*(category|product|service):\s*', r'logsource:\n \1: ', result) + if logsource_fix != result: + fixes_applied.append("Fixed logsource structure format") + result = logsource_fix + + # Fix incorrect detection format: detection: selection: key: value -> detection:\n selection:\n key: value + detection_fix = re.sub(r'detection:\s*(\w+):\s*(\w+):\s*', r'detection:\n \1:\n \2: ', result) + if detection_fix != result: + fixes_applied.append("Fixed detection structure format") + result = detection_fix + + # Fix detection lines with == operators: detection: selection1: image == *value* -> detection:\n selection1:\n image: '*value*' + # This handles compressed syntax with equality operators + # Make the pattern more flexible to catch various formats + detection_eq_patterns = [ + (r'detection:\s*(\w+):\s*(\w+)\s*==\s*(\*[^*\s]+\*)', r'detection:\n \1:\n \2: \'\3\''), + (r'detection:\s*(\w+):\s*(\w+)\s*==\s*([^\s]+)', r'detection:\n \1:\n \2: \'\3\''), + ] + + for pattern, replacement in detection_eq_patterns: + detection_eq_fix = re.sub(pattern, replacement, result) + if detection_eq_fix != result: + fixes_applied.append("Fixed detection equality operator syntax") + result = detection_eq_fix + break + + # Fix standalone equality operators in detection sections: key == *value* -> key: '*value*' + # Also handle lines with multiple keys/values separated by colons and == + lines = result.split('\n') + eq_fixed_lines = [] + for line in lines: + original_line = line + + # Look for pattern: whitespace + key == *value* or key == value + if ' == ' in line: + # Handle complex patterns like "detection: selection1: image == *value*" + if line.strip().startswith('detection:') and ' == ' in line: + # Split by colons to handle nested structure + parts = line.split(':') + if len(parts) >= 3: + # This looks like "detection: selection1: image == *value*" + base_indent = len(line) - len(line.lstrip()) + + # Extract the parts + detection_part = parts[0].strip() # "detection" + selection_part = parts[1].strip() # "selection1" + key_value_part = ':'.join(parts[2:]).strip() # "image == *value*" + + # Parse the key == value part + if ' == ' in key_value_part: + eq_parts = key_value_part.split(' == ', 1) + key = eq_parts[0].strip() + value = eq_parts[1].strip() + + # Quote the value if needed + if value.startswith('*') and value.endswith('*') and not value.startswith("'"): + value = f"'{value}'" + elif not value.startswith(("'", '"', '[', '{')): + value = f"'{value}'" + + # Reconstruct as proper YAML + eq_fixed_lines.append(f"{' ' * base_indent}detection:") + eq_fixed_lines.append(f"{' ' * (base_indent + 4)}{selection_part}:") + eq_fixed_lines.append(f"{' ' * (base_indent + 8)}{key}: {value}") + fixes_applied.append(f"Fixed complex detection equality: {selection_part}: {key} == {value}") + continue + + # Handle simpler patterns: " key == value" + elif re.match(r'^(\s+)(\w+)\s*==\s*(.+)$', line): + match = re.match(r'^(\s+)(\w+)\s*==\s*(.+)$', line) + indent = match.group(1) + key = match.group(2) + 
value = match.group(3).strip()
+                    
+                    # Ensure wildcards are quoted
+                    if value.startswith('*') and value.endswith('*') and not value.startswith("'"):
+                        value = f"'{value}'"
+                    elif not value.startswith(("'", '"', '[', '{')):
+                        value = f"'{value}'"
+                    
+                    eq_fixed_lines.append(f"{indent}{key}: {value}")
+                    fixes_applied.append(f"Fixed equality operator: {key} == {value}")
+                    continue
+            
+            eq_fixed_lines.append(original_line)
+        
+        # Compare contents, not lengths: the one-for-one equality fixes above
+        # keep the line count unchanged, so a length check would discard them
+        if eq_fixed_lines != lines:
+            result = '\n'.join(eq_fixed_lines)
+        
+        # Fix invalid array-as-value syntax: key: - value -> key:\n  - value
+        # This handles cases like "CommandLine: - '*image*'" which should be "CommandLine:\n  - '*image*'"
+        lines = result.split('\n')
+        fixed_lines = []
+        for line in lines:
+            # Look for pattern: whitespace + key: - value
+            if re.match(r'^(\s+)(\w+):\s*-\s*(.+)$', line):
+                match = re.match(r'^(\s+)(\w+):\s*-\s*(.+)$', line)
+                indent = match.group(1)
+                key = match.group(2)
+                value = match.group(3)
+                # Convert to proper array format
+                fixed_lines.append(f"{indent}{key}:")
+                fixed_lines.append(f"{indent}  - {value}")
+                fixes_applied.append(f"Fixed array-as-value syntax: {key}: - {value}")
+            else:
+                fixed_lines.append(line)
+        
+        if fixed_lines != lines:
+            result = '\n'.join(fixed_lines)
+        
+        # Fix complex nested syntax errors like "selection1: Image: - '*path*': value"
+        # This should be "selection1:\n  Image:\n    - '*path*': value"
+        complex_fix = re.sub(r'^(\s+)(\w+):\s*(\w+):\s*-\s*(.+)$', 
+                            r'\1\2:\n\1  \3:\n\1    - \4', 
+                            result, flags=re.MULTILINE)
+        if complex_fix != result:
+            fixes_applied.append("Fixed complex nested structure syntax")
+            result = complex_fix
+        
+        # Fix incorrect tags format: tags: - T1059.001 -> tags:\n  - T1059.001
+        tags_fix = re.sub(r'tags:\s*-\s*', 'tags:\n  - ', result)
+        if tags_fix != result:
+            fixes_applied.append("Fixed tags array format")
+            result = tags_fix
+        
+        # Fix other common single-line array formats
+        for field in ['falsepositives', 'level', 'related']:
+            field_pattern = f'{field}:\\s*-\\s*'
+            field_replacement = f'{field}:\n  - '
+            field_fix = re.sub(field_pattern, field_replacement, result)
+            if field_fix != result:
+                fixes_applied.append(f"Fixed {field} array format")
+                result = field_fix
+        
+        # Fix placeholder UUID if LLM used the example one
+        import uuid
+        placeholder_uuid = '12345678-1234-1234-1234-123456789012'
+        if placeholder_uuid in result:
+            new_uuid = str(uuid.uuid4())
+            result = result.replace(placeholder_uuid, new_uuid)
+            fixes_applied.append(f"Replaced placeholder UUID with {new_uuid[:8]}...")
+        
+        # Fix orphaned list items (standalone lines starting with -)
+        lines = result.split('\n')
+        fixed_lines = []
+        
+        for i, line in enumerate(lines):
+            stripped = line.strip()
+            
+            # Check for orphaned list items (lines starting with - but not part of an array)
+            if (stripped.startswith('- ') and 
+                i > 0 and 
+                not lines[i-1].strip().endswith(':') and
+                ':' not in stripped and
+                not stripped.startswith('- https://')):  # Don't remove reference URLs
+                
+                # Check if this looks like a MITRE ATT&CK tag
+                if re.match(r'- T\d{4}', stripped):
+                    # Try to find the tags section and add it there
+                    tags_line_found = False
+                    for j in range(len(fixed_lines)-1, -1, -1):
+                        if fixed_lines[j].strip().startswith('tags:'):
+                            # This is an orphaned tag; insert it directly under the
+                            # tags: line so it actually joins that array
+                            fixed_lines.insert(j + 1, f"  {stripped}")
+                            fixes_applied.append(f"Fixed orphaned MITRE tag: {stripped}")
+                            tags_line_found = True
+                            break
+                    
+                    if not tags_line_found:
+                        # No tags section found, remove the orphaned item
+                        fixes_applied.append(f"Removed 
+        # Final pass: Remove lines that are still malformed and would cause YAML parsing errors
+        lines = result.split('\n')
+        final_lines = []
+        for line in lines:
+            stripped = line.strip()
+
+            # Skip lines that have multiple colons in problematic patterns
+            if re.search(r':\s*\w+:\s*-\s*[\'"][^\'":]*[\'"]:\s*', line):
+                # This looks like "key: subkey: - 'value': more_stuff", which is malformed
+                fixes_applied.append(f"Removed malformed nested line: {stripped[:50]}...")
+                continue
+
+            # Skip lines with invalid YAML mapping structures
+            if re.search(r'^\s*\w+:\s*\w+:\s*-\s*[\'"][^\'":]*[\'"]:\s*\w+', line):
+                fixes_applied.append(f"Removed invalid mapping structure: {stripped[:50]}...")
+                continue
+
+            final_lines.append(line)
+
+        if len(final_lines) != len(lines):
+            result = '\n'.join(final_lines)
+
+        # Log if we made any fixes
+        if fixes_applied:
+            logger.info(f"Applied YAML syntax fixes: {', '.join(fixes_applied)}")
+
+        # Final YAML structure validation and repair
+        result = self._validate_and_repair_yaml_structure(result, fixes_applied)
+
+        return result
+
+    def _validate_and_repair_yaml_structure(self, content: str, fixes_applied: list) -> str:
+        """Use the YAML library to validate the rule and repair structural issues."""
+        try:
+            # First, try to parse the YAML to see if it's valid
+            yaml.safe_load(content)
+            # If we get here, the YAML is valid
+            return content
+        except yaml.YAMLError as e:
+            logger.warning(f"YAML structure validation failed: {e}")
+
+            # Try to repair common structural issues
+            repaired_content = self._repair_yaml_structure(content, str(e))
+
+            # Test whether the repair worked
+            try:
+                yaml.safe_load(repaired_content)
+                fixes_applied.append("Repaired YAML document structure")
+                logger.info("Successfully repaired YAML structure")
+                return repaired_content
+            except yaml.YAMLError as e2:
+                logger.warning(f"YAML repair attempt failed: {e2}")
+
+            # Last resort: try to build a minimal valid SIGMA rule
+            return self._build_minimal_valid_rule(content, fixes_applied)
+
+    def _repair_yaml_structure(self, content: str, error_msg: str) -> str:
+        """Attempt to repair common YAML structural issues."""
+        lines = content.split('\n')
+        repaired_lines = []
+
+        # Track whether we are inside the detection section and at what indent
+        in_detection = False
+        detection_indent = 0
+
+        for i, line in enumerate(lines):
+            stripped = line.strip()
+            current_indent = len(line) - len(line.lstrip())
+
+            # Skip empty lines
+            if not stripped:
+                repaired_lines.append(line)
+                continue
+
+            # Track if we're in the detection section
+            if stripped.startswith('detection:'):
+                in_detection = True
+                detection_indent = current_indent
+                repaired_lines.append(line)
+                continue
+            elif in_detection and current_indent <= detection_indent and not stripped.startswith(('selection', 'filter', 'condition', 'timeframe')):
+                # We've left the detection section; detection's own subsections are
+                # excluded here so that under-indented ones can still be repaired below
+                in_detection = False
+
+            # Fix indentation issues in the detection section
+            if in_detection:
+                # Ensure proper indentation for detection subsections
+                if stripped.startswith(('selection', 'filter', 'condition')):
+                    # These should be indented one level under detection
+                    if current_indent <= detection_indent:
+                        repaired_lines.append(' ' * (detection_indent + 4) + stripped)
+                        continue
+                elif current_indent > detection_indent + 4:
+                    # This might be a detection field that needs proper indentation
+                    if ':' in stripped and not stripped.startswith('-'):
+                        # This looks like a field under a selection
+                        if i > 0 and 'selection' in lines[i-1]:
+                            repaired_lines.append(' ' * (detection_indent + 8) + stripped)
+                            continue
+
+            # Fix top-level keys that start with wrong indentation
+            if ':' in stripped and not stripped.startswith('-'):
+                # This is a key-value pair
+                key = stripped.split(':')[0].strip()
+
+                # Top-level keys should not be indented
+                if key in ['title', 'id', 'status', 'description', 'author', 'date', 'references', 'tags', 'logsource', 'detection', 'falsepositives', 'level']:
+                    if current_indent > 0:
+                        repaired_lines.append(stripped)
+                        continue
+
+            repaired_lines.append(line)
+
+        return '\n'.join(repaired_lines)
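+    # Illustrative repair (hypothetical input) performed by _repair_yaml_structure:
+    #     detection:
+    #     selection:
+    #             Image: '*cmd.exe*'
+    #     condition: selection
+    # becomes:
+    #     detection:
+    #         selection:
+    #             Image: '*cmd.exe*'
+    #         condition: selection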
+    def _build_minimal_valid_rule(self, content: str, fixes_applied: list) -> str:
+        """Build a minimal valid SIGMA rule from the content."""
+        lines = content.split('\n')
+
+        # Extract key components, falling back to safe defaults
+        title = "Unknown SIGMA Rule"
+        rule_id = "00000000-0000-0000-0000-000000000000"
+        description = "Generated SIGMA rule"
+
+        for line in lines:
+            stripped = line.strip()
+            if stripped.startswith('title:'):
+                title = stripped.split(':', 1)[1].strip().strip('"\'')
+            elif stripped.startswith('id:'):
+                rule_id = stripped.split(':', 1)[1].strip().strip('"\'')
+            elif stripped.startswith('description:'):
+                description = stripped.split(':', 1)[1].strip().strip('"\'')
+
+        # Build the minimal valid rule
+        minimal_rule = f"""title: '{title}'
+id: {rule_id}
+status: experimental
+description: '{description}'
+author: 'AI Generated'
+date: 2025/01/14
+references:
+    - https://example.com
+logsource:
+    category: process_creation
+detection:
+    selection:
+        Image: '*'
+    condition: selection
+level: medium"""
+
+        fixes_applied.append("Built minimal valid SIGMA rule structure")
+        logger.warning("Generated minimal valid SIGMA rule as fallback")
+        return minimal_rule
+
+    def _fix_hallucinated_cve_id(self, rule_content: str, correct_cve_id: str) -> str:
+        """Detect and fix hallucinated CVE IDs in the generated rule."""
+        import re
+
+        # Pattern to match CVE IDs (CVE-YYYY-NNNNN format)
+        cve_pattern = r'CVE-\d{4}-\d{4,7}'
+
+        # Find all CVE IDs in the rule content
+        found_cves = re.findall(cve_pattern, rule_content, re.IGNORECASE)
+
+        if found_cves:
+            # Check whether any found CVE differs from the correct one
+            hallucinated_cves = [cve for cve in found_cves if cve.upper() != correct_cve_id.upper()]
+
+            if hallucinated_cves:
+                logger.error(f"CRITICAL: LLM hallucinated CVE IDs: {hallucinated_cves}, expected: {correct_cve_id}")
+                logger.error("This indicates the LLM is not following the prompt correctly!")
+
+                # Replace all hallucinated CVE IDs with the correct one
+                corrected_content = rule_content
+                for hallucinated_cve in set(hallucinated_cves):  # Use a set to avoid duplicate replacements
+                    corrected_content = re.sub(
+                        re.escape(hallucinated_cve),
+                        correct_cve_id,
+                        corrected_content,
+                        flags=re.IGNORECASE
+                    )
+
+                logger.info(f"Successfully corrected hallucinated CVE IDs to {correct_cve_id}")
+                return corrected_content
+            else:
+                logger.info(f"CVE ID validation passed: found correct {correct_cve_id}")
+        else:
+            # No CVE ID found in the rule; flag it for manual review rather than failing
+            logger.warning(f"No CVE ID found in generated rule for {correct_cve_id}, this might need manual review")
+
+        return rule_content
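+    # Illustrative example (hypothetical LLM output) for _fix_hallucinated_cve_id:
+    #     _fix_hallucinated_cve_id("title: CVE-2023-9999 Exploitation", "CVE-2024-1234")
+    # logs the mismatch and returns "title: CVE-2024-1234 Exploitation"; the CVE IDs
+    # shown here are made up for the example.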
+    def _inject_cve_id_into_rule(self, rule_content: str, cve_id: str) -> str:
+        """Inject the CVE ID into a rule that lacks it."""
+        if not rule_content:
+            logger.warning(f"Empty rule content for {cve_id}, cannot inject CVE ID")
+            return rule_content
+
+        lines = rule_content.split('\n')
+        modified_lines = []
+
+        for line in lines:
+            stripped = line.strip()
+
+            # Fix the title line if it has placeholders
+            if stripped.startswith('title:'):
+                if '{cve_id}' in line.lower() or '{cve_description}' in line.lower():
+                    # Replace with a proper title
+                    modified_lines.append(f"title: 'Detection of {cve_id} exploitation'")
+                elif cve_id not in line:
+                    # Add the CVE ID to the existing title
+                    title_text = line.split(':', 1)[1].strip(' \'"')
+                    modified_lines.append(f"title: '{cve_id}: {title_text}'")
+                else:
+                    modified_lines.append(line)
+
+            # Fix the references section if it has placeholders
+            elif stripped.startswith('- https://nvd.nist.gov/vuln/detail/') and '{cve_id}' in line:
+                modified_lines.append(f"    - https://nvd.nist.gov/vuln/detail/{cve_id}")
+
+            # Skip lines with template placeholders
+            elif any(placeholder in line.lower() for placeholder in ['{cve_id}', '{cve_description}', '{poc_content}']):
+                continue
+
+            else:
+                modified_lines.append(line)
+
+        result = '\n'.join(modified_lines)
+        logger.info(f"Injected CVE ID {cve_id} into rule")
+        return result
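+    # Illustrative example (hypothetical input) for _inject_cve_id_into_rule: given
+    # cve_id="CVE-2024-1234", the line
+    #     title: 'Suspicious PowerShell Execution'
+    # becomes
+    #     title: 'CVE-2024-1234: Suspicious PowerShell Execution'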
+
     async def enhance_existing_rule(self,
                                     existing_rule: str,
                                     poc_content: str,
@@ -488,7 +1070,7 @@ Output ONLY the enhanced SIGMA rule in valid YAML format."""
             logger.error(f"Failed to enhance SIGMA rule for {cve_id}: {e}")
             return None
 
-    def validate_sigma_rule(self, rule_content: str) -> bool:
+    def validate_sigma_rule(self, rule_content: str, expected_cve_id: str = None) -> bool:
         """Validate that the generated rule follows SIGMA specification."""
         try:
             # Parse as YAML
@@ -542,11 +1124,33 @@ Output ONLY the enhanced SIGMA rule in valid YAML format."""
                 logger.warning(f"Invalid status: {status}")
                 return False
 
+            # Additional validation: check for the correct CVE ID if provided
+            if expected_cve_id:
+                import re
+                cve_pattern = r'CVE-\d{4}-\d{4,7}'
+                found_cves = re.findall(cve_pattern, rule_content, re.IGNORECASE)
+
+                if found_cves:
+                    # Check that all found CVE IDs match the expected one
+                    wrong_cves = [cve for cve in found_cves if cve.upper() != expected_cve_id.upper()]
+                    if wrong_cves:
+                        logger.warning(f"Rule contains wrong CVE IDs: {wrong_cves}, expected {expected_cve_id}")
+                        return False
+                else:
+                    logger.warning(f"Rule does not contain expected CVE ID: {expected_cve_id}")
+                    # Don't fail validation for a missing CVE ID, just warn
+
             logger.info("SIGMA rule validation passed")
             return True
 
         except yaml.YAMLError as e:
-            logger.warning(f"YAML parsing error: {e}")
+            error_msg = str(e)
+            if "alias" in error_msg.lower() and "*" in error_msg:
+                logger.warning(f"YAML alias syntax error (likely unquoted wildcard): {e}")
+            elif "expected" in error_msg.lower():
+                logger.warning(f"YAML structure error: {e}")
+            else:
+                logger.warning(f"YAML parsing error: {e}")
             return False
         except Exception as e:
             logger.warning(f"Rule validation error: {e}")
diff --git a/backend/main.py b/backend/main.py
index 0b498e6..78dae8c 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -22,6 +22,7 @@ import hashlib
 import logging
 import threading
 from mcdevitt_poc_client import GitHubPoCClient
+from cve2capec_client import CVE2CAPECClient
 
 # Setup logging
 logging.basicConfig(level=logging.INFO)
@@ -1849,6 +1850,23 @@ async def get_poc_stats(db: Session = Depends(get_db)):
         logger.error(f"Error getting PoC stats: {e}")
         return {"error": str(e)}
 
+@app.get("/api/cve2capec-stats")
+async def get_cve2capec_stats():
+    """Get CVE2CAPEC MITRE ATT&CK mapping statistics"""
+
+    try:
+        client = CVE2CAPECClient()
+        stats = client.get_stats()
+
+        return {
+            "status": "success",
+            "data": stats,
+            "description": "CVE to MITRE ATT&CK technique mappings from CVE2CAPEC repository"
+        }
+    except Exception as e:
+        logger.error(f"Error getting CVE2CAPEC stats: {e}")
+        return {"error": str(e)}
+
 @app.post("/api/regenerate-rules")
 async def regenerate_sigma_rules(background_tasks: BackgroundTasks,
                                  request: RuleRegenRequest,