利用 DynamoDB 和 S3 结合 gzip 压缩,最大化存储玩家数据

前言

一些传统游戏架构中,采用 MySQL 存储玩家存档数据,利用分库分表分散单库单表的存储和性能压力,从而达到支持更多玩家的目的。随着数据量增长,数据表中 varchar 类型已经无法满足游戏中单字段的存储需求,而 blob 字段的应用对于这种架构下改造成本是最低的,因此一些游戏开始在最初设计的时候,数据库表结构就采用了 Blob 字段作为其玩家的游戏任务、道具等数据的存储。

Blob 字段在 MySQL 5.6 / 5.7 中存在 bug(MySQL Bugs: #96466),这个 bug 有概率导致数据库集群崩溃,造成数据丢失。即使在 MySQL 8.0 中,由于引擎本身设计的限制,在单表 20GB 以上,高频的更新就会导致数据库出现性能受限。并且随着表增大,性能问题会越来越明显。

随着当游戏业务爆发时增长的时候,传统关系型数据库在分库分表的时候,需要进行应用改造,同时存在一定的停机维护时间。而且这些扩展完成后,在游戏的夕阳期进行收缩也需要进行应用改造,这无疑对业务开发和基础运维的部门造成了很多额外的工作量。

DynamoDB 在应用到这个场景上是非常适用的。在业务发展任意阶段,都可以实现 0 停机的扩展,自动伸缩的特性。而且这一切对于应用层是完全透明的。同时在日常运维中也可以贴合业务负载进行动态扩缩容,从而进一步降低成本。

亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、活动与竞赛等。帮助中国开发者对接世界最前沿技术,观点,和项目,并将中国优秀开发者或技术推荐给全球云社区。如果你还没有关注/收藏,看到这里请一定不要匆匆划过,点这里让它成为你的技术宝库!

概述

本文主要讲述在游戏场景下,根据 DynamoDB 的限制(每个项目都必须小于 400KB),在限制下尽可能存储更多的数据和当存储量超出限制时,扩展存储的最大化利用空间。重点描述如何利用 DynamoDB+S3 保存玩家存档中的大数据量属性,避免数据存在 S3 上后,在数据写入 S3 时,发生读取到 S3 旧存档的情况。同时利用 gzip 压缩减少数据大小,减少 IO 的开销提升性能。

架构图

实战编码

目标

  1. 所有数据保存前都进行 gzip 压缩,读取后都用 gzip 解压。
  2. S3 存储和 DynamoDB 的 binary 字段存储可以自适应。如果用户数据压缩后如果大于指定的值则写入 S3,否则直接保存到当前数据库项目中的字段。
  3. DynamoDB 项目读取的时候,解析解压后的字段,如果字符串以 s3:// 开头,则继续从 S3 中获取数据
  4. 设置 S3 读锁字段,判断当前状态是否正在写入 S3,以阻塞读进程。在每个项目需要写入 S3 前都会设置 read_lock为Ture,S3 写成功后则设置为 False。读取记录后,read_lock 是否为 True,如果是判断被阻塞,进程会等待一段时间后进行重试,直到重试次数超出指定的值。重试超时后,读进程会认为写进程可能由于某种原因导致写永远无法成功,于是会将 read_lock 设置成 False。

第一步:初始化环境参数

from time import sleep
import boto3
import gzip
import random
import json
import hashlib
import logging

# 写入 S3 的门槛,超过这个值数据会写入 S3,否则保存在数据库内,默认值 350KB
UPLOAD_TO_S3_THRESHOLD_BYTES = 358400
# 用户数据库保存的目标S3存储桶
USER_DATA_BUCKET = 'linyesh-user-data'
# 遇到 S3 有读锁,重新请求最大次数,超出次数限制锁会被自动清除
S3_READ_LOCK_RETRY_TIMES = 10
# 遇到 S3 有读锁,读请求重试间隔时间
S3_READ_RETRY_INTERVAL = 0.2

dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

参数说明

  • UPLOAD_TO_S3_THRESHOLD_BYTES:为字段最大的数据存储长度限制。单位为:字节数。由于 DynamoDB 一个项目(Item)数据大小限制为 400KB。我们除了数据存档中最大字段还必须预留一部分空间给其他字段,避免整个 Item 超出 400KB。
  • USER_DATA_BUCKET:S3 用于存储超出 400KB 后的玩家大字段数据。需要提前建好,具体步骤参考:创建存储桶
  • S3_READ_LOCK_RETRY_TIMES:限制当玩家在 S3 上的存档处在写入状态时候,读请求重试的次数。在项目处于读锁状态的时候,读进程会等待一段时间后重试。
  • S3_READ_RETRY_INTERVAL:读锁状态下,重试读的间隔时间,单位:秒。

注意:S3_READ_LOCK_RETRY_TIMES乘以S3_READ_RETRY_INTERVAL 的时间理论上必须小于S3存档上传时间的最大值,因此实际使用本文中的代码应该根据存档可能的大小来调整这 2 个参数。否则可能存档会有大概率会发生脏读的情况。

第二步:创建 DynamoDB 表

def create_tables():
    """
    创建表
    :return:
    """
    response = dynamodb.create_table(
        TableName='players',
        KeySchema=[
            {
                'AttributeName': 'username',
                'KeyType': 'HASH'
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'username',
                'AttributeType': 'S'
            }
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

    # Wait until the table exists.
    response.wait_until_exists()

    # Print out some data about the table.
    logger.debug(response.item_count)

第三步:编写辅助逻辑

指数级回退函数

def run_with_backoff(function, retries=5, **function_parameters):
    base_backoff = 0.1  # base 100ms backoff
    max_backoff = 10  # sleep for maximum 10 seconds
    tries = 0
    while True:
        try:
            return function(function_parameters)
        except (ConnectionError, TimeoutError):
            if tries >= retries:
                raise
            backoff = min(max_backoff, base_backoff * (pow(2, tries) + random.random()))
            logger.debug(f"sleeping for {backoff:.2f}s")
            sleep(backoff)
            tries += 1

S3 路径判断函数

def is_s3_path(content):
    return content.startswith('s3://')

S3 文件获取

def get_s3_object(key):
    response = s3.get_object(Bucket=USER_DATA_BUCKET, Key=s3_key_generator(key))
    return response['Body']

检查大小超限

def check_threshold(current_size):
     return current_size > UPLOAD_TO_S3_THRESHOLD_BYTES

S3 Key 生成函数

这个函数可以将玩家的存档随机分配到 S3 桶下不同的 Prefix 中,这有利于提高 S3 中 IO 的性能。

def s3_key_generator(key):  
    s3_prefix = hashlib.md5((key).encode('utf-8')).hexdigest()[:8]  
    return s3_prefix + '/' + key 

文件上传到 S3

def upload_content_to_s3(obj_param):  
    s3_key = s3_key_generator(obj_param['key'])  
    try:  
        response = s3.put_object(  
            Body=obj_param['content_bytes'],  
            Bucket=USER_DATA_BUCKET,  
            Key=s3_key)  
        return "s3://%s/%s" % (USER_DATA_BUCKET, s3_key)  
    except Exception as e:  
        logger.error(e)  
        raise e  

第四步:编写主体逻辑

写入单个项目到 DynamoDB 数据库

def put_item(load_data):  
    gzip_data = gzip.compress(load_data)  # 压缩数据  
    logger.debug('压缩后大小%.2fKB,原始大小 %.2fKB,压缩率 %.2f%%' % (  
        len(gzip_data) / 1024.0,  
        len(load_data) / 1024.0,  
        100.0 * len(gzip_data) / len(load_data)))  
  
    table = dynamodb.Table('players')  
    player_username = 'player' + str(random.randint(1, 1000))  
    if check_threshold(len(gzip_data)):  
        try:  
            # 读锁保护  
            table.update_item(  
                Key={  
                    'username': player_username,  
                },  
                UpdateExpression="set read_lock = :read_lock",  
                ExpressionAttributeValues={  
                    ':read_lock': True,  
                },  
            )  
  
            # 写入数据到 S3  
            s3_path = run_with_backoff(upload_content_to_s3, key=player_username, content_bytes=gzip_data)  
            # 解除读锁保护,同时存储数据在 S3 上到路径  
            response = table.put_item(  
                Item={  
                    'username': player_username,  
                    'read_lock': False,  
                    'inventory': gzip.compress(s3_path.encode(encoding='utf-8', errors='strict')),  
                }  
            )  
            logger.debug('成功上传大纪录到S3,路径:%s' % s3_path)  
        except Exception as e:  
            logger.debug('存档失败')  
            logger.error(e)  
    else:  
        response = table.put_item(  
            Item={  
                'username': player_username,  
                'inventory': gzip_data,  
            }  
        )  
        logger.debug('成功上传纪录, username=%s' % player_username) 

读取数据库中一条玩家记录

def get_player_profile(uid):  
    """ 
    读取记录 
    :param uid: 玩家 id 
    :return: 
    """  
    table = dynamodb.Table('players')  
    player_name = 'player' + str(uid)  
  
    retry_count = 0  
    while True:  
        response = table.get_item(  
            Key={  
                'username': player_name,  
            }  
        )  
  
        if 'Item' not in response:  
            logger.error('Not Found')  
            return {}  
  
        item = response['Item']  
        # 检查读锁信息, 如果存在锁根据参数设置,间隔一段时间重新读取记录  
        if 'read_lock' in item and item['read_lock']:  
            retry_count += 1  
            logger.info('当前第%d次重试' % retry_count)  
            # 如果超时无法读取记录,则消除读锁,并重新读取记录  
            if retry_count < S3_READ_LOCK_RETRY_TIMES:  
                sleep(S3_READ_RETRY_INTERVAL)  
                continue  
            else:  
                table.update_item(  
                    Key={  
                        'username': player_name,  
                    },  
                    UpdateExpression="set read_lock = :read_lock",  
                    ExpressionAttributeValues={  
                        ':read_lock': False,  
                    },  
                )  
  
        inventory_bin = gzip.decompress(item['inventory'].value)  # 解压缩数据  
        inventory_str = inventory_bin.decode("utf-8")  
        if is_s3_path(inventory_str):  
            player_data = gzip.decompress(get_s3_object(player_name).read())  
            inventory_json = json.loads(player_data)  
        else:  
            inventory_json = json.loads(inventory_str)  
  
        user_profile = {**response['Item'], **{'inventory': inventory_json}}  
        return user_profile  

最后,编写测试逻辑

准备几个不同大小的 json 文件,观察写入数据库中的变化。

if __name__ == '__main__':  
    path_example = 'small.json'  
    # path_example = '500kb.json'  
    # path_example = '2MB.json'  
    with open(path_example, 'r') as load_f:  
        load_str = json.dumps(json.load(load_f))  
        test_data = load_str.encode(encoding='utf-8', errors='strict')  
    put_item(test_data)  
  
    # player_profile = get_player_profile(238)  
    # logger.info(player_profile)  

如果需要测试读锁,可以将数据库中单个项目的 read_lock 手动设置成 True,然后观察读取逻辑在这个过程中的变化。

总结

在本次测试中发现,json 格式的数据使用 gzip 后,压缩率约为 25% 左右,理论上我们可以把单个项目(item) 中可以存储最大约为 1.6MB 的数据项。即便有少量压缩后超过 400KB 的数据,也可以存储到 S3 上,仅在 DynamoDB 中存储元数据和大字段数据在 S3 上的路径。

gzip 会带来一些额外的计算和 IO 开销,但是这些开销主要会落在游戏服务器上,对于数据库来说反而减少了 IO 的开销。

在大多数场景下,玩家数据即便不压缩也很少会超过 400KB。这种情况下,建议可以尝试对比压缩启用和不启用两种场景的性能数据。以决定哪种方式更适合自己的游戏。

限制

对于存在单用户有高并发存档需求的游戏而言,以上设计中并未包含在数据存储在 S3 上后,出现并发写的场景考虑。如果有此场景的需求,需要一些应用逻辑或者架构调整。

本篇作者

林业

Amazon 解决方案架构师,负责基于 Amazon 的云计算方案的咨询与架构设计。拥有超过 14 年研发经验,曾打造千万级用户 APP,多项 Github 开源项目贡献者。在游戏、IOT、智慧城市、汽车、电商等多个领域都拥有丰富的实践经验。

文章来源:https://dev.amazoncloud.cn/column/article/630a281576658473a321ffeb?sc_medium=regulartraffic&amp;sc_campaign=crossplatform&amp;sc_channel=CSDN 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/21999.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

去哪儿酒店数据下载

字段内容包含&#xff1a; id int(11) NOT NULL AUTO_INCREMENT, hotelid varchar(50) DEFAULT NULL, url varchar(200) DEFAULT NULL, hotelname2 varchar(100) DEFAULT NULL, name varchar(100) DEFAULT NULL, province varchar(50) DEFAULT NULL, d…

RabbitMQ集群安装

RabbitMQ集群安装 1.前言 OS: CentOS Linux release 7.9.2009 (Core) 机器: IPnodecpu内存存储10.106.1.241max-rabbitmg-018 核16 G100 G10.106.1.242max-rabbitmg-028 核16 G100 G10.106.1.243max-rabbitmg-038 核16 G100 G 因为操作系统版本是 centos7&#xff0c;所以…

跟着chatGPT学习:kubernetes中的Reflector、list-watcher、informer等概念

以下是我跟chatGPT学习kubernetes中Reflector、list-watcher、informer等的概念的过程 不敢保证chatGPT回答的百分之百准确。但是&#xff0c;确实帮助我了我理解&#xff01; 最终学习的是下面的图&#xff0c; 1、在kubernetes中Reflector原理&#xff1f; 在Kubernetes…

【操作系统】线程简介

线程简介 线程概念 在许多经典的操作系统教科书中&#xff0c;总是把进程定义为程序的执行实例&#xff0c;它并不执行什么, 只是维护应用程序所需的各种资源&#xff0c;而线程则是真正的执行实体。 所以&#xff0c;线程是轻量级的进程&#xff08;LWP&#xff1a;light w…

短视频矩阵源码-智能剪辑生成技术数值组如何编程?

短视频混剪生成时长逻辑一般采用根据用户设定的总时长、视频数量、时长比例等参数计算出每个视频在混剪中所占的时长&#xff0c;然后根据视频的总时长与所占比例来划分每个视频在混剪中的时长&#xff0c;最后将各个视频拼接起来形成混剪视频。此算法可以进行灵活的时长调整和…

RabbitMQ 小白教程,从安装到使用

主要内容 AMQP简介 RabbitMQ简介 RabbitMQ原理 Erlang安装 安装RabbitMQ RabbitMQ账户管理 交换器 学习目标 知识点要求AMQP简介掌握RabbmitMQ简介掌握RabbitMQ原理掌握Erlang安装掌握安装RabbitMQ掌握RabbitMQ账户管理掌握交换器掌握 一、 AMQP简介 1 AMQP是什么?…

【Midjourney】Midjourney 连续性人物创作 ④ ( 使用 URL + Seed 随机种子生成连续性的人物 )

文章目录 一、生成图片并获取 Seed二、使用 URL Seed 随机种子生成连续性的人物 使用 URL 链接 和 Seed 随机种子 生成连续性人物 , 必须先生成一组图片 , 然后按 U 按钮 , 选择一张大图 , 之后所有的连续性人物图片都基于该图片进行生成 ; 使用 URL Seed 随机种子生成连续性…

Flink学习——状态编程

目录 一、Flink中的状态 二、状态编程 (一)ValueState案例——判断传感器的数据 1.代码实现 2.端口进行传输数据 3.运行结果 (二)ListState (三)MapState案例——比较学生每次考试成绩 1.代码实现 2.端口传输学生成绩 3.运行结果 (四)ReducingState 一、Flink中的状…

DETR3D 论文学习

1. 解决了什么问题&#xff1f; 对于低成本自动驾驶系统&#xff0c;仅凭视觉信息进行 3D 目标检测是非常有挑战性的。目前的多相机 3D 目标检测方法有两类&#xff0c;一类直接对单目图像做预测&#xff0c;没有考虑 3D 场景的结构或传感器配置。这类方法需要多步后处理&…

C语言小游戏——扫雷

前言 结合前边我们所学的C语言知识&#xff0c;本期我们将使用C语言实现一个简单的小游戏——扫雷 目录 前言 总体框架设计 多文件分装程序 各功能模块化实现 初始化棋盘 棋盘打印 埋雷 判赢与排雷 游戏逻辑安排 总结 总体框架设计 和三子棋相同&#xff0c;游戏开始时…

Linux安装MySQL后无法通过IP地址访问处理方法

本文主要总结Linux安装Mysql后&#xff0c;其他主机访问不了MySQL数据库的原因和解决方法 环境说明&#xff1a; MySQL 5.7.30CentOS Linux release 7.6.1810 (Core) 创建完Mysql数据库后可以查看mysql 日志获取root 用户登录密码 [rootlocalhost mysql-5.7.30]# cat /var/l…

spring源码学习

1.xmlBeanFactory对defaultListableBeanFactory类进行扩展&#xff0c;主要用于从XML文档中获取BeanDefinition&#xff0c;对于注册及获取bean都是使用从父类DefaultListableBeanFactory继承的方法去实现。 xmlBeanFactory 主要是使用reader属性对资源文件进行读取和注册。 2.…

Maven属性与版本管理

文章目录 1 属性1.1 问题分析1.2 解决步骤步骤1:父工程中定义属性步骤2:修改依赖的version 2 配置文件加载属性步骤1:父工程定义属性步骤2:jdbc.properties文件中引用属性步骤3:设置maven过滤文件范围步骤4:测试是否生效 3 版本管理 在这一章节内容中&#xff0c;我们将学习两个…

cpp11实现线程池(一)——项目介绍

项目介绍 线程池是库的形式提供给用户&#xff0c;是必须放到代码中&#xff0c;不能单独运行&#xff0c;亦称为基础组件 第一版线程池任务对象使用继承技术&#xff0c;提供一个抽象基类Task&#xff0c;里面有一个纯虚函数run()&#xff0c;使用时继承该类&#xff0c;并重…

c++综合学习

1.函数调用 传值调用&#xff1a;在函数内部修改形式参数&#xff0c;不改编实际参数的值&#xff1b;引用调用&#xff1a;即指针调用&#xff0c;传入的是变量的指针&#xff0c;则在函数内部修改形式参数&#xff0c;实际参数跟着改变。 2. 数组 数组名即该数组的首地址&a…

CSPM 未来发展的思考

由于数据泄露的持续威胁以及云的短暂和快节奏的特性&#xff0c;只有在最基础的层面上保护您的云才有意义。组织已经转向 CSPM 解决方案来锁定他们的平台。 今天我们来聊聊什么是CSPM&#xff0c;它如何去产生有有效的帮助&#xff0c;未来会向哪发展。 什么是 CSPM&#xff1…

阿拉德手游服务端Centos搭建教程

阿拉德手游服务端Centos搭建教程 大家好我是艾西&#xff0c;又有几天没有更新文章了。这几天看了看还是有不少人对手游感兴趣&#xff0c;今天给大家分享一款早些年大火的pc游戏&#xff0c;现在也有手游了“阿拉德”。 你是否还记得DNF&#xff0c;一天你不小心救了赛丽亚&a…

Win10系统电脑开机黑屏一直转圈无法进入桌面怎么办?

Win10系统电脑开机黑屏一直转圈无法进入桌面怎么办&#xff1f;有用户电脑开机了之后无法进入到桌面中&#xff0c;开机了之后&#xff0c;电脑桌面只有显示一个黑屏和转圈的图标&#xff0c;一直都无法进入到桌面中。强制重启电脑之后依然是这样&#xff0c;那么这个情况怎么去…

今天公司来了个拿 30K 出来的测试,算是见识到了基础的天花板

今天上班开早会就是新人见面仪式&#xff0c;听说来了个很厉害的大佬&#xff0c;年纪还不大&#xff0c;是上家公司离职过来的&#xff0c;薪资已经达到中高等水平&#xff0c;很多人都好奇不已&#xff0c;能拿到这个薪资应该人不简单&#xff0c;果然&#xff0c;自我介绍的…

Mysql-存储过程简单入门

定义&#xff1a; 存储过程的英文是 Stored Procedure 。它的思想很简单&#xff0c;就是一组经过 预先编译 的 SQL 语句 的封装。 执行过程&#xff1a;存储过程预先存储在 MySQL 服务器上&#xff0c;需要执行的时候&#xff0c;客户端只需要向服务器端发出调用 存储过程的命…