引言
欢迎来到这个从头开始构建深度学习库系列的第5部分。这篇文章将介绍库中自动微分部分的代码。自动微分在上一篇文章中已经讨论过了,所以如果你不知道自动微分是什么,请查看一下。
自动微分系统的核心是计算图,这是一种有向图,其中每个节点代表一个数学运算,而边则代表运算之间的依赖关系。在我们的库中,计算图是动态构建的,即每当进行数学运算时,相应的节点和边就会实时地添加到图中。这种方法的优势在于可以灵活地处理各种计算场景,并且由于图的结构实时生成,可以有效地适应程序中的变化。
构建完计算图后,接下来的任务是使用此图来计算图中所有变量的导数。这一过程通常涉及到前向传播和反向传播两个阶段。在前向传播阶段,计算从输入节点开始,按照计算图的方向逐步进行,直到输出节点。此过程中,每个节点都会计算其输出值,为下一步的导数计算做准备。
反向传播阶段是自动微分的关键,它从输出节点开始,逆向遍历计算图,利用链式法则逐个节点地计算每个变量的导数。具体来说,对于每一个节点,都将根据其输出对其他节点的影响(即偏导数)来计算输入的梯度,并将这些梯度传递回它的前驱节点。通过这种方式,可以有效地计算出每个变量的导数,而不需要显式地求解复杂的导数表达式。
例如,假设我们构建了以下的图,其中包括基本的算术运算如加法、乘法等,我们将展示如何通过这种计算图模型来计算涉及这些运算的函数的导数。
这代表了以下数学表达式:
c
=
a
+
b
c = a + b
c=a+b
e
=
c
×
d
e = c \times d
e=c×d
现在,使用图表,我们的目标是找到变量 e e e 关于图中所有变量 a , b , c , d , e a, b, c, d, e a,b,c,d,e 的导数。
对于实现,我发现以深度优先的方式遍历图来计算导数最为简便。
首先,我们从 e e e 开始,计算 d e d e \frac{de}{de} dede 相对于 e e e(就是1)。然后,我们查看节点 c c c,这意味着我们现在需要计算 d e d c \frac{de}{dc} dcde。我们可以看到 e e e 是 c c c 和 d d d 的乘积结果,这意味着 d e d c = d \frac{de}{dc} = d dcde=d(因为我们将除了我们所在的变量以外的所有变量视为常数)。
记住我们是在深度优先地遍历,下一个节点我们移动到的是节点 a a a,这意味着我们计算 d e d a \frac{de}{da} dade。这有点复杂,因为 a a a 没有直接与 e e e 相连。然而,使用链式法则,我们知道 d e d a = d e d c ⋅ d c d a \frac{de}{da} = \frac{de}{dc} \cdot \frac{dc}{da} dade=dcde⋅dadc。我们刚刚计算出 d e d c \frac{de}{dc} dcde,所以我们现在需要计算的是 d c d a \frac{dc}{da} dadc。我们可以看到 c c c 是 a a a 和 b b b 的和,所以 d c d a = 1 \frac{dc}{da} = 1 dadc=1。
希望您现在可以看出我们将如何使用图来找到该图中所有变量的导数。
张量类
首先,我们需要创建张量类,它将充当图上的可变节点。
import numpy as np
import string
import random
def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
np.seterr(invalid='ignore')
def is_matrix(o):
return type(o) == np.ndarray
def same_shape(s1, s2):
for a, b in zip(s1, s2):
if a != b:
return False
return True
class Tensor:
__array_priority__ = 1000
def __init__(self, value, trainable=True):
self.value = value
self.dependencies = []
self.grads = []
self.grad_value = None
self.shape = 0
self.matmul_product = False
self.gradient = 0
self.trainable = trainable
self.id = id_generator()
if is_matrix(value):
self.shape = value.shape
def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
使用随机字符生成唯一id的函数
def is_matrix(o):
return type(o) == np.ndarray
检查一个值是否为numpy数组的简单函数。
self.value = value
self.dependencies = []
如果张量是任何操作的结果,例如加法或除法,这个属性将保存操作中涉及的张量列表,从而产生这个张量(计算图就是这样构建的)。如果张量不是任何运算的结果,那么这个张量是空的。
self.grads = []
这个属性将保存每个张量对张量的依赖关系的导数列表。
self.shape = 0
...
if is_matrix(value):
self.shape = value.shape
self.shape保持张量值的形状。只有 numpy 数组可以有一个形状,这就是为什么它默认为0。
self.matmul_product = False
指定张量是否是矩阵乘法的结果(由于链式规则在矩阵乘法中的作用不同,这对以后的计算会有帮助)。
self.gradient = np.ones_like(self.value)
在我们使用计算图来计算梯度之后,这个属性将保存为张量计算的梯度。它最初被设置为1的矩阵,其形状与其值相同。
self.trainable = trainable
图上的一些节点不需要计算它们的导数,所以这个性质指定了这个张量是否存在这种情况。
self.id = id_generator()
张量需要具有某种独特的标识。当我们在后续的文章中重新实现优化器以使用这个自动微分模块时,这一点将发挥作用。
张量运算
class Tensor:
__array_priority__ = 1000
def __init__(self, value, trainable=True):
self.value = value
self.dependencies = []
self.grads = []
self.grad_value = None
self.shape = 0
self.matmul_product = False
self.gradient = 0
self.trainable = trainable
self.id = id_generator()
if is_matrix(value):
self.shape = value.shape
def depends_on(self, target):
if self == target:
return True
dependencies = self.dependencies
for dependency in dependencies:
if dependency == target:
return True
elif dependency.depends_on(target):
return True
return False
def __mul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __rmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __add__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __radd__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __sub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other)
var = Tensor(self.value - other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(-np.ones_like(other.value))
return var
def __rsub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value - self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(np.ones_like(other.value))
var.grads.append(-np.one_like(self.value))
return var
def __pow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value ** other.value)
var.dependencies.append(self)
var.dependencies.append(other)
grad_wrt_self = other.value * self.value ** (other.value - 1)
var.grads.append(grad_wrt_self)
grad_wrt_other = (self.value ** other.value) * np.log(self.value)
var.grads.append(grad_wrt_other)
return var
def __rpow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value ** self.value)
var.dependencies.append(other)
var.dependencies.append(self)
grad_wrt_other = self.value * other.value ** (self.value - 1)
var.grads.append(grad_wrt_other)
grad_wrt_self = (other.value ** self.value) * np.log(other.value)
var.grads.append(grad_wrt_self)
return var
def __truediv__(self, other):
return self * (other ** -1)
def __rtruediv__(self, other):
return other * (self ** -1)
def __matmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value @ other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value.T)
var.grads.append(self.value.T)
var.matmul_product = True
return var
def __rmatmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value @ self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(self.value.T)
var.grads.append(other.value.T)
var.matmul_product = True
return var
这段代码定义了一个名为 Tensor 的类,它是用于自动微分库的核心组件,主要用于深度学习和科学计算中的数值计算。这个类的目的是实现一个可以自动计算导数的张量对象,适用于各种算术运算和矩阵操作。下面是该类中各个部分的具体解释:
初始化 (init 方法):
value:张量的数值,可以是标量、向量、矩阵等。
trainable:标记这个张量是否需要计算梯度,用于训练过程中参数的更新。
dependencies:一个列表,存储当前张量依赖的其他张量。
grads:存储与依赖张量相对应的梯度值。
grad_value:当前张量的梯度值。
shape:张量的形状。
matmul_product:标记是否进行了矩阵乘法操作。
id:通过调用 id_generator 函数生成的唯一标识,用于优化器等功能。
依赖检查 (depends_on 方法):
检查当前张量是否直接或间接依赖于目标张量。这通过递归检查依赖关系来实现。
算术运算方法:
mul 和 rmul:重载乘法运算符,支持张量与张量或张量与标量的乘法,记录依赖和梯度。
add 和 radd:重载加法运算符,同样支持张量与张量或张量与标量的加法。
sub 和 rsub:重载减法运算符。
pow 和 rpow:重载幂运算符,用于张量的幂运算。
truediv 和 rtruediv:通过乘法和幂运算重载除法运算符。
matmul 和 rmatmul:重载矩阵乘法运算符,特别适用于神经网络中的前向传播。
这些方法不仅实现了基本的数值运算,还在运算过程中建立了一个计算图,通过 dependencies 和 grads 记录了每次运算的依赖关系和梯度。这种设计使得在反向传播过程中可以自动计算梯度,极大地简化了深度学习模型的训练过程。
使用图计算梯度
def get_gradients(self, grad = None):
grad = np.ones_like(self.value) if grad is None else grad
grad = np.float32(grad)
for dependency, _grad in zip(self.dependencies, self.grads):
if dependency.trainable:
local_grad = np.float32(_grad)
if self.matmul_product:
if dependency == self.dependencies[0]:
local_grad = grad @ local_grad
else:
local_grad = local_grad @ grad
else:
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(dependency.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
local_grad = local_grad * np.nan_to_num(grad)
dependency.gradient += local_grad
dependency.get_gradients(local_grad)
这段代码定义了一个名为 get_gradients 的方法,它是 Tensor 类的一部分,用于在自动微分框架中执行反向传播算法。该方法的目的是递归地计算每个张量以及其依赖张量的梯度。具体功能和步骤如下:
初始化梯度:
参数 grad 默认为 None,表示从最后一个操作(通常是损失函数)开始反向传播。如果 grad 为 None,则使用与 self.value 相同形状的全1数组初始化梯度。否则,使用传入的 grad 值。
将 grad 转换为 np.float32 类型,确保计算的数值稳定性和精度。
遍历依赖关系:
方法遍历当前张量的每一个依赖 (dependency) 及其对应的局部梯度 (_grad)。
只有当依赖张量 (dependency) 是可训练的(trainable 属性为 True),梯度才会被计算和更新。
梯度计算:
局部梯度 (_grad) 也被转换为 np.float32 类型。
如果当前操作是矩阵乘法(self.matmul_product 为 True),则根据矩阵链规则调整梯度计算。具体地,如果依赖是第一个操作数,则进行右乘;如果是第二个操作数,则进行左乘。
对于非矩阵乘法操作,首先检查梯度和局部梯度的形状是否相同。如果形状不同,则可能需要进行广播操作以匹配形状。
这通过添加维度和沿特定轴求和来实现,确保梯度的形状与依赖张量的形状一致。
局部梯度与全局梯度(grad)相乘,使用 np.nan_to_num 函数处理可能的数值不稳定性。
递归调用:
更新依赖张量的梯度 (dependency.gradient)。
递归调用 get_gradients 方法,将计算好的局部梯度作为新的梯度传递下去,继续反向传播过程。
总的来说,我们的 自动微分代码应该像这样:
import numpy as np
import string
import random
def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
np.seterr(invalid='ignore')
def is_matrix(o):
return type(o) == np.ndarray
def same_shape(s1, s2):
for a, b in zip(s1, s2):
if a != b:
return False
return True
class Tensor:
__array_priority__ = 1000
def __init__(self, value, trainable=True):
self.value = value
self.dependencies = []
self.grads = []
self.grad_value = None
self.shape = 0
self.matmul_product = False
self.gradient = 0
self.trainable = trainable
self.id = id_generator()
if is_matrix(value):
self.shape = value.shape
def depends_on(self, target):
if self == target:
return True
dependencies = self.dependencies
for dependency in dependencies:
if dependency == target:
return True
elif dependency.depends_on(target):
return True
return False
def __mul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __rmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __add__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __radd__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __sub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other)
var = Tensor(self.value - other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(-np.ones_like(other.value))
return var
def __rsub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value - self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(np.ones_like(other.value))
var.grads.append(-np.one_like(self.value))
return var
def __pow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value ** other.value)
var.dependencies.append(self)
var.dependencies.append(other)
grad_wrt_self = other.value * self.value ** (other.value - 1)
var.grads.append(grad_wrt_self)
grad_wrt_other = (self.value ** other.value) * np.log(self.value)
var.grads.append(grad_wrt_other)
return var
def __rpow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value ** self.value)
var.dependencies.append(other)
var.dependencies.append(self)
grad_wrt_other = self.value * other.value ** (self.value - 1)
var.grads.append(grad_wrt_other)
grad_wrt_self = (other.value ** self.value) * np.log(other.value)
var.grads.append(grad_wrt_self)
return var
def __truediv__(self, other):
return self * (other ** -1)
def __rtruediv__(self, other):
return other * (self ** -1)
def __matmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value @ other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value.T)
var.grads.append(self.value.T)
var.matmul_product = True
return var
def __rmatmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value @ self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(self.value.T)
var.grads.append(other.value.T)
var.matmul_product = True
return var
def grad(self, target, grad = None):
grad = self.value / self.value if grad is None else grad
grad = np.float32(grad)
if not self.depends_on(target):
return 0
if self == target:
return grad
final_grad = 0
for dependency, _grad in zip(self.dependencies, self.grads):
local_grad = np.float32(_grad) if dependency.depends_on(target) else 0
if local_grad is not 0:
if self.matmul_product:
if dependency == self.dependencies[0]:
local_grad = grad @ local_grad
else:
local_grad = local_grad @ grad
else:
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(local_grad.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
local_grad *= grad
final_grad += dependency.grad(target, local_grad)
return final_grad
def get_gradients(self, grad = None):
grad = np.ones_like(self.value) if grad is None else grad
grad = np.float32(grad)
for dependency, _grad in zip(self.dependencies, self.grads):
if dependency.trainable:
local_grad = np.float32(_grad)
if self.matmul_product:
if dependency == self.dependencies[0]:
local_grad = grad @ local_grad
else:
local_grad = local_grad @ grad
else:
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(dependency.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
local_grad = local_grad * np.nan_to_num(grad)
dependency.gradient += local_grad
dependency.get_gradients(local_grad)
def __repr__(self):
return f"Tensor ({self.value})"
应用示例如下:
a = Tensor(10)
b = Tensor(5)
c = 2
d = (a*b)**c
d.get_gradients()
print (a.gradient, b.gradient)
OUTPUT:
500.0 1000.0