文章目录
- 原理
- Python实现
- 验证
原理
DBSCAN,即Density-Based Spatial Clustering of Applications with Noise,基于密度的噪声应用空间聚类。
在DBSCAN算法中,将数据点分为三类:
- 核心点:若样本 x i x_i xi的 ε \varepsilon ε邻域内至少包含了 M M M个点,则为核心点
- 边界点:若样本 x i x_i xi的 ε \varepsilon ε邻域内包含的点数小于 M M M,但在其他核心点的 ε \varepsilon ε邻域内,则为边界点
- 噪声:既非核心点也非边界点则为噪声
那么,在实际实现DBSCAN算法时,对这三种点理应采取不同的操作
- 若一个点是核心点,那么应该穷尽所有与其相连接的边界点,再得到与边界点相连接的所有点,知道遍历完整个点集。
- 若一个点是边界点,若这个点尚未归类;若已经归类,则如1中所说,继续搜索与其相连的点。
- 若为噪声,则直接跳过。
可见,DBSCAN算法需要两个参数,分别是邻域半径 ε \varepsilon ε和点数 M M M。
Python实现
为了衡量两点之间的距离,计算一个距离矩阵以备调用,可以降低计算次数。对于一组点集data,其欧氏距离矩阵的求解方法如下
# 距离矩阵
def disMat(data):
arr = np.array(data)
dMat = lambda arr : arr.reshape(1,-1) - arr.reshape(-1,1)
# 此为单个轴的距离矩阵
mats = [dMat(arr[:,i]) for i in range(arr.shape[1])]
return np.linalg.norm(mats, axis=0)
其中,dMat用于求解一维向量的距离矩阵。
下面则是基于距离矩阵的DBSCAN算法。
import numpy as np
class DBSCAN(object):
# e 最小距离, minPts 最少样本数量
def __init__(self, e, minPts):
self.e = e
self.minPts = minPts
# 获取点id的临近点
def nearby(self, id):
return np.where(self.mat[id] <= self.e)[0]
def searchNearbyPts(self, points, group):
for id in points:
if id not in self.data:
continue
group.append(id)
self.data.remove(id)
# 查看id点的临近点
nearbyPts = self.nearby(id)
if len(nearbyPts) >= self.minPts:
self.searchNearbyPts(nearbyPts, group)
def fit(self, mat):
self.mat = mat
groups = list()
keys = range(mat.shape[0])
self.data = list(keys)
for id in keys:
if id not in self.data:
continue
# id点的临近点
nearbyPts = self.nearby(id)
if len(nearbyPts) < self.minPts:
continue
group = [id]
self.data.remove(id)
self.searchNearbyPts(nearbyPts, group)
groups.append(group)
# 加入飞点
groups += self.data
return groups
在上面的DBSCAN类中,fit相当于执行聚类的开关,其输入参数mat是点集的距离矩阵。
self.data是点的序号的列表。其中的for循环是DBSCAN算法的核心部分,其基本逻辑是,如果一个点已经被归类了,那么就从self.data中删除;如果这个点恰好又是核心点,那么就要搜寻其所有的邻近点。
函数nearby用于查找序号为id的点的所有临近点,searchNearbyPts则将某点的所有临近点进行归类。
验证
其数据生成、绘图代码如下所示
# 初始化数据
def initData(num, min, max):
return [np.random.randint(min, max, size=2) for _ in range(num)]
def drawRes(data, groups):
for gp in groups:
xy = np.array([data[i] for i in gp])
plt.scatter(xy[:,0], xy[:,1])
plt.title(u'DBSCAN')
plt.grid()
plt.tight_layout()
plt.show()
if __name__ == '__main__':
np.random.seed(42)
ds1 = initData(20, 0, 30)
ds2 = initData(20, 40, 60)
ds3 = initData(20, 70, 100)
ds = ds1 + ds2 + ds3
score_mat = disMat(ds)
groups = DBSCAN(20, 3).fit(score_mat)
drawRes(ds, groups)
聚类结果为