Spring Boot 集成 DolphinScheduler
说明
为了避免对DolphinScheduler产生过度依赖,实践中通常不会全面采用其内置的所有任务节点类型。相反,会选择性地利用DolphinScheduler的HTTP任务节点功能,以此作为工作流执行管理的桥梁,对接并驱动自有项目的业务流程。这种策略不仅确保了流程编排的灵活性与扩展性,还有效减少了对外部调度系统的深度绑定,从而在提升项目自洽能力的同时,保持了良好的系统间解耦。
简而言之,我们倾向于仅采纳DolphinScheduler中的HTTP任务节点,作为调度机制的一部分,来促进我们内部项目工作流的自动化执行。这样做既能享受DolphinScheduler带来的调度便利,又避免了全盘接受其所有组件所带来的潜在风险,实现了更为稳健、可控的项目管理方案。
代码实现
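为了优化与DolphinScheduler的集成,以下是三个关键类的概述:初始化接口 InitializePlugin、HTTP客户端配置 RestTemplateConfig,以及DolphinScheduler连接配置 DolphinschedulerConfig。它们旨在通过初始化接口把项目、租户、访问令牌及管理员用户等基础信息同步写入DolphinScheduler。值得注意的是,初始化接口是通过应用自身的数据源直接对 `dolphinscheduler` 库执行SQL的,因此为了确保数据一致性和高效通信,你的Spring Boot应用所使用的数据库应与DolphinScheduler共享同一数据源(即同一数据库实例,且应用账号对 `dolphinscheduler` 库有写权限)。这一策略不仅简化了数据管理,还促进了实时状态更新,增强了系统的整体协调性。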
简而言之,我们精心设计了三组配置规则,允许我们的Spring Boot项目无缝对接DolphinScheduler平台。通过这些配置,项目和租户的动态变化能够及时反映到DolphinScheduler中,前提是两者共用一个数据库实例。这种架构决策不仅优化了资源分配,还促进了跨系统间的紧密协作,为后续的业务拓展奠定了坚实的基础。
package cn.com.lyb.data.dev.init;
import cn.com.lyb.common.security.annotation.InnerRequest;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.core.web.request.Response;
import cn.com.lyb.core.web.request.ResultWrap;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
/**
 * Bootstrap endpoint that seeds a freshly deployed DolphinScheduler database
 * with the project, tenant, access token and admin user rows this application relies on.
 *
 * <p>All rows are written through the application's own {@link DataSource} into the
 * {@code dolphinscheduler} schema, so that schema must be reachable from this connection.
 * The inserts use fixed primary keys (id = 1); running the endpoint against a database
 * that already contains these rows fails, as the API description states.
 */
@Api(tags = "初始化dolphinscheduler数据库信息")
@RestController
@RequestMapping("/data-dev")
public class InitializePlugin {

    // Project row. If xgov integrates its own project this statement must not be run
    // again and the configuration has to be changed as well (note kept from the original).
    private static final String PROJECT_SQL = "INSERT INTO `dolphinscheduler`.`t_ds_project` (`id`, `name`, `code`, `description`, `user_id`, `flag`, `create_time`, `update_time`) VALUES (1, 'lyb', ?, '', 1, 1, '2024-06-13 02:49:43', '2024-06-13 02:49:43')";

    private static final String TOKEN_SQL = "INSERT INTO `dolphinscheduler`.`t_ds_access_token` (`id`, `user_id`, `token`, `expire_time`, `create_time`, `update_time`) VALUES (1, 1, ?, '2039-12-30 10:51:26', '2024-06-13 02:50:37', '2024-06-18 10:00:13')";

    private static final String TENANT_SQL = "INSERT INTO `dolphinscheduler`.`t_ds_tenant` (`id`, `tenant_code`, `description`, `queue_id`, `create_time`, `update_time`) VALUES (1, 'default', '', 1, '2024-06-13 02:50:20', '2024-06-13 02:50:20');";

    // NOTE(review): the admin password hash is hard-coded here; confirm this is acceptable
    // for the target environments.
    private static final String USER_SQL = "UPDATE `dolphinscheduler`.`t_ds_user` SET `user_name` = 'admin', `user_password` = '470b9934942620215ad1cb3ac2d48497', `user_type` = 0, `email` = 'xxx@qq.com', `phone` = '', `tenant_id` = 1, `create_time` = '2024-06-12 10:23:37', `update_time` = '2024-06-18 09:59:52', `queue` = '', `state` = 1, `time_zone` = 'Asia/Shanghai' WHERE `id` = 1;";

    @Autowired
    private DataSource dataSource;

    @Autowired
    private DolphinschedulerConfig dolphinschedulerConfig;

    /**
     * Writes the seed rows in a single transaction: project, tenant, token, then user.
     *
     * @return the standard success response
     * @throws BizException if any statement fails; the transaction is rolled back first and
     *                      the underlying failure is attached to the thrown exception
     */
    @GetMapping("/init")
    @ApiOperation("部署全新的环境可以用此接口,否则会报错")
    @InnerRequest
    public Response init() {
        try (Connection connection = dataSource.getConnection()) {
            connection.setAutoCommit(false);
            try {
                // Same execution order as before: project, tenant, token, user.
                executeUpdate(connection, PROJECT_SQL, dolphinschedulerConfig.getProjectCode());
                executeUpdate(connection, TENANT_SQL);
                executeUpdate(connection, TOKEN_SQL, dolphinschedulerConfig.getDsdToken());
                executeUpdate(connection, USER_SQL);
                connection.commit();
            } catch (Exception e) {
                // Undo partial writes before the connection goes back to the pool.
                try {
                    connection.rollback();
                } catch (SQLException rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            }
        } catch (Exception e) {
            BizException failure = new BizException("初始化报错");
            // Only the BizException(String) constructor is visible from this file, so the
            // root failure is attached as a suppressed exception to keep it in stack traces.
            failure.addSuppressed(e);
            throw failure;
        }
        return ResultWrap.ok();
    }

    /**
     * Runs one statement, binding {@code params} positionally as strings so configuration
     * values are never spliced into the SQL text.
     */
    private static void executeUpdate(Connection connection, String sql, String... params) throws SQLException {
        try (java.sql.PreparedStatement statement = connection.prepareStatement(sql)) {
            for (int i = 0; i < params.length; i++) {
                statement.setString(i + 1, params[i]);
            }
            statement.executeUpdate();
        }
    }
}
package cn.com.lyb.data.dev.web.config;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
 * Provides the {@link RestTemplate} used for outbound HTTP calls, backed by a pooled
 * Apache HttpClient whose connect and socket timeouts come from configuration.
 */
@Configuration
public class RestTemplateConfig {

    /** Connect timeout, read from {@code xgov.template.connectTimeout}. */
    @Value("${xgov.template.connectTimeout}")
    private int connectTimeout;

    /** Socket (read) timeout, read from {@code xgov.template.socketTimeout}. */
    @Value("${xgov.template.socketTimeout}")
    private int socketTimeout;

    /**
     * Builds the {@link RestTemplate} on top of {@link #httpRequestFactory()}.
     *
     * @return the RestTemplate bean
     */
    @Bean
    public RestTemplate restTemplate() {
        HttpComponentsClientHttpRequestFactory factory = httpRequestFactory();
        return new RestTemplate(factory);
    }

    /**
     * Builds the request factory around a pooled HttpClient.
     *
     * @return the request factory bean
     */
    @Bean
    public HttpComponentsClientHttpRequestFactory httpRequestFactory() {
        RequestConfig timeouts = RequestConfig.custom()
                // connect timeout
                .setConnectTimeout(connectTimeout)
                // read timeout
                .setSocketTimeout(socketTimeout)
                // timeout for leasing a connection from the pool
                .setConnectionRequestTimeout(5000)
                .build();

        PoolingHttpClientConnectionManager pool = new PoolingHttpClientConnectionManager();
        // maximum connections overall
        pool.setMaxTotal(200);
        // default maximum connections per route
        pool.setDefaultMaxPerRoute(20);

        CloseableHttpClient client = HttpClients.custom()
                .setConnectionManager(pool)
                .setDefaultRequestConfig(timeouts)
                .build();
        return new HttpComponentsClientHttpRequestFactory(client);
    }
}
package cn.com.lyb.data.dev.web.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * Connection settings for the DolphinScheduler server, bound from the
 * {@code lyb.dolphinscheduler.server.*} properties.
 *
 * <p>The package matches the one the other classes import this type from
 * ({@code cn.com.lyb.data.dev.web.config}).
 */
@Configuration
public class DolphinschedulerConfig {

    /** User name sent to DolphinScheduler as the {@code userName} form field. */
    @Value("${lyb.dolphinscheduler.server.username}")
    private String dsdUsername;

    /** Access token sent in the {@code token} request header. */
    @Value("${lyb.dolphinscheduler.server.token}")
    private String dsdToken;

    /** Base URL that all API paths are appended to. */
    @Value("${lyb.dolphinscheduler.server.url}")
    private String dsdUrl;

    /**
     * Code of the DolphinScheduler project used for all workflow calls.
     * The correctly spelled key {@code projectCode} is preferred; the historical
     * misspelling {@code porjectCode} is still honoured so existing configuration keeps working.
     */
    @Value("${lyb.dolphinscheduler.server.projectCode:${lyb.dolphinscheduler.server.porjectCode}}")
    private String projectCode;

    /** Tenant code sent when creating or updating workflows. */
    @Value("${lyb.dolphinscheduler.server.tenantCode}")
    private String tenantCode;

    public String getDsdUsername() {
        return dsdUsername;
    }

    public void setDsdUsername(String dsdUsername) {
        this.dsdUsername = dsdUsername;
    }

    public String getDsdToken() {
        return dsdToken;
    }

    public void setDsdToken(String dsdToken) {
        this.dsdToken = dsdToken;
    }

    public String getDsdUrl() {
        return dsdUrl;
    }

    public void setDsdUrl(String dsdUrl) {
        this.dsdUrl = dsdUrl;
    }

    public String getProjectCode() {
        return projectCode;
    }

    public void setProjectCode(String projectCode) {
        this.projectCode = projectCode;
    }

    public String getTenantCode() {
        return tenantCode;
    }

    public void setTenantCode(String tenantCode) {
        this.tenantCode = tenantCode;
    }
}
构建一个调用类,该类全面集成了与DolphinScheduler接口的交互逻辑,为我们的应用提供了一层抽象。对于涉及具体业务逻辑的数据封装细节,此处将不再赘述,旨在保持代码的清晰度与通用性。
简言之,我们设计了一个专门的类来处理所有与DolphinScheduler的API调用,确保了业务核心逻辑的独立性和可维护性。这一封装策略使得代码库更加整洁,同时也提升了开发效率和系统的整体健壮性。
通过这种方式,我们不仅隔离了与外部服务的直接交互,还简化了业务逻辑的实现,使其更加专注于核心功能,而非调度系统的细节。这样的架构设计,有助于团队成员快速理解系统架构,同时也便于未来的功能扩展和系统维护。
package cn.com.lyb.data.dev.web.dolphinscheduler.service;
import cn.com.lyb.common.redis.service.RedisService;
import cn.com.lyb.core.exception.BizException;
import cn.com.lyb.data.dev.enums.ProcessExecutionTypeEnum;
import cn.com.lyb.data.dev.enums.TaskExecutionStatus;
import cn.com.lyb.data.dev.enums.WorkflowExecutionStatus;
import cn.com.lyb.data.dev.web.config.DolphinschedulerConfig;
import cn.com.lyb.data.dev.workflow.entity.delphinscheduler.TaskDefinition;
import cn.com.lyb.data.dev.workflow.entity.vo.GanttTaskVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ProcessInstanceVO;
import cn.com.lyb.data.dev.workflow.entity.vo.ResponseTaskLog;
import cn.com.lyb.data.dev.workflow.entity.vo.TaskInstanceVO;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Client for the DolphinScheduler REST API.
 *
 * <p>Every call except {@link #login()} authenticates with the configured access token in the
 * {@code token} header and sends form-encoded bodies. A response whose {@code success} flag is
 * not {@code true} is turned into a {@link BizException}.
 *
 * <p>The original author noted that for HTTPS endpoints SSL checking can be switched off via
 * {@code SSLUtil.turnOffSslChecking()} before a call; that helper is not used here.
 *
 * <p>NOTE(review): free-text query values such as {@code searchVal} are appended to the URL
 * without encoding; values containing {@code &}, {@code =}, or braces will not be transmitted
 * correctly. Confirm callers only pass plain text.
 */
@Service
public class DolphinschedulerService {

    private static final Logger logger = LoggerFactory.getLogger(DolphinschedulerService.class);
    private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";
    private static final String X_REQUESTED_WITH = "XMLHttpRequest";
    private static final Boolean DSD_SUCCESS = true;
    private static final String DSD_SESSION_KEY = "DSD_SESSION_KEY";
    private static final String SUCCESS = "success";
    private static final String MSG = "msg";
    // Used by the session-based flow: the session id obtained at login is put into a header.
    private static final String SESSION_ID = "sessionId";
    private static final String TOKEN = "token";
    private static final String DATA = "data";
    // Raised when a successful response carries no usable "data" payload.
    private static final String EMPTY_DATA_MESSAGE = "DolphinScheduler返回数据为空";
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter SCHEDULE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private DolphinschedulerConfig dolphinschedulerConfig;
    @Autowired
    private RedisService redisService;

    /**
     * Logs in and returns the session id, reusing the value cached in Redis when present.
     * A freshly obtained session id is cached for 23 hours.
     *
     * <p>NOTE(review): only {@code userName} is submitted; sending the password was commented
     * out in the original code. Confirm the server accepts this.
     *
     * @return the session id
     * @throws BizException if the call fails or the response carries no session id
     */
    public String login() {
        String sessionValue = redisService.getCacheObject(DSD_SESSION_KEY);
        if (StringUtils.isNotBlank(sessionValue)) {
            return sessionValue;
        }
        LinkedMultiValueMap<String, String> form = new LinkedMultiValueMap<String, String>();
        form.add("userName", dolphinschedulerConfig.getDsdUsername());
        HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<MultiValueMap<String, String>>(form, baseHeaders());
        String url = dolphinschedulerConfig.getDsdUrl() + "/login";
        JSONObject data = requireData(doPostForObject(url, httpEntity));
        String sessionId = data.getString(SESSION_ID);
        if (StringUtils.isBlank(sessionId)) {
            throw new BizException(EMPTY_DATA_MESSAGE);
        }
        redisService.setCacheObject(DSD_SESSION_KEY, sessionId, 23L, TimeUnit.HOURS);
        return sessionId;
    }

    /**
     * Creates a project.
     *
     * @param projectName project name
     * @param description project description
     */
    public void createProject(String projectName, String description) {
        LinkedMultiValueMap<String, String> form = new LinkedMultiValueMap<String, String>();
        form.add("projectName", projectName);
        form.add("description", description);
        form.add("userName", dolphinschedulerConfig.getDsdUsername());
        // Log the form only: the headers carry the access token and must not reach the logs.
        logger.info("DolphinScheduler请求信息:{}", form);
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects";
        doPostForObject(url, authorized(form));
    }

    /**
     * Lists projects page by page.
     *
     * @param pageNo    page number
     * @param pageSize  page size
     * @param searchVal optional search text; omitted from the query when blank
     * @return the {@code data} element of the response
     */
    public Object projectsPage(Integer pageNo, Integer pageSize, String searchVal) {
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects?pageNo=" + pageNo + "&pageSize=" + pageSize;
        if (StringUtils.isNotBlank(searchVal)) {
            url = url + "&searchVal=" + searchVal;
        }
        return doGetForObject(url, authorized()).get(DATA);
    }

    /**
     * Updates a project.
     *
     * @param code        project code
     * @param projectName new project name
     * @param description new description
     * @return the {@code data} element of the response
     */
    public Object updateProjects(String code, String projectName, String description) {
        LinkedMultiValueMap<String, String> form = new LinkedMultiValueMap<String, String>();
        form.add("projectName", projectName);
        form.add("description", description);
        form.add("userName", dolphinschedulerConfig.getDsdUsername());
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;
        return doPutForObject(url, authorized(form)).get(DATA);
    }

    /**
     * Deletes a project.
     *
     * @param code project code
     * @return the {@code data} element of the response
     */
    public Object delProjects(String code) {
        String url = dolphinschedulerConfig.getDsdUrl() + "/projects/" + code;
        return doDeleteForObject(url, authorized()).get(DATA);
    }

    /**
     * Sends a POST request and returns the parsed, success-checked response.
     *
     * @throws BizException if the call fails or the response reports a failure
     */
    public JSONObject doPostForObject(String url, HttpEntity<?> httpEntity) {
        return exchange(HttpMethod.POST, "post", url, httpEntity);
    }

    /**
     * Sends a GET request and returns the parsed, success-checked response.
     *
     * @throws BizException if the call fails or the response reports a failure
     */
    public JSONObject doGetForObject(String url, HttpEntity<?> httpEntity) {
        return exchange(HttpMethod.GET, "get", url, httpEntity);
    }

    /**
     * Sends a PUT request and returns the parsed, success-checked response.
     *
     * @throws BizException if the call fails or the response reports a failure
     */
    public JSONObject doPutForObject(String url, HttpEntity<?> httpEntity) {
        return exchange(HttpMethod.PUT, "put", url, httpEntity);
    }

    /**
     * Sends a DELETE request and returns the parsed, success-checked response.
     *
     * @throws BizException if the call fails or the response reports a failure
     */
    public JSONObject doDeleteForObject(String url, HttpEntity<?> httpEntity) {
        return exchange(HttpMethod.DELETE, "delete", url, httpEntity);
    }

    /** Shared request/response handling behind the four public do*ForObject methods. */
    private JSONObject exchange(HttpMethod method, String label, String url, HttpEntity<?> httpEntity) {
        logger.info("调用url:{}", url);
        try {
            ResponseEntity<String> response = restTemplate.exchange(url, method, httpEntity, String.class);
            String result = response.getBody();
            logger.info("{}类型接口调用返回信息:{}", label, result);
            return getJsonObject(result);
        } catch (BizException be) {
            throw be;
        } catch (Exception e) {
            // Pass the exception as the trailing argument so SLF4J logs the stack trace.
            logger.error("{}类型接口调用失败", label, e);
            throw new BizException(e.getMessage() != null ? e.getMessage() : e.toString());
        }
    }

    /**
     * Parses a response body and verifies its {@code success} flag.
     * Error codes 50019 and 50036 get dedicated messages; any other failure reports the
     * server's {@code msg}, or the raw body when no message is present.
     */
    private static JSONObject getJsonObject(String result) {
        JSONObject resultJSON = JSON.parseObject(result);
        if (resultJSON == null) {
            // Empty or null body: fail explicitly instead of with a NullPointerException.
            throw new BizException(EMPTY_DATA_MESSAGE);
        }
        if (!DSD_SUCCESS.equals(resultJSON.get(SUCCESS))) {
            logger.error("调用结果返回异常:{}", result);
            Integer code = resultJSON.getInteger("code");
            if (code != null && code == 50019) {
                throw new BizException("流程节点间存在循环依赖");
            }
            if (code != null && code == 50036) {
                throw new BizException("工作流任务关系参数错误");
            }
            Object message = resultJSON.get(MSG);
            throw new BizException(message != null ? message.toString() : result);
        }
        return resultJSON;
    }

    /** Returns the {@code data} object of a response, failing clearly when it is absent. */
    private static JSONObject requireData(JSONObject resultJSON) {
        JSONObject data = resultJSON.getJSONObject(DATA);
        if (data == null) {
            throw new BizException(EMPTY_DATA_MESSAGE);
        }
        return data;
    }

    /** Converts a possibly missing JSON array into a list, yielding an empty list when absent. */
    private static <T> List<T> toList(JSONArray array, Class<T> type) {
        if (array == null) {
            return new ArrayList<T>();
        }
        return array.toJavaList(type);
    }

    /** Builds a page from the {@code totalList} and {@code total} fields of a response. */
    private static <T> PageInfo<T> toPage(JSONObject resultJSON, Class<T> type) {
        JSONObject data = requireData(resultJSON);
        PageInfo<T> page = new PageInfo<T>();
        page.setList(toList(data.getJSONArray("totalList"), type));
        // getLongValue accepts any numeric width and yields 0 when the field is missing.
        page.setTotal(data.getLongValue("total"));
        return page;
    }

    /** Headers common to every request. */
    private HttpHeaders baseHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.add("Content-Type", CONTENT_TYPE);
        headers.add("X-Requested-With", X_REQUESTED_WITH);
        return headers;
    }

    /** Common headers plus the access token. */
    private HttpHeaders tokenHeaders() {
        HttpHeaders headers = baseHeaders();
        headers.add(TOKEN, dolphinschedulerConfig.getDsdToken());
        return headers;
    }

    /** Token-authenticated request without a body. */
    private HttpEntity<?> authorized() {
        return new HttpEntity<Object>(tokenHeaders());
    }

    /** Token-authenticated request carrying the given form body. */
    private HttpEntity<?> authorized(MultiValueMap<String, ?> form) {
        return new HttpEntity<Object>(form, tokenHeaders());
    }

    /** Builds a URL below the configured project: {@code {url}/projects/{projectCode}{suffix}}. */
    private String projectUrl(String suffix) {
        return dolphinschedulerConfig.getDsdUrl() + "/projects/" + dolphinschedulerConfig.getProjectCode() + suffix;
    }

    /** Form shared by {@link #createWorkFlow} and {@link #updateWorkFlow}. */
    private MultiValueMap<String, Object> workflowForm(String name, String description, String locations,
                                                       String taskDefinitionJson, String taskRelationJson,
                                                       String executionType) {
        LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
        form.add("description", description);
        form.add("name", name);
        form.add("taskDefinitionJson", taskDefinitionJson);
        form.add("taskRelationJson", taskRelationJson);
        form.add("timeout", 0);
        form.add("executionType", executionType);
        form.add("tenantCode", dolphinschedulerConfig.getTenantCode());
        form.add("locations", locations);
        return form;
    }

    /**
     * Creates a workflow (variant written for DolphinScheduler 3.2.0).
     *
     * @param name               workflow name
     * @param description        workflow description
     * @param globalParams       global parameters
     * @param locations          node layout; sent only when non-null
     * @param timeout            timeout value
     * @param taskRelationJson   task relation JSON
     * @param taskDefinitionJson task definition JSON
     * @param otherParamsJson    additional parameters JSON
     * @param executionType      execution type
     * @return the code of the created workflow
     */
    public Long createWorkFlow320(String name, String description, String globalParams, String locations, int timeout,
                                  String taskRelationJson, String taskDefinitionJson, String otherParamsJson,
                                  ProcessExecutionTypeEnum executionType) {
        LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
        form.add("description", description);
        form.add("name", name);
        form.add("taskDefinitionJson", taskDefinitionJson);
        form.add("taskRelationJson", taskRelationJson);
        form.add("timeout", timeout);
        form.add("executionType", executionType);
        form.add("otherParamsJson", otherParamsJson);
        form.add("globalParams", globalParams);
        if (locations != null) {
            // Previously accepted but silently dropped, so the layout was never stored.
            form.add("locations", locations);
        }
        JSONObject data = requireData(doPostForObject(projectUrl("/process-definition"), authorized(form)));
        // getLong copes with the parser producing either an Integer or a Long.
        return data.getLong("code");
    }

    /**
     * Lists workflows page by page.
     *
     * @param pageNo    page number
     * @param pageSize  page size
     * @param searchVal optional search text; omitted from the query when blank
     * @return the {@code data} element of the response
     */
    public Object selectFlowPage(Integer pageNo, Integer pageSize, String searchVal) {
        String url = projectUrl("/process-definition?pageNo=" + pageNo + "&pageSize=" + pageSize);
        if (StringUtils.isNotBlank(searchVal)) {
            url = url + "&searchVal=" + searchVal;
        }
        return doGetForObject(url, authorized()).get(DATA);
    }

    /**
     * Fetches one workflow.
     *
     * @param code workflow code
     * @return the {@code data} element of the response
     */
    public Object selectOneFlow(String code) {
        return doGetForObject(projectUrl("/process-definition/" + code), authorized()).get(DATA);
    }

    /**
     * Creates a workflow with timeout 0 and the configured tenant code.
     *
     * @return the code of the created workflow
     */
    public Long createWorkFlow(String name, String description, String locations,
                               String taskDefinitionJson, String taskRelationJson,
                               String executionType) {
        MultiValueMap<String, Object> form = workflowForm(name, description, locations,
                taskDefinitionJson, taskRelationJson, executionType);
        JSONObject data = requireData(doPostForObject(projectUrl("/process-definition"), authorized(form)));
        return data.getLong("code");
    }

    /**
     * Changes the release state of a workflow.
     *
     * @param name         workflow name
     * @param releaseState target release state
     * @param code         workflow code
     */
    public void updateReleaseState(String name, String releaseState, Long code) {
        LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
        form.add("name", name);
        form.add("releaseState", releaseState);
        doPostForObject(projectUrl("/process-definition/" + code + "/release"), authorized(form));
    }

    /**
     * Returns the task definitions of a workflow.
     *
     * @param dsdCode workflow code
     * @return the entries of {@code taskDefinitionList}; empty when the field is absent
     */
    public List<TaskDefinition> getTaskByWorkflowCode(Long dsdCode) {
        JSONObject data = requireData(doGetForObject(projectUrl("/process-definition/" + dsdCode), authorized()));
        return toList(data.getJSONArray("taskDefinitionList"), TaskDefinition.class);
    }

    /**
     * Deletes workflows in one batch call.
     *
     * @param codes workflow codes, passed through as the {@code codes} form field
     */
    public void delWorkflow(String codes) {
        LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
        form.add("codes", codes);
        doPostForObject(projectUrl("/process-definition/batch-delete"), authorized(form));
    }

    /**
     * Updates a workflow with timeout 0 and the configured tenant code.
     *
     * @param dsdCode workflow code
     */
    public void updateWorkFlow(Long dsdCode, String name, String description, String locations, String taskDefinitionJson, String taskRelationJson, String executionType) {
        MultiValueMap<String, Object> form = workflowForm(name, description, locations,
                taskDefinitionJson, taskRelationJson, executionType);
        doPutForObject(projectUrl("/process-definition/" + dsdCode), authorized(form));
    }

    /**
     * Starts a workflow with failure strategy {@code CONTINUE} and warning type {@code NONE}.
     *
     * @param code workflow code
     */
    public void runWorkflow(Long code) {
        LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
        form.add("processDefinitionCode", code);
        form.add("failureStrategy", "CONTINUE");
        form.add("warningType", "NONE");
        form.add("scheduleTime", getStringDate());
        doPostForObject(projectUrl("/executors/start-process-instance"), authorized(form));
    }

    /**
     * Builds the {@code scheduleTime} value: a JSON object whose complement start and end
     * dates are both today at 00:00:00.
     *
     * @return the JSON string
     */
    public String getStringDate() {
        LocalDateTime startOfToday = LocalDateTime.now().withHour(0).withMinute(0).withSecond(0).withNano(0);
        String formatted = startOfToday.format(SCHEDULE_TIME_FORMAT);
        JSONObject schedule = new JSONObject();
        schedule.put("complementStartDate", formatted);
        schedule.put("complementEndDate", formatted);
        return schedule.toString();
    }

    /**
     * Fetches a slice of a task instance's log.
     *
     * @param id          task instance id
     * @param limit       value of the {@code limit} query parameter
     * @param skipLineNum value of the {@code skipLineNum} query parameter
     * @return the parsed log response
     * @throws BizException if the response carries no data
     */
    public ResponseTaskLog getLog(Integer id, Integer limit, Integer skipLineNum) {
        String url = dolphinschedulerConfig.getDsdUrl() + "/log/detail?taskInstanceId=" + id + "&limit=" + limit + "&skipLineNum=" + skipLineNum;
        Object data = doGetForObject(url, authorized()).get(DATA);
        if (data == null) {
            throw new BizException(EMPTY_DATA_MESSAGE);
        }
        return JSON.parseObject(data.toString(), ResponseTaskLog.class);
    }

    /**
     * Performs an operation on a workflow instance.
     *
     * @param processInstanceId workflow instance id
     * @param executeType       "1" rerun (REPEAT_RUNNING), "2" stop (STOP),
     *                          "3" resume (RECOVER_SUSPENDED_PROCESS), "4" pause (PAUSE)
     * @throws BizException if {@code executeType} is null or not one of the values above
     */
    public void operation(Integer processInstanceId, String executeType) {
        if (executeType == null) {
            // switch on a null String would throw a NullPointerException.
            throw new BizException("暂不支持该操作");
        }
        LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<String, Object>();
        form.add("processInstanceId", processInstanceId);
        switch (executeType) {
            case "1":
                addExecutionDetails(form, 1, "REPEAT_RUNNING", "run");
                break;
            case "2":
                form.add("executeType", "STOP");
                break;
            case "3":
                addExecutionDetails(form, 0, "RECOVER_SUSPENDED_PROCESS", "suspend");
                break;
            case "4":
                form.add("executeType", "PAUSE");
                break;
            default:
                throw new BizException("暂不支持该操作");
        }
        doPostForObject(projectUrl("/executors/execute"), authorized(form));
    }

    /**
     * Adds the {@code index}, {@code executeType} and, when non-null, {@code buttonType}
     * fields to a form.
     */
    public void addExecutionDetails(MultiValueMap<String, Object> map, int index, String executeType, String buttonType) {
        map.add("index", String.valueOf(index));
        map.add("executeType", executeType);
        if (buttonType != null) {
            map.add("buttonType", buttonType);
        }
    }

    /**
     * Lists workflow instances page by page.
     *
     * @param dsdWorkflowCode optional workflow code filter
     * @param searchVal       optional search text
     * @param pageNum         page number
     * @param pageSize        page size
     * @param startDate       optional start date filter
     * @param endDate         optional end date filter
     * @param stateType       optional state filter
     * @return the page of instances; the list is empty when the response has no {@code totalList}
     */
    public PageInfo<ProcessInstanceVO> processInstances(Long dsdWorkflowCode, String searchVal, Integer pageNum, Integer pageSize,
                                                        String startDate, String endDate, String stateType) {
        StringBuilder urlBuilder = new StringBuilder(projectUrl("/process-instances?pageNo="))
                .append(pageNum)
                .append("&pageSize=").append(pageSize)
                // Required: without it, instances disappear once their workflow is deleted.
                .append("&call=").append("1");
        if (null != dsdWorkflowCode) {
            urlBuilder.append("&processDefineCode=").append(dsdWorkflowCode);
        }
        if (StringUtils.isNotBlank(searchVal)) {
            urlBuilder.append("&searchVal=").append(searchVal);
        }
        supplementaryParameters(startDate, endDate, stateType, urlBuilder);
        return toPage(doGetForObject(urlBuilder.toString(), authorized()), ProcessInstanceVO.class);
    }

    /** Appends the optional state and date filters, skipping null or empty values. */
    private void supplementaryParameters(String startDate, String endDate, String stateType, StringBuilder urlBuilder) {
        if (stateType != null && !stateType.isEmpty()) {
            urlBuilder.append("&stateType=").append(stateType);
        }
        if (startDate != null && !startDate.isEmpty()) {
            urlBuilder.append("&startDate=").append(startDate);
        }
        if (endDate != null && !endDate.isEmpty()) {
            urlBuilder.append("&endDate=").append(endDate);
        }
    }

    /**
     * Lists the BATCH task instances of a workflow instance page by page.
     *
     * @param processInstanceId workflow instance id
     * @param startDate         optional start date filter
     * @param endDate           optional end date filter
     * @param stateType         optional state filter
     * @param pageNum           page number
     * @param pageSize          page size
     * @return the page of task instances; the list is empty when the response has no {@code totalList}
     */
    public PageInfo<TaskInstanceVO> taskInstances(Integer processInstanceId, String startDate, String endDate,
                                                  String stateType, int pageNum, int pageSize) {
        StringBuilder urlBuilder = new StringBuilder(projectUrl("/task-instances?pageNo="))
                .append(pageNum)
                .append("&pageSize=").append(pageSize)
                .append("&processInstanceId=").append(processInstanceId)
                .append("&taskExecuteType=").append("BATCH");
        supplementaryParameters(startDate, endDate, stateType, urlBuilder);
        return toPage(doGetForObject(urlBuilder.toString(), authorized()), TaskInstanceVO.class);
    }

    /**
     * Fetches the Gantt view (task execution order) of a workflow instance.
     *
     * @param processInstanceId workflow instance id
     * @return the entries of {@code tasks}; empty when the field is absent
     */
    public List<GanttTaskVO> viewGantt(Long processInstanceId) {
        String url = projectUrl("/process-instances/" + processInstanceId + "/view-gantt");
        JSONObject data = requireData(doGetForObject(url, authorized()));
        return toList(data.getJSONArray("tasks"), GanttTaskVO.class);
    }
}