1.工程目录
从3.2.1版本之后这个dolphinscheduler中的RPC框架工程就从原来的dolphinscheduler-remote
工程重构到了dolphinscheduler-extract
工程。
- dolphinscheduler 父项目
- dolphinscheduler-extract RPC服务项目
- dolphinscheduler-extract-alert 监控告警服务RPC接口定义、请求响应封装设计工程
- dolphinscheduler-extract-base RPC框架核心工程
- dolphinscheduler-extract-common RPC框架通用工程
- dolphinscheduler-extract-master Master调度服务的RPC接口定义、请求响应封装设计工程
- dolphinscheduler-extract-worker Worker任务执行服务的RPC接口定义、请求响应封装设计工程
- dolphinscheduler-extract RPC服务项目
1.核心注解的设计
Dolphinscheduler中的RPC核心注解包含**@RpcService和@RpcMethod**。
1.1.@RpcService注解
这个注解的主要作用就是用来标记被它注解的接口是一个RPC服务接口
/**
* 这个注解的主要作用就是用来标记被它注解的接口是一个RPC服务接口
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcService {
}
1.1.@RpcMethod注解
这个注解需要再被@RpcService注解的接口类中定义的方法中进行使用,表明这个方法是一个RPC方法。
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcMethod {
long timeout() default 3000L;
}
2.核心注解的使用
这两个注解再Dolphinscheduler中一般结合进行使用。用来定义一个RPC服务接口,这个接口需要再服务提供者服务中存在具体的接口实现类。,以下是一个RPC远程日志管理的接口定义。这个RPC服务接口再MasterServer服务和WorkerServer服务中都有对应的实现类。
@RpcService
public interface ILogService {
@RpcMethod
TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest);
@RpcMethod
TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest);
@RpcMethod
GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest);
@RpcMethod
void removeTaskInstanceLog(String taskInstanceLogAbsolutePath);
}
MasterServer服务中实现类
package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MasterLogServiceImpl implements ILogService {
@Override
public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest) {
byte[] bytes =
LogUtils.getFileContentBytes(logicTaskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath());
// todo: if file not exists, return error result
return new TaskInstanceLogFileDownloadResponse(bytes);
}
@Override
public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) {
List<String> lines = LogUtils.readPartFileContent(
taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(),
taskInstanceLogPageQueryRequest.getSkipLineNum(),
taskInstanceLogPageQueryRequest.getLimit());
String logContent = LogUtils.rollViewLogLines(lines);
return new TaskInstanceLogPageQueryResponse(logContent);
}
@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
return new GetAppIdResponse(Collections.emptyList());
}
@Override
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {
FileUtils.deleteFile(taskInstanceLogAbsolutePath);
}
}
WorkerServer服务中实现类
package org.apache.dolphinscheduler.server.worker.rpc;
import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdRequest;
import org.apache.dolphinscheduler.extract.common.transportor.GetAppIdResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class WorkerLogServiceImpl implements ILogService {
@Override
public TaskInstanceLogFileDownloadResponse getTaskInstanceWholeLogFileBytes(TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest) {
byte[] bytes = LogUtils
.getFileContentBytes(taskInstanceLogFileDownloadRequest.getTaskInstanceLogAbsolutePath());
// todo: if file not exists, return error result
return new TaskInstanceLogFileDownloadResponse(bytes);
}
@Override
public TaskInstanceLogPageQueryResponse pageQueryTaskInstanceLog(TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest) {
List<String> lines = LogUtils.readPartFileContent(
taskInstanceLogPageQueryRequest.getTaskInstanceLogAbsolutePath(),
taskInstanceLogPageQueryRequest.getSkipLineNum(),
taskInstanceLogPageQueryRequest.getLimit());
String logContent = LogUtils.rollViewLogLines(lines);
return new TaskInstanceLogPageQueryResponse(logContent);
}
@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
String appInfoPath = null;
WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(getAppIdRequest.getTaskInstanceId());
if (workerTaskExecutor != null) {
// todo: remove this kind of logic, and remove get appId method, the appId should be send by worker rather
// than query by master
appInfoPath = workerTaskExecutor.getTaskExecutionContext().getAppInfoPath();
}
String logPath = getAppIdRequest.getLogPath();
List<String> appIds = org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath, appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
return new GetAppIdResponse(appIds);
}
@Override
public void removeTaskInstanceLog(String taskInstanceLogAbsolutePath) {
FileUtils.deleteFile(taskInstanceLogAbsolutePath);
}
}
3.核心注解的扫描和解析
核心注解的扫描和解析一般都是有RPC的服务提供者进行,Dolphinscheduler中的RPC服务提供者是基于Netty的RpcServer。RpcServer启动时,会获取Spring容器中的所有被@RpcService注解的接口的实现类以及接口的方法被@RpcMethod注解修复的服务Bean注册中调用程序, 等待向客户端提供服务。服务器启动后,它将监听端口并等待客户端连接。
public class RpcServer implements ServerMethodInvokerRegistry, AutoCloseable {
private final NettyRemotingServer nettyRemotingServer;
public RpcServer(NettyServerConfig nettyServerConfig) {
this.nettyRemotingServer = NettyRemotingServerFactory.buildNettyRemotingServer(nettyServerConfig);
}
public void start() {
nettyRemotingServer.start();
}
@Override
public void registerServerMethodInvokerProvider(Object serverMethodInvokerProviderBean) {
// 获取到服务提供者服务Bean的所有接口,如果接口被@RpcService注解, 说明是RPC服务接口的实现类
// 再获取这个接口中所有被@RpcMethod方法注解的方法,然后将这个Bean对象及这些方法注册到提供者服务中,等待RPC客户端调用
for (Class<?> anInterface : serverMethodInvokerProviderBean.getClass().getInterfaces()) {
if (anInterface.getAnnotation(RpcService.class) == null) {
continue;
}
for (Method method : anInterface.getDeclaredMethods()) {
// 获取RPC接口中所有被@RpcMethod方法注解的方法
RpcMethod rpcMethod = method.getAnnotation(RpcMethod.class);
if (rpcMethod == null) {
continue;
}
ServerMethodInvoker serverMethodInvoker =
new ServerMethodInvokerImpl(serverMethodInvokerProviderBean, method);
// 创建serverMethodInvoker对象注册到提供者服务中,等待RPC客户端调用
nettyRemotingServer.registerMethodInvoker(serverMethodInvoker);
log.debug("Register ServerMethodInvoker: {} to bean: {}",
serverMethodInvoker.getMethodIdentify(), serverMethodInvoker.getMethodProviderIdentify());
}
}
}
@Override
public void close() {
nettyRemotingServer.close();
}
}
以上就是RPC服务接口定义、使用及服务提供者如何扫描解析RPC服务接口实现并注册的整个实现过程。希望大家看完都能有所收获,如果觉得文章写的还不错,喜欢的童鞋们请点赞收藏,送你一朵小红花哈~~~~~~