花了几天时间学习了下 D* 算法,只能说熟悉了一下流程,远不能说掌握,算法实在是非常巧妙
参考
《制造车间无人搬运系统调度方法研究》
《基于D*Lite算法的移动机器人路径规划研究》
人工智能: 自动寻路算法实现(四、D、D*算法)
D* 算法
D*路径规划算法及python实现
【机器人】 D*算法-动态路径规划
https://github.com/zhm-real/PathPlanning
原文阅读
这几天的教训告诉我阅读原文真的很重要,第一遍读很难受,看了很多乱七八糟的中文论文以及博客
有用倒确实有些用,后面再读论文时就没有起初那么抗拒,能读得下去
大概就是:原论文 → 中文论文+博客 → 原论文 → 论文 + 代码 → 论文 + 博客
这么反复折腾几遍,再跟着实际 PPT 内容推着走一遍,总归能掌握一些吧
制造车间那篇论文是 18 年大连理工的,对 D* 介绍的部分基本就是原文翻译,可以对着原文一起读
下面对原文中重要的部分进行摘录和翻译,有助于后面对算法的理解
Like A*, D* maintains an OPEN list of states.
The OPEN list is used to propagate information about changes to the arc cost function and to calculate path costs to states in the space.
Every state X has an associated tag t ( X ) , such that t(X) = NEW if X has never been on the OPEN list, t(X) = OPEN if X is currently on the OPEN list, and t(X) = CLOSED if X is no longer on the OPEN list.
For each state X , D* maintains an estimate of
the sum of the arc costs
from X to G given by the path cost functionh(G, X )
. Given the proper conditions, this estimate is equivalent to the optimal (minimal) cost from state X to G, given by the implicit function o(G, X) . For each state X on the OPEN list (i.e., r(X) = OPEN), the key function,k(G, X)
, is defined to be equal to the minimum of h(G, X) before modification and all values assumed by h(G, X) since X was placed on the OPEN list.
The key function classifies a state X on the OPEN list into one of two types: a
RAISE
state ifk(G, X) < h(G, X )
, and aLOWER
State ifk(G, X) = h(G, X)
. D uses RAISE states on the OPEN list to propagate information about path cost increases (e.g., due to an increased arc cost) and LOWER states to propagate information about path cost reductions (e.g., due to a reduced arc cost or new path to the goal).*
The propagation takes place
through the repeated removal of states from the OPEN list
. Each time a state is removed from the list, it is expanded to pass cost changes to its neighbors.These neighbors are in turn placed on the OPEN list to continue the process.
传播通过从OPEN列表中反复删除状态来进行。每次从列表中删除一个状态时,它都会展开以将成本更改传递给它的邻居。这些邻居依次被加入OPEN列表,以继续这一过程
States on the OPEN list are sorted by their key function value. The parameter kmin is defined to be min(k(X)) for all X such that t(X) = OPEN.
The parameter kmin represents an important threshold in D*
: path costs less than or equal to kmin are optimal, and those greater than kmin may not be optimal. The parameter kold is defined to be equal to kmin prior to most recent removal of a state from the OPEN list. If no states have been removed, kold is undefined.
The D* algorithm consists primarily of two functions:
PROCESS - STATE
is used to compute optimal path costs to the goal, andMODIFY - COST
is used to change the arc cost function c(O) and enter affected states on the OPEN list. Initially, t(O) is set to NEW for all states, h(G) is set to zero, and G is placed on the OPEN list. The first function, PROCESS- STATE, is repeatedly called until the robot’s state, X , is removed from the OPEN list (i.e., t(X) = CLOSED) or a value of -1 is returned, at which point either the sequence { X } has been computed or does not exist respectively. The robot then proceeds to follow the backpointers in the sequence { X )until it either reaches the goal or discovers an error in the arc cost function c(") (e.g., due to a detected obstacle
). The second function, MUDIFY- COST, is immediately called to correct c(O) and place affected states on the OPEN list. Let Y be the robot’s state at which it discovers an error in c(O). By calling PROCESS - STATE until it returnskmin ≥ h(Y)
, the cost changes are propagated to state Y such that h(Y) = o(Y). At this point, a possibly new sequence { Y) has been constructed, and the robot continues to follow the backpointers in the sequence toward the goal.
The role of RAISE ans LOWER states is central to the operation of the algorithm.
The RAISE states (i.e., k(X) < h(X)) propagate cost increases, and the LOWER states (i.e., k(x) = h(X)) propagate cost reductions.
When the cost of traversing an arc is increased, an affected neighbor state is placed on the OPEN list, and the cost increase is propagated via RAISE states through all state sequences containing the arc. As the RAISE states come in contact with neighboring states of lower cost,these LOWER states are placed on the OPEN list, and they subsequently decrease the cost of previously raised states wherever possible
. If the cost of traversing an arc is decreased, the reduction is propagated via LOWER states through all state sequences containing the arc, as well as neighboring states whose cost can also be lowered.
RAISE和LOWER状态的作用对算法的操作至关重要。RAISE状态(即k(X) <h(X))传播成本增加,而LOWER状态(即k(X) = h(X))传播成本降低。当遍历一个弧线的开销增加时,受影响的邻居状态被放到OPEN列表中,开销的增加通过RAISE状态传播到包含该弧线的所有状态序列。当RAISE状态与相邻的低成本状态接触时,这些lower状态被放在OPEN列表中,它们随后尽可能地降低先前升高状态的成本。如果穿越一条弧线的成本降低了,那么这种降低将通过LOWER状态传播到包含该弧线的所有状态序列,以及成本也可以降低的相邻状态。
算法原理
命名由来
原话:The name of the algorithm, D*, was chosen because it resembles A*
解释:就是觉着像 A* 干脆起名 D*,说是Dynamic A* 不太严谨
主流程
情景模拟
- 假设有一机器人任务是从起点S到终点G,首先寻找一条最优路径{S, ……, Y, X, ……,G}
- 之后沿该路径行驶过程中在Y处发现X点塌方(无法到达),需要寻找新的路径{ Y, Z, ……,G}继续行驶
符号约定
1、有向图:D* 算法以有向图作为前提,即在A、B节点间,“A到B” 与 “B到A” 表示两个弧,因此对应的权重不一定一样
2、X、Y 、Z、S、G等:称为节点,也称为state,可理解为一个坐标position,其中S表示Start,G表示Goal
3、{S,……,G}:表示一条路径,该路径以S为起点,G为终点,简记为{S}
4、c(X, Y):表示“Y->X”节点的成本/权重(cost),具体计算逻辑如下
- 以Y为中心构成8邻域的九宫格
- 当不存在障碍物时,横向、竖向的连接/移动成本为1、斜向连接成本为1.4
- 当存在障碍物时,不管什么方向的连接/移动成本都设置为比较大的值,比如10000,以确保很难通过
- 虽然是有向图,但是在此文中可以设定双向移动成本一样即
c(X, Y) == c(Y, X)
5、b(X) = Y: 用来记录反向指针(backpointer),即X->Y
,当处于X位置时可以根据b(X)获得下一个要到达的节点Y,以使得路径最优
6、t(X):节点的访问标记位(tag),支持3个类型:NEW、OPEN、CLOSED
NEW
:节点从未访问过,最初的设置,可能存在部分节点永远处于改状态OPEN
:节点正在访问中,存储在OPEN_LIST(使用优先队列PriorityQueue实现)中,也称为OPEN_QUEUE,以待访问扩展CLOSED
:节点已经被访问过,从优先队列中移除,并完成了相邻节点的扩展,一般找到了到达重点G的最优路径
7、h(X):用来存储路径代价,指从X到达终点G的路径({X,……G},简记为{X})代价,不一定是全局最优,第一次搜索到起点时时,所有点的h会被更新,计算方式同Dijkstra
算法,是用相邻两点的代价+上一个点的代价累加得到
8、k(X):用来记录自X节点被加入到OPEN_LIST中后的最小h(X)值(具体计算方式由Insert函数决定),也是优先队列OPEN_LIST的排序依据,k将会保持到最小,它表示了本点在全图环境中到G点的最小代价
过程拆解
- P1、构建一条从S到G的路径{S, ……, Y, X, ……,G},采用
反向构建
路径的方法(以G为起点的Dijkstra算法),在PROCESS_STATE()函数的部分分支逻辑中实现 - P2、机器人沿着{S, ……, Y, X, ……,G}路径行驶,直到发现异常点(塌方,不可到达)后停下来,在Y处发现X塌方,停在Y
- P3、修正塌方节点X与周边节点间(最不可少的是Y)的访问成本(代价/权重),在MODIFY_COST()函数中实现
- P4、然后将节点X阻塞的信息传播到临近周边节点(最重要的是传到Y)中,并同时进行新路径的规划,得到新的路径{ Y, Z, ……,G},在PROCESS_STATE()函数的部分分支逻辑中实现
主逻辑伪代码
这里的伪代码实际上就是上面拆解后的过程
//S is the start State
//G is the goal State
h(G) = 0;
put_into_OPEN_QUEUE(G); //将G加入到OPEN_QUEUE/OPEN_LIST中
do {
k_min = PROCESS_STATE(); //主要是减少cost,寻找最优路径
}while(k_min != -1 && S_not_removed_from_open_list);
if(k_min == -1){
G_unreachable;
exit;
}else{
do{
do{
trace_optimal_path(); //沿着最优路径行驶{S, ……, G},根据具体情形来实现
}while(G_is_not_reached && map == environment); //map != environment 遇到了异常,塌陷、障碍物等
if(G_is_reached){ //到达终点
exit;
}else{
X = state_of_discrepancy_reached_trying_to_move_from_some_state_Y;
MODIFY_COST(X, Y, new_c(X, Y));
do{
k_min = PROCESS_STATE(); //减少cost 和 异常信息传播
}while(k_min != -1 && k_min<h(Y)); //k_min>=h(Y)时表示已经找到了最优路径,不可能有更优
if(k_min == -1){
exit;
}
}
}while(1);
}
核心函数
PROCESS_STATE() 函数
PROCESS_STATE()主要解决两大问题:
1、计算最优路径(传播cost减少)
2、传播异常信息(传播cost增加)
以上两种情况分别用k(X)与h(X)的关系
来进行控制
假设X是从优先队列OPEN_QUEUE中弹出来的最优节点
前序必要准备工作已经完成:
- t(·)=NEW //t(·)表示所有节点的t()值
- h(G)=0
- G被加入到了OPEN_QUEUE中
具体伪代码如下:
// PROCESS_STATE()
X = MIN_STATE()
if(X==NULL){
return -1
}
k_old = GET_KMIN();
DELETE(X);
if( k_old < h(X) ){
for( each neighbor Y of X ){
if( h(Y) <= k_old and h(X) > h(Y) + c(Y, X) ){ //尝试在现有路径条件下,先更新当前cost
b(X) = Y;
h(X) = h(Y) + c(Y, X);
}
}
}
if( k_old == h(X) ){
for( each neighbor Y of X ){
if( t(Y) == NEW or
( b(Y) == X and h(Y) != h(X)+c(X,Y) ) or
( b(Y) != X and h(Y) > h(X)+c(X,Y) ) ){ //正常扩展
b(Y) = X;
INSERT( Y, h(X)+c(X,Y) );
}
}
}else{ //k_old < h(X) condition
for( each neighbor Y of X){
if( t(Y) == NEW or
( b(Y) == X and h(Y) != h(X)+c(X,Y) ) ){ //传递异常信息c(X,Y)带来的变化
b(Y) == X;
INSERT( Y, h(X)+c(X,Y) )
}else{
if( b(Y) != X and h(Y) > h(X) + c(X, Y) ){ //将X再次插入队列,从RAISE模式变成了LOWER模式来扩展修正Y
INSERT( X, h(X) );
}else{
if( b(Y) != X and h(X) > h(Y)+c(Y,X) and
t(Y) == CLOSED and h(Y) > k_old ){ //寻找次优解,新的路径
INSERT( Y, h(Y) );
}
}
}
}
}
return GET_KMIN()
一些辅助函数,都是基于OPEN_QUEUE进行的操作:
// MIN_STATE()
Return X if k(X) is minimum in the OPEN_QUEUE
// GET_KMIN()
Return the minimum value of k in the OPEN_QUEUE
// DELETE(X)
delete X state from the OPEN_QUEUE
// INSERT(X, h_new)
// 隐含着将X节点加入到OPEN_QUOPEN(如果不存在的的话)
if( t(X) == NEW ){
k(X) = h_new;
}else if( t(X) == OPEN ){
k(X) = min( k(X), h_new );
}else{ // t(X) == CLOSED
k(X) = min( h(X), h_new ); //可使得X节点从RAISE进入LOWER模式
}
h(X) = h_new;
t(X) = OPEN;
SORT_OPEN_QUEUE_BY_k() //对OPEN_QUEUE按k进行排序
- L1 ~ L3:表示拥有最低 K 值的 X 由 openlist 中移出,如果 X 为
LOWER
,那么它的路径代价为最优的,因为h(X)==kold
- L8 ~ L13:检查 X 的每个邻居,看它的路径开销是否可以降低。此外,为 t(Y) = NEW 的节点设置一个初始路径成本(这里相当于一次遍历,采用 Dijkstra 算法)。所有 b(Y) = X 节点的路径开销都会发生变化,这里相当于
LOWER
状态的传播。发生变化的 Y 节点会重新放入 openlist,再由这些节点传播路径成本的变化 - L4 ~ L7:因为 kold < h(X),此时 X 是
RAISE
状态,路径开销增大,h(X) 变为inf
,经过 X 的路径不是最优的,在节点 X 将这种路径开销的变化传播给其邻居节点前,看看能不能通过其邻居中已经处于最优开销(即 h(Y) < kold)得到节点来优化 X 的路径开销,如果存在,调整 b(X) = Y,此时已调整路径为最优(optimal
)的 - L15 ~ L18:和 L8 ~ L13 类似,主要是进行状态的传播,包括
RAISE
和LOWER
两种状态,其实这里主要是RAISE
状态的传播 - L20 ~ L21:如果 X 可以使得 b(Y) ≠ X 的邻居节点路径开销 h(Y) 减小,则将节点 X 重新放入 openlist 中以优化 Y
- L23 ~ L25:如果 X 可以通过状态为
CLOSED
的次优邻居节点 Y 减小路径开销,则将 Y 重新放入 openlist 以优化 X
MODIFY_COST() 函数
代价的修改是在发现障碍物所处的 state 时进行的,假设机器人在 Y 正准备前往 X 之前(即Y->X链接关系),发现了 X 处突然塌陷或者出现了障碍物则进行 cost 的修正,其实主要就是修正 Y 处的 h 值,Y 处的 k 值暂时不发生变化
下面的伪代码和实际编程时有一些区别,到代码部分再说
// MODIFY_COST( X, Y, new_c(X,Y) )
c(X, Y) = new_c(X, Y); //做cost修正
if( t(X) == CLOSED ){
INSERT( X, h(X) ); //将X重新插入队列,X处于LOWER模式,进行扩展,进而引起Y的路径发生变动,Y变成RAISE模式
}
return GET_KMIN();
Python 仿真
"""
D_star 2D
@author: huiming zhou
"""
import os
import sys
import math
import matplotlib.pyplot as plt
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../Search_based_Planning/")
from Search_2D import plotting, env
class DStar:
def __init__(self, s_start, s_goal):
self.s_start, self.s_goal = s_start, s_goal
self.Env = env.Env()
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
self.u_set = self.Env.motions
self.obs = self.Env.obs
self.x = self.Env.x_range
self.y = self.Env.y_range
self.fig = plt.figure()
self.OPEN = set()
self.t = dict()
self.PARENT = dict()
self.h = dict()
self.k = dict()
self.path = []
self.visited = set()
self.count = 0
def init(self):
for i in range(self.Env.x_range):
for j in range(self.Env.y_range):
self.t[(i, j)] = 'NEW'
self.k[(i, j)] = 0.0
self.h[(i, j)] = float("inf")
self.PARENT[(i, j)] = None
self.h[self.s_goal] = 0.0
def run(self, s_start, s_end):
self.init()
self.insert(s_end, 0)
while True:
self.process_state()
if self.t[s_start] == 'CLOSED':
break
self.path = self.extract_path(s_start, s_end)
self.Plot.plot_grid("Dynamic A* (D*)")
self.plot_path(self.path)
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
plt.show()
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
else:
x, y = int(x), int(y)
if (x, y) not in self.obs:
print("Add obstacle at: s =", x, ",", "y =", y)
self.obs.add((x, y))
self.Plot.update_obs(self.obs)
s = self.s_start
self.visited = set()
self.count += 1
while s != self.s_goal:
if self.is_collision(s, self.PARENT[s]):
self.modify(s)
continue
s = self.PARENT[s]
self.path = self.extract_path(self.s_start, self.s_goal)
plt.cla()
self.Plot.plot_grid("Dynamic A* (D*)")
self.plot_visited(self.visited)
self.plot_path(self.path)
self.fig.canvas.draw_idle()
def extract_path(self, s_start, s_end):
path = [s_start]
s = s_start
while True:
s = self.PARENT[s]
path.append(s)
if s == s_end:
return path
def process_state(self):
s = self.min_state() # get node in OPEN set with min k value
self.visited.add(s)
if s is None:
return -1 # OPEN set is empty
k_old = self.get_k_min() # record the min k value of this iteration (min path cost)
self.delete(s) # move state s from OPEN set to CLOSED set
# k_min < h[s] --> s: RAISE state (increased cost)
if k_old < self.h[s]:
for s_n in self.get_neighbor(s):
if self.h[s_n] <= k_old and \
self.h[s] > self.h[s_n] + self.cost(s_n, s):
# update h_value and choose parent
self.PARENT[s] = s_n
self.h[s] = self.h[s_n] + self.cost(s_n, s)
# s: k_min >= h[s] -- > s: LOWER state (cost reductions)
if k_old == self.h[s]:
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \
(self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
# 3) s_n find a better parent
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
else:
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
else:
if self.PARENT[s_n] != s and \
self.h[s_n] > self.h[s] + self.cost(s, s_n):
# Condition: LOWER happened in OPEN set (s), s should be explored again
self.insert(s, self.h[s])
else:
if self.PARENT[s_n] != s and \
self.h[s] > self.h[s_n] + self.cost(s_n, s) and \
self.t[s_n] == 'CLOSED' and \
self.h[s_n] > k_old:
# Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again
self.insert(s_n, self.h[s_n])
return self.get_k_min()
def min_state(self):
"""
choose the node with the minimum k value in OPEN set.
:return: state
"""
if not self.OPEN:
return None
return min(self.OPEN, key=lambda x: self.k[x])
def get_k_min(self):
"""
calc the min k value for nodes in OPEN set.
:return: k value
"""
if not self.OPEN:
return -1
return min([self.k[x] for x in self.OPEN])
def insert(self, s, h_new):
"""
insert node into OPEN set.
:param s: node
:param h_new: new or better cost to come value
"""
if self.t[s] == 'NEW':
self.k[s] = h_new
elif self.t[s] == 'OPEN':
self.k[s] = min(self.k[s], h_new)
elif self.t[s] == 'CLOSED':
self.k[s] = min(self.h[s], h_new)
self.h[s] = h_new
self.t[s] = 'OPEN'
self.OPEN.add(s)
def delete(self, s):
"""
delete: move state s from OPEN set to CLOSED set.
:param s: state should be deleted
"""
if self.t[s] == 'OPEN':
self.t[s] = 'CLOSED'
self.OPEN.remove(s)
def modify(self, s):
"""
start processing from state s.
:param s: is a node whose status is RAISE or LOWER.
"""
self.modify_cost(s)
while True:
k_min = self.process_state()
if k_min >= self.h[s]:
break
def modify_cost(self, s):
# if node in CLOSED set, put it into OPEN set.
# Since cost may be changed between s - s.parent, calc cost(s, s.p) again
if self.t[s] == 'CLOSED':
self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))
def get_neighbor(self, s):
nei_list = set()
for u in self.u_set:
s_next = tuple([s[i] + u[i] for i in range(2)])
if s_next not in self.obs:
nei_list.add(s_next)
return nei_list
def cost(self, s_start, s_goal):
"""
Calculate Cost for this motion
:param s_start: starting node
:param s_goal: end node
:return: Cost for this motion
:note: Cost function could be more complicate!
"""
if self.is_collision(s_start, s_goal):
return float("inf")
return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])
def is_collision(self, s_start, s_end):
if s_start in self.obs or s_end in self.obs:
return True
if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
else:
s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
if s1 in self.obs or s2 in self.obs:
return True
return False
def plot_path(self, path):
px = [x[0] for x in path]
py = [x[1] for x in path]
plt.plot(px, py, linewidth=2)
plt.plot(self.s_start[0], self.s_start[1], "bs")
plt.plot(self.s_goal[0], self.s_goal[1], "gs")
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
def main():
s_start = (5, 5)
s_goal = (45, 25)
dstar = DStar(s_start, s_goal)
dstar.run(s_start, s_goal)
if __name__ == '__main__':
main()
在 main()
函数中首先调用 DStar()
函数构建环境地图
dstar = DStar(s_start, s_goal)
def __init__(self, s_start, s_goal):
self.s_start, self.s_goal = s_start, s_goal
self.Env = env.Env()
self.Plot = plotting.Plotting(self.s_start, self.s_goal)
self.u_set = self.Env.motions
self.obs = self.Env.obs
self.x = self.Env.x_range
self.y = self.Env.y_range
self.fig = plt.figure()
self.OPEN = set()
self.t = dict()
self.PARENT = dict()
self.h = dict()
self.k = dict()
self.path = []
self.visited = set()
self.count = 0
然后直接调用 run()
函数模拟 D* 算法的流程,函数代码和上文的主逻辑伪代码是一致的
dstar.run(s_start, s_goal)
def run(self, s_start, s_end):
self.init()
self.insert(s_end, 0)
while True:
self.process_state()
if self.t[s_start] == 'CLOSED':
break
self.path = self.extract_path(s_start, s_end)
self.Plot.plot_grid("Dynamic A* (D*)")
self.plot_path(self.path)
self.fig.canvas.mpl_connect('button_press_event', self.on_press)
plt.show()
- 首先用
init()
函数初始化地图信息 - 然后向 openlist 中插入终点
s_end
- 开始反向搜索,即一次遍历,反复调用
process_state()
函数直到找到起点 - 调用
extract_path()
函数提取路径信息,并调用plot_grid()
和plot_path()
函数绘图 - 一次遍历结束,这里用
on_press()
函数模拟二次遍历过程中路径点前方突然出现障碍物
precess_state()
函数是最关键的函数,其实现和论文中的伪代码完全一致
def process_state(self):
s = self.min_state() # get node in OPEN set with min k value
self.visited.add(s)
if s is None:
return -1 # OPEN set is empty
k_old = self.get_k_min() # record the min k value of this iteration (min path cost)
self.delete(s) # move state s from OPEN set to CLOSED set
# k_min < h[s] --> s: RAISE state (increased cost)
if k_old < self.h[s]:
for s_n in self.get_neighbor(s):
if self.h[s_n] <= k_old and \
self.h[s] > self.h[s_n] + self.cost(s_n, s):
# update h_value and choose parent
self.PARENT[s] = s_n
self.h[s] = self.h[s_n] + self.cost(s_n, s)
# s: k_min >= h[s] -- > s: LOWER state (cost reductions)
if k_old == self.h[s]:
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \
(self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
# 3) s_n find a better parent
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
else:
for s_n in self.get_neighbor(s):
if self.t[s_n] == 'NEW' or \
(self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):
# Condition:
# 1) t[s_n] == 'NEW': not visited
# 2) s_n's parent: cost reduction
self.PARENT[s_n] = s
self.insert(s_n, self.h[s] + self.cost(s, s_n))
else:
if self.PARENT[s_n] != s and \
self.h[s_n] > self.h[s] + self.cost(s, s_n):
# Condition: LOWER happened in OPEN set (s), s should be explored again
self.insert(s, self.h[s])
else:
if self.PARENT[s_n] != s and \
self.h[s] > self.h[s_n] + self.cost(s_n, s) and \
self.t[s_n] == 'CLOSED' and \
self.h[s_n] > k_old:
# Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again
self.insert(s_n, self.h[s_n])
return self.get_k_min()
on_press()
函数是模拟路径重规划的关键函数
def on_press(self, event):
x, y = event.xdata, event.ydata
if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
print("Please choose right area!")
else:
x, y = int(x), int(y)
if (x, y) not in self.obs:
print("Add obstacle at: s =", x, ",", "y =", y)
self.obs.add((x, y))
self.Plot.update_obs(self.obs)
s = self.s_start
self.visited = set()
self.count += 1
while s != self.s_goal:
if self.is_collision(s, self.PARENT[s]):
self.modify(s)
continue
s = self.PARENT[s]
self.path = self.extract_path(self.s_start, self.s_goal)
plt.cla()
self.Plot.plot_grid("Dynamic A* (D*)")
self.plot_visited(self.visited)
self.plot_path(self.path)
self.fig.canvas.draw_idle()
手动在路径上放置障碍物,然后先 modify(s)
,然后再反复调用 precess_state()
函数进行重规划
while s != self.s_goal:
if self.is_collision(s, self.PARENT[s]):
self.modify(s)
continue
s = self.PARENT[s]
def modify(self, s):
"""
start processing from state s.
:param s: is a node whose status is RAISE or LOWER.
"""
self.modify_cost(s)
while True:
k_min = self.process_state()
if k_min >= self.h[s]:
break
def modify_cost(self, s):
# if node in CLOSED set, put it into OPEN set.
# Since cost may be changed between s - s.parent, calc cost(s, s.p) again
if self.t[s] == 'CLOSED':
self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))
从代码中可以看出,二次遍历中走到路径点 Y,发现下一路径点 X 是障碍物,则对 Y 处的路径开销进行修正,h(Y) = h(X) + c(X,Y)
,此时 Y 变为 RAISE
状态,将其加入 openlist 以传播该状态
另外代码中在轨迹重规划绘图时加了一些小细节,体现在 plot_visited()
函数上,会根据已添加障碍物的个数,然后给本次重规划时访问过的节点上色
s = self.s_start
self.visited = set()
self.count += 1
self.Plot.plot_grid("Dynamic A* (D*)")
self.plot_visited(self.visited)
self.plot_path(self.path)
def plot_visited(self, visited):
color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
'bisque', 'navajowhite', 'moccasin', 'wheat',
'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']
if self.count >= len(color) - 1:
self.count = 0
for x in visited:
plt.plot(x[0], x[1], marker='s', color=color[self.count])
实例推演
光是看讲解是体会不到算法的精妙的,结合实例看下吧
再贴一下 PROCESS-STATE()
的伪代码
1、下面是一开始的地图
2、将终点加入 openlist,开始一次遍历,该过程可以说是反向 Dijkstra
这个过程就是反复执行伪代码中的 L8 ~ L13,直到起点也被加入 openlist
有一点需要注意,一次遍历后,所有访问过的节点都保存了到终点的路径以及距离信息,相当于建立了一个路径场
,这在遇到障碍物后的二次规划时是很重要的,能够减少运算量
3、从起点开始二次遍历,遍历过程中出现新的障碍物
此时立刻调用 MODIFY_COST()
函数更改 (3,2) 节点的路径开销,更改如下
此时的 openlist 如下
state | k |
---|---|
(3,2) | 5.6 |
(1,2) | 7.6 |
(1,3) | 8.0 |
(1,1) | 8.0 |
4、反复调用 PROCESS_STATE()
函数重新规划路径
此时 (3,2) 节点处于 RAISE
状态,在传播状态变化之前,先看能不能通过其邻居中已经处于最优开销(即 h(Y) < kold)得到节点来优化 X 的路径开销(对应 L4 ~ L7
),显然不存在满足条件的邻居
那么此时就将 (3,2) 节点的状态变化进行传播,对应 L15 ~ L18
,将其子节点(将 b(Y) = X 中的 Y 定义为子节点)的开销扩大,子节点有 (2,1)、(2,2)、(3,1),将这些节点都加入 openlist 中
此时节点 (4,1) 有可能减小节点 (3,2) 的路径开销,对应 L23 ~ L25
,将其加入 openlist 中以优化 (3,2) 节点
此时发生变化的节点如下图
此时的 openlist 如下
(4,1) | 6.2 |
---|---|
(2,2) | 6.6 |
(3,1) | 6.6 |
(2,1) | 7.0 |
(1,2) | 7.6 |
(1,3) | 8.0 |
此时 h([3,2]) = 10000,kmin = 6.2,不满足 kmin ≥ h[s],继续 process_state()
将 (4,1) 节点弹出 openlist,该节点处于 LOWER
状态,那么则将该 LOWER
状态传播,对应 L8 ~ L13
,调整其邻居节点 (3,1)、(3,2) 的路径开销,并加入 openlist
注意到节点 (3,2) 经过调整已经处于 LOWER
状态,将其将入 openlist 可以继续传播该状态
此时对于下面这句话应该有更深的认识
The propagation takes place
through the repeated removal of states from the OPEN list
. Each time a state is removed from the list, it is expanded to pass cost changes to its neighbors.These neighbors are in turn placed on the OPEN list to continue the process.
此时发生变化的节点如下
此时的 openlist 如下
(3,1) | 6.6 |
---|---|
(2,2) | 6.6 |
(2,1) | 7.0 |
(1,2) | 7.6 |
(3,2) | 7.6 |
(1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 6.6,不满足 kmin ≥ h[s],继续 process_state()
将 (3,1) 节点弹出 openlist,该节点此时处于 RAISE
状态,在传播 RAISE
状态前,依然会先执行 L4 ~ L7
,依旧没没有满足条件的邻居节点
但是 (3,1) 节点能够减少其邻居节点的路径开销,对应 L20 ~ L21
,所以将其重新加入 openlist,注意每次加入时都会更新一下节点的 k 值和 h 值,贴一下加入的代码
def insert(self, s, h_new):
"""
insert node into OPEN set.
:param s: node
:param h_new: new or better cost to come value
"""
if self.t[s] == 'NEW':
self.k[s] = h_new
elif self.t[s] == 'OPEN':
self.k[s] = min(self.k[s], h_new)
elif self.t[s] == 'CLOSED':
self.k[s] = min(self.h[s], h_new)
self.h[s] = h_new
self.t[s] = 'OPEN'
self.OPEN.add(s)
此时 (3,1) 节点已经处于 LOWER
状态,将其加入 openlist 可以传播该状态
此时发生变化的节点如下
此时的 openlist 如下
(2,2) | 6.6 |
---|---|
(2,1) | 7.0 |
(3,1) | 7.2 |
(1,2) | 7.6 |
(3,2) | 7.6 |
(1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 6.6,不满足 kmin ≥ h[s],继续 process_state()
将 (2,2) 节点弹出 openlist,按照 process_state() 走一遍没有发生变化的节点
此时的 openlist 如下
(2,1) | 7.0 |
---|---|
(3,1) | 7.2 |
(1,2) | 7.6 |
(3,2) | 7.6 |
(1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 7.0,不满足 kmin ≥ h[s],继续 process_state()
将 (2,1) 节点弹出 openlist,按照 process_state() 走一遍同样没有发生变化的节点
此时的 openlist 如下
(3,1) | 7.2 |
---|---|
(1,2) | 7.6 |
(3,2) | 7.6 |
(1,3) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 7.2,不满足 kmin ≥ h[s],继续 process_state()
将 (3,1) 节点弹出 openlist,该节点此时处于 LOWER
状态,将该状态传播,对应 L4 ~ L7,调整其邻居节点的开销,经过调整,其邻居节点 (2,1)、(2,2) 都由 RAISE
状态转变为 LOWER
状态
此时发生变化的节点如下
此时的 openlist 如下
(1,2) | 7.6 |
---|---|
(3,2) | 7.6 |
(1,3) | 8.0 |
(1,1) | 8.0 |
此时 h([3,2]) = 7.6,kmin = 7.6,满足 kmin ≥ h[s],说明已经规划出了一条合适的路径
机器人从当前位置继续向终点行进
至此,轨迹重规划结束,相信经过上面的推演,对原文中的下述表达有更深刻的认识
The role of RAISE ans LOWER states is central to the operation of the algorithm.
The RAISE states (i.e., k(X) < h(X)) propagate cost increases, and the LOWER states (i.e., k(x) = h(X)) propagate cost reductions.
When the cost of traversing an arc is increased, an affected neighbor state is placed on the OPEN list, and the cost increase is propagated via RAISE states through all state sequences containing the arc. As the RAISE states come in contact with neighboring states of lower cost,these LOWER states are placed on the OPEN list, and they subsequently decrease the cost of previously raised states wherever possible
. If the cost of traversing an arc is decreased, the reduction is propagated via LOWER states through all state sequences containing the arc, as well as neighboring states whose cost can also be lowered.
总结
核心思想
D* 算法先采用 Dijkstra 算法从目标点 G 向起始点 S 反向搜索,在搜索过程中访问过的每个节点到终点 G 的路径信息以及距离信息,构造出一个路径场
,在遇到临时障碍物时,路径场信息能够避免不必要的重新规划路径带来的庞大运算量
RAISE
和LOWER
状态的作用对算法的操作至关重要。RAISE状态(即k(X) <h(X))传播成本增加,而LOWER状态(即k(X) = h(X))传播成本降低。当遍历一个节点的开销增加时,受影响的邻居状态被放到OPEN列表中,开销的增加通过RAISE状态传播到其相邻节点。当RAISE状态与相邻的LOWER状态节点接触时,这些LOWER状态节点被加入OPEN列表中,它们随后尽可能地降低之前RAISE状态节点的路径开销。如果某一节点的路径开销降低了,那么这种降低将通过LOWER状态传播到该节点的所有相邻节点
状态的传播通过从OPEN列表中反复删除状态来进行。每次从列表中删除一个状态时,它都会展开以将成本更改传递给它的邻居。这些邻居依次被加入OPEN列表,以继续这一过程
通过 h(X) 和 k(X) 的大小关系切换节点的 RAISE 和 LOWER 状态,通过状态切换和状态传播实现轨迹的重规划
使用场合
适用于动态环境(/路网)中的路径/运动规划(motion planning),与Dijkstra、A* 形成对比(两者适用于静态环境/路网)
- 动态是指在沿着最优路径前进时,突然出现了既定路径中断的情况需要进行重新规划
- 路径中断在现实生活中可以理解为诸如塌方、巨石、车祸、阻塞等情况的发生,在图/Graph中是某条边Edge/弧Arc的代价/权重cost/weight突然变得非常大,使得当前路径成本陡增,需要重新规划
- 当然该方法也可以应用在静态环境/路网中,向下兼容,实际就相当于反向(从终点G开始求解)Dijkstra算法
- 该方法不是唯一的算法,比如在遇到障碍时再次调用A*(参考A* replanner)也是可以的,但是该方法计算成本低,效率高
- 同时该方法对已知局部环境(类似游戏中的趟地图开荒)、未知环境情况下也有很好的实用性