Dapr(三) Dapr核心组件的使用一

结合前两期 Dapr(一) 基于云原生了解Dapr(Dapr(一) 基于云原生了解Dapr-CSDN博客) Dapr(二) 分布式应用运行时搭建及服务调用(Dapr(二) 分布式应用运行时搭建及服务调用-CSDN博客)

下篇推出dapr服务注册与发现,dapr组件绑定,dapr Actor功能。

目录

1.0 Dapr状态管理

1.1 Dapr状态组件配置文件

1.2 状态控制器

1.3 切换其它状态存储

1.4 工作原理

2.0 发布订阅

2.1 什么是发布订阅

2.2 设置发布订阅组件

2.3 控制器代码

2.3.1 发布控制器

2.3.2 订阅控制器

2.4 修改文件Program.cs

2.5 切换组件 

2.6 工作原理

总结:


1.0 Dapr状态管理

Dapr的状态管理允许应用程序保存和检索键值对数据,具有可插拔的存储、配置的行为和额外的安全特性。以下是主要特点:

  1. 可插拔状态存储:Dapr支持多种数据存储,比如MySQL、Redis、Azure CosmosDB等,可以在不修改代码的情况下切换。

  2. 配置存储行为:你可以指定并发控制和一致性级别。默认是最终一致性,但也支持强一致性。

  3. 并发控制:通过ETags实现乐观并发控制(OCC)。写操作需要匹配当前的ETag值,防止冲突。

  4. 自动加密:预览功能,支持应用程序状态的自动加密和密钥轮换。

  5. 一致性选项:可以选择强一致性的写入,等待所有副本确认,或者默认的最终一致性。

  6. 批量操作:支持一次性处理多条状态记录。

1.1 Dapr状态组件配置文件

Dapr默认使用的Redis进行存储。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"

1.2 状态控制器

 public class StateController : ControllerBase
    {
        private readonly ILogger<StateController> _logger;
        private readonly DaprClient _daprClient;
        public StateController(ILogger<StateController> logger, DaprClient daprClient)
        {
            _logger = logger;
            _daprClient = daprClient;
        }

        // 获取一个值
        [HttpGet]
        public async Task<ActionResult> GetAsync()
        {
            var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
            return Ok(result);
        }

        //保存一个值
        [HttpPost]
        public async Task<ActionResult> PostAsync()
        {
            await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
            return Ok("done");
        }

        //删除一个值
        [HttpDelete]
        public async Task<ActionResult> DeleteAsync()
        {
            await _daprClient.DeleteStateAsync("statestore", "guid");
            return Ok("done");
        }

        //通过tag防止并发冲突,保存一个值
        [HttpPost("withtag")]
        public async Task<ActionResult> PostWithTagAsync()
        {
            var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
            await _daprClient.TrySaveStateAsync("statestore", "guid", Guid.NewGuid().ToString(), etag);
            return Ok("done");
        }

        //通过tag防止并发冲突,删除一个值
        [HttpDelete("withtag")]
        public async Task<ActionResult> DeleteWithTagAsync()
        {
            var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
            return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
        }


        // 从绑定获取一个值,健值name从路由模板获取
        [HttpGet("frombinding/{name}")]
        public ActionResult GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
        {
            return Ok(state.Value);
        }


        // 根据绑定获取并修改值,健值name从路由模板获取
        [HttpPost("withbinding/{name}")]
        public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
        {
            state.Value = Guid.NewGuid().ToString();
            return Ok(await state.TrySaveAsync());
        }


        // 获取多个个值
        [HttpGet("list")]
        public async Task<ActionResult> GetListAsync()
        {
            var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
            return Ok(result);
        }

        // 删除多个个值
        [HttpDelete("list")]
        public async Task<ActionResult> DeleteListAsync()
        {
            var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
            var removeList = new List<BulkDeleteStateItem>();
            foreach (var item in data)
            {
                removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
            }
            await _daprClient.DeleteBulkStateAsync("statestore", removeList);
            return Ok("done");
        }
    }

1.3 切换其它状态存储

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.mysql
  version: v1
  metadata:
  - name: connectionString
    value: "root:123456@tcp(192.168.157.157:3306)/?allowNativePasswords=true"

切换状态为MySql进行存储。

1.4 工作原理

应用程序与 Dapr sidecar 交互,以存储和检索键/值数据。 在底层,sidecar API 使用**可配置的状态存储组件**来保存数据。 开发人员可以从不断增长的受支持状态存储集合中选择,其中包括 Azure Cosmos DB、SQL Server 和 Cassandra。

2.0 发布订阅

2.1 什么是发布订阅

发布订阅(Publish-Subscribe)是一种通信模式,允许发布者发送消息到一个中心节点(通常是消息代理或主题),而不关心具体哪些订阅者会接收到这些消息。订阅者则注册他们感兴趣的特定类型的消息,当匹配的消息发布时,他们会收到通知。这种模式的特点在于解耦了发布者和订阅者,提高了系统的灵活性和可扩展性。

关键元素包括:

  1. 发布者 (Publisher): 生产消息的实体,它向主题或消息代理发送消息,无需了解谁会接收这些消息。
  2. 订阅者 (Subscriber): 对特定消息感兴趣并希望接收通知的实体,它们通过订阅主题或消息代理来表达兴趣。
  3. 主题 或 消息代理 (Topic or Message Broker): 中间媒介,接收并分发消息,确保消息从发布者到达正确的订阅者。

一个简单的示例是新闻系统,其中发布者发布新闻到特定类别,而订阅者选择关注他们感兴趣的类别。发布者不直接通知订阅者,而是通过消息代理进行,这样订阅者仅接收与其订阅相匹配的新闻。

发布订阅模式的应用场景通常涉及异步通信、事件驱动的系统或需要解耦组件的场景。

2.2 设置发布订阅组件

Dapr默认使用的Redis进行发布订阅

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

2.3 控制器代码

2.3.1 发布控制器

    [ApiController]
    [Route("[controller]")]
    public class PubsubController : ControllerBase
    {
        private DaprClient _daprClient;
        private ILogger<PubsubController> _logger;

        public PubsubController(DaprClient daprClient, ILogger<PubsubController> logger)
        {
            _daprClient = daprClient;
            _logger = logger;
        }

        /// <summary>
        /// 发布消息的方法
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        [Route("pub")]
        public async Task<IActionResult> PublishMessage()
        {
            _logger.LogInformation("***发布消息***");
            var data = new UserInfo(10001,"操作员",19);
            await _daprClient.PublishEventAsync("pubsub", "user_topic",data);

            return Ok("***发布消息成功***");
        }
    }

2.3.2 订阅控制器

    [ApiController]
    [Route("[controller]")]
    public class SubController : ControllerBase
    {
        private ILogger<SubController> _logger;

        public SubController(ILogger<SubController> logger)
        {
            _logger = logger;
        }


        [HttpPost("sub")]
        [Topic("pubsub", "user_topic")]
        public IActionResult ConsumerMessage(UserInfo user)
        {
            _logger.LogInformation("***消费消息***");
            Console.WriteLine($"userId:{user.UserId} userName:{user.UserName}");
            return Ok();
        }
    }

2.4 修改文件Program.cs

app.UseCloudEvents();
app.MapSubscribeHandler();

2.5 切换组件 

切换为RabbitMQ

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://123:123@192.168.157.157:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: deliveryMode
    value: "0"
  - name: requeueInFailure
    value: "false"
  - name: prefetchCount
    value: "0"
  - name: reconnectWait
    value: "0"
  - name: concurrencyMode
    value: parallel
  - name: backOffPolicy
    value: "exponential"
  - name: backOffInitialInterval
    value: "100"
  - name: backOffMaxRetries
    value: "16"

2.6 工作原理

Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。服务将消息发布到指定主题, 业务服务订阅主题以使用消息。服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。

总结:

Dapr的发布订阅功能使得在分布式系统中实现发布/订阅消息模式变得更加简单。主要解决了不同消息产品之间实施复杂性和功能差异的问题。你可以通过Dapr的Sidecar API使用HTTP或gRPC来发布和订阅消息。以下是关键操作的概述:

  1. 发布(Publish)消息

    • 使用http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic> URL,其中 <dapr-port> 是Dapr Sidecar监听的端口,<pub-sub-name> 是选择的发布/订阅组件名,而 <topic> 是消息的目标主题。
  2. 订阅(Subscribe)消息

    • 应用程序在启动时,通过http://localhost:<appPort>/dapr/subscribe指定其订阅,其中 <appPort> 是应用程序监听的端口。
    • 订阅者处理消息后返回非错误响应,Dapr认为消息传递成功。
    • 支持订阅者通过响应负载中的状态进行精细化控制,比如指示重试(RETRY)或丢弃(DROP)消息。

Dapr的状态管理提供了一种跨服务持久化数据的方法,支持多种存储后端。关键特性包括:

  1. 原子性操作:支持原子性的读写操作,保证一致性。

  2. 版本控制:允许跟踪状态更改的历史版本,便于回滚。

  3. 事件驱动:状态变化可触发回调函数,实现基于状态变化的自动化操作。

  4. 过期策略:可设置状态项的过期时间。

  5. 备份与恢复:提供状态备份和恢复机制,确保高可用性。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/521976.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LVGL可视化设计-Gui Guider

&#xff08;提示&#xff1a;本篇编辑状态中&#xff0c;完成了70%左右&#xff0c;争取4-8前完成) 一、Gui Guider 概述 免费&#xff01;免费&#xff01;免费&#xff01;支持 LVGL v7、 v8.3很方便的&#xff1a;安装、使用 (另一种主流的visual studio模拟&#xff0c;省…

#pragma once的作用

使用visual studio新建头文件时&#xff0c;第一行会出现如下默认代码&#xff0c; #pragma once 它是一种编译器指令&#xff0c;通常用于确保头文件只被包含一次&#xff0c;以避免产生重复定义的问题。当编译器处理一个源文件时&#xff0c;遇到#pragma once指令时&#xf…

【Python】数据挖掘与机器学习(一)

【Python】数据挖掘与机器学习(一) 大家好 我是寸铁&#x1f44a; 总结了一篇【Python】数据挖掘与机器学习(一)sparkles: 喜欢的小伙伴可以点点关注 &#x1f49d; 【实验1】预测鲍鱼年龄 问题描述 请从一份数据中预测鲍鱼的年龄&#xff0c;数据集在abalone.cvs中&#xff…

SAP-SD VFX3释放销售订单发票报错:科目确定错误

VFX3 报错截图&#xff1a; VF03 - 检查发票信息 VKOA - 科目确定配置 核对是否有配置相应科目 以上~~

c++11 标准模板(STL)本地化库 - 平面类别 - (std::ctype) 定义字符分类表(六)

本地化库 本地环境设施包含字符分类和字符串校对、数值、货币及日期/时间格式化和分析&#xff0c;以及消息取得的国际化支持。本地环境设置控制流 I/O 、正则表达式库和 C 标准库的其他组件的行为。 平面类别 定义字符分类表 std::ctype template< class CharT > clas…

五款开放式蓝牙耳机推荐:这些宝藏耳机,你值得拥有!

如果你是长时间佩戴耳机的用户&#xff0c;那不入耳佩戴的开放式耳机可以让你彻底的“解放双耳”&#xff0c;长时间佩戴也不会觉得耳朵闷、耳朵疼&#xff1b;如果你是运动、健身爱好者&#xff0c;那通透、开放听感的开放式耳机可以提高你在运动时的安全性&#xff0c;让你在…

相位导数方差计算-matlab

%% 下面计算 相位导数方差% 假设 phase_map 是你的相位图二维矩阵 % K 是窗口的大小 k 3; % 请使用实际的窗口大小替换% 计算 x 和 y 方向的偏导 [dx, dy] gradient(wrappedPhase); Ksq k^2; % 计算 K^2half_k floor(k / 2);% 初始化结果矩阵 result zeros(size(wrappedPh…

【刷题篇】回溯算法(一)

文章目录 1、汉诺塔2、合并两个有序链表3、反转链表4、两两交换链表中的节点5、Pow(x, n)6、计算布尔二叉树的值 1、汉诺塔 在经典汉诺塔问题中&#xff0c;有 3 根柱子及 N 个不同大小的穿孔圆盘&#xff0c;盘子可以滑入任意一根柱子。一开始&#xff0c;所有盘子自上而下按升…

PyCharm使用指南(个性化设置、开发必备插件、常用快捷键)

&#x1f947;作者简介&#xff1a;CSDN内容合伙人、新星计划第三季Python赛道Top1 &#x1f525;本文已收录于Python系列专栏&#xff1a; 零基础学Python &#x1f4ac;订阅专栏后可私信博主进入Python学习交流群&#xff0c;进群可领取Python视频教程以及Python相关电子书合…

ubuntu20.04.6将虚拟机用户目录映射为磁盘Z

文章目录 linux虚拟机设置为NAT模式安装sshd服务映射目录到windows磁盘安装samba套件修改配置文件smb.conf重启smbd并设置用户名和密码 windows映射遇到的问题1、设置好之后映射不成功2、smbd下载失败3、smbd密码配置问题4、当有改动时候&#xff0c;最好重启一下smbd服务 linu…

阿里云2核4G服务器多少钱?优惠价格30元、165元和199元1年

阿里云2核4G服务器租用优惠价格&#xff0c;轻量2核4G服务器165元一年、u1服务器2核4G5M带宽199元一年、云服务器e实例30元3个月&#xff0c;活动链接 aliyunfuwuqi.com/go/aliyun 活动链接如下图&#xff1a; 阿里云2核4G服务器优惠价格 轻量应用服务器2核2G4M带宽、60GB高效…

代码随想录阅读笔记-二叉树【二叉搜索树的插入】

题目 给定二叉搜索树&#xff08;BST&#xff09;的根节点和要插入树中的值&#xff0c;将值插入二叉搜索树。 返回插入后二叉搜索树的根节点。 输入数据保证&#xff0c;新值和原始二叉搜索树中的任意节点值都不同。 注意&#xff0c;可能存在多种有效的插入方式&#xff0c;…

SpringBoot响应式RedisClient配置

大多数场景&#xff0c;默认配置的Redis客户端不满足业务场景&#xff0c;根源在于Redis key、value 序列化反序列化问题。因此&#xff0c;有必要配置自定义的客户端来满足需求。 默认配置源码如下&#xff0c;采用jdk序列化/反序列化方式进行&#xff0c;我们只需要配置相同…

中颖51芯片学习2. IO端口操作

一、SH79F9476 I/O端口介绍 1. 特性 SH79F9476提供了30/26位可编程双向 I/O 端口&#xff1b;端口数据在寄存器Px中&#xff1b;端口控制寄存器PxCRy是控制端口作为输入还是输出&#xff1b;端口作为输入时&#xff0c;每个I/O端口均带有PxPCRy控制的内部上拉电阻。有些I/O引…

软件测试(二)--测试用例

一、什么是用例: 用例就是用户使用案例的简称。以手机用例为例&#xff1a; 1.是否能开机&#xff1a;打开手机按下电源键3秒&#xff0c;看是否能开机。 2.验证内存&#xff1a;打开手机设置查看内存是否为64G. 3.验证屏幕&#xff1a;打开手机在白屏背景下检查屏幕是否有黑点…

【MySQL】数据操作语句(DML)

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习计网、mysql和算法 ✈️专栏&#xff1a;MySQL学习 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac…

LeetCode 1017. 负二进制转换

解题思路 相关代码 class Solution {public String baseNeg2(int n) {if(n0) return "0";String s"";while(n!0)if(Math.abs(n)%20){nn/(-2);ss0;}else{ss1; n (n-1)/(-2);}String t reverse(s);return t;}public String reverse(String s){Str…

大广赛车机主体设计实践指南:必备技能速成攻略解读!

车机主体设计是什么 汽车作为代步工具距今已有 130 多年的历史。目前&#xff0c;在视觉范围内如此关注车载 HMI 的历史也只是近十年的事情&#xff0c;因为在过去&#xff0c;人们最注重的还是汽车技术的发展。但随着以交通安全为主的自动驾驶技术的不断发展&#xff0c;智能…

【nginx】使用nginx部署https协议

一、客户有证书提供 客户有证书的&#xff0c;或者有域名申请了免费证书的&#xff0c;直接根据下面的第5步骤&#xff0c;配置nginx即可。 二、 自己生成证书 1. 安装openssl-Win64 OpenSSL v3.1.1 Light 附下载地址 Win32/Win64 OpenSSL Installer for Windows - Shinin…

网站统计中的数据收集原理及实现

网站数据统计分析工具是网站站长和运营人员经常使用的一种工具&#xff0c;比较常用的有谷歌分析、百度统计和腾讯分析等等。所有这些统计分析工具的第一步都是网站访问数据的收集。目前主流的数据收集方式基本都是基于javascript的。本文将简要分析这种数据收集的原理&#xf…