DTW(Dynamic Time Warping)算法是一种用于度量两个时间序列之间的相似性的方法。它的构建原理如下:
-
基本思想:DTW算法通过计算两个时间序列之间的最小距离,来度量它们的相似性。与欧氏距离等传统距离度量方法不同,DTW算法考虑了时间序列中元素之间的时序关系。
-
算法步骤:
- 创建一个距离矩阵,用于存储计算过程中的距离。
- 初始化第一行和第一列的值为无穷大。
- 从矩阵的(1,1)位置开始,计算每个位置的距离。具体计算方法是将当前位置的距离设为当前位置的元素之间的距离加上三个相邻位置中最小的距离。
- 最后返回距离矩阵的最后一个元素,即两个时间序列的DTW距离。
DTW算法的优点:
- 考虑了时间序列中元素之间的时序关系,适用于比较不同长度和变化速度的时间序列。
- 具有较好的鲁棒性,对于时间序列中的噪声、缺失数据等问题有一定的容忍度。
DTW算法的缺点:
- 计算复杂度较高,时间和空间复杂度都为O(n*m),其中n和m分别是两个时间序列的长度。
- 对于高维数据的处理能力有限,当时间序列的维度较高时,DTW算法的效果可能下降。
实例原理图如下所示:
DTW 算法重要的特征就是通过“扭曲”X 轴来解释 Y 轴变量,以达到使时间序列对齐的目的。但是简单的通过 Y 轴的变量值来扭曲 X 轴会导致“不直观”( unintuitive )的对齐,其中一个时间序列上的单个点会映射到另一个时间序列的一部分上。我们称这种我们不期望看到的行为为“奇点”( singularities )。
DTW算法的核心思想就是通过扭曲来实现序列的近似对齐处理,基于动态规划的思想找到最优匹配路径。
相关的原理这里就不再赘述了,感兴趣的话可以自行查阅即可,这里接下来主要是想要基于python开发实现DTW算法,如下所示:
def dtw_distance(s1, s2):
"""
两序列DTW距离计算
"""
n = len(s1)
m = len(s2)
# 创建一个n*m的矩阵,用于存储计算过程中的距离
dtw_matrix = np.zeros((n + 1, m + 1))
# 初始化第一行和第一列的值为无穷大
dtw_matrix[0, 1:] = np.inf
dtw_matrix[1:, 0] = np.inf
# 计算每个位置的距离
for i in range(1, n + 1):
for j in range(1, m + 1):
cost = abs(s1[i - 1] - s2[j - 1]) # 计算s1[i]和s2[j]之间的距离
dtw_matrix[i, j] = cost + min(
dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1]
)
# 返回最终的DTW距离
dtw = dtw_matrix[n, m]
print("[-]" * 30)
print("dtw_matrix: ")
print(dtw_matrix)
print("[-]" * 30)
print("dtw_distance: ", dtw)
return dtw
结果输出如下所示:
[-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-]
dtw_matrix:
[[ 0. inf inf inf inf inf]
[inf 1. 4. 9. 16. 25.]
[inf 1. 3. 7. 13. 21.]
[inf 2. 2. 5. 10. 17.]
[inf 4. 2. 4. 8. 14.]
[inf 7. 3. 3. 6. 11.]]
[-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-]
dtw_distance: 11.0
上面打印输出的是dtw动态规划过程中产生的转移矩阵,下面打印的是dtw算法计算得到的序列之间的距离。
上面的代码是不依赖于任何第三方库实现的dtw算法,在现在已经有现成的第三方库可以直接进行使用,代码实现也很简单,如下所示:
s1 = [1, 2, 3, 4, 5]
s2 = [2, 4, 6, 8, 10]
x = np.array(s1)
y = np.array(s2)
dtw_distance, dtw_matrix = fastdtw(x, y, dist=euclidean)
print("[-]" * 30)
print("dtw_matrix: ")
print(dtw_matrix)
print("[-]" * 30)
print("dtw_distance: ", dtw_distance)
结果输出如下所示:
[-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-]
dtw_matrix:
[(0, 0), (1, 0), (2, 1), (3, 1), (4, 2), (4, 3), (4, 4)]
[-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-][-]
dtw_distance: 11.0
接下来我想要对比下不同算法实现的效率情况,核心实现如下所示:
nums_list = [10,100,1000,10000]
time_list1=[]
for one_num in nums_list:
start = time.time()
for i in range(one_num):
one = [random.randint(1, 1000) for _ in range(random.randint(1, 100))]
two = [random.randint(1, 1000) for _ in range(random.randint(1, 100))]
dtw_distance(one, two)
end = time.time()
delta = end - start
time_list1.append(delta)
print(time_list1)
另一个算法也是类似,这里就不再赘述了,分别进行了四组计算,对比可视化如下:
plt.clf()
plt.plot(time_list1,label="selfDTW")
plt.plot(time_list2,label="fastDTW")
plt.legend()
plt.title("Different DTW Method TimeConsume Compare")
plt.savefig("DTW_compare.png")
结果如下所示:
很奇怪的就是我原以为第三方模块的效率应该会更高,但是在这里对比实验结果下反而是相反的情况,不知道是我哪里设置出了问题还是什么原因,如果有发现的还望不吝指教,感兴趣的话都是可以自行尝试实践下的。
基于动态时间规整(DTW)算法的变种算法有多种,以下是其中一些常见的变种算法:
-
FastDTW(快速动态时间规整):FastDTW是对DTW算法的一种加速版本。它通过将时间序列进行下采样,然后在较小的序列上应用DTW算法来减少计算量。FastDTW在保持较小时间和空间复杂度的同时,仍能提供较好的近似DTW距离。
-
Multivariate DTW(多变量动态时间规整):DTW算法最初应用于单变量时间序列,但在实际问题中,往往存在多个变量之间的关联。Multivariate DTW扩展了DTW算法,使其能够处理多维时间序列。它考虑了每个维度上的距离度量和对齐路径的形状。
-
Weighted DTW(加权动态时间规整):在标准的DTW算法中,每个时间步的匹配都被视为等权重。然而,在某些应用中,不同时间步的重要性可能不同。Weighted DTW引入了一个权重系数,用于调整每个时间步的贡献,从而更好地适应不同的应用场景。
-
Constrained DTW(约束动态时间规整):在某些情况下,我们可能希望对DTW算法的搜索空间进行限制,以便更好地满足特定的约束条件。Constrained DTW通过引入限制条件,如全局约束或局部约束,来约束匹配路径的搜索范围,从而减少计算复杂度并提高算法的效率。
这些是基于DTW算法的一些常见变种算法,每种算法都有其适用的场景和优势。具体选择哪种算法取决于你的应用需求、时间序列的特征以及对计算效率和准确性的要求。感兴趣的话都可以对照了解学习一下。