Kafka系列(二)将消息数据写入Kafka系统--生产者【异步发送、同步发送、单线程发送、多线程发送、配置生产者属性、自定义序列化、自定义主题分区】

Kafka系列

    • 发送消息到 Kafka 主题
    • 了解异步模式
    • 了解同步模式
    • 线程发送消息的步骤
      • 生产者用单线程发送消息
      • 生产者用多线程发送消息
    • 配置生产者属性
    • 保存对象的各个属性一序列化
      • 序列化一个对象
      • 序列化对象的存储格式
      • 自己实现 序列化的步骤
        • 1. 创建序列化对象
        • 2. 编写序列化工具类
        • 3. 编写自定义序列化逻辑代码
        • 4. 编写生产者应用程序
    • 自定义主题分区
      • 编写自定义主题分区算法
      • 演示自定义分区的作用

转自 《Kafka并不难学!入门、进阶、商业实战》

发送消息到 Kafka 主题

Kafka 0.10.0.0 及以后的版本,对生产者代码的底层实现进行了重构。kafka.producer.Producer类被 org.apache.kafka.clients.producer.KafkaProducer 类替换
Kafka 系统支持两种不同的发送方式–同步模式(Sync)和异步模式(ASync)。

了解异步模式

在 Kafka 0.10.0.0 及以后的版本中,客户端应用程序调用生产者应用接口,默认使用异步的方式发送消息。
生产者客户端在通过异步模式发送消息时,通常会调用回调函数的 send()方法发送消息。生产者端收到 Kafka 代理节点的响应后会触发回调函数

  1. 什么场景下需要使用异步模式
    假如生产者客户端与 Kafka 集群节点间存在网络延时(100ms),此时发送 10 条消息记录,则延时将达到 1s。而大数据场景下有着海量的消息记录,发送的消息记录是远不止 10条,延时将非常严重。
    大数据场景下,如果采用异步模式发送消息记录,几乎没有任何耗时,通过回调函数可以知道消息发送的结果。
  2. 异步模式数据写入流程
    例如,一个业务主题(ip login)有6个分区。生产者客户端写入一条消息记录时,消息记录会先写入某个缓冲区,生产者客户端直接得到结果(这时,缓冲区里的数据并没有写到 Kafka代理节点中主题的某个分区)。之后,缓冲区中的数据会通过异步模式发送到 Kafka 代理节点中主题的某个分区中
	//实例化一个消息记录对象,用来保存主题名、分区索引、键、值和时间戳
	ProducerRecord<byte[],byte[]> record =new ProducerRecord<byte[],byte[]>("ip login", key, value);
	//调用 send()方法和回调函数
	producer.send(myRecord,new Callback() {
		public void onCompletion (RecordMetadata metadata, Exception e){
			if (e != null) {
				e.printStackTrace();
			} else {
				System.out.println("The offset of the record we just sent is:" + metadata.offset());
			}
		}
	};

消息记录提交给 send()方法后,实际上该消息记录被放入一个缓冲区的发送队列,然后通过后台线程将其从缓冲区队列中取出并进行发送;发送成功后会触发send方法的回调函数-Callback.

了解同步模式

生产者客户端通过 send()方法实现同步模式发送消息,并返回一个 Future 对象,同时调用get()方法等待 Future 对象,看 send()方法是否发送成功。

  1. 什么场景下使用同步模式
    如果要收集用户访问网页的数据,在写数据到 Kafka 集群代理节点时需要立即知道消息是否写入成功,此时应使用同步模式。
// 将字符串转换成字节数组
byte[] key = "key".getBytes();
byte[] value ="value".getBytes();
// 实例化一个消息记录对象,用来保存主题名、分区索引、键、值和时间戳
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("ip_login",key, value);
//调用 send()函数后,再通过 get()方法等待返回结果
producer.send(record).get();

这里通过调用 Future 接口中的 get()方法等待 Kafka 集群代理节点(Broker)的状态返回如果 Producer 发送消息记录成功了,则返回 RecordMetadata 对象,该对象可用来查看消息记录的偏移量(Offset)。

线程发送消息的步骤

在 Kafka 系统中,为了保证生产者客户端应用程序的独立运行,通常使用线程的方式发送消息。
创建一个简单的生产者应用程序的步骤如下。
(1)实例化 Properties 类对象,配置生产者应答机制。有以下三个属性是必须设置的。其他属性一般都会有默认值,可以按需添加设置。

  • bootstrap.servers:配置Kafka集群代理节点地址;
  • key.serializer:序列化消息主键;
  • value.serializer:序列化消息数据内容,

(2)根据属性对象实例化一个 KafkaProducer.
(3)通过实例化一个ProducerRecord 对象,将消息记录以“键-值”对的形式进行封装。
(4)通过调用 KafkaProducer 对象中带有回调函数的 send方法发送消息给 Kafka 集群
(5)关闭KafkaProducer 对象,释放连接资源,

生产者用单线程发送消息

/**
 * 实现一个生产者客户端应用程序.
 */
public class JProducer extends Thread {

	private final Logger LOG = LoggerFactory.getLogger(JProducer.class);

	/** 配置Kafka连接信息. */
	public Properties configure() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址
		props.put("acks", "1"); // 设置应答模式, 1表示有一个Kafka代理节点返回结果
		props.put("retries", 0); // 重试次数
		props.put("batch.size", 16384); // 批量提交大小
		props.put("linger.ms", 1); // 延时提交
		props.put("buffer.memory", 33554432); // 缓冲大小
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 序列化主键
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 序列化值

		return props;
	}
	
	public static void main(String[] args) {
		JProducer producer = new JProducer();
		producer.start();
	}
	
	/** 实现一个单线程生产者客户端. */
	public void run() {
		Producer<String, String> producer = new KafkaProducer<>(configure());
		// 发送100条JSON格式的数据
		for (int i = 0; i < 100; i++) {
			// 封装JSON格式
			JSONObject json = new JSONObject();
			json.put("id", i);
			json.put("ip", "192.168.0." + i);
			json.put("date", new Date().toString());
			String k = "key" + i;
			// 异步发送
			producer.send(new ProducerRecord<String, String>("test_kafka_game_x", k, json.toJSONString()), new Callback() {
				public void onCompletion(RecordMetadata metadata, Exception e) {
					if (e != null) {
						LOG.error("Send error, msg is " + e.getMessage());
					} else {
						LOG.info("The offset of the record we just sent is: " + metadata.offset());
					}
				}
			});
		}
		try {
			sleep(3000);// 间隔3秒
		} catch (InterruptedException e) {
			LOG.error("Interrupted thread error, msg is " + e.getMessage());
		}
		producer.close();// 关闭生产者对象
	}
}

这里的主题只有一个分区和一个副本,所以,发送的所有消息会写入同一个分区中
如果希望发送完消息后获取一些返回信息(比如获取消息的偏移量、分区索引值、提交的时间戳等),则可以通过回调函数 CallBack 返回的 RecordMetadata 对象来实现。
由于 Kafka 系统的生产者对象是线程安全的,所以,可通过增加生产者对象的线程数来提高 Kafka 系统的吞吐量。

生产者用多线程发送消息

public class JProducerThread extends Thread {

	// 创建一个日志对象
	private final Logger LOG = LoggerFactory.getLogger(JProducerThread.class);
	// 声明最大线程数
	private final static int MAX_THREAD_SIZE = 6;

	/** 配置Kafka连接信息. */
	public Properties configure() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址
		props.put("acks", "1"); // 设置应答模式, 1表示有一个Kafka代理节点返回结果
		props.put("retries", 0); // 重试次数
		props.put("batch.size", 16384); // 批量提交大小
		props.put("linger.ms", 1); // 延时提交
		props.put("buffer.memory", 33554432); // 缓冲大小
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 序列化主键
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 序列化值
		props.put("partitioner.class", "org.smartloli.kafka.game.x.book_4.JPartitioner");// 指定自定义分区类
		
		return props;
	}

	public static void main(String[] args) {
		// 创建一个固定线程数量的线程池
		ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_SIZE);
		// 提交任务
		executorService.submit(new JProducerThread());
		// 关闭线程池
		executorService.shutdown();
	}

	/** 实现一个单线程生产者客户端. */
	public void run() {
		Producer<String, String> producer = new KafkaProducer<>(configure());
		// 发送100条JSON格式的数据
		for (int i = 0; i < 10; i++) {
			// 封装JSON格式
			JSONObject json = new JSONObject();
			json.put("id", i);
			json.put("ip", "192.168.0." + i);
			json.put("date", new Date().toString());
			String k = "key" + i;
			// 异步发送
			producer.send(new ProducerRecord<String, String>("ip_login_rt", k, json.toJSONString()), new Callback() {
				public void onCompletion(RecordMetadata metadata, Exception e) {
					if (e != null) {
						LOG.error("Send error, msg is " + e.getMessage());
					} else {
						LOG.info("The offset of the record we just sent is: " + metadata.offset());
					}
				}
			});
		}
		try {
			sleep(3000);// 间隔3秒
		} catch (InterruptedException e) {
			LOG.error("Interrupted thread error, msg is " + e.getMessage());
		}

		producer.close();// 关闭生产者对象
	}

}

配置生产者属性

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

保存对象的各个属性一序列化

序列化一个对象

在分布式环境下,无论哪种格式的数据都会被分解成二进制,以便存储在文件中或者在网络上传输。
序列化就是,将对象以一连串的字节进行描述,用来解决对象在进行读写操作时所引发的问题。
序列化可以将对象的状态写成数据流,并进行网络传输或者保存在文件或数据库中,在需要时再把该数据流读取出来,重新构造一个相同的对象。

  1. 为什么需要序列化
    在传统的企业应用中,不同的组件分布在不同的系统和网络中。如果两个组件之间想要进行通信,那么它们之间必须有数据转换机制。实现这个过程需要遵照一个协议来传输对象,这意味着,接收端需要知道发送端所使用的协议才能重新构建对象,以此来保证两个组件之间的通信是安全的。
public class JObjectSerial implements Serializable {

	private static Logger LOG = LoggerFactory.getLogger(JObjectSerial.class);

	/**
	 * 序列化版本ID.
	 */
	private static final long serialVersionUID = 1L;

	public byte id = 1; // 用户ID
	public byte money = 100; // 充值金额

	/** 实例化入口函数. */
	public static void main(String[] args) {
		try {
			FileOutputStream fos = new FileOutputStream("/tmp/salary.out"); // 实例化一个输出流对象
			ObjectOutputStream oos = new ObjectOutputStream(fos);// 实例化一个对象输出流
			JObjectSerial jos = new JObjectSerial(); // 实例化序列化类
			oos.writeObject(jos); // 写入对象
			oos.flush(); // 刷新数据流
			oos.close();// 关闭连接
		} catch (Exception e) {
			LOG.error("Serial has error, msg is " + e.getMessage());// 打印异常信息
		}
	}
}

序列化对象的存储格式

在这里插入图片描述

自己实现 序列化的步骤

在这里插入图片描述
如果使用原生的序列化方式,则需要将传输的内容拼接成字符串或转成字符数组,抑或是其他类型,这样在实现代码时就会比较麻烦。而 Kafka 为了解决这种问题,提供了序列化的接口,让用户可以自定义对象的序列化方式,来完成对象的传输。
以下实例将演示生产者客户端应用程序中序列化的用法,利用 Serializable 接口来序列化对象。

1. 创建序列化对象
/**
 * 声明一个序列化类.
 * 
 * @author smartloli.
 *
 *         Created by Apr 30, 2018
 */
public class JSalarySerial implements Serializable {

	/**
	 * 序列化版本ID.
	 */
	private static final long serialVersionUID = 1L;

	private String id;// 用户ID
	private String salary;// 金额

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getSalary() {
		return salary;
	}

	public void setSalary(String salary) {
		this.salary = salary;
	}

	// 打印对象属性值
	@Override
	public String toString() {
		return "JSalarySerial [id=" + id + ", salary=" + salary + "]";
	}

}

2. 编写序列化工具类
/**
 * 封装一个序列化的工具类.
 * 
 * @author smartloli.
 *
 *         Created by Apr 30, 2018
 */
public class SerializeUtils {

	/** 实现序列化. */
	public static byte[] serialize(Object object) {
		try {
			return object.toString().getBytes("UTF8");// 返回字节数组
		} catch (Exception e) {
			e.printStackTrace(); // 抛出异常信息
		}
		return null;
	}

	/** 实现反序列化. */
	public static <T> Object deserialize(byte[] bytes) {
		try {
			return new String(bytes, "UTF8");// 反序列化
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

}
3. 编写自定义序列化逻辑代码
/**
 * 自定义序列化实现.
 * 
 * @author smartloli.
 *
 *         Created by Apr 30, 2018
 */
public class JSalarySeralizer implements Serializer<JSalarySerial> {

	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {

	}

	/** 实现自定义序列化. */
	@Override
	public byte[] serialize(String topic, JSalarySerial data) {
		return SerializeUtils.serialize(data);
	}

	@Override
	public void close() {

	}

}

4. 编写生产者应用程序
/**
 * 自定义序列化, 发送消息给Kafka.
 * 
 * @author smartloli.
 *
 *         Created by Apr 30, 2018
 */
public class JProducerSerial extends Thread {

	private static Logger LOG = LoggerFactory.getLogger(JProducerSerial.class);

	/** 配置Kafka连接信息. */
	public Properties configure() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址
		props.put("acks", "1"); // 设置应答模式, 1表示有一个Kafka代理节点返回结果
		props.put("retries", 0); // 重试次数
		props.put("batch.size", 16384); // 批量提交大小
		props.put("linger.ms", 1); // 延时提交
		props.put("buffer.memory", 33554432); // 缓冲大小
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 序列化主键
		props.put("value.serializer", "org.smartloli.kafka.game.x.book_4.serialization.JSalarySeralizer");// 自定义序列化值

		return props;
	}

	public static void main(String[] args) {
		JProducerSerial producer = new JProducerSerial();
		producer.start();
	}

	/** 实现一个单线程生产者客户端. */
	public void run() {
		Producer<String, JSalarySerial> producer = new KafkaProducer<>(configure());
		JSalarySerial jss = new JSalarySerial();
		jss.setId("2018");
		jss.setSalary("100");

		producer.send(new ProducerRecord<String, JSalarySerial>("test_topic_ser_des", "key", jss), new Callback() {
			public void onCompletion(RecordMetadata metadata, Exception e) {
				if (e != null) {
					LOG.error("Send error, msg is " + e.getMessage());
				} else {
					LOG.info("The offset of the record we just sent is: " + metadata.offset());
				}
			}
		});

		try {
			sleep(3000);// 间隔3秒
		} catch (InterruptedException e) {
			LOG.error("Interrupted thread error, msg is " + e.getMessage());
		}

		producer.close();// 关闭生产者对象
	}
}

在这里插入图片描述

自定义主题分区

在这里插入图片描述

编写自定义主题分区算法

/**
 * 实现一个自定义分区类.
 *
 * @author smartloli.
 *
 *         Created by Apr 30, 2018
 */
public class JPartitioner implements Partitioner {

	@Override
	public void configure(Map<String, ?> configs) {

	}

	/** 实现Kafka主题分区索引算法. */
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		int partition = 0;
		String k = (String) key;
		partition = Math.abs(k.hashCode()) % cluster.partitionCountForTopic(topic);
		return partition;
	}

	@Override
	public void close() {

	}

}

演示自定义分区的作用

/**
 * 实现一个生产者客户端应用程序.
 * 
 * @author smartloli.
 *
 *         Created by Apr 27, 2018
 */
public class JProducerThread extends Thread {

	// 创建一个日志对象
	private final Logger LOG = LoggerFactory.getLogger(JProducerThread.class);
	// 声明最大线程数
	private final static int MAX_THREAD_SIZE = 6;

	/** 配置Kafka连接信息. */
	public Properties configure() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");// 指定Kafka集群地址
		props.put("acks", "1"); // 设置应答模式, 1表示有一个Kafka代理节点返回结果
		props.put("retries", 0); // 重试次数
		props.put("batch.size", 16384); // 批量提交大小
		props.put("linger.ms", 1); // 延时提交
		props.put("buffer.memory", 33554432); // 缓冲大小
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 序列化主键
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 序列化值
		props.put("partitioner.class", "org.smartloli.kafka.game.x.book_4.JPartitioner");// 指定自定义分区类
		
		return props;
	}

	public static void main(String[] args) {
		// 创建一个固定线程数量的线程池
		ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_SIZE);
		// 提交任务
		executorService.submit(new JProducerThread());
		// 关闭线程池
		executorService.shutdown();
	}

	/** 实现一个单线程生产者客户端. */
	public void run() {
		Producer<String, String> producer = new KafkaProducer<>(configure());
		// 发送100条JSON格式的数据
		for (int i = 0; i < 10; i++) {
			// 封装JSON格式
			JSONObject json = new JSONObject();
			json.put("id", i);
			json.put("ip", "192.168.0." + i);
			json.put("date", new Date().toString());
			String k = "key" + i;
			// 异步发送
			producer.send(new ProducerRecord<String, String>("ip_login_rt", k, json.toJSONString()), new Callback() {
				public void onCompletion(RecordMetadata metadata, Exception e) {
					if (e != null) {
						LOG.error("Send error, msg is " + e.getMessage());
					} else {
						LOG.info("The offset of the record we just sent is: " + metadata.offset());
					}
				}
			});
		}
		try {
			sleep(3000);// 间隔3秒
		} catch (InterruptedException e) {
			LOG.error("Interrupted thread error, msg is " + e.getMessage());
		}

		producer.close();// 关闭生产者对象
	}

}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/371009.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

企业级大数据安全架构(九)FreeIPA管理员密码忘记后如何修改

作者&#xff1a;楼高 1重置Directory Server管理员密码 1.1停止directory server服务 [rootipa schema]# start-dirsrv HDP-HADOOP 如果你不知道你的实例名&#xff0c;可以通过如下方式获取 1.2生成一个新的HASH密码 停止服务后使用pwdhash命令生成一个新的HASH密码 [r…

HashMap的put和get流程

一、put流程图 首先进行哈希值的扰动&#xff0c;获取一个新的哈希值。(key null) ? 0 : (h key.hashCode()) ^ (h >>> 16); 判断tab是否位空或者长度为0&#xff0c;如果是则进行扩容操作。 if ((tab table) null || (n tab.length) 0)n (tab resize()).l…

JAVASE进阶:Collection高级(2)——源码剖析ArrayList、LinkedList、迭代器

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位大四、研0学生&#xff0c;正在努力准备大四暑假的实习 &#x1f30c;上期文章&#xff1a;JAVASE进阶&#xff1a;Collection高级&#xff08;1&#xff09;——源码分析contains方法、lambda遍历集合 &#x1f4da;订阅…

Java学习-内部类

内部类概述 1.成员内部类 注意&#xff1a; 2.静态内部类 3.局部内部类&#xff08;看看就行&#xff09; 4.匿名内部类 应用场景&#xff1a;通常作为一个参数传给方法 Eg.小猫和小狗都参加游泳比赛

图解支付-金融级密钥管理系统:构建支付系统的安全基石

经常在网上看到某某公司几千万的个人敏感信息被泄露&#xff0c;这要是放在持牌的支付公司&#xff0c;可能就是一个非常大的麻烦&#xff0c;不但会失去用户的信任&#xff0c;而且可能会被吊销牌照。而现实情况是很多公司的技术研发人员并没有足够深的安全架构经验来设计一套…

使用WPS制作三线表

点击边框和底纹点击1、2、3、4并且应用于表格点击确定 再次选中表格点击右键表格属性选择边框和底纹 选中表格第一行右键点击表格属性选择边框和底纹 如果表格中存在虚线

用户访问一个购物网站时TCP/IP五层参考模型中每一层的功能

当用户访问一个购物网站时&#xff0c;网络上的每一层都会涉及不同的协议&#xff0c;具体网络模型如下图所示。 以下是每个网络层及其相关的协议示例&#xff1a; 物理层&#xff1a;负责将比特流传输到物理媒介上&#xff0c;例如电缆或无线信号。所以在物理层&#xff0c;可…

调用其他数据库,事务回滚

1、定时 JDBC 的事务 2、事务提交 3、事务回滚 样例 Transactional(propagation Propagation.REQUIRES_NEW)RequestMapping(value "/ix_work_order", method RequestMethod.POST, consumes MediaType.APPLICATION_JSON_VALUE,produces MediaType.APPLICATION_…

spring boot3x登录开发-上(整合jwt)

⛰️个人主页: 蒾酒 &#x1f525;系列专栏&#xff1a;《spring boot实战》 &#x1f30a;山高路远&#xff0c;行路漫漫&#xff0c;终有归途。 目录 前置条件 jwt简介 导依赖 编写jwt工具类 1.配置项直接嵌入代码&#xff0c;通过类名.静态方法使用 2.配置项写到…

大数据 - Spark系列《三》- 加载各种数据源创建RDD

Spark系列文章&#xff1a; 大数据 - Spark系列《一》- 从Hadoop到Spark&#xff1a;大数据计算引擎的演进-CSDN博客 大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客 目录 3.1&#x1f9c0;加载文件(本地) 1. 加载本地文件路径 &#x1f32e;使用te…

让IIS支持SSE (Server Sent Events)

本文只探讨IISPython网站的情况&#xff0c;对于asp.net也应该不用这么麻烦。 先上结论&#xff1a;用反向代理&#xff1a; IIS URL Rewrite waitress Waitress是一个纯python编写独立的WSGI服务器&#xff0c;功能比Gunicorn弱一些&#xff0c;但可以运行在windows平台上&…

基于springboot智慧养老平台源码和论文

首先,论文一开始便是清楚的论述了系统的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了系统的需求基础上需要进一步地设计系统,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

牛客周赛 Round 31

D. 思路&#xff1a;使用map构造两个链表。 #include <bits/stdc.h> using namespace std;map<int,int> l,r; int main() {int q;cin>>q;int op-1e9-1;int ed1e91;r[op]ed;l[ed]op;while(q--){int a;cin>>a;if(a1){int x,y;cin>>x>>y;int…

echarts使用之饼图(四)

1 基本使用 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible" cont…

【Elasticsearch】从入门到精通

目前java常见的针对大数据存储的方案并不多&#xff0c;常见的就是mysql的分库分表、es存储 这里偏向es存储方案&#xff0c;es不同的版本之间其实差异还挺大的&#xff0c;本篇博文版本Elasticsearch 7.14.0 Springboot整合Easy-Es Easy-Es官方文档 Elasticsearch的初步认识 …

【MATLAB源码-第135期】基于matlab的变色龙群优化算法CSA)机器人栅格路径规划,输出做短路径图和适应度曲线。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 变色龙群优化算法&#xff08;Chameleon Swarm Algorithm&#xff0c;CSA&#xff09;是一种新颖的群体智能优化算法&#xff0c;受到自然界中变色龙捕食和社交行为的启发。变色龙以其独特的适应能力而著称&#xff0c;能够根…

【vue3学习P5-P10】vue3语法;vue响应式实现

0、vue2和vue3对比 框架版本API方式双向绑定原理domFragmentsTree-Shakingvue2选项式API&#xff08;Options API&#xff09;基于Object.defineProperty&#xff08;监听&#xff09;实现&#xff0c;不能双向绑定对象类型的数据【通过Object.defineProperty里面的set和get做…

【Linux网络编程三】Udp套接字编程网络应用场景

【Linux网络编程三】Udp套接字编程网络应用场景 应用场景一&#xff1a;远程命令执行应用场景二&#xff1a;与Windos端相互通信应用场景三&#xff1a;简单聊天1.多线程化2.输入输出分开 应用场景一&#xff1a;远程命令执行 简单的服务器上一篇已经完成&#xff0c;接下来我…

Java项目管理01-Maven基础

一、Maven的常用命令和生命周期 1.Maven的常用命令使用方式 complie&#xff1a;编译&#xff0c;将java文件编译为class字节码文件 clean&#xff1a;清理&#xff0c;删除字节码文件 test&#xff1a;测试&#xff0c;运行项目中的test类 package&#xff1a;打包&#x…

高斯消去法 | LU分解 | PA=LU分解(MatLab)

一、问题描述 利用高斯消去法&#xff0c;LU 分解及PALU 分解求解非线性方程组。 二、实验目的 掌握高斯消去法、LU 分解、PALU 分解的算法原理&#xff1b;编写代码实现利用高斯消去法、LU 分解、PALU 分解来求解线性方程组。 三、实验内容及要求 1. 利用顺序高斯消去法求…