auto_sigma_rule_generator/backend/exploitdb_client_local.py

"""
ExploitDB Local Filesystem Integration Client
Interfaces with the local ExploitDB submodule at exploit-db-mirror/
"""
import os
import re
import json
import logging
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple
from sqlalchemy.orm import Session
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ExploitDBLocalClient:
"""Client for interfacing with local ExploitDB mirror filesystem"""
def __init__(self, db_session: Session):
self.db_session = db_session
# Path to the local exploit-db-mirror submodule (in container: /app/exploit-db-mirror)
self.exploitdb_path = Path("/app/exploit-db-mirror")
self.exploits_path = self.exploitdb_path / "exploits"
# ExploitDB URL pattern for mapping
self.exploit_url_pattern = re.compile(r'https?://(?:www\.)?exploit-db\.com/exploits/(\d+)')
# Cache for file searches
self.file_cache = {}
# Build file index on initialization
self._build_file_index()
def _build_file_index(self):
"""Build an index of exploit ID to file path for fast lookups"""
logger.info("Building ExploitDB file index...")
self.file_index = {}
if not self.exploits_path.exists():
logger.error(f"ExploitDB path not found: {self.exploits_path}")
return
# Walk through all exploit files
for root, dirs, files in os.walk(self.exploits_path):
for file in files:
# Extract exploit ID from filename (e.g., "12345.py" -> "12345")
match = re.match(r'^(\d+)\.(\w+)$', file)
if match:
exploit_id = match.group(1)
file_extension = match.group(2)
file_path = Path(root) / file
# Store in index
self.file_index[exploit_id] = {
'path': file_path,
'filename': file,
'extension': file_extension,
'category': self._extract_category_from_path(file_path),
'subcategory': self._extract_subcategory_from_path(file_path)
}
logger.info(f"Built index with {len(self.file_index)} exploits")
def _extract_category_from_path(self, file_path: Path) -> str:
"""Extract category from file path (e.g., linux, windows, etc.)"""
parts = file_path.parts
exploits_index = None
for i, part in enumerate(parts):
if part == "exploits":
exploits_index = i
break
if exploits_index is not None and exploits_index + 1 < len(parts):
return parts[exploits_index + 1]
return "unknown"
def _extract_subcategory_from_path(self, file_path: Path) -> str:
"""Extract subcategory from file path (e.g., local, remote, webapps, etc.)"""
parts = file_path.parts
exploits_index = None
for i, part in enumerate(parts):
if part == "exploits":
exploits_index = i
break
if exploits_index is not None and exploits_index + 2 < len(parts):
return parts[exploits_index + 2]
return "unknown"
def extract_exploit_id_from_url(self, url: str) -> Optional[str]:
"""Extract exploit ID from ExploitDB URL"""
match = self.exploit_url_pattern.search(url)
if match:
return match.group(1)
return None
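# Example, assuming a typical reference URL:
#   extract_exploit_id_from_url("https://www.exploit-db.com/exploits/50383")
#   returns "50383"; non-ExploitDB URLs return None.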
def get_exploit_details(self, exploit_id: str) -> Optional[dict]:
"""Get exploit details from local filesystem"""
if exploit_id not in self.file_index:
logger.debug(f"Exploit {exploit_id} not found in local index")
return None
file_info = self.file_index[exploit_id]
file_path = file_info['path']
try:
# Read file content
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Get file stats
stat = file_path.stat()
return {
'id': exploit_id,
'filename': file_info['filename'],
'path': str(file_path.relative_to(self.exploitdb_path)),
'full_path': str(file_path),
'category': file_info['category'],
'subcategory': file_info['subcategory'],
'extension': file_info['extension'],
'content': content,
'size': stat.st_size,
'modified_time': datetime.fromtimestamp(stat.st_mtime),
'local_url': f"file://{file_path}"
}
except Exception as e:
logger.error(f"Error reading exploit file {file_path}: {e}")
return None
def analyze_exploit_content(self, exploit_data: dict) -> dict:
"""Analyze exploit content to extract indicators"""
if not exploit_data or not exploit_data.get('content'):
return {}
content = exploit_data['content']
indicators = {
'processes': [],
'files': [],
'network': [],
'registry': [],
'commands': [],
'urls': [],
'techniques': [],
'languages': [],
'platforms': [],
'syscalls': [],
'functions': []
}
# Determine programming language from extension
extension = exploit_data.get('extension', '').lower()
language_map = {
'py': 'python',
'rb': 'ruby',
'pl': 'perl',
'c': 'c',
'cpp': 'cpp',
'cc': 'cpp',
'cxx': 'cpp',
'sh': 'bash',
'ps1': 'powershell',
'java': 'java',
'js': 'javascript',
'php': 'php',
'asp': 'asp',
'aspx': 'aspx',
'jsp': 'jsp',
'go': 'go',
'rs': 'rust',
'asm': 'assembly',
's': 'assembly',
'nasm': 'assembly'
}
if extension in language_map:
indicators['languages'].append(language_map[extension])
# Extract platform from path
category = exploit_data.get('category', '').lower()
if category in ['linux', 'windows', 'osx', 'macos', 'android', 'freebsd', 'solaris']:
indicators['platforms'].append(category)
# Extract indicators from content
content_lower = content.lower()
# Process patterns - enhanced for different languages
process_patterns = [
r'\b(cmd\.exe|powershell\.exe|bash|sh|python|ruby|perl|java)\b',
r'\b(system|exec|popen|subprocess|shell_exec|eval|execve|execl|execlp)\b',
r'\b(createprocess|shellexecute|winexec|createthread)\b',
r'\b(mshta|rundll32|regsvr32|wscript|cscript|certutil|bitsadmin)\b',
r'\b(/bin/sh|/bin/bash|/usr/bin/python|/usr/bin/perl)\b'
]
for pattern in process_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
indicators['processes'].extend(matches)
# File patterns - enhanced
file_patterns = [
r'\b([a-zA-Z]:\\[^\\\s"\']+\\[^\\\s"\']+\.[a-zA-Z0-9]+)\b', # Windows paths
r'\b(/[^/\s"\']+/[^/\s"\']+\.[a-zA-Z0-9]+)\b', # Unix paths
r'\b(\w+\.(exe|dll|so|dylib|bat|ps1|py|sh|jar|war|php|jsp|asp|aspx|txt|log|conf))\b',
r'\b(/tmp/[^\s"\']+)\b', # Temp files
r'\b(/etc/[^\s"\']+)\b', # Config files
r'\b(/var/[^\s"\']+)\b', # Var files
r'\b(/proc/[^\s"\']+)\b' # Proc files
]
for pattern in file_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if matches and isinstance(matches[0], tuple):
indicators['files'].extend([m[0] for m in matches])
else:
indicators['files'].extend(matches)
# Network patterns - enhanced
network_patterns = [
r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b', # IP addresses
r'\b(https?://[^\s<>"\']+)\b', # URLs
r'\b([a-zA-Z0-9-]+\.[a-zA-Z]{2,})\b', # Domain names
r'\b(bind|connect|listen|socket|recv|send|accept)\b', # Network functions
r'\b(AF_INET|SOCK_STREAM|SOCK_DGRAM)\b', # Socket constants
r'\b(nc|netcat|ncat|telnet|ssh|ftp|wget|curl)\b' # Network tools
]
for pattern in network_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
if 'http' in pattern:
indicators['urls'].extend(matches)
else:
indicators['network'].extend(matches)
# Command patterns - enhanced
command_patterns = [
r'\b(curl|wget|nc|netcat|telnet|ssh|ftp)\b',
r'\b(whoami|id|uname|systeminfo|ipconfig|ifconfig|netstat|ps|top|lsof)\b',
r'\b(cat|type|dir|ls|find|grep|awk|sed|sort|uniq)\b',
r'\b(echo|printf|print)\b',
r'\b(base64|decode|encode|openssl|gpg)\b',
r'\b(sudo|su|chmod|chown|mount|umount)\b',
r'\b(service|systemctl|chkconfig|update-rc\.d)\b'
]
for pattern in command_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
indicators['commands'].extend(matches)
# Registry patterns (Windows-specific)
registry_patterns = [
r'\b(HKEY_[A-Z_]+)\b',
r'\b(HKLM|HKCU|HKCR|HKU|HKCC)\b',
r'\b(reg\s+add|reg\s+query|reg\s+delete|regedit)\b',
r'\b(SOFTWARE\\\\[^\\\s"\']+)\b',
r'\b(SYSTEM\\\\[^\\\s"\']+)\b',
r'\b(CurrentVersion\\\\[^\\\s"\']+)\b'
]
for pattern in registry_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
indicators['registry'].extend(matches)
# System call patterns (Linux/Unix)
syscall_patterns = [
r'\b(open|close|read|write|lseek|stat|fstat|lstat)\b',
r'\b(fork|vfork|clone|execve|wait|waitpid)\b',
r'\b(socket|bind|listen|accept|connect|send|recv)\b',
r'\b(mmap|munmap|mprotect|brk|sbrk)\b',
r'\b(ptrace|kill|signal|alarm)\b'
]
for pattern in syscall_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
indicators['syscalls'].extend(matches)
# Function patterns
function_patterns = [
r'\b(main|printf|scanf|malloc|free|strcpy|strcat|strlen)\b',
r'\b(gets|puts|fgets|fputs|fopen|fclose|fread|fwrite)\b',
r'\b(sprintf|snprintf|memcpy|memset|strcmp|strncmp)\b'
]
for pattern in function_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
indicators['functions'].extend(matches)
# Clean up and deduplicate
for key in indicators:
# Strip whitespace, drop empty/one-character entries, and deduplicate while
# preserving first-seen order so the cap below keeps a deterministic subset
cleaned = [item.strip() for item in indicators[key] if item and len(item.strip()) > 1]
# Limit to a reasonable number of indicators
indicators[key] = list(dict.fromkeys(cleaned))[:25]
return indicators
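# Sketch of the returned structure for a hypothetical Python reverse-shell
# exploit (values are illustrative, not taken from a real entry):
#   {
#       'processes': ['python', 'bash'],
#       'files': ['/tmp/payload.py'],
#       'network': ['socket', 'connect', '192.168.1.10'],
#       'registry': [],
#       'commands': ['nc', 'whoami'],
#       'urls': [],
#       'techniques': [],
#       'languages': ['python'],
#       'platforms': ['linux'],
#       'syscalls': ['socket', 'connect'],
#       'functions': []
#   }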
def calculate_exploit_quality_score(self, exploit_data: dict) -> dict:
"""Calculate quality score for an exploit"""
quality_score = 0
factors = {}
# File size factor (0-25 points)
file_size = exploit_data.get('size', 0)
if file_size > 0:
if file_size > 10000: # Large files (10KB+)
size_score = 25
elif file_size > 5000: # Medium files (5KB+)
size_score = 20
elif file_size > 1000: # Small files (1KB+)
size_score = 15
elif file_size > 500: # Very small files (500B+)
size_score = 10
else: # Tiny files
size_score = 5
quality_score += size_score
factors['size_score'] = size_score
# Content analysis factor (0-30 points)
content = exploit_data.get('content', '')
if content:
content_score = 0
lines = content.split('\n')
# Check for comments and documentation
comment_lines = 0
for line in lines:
stripped = line.strip()
if (stripped.startswith('#') or stripped.startswith('//') or
stripped.startswith('/*') or stripped.startswith('*') or
stripped.startswith('"""') or stripped.startswith("'''")):
comment_lines += 1
comment_score = min(comment_lines, 10) # Up to 10 points
content_score += comment_score
# Check for function definitions
function_patterns = [
r'\bdef\s+\w+', # Python
r'\bfunction\s+\w+', # JavaScript
r'\bvoid\s+\w+', # C/C++
r'\bint\s+\w+', # C/C++
r'\bchar\s+\w+', # C/C++
r'\bsub\s+\w+', # Perl
r'^\w+\s*\(', # Generic function calls
]
function_count = 0
for pattern in function_patterns:
matches = re.findall(pattern, content, re.IGNORECASE | re.MULTILINE)
function_count += len(matches)
function_score = min(function_count, 15) # Up to 15 points
content_score += function_score
# Check for include/import statements
include_patterns = [
r'#include\s*[<"]', # C/C++
r'import\s+\w+', # Python/Java
r'require\s+\w+', # Ruby/Perl
r'use\s+\w+', # Perl
]
include_count = 0
for pattern in include_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
include_count += len(matches)
include_score = min(include_count, 5) # Up to 5 points
content_score += include_score
quality_score += content_score
factors['content_score'] = content_score
# Platform/category factor (0-20 points)
category = exploit_data.get('category', '').lower()
subcategory = exploit_data.get('subcategory', '').lower()
platform_score = 0
if category in ['linux', 'windows', 'osx', 'macos']:
platform_score += 10
elif category in ['android', 'freebsd', 'solaris']:
platform_score += 8
elif category in ['multiple', 'unix']:
platform_score += 6
if subcategory in ['local', 'remote']:
platform_score += 10
elif subcategory in ['webapps', 'dos']:
platform_score += 8
elif subcategory in ['shellcode']:
platform_score += 6
quality_score += platform_score
factors['platform_score'] = platform_score
# Language factor (0-15 points)
extension = exploit_data.get('extension', '').lower()
lang_score = 0
if extension in ['c', 'cpp', 'cc', 'cxx']: # Compiled languages
lang_score = 15
elif extension in ['py', 'rb', 'pl', 'java']: # High-level languages
lang_score = 12
elif extension in ['sh', 'ps1', 'bat']: # Scripting languages
lang_score = 8
elif extension in ['asm', 's', 'nasm']: # Assembly
lang_score = 10
elif extension in ['php', 'asp', 'aspx', 'jsp']: # Web languages
lang_score = 6
elif extension in ['txt', 'html']: # Text/docs
lang_score = 3
quality_score += lang_score
factors['language_score'] = lang_score
# File age factor (0-10 points) - newer exploits might be more relevant
modified_time = exploit_data.get('modified_time')
if modified_time:
days_old = (datetime.now() - modified_time).days
if days_old < 365: # Less than 1 year
age_score = 10
elif days_old < 365 * 3: # Less than 3 years
age_score = 8
elif days_old < 365 * 5: # Less than 5 years
age_score = 6
elif days_old < 365 * 10: # Less than 10 years
age_score = 4
else: # Very old
age_score = 2
quality_score += age_score
factors['age_score'] = age_score
# Determine quality tier
quality_tier = self._get_exploit_quality_tier(quality_score)
return {
'quality_score': quality_score,
'factors': factors,
'quality_tier': quality_tier
}
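# The component caps sum to a maximum of 100 points: size (25) + content (30)
# + platform/category (20) + language (15) + age (10). A hypothetical worked
# example: a 6 KB C exploit under exploits/linux/remote modified two years ago,
# with 8 comment lines, 12 function-like matches and 3 includes, would score
# 20 + (8 + 12 + 3) + (10 + 10) + 15 + 8 = 86, i.e. the 'excellent' tier below.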
def _get_exploit_quality_tier(self, score: int) -> str:
"""Get quality tier based on score"""
if score >= 80:
return 'excellent'
elif score >= 65:
return 'good'
elif score >= 45:
return 'fair'
elif score >= 25:
return 'poor'
else:
return 'very_poor'
async def sync_cve_exploits(self, cve_id: str) -> dict:
"""Synchronize ExploitDB data for a specific CVE using local filesystem"""
from main import CVE, SigmaRule
# Get existing CVE
cve = self.db_session.query(CVE).filter(CVE.cve_id == cve_id).first()
if not cve:
logger.warning(f"CVE {cve_id} not found in database")
return {"error": "CVE not found"}
# Extract ExploitDB URLs from reference URLs
exploit_urls = []
if cve.reference_urls:
for url in cve.reference_urls:
exploit_id = self.extract_exploit_id_from_url(url)
if exploit_id:
exploit_urls.append((url, exploit_id))
if not exploit_urls:
logger.info(f"No ExploitDB URLs found for {cve_id}")
return {"cve_id": cve_id, "exploits_found": 0}
# Fetch exploit details from local filesystem
exploit_data = []
total_quality_score = 0
for url, exploit_id in exploit_urls:
try:
details = self.get_exploit_details(exploit_id)
if details:
# Analyze exploit content
indicators = self.analyze_exploit_content(details)
quality_analysis = self.calculate_exploit_quality_score(details)
exploit_entry = {
'id': exploit_id,
'url': url,
'filename': details.get('filename'),
'path': details.get('path'),
'category': details.get('category'),
'subcategory': details.get('subcategory'),
'extension': details.get('extension'),
'size': details.get('size'),
'modified_time': details.get('modified_time').isoformat() if details.get('modified_time') else None,
'local_url': details.get('local_url'),
'indicators': indicators,
'quality_analysis': quality_analysis
}
exploit_data.append(exploit_entry)
total_quality_score += quality_analysis['quality_score']
logger.info(f"Successfully processed exploit {exploit_id} from local filesystem")
except Exception as e:
logger.error(f"Error processing exploit {exploit_id}: {e}")
# Update CVE with ExploitDB data
if exploit_data:
# Store in existing poc_data field
if not cve.poc_data:
cve.poc_data = {}
cve.poc_data['exploitdb'] = {
'exploits': exploit_data,
'total_exploits': len(exploit_data),
'average_quality': total_quality_score // len(exploit_data) if exploit_data else 0,
'synced_at': datetime.utcnow().isoformat(),
'source': 'local_filesystem'
}
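# Assumption: if CVE.poc_data is a plain JSON column without MutableDict
# tracking, in-place updates to an existing dict may not mark the attribute
# dirty; sqlalchemy.orm.attributes.flag_modified(cve, 'poc_data') before the
# commit would force persistence in that case (the model is defined in main.py).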
cve.updated_at = datetime.utcnow()
# Update SIGMA rule with ExploitDB data
sigma_rule = self.db_session.query(SigmaRule).filter(
SigmaRule.cve_id == cve_id
).first()
if sigma_rule:
# Combine indicators from all exploits
combined_indicators = {}
for exploit in exploit_data:
for key, values in exploit['indicators'].items():
if key not in combined_indicators:
combined_indicators[key] = []
combined_indicators[key].extend(values)
# Deduplicate
for key in combined_indicators:
combined_indicators[key] = list(set(combined_indicators[key]))
# Update rule with ExploitDB data
if not sigma_rule.nomi_sec_data:
sigma_rule.nomi_sec_data = {}
sigma_rule.nomi_sec_data['exploitdb'] = {
'total_exploits': len(exploit_data),
'average_quality': total_quality_score // len(exploit_data) if exploit_data else 0,
'best_exploit': max(exploit_data, key=lambda x: x['quality_analysis']['quality_score']) if exploit_data else None,
'indicators': combined_indicators,
'source': 'local_filesystem'
}
# Update exploit indicators
existing_indicators = json.loads(sigma_rule.exploit_indicators) if sigma_rule.exploit_indicators else {}
for key, values in combined_indicators.items():
if key not in existing_indicators:
existing_indicators[key] = []
existing_indicators[key].extend(values)
existing_indicators[key] = list(set(existing_indicators[key]))
sigma_rule.exploit_indicators = json.dumps(existing_indicators)
sigma_rule.updated_at = datetime.utcnow()
self.db_session.commit()
logger.info(f"Synchronized {len(exploit_data)} ExploitDB exploits for {cve_id} from local filesystem")
return {
"cve_id": cve_id,
"exploits_found": len(exploit_data),
"total_quality_score": total_quality_score,
"average_quality": total_quality_score // len(exploit_data) if exploit_data else 0,
"exploit_urls": [e['url'] for e in exploit_data],
"source": "local_filesystem"
}
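# Shape of the per-CVE result consumed by bulk_sync_exploitdb (illustrative
# values for a hypothetical CVE with one local exploit):
#   {"cve_id": "CVE-2021-0001", "exploits_found": 1, "total_quality_score": 86,
#    "average_quality": 86,
#    "exploit_urls": ["https://www.exploit-db.com/exploits/50383"],
#    "source": "local_filesystem"}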
async def bulk_sync_exploitdb(self, batch_size: int = 50, cancellation_flag: Optional[Callable[[], bool]] = None) -> dict:
"""Synchronize ExploitDB data for all CVEs with ExploitDB references using local filesystem"""
from main import CVE, BulkProcessingJob
from sqlalchemy import text
# Create bulk processing job
job = BulkProcessingJob(
job_type='exploitdb_sync_local',
status='running',
started_at=datetime.utcnow(),
job_metadata={'batch_size': batch_size, 'source': 'local_filesystem'}
)
self.db_session.add(job)
self.db_session.commit()
total_processed = 0
total_found = 0
results = []
try:
# Get all CVEs with ExploitDB references using text search
cves = self.db_session.query(CVE).filter(
text("reference_urls::text LIKE '%exploit-db%'")
).all()
job.total_items = len(cves)
self.db_session.commit()
logger.info(f"Found {len(cves)} CVEs with ExploitDB references for local sync")
# Process in batches
for i in range(0, len(cves), batch_size):
# Check for cancellation
if cancellation_flag and cancellation_flag():
logger.info("ExploitDB local sync cancelled by user")
job.status = 'cancelled'
job.cancelled_at = datetime.utcnow()
job.error_message = "Job cancelled by user"
break
batch = cves[i:i + batch_size]
for cve in batch:
# Check for cancellation
if cancellation_flag and cancellation_flag():
logger.info("ExploitDB local sync cancelled by user")
job.status = 'cancelled'
job.cancelled_at = datetime.utcnow()
job.error_message = "Job cancelled by user"
break
try:
result = await self.sync_cve_exploits(cve.cve_id)
total_processed += 1
if result.get("exploits_found", 0) > 0:
total_found += result["exploits_found"]
results.append(result)
job.processed_items += 1
# Very small delay for responsiveness
# No need for long delays with local filesystem
except Exception as e:
logger.error(f"Error syncing ExploitDB for {cve.cve_id}: {e}")
job.failed_items += 1
# Break out of outer loop if cancelled
if job.status == 'cancelled':
break
# Commit after each batch
self.db_session.commit()
logger.info(f"Processed ExploitDB local batch {i//batch_size + 1}/{(len(cves) + batch_size - 1)//batch_size}")
# Update job status
if job.status != 'cancelled':
job.status = 'completed'
job.completed_at = datetime.utcnow()
job.job_metadata.update({
'total_processed': total_processed,
'total_exploits_found': total_found,
'cves_with_exploits': len(results),
'source': 'local_filesystem'
})
except Exception as e:
job.status = 'failed'
job.error_message = str(e)
job.completed_at = datetime.utcnow()
logger.error(f"Bulk ExploitDB local sync job failed: {e}")
finally:
self.db_session.commit()
return {
'job_id': str(job.id),
'status': job.status,
'total_processed': total_processed,
'total_exploits_found': total_found,
'cves_with_exploits': len(results),
'source': 'local_filesystem'
}
async def get_exploitdb_sync_status(self) -> dict:
"""Get ExploitDB synchronization status for local filesystem"""
from main import CVE
from sqlalchemy import text
# Count CVEs with ExploitDB references
total_cves = self.db_session.query(CVE).count()
# Count CVEs with ExploitDB data
result = self.db_session.execute(
text("SELECT COUNT(*) FROM cves WHERE poc_data::text LIKE '%\"exploitdb\"%'")
)
cves_with_exploitdb = result.scalar()
# Count CVEs with ExploitDB URLs in references
result2 = self.db_session.execute(
text("SELECT COUNT(*) FROM cves WHERE reference_urls::text LIKE '%exploit-db%'")
)
cves_with_exploitdb_refs = result2.scalar()
return {
'total_cves': total_cves,
'cves_with_exploitdb_refs': cves_with_exploitdb_refs,
'cves_with_exploitdb_data': cves_with_exploitdb,
'exploitdb_coverage': (cves_with_exploitdb / cves_with_exploitdb_refs * 100) if cves_with_exploitdb_refs > 0 else 0,
'exploitdb_sync_status': 'active' if cves_with_exploitdb > 0 else 'pending',
'exploitdb_local_index_size': len(self.file_index),
'source': 'local_filesystem'
}
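# Minimal standalone usage sketch (not part of the service wiring): the client
# needs a SQLAlchemy session, so the DATABASE_URL fallback and the exploit ID
# below are illustrative assumptions rather than documented defaults.
if __name__ == "__main__":
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    engine = create_engine(os.environ.get("DATABASE_URL", "sqlite:///:memory:"))
    session = sessionmaker(bind=engine)()

    client = ExploitDBLocalClient(session)
    details = client.get_exploit_details("12345")  # hypothetical exploit ID
    if details:
        indicators = client.analyze_exploit_content(details)
        quality = client.calculate_exploit_quality_score(details)
        print(json.dumps({"indicators": indicators, "quality": quality}, indent=2))
    else:
        print("Exploit 12345 is not present in the local mirror index")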