#!/usr/bin/env python3
"""
DNS query log generator for Splunk security testing
Generates realistic DNS query logs with suspicious domain patterns and DGA domains
"""

import time
import random
import datetime
import string
from pathlib import Path

# Legitimate domains for normal traffic
LEGITIMATE_DOMAINS = [
    'google.com', 'microsoft.com', 'amazon.com', 'facebook.com', 'twitter.com',
    'github.com', 'stackoverflow.com', 'wikipedia.org', 'youtube.com', 'linkedin.com',
    'apple.com', 'netflix.com', 'adobe.com', 'salesforce.com', 'oracle.com',
    'ibm.com', 'cisco.com', 'vmware.com', 'redhat.com', 'ubuntu.com',
    'docker.com', 'kubernetes.io', 'python.org', 'nodejs.org', 'golang.org'
]

# Suspicious/malicious domains (for testing detection rules)
MALICIOUS_DOMAINS = [
    'badactor.com', 'malware-c2.net', 'phishing-site.org', 'exploit-kit.ru',
    'trojan-command.xyz', 'ransomware-payment.onion', 'credential-harvest.tk',
    'fake-bank-login.ml', 'virus-download.ga', 'spam-relay.cf'
]

# Common TLDs for DGA generation
DGA_TLDS = ['.com', '.net', '.org', '.info', '.biz', '.tk', '.ml', '.ga', '.cf', '.xyz']

# DNS query types
QUERY_TYPES = ['A', 'AAAA', 'CNAME', 'MX', 'TXT', 'NS', 'PTR', 'SOA', 'SRV']

# Response codes (name -> numeric RCODE). The insertion order matters: the
# weight lists passed to random.choices() below are positional.
RESPONSE_CODES = {
    'NOERROR': 0,
    'FORMERR': 1,
    'SERVFAIL': 2,
    'NXDOMAIN': 3,
    'NOTIMP': 4,
    'REFUSED': 5
}

# Internal DNS servers
DNS_SERVERS = ['192.168.1.1', '10.0.0.1', '172.16.1.1', '8.8.8.8', '1.1.1.1']

# Client IP ranges (prefixes; the last octet is appended at generation time)
CLIENT_NETWORKS = [
    '192.168.1.', '192.168.0.', '10.0.0.', '10.0.1.',
    '172.16.1.', '172.16.0.', '172.17.0.'
]


def generate_client_ip() -> str:
    """Generate internal client IP.

    Returns:
        A prefix chosen from CLIENT_NETWORKS followed by a random last
        octet in the range 1-254.
    """
    network = random.choice(CLIENT_NETWORKS)
    return f"{network}{random.randint(1, 254)}"


def generate_dga_domain() -> str:
    """Generate Domain Generation Algorithm (DGA) style domain.

    One of four label patterns is picked uniformly at random and a TLD from
    DGA_TLDS is appended.

    Returns:
        The generated domain name, e.g. a random label plus '.xyz'.
    """
    # Common DGA patterns
    patterns = [
        # Random string + TLD
        lambda: ''.join(random.choices(string.ascii_lowercase, k=random.randint(8, 16)))
        + random.choice(DGA_TLDS),
        # Dictionary words + numbers
        lambda: random.choice(['secure', 'update', 'service', 'system', 'admin'])
        + str(random.randint(100, 999))
        + random.choice(DGA_TLDS),
        # Mixed alphanumeric
        lambda: ''.join(random.choices(string.ascii_lowercase + string.digits, k=random.randint(6, 12)))
        + random.choice(DGA_TLDS),
        # Date-based (common in malware)
        lambda: datetime.datetime.now().strftime('%Y%m%d')
        + ''.join(random.choices(string.ascii_lowercase, k=4))
        + random.choice(DGA_TLDS),
    ]
    return random.choice(patterns)()


def generate_suspicious_domain() -> str:
    """Generate suspicious domain patterns.

    Picks uniformly between typosquatting, homograph, "secure-login-" lure,
    known-malicious (MALICIOUS_DOMAINS) and DGA-style domains.

    Returns:
        The generated domain name. Homograph results contain non-ASCII
        (Cyrillic) characters, so consumers must write them as UTF-8.
    """
    patterns = [
        # Typosquatting
        lambda: random.choice(['googIe', 'microsft', 'amazom', 'facebok', 'twiter']) + '.com',
        # Homograph attacks
        lambda: random.choice(['gооgle', 'microѕoft', 'аmazon']) + '.com',  # Using Cyrillic characters
        # Long subdomains (common in phishing)
        lambda: f"secure-login-{random.choice(LEGITIMATE_DOMAINS)}.{random.choice(['tk', 'ml', 'ga'])}",
        # Known malicious patterns
        lambda: random.choice(MALICIOUS_DOMAINS),
        # DGA domains
        generate_dga_domain,
    ]
    return random.choice(patterns)()


def generate_bind_query_log() -> str:
    """Generate BIND DNS query log entry.

    Returns:
        A query line, followed about half of the time by a newline and a
        matching "query response" line. Both lines share the same timestamp,
        client IP and client source port so they can be correlated.
    """
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    timestamp = datetime.datetime.now().strftime('%d-%b-%Y %H:%M:%S.%f')[:-3]
    client_ip = generate_client_ip()
    # Drawn once so the optional response line refers to the same client socket.
    client_port = random.randint(1024, 65535)
    dns_server = random.choice(DNS_SERVERS)
    query_type = random.choice(QUERY_TYPES)
    code_names = list(RESPONSE_CODES)

    # Determine if this should be suspicious (20% chance)
    if random.random() < 0.2:
        domain = generate_suspicious_domain()
        # Suspicious queries more likely to fail
        weights = [60, 5, 10, 20, 3, 2]
    else:
        domain = random.choice(LEGITIMATE_DOMAINS)
        # Normal queries usually succeed
        weights = [95, 1, 2, 1, 0.5, 0.5]
    response_code = random.choices(code_names, weights=weights, k=1)[0]

    # BIND query log format
    client = f"{timestamp} client {client_ip}#{client_port}"
    log_entry = f"{client}: query: {domain} IN {query_type} + ({dns_server})"

    # Add response if not just query
    if random.choice([True, False]):
        response_time = random.randint(1, 500)
        log_entry += (
            f"\n{client}: query response: {domain} IN {query_type} "
            f"{response_code} {response_time}ms"
        )

    return log_entry


def generate_syslog_dns() -> str:
    """Generate syslog format DNS log entry.

    Returns:
        A single line. Legitimate domains are always ALLOWED with NOERROR;
        suspicious domains (15% of entries) are BLOCKED 70% of the time with
        NXDOMAIN or REFUSED, otherwise ALLOWED with NOERROR.
    """
    timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
    hostname = random.choice(['dns-01', 'dns-02', 'resolver'])
    client_ip = generate_client_ip()
    query_type = random.choice(QUERY_TYPES)

    # Determine domain type
    if random.random() < 0.15:  # 15% suspicious
        domain = generate_suspicious_domain()
        # Suspicious domains often blocked
        action = 'BLOCKED' if random.random() < 0.7 else 'ALLOWED'
    else:
        domain = random.choice(LEGITIMATE_DOMAINS)
        action = 'ALLOWED'

    if action == 'ALLOWED':
        response_code = 'NOERROR'
    else:
        response_code = random.choice(['NXDOMAIN', 'REFUSED'])
    query_time = random.randint(1, 200)

    # Syslog DNS format
    return (
        f"{timestamp} {hostname} named[{random.randint(1000, 9999)}]: "
        f"client {client_ip}: query {domain} {query_type} {action} "
        f"({response_code}) {query_time}ms"
    )


def generate_windows_dns_log() -> str:
    """Generate Windows DNS server log entry.

    Returns:
        A single space-separated line; 18% of entries use a suspicious
        domain. The result code is chosen independently of the domain.
    """
    timestamp = datetime.datetime.now().strftime('%m/%d/%Y %I:%M:%S %p')
    client_ip = generate_client_ip()
    query_type = random.choice(QUERY_TYPES)

    if random.random() < 0.18:  # 18% suspicious
        domain = generate_suspicious_domain()
    else:
        domain = random.choice(LEGITIMATE_DOMAINS)

    # Windows DNS log fields: Date, Time, Thread ID, Context, Internal packet
    # identifier, UDP/TCP, Send/Recv, Remote IP, Xid, Query Type, Query Name,
    # Result Code
    thread_id = f"{random.randint(100, 999):03X}"
    context = random.choice(['PACKET', 'UPDATE', 'NOTIFY'])
    packet_id = f"{random.randint(0, 0xFFFF):04X}"
    protocol = random.choice(['UDP', 'TCP'])
    direction = random.choice(['Snd', 'Rcv'])
    xid = f"{random.randint(0, 0xFFFF):04X}"
    result_code = random.choice(['NOERROR', 'NXDOMAIN', 'SERVFAIL'])

    return (
        f"{timestamp} {thread_id} {context} {packet_id} {protocol} {direction} "
        f"{client_ip} {xid} Q [{query_type}] {domain} {result_code}"
    )


def generate_pi_hole_log() -> str:
    """Generate Pi-hole DNS log entry.

    Returns:
        A single dnsmasq-style line. 30% of entries are suspicious domains
        reported as blocked by 'gravity'; the rest are legitimate domains
        with a 'reply' status.
    """
    timestamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
    client_ip = generate_client_ip()
    # Drawn but not part of the Pi-hole line; kept so the sequence of random
    # draws stays parallel with the other generators.
    query_type = random.choice(QUERY_TYPES)  # noqa: F841

    # Pi-hole blocks more aggressively
    if random.random() < 0.3:  # 30% blocked
        domain = generate_suspicious_domain()
        action = 'blocked'
        status = 'gravity'  # Pi-hole's blocklist
    else:
        domain = random.choice(LEGITIMATE_DOMAINS)
        action = 'reply'
        # Reply with IP or negative response
        status = random.choice(['NODATA', 'NXDOMAIN', '192.168.1.100'])

    # Pi-hole log format
    return (
        f"{timestamp} dnsmasq[{random.randint(1000, 9999)}]: "
        f"{action} {domain} is {status} (client {client_ip})"
    )


def main() -> None:
    """Append generated DNS log entries to the log file until interrupted.

    Each iteration picks a log format by weight, appends one entry to
    /var/log/app/dns_queries.log and sleeps 1-10 seconds. Ctrl-C stops the
    loop; any other error is printed and retried after 5 seconds.
    """
    log_file = Path("/var/log/app/dns_queries.log")
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # Format name -> generator; the weights list below is positional.
    generators = {
        'bind': generate_bind_query_log,
        'syslog': generate_syslog_dns,
        'windows': generate_windows_dns_log,
        'pihole': generate_pi_hole_log,
    }
    format_names = list(generators)
    format_weights = [30, 25, 25, 20]

    print("Starting DNS query log generator...")

    while True:
        try:
            # Generate different DNS log formats
            log_format = random.choices(format_names, weights=format_weights, k=1)[0]
            log_entry = generators[log_format]()

            # Explicit UTF-8: homograph domains contain Cyrillic characters
            # that a non-UTF-8 default locale encoding cannot represent.
            with open(log_file, "a", encoding="utf-8") as f:
                f.write(log_entry + "\n")

            print(f"Generated {log_format} DNS log")

            # Random delay between 1-10 seconds (DNS queries are frequent)
            time.sleep(random.uniform(1, 10))

        except KeyboardInterrupt:
            print("Stopping DNS query log generator...")
            break
        except Exception as e:
            # Top-level boundary: report and keep the generator alive.
            print(f"Error: {e}")
            time.sleep(5)


if __name__ == "__main__":
    main()