auto_sigma_rule_generator/backend/claude_client.py

221 lines
No EOL
7.6 KiB
Python

"""
Claude API client for enhanced SIGMA rule generation.
"""
import os
import logging
from typing import Optional, Dict, Any
from anthropic import Anthropic
logger = logging.getLogger(__name__)
class ClaudeClient:
"""Client for interacting with Claude API for SIGMA rule generation."""
def __init__(self, api_key: Optional[str] = None):
"""Initialize Claude client with API key."""
self.api_key = api_key or os.getenv('CLAUDE_API_KEY')
self.client = None
if self.api_key:
try:
self.client = Anthropic(api_key=self.api_key)
logger.info("Claude API client initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Claude API client: {e}")
self.client = None
else:
logger.warning("No Claude API key provided. Claude-enhanced rule generation disabled.")
def is_available(self) -> bool:
"""Check if Claude API client is available."""
return self.client is not None
async def generate_sigma_rule(self,
cve_id: str,
poc_content: str,
cve_description: str,
existing_rule: Optional[str] = None) -> Optional[str]:
"""
Generate or enhance a SIGMA rule using Claude API.
Args:
cve_id: CVE identifier
poc_content: Proof-of-concept code content from GitHub
cve_description: CVE description from NVD
existing_rule: Optional existing SIGMA rule to enhance
Returns:
Generated SIGMA rule YAML content or None if failed
"""
if not self.is_available():
logger.warning("Claude API client not available")
return None
try:
# Construct the prompt for Claude
prompt = self._build_sigma_generation_prompt(
cve_id, poc_content, cve_description, existing_rule
)
# Make API call to Claude
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2000,
temperature=0.1,
messages=[
{
"role": "user",
"content": prompt
}
]
)
# Extract the SIGMA rule from response
sigma_rule = self._extract_sigma_rule(response.content[0].text)
logger.info(f"Successfully generated SIGMA rule for {cve_id} using Claude")
return sigma_rule
except Exception as e:
logger.error(f"Failed to generate SIGMA rule for {cve_id} using Claude: {e}")
return None
def _build_sigma_generation_prompt(self,
cve_id: str,
poc_content: str,
cve_description: str,
existing_rule: Optional[str] = None) -> str:
"""Build the prompt for Claude to generate SIGMA rules."""
base_prompt = f"""You are a cybersecurity expert specializing in SIGMA rule creation for threat detection. Your goal is to analyze exploit code from GitHub PoC repositories and create syntactically correct SIGMA rules.
**CVE Information:**
- CVE ID: {cve_id}
- Description: {cve_description}
**Proof-of-Concept Code:**
```
{poc_content[:4000]} # Truncate if too long
```
**Your Task:**
1. Analyze the exploit code to identify:
- Process execution patterns
- File system activities
- Network connections
- Registry modifications
- Command line arguments
- Suspicious behaviors
2. Create a SIGMA rule that:
- Follows proper SIGMA syntax (YAML format)
- Includes appropriate detection logic
- Has relevant metadata (title, description, author, date, references)
- Uses correct field names for the target log source
- Includes proper condition logic
- Maps to relevant MITRE ATT&CK techniques when applicable
3. Focus on detection patterns that would catch this specific exploit in action
**Important Requirements:**
- Output ONLY the SIGMA rule in valid YAML format
- Do not include explanations or comments outside the YAML
- Use proper SIGMA rule structure with title, id, status, description, references, author, date, logsource, detection, and condition
- Make the rule specific enough to detect the exploit but not too narrow to miss variants
- Include relevant tags and MITRE ATT&CK technique mappings"""
if existing_rule:
base_prompt += f"""
**Existing SIGMA Rule (to enhance):**
```yaml
{existing_rule}
```
Please enhance the existing rule with insights from the PoC code analysis."""
return base_prompt
def _extract_sigma_rule(self, response_text: str) -> str:
"""Extract SIGMA rule YAML from Claude's response."""
# Look for YAML content in the response
lines = response_text.split('\n')
yaml_lines = []
in_yaml = False
for line in lines:
if line.strip().startswith('```yaml') or line.strip().startswith('```'):
in_yaml = True
continue
elif line.strip() == '```' and in_yaml:
break
elif in_yaml or line.strip().startswith('title:'):
yaml_lines.append(line)
in_yaml = True
if not yaml_lines:
# If no YAML block found, return the whole response
return response_text.strip()
return '\n'.join(yaml_lines).strip()
async def enhance_existing_rule(self,
existing_rule: str,
poc_content: str,
cve_id: str) -> Optional[str]:
"""
Enhance an existing SIGMA rule with PoC analysis.
Args:
existing_rule: Existing SIGMA rule YAML
poc_content: PoC code content
cve_id: CVE identifier
Returns:
Enhanced SIGMA rule or None if failed
"""
if not self.is_available():
return None
try:
prompt = f"""You are a SIGMA rule enhancement expert. Analyze the following PoC code and enhance the existing SIGMA rule with more specific detection patterns.
**CVE ID:** {cve_id}
**PoC Code:**
```
{poc_content[:3000]}
```
**Existing SIGMA Rule:**
```yaml
{existing_rule}
```
**Task:** Enhance the existing rule by:
1. Adding more specific detection patterns found in the PoC
2. Improving the condition logic
3. Adding relevant tags or MITRE ATT&CK mappings
4. Keeping the rule structure intact but making it more effective
Output ONLY the enhanced SIGMA rule in valid YAML format."""
response = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2000,
temperature=0.1,
messages=[
{
"role": "user",
"content": prompt
}
]
)
enhanced_rule = self._extract_sigma_rule(response.content[0].text)
logger.info(f"Successfully enhanced SIGMA rule for {cve_id}")
return enhanced_rule
except Exception as e:
logger.error(f"Failed to enhance SIGMA rule for {cve_id}: {e}")
return None