本文使用 TensorFlow 1.15 环境搭建深度神经网络(PINN)求解一维 Poisson 方程:
− Δ u = f in Ω , u = 0 on Γ : = ∂ Ω . \begin{align} -\Delta u &= f \quad & \text{in } \Omega,\\ u & =0 \quad & \text{on } \Gamma:=\partial \Omega. \end{align} −Δuu=f=0in Ω,on Γ:=∂Ω.
其中 Ω = [ X a , X b ] \Omega = [X_a,X_b] Ω=[Xa,Xb] 是一段区间,一维情况 Δ u = u x x \Delta u = u_{xx} Δu=uxx.
完整代码及其注释如下:
import tensorflow as tf
#print(tf.__version__)
import numpy as np
import math
# def is_log2_close_to_int(n, eps=1e-9):
# log_n = math.log2(n)
# return math.isclose(math.fmod(log_n, 1), 0, abs_tol=eps)
# 定义Exact类,用于计算精确解
class Exact:
def __init__(self, xa, xb):
# 初始化类,接受两个参数xa和xb,代表区间的两个端点
self.xa = xa # 区间左端点
self.xb = xb # 区间右端点
def u_exact(self, X):
# 计算并返回精确解u(X)
# 这里使用正弦函数作为精确解,其频率与区间长度(xb-xa)有关
u = np.sin(2*np.pi*X / (self.xb - self.xa))
return u
# 定义Dataset类,用于生成数据
class Dataset:
def __init__(self, x_range, N_res, N_b, xa, xb):
# 初始化类,接受多个参数
self.x_range = x_range # 区间范围,例如[0, 1]
self.N_res = N_res # 内部节点数,用于构建内部网格
self.N_b = N_b # 边界条件点数(通常这里N_b为2,因为有两个边界)
self.xa = xa # 区间左端点
self.xb = xb # 区间右端点
def bc(self, X_b):
# 计算并返回边界条件上的精确解
# 创建一个Exact对象,用于计算精确解
U_bc = Exact(self.xa, self.xb)
u_bc = U_bc.u_exact(X_b) # 计算边界条件X_b上的精确解
return u_bc
def build_data(self):
# 构建并返回数据集,包括内部网格点、边界网格点和区间端点
x0 = self.x_range[0] # 区间左端点
x1 = self.x_range[1] # 区间右端点
# 区间端点(最小值和最大值)
Xmin = np.array([[x0]])
Xmax = np.array([[x1]])
# 构建内部网格点
# 可以选择使用均匀网格或随机网格
## For the equation, you can choose using the uniform mesh
"""
N = self.N_res
X_res_input = np.linspace(x0, x1, N).reshape((-1, 1))
"""
# 这里展示了如何使用随机网格
X_res_input = x0 + (x1-x0)*np.random.rand(self.N_res,1) # 生成N_res个在[x0, x1]区间内的随机点
# 边界网格点(在这个例子中,我们手动指定了边界点)
X_b0_input = np.array([[x0]]) # 左边界点
X_b1_input = np.array([[x1]]) # 右边界点
# 返回构建的数据集
return X_res_input, X_b0_input, X_b1_input, Xmin, Xmax
def calculate_errors(sess, x_res_train, u_pred, x_t, u_e):
"""
计算并打印L2范数和最大模范数的误差。
"""
u_pred_vals = sess.run(u_pred, feed_dict={x_res_train: x_t})
error_l2 = np.linalg.norm(u_pred_vals - u_e, ord=2) / np.linalg.norm(u_e, ord=2)
error_max = np.max(np.abs(u_pred_vals - u_e)) / np.max(np.abs(u_e))
print(f"L2 Error: {error_l2:.8f}")
print(f"Max Error: {error_max:.8f}")
神经网络及其训练过程所需要的函数定义:
import tensorflow as tf
import numpy as np
import time
import matplotlib.pyplot as plt
class Train:
def __init__(self, train_dict):
"""
初始化Train类。
Args:
train_dict (dict): 用于训练的feed_dict,包含训练数据和其他必要的TensorFlow变量。
"""
self.train_dict = train_dict
self.step = 0 # 初始化训练步数计数器
def callback(self, loss_):
"""
回调函数,用于在LBFGS优化器每次迭代后打印损失。
Args:
loss_ (float): 当前迭代的损失值。
"""
self.step += 1
if math.isclose(math.fmod(math.log2(self.step), 1), 0, abs_tol=1e-9):
print('Loss: %.3e' % (loss_))
def nntrain(self, sess, u_pred, loss, test_dict, u_e, x_t, train_adam, train_lbfgs):
"""
执行神经网络训练。
Args:
sess (tf.Session): TensorFlow会话。
u_pred (tf.Tensor): 预测值的Tensor。
loss (tf.Tensor): 损失函数的Tensor。
test_dict (dict): 用于测试的feed_dict。
u_e (np.array): 精确解的数值数组。
x_t (np.array): 测试点或网格点的x坐标数组。
train_adam (tf.Operation): Adam优化器的TensorFlow操作。
train_lbfgs (LBFGSOptimizer 或类似): 用于精细调整的LBFGS优化器。
Returns:
None
"""
n = 0 # 初始化迭代计数器
nmax = 10000 # 最大迭代次数
loss_c = 1.0e-4 # 收敛条件:当损失小于此值时停止训练
loss_ = 1.0 # 初始化损失值
while n < nmax and loss_ > loss_c:
n += 1
# 使用Adam优化器进行训练
u_, loss_, _ = sess.run([u_pred, loss, train_adam], feed_dict=self.train_dict)
# 每2^n步打印一次损失并绘制结果
if math.isclose(math.fmod(math.log2(n), 1), 0, abs_tol=1e-9):
print('Steps: %d, loss: %.3e' % (n, loss_))
# 在测试集上评估模型
u_test = sess.run(u_pred, feed_dict=test_dict)
# 绘制精确解和预测解的对比图
plt.cla() # 清除之前的图表
plt.plot(x_t, u_e, 'bo', markersize=0.4, label='Exact solution')
plt.plot(x_t, u_test, 'rv', markersize=0.4, label='PINN solution')
plt.legend()
plt.show()
plt.pause(0.1) # 暂停一段时间以便观察图表
# 使用LBFGS优化器进行精细调整
train_lbfgs.minimize(sess, feed_dict=self.train_dict, fetches=[loss], loss_callback=self.callback)
import tensorflow as tf
import numpy as np
class DNN:
"""
深度神经网络类,用于构建和训练神经网络。
"""
def __init__(self, layer_size, Xmin, Xmax):
"""
初始化DNN类。
Args:
layer_size (list): 网络各层的神经元数量。
Xmin (numpy.ndarray): 输入数据的最小值。
Xmax (numpy.ndarray): 输入数据的最大值。
"""
self.size = layer_size
self.Xmin = Xmin
self.Xmax = Xmax
def hyper_initial(self):
"""
初始化网络的权重和偏置。
Returns:
tuple: 包含权重和偏置的列表。
"""
L = len(self.size)
Weights = []
Biases = []
for l in range(1, L):
in_dim = self.size[l-1]
out_dim = self.size[l]
std = np.sqrt(2/(in_dim + out_dim))
weight = tf.Variable(tf.random_normal(shape=[in_dim, out_dim], stddev=std))
bias = tf.Variable(tf.zeros(shape=[1, out_dim]))
Weights.append(weight)
Biases.append(bias)
return Weights, Biases
def fnn(self, X, W, b):
"""
前馈神经网络的前向传播。
Args:
X (tf.Tensor): 输入数据。
W (list): 权重列表。
b (list): 偏置列表。
Returns:
tf.Tensor: 网络的输出。
"""
A = 2.0*(X - self.Xmin)/(self.Xmax - self.Xmin) - 1.0 # 归一化和缩放输入
L = len(W)
for i in range(L-1):
A = tf.tanh(tf.add(tf.matmul(A, W[i]), b[i])) # 应用激活函数和线性变换
u = tf.add(tf.matmul(A, W[-1]), b[-1]) # 输出层
return u
def pdenn(self, x, W, b):
"""
计算物理驱动的神经网络残差(无边界条件)。
Args:
x (tf.Tensor): 输入数据。
W (list): 权重列表。
b (list): 偏置列表。
Returns:
tf.Tensor: 残差f。
"""
u = self.fnn(x, W, b)
u_x = tf.gradients(u, x)[0]
u_xx = tf.gradients(u_x, x)[0]
rhf = np.pi**2 * tf.sin(np.pi*x) # 右侧手边项
f = -u_xx - rhf # 计算残差
return f
def fnn_BC(self, X, W, b):
"""
应用边界条件的前馈神经网络。
Args:
X (tf.Tensor): 输入数据。
W (list): 权重列表。
b (list): 偏置列表。
Returns:
tf.Tensor: 应用边界条件后的输出。
"""
Xmax = self.Xmax
Xmin = self.Xmin
u = self.fnn(X, W, b)
ua = self.fnn(tf.cast(Xmin, tf.float32), W, b)
ub = self.fnn(tf.cast(Xmax, tf.float32), W, b)
K = tf.subtract(ub, ua)/(Xmax[0,0] - Xmin[0,0])
c = tf.subtract(ua, Xmin*K)
u = tf.subtract(u, tf.add(tf.matmul(X, K), c))
return u
def pdenn_BC(self, x, W, b):
"""
计算物理驱动的神经网络残差(带边界条件)。
Args:
x (tf.Tensor): 输入数据。
W (list): 权重列表。
b (list): 偏置列表。
Returns:
tf.Tensor: 残差f。
"""
u = self.fnn_BC(x, W, b)
u_x = tf.gradients(u, x)[0]
u_xx = tf.gradients(u_x, x)[0]
rhf = np.pi**2 * tf.sin(np.pi*x)
f = -u_xx - rhf
return f
def fnn_BC2(self, X, W, b):
"""
另一种应用边界条件的方法(示例)。
Args:
X (tf.Tensor): 输入数据。
W (list): 权重列表。
b (list): 偏置列表。
Returns:
tf.Tensor: 应用边界条件后的输出。
"""
Xmax = self.Xmax
Xmin = self.Xmin
u = self.fnn(X, W, b)
u = (X-Xmax)*(X-Xmin) * u
return u
def pdenn_BC2(self, x, W, b):
"""
使用另一种边界条件计算物理驱动的神经网络残差。
Args:
x (tf.Tensor): 输入数据。
W (list): 权重列表。
b (list): 偏置列表。
Returns:
tf.Tensor: 残差f。
"""
u = self.fnn_BC2(x, W, b)
u_x = tf.gradients(u, x)[0]
u_xx = tf.gradients(u_x, x)[0]
rhf = np.pi**2 * tf.sin(np.pi*x)
f = -u_xx - rhf
return f
画图:
# 导入必要的库
import tensorflow as tf # TensorFlow库,用于构建和训练神经网络
import numpy as np # NumPy库,用于处理数值数据
import matplotlib.pyplot as plt # Matplotlib库,用于绘图
import os # os库,用于与操作系统交互,如文件路径操作
# 设置保存结果的路径
savepath='./Output'
if not os.path.exists(savepath):
os.makedirs(savepath) # 如果路径不存在,则创建该路径
# 定义一个类SavePlot,用于保存预测结果并绘制图形
class SavePlot:
def __init__(self, sess, x_range, N, xa, xb):
# 初始化函数,设置类的属性
self.x_range = x_range # 预测时x的范围
self.N = N # 预测时x的样本数
self.sess = sess # TensorFlow会话,用于执行TensorFlow操作
self.xa = xa # 精确解计算时可能需要的参数a
self.xb = xb # 精确解计算时可能需要的参数b
def saveplt(self, u_pred, x_res_train):
# 在给定范围内生成均匀的x点
x_t = np.linspace(self.x_range[0], self.x_range[1], self.N).reshape((-1, 1))
# 构建feed_dict,用于在TensorFlow会话中执行u_pred
test_dict = {x_res_train: x_t}
# 使用TensorFlow会话执行u_pred,得到预测结果
u_test = self.sess.run(u_pred, feed_dict=test_dict)
# 将预测结果保存到文件
np.savetxt('./Output/u_pred', u_test, fmt='%e')
# 计算并保存精确解
Exact_sln = Exact(self.xa, self.xb)
u_e = Exact_sln.u_exact(x_t)
np.savetxt('./Output/u_e', u_e, fmt='%e')
# 计算并打印预测误差
err_ = np.linalg.norm(u_test - u_e)/np.linalg.norm(u_e)
print(err_)
# 绘制精确解和预测解的对比图
plt.plot(x_t, u_e, 'bo', markersize=0.4, label='Exact solution')
plt.plot(x_t, u_test, 'rv', markersize=0.4, label='PINN solution')
plt.legend() # 显示图例
plt.show() # 显示图形
# 注意:代码中注释掉的plt.close()和.close()通常不是必要的,除非在循环中多次调用plt.plot或打开文件需要关闭。
# plt.close()用于关闭当前图形窗口,但在这里调用plt.show()后通常不需要。
下面是主函数:
# 导入必要的库
import os
import tensorflow as tf
import numpy as np
import time
import matplotlib.pyplot as plt
import scipy.io
# 设置随机数种子以确保结果的可重复性
np.random.seed(1234)
tf.set_random_seed(1234) # 注意:在TensorFlow 2.x中,应使用tf.random.set_seed
def main():
# 定义问题域和分辨率等参数
x_range = [-1.0, 1.0] # 定义x的范围
N_res = 50 # 残差点数量
N_bx = 2 # 边界点数量
xa, xb = x_range # 边界值
# 创建数据集对象
data = Dataset(x_range, N_res, N_bx, xa, xb)
# 构建数据
X_res, X_b0, X_b1, Xmin, Xmax = data.build_data()
# 定义神经网络结构
layers = [1] + 5*[40] + [1] # 神经网络层数和每层的神经元数
# 创建占位符
x_res_train = tf.placeholder(shape=[None, 1], dtype=tf.float32)
x_b0_train = tf.placeholder(shape=[None, 1], dtype=tf.float32)
x_b1_train = tf.placeholder(shape=[None, 1], dtype=tf.float32)
# 创建PINN对象
pinn = DNN(layers, Xmin, Xmax)
W, b = pinn.hyper_initial() # 初始化权重和偏置
# 定义网络输出和物理方程残差
u_pred = pinn.fnn_BC2(x_res_train, W, b) # 使用特定边界条件的网络输出
f_pred = pinn.pdenn_BC2(x_res_train, W, b) # 残差,即物理方程的不满足度
# 定义边界条件输出
u_b0_pred = pinn.fnn(x_b0_train, W, b)
u_b1_pred = pinn.fnn(x_b1_train, W, b)
# 定义损失函数(这里只考虑了残差的平方和)
loss = tf.reduce_mean(tf.square(f_pred))
# 定义优化器
train_adam = tf.train.AdamOptimizer(0.0008).minimize(loss)
train_lbfgs = tf.contrib.opt.ScipyOptimizerInterface(loss,
method = "L-BFGS-B",
options = {'maxiter': 80000,
'ftol': 1.0*np.finfo(float).eps
}
) # 注意:tf.contrib在TensorFlow 2.x中已被移除
# TensorFlow 1.x 会话管理
sess = tf.Session()
sess.run(tf.global_variables_initializer())
# 准备训练和测试数据字典
train_dict = {x_res_train: X_res, x_b0_train: X_b0,
x_b1_train: X_b1}
x_t = np.linspace(xa, xb, 101).reshape((-1, 1))
test_dict = {x_res_train: x_t}
Exact_sln = Exact(xa, xb) # 真实解的计算对象
u_e = Exact_sln.u_exact(x_t) # 真实解
# 训练模型
Model = Train(train_dict)
start_time = time.perf_counter()
Model.nntrain(sess, u_pred, loss, test_dict, u_e, x_t, train_adam, train_lbfgs)
# 打印训练时间
stop_time = time.perf_counter()
print('Duration time is %.3f seconds'%(stop_time - start_time))
# 在模型训练结束后,计算并打印误差
calculate_errors(sess, x_res_train, u_pred, x_t, u_e)
#Save the data
N_test = 101
datasave = SavePlot(sess, x_range, N_test, xa, xb)
datasave.saveplt(u_pred, x_res_train)
if __name__ == '__main__':
main()
我是在 Jupyter 文件上运行的,在其他集成开发环境中也可以类似运行。
运行结果:
效果不错!
下期预告:
- Python 机器学习求解 PDE 学习项目——PINN 求解二维 Poisson 方程
本专栏目标从简单的一维 Poisson 方程,到对流扩散方程,Burges 方程,到二维,三维以及非线性方程,发展方程,积分方程等等,所有文章包含全部可运行代码。请持续关注!