221 lines
No EOL
7.6 KiB
Python
221 lines
No EOL
7.6 KiB
Python
"""
|
|
Claude API client for enhanced SIGMA rule generation.
|
|
"""
|
|
import os
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
from anthropic import Anthropic
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ClaudeClient:
|
|
"""Client for interacting with Claude API for SIGMA rule generation."""
|
|
|
|
def __init__(self, api_key: Optional[str] = None):
|
|
"""Initialize Claude client with API key."""
|
|
self.api_key = api_key or os.getenv('CLAUDE_API_KEY')
|
|
self.client = None
|
|
|
|
if self.api_key:
|
|
try:
|
|
self.client = Anthropic(api_key=self.api_key)
|
|
logger.info("Claude API client initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize Claude API client: {e}")
|
|
self.client = None
|
|
else:
|
|
logger.warning("No Claude API key provided. Claude-enhanced rule generation disabled.")
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if Claude API client is available."""
|
|
return self.client is not None
|
|
|
|
async def generate_sigma_rule(self,
|
|
cve_id: str,
|
|
poc_content: str,
|
|
cve_description: str,
|
|
existing_rule: Optional[str] = None) -> Optional[str]:
|
|
"""
|
|
Generate or enhance a SIGMA rule using Claude API.
|
|
|
|
Args:
|
|
cve_id: CVE identifier
|
|
poc_content: Proof-of-concept code content from GitHub
|
|
cve_description: CVE description from NVD
|
|
existing_rule: Optional existing SIGMA rule to enhance
|
|
|
|
Returns:
|
|
Generated SIGMA rule YAML content or None if failed
|
|
"""
|
|
if not self.is_available():
|
|
logger.warning("Claude API client not available")
|
|
return None
|
|
|
|
try:
|
|
# Construct the prompt for Claude
|
|
prompt = self._build_sigma_generation_prompt(
|
|
cve_id, poc_content, cve_description, existing_rule
|
|
)
|
|
|
|
# Make API call to Claude
|
|
response = self.client.messages.create(
|
|
model="claude-3-5-sonnet-20241022",
|
|
max_tokens=2000,
|
|
temperature=0.1,
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": prompt
|
|
}
|
|
]
|
|
)
|
|
|
|
# Extract the SIGMA rule from response
|
|
sigma_rule = self._extract_sigma_rule(response.content[0].text)
|
|
|
|
logger.info(f"Successfully generated SIGMA rule for {cve_id} using Claude")
|
|
return sigma_rule
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate SIGMA rule for {cve_id} using Claude: {e}")
|
|
return None
|
|
|
|
def _build_sigma_generation_prompt(self,
|
|
cve_id: str,
|
|
poc_content: str,
|
|
cve_description: str,
|
|
existing_rule: Optional[str] = None) -> str:
|
|
"""Build the prompt for Claude to generate SIGMA rules."""
|
|
|
|
base_prompt = f"""You are a cybersecurity expert specializing in SIGMA rule creation for threat detection. Your goal is to analyze exploit code from GitHub PoC repositories and create syntactically correct SIGMA rules.
|
|
|
|
**CVE Information:**
|
|
- CVE ID: {cve_id}
|
|
- Description: {cve_description}
|
|
|
|
**Proof-of-Concept Code:**
|
|
```
|
|
{poc_content[:4000]} # Truncate if too long
|
|
```
|
|
|
|
**Your Task:**
|
|
1. Analyze the exploit code to identify:
|
|
- Process execution patterns
|
|
- File system activities
|
|
- Network connections
|
|
- Registry modifications
|
|
- Command line arguments
|
|
- Suspicious behaviors
|
|
|
|
2. Create a SIGMA rule that:
|
|
- Follows proper SIGMA syntax (YAML format)
|
|
- Includes appropriate detection logic
|
|
- Has relevant metadata (title, description, author, date, references)
|
|
- Uses correct field names for the target log source
|
|
- Includes proper condition logic
|
|
- Maps to relevant MITRE ATT&CK techniques when applicable
|
|
|
|
3. Focus on detection patterns that would catch this specific exploit in action
|
|
|
|
**Important Requirements:**
|
|
- Output ONLY the SIGMA rule in valid YAML format
|
|
- Do not include explanations or comments outside the YAML
|
|
- Use proper SIGMA rule structure with title, id, status, description, references, author, date, logsource, detection, and condition
|
|
- Make the rule specific enough to detect the exploit but not too narrow to miss variants
|
|
- Include relevant tags and MITRE ATT&CK technique mappings"""
|
|
|
|
if existing_rule:
|
|
base_prompt += f"""
|
|
|
|
**Existing SIGMA Rule (to enhance):**
|
|
```yaml
|
|
{existing_rule}
|
|
```
|
|
|
|
Please enhance the existing rule with insights from the PoC code analysis."""
|
|
|
|
return base_prompt
|
|
|
|
def _extract_sigma_rule(self, response_text: str) -> str:
|
|
"""Extract SIGMA rule YAML from Claude's response."""
|
|
# Look for YAML content in the response
|
|
lines = response_text.split('\n')
|
|
yaml_lines = []
|
|
in_yaml = False
|
|
|
|
for line in lines:
|
|
if line.strip().startswith('```yaml') or line.strip().startswith('```'):
|
|
in_yaml = True
|
|
continue
|
|
elif line.strip() == '```' and in_yaml:
|
|
break
|
|
elif in_yaml or line.strip().startswith('title:'):
|
|
yaml_lines.append(line)
|
|
in_yaml = True
|
|
|
|
if not yaml_lines:
|
|
# If no YAML block found, return the whole response
|
|
return response_text.strip()
|
|
|
|
return '\n'.join(yaml_lines).strip()
|
|
|
|
async def enhance_existing_rule(self,
|
|
existing_rule: str,
|
|
poc_content: str,
|
|
cve_id: str) -> Optional[str]:
|
|
"""
|
|
Enhance an existing SIGMA rule with PoC analysis.
|
|
|
|
Args:
|
|
existing_rule: Existing SIGMA rule YAML
|
|
poc_content: PoC code content
|
|
cve_id: CVE identifier
|
|
|
|
Returns:
|
|
Enhanced SIGMA rule or None if failed
|
|
"""
|
|
if not self.is_available():
|
|
return None
|
|
|
|
try:
|
|
prompt = f"""You are a SIGMA rule enhancement expert. Analyze the following PoC code and enhance the existing SIGMA rule with more specific detection patterns.
|
|
|
|
**CVE ID:** {cve_id}
|
|
|
|
**PoC Code:**
|
|
```
|
|
{poc_content[:3000]}
|
|
```
|
|
|
|
**Existing SIGMA Rule:**
|
|
```yaml
|
|
{existing_rule}
|
|
```
|
|
|
|
**Task:** Enhance the existing rule by:
|
|
1. Adding more specific detection patterns found in the PoC
|
|
2. Improving the condition logic
|
|
3. Adding relevant tags or MITRE ATT&CK mappings
|
|
4. Keeping the rule structure intact but making it more effective
|
|
|
|
Output ONLY the enhanced SIGMA rule in valid YAML format."""
|
|
|
|
response = self.client.messages.create(
|
|
model="claude-3-5-sonnet-20241022",
|
|
max_tokens=2000,
|
|
temperature=0.1,
|
|
messages=[
|
|
{
|
|
"role": "user",
|
|
"content": prompt
|
|
}
|
|
]
|
|
)
|
|
|
|
enhanced_rule = self._extract_sigma_rule(response.content[0].text)
|
|
logger.info(f"Successfully enhanced SIGMA rule for {cve_id}")
|
|
return enhanced_rule
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to enhance SIGMA rule for {cve_id}: {e}")
|
|
return None |