go rabbitmq 操作

go rabbitmq 操作

go 依赖包github.com/streadway/amqp

docker快速部署

docker pull rabbitmq:management
docker run -d rabbitmq:management # 先跑一个看看监听了哪些端口
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq #5672 go 程序连接,15672是管理页面

写个最基本生产者消费者demo(headers 模式)

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	obj *MQOBJ
)

type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

func (mq *MQOBJ) Close() error {
	mq.Connection.Close()
	mq.Channel.Close()
	return nil
}
func init() {
	var mqurl = "amqp://cho:123@192.168.101.7:5672"
	con, err := amqp.Dial(mqurl)
	if err != nil {
		log.Fatalln(err)
	}
	ch, err := con.Channel()
	if err != nil {
		log.Fatalln(err)
	}
	obj = &MQOBJ{Connection: con, Channel: ch}

}
func producer() {
	_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)
	if err != nil {
		return
	}
	err = obj.ExchangeDeclare("go-test-exchange2", amqp.ExchangeHeaders, true, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	//这个queue绑定,你也可以放消费者那边绑定,更灵活
	err = obj.Channel.QueueBind("go-test2", "go-test2", "go-test-exchange2", false, amqp.Table{"name": "jesko"})
	if err != nil {
		log.Fatalln(err)
	}
	ticker := time.NewTicker(time.Millisecond * 300)
	var i int
	for {
		select {
		case <-ticker.C:
			err = obj.Publish("", "go-test2", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain", Headers: amqp.Table{"x-match": "any", "name": "jesko", "age": 22}})
			if err != nil {
				log.Fatalln(err)
			}
			i++
		}
	}

}
func customer() {
	_, err := obj.Channel.QueueDeclare("go-test2", true, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	msgch, err := obj.Channel.Consume("go-test2", "", true, false, true, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	for {
		select {
		case msg := <-msgch:
			fmt.Println("accept msg " + string(msg.Body))
		case <-ch:
			return
		}
	}
}
func TestAmqp(t *testing.T) {
	defer obj.Close()
	go func() {
		producer()
	}()
	time.Sleep(2 * time.Second)
	customer()
}

请添加图片描述
这里可以看到我们创建的queue
请添加图片描述

topic模式

topic模式不用绑定headers去匹配

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	obj    *MQOBJ
	logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)

func Fataln(a ...any) {
	logger.Println(a...)
	os.Exit(0)
}

type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

func (mq *MQOBJ) Close() error {
	mq.Connection.Close()
	mq.Channel.Close()
	return nil
}
func init() {
	var mqurl = "amqp://cho:123@192.168.101.7:5672"
	con, err := amqp.Dial(mqurl)
	if err != nil {
		Fataln(err)
	}
	ch, err := con.Channel()
	if err != nil {
		Fataln(err)
	}
	obj = &MQOBJ{Connection: con, Channel: ch}
	fmt.Println("init success")

}
func producer() {
	err := obj.ExchangeDeclare("go-test-exchange", amqp.ExchangeTopic, true, false, false, false, nil)
	if err != nil {
		Fataln(err)
	}
	ticker := time.NewTicker(time.Millisecond * 300)
	var i int
	for {
		select {
		case <-ticker.C:
			err = obj.Publish("go-test-exchange", "go-test", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})
			if err != nil {
				Fataln(err)
			}
			i++
		}
	}

}

type Empty struct{}

func customer(name string, stopchan <-chan Empty) {
	ch, err := obj.Connection.Channel()
	if err != nil {
		Fataln(err)
	}
	defer ch.Close()
	_, err = ch.QueueDeclare(name, true, false, false, false, nil)
	if err != nil {
		Fataln("queue declare failed", err)
	}
	err = ch.QueueBind(name, name, "go-test-exchange", false, nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, "queue bind failed", err)
		return
	}
	msgch, err := ch.Consume(name, "", true, false, true, false, nil)
	if err != nil {
		Fataln("consume failed", err)
	}
	for {
		select {
		case msg := <-msgch:
			fmt.Println("accept msg " + name + " " + string(msg.Body))
		case <-stopchan:
			return
		}
	}
}
func TestAmqp(t *testing.T) {
	defer obj.Close()
	go func() {
		producer()
	}()
	time.Sleep(2 * time.Second)
	stopchanlist := make([]chan Empty, 2)
	stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-ch
		for _, c := range stopchanlist {
			c <- Empty{}
		}
	}()
	go customer("go-test", stopchanlist[0])
	customer("go-test2", stopchanlist[1])
}

go-test 有信息(topic 匹配),go-test2没信息(topic未匹配)。

direct模式

package test

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"testing"
	"time"

	"github.com/streadway/amqp"
)

var (
	obj    *MQOBJ
	logger *log.Logger = log.New(os.Stdout, "", log.Llongfile|log.LUTC)
)

func Fataln(a ...any) {
	logger.Println(a...)
	os.Exit(0)
}

type MQOBJ struct {
	*amqp.Connection
	*amqp.Channel
}

func (mq *MQOBJ) Close() error {
	mq.Connection.Close()
	mq.Channel.Close()
	return nil
}
func init() {
	var mqurl = "amqp://cho:123@192.168.101.7:5672"
	con, err := amqp.Dial(mqurl)
	if err != nil {
		Fataln(err)
	}
	ch, err := con.Channel()
	if err != nil {
		Fataln(err)
	}
	obj = &MQOBJ{Connection: con, Channel: ch}
	fmt.Println("init success")

}
func producer() {
	err := obj.ExchangeDeclare("go-test-exchange3", amqp.ExchangeDirect, true, false, false, false, nil)
	if err != nil {
		Fataln(err)
	}
	ticker := time.NewTicker(time.Millisecond * 300)
	var i int
	for {
		select {
		case <-ticker.C:
			err = obj.Publish("go-test-exchange3", "", false, false, amqp.Publishing{Body: []byte("hello " + strconv.Itoa(i)), ContentType: "text/plain"})
			if err != nil {
				Fataln(err)
			}
			i++
		}
	}

}

type Empty struct{}

func customer(name string, stopchan <-chan Empty) {
	ch, err := obj.Connection.Channel()
	if err != nil {
		Fataln(err)
	}
	defer ch.Close()
	_, err = ch.QueueDeclare(name, true, false, false, false, nil)
	if err != nil {
		Fataln("queue declare failed", err)
	}
	err = ch.QueueBind(name, "", "go-test-exchange3", false, nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, "queue bind failed", err)
		return
	}
	msgch, err := ch.Consume(name, "", true, false, true, false, nil)
	if err != nil {
		Fataln("consume failed", err)
	}
	for {
		select {
		case msg := <-msgch:
			fmt.Println("accept msg " + name + " " + string(msg.Body))
		case <-stopchan:
			return
		}
	}
}
func TestAmqp(t *testing.T) {
	defer obj.Close()
	go func() {
		producer()
	}()
	time.Sleep(2 * time.Second)
	stopchanlist := make([]chan Empty, 2)
	stopchanlist[0], stopchanlist[1] = make(chan Empty, 1), make(chan Empty, 1)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-ch
		for _, c := range stopchanlist {
			c <- Empty{}
		}
	}()
	go customer("go-test", stopchanlist[0])
	customer("go-test2", stopchanlist[1])
}

demo测试命令

go test -v amqp_test.go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/464756.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

TPU浅谈

前言 大家好&#xff0c;我是jiantaoyab&#xff0c;上篇文章讲了FPGA和ASIC&#xff0c;讲解了 FPGA 如何实现通过“软件”来控制“硬件”&#xff0c;以及我们可以进一步把 FPGA 设计出来的电路变成一块 ASIC 芯片。今天我们来看看TPU。大家可以点击这篇文章TPU深入了解TPU。…

操作系统(OS)

文章目录 前言一、操作系统是什么&#xff1f;二、用户对资源的访问三、操作系统是怎么做到管理的&#xff1f; 前言 任何计算机系统都包含一个基本的程序集合&#xff0c;称为操作系统(OS)。冯诺依曼体系结构中的硬件单元提供的功能&#xff0c;这些硬件由操作系统来控制与管…

ChatGPT登陆提示:“Please unblock challenges.cloudflare.com to proceed…”

ChatGPT登陆时提示&#xff1a;“Please unblock challenges.cloudflare.com to proceed”&#xff0c; 说明&#xff1a;请解除对challenges.cloudflare.com的屏蔽以继续 原因及解决方法&#xff1a; 1、出现这个问题&#xff0c;一般都是网络和本地环境问题&#xff0c;可以…

Seata 2.x 系列【7】服务端集成 Nacos 2.x

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Seata 版本 2.0.0 本系列Spring Boot 版本 3.2.0 本系列Spring Cloud 版本 2023.0.0 源码地址&#xff1a;https://gitee.com/pearl-organization/study-seata-demo 文章目录 1. 概述2. 安装 N…

滑动窗口最大值(leetcode hot100)

给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回 滑动窗口中的最大值 。 示例 1&#xff1a; 输入&#xff1a;nums [1,3,-1,-3,5,3,6,7], k 3 输…

mvnd 安装和配置

mvnd 是 maven 的增强工具&#xff0c;在执行速度方面优于 maven 下载安装&#xff1a; https://github.com/apache/maven-mvnd/releases/ 根据不同的系统下载不同的安装包 配置环境变量 Path 新增 mvnd 安装路径下的 bin 目录 E:\maven-mvnd-1.0-m8-m39-windows-amd64\b…

学会Promise,看这里!!!

前言 众所周知&#xff0c;在JavaScript的世界中&#xff0c;代码都是单线程执行的。由于这个原因&#xff0c;JavaScript中的耗时操作&#xff0c;如网络操作、浏览器事件等&#xff0c;都需要异步执行。这也导致在JavaScript中异步操作是非常频繁且常见的。 异步&#xff1a…

B端能用就行,颜值无所谓?你现在还敢说吗,马上轮到工业HMI

在当前的商业环境下&#xff0c;用户体验和界面设计的重要性越来越受到重视&#xff0c;即使是B端用户也希望能够使用界面美观、易于操作的工业HMI系统。 漂亮的设计不仅可以提高用户的工作效率和满意度&#xff0c;还可以提升产品的竞争力和市场份额。因此&#xff0c;即使是…

Java 面试题之框架

1. Spring 是什么 Sping 是包含了众多工具方法的 IOC 容器&#xff0c;IOC是控制反转&#xff0c;说的是对象的创建和销毁的权利都交给 Spring 来管理了, 它本身又具备了存储对象和获取对象的能力. 。 容器&#xff1a;字面意思&#xff0c;用来容纳某种物品的装置。 比如 L…

力扣题目训练(22)

2024年2月15日力扣题目训练 2024年2月15日力扣题目训练563. 二叉树的坡度637. 二叉树的层平均值643. 子数组最大平均数 I304. 二维区域和检索 - 矩阵不可变154. 寻找旋转排序数组中的最小值 II 2024年2月15日力扣题目训练 2024年2月15日第二十二天编程训练&#xff0c;今天主要…

高并发缓存策略大揭秘:面试必备的缓存更新模式解析

在高并发场景中&#xff0c;缓存能抵挡大量数据库查询&#xff0c;减少数据库压力&#xff0c;对于缓存更新通常有以下几种模式可以选择&#xff1a; cache aside read/write through write behind caching cache aside模式 Cache-aside模式是一种常用的用于管理缓存的模…

【linux深入剖析】操作系统与用户之间的接口:自定义简易shell制作全过程

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 1.shell2.自定义shell的准…

如何遍历map

小王学习录 前言遍历map集合1. 使用for-each循环遍历 entrySet()2. 使用迭代器遍历 entrySet()3. 通过 keySet() 遍历4. 使用迭代器遍历 keySet()5. 仅遍历 values() 如果只关心map中的值而不关心键&#xff0c;可以遍历 values()&#xff1a;6. 使用流(Streams)进行遍历 总结 …

typeorm导致nestjs通过@Query接收的参数为undefined

依赖版本如下,发现引入typeorm后导致接收不到Query参数,解决办法是将 TypeOrmModule导入语句放到前面就可以了

MT3004·找四边形

题目&#xff1a; 样例输入 4 12 1 2 1 3 1 4 2 1 2 3 2 4 3 1 3 2 3 4 4 1 4 2 4 3 样例输出 12 数据范围 算法设计 涉及的算法 枚举和图论基础 采用邻接矩阵g[N]来存储图&#xff0c;其中vector<ll> g[N]是建立了一个二维的vector 来用sum记录每个点 i 到达点 j…

java集合框架——Map集合概述

前言&#xff1a; 之前接触了单列合集&#xff0c;现在又接触了双列合集。整理下心得&#xff0c;打好基础&#xff0c;daydayup&#xff01;&#xff01; Map集合 Map集合称为双列集合&#xff0c;也被称为“键值对集合”。格式&#xff1a;{key1value1,key2value2...}&#…

网络学习:邻居发现协议NDP

目录 前言&#xff1a; 一、报文内容 二、地址解析----NS/NA 目标的被请求组播IP地址 邻居不可达性检测&#xff1a; 重复地址检测 路由器发现 地址自动配置 默认路由器优先级和路由信息发现 重定向 前言&#xff1a; 邻居发现协议NDP&#xff08;Neighbor Discovery…

MySQL数据库实现增删改查基础操作

准备工作 安装mysql8.0 (安装时一定要记住用户名和密码)安装数据库可视化视图工具Navicat 请注意⚠️⚠️⚠️⚠️ a. 编程类所有软件不要安装在中文目录下 b. Navicat破解版下载安装教程&#xff1a;&#xff08;由于文章审核提示版权问题&#xff0c;链接不方便给出&#xff…

虚拟内存相关知识汇总(程序重定位)

前置知识&#xff1a; Windows的内存可以被分为两个层面&#xff1a;物理内存和虚拟内存。其中&#xff0c;物理内存非常复杂&#xff0c;需要进入到Windows内核级别ring0才能看到。通常在用户模式下&#xff0c;用调试器看到的内存地址都是虚拟地址。 1.虚拟内存的定义 虚拟…

PCIE问题定位000:PCIe需要的定位手段

1、PCIe debug环境说明 本文将以PCIe EP用户逻辑举例&#xff0c;描述PCIe可以添加哪些定位手段。 如图所示&#xff0c;PCIe IP作为endpoint与RC对接&#xff0c;用户实现了应用逻辑&#xff0c;与PCIe IP进行交互&#xff0c;交互信号中data格式为TLP报文格式&#xff0c;且…