网络安全-如何设计一个安全的API(安全角度)

目录

  • API安全概述
  • 设计一个安全的API
    • 一个基本的API
      • 主要代码
      • 调用
      • API的一些问题
    • BasicAuth
      • 认证流程
      • 主要代码
      • 问题
    • API Key
      • 流程
      • 主要代码
      • 问题
    • Bearer auth/Token auth
      • 流程
    • Digest Auth
      • 流程
      • 主要代码
      • 问题
    • JWT Token
      • 流程
      • 代码
      • 问题
    • Hmac
      • 流程
      • 主要代码
      • 问题
    • OAuth
    • 比较
    • 自定义请求签名
      • 身份认证&密钥加密
      • 防重放
      • 请求时效性
      • 请求签名算法设计
      • 代码
  • 攻击与防御
    • SQL注入
    • 敏感信息泄露
    • 越权攻击
    • 重放攻击
  • 全部代码
  • 参考


API安全概述

利用API可进行以下常见的攻击:

  • 注入攻击(SQL注入、命令注入、XSS等)
  • DOS/DDOS攻击
  • SSRF
  • 未授权/水平(垂直)越权
  • 敏感数据泄露
  • 中间人攻击
  • 更改请求方法调用
  • 并发攻击
  • 重放攻击
  • 数据篡改和伪造

有以下常见的防御方式:

  • 资源请求限制,通过限频等手段来解决DOS、DDOS攻击
  • 线程加锁来解决并发攻击
  • 权限控制,通过ABAC、RBAC等方式解决越权攻击
  • 敏感信息防泄露,通过分类分级引擎,数据库加密存储等方式来解决敏感信息泄露
  • 防重放,通过API认证解决重放攻击
  • 加密,例如HTTPS来解决中间人攻击
  • 安全产品,例如API网关、WAF等来解决大部分攻击

当然,有些还是需要API后端代码来进行防御,例如命令注入、SSRF等。

本文以API身份认证为主要内容,浅谈各种认证的使用场景与优缺点,同时穿插部分攻击与防御。

设计一个安全的API

一个基本的API

主要代码

import uuid
import re
import traceback

from flask import Flask, request, jsonify
from mysql import MysqlCli
from log import log
from setting import *

app = Flask(__name__)
log.set_file()

# 验证username
def validate_username(username:str)->bool:
    if len(username) > 20:
        return False
    return True

# 验证手机号
def validate_phone_number(phone_number:str)->bool:
    # 使用正则表达式检查手机号格式
    pattern = re.compile(r'^1[3456789]\d{9}$')
    if re.match(pattern, phone_number):
        return True
    else:
        return False

# 管理员注册用户接口
@app.route('/api/v1.0/admin/add_user', methods=['POST'])
def add_user():
    resp = {
        "requestid": uuid.uuid4()
    }
    if request.is_json:
        data = request.get_json()
        username = data.get('username')
        if username is None:
            resp['error'] = 'username is required'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        phone_number = data.get('phone_number')
        if phone_number is None:
            resp['error'] = 'phone_number is required'
            log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
            return jsonify(resp),400

        if not validate_username(username):
            resp['error'] = 'username length should not exceed 10'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        try:
            cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
            cli.insert_one("users",
                {
                    "username": username,
                    "phone_number": phone_number
                }
            )
            cli.close()
            resp['message'] = f'success to add user:{username}!'
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp)
        except Exception:
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
            return jsonify(resp),500
    else:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp),400

# 用户获取信息接口
@app.route('/api/v1.0/get_user_info', methods=['POST'])
def get_user_info():
    resp = {
        "requestid": uuid.uuid4()
    }
    if request.is_json:
        data = request.get_json()
        username = data.get('username')
        if username is None:
            resp['error'] = 'username is required'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400

        if not validate_username(username):
            resp['error'] = 'username length should not exceed 10'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        try:
            cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
            sql = f"select * from users where username = '{username}' limit 1"
            user = cli.select_all(sql)
            resp['message'] = f'success to get user:{user}.'
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp)
        except Exception:
            resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),500
    else:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp),400


if __name__ == '__main__':
    app.run()

可以看到共有两个API,管理员注册用户接口、用户获取信息接口,拥有以下功能或安全措施:

  • 版本控制
  • 日志记录
  • 请求方法校验
  • 请求数据校验

调用

在这里插入图片描述
在这里插入图片描述

API的一些问题

  • 没有身份认证,只要有人知道api地址、方法、参数就能调用
  • 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
  • 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
  • 没有对重放攻击的防御,抓包后可能重放包
  • 越权问题,管理员接口任何人都可以调用
  • SQL注入,没有做参数化查询等防止SQL操作,可能被拖库

BasicAuth

认证流程

  1. 客户端发送请求头Authorization为Basic username:password(base64编码)的数据包
  2. 服务端对请求头Authorization判断,解码后从数据库查询判断账号密码是否正确

主要代码

def basic_auth(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Basic '):
            resp['error'] = 'basic auth is required'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400
        else:
            try:
                encoded_credentials = auth_header.split(' ')[1]
                decoded_credentials = base64.b64decode(encoded_credentials).decode('utf-8')
                username, password = decoded_credentials.split(':')
                if not check_basic_auth(username, password):
                    resp['error'] = 'basic auth failed,check your username or password is right'
                    log.logger.error(f"url:{request.url},resp:{resp}")
                    return jsonify(resp), 401
            except Exception:
                resp['error'] = f'basic auth failed,err: {traceback.format_exc()}'
                log.logger.error(f"url:{request.url},resp:{resp}")
                return jsonify(resp), 500

        return f(*args, **kwargs)

    return decorated_function
# 管理员注册用户接口 v2.0 增加密码
@app.route('/api/v2.0/admin/basic_auth/add_user', methods=['POST'])
@basic_auth
def add_user_basic_auth():
    resp = {
        "requestid": uuid.uuid4()
    }
    # 请求是json格式
    if request.is_json:
        data = request.get_json()
        username = data.get('username')
        # username检查
        if username is None:
            resp['error'] = 'username is required'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        if not validate_username(username):
            resp['error'] = 'username length should not exceed 10'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        # phone_number检查
        phone_number = data.get('phone_number')
        if phone_number is None:
            resp['error'] = 'phone_number is required'
            log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
            return jsonify(resp),400
        # 插入数据库
        try:
            cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
            cli.insert_one("users",
                {
                    "username": username,
                    "phone_number": phone_number
                }
            )
            pwd = generate_random_password(secrets.choice(range(8, 17)))
            cli.insert_one("passwords", {
                "username": username,
                "password": pwd
            })
            cli.close()
            resp['message'] = f'success to add user:{username},password {pwd},remember it!'
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
        # 异常返回
        except Exception:
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}",500
            return jsonify(resp)
        return jsonify(resp)
    # 请求不是json格式
    else:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp),400

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

问题

  • 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
  • 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
  • 没有对重放攻击的防御,抓包后可能重放包
  • 越权问题,管理员接口任何人都可以调用
  • SQL注入,没有做参数化查询等防止SQL操作,可能被拖库

引入问题:

  1. 请求的密码进行base64编码,容易获得并解码获得明文
  2. 数据库密码明文存储
  3. 数据库sql语句执行没有按事务处理,可能用户插入到数据库但密码没有入库

API Key

流程

  1. 客户端发送请求时通过query string、请求头(X-API-Key或其他自定义请求头)、Cookie中携带apikey
  2. 服务端根据约定的方式获取后查询数据库判断是否存在

主要代码

def api_key_auth(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        # api key检查
        if "api_key" not in request.headers:
            resp['error'] = 'api_key is required in headers'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 400
        if not check_api_key(request.headers["api_key"]):
            resp['error'] = f'api_key {request.headers["api_key"]} is invalid'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp), 401
        return f(*args, **kwargs)

    return decorated_function
# 管理员注册用户接口 v2.0 增加api token
@app.route('/api/v2.0/admin/api_key/add_user', methods=['POST'])
@api_key_auth
def add_user_api_key():
    resp = {
        "requestid": uuid.uuid4()
    }
    # request是json
    if request.is_json:
        data = request.get_json()
        username = data.get('username')
        if username is None:
            resp['error'] = 'username is required'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        phone_number = data.get('phone_number')
        if phone_number is None:
            resp['error'] = 'phone_number is required'
            log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
            return jsonify(resp),400
        if not validate_username(username):
            resp['error'] = 'username length should not exceed 10'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        try:
            cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
            cli.insert_one("users",
                {
                    "username": username,
                    "phone_number": phone_number
                }
            )
            key = secrets.token_hex(16)
            cli.insert_one("keys",
                           {
                               "username": username,
                               "key": key
                           })
            cli.close()
            resp['message'] =f'success to add user:{username},key {key},remember it!'
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
        except Exception:
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
            return jsonify(resp),500
        return jsonify(resp)
    else:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp),400

博主这里实现时,添加了自定义请求头api_key在这里插入图片描述

问题

  • 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
  • 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
  • 没有对重放攻击的防御,抓包后可能重放包
  • 越权问题,管理员接口任何人都可以调用
  • SQL注入,没有做参数化查询等防止SQL操作,可能被拖库

引入问题:

  1. API key明文发送,容器抓包获取
  2. API key明文存储
  3. 数据库sql语句执行没有按事务处理,可能用户插入到数据库但api key没有入库

Bearer auth/Token auth

流程

  1. 请求端通过某种认证机制(比如用户名密码登录、OAuth 认证等)获取令牌。 在发起 HTTP 请求时,客户端将这个令牌添加到Authorization 请求头中,格式为 “Bearer token”。
  2. 服务器接收到请求后,会验证这个令牌的有效性,如果有效则允许请求继续处理,否则拒绝访问。

使用 Bearer authentication 的优势在于令牌本身可以包含更多的信息、具有较长的有效期,并且不需要在服务器端保存会话状态,这样可以减轻服务器负担并提高安全性。

这里就不实现了,后面通过jwt token,算是实现其中的一种。

Digest Auth

流程

  1. 客户端发送一个未经认证的请求给服务器。 服务器返回一个 401 Unauthorized响应,有一个响应头WWW-Authenticate,其中包含一个随机数(nonce)和其他认证需要的信息。
  2. 客户端收到 401响应后,会向用户提示输入用户名和密码,然后根据特定的算法(通常是 MD5)对用户名、密码、随机数(nonce)、HTTP 方法和请求的URI 进行摘要计算
  3. 客户端将计算出的摘要放在 Authorization 请求头中发送给服务器。
  4. 服务器收到请求后,会根据事先约定好的算法再次计算摘要,如果两个摘要匹配,则验证通过,否则拒绝访问。

涉及的几个常见参数如下:

  • realm,必选。是一个保护空间的名称,用于向用户表明请求的资源属于哪个保护空间。它通常用于表示一组受保护的资源,用于构造摘要字符串。
  • nonce,必选。是一个唯一的字符串,401时由服务器生成并发送给客户端。它用于防止重放攻击(replay attack)。每次认证请求都会使用一个新的 nonce 值,使得每次请求的摘要都是不同的,从而提高了安全性。
  • qop(Quality of Protection) ,必选。可以是 “auth” 或 “auth-int”。auth 代表身份验证,而 auth-int 代表身份验证和消息完整性保护。Qop 的存在使得摘要认证更加灵活和安全。
  • algorithm,可选,默认MD5。指定了用于计算摘要的哈希算法,通常是 MD5。服务器在 WWW-Authenticate 响应头中指定,客户端按照这个算法进行摘要计算。还有MD5-sess、SHA、SHA-256、SHA-512等。
  • nc,可选,是一个计数器,用于跟踪特定 nonce 值的使用次数。每次客户端发送请求时,Nc 都会递增,帮助防止重放攻击。
  • cnonce(Client Nonce) ,可选。是客户端生成的随机字符串,用于与服务器的 nonce 一起使用,以增加请求的独特性和安全性。
  • opaque,可选。是服务器生成的字符串,客户端在后续请求中必须原样返回。它用来保持服务器状态或防止某些类型的攻击。
  • charset,可选。默认utf-8,编码方式。
  • userhash,可选。默认false。服务端返回的是否支持username哈希。

主要代码

# 将字符串保存到Redis中,并设置过期时间
def save_nonce_with_expiry(key, value, expiry_seconds):
    """
    :param key: 键
    :param value: 值
    :param expiry_seconds: 过期时间(秒)
    """
    # 连接Redis数据库
    redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD, port=REDIS_PORT, db=REDIS_DB)
    redis_client.setex(key, expiry_seconds, value)

def check_nonce(key):
    """
    检查字符串是否存在于Redis中
    :param key: 键
    :return: 布尔值,表示键是否存在
    """
    # 连接Redis数据库
    redis_client = redis.StrictRedis(host=REDIS_HOST,password=REDIS_PASSWORD ,port=REDIS_PORT, db=REDIS_DB)
    return redis_client.exists(key)

# 校验username 返回密码
def check_username(username):
    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        sql = f"select * from passwords where username = '{username}' limit 1"
        user = cli.select_one(sql)
        return user['password'] if user else ''
    except Exception:
        log.logger.error(f"check_basic_auth failed,error:{traceback.format_exc()}")
        return False

# 校验response
def check_response(response,realm,username,password,method,uri,req_nonce,nc,cnonce,qop):
    try:
        log.logger.info(f"check_response ,response:{response},realm:{realm},username:{username},"
                        f"password:{password},method:{method},uri:{uri},req_nonce:{req_nonce},nc:{nc},"
                        f"cnonce:{cnonce},qop:{qop}")
        # 校验nonce
        if not check_nonce(req_nonce):
            log.logger.error(f"check_signature failed,error:{req_nonce} not exist!")
            return False
        ha1=hashlib.md5(f"{username}:{realm}:{password}".encode()).hexdigest()
        ha2=hashlib.md5(f"{method}:{uri}".encode()).hexdigest()
        return response == hashlib.md5(f"{ha1}:{req_nonce}:{nc}:{cnonce}:{qop}:{ha2}".encode()).hexdigest()
    except Exception:
        log.logger.error(f"check_responce failed,error:{traceback.format_exc()}")
        return False

# 添加用户 用于添加用户时无需额外增加用户关联的密钥等信息时的接口
def add_user():
    resp = {
        "requestid": uuid.uuid4()
    }
    log.logger.info(request.headers)

    # request是json
    if request.is_json:
        data = request.get_json()
        username = data.get('username')
        if username is None:
            resp['error'] = 'username is required'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp)
        phone_number = data.get('phone_number')
        if phone_number is None:
            resp['error'] = 'phone_number is required'
            log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
            return jsonify(resp)

        if not validate_username(username):
            resp['error'] = 'username length should not exceed 10'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp)
        try:
            cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
            cli.insert_one("users",
                           {
                               "username": username,
                               "phone_number": phone_number
                           }
                           )
            cli.close()
            resp['message'] = f'success to add user:{username}!'
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
        except Exception:
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
        return jsonify(resp)
    else:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp)

# 管理员注册用户接口 v2.0增加digest算法
@app.route('/api/v2.0/admin/digest_auth/add_user', methods=['POST'])
@digest_auth
def add_user_digest():
    return add_user()

在这里插入图片描述
输入账号密码后登录,发起请求
在这里插入图片描述

问题

  • 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
  • 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
  • 没有对重放攻击的防御,抓包后可能重放包
  • 越权问题,管理员接口任何人都可以调用
  • SQL注入,没有做参数化查询等防止SQL操作,可能被拖库

优点:

  1. 可以防止重放攻击

引入问题:

  1. 发送两个请求,更加消耗资源
  2. 需要存储nonce,这里有设置过期时间
  3. 实现比较复杂

JWT Token

流程

通常由三个部分组成:header、payload 和 signature。

  • header:包含两个部分:令牌类型(即 “JWT”)和所使用的签名算法(如 HMAC SHA256 或 RSA)。
  • payload:包含声明(claims)。声明是关于实体(通常是用户)和其他数据的声明。
    • iss(Issuer):该声明标识了 JWT 的发行者。
    • sub(Subject):该声明标识了 JWT 的主题,即所描述的实体。
    • aud(Audience):该声明标识了 JWT 的受众,即预期的接收者。
    • exp(Expiration Time):该声明指定了 JWT 的过期时间,在此时间之后,JWT 将被认为是无效的。
    • nbf(Not Before):该声明指定了 JWT 的生效时间,在此时间之前,JWT 将被认为是无效的。
    • iat(Issued At):该声明指定了 JWT 的签发时间。
    • jti(JWT ID):该声明为 JWT 提供了一个唯一标识符。
  • signature:为了确保 JWT 未被篡改,需要对编码后的 header 和 payload 使用指定的签名算法和一个密钥进行签名。
    在这里插入图片描述

代码

def jwt_auth(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith("Bearer "):
            resp["message"] = "no Authorization header or invalid format"
            return jsonify(resp), 400
        try:
            token = auth_header.split(' ')[1]
            jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            resp["error"] = "token has expired"
            log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
            return jsonify(resp), 401
        except jwt.InvalidTokenError:
            resp["message"] = f"invalid token {token}"
            log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
            return jsonify(resp), 401
        except Exception as e:
            resp["message"] = f"interal error {e}"
            log.logger.error(f"url:{request.url},token:{token},resp:{resp}")
            return jsonify(resp), 500
        return f(*args, **kwargs)

    return decorated_function

# 用户登录获取jwt token接口
@app.route('/api/v2.0/admin/jwt/login', methods=['POST'])
def get_jwt_token():
    resp = {
        "requestid": uuid.uuid4()
    }
    if request.is_json:
        data = request.get_json()
        username = data.get('username')
        password = data.get('password')
        if username is None or password is None:
            resp['error'] = 'username or password is required'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400

        if not validate_username(username):
            resp['error'] = 'username length should not exceed 10'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),400
        try:
            cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
            sql = f"select * from passwords where username = '{username}' limit 1"
            user = cli.select_one(sql)
            if not user:
                resp['error'] = 'username not found'
                log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
                return jsonify(resp), 400
            if user['password'] != password:
                resp['error'] = 'password is not right'
                log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
                return jsonify(resp), 401
            payload = {
                "iss": "lady_killer9",
                "exp": datetime.now() + timedelta(seconds=5*60),
                "jti": str(uuid.uuid4())
            }
            resp['message'] = f'success to login :{user},token:{jwt.encode(payload,JWT_SECRET,algorithm="HS256")}'
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp), 200
        except Exception:
            resp['error'] = f'failed to get user, error:{traceback.format_exc()}'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp),500
    else:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp),400

# 管理员注册用户接口 v2.0增加jwt sha256算法
@app.route('/api/v2.0/admin/jwt/add_user', methods=['POST'])
@jwt_auth
def add_user_jwt():
    return add_user()

# 用户获取信息接口 v2.0 增加jwt sha256算法
@app.route('/api/v2.0/jwt/get_user_info', methods=['POST'])
@jwt_auth
def get_user_info_jwt():
    return get_user_info()

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

问题

  • 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
  • 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
  • 没有对重放攻击的防御,抓包后可能重放包
  • 越权问题,管理员接口任何人都可以调用
  • SQL注入,没有做参数化查询等防止SQL操作,可能被拖库

优点:

  1. token可以设置时间限制,过期后不可调用
  2. payload的aud等参数可以用于鉴权
  3. 可添加自定义payload,方便做其他的功能

引入问题:

  1. jwt token容易破解

bejson jwt在线解密在这里插入图片描述

Hmac

流程

在这里插入图片描述
和Digest Auth差不多,可以由客户端生成随机数,这样请求一次即可,随机数不可重复。

主要代码

# 验证摘要
def check_signature(signture:str,username:str,nonce:int,data:dict):
    try:
        cli = MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)
        sql = f"select * from `secrets` where `username` = '{username}' limit 1"
        secret = cli.select_one(sql)
        cli.close()
        if secret:
            server_signature = hmac.new(str(secret['secret']).encode('utf-8'), json.dumps(data).encode('utf-8'), hashlib.sha256).hexdigest()
            log.logger.info(f"secret:{str(secret['secret'])},data:{data},server_signature:{server_signature}")
            return  server_signature == signture
        log.logger.error(f"check_signature failed,error:{username} {secret}")
        return False
    except Exception:
        log.logger.error(f"check_signature failed,error:{traceback.format_exc()}")
        return False

def hmac_auth(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        # query检查 nonce username
        nonce = request.args.get('nonce', type=int)
        username = request.args.get('username')
        if nonce is None or username is None:
            resp['error'] = 'username or nonce not found in query string'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp)
        # 随机数验证
        if check_nonce(nonce):
            log.logger.error(f"check_signature failed,error:{nonce} is in database")
            resp['error'] = f"check_signature failed,error:{nonce} is in database"
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp)
        save_nonce_with_expiry(nonce,1,MIN*60)
        # signature检查
        if "Signature" not in request.headers:
            resp['error'] = 'signature is required in headers'
            log.logger.error(f"url:{request.url},resp:{resp}")
            return jsonify(resp)
        # request是json
        if request.is_json:
            data = request.get_json()
            if not check_signature(request.headers['Signature'], username, nonce, data):
                resp['error'] = f'signature {request.headers["Signature"]} is invalid'
                log.logger.error(f"url:{request.url},resp:{resp}")
                return jsonify(resp),401
        else:
            resp['error'] = "Invalid JSON format in request"
            return jsonify(resp),400
        return f(*args, **kwargs)

    return decorated_function
# 管理员注册用户接口 v2.0增加hmac sha256算法
@app.route('/api/v2.0/admin/hmac/add_user', methods=['POST'])
@hmac_auth
def add_user_hmac():
    resp = {
        "requestid": uuid.uuid4()
    }
    # request是json
    if request.is_json:
        data = request.get_json()
        username = data.get('username')
        if username is None:
            resp['error'] = 'username is required'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp)
        phone_number = data.get('phone_number')
        if phone_number is None:
            resp['error'] = 'phone_number is required'
            log.logger.error(f"url:{request.url},params:{phone_number},resp:{resp}")
            return jsonify(resp)

        if not validate_username(username):
            resp['error'] = 'username length should not exceed 10'
            log.logger.error(f"url:{request.url},params:{username},resp:{resp}")
            return jsonify(resp)
        try:
            cli = MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)
            cli.insert_one("users",
                {
                    "username": username,
                    "phone_number": phone_number
                }
            )
            secret = secrets.token_hex(16)
            cli.insert_one("secrets",
                           {
                               "username": username,
                               "secret": secret
                           })
            cli.close()
            resp['message'] =f'success to add user:{username},secret {secret},remember it!'
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
        except Exception:
            log.logger.info(f"url:{request.url},params:{username},resp:{resp}")
            resp['error'] = f"failed to insert to mysql:{traceback.format_exc()}"
        return jsonify(resp)
    else:
        resp['error'] = "Invalid JSON format in request"
        return jsonify(resp)

# 用户获取信息接口 v2.0 增加hmac
@app.route('/api/v2.0/hmac/get_user_info', methods=['POST'])
@hmac_auth
def get_user_info_hmac():
    return get_user_info()

w
在这里插入图片描述

问题

  • 没有调用次数限制,可能被Dos或DDos攻击,造成服务器压力
  • 没有防止数据篡改或伪造,被抓包后可能被篡改或伪造数据
  • 没有对重放攻击的防御,抓包后可能重放包
  • 越权问题,管理员接口任何人都可以调用
  • SQL注入,没有做参数化查询等防止SQL操作,可能被拖库

引入问题:

  1. 需要存储nonce,这里有设置过期时间

OAuth

内容太多,留坑先不看了

比较

比较项Basic AuthAPI keyBearer Auth/Token authDigest AuthHmac AuthJWT
身份认证
密钥加密××××
服务端存储×(token可以不存)
防重放××××(token失效前可重放)
时效性×××
自定义××

通过以上比较,如果设计一个签名具有以下优点会比较好:

  • 防重放。通过随机数防止请求重放,随机数由客户端计算可以减轻服务端压力。
  • 密钥加密。使用密钥计算签名。
  • 服务端存储。存储随机数,设置过期时间。
  • 时效性。添加时间戳,过期后请求失败。
  • 自定义。自定义请求签名当然可以自定义一些东西,用于鉴权等。

自定义请求签名

在这里插入图片描述

身份认证&密钥加密

请求需要验证身份,就需要有账密,这里就用SecretId、SecretKey,其中SecretKey用于进行签名的计算。

防重放

为了防重放,生成一个随机数Nonce,Nonce唯一,这样服务端收到携带该Nonce的请求后,还发送带该Nonce的请求,就拒绝掉。因此Nonce需要服务器保存,同时为了防止篡改,签名时需要。
那么问题来了,Nonce需要服务器保存,不能一直保存吧,随着时间推移,存储成本会越来越高,因此需要时间限制。

请求时效性

请求应该具有时效性,这里使用unix时间戳Timestamp。规定在1分钟内请求有效,这样Nonce保存时间在1分钟即可。

请求签名算法设计

密钥加密选择SHA-256算法,当然算法可以当做参数,由客户端指定,就用Algorithm吧
url类似:xxx?SecretId=xxx&Nonce=xxx&Timestamp=xxx&Algorithm=xxx
添加一个自定义请求头Signature,放上签名,待签名字符串规定格式如下

{Nonce}:{Timestamp}:{Algorithm}:{HTTPMethod}:{base64(HTTPBody)}

  • HTTPMethod:HTTP请求方法,例如POST
  • HTTPBody:HTTP请求体,例如{“name”:“lady_killer9”}

当然,还可以添加更多到待签名字符串

客户端

  • 生成一个随机字符串(不包含:)
  • 对请求头和请求体做base64编码,并按照格式拼接
  • 使用算法和SecretKey进行签名
  • 生成时间戳,发送请求

服务端

  • 从url获取时间戳Timestamp,校验是否在时间内
  • 从url获取随机字符串Nonce,校验是否在数据库中
  • 对请求头和请求体做base64编码
  • 从url获取SecretId后从数据库查询对应的SecretKey
  • 使用SecretKey和Algorithm算法对拼接同样格式字符串进行签名得到ServerSignature,比较Signature是否与请求头Signature的值一致

代码

def verify_signature(signature, nonce,timestamp,algorithm,method,body_base64):
    format_str = f"{nonce}:{timestamp}:{algorithm}:{method}:{body_base64}"
    server_signature = hashlib.sha256(format_str.encode()).hexdigest()
    log.logger.info(format_str)
    log.logger.info(server_signature)
    return signature == server_signature


def require_signature(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        resp = {
            "requestid": uuid.uuid4()
        }
        secret_id = request.args.get('SecretId', type=str)
        nonce = request.args.get('Nonce', type=str)
        timestamp = request.args.get('Timestamp', type=int)
        algorithm = request.args.get('Algorithm', type=str)
        if secret_id is None:
            resp['error'] = 'No SecretId in query string'
            return jsonify(resp),400
        if nonce is None:
            resp['error'] = 'No Nonce in query string'
            return jsonify(resp),400
        if timestamp is None:
            resp['error'] = 'No Timestamp in query string'
            return jsonify(resp),400
        if algorithm is None:
            resp['error'] = 'No Algorithm in query string'
            return jsonify(resp),400
        if algorithm not in ["sha256"]:
            resp['error'] = f'can not support {algorithm}'
            return jsonify(resp), 400
        if (datetime.now() - timedelta(minutes=MIN)).timestamp() > timestamp:
            resp['error'] = 'Request is send before 5 mins ago, check Timestamp'
            return jsonify(resp), 400
        if ':' in nonce:
            resp['error'] = 'can not contain : in Nonce, generate a new one'
            return jsonify(resp), 400
        cli = redis.Redis.from_url(REDIS_URL)
        if cli.exists(nonce):
            resp['error'] = 'can not request with same Nonce'
            return jsonify(resp), 400
        else:
            cli.setex(nonce, MIN*60, 1)

        signature = request.headers.get('Signature')
        if not signature:
            resp['error'] = 'no Signature in headers'
            return jsonify(resp), 400

        # 解析 Authorization header,验证签名
        body_base64 = bytes.decode(b64encode(json.dumps(request.get_json(),ensure_ascii=False).encode()))
        if not verify_signature(signature, nonce,timestamp,algorithm,request.method,body_base64):
            resp['error'] = 'invalid signature'
            return jsonify(resp), 401

        return f(*args, **kwargs)

    return decorated_function

攻击与防御

SQL注入

例如,在v1.0的get_user_info接口,存在将用户输入拼接到sql的漏洞,可以被SQL注入。
在这里插入图片描述

抓包如下:
在这里插入图片描述
防御方面可以通过预编译等方式来解决
v3.0已解决
在这里插入图片描述

敏感信息泄露

例如,在v1.0的get_user_info接口,用户手机号被完整返回,没有打码。
在这里插入图片描述
防御上可以通过加*打码或MFA等来解决
v3.0已解决
在这里插入图片描述

越权攻击

例如,在v2.0的api key认证接口,任意用户都能查询admin用户的信息,只需要知道username即可
在这里插入图片描述
防御方面可以通过添加RBAC等鉴权来解决

重放攻击

例如,v2.0的api key认证接口,设置burpsuite代理,放到重放器Reapter,发送多少次都可以。
在这里插入图片描述
v3.0通过自定义请求签名就解决了此类问题。
在这里插入图片描述

全部代码

Github-api_history

参考

API-Security Owasp top 10

API 鉴权都有哪些分类,这些重点不要错过
best-practices-for-authentication-and-authorization-for-rest-apis/

pyjwt
https://github.com/ticarpi/jwt_tool

rfc6750-The OAuth 2.0 Authorization Framework: Bearer Token Usage
rfc7616-HTTP Digest Access Authentication
rfc2617-HTTP Authentication: Basic and Digest Access Authentication
rfc7519-JSON web Token (JWT)

Github-jwt_tool

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/734483.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot集成Minio插件快速入门

1 Minio介绍 MinIO 是一个基于 Apache License v2.0 开源协议的对象存储服务。它兼容亚马逊 S3 云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等,而一个对象文件可以是任意大小&…

Vue38 安装脚手架 vue-cli ,并使用脚手架创建项目

安装脚手架 vue-cli ,并使用脚手架创建项目 第一步 安装脚手架 npm config set registry https:\\[registry.npmmirror.com // 切换淘宝镜像 npm install -g vue/cli第二步 切换到创建项目的目录,创建项目 cd XXX vue create XXX第三步 启动项目 npm…

FreeCAD中智能指针分析

实现原理 FreeCAD中有两套智能指针,一个是OCC的智能指针handle,另一个是自己定义的智能指针Reference,两种智能指针都是通过引用计数方式管理指针。 1.1 OCC智能指针handle OCC在基础类包中定义了一个模板类handle,该类包含一个私…

购买服务器,并安装宝塔

前言: 我们在开发项目时,总会遇到一个问题,就是将我们开发好的项目上传的公网中。对于中小型的项目,我们可以通过购买服务器进行项目的上线。 我们的项目一般是部署在Linux环境中。如果你不是专业的运维人员,可能对于…

力扣1901.寻找峰值II

力扣1901.寻找峰值II 二分每一行 并用函数找出每一行中最大值的下标若最大值比其下面相邻的元素大 则上方一定存在峰值若最大值比其下面相邻的元素小 则下方一定存在峰值 class Solution {int indexmax(vector<int> &a){return max_element(a.begin(),a.end()) - …

解决js打开新页面百度网盘显示不存在方法:啊哦,你所访问的页面不存在了。

用js打开新页面open或window.location.href打开百度网盘后都显示&#xff1a;啊哦&#xff0c;你所访问的页面不存在了。 window.open(baidu_url); window.location.href baidu_url;在浏览器上&#xff0c;回车后网盘资源是可以打开的&#xff0c;刷新也是打开的。这是很奇怪…

C/C++ vector模拟实现

模拟实现&#xff1a; 框架 namespace yx {template<class T>class vector{public:typedef T* iterator;private:iterator _start;iterator _finish;iterator _end_of_storage;}; } 这里我们声明定义不分离 reverse() 新开一个空间&#xff0c;拷贝数据&#xff0c;然…

ardupilot开发 --- RTSP视频流 篇

我年轻时很穷&#xff0c;努力了几年&#xff0c;终于不再年轻了 0. 一些概念1. Ubuntu搭建RTSP服务器的方式2. 在Ubuntu上搭建RTSP服务器3. 推流4. 拉流、播放5. 借鉴的一些例子6. 其他参考文献 0. 一些概念 RTSP服务、RTSP推流、RTSP拉流&#xff0c;缺一不可&#xff0c;尤其…

平凉特色小吃,味蕾的诱惑之旅

平凉&#xff0c;这座历史悠久的城市&#xff0c;不仅拥有深厚的文化底蕴&#xff0c;更有着让人垂涎欲滴的特色小吃。每一种小吃都承载着当地人的情感与记忆&#xff0c;成为了平凉独特的饮食符号。平凉特色小吃酿皮更是别具风味。爽滑透明的凉皮&#xff0c;配上香辣可口的调…

亿联 AM610 M.2 SSD PCIE 3.0X2 128GB测评

亿联 AM610 M.2 SSD PCIE 3.0X2 128GB测评 厂商&#xff1a;union memory国产固态硬盘SSD。 接口&#xff1a;PCIE 3.0X2 协议&#xff1a;支持NVME 1.协议 固件&#xff1a;固件版本号11.82 读取量&#xff1a;18TB左右 写入量&#xff1a;14TB左右&#xff0c;NAND闪存约被编…

统信UOS 安装二级制版MySQL8.4

统信UOS 安装二级制版MySQL8.4 建立MySQL用户和用户组 sudo groupadd mysqlsudo useradd -r -g mysql -s /bin/false mysql下载MySQL安装包 wget https://cdn.mysql.com//Downloads/MySQL-8.4/mysql-8.4.0-linux-glibc2.28-x86_64.tar.xz解压缩MySQL安装包 sudo tar -xvf m…

如何获取文件对应的路径

有时我们会把脚本文件复制到其他的路径或者电脑文件夹下&#xff0c;如果采用绝对路径的话&#xff0c;会发生找不到改文件&#xff0c;程序就会报错。那么我们如何避免这个问题呢&#xff1f;我们可以采用相对路径的方法。 可以看到&#xff0c;系统的当前路径"D:\python…

37 - 上级经理已离职的公司员工(高频 SQL 50 题基础版)

37 - 上级经理已离职的公司员工 selecte1.employee_id fromEmployees e1 left join Employees e2 on e1.manager_id e2.employee_id wheree2.manager_id is null and e1.manager_id is not null and e1.salary<30000;

【Qt】学习Day1

文章目录 Qt简介创建第一个Qt程序创建过程介绍main函数工程文件头文件控件源文件快捷键按钮控件常用API对象树坐标系 信号和槽自定义信号自定义槽函数触发自定义的信号案例-下课后&#xff0c;老师触发饿了信号&#xff0c;学生响应信号&#xff0c;请客吃饭重载信号连接信号La…

【tomcat】tomcat系统架构以及核心启动流程

对于web后端开发工程师来说&#xff0c;tomcat作为一个应用服务器框架本质上就是一个HTTP服务Servlet容器。研究过spring、spring mvc源码的同学应该了解&#xff0c;spring mvc其实就是基于Servlet规范实现的请求的转发路由、转发处理。而Spring和SpringMVC就是通过web.xml文件…

时序预测 | KAN+Transformer时间序列预测(Python)

预测效果 基本描述 KANTransformer时间序列预测 KAN作为这两年最新提出的机制&#xff0c;目前很少人用&#xff0c;很适合作为时间序列预测的创新点&#xff0c;可以结合常规的网络加上个优化方法做创新。适合功率预测&#xff0c;负荷预测&#xff0c;流量预测&#xff0c;浓…

MSPM0G3507——GPIO例程讲解1——input_capture

函数&#xff1a; 参数&#xff1a; 返回值&#xff1a; 主函数代码&#xff1a; #include "ti_msp_dl_config.h"extern volatile uint32_t interruptVectors[];int main(void) {SYSCFG_DL_init(); //把所有的LED灯和按键初始化了一…

css grid实现九宫格布局

常见的九宫格布局可以使用flex布局实现&#xff0c;但是flex布局有个致命的缺陷&#xff0c;比如3行3列的布局&#xff0c;当第不足3个元素的时候&#xff0c;元素依然是平局平铺的&#xff0c;这样就不满足九宫格的效果&#xff0c;这种情况&#xff0c;使用grid布局可以轻松搞…

对兼容各操作系统的Anki选择题模板的更新——提供更方便的笔记修改功能

2021年当我想做一个兼容各操作系统的Anki选择题模板的时候&#xff0c;到处搜索茧中网&#xff0c;根本找不到相关内容&#xff0c;直到偶然在github上看到Simon Lammer的Anki持久化模块&#xff0c;才算真正实现。现在再在茧中网上搜索兼容各种操作系统的Anki选择题模板&#…

【百问大模型01】GPT4o最新特性介绍

1、GPT4o 最大的特性是对话响应速度很快 端到端能力300ms&#xff1b;之前是语音转成文字&#xff0c;再来理解分析&#xff1b;现在是直接端到端。 1&#xff09;丰富的语音风格 2&#xff09;理解语音内外的内容 3&#xff09;发出非语音的声音 4&#xff09;自然而及时…