numpy-ml\numpy_ml\trees\gbdt.py
# 导入 numpy 库并重命名为 np
import numpy as np
# 从当前目录下的 dt 模块中导入 DecisionTree 类
# 从当前目录下的 losses 模块中导入 MSELoss 和 CrossEntropyLoss 类
from .dt import DecisionTree
from .losses import MSELoss, CrossEntropyLoss
# 定义一个函数,将标签转换为 one-hot 编码
def to_one_hot(labels, n_classes=None):
# 如果标签的维度大于 1,则抛出异常
if labels.ndim > 1:
raise ValueError("labels must have dimension 1, but got {}".format(labels.ndim))
# 获取标签的数量
N = labels.size
# 计算 one-hot 编码的列数
n_cols = np.max(labels) + 1 if n_classes is None else n_classes
# 创建一个全零矩阵,用于存储 one-hot 编码
one_hot = np.zeros((N, n_cols))
# 将对应位置的值设为 1.0,实现 one-hot 编码
one_hot[np.arange(N), labels] = 1.0
return one_hot
# 定义一个梯度提升决策树的类
class GradientBoostedDecisionTree:
def __init__(
self,
n_iter,
max_depth=None,
classifier=True,
learning_rate=1,
loss="crossentropy",
step_size="constant",
# 定义一个预测方法,用于对输入数据进行分类或预测
def predict(self, X):
"""
Use the trained model to classify or predict the examples in `X`.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
The training data of `N` examples, each with `M` features
Returns
-------
preds : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
The integer class labels predicted for each example in `X` if
``self.classifier = True``, otherwise the predicted target values.
"""
# 初始化预测结果矩阵
Y_pred = np.zeros((X.shape[0], self.out_dims))
# 遍历每个迭代器
for i in range(self.n_iter):
# 遍历每个输出维度
for k in range(self.out_dims):
# 根据权重和学习器的预测结果更新预测结果矩阵
Y_pred[:, k] += self.weights[i, k] * self.learners[i, k].predict(X)
# 如果是分类器,则取预测结果矩阵中每行最大值的索引作为预测结果
if self.classifier:
Y_pred = Y_pred.argmax(axis=1)
return Y_pred
numpy-ml\numpy_ml\trees\losses.py
import numpy as np
# 导入 NumPy 库
#######################################################################
# Base Estimators #
#######################################################################
# 定义 ClassProbEstimator 类,用于计算类别概率
class ClassProbEstimator:
# 训练函数,计算类别概率
def fit(self, X, y):
self.class_prob = y.sum() / len(y)
# 预测函数,返回类别概率
def predict(self, X):
pred = np.empty(X.shape[0], dtype=np.float64)
pred.fill(self.class_prob)
return pred
# 定义 MeanBaseEstimator 类,用于计算均值
class MeanBaseEstimator:
# 训练函数,计算均值
def fit(self, X, y):
self.avg = np.mean(y)
# 预测函数,返回均值
def predict(self, X):
pred = np.empty(X.shape[0], dtype=np.float64)
pred.fill(self.avg)
return pred
#######################################################################
# Loss Functions #
#######################################################################
# 定义 MSELoss 类,用于计算均方误差
class MSELoss:
# 计算均方误差
def __call__(self, y, y_pred):
return np.mean((y - y_pred) ** 2)
# 返回基本估计器
def base_estimator(self):
return MeanBaseEstimator()
# 计算梯度
def grad(self, y, y_pred):
return -2 / len(y) * (y - y_pred)
# 线搜索函数
def line_search(self, y, y_pred, h_pred):
# TODO: revise this
Lp = np.sum((y - y_pred) * h_pred)
Lpp = np.sum(h_pred * h_pred)
# if we perfectly fit the residuals, use max step size
return 1 if np.sum(Lpp) == 0 else Lp / Lpp
# 定义 CrossEntropyLoss 类,用于计算交叉熵损失
class CrossEntropyLoss:
# 计算交叉熵损失
def __call__(self, y, y_pred):
eps = np.finfo(float).eps
return -np.sum(y * np.log(y_pred + eps))
# 返回基本估计器
def base_estimator(self):
return ClassProbEstimator()
# 计算梯度
def grad(self, y, y_pred):
eps = np.finfo(float).eps
return -y * 1 / (y_pred + eps)
# 线搜索函数
def line_search(self, y, y_pred, h_pred):
raise NotImplementedError
Tree-Based Models
This module implements:
- Decision trees for classification and regression via the CART algorithm (Breiman, Friedman, Olshen, & Stone, 1984).
- Random forests for classification and regression (an example of bootstrap
aggregating) (Breiman, 2001). - Gradient-boosted decision
trees for classification and regression (an
example of gradient boosted machines) (Friedman, 1999/2001).
Plots
numpy-ml\numpy_ml\trees\rf.py
# 导入 numpy 库并重命名为 np
import numpy as np
# 从当前目录下的 dt 模块中导入 DecisionTree 类
from .dt import DecisionTree
# 定义一个函数 bootstrap_sample,用于生成数据集的自助采样
def bootstrap_sample(X, Y):
# 获取数据集 X 的行数和列数
N, M = X.shape
# 从 0 到 N 中随机选择 N 个数,允许重复
idxs = np.random.choice(N, N, replace=True)
# 返回自助采样后的数据集 X 和 Y
return X[idxs], Y[idxs]
# 定义一个类 RandomForest,表示随机森林模型
class RandomForest:
def __init__(
self, n_trees, max_depth, n_feats, classifier=True, criterion="entropy"
):
"""
An ensemble (forest) of decision trees where each split is calculated
using a random subset of the features in the input.
Parameters
----------
n_trees : int
The number of individual decision trees to use within the ensemble.
max_depth: int or None
The depth at which to stop growing each decision tree. If None,
grow each tree until the leaf nodes are pure.
n_feats : int
The number of features to sample on each split.
classifier : bool
Whether `Y` contains class labels or real-valued targets. Default
is True.
criterion : {'entropy', 'gini', 'mse'}
The error criterion to use when calculating splits for each weak
learner. When ``classifier = False``, valid entries are {'mse'}.
When ``classifier = True``, valid entries are {'entropy', 'gini'}.
Default is 'entropy'.
"""
# 初始化随机森林模型的属性
self.trees = []
self.n_trees = n_trees
self.n_feats = n_feats
self.max_depth = max_depth
self.criterion = criterion
self.classifier = classifier
def fit(self, X, Y):
"""
Create `n_trees`-worth of bootstrapped samples from the training data
and use each to fit a separate decision tree.
"""
# 初始化一个空列表用于存放决策树
self.trees = []
# 循环创建n_trees个决策树
for _ in range(self.n_trees):
# 从训练数据中创建一个自助采样样本
X_samp, Y_samp = bootstrap_sample(X, Y)
# 创建一个决策树对象
tree = DecisionTree(
n_feats=self.n_feats,
max_depth=self.max_depth,
criterion=self.criterion,
classifier=self.classifier,
)
# 使用自助采样样本拟合决策树
tree.fit(X_samp, Y_samp)
# 将拟合好的决策树添加到列表中
self.trees.append(tree)
def predict(self, X):
"""
Predict the target value for each entry in `X`.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
The training data of `N` examples, each with `M` features.
Returns
-------
y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
Model predictions for each entry in `X`.
"""
# 对每个决策树在输入数据X上进行预测
tree_preds = np.array([[t._traverse(x, t.root) for x in X] for t in self.trees])
# 对每个决策树的预测结果进行投票,返回最终的预测结果
return self._vote(tree_preds)
# 对于每个问题,返回随机森林中所有树的预测结果的聚合值
def _vote(self, predictions):
"""
Return the aggregated prediction across all trees in the RF for each problem.
Parameters
----------
predictions : :py:class:`ndarray <numpy.ndarray>` of shape `(n_trees, N)`
The array of predictions from each decision tree in the RF for each
of the `N` problems in `X`.
Returns
-------
y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
If classifier is True, the class label predicted by the majority of
the decision trees for each problem in `X`. If classifier is False,
the average prediction across decision trees on each problem.
"""
# 如果是分类器,则对每个问题返回大多数决策树预测的类标签
if self.classifier:
out = [np.bincount(x).argmax() for x in predictions.T]
# 如果不是分类器,则对每个问题返回决策树预测的平均值
else:
out = [np.mean(x) for x in predictions.T]
# 将结果转换为数组并返回
return np.array(out)
numpy-ml\numpy_ml\trees\__init__.py
# 从当前目录中导入 losses 模块
from . import losses
# 从当前目录中导入 dt 模块中的所有内容
from .dt import *
# 从当前目录中导入 rf 模块中的所有内容
from .rf import *
# 从当前目录中导入 gbdt 模块中的所有内容
from .gbdt import *
numpy-ml\numpy_ml\utils\data_structures.py
# 导入 heapq 模块,用于实现优先队列
import heapq
# 从 copy 模块中导入 copy 函数
from copy import copy
# 从 collections 模块中导入 Hashable 类
from collections import Hashable
# 导入 numpy 模块,并使用 np 别名
import numpy as np
# 从当前目录下的 distance_metrics 模块中导入 euclidean 函数
from .distance_metrics import euclidean
#######################################################################
# Priority Queue #
#######################################################################
# 定义优先队列节点类
class PQNode(object):
def __init__(self, key, val, priority, entry_id, **kwargs):
"""A generic node object for holding entries in :class:`PriorityQueue`"""
# 初始化节点对象的属性
self.key = key
self.val = val
self.entry_id = entry_id
self.priority = priority
def __repr__(self):
# 返回节点对象的字符串表示
fstr = "PQNode(key={}, val={}, priority={}, entry_id={})"
return fstr.format(self.key, self.val, self.priority, self.entry_id)
def to_dict(self):
"""Return a dictionary representation of the node's contents"""
# 返回节点对象的字典表示
d = self.__dict__
d["id"] = "PQNode"
return d
def __gt__(self, other):
# 实现大于比较操作符
if not isinstance(other, PQNode):
return -1
if self.priority == other.priority:
return self.entry_id > other.entry_id
return self.priority > other.priority
def __ge__(self, other):
# 实现大于等于比较操作符
if not isinstance(other, PQNode):
return -1
return self.priority >= other.priority
def __lt__(self, other):
# 实现小于比较操作符
if not isinstance(other, PQNode):
return -1
if self.priority == other.priority:
return self.entry_id < other.entry_id
return self.priority < other.priority
def __le__(self, other):
# 实现小于等于比较操作符
if not isinstance(other, PQNode):
return -1
return self.priority <= other.priority
# 定义优先队列类
class PriorityQueue:
# 初始化优先队列对象
def __init__(self, capacity, heap_order="max"):
"""
A priority queue implementation using a binary heap.
Notes
-----
A priority queue is a data structure useful for storing the top
`capacity` largest or smallest elements in a collection of values. As a
result of using a binary heap, ``PriorityQueue`` offers `O(log N)`
:meth:`push` and :meth:`pop` operations.
Parameters
----------
capacity: int
The maximum number of items that can be held in the queue.
heap_order: {"max", "min"}
Whether the priority queue should retain the items with the
`capacity` smallest (`heap_order` = 'min') or `capacity` largest
(`heap_order` = 'max') priorities.
"""
# 检查堆序是否合法
assert heap_order in ["max", "min"], "heap_order must be either 'max' or 'min'"
# 初始化容量和堆序
self.capacity = capacity
self.heap_order = heap_order
# 初始化优先队列列表、计数器和条目计数器
self._pq = []
self._count = 0
self._entry_counter = 0
# 返回优先队列的字符串表示
def __repr__(self):
fstr = "PriorityQueue(capacity={}, heap_order={}) with {} items"
return fstr.format(self.capacity, self.heap_order, self._count)
# 返回优先队列中元素的数量
def __len__(self):
return self._count
# 返回优先队列的迭代器
def __iter__(self):
return iter(self._pq)
def push(self, key, priority, val=None):
"""
Add a new (key, value) pair with priority `priority` to the queue.
Notes
-----
If the queue is at capacity and `priority` exceeds the priority of the
item with the largest/smallest priority currently in the queue, replace
the current queue item with (`key`, `val`).
Parameters
----------
key : hashable object
The key to insert into the queue.
priority : comparable
The priority for the `key`, `val` pair.
val : object
The value associated with `key`. Default is None.
"""
# 如果队列是最大堆,则将优先级取负数
if self.heap_order == "max":
priority = -1 * priority
# 创建一个新的 PQNode 对象
item = PQNode(key=key, val=val, priority=priority, entry_id=self._entry_counter)
# 将新节点加入堆中
heapq.heappush(self._pq, item)
# 更新计数器
self._count += 1
self._entry_counter += 1
# 当队列超过容量时,弹出队列中的元素
while self._count > self.capacity:
self.pop()
def pop(self):
"""
Remove the item with the largest/smallest (depending on
``self.heap_order``) priority from the queue and return it.
Notes
-----
In contrast to :meth:`peek`, this operation is `O(log N)`.
Returns
-------
item : :class:`PQNode` instance or None
Item with the largest/smallest priority, depending on
``self.heap_order``.
"""
# 从堆中弹出具有最大/最小优先级的元素,并将其转换为字典
item = heapq.heappop(self._pq).to_dict()
# 如果队列是最大堆,则将优先级取负数恢复原值
if self.heap_order == "max":
item["priority"] = -1 * item["priority"]
# 更新计数器
self._count -= 1
return item
# 返回具有最大/最小优先级(取决于self.heap_order)的项目,但不从队列中删除它
def peek(self):
"""
Return the item with the largest/smallest (depending on
``self.heap_order``) priority *without* removing it from the queue.
Notes
-----
In contrast to :meth:`pop`, this operation is O(1).
Returns
-------
item : :class:`PQNode` instance or None
Item with the largest/smallest priority, depending on
``self.heap_order``.
"""
# 初始化item为None
item = None
# 如果队列中有元素
if self._count > 0:
# 复制队列中第一个元素的字典表示
item = copy(self._pq[0].to_dict())
# 如果堆的顺序是最大堆
if self.heap_order == "max":
# 将item的优先级取反
item["priority"] = -1 * item["priority"]
# 返回item
return item
# 定义 BallTreeNode 类,表示 Ball 树的节点
class BallTreeNode:
# 初始化方法,设置节点的属性
def __init__(self, centroid=None, X=None, y=None):
self.left = None
self.right = None
self.radius = None
self.is_leaf = False
self.data = X
self.targets = y
self.centroid = centroid
# 重写 __repr__ 方法,返回节点的字符串表示
def __repr__(self):
fstr = "BallTreeNode(centroid={}, is_leaf={})"
return fstr.format(self.centroid, self.is_leaf)
# 将节点转换为字典形式
def to_dict(self):
d = self.__dict__
d["id"] = "BallTreeNode"
return d
# 定义 BallTree 类
class BallTree:
# 初始化 BallTree 数据结构
def __init__(self, leaf_size=40, metric=None):
"""
A ball tree data structure.
Notes
-----
A ball tree is a binary tree in which every node defines a
`D`-dimensional hypersphere ("ball") containing a subset of the points
to be searched. Each internal node of the tree partitions the data
points into two disjoint sets which are associated with different
balls. While the balls themselves may intersect, each point is assigned
to one or the other ball in the partition according to its distance
from the ball's center. Each leaf node in the tree defines a ball and
enumerates all data points inside that ball.
Parameters
----------
leaf_size : int
The maximum number of datapoints at each leaf. Default is 40.
metric : :doc:`Distance metric <numpy_ml.utils.distance_metrics>` or None
The distance metric to use for computing nearest neighbors. If
None, use the :func:`~numpy_ml.utils.distance_metrics.euclidean`
metric. Default is None.
References
----------
.. [1] Omohundro, S. M. (1989). "Five balltree construction algorithms". *ICSI
Technical Report TR-89-063*.
.. [2] Liu, T., Moore, A., & Gray A. (2006). "New algorithms for efficient
high-dimensional nonparametric classification". *J. Mach. Learn. Res.,
7*, 1135-1158.
"""
# 初始化根节点为 None
self.root = None
# 设置叶子节点最大数据点数,默认为 40
self.leaf_size = leaf_size
# 设置距离度量方式,如果未指定则使用欧氏距离
self.metric = metric if metric is not None else euclidean
def fit(self, X, y=None):
"""
Build a ball tree recursively using the O(M log N) `k`-d construction
algorithm.
Notes
-----
Recursively divides data into nodes defined by a centroid `C` and radius
`r` such that each point below the node lies within the hyper-sphere
defined by `C` and `r`.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
An array of `N` examples each with `M` features.
y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, \*)` or None
An array of target values / labels associated with the entries in
`X`. Default is None.
"""
# 分割数据集,得到中心点、左子集、右子集
centroid, left_X, left_y, right_X, right_y = self._split(X, y)
# 创建根节点,设置中心点和半径
self.root = BallTreeNode(centroid=centroid)
self.root.radius = np.max([self.metric(centroid, x) for x in X])
# 递归构建左右子树
self.root.left = self._build_tree(left_X, left_y)
self.root.right = self._build_tree(right_X, right_y)
def _build_tree(self, X, y):
# 分割数据集,得到中心点、左子集、右子集
centroid, left_X, left_y, right_X, right_y = self._split(X, y)
# 如果数据集大小小于等于叶子节点大小阈值,则创建叶子节点
if X.shape[0] <= self.leaf_size:
leaf = BallTreeNode(centroid=centroid, X=X, y=y)
leaf.radius = np.max([self.metric(centroid, x) for x in X])
leaf.is_leaf = True
return leaf
# 创建内部节点,设置中心点和半径
node = BallTreeNode(centroid=centroid)
node.radius = np.max([self.metric(centroid, x) for x in X])
# 递归构建左右子树
node.left = self._build_tree(left_X, left_y)
node.right = self._build_tree(right_X, right_y)
return node
# 将数据集 X 拆分成两部分,根据最大方差的维度进行拆分
def _split(self, X, y=None):
# 找到方差最大的维度
split_dim = np.argmax(np.var(X, axis=0))
# 沿着 split_dim 对 X 和 y 进行排序
sort_ixs = np.argsort(X[:, split_dim])
X, y = X[sort_ixs], y[sort_ixs] if y is not None else None
# 在 split_dim 的中位数处划分数据
med_ix = X.shape[0] // 2
centroid = X[med_ix] # , split_dim
# 在中心点(中位数总是出现在右侧划分)处将数据分成两半
left_X, left_y = X[:med_ix], y[:med_ix] if y is not None else None
right_X, right_y = X[med_ix:], y[med_ix:] if y is not None else None
return centroid, left_X, left_y, right_X, right_y
# 使用 KNS1 算法找到球树中距离查询向量 x 最近的 k 个邻居
def nearest_neighbors(self, k, x):
"""
Find the `k` nearest neighbors in the ball tree to a query vector `x`
using the KNS1 algorithm.
Parameters
----------
k : int
The number of closest points in `X` to return
x : :py:class:`ndarray <numpy.ndarray>` of shape `(1, M)`
The query vector.
Returns
-------
nearest : list of :class:`PQNode` s of length `k`
List of the `k` points in `X` to closest to the query vector. The
``key`` attribute of each :class:`PQNode` contains the point itself, the
``val`` attribute contains its target, and the ``distance``
attribute contains its distance to the query vector.
"""
# 维护一个最大优先队列,优先级为到 x 的距离
PQ = PriorityQueue(capacity=k, heap_order="max")
# 调用 _knn 方法找到最近的 k 个邻居
nearest = self._knn(k, x, PQ, self.root)
# 计算每个邻居点到查询向量 x 的距离
for n in nearest:
n.distance = self.metric(x, n.key)
return nearest
# 使用 k 近邻算法查找最近的 k 个点
def _knn(self, k, x, PQ, root):
# 定义距离度量函数
dist = self.metric
# 计算点 x 到当前节点球体表面的距离
dist_to_ball = dist(x, root.centroid) - root.radius
# 计算点 x 到当前优先队列中最远邻居的距离
dist_to_farthest_neighbor = dist(x, PQ.peek()["key"]) if len(PQ) > 0 else np.inf
# 如果点 x 到球体表面的距离大于等于到最远邻居的距离,并且优先队列中已经有 k 个点,则直接返回当前优先队列
if dist_to_ball >= dist_to_farthest_neighbor and len(PQ) == k:
return PQ
# 如果当前节点是叶子节点
if root.is_leaf:
# 初始化目标值列表
targets = [None] * len(root.data) if root.targets is None else root.targets
# 遍历当前节点的数据点和目标值
for point, target in zip(root.data, targets):
# 计算点 x 到当前数据点的距离
dist_to_x = dist(x, point)
# 如果优先队列中已经有 k 个点,并且点 x 到当前数据点的距离小于到最远邻居的距离,则将当前数据点加入优先队列
if len(PQ) == k and dist_to_x < dist_to_farthest_neighbor:
PQ.push(key=point, val=target, priority=dist_to_x)
else:
PQ.push(key=point, val=target, priority=dist_to_x)
else:
# 判断点 x 更接近左子节点还是右子节点
l_closest = dist(x, root.left.centroid) < dist(x, root.right.centroid)
# 递归调用_knn函数,继续在左子树中查找最近的 k 个点
PQ = self._knn(k, x, PQ, root.left if l_closest else root.right)
# 递归调用_knn函数,继续在右子树中查找最近的 k 个点
PQ = self._knn(k, x, PQ, root.right if l_closest else root.left)
# 返回最终的优先队列
return PQ
#######################################################################
# Multinomial Sampler #
#######################################################################
# 定义一个离散采样器类
class DiscreteSampler:
# 从概率分布`probs`中生成随机抽样整数,范围在[0, N)之间
def __call__(self, n_samples=1):
"""
Generate random draws from the `probs` distribution over integers in
[0, N).
Parameters
----------
n_samples: int
The number of samples to generate. Default is 1.
Returns
-------
sample : :py:class:`ndarray <numpy.ndarray>` of shape `(n_samples,)`
A collection of draws from the distribution defined by `probs`.
Each sample is an int in the range `[0, N)`.
"""
return self.sample(n_samples)
# 从概率分布`probs`中生成随机抽样整数,范围在[0, N)之间
def sample(self, n_samples=1):
"""
Generate random draws from the `probs` distribution over integers in
[0, N).
Parameters
----------
n_samples: int
The number of samples to generate. Default is 1.
Returns
-------
sample : :py:class:`ndarray <numpy.ndarray>` of shape `(n_samples,)`
A collection of draws from the distribution defined by `probs`.
Each sample is an int in the range `[0, N)`.
"""
# 生成随机整数索引
ixs = np.random.randint(0, self.N, n_samples)
# 计算概率值
p = np.exp(self.prob_table[ixs]) if self.log else self.prob_table[ixs]
# 生成二项分布随机数
flips = np.random.binomial(1, p)
# 根据二项分布结果生成抽样
samples = [ix if f else self.alias_table[ix] for ix, f in zip(ixs, flips)]
# 使用递归拒绝采样来进行无重复抽样
if not self.with_replacement:
unique = list(set(samples))
while len(samples) != len(unique):
n_new = len(samples) - len(unique)
samples = unique + self.sample(n_new).tolist()
unique = list(set(samples))
return np.array(samples, dtype=int)
# 定义一个名为 Dict 的字典子类
class Dict(dict):
# 初始化方法,接受一个编码器参数
def __init__(self, encoder=None):
"""
A dictionary subclass which returns the key value if it is not in the
dict.
Parameters
----------
encoder : function or None
A function which is applied to a key before adding / retrieving it
from the dictionary. If None, the function defaults to the
identity. Default is None.
"""
# 调用父类的初始化方法
super(Dict, self).__init__()
# 设置编码器和最大 ID
self._encoder = encoder
self._id_max = 0
# 重写 __setitem__ 方法
def __setitem__(self, key, value):
# 如果存在编码器,则对键进行编码
if self._encoder is not None:
key = self._encoder(key)
# 如果键不可哈希,则转换为元组
elif not isinstance(key, Hashable):
key = tuple(key)
# 调用父类的 __setitem__ 方法
super(Dict, self).__setitem__(key, value)
# 编码键的方法
def _encode_key(self, key):
# 获取父类的实例
D = super(Dict, self)
# 对键进行编码
enc_key = self._encoder(key)
# 如果编码后的键存在于字典中,则返回对应的值,否则添加新键值对
if D.__contains__(enc_key):
val = D.__getitem__(enc_key)
else:
val = self._id_max
D.__setitem__(enc_key, val)
self._id_max += 1
return val
# 重写 __getitem__ 方法
def __getitem__(self, key):
# 深拷贝键
self._key = copy.deepcopy(key)
# 如果存在编码器,则返回编码后的键对应的值
if self._encoder is not None:
return self._encode_key(key)
# 如果键不可哈希,则转换为元组
elif not isinstance(key, Hashable):
key = tuple(key)
# 调用父类的 __getitem__ 方法
return super(Dict, self).__getitem__(key)
# 处理缺失键的方法
def __missing__(self, key):
return self._key
numpy-ml\numpy_ml\utils\distance_metrics.py
import numpy as np
# 计算两个实向量之间的欧几里德(L2)距离
def euclidean(x, y):
"""
Compute the Euclidean (`L2`) distance between two real vectors
Notes
-----
The Euclidean distance between two vectors **x** and **y** is
.. math::
d(\mathbf{x}, \mathbf{y}) = \sqrt{ \sum_i (x_i - y_i)^2 }
Parameters
----------
x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
The two vectors to compute the distance between
Returns
-------
d : float
The L2 distance between **x** and **y.
"""
return np.sqrt(np.sum((x - y) ** 2))
# 计算两个实向量之间的曼哈顿(L1)距离
def manhattan(x, y):
"""
Compute the Manhattan (`L1`) distance between two real vectors
Notes
-----
The Manhattan distance between two vectors **x** and **y** is
.. math::
d(\mathbf{x}, \mathbf{y}) = \sum_i |x_i - y_i|
Parameters
----------
x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
The two vectors to compute the distance between
Returns
-------
d : float
The L1 distance between **x** and **y.
"""
return np.sum(np.abs(x - y))
# 计算两个实向量之间的切比雪夫(L∞)距离
def chebyshev(x, y):
"""
Compute the Chebyshev (:math:`L_\infty`) distance between two real vectors
Notes
-----
The Chebyshev distance between two vectors **x** and **y** is
.. math::
d(\mathbf{x}, \mathbf{y}) = \max_i |x_i - y_i|
Parameters
----------
x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
The two vectors to compute the distance between
Returns
-------
d : float
The Chebyshev distance between **x** and **y.
"""
return np.max(np.abs(x - y))
# 计算两个实向量之间的闵可夫斯基(Minkowski)距离
def minkowski(x, y, p):
"""
Compute the Minkowski-`p` distance between two real vectors.
Notes
-----
The Minkowski-`p` distance between two vectors **x** and **y** is
.. math::
d(\mathbf{x}, \mathbf{y}) = \left( \sum_i |x_i - y_i|^p \\right)^{1/p}
Parameters
----------
# 定义两个形状为`(N,)`的ndarray类型的向量x和y,用于计算它们之间的距离
x,y : :py:class:`ndarray <numpy.ndarray>` s of shape `(N,)`
The two vectors to compute the distance between
# 定义距离函数的参数p,当p = 1时,为L1距离,当p = 2时,为L2距离。当p < 1时,闵可夫斯基-p不满足三角不等式,因此不是有效的距离度量
p : float > 1
The parameter of the distance function. When `p = 1`, this is the `L1`
distance, and when `p=2`, this is the `L2` distance. For `p < 1`,
Minkowski-`p` does not satisfy the triangle inequality and hence is not
a valid distance metric.
# 返回值
Returns
-------
# 返回向量x和y之间的闵可夫斯基-p距离
d : float
The Minkowski-`p` distance between **x** and **y**.
"""
# 计算闵可夫斯基-p距离的公式
return np.sum(np.abs(x - y) ** p) ** (1 / p)
# 计算两个整数向量之间的汉明距离
# 汉明距离是指两个向量 x 和 y 之间的距离
# 其计算方式为:d(x, y) = 1/N * Σ(1_{x_i ≠ y_i})
# 参数:
# x, y:形状为(N,)的numpy.ndarray数组
# 要计算距离的两个向量。这两个向量应为整数值。
# 返回值:
# d:浮点数
# x 和 y 之间的汉明距离。
def hamming(x, y):
return np.sum(x != y) / len(x)
numpy-ml\numpy_ml\utils\graphs.py
from abc import ABC, abstractmethod
from collections import defaultdict
from itertools import combinations, permutations
import numpy as np
#######################################################################
# Graph Components #
#######################################################################
# 定义边对象
class Edge(object):
def __init__(self, fr, to, w=None):
"""
A generic directed edge object.
Parameters
----------
fr: int
The id of the vertex the edge goes from
to: int
The id of the vertex the edge goes to
w: float, :class:`Object` instance, or None
The edge weight, if applicable. If weight is an arbitrary Object it
must have a method called 'sample' which takes no arguments and
returns a random sample from the weight distribution. If `w` is
None, no weight is assumed. Default is None.
"""
self.fr = fr
self.to = to
self._w = w
def __repr__(self):
return "{} -> {}, weight: {}".format(self.fr, self.to, self._w)
@property
def weight(self):
return self._w.sample() if hasattr(self._w, "sample") else self._w
def reverse(self):
"""Reverse the edge direction"""
return Edge(self.t, self.f, self.w)
#######################################################################
# Graph Types #
#######################################################################
# 定义图的抽象基类
class Graph(ABC):
def __init__(self, V, E):
self._I2V = {i: v for i, v in zip(range(len(V)), V)}
self._V2I = {v: i for i, v in zip(range(len(V)), V)}
self._G = {i: set() for i in range(len(V))}
self._V = V
self._E = E
self._build_adjacency_list()
def __getitem__(self, v_i):
return self.get_neighbors(v_i)
# 获取给定顶点的内部索引
def get_index(self, v):
"""Get the internal index for a given vetex"""
return self._V2I[v]
# 根据给定内部索引获取原始顶点
def get_vertex(self, v_i):
"""Get the original vertex from a given internal index"""
return self._I2V[v_i]
# 返回图的顶点集合
@property
def vertices(self):
return self._V
# 返回图的顶点索引集合
@property
def indices(self):
return list(range(len(self.vertices)))
# 返回图的边集合
@property
def edges(self):
return self._E
# 返回从具有索引 `v_i` 的顶点可达的顶点的内部索引列表
def get_neighbors(self, v_i):
"""
Return the internal indices of the vertices reachable from the vertex
with index `v_i`.
"""
return [self._V2I[e.to] for e in self._G[v_i]]
# 返回图的邻接矩阵表示
def to_matrix(self):
"""Return an adjacency matrix representation of the graph"""
adj_mat = np.zeros((len(self._V), len(self._V)))
for e in self.edges:
fr, to = self._V2I[e.fr], self._V2I[e.to]
adj_mat[fr, to] = 1 if e.weight is None else e.weight
return adj_mat
# 返回图的邻接字典表示
def to_adj_dict(self):
"""Return an adjacency dictionary representation of the graph"""
adj_dict = defaultdict(lambda: list())
for e in self.edges:
adj_dict[e.fr].append(e)
return adj_dict
def path_exists(self, s_i, e_i):
"""
Check whether a path exists from vertex index `s_i` to `e_i`.
Parameters
----------
s_i: Int
The interal index of the start vertex
e_i: Int
The internal index of the end vertex
Returns
-------
path_exists : Boolean
Whether or not a valid path exists between `s_i` and `e_i`.
"""
# 初始化队列,起始点为s_i,路径为[s_i]
queue = [(s_i, [s_i])]
# 循环直到队列为空
while len(queue):
# 取出队列中的第一个元素
c_i, path = queue.pop(0)
# 获取当前节点的邻居节点,但不包括已经在路径中的节点
nbrs_not_on_path = set(self.get_neighbors(c_i)) - set(path)
# 遍历邻居节点
for n_i in nbrs_not_on_path:
# 将邻居节点加入路径,并加入队列
queue.append((n_i, path + [n_i]))
# 如果找到终点e_i,则返回True
if n_i == e_i:
return True
# 如果循环结束仍未找到路径,则返回False
return False
def all_paths(self, s_i, e_i):
"""
Find all simple paths between `s_i` and `e_i` in the graph.
Notes
-----
Uses breadth-first search. Ignores all paths with repeated vertices.
Parameters
----------
s_i: Int
The interal index of the start vertex
e_i: Int
The internal index of the end vertex
Returns
-------
complete_paths : list of lists
A list of all paths from `s_i` to `e_i`. Each path is represented
as a list of interal vertex indices.
"""
# 初始化存储所有路径的列表
complete_paths = []
# 初始化队列,起始点为s_i,路径为[s_i]
queue = [(s_i, [s_i])]
# 循环直到队列为空
while len(queue):
# 取出队列中的第一个元素
c_i, path = queue.pop(0)
# 获取当前节点的邻居节点,但不包括已经在路径中的节点
nbrs_not_on_path = set(self.get_neighbors(c_i)) - set(path)
# 遍历邻居节点
for n_i in nbrs_not_on_path:
# 如果找到终点e_i,则将完整路径加入complete_paths
if n_i == e_i:
complete_paths.append(path + [n_i])
else:
# 将邻居节点加入路径,并加入队列
queue.append((n_i, path + [n_i]))
# 返回所有找到的路径
return complete_paths
@abstractmethod
def _build_adjacency_list(self):
pass
class DiGraph(Graph):
def __init__(self, V, E):
"""
A generic directed graph object.
Parameters
----------
V : list
A list of vertex IDs.
E : list of :class:`Edge <numpy_ml.utils.graphs.Edge>` objects
A list of directed edges connecting pairs of vertices in ``V``.
"""
# 调用父类的构造函数,传入顶点列表和边列表
super().__init__(V, E)
# 设置图为有向图
self.is_directed = True
# 初始化拓扑排序列表为空
self._topological_ordering = []
def _build_adjacency_list(self):
"""Encode directed graph as an adjancency list"""
# 假设没有平行边
for e in self.edges:
# 获取起始顶点在顶点列表中的索引
fr_i = self._V2I[e.fr]
# 将边添加到起始顶点的邻接表中
self._G[fr_i].add(e)
def reverse(self):
"""Reverse the direction of all edges in the graph"""
# 返回一个新的有向图对象,边的方向取反
return DiGraph(self.vertices, [e.reverse() for e in self.edges])
def is_acyclic(self):
"""Check whether the graph contains cycles"""
# 检查图是否包含环,通过判断拓扑排序结果是否为None来判断
return self.topological_ordering() is not None
class UndirectedGraph(Graph):
def __init__(self, V, E):
"""
A generic undirected graph object.
Parameters
----------
V : list
A list of vertex IDs.
E : list of :class:`Edge <numpy_ml.utils.graphs.Edge>` objects
A list of edges connecting pairs of vertices in ``V``. For any edge
connecting vertex `u` to vertex `v`, :class:`UndirectedGraph
<numpy_ml.utils.graphs.UndirectedGraph>` will assume that there
exists a corresponding edge connecting `v` to `u`, even if this is
not present in `E`.
"""
# 调用父类的构造函数,传入顶点列表和边列表
super().__init__(V, E)
# 设置图为无向图
self.is_directed = False
# 构建邻接表来表示无向、无权重图
def _build_adjacency_list(self):
"""Encode undirected, unweighted graph as an adjancency list"""
# 假设没有平行边
# 每条边出现两次,分别为 (u,v) 和 (v,u)
for e in self.edges:
# 获取起始节点和目标节点在节点列表中的索引
fr_i = self._V2I[e.fr]
to_i = self._V2I[e.to]
# 将边添加到起始节点的邻接表中
self._G[fr_i].add(e)
# 将边的反向边添加到目标节点的邻接表中
self._G[to_i].add(e.reverse())
#######################################################################
# Graph Generators #
#######################################################################
# 生成一个无权 Erdős-Rényi 随机图
def random_unweighted_graph(n_vertices, edge_prob=0.5, directed=False):
"""
Generate an unweighted Erdős-Rényi random graph [*]_.
References
----------
.. [*] Erdős, P. and Rényi, A. (1959). On Random Graphs, *Publ. Math. 6*, 290.
Parameters
----------
n_vertices : int
The number of vertices in the graph.
edge_prob : float in [0, 1]
The probability of forming an edge between two vertices. Default is
0.5.
directed : bool
Whether the edges in the graph should be directed. Default is False.
Returns
-------
G : :class:`Graph` instance
The resulting random graph.
"""
vertices = list(range(n_vertices))
candidates = permutations(vertices, 2) if directed else combinations(vertices, 2)
edges = []
# 遍历所有可能的边
for (fr, to) in candidates:
# 根据概率确定是否生成边
if np.random.rand() <= edge_prob:
edges.append(Edge(fr, to))
# 如果是有向图,则返回有向图对象;否则返回无向图对象
return DiGraph(vertices, edges) if directed else UndirectedGraph(vertices, edges)
# 创建一个 '随机' 无权有向无环图,通过修剪所有反向连接来生成
def random_DAG(n_vertices, edge_prob=0.5):
"""
Create a 'random' unweighted directed acyclic graph by pruning all the
backward connections from a random graph.
Parameters
----------
n_vertices : int
The number of vertices in the graph.
edge_prob : float in [0, 1]
The probability of forming an edge between two vertices in the
underlying random graph, before edge pruning. Default is 0.5.
Returns
-------
G : :class:`Graph` instance
The resulting DAG.
"""
# 生成一个有向随机图
G = random_unweighted_graph(n_vertices, edge_prob, directed=True)
# 修剪边以删除顶点之间的反向连接
G = DiGraph(G.vertices, [e for e in G.edges if e.fr < e.to])
# 如果我们删除了所有边,则生成一个新的图
while not len(G.edges):
# 生成一个具有随机权重的有向图
G = random_unweighted_graph(n_vertices, edge_prob, directed=True)
# 创建一个新的有向图,只包含起点小于终点的边
G = DiGraph(G.vertices, [e for e in G.edges if e.fr < e.to])
# 返回生成的图
return G
numpy-ml\numpy_ml\utils\kernels.py
# 导入 re 模块,用于正则表达式操作
# 导入 ABC 抽象基类和 abstractmethod 装饰器
import re
from abc import ABC, abstractmethod
# 导入 numpy 模块,并重命名为 np
import numpy as np
# 定义一个抽象基类 KernelBase
class KernelBase(ABC):
# 初始化方法
def __init__(self):
super().__init__()
# 初始化参数字典和超参数字典
self.parameters = {}
self.hyperparameters = {}
# 抽象方法,用于计算核函数
@abstractmethod
def _kernel(self, X, Y):
raise NotImplementedError
# 重载 __call__ 方法,调用 _kernel 方法
def __call__(self, X, Y=None):
"""Refer to documentation for the `_kernel` method"""
return self._kernel(X, Y)
# 重载 __str__ 方法,返回模型的字符串表示
def __str__(self):
P, H = self.parameters, self.hyperparameters
p_str = ", ".join(["{}={}".format(k, v) for k, v in P.items()])
return "{}({})".format(H["id"], p_str)
# 定义 summary 方法,返回模型参数、超参数和 ID 的字典
def summary(self):
"""Return the dictionary of model parameters, hyperparameters, and ID"""
return {
"id": self.hyperparameters["id"],
"parameters": self.parameters,
"hyperparameters": self.hyperparameters,
}
def set_params(self, summary_dict):
"""
Set the model parameters and hyperparameters using the settings in
`summary_dict`.
Parameters
----------
summary_dict : dict
A dictionary with keys 'parameters' and 'hyperparameters',
structured as would be returned by the :meth:`summary` method. If
a particular (hyper)parameter is not included in this dict, the
current value will be used.
Returns
-------
new_kernel : :doc:`Kernel <numpy_ml.utils.kernels>` instance
A kernel with parameters and hyperparameters adjusted to those
specified in `summary_dict`.
"""
# 将 self 和 summary_dict 分别赋值给 kr 和 sd
kr, sd = self, summary_dict
# 将 `parameters` 和 `hyperparameters` 嵌套字典合并为一个字典
flatten_keys = ["parameters", "hyperparameters"]
for k in flatten_keys:
if k in sd:
entry = sd[k]
sd.update(entry)
del sd[k]
# 遍历 summary_dict 中的键值对
for k, v in sd.items():
# 如果键在 self 的 parameters 中,则更新参数值
if k in self.parameters:
kr.parameters[k] = v
# 如果键在 self 的 hyperparameters 中,则更新超参数值
if k in self.hyperparameters:
kr.hyperparameters[k] = v
# 返回更新后的 kernel 实例
return kr
# 定义线性核函数类,继承自 KernelBase 类
class LinearKernel(KernelBase):
# 初始化函数,设置线性核函数的参数 c0
def __init__(self, c0=0):
"""
The linear (i.e., dot-product) kernel.
Notes
-----
For input vectors :math:`\mathbf{x}` and :math:`\mathbf{y}`, the linear
kernel is:
.. math::
k(\mathbf{x}, \mathbf{y}) = \mathbf{x}^\\top \mathbf{y} + c_0
Parameters
----------
c0 : float
An "inhomogeneity" parameter. When `c0` = 0, the kernel is said to be
homogenous. Default is 1.
"""
# 调用父类的初始化函数
super().__init__()
# 设置超参数字典
self.hyperparameters = {"id": "LinearKernel"}
# 设置参数字典,包括 c0 参数
self.parameters = {"c0": c0}
# 计算线性核函数的内部函数,计算输入矩阵 X 和 Y 的点积
def _kernel(self, X, Y=None):
"""
Compute the linear kernel (i.e., dot-product) between all pairs of rows in
`X` and `Y`.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
Collection of `N` input vectors
Y : :py:class:`ndarray <numpy.ndarray>` of shape `(M, C)` or None
Collection of `M` input vectors. If None, assume `Y` = `X`.
Default is None.
Returns
-------
out : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
Similarity between `X` and `Y`, where index (`i`, `j`) gives
:math:`k(x_i, y_j)`.
"""
# 检查输入矩阵 X 和 Y,确保它们符合要求
X, Y = kernel_checks(X, Y)
# 返回 X 和 Y 的点积加上参数 c0 的结果
return X @ Y.T + self.parameters["c0"]
class PolynomialKernel(KernelBase):
# 初始化多项式核函数对象,设置默认参数值
def __init__(self, d=3, gamma=None, c0=1):
"""
The degree-`d` polynomial kernel.
Notes
-----
For input vectors :math:`\mathbf{x}` and :math:`\mathbf{y}`, the polynomial
kernel is:
.. math::
k(\mathbf{x}, \mathbf{y}) = (\gamma \mathbf{x}^\\top \mathbf{y} + c_0)^d
In contrast to the linear kernel, the polynomial kernel also computes
similarities *across* dimensions of the **x** and **y** vectors,
allowing it to account for interactions between features. As an
instance of the dot product family of kernels, the polynomial kernel is
invariant to a rotation of the coordinates about the origin, but *not*
to translations.
Parameters
----------
d : int
Degree of the polynomial kernel. Default is 3.
gamma : float or None
A scaling parameter for the dot product between `x` and `y`,
determining the amount of smoothing/resonlution of the kernel.
Larger values result in greater smoothing. If None, defaults to 1 /
`C`. Sometimes referred to as the kernel bandwidth. Default is
None.
c0 : float
Parameter trading off the influence of higher-order versus lower-order
terms in the polynomial. If `c0` = 0, the kernel is said to be
homogenous. Default is 1.
"""
# 调用父类的初始化方法
super().__init__()
# 设置核函数的超参数字典
self.hyperparameters = {"id": "PolynomialKernel"}
# 设置核函数的参数字典
self.parameters = {"d": d, "c0": c0, "gamma": gamma}
# 定义一个私有方法,计算输入矩阵 X 和 Y 之间的度为 d 的多项式核
def _kernel(self, X, Y=None):
"""
Compute the degree-`d` polynomial kernel between all pairs of rows in `X`
and `Y`.
# 参数:
# X:形状为 `(N, C)` 的 ndarray,包含 `N` 个输入向量
# Y:形状为 `(M, C)` 的 ndarray 或 None
# 包含 `M` 个输入向量。如果为 None,则假定 `Y = X`。默认为 None。
# 返回:
# out:形状为 `(N, M)` 的 ndarray
# `X` 和 `Y` 之间的相似度,其中索引 (`i`, `j`) 给出了 :math:`k(x_i, y_j)`(即核的格拉姆矩阵)。
"""
# 获取模型参数
P = self.parameters
# 检查并处理输入矩阵 X 和 Y
X, Y = kernel_checks(X, Y)
# 计算 gamma 值,如果参数中未指定 gamma,则默认为 1 / X.shape[1]
gamma = 1 / X.shape[1] if P["gamma"] is None else P["gamma"]
# 计算多项式核的值并返回结果
return (gamma * (X @ Y.T) + P["c0"]) ** P["d"]
# 定义 RBFKernel 类,继承自 KernelBase 类
class RBFKernel(KernelBase):
# 初始化方法,接受一个 sigma 参数
def __init__(self, sigma=None):
"""
Radial basis function (RBF) / squared exponential kernel.
Notes
-----
For input vectors :math:`\mathbf{x}` and :math:`\mathbf{y}`, the radial
basis function kernel is:
.. math::
k(\mathbf{x}, \mathbf{y}) = \exp \left\{ -0.5
\left\lVert \\frac{\mathbf{x} -
\mathbf{y}}{\sigma} \\right\\rVert_2^2 \\right\}
The RBF kernel decreases with distance and ranges between zero (in the
limit) to one (when **x** = **y**). Notably, the implied feature space
of the kernel has an infinite number of dimensions.
Parameters
----------
sigma : float or array of shape `(C,)` or None
A scaling parameter for the vectors **x** and **y**, producing an
isotropic kernel if a float, or an anisotropic kernel if an array of
length `C`. Larger values result in higher resolution / greater
smoothing. If None, defaults to :math:`\sqrt(C / 2)`. Sometimes
referred to as the kernel 'bandwidth'. Default is None.
"""
# 调用父类的初始化方法
super().__init__()
# 设置超参数字典
self.hyperparameters = {"id": "RBFKernel"}
# 设置参数字典,包含 sigma 参数
self.parameters = {"sigma": sigma}
# 计算输入向量 X 和 Y 之间的径向基函数(RBF)核
def _kernel(self, X, Y=None):
"""
Computes the radial basis function (RBF) kernel between all pairs of
rows in `X` and `Y`.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
Collection of `N` input vectors, each with dimension `C`.
Y : :py:class:`ndarray <numpy.ndarray>` of shape `(M, C)`
Collection of `M` input vectors. If None, assume `Y` = `X`. Default
is None.
Returns
-------
out : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
Similarity between `X` and `Y` where index (i, j) gives :math:`k(x_i, y_j)`.
"""
# 获取参数字典
P = self.parameters
# 检查输入向量 X 和 Y,确保它们是合法的 ndarray
X, Y = kernel_checks(X, Y)
# 计算默认的 sigma 值,如果参数字允许的话
sigma = np.sqrt(X.shape[1] / 2) if P["sigma"] is None else P["sigma"]
# 计算核矩阵,使用高斯核函数
return np.exp(-0.5 * pairwise_l2_distances(X / sigma, Y / sigma) ** 2)
class KernelInitializer(object):
# 初始化学习率调度器的类,可以接受以下类型的输入:
# (a) `KernelBase` 实例的 __str__ 表示
# (b) `KernelBase` 实例
# (c) 参数字典(例如,通过 `KernelBase` 实例的 `summary` 方法生成的)
# 如果 `param` 为 None,则返回 `LinearKernel`
def __init__(self, param=None):
self.param = param
def __call__(self):
param = self.param
if param is None:
kernel = LinearKernel()
elif isinstance(param, KernelBase):
kernel = param
elif isinstance(param, str):
kernel = self.init_from_str()
elif isinstance(param, dict):
kernel = self.init_from_dict()
return kernel
def init_from_str(self):
# 定义正则表达式,用于解析参数字符串
r = r"([a-zA-Z0-9]*)=([^,)]*)"
# 将参数字符串转换为小写
kr_str = self.param.lower()
# 解析参数字符串,生成参数字典
kwargs = dict([(i, eval(j)) for (i, j) in re.findall(r, self.param)])
if "linear" in kr_str:
kernel = LinearKernel(**kwargs)
elif "polynomial" in kr_str:
kernel = PolynomialKernel(**kwargs)
elif "rbf" in kr_str:
kernel = RBFKernel(**kwargs)
else:
raise NotImplementedError("{}".format(kr_str))
return kernel
def init_from_dict(self):
S = self.param
# 获取超参数字典
sc = S["hyperparameters"] if "hyperparameters" in S else None
if sc is None:
raise ValueError("Must have `hyperparameters` key: {}".format(S))
if sc and sc["id"] == "LinearKernel":
scheduler = LinearKernel().set_params(S)
elif sc and sc["id"] == "PolynomialKernel":
scheduler = PolynomialKernel().set_params(S)
elif sc and sc["id"] == "RBFKernel":
scheduler = RBFKernel().set_params(S)
elif sc:
raise NotImplementedError("{}".format(sc["id"]))
return scheduler
# 检查输入数据 X 和 Y 的维度,如果 X 是一维数组,则将其转换为二维数组
X = X.reshape(-1, 1) if X.ndim == 1 else X
# 如果 Y 为 None,则将 Y 设置为 X
Y = X if Y is None else Y
# 检查 Y 的维度,如果 Y 是一维数组,则将其转换为二维数组
Y = Y.reshape(-1, 1) if Y.ndim == 1 else Y
# 断言 X 必须有两个维度
assert X.ndim == 2, "X must have 2 dimensions, but got {}".format(X.ndim)
# 断言 Y 必须有两个维度
assert Y.ndim == 2, "Y must have 2 dimensions, but got {}".format(Y.ndim)
# 断言 X 和 Y 必须有相同的列数
assert X.shape[1] == Y.shape[1], "X and Y must have the same number of columns"
# 返回经过检查和处理后的 X 和 Y
return X, Y
def pairwise_l2_distances(X, Y):
"""
A fast, vectorized way to compute pairwise l2 distances between rows in `X`
and `Y`.
Notes
-----
An entry of the pairwise Euclidean distance matrix for two vectors is
.. math::
d[i, j] &= \sqrt{(x_i - y_i) @ (x_i - y_i)} \\\\
&= \sqrt{sum (x_i - y_j)^2} \\\\
&= \sqrt{sum (x_i)^2 - 2 x_i y_j + (y_j)^2}
The code below computes the the third line using numpy broadcasting
fanciness to avoid any for loops.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
Collection of `N` input vectors
Y : :py:class:`ndarray <numpy.ndarray>` of shape `(M, C)`
Collection of `M` input vectors. If None, assume `Y` = `X`. Default is
None.
Returns
-------
dists : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
Pairwise distance matrix. Entry (i, j) contains the `L2` distance between
:math:`x_i` and :math:`y_j`.
"""
# 计算 X 和 Y 之间的 L2 距离矩阵
D = -2 * X @ Y.T + np.sum(Y ** 2, axis=1) + np.sum(X ** 2, axis=1)[:, np.newaxis]
# 将小于 0 的值裁剪为 0(由于数值精度问题可能导致小于 0 的值)
D[D < 0] = 0
# 返回计算得到的 L2 距离矩阵
return np.sqrt(D)
numpy-ml\numpy_ml\utils\misc.py
# 导入 numpy 库
import numpy as np
# 重新定义 logsumexp 函数,计算对数概率的和
def logsumexp(log_probs, axis=None):
# 计算输入数组中的最大值
_max = np.max(log_probs)
# 对数概率减去最大值,避免数值溢出
ds = log_probs - _max
# 计算指数和
exp_sum = np.exp(ds).sum(axis=axis)
# 返回对数概率的和
return _max + np.log(exp_sum)
# 计算对数高斯概率密度函数 log N(x_i | mu, sigma)
def log_gaussian_pdf(x_i, mu, sigma):
# 获取均值向量的长度
n = len(mu)
# 计算常数项 a
a = n * np.log(2 * np.pi)
# 计算矩阵的行列式的自然对数
_, b = np.linalg.slogdet(sigma)
# 计算 y = sigma^(-1) * (x_i - mu)
y = np.linalg.solve(sigma, x_i - mu)
# 计算 c = (x_i - mu)^T * y
c = np.dot(x_i - mu, y)
# 返回对数高斯概率密度函数的值
return -0.5 * (a + b + c)
Utilities
The utilities module implements a number of useful functions and objects that
power other ML algorithms across the repo.
-
data_structures.py
implements a few useful data structures- A max- and min-heap ordered priority queue
- A ball tree with the KNS1 algorithm (Omohundro, 1989; Moore & Gray, 2006)
- A discrete sampler implementing Vose’s algorithm for the alias method (Walker, 1977; Vose, 1991)
-
kernels.py
implements several general-purpose similarity kernels- Linear kernel
- Polynomial kernel
- Radial basis function kernel
-
distance_metrics.py
implements common distance metrics- Euclidean (L2) distance
- Manhattan (L1) distance
- Chebyshev (L-infinity) distance
- Minkowski-p distance
- Hamming distance
-
graphs.py
implements simple data structures and algorithms for graph
processing.- Undirected + directed graph objects allowing for probabilistic edge weights
- Graph generators (Erdos-Renyi, random DAGs)
- Topological sorting for DAGs
- Cycle detection
- Simple path-finding
-
windows.py
implements several common windowing functions- Hann
- Hamming
- Blackman-Harris
- Generalized cosine
-
testing.py
implements helper functions that prove useful when writing unit
tests, including data generators and various assert statements
numpy-ml\numpy_ml\utils\testing.py
# 导入必要的库
import numbers
import numpy as np
# 断言函数
# 检查数组 `X` 是否沿其主对角线对称
def is_symmetric(X):
return np.allclose(X, X.T)
# 检查矩阵 `X` 是否对称且正定
def is_symmetric_positive_definite(X):
if is_symmetric(X):
try:
# 如果矩阵对称,检查 Cholesky 分解是否存在(仅对对称/共轭正定矩阵定义)
np.linalg.cholesky(X)
return True
except np.linalg.LinAlgError:
return False
return False
# 检查 `X` 是否包含沿列和为1的概率
def is_stochastic(X):
msg = "Array should be stochastic along the columns"
assert len(X[X < 0]) == len(X[X > 1]) == 0, msg
assert np.allclose(np.sum(X, axis=1), np.ones(X.shape[0]), msg)
return True
# 检查值 `a` 是否为数值型
def is_number(a):
return isinstance(a, numbers.Number)
# 如果数组 `x` 是二进制数组且只有一个1,则返回True
def is_one_hot(x):
msg = "Matrix should be one-hot binary"
assert np.array_equal(x, x.astype(bool)), msg
assert np.allclose(np.sum(x, axis=1), np.ones(x.shape[0]), msg)
return True
# 如果数组 `x` 仅由二进制值组成,则返回True
def is_binary(x):
msg = "Matrix must be binary"
assert np.array_equal(x, x.astype(bool), msg)
return True
# 创建一个形状为 (`n_examples`, `n_classes`) 的随机独热编码矩阵
def random_one_hot_matrix(n_examples, n_classes):
# 创建一个单位矩阵,行数为类别数,列数为类别数,表示每个类别的独热编码
X = np.eye(n_classes)
# 从类别数中随机选择 n_examples 个类别的独热编码
X = X[np.random.choice(n_classes, n_examples)]
return X
# 创建一个形状为 (`n_examples`, `n_classes`) 的随机随机矩阵
def random_stochastic_matrix(n_examples, n_classes):
# 创建一个形状为 (`n_examples`, `n_classes`) 的随机矩阵
X = np.random.rand(n_examples, n_classes)
# 对每一行进行归一化,使得每一行的和为1
X /= X.sum(axis=1, keepdims=True)
return X
# 创建一个形状为 `shape` 的随机实值张量。如果 `standardize` 为 True,则确保每列的均值为0,标准差为1
def random_tensor(shape, standardize=False):
# 创建一个形状为 `shape` 的随机偏移量
offset = np.random.randint(-300, 300, shape)
# 创建一个形状为 `shape` 的随机矩阵,并加上偏移量
X = np.random.rand(*shape) + offset
if standardize:
# 计算一个很小的数,用于避免除以0
eps = np.finfo(float).eps
# 对每列进行标准化,使得均值为0,标准差为1
X = (X - X.mean(axis=0)) / (X.std(axis=0) + eps)
return X
# 创建一个形状为 `shape` 的随机二值张量。`sparsity` 是一个控制输出张量中0和1比例的值
def random_binary_tensor(shape, sparsity=0.5):
# 创建一个形状为 `shape` 的随机矩阵,大于等于 (1 - sparsity) 的值设为1,小于 (1 - sparsity) 的值设为0
return (np.random.rand(*shape) >= (1 - sparsity)).astype(float)
# 生成一个由 `n_words` 个单词组成的随机段落。如果 `vocab` 不为 None,则从该列表中随机抽取单词;否则,从包含 26 个拉丁单词的集合中均匀抽取单词
# 如果输入的词汇表为空,则使用默认的词汇表
if vocab is None:
vocab = [
"at",
"stet",
"accusam",
"aliquyam",
"clita",
"lorem",
"ipsum",
"dolor",
"dolore",
"dolores",
"sit",
"amet",
"consetetur",
"sadipscing",
"elitr",
"sed",
"diam",
"nonumy",
"eirmod",
"duo",
"ea",
"eos",
"erat",
"est",
"et",
"gubergren",
]
# 返回一个包含 n_words 个随机词汇的列表
return [np.random.choice(vocab) for _ in range(n_words)]
# 自定义警告类,继承自 RuntimeWarning
class DependencyWarning(RuntimeWarning):
pass
numpy-ml\numpy_ml\utils\windows.py
import numpy as np # 导入 NumPy 库
def blackman_harris(window_len, symmetric=False):
"""
The Blackman-Harris window.
Notes
-----
The Blackman-Harris window is an instance of the more general class of
cosine-sum windows where `K=3`. Additional coefficients extend the Hamming
window to further minimize the magnitude of the nearest side-lobe in the
frequency response.
.. math::
\\text{bh}(n) = a_0 - a_1 \cos\left(\\frac{2 \pi n}{N}\\right) +
a_2 \cos\left(\\frac{4 \pi n }{N}\\right) -
a_3 \cos\left(\\frac{6 \pi n}{N}\\right)
where `N` = `window_len` - 1, :math:`a_0` = 0.35875, :math:`a_1` = 0.48829,
:math:`a_2` = 0.14128, and :math:`a_3` = 0.01168.
Parameters
----------
window_len : int
The length of the window in samples. Should be equal to the
`frame_width` if applying to a windowed signal.
symmetric : bool
If False, create a 'periodic' window that can be used in with an FFT /
in spectral analysis. If True, generate a symmetric window that can be
used in, e.g., filter design. Default is False.
Returns
-------
window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
The window
"""
return generalized_cosine( # 调用 generalized_cosine 函数
window_len, [0.35875, 0.48829, 0.14128, 0.01168], symmetric
)
def hamming(window_len, symmetric=False):
"""
The Hamming window.
Notes
-----
The Hamming window is an instance of the more general class of cosine-sum
windows where `K=1` and :math:`a_0 = 0.54`. Coefficients selected to
minimize the magnitude of the nearest side-lobe in the frequency response.
.. math::
\\text{hamming}(n) = 0.54 -
0.46 \cos\left(\\frac{2 \pi n}{\\text{window_len} - 1}\\right)
Parameters
----------
window_len : int
The length of the window in samples. Should be equal to the
`frame_width` if applying to a windowed signal.
symmetric : bool
# 定义一个布尔型参数 symmetric,用于指定是否生成对称窗口
If False, create a 'periodic' window that can be used in with an FFT /
in spectral analysis. If True, generate a symmetric window that can be
used in, e.g., filter design. Default is False.
# 如果 symmetric 为 False,则创建一个周期性窗口,可用于 FFT 或频谱分析;如果为 True,则生成对称窗口,可用于滤波器设计,默认为 False
Returns
-------
window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
# 返回值为一个形状为 (window_len,) 的 ndarray 类型的窗口数组
The window
"""
# 调用 generalized_cosine 函数,传入窗口长度和对应的参数列表 [0.54, 1 - 0.54],以及 symmetric 参数
return generalized_cosine(window_len, [0.54, 1 - 0.54], symmetric)
# 定义汉宁窗口函数
def hann(window_len, symmetric=False):
"""
The Hann window.
Notes
-----
汉宁窗口是余弦和窗口的一个特例,其中 `K=1` 和 :math:`a_0` = 0.5。与 Hamming 窗口不同,汉宁窗口的端点接触到零。
.. math::
\\text{hann}(n) = 0.5 - 0.5 \cos\left(\\frac{2 \pi n}{\\text{window_len} - 1}\\right)
Parameters
----------
window_len : int
窗口的长度(以样本为单位)。如果应用于窗口信号,则应等于 `frame_width`。
symmetric : bool
如果为 False,则创建一个可以在 FFT / 频谱分析中使用的“周期性”窗口。如果为 True,则生成一个可以在滤波器设计等方面使用的对称窗口。默认为 False。
Returns
-------
window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
窗口
"""
return generalized_cosine(window_len, [0.5, 0.5], symmetric)
# 定义广义余弦窗口函数
def generalized_cosine(window_len, coefs, symmetric=False):
"""
The generalized cosine family of window functions.
Notes
-----
广义余弦窗口是余弦项的简单加权和。
对于 :math:`n \in \{0, \ldots, \\text{window_len} \}`:
.. math::
\\text{GCW}(n) = \sum_{k=0}^K (-1)^k a_k \cos\left(\\frac{2 \pi k n}{\\text{window_len}}\\right)
Parameters
----------
window_len : int
窗口的长度(以样本为单位)。如果应用于窗口信号,则应等于 `frame_width`。
coefs: list of floats
:math:`a_k` 系数值
symmetric : bool
如果为 False,则创建一个可以在 FFT / 频谱分析中使用的“周期性”窗口。如果为 True,则生成一个可以在滤波器设计等方面使用的对称窗口。默认为 False.
Returns
-------
"""
window : :py:class:`ndarray <numpy.ndarray>` of shape `(window_len,)`
The window
"""
# 如果不是对称窗口,窗口长度加1
window_len += 1 if not symmetric else 0
# 生成等间距的数组,范围为 -π 到 π,长度为 window_len
entries = np.linspace(-np.pi, np.pi, window_len) # (-1)^k * 2pi*n / window_len
# 根据给定的系数生成窗口
window = np.sum([ak * np.cos(k * entries) for k, ak in enumerate(coefs)], axis=0)
# 如果不是对称窗口,去掉最后一个元素
return window[:-1] if not symmetric else window
# 定义一个 WindowInitializer 类
class WindowInitializer:
# 定义 __call__ 方法,用于初始化窗口函数
def __call__(self, window):
# 如果窗口函数为 hamming,则返回 hamming 函数
if window == "hamming":
return hamming
# 如果窗口函数为 blackman_harris,则返回 blackman_harris 函数
elif window == "blackman_harris":
return blackman_harris
# 如果窗口函数为 hann,则返回 hann 函数
elif window == "hann":
return hann
# 如果窗口函数为 generalized_cosine,则返回 generalized_cosine 函数
elif window == "generalized_cosine":
return generalized_cosine
# 如果窗口函数不在以上几种情况中,则抛出 NotImplementedError 异常
else:
raise NotImplementedError("{}".format(window))
numpy-ml\numpy_ml\utils\__init__.py
# 导入测试模块
from . import testing
# 导入数据结构模块
from . import data_structures
# 导入距离度量模块
from . import distance_metrics
# 导入核函数模块
from . import kernels
# 导入窗口函数模块
from . import windows
# 导入图模块
from . import graphs
# 导入杂项模块
from . import misc
numpy-ml\numpy_ml\__init__.py
# noqa
"""Common ML and ML-adjacent algorithms implemented in NumPy"""
# 导入自定义的工具模块
from . import utils
# 导入数据预处理模块
from . import preprocessing
# 导入高斯混合模型算法模块
from . import gmm
# 导入隐马尔可夫模型算法模块
from . import hmm
# 导入线性判别分析算法模块
from . import lda
# 导入线性模型算法模块
from . import linear_models
# 导入神经网络算法模块
from . import neural_nets
# 导入 n 元语法模块
from . import ngram
# 导入非参数模型算法模块
from . import nonparametric
# 导入强化学习模型算法模块
from . import rl_models
# 导入树模型算法模块
from . import trees
# 导入赌博机算法模块
from . import bandits
# 导入因子分解算法模块
from . import factorization
numpy-ml\setup.py
# 禁用 flake8 检查
from codecs import open
# 导入 setup 和 find_packages 函数
from setuptools import setup, find_packages
# 读取 README.md 文件内容作为长描述
with open("README.md", encoding="utf-8") as f:
LONG_DESCRIPTION = f.read()
# 读取 requirements.txt 文件内容,去除空行后作为依赖列表
with open("requirements.txt") as requirements:
REQUIREMENTS = [r.strip() for r in requirements if r != "\n"]
# 定义项目相关链接
PROJECT_URLS = {
"Bug Tracker": "https://github.com/ddbourgin/numpy-ml/issues",
"Documentation": "https://numpy-ml.readthedocs.io/en/latest/",
"Source": "https://github.com/ddbourgin/numpy-ml",
}
# 设置项目信息
setup(
name="numpy-ml",
version="0.1.2",
author="David Bourgin",
author_email="ddbourgin@gmail.com",
project_urls=PROJECT_URLS,
url="https://github.com/ddbourgin/numpy-ml",
description="Machine learning in NumPy",
long_description=LONG_DESCRIPTION,
long_description_content_type="text/markdown",
install_requires=REQUIREMENTS,
packages=find_packages(),
license="GPLv3+",
include_package_data=True,
python_requires=">=3.5",
extras_require={"rl": ["gym", "matplotlib"]},
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"Topic :: Scientific/Engineering",
"License :: OSI Approved :: GNU General Public License (GPL)",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
],
)