camunda 与 pycamunda
相关链接:
camunda 官方社区:https://docs.camunda.org/manual/7.17/
官方社区提供的REST_API:https://docs.camunda.org/manual/7.17/reference/rest/
GITHUB 社区:https://github.com/camunda-community-hub
GitHub开源库:https://github.com/camunda-community-hub
pycamunda的官方文档:https://pycamunda.readthedocs.io/en/latest/index.html
pycamunda相关介绍
1.介绍
主要使用到的功能分为3部分,创建用户,将camunda modeler文档部署到camunda服务器上,启动流程定义和流程实例,进行流程任务的管理
配置方式说明:
URL_CAMUNDA_ENGINE = "http://"+IP_ADDR+":8080/engine-rest"
其中的IP_ADDR需要换成camunda引擎部署机的IP,在多个函数中均会用到该内容
camunda文件的部署到启动过程包含:部署的创建,添加相关的资源文件,进行部署并获取部署的流程定义id,根据流程定义生成对应的流程实例,运行流程实例即开启了一个流程。
部署
在当前代码中采用的部署方式为单独创建部署,并且记录部署文件的"proc_key",
部署camunda操作使用:script.camunda_execute文件进行操作
每个camunda model文件包含一个唯一的proc_key
在生成一个部署后将该id 记录,每一个作业票的流程有对应的流程id
启动流程
在前端点击按键发起一个作业申请时,后端会执行启动一个流程的功能。启动流程的功能在下面进行介绍
完成相关任务
在前端页面有确认按键,以及同意或者拒绝,都会调用后端的端口,后端的实现是通过完成相关的任务实现的。
要启动流程需要知道一个流程的proc_def_id以及启动文件时所需的变量,
获取peoc_def_id 的方式:
def getlist_proc_def(key_name):
#key = key_name
getlist = processdef.GetList(url=URL_CAMUNDA_ENGINE,key=key_name,latest_version=True,sort_by='version',ascending=False)
proc_def = getlist()[0]
return proc_def.id_
启动流程核心代码位于:utils.camunda_func.start_proc2(proc_def_id,variable),其中填写的proc_def_id 是由
获取任务列表
当前端访问接口时,要获取用户当前所需处理的任务,就需要访问获取某个用户需要执行的任务的函数 获取到的任务就是需要执行的任务,其他没有执行的任务的可以从对应的停送电日志表,维修日志表,质量检测日志表中获取到,其中的内容包含对应的状态
2.创建用户函数
pycamunda提供的函数
from pycamunda import user
def create_user(user_list):
for users in user_list:
create = user.Create(url=url, id_=users, first_name=users,last_name=users, password="123456")
create()
# 说明:user.Create()在实例化后,需要进行执行才能创建出用户,create()不能省 url 为 URL_CAMUNDA_ENGINE = "http://"+IP_ADDR+":8080/engine-rest" 以下不再赘述
生成假数据的相关函数:
位置:script.postgresql_create_data 包含多个函数:添加部门,用户数据,以及将数据导入文件等操作
3.部署与启动流程相关
部署相关
from pycamunda import deployment
create_file = deployment.Create(url=url, name=filepath.split(".")[0], )
create_file.add_resource(content)
deploy = create_file()
def create_deploy_single(bpmnfile):
filepath = bpmnfile
with open(filepath, "rb") as ff:
content = ff.read()
create_file = deployment.Create(url=url, name=filepath.split(".")[0], )
create_file.add_resource(content)
deploy_dic = {}
try:
deploy = create_file()
pro_dic = deploy.deployed_process_definitions
deploy_id = deploy.id_
deploy_dic.update({"deploy_id": deploy_id})
#print(pro_dic)
# print(deploy_dic)
# items = pro_dic.items()
# print(len(pro_dic),pro_dic,type(pro_dic))
for key, value in pro_dic.items():
pro_id = value.id_
deploy_dic.update({"pro_id": pro_id,"key":value.key,"name":value.name})
return deploy_dic
except Exception as e:
print(e,sys._getframe().f_code.co_name)
return deploy_dic
流程定义和流程实例
获取流程定义id
from utils import camunda_func
proc_def_id = camunda_func.getlist_proc_def(key_name=proc_key)
根据流程定义id生成和启动流程实例
from pycamunda import processdef,processinst
def start_proc2(proc_def_id,variables):
# 部署后生成的proc_def_id
try:
"""
name: str, value: typing.Any, type_: str = None, value_info: str = None
"""
key = str(uuid.uuid4())
#print(key)
start_pro = processdef.StartInstance(url=url, id_=proc_def_id, business_key=key)
for variable in variables:
start_pro.add_variable(name=variable.get("name"),value=variable.get("value"),type_=variable.get("type"))
process1 = start_pro()
processinst.Activate(url=url, id_=process1.id_)
#print(process1.id_)
# camunda_api_url = url + '/process-definition/{}/form-variables'.format(str(proc_id))
#return json.dumps({"code": 0, "message": "start instance successful","proc_id":process1.id_})
#print( {"proc_inst_id":process1.id_})
return {"code": 0, "message": "start instance successful","proc_inst_id":process1.id_}
except Exception as e:
print(e,sys._getframe().f_code.co_name)
return {"code":1,"message":"start instance fail"}
其中涉及到了启动一个流程定义的实例,添加变量,对实例进行激活的操作
分别对应processdef.StartInstance,add_variables,
processinst.Activate
4.任务处理相关
任务列表获取
from pycamunda import task
def get_list_application_imformation2(assignee,taskname=None):
getlist = task.GetList(url=URL_CAMUNDA_ENGINE,assignee=assignee,process_definition_key=setup.power_proc_key)
tasktur = getlist()
代码实现在 utils.camunda_func.get_list_application_imformation2
任务完成
from pycamunda import task
resolve = task.Resolve(url=URL_CAMUNDA_ENGINE, id_=taskid)
var_label = item.get("label")
var = imformation_dic.get(var_label)
print(item,"#*"*30)
#status_judge(proc_inst_id,item,reason)
resolve.add_variable(name=var.get("id"), value=item.get("value"),
type_=var.get("type"),
value_info={})
resolve()
在代码中实现在utils.camunda_func.task_complete_variable
任务用户的委任
from pycamunda import task
set_assignee = task.SetAssignee(url=url,id_=task_id,user_id=user_name)
set_assignee()
代码实现在utils.set_assignee.user_task_set_assignee_complete函数以及该文件内其他函数中实现
5.定时设置任务实现
获取任务的执行部门的实现方式
代码位置:utils.camunda_func
def get_task_list_key(department,duty):
getlist = task.GetList(url=URL_CAMUNDA_ENGINE,process_definition_key=setup.test_proc_key)
tasktur = getlist()
# print(tasktur)
# print(duty)
for item in tasktur:
task_id = item.id_
task_assignee = item.assignee
if task_assignee == department:
mm = task.SetAssignee(url=URL_CAMUNDA_ENGINE,id_=task_id,user_id=duty)
mm()
代码位置:script.set_assignee
namelist = ["生产管理部"]
url = "http://192.168.8.99:9000/department/monitor/"
def set_assignee(namelist):
for name in namelist:
# 查看生产管理部当班人员是谁
post_json = {"department": name}
response_json = request_post(url, post_json)
#print(response_json)
monitor = response_json.get("账号")
# 查看哪些任务的当前委任者为生产管理部
# 修改任务的委任者为当值人员
get_task_list_key(name, monitor)