🎉 **Architecture Transformation (v2.0)** - Complete migration from web app to professional CLI tool - File-based SIGMA rule management system - Git-friendly directory structure organized by year/CVE-ID - Multiple rule variants per CVE (template, LLM, hybrid) ✨ **New CLI System** - Professional command-line interface with Click framework - 8 command groups: process, generate, search, stats, export, migrate - Modular command architecture for maintainability - Comprehensive help system and configuration management 📁 **File-Based Storage Architecture** - Individual CVE directories: cves/YEAR/CVE-ID/ - Multiple SIGMA rule variants per CVE - JSON metadata with processing history and PoC data - Native YAML files perfect for version control 🚀 **Core CLI Commands** - process: CVE processing and bulk operations - generate: SIGMA rule generation with multiple methods - search: Advanced CVE and rule searching with filters - stats: Comprehensive statistics and analytics - export: Multiple output formats for different workflows - migrate: Database-to-file migration tools 🔧 **Migration Support** - Complete migration utilities from web database - Data validation and integrity checking - Backward compatibility with existing processors - Legacy web interface maintained for transition 📊 **Enhanced Features** - Advanced search with complex filtering (severity, PoC presence, etc.) 
- Multi-format exports (YAML, JSON, CSV) - Comprehensive statistics and coverage reports - File-based rule versioning and management 🎯 **Production Benefits** - No database dependency - runs anywhere - Perfect for cybersecurity teams using git workflows - Direct integration with SIGMA ecosystems - Portable architecture for CI/CD pipelines - Multiple rule variants for different detection scenarios 📝 **Documentation Updates** - Complete README rewrite for CLI-first approach - Updated CLAUDE.md with new architecture details - Detailed CLI documentation with examples - Migration guides and troubleshooting **Perfect for security teams wanting production-ready SIGMA rules with version control! 🛡️** 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
194 lines
No EOL
7.6 KiB
Python
194 lines
No EOL
7.6 KiB
Python
"""
|
|
Search Commands
|
|
|
|
Commands for searching CVEs and SIGMA rules in the file-based system.
|
|
"""
|
|
|
|
import re
|
|
from typing import Dict, List, Optional, Tuple
|
|
from .base_command import BaseCommand
|
|
|
|
class SearchCommands(BaseCommand):
    """Commands for searching CVEs and SIGMA rules in the file-based system.

    Both commands treat ``pattern`` as a case-insensitive regular expression,
    stop collecting once ``limit`` matches have been gathered, and print the
    result as a table. Logging, storage access and table rendering go through
    helpers called on ``self`` (``info``/``warning``/``error``,
    ``get_all_cves``, ``load_cve_metadata``, ``list_cve_rules``,
    ``load_sigma_rule``, ``print_table``) that are not defined in this class.
    """

    # Compiled once: these patterns are constant and used for every rule.
    _TITLE_RE = re.compile(r'^title:\s*(.+)$', re.MULTILINE)
    _CATEGORY_RE = re.compile(r'category:\s*(\w+)')

    def _compile_pattern(self, pattern: str) -> "Optional[re.Pattern]":
        """Compile the user-supplied pattern, or report it and return None.

        The pattern comes straight from the command line, so an invalid
        regular expression is reported via ``self.error`` instead of
        escaping as an unhandled ``re.error``.
        """
        try:
            return re.compile(pattern, re.IGNORECASE)
        except re.error as exc:
            self.error(f"Invalid search pattern '{pattern}': {exc}")
            return None

    @staticmethod
    def _rule_method(rule_file: str) -> str:
        """Derive the generation-method label from a rule file name."""
        return rule_file.replace('rule_', '').replace('.sigma', '')

    @staticmethod
    def _truncate(text: str, width: int) -> str:
        """Return ``text`` cut to ``width`` characters plus '...' if longer."""
        return text[:width] + '...' if len(text) > width else text

    @staticmethod
    def _cve_matches(pattern_regex, cve_id: str, description: str, products) -> bool:
        """Return True if the pattern hits the CVE ID, description or a product."""
        if pattern_regex.search(cve_id):
            return True
        if description and pattern_regex.search(description):
            return True
        # str() guards against non-string product entries in the metadata.
        return any(pattern_regex.search(str(product)) for product in products)

    async def search_cves(self, pattern: str, year: Optional[int], severity: Optional[str],
                          has_poc: bool, has_rules: bool, limit: int):
        """Search for CVEs by pattern.

        Args:
            pattern: Regular expression (case-insensitive) matched against the
                CVE ID, the description and each affected product.
            year: If set, passed to ``get_all_cves`` to restrict the search.
            severity: If set, only CVEs whose severity equals it
                (case-insensitively) are considered.
            has_poc: If True, skip CVEs whose ``poc_count`` is zero or missing.
            has_rules: If True, skip CVEs with no rules.
            limit: Collection stops once this many matches were gathered.

        Returns:
            None. Results are printed via ``print_table``; a warning is
            emitted when nothing is found, and an error when ``pattern`` is
            not a valid regular expression.

        Note:
            Declared ``async`` although the body contains no ``await``.
        """
        self.info(f"Searching CVEs with pattern: '{pattern}'")

        if year:
            self.info(f"Filtering by year: {year}")
        if severity:
            self.info(f"Filtering by severity: {severity}")
        if has_poc:
            self.info("Only showing CVEs with PoC data")
        if has_rules:
            self.info("Only showing CVEs with generated rules")

        # Get CVEs to search
        cves_to_search = self.get_all_cves(year)

        if not cves_to_search:
            self.warning("No CVEs found to search")
            return

        pattern_regex = self._compile_pattern(pattern)
        if pattern_regex is None:
            return

        wanted_severity = severity.lower() if severity else None
        matches = []

        for cve_id in cves_to_search:
            # Best-effort per CVE: one unreadable entry must not abort the search.
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # "or" fallbacks cover keys that are present but stored as null.
                cve_info = metadata.get('cve_info') or {}
                poc_data = metadata.get('poc_data') or {}
                poc_count = poc_data.get('poc_count') or 0

                # Apply filters
                if wanted_severity and str(cve_info.get('severity') or '').lower() != wanted_severity:
                    continue

                if has_poc and poc_count == 0:
                    continue

                rules = None
                if has_rules:
                    rules = self.list_cve_rules(cve_id)
                    if not rules:
                        continue

                # Check pattern match
                description = str(cve_info.get('description') or '')
                products = cve_info.get('affected_products') or []
                if not self._cve_matches(pattern_regex, cve_id, description, products):
                    continue

                # Reuse the listing fetched for the has_rules filter when available.
                if rules is None:
                    rules = self.list_cve_rules(cve_id) or []

                matches.append({
                    'cve_id': cve_id,
                    'severity': cve_info.get('severity', 'Unknown'),
                    'cvss_score': cve_info.get('cvss_score', 'N/A'),
                    'poc_count': poc_count,
                    'rule_count': len(rules),
                    'description': self._truncate(description, 100),
                })

                if len(matches) >= limit:
                    break

            except Exception as e:
                self.error(f"Error searching {cve_id}: {e}")

        # Display results
        if not matches:
            self.warning("No matching CVEs found")
            return

        headers = ["CVE ID", "Severity", "CVSS", "PoCs", "Rules", "Description"]
        rows = [
            [
                match['cve_id'],
                match['severity'],
                str(match['cvss_score']),
                str(match['poc_count']),
                str(match['rule_count']),
                match['description'],
            ]
            for match in matches
        ]
        self.print_table(headers, rows, f"CVE Search Results ({len(matches)} matches)")

    async def search_rules(self, pattern: str, rule_type: Optional[str], method: Optional[str], limit: int):
        """Search for SIGMA rules by pattern.

        Args:
            pattern: Regular expression (case-insensitive) matched against the
                full text of each rule.
            rule_type: If set, only rules whose text contains
                ``category: <rule_type>`` or ``product: <rule_type>``
                (compared case-insensitively) are considered.
            method: If set, only rule files whose method label (file name
                without ``rule_`` and ``.sigma``) contains it,
                case-insensitively, are considered.
            limit: Collection stops once this many matches were gathered.

        Returns:
            None. Results are printed via ``print_table``; a warning is
            emitted when nothing is found, and an error when ``pattern`` is
            not a valid regular expression.

        Note:
            Declared ``async`` although the body contains no ``await``.
        """
        self.info(f"Searching SIGMA rules with pattern: '{pattern}'")

        if rule_type:
            self.info(f"Filtering by rule type: {rule_type}")
        if method:
            self.info(f"Filtering by generation method: {method}")

        pattern_regex = self._compile_pattern(pattern)
        if pattern_regex is None:
            return

        # The rule text is lowercased for the type filter, so the filter
        # value must be lowercased too or mixed-case input never matches.
        wanted_type = rule_type.lower() if rule_type else None
        wanted_method = method.lower() if method else None
        matches = []

        # Search through all CVEs and their rules
        all_cves = self.get_all_cves() or []

        for cve_id in all_cves:
            # Best-effort per CVE: one unreadable entry must not abort the search.
            try:
                for rule_file in self.list_cve_rules(cve_id) or []:
                    rule_method = self._rule_method(rule_file)

                    # Apply method filter
                    if wanted_method and wanted_method not in rule_method.lower():
                        continue

                    # Load and search rule content
                    rule_content = self.load_sigma_rule(cve_id, rule_file)
                    if not rule_content:
                        continue

                    # Apply rule type filter (search in logsource)
                    if wanted_type:
                        content_lower = rule_content.lower()
                        if (f'category: {wanted_type}' not in content_lower
                                and f'product: {wanted_type}' not in content_lower):
                            continue

                    # Check pattern match in rule content
                    if not pattern_regex.search(rule_content):
                        continue

                    # Extract rule title; strip() drops a trailing '\r' from CRLF files.
                    title_match = self._TITLE_RE.search(rule_content)
                    title = title_match.group(1).strip() if title_match else 'Unknown'

                    # Extract detection type from logsource
                    category_match = self._CATEGORY_RE.search(rule_content)
                    detection_type = category_match.group(1) if category_match else 'Unknown'

                    matches.append({
                        'cve_id': cve_id,
                        'rule_file': rule_file,
                        'title': title,
                        'detection_type': detection_type,
                        'method': rule_method,
                    })

                    if len(matches) >= limit:
                        break

                # Propagate the inner break so the scan stops across CVEs too.
                if len(matches) >= limit:
                    break

            except Exception as e:
                self.error(f"Error searching rules for {cve_id}: {e}")

        # Display results
        if not matches:
            self.warning("No matching rules found")
            return

        headers = ["CVE ID", "Rule File", "Title", "Type", "Method"]
        rows = [
            [
                match['cve_id'],
                match['rule_file'],
                self._truncate(match['title'], 50),
                match['detection_type'],
                match['method'],
            ]
            for match in matches
        ]
        self.print_table(headers, rows, f"SIGMA Rule Search Results ({len(matches)} matches)")