一、自适应大邻域搜索概念介绍
自适应大邻域搜索(Adaptive Large Neighborhood Search,ALNS)是一种用于解决组合优化问题的元启发式算法。以下是关于它的详细介绍:
-自适应大领域搜索的核心思想是:破坏解
、修复解
、动态调整权重并选择(自适应)
。
(一) 基本原理
(1)大邻域搜索(LNS)基础
传统的局部搜索算法通常在当前解的小邻域内进行搜索,容易陷入局部最优解。大邻域搜索则通过破坏(destroy)当前解的一部分,生成一个更大的邻域,然后再通过修复(repair)操作得到新的可行解,从而有可能跳出局部最优。
例如,在车辆路径规划问题中,破坏操作可以是移除一些客户节点,修复操作则是将这些移除的节点重新插入到路径中。
与同类算法相比:在邻域搜索算法中,有的算法可以只使用一种邻域,如模拟退火算法,因此它仅仅搜索了解空间的一小部分,找到全局最优的概率较小,它的优势之一是可以避免陷入局部最优;
邻域搜索算法以一个初始解 x xx 为输入,在 x xx 的邻域进行搜索,以寻找可能存在的更优解,每次找到一个更优解,就用它替换当前解,直到达到局部最优解(找不到任何更优解)。该算法被称为最佳改进算法,因为它总是选择邻域中的最佳解。
而有的算法可以使用多种算子,如变邻域搜索算法(VNS),它通过在当前解的多个邻域中寻找更满意的解,能够大大提高算法在解空间的搜索范围,但是它在使用算子时盲目地将每种算子形成的邻域结构都搜索一遍,缺少了一些启发式信息的指导;
而自适应大邻域搜索算法就弥补了这种不足,这种算法根据算子的历史表现与使用次数选择下一次迭代使用的算子,通过算子间的相互竞争来生成当前解的邻域结构,而在这种结构中有很大概率能够找到更好的解;
销毁方法通常包含随机元素,因此每次调用该方法时都会销毁解决方案的不同部分。然后,解 x xx 的邻域 N ( x ) N(x)N(x) 被定义为通过首先应用破坏方法然后应用修复方法可以达到的解集。
(2)自适应机制
ALNS在LNS的基础上引入了自适应机制。它会根据不同的破坏和修复操作在搜索过程中的表现,动态地调整这些操作被选择的概率。表现好的操作在后续搜索中被选择的概率会增加,反之则减少。
具体来说,算法会为每个破坏和修复操作维护一个分数,根据该分数更新操作的权重,进而影响操作被选择的概率。
(二) 主要步骤
(1) 初始化
生成一个初始解。这个初始解可以是通过某种启发式方法得到的,也可以是随机生成的。
初始化各种破坏和修复操作的权重,通常初始权重可以设置为相等。
(2) 迭代搜索
选择操作:根据当前操作的权重,使用某种选择策略(如轮盘赌选择)选择一个破坏操作和一个修复操作。
破坏阶段:使用选择的破坏操作对当前解进行破坏,得到一个部分解。
修复阶段:使用选择的修复操作对部分解进行修复,得到一个新的可行解。
接受准则:根据一定的接受准则(如模拟退火中的Metropolis准则)决定是否接受这个新解。如果接受,则将新解作为当前解;否则,保持当前解不变。
更新操作分数和权重:根据新解的质量,更新所使用的破坏和修复操作的分数,然后根据分数更新操作的权重。
(3) 终止条件
当满足一定的终止条件(如达到最大迭代次数、运行时间超过限制等)时,算法停止,输出当前找到的最优解。
1. 生成初始解,当前解X0 = 初始解,最优解X1 = X0 2. 重复以下步骤进行迭代直到停止准则 2.1 根据算子权重选择破坏与修复算子,并更新算子使用次数 2.2 破坏算子和修复算子依次对当前解操作得到新解X2 2.3 更新当前解 - 如f(X2) < f(X0),则X0 = X2 - 如f(X2) > f(X0),则以一定的概率接受该解作为当前解 2.4 更新最优解 - 如f(X2) < f(X1),则X1 = X2 2.5 更新算子的分数 2.6 更新算子的权重 2.7 重置当前解 3. 返回最优解X1
(4)关于破坏算子选择以及修复算子选择说明
(5)关于动态调整权重并选择(自适应过程)的说明
(三)算法评析
(1)优点
全局搜索能力强:通过大邻域搜索,能够跳出局部最优,有更大的机会找到全局最优解。
自适应特性:能够根据搜索过程中的反馈动态调整操作的选择概率,提高搜索效率。
灵活性高:可以应用于各种组合优化问题,只需要根据具体问题设计合适的破坏和修复操作。
(2) 缺点
参数调整复杂:算法中有多个参数需要调整,如初始权重、分数更新规则等,参数设置不当可能会影响算法的性能。
计算复杂度高:由于需要进行多次破坏和修复操作,计算量较大,对于大规模问题可能需要较长的运行时间。
(3) 应用领域
ALNS在许多组合优化问题中都有应用,例如:
车辆路径规划问题(VRP):包括带时间窗的VRP、多车辆VRP等。
调度问题:如作业车间调度、项目调度等。
资源分配问题:如护士排班、课程排课等。
二、算法实例
(一)ALNS算法实现34个城市的TSP问题
#以下是python实现的代码。 其中,destroy算子有3个:随机筛选N个城市、删除距离最大的N个城市和随机删#除连续的N个城市;repair算子有2个:随机插入和贪心插入,不过考虑到随机插入的效果大概率比较差,所以#代码中实际只使用了贪心插入。
import copy
import time
import random
import numpy as np
import pandas as pd
import os
import math
import folium
class ALNSSearch():
# 计算TSP总距离
# 计算TSP总距离
@staticmethod
def dis_cal(path, dist_mat):
distance = 0
for i in range(len(path) - 1):
distance += dist_mat[path[i]][path[i + 1]]
distance += dist_mat[path[-1]][path[0]]
return distance
# 随机删除N个城市
@staticmethod
def random_destroy(x, destroy_city_cnt):
new_x = copy.deepcopy(x)
removed_cities = []
# 随机选择N个城市,并删除
removed_index = random.sample(range(0, len(x)), destroy_city_cnt)
for i in removed_index:
removed_cities.append(new_x[i])
x.remove(new_x[i])
return removed_cities
# 删除距离最大的N个城市
@staticmethod
def max_n_destroy(x, destroy_city_cnt):
new_x = copy.deepcopy(x)
removed_cities = []
# 计算顺序距离并排序
dis = []
for i in range(len(new_x) - 1):
dis.append(dist_mat[new_x[i]][new_x[i + 1]])
dis.append(dist_mat[new_x[-1]][new_x[0]])
sorted_index = np.argsort(np.array(dis))
# 删除最大的N个城市
for i in range(destroy_city_cnt):
removed_cities.append(new_x[sorted_index[-1 - i]])
x.remove(new_x[sorted_index[-1 - i]])
return removed_cities
# 随机删除连续的N个城市
@staticmethod
def continue_n_destroy(x, destroy_city_cnt):
new_x = copy.deepcopy(x)
removed_cities = []
# 随机选择N个城市,并删除
removed_index = random.sample(range(0, len(x) - destroy_city_cnt), 1)[0]
for i in range(removed_index + destroy_city_cnt, removed_index, -1):
removed_cities.append(new_x[i])
x.remove(new_x[i])
return removed_cities
# destroy操作
def destroy(self, flag, x, destroy_city_cnt):
# 三个destroy算子,第一个是随机删除N个城市,第二个是删除距离最大的N个城市,第三个是随机删除连续的N个城市
removed_cities = []
if flag == 0:
# 随机删除N个城市
removed_cities = self.random_destroy(x, destroy_city_cnt)
elif flag == 1:
# 删除距离最大的N个城市
removed_cities = self.max_n_destroy(x, destroy_city_cnt)
elif flag == 2:
# 随机删除连续的N个城市
removed_cities = self.continue_n_destroy(x, destroy_city_cnt)
return removed_cities
# 随机插入
def random_insert(x, removed_cities):
insert_index = random.sample(range(0, len(x)), len(removed_cities))
for i in range(len(insert_index)):
x.insert(insert_index[i], removed_cities[i])
# 贪心插入
def greedy_insert(self, x, removed_cities):
dis = float('inf')
insert_index = -1
for i in range(len(removed_cities)):
# 寻找插入后的最小总距离
for j in range(len(x) + 1):
new_x = copy.deepcopy(x)
new_x.insert(j, removed_cities[i])
if self.dis_cal(new_x, dist_mat) < dis:
dis = self.dis_cal(new_x, dist_mat)
insert_index = j
# 最小位置处插入
x.insert(insert_index, removed_cities[i])
dis = float('inf')
# repair操作
def repair(self, flag, x, removed_cities):
# 两个repair算子,第一个是随机插入,第二个贪心插入
if flag == 0:
self.random_insert(x, removed_cities)
elif flag == 1:
self.greedy_insert(x, removed_cities)
# 选择destroy算子
def select_and_destroy(self, destroy_w, x, destroy_city_cnt):
# 轮盘赌逻辑选择算子
prob = destroy_w / np.array(destroy_w).sum()
seq = [i for i in range(len(destroy_w))]
destroy_operator = np.random.choice(seq, size=1, p=prob)[0]
# destroy操作
removed_cities = self.destroy(destroy_operator, x, destroy_city_cnt)
return removed_cities, destroy_operator
# 选择repair算子
def select_and_repair(self, repair_w, x, removed_cities):
# # 轮盘赌逻辑选择算子
prob = repair_w / np.array(repair_w).sum()
seq = [i for i in range(len(repair_w))]
repair_operator = np.random.choice(seq, size=1, p=prob)[0]
# repair操作:此处设定repair_operator=1,即只使用贪心策略
self.repair(1, x, removed_cities)
return repair_operator
# ALNS主程序
def calc_by_alns(self, dist_mat):
# 模拟退火温度
T = 100
# 降温速度
a = 0.97
# destroy的城市数量
destroy_city_cnt = int(len(dist_mat) * 0.1)
# destroy算子的初始权重
destroy_w = [1, 1, 1]
# repair算子的初始权重
repair_w = [1, 1]
# destroy算子的使用次数
destroy_cnt = [0, 0, 0]
# repair算子的使用次数
repair_cnt = [0, 0]
# destroy算子的初始得分
destroy_score = [1, 1, 1]
# repair算子的初始得分
repair_score = [1, 1]
# destroy和repair的挥发系数
lambda_rate = 0.5
# 当前解,第一代,贪心策略生成
removed_cities = [i for i in range(dist_mat.shape[0])]
x = []
self.repair(1, x, removed_cities)
# 历史最优解,第一代和当前解相同,注意是深拷贝,此后有变化不影响x,也不会因x的变化而被影响
history_best_x = copy.deepcopy(x)
# 迭代
cur_iter = 0
max_iter = 100
print(
'cur_iter: {}, best_f: {}, best_x: {}'.format(cur_iter, self.dis_cal(history_best_x, dist_mat),
history_best_x))
while cur_iter < max_iter:
# 生成测试解,即伪代码中的x^t
test_x = copy.deepcopy(x)
# destroy算子
remove, destroy_operator_index = self.select_and_destroy(destroy_w, test_x, destroy_city_cnt)
destroy_cnt[destroy_operator_index] += 1
# repair算子
repair_operator_index = self.select_and_repair(repair_w, test_x, remove)
repair_cnt[repair_operator_index] += 1
if self.dis_cal(test_x, dist_mat) <= self.dis_cal(x, dist_mat):
# 测试解更优,更新当前解
x = copy.deepcopy(test_x)
if self.dis_cal(test_x, dist_mat) <= self.dis_cal(history_best_x, dist_mat):
# 测试解为历史最优解,更新历史最优解,并设置最高的算子得分
history_best_x = copy.deepcopy(test_x)
destroy_score[destroy_operator_index] = 1.5
repair_score[repair_operator_index] = 1.5
else:
# 测试解不是历史最优解,但优于当前解,设置第二高的算子得分
destroy_score[destroy_operator_index] = 1.2
repair_score[repair_operator_index] = 1.2
else:
if np.random.random() < np.exp((self.dis_cal(x, dist_mat) - self.dis_cal(test_x, dist_mat)) / T):
# 当前解优于测试解,但满足模拟退火逻辑,依然更新当前解,设置第三高的算子得分
x = copy.deepcopy(test_x)
destroy_score[destroy_operator_index] = 0.8
repair_score[repair_operator_index] = 0.8
else:
# 当前解优于测试解,也不满足模拟退火逻辑,不更新当前解,设置最低的算子得分
destroy_score[destroy_operator_index] = 0.5
repair_score[repair_operator_index] = 0.5
# 更新destroy算子的权重
destroy_w[destroy_operator_index] = \
destroy_w[destroy_operator_index] * lambda_rate + \
(1 - lambda_rate) * destroy_score[destroy_operator_index] / destroy_cnt[destroy_operator_index]
# 更新repair算子的权重
repair_w[repair_operator_index] = \
repair_w[repair_operator_index] * lambda_rate + \
(1 - lambda_rate) * repair_score[repair_operator_index] / repair_cnt[repair_operator_index]
# 降低温度
T = a * T
# 结束一轮迭代,重置模拟退火初始温度
cur_iter += 1
print(
'cur_iter: {}, best_f: {}, best_x: {}'.format(cur_iter, self.dis_cal(history_best_x, dist_mat),
history_best_x))
# 打印ALNS得到的最优解
print(history_best_x)
print(self.dis_cal(history_best_x, dist_mat))
return history_best_x
if __name__ == '__main__':
original_cities = [['西宁', 101.74, 36.56],
['兰州', 103.73, 36.03],
['银川', 106.27, 38.47],
['西安', 108.95, 34.27],
['郑州', 113.65, 34.76],
['济南', 117, 36.65],
['石家庄', 114.48, 38.03],
['太原', 112.53, 37.87],
['呼和浩特', 111.65, 40.82],
['北京', 116.407526, 39.90403],
['天津', 117.200983, 39.084158],
['沈阳', 123.38, 41.8],
['长春', 125.35, 43.88],
['哈尔滨', 126.63, 45.75],
['上海', 121.473701, 31.230416],
['杭州', 120.19, 30.26],
['南京', 118.78, 32.04],
['合肥', 117.27, 31.86],
['武汉', 114.31, 30.52],
['长沙', 113, 28.21],
['南昌', 115.89, 28.68],
['福州', 119.3, 26.08],
['台北', 121.3, 25.03],
['香港', 114.173355, 22.320048],
['澳门', 113.54909, 22.198951],
['广州', 113.23, 23.16],
['海口', 110.35, 20.02],
['南宁', 108.33, 22.84],
['贵阳', 106.71, 26.57],
['重庆', 106.551556, 29.563009],
['成都', 104.06, 30.67],
['昆明', 102.73, 25.04],
['拉萨', 91.11, 29.97],
['乌鲁木齐', 87.68, 43.77]]
original_cities = pd.DataFrame(original_cities, columns=['城市', '经度', '纬度'])
D = original_cities[['经度', '纬度']].values * math.pi / 180
city_cnt = len(original_cities)
dist_mat = np.zeros((city_cnt, city_cnt))
for i in range(city_cnt):
for j in range(city_cnt):
if i == j:
# 相同城市不允许访问
dist_mat[i][j] = 1000000
else:
# 单位:km
dist_mat[i][j] = 6378.14 * math.acos(
math.cos(D[i][1]) * math.cos(D[j][1]) * math.cos(D[i][0] - D[j][0]) +
math.sin(D[i][1]) * math.sin(D[j][1]))
# ALNS求解TSP
time0 = time.time()
alns = ALNSSearch()
history_best_x = alns.calc_by_alns(dist_mat)
print(city_cnt)
print('使用ALNS求解TSP,耗时: {} s'.format(time.time() - time0))
#运行代码后可发现,经过不到4s的计算时间,ALNS便可以得到15662.59的解,和全局最优解15614.84之间的#gap仅约0.3%。
(2) Robust Optimization for the Dual Sourcing Inventory Routing Problem in Disaster Relief
import numpy as np
import math
import re
import csv
import random
import copy
import time
import os
# --------------------------- Global Configuration ---------------------------------------
removal_operators_list=['shaw','distance','worst','worst_replenishment_time','random']
insertion_operators_list=['greedy','2_regret','shaw']
##################################################################################
for root, dirs, files in os.walk('./Test_set'):
for set_name in files:
print(set_name)
time_start = time.time()
instance = set_name
problem = base.generate_problem(instance)
records = base.Record(removal_operators_list, insertion_operators_list)
solution = base.generate_initial_solution(problem)
removal_operators = base.Removal_operators(removal_operators_list)
insertion_operators = base.Insertion_operators(insertion_operators_list)
solution.obj = base.calculate_objective(solution, problem)
current_solution = copy.deepcopy(solution)
best_solution = copy.deepcopy(solution)
time_end = time.time()
time_cost = time_end - time_start
records.initial_generator_time_cost = time_cost
time_start = time.time()
iteration_num = 0
iteration_counter = 0
segment_counter = 0
while time_cost <= 3600:
if problem.temperature <= 0.01:
break
if iteration_counter >= problem.segments_capacity:
segment_counter += 1
iteration_counter = 0
for i in removal_operators_list:
records.removal_operator_score_record[i].append(removal_operators.removal_operator_score[i])
records.removal_operator_weights_record[i].append(removal_operators.removal_operator_weight[i])
records.removal_operator_times_record[i].append(removal_operators.removal_operator_times[i])
for i in insertion_operators_list:
records.insertion_operator_score_record[i].append(insertion_operators.insertion_operator_score[i])
records.insertion_operator_weights_record[i].append(insertion_operators.insertion_operator_weight[i])
records.insertion_operator_times_record[i].append(insertion_operators.insertion_operator_times[i])
removal_operators.update_weight(removal_operators_list,problem)
insertion_operators.update_weight(insertion_operators_list,problem)
for i in removal_operators_list:
removal_operators.removal_operator_score[i] = 0
removal_operators.removal_operator_times[i] = 0
for i in insertion_operators_list:
insertion_operators.insertion_operator_score[i] = 0
insertion_operators.insertion_operator_times[i] = 0
best_solution, current_solution = base.do_2opt(best_solution, current_solution, problem)
solution = copy.deepcopy(current_solution)
selected_removal_operator = removal_operators.select(removal_operators_list)
selected_insertion_operator = insertion_operators.select(insertion_operators_list)
removal_operators.removal_operator_times[selected_removal_operator] +=1
insertion_operators.insertion_operator_times[selected_insertion_operator] +=1
sub_time_start = time.time()
sub_time_cost = sub_time_start - time_end
base.do_remove(selected_removal_operator, solution, problem)
sub_time_end = time.time()
sub_time_cost = sub_time_end - sub_time_start
sub_time_start = sub_time_end
base.do_insert(selected_insertion_operator, solution, problem)
sub_time_end = time.time()
sub_time_cost = sub_time_end - sub_time_start
sub_time_start = sub_time_end
records.best_solution_obj_record.append(best_solution.obj)
if solution.obj < current_solution.obj:
if solution.obj < best_solution.obj:
removal_operators.removal_operator_score[selected_removal_operator] += problem.score_factor_1
insertion_operators.insertion_operator_score[selected_insertion_operator] += problem.score_factor_1
current_solution = copy.deepcopy(solution)
best_solution = copy.deepcopy(solution)
current_solution = copy.deepcopy(solution)
records.best_solution_obj_record[iteration_num] = best_solution.obj
segment_counter = 0
print('get best solution= ', best_solution.obj)
records.best_solution_update_iteration_num.append(iteration_num)
else:
removal_operators.removal_operator_score[selected_removal_operator] += problem.score_factor_2
insertion_operators.insertion_operator_score[selected_insertion_operator] += problem.score_factor_2
current_solution = copy.deepcopy(solution)
elif random.random() < math.exp(-(solution.obj - current_solution.obj)/problem.temperature):
removal_operators.removal_operator_score[selected_removal_operator] += problem.score_factor_3
insertion_operators.insertion_operator_score[selected_insertion_operator] += problem.score_factor_3
current_solution = copy.deepcopy(solution)
iteration_num += 1
iteration_counter += 1
time_end = time.time()
time_cost = time_end - time_start
if time_cost >= 1200 and iteration_num <= 3000/3:
problem.cooling_rate = (0.01/problem.temperature) ** (1/iteration_num)
if time_cost >= 2400 and iteration_num <= 3000/2:
problem.cooling_rate = (0.01/problem.temperature) ** (1/(iteration_num/2))
problem.temperature = problem.temperature*problem.cooling_rate
print('iteration '+ str(iteration_num)+' finished!')
print(problem.instance_name,' has finished!')
with open('Data.csv', 'a', newline='', encoding="ANSI") as f_out:
csv.writer(f_out, delimiter=',').writerow([set_name, best_solution.obj,time_cost])
print(problem.instance_name, ' has finished!')
三、基础知识介绍
(一)轮盘赌是什么?
来源:Evolutionary Computing: 遗传算法_轮盘赌选择(转载) - 柠檬雨 - 博客园
参考引用(它山之石)
「1」https://zhuanlan.zhihu.com/p/353562821
「2」元启发式算法详解:(自适应)大邻域搜索算法((A)LNS)-CSDN博客
「3」https://zhuanlan.zhihu.com/p/102298240