auto_sigma_rule_generator/cli/commands/export_commands.py
bpmcdevitt e579c91b5e MAJOR: Transform web application to professional CLI-based SIGMA rule generator
🎉 **Architecture Transformation (v2.0)**
- Complete migration from web app to professional CLI tool
- File-based SIGMA rule management system
- Git-friendly directory structure organized by year/CVE-ID
- Multiple rule variants per CVE (template, LLM, hybrid)

**New CLI System**
- Professional command-line interface built on the Click framework
- Command groups include: process, generate, search, stats, export, migrate
- Modular command architecture for maintainability (see the wiring sketch below)
- Comprehensive help system and configuration management
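
A minimal sketch of how these command groups could be registered with Click; the `cli` root group, the `rules` subcommand name, and the option names below are illustrative assumptions, not the project's actual entry point:

```python
# Illustrative wiring only: group names follow the list above,
# but the entry-point module and options are assumptions.
import click


@click.group()
def cli():
    """CVE-SIGMA Auto Generator CLI."""


@cli.group()
def export():
    """Export SIGMA rules and CVE data."""


@export.command("rules")
@click.option("--output-dir", required=True, help="Directory to write exported rules to.")
@click.option("--year", type=int, default=None, help="Only export CVEs from this year.")
@click.option("--format", "format_type",
              type=click.Choice(["yaml", "json"]), default="yaml")
def export_rules(output_dir, year, format_type):
    """Hypothetical glue that would delegate to ExportCommands.export_sigma_rules."""
    click.echo(f"Exporting {format_type} rules to {output_dir} (year={year})")


if __name__ == "__main__":
    cli()
```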

📁 **File-Based Storage Architecture**
- Individual CVE directories: cves/YEAR/CVE-ID/ (layout sketched below)
- Multiple SIGMA rule variants per CVE
- JSON metadata with processing history and PoC data
- Native YAML files perfect for version control
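
A sketch of the resulting per-CVE layout, plus a small path helper; the individual rule file names are assumptions based on the variants listed above:

```python
# Assumed per-CVE layout (file names are illustrative):
#
#   cves/
#     2024/
#       CVE-2024-12345/
#         metadata.json                  # processing history + PoC data
#         cve-2024-12345_template.sigma  # template-generated variant
#         cve-2024-12345_llm.sigma       # LLM-generated variant
#         cve-2024-12345_hybrid.sigma    # hybrid variant
from pathlib import Path


def cve_dir(base: Path, cve_id: str) -> Path:
    """Resolve the per-CVE directory, e.g. cves/2024/CVE-2024-12345/."""
    year = cve_id.split("-")[1]  # "CVE-2024-12345" -> "2024"
    return base / "cves" / year / cve_id
```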

🚀 **Core CLI Commands**
- process: CVE processing and bulk operations
- generate: SIGMA rule generation with multiple methods
- search: Advanced CVE and rule searching with filters
- stats: Comprehensive statistics and analytics
- export: Multiple output formats for different workflows
- migrate: Database-to-file migration tools

🔧 **Migration Support**
- Complete migration utilities from web database
- Data validation and integrity checking
- Backward compatibility with existing processors
- Legacy web interface maintained for transition

📊 **Enhanced Features**
- Advanced search with complex filtering (severity, PoC presence, etc.)
- Multi-format exports (YAML, JSON, CSV)
- Comprehensive statistics and coverage reports
- File-based rule versioning and management

🎯 **Production Benefits**
- No database dependency - runs anywhere
- Perfect for cybersecurity teams using git workflows
- Direct integration with SIGMA ecosystems
- Portable architecture for CI/CD pipelines
- Multiple rule variants for different detection scenarios

📝 **Documentation Updates**
- Complete README rewrite for CLI-first approach
- Updated CLAUDE.md with new architecture details
- Detailed CLI documentation with examples
- Migration guides and troubleshooting

**Perfect for security teams wanting production-ready SIGMA rules with version control! 🛡️**

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-21 13:11:03 -05:00

"""
Export Commands
Commands for exporting SIGMA rules and CVE data in various formats.
"""
import json
import csv
import shutil
from pathlib import Path
from typing import Dict, List, Optional
from .base_command import BaseCommand
class ExportCommands(BaseCommand):
"""Commands for exporting data"""
    async def export_sigma_rules(self, output_dir: str, year: Optional[int],
                                 format_type: str, method: Optional[str]):
        """Export SIGMA rules to a directory"""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        self.info(f"Exporting SIGMA rules to: {output_path}")
        self.info(f"Format: {format_type}")
        if year:
            self.info(f"Filtering by year: {year}")
        if method:
            self.info(f"Filtering by method: {method}")

        # Get CVEs to export
        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found to export")
            return

        exported_count = 0
        skipped_count = 0

        for cve_id in cves:
            try:
                rules = self.list_cve_rules(cve_id)
                if method:
                    # Filter rules by method
                    rules = [r for r in rules if method.lower() in r.lower()]

                if not rules:
                    skipped_count += 1
                    continue

                # Create CVE directory in export location
                cve_export_dir = output_path / cve_id
                cve_export_dir.mkdir(exist_ok=True)

                for rule_file in rules:
                    rule_content = self.load_sigma_rule(cve_id, rule_file)
                    if not rule_content:
                        continue

                    if format_type == 'yaml':
                        # Export as YAML (original format)
                        export_file = cve_export_dir / rule_file
                        with open(export_file, 'w') as f:
                            f.write(rule_content)
                    elif format_type == 'json':
                        # Convert YAML to JSON (basic conversion)
                        try:
                            import yaml
                            rule_dict = yaml.safe_load(rule_content)
                            export_file = cve_export_dir / rule_file.replace('.sigma', '.json')
                            with open(export_file, 'w') as f:
                                json.dump(rule_dict, f, indent=2)
                        except Exception as e:
                            self.error(f"Error converting {rule_file} to JSON: {e}")
                            continue

                    exported_count += 1

                # Export metadata for context
                metadata = self.load_cve_metadata(cve_id)
                if metadata:
                    metadata_file = cve_export_dir / "metadata.json"
                    with open(metadata_file, 'w') as f:
                        json.dump(metadata, f, indent=2, default=str)

                if exported_count % 50 == 0:
                    self.info(f"Exported {exported_count} rules...")

            except Exception as e:
                self.error(f"Error exporting rules for {cve_id}: {e}")
                skipped_count += 1

        self.success("Export completed!")
        self.success(f"Exported {exported_count} rules from {len(cves) - skipped_count} CVEs")
        self.success(f"Skipped {skipped_count} CVEs (no matching rules)")

    async def export_metadata(self, output_file: str, year: Optional[int], format_type: str):
        """Export CVE metadata"""
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        self.info(f"Exporting CVE metadata to: {output_path}")
        self.info(f"Format: {format_type}")
        if year:
            self.info(f"Filtering by year: {year}")

        # Get CVEs to export
        cves = self.get_all_cves(year)
        if not cves:
            self.warning("No CVEs found to export")
            return

        metadata_list = []
        for cve_id in cves:
            try:
                metadata = self.load_cve_metadata(cve_id)
                if not metadata:
                    continue

                # Flatten metadata for export
                export_record = self._flatten_metadata(metadata)
                export_record['rules_count'] = len(self.list_cve_rules(cve_id))
                metadata_list.append(export_record)
            except Exception as e:
                self.error(f"Error processing metadata for {cve_id}: {e}")

        if not metadata_list:
            self.warning("No metadata found to export")
            return

        # Export in requested format
        try:
            if format_type == 'json':
                with open(output_path, 'w') as f:
                    json.dump(metadata_list, f, indent=2, default=str)
            elif format_type == 'csv':
                if metadata_list:
                    fieldnames = metadata_list[0].keys()
                    with open(output_path, 'w', newline='') as f:
                        writer = csv.DictWriter(f, fieldnames=fieldnames)
                        writer.writeheader()
                        writer.writerows(metadata_list)

            self.success(f"Exported metadata for {len(metadata_list)} CVEs")
        except Exception as e:
            self.error(f"Error writing export file: {e}")

    def _flatten_metadata(self, metadata: Dict) -> Dict:
        """Flatten nested metadata structure for export"""
        flattened = {}

        # CVE info fields
        cve_info = metadata.get('cve_info', {})
        flattened.update({
            'cve_id': cve_info.get('cve_id'),
            'description': cve_info.get('description'),
            'cvss_score': cve_info.get('cvss_score'),
            'severity': cve_info.get('severity'),
            'published_date': cve_info.get('published_date'),
            'modified_date': cve_info.get('modified_date'),
            'affected_products_count': len(cve_info.get('affected_products', [])),
            'reference_urls_count': len(cve_info.get('reference_urls', []))
        })

        # PoC data fields
        poc_data = metadata.get('poc_data', {})
        flattened.update({
            'poc_count': poc_data.get('poc_count', 0),
            'has_nomi_sec_pocs': bool(poc_data.get('poc_data', {}).get('nomi_sec')),
            'has_github_pocs': bool(poc_data.get('poc_data', {}).get('github')),
            'has_exploitdb_pocs': bool(poc_data.get('poc_data', {}).get('exploitdb'))
        })

        # Processing fields
        processing = metadata.get('processing', {})
        flattened.update({
            'data_source': processing.get('data_source'),
            'bulk_processed': processing.get('bulk_processed', False),
            'reference_sync_status': processing.get('reference_sync_status')
        })

        # Rule generation fields
        rule_generation = metadata.get('rule_generation', {})
        generation_methods = list(rule_generation.keys())
        flattened.update({
            'generation_methods': ','.join(generation_methods),
            'generation_methods_count': len(generation_methods),
            'has_template_rule': 'template' in generation_methods,
            'has_llm_rule': any('llm' in method for method in generation_methods),
            'has_hybrid_rule': 'hybrid' in generation_methods
        })

        # Timestamps
        flattened.update({
            'created_at': metadata.get('created_at'),
            'updated_at': metadata.get('updated_at'),
            'migrated_at': metadata.get('migrated_at')
        })

        return flattened
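
    # Example (assumed metadata shape) of what _flatten_metadata produces:
    #
    #   {"cve_info": {"cve_id": "CVE-2024-12345", "severity": "HIGH", ...},
    #    "poc_data": {"poc_count": 2, "poc_data": {"nomi_sec": [...]}},
    #    "rule_generation": {"template": {...}, "llm_openai": {...}}}
    #
    # flattens to a single-level, CSV-friendly record such as:
    #
    #   {"cve_id": "CVE-2024-12345", "severity": "HIGH", "poc_count": 2,
    #    "has_nomi_sec_pocs": True, "generation_methods": "template,llm_openai",
    #    "generation_methods_count": 2, "has_template_rule": True,
    #    "has_llm_rule": True, "has_hybrid_rule": False, ...}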

    async def export_ruleset(self, output_file: str, year: Optional[int],
                             method: Optional[str], include_metadata: bool = True):
        """Export consolidated ruleset file"""
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        self.info(f"Creating consolidated ruleset: {output_path}")
        if year:
            self.info(f"Including year: {year}")
        if method:
            self.info(f"Including method: {method}")

        # Get CVEs and collect rules
        cves = self.get_all_cves(year)
        ruleset = {
            'metadata': {
                # UTC timestamp recording when this ruleset was generated
                'generated_at': datetime.now(timezone.utc).isoformat(),
                'filter_year': year,
                'filter_method': method,
                'total_cves': len(cves),
                'generator': 'CVE-SIGMA Auto Generator CLI'
            },
            'rules': []
        }

        rule_count = 0
        for cve_id in cves:
            try:
                rules = self.list_cve_rules(cve_id)
                if method:
                    rules = [r for r in rules if method.lower() in r.lower()]

                for rule_file in rules:
                    rule_content = self.load_sigma_rule(cve_id, rule_file)
                    if not rule_content:
                        continue

                    rule_entry = {
                        'cve_id': cve_id,
                        'rule_file': rule_file,
                        'content': rule_content
                    }

                    if include_metadata:
                        metadata = self.load_cve_metadata(cve_id)
                        if metadata:
                            rule_entry['cve_metadata'] = {
                                'severity': metadata.get('cve_info', {}).get('severity'),
                                'cvss_score': metadata.get('cve_info', {}).get('cvss_score'),
                                'poc_count': metadata.get('poc_data', {}).get('poc_count', 0)
                            }

                    ruleset['rules'].append(rule_entry)
                    rule_count += 1
            except Exception as e:
                self.error(f"Error processing {cve_id}: {e}")

        # Update metadata with actual counts
        ruleset['metadata']['total_rules'] = rule_count

        # Save ruleset
        try:
            with open(output_path, 'w') as f:
                json.dump(ruleset, f, indent=2, default=str)
            self.success(f"Created consolidated ruleset with {rule_count} rules")
        except Exception as e:
            self.error(f"Error creating ruleset file: {e}")