刷到这篇文章使用Q-learning去求接JSP ft06 问题用基本Q-learning解决作业车间调度问题(JSP),以FT06案例为例_q-learning算法在车间调度-CSDN博客
本着贼不走空的原则打算全部copy到本地试下,文章作者使用的tf06.txt在这里获取
https://web.cecs.pdx.edu/bart/cs510ss/project/jobshop/jobshop/ft06.txt
6 6
2 1 0 3 1 6 3 7 5 3 4 6
1 8 2 5 4 10 5 10 0 10 3 4
2 5 3 4 5 8 0 9 1 1 4 7
1 5 0 5 2 5 3 3 4 8 5 9
2 9 1 3 4 5 5 4 0 3 3 1
1 3 3 3 5 9 0 10 4 4 2 1
FT06 是什么问题
FT06 数据文件通常用于调度问题,特别是在作业车间调度(Job Shop Scheduling)中。根据你提供的数据格式,下面是对 ft06.txt
文件内容的详细解释:
文件结构
第一行:6 6 这行表示有 6 个作业(Jobs)和 6 个操作(Operations)。在 FT06 的情况下,作业和操作的数量相等。
接下来的几行是作业的调度信息,通常每行代表一个作业在不同机器上的操作时间。
数据解析
每一行的结构如下:
2 1 0 3 1 6 3 7 5 3 4 6
- 每行中的数字表示该作业在不同机器上的操作时间。
上面例子表示
2 1
:表示工件的第一个操作需要在机器 2 上进行,且该操作的加工时间为 1。0 3
:表示工件的第二个操作需要在机器 0 上进行,且该操作的加工时间为 3。1 6
:表示工件的第三个操作需要在机器 1 上进行,且该操作的加工时间为 6。3 7
:表示工件的第四个操作需要在机器 3 上进行,且该操作的加工时间为 7。5 3
:表示工件的第五个操作需要在机器 5 上进行,且该操作的加工时间为 3。4 6
:表示工件的第六个操作需要在机器 4 上进行,且该操作的加工时间为 6。
目标
FT06 的目标是最小化所有作业的完成时间(makespan)或其他调度指标,如总延迟、最大延迟等。执行代码
执行完成后打印
工件顺序列表: [5, 0, 5, 1, 0, 0, 0, 2, 4, 4, 0, 5, 5, 0, 4, 4, 1, 1, 1, 1, 1, 2, 3, 3, 2, 5, 2, 2, 2, 5, 3, 3, 3, 4, 3, 4]
各工序完工时间: [[1, 4, 17, 24, 27, 33],
[11, 20, 48, 58, 68, 72],
[6, 76, 84, 93, 94, 101],
[25, 73, 78, 81, 109, 118],
[15, 20, 38, 42, 96, 97],
[3, 6, 36, 46, 52, 53]]
开始时间列表:[0, 0, 3, 3, 1, 11, 17, 1, 6, 17, 24, 27, 36, 27, 33, 38, 15, 38, 48, 58, 68, 72, 20, 68, 76, 48, 84, 93, 94, 52, 73, 78, 101, 93, 109, 96]
文章作者也说算法实在不断收敛的,但是最终和FT06案例的最优解(55)还有很大的差距,这是因为对于动作空间的设计太过简单,而且没有将机器的空闲时间利用起来,同一机器前后工序间的空闲时间太大,导致调度效果不理想。
遗传算法
知乎上看到这篇使用遗传算法求接的帖子
https://zhuanlan.zhihu.com/p/684600755
import numpy as np
import matplotlib.pyplot as plt
import random
import copy
import random
def createInd(J):
'''
初始化操作,一次初始化一个个体,机器矩阵从1开始
J: 机器顺序矩阵,J[i, j]表示加工i工件的第j个操作的机器号。大小为n*m
T: 加工时间矩阵,T[i, j]表示工件j再机器i上的加工时间。大小为m*n
'''
n = J.shape[0] # 工件数
# m = J.shape[1] # 机器数
s = []
Ji = J.copy()
while not np.all(Ji == 0):
I = np.random.randint(0, n)
M = Ji[I, 0]
if M != 0:
s.append(I)
b = np.roll(Ji[I, :], -1)
b[-1] = 0
Ji[I, :] = b
return s
def decode(J, P, s):
"""
function:
JSP解码函数,用于计算C_max和生成甘特图。
parameter:
- J: 机器顺序矩阵,J[i, j]表示加工i工件的第j个操作的机器号。
- P: 加工时间矩阵,P[i, j]表示工件i在机器j上的加工时间。
- s: 待解码的序列。
return:
- T: 甘特图矩阵。
- M: 工件排列矩阵。
- C: 完工时间矩阵。
"""
n, m = J.shape
T = [[[0]] for _ in range(m)]
C = np.zeros((n, m))
k = np.zeros(n, dtype=int)
for job in s:
machine = J[job, k[job]] - 1
process_time = P[job, k[job]]
last_job_finish = C[job, k[job] - 1] if k[job] > 0 else 0
# 寻找机器上的第一个合适空闲时间段
start_time = max(last_job_finish, T[machine][-1][-1]) # 默认在最后一个任务后开始
insert_index = len(T[machine]) # 默认插入位置在末尾
for i in range(1, len(T[machine])):
gap_start = max(T[machine][i - 1][-1], last_job_finish)
gap_end = T[machine][i][0]
if gap_end - gap_start >= process_time:
start_time = gap_start # 找到合适的起始时间
insert_index = i # 更新插入位置
break
end_time = start_time + process_time
C[job, k[job]] = end_time
T[machine].insert(insert_index, [start_time, job, k[job], end_time])
k[job] += 1
# 根据T矩阵构建M矩阵
M = [[] for _ in range(m)]
for machine in range(m):
for entry in T[machine][1:]:
M[machine].append(entry[1]) # 工件号
return T, M, C
def drawGantt(timeList):
T = timeList.copy()
# 创建一个新的图形
plt.rcParams['font.sans-serif'] = ['SimHei']
fig, ax = plt.subplots(figsize=(10, 6))
# 颜色映射字典,为每个工件分配一个唯一的颜色
color_map = {}
for machine_schedule in T:
for task_data in machine_schedule[1:]:
job_idx, operation_idx = task_data[1], task_data[2]
if job_idx not in color_map:
# 为新工件分配一个随机颜色
color_map[job_idx] = (random.random(), random.random(), random.random())
# 遍历机器
for machine_idx, machine_schedule in enumerate(T):
for task_data in machine_schedule[1:]:
start_time, job_idx, operation_idx, end_time = task_data
color = color_map[job_idx] # 获取工件的颜色
# 绘制甘特图条形,使用工件的颜色
ax.barh(machine_idx, end_time - start_time, left=start_time, height=0.4, color=color)
# 在色块内部标注工件-工序
label = f'{job_idx}-{operation_idx}'
ax.text((start_time + end_time) / 2, machine_idx, label, ha='center', va='center', color='white',
fontsize=10)
# 设置Y轴标签为机器名称
ax.set_yticks(range(len(T)))
ax.set_yticklabels([f'Machine {i+1}' for i in range(len(T))])
# 设置X轴标签
plt.xlabel("时间")
# 添加标题
plt.title("JSP问题甘特图")
# 创建图例,显示工件颜色
legend_handles = []
for job_idx, color in color_map.items():
legend_handles.append(plt.Rectangle((0, 0), 1, 1, color=color, label=f'Job {job_idx}'))
plt.legend(handles=legend_handles, title='工件')
# # 显示图形
# plt.show()
def createPop(Jm, popSize):
pop = []
for i in range(popSize):
pop.append(createInd(Jm))
return pop
def cross(A, B):
'''
A, B是两个相同长度的list,该函数用于交叉两个list,然后返回两个新的list
'''
n = len(A)
r1 = np.random.randint(n)
r2 = np.random.randint(n)
# r1, r2 = 3, 1
rl, rr = min(r1, r2), max(r1, r2)
if rl == rr:
return A, B
# for A
bt = copy.deepcopy(B)
afinal = copy.deepcopy(A)
for i in range(rl, rr+1):
bt.remove(A[i])
k = 0
for i in range(n):
if i < rl or i > rr:
afinal[i] = bt[k]
k += 1
# for B
# print(A, B)
at = copy.deepcopy(A)
bfinal = copy.deepcopy(B)
for i in range(rl, rr+1):
at.remove(B[i])
k = 0
for i in range(n):
if i < rl or i > rr:
bfinal[i] = at[k]
k += 1
return afinal, bfinal
def load_data(path):
# 从文本文件读取数据
with open(path, 'r') as file:
lines = file.readlines()
# 解析工件数和机器数
workpiece, machines = map(int, lines[0].split())
# 初始化 J 和 P 数组
J = np.zeros((workpiece, len(lines[1].split()) // 2), dtype=int)
P = np.zeros((workpiece, len(lines[1].split()) // 2), dtype=int)
# 解析机器编号和加工时间
'''
偶数索引(机器编号):
J[i - 1][j // 2] = data[j] + 1:机器编号存储在 J 中。因为机器编号可能从 1 开始(如机器1、机器2),在存储时加1,以确保在后续计算中与 Python 的 0 索引保持一致。
i - 1 是当前工件的索引(因为 i 从 1 开始)。
j // 2 将偶数索引映射到 J 的列索引。
奇数索引(加工时间):
P[i - 1][j // 2] = data[j]:加工时间直接存储在 P 中,j // 2 同样用于映射到 P 的列索引。
'''
for i in range(1, len(lines)):
data = list(map(int, lines[i].split()))
# print(data)
for j in range(len(data)):
if j % 2 == 0:
J[i - 1][j // 2] = data[j]+1
else:
P[i - 1][j // 2] = data[j]
return J, P
if __name__ == '__main__':
J, P = load_data('../jsp_q_learning/FT06.txt')
n = J.shape[0] # 工件数
m = J.shape[1] # 机器数
pop_size = 40
gen = n*m
pop = createPop(J, pop_size)
Tmax, _, C = decode(J, P, pop[0])
fitness = [C.max()]
Cmax = C.max()
bestID = 0
bestInd = copy.deepcopy(pop[0])
for i in range(1, pop_size):
T_, _, C_ = decode(J, P, pop[i])
if C_.max() < Cmax:
Tmax = T_
Cmax = C_.max()
bestInd = copy.deepcopy(pop[i])
fitness.append(C_.max())
g = 0
chistory = []
while g < gen:
g += 1
# 所有个体的交叉操作
l = 0
newInd = []
newFitness = []
while l < pop_size/2:
tm = np.random.randint(pop_size) # 随机一个与最优个体交叉
I1, I2 = cross(pop[tm], bestInd)
T1, _, C1 = decode(J, P, I1) # 对交叉后的解码
newInd.append(I1) # 交叉后的个体添加入newInd
newFitness.append(C1.max()) # 交叉后的适应度添加入newFitness
if C1.max() < Cmax: # 如果适应度比已知最优个体还好
Cmax = C1.max() # 更新最佳适应度
Tmax = T1 # 更新最优调度
bestInd = copy.deepcopy(I1) # 更新最优个体
T2, _, C2 = decode(J, P, I2)
newInd.append(I2)
newFitness.append(C2.max())
if C2.max() < Cmax:
Cmax = C2.max()
Tmax = T2
bestInd = copy.deepcopy(I2)
l += 1
newpop = pop + newInd # 交叉后的种群与原来种群合并
newFit = fitness + newFitness # 适应度也合并
newId = np.array(newFit).argsort()[:pop_size] # 取最好的40个的ID
pop = copy.deepcopy([newpop[i] for i in newId])
fitness = [newFit[i] for i in newId]
# 变异操作
for i in range(pop_size):
index1, index2 = random.sample(range(n*m), 2)
pop[i][index1], pop[i][index2] = pop[i][index2], pop[i][index1]
Ind = copy.deepcopy(pop[i])
Tt, _, Ct = decode(J, P, Ind)
fitness[i] = Ct.max()
if Ct.max() < Cmax:
Cmax = Ct.max()
Tmax = Tt
bestInd = Ind
print('第{}代,Cmax={}'.format(g, Cmax))
wait_time = 0
for i in Tmax:
for j in range(1, len(i)):
wait_time += i[j][0] - i[j-1][-1]
print('第{}代,平均机器等待时间={}'.format(g, (Cmax*m - J.sum())/m))
chistory.append(Cmax)
index = chistory.index(Cmax)
print(f"{Cmax}首次出现的索引是:{index}")
print(Tmax)
print(decode(J, P, bestInd)[1])
plt.plot(chistory)
drawGantt(Tmax)
plt.show()
createInd 函数用于初始化遗传算法中的一个个体(解)
J
:一个机器顺序矩阵,形状为 n*m
,其中 J[i, j]
表示工件 i
在第 j
个操作中所需的机器号。
通过 J.shape[0]
获取工件的数量 n
。
s
是一个空列表,用于存储生成的个体(即工件的顺序)。Ji
是J
的副本,用于在后续处理中修改原始的数据而不影响它。- 进入一个循环,直到
Ji
中的所有元素都变为0。这个条件确保了所有工件的操作都被处理。随机选择一个工件I
(在0
到n-1
之间),并获取该工件当前第一个操作所需的机器号M
。
if M != 0:
s.append(I)
b = np.roll(Ji[I, :], -1)
b[-1] = 0
Ji[I, :] = b
如果机器号 M
不为0(意味着该工件还有未完成的操作)则:
- 将工件
I
添加到个体列表s
中。 - 使用
np.roll
将工件I
的操作顺序向左滚动,最前面的操作被移到最后,最后一个操作的机器号设置为0,表示该操作已完成。 - 更新
Ji
,反映该工件的操作顺序已经更新。
代码流程署里下
1. 数据加载与初始化
J, P = load_data('../jsp_q_learning/ft06.txt') n = J.shape[0] # 工件数 m = J.shape[1] # 机器数 pop_size = 40 gen = n * m pop = createPop(J, pop_size)
load_data
:从指定文件加载工件和机器的数据。n
和m
:分别代表工件数量和机器数量。pop_size
:设定种群大小为40。gen
:设定最大代数(生成数),为工件数与机器数的乘积。createPop
:生成初始种群。2. 解码与适应度评估
Tmax, _, C = decode(J, P, pop[0]) fitness = [C.max()] Cmax = C.max() bestID = 0 bestInd = copy.deepcopy(pop[0])
decode
:对种群中的个体进行解码,得到调度安排Tmax
和完成时间C
。fitness
:记录种群中第一个个体的适应度。Cmax
:更新当前的最大完成时间。bestInd
:记录当前最优个体。3. 评估种群中的其他个体
for i in range(1, pop_size): T_, _, C_ = decode(J, P, pop[i]) if C_.max() < Cmax: Tmax = T_ Cmax = C_.max() bestInd = copy.deepcopy(pop[i]) fitness.append(C_.max())
- 遍历种群中的所有个体,解码并计算适应度,更新最优个体和最大适应度。
4. 迭代优化过程
g = 0 chistory = [] while g < gen: g += 1 # 所有个体的交叉操作 l = 0 newInd = [] newFitness = [] while l < pop_size/2: tm = np.random.randint(pop_size) # 随机一个与最优个体交叉 I1, I2 = cross(pop[tm], bestInd) ... l += 1
while g < gen
:进行代数迭代,直到达到设定的最大代数。- 在内部循环中,进行交叉操作,生成新个体。
5. 交叉与适应度更新
newpop = pop + newInd # 交叉后的种群与原来种群合并 newFit = fitness + newFitness # 适应度也合并 newId = np.array(newFit).argsort()[:pop_size] # 取最好的40个的ID pop = copy.deepcopy([newpop[i] for i in newId]) fitness = [newFit[i] for i in newId]
- 将原种群和新生成的个体合并,并更新种群为适应度最好的40个个体。
6. 变异操作
for i in range(pop_size): index1, index2 = random.sample(range(n*m), 2) pop[i][index1], pop[i][index2] = pop[i][index2], pop[i][index1] Ind = copy.deepcopy(pop[i]) Tt, _, Ct = decode(J, P, Ind) fitness[i] = Ct.max() if Ct.max() < Cmax: Cmax = Ct.max() Tmax = Tt bestInd = Ind
- 对每个个体进行变异(交换两个随机位置的工件),并解码更新适应度。
7. 输出当前代信息
print('第{}代,Cmax={}'.format(g, Cmax)) wait_time = 0 for i in Tmax: for j in range(1, len(i)): wait_time += i[j][0] - i[j-1][-1] print('第{}代,平均机器等待时间={}'.format(g, (Cmax*m - J.sum())/m)) chistory.append(Cmax)
- 输出当前代的最大完成时间和平均机器等待时间。
8. 记录最优结果
index = chistory.index(Cmax) print(f"{Cmax}首次出现的索引是:{index}")
他使用上面代码经过两代变异就找到了最优
第1代,Cmax=60.0
第1代,平均机器等待时间=39.0
第2代,Cmax=55.0
第2代,平均机器等待时间=34.0
最近逛codewithpage时发现了一篇论文https://arxiv.org/pdf/2201.00548v1
在txyz上进行阅读理解
全文总结
这篇论文提出了一个通用框架,使用强化学习(RL)来解决动态车间调度问题(DJSP)。该框架的关键方面包括:(1) 将DJSP表述为马尔可夫决策过程,其中以析取图为状态,以通用调度规则为动作;(2) 使用注意力机制进行图表示学习,以从状态中提取特征;以及(3) 采用带有优先级回放和噪声网络的双重对抗深度Q网络(D3QPN)作为RL算法。作者还引入了一个名为Gymjsp的新基准,以促进基于RL的DJSP研究。该提出的框架旨在提供一种通用有效的DJSP解决方案,将结构化的人类知识(调度规则)与数据驱动的RL智能相结合。
主要观点
- 将DJSP公式化为马尔可夫决策过程,其中离散图为状态,通用调度规则为动作
- 使用注意力机制进行图表示学习,从状态中提取特征
- 采用双重对抗深度Q网络(D3QPN)作为强化学习算法,包括优先级回放和噪声网络
- 引入一个新的基准测试环境Gymjsp,以促进基于强化学习的DJSP研究
- 旨在提供一种灵活有效的DJSP解决方案,结合结构化的人类知识(调度规则)和数据驱动的强化学习智能
找到对那个的代码
GitHub - Yunhui1998/Gymjsp: Share a benchmark that can easily apply reinforcement learning in Job-shop-scheduling
里面恰好有个例子是针对ft06 作业的,跑一下自动生成甘特图
import os
os.sys.path.insert(0, '../gymjsp')
from gymjsp.jsspenv import HeuristicAttentionJsspEnv
env = HeuristicAttentionJsspEnv('ft06', schedule_cycle=10)
env.reset()
for _ in range(10000):
env.step(env.action_space.sample())
env.render()