Python与PostgreSQL的深度整合:CRUD操作全指南

Python与PostgreSQL的深度整合:CRUD操作全指南

1. 环境准备

1.1 安装必要的包

pip install sqlalchemy psycopg2-binary sqlmodel

1.2 数据库连接

from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel

# 连接字符串格式
DATABASE_URL = "postgresql://username:password@localhost:5432/dbname"

# 创建引擎
engine = create_engine(DATABASE_URL)

# 创建所有表
SQLModel.metadata.create_all(engine)

2. 模型定义

2.1 基本模型

from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field

class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str = Field(index=True)
    email: str = Field(unique=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    is_active: bool = Field(default=True)

2.2 关系模型

class Post(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    content: str
    user_id: int = Field(foreign_key="user.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)

3. CRUD 操作

3.1 创建操作 (Create)

def create_user(username: str, email: str) -> User:
    with Session(engine) as session:
        # 创建用户对象
        user = User(username=username, email=email)
        
        try:
            # 添加到会话
            session.add(user)
            # 提交事务
            session.commit()
            # 刷新对象(获取数据库生成的值)
            session.refresh(user)
            return user
        except Exception as e:
            session.rollback()
            raise e

# 使用示例
new_user = create_user("john_doe", "john@example.com")

3.2 查询操作 (Read)

from sqlmodel import select

def get_user_by_id(user_id: int) -> Optional[User]:
    with Session(engine) as session:
        return session.get(User, user_id)

def get_user_by_email(email: str) -> Optional[User]:
    with Session(engine) as session:
        statement = select(User).where(User.email == email)
        return session.exec(statement).first()

def get_all_users(skip: int = 0, limit: int = 100) -> list[User]:
    with Session(engine) as session:
        statement = select(User).offset(skip).limit(limit)
        return session.exec(statement).all()

# 复杂查询示例
def get_active_users_with_posts():
    with Session(engine) as session:
        statement = (
            select(User, Post)
            .join(Post)
            .where(User.is_active == True)
            .order_by(User.created_at.desc())
        )
        return session.exec(statement).all()

3.3 更新操作 (Update)

def update_user(user_id: int, **kwargs) -> Optional[User]:
    with Session(engine) as session:
        user = session.get(User, user_id)
        if not user:
            return None
        
        # 更新提供的字段
        for key, value in kwargs.items():
            if hasattr(user, key):
                setattr(user, key, value)
        
        try:
            session.add(user)
            session.commit()
            session.refresh(user)
            return user
        except Exception as e:
            session.rollback()
            raise e

# 使用示例
updated_user = update_user(1, email="newemail@example.com", is_active=False)

3.4 删除操作 (Delete)

def delete_user(user_id: int) -> bool:
    with Session(engine) as session:
        user = session.get(User, user_id)
        if not user:
            return False
        
        try:
            session.delete(user)
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            raise e

4. 高级用法

4.1 事务管理

from sqlalchemy.orm import Session

def transfer_posts(from_user_id: int, to_user_id: int) -> bool:
    with Session(engine) as session:
        try:
            # 开始事务
            with session.begin():
                # 更新所有帖子的用户ID
                statement = (
                    update(Post)
                    .where(Post.user_id == from_user_id)
                    .values(user_id=to_user_id)
                )
                session.exec(statement)
                
                # 如果需要,可以在这里执行更多操作
                
            return True
        except Exception as e:
            print(f"Error: {e}")
            return False

4.2 批量操作

def bulk_create_users(users_data: list[dict]) -> list[User]:
    with Session(engine) as session:
        try:
            users = [User(**data) for data in users_data]
            session.add_all(users)
            session.commit()
            
            for user in users:
                session.refresh(user)
            
            return users
        except Exception as e:
            session.rollback()
            raise e

4.3 异步操作

from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine

# 异步数据库URL
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

async_engine = create_async_engine(ASYNC_DATABASE_URL)

async def async_get_user(user_id: int) -> Optional[User]:
    async with AsyncSession(async_engine) as session:
        statement = select(User).where(User.id == user_id)
        result = await session.exec(statement)
        return result.first()

5. 最佳实践

5.1 连接池配置

engine = create_engine(
    DATABASE_URL,
    pool_size=5,  # 连接池大小
    max_overflow=10,  # 超出pool_size后的最大连接数
    pool_timeout=30,  # 连接池获取连接的超时时间
    pool_recycle=1800,  # 连接重置时间(秒)
)

5.2 异常处理

from sqlalchemy.exc import IntegrityError, OperationalError

def safe_create_user(username: str, email: str) -> tuple[Optional[User], str]:
    try:
        user = create_user(username, email)
        return user, "Success"
    except IntegrityError:
        return None, "User with this email already exists"
    except OperationalError:
        return None, "Database connection error"
    except Exception as e:
        return None, f"Unexpected error: {str(e)}"

5.3 模型验证

from pydantic import EmailStr, validator

class UserCreate(SQLModel):
    username: str
    email: EmailStr
    
    @validator('username')
    def username_must_be_valid(cls, v):
        if len(v) < 3:
            raise ValueError('Username must be at least 3 characters long')
        return v

6. 性能优化

6.1 查询优化

# 使用 select_from 优化多表查询
def get_user_posts_optimized(user_id: int):
    with Session(engine) as session:
        statement = (
            select(User, Post)
            .select_from(User)
            .join(Post)
            .where(User.id == user_id)
            .options(selectinload(User.posts))
        )
        return session.exec(statement).all()

6.2 缓存实现

from functools import lru_cache

@lru_cache(maxsize=100)
def get_cached_user(user_id: int) -> Optional[User]:
    return get_user_by_id(user_id)

7. 调试和监控

7.1 SQL语句日志

import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

7.2 性能分析

from sqlalchemy import event

@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.time())

@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info['query_start_time'].pop()
    print(f"Query took {total:.2f} seconds")

8. 部署注意事项

  1. 使用环境变量管理数据库连接信息
  2. 实现连接重试机制
  3. 正确配置连接池
  4. 实现数据库迁移策略
  5. 定期备份数据
  6. 监控数据库性能
  7. 实现错误报告机制

9. 常见问题解决

  1. 连接池耗尽
  2. 死锁处理
  3. 长事务管理
  4. 并发访问控制
  5. 大数据集处理
  6. 内存使用优化

10. 参考资源

  • SQLModel 官方文档
  • SQLAlchemy 官方文档
  • PostgreSQL 官方文档
  • Python 数据库最佳实践

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/942646.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[手机Linux] 七,NextCloud优化设置

安装完成后在个人设置里发现很多警告&#xff0c;一一消除。 只能一条一条解决了。 关于您的设置有一些错误。 1&#xff0c;PHP 内存限制低于建议值 512 MB。 设置php配置文件&#xff1a; /usr/local/php/etc/php.ini 把里面的&#xff1a; memory_limit 128M 根据你自…

使用Excel制作通达信自定义“序列数据“

序列数据的视频教程演示 Excel制作通达信自定义序列数据 1.序列数据的制作方法&#xff1a;删掉没有用的数据&#xff08;行与列&#xff09;和股代码格式处理&#xff0c;是和外部数据的制作方法是相同&#xff0c;自己上面看历史博文。只需要判断一下&#xff0c;股代码跟随的…

逆向工程在医疗器械中的应用

关于逆向工程&#xff1a; 逆向设计跟正向设计流程不同&#xff0c;它是对己有产品原型进行分析、改进和再创造的过程。通过先进的数字测量手段反向获取产品的外形数据&#xff0c;然后利用各种造型软件由点云数据重构出该产品的CAD模型。逆向工程的辅助设计建构可以缩短产品的…

Web安全攻防入门教程——hvv行动详解

Web安全攻防入门教程 Web安全攻防是指在Web应用程序的开发、部署和运行过程中&#xff0c;保护Web应用免受攻击和恶意行为的技术与策略。这个领域不仅涉及防御措施的实现&#xff0c;还包括通过渗透测试、漏洞挖掘和模拟攻击来识别潜在的安全问题。 本教程将带你入门Web安全攻防…

rk3588 android12 root

问题说明&#xff1a; 将 andorid12 root 测试情况说明&#xff1a;我在 串口上 实际上 是可以 使用 su root 命令 进入 root 的&#xff0c;但是 使用 root check apk 检测的时候却通不过。 是否解决说明&#xff1a; 已解决 解决问题的逻辑&#xff1a; 就按照 网上的资料…

基于Mysql、JavaScript、PHP、ajax开发的MBTI性格测试网站(前端+后端)

源码地址&#xff1a;https://download.csdn.net/download/2302_79553009/89933699 项目简介 本项目旨在构建一个基于MBTI&#xff08;迈尔斯-布里格斯性格分类指标&#xff09;理论的在线平台——“16Personalities”。该平台利用PHP、MySQL、JavaScript等技术栈开发&#x…

数字IC后端设计实现十大精华主题分享

今天小编给大家分享下吾爱IC社区星球上周十大后端精华主题。 Q1:星主&#xff0c;请教个问题&#xff0c;长tree的时候发现这个scan的tree 的skew差不多400p&#xff0c;我高亮了整个tree的schematic&#xff0c;我在想是不是我在这一系列mux前边打断&#xff0c;设置ignore p…

Docker 快速搭建 GBase 8s数据库服务

1.查看Gbase 8s镜像版本 可以去到docker hub网站搜索&#xff1a;gbase8s liaosnet/gbase8s如果无法访问到该网站&#xff0c;可以通过docker search搜索 docker search gbase8s2.拉取Gbase 8s镜像 以下演示的版本是目前官网最新版本Gbase8sV8.8_3.5.1 docker pull liaosn…

大型语言模型(LLMs)演化树 Large Language Models

大型语言模型&#xff08;LLMs&#xff09;演化树 Large Language Models flyfish 下面的图来自论文地址 Transformer 模型&#xff08;如 BERT 和 GPT-3&#xff09;已经给自然语言处理&#xff08;NLP&#xff09;领域带来了革命性的变化。这得益于它们具备并行化能力&…

让 AMD GPU 在大语言模型推理中崭露头角:机遇与挑战

在当今科技飞速发展的时代&#xff0c;大语言模型&#xff08;LLM&#xff09;的兴起彻底改变了人工智能领域的格局。从智能客服到文本生成&#xff0c;从知识问答到代码编写辅助&#xff0c;大语言模型的应用无处不在&#xff0c;深刻影响着我们的生活和工作。然而&#xff0c…

CPU条件下Pytorch、jupyter环境配置

一、创建虚拟环境 查看虚拟环境 conda env list 创建python虚拟环境 conda create -n minist python3.11 激活虚拟环境 conda activate minist 查看虚拟环境下有哪些包 pip list 二、安装pytorch 切换清华源 conda config --add channels https://mirrors.tuna.tsing…

【iOS安全】Block开发与逆向

1. OC中的Block 1.1 Block的基本概念 在iOS开发中&#xff0c;Block是一种特殊的数据类型&#xff0c;类似于其他编程语言中的匿名函数。它可以封装一段代码&#xff0c;并且能够像普通变量一样传递、存储和执行。Block可以捕获并访问定义它时所在作用域的变量&#xff0c;这…

C# 中的记录类型简介 【代码之美系列】

&#x1f380;&#x1f380;&#x1f380;代码之美系列目录&#x1f380;&#x1f380;&#x1f380; 一、C# 命名规则规范 二、C# 代码约定规范 三、C# 参数类型约束 四、浅析 B/S 应用程序体系结构原则 五、浅析 C# Async 和 Await 六、浅析 ASP.NET Core SignalR 双工通信 …

查询 MySQL 默认的存储引擎(SELECT @@default_storage_engine;)

要查询 MySQL 默认的存储引擎&#xff0c;可以使用以下 SQL 查询语句&#xff1a; SELECT default_storage_engine;解释&#xff1a; SELECT: 表示你要执行一个查询。default_storage_engine: 这是一个 MySQL 系统变量&#xff0c;它存储着当前 MySQL 服务器的默认存储引擎。…

大数据技术-Hadoop(二)HDFS的介绍与使用

目录 1、HDFS简介 1.1 什么是HDFS 1.2 HDFS的优点 1.3、HDFS的架构 1.3.1、 NameNode 1.3.2、 NameNode的职责 1.3.3、DataNode 1.3.4、 DataNode的职责 1.3.5、Secondary NameNode 1.3.6、Secondary NameNode的职责 2、HDFS的工作原理 2.1、文件存储 2.2 、数据写…

SpringBoot项目的5种搭建方式(以idea2017为例)

目录 1. idea中使用官方API 2. idea中使用阿里云API 3. 在spring官网创建 4. 在阿里云官网创建 5. Maven项目改造成springboot项目 SpringBoot项目的创建细分一共有5种&#xff0c;其实主要分为以下三种&#xff1a; ①使用开发工具idea创建springboot项目&#xff08; Sp…

Android 设置铃声和闹钟

Android设置铃声和闹钟使用的方法是一样的&#xff0c;但是要区别的去获取对应的权限。 统一权限&#xff0c;不管是设置闹钟还是铃声&#xff0c;他们都需要一个系统设置权限如下: //高版本需要WRITE_SETTINGS权限//此权限是敏感权限&#xff0c;无法动态申请&#xff0c;需要…

三维扫描在汽车/航空行业应用

三维扫描技术应用范围广泛&#xff0c;从小型精密零件到大型工业设备&#xff0c;都能实现快速、准确的测量。 通过先进三维扫描技术获取产品和物体的形面三维数据&#xff0c;建立实物的三维图档&#xff0c;满足各种实物3D模型数据获取、三维数字化展示、3D多媒体开发、三维…

optuna和 lightgbm

文章目录 optuna使用1.导入相关包2.定义模型可选参数3.定义训练代码和评估代码4.定义目标函数5.运行程序6.可视化7.超参数的重要性8.查看相关信息9.可视化的一个完整示例10.lightgbm实验 optuna使用 1.导入相关包 import torch import torch.nn as nn import torch.nn.functi…

【Yonghong 企业日常问题 06】上传的文件不在白名单,修改allow.jar.digest属性添加允许上传的文件SH256值?

文章目录 前言问题描述问题分析问题解决1.允许所有用户上传驱动文件2.如果是想只上传白名单的驱动 前言 该方法适合永洪BI系列产品&#xff0c;包括不限于vividime desktop&#xff0c;vividime z-suit&#xff0c;vividime x-suit产品。 问题描述 当我们连接数据源的时候&a…