🎉 **Architecture Transformation (v2.0)** - Complete migration from web app to professional CLI tool - File-based SIGMA rule management system - Git-friendly directory structure organized by year/CVE-ID - Multiple rule variants per CVE (template, LLM, hybrid) ✨ **New CLI System** - Professional command-line interface with Click framework - 8 command groups: process, generate, search, stats, export, migrate - Modular command architecture for maintainability - Comprehensive help system and configuration management 📁 **File-Based Storage Architecture** - Individual CVE directories: cves/YEAR/CVE-ID/ - Multiple SIGMA rule variants per CVE - JSON metadata with processing history and PoC data - Native YAML files perfect for version control 🚀 **Core CLI Commands** - process: CVE processing and bulk operations - generate: SIGMA rule generation with multiple methods - search: Advanced CVE and rule searching with filters - stats: Comprehensive statistics and analytics - export: Multiple output formats for different workflows - migrate: Database-to-file migration tools 🔧 **Migration Support** - Complete migration utilities from web database - Data validation and integrity checking - Backward compatibility with existing processors - Legacy web interface maintained for transition 📊 **Enhanced Features** - Advanced search with complex filtering (severity, PoC presence, etc.) 
- Multi-format exports (YAML, JSON, CSV) - Comprehensive statistics and coverage reports - File-based rule versioning and management 🎯 **Production Benefits** - No database dependency - runs anywhere - Perfect for cybersecurity teams using git workflows - Direct integration with SIGMA ecosystems - Portable architecture for CI/CD pipelines - Multiple rule variants for different detection scenarios 📝 **Documentation Updates** - Complete README rewrite for CLI-first approach - Updated CLAUDE.md with new architecture details - Detailed CLI documentation with examples - Migration guides and troubleshooting **Perfect for security teams wanting production-ready SIGMA rules with version control! 🛡️** 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
282 lines
No EOL
11 KiB
Python
282 lines
No EOL
11 KiB
Python
"""
Export Commands

Commands for exporting SIGMA rules and CVE data in various formats.
"""

import csv
import json
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

from .base_command import BaseCommand


class ExportCommands(BaseCommand):
    """Commands for exporting SIGMA rules and CVE metadata.

    Every method relies on helpers that are not defined in this class and are
    therefore expected to come from ``BaseCommand``: ``get_all_cves``,
    ``list_cve_rules``, ``load_sigma_rule``, ``load_cve_metadata`` and the
    ``info`` / ``warning`` / ``error`` / ``success`` reporters.
    """

    # Values of ``format_type`` that each exporter knows how to write.
    _RULE_FORMATS = ('yaml', 'json')
    _METADATA_FORMATS = ('json', 'csv')
    # A progress line is logged each time this many more rules were written.
    _PROGRESS_INTERVAL = 50

    def _select_rules(self, cve_id: str, method: Optional[str]) -> List[str]:
        """Return the rule file names of *cve_id*, optionally filtered.

        When *method* is truthy only names containing it (case-insensitive
        substring match) are kept.
        """
        rules = self.list_cve_rules(cve_id) or []
        if method:
            needle = method.lower()
            rules = [name for name in rules if needle in name.lower()]
        return rules

    def _write_rule(self, cve_export_dir: Path, rule_file: str,
                    rule_content: str, yaml_module) -> bool:
        """Write one rule into *cve_export_dir*; return True if a file was written.

        With ``yaml_module`` set to None the rule text is written verbatim
        under its original name. Otherwise the text is parsed with
        ``yaml_module.safe_load`` and stored as JSON under the same name with
        a ``.json`` suffix; a conversion failure is reported through
        ``self.error`` and yields False.
        """
        if yaml_module is None:
            with open(cve_export_dir / rule_file, 'w', encoding='utf-8') as f:
                f.write(rule_content)
            return True

        try:
            rule_dict = yaml_module.safe_load(rule_content)
            # Serialise before opening the file so a failed conversion does
            # not leave a truncated .json behind. default=str covers values
            # such as dates that YAML parses into non-JSON types.
            payload = json.dumps(rule_dict, indent=2, default=str)
            export_file = cve_export_dir / Path(rule_file).with_suffix('.json')
            with open(export_file, 'w', encoding='utf-8') as f:
                f.write(payload)
        except Exception as e:
            self.error(f"Error converting {rule_file} to JSON: {e}")
            return False
        return True

    async def export_sigma_rules(self, output_dir: str, year: Optional[int],
                                 format_type: str, method: Optional[str]) -> None:
        """Export SIGMA rules into one sub-directory per CVE.

        Args:
            output_dir: Directory receiving one ``<cve_id>/`` folder per
                exported CVE; created (with parents) if missing.
            year: Forwarded to ``get_all_cves`` to select the CVEs.
            format_type: ``'yaml'`` writes the rule text unchanged, ``'json'``
                parses each rule as YAML and writes it as JSON. Any other
                value is reported as an error and nothing is exported.
            method: Optional case-insensitive substring a rule file name must
                contain to be exported.

        A ``metadata.json`` is written next to the rules of every CVE for
        which ``load_cve_metadata`` returns something truthy. Failures for a
        single CVE are reported and counted as skipped; they do not abort the
        export.
        """
        if format_type not in self._RULE_FORMATS:
            self.error(f"Unsupported export format: {format_type}")
            return

        # Import the YAML parser once, and only when it is actually needed.
        yaml_module = None
        if format_type == 'json':
            try:
                import yaml as yaml_module
            except ImportError as e:
                self.error(f"PyYAML is required for JSON export: {e}")
                return

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        self.info(f"Exporting SIGMA rules to: {output_path}")
        self.info(f"Format: {format_type}")

        if year:
            self.info(f"Filtering by year: {year}")
        if method:
            self.info(f"Filtering by method: {method}")

        # Get CVEs to export
        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found to export")
            return

        exported_count = 0
        skipped_count = 0
        reported_bucket = 0  # number of progress intervals already announced

        for cve_id in cves:
            try:
                rules = self._select_rules(cve_id, method)
                if not rules:
                    skipped_count += 1
                    continue

                # Create CVE directory in export location
                cve_export_dir = output_path / cve_id
                cve_export_dir.mkdir(exist_ok=True)

                for rule_file in rules:
                    rule_content = self.load_sigma_rule(cve_id, rule_file)
                    if not rule_content:
                        continue
                    # Count only rules that really produced a file.
                    if self._write_rule(cve_export_dir, rule_file,
                                        rule_content, yaml_module):
                        exported_count += 1

                # Export metadata for context
                metadata = self.load_cve_metadata(cve_id)
                if metadata:
                    metadata_file = cve_export_dir / "metadata.json"
                    with open(metadata_file, 'w', encoding='utf-8') as f:
                        json.dump(metadata, f, indent=2, default=str)

                # A CVE can add several rules at once, so compare interval
                # buckets instead of testing for an exact multiple.
                bucket = exported_count // self._PROGRESS_INTERVAL
                if bucket > reported_bucket:
                    reported_bucket = bucket
                    self.info(f"Exported {exported_count} rules...")

            except Exception as e:
                # Per-CVE boundary: report and carry on with the next CVE.
                self.error(f"Error exporting rules for {cve_id}: {e}")
                skipped_count += 1

        self.success("Export completed!")
        self.success(f"Exported {exported_count} rules from {len(cves) - skipped_count} CVEs")
        self.success(f"Skipped {skipped_count} CVEs (no matching rules)")

    async def export_metadata(self, output_file: str, year: Optional[int],
                              format_type: str) -> None:
        """Export one flattened metadata record per CVE into a single file.

        Args:
            output_file: Destination file; its parent directories are created.
            year: Forwarded to ``get_all_cves`` to select the CVEs.
            format_type: ``'json'`` writes the list of records, ``'csv'``
                writes a header row (keys of the first record) followed by
                one row per record. Any other value is reported as an error
                and no file is written.

        Each record is the result of ``_flatten_metadata`` plus a
        ``rules_count`` field. CVEs without metadata are left out; a failure
        for a single CVE is reported and that CVE is skipped.
        """
        if format_type not in self._METADATA_FORMATS:
            self.error(f"Unsupported export format: {format_type}")
            return

        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        self.info(f"Exporting CVE metadata to: {output_path}")
        self.info(f"Format: {format_type}")

        if year:
            self.info(f"Filtering by year: {year}")

        # Get CVEs to export
        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found to export")
            return

        metadata_list = []

        for cve_id in cves:
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # Flatten metadata for export
                export_record = self._flatten_metadata(metadata)
                export_record['rules_count'] = len(self.list_cve_rules(cve_id) or [])
                metadata_list.append(export_record)

            except Exception as e:
                self.error(f"Error processing metadata for {cve_id}: {e}")

        if not metadata_list:
            self.warning("No metadata found to export")
            return

        # Export in requested format
        try:
            if format_type == 'json':
                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(metadata_list, f, indent=2, default=str)
            else:  # 'csv' - the only other accepted format
                fieldnames = list(metadata_list[0])
                # newline='' lets the csv module control line endings.
                with open(output_path, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(metadata_list)

            self.success(f"Exported metadata for {len(metadata_list)} CVEs")

        except Exception as e:
            self.error(f"Error writing export file: {e}")

    def _flatten_metadata(self, metadata: Dict) -> Dict:
        """Flatten the nested CVE metadata structure into one flat record.

        Reads the ``cve_info``, ``poc_data``, ``processing`` and
        ``rule_generation`` sections plus the top-level timestamps. A section
        or list that is missing or explicitly ``None`` is treated as empty,
        so the returned dict always has the same keys.
        """
        cve_info = metadata.get('cve_info') or {}
        poc_data = metadata.get('poc_data') or {}
        poc_sources = poc_data.get('poc_data') or {}
        processing = metadata.get('processing') or {}
        generation_methods = list(metadata.get('rule_generation') or {})

        return {
            # CVE info fields
            'cve_id': cve_info.get('cve_id'),
            'description': cve_info.get('description'),
            'cvss_score': cve_info.get('cvss_score'),
            'severity': cve_info.get('severity'),
            'published_date': cve_info.get('published_date'),
            'modified_date': cve_info.get('modified_date'),
            'affected_products_count': len(cve_info.get('affected_products') or []),
            'reference_urls_count': len(cve_info.get('reference_urls') or []),

            # PoC data fields
            'poc_count': poc_data.get('poc_count', 0),
            'has_nomi_sec_pocs': bool(poc_sources.get('nomi_sec')),
            'has_github_pocs': bool(poc_sources.get('github')),
            'has_exploitdb_pocs': bool(poc_sources.get('exploitdb')),

            # Processing fields
            'data_source': processing.get('data_source'),
            'bulk_processed': processing.get('bulk_processed', False),
            'reference_sync_status': processing.get('reference_sync_status'),

            # Rule generation fields
            'generation_methods': ','.join(generation_methods),
            'generation_methods_count': len(generation_methods),
            'has_template_rule': 'template' in generation_methods,
            'has_llm_rule': any('llm' in name for name in generation_methods),
            'has_hybrid_rule': 'hybrid' in generation_methods,

            # Timestamps
            'created_at': metadata.get('created_at'),
            'updated_at': metadata.get('updated_at'),
            'migrated_at': metadata.get('migrated_at'),
        }

    async def export_ruleset(self, output_file: str, year: Optional[int],
                             method: Optional[str], include_metadata: bool = True) -> None:
        """Export all matching rules into one consolidated JSON file.

        Args:
            output_file: Destination file; its parent directories are created.
            year: Forwarded to ``get_all_cves`` to select the CVEs.
            method: Optional case-insensitive substring a rule file name must
                contain to be included.
            include_metadata: When true, each rule entry gets a
                ``cve_metadata`` dict (severity, CVSS score, PoC count) if
                metadata exists for its CVE.

        The file holds a ``metadata`` header (generation time, filters,
        counts) and a ``rules`` list with the raw text of every rule. A
        failure for a single CVE is reported and processing continues.
        """
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        self.info(f"Creating consolidated ruleset: {output_path}")

        if year:
            self.info(f"Including year: {year}")
        if method:
            self.info(f"Including method: {method}")

        # Get CVEs and collect rules
        cves = self.get_all_cves(year) or []
        ruleset = {
            'metadata': {
                # UTC timestamp in ISO-8601 'YYYY-MM-DDTHH:MM:SSZ' form.
                'generated_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
                'filter_year': year,
                'filter_method': method,
                'total_cves': len(cves),
                'generator': 'CVE-SIGMA Auto Generator CLI'
            },
            'rules': []
        }

        rule_count = 0

        for cve_id in cves:
            try:
                # The summary is the same for every rule of a CVE, so it is
                # built at most once, and only if a rule is actually included.
                summary = None
                summary_loaded = False

                for rule_file in self._select_rules(cve_id, method):
                    rule_content = self.load_sigma_rule(cve_id, rule_file)
                    if not rule_content:
                        continue

                    rule_entry = {
                        'cve_id': cve_id,
                        'rule_file': rule_file,
                        'content': rule_content
                    }

                    if include_metadata:
                        if not summary_loaded:
                            summary_loaded = True
                            metadata = self.load_cve_metadata(cve_id)
                            if metadata:
                                cve_info = metadata.get('cve_info') or {}
                                poc_data = metadata.get('poc_data') or {}
                                summary = {
                                    'severity': cve_info.get('severity'),
                                    'cvss_score': cve_info.get('cvss_score'),
                                    'poc_count': poc_data.get('poc_count', 0)
                                }
                        if summary is not None:
                            # Copy so entries never share one mutable dict.
                            rule_entry['cve_metadata'] = dict(summary)

                    ruleset['rules'].append(rule_entry)
                    rule_count += 1

            except Exception as e:
                self.error(f"Error processing {cve_id}: {e}")

        # Update metadata with actual counts
        ruleset['metadata']['total_rules'] = rule_count

        # Save ruleset
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(ruleset, f, indent=2, default=str)

            self.success(f"Created consolidated ruleset with {rule_count} rules")

        except Exception as e:
            self.error(f"Error creating ruleset file: {e}")