背景:分布式部署 一个主节点往各个节点下发任务(调用第三方服务),目的是为了测试各节点与第三方的连通性
思路:
主节点实现
创建Spring Boot项目:作为主节点的后端服务。
集成Eureka客户端:在主节点的pom.xml中添加Eureka客户端依赖,并在配置文件中配置Eureka服务器地址。
服务发现逻辑:编写服务来获取Eureka中的子节点列表,可以通过Eureka的REST API或者直接使用Spring Cloud提供的DiscoveryClient。
任务下发:接收外部请求,解析参数(外部系统接口名、IP、端口)。遍历从Eureka获取的子节点列表,对每个子节点发起HTTP请求,传递上述参数。
结果收集:收集各子节点的响应,组织成最终的结果列表。
子节点实现
创建Spring Boot项目:作为子节点的后端服务,并注册到Eureka服务器。
处理主节点请求:定义一个API来接收主节点的HTTP请求,提取外部系统接口名、IP、端口参数。
Socket调用外部接口:根据接收到的参数,使用Java的Socket编程建立到外部系统的连接,发送请求并等待响应。记得设置超时时间为2秒。
响应主节点:根据Socket调用的结果,构建响应信息(成功/失败及原因、外部接口名、IP、端口),并通过HTTP响应返回给主节点。
注意事项
错误处理:确保在每个关键步骤都有恰当的错误处理逻辑,比如网络请求失败、超时、Socket异常等。
日志记录:在主节点和子节点上都应记录详细的日志,便于问题追踪。
安全性:考虑对HTTP请求和Socket通信进行适当的安全加固,如使用HTTPS、身份验证等
废话不多说,直接上代码:新建一个简单的 springboot 项目,引入Eureka 等所需简单启动依赖
server 端(主节点)
参数说明:
externalApiName 策略名
ip 第三方 ip
port 第三方端口号
controller(表单方式接收数据)
@RestController
public class TaskDistributorController {
@Autowired
private TaskManagerService taskManagerService;
@PostMapping("/distributeTask")
public ResponseEntity<List<ResultDto>> distributeTask(TaskRequest taskRequest) {
try {
// 调用服务层逻辑处理任务分配
List<ResultDto> results = taskManagerService.distributeTasks(taskRequest);
return ResponseEntity.ok(results);
} catch (Exception e) {
return ResponseEntity.status(500).body(null);
}
}
}
service
@Service
@Slf4j
public class TaskManagerService {
@Autowired
private DiscoveryClient discoveryClient;
public List<ResultDto> distributeTasks(TaskRequest taskRequest) {
// 获取子节点列表
List<ServiceInstance> instances = discoveryClient.getInstances("deployment-test-tool-node");
String externalApiName = taskRequest.getExternalApiName();
List<ResultDto> results = new ArrayList<>();
for (ServiceInstance instance : instances) {
String workerUrl = "http://" + instance.getHost() + ":" + instance.getPort() + "/executeTask";
// 构建请求参数
MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
params.put("externalApiName", Collections.singletonList(externalApiName));
params.put("ip", Collections.singletonList(taskRequest.getIp()));
params.put("port", Collections.singletonList(String.valueOf(taskRequest.getPort())));
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity<MultiValueMap<String, String>> requestEntity = new HttpEntity<>(params, headers);
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<ResultDto> response = restTemplate.postForEntity(workerUrl, requestEntity, ResultDto.class);
log.info("Response from worker: " + response.getBody());
response.getBody().setIp(instance.getHost());
response.getBody().setPort(String.valueOf(instance.getPort()));
response.getBody().setExternalApiName(externalApiName);
response.getBody().setExternalIpPort(taskRequest.getIp()+":"+taskRequest.getPort());
results.add(response.getBody());
}
return results;
}
}
node子节点(以 socket 方式发送请求)
controller
@RestController
public class WorkerController {
@Autowired
private ExternalSystemClient externalSystemClient;
@PostMapping("/executeTask")
public ResultDto executeTask(TaskRequest request) {
return externalSystemClient.callExternalApi(request.getExternalApiName(), request.getIp(), request.getPort());
}
}
service
@Service
public class ExternalSystemClient {
public ResultDto callExternalApi(String apiName, String ip, int port) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket();
SocketAddress address = new InetSocketAddress(ip, port);
socket.connect(address, 2000); // 这里的2000是创建链接超时时间,单位为毫秒
socket.setSoTimeout(2000); // 设置超时时间2秒
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
// 发送请求
out.println(apiName);
// 读取响应
String response = in != null ? in.readLine() : "";
if (response != null) {
return new ResultDto("success", "连接成功");
} else {
return new ResultDto("failed", "未收到响应");
}
} catch (Exception e) {
return new ResultDto("failed", e.getMessage());
} finally {
// 关闭资源
if (in != null) try { in.close(); } catch (Exception ignored) {}
if (out != null) try { out.close(); } catch (Exception ignored) {}
if (socket != null) try { socket.close(); } catch (Exception ignored) {}
}
}
}
模拟第三方以socket方式接收
socketserver
@Component
@Slf4j
public class SocketServer {
@Value("${server.port}")
private int port;
@PostConstruct
public void startServer() {
try (ServerSocket serverSocket = new ServerSocket(port)) {
log.info("Socket服务已启动,等待连接...");
while (true) {
try (Socket clientSocket = serverSocket.accept();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
out.println("连接成功,测试联通性良好!");
} catch (IOException e) {
log.error("处理客户端连接时发生错误", e);
}
}
} catch (IOException e) {
log.error("启动Socket服务器时发生错误", e);
}
}
}