某马头条——day06

自媒体文章上下架

使用消息队列在自媒体下架时通知文章微服务。 

kafka概述

 

kafka环境搭建

docker pull zookeeper:3.4.14
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

安装kafka

docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入门案例

 

依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

生产者类 

package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //封装发送的消息
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

        //3.发送消息
        producer.send(record);

        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }

}

 消费者类

package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itheima-topic"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

存在一个消费者组和多个消费者组的区别

 分区机制

相当于数据分片?redis也有这个机制。

高可用设计方案

kafka生产者详解

发送类型

参数详解

acks=0和UDP很像

kafka消费者详解

多个分区无法保证消息的有序性,相当于,a先后发了两条消息给b,两条消息进了不同的分区,结果因为网络原因导致b先接收到了第二条消息。

提交和偏移量

这里因为不是每次消费都提交就会出现丢失和重复的问题。

偏移量提交方式

SpringBoot集成kafka收发消息

1.导入spring-kafka依赖信息

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>

2.在resources下创建文件application.yml

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 3.消息生产者

package com.heima.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("itcast-topic","黑马程序员");
        return "ok";
    }
}

4.消息消费者

package com.heima.kafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "itcast-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }
}

传递消息为对象时

发送

@GetMapping("/hello")
public String hello(){
    User user = new User();
    user.setUsername("xiaowang");
    user.setAge(18);

    kafkaTemplate.send("user-topic", JSON.toJSONString(user));

    return "ok";
}

接受

@Component
public class HelloListener {

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
        }

    }
}

自媒体文章上下架功能实现

  • 已发表且已上架的文章可以下架

  • 已发表且已下架的文章可以上架

流程说明

 文章表存在一个属性字段是否上架

功能接口开发

在heima-leadnews-wemedia工程下的WmNewsController新增方法

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
    return wmNewsService.downOrUp(dto);
}

在WmNewsDto中新增enable属性 

    /**
     * 上下架 0 下架  1 上架
     */
    private Short enable;

在WmNewsService新增方法

/**
 * 文章的上下架
 * @param dto
 * @return
 */
public ResponseResult downOrUp(WmNewsDto dto);

实现方法

/**
 * 文章的上下架
 * @param dto
 * @return
 */
@Override
public ResponseResult downOrUp(WmNewsDto dto) {
    //1.检查参数
    if(dto.getId() == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    //2.查询文章
    WmNews wmNews = getById(dto.getId());
    if(wmNews == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
    }

    //3.判断文章是否已发布
    if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架");
    }

    //4.修改文章enable
    if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){
        update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
                .eq(WmNews::getId,wmNews.getId()));
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

测试通过

消息通知article数据同步

 

在heima-leadnews-common模块下导入kafka依赖

<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

在自媒体端的nacos配置中心配置kafka的生产者

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

在自媒体端文章上下架后发送消息

//发送消息,通知article端修改文章配置
if(wmNews.getArticleId() != null){
    Map<String,Object> map = new HashMap<>();
    map.put("articleId",wmNews.getArticleId());
    map.put("enable",dto.getEnable());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}

在article端的nacos配置中心配置kafka的消费者

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

在article端编写监听,接收数据

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
public class ArtilceIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message){
        if(StringUtils.isNotBlank(message)){
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("article端文章配置修改,articleId={}",map.get("articleId"));
        }
    }
}

新建ApArticleConfigService

public interface ApArticleConfigService extends IService<ApArticleConfig> {

    /**
     * 修改文章配置
     * @param map
     */
    public void updateByMap(Map map);
}

实现

@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {


    /**
     * 修改文章配置
     * @param map
     */
    @Override
    public void updateByMap(Map map) {
        //0 下架 1 上架
        Object enable = map.get("enable");
        boolean isDown = true;
        if(enable.equals(1)){
            isDown = false;
        }
        //修改文章配置
        update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown));

    }
}

测试成功tmd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/336014.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode518. 零钱兑换 II

Every day a Leetcode 题目来源&#xff1a;518. 零钱兑换 II 解法1&#xff1a;动态规划 dp[i]: 总金额为 i 的硬币组合数。 初始化&#xff1a; dp[0] 1。 边界&#xff1a;dp[0]1。只有当不选取任何硬币时&#xff0c;金额之和才为 0&#xff0c;因此只有 1 种硬币组…

RT-Thread experimental 代码学习(1)thread_sample

RTOS的最基础功能是线程。 线程的调度是如何工作的&#xff1f;RT-thread官方的实验文档是最好的参考。 老规矩&#xff0c;先放法国人doxygen。 thread_sample 代码的调用关系图 有意思的是&#xff0c;RT有两种创建线程的方式 - 静态和动态&#xff0c;粗略的理解是&…

【跳槽面试】Redis中分布式锁的实现

分布式锁常见的三种实现方式&#xff1a; 数据库乐观锁&#xff1b;基于Redis的分布式锁&#xff1b;基于ZooKeeper的分布式锁。 本地面试考点是&#xff0c;你对Redis使用熟悉吗&#xff1f;Redis中是如何实现分布式锁的。 在Redis中&#xff0c;分布式锁的实现主要依赖于R…

各种Linux版本安装Docker

文章目录 一、Ubuntu 20.04.61. 网卡和DNS配置2. Docker安装 二、CentOS Linux 7.91. 网卡和DNS配置2. Docker安装 三、Alibaba Cloud Linux 31. DNS配置2. repo说明3. Docker安装 四、验证是否安装成功 一、Ubuntu 20.04.6 1. 网卡和DNS配置 /etc/netplan 找到 *.yaml 文件 …

Ansible详解(架构,模块)及部署示例

Ansible概述 Ansible是一个基于Python开发的配置管理和应用部署工具&#xff0c;也在自动化管理领域大放异彩。它融合了众多老牌运维工具的优点&#xff0c;几乎可以实现Puppet和Saltstack能实现的功能。 Ansible是一款开源的IT自动化工具&#xff0c;它能够自动执行配置管理、…

OpenCV-Python(43):姿势估计

目标 学习了解calib3D 模块学习在图像中创建3D效果 calib3D模块 OpenCV-Python的calib3D模块是OpenCV库中的一个重要模块&#xff0c;用于摄像头标定和三维重建等计算机视觉任务。该模块提供了一些函数和类&#xff0c;用于摄像头标定、立体视觉和三维重建等方面的操作。 下…

【Linux install】Ubuntu和win双系统安装及可能遇到的所有问题

文章目录 1.前期准备1.1 制作启动盘1.2关闭快速启动、安全启动、bitlocker1.2.1 原因1.2.2 进入BIOSshell命令行进入BIOSwindows设置中高级启动在开机时狂按某个键进入BIOS 1.2.3 关闭Fast boot和Secure boot 1.3 划分磁盘空间1.3.1 查看目前的虚拟内存大小 2.开始安装2.1 使用…

flutter开发web应用网络请求后台失败--记录遇到的跨源资源共享问题

前因 愉快开发flutter的web应用&#xff0c;发现网络请求后台一直请求不通啊&#xff0c;百思不得其解后偶然遇到了跨源资源共享&#xff08;CORS&#xff09;这一名词&#xff0c;才发现了问题关键所在。 什么是跨源资源共享 引用跨源资源共享&#xff08;CORS&#xff09;…

openGauss学习笔记-202 openGauss 数据库运维-常见故障定位案例-不同用户查询同表显示数据不同

文章目录 openGauss学习笔记-202 openGauss 数据库运维-常见故障定位案例-不同用户查询同表显示数据不同202.1 不同用户查询同表显示数据不同202.1.1 问题现象202.1.2 原因分析202.1.3 处理办法 openGauss学习笔记-202 openGauss 数据库运维-常见故障定位案例-不同用户查询同表…

从零开始c++精讲:第二篇——类和对象

文章目录 一、类的定义二、类的访问限定符及封装三、类的作用域四、类的实例化五、类对象模型5.1计算对象的大小5.2结构体内存对齐规则 六、this指针6.1简介6.2 this指针的特性 七、类的6个默认函数7.1构造函数7.2析构函数7.3拷贝构造函数7.4赋值运算符重载7.4.1运算符重载7.4.…

详解Python web框架到底是怎么来的?

前言 咱都知道软件开发的架构有两种&#xff0c;分别是C/S架构与B/S架构&#xff0c;本质上都是借助socket实现网络通信&#xff0c;因此Django作为一个web框架本质上也是一个socket服务端&#xff0c;浏览器则是客户端&#xff0c;我们可以自己实现简易的web框架来更好的理解…

linux sudo指令提权

sudo指令 sudo 是在linux中用于以超级用户&#xff08;root&#xff09;权限执行命令的命令。它允许普通用户在执行特定命令时提升其权限&#xff0c;以完成需要超级用户权限的任务。sudo 的名称是 "superuser do" 的缩写。 格式 接受权限的用户登陆的主机 &#xff…

【MySQL】——关系数据库标准语言SQL(大纲)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

OceanBase集群扩缩容

​ OceanBase 数据库采用 Shared-Nothing 架构&#xff0c;各个节点之间完全对等&#xff0c;每个节点都有自己的 SQL 引擎、存储引擎、事务引擎&#xff0c;天然支持多租户&#xff0c;租户间资源、数据隔离&#xff0c;集群运行的最小资源单元是Unit&#xff0c;每个租户在每…

MCM备赛笔记——图论模型

Key Concept 图论是数学的一个分支&#xff0c;专注于研究图的性质和图之间的关系。在图论中&#xff0c;图是由顶点&#xff08;或节点&#xff09;以及连接这些顶点的边&#xff08;或弧&#xff09;组成的。图论的模型广泛应用于计算机科学、通信网络、社会网络、生物信息学…

如何在Docker下部署MinIO存储服务通过Buckets实现文件的远程上传

&#x1f4d1;前言 本文主要是Linux下通过Docker部署MinIO存储服务实现远程上传的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#…

Modern C++ 一个例子学习条件变量

目录 问题程序 施魔法让BUG浮出水面 条件变量注意事项 修改程序 问题程序 今天无意中看到一篇帖子&#xff0c;关于条件变量的&#xff0c;不过仔细看看发现它并达不到原本的目的。 程序如下&#xff0c;读者可以先想想他的本意&#xff0c;以及有没有问题&#xff1a; #…

洛谷P5732 【深基5.习7】杨辉三角(C语言)

入门递推题&#xff0c;就算你不是OIer也该知道的杨辉三角 同时这也是组合数的公式&#xff0c;很重要&#xff0c;因为常规组合数公式是阶乘运算会爆&#xff0c;而这个就不怎么会了 赋 arr[i][j]初值1&#xff0c;接下来就可以递推了 #include<stdio.h> int main() …

温度采样【通道选通】S9KEAZ128的PTA2和PTA3引脚无法拉高

1、问题记录&#xff1a;由18串温度采样修改成32串温度采样&#xff0c;增加一路adc采样&#xff0c;通过cd4051控制通道选通&#xff0c;代码中增加了相应的代码&#xff0c;发现增加的最后8路温度不能够控制&#xff0c;以24串为例&#xff0c;给温度传感器增加温度&#xff…

01-开始Rust之旅

1. 下载Rust 官方推荐使用 rustup 下载 Rust&#xff0c;这是一个管理 Rust 版本和相关工具的命令行工具。下载时需要连接互联网。 这边提供了离线安装版本。本人学习的机器环境为&#xff1a; ubuntu x86_64&#xff0c;因此选用第②个工具链&#xff1b; 1. rust-1.75.0-x86_…