D* 算法完全解析(应该是全网最详细的吧)

花了几天时间学习了下 D* 算法,只能说熟悉了一下流程,远不能说掌握,算法实在是非常巧妙

参考

《制造车间无人搬运系统调度方法研究》

《基于D*Lite算法的移动机器人路径规划研究》

人工智能: 自动寻路算法实现(四、D、D*算法)

D* 算法

D*路径规划算法及python实现

【机器人】 D*算法-动态路径规划

https://github.com/zhm-real/PathPlanning

原文阅读

这几天的教训告诉我阅读原文真的很重要,第一遍读很难受,看了很多乱七八糟的中文论文以及博客

有用倒确实有些用,后面再读论文时就没有起初那么抗拒,能读得下去

大概就是:原论文 → 中文论文+博客 → 原论文 → 论文 + 代码 → 论文 + 博客

这么反复折腾几遍,再跟着实际 PPT 内容推着走一遍,总归能掌握一些吧

制造车间那篇论文是 18 年大连理工的,对 D* 介绍的部分基本就是原文翻译,可以对着原文一起读

下面对原文中重要的部分进行摘录和翻译,有助于后面对算法的理解

Like A*, D* maintains an OPEN list of states. The OPEN list is used to propagate information about changes to the arc cost function and to calculate path costs to states in the space. Every state X has an associated tag t ( X ) , such that t(X) = NEW if X has never been on the OPEN list, t(X) = OPEN if X is currently on the OPEN list, and t(X) = CLOSED if X is no longer on the OPEN list.

在这里插入图片描述

在这里插入图片描述

For each state X , D* maintains an estimate of the sum of the arc costs from X to G given by the path cost function h(G, X ) . Given the proper conditions, this estimate is equivalent to the optimal (minimal) cost from state X to G, given by the implicit function o(G, X) . For each state X on the OPEN list (i.e., r(X) = OPEN), the key function, k(G, X) , is defined to be equal to the minimum of h(G, X) before modification and all values assumed by h(G, X) since X was placed on the OPEN list.

在这里插入图片描述

The key function classifies a state X on the OPEN list into one of two types: a RAISE state if k(G, X) < h(G, X ) , and a LOWER State if k(G, X) = h(G, X). D uses RAISE states on the OPEN list to propagate information about path cost increases (e.g., due to an increased arc cost) and LOWER states to propagate information about path cost reductions (e.g., due to a reduced arc cost or new path to the goal).*

在这里插入图片描述

The propagation takes place through the repeated removal of states from the OPEN list. Each time a state is removed from the list, it is expanded to pass cost changes to its neighbors. These neighbors are in turn placed on the OPEN list to continue the process.

传播通过从OPEN列表中反复删除状态来进行。每次从列表中删除一个状态时,它都会展开以将成本更改传递给它的邻居。这些邻居依次被加入OPEN列表,以继续这一过程

States on the OPEN list are sorted by their key function value. The parameter kmin is defined to be min(k(X)) for all X such that t(X) = OPEN. The parameter kmin represents an important threshold in D*: path costs less than or equal to kmin are optimal, and those greater than kmin may not be optimal. The parameter kold is defined to be equal to kmin prior to most recent removal of a state from the OPEN list. If no states have been removed, kold is undefined.

在这里插入图片描述

The D* algorithm consists primarily of two functions: PROCESS - STATE is used to compute optimal path costs to the goal, and MODIFY - COST is used to change the arc cost function c(O) and enter affected states on the OPEN list. Initially, t(O) is set to NEW for all states, h(G) is set to zero, and G is placed on the OPEN list. The first function, PROCESS- STATE, is repeatedly called until the robot’s state, X , is removed from the OPEN list (i.e., t(X) = CLOSED) or a value of -1 is returned, at which point either the sequence { X } has been computed or does not exist respectively. The robot then proceeds to follow the backpointers in the sequence { X ) until it either reaches the goal or discovers an error in the arc cost function c(") (e.g., due to a detected obstacle). The second function, MUDIFY- COST, is immediately called to correct c(O) and place affected states on the OPEN list. Let Y be the robot’s state at which it discovers an error in c(O). By calling PROCESS - STATE until it returns kmin ≥ h(Y), the cost changes are propagated to state Y such that h(Y) = o(Y). At this point, a possibly new sequence { Y) has been constructed, and the robot continues to follow the backpointers in the sequence toward the goal.

在这里插入图片描述

The role of RAISE ans LOWER states is central to the operation of the algorithm. The RAISE states (i.e., k(X) < h(X)) propagate cost increases, and the LOWER states (i.e., k(x) = h(X)) propagate cost reductions. When the cost of traversing an arc is increased, an affected neighbor state is placed on the OPEN list, and the cost increase is propagated via RAISE states through all state sequences containing the arc. As the RAISE states come in contact with neighboring states of lower cost, these LOWER states are placed on the OPEN list, and they subsequently decrease the cost of previously raised states wherever possible. If the cost of traversing an arc is decreased, the reduction is propagated via LOWER states through all state sequences containing the arc, as well as neighboring states whose cost can also be lowered.

RAISE和LOWER状态的作用对算法的操作至关重要。RAISE状态(即k(X) <h(X))传播成本增加,而LOWER状态(即k(X) = h(X))传播成本降低。当遍历一个弧线的开销增加时,受影响的邻居状态被放到OPEN列表中,开销的增加通过RAISE状态传播到包含该弧线的所有状态序列。当RAISE状态与相邻的低成本状态接触时,这些lower状态被放在OPEN列表中,它们随后尽可能地降低先前升高状态的成本。如果穿越一条弧线的成本降低了,那么这种降低将通过LOWER状态传播到包含该弧线的所有状态序列,以及成本也可以降低的相邻状态。

算法原理

命名由来

原话:The name of the algorithm, D*, was chosen because it resembles A*

解释:就是觉着像 A* 干脆起名 D*,说是Dynamic A* 不太严谨

主流程

情景模拟

  • 假设有一机器人任务是从起点S到终点G,首先寻找一条最优路径{S, ……, Y, X, ……,G}
  • 之后沿该路径行驶过程中在Y处发现X点塌方(无法到达),需要寻找新的路径{ Y, Z, ……,G}继续行驶

符号约定

1、有向图:D* 算法以有向图作为前提,即在A、B节点间,“A到B” 与 “B到A” 表示两个弧,因此对应的权重不一定一样

2、X、Y 、Z、S、G等:称为节点,也称为state,可理解为一个坐标position,其中S表示Start,G表示Goal

3、{S,……,G}:表示一条路径,该路径以S为起点,G为终点,简记为{S}

4、c(X, Y):表示“Y->X”节点的成本/权重(cost),具体计算逻辑如下

  • 以Y为中心构成8邻域的九宫格
  • 当不存在障碍物时,横向、竖向的连接/移动成本为1、斜向连接成本为1.4
  • 当存在障碍物时,不管什么方向的连接/移动成本都设置为比较大的值,比如10000,以确保很难通过
  • 虽然是有向图,但是在此文中可以设定双向移动成本一样即c(X, Y) == c(Y, X)

5、b(X) = Y: 用来记录反向指针(backpointer),即X->Y,当处于X位置时可以根据b(X)获得下一个要到达的节点Y,以使得路径最优

6、t(X):节点的访问标记位(tag),支持3个类型:NEW、OPEN、CLOSED

  • NEW:节点从未访问过,最初的设置,可能存在部分节点永远处于改状态
  • OPEN:节点正在访问中,存储在OPEN_LIST(使用优先队列PriorityQueue实现)中,也称为OPEN_QUEUE,以待访问扩展
  • CLOSED:节点已经被访问过,从优先队列中移除,并完成了相邻节点的扩展,一般找到了到达重点G的最优路径

7、h(X):用来存储路径代价,指从X到达终点G的路径({X,……G},简记为{X})代价,不一定是全局最优,第一次搜索到起点时时,所有点的h会被更新,计算方式同Dijkstra算法,是用相邻两点的代价+上一个点的代价累加得到

8、k(X):用来记录自X节点被加入到OPEN_LIST中后的最小h(X)值(具体计算方式由Insert函数决定),也是优先队列OPEN_LIST的排序依据,k将会保持到最小,它表示了本点在全图环境中到G点的最小代价

过程拆解

  • P1、构建一条从S到G的路径{S, ……, Y, X, ……,G},采用反向构建路径的方法(以G为起点的Dijkstra算法),在PROCESS_STATE()函数的部分分支逻辑中实现
  • P2、机器人沿着{S, ……, Y, X, ……,G}路径行驶,直到发现异常点(塌方,不可到达)后停下来,在Y处发现X塌方,停在Y
  • P3、修正塌方节点X与周边节点间(最不可少的是Y)的访问成本(代价/权重),在MODIFY_COST()函数中实现
  • P4、然后将节点X阻塞的信息传播到临近周边节点(最重要的是传到Y)中,并同时进行新路径的规划,得到新的路径{ Y, Z, ……,G},在PROCESS_STATE()函数的部分分支逻辑中实现

主逻辑伪代码

这里的伪代码实际上就是上面拆解后的过程

 //S is the start State
 //G is the goal State
 h(G) = 0;
 put_into_OPEN_QUEUE(G); //将G加入到OPEN_QUEUE/OPEN_LIST中
 do {
     k_min = PROCESS_STATE(); //主要是减少cost,寻找最优路径
 }while(k_min != -1 && S_not_removed_from_open_list);
 if(k_min == -1){
     G_unreachable;
     exit;
 }else{
     do{
         do{
             trace_optimal_path(); //沿着最优路径行驶{S, ……, G},根据具体情形来实现
         }while(G_is_not_reached && map == environment); //map != environment 遇到了异常,塌陷、障碍物等
         if(G_is_reached){ //到达终点
             exit;
         }else{
             X = state_of_discrepancy_reached_trying_to_move_from_some_state_Y;
             MODIFY_COST(X, Y, new_c(X, Y));
             do{
                 k_min = PROCESS_STATE();   //减少cost 和 异常信息传播
             }while(k_min != -1 && k_min<h(Y)); //k_min>=h(Y)时表示已经找到了最优路径,不可能有更优
             if(k_min == -1){
                 exit;
             }
         }
     }while(1);
 }

在这里插入图片描述

核心函数

PROCESS_STATE() 函数

PROCESS_STATE()主要解决两大问题:

1、计算最优路径(传播cost减少)

2、传播异常信息(传播cost增加)

以上两种情况分别用k(X)与h(X)的关系来进行控制

假设X是从优先队列OPEN_QUEUE中弹出来的最优节点

前序必要准备工作已经完成:

  • t(·)=NEW //t(·)表示所有节点的t()值
  • h(G)=0
  • G被加入到了OPEN_QUEUE中

具体伪代码如下:

// PROCESS_STATE()
 X = MIN_STATE()
 if(X==NULL){
     return -1
 }
 k_old = GET_KMIN()DELETE(X);
 if( k_old < h(X) ){
     for( each neighbor Y of X ){
         if( h(Y) <= k_old  and  h(X) > h(Y) + c(Y, X) ){ //尝试在现有路径条件下,先更新当前cost
               b(X) = Y;
               h(X) = h(Y) + c(Y, X); 
         }
     }
 }
 if( k_old == h(X) ){
     for( each neighbor Y of X ){
         if( t(Y) == NEW or
             ( b(Y) == X and h(Y) != h(X)+c(X,Y) ) or
             ( b(Y) != X and h(Y) > h(X)+c(X,Y) ) ){ //正常扩展
             b(Y) = X;
             INSERT( Y, h(X)+c(X,Y) );
         } 
     }
 }else{   //k_old < h(X) condition
     for( each neighbor Y of X){
         if( t(Y) == NEW or
             ( b(Y) == X and h(Y) != h(X)+c(X,Y) ) ){ //传递异常信息c(X,Y)带来的变化
             b(Y) == X;
             INSERT( Y, h(X)+c(X,Y) )
         }else{
             if( b(Y) != X and h(Y) > h(X) + c(X, Y) ){ //将X再次插入队列,从RAISE模式变成了LOWER模式来扩展修正Y
                 INSERT( X, h(X) ); 
             }else{
                 if( b(Y) != X and h(X) > h(Y)+c(Y,X) and 
                     t(Y) == CLOSED and h(Y) > k_old ){  //寻找次优解,新的路径
                     INSERT( Y, h(Y) );
                 }
             }
         }
     }
 }
 return GET_KMIN()

一些辅助函数,都是基于OPEN_QUEUE进行的操作:

// MIN_STATE()
 Return X if k(X) is minimum in the OPEN_QUEUE
 
 // GET_KMIN()
 Return the minimum value of k in the OPEN_QUEUE
 
 // DELETE(X)
 delete X state from the OPEN_QUEUE
 
 // INSERT(X, h_new) 
 // 隐含着将X节点加入到OPEN_QUOPEN(如果不存在的的话)
 if( t(X) == NEW ){
     k(X) = h_new;
 }else if( t(X) == OPEN ){
     k(X) = min( k(X), h_new );
 }else{ // t(X) == CLOSED
     k(X) = min( h(X), h_new ); //可使得X节点从RAISE进入LOWER模式
 }
 h(X) = h_new;
 t(X) = OPEN;
 SORT_OPEN_QUEUE_BY_k() //对OPEN_QUEUE按k进行排序

在这里插入图片描述

  • L1 ~ L3:表示拥有最低 K 值的 X 由 openlist 中移出,如果 X 为LOWER,那么它的路径代价为最优的,因为 h(X)==kold
  • L8 ~ L13:检查 X 的每个邻居,看它的路径开销是否可以降低。此外,为 t(Y) = NEW 的节点设置一个初始路径成本(这里相当于一次遍历,采用 Dijkstra 算法)。所有 b(Y) = X 节点的路径开销都会发生变化,这里相当于 LOWER 状态的传播。发生变化的 Y 节点会重新放入 openlist,再由这些节点传播路径成本的变化
  • L4 ~ L7:因为 kold < h(X),此时 X 是 RAISE 状态,路径开销增大,h(X) 变为 inf,经过 X 的路径不是最优的,在节点 X 将这种路径开销的变化传播给其邻居节点前,看看能不能通过其邻居中已经处于最优开销(即 h(Y) < kold)得到节点来优化 X 的路径开销,如果存在,调整 b(X) = Y,此时已调整路径为最优(optimal)的
  • L15 ~ L18:和 L8 ~ L13 类似,主要是进行状态的传播,包括 RAISELOWER 两种状态,其实这里主要是 RAISE 状态的传播
  • L20 ~ L21:如果 X 可以使得 b(Y) ≠ X 的邻居节点路径开销 h(Y) 减小,则将节点 X 重新放入 openlist 中以优化 Y
  • L23 ~ L25:如果 X 可以通过状态为 CLOSED 的次优邻居节点 Y 减小路径开销,则将 Y 重新放入 openlist 以优化 X

MODIFY_COST() 函数

代价的修改是在发现障碍物所处的 state 时进行的,假设机器人在 Y 正准备前往 X 之前(即Y->X链接关系),发现了 X 处突然塌陷或者出现了障碍物则进行 cost 的修正,其实主要就是修正 Y 处的 h 值,Y 处的 k 值暂时不发生变化

下面的伪代码和实际编程时有一些区别,到代码部分再说

// MODIFY_COST( X, Y, new_c(X,Y) )
 c(X, Y) = new_c(X, Y); //做cost修正
 if( t(X) == CLOSED ){
     INSERT( X, h(X) ); //将X重新插入队列,X处于LOWER模式,进行扩展,进而引起Y的路径发生变动,Y变成RAISE模式
 }
 return GET_KMIN();

Python 仿真

"""
D_star 2D
@author: huiming zhou
"""

import os
import sys
import math
import matplotlib.pyplot as plt

sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
                "/../../Search_based_Planning/")

from Search_2D import plotting, env

class DStar:
    def __init__(self, s_start, s_goal):
        self.s_start, self.s_goal = s_start, s_goal

        self.Env = env.Env()
        self.Plot = plotting.Plotting(self.s_start, self.s_goal)

        self.u_set = self.Env.motions
        self.obs = self.Env.obs
        self.x = self.Env.x_range
        self.y = self.Env.y_range

        self.fig = plt.figure()

        self.OPEN = set()
        self.t = dict()
        self.PARENT = dict()
        self.h = dict()
        self.k = dict()
        self.path = []
        self.visited = set()
        self.count = 0

    def init(self):
        for i in range(self.Env.x_range):
            for j in range(self.Env.y_range):
                self.t[(i, j)] = 'NEW'
                self.k[(i, j)] = 0.0
                self.h[(i, j)] = float("inf")
                self.PARENT[(i, j)] = None

        self.h[self.s_goal] = 0.0

    def run(self, s_start, s_end):
        self.init()
        self.insert(s_end, 0)

        while True:
            self.process_state()
            if self.t[s_start] == 'CLOSED':
                break

        self.path = self.extract_path(s_start, s_end)
        self.Plot.plot_grid("Dynamic A* (D*)")
        self.plot_path(self.path)
        self.fig.canvas.mpl_connect('button_press_event', self.on_press)
        plt.show()

    def on_press(self, event):
        x, y = event.xdata, event.ydata
        if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
            print("Please choose right area!")
        else:
            x, y = int(x), int(y)
            if (x, y) not in self.obs:
                print("Add obstacle at: s =", x, ",", "y =", y)
                self.obs.add((x, y))
                self.Plot.update_obs(self.obs)

                s = self.s_start
                self.visited = set()
                self.count += 1

                while s != self.s_goal:
                    if self.is_collision(s, self.PARENT[s]):
                        self.modify(s)
                        continue
                    s = self.PARENT[s]

                self.path = self.extract_path(self.s_start, self.s_goal)

                plt.cla()
                self.Plot.plot_grid("Dynamic A* (D*)")
                self.plot_visited(self.visited)
                self.plot_path(self.path)

            self.fig.canvas.draw_idle()

    def extract_path(self, s_start, s_end):
        path = [s_start]
        s = s_start
        while True:
            s = self.PARENT[s]
            path.append(s)
            if s == s_end:
                return path

    def process_state(self):
        s = self.min_state()  # get node in OPEN set with min k value
        self.visited.add(s)

        if s is None:
            return -1  # OPEN set is empty

        k_old = self.get_k_min()  # record the min k value of this iteration (min path cost)
        self.delete(s)  # move state s from OPEN set to CLOSED set

        # k_min < h[s] --> s: RAISE state (increased cost)
        if k_old < self.h[s]:
            for s_n in self.get_neighbor(s):
                if self.h[s_n] <= k_old and \
                        self.h[s] > self.h[s_n] + self.cost(s_n, s):

                    # update h_value and choose parent
                    self.PARENT[s] = s_n
                    self.h[s] = self.h[s_n] + self.cost(s_n, s)

        # s: k_min >= h[s] -- > s: LOWER state (cost reductions)
        if k_old == self.h[s]:
            for s_n in self.get_neighbor(s):
                if self.t[s_n] == 'NEW' or \
                        (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \
                        (self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):

                    # Condition:
                    # 1) t[s_n] == 'NEW': not visited
                    # 2) s_n's parent: cost reduction
                    # 3) s_n find a better parent
                    self.PARENT[s_n] = s
                    self.insert(s_n, self.h[s] + self.cost(s, s_n))
        else:
            for s_n in self.get_neighbor(s):
                if self.t[s_n] == 'NEW' or \
                        (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):

                    # Condition:
                    # 1) t[s_n] == 'NEW': not visited
                    # 2) s_n's parent: cost reduction
                    self.PARENT[s_n] = s
                    self.insert(s_n, self.h[s] + self.cost(s, s_n))
                else:
                    if self.PARENT[s_n] != s and \
                            self.h[s_n] > self.h[s] + self.cost(s, s_n):

                        # Condition: LOWER happened in OPEN set (s), s should be explored again
                        self.insert(s, self.h[s])
                    else:
                        if self.PARENT[s_n] != s and \
                                self.h[s] > self.h[s_n] + self.cost(s_n, s) and \
                                self.t[s_n] == 'CLOSED' and \
                                self.h[s_n] > k_old:

                            # Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again
                            self.insert(s_n, self.h[s_n])

        return self.get_k_min()

    def min_state(self):
        """
        choose the node with the minimum k value in OPEN set.
        :return: state
        """

        if not self.OPEN:
            return None

        return min(self.OPEN, key=lambda x: self.k[x])

    def get_k_min(self):
        """
        calc the min k value for nodes in OPEN set.
        :return: k value
        """

        if not self.OPEN:
            return -1

        return min([self.k[x] for x in self.OPEN])

    def insert(self, s, h_new):
        """
        insert node into OPEN set.
        :param s: node
        :param h_new: new or better cost to come value
        """

        if self.t[s] == 'NEW':
            self.k[s] = h_new
        elif self.t[s] == 'OPEN':
            self.k[s] = min(self.k[s], h_new)
        elif self.t[s] == 'CLOSED':
            self.k[s] = min(self.h[s], h_new)

        self.h[s] = h_new
        self.t[s] = 'OPEN'
        self.OPEN.add(s)

    def delete(self, s):
        """
        delete: move state s from OPEN set to CLOSED set.
        :param s: state should be deleted
        """

        if self.t[s] == 'OPEN':
            self.t[s] = 'CLOSED'

        self.OPEN.remove(s)

    def modify(self, s):
        """
        start processing from state s.
        :param s: is a node whose status is RAISE or LOWER.
        """

        self.modify_cost(s)

        while True:
            k_min = self.process_state()

            if k_min >= self.h[s]:
                break

    def modify_cost(self, s):
        # if node in CLOSED set, put it into OPEN set.
        # Since cost may be changed between s - s.parent, calc cost(s, s.p) again

        if self.t[s] == 'CLOSED':
            self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))

    def get_neighbor(self, s):
        nei_list = set()

        for u in self.u_set:
            s_next = tuple([s[i] + u[i] for i in range(2)])
            if s_next not in self.obs:
                nei_list.add(s_next)

        return nei_list

    def cost(self, s_start, s_goal):
        """
        Calculate Cost for this motion
        :param s_start: starting node
        :param s_goal: end node
        :return:  Cost for this motion
        :note: Cost function could be more complicate!
        """

        if self.is_collision(s_start, s_goal):
            return float("inf")

        return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])

    def is_collision(self, s_start, s_end):
        if s_start in self.obs or s_end in self.obs:
            return True

        if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
            if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
                s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
                s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
            else:
                s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
                s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))

            if s1 in self.obs or s2 in self.obs:
                return True

        return False

    def plot_path(self, path):
        px = [x[0] for x in path]
        py = [x[1] for x in path]
        plt.plot(px, py, linewidth=2)
        plt.plot(self.s_start[0], self.s_start[1], "bs")
        plt.plot(self.s_goal[0], self.s_goal[1], "gs")

    def plot_visited(self, visited):
        color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
                 'bisque', 'navajowhite', 'moccasin', 'wheat',
                 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']

        if self.count >= len(color) - 1:
            self.count = 0

        for x in visited:
            plt.plot(x[0], x[1], marker='s', color=color[self.count])

def main():
    s_start = (5, 5)
    s_goal = (45, 25)
    dstar = DStar(s_start, s_goal)
    dstar.run(s_start, s_goal)

if __name__ == '__main__':
    main()

在这里插入图片描述

main() 函数中首先调用 DStar() 函数构建环境地图

dstar = DStar(s_start, s_goal)

def __init__(self, s_start, s_goal):
        self.s_start, self.s_goal = s_start, s_goal

        self.Env = env.Env()
        self.Plot = plotting.Plotting(self.s_start, self.s_goal)

        self.u_set = self.Env.motions
        self.obs = self.Env.obs
        self.x = self.Env.x_range
        self.y = self.Env.y_range

        self.fig = plt.figure()

        self.OPEN = set()
        self.t = dict()
        self.PARENT = dict()
        self.h = dict()
        self.k = dict()
        self.path = []
        self.visited = set()
        self.count = 0

然后直接调用 run() 函数模拟 D* 算法的流程,函数代码和上文的主逻辑伪代码是一致的

dstar.run(s_start, s_goal)

def run(self, s_start, s_end):
        self.init()
        self.insert(s_end, 0)

        while True:
            self.process_state()
            if self.t[s_start] == 'CLOSED':
                break

        self.path = self.extract_path(s_start, s_end)
        self.Plot.plot_grid("Dynamic A* (D*)")
        self.plot_path(self.path)
        self.fig.canvas.mpl_connect('button_press_event', self.on_press)
        plt.show()
  • 首先用 init() 函数初始化地图信息
  • 然后向 openlist 中插入终点 s_end
  • 开始反向搜索,即一次遍历,反复调用 process_state() 函数直到找到起点
  • 调用 extract_path() 函数提取路径信息,并调用 plot_grid()plot_path() 函数绘图
  • 一次遍历结束,这里用 on_press() 函数模拟二次遍历过程中路径点前方突然出现障碍物

precess_state() 函数是最关键的函数,其实现和论文中的伪代码完全一致

def process_state(self):
        s = self.min_state()  # get node in OPEN set with min k value
        self.visited.add(s)

        if s is None:
            return -1  # OPEN set is empty

        k_old = self.get_k_min()  # record the min k value of this iteration (min path cost)
        self.delete(s)  # move state s from OPEN set to CLOSED set

        # k_min < h[s] --> s: RAISE state (increased cost)
        if k_old < self.h[s]:
            for s_n in self.get_neighbor(s):
                if self.h[s_n] <= k_old and \
                        self.h[s] > self.h[s_n] + self.cost(s_n, s):

                    # update h_value and choose parent
                    self.PARENT[s] = s_n
                    self.h[s] = self.h[s_n] + self.cost(s_n, s)

        # s: k_min >= h[s] -- > s: LOWER state (cost reductions)
        if k_old == self.h[s]:
            for s_n in self.get_neighbor(s):
                if self.t[s_n] == 'NEW' or \
                        (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)) or \
                        (self.PARENT[s_n] != s and self.h[s_n] > self.h[s] + self.cost(s, s_n)):

                    # Condition:
                    # 1) t[s_n] == 'NEW': not visited
                    # 2) s_n's parent: cost reduction
                    # 3) s_n find a better parent
                    self.PARENT[s_n] = s
                    self.insert(s_n, self.h[s] + self.cost(s, s_n))
        else:
            for s_n in self.get_neighbor(s):
                if self.t[s_n] == 'NEW' or \
                        (self.PARENT[s_n] == s and self.h[s_n] != self.h[s] + self.cost(s, s_n)):

                    # Condition:
                    # 1) t[s_n] == 'NEW': not visited
                    # 2) s_n's parent: cost reduction
                    self.PARENT[s_n] = s
                    self.insert(s_n, self.h[s] + self.cost(s, s_n))
                else:
                    if self.PARENT[s_n] != s and \
                            self.h[s_n] > self.h[s] + self.cost(s, s_n):

                        # Condition: LOWER happened in OPEN set (s), s should be explored again
                        self.insert(s, self.h[s])
                    else:
                        if self.PARENT[s_n] != s and \
                                self.h[s] > self.h[s_n] + self.cost(s_n, s) and \
                                self.t[s_n] == 'CLOSED' and \
                                self.h[s_n] > k_old:

                            # Condition: LOWER happened in CLOSED set (s_n), s_n should be explored again
                            self.insert(s_n, self.h[s_n])

        return self.get_k_min()

on_press() 函数是模拟路径重规划的关键函数

def on_press(self, event):
        x, y = event.xdata, event.ydata
        if x < 0 or x > self.x - 1 or y < 0 or y > self.y - 1:
            print("Please choose right area!")
        else:
            x, y = int(x), int(y)
            if (x, y) not in self.obs:
                print("Add obstacle at: s =", x, ",", "y =", y)
                self.obs.add((x, y))
                self.Plot.update_obs(self.obs)

                s = self.s_start
                self.visited = set()
                self.count += 1

                while s != self.s_goal:
                    if self.is_collision(s, self.PARENT[s]):
                        self.modify(s)
                        continue
                    s = self.PARENT[s]

                self.path = self.extract_path(self.s_start, self.s_goal)

                plt.cla()
                self.Plot.plot_grid("Dynamic A* (D*)")
                self.plot_visited(self.visited)
                self.plot_path(self.path)

            self.fig.canvas.draw_idle()

手动在路径上放置障碍物,然后先 modify(s) ,然后再反复调用 precess_state() 函数进行重规划

while s != self.s_goal:
                    if self.is_collision(s, self.PARENT[s]):
                        self.modify(s)
                        continue
                    s = self.PARENT[s]
def modify(self, s):
        """
        start processing from state s.
        :param s: is a node whose status is RAISE or LOWER.
        """

        self.modify_cost(s)

        while True:
            k_min = self.process_state()

            if k_min >= self.h[s]:
                break

    def modify_cost(self, s):
        # if node in CLOSED set, put it into OPEN set.
        # Since cost may be changed between s - s.parent, calc cost(s, s.p) again

        if self.t[s] == 'CLOSED':
            self.insert(s, self.h[self.PARENT[s]] + self.cost(s, self.PARENT[s]))

从代码中可以看出,二次遍历中走到路径点 Y,发现下一路径点 X 是障碍物,则对 Y 处的路径开销进行修正,h(Y) = h(X) + c(X,Y),此时 Y 变为 RAISE 状态,将其加入 openlist 以传播该状态

另外代码中在轨迹重规划绘图时加了一些小细节,体现在 plot_visited() 函数上,会根据已添加障碍物的个数,然后给本次重规划时访问过的节点上色

s = self.s_start
                self.visited = set()
                self.count += 1

self.Plot.plot_grid("Dynamic A* (D*)")
                self.plot_visited(self.visited)
                self.plot_path(self.path)

def plot_visited(self, visited):
        color = ['gainsboro', 'lightgray', 'silver', 'darkgray',
                 'bisque', 'navajowhite', 'moccasin', 'wheat',
                 'powderblue', 'skyblue', 'lightskyblue', 'cornflowerblue']

        if self.count >= len(color) - 1:
            self.count = 0

        for x in visited:
            plt.plot(x[0], x[1], marker='s', color=color[self.count])

实例推演

光是看讲解是体会不到算法的精妙的,结合实例看下吧

再贴一下 PROCESS-STATE() 的伪代码

在这里插入图片描述

1、下面是一开始的地图

在这里插入图片描述

2、将终点加入 openlist,开始一次遍历,该过程可以说是反向 Dijkstra

在这里插入图片描述

在这里插入图片描述

这个过程就是反复执行伪代码中的 L8 ~ L13,直到起点也被加入 openlist

有一点需要注意,一次遍历后,所有访问过的节点都保存了到终点的路径以及距离信息,相当于建立了一个路径场,这在遇到障碍物后的二次规划时是很重要的,能够减少运算量

3、从起点开始二次遍历,遍历过程中出现新的障碍物

在这里插入图片描述

此时立刻调用 MODIFY_COST() 函数更改 (3,2) 节点的路径开销,更改如下

在这里插入图片描述

此时的 openlist 如下

statek
(3,2)5.6
(1,2)7.6
(1,3)8.0
(1,1)8.0

4、反复调用 PROCESS_STATE() 函数重新规划路径

在这里插入图片描述

此时 (3,2) 节点处于 RAISE 状态,在传播状态变化之前,先看能不能通过其邻居中已经处于最优开销(即 h(Y) < kold)得到节点来优化 X 的路径开销(对应 L4 ~ L7),显然不存在满足条件的邻居

那么此时就将 (3,2) 节点的状态变化进行传播,对应 L15 ~ L18,将其子节点(将 b(Y) = X 中的 Y 定义为子节点)的开销扩大,子节点有 (2,1)、(2,2)、(3,1),将这些节点都加入 openlist 中

此时节点 (4,1) 有可能减小节点 (3,2) 的路径开销,对应 L23 ~ L25,将其加入 openlist 中以优化 (3,2) 节点

此时发生变化的节点如下图

在这里插入图片描述

此时的 openlist 如下

(4,1)6.2
(2,2)6.6
(3,1)6.6
(2,1)7.0
(1,2)7.6
(1,3)8.0

此时 h([3,2]) = 10000,kmin = 6.2,不满足 kmin ≥ h[s],继续 process_state()

将 (4,1) 节点弹出 openlist,该节点处于 LOWER 状态,那么则将该 LOWER 状态传播,对应 L8 ~ L13,调整其邻居节点 (3,1)、(3,2) 的路径开销,并加入 openlist

在这里插入图片描述

注意到节点 (3,2) 经过调整已经处于 LOWER 状态,将其将入 openlist 可以继续传播该状态

此时对于下面这句话应该有更深的认识

The propagation takes place through the repeated removal of states from the OPEN list. Each time a state is removed from the list, it is expanded to pass cost changes to its neighbors. These neighbors are in turn placed on the OPEN list to continue the process.

此时发生变化的节点如下

在这里插入图片描述

此时的 openlist 如下

(3,1)6.6
(2,2)6.6
(2,1)7.0
(1,2)7.6
(3,2)7.6
(1,3)8.0

此时 h([3,2]) = 7.6,kmin = 6.6,不满足 kmin ≥ h[s],继续 process_state()

将 (3,1) 节点弹出 openlist,该节点此时处于 RAISE 状态,在传播 RAISE 状态前,依然会先执行 L4 ~ L7,依旧没没有满足条件的邻居节点

但是 (3,1) 节点能够减少其邻居节点的路径开销,对应 L20 ~ L21,所以将其重新加入 openlist,注意每次加入时都会更新一下节点的 k 值和 h 值,贴一下加入的代码

def insert(self, s, h_new):
        """
        insert node into OPEN set.
        :param s: node
        :param h_new: new or better cost to come value
        """

        if self.t[s] == 'NEW':
            self.k[s] = h_new
        elif self.t[s] == 'OPEN':
            self.k[s] = min(self.k[s], h_new)
        elif self.t[s] == 'CLOSED':
            self.k[s] = min(self.h[s], h_new)

        self.h[s] = h_new
        self.t[s] = 'OPEN'
        self.OPEN.add(s)

此时 (3,1) 节点已经处于 LOWER 状态,将其加入 openlist 可以传播该状态

此时发生变化的节点如下

在这里插入图片描述

此时的 openlist 如下

(2,2)6.6
(2,1)7.0
(3,1)7.2
(1,2)7.6
(3,2)7.6
(1,3)8.0

此时 h([3,2]) = 7.6,kmin = 6.6,不满足 kmin ≥ h[s],继续 process_state()

将 (2,2) 节点弹出 openlist,按照 process_state() 走一遍没有发生变化的节点

在这里插入图片描述

此时的 openlist 如下

(2,1)7.0
(3,1)7.2
(1,2)7.6
(3,2)7.6
(1,3)8.0

此时 h([3,2]) = 7.6,kmin = 7.0,不满足 kmin ≥ h[s],继续 process_state()

将 (2,1) 节点弹出 openlist,按照 process_state() 走一遍同样没有发生变化的节点

在这里插入图片描述

此时的 openlist 如下

(3,1)7.2
(1,2)7.6
(3,2)7.6
(1,3)8.0

此时 h([3,2]) = 7.6,kmin = 7.2,不满足 kmin ≥ h[s],继续 process_state()

将 (3,1) 节点弹出 openlist,该节点此时处于 LOWER 状态,将该状态传播,对应 L4 ~ L7,调整其邻居节点的开销,经过调整,其邻居节点 (2,1)、(2,2) 都由 RAISE 状态转变为 LOWER 状态

在这里插入图片描述

此时发生变化的节点如下

在这里插入图片描述

此时的 openlist 如下

(1,2)7.6
(3,2)7.6
(1,3)8.0
(1,1)8.0

此时 h([3,2]) = 7.6,kmin = 7.6,满足 kmin ≥ h[s],说明已经规划出了一条合适的路径

在这里插入图片描述

机器人从当前位置继续向终点行进

在这里插入图片描述

至此,轨迹重规划结束,相信经过上面的推演,对原文中的下述表达有更深刻的认识

The role of RAISE ans LOWER states is central to the operation of the algorithm. The RAISE states (i.e., k(X) < h(X)) propagate cost increases, and the LOWER states (i.e., k(x) = h(X)) propagate cost reductions. When the cost of traversing an arc is increased, an affected neighbor state is placed on the OPEN list, and the cost increase is propagated via RAISE states through all state sequences containing the arc. As the RAISE states come in contact with neighboring states of lower cost, these LOWER states are placed on the OPEN list, and they subsequently decrease the cost of previously raised states wherever possible. If the cost of traversing an arc is decreased, the reduction is propagated via LOWER states through all state sequences containing the arc, as well as neighboring states whose cost can also be lowered.

总结

核心思想

D* 算法先采用 Dijkstra 算法从目标点 G 向起始点 S 反向搜索,在搜索过程中访问过的每个节点到终点 G 的路径信息以及距离信息,构造出一个路径场,在遇到临时障碍物时,路径场信息能够避免不必要的重新规划路径带来的庞大运算量

RAISELOWER状态的作用对算法的操作至关重要。RAISE状态(即k(X) <h(X))传播成本增加,而LOWER状态(即k(X) = h(X))传播成本降低。当遍历一个节点的开销增加时,受影响的邻居状态被放到OPEN列表中,开销的增加通过RAISE状态传播到其相邻节点。当RAISE状态与相邻的LOWER状态节点接触时,这些LOWER状态节点被加入OPEN列表中,它们随后尽可能地降低之前RAISE状态节点的路径开销。如果某一节点的路径开销降低了,那么这种降低将通过LOWER状态传播到该节点的所有相邻节点

状态的传播通过从OPEN列表中反复删除状态来进行。每次从列表中删除一个状态时,它都会展开以将成本更改传递给它的邻居。这些邻居依次被加入OPEN列表,以继续这一过程

通过 h(X) 和 k(X) 的大小关系切换节点的 RAISE 和 LOWER 状态,通过状态切换和状态传播实现轨迹的重规划

使用场合

适用于动态环境(/路网)中的路径/运动规划(motion planning),与Dijkstra、A* 形成对比(两者适用于静态环境/路网)

  • 动态是指在沿着最优路径前进时,突然出现了既定路径中断的情况需要进行重新规划
  • 路径中断在现实生活中可以理解为诸如塌方、巨石、车祸、阻塞等情况的发生,在图/Graph中是某条边Edge/弧Arc的代价/权重cost/weight突然变得非常大,使得当前路径成本陡增,需要重新规划
  • 当然该方法也可以应用在静态环境/路网中,向下兼容,实际就相当于反向(从终点G开始求解)Dijkstra算法
  • 该方法不是唯一的算法,比如在遇到障碍时再次调用A*(参考A* replanner)也是可以的,但是该方法计算成本低,效率高
  • 同时该方法对已知局部环境(类似游戏中的趟地图开荒)、未知环境情况下也有很好的实用性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/71304.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java 文件/文件夹复制,添加压缩zip

复制文件夹,并压缩成zip 需求&#xff1a;创建A文件夹&#xff0c;把B文件夹复制到A文件夹。然后把A文件夹压缩成zip包 public static void main(String[] args) throws Exception {try {String A "D:\\dev\\program";String B "D:\\program";// 创建临…

iOS- git对单个或者多个文件权限设置,使用pre-commit hook 和shell脚本,拦截校验

前提&#xff1a;最近&#xff0c;由于团队代码规范和安全问题&#xff0c;有一些文件只能是指定用户才能修改。 对比&#xff1a;调查了一下资料&#xff0c;发现好多人都在使用pre-commit技术。于是&#xff0c;就朝着这个方向去研究。于是抽空写了脚本&#xff0c;在提交的…

写一个函数返回参数二进制中 1 的个数(c语言三种实现方法)

&#xff08;本文旨在自己做题时的总结&#xff0c;我会给出不同的解法&#xff0c;后面如果碰到新的题目还会加入其中&#xff0c;等于是我自己的题库。 1.写一个函数返回参数二进制中 1 的个数。 比如&#xff1a; 15 0000 1111 4 个 1 方法一&#xff1a; #include…

form中表单切换,导致 relus 中的事件无法触发,原因:页面切换不要一直切换DOM,会导致问题,需要都显示出来

修改前&#xff0c;因为重复渲染DOM导致绑定rules失效 修改前代码使用 computed 计算出渲染的DOM&#xff0c;影响rules事件<el-formref"form"inline:model"billDetailCopy":rules"rules"size"small"label-position"right&quo…

WebStorm

WebStorm 介绍下载安装Activation 介绍 WebStorm是由JetBrains公司开发的一款集成开发环境&#xff08;IDE&#xff09;&#xff0c;主要专注于前端开发和Web开发。它旨在提供一套强大的工具和功能&#xff0c;以支持开发者在前端项目中编写、调试和维护代码。 JetBrains官网: …

Java中声明,定义,分配内存,初始化,赋值,是啥?

一. 声明&#xff0c;定义和分配内存 在Java中&#xff0c;声明和定义是同一个意思&#xff0c;不做区分。下面这些都是声明&#xff08;定义&#xff09;一个变量。 栈&#xff1a;存放局部变量&#xff08;包括基本数据类型的变量和对象的引用&#xff09; 堆&#xff1a;存…

配置:Terminal和oh-my-posh

目录 命令行安装oh-my-posh查看安装情况配置PowerShell启用oh-my-posh、设置主题配色安装字体Terminal中的配置 命令行安装oh-my-posh Set-ExecutionPolicy Bypass -Scope Process -Force; Invoke-Expression ((New-Object System.Net.WebClient).DownloadString(https://ohmy…

YAPi在线接口文档简单案例(结合Vue前端Demo)

在前后端分离开发中&#xff0c;我们都是基于文档进行开发&#xff0c;那前端人员有时候无法马上拿到后端的数据&#xff0c;该怎么办&#xff1f;我们一般采用mock模拟伪造数据直接进行测试&#xff0c;本篇文章主要介绍YApi在线接口文档的简单使用&#xff0c;并结合Vue的小d…

流量,日志分析--理论

提供资料&#xff1a; Wireshark 基本语法&#xff0c;基本使用方法&#xff0c;及包过虑规则 : https://blog.csdn.net/qq_17457175/article/det ails/53019490 ctf 常见流量分析题目类型 : https://ctf-wiki.org/misc/traffic/introduction/ windows 日志 : https://jone…

IntelliJ IDEA和Android studio怎么去掉usage和作者提示

截止到目前我已经写了 600多道算法题&#xff0c;其中部分已经整理成了pdf文档&#xff0c;目前总共有1000多页&#xff08;并且还会不断的增加&#xff09;&#xff0c;大家可以免费下载 下载链接&#xff1a;https://pan.baidu.com/s/1hjwK0ZeRxYGB8lIkbKuQgQ 提取码&#xf…

5.PyCharm基础使用及快捷键

在前几篇文章中介绍了PyCharm的安装和汉化,本篇文章一起来看一下PyCharm的基本用法和一些快捷键的使用方法。 本篇文章PyCharm的版本为PyCharm2023.2 新建项目和运行 打开工具,在菜单中——文件——新建项目 选择项目的创建位置(注意最好不要使用中文路径和中文名项目名称…

基于Gradio的GPT聊天程序

网上很多别人写的&#xff0c;要用账号也不放心。就自己写了一个基于gradio的聊天界面&#xff0c;部署后可以本地运行。 特点&#xff1a; 可以用openai的&#xff0c;也可以用api2d&#xff0c;其他api可以自己测试一下。使用了langchain的库 可以更改模型&#xff0c;会的…

IO密集型服务提升性能的三种方法

文章目录 批处理缓存多线程总结 大部分的业务系统其实都是IO密集型的系统&#xff0c;比如像我们面向B端提供摄像头服务&#xff0c;很多的接口其实就是将各种各样的数据汇总起来&#xff0c;展示给用户&#xff0c;我们的数据来源包括Redis、Mysql、Hbase、以及依赖的一些服务…

IP协议

网络层 对于网络层来说&#xff0c;它是传输层协议的具体实施&#xff0c;那么它具体是如何实施的呢&#xff1f; IP协议 IP能够实现将数据从A主机送到B主机&#xff0c;在网络中&#xff0c;每一个IP报文都包含了它的目标网络和目标主机。 而IP协议就是网络层使用的协议。 I…

【mysql】—— 表的约束

目录 序言 &#xff08;一&#xff09;空属性 &#xff08;二&#xff09;默认值 &#xff08;三&#xff09;列描述 &#xff08;四&#xff09;zerofill &#xff08;五&#xff09;主键 &#xff08;六&#xff09;自增长 &#xff08;七&#xff09;唯一键 &#…

stm32_断点调试无法进入串口接收中断

先说结果&#xff0c;可能是stm32调试功能/keil软件/调试器&#xff08;试过STLINK和JLINK两种&#xff09;的问题&#xff0c;不是代码&#xff1b; 1、入坑 配置完串口后&#xff0c;可以发送数据到串口助手&#xff0c;但不能接收数据并做处理&#xff0c;所以第一步&…

安全防御(3)

1.总结当堂NAT与双机热备原理&#xff0c;形成思维导图 2.完成课堂nat与双机热备试验 引用IDS是指入侵检测系统&#xff0c;它可以在网络中检测和防御入侵行为。IDS的签名是指根据已知入侵行为的特征制定的规则&#xff0c;用于检测和警告可能存在的入侵行为。签名过滤器可以根…

图论——最短路算法

引入&#xff1a; 如上图&#xff0c;已知图G。 问节点1到节点3的最短距离。 可心算而出为d[1,2]d[2,3]112,比d[1,3]要小。 求最短路径算法&#xff1a; 1.Floyd(弗洛伊德) 是一种基于三角形不等式的多源最短路径算法。边权可以为负数 表现为a[i,j]a[j,k]<a[i,k]。 …

9.2.2Socket(TCP)

一.过程: 1.建立连接(不是握手),虽然内核中的连接有很多,但是在应用程序中,要一个一个处理. 2. 获取任务:使用ServerSocket.accept()方法,作用是把内核中的连接获取到应用程序中,这个过程类似于生产者消费者模型. 3. 使用缓冲的时候,注意全缓冲和行缓冲. 4.注意关闭文件资源…

排序算法(二)

1.希尔排序-Shell Sort 1.算法原理 将未排序序列按照增量gap的不同分割为若干个子序列&#xff0c;然后分别进行插入排序&#xff0c;得到若干组排好序的序列&#xff1b; 缩小增量gap&#xff0c;并对分割为的子序列进行插入排序&#xff1b;最后一次的gap1&#xff0c;即整个…