add templates to enhanced sigma generator

This commit is contained in:
Brendan McDevitt 2025-07-09 07:22:51 -05:00
parent 790e4bd91f
commit cfaad8b359
4 changed files with 634 additions and 24 deletions

View file

@ -106,8 +106,9 @@ class EnhancedSigmaGenerator:
templates = self.db_session.query(RuleTemplate).all()
if not templates:
logger.warning("No rule templates found in database")
return None
logger.warning("No rule templates found in database - creating default template")
# Create a default template if none exist
return self._create_default_template(cve, best_poc)
# Score templates based on relevance
template_scores = {}
@ -135,7 +136,7 @@ class EnhancedSigmaGenerator:
logger.info(f"Selected template {best_template.template_name} with score {template_scores[best_template]}")
return best_template
return None
return self._create_default_template(cve, best_poc)
def _score_template_poc_match(self, template: object, indicators: dict) -> int:
"""Score template based on PoC indicators"""
@ -215,11 +216,17 @@ class EnhancedSigmaGenerator:
# Get base template content
rule_content = template.template_content
# Generate a unique rule ID
import uuid
rule_id = str(uuid.uuid4())
# Replace template placeholders
replacements = {
'{{CVE_ID}}': cve.cve_id,
'{{RULE_ID}}': rule_id,
'{{TITLE}}': f"{cve.cve_id} Enhanced Detection",
'{{DESCRIPTION}}': self._generate_description(cve, poc_data),
'{{DATE}}': datetime.now().strftime('%Y/%m/%d'),
'{{LEVEL}}': self._calculate_confidence_level(cve, poc_data).lower(),
'{{REFERENCES}}': self._generate_references(cve, poc_data),
'{{TAGS}}': self._generate_tags(cve, poc_data),
@ -235,6 +242,9 @@ class EnhancedSigmaGenerator:
for placeholder, value in replacements.items():
rule_content = rule_content.replace(placeholder, value)
# Clean up empty sections
rule_content = self._clean_empty_sections(rule_content)
# Add enhanced detection based on PoC quality
if poc_data:
rule_content = self._enhance_detection_logic(rule_content, combined_indicators, poc_data)
@ -338,18 +348,20 @@ class EnhancedSigmaGenerator:
def _format_indicators(self, indicators: list) -> str:
"""Format indicators for SIGMA rule"""
if not indicators:
return ''
return ' - "*" # No specific indicators available'
# Limit indicators to avoid overly complex rules
limited_indicators = indicators[:10]
formatted = []
for indicator in limited_indicators:
# Escape special characters for SIGMA
escaped = indicator.replace('\\\\', '\\\\\\\\').replace('*', '\\\\*').replace('?', '\\\\?')
formatted.append(f' - "{escaped}"')
# Clean and escape special characters for SIGMA
cleaned = str(indicator).strip()
if cleaned:
escaped = cleaned.replace('\\\\', '\\\\\\\\').replace('*', '\\\\*').replace('?', '\\\\?')
formatted.append(f' - "{escaped}"')
return '\\n'.join(formatted)
return '\\n'.join(formatted) if formatted else ' - "*" # No valid indicators'
def _enhance_detection_logic(self, rule_content: str, indicators: dict, poc_data: list) -> str:
"""Enhance detection logic based on PoC quality and indicators"""
@ -420,6 +432,145 @@ class EnhancedSigmaGenerator:
else:
return 'INFORMATIONAL'
def _create_default_template(self, cve, best_poc: Optional[dict]) -> object:
"""Create a default template based on CVE and PoC analysis"""
from main import RuleTemplate
import uuid
# Analyze the best PoC to determine the most appropriate template type
template_type = "process"
if best_poc:
indicators = best_poc.get('exploit_indicators', {})
if indicators.get('network') or indicators.get('urls'):
template_type = "network"
elif indicators.get('files'):
template_type = "file"
elif any('powershell' in p.lower() for p in indicators.get('processes', [])):
template_type = "powershell"
# Create template content based on type
if template_type == "network":
template_content = """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: network_connection
product: windows
detection:
selection:
Initiated: true
DestinationIp:
{{NETWORK}}
selection_url:
DestinationHostname|contains:
{{URLS}}
condition: selection or selection_url
falsepositives:
- Legitimate network connections
level: {{LEVEL}}"""
elif template_type == "file":
template_content = """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: file_event
product: windows
detection:
selection:
TargetFilename|contains:
{{FILES}}
condition: selection
falsepositives:
- Legitimate file operations
level: {{LEVEL}}"""
elif template_type == "powershell":
template_content = """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: process_creation
product: windows
detection:
selection:
Image|endswith:
- '\\powershell.exe'
- '\\pwsh.exe'
CommandLine|contains:
{{COMMANDS}}
condition: selection
falsepositives:
- Legitimate PowerShell scripts
level: {{LEVEL}}"""
else: # default to process
template_content = """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: process_creation
product: windows
detection:
selection:
Image|endswith:
{{PROCESSES}}
selection_cmd:
CommandLine|contains:
{{COMMANDS}}
condition: selection or selection_cmd
falsepositives:
- Legitimate software usage
level: {{LEVEL}}"""
# Create a temporary template object
class DefaultTemplate:
def __init__(self, name, content):
self.template_name = name
self.template_content = content
self.applicable_product_patterns = []
return DefaultTemplate(f"Default {template_type.title()} Template", template_content)
def _clean_empty_sections(self, rule_content: str) -> str:
"""Clean up empty sections in the SIGMA rule"""
# Remove lines that contain only placeholder indicators
lines = rule_content.split('\n')
cleaned_lines = []
for line in lines:
# Skip lines that are just placeholder indicators
if '- "*" # No' in line and 'or selection' in rule_content:
continue
cleaned_lines.append(line)
return '\n'.join(cleaned_lines)
def _extract_log_source(self, template_name: str) -> str:
"""Extract log source from template name"""
template_lower = template_name.lower()

View file

@ -0,0 +1,220 @@
#!/usr/bin/env python3
"""
Initialize SIGMA rule templates for enhanced rule generation
"""
import json
from datetime import datetime
from main import SessionLocal, RuleTemplate, Base, engine
# Create tables if they don't exist
Base.metadata.create_all(bind=engine)
# Template definitions with actual SIGMA rule content
SIGMA_TEMPLATES = [
{
"template_name": "Process Execution Detection",
"template_content": """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: process_creation
product: windows
detection:
selection:
Image|endswith:
{{PROCESSES}}
selection_cmd:
CommandLine|contains:
{{COMMANDS}}
condition: selection or selection_cmd
falsepositives:
- Legitimate software installations
- System administration tasks
level: {{LEVEL}}""",
"applicable_product_patterns": ["windows", "microsoft", "office", "exchange", "sharepoint"],
"description": "Detects suspicious process execution based on PoC exploit indicators"
},
{
"template_name": "Network Connection Detection",
"template_content": """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: network_connection
product: windows
detection:
selection:
Initiated: true
DestinationIp:
{{NETWORK}}
selection_url:
DestinationHostname|contains:
{{URLS}}
condition: selection or selection_url
falsepositives:
- Legitimate network connections
- Software updates
level: {{LEVEL}}""",
"applicable_product_patterns": ["network", "web", "http", "https", "tcp", "udp"],
"description": "Detects suspicious network connections based on PoC exploit indicators"
},
{
"template_name": "File System Activity Detection",
"template_content": """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: file_event
product: windows
detection:
selection:
TargetFilename|contains:
{{FILES}}
condition: selection
falsepositives:
- Legitimate file operations
- Software installations
level: {{LEVEL}}""",
"applicable_product_patterns": ["file", "filesystem", "upload", "download"],
"description": "Detects suspicious file system activity based on PoC exploit indicators"
},
{
"template_name": "PowerShell Execution Detection",
"template_content": """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: process_creation
product: windows
detection:
selection:
Image|endswith:
- '\\powershell.exe'
- '\\pwsh.exe'
CommandLine|contains:
{{COMMANDS}}
condition: selection
falsepositives:
- Legitimate PowerShell scripts
- System administration
level: {{LEVEL}}""",
"applicable_product_patterns": ["powershell", "windows", "microsoft"],
"description": "Detects suspicious PowerShell execution based on PoC exploit indicators"
},
{
"template_name": "Web Application Attack Detection",
"template_content": """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: webserver
detection:
selection:
cs-uri-query|contains:
{{URLS}}
selection_user_agent:
cs-user-agent|contains:
{{COMMANDS}}
condition: selection or selection_user_agent
falsepositives:
- Legitimate web application usage
- Security scanners
level: {{LEVEL}}""",
"applicable_product_patterns": ["web", "http", "apache", "nginx", "iis"],
"description": "Detects web application attacks based on PoC exploit indicators"
},
{
"template_name": "Registry Modification Detection",
"template_content": """title: {{TITLE}}
id: {{RULE_ID}}
status: experimental
description: {{DESCRIPTION}}
author: CVE-SIGMA Auto Generator
date: {{DATE}}
references:
{{REFERENCES}}
tags:
{{TAGS}}
logsource:
category: registry_event
product: windows
detection:
selection:
TargetObject|contains:
{{REGISTRY}}
condition: selection
falsepositives:
- Legitimate software configuration changes
- System updates
level: {{LEVEL}}""",
"applicable_product_patterns": ["registry", "windows", "microsoft"],
"description": "Detects suspicious registry modifications based on PoC exploit indicators"
}
]
def initialize_templates():
"""Initialize rule templates in the database"""
db = SessionLocal()
try:
# Clear existing templates
db.query(RuleTemplate).delete()
# Add new templates
for template_data in SIGMA_TEMPLATES:
template = RuleTemplate(
template_name=template_data["template_name"],
template_content=template_data["template_content"],
applicable_product_patterns=template_data["applicable_product_patterns"],
description=template_data["description"]
)
db.add(template)
db.commit()
print(f"Successfully initialized {len(SIGMA_TEMPLATES)} rule templates")
except Exception as e:
db.rollback()
print(f"Error initializing templates: {e}")
raise
finally:
db.close()
if __name__ == "__main__":
initialize_templates()

View file

@ -207,59 +207,87 @@ class NomiSecClient:
full_text = " ".join(text_sources).lower()
# Process patterns
# Enhanced process patterns
process_patterns = [
r'\b(cmd\.exe|powershell\.exe|bash|sh|python\.exe|java\.exe)\b',
r'\b(createprocess|shellexecute|system)\b',
r'\b(reverse.?shell|bind.?shell)\b'
r'\b(cmd\.exe|powershell\.exe|bash|sh|python\.exe|java\.exe|node\.exe)\b',
r'\b(createprocess|shellexecute|system|winexec)\b',
r'\b(reverse.?shell|bind.?shell|web.?shell)\b',
r'\b(mshta\.exe|rundll32\.exe|regsvr32\.exe|wscript\.exe|cscript\.exe)\b',
r'\b(certutil\.exe|bitsadmin\.exe|schtasks\.exe)\b'
]
for pattern in process_patterns:
matches = re.findall(pattern, full_text, re.IGNORECASE)
indicators["processes"].extend(matches)
# File patterns
# Enhanced file patterns
file_patterns = [
r'\b([a-zA-Z]:\\[^\\]+\\[^\\]+\.[a-zA-Z0-9]+)\b', # Windows paths
r'\b([a-zA-Z]:\\[^\\\s]+\\[^\\\s]+\.[a-zA-Z0-9]+)\b', # Windows paths
r'\b(/[^/\s]+/[^/\s]+\.[a-zA-Z0-9]+)\b', # Unix paths
r'\b(\w+\.(exe|dll|bat|ps1|py|sh|jar))\b' # Common executable files
r'\b(\w+\.(exe|dll|bat|ps1|py|sh|jar|php|jsp|asp|aspx))\b', # Common executable files
r'\b(\w+\.(txt|log|tmp|temp|dat|bin))\b', # Common data files
r'\b(payload|exploit|shell|backdoor|trojan)\b' # Malicious file indicators
]
for pattern in file_patterns:
matches = re.findall(pattern, full_text, re.IGNORECASE)
if isinstance(matches[0], tuple) if matches else False:
if matches and isinstance(matches[0], tuple):
indicators["files"].extend([m[0] for m in matches])
else:
indicators["files"].extend(matches)
# Network patterns
# Enhanced network patterns
network_patterns = [
r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b', # IP addresses
r'\b((?:\d{1,5})|(?:0x[a-fA-F0-9]{1,4}))\b', # Ports
r'\b(http[s]?://[^\s]+)\b' # URLs
r'\b(http[s]?://[^\s<>"]+)\b', # URLs
r'\b([a-zA-Z0-9-]+\.[a-zA-Z]{2,})\b' # Domain names
]
for pattern in network_patterns:
matches = re.findall(pattern, full_text, re.IGNORECASE)
if pattern.startswith(r'\b(http'):
if 'http' in pattern:
indicators["urls"].extend(matches)
elif '\\d{1,3}\\.' in pattern or '\\d{1,5}' in pattern:
indicators["network"].extend(matches)
else:
indicators["network"].extend(matches)
# Command patterns
# Enhanced command patterns
command_patterns = [
r'\b(curl|wget|nc|netcat|ncat)\b',
r'\b(whoami|id|uname|systeminfo)\b',
r'\b(cat|type|more|less)\b'
r'\b(curl|wget|nc|netcat|ncat|telnet)\b',
r'\b(whoami|id|uname|systeminfo|ipconfig|ifconfig)\b',
r'\b(cat|type|more|less|head|tail)\b',
r'\b(echo|print|printf)\b',
r'\b(base64|decode|encode)\b',
r'\b(invoke|iex|downloadstring)\b',
r'\b(net\s+user|net\s+localgroup)\b',
r'\b(sc\s+create|sc\s+start)\b'
]
for pattern in command_patterns:
matches = re.findall(pattern, full_text, re.IGNORECASE)
indicators["commands"].extend(matches)
# Registry patterns
registry_patterns = [
r'\b(HKEY_[A-Z_]+)\b',
r'\b(HKLM|HKCU|HKCR|HKU|HKCC)\b',
r'\b(reg\s+add|reg\s+query|reg\s+delete)\b',
r'\b(SOFTWARE\\\\[^\\\s]+)\b',
r'\b(SYSTEM\\\\[^\\\s]+)\b'
]
for pattern in registry_patterns:
matches = re.findall(pattern, full_text, re.IGNORECASE)
indicators["registry"].extend(matches)
# Clean up and deduplicate
for key in indicators:
indicators[key] = list(set(indicators[key]))
# Remove empty strings and duplicates
indicators[key] = list(set([item for item in indicators[key] if item and len(item.strip()) > 2]))
# Limit to reasonable number of indicators
indicators[key] = indicators[key][:15]
return indicators

View file

@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
Test script for enhanced SIGMA rule generation
"""
import asyncio
import json
from datetime import datetime
from main import SessionLocal, CVE, SigmaRule, Base, engine
from enhanced_sigma_generator import EnhancedSigmaGenerator
from nomi_sec_client import NomiSecClient
from initialize_templates import initialize_templates
# Create tables if they don't exist
Base.metadata.create_all(bind=engine)
async def test_enhanced_rule_generation():
"""Test the enhanced rule generation with mock data"""
# Initialize templates
print("Initializing templates...")
initialize_templates()
db = SessionLocal()
try:
# Check if CVE already exists, if not create it
test_cve = db.query(CVE).filter(CVE.cve_id == "CVE-2014-7236").first()
if not test_cve:
# Create a test CVE with mock PoC data
test_cve = CVE(
cve_id="CVE-2014-7236",
description="Remote code execution vulnerability in Microsoft Office",
cvss_score=8.5,
severity="high",
published_date=datetime(2014, 10, 15),
affected_products=["Microsoft Office", "Windows"],
poc_count=2,
poc_data=[
{
"id": "test1",
"name": "CVE-2014-7236-exploit",
"owner": "security-researcher",
"full_name": "security-researcher/CVE-2014-7236-exploit",
"html_url": "https://github.com/security-researcher/CVE-2014-7236-exploit",
"description": "PowerShell exploit for CVE-2014-7236 using cmd.exe and powershell.exe",
"stargazers_count": 15,
"created_at": "2014-11-01T00:00:00Z",
"updated_at": "2014-11-15T00:00:00Z",
"quality_analysis": {
"quality_score": 75,
"quality_tier": "good",
"factors": {
"star_score": 30,
"recency_score": 10,
"description_score": 15,
"vuln_description_score": 15,
"name_relevance_score": 10
}
},
"exploit_indicators": {
"processes": ["powershell.exe", "cmd.exe"],
"files": ["exploit.ps1", "payload.exe"],
"commands": ["Invoke-Expression", "DownloadString", "whoami"],
"network": ["192.168.1.100", "8080"],
"urls": ["http://malicious.com/payload"],
"registry": ["HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft"]
}
},
{
"id": "test2",
"name": "office-exploit-poc",
"owner": "hacker",
"full_name": "hacker/office-exploit-poc",
"html_url": "https://github.com/hacker/office-exploit-poc",
"description": "Office document exploit with malicious macro",
"stargazers_count": 8,
"created_at": "2014-12-01T00:00:00Z",
"updated_at": "2014-12-10T00:00:00Z",
"quality_analysis": {
"quality_score": 45,
"quality_tier": "fair",
"factors": {
"star_score": 16,
"recency_score": 8,
"description_score": 12,
"vuln_description_score": 0,
"name_relevance_score": 5
}
},
"exploit_indicators": {
"processes": ["winword.exe", "excel.exe"],
"files": ["document.docx", "malicious.xlsm"],
"commands": ["CreateObject", "Shell.Application"],
"network": ["10.0.0.1"],
"urls": ["http://evil.com/download"],
"registry": ["HKEY_CURRENT_USER\\Software\\Microsoft\\Office"]
}
}
]
)
# Add to database
db.add(test_cve)
db.commit()
else:
# Update existing CVE with our mock PoC data
test_cve.poc_count = 2
test_cve.poc_data = [
{
"id": "test1",
"name": "CVE-2014-7236-exploit",
"owner": "security-researcher",
"full_name": "security-researcher/CVE-2014-7236-exploit",
"html_url": "https://github.com/security-researcher/CVE-2014-7236-exploit",
"description": "PowerShell exploit for CVE-2014-7236 using cmd.exe and powershell.exe",
"stargazers_count": 15,
"created_at": "2014-11-01T00:00:00Z",
"updated_at": "2014-11-15T00:00:00Z",
"quality_analysis": {
"quality_score": 75,
"quality_tier": "good",
"factors": {
"star_score": 30,
"recency_score": 10,
"description_score": 15,
"vuln_description_score": 15,
"name_relevance_score": 10
}
},
"exploit_indicators": {
"processes": ["powershell.exe", "cmd.exe"],
"files": ["exploit.ps1", "payload.exe"],
"commands": ["Invoke-Expression", "DownloadString", "whoami"],
"network": ["192.168.1.100", "8080"],
"urls": ["http://malicious.com/payload"],
"registry": ["HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft"]
}
},
{
"id": "test2",
"name": "office-exploit-poc",
"owner": "hacker",
"full_name": "hacker/office-exploit-poc",
"html_url": "https://github.com/hacker/office-exploit-poc",
"description": "Office document exploit with malicious macro",
"stargazers_count": 8,
"created_at": "2014-12-01T00:00:00Z",
"updated_at": "2014-12-10T00:00:00Z",
"quality_analysis": {
"quality_score": 45,
"quality_tier": "fair",
"factors": {
"star_score": 16,
"recency_score": 8,
"description_score": 12,
"vuln_description_score": 0,
"name_relevance_score": 5
}
},
"exploit_indicators": {
"processes": ["winword.exe", "excel.exe"],
"files": ["document.docx", "malicious.xlsm"],
"commands": ["CreateObject", "Shell.Application"],
"network": ["10.0.0.1"],
"urls": ["http://evil.com/download"],
"registry": ["HKEY_CURRENT_USER\\Software\\Microsoft\\Office"]
}
}
]
db.commit()
print(f"Using CVE: {test_cve.cve_id} with {test_cve.poc_count} PoCs")
# Generate enhanced rule
print("Generating enhanced SIGMA rule...")
generator = EnhancedSigmaGenerator(db)
result = await generator.generate_enhanced_rule(test_cve)
print(f"Generation result: {result}")
if result.get('success'):
# Fetch the generated rule
sigma_rule = db.query(SigmaRule).filter(SigmaRule.cve_id == test_cve.cve_id).first()
if sigma_rule:
print("\n" + "="*60)
print("GENERATED SIGMA RULE:")
print("="*60)
print(sigma_rule.rule_content)
print("="*60)
print(f"Detection Type: {sigma_rule.detection_type}")
print(f"Log Source: {sigma_rule.log_source}")
print(f"Confidence Level: {sigma_rule.confidence_level}")
print(f"PoC Quality Score: {sigma_rule.poc_quality_score}")
print(f"Exploit Indicators: {sigma_rule.exploit_indicators}")
print("="*60)
else:
print("No SIGMA rule found in database")
else:
print(f"Rule generation failed: {result.get('error')}")
except Exception as e:
print(f"Error during test: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
asyncio.run(test_enhanced_rule_generation())