Redis消息队列与thinkphp/queue操作

业务场景

场景一

用户完成注册后需要发送欢迎注册的问候邮件、同时后台要发送实时消息给用户对应的业务员有新的客户注册、最后将用户的注册数据通过接口推送到一个营销用的第三方平台。

遇到两个问题:
  1. 由于代码是串行方式,流程大致为:开启数据库事务回滚->数据入库准备->发邮件->发实时消息->推送第三方平台->提交写入数据库。但是后续的3个步骤任意一个流程出了问题都会影响用户的注册结果。
  2. 发送邮件使用的不是成熟的第三方产品,而是利用phpmaile自写代码实现的,然而这个过程耗时相对较长且偶尔有失败的情况;另外通过接口推送注册用户数据到的第三方平台是一个国外的产品接口通讯时间很长且一样有失败的情况。
以上两个问题就会导致用户的注册交互流程时间很长产品体验感非常差;且发送邮件、发送消息、推送数据任意一个步骤由于特殊情况导致执行失败都不能终止用户注册这样就只能通过日志捕获相应的失败情况。

场景二

用户在Shopify平台(一个跨境电商平台)付款下单后,商家会将订单同步到我的系统中,在我的系统中完成询价、报价、付款后我需要再将订单数据推送到第三方配货发货的平台。平台发货完成后通过设置好的回调地址通知我的系统发货的物流信息数据,我需要将物流信息数据存入到我的数据库后再将物流信息同步给Shopify平台用以展示给真实下单用户查看物流轨迹。

遇到一个问题:
  1. 正常情况下的回调代码逻辑是将物流信息写入数据库,再同步物流数据给Shopify。但是由于各种原因后者(同步物流数据给Shopify)有一定概率会失败。
这样就出现了我系统内成功展示了物流信息而Shopify反馈没有成功同步物流轨迹的订单出现。而回调又是一次性的我只能自查数据库进行回补。

英雄登场(消息队列Redis)

官方介绍:消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。

Redis安装

我用的宝塔安装的方便快捷,软件商品搜索Redis然后点击系统对应php版本的立即前往
在这里插入图片描述
再后续弹窗中安装redis扩展即可
在这里插入图片描述
后续Redis中的队列数据也可以通过宝塔进行查看:
在这里插入图片描述

thinkphp/queue

扩展这个内置了 Redis、Database、Topthink、Sync四种驱动,这里我用的Redis。think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作。

thinkphp/queue引入扩展

composer require topthink/think-queue

thinkphp/queue配置文件

我使用的是TP5要再application/extra目录下新增queue.php文件,文件内容如下(视各自情况调整哈):

return [
    'connector'  => 'Redis', // 驱动类型
    'expire'     => 60, // 任务的过期时间,默认为60秒; 如果任务执行时间超过此时间将会被认为是过期,将不会被执行
    'default'    => 'default', // 默认的队列名称
    'host'       => '127.0.0.1', // Redis 主机地址
    'port'       => 6379, // Redis 端口
    'password'   => '', // Redis 密码
    'select'     => 0, // 使用哪一个 Redis 数据库
    'timeout'    => 0, // 连接超时时间
    'persistent' => false, // 是否长连接
];

解决方案(注册部分)

引入消息队列后就是将原来串行方式改为并行,用户注册逻辑代码中关于后三个步骤只要单纯的推送队列即可。而后三者采用并行方式(也就是异步)执行对应的逻辑。这样既提高了注册的速度又可以通过队列将出错的数据多次执行提高成功率

注册逻辑代码

	public static function doSaveRegister($postParam)
    {
        db()->startTrans();
        try {
            $first_name = trim(outputstr($postParam, "first_name"));
            $last_name = trim(outputstr($postParam, "last_name"));
            $email = trim(outputstr($postParam, "email"));
            $password = trim(outputstr($postParam, "password"));

           	//注册部分就展示部分代码了
           	
            $info = new UserModel();
            $info->id = Uuid::uuid4();
            $info->number = createUserNumber();
            $info->short_name = substr($email, 0, strripos($email, "@"));
            $info->email = $email;
            if ($info->save() === false) {
                throw new Exception('Operation error!');
            }
            
            //发送用户注册成功的问候邮件,将要发送邮件的邮箱推送到消息队列
            $result = RedisUtils::redisQueueSendForSendRegisterWelcomeEmail('common\job\UserRegisterJob@sendRegisterSuccessWelcomeEmail', $email);
            if ($result['success'] === false) {
                throw new Exception('redis queue error!');
            }

			//这里只展示发送邮件的代码示例其他都是一样的道理

            db()->commit();
            $result = result_success('Register successful!');
        } catch (Exception $e) {
            db()->rollback();
            $result = result_error($e->getMessage());
        }
        return $result;
    }

推送发送邮件消息队列(生产者)

	/**
     * 用户注册成功需要发送问候邮件的用户数据加入队列
     * @param string $job 处理该任务的任务名
     * @param string $data 加入队列的数据-邮箱号
     * @param string $queue_name 队列名,可以不写
     */
    public static function redisQueueSendForSendRegisterWelcomeEmail($job, $data, $queue_name = 'user_register_email')
    {
        //此处做了延时推送,原因是邮件服务是自己写程序实现的避免高并发导致发送失败,所以延时推送一下
        $isPushed = Queue::later(5, $job, $data, $queue_name);
        if ($isPushed !== false) {
            $result = result_success('队列加入成功');
        } else {
            $result = result_error('队列加入失败');
        }
        return $result;
    }

消息队列处理逻辑(消费者)

<?php

namespace common\job;

use common\utils\email\EmailUtils;
use common\utils\gateway\GatewaysUtils;
use common\utils\log\LoggerUtils;
use common\utils\systemMessage\SystemMessageUtils;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use think\queue\Job;

/**
 * 处理所有用户注册方面的队列数据,代码逻辑写在这里,运行方式是命令行执行的
 * Class UserRegister
 * @package common\job
 */
class UserRegisterJob
{
    /**
     * 处理发送用户注册成功邮件的队列
     * @param Job $job
     * @param string $data 要发送邮件的邮箱
     */
    public function sendRegisterSuccessWelcomeEmail(Job $job, $data)
    {
        $result = EmailUtils::sendUserRegisterSuccessEmail($data);
        if ($result['success'] === false) {
            //判断一下发送失败的次数,超过3次剔除队列
            $attempts = $job->attempts();
            if ($attempts > 3) {
                //发送失败,写进日志,邮件通知开发者
                $message = '新用户注册发送问候邮件失败,程序错误内容:' . $result['msg'] . ',数据源:' . $data;
                LoggerUtils::systemErrorLog()->info($message);
                EmailUtils::sendSystemErrorEmailToDeveloper($message);
                $job->delete();
            }
        } else {
            //发送成功,剔除队列
            $job->delete();
        }
    }
}

启动队列监听

进入项目根目录执行

php think queue:work --queue 队列名1,队列名2

多个队列可以用逗号拼接一次性监听

这个进行一般都要后台运行且开机自启动,自己写的脚本如下:

#!/bin/bash
#启动Redis队列监听
cd /www/wwwroot/english-e-commerce/ && php think queue:listen --queue user_register_email,user_register_workman_message,sync_user_to_tidio,order_sync_mabang_track,fulfillment_shopify_order &

开机启动方法根据不同linux系统有很多种此处不做记录

不喜勿喷,也是初学。记录一下方便后面查找

参考链接

  • ThinkPHP 使用 think-queue 实现 redis 消息队列(超详细)
  • 消息队列使用的四种场景介绍

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/484500.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

蓝桥杯day12刷题日记

P8720 [蓝桥杯 2020 省 B2] 平面切分 思路&#xff1a;首先借用dalao的图解释一下&#xff0c;又多出一条与当前平面任意一条直线都不重合线时&#xff0c;多了的平面是交点数1&#xff0c;所以用双层循环每次往里面加一条直线&#xff0c;计算交点 #include <iostream>…

生日悖论与概率分析:如何运用随机算法解答生日问题

生日&#xff0c;这个我们每年都会庆祝的特殊日子&#xff0c;在概率学和随机算法中却隐藏着许多有趣的秘密。今天&#xff0c;我们就来探讨一下如何利用概率分析和随机算法来解决与生日有关的几个常见问题&#xff1a;一次聚会需要邀请多少人&#xff0c;才能让其中3人的生日很…

嵌入式多层总线矩阵结构介绍

嵌入式系统中&#xff0c;多层总线矩阵结构是一种常见的总线连接方式&#xff0c;用于连接处理器核、内存、外设和其他系统组件&#xff0c;以实现数据传输和控制信号的交换。本文将介绍嵌入式多层总线矩阵结构的基本概念、主要特点以及在实际应用中的优势。 以下是我整理的关…

阿里云服务器价格购买价格表,2024新版报价查询

2024年腾讯云服务器优惠价格表&#xff0c;一张表整理阿里云服务器最新报价&#xff0c;阿里云服务器网整理云服务器ECS和轻量应用服务器详细CPU内存、公网带宽和系统盘详细配置报价单&#xff0c;大家也可以直接移步到阿里云CLUB中心查看 aliyun.club 当前最新的云服务器优惠券…

【算法每日一练]-动态规划(保姆级教程 篇17 状态压缩)#POJ1185:炮兵阵地 #互不侵犯

目录 今日知识点&#xff1a; 把状态压缩成j,dp每行i的布置状态&#xff0c;从i-1和i-2行进行不断转移 把状态压缩成j,dp每行i的布置状态&#xff0c;从i-1行进行状态匹配&#xff0c;然后枚举国王数转移 POJ1185&#xff1a;炮兵阵地 思路&#xff1a; 题目&#xff1a;互…

代码随想录|Day27|贪心02|122.买卖股票的最佳时机II、55.跳跃游戏、45.跳跃游戏II

122.买卖股票的最佳时机II 本题可以将最终利润分解为每日利润&#xff1a; 假如第 0 天买入&#xff0c;第 3 天卖出&#xff0c;那么利润为&#xff1a;prices[3] - prices[0]。 相当于(prices[3] - prices[2]) (prices[2] - prices[1]) (prices[1] - prices[0])。 如下图所…

机器人流程自动化技术(RPA)金融应用指南

1 范围 本文件提供了机器人流程自动化技术在金融领域应用的参考框架、技术设计、场景应用、安全管理、 成效评估等指南。 本文件适用于金融机构开展机器人流程自动化技术应用的产品设计、软件开发、系统评估等。 2 规范性引用文件 下列文件中的内容通过文中的规范性引用而构成…

深入探索JDK动态代理:从入门到原理的全面解析

文章目录 基本概念入门案例实现JDK动态代理的步骤入门实操拓展--动态生成代理类的几种方法方式一&#xff1a;通过getProxyClass方法获取代理实例方式二&#xff1a;通过newProxyInstance方法获取代理实例&#xff08;常用&#xff01;&#xff09;方式三&#xff1a;通过Lambd…

python--初学函数

函数&#xff08;function&#xff09;&#xff1a; 什么是函数&#xff1f; 具有名称的&#xff0c;是为了解决某一问题&#xff0c;功能代码的集合&#xff0c;叫做函数 python中函数如何定义&#xff1a;def>define function定义函数 def function_name([args临时变量…

Oracle 11G备份集中控制文件和spfile被异常删除

坐标大理&#xff0c;苍山下&#xff0c;洱海旁&#xff0c;风花雪月&#xff01;&#xff01; 今日一大早就接到一个case&#xff0c;根据客户描述&#xff0c;大概意思是昨天晚上发现18号的ctl和spfile无故消失&#xff0c;备份策略是一周一个0级增量备份&#xff0c;每日进…

数据库系统原理实验报告3 | 数据定义

整理自博主本科《数据库系统原理》专业课自己完成的实验报告&#xff0c;以便各位学习数据库系统概论的小伙伴们参考、学习。 专业课本&#xff1a; ———— 本次实验使用到的图形化工具&#xff1a;Heidisql 目录 一、实验目的 二、实验内容及步骤 1、创建数据库educ及其基…

自媒体洗稿软件文心一言api洗稿软件介绍

大家好&#xff0c;我是淘小白~ 这几天给一个客户写了一个文心一言api洗稿的软件。 一、客户要求&#xff1a; 1、采集头条文章&#xff08;软件内置可采集头条文章网址、微信文章网址、搜狐文章网址&#xff09; 2、调用文心一言api 3、多线程并发 4、逐段改写文章 5、…

关于YOLOv9项目中使用已有模块自由改进的教程

专栏介绍&#xff1a;YOLOv9改进系列 | 包含深度学习最新创新&#xff0c;助力高效涨点&#xff01;&#xff01;&#xff01; 1. 文件说明 在YOLOv5-v9&#xff0c;模型的结构是以yaml文件的存储。我们可以在原有的yaml基础上增、减、改模块&#xff0c;创作我们自己的模型。 …

springboot3+jdk17+MP整合最新版jersey详细案例,正真做到拿来即用

如题&#xff0c;springboot3.x java17 MP 整合最新jersey&#xff0c;各种请求类型&#xff08;实战/详解&#xff09; 文件上传下载 jersey资源注册 拦截器&#xff08;JWT&#xff09; 跨域处理 全局异常 Valid注解校验 等等 &#xff0c;除非你必须整合security&am…

五、分布式锁-redission

源码仓库地址&#xff1a;gitgitee.com:chuangchuang-liu/hm-dingping.git 1、redission介绍 目前基于redis的setnx特性实现的自定义分布式锁仍存在的问题&#xff1a; 问题描述重入问题同一个线程无法多次获取统一把锁。当方法A成功获取锁后&#xff0c;调用方法B&#xff0…

深度学习pytorch——GPU加速(持续更新)

使用 .to(device)&#xff0c;以前使用 .cuda() &#xff0c;但是现在基本不使用了。 代码示例&#xff1a; 查看电脑GPU运行情况&#xff1a; 使用Ctrl Shift ESC快捷键&#xff1a;

【DP】01背包问题与完全背包问题

一、01背包问题 有 N件物品和一个容量是 V 的背包。每件物品只能使用一次。 第 i 件物品的体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输出最大价值。 输入格式 第一行两个整数&…

初探Flink集群【持续更新】

周末下雨&#xff0c;倒杯茶&#xff0c;在家练习Flink相关。 开发工具&#xff1a;IntelliJ Idea 第一步、创建项目 打开Idea&#xff0c;新建Maven项目&#xff0c;包和项目命名 在pom.xml 文件中添加依赖 <properties><flink.version>1.13.0</flink.vers…

SQLServer TRY_CONVERT函数

TRY_CONVERT&#xff1a;数据库中的安全转换利器 在数据库操作中&#xff0c;数据类型转换是一个常见的需求。然而&#xff0c;传统的转换方法在面对无法转换的数据时&#xff0c;往往会抛出错误&#xff0c;影响程序的稳定性和用户体验。为了解决这个问题&#xff0c;SQL Serv…

学习vue3第十节(插槽v-slot)

本节主要介绍一下 v-slot 插槽指令&#xff0c;以及插槽相关内容 1、定义&#xff1a; 子组件给父组件提供使用的一个位置&#xff0c;使用<slot></slot>表示&#xff0c;父组件可以在这个位置填充任何代码&#xff1b; 2、默认插槽 匿名插槽&#xff1a;会自定…