1.导入webSocket依赖
<!-- WebSocket dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.编写webSocket类
package com.skyable.device.config.webSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Administrator
*/
@ServerEndpoint("/vehicle/{domainId}")
@Component
@Slf4j
public class WebSocketServer {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
private static final Set<Session> SESSIONS = new HashSet<>();
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
log.info("webSocket link close");
}
/**
* @param error
*/
@OnError
public void onError(Throwable error) {
error.printStackTrace();
}
/**
* 接收数据
*
* @param data
*/
public static void sendDataToClients(String data) {
for (Session session : SESSIONS) {
try {
session.getBasicRemote().sendText(data);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@OnOpen
public void onOpen(Session session) {
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
/**
* 接收domainId
*/
SESSIONS.add(session);
sendDataToClients();
}
public void sendDataToClients() {
for (Session session : SESSIONS) {
try {
session.getBasicRemote().sendText("webSocket link succeed");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.skyable.device.config.webSocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for WebSocket support; exposes a {@link ServerEndpointExporter} bean.
 *
 * @author Administrator
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
3.kafka消费数据后调用webSocket方法
/**
 * Handles one Kafka message: persists the location and/or alert it carries, then pushes
 * the current vehicle overview to all WebSocket clients.
 *
 * <p>The payload is parsed once. If it is not valid JSON nothing is persisted, but the
 * overview is still broadcast.
 *
 * @param jsonValue raw JSON payload of the Kafka record
 */
@Override
public void saveBatch(String jsonValue) {
    ObjectMapper objectMapper = new ObjectMapper();
    try {
        JsonNode root = objectMapper.readTree(jsonValue);
        // Read each section with the same key that was used to detect it.
        if (root.has(VehicleConstant.LOCATION)) {
            saveLocationFromMessage(root.get(VehicleConstant.LOCATION));
        }
        if (root.has(VehicleConstant.ALERT)) {
            saveAlertFromMessage(root.get(VehicleConstant.ALERT));
        }
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    // Push the overview over WebSocket.
    // NOTE(review): this sends vehicles.toString(); confirm clients expect that format rather than JSON.
    VehicleAllVo vehicles = vehicles();
    WebSocketServer.sendDataToClients(vehicles.toString());
}

/**
 * Persists the real-time position described by a message's location node and records the
 * vehicle in the location sorted set.
 *
 * @param locationNode the location object of the message
 */
private void saveLocationFromMessage(JsonNode locationNode) {
    String vehicleId = locationNode.get("vehicleId").asText();
    double longitude = Double.parseDouble(locationNode.get("longitude").asText());
    double latitude = Double.parseDouble(locationNode.get("latitude").asText());
    long timeStamp = locationNode.get("timestamp").asLong();

    RealTimePosition realTimePosition = new RealTimePosition();
    realTimePosition.setTimeStamp(timeStamp);
    realTimePosition.setLatitude(String.valueOf(latitude));
    realTimePosition.setLongitude(String.valueOf(longitude));
    realTimePosition.setVehicleId(vehicleId);

    VehicleLocationVo locationVo = deviceMapMapper.selectLonLat(vehicleId);
    if (locationVo != null) {
        // Distance between the stored position ("l1") and the new one ("l2").
        RedisUtil.addLocation(vehicleId, Double.parseDouble(locationVo.getLongitude()), Double.parseDouble(locationVo.getLatitude()), "l1");
        RedisUtil.addLocation(vehicleId, longitude, latitude, "l2");
        Double result = RedisUtil.calculateDistance(vehicleId, "l1", "l2");
        // NOTE(review): the unit of this value depends on RedisUtil.convertMilesToKilometers — confirm.
        Double converted = RedisUtil.convertMilesToKilometers(result);
        // Round to three decimals numerically. Formatting with a default-locale
        // DecimalFormat and re-parsing fails where the decimal separator is a comma.
        realTimePosition.setDistance(Math.rint(converted * 1000.0) / 1000.0);
    } else {
        realTimePosition.setDistance(0);
    }

    // Province lookup; the area is left unset when the lookup returns no usable name,
    // so a failed lookup no longer discards the whole position record.
    Map<String, Object> position = addressUtil.getPosition(longitude, latitude, null, null, null);
    Object data = position == null ? null : position.get("data");
    Object shortName = data instanceof Map ? ((Map<?, ?>) data).get("shortname") : null;
    if (shortName != null) {
        realTimePosition.setArea(shortName.toString().replace("\"", ""));
    }

    deviceMapMapper.insertRealTimePosition(realTimePosition);
    RedisUtil.addZSetValue(VehicleConstant.VEHICLE_LOCATION, vehicleId, timeStamp);
}

/**
 * Persists the alert described by a message's alert node and records the vehicle in the
 * alert sorted set.
 *
 * @param alertNode the alert object of the message
 */
private void saveAlertFromMessage(JsonNode alertNode) {
    String vehicleId = alertNode.get("vehicleId").asText();
    Integer alertType = alertNode.get("alertType").asInt();
    long timeStamp = alertNode.get("timestamp").asLong();

    Alerts alerts = new Alerts();
    alerts.setAlertType(alertType);
    alerts.setTimeStamp(timeStamp);
    alerts.setVehicleId(vehicleId);
    deviceMapMapper.insertAlerts(alerts);
    RedisUtil.addZSetValue(VehicleConstant.VEHICLE_ALERT, vehicleId, timeStamp);
}
4.发送消息内容
VehicleAllVo vehicles = vehicles();
vehicles() 方法的返回值就是通过 webSocket 发送的具体内容
5.kafka消费者
package com.skyable.device.listener.Vehicle;
import com.alibaba.fastjson.JSON;
import com.skyable.common.config.CloudApplicationContext;
import com.skyable.common.constants.kafka.KafkaTopicConstants;
import com.skyable.device.config.webSocket.WebSocketServer;
import com.skyable.device.entity.vehicle.Vehicle;
import com.skyable.device.service.DeviceMapService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Kafka batch listener that hands each vehicle record of a polled batch to
 * {@link DeviceMapService#saveBatch}.
 *
 * @author yangJun
 * @date: 2023-08-18-14:12
 */
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class VehicleDataKafkaListener {
    private final DeviceMapService deviceMapService;

    /**
     * Processes a polled batch record by record, in the order the batch delivers them.
     *
     * <p>Records are handled sequentially on the listener thread rather than with a
     * parallel stream: the per-record work is blocking I/O, and processing records of
     * the same vehicle out of order would make order-dependent results unreliable.
     *
     * @param recordList the records of one poll
     */
    @KafkaListener(topics = KafkaTopicConstants.TOPIC_VEHICLE_RECORD, groupId = "rx_1_thing", containerFactory = "batchFactory")
    public void dealDeviceDataToScript(List<ConsumerRecord<String, String>> recordList) {
        for (ConsumerRecord<String, String> record : recordList) {
            saveVehicleData(record.value());
        }
    }

    /**
     * Logs one record payload and forwards it to the service; records without a value
     * are skipped.
     *
     * @param jsonValue record value, may be {@code null}
     */
    private void saveVehicleData(String jsonValue) {
        if (jsonValue == null) {
            log.warn("kafka data is null, record skipped");
            return;
        }
        log.info("kafka data:{}", jsonValue);
        deviceMapService.saveBatch(jsonValue);
    }
}
package com.skyable.device.listener.Vehicle;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @ClassName KafkaConfig
 * @Description Kafka consumer configuration: builds the batch listener container factory
 * and the consumer property map from {@code spring.kafka.*} settings.
 * @Author gaoy
 * @Date 2021/2/25 15:02
 */
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    /**
     * Container factory for batch consumption.
     *
     * @return a factory with batch listening enabled and the configured concurrency
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Deliver each poll to the listener as a list of records.
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        // To commit offsets manually, set the ack mode on the container properties:
        // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Consumer properties used by {@link #batchFactory()}.
     *
     * @return the Kafka consumer configuration map
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // Maximum number of records returned by a single poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // enable.idempotence is a producer setting, so it is no longer added to the
        // consumer properties.
        return props;
    }
}