twisted实现MMORPG 游戏数据库操作封装设计与实现

在设计 MMORPG(大规模多人在线角色扮演游戏)时,数据库系统是游戏架构中至关重要的一部分。数据库不仅承担了游戏中各种数据(如玩家数据、物品数据、游戏世界状态等)的存储和管理任务,还必须高效地支持并发访问、事务处理和复杂的查询。为了确保系统的可扩展性和维护性,我们需要对数据库操作进行封装和模块化设计。

为了实现上述目标,本文设计了一个基于 Twisted 的数据库封装系统。Twisted 是一个异步框架,适用于处理大量并发任务。结合 Twisted 和数据库连接池(adbapi.ConnectionPool),我们可以高效地执行异步数据库操作。
在这里插入图片描述

在这里插入图片描述

1. DatabaseError
作用:自定义异常类,用于在数据库操作发生错误时抛出详细的错误信息,包含错误消息和错误码(默认为500)。

2. DatabaseOperation
作用:抽象基类,定义了数据库操作的统一接口。所有具体的数据库操作类(如 SelectOperation, InsertOperation 等)都继承自此类,必须实现 excute 方法来执行数据库事务。
关键方法:
executeQuery:执行实际的 SQL 查询。
handleSuccess:操作成功时的回调函数。
handleFailure:操作失败时的回调函数。

3. SelectOperation
作用:继承自 DatabaseOperation,封装了 SELECT 查询操作。提供了 executeQuery 方法来执行 SQL 查询,并返回查询结果。
关键方法:
executeQuery:执行 SELECT 查询,并根据提供的表名、列名、查询条件等生成 SQL 语句。
excute:实现 DatabaseOperation 中的抽象方法,执行查询操作。

4. InsertOperation
作用:继承自 DatabaseOperation,封装了 INSERT 插入操作。通过 executeQuery 方法生成插入的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 INSERT SQL 语句,将数据插入指定的表。
excute:实现 DatabaseOperation 中的抽象方法,执行插入操作。


5. UpdateOperation
作用:继承自 DatabaseOperation,封装了 UPDATE 更新操作。通过 executeQuery 方法生成更新的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 UPDATE SQL 语句,用新值更新指定的行。
excute:实现 DatabaseOperation 中的抽象方法,执行更新操作。


6. DeleteOperation
作用:继承自 DatabaseOperation,封装了 DELETE 删除操作。通过 executeQuery 方法生成删除的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 DELETE SQL 语句,根据指定条件删除记录。
excute:实现 DatabaseOperation 中的抽象方法,执行删除操作。
7. DatabaseManager
作用:负责数据库连接池的管理和数据库操作的执行。它使用 adbapi.ConnectionPool 创建数据库连接池,执行操作并处理事务。
关键方法:
getConnection:返回数据库连接池的实例。
executeOperation:接受一个数据库操作对象,调用 runInteraction 方法来执行异步数据库事务,并处理操作成功或失败的回调。
8. GameQueryPlayerId
作用:继承自 SelectOperation,封装了查询玩家信息的操作。它指定查询条件为玩家的名称,并通过 excute 方法执行查询。
关键方法:
excute:执行 SelectOperation 中的 executeQuery 方法,查询玩家 ID。
set_query_name:设置查询的玩家名称。

代码

from twisted.enterprise import adbapi
from twisted.internet.defer import Deferred
import pymysql
import traceback
from twisted.internet import reactor
from functools import partial
from abc import ABC, abstractmethod
# 异常类定义
class DatabaseError(Exception):
    def __init__(self, message, code=500):
        self.message = message
        self.code = code


# 抽象的数据库操作类
class IDatabaseOperation(ABC):
    @abstractmethod
    def executeQuery(self, txn, table: str, columns: list, values: dict = {}, condition: dict = None) -> any:
        pass

    def handleSuccess(self, result: any):
        print("Operation succeeded with result:", result)

    def handleFailure(self, error: Exception):
        # 这里做一些额外的错误处理,比如记录日志或者返回友好的错误信息
        print("Operation failed:", error)

    @abstractmethod
    def excute(self, txn):
        #这里封装代码
        pass

class ABC_SelectOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            column_str = ", ".join(columns) if columns else "*"
            query = f"SELECT {column_str} FROM {table}"

            if condition:
                condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
                query += f" WHERE {condition_str}"

            print(f"Executing query: {query}, with values: {tuple(condition.values()) if condition else ()}")
            txn.execute(query, tuple(condition.values()) if condition else ())  # use condition values if any
            result = txn.fetchall()
            return result
        except Exception as e:
            print(f"Error executing query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass


class ABC_InsertOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            column_str = ", ".join(columns)
            placeholders = ", ".join(["%s"] * len(columns))  # Create placeholders based on column length

            # Extract values from the dictionary for each column
            value_tuple = tuple(values[col] for col in columns)

            query = f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})"

            print(f"Executing insert query: {query}, with values: {value_tuple}")
            txn.execute(query, value_tuple)  # Use parameterized query
            return txn.lastrowid  # Return the inserted record ID
        except Exception as e:
            print(f"Error executing insert query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass

class ABC_UpdateOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            if condition and not isinstance(condition, dict):
                raise TypeError("Condition must be a dictionary")

            set_str = ", ".join([f"{col} = %s" for col in columns])
            query = f"UPDATE {table} SET {set_str}"

            # Ensure that values is passed as a tuple for update
            value_tuple = tuple(values[col] for col in columns)

            if condition:
                condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
                query += f" WHERE {condition_str}"
                value_tuple += tuple(condition.values())  # Append condition values

            print(f"Executing update query: {query}, with values: {value_tuple}")
            txn.execute(query, value_tuple)  # Use parameterized query
            txn.connection.commit()
            return txn.rowcount  # Return the number of updated rows
        except Exception as e:
            print(f"Error executing update query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass


class ABC_DeleteOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            if not condition:
                raise ValueError("Condition for deletion cannot be empty.")
            condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
            query = f"DELETE FROM {table} WHERE {condition_str}"

            print(f"Executing delete query: {query}, with values: {tuple(condition.values())}")
            txn.execute(query, tuple(condition.values()))  # Use condition values for parameterized query
            return txn.rowcount
        except Exception as e:
            print(f"Error executing delete query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self):
        pass



# 数据库管理类,负责数据库连接池和事务
class DatabaseManager:
    def __init__(self, db_config):
        # 初始化数据库连接池
        self.dbConnectionPool = adbapi.ConnectionPool("pymysql", **db_config)

    def getConnection(self):
        return self.dbConnectionPool

    def executeOperation(self, operation: DatabaseOperation) -> Deferred:
        try:
            # 使用 partial 创建一个指定了参数的函数

            deferred = self.dbConnectionPool.runInteraction(operation.excute)
            deferred.addCallback(operation.handleSuccess)
            deferred.addErrback(operation.handleFailure)
            return deferred
        except Exception as e:
            error = DatabaseError(str(e), 500)
            operation.handleFailure(error)
            return None


# 示例数据库配置
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'root',
    'database': 'test',
}


class GameQueryPlayerId(ABC_SelectOperation):
    def __init__(self):
        self.select_columns = ["name", "id"]
        self.elect_condition = {"name": "new_name"}  # 在此给出查询条件
    def excute(self, txn):

        # 执行查询操作
        return self.executeQuery(txn,"test", columns=self.select_columns, condition=self.elect_condition )

    def set_query_name(self, name):
        self.elect_condition["name"] = name

# 示例操作
def main():

    # 创建DatabaseManager实例
    db_manager = DatabaseManager(db_config)
    ccGameQueryPlayerId = GameQueryPlayerId()
    ccGameQueryPlayerId.set_query_name("new_name")
    deferred = db_manager.executeOperation(ccGameQueryPlayerId)



    reactor.run()


if __name__ == "__main__":
    main()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/969674.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PyCharm 批量替换

选择替换的内容 1. 打开全局替换窗口 有两种方式可以打开全局替换窗口: 快捷键方式: 在 Windows 或 Linux 系统下,按下 Ctrl Shift R。在 Mac 系统下,按下 Command Shift R。菜单操作方式:点击菜单栏中的 Edit&…

LabVIEW用户界面设计原则

在LabVIEW开发中,用户界面(UI)设计不仅仅是为了美观,它直接关系到用户的操作效率和体验。一个直观、简洁、易于使用的界面能够大大提升软件的可用性,尤其是在复杂的实验或工业应用中。设计良好的UI能够减少操作错误&am…

网络防御高级-第8章及之前综合作业

标准版 接口ip配置 r2 [r2]interface GigabitEthernet 0/0/0 [r2-GigabitEthernet0/0/0]ip address 13.0.0.3 24 [r2-GigabitEthernet0/0/0]interface GigabitEthernet 0/0/1 [r2-GigabitEthernet0/0/1]ip address 100.1.1.254 24 [r2-GigabitEthernet0/0/1]interface Gigab…

若依系统环境搭建记录

开源若依系统网上资料也很全的,本篇博文记录下自己搭建环境过程中遇到的一些问题。 配置Maven和编辑器选择 我懒得配置Eclipse了,直接用vscode作为编辑器,后面构建运行都用命令行。 配置数据库连接 按照mysql5.7按网上教程即可&#xff1…

C# 运算符

总目录 前言 在C#中,运算符是用于执行特定操作的符号。它们可以用于处理变量、常量或其他表达式。C# 提供了丰富的运算符集合,用于执行各种操作,如算术运算、逻辑判断、位操作等。了解这些运算符及其使用方式对于编写高效且功能强大的C#程序…

为AI聊天工具添加一个知识系统 之103 详细设计之44 自性三藏 之4 祖传代码 之2

本文要点 要点 前面的所有讨论都是为了给出我的设计项目(为使用AI聊天工具的聊天者 开挂一个知识系统) 的祖传代码 的完整设计,其中 的“槽”(占位符变量)的 库元(宝性和自性creator -本俱 替换内容标准模…

wireshark网络抓包

由于图片和格式解析问题,可前往 阅读原文 到这里已经讲了两个抓包工具的使用了,大家应该对抓包不是很陌生了。而wireshark相对于fiddler和charles更加偏向于网络层面的抓包或者说是一个网络封包分析工具。使用对象更适合于网络相关人员(网络管理员/相关运…

深入理解Linux网络随笔(一):内核是如何接收网络包的(下篇)

3、接收网络数据 3.1.1硬中断处理 数据帧从网线到达网卡时候,首先到达网卡的接收队列,网卡会在初始化时分配给自己的RingBuffer中寻找可用内存位置,寻找成功后将数据帧DMA到网卡关联的内存里,DMA操作完成后,网卡会向…

新版电脑通过wepe安装系统

官方下载链接 WIN10下载 WIN11下载 微PE 启动盘制作 1:选择启动盘的设备 2:选择对应的U盘设备,点击安装就可以,建议大于8g 3:在上方链接下载需要安装的程序包,放入启动盘,按需 更新系统 …

蓝桥杯之KMP算法

算法思想 代码实现 int* getnext() {int* next new int[s2.size()];int j 0;//用来遍历子串int k -1;//子串中公共子串的长度next[0] -1;while (j < s2.size() - 1){if (k-1||s2[k] s2[j]){k;j;if (s2[k] s2[j]){next[j] next[k];}else{next[j] k;}}else{k next[k…

jsp页面跳转失败

今天解决一下jsp页面跳转失败的问题 在JavaWeb的学习过程中&#xff0c;编写了这样一段代码&#xff1a; <html> <body> <h2>Hello World!</h2><%--这里提交的路径&#xff0c;需要寻找到项目的路径--%> <%--${pageContext.request.context…

如何实现对 ELK 各组件的监控?试试 Metricbea

上一章基于 Filebeat 的日志收集使用Filebeat收集文件中的日志&#xff0c;而Metricbeat则是收集服务器存活性监测和系统指标的指标。 1. Filebeat和Metricbeat的区别 特性FilebeatHeartbeat作用收集和转发日志监测服务可用性数据来源服务器上的日志文件远程主机、API、服务主…

DeepSeek-VL2 环境配置与使用指南

DeepSeek-VL2 环境配置与使用指南 DeepSeek-VL2 是由 DeepSeek 公司开发的一种高性能视觉-语言模型&#xff08;VLM&#xff09;。它是 DeepSeek 系列多模态模型中的一个版本&#xff0c;专注于提升图像和文本之间的交互能力。 本文将详细介绍如何配置 DeepSeek-VL2 的运行环…

Golang的并发编程问题解决思路

Golang的并发编程问题解决思路 一、并发编程基础 并发与并行 在计算机领域&#xff0c;“并发”和“并行”经常被混为一谈&#xff0c;但它们有着不同的含义。并发是指一段时间内执行多个任务&#xff0c;而并行是指同时执行多个任务。在 Golang 中&#xff0c;通过 goroutines…

多能互补综合能源系统,改变能源结构---安科瑞 吴雅芳

多能互补综合能源系统是一种通过整合多种能源的形势&#xff08;如电力、天然气、热能、冷能等&#xff09;和多种能源技术&#xff08;如可再生能源、储能技术、智能电网等&#xff09;&#xff0c;实现能源利用和配置调整的系统。其目标是通过多能互补和协同优化&#xff0c;…

Linux部署DeepSeek r1 模型训练

之前写过一篇windows下部署deepseekR1的文章&#xff0c;有小伙伴反馈提供一篇linux下部署DeepSeek r1 模型训练教程&#xff0c;在 Linux 环境下&#xff0c;我找了足够的相关资料&#xff0c;花费了一些时间&#xff0c;我成功部署了 DeepSeek R1 模型训练任务&#xff0c;结…

使用pyCharm创建Django项目

使用pyCharm创建Django项目 1. 创建Django项目虚拟环境&#xff08;最新版版本的Django) 使用pyCharm的创建项目功能&#xff0c;选择Django,直接创建。 2. 创建Django项目虚拟环境&#xff08;安装特定版本&#xff09; 2.1创建一个基础的python项目 2.2 安装指定版本的D…

基于vue3实现的课堂点名程序

设计思路 采用vue3实现的课堂点名程序&#xff0c;模拟课堂座位布局&#xff0c;点击开始点名按钮后&#xff0c;一朵鲜花在座位间传递&#xff0c;直到点击结束点名按钮&#xff0c;鲜花停留的座位被点名。 课堂点名 座位组件 seat.vue <script setup>//组合式APIimpo…

Springboot 中如何使用Sentinel

在 Spring Boot 中使用 Sentinel 非常方便&#xff0c;Spring Cloud Alibaba 提供了 spring-cloud-starter-alibaba-sentinel 组件&#xff0c;可以快速将 Sentinel 集成到你的 Spring Boot 应用中&#xff0c;并利用其强大的流量控制和容错能力。 下面是一个详细的步骤指南 …

IoTDB 导入数据时提示内存不足如何处理

问题现象 IoTDB 导入数据时提示内存不足&#xff0c;该如何处理&#xff1f; 解决方案 数据导入脚本会在触发内存不足的时候主动进行重试。当遇到此问题时&#xff0c;用户不用做任何操作&#xff0c;脚本也可以正确进行处理。如果想从根源减少此类提示&#xff0c;可以按照下…