RocketMQ发送顺序消息原理与代码demo

RocketMQ 的顺序消息功能允许消息以发送的顺序被消费,这对于很多业务场景(如交易处理、订单生成等, 或某些需要按照一定顺序执行的业务场景)至关重要,因为这些场景下操作的执行顺序不能被打乱。顺序消息的实现需要确保消息在发送和存储过程中保持顺序,同时消费过程也要严格按照这一顺序进行

一. 消息发送与存储顺序

1. 消息分区

  1. 在发送消息时, 消息会基于某种分区策略(通常是业务标识符, 如订单ID)被分配到特定的消息队列。

  2. RocketMQ 使用了一致性哈希或其他算法来决定消息应该路由到哪个队列, 所有相同分区键的消息都会发送到同一队列

  3. 总结来说, 就是生产者实例在发送消息的时候, 必须发送到同一个 topic, 由于该 topic 下面的 queue 队列不会只有一个, 所以还要根据某个唯一的业务id, 来做一致性哈希, 来确保本次发送的几条消息, 只发送到该 topic 下的一个同 queue 里面

  4. 比如该 topic 下面有 4个 messageQueue, 如果有多条消息, 只会发送到一个固定的 queue里面

2. 队列选择

  1. 当生产者发送消息时, 它可以指定一个队列选择器, 该选择器基于业务键(如订单ID)决定使用哪个队列
  2. 通过这种方式, 所有关于同一实体的操作都会被顺序处理, 因为它们被发送到同一个队列中并且队列内部消息是有序的

3. 发送原理图

其实就是一个先进先出队列, 同一个 (业务id) 下的消息, 比如 订单生成, 支付, 完成, 他们订单id 都会一样, 经过 hash 后, 都会投递到该 topic 下的 一个 queue 里面, 并且会按照投递顺序进行消费

4. 发送消息Demo代码示例

下面模拟订单流程的, 创建 -> 支付 -> 发货流程的伪代码, 之后发送消息

@Autowired
private MessageQueueSelector testMessageQueueSelector;

@GetMapping("/messageTags")
public void testMessageTags(HttpServletRequest request) {
// 全局订单id
long orderId = SnowFlakeIdWorker.createIdWorker();

// 订单创建消息
OrderProcessCreate orderProcessCreate = new OrderProcessCreate();
orderProcessCreate.setOrderId(orderId);
orderProcessCreate.setProcessType("ORDER_CREATE");
RocketMqSender.sendMessage("order-process-topic:" + orderProcessCreate.getProcessType(), String.valueOf(SnowFlakeIdWorker.createIdWorker()),
                           orderProcessCreate, "user-login-token", testMessageQueueSelector, String.valueOf(orderId));

// 订单支付消息
OrderProcessPay orderProcessPay = new OrderProcessPay();
orderProcessPay.setOrderId(orderId);
orderProcessPay.setProcessType("ORDER_PAY");
RocketMqSender.sendMessage("order-process-topic:" + orderProcessPay.getProcessType(), String.valueOf(SnowFlakeIdWorker.createIdWorker()),
                           orderProcessPay, "user-login-token", testMessageQueueSelector, String.valueOf(orderId));

// 订单发货消息
OrderProcessLogistics orderProcessLogistics = new OrderProcessLogistics();
orderProcessLogistics.setOrderId(orderId);
orderProcessLogistics.setProcessType("ORDER_LOGISTICS");
RocketMqSender.sendMessage("order-process-topic:" + orderProcessLogistics.getProcessType(), String.valueOf(SnowFlakeIdWorker.createIdWorker()),
                           orderProcessLogistics, "user-login-token", testMessageQueueSelector, String.valueOf(orderId));
}

RocketMqSender 是自行封装的, 里面用的 springboot 的 rocketmqTemplate, 相关代码如下, 关键的2行代码是

1. 发送之前自定义 messageQueueSelector 接口的实例

2. 之后调用 syncSendOrderly 方法, 传递 topic,  message 和 hashKey

        rocketMQTemplate.setMessageQueueSelector(messageQueueSelector);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);

注意: 由于我用的 SpringBoot 框架封装的 RockeMQ-Starter, 这个最外层方法的 message 对象的 Spring 框架的 Message 顶级接口: 

org.springframework.messaging.Message

后续代码, 框架会把  Spring 框架的 Message 类, 转换为 Rocketmq 框架中的 Message 类

@Slf4j
@Component
public class RocketMqSender {

    private static RocketMQTemplate rocketMQTemplate;

    @Autowired
    private void setRocketMQTemplate(RocketMQTemplate rocketMQTemplate) {
        RocketMqSender.rocketMQTemplate = rocketMQTemplate;
    }

    public static void sendMessage(String topic, String messageKey, Object payload, String token, MessageQueueSelector messageQueueSelector, String hashKey) {
        if (Objects.isNull(topic) || Objects.isNull(payload) || StringUtils.isBlank(messageKey)) {
            log.error("topic or message or key not be null");
            return;
        }
        Message<Object> message = MessageBuilder
                .withPayload(payload)
                .setHeader(RocketMQHeaders.KEYS, messageKey)
                .setHeader(SystemDefines.JWT_TOKEN_HEADER, token)
                .build();
        rocketMQTemplate.setMessageQueueSelector(messageQueueSelector);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
        log.info("mq sendMessage success, topic={}, message={}", topic, JSON.toJSONString(payload));
    }

}

5. 顺序发送相关框架源码 

    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
            if (delayLevel > 0) {
                rocketMsg.setDelayTimeLevel(delayLevel);
            }
// 如果是发送顺序消息, 这里的参数有 messageQueueSelector, 和 hashKey
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
            long costTime = System.currentTimeMillis() - now;
            if (log.isDebugEnabled()) {
                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            }
            return sendResult;
        } catch (Exception e) {
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }

再往下看源码, 发现在真正发消息之前, 会选择 MessageQueue

rocketmq 默认帮我们实现的是有队列选择器的, 不过我们也可以自定义队列选择器

不过开源版本的, 服务器机房相关的 SelectMessageQueueByMachineRoom 源码没有实现

自定义队列选择器, 必须实现 rocketmq 的  MessageQueueSelector 的接口, 实现逻辑也是根据一致性 hash 

@Component
public class TestMessageQueueSelector implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String id = String.valueOf(arg);
        int hashCode = id.hashCode();
        hashCode = Math.abs(hashCode);
        long index = hashCode % mqs.size();
        return mqs.get((int) index);
    }

}

6. 消息到达 broker

  1. 消息首先到达 broke r的消息接收系统, broker 根据生产者指定的 topic 和队列(或自行计算得出的队列)确定消息的目标队列

  2. 每个消息队列在 broker 中都对应一个逻辑存储单元, 所有队列的消息都存储在同一个物理文件(CommitLog)中, 但逻辑上通过不同的队列ID和偏移量分隔

  3. 当 broker 接收到消息后, 它将消息追加到 commitLog 的末尾, broker 还会在内存中维护一个队列索引, 记录每个队列当前的读写位置

二. 消息消费顺序

1. 消费者行为

  1. 顺序消息的消费者在启动时会尝试锁定一个或多个队列,以确保在给定时间内只有一个消费者实例消费该队列中的消息

那么问题来了: 

问 : 由于消费队列 MessageQueue 存在于 broker 端, 如何保证一个队列只被一个消费者实例拉取到? 

原理是: 消费客户端先向 broker 端发起对 messageQueue 的加锁请求, 只有加锁成功时才创建pullRequest 进行消息拉取, 下面看下 lock 加锁请求方法源码: 

    public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue,
        final long delayMills) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
// 顺序消费者实例, 开始申请 messageQueue 锁
                boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
                if (lockOK) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10);
                } else {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000);
                }
            }
        }, delayMills, TimeUnit.MILLISECONDS);
    }
public boolean lock(final MessageQueue mq) {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.getMqSet().add(mq);

            try {
// 创建远程请求, 请求 broker 获取 messageQueus 队列锁, 锁定成功, 返回这个 messageQueue
                Set<MessageQueue> lockedMq =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    if (processQueue != null) {
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }

                boolean lockOK = lockedMq.contains(mq);
                log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
                return lockOK;
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, e);
            }
        }

        return false;
    }
    public Set<MessageQueue> lockBatchMQ(
        final String addr,
        final LockBatchRequestBody requestBody,
        final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);

        request.setBody(requestBody.encode());
// 根据参数传递的 broker addr 创建远程请求对象
        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
                Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
                return messageQueues;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    }

再往下就是 rocketmq 使用 netty 发送网络请求的代码

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        //get the request id
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
// 使用 netty 发送网络请求
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                }

                responseFuture.setSendRequestOK(false);
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("Failed to write a request command to {}, caused by underlying I/O operation failure", addr);
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

加锁成功, 获取到 messageQueue 后, 把 messageQueue 提交到线程池中进行消费

public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue,
        final long delayMills) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
// 上面的向 broker 申请队列锁
                boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq);
                if (lockOK) {
                    // 加锁成功后, 把 messageQueue 给到线程池进行消费
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10);
                } else {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000);
                }
            }
        }, delayMills, TimeUnit.MILLISECONDS);
    }
    private void submitConsumeRequestLater(
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final long suspendTimeMillis
    ) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
        }

        if (timeMillis < 10) {
            timeMillis = 10;
        } else if (timeMillis > 30000) {
            timeMillis = 30000;
        }

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
// 调度线程去消费这个 messageQueue
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
// 构建 ConsumeRequest 提交到线程池
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

来看一下这个 ConsumeRequest,  发现该类实现了 Runnable 接口, 那么必然有 run 方法

org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest

// 实现了 Runnable 接口
    class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        public ProcessQueue getProcessQueue() {
            return processQueue;
        }

        public MessageQueue getMessageQueue() {
            return messageQueue;
        }

来看一下 run 方法, 发现直接使用 synchronized 关键字去申请 jvm 进程内的排他锁, 同一时刻只有一个线程可以执行该方法

        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            // 请求 jvm 进程内的 排他锁, 同一时刻只有一个线程, 可以执行该方法, 消费消息
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }

所以, 整个顺序消费, 消费者的行为也清晰了, 可以梳理一下, 只要有2点:

1. 如有多个消费者实例, 每个消费者实例, 都会去向 broker 申请 messageQueue 锁,

实现方式是使用 netty 远程请求到 broker 申请 MessageQueue 锁

2. 获取到 MessageQueue 锁的消费者实例中, 同一时刻, 有且只能有一个消费者线程去

实现方式是使用 synchronized 关键字去申请 jvm 排他锁

所以保证了: 一个 MessageQueue 中的消息, 在同一时刻, 只能由一个消费者实例中的一个消费者线程去消费消息

2. 代码运行模拟

启动微服务, 发现消费者客户端已经注册上去

查看 topic 下的 queue 数量, 有 4 个

调用 接口直接发送消息

http://localhost:7004/test/messageTags

观察打印的日志, 发现这批消息, 只会被 ConsumeMessageThread_orderProcess-check-group_3 这个消费者现成, 顺序的消费: 

2024-06-03 17:19:00 [ConsumeMessageThread_orderProcess-check-group_3]  INFO  OrderProcessConsumer - currentThread=ConsumeMessageThread_orderProcess-check-group_3, topic=order-process-topic, tag=ORDER_CREATE, megId=FDB22C26F4E40001000000000000000168A118B4AAC20E03F0210006, queue=1, orderProcess=ORDER_CREATE, key=1797558588185509889
2024-06-03 17:19:01 [ConsumeMessageThread_orderProcess-check-group_3]  INFO  OrderProcessConsumer - currentThread=ConsumeMessageThread_orderProcess-check-group_3, topic=order-process-topic, tag=ORDER_PAY, megId=FDB22C26F4E40001000000000000000168A118B4AAC20E03F02D0007, queue=1, orderProcess=ORDER_PAY, key=1797558588240035840
2024-06-03 17:19:02 [ConsumeMessageThread_orderProcess-check-group_3]  INFO  OrderProcessConsumer - currentThread=ConsumeMessageThread_orderProcess-check-group_3, topic=order-process-topic, tag=ORDER_LOGISTICS, megId=FDB22C26F4E40001000000000000000168A118B4AAC20E03F0390008, queue=1, orderProcess=ORDER_LOGISTICS, key=1797558588290367488

通过日志可以看到, 消息的 queue id 都为 1,  我们再次打开 rocketmq 控制台, 查询这个 topic 下的 queue id 为 1 的 MessageQueue, 发现现在最大位点为 27, 比发送之前的最大位点 24 多了 3, 也可以说明刚才的消息, 都投递到了 queue id 为 1的这个 MessageQueue 中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/676376.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【30天精通Prometheus:一站式监控实战指南】第10天:blackbox_exporter从入门到实战:安装、配置详解与生产环境搭建指南,超详细

亲爱的读者们&#x1f44b;   欢迎加入【30天精通Prometheus】专栏&#xff01;&#x1f4da; 在这里&#xff0c;我们将探索Prometheus的强大功能&#xff0c;并将其应用于实际监控中。这个专栏都将为你提供宝贵的实战经验。&#x1f680;   Prometheus是云原生和DevOps的…

【51单片机】智能百叶窗项目

文章目录 功能演示&#xff1a;前置要求&#xff1a;主要功能&#xff1a;主要模块&#xff1a;主函数代码&#xff1a; 具体的仿真程序和代码程序已经免费放置在资源中&#xff0c;如有需要&#xff0c;可以下载进行操作。 功能演示&#xff1a; 前置要求&#xff1a; 编译软…

Linux - 文件管理高级 find、grep

0.管道 | 将前面命令的标准输出传递给管道作为后面的标准输入 1.文件查找 find find 进行文件查找时&#xff0c;默认进行递归查找&#xff0c;会查找隐藏目录下的文件 1.1 用法 # find 查找路径 查找条件... -type // 文件类型 f 普通文件 b 设备 d …

MacOS13-将数据库转为markdown,docx格式

MacOS13-将数据库转为markdown&#xff0c;docx格式 文章目录 先说踩坑点各种模块缺失 代码效果总结参考 先说踩坑点 各种模块缺失 tkinter mysql 没错&#xff0c;你可以直接点击安装&#xff1b; 如果还出现报错 你需要打开终端 pip install mysqlclient再次点进去安…

C语言| 输出菱形*(梳理篇II)

C语言| 输出菱形*-CSDN博客 凡事还是得自己独立思考后&#xff0c;写一遍程序才能发现问题所在。 容易犯的错误&#xff1a; 【完整程序注释】 运行结果 /* 输出菱形 1 总行数 n为奇数&#xff0c;分上三角形下三角形&#xff0c;只考虑左边的空格和星号* 2 上三角形 行数…

toefl listening_托福听力

x.1 课程介绍 x.1.1 课程介绍 考试介绍 注意事项如下&#xff0c; x.1.2 分数设定和方法论 x.2.1 细节题解法 x.2.2 对话主旨题解法 听力对话不要扣分&#xff1b; 内容主旨题&#xff0c;以what开头&#xff1b; 目的主旨题&#xff0c;以why开头&#xff1b; 目的主旨题…

【论文笔记】Content-based Unrestricted Adversarial Attack

图2&#xff1a;Adversarial Content Attack的流程。首先使用Image Latent Mapping将图像映射到潜变量空间。然后&#xff0c;用Adversarial Latent Optimization生成对抗性样本。最后&#xff0c;生成的对抗性样本可以欺骗到目标分类模型。 3.1 Image Latent Mapping 对于扩…

代码随想录算法训练营第四十一天 | 理论基础、509.斐波那契数列、70.爬楼梯、746.使用最小花费爬楼梯

目录 理论基础 509.斐波那契数列 思路 代码 70.爬楼梯 思路 代码 746.使用最小花费爬楼梯 思路 代码 理论基础 代码随想录 视频&#xff1a;从此再也不怕动态规划了&#xff0c;动态规划解题方法论大曝光 &#xff01;| 理论基础 |力扣刷题总结| 动态规划入门_哔哩哔…

uni微信小程序editor富文本组件如何插入图片

需求 在editor中插入图片&#xff0c;并对图片进行编辑&#xff0c;简略看一下组件的属性&#xff0c;官网editor 组件 | uni-app官网 解决方案 首先要使用到ready这个属性&#xff0c;然后官网有给代码粘过来&#xff0c;简单解释一下这段代码的意思&#xff08;作用是在不同…

带大家做一个,易上手的家常猪肉炖白菜

今天 带大家做一个 猪肉炖白菜 一块猪肉 切片 一块生姜 两边

20240603在飞凌的OK3588-C开发板上跑原厂IPC方案时确认OV5645

v4l2-ctl --list-devices media-ctl -p -d /dev/media2 20240603在飞凌的OK3588-C开发板上跑原厂IPC方案时确认OV5645 2024/6/3 16:39 确认OV5645已经正常挂载了&#xff1a; Microsoft Windows [版本 10.0.22621.3296] (c) Microsoft Corporation。保留所有权利。 C:\Users\Q…

音频pop音的数学与物理解释

音频数据跳变太大的时候通常会有pop音&#xff0c;此时频谱上看pop音位置能量较高 音频中的“pop”音通常是由于信号的不连续性或瞬态变化造成的。这种不连续性的数学和物理原因可以从以下几个方面解释&#xff1a; 数学解释 信号不连续性 当音频信号发生突变时&#xff0c;…

从 0 到 1 带你认识 Git 在个人和企业开发中的原理及应用

文章目录 学习目标Git 初识提出问题如何解决&#xff1f;—— 版本控制器注意事项 Git 安装Linux CentOSLinux UbuntuWindows Git 基本操作创建 Git 本地仓库配置 Git 认识工作区、暂存区、版本库添加文件——场景一查看 .git 文件 添加文件——场景二 修改文件版本回退 学习目…

一文读懂GDPR

GDPR将对人们的网络足迹、使用的APP和服务如何保护或利用这些数据产生重大影响。 下面我们将对有关GDPR人们最关心的问题进行解读。 GDPR是什么&#xff1f; 一般数据保护条例&#xff08;General Data Protection Regulation&#xff09;是一项全面的法律&#xff0c;赋予了…

SaaS增长| 联盟营销经理必须要知道的十个关键指标!

你对你的联盟合作伙伴计划了解多少&#xff1f;这个问题的答案将取决于你的数据有多好&#xff0c;以及你跟踪数据的效率如何。 如果你还在整合各种资源&#xff0c;不必担心。合作伙伴计划需要时间和努力来建立&#xff0c;而且很难立即实施适当的报告制度&#xff0c;尤其是…

Python私教张大鹏万字长文讲解Tailwindcss Flex 和 Grid 布局相关的样式,附完整源码和效果截图

flex-basics 样式类 Utilities for controlling the initial size of flex items. 用于控制伸缩项的初始大小的实用程序。 基础样式 ClassPropertiesbasis-0flex-basis: 0px;basis-1flex-basis: 0.25rem; /* 4px */basis-2flex-basis: 0.5rem; /* 8px */basis-3flex-basis:…

程序员的五大职业素养,你知道吗?

程序员职业生涯的挑战与机遇 在当今这个科技日新月异的时代&#xff0c;程序员作为技术行业的中坚力量&#xff0c;其职业生涯无疑充满了无数挑战与机遇。技术的快速迭代要求他们必须不断学习新知识、掌握新技能&#xff0c;以跟上时代的步伐。同时&#xff0c;云计算、人工智…

python常见数据分析函数

apply DataFrame.apply(func, axis0, broadcastFalse, rawFalse, reduceNone, args(), **kwds) 第一个参数是函数 可以在Series或DataFrame上执行一个函数 支持对行、列或单个值进行处理 import numpy as np import pandas as pdf lambda x: x.max()-x.min()df pd.DataFrame(…

Spring Cloud学习笔记(Nacos):Nacos持久化(未完成)

这是本人学习的总结&#xff0c;主要学习资料如下 - 马士兵教育 1、Overview2、单机使用MySQL 1、Overview 我们关闭单机下的Nacos后&#xff0c;再重新启动会发现之前配置的内容没有被删除。这时因为Nacos有内嵌的数据库derby&#xff0c;会自己持久化。 但是在集群的情况下…

【用户画像】用户偏好购物模型BP

一、前言 用户购物偏好模型BP&#xff08;Buyer Preferences Model&#xff09;旨在通过对用户购物行为的深入分析和建模&#xff0c;以量化用户对不同商品或服务的偏好程度。该模型对于电商平台、零售商以及其他涉及消费者决策的商业实体来说&#xff0c;具有重要的应用价值。…