splunk_local/generators/dns_logs.py
bpmcdevitt fa8fd73f1a Add security-focused log generators for SOC and SIEM testing
- Implement 5 new security log generators: Windows events, firewall logs, DNS queries, authentication logs, and cloud service logs
- Add 'security' Docker Compose profile for easy deployment of security generators
- Windows generator creates realistic Security/System/Application events with attack patterns (failed logins, account creation, service events)
- Firewall generator supports pfSense, iptables, and Cisco ASA formats with malicious traffic blocking simulation
- DNS generator includes DGA domains, suspicious lookups, and multiple DNS server formats (BIND, Pi-hole, Windows DNS)
- Authentication generator creates LDAP, RADIUS, and SSH logs with brute force attack patterns
- Cloud generator produces AWS CloudTrail, Azure Activity, and GCP audit logs with security-relevant events
- Update documentation with comprehensive security use cases for SOC training, threat hunting, and compliance testing
- Enhance Docker Compose configuration with new security profile and service definitions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 17:44:47 -05:00

244 lines
No EOL
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
DNS query log generator for Splunk security testing
Generates realistic DNS query logs with suspicious domain patterns and DGA domains
"""
import time
import random
import datetime
import string
from pathlib import Path
# Well-known domains used for the benign (non-suspicious) share of queries
LEGITIMATE_DOMAINS = [
    "google.com", "microsoft.com", "amazon.com", "facebook.com", "twitter.com",
    "github.com", "stackoverflow.com", "wikipedia.org", "youtube.com", "linkedin.com",
    "apple.com", "netflix.com", "adobe.com", "salesforce.com", "oracle.com",
    "ibm.com", "cisco.com", "vmware.com", "redhat.com", "ubuntu.com",
    "docker.com", "kubernetes.io", "python.org", "nodejs.org", "golang.org",
]

# Fixed set of obviously malicious-looking domains (for testing detection rules)
MALICIOUS_DOMAINS = [
    "badactor.com",
    "malware-c2.net",
    "phishing-site.org",
    "exploit-kit.ru",
    "trojan-command.xyz",
    "ransomware-payment.onion",
    "credential-harvest.tk",
    "fake-bank-login.ml",
    "virus-download.ga",
    "spam-relay.cf",
]

# TLD suffixes (leading dot included) appended to generated DGA labels
DGA_TLDS = [
    ".com", ".net", ".org", ".info", ".biz",
    ".tk", ".ml", ".ga", ".cf", ".xyz",
]

# DNS record types a generated query may ask for
QUERY_TYPES = ["A", "AAAA", "CNAME", "MX", "TXT", "NS", "PTR", "SOA", "SRV"]

# DNS response codes: name -> numeric value.
# Insertion order matters: the generators pair list(RESPONSE_CODES) with
# positional weight lists.
RESPONSE_CODES = {
    "NOERROR": 0,
    "FORMERR": 1,
    "SERVFAIL": 2,
    "NXDOMAIN": 3,
    "NOTIMP": 4,
    "REFUSED": 5,
}

# DNS servers that appear in generated logs: private-range resolvers plus
# the public 8.8.8.8 and 1.1.1.1
DNS_SERVERS = ["192.168.1.1", "10.0.0.1", "172.16.1.1", "8.8.8.8", "1.1.1.1"]

# Client network prefixes (first three octets, trailing dot included);
# the final host octet is appended when a client IP is generated
CLIENT_NETWORKS = [
    "192.168.1.", "192.168.0.",
    "10.0.0.", "10.0.1.",
    "172.16.1.", "172.16.0.", "172.17.0.",
]
def generate_client_ip():
    """Return a random client address built from one of the CLIENT_NETWORKS prefixes.

    The prefix supplies the first three octets; the last octet is drawn
    from 1-254 inclusive.
    """
    prefix = random.choice(CLIENT_NETWORKS)
    host_octet = random.randint(1, 254)
    return prefix + str(host_octet)
def generate_dga_domain():
    """Return a domain name shaped like the output of a Domain Generation Algorithm.

    One of four label styles is picked uniformly at random, and a random
    suffix from DGA_TLDS is appended to the label.
    """

    def random_letters():
        # 8-16 random lowercase letters
        length = random.randint(8, 16)
        label = ''.join(random.choices(string.ascii_lowercase, k=length))
        return label + random.choice(DGA_TLDS)

    def word_with_number():
        # Innocuous-looking word followed by a three-digit number
        word = random.choice(['secure', 'update', 'service', 'system', 'admin'])
        number = random.randint(100, 999)
        return word + str(number) + random.choice(DGA_TLDS)

    def random_alphanumeric():
        # 6-12 random lowercase letters and digits
        length = random.randint(6, 12)
        alphabet = string.ascii_lowercase + string.digits
        label = ''.join(random.choices(alphabet, k=length))
        return label + random.choice(DGA_TLDS)

    def date_seeded():
        # Today's date (YYYYMMDD) followed by four random lowercase letters
        today = datetime.datetime.now().strftime('%Y%m%d')
        tail = ''.join(random.choices(string.ascii_lowercase, k=4))
        return today + tail + random.choice(DGA_TLDS)

    builders = (random_letters, word_with_number, random_alphanumeric, date_seeded)
    build = random.choice(builders)
    return build()
def generate_suspicious_domain():
    """Return a domain name that a detection rule should flag as suspicious.

    One of five strategies is picked uniformly at random:

    * typosquatting of a well-known brand,
    * homograph look-alikes (Cyrillic letters substituted for Latin ones),
    * a legitimate domain embedded after ``secure-login-`` under a
      ``tk``/``ml``/``ga`` TLD,
    * an entry from MALICIOUS_DOMAINS,
    * a DGA-style name from generate_dga_domain().
    """
    patterns = [
        # Typosquatting
        lambda: random.choice(['googIe', 'microsft', 'amazom', 'facebok', 'twiter']) + '.com',
        # Homograph attacks
        lambda: random.choice(['gооgle', 'microѕoft', 'аmazon']) + '.com',  # Using Cyrillic characters
        # Brand impersonation: legitimate domain used as a prefix of a throwaway TLD.
        # random.choice only reads the list, so no defensive copy is needed.
        lambda: f"secure-login-{random.choice(LEGITIMATE_DOMAINS)}.{random.choice(['tk', 'ml', 'ga'])}",
        # Known malicious patterns
        lambda: random.choice(MALICIOUS_DOMAINS),
        # DGA domains
        generate_dga_domain,
    ]
    return random.choice(patterns)()
def generate_bind_query_log():
    """Return a BIND-style query log entry, optionally followed by a response line.

    About 20% of entries query a suspicious domain; the rest use a domain
    from LEGITIMATE_DOMAINS. Half of the entries (chosen at random) carry
    a second line, separated by a newline, reporting the response code and
    a response time in milliseconds.

    Returns:
        str: one log line, or two lines joined by a newline.
    """
    timestamp = datetime.datetime.now().strftime('%d-%b-%Y %H:%M:%S.%f')[:-3]
    client_ip = generate_client_ip()
    # One source port per query: the response line must reuse it so the
    # query/response pair can be correlated on client address and port.
    client_port = random.randint(1024, 65535)
    dns_server = random.choice(DNS_SERVERS)
    query_type = random.choice(QUERY_TYPES)

    # Weights are positional and line up with the insertion order of
    # RESPONSE_CODES (NOERROR, FORMERR, SERVFAIL, NXDOMAIN, NOTIMP, REFUSED).
    if random.random() < 0.2:
        domain = generate_suspicious_domain()
        # Suspicious queries are more likely to fail
        rcode_weights = [60, 5, 10, 20, 3, 2]
    else:
        domain = random.choice(LEGITIMATE_DOMAINS)
        # Normal queries usually succeed
        rcode_weights = [95, 1, 2, 1, 0.5, 0.5]

    prefix = f"{timestamp} client {client_ip}#{client_port}"

    # BIND query log format
    log_entry = f"{prefix}: query: {domain} IN {query_type} + ({dns_server})"

    # Add a response line for roughly half of the queries
    if random.choice([True, False]):
        response_code = random.choices(
            list(RESPONSE_CODES), weights=rcode_weights, k=1
        )[0]
        response_time = random.randint(1, 500)
        log_entry += f"\n{prefix}: query response: {domain} IN {query_type} {response_code} {response_time}ms"

    return log_entry
def generate_syslog_dns():
    """Return a single syslog-style DNS log line.

    About 15% of entries query a suspicious domain, and roughly 70% of
    those are marked BLOCKED. Allowed queries report NOERROR; blocked ones
    report NXDOMAIN or REFUSED.

    Returns:
        str: one log line with timestamp, host, process tag, client, query,
        action, response code and query time in milliseconds.
    """
    now = datetime.datetime.now()
    # RFC 3164 timestamps pad a single-digit day with a space ("Jul  9"),
    # not a zero, so the day is formatted separately from strftime's %d.
    timestamp = f"{now:%b} {now.day:2d} {now:%H:%M:%S}"
    hostname = random.choice(['dns-01', 'dns-02', 'resolver'])
    client_ip = generate_client_ip()
    query_type = random.choice(QUERY_TYPES)

    # Determine domain type
    if random.random() < 0.15:  # 15% suspicious
        domain = generate_suspicious_domain()
        # Suspicious domains are often blocked
        action = 'BLOCKED' if random.random() < 0.7 else 'ALLOWED'
    else:
        domain = random.choice(LEGITIMATE_DOMAINS)
        action = 'ALLOWED'

    if action == 'ALLOWED':
        # Named explicitly rather than taken from dict ordering
        response_code = 'NOERROR'
    else:
        response_code = random.choice(['NXDOMAIN', 'REFUSED'])

    query_time = random.randint(1, 200)
    pid = random.randint(1000, 9999)

    # Syslog DNS format
    return f"{timestamp} {hostname} named[{pid}]: client {client_ip}: query {domain} {query_type} {action} ({response_code}) {query_time}ms"
def generate_windows_dns_log():
    """Return a single Windows DNS server style log line.

    About 18% of entries query a suspicious domain; the rest use a domain
    from LEGITIMATE_DOMAINS. The remaining fields (thread id, context,
    packet id, protocol, direction, transaction id, result code) are drawn
    independently at random.
    """
    stamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
    remote_ip = generate_client_ip()
    qtype = random.choice(QUERY_TYPES)

    is_suspicious = random.random() < 0.18  # 18% suspicious
    if is_suspicious:
        qname = generate_suspicious_domain()
    else:
        qname = random.choice(LEGITIMATE_DOMAINS)

    # Field order: date/time, thread id, context, internal packet id,
    # UDP/TCP, Snd/Rcv, remote IP, Xid, query type, query name, result code
    thread = format(random.randint(100, 999), '03X')
    ctx = random.choice(['PACKET', 'UPDATE', 'NOTIFY'])
    packet = format(random.randint(0, 0xFFFF), '04X')
    proto = random.choice(['UDP', 'TCP'])
    sent_or_received = random.choice(['Snd', 'Rcv'])
    transaction = format(random.randint(0, 0xFFFF), '04X')
    rcode = random.choice(['NOERROR', 'NXDOMAIN', 'SERVFAIL'])

    return f"{stamp} {thread} {ctx} {packet} {proto} {sent_or_received} {remote_ip} {transaction} Q [{qtype}] {qname} {rcode}"
def generate_pi_hole_log():
    """Return a single Pi-hole (dnsmasq) style log line.

    About 30% of entries are suspicious domains reported as ``blocked`` with
    status ``gravity``; the rest are legitimate domains reported as
    ``reply`` with either a negative answer or a fixed IP address.
    """
    now = datetime.datetime.now()
    # dnsmasq/syslog timestamps pad a single-digit day with a space
    # ("Jul  9"), not a zero, so the day is formatted separately from %d.
    timestamp = f"{now:%b} {now.day:2d} {now:%H:%M:%S}"
    client_ip = generate_client_ip()
    # Drawn as in the other generators, but the Pi-hole line has no
    # query-type field, so the value is not used in the output.
    query_type = random.choice(QUERY_TYPES)

    # Pi-hole blocks more aggressively
    if random.random() < 0.3:  # 30% blocked
        domain = generate_suspicious_domain()
        action = 'blocked'
        status = 'gravity'  # Pi-hole's blocklist
    else:
        domain = random.choice(LEGITIMATE_DOMAINS)
        action = 'reply'
        # Reply with IP or negative response
        status = random.choice(['NODATA', 'NXDOMAIN', '192.168.1.100'])

    pid = random.randint(1000, 9999)

    # Pi-hole log format
    return f"{timestamp} dnsmasq[{pid}]: {action} {domain} is {status} (client {client_ip})"
def main():
    """Append randomly generated DNS log entries to the log file until interrupted.

    Each iteration picks a log format (weighted: bind 30, syslog 25,
    windows 25, pihole 20), writes one entry to
    ``/var/log/app/dns_queries.log`` and sleeps 1-10 seconds. Any error in
    an iteration is printed and followed by a 5 second pause. Ctrl-C stops
    the loop cleanly.
    """
    log_file = Path("/var/log/app/dns_queries.log")
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # Format name -> generator; dict order lines up with the weights below
    generators = {
        'bind': generate_bind_query_log,
        'syslog': generate_syslog_dns,
        'windows': generate_windows_dns_log,
        'pihole': generate_pi_hole_log,
    }
    formats = list(generators)
    weights = [30, 25, 25, 20]

    print("Starting DNS query log generator...")

    # KeyboardInterrupt is handled around the whole loop so that Ctrl-C is
    # also caught while sleeping in the error back-off below.
    try:
        while True:
            try:
                log_format = random.choices(formats, weights=weights, k=1)[0]
                log_entry = generators[log_format]()

                # Homograph domains contain Cyrillic characters; an explicit
                # UTF-8 encoding keeps the write from failing under a
                # non-UTF-8 locale default.
                with open(log_file, "a", encoding="utf-8") as f:
                    f.write(log_entry + "\n")

                print(f"Generated {log_format} DNS log")

                # Random delay between 1-10 seconds (DNS queries are frequent)
                time.sleep(random.uniform(1, 10))
            except Exception as e:
                # Best-effort generator: report the problem and keep going
                print(f"Error: {e}")
                time.sleep(5)
    except KeyboardInterrupt:
        print("Stopping DNS query log generator...")


if __name__ == "__main__":
    main()