Python 实现深度学习模型预测控制--预测模型构建

链接:深度学习模型预测控制  (如果认为有用,动动小手为我点亮github小星星哦),持续更新中……

链接:WangXiaoMingo/TensorDL-MPC: DL-MPC(deep learning model predictive control) is a software toolkit developed based on the Python and TensorFlow frameworks, designed to enhance the performance of traditional Model Predictive Control (MPC) through deep learning technology. This toolkit provides core functionalities such as model training, simulation, parameter optimization. (github.com)icon-default.png?t=O83Ahttps://github.com/WangXiaoMingo/TensorDL-MPC此文档为如何使用深度学习模型预测控制系列文档。

使用 TensorDL-MPC 进行 MPC 控制,包括初始化系统、训练模型、执行 MPC 控制、模拟系统动力学等。

1. 构建模型

方法1:可以直接加载模型进行网络训练。

方法2:同样允许我们自己构建特定模型。使用方法与使用TensorFlow的Keras API构建方式类似,包括Sequential模型、函数式API(Functional API)、以及子类化(Subclassing)方法。这些方法都可以完成从模型构建到训练的整个过程。下面将分别展示这些方式的具体实现。

1.1  使用Sequential模型

import tensorflow as tf
from tensorflow.keras import layers, models

# 使用Sequential API 构建模型
model = models.Sequential([
    layers.Dense(128, activation='relu', input_shape=(784,)),  # 输入层
    layers.Dense(64, activation='relu'),  # 隐藏层
    layers.Dense(10, activation='softmax')  # 输出层
])

# 编译模型
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.summary()

1.2  函数式API(Functional API)

函数式API允许创建更复杂的模型结构,例如多输入、多输出或共享层的模型。

from tensorflow.keras import Input, Model

# 使用Functional API 构建模型
inputs = Input(shape=(784,))
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dense(64, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)

# 编译模型
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.summary()

也可以自定义类实现模型封装,如:

# dlmpc/models/BPMM.py

import tensorflow as tf
from tensorflow import keras
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from layers.normlization import MinMaxNormalization
import numpy as np
import random
import os


# 固定随机数种子
random_seed = 42
random.seed(random_seed)  # set random seed for python
np.random.seed(random_seed)  # set random seed for numpy
tf.random.set_seed(random_seed)  # set random seed for tensorflow-cpu
os.environ['TF_DETERMINISTIC_OPS'] = '1' # set random seed for tensorflow-gpu
# warnings.filterwarnings("ignore")



class MultiBPNet():
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                     use_mask=False):
        """
        初始化函数

        参数:
        hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
        dim_u (int): 输入的维度,默认为1。必须为整数。
        dim_x (int): 输出的维度,默认为3。必须为整数。
        dim_y (int): 输出的维度,默认为1。必须为整数。
        feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
        min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
        max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
        use_mask (bool): 是否启用掩码,默认为False。
        """

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        super().__init__()
        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x, max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u, max_val=self.max_val_u)

    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None
    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def build(self, units=32, time_step = 3 ,u_steps = 1, data_type='1D'):
        # 构建模型,2个输入,分别x和u
        input_x = keras.Input(shape=(time_step,self.dim_x))
        input_u = keras.Input(shape=(u_steps,self.dim_u))

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(units,norm_x, norm_u, data_type)

        for j in range(self.nblocks):
            x = keras.layers.Dense(units, activation='linear')(inputs)
            inputs = x
        out = keras.layers.Dense(self.dim_y, activation='linear')(x)
        out = out[:, -1]
        model = keras.models.Model(inputs=[input_x, input_u], outputs=out, name='BP')
        return model
    def _concatenate_inputs(self, units,norm_x, norm_u, data_type):
        if data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]  # 0.0002  0.0133  0.0105  0.9896  0.9915  0.0272
            return tf.concat([tf.reshape(norm_x,[-1,1,_dim_1x * self.dim_x]), tf.reshape(norm_u,[-1,1,_dim_1u * self.dim_u])], axis=2)
        elif data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = keras.layers.Dense(units, activation='linear')(norm_x)
            u = keras.layers.Dense(units, activation='linear')(norm_u)
            # return tf.concat([x, u], axis=1)          #  0.0034  0.0581  0.0464  0.8209  0.8454  0.1254; 0.0027  0.0521  0.0407  0.7753  0.7776  0.1065
            # return x @ tf.transpose(u,[0,2,1])        # 0.0003  0.0161  0.0127  0.9833  0.9858  0.0328, 0.0001  0.0118  0.0097  0.9916  0.9917  0.0262
            # return u @ tf.transpose(x, [0, 2, 1])    # 0.0004  0.0197  0.0152  0.9732  0.9768  0.0382; 0.0002  0.0126  0.0103  0.9904  0.9905  0.028
            return tf.reshape(tf.concat([x, u], axis=1),[-1,1,(_dim_1x+_dim_1u)*units]) # 0.0001  0.0117  0.0093  0.9919  0.9919  0.0248; 0.0002  0.0145  0.0118  0.9877  0.9904  0.031


        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")


if __name__ == '__main__':
    min_val_x = tf.constant([0.,0,0])  # 手动指定的最小值
    max_val_x = tf.constant([10,10,10.])  # 手动指定的最大值
    min_val_u = tf.constant([0.,0])  # 手动指定的最小值
    max_val_u = tf.constant([10.,10.])  # 手动指定的最大值
    min_val = [min_val_x,min_val_u]
    max_val = [max_val_x, max_val_u]
    model = MultiBPNet(hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val,
                       use_mask=True).build(units=32, data_type='1D')
    model.compile(optimizer='adam', loss='mse')
    model.summary()
    print(model.name)

1.3 使用子类化(Subclassing)方法

子类化方法允许对模型的结构和前向传播过程进行最大程度的控制。

class MyModel(tf.keras.Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.dense1 = layers.Dense(128, activation='relu')
        self.dense2 = layers.Dense(64, activation='relu')
        self.dense3 = layers.Dense(10, activation='softmax')

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        return self.dense3(x)

model = MyModel()

# 编译模型
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

对于更复杂的功能,可以如下:

import tensorflow as tf
from tensorflow import keras

class MinMaxNormalization(keras.layers.Layer):
    def __init__(self, feature_range=(0, 1), min_val=None, max_val=None, **kwargs):
        super(MinMaxNormalization, self).__init__(**kwargs)
        self.feature_range = feature_range
        self.min_val = min_val
        self.max_val = max_val

    def call(self, inputs):
        if self.min_val is not None and self.max_val is not None:
            scaled = (inputs - self.min_val) / (self.max_val - self.min_val)
            return scaled * (self.feature_range[1] - self.feature_range[0]) + self.feature_range[0]
        return inputs


class BPNeuralNetwork(keras.Model):
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                 use_mask=False):
        """
        初始化函数

        参数:
        hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
        dim_u (int): 输入的维度,默认为1。必须为整数。
        dim_x (int): 输出的维度,默认为3。必须为整数。
        dim_y (int): 输出的维度,默认为1。必须为整数。
        feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
        min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
        max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
        use_mask (bool): 是否启用掩码,默认为False。
        """
        super(BPNeuralNetwork, self).__init__()

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
                                                         max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
                                                         max_val=self.max_val_u)

        self.dense_layers = [keras.layers.Dense(32, activation='linear') for _ in range(self.nblocks)]
        self.output_layer = keras.layers.Dense(self.dim_y, activation='linear')

        # if data_type == '2D':
        self.dense_x = keras.layers.Dense(32, activation='linear')
        self.dense_u = keras.layers.Dense(32, activation='linear')

    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None

    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def _concatenate_inputs(self, norm_x, norm_u, data_type):
        if data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
                              tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
        elif data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = self.dense_x(norm_x)
            u = self.dense_u(norm_u)
            # return x @ tf.transpose(u,[0,2,1])   
            return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * 32])
        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")

    def call(self, inputs, data_type='1D'):
        input_x, input_u = inputs

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(norm_x, norm_u, data_type)

        for j in range(self.nblocks):
            x = self.dense_layers[j](inputs)
            inputs = x
        out = self.output_layer(x)
        out = out[:, -1]
        return out

if __name__ == '__main__':
    min_val_x = tf.constant([0., 0, 0])  # 手动指定的最小值
    max_val_x = tf.constant([10, 10, 10.])  # 手动指定的最大值
    min_val_u = tf.constant([0., 0])  # 手动指定的最小值
    max_val_u = tf.constant([10., 10.])  # 手动指定的最大值
    min_val = [min_val_x, min_val_u]
    max_val = [max_val_x, max_val_u]

    model = BPNeuralNetwork(hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)

    # model.compile(optimizer='adam', loss='mse')
    # model.build([(None, 3, 3), (None, 1, 1)])
    model.summary()

这三种方法各有优缺点:

  • Sequential API: 最简单,但只适用于线性堆叠的模型。
  • Functional API: 灵活性好,可以处理复杂模型结构,同时保持了简洁性。
  • Model Subclassing: 最灵活,但相对复杂,适合需要自定义层或复杂逻辑的情况。

当然,我们也可以利用三种方式的组合实现更复杂的模型构建

1.4 使用函数式API(Functional API)构建更复杂模型

方法1: Model Subclassing+Functional API

# dlmpc/models/BPNN1.py 

import tensorflow as tf
from tensorflow import keras

class MinMaxNormalization(keras.layers.Layer):
    def __init__(self, feature_range=(0, 1), min_val=None, max_val=None, **kwargs):
        super(MinMaxNormalization, self).__init__(**kwargs)
        self.feature_range = feature_range
        self.min_val = min_val
        self.max_val = max_val

    def call(self, inputs):
        if self.min_val is not None and self.max_val is not None:
            scaled = (inputs - self.min_val) / (self.max_val - self.min_val)
            return scaled * (self.feature_range[1] - self.feature_range[0]) + self.feature_range[0]
        return inputs


class BPNeuralNetwork1(keras.Model):
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                 use_mask=False):
        """
        初始化函数

        参数:
        hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
        dim_u (int): 输入的维度,默认为1。必须为整数。
        dim_x (int): 输出的维度,默认为3。必须为整数。
        dim_y (int): 输出的维度,默认为1。必须为整数。
        feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
        min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
        max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
        use_mask (bool): 是否启用掩码,默认为False。
        """
        super(BPNeuralNetwork1, self).__init__()

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
                                                         max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
                                                         max_val=self.max_val_u)

        self.dense_layers = [keras.layers.Dense(32, activation='linear') for _ in range(self.nblocks)]
        self.output_layer = keras.layers.Dense(self.dim_y, activation='linear')

        # if data_type == '2D':
        self.dense_x = keras.layers.Dense(32, activation='linear')
        self.dense_u = keras.layers.Dense(32, activation='linear')

    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None

    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def _concatenate_inputs(self, norm_x, norm_u, data_type):
        if data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
                              tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
        elif data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = self.dense_x(norm_x)
            u = self.dense_u(norm_u)
            # return x @ tf.transpose(u,[0,2,1])   
            return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * 32])
        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")

    def call(self, inputs, data_type='1D'):
        input_x, input_u = inputs

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(norm_x, norm_u, data_type)

        for j in range(self.nblocks):
            x = self.dense_layers[j](inputs)
            inputs = x
        out = self.output_layer(x)
        out = out[:, -1]
        return out

if __name__ == '__main__':
    min_val_x = tf.constant([0., 0, 0])  # 手动指定的最小值
    max_val_x = tf.constant([10, 10, 10.])  # 手动指定的最大值
    min_val_u = tf.constant([0., 0])  # 手动指定的最小值
    max_val_u = tf.constant([10., 10.])  # 手动指定的最大值
    min_val = [min_val_x, min_val_u]
    max_val = [max_val_x, max_val_u]    

    input_x = keras.layers.Input(shape=(3, 3))
    input_u = keras.layers.Input(shape=(1, 2))
    model = BPNeuralNetwork1(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)
    output = model([input_x, input_u], data_type='1D')
    model = keras.models.Model(inputs=[input_x, input_u], outputs=output, name='BP')

方法2: Model Subclassing+Functional API

# dlmpc/models/BPNN.py

import tensorflow as tf
from tensorflow import keras


class BPNeuralNetwork(keras.Model):
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                 use_mask=False):
        """
        初始化函数

        参数:
        hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
        dim_u (int): 输入的维度,默认为1。必须为整数。
        dim_x (int): 输出的维度,默认为3。必须为整数。
        dim_y (int): 输出的维度,默认为1。必须为整数。
        feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
        min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
        max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
        use_mask (bool): 是否启用掩码,默认为False。
        """
        super(BPNeuralNetwork, self).__init__()

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
                                                         max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
                                                         max_val=self.max_val_u)
    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None

    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def _concatenate_inputs(self, norm_x, norm_u):
        if self.data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
                              tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
        elif self.data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = keras.layers.Dense(self.units)(norm_x)
            u = keras.layers.Dense(self.units)(norm_u)
            # return tf.concat([x, u], axis=1)          #  0.0034  0.0581  0.0464  0.8209  0.8454  0.1254; 0.0027  0.0521  0.0407  0.7753  0.7776  0.1065
            # return x @ tf.transpose(u,[0,2,1])        # 0.0003  0.0161  0.0127  0.9833  0.9858  0.0328, 0.0001  0.0118  0.0097  0.9916  0.9917  0.0262
            # return u @ tf.transpose(x, [0, 2, 1])    # 0.0004  0.0197  0.0152  0.9732  0.9768  0.0382; 0.0002  0.0126  0.0103  0.9904  0.9905  0.028
            return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * self.units])
        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")

    def call(self, inputs):
        input_x, input_u = inputs

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(norm_x, norm_u)

        for j in range(self.nblocks):
            x = keras.layers.Dense(self.units, activation='relu')(inputs)
            inputs = x
        out = keras.layers.Dense(self.dim_y, activation='linear')(x)
        out = out[:, -1]
        return out

    def build(self, units=32, time_step=3, u_steps=1, data_type='1D'):
        self.units = units
        self.data_type = data_type
        input_x = keras.Input(shape=(time_step, self.dim_x))
        input_u = keras.Input(shape=(u_steps, self.dim_u))
        model = keras.models.Model(inputs=[input_x, input_u], outputs=self.call([input_x, input_u]), name='BP')
        return model

if __name__ == '__main__':
    min_val_x = tf.constant([0., 0, 0])  # 手动指定的最小值
    max_val_x = tf.constant([10, 10, 10.])  # 手动指定的最大值
    min_val_u = tf.constant([0., 0])  # 手动指定的最小值
    max_val_u = tf.constant([10., 10.])  # 手动指定的最大值
    min_val = [min_val_x, min_val_u]
    max_val = [max_val_x, max_val_u]

    model = BPNeuralNetwork(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)
    model = model.build(units=32, time_step=3, u_steps=1, data_type='1D')
    model.summary()
    print(model.name)

2. 模型训练


if __name__ == '__main__':
    from src.dlmpc import Dataset
    from src.dlmpc import WindowGeneratorMIMO
    from src.dlmpc import DataLoader
    from src.dlmpc import MultiBPNet,BPNeuralNetwork1, BPNeuralNetwork
    from src.dlmpc import TrainModel
    from src.dlmpc import MinMaxNormalization
    from src.dlmpc import Calculate_Regression_metrics
    from src.dlmpc import plot_line
    from src.dlmpc import loadpack
    import seaborn as sns
    from tensorflow import keras
    import tensorflow as tf
    from tensorflow.keras.models import load_model
    import matplotlib.pyplot as plt
    import numpy as np

    train = False

    # TODO: 1. load model and generate data, (u,y)
    plant = Dataset(plant_name='MISO',noise_amplitude=0.01,sine_wave=True)
    data = plant.preprocess(num=1000)
    print(np.array(data[['u1','u2']]))

    # TODO: 2. generate Window data
    data = {
        'u': np.array(data[['u1','u2']]),
        'y': np.array(data[['y1']])
    }
    print(data['u'])

    window_generator = WindowGeneratorMIMO(data, input_dy=[3], input_du=[3, 3], input_shift_y=[1], input_shift_u=[2, 2],
                                 output_predict_steps=1, u_steps=1)
    x_sequences, u_sequences, y_sequences = window_generator.generate_sequences()
    print(np.shape(x_sequences))

    # TODO 3. generate data for train, valid, test
    loader = DataLoader((x_sequences, u_sequences, y_sequences))
    split_seed = [0.8, 0.1, 0.1]
    (train_data, valid_data, test_data) =  loader.load_data(split_seed)
    # print(train_data['train_x_sequences'].shape)

    '''
    train_x_sequences, train_u_sequences, train_y_sequences
    valid_x_sequences, valid_u_sequences, valid_y_sequences
    test_x_sequences, test_u_sequences, test_y_sequences
    '''

    # TODO: 4. train model and save model
    dim_x = np.shape(x_sequences)[2]      # state variable number
    dim_u = np.shape(u_sequences)[2]      # input variable number
    dim_y = np.shape(y_sequences)[2]

    # 找到 x_sequences 每个变量的最小值和最大值
    x_min = np.amin(x_sequences, axis=(0, 1))
    x_max = np.amax(x_sequences, axis=(0, 1))

    # 找到 u_sequences 每个变量的最小值和最大值
    u_min = np.amin(u_sequences, axis=(0, 1))
    u_max = np.amax(u_sequences, axis=(0, 1))

    min_val = [tf.constant(x_min,dtype=tf.float32), tf.constant(u_min,dtype=tf.float32)]
    max_val = [tf.constant(x_max,dtype=tf.float32), tf.constant(u_max,dtype=tf.float32)]

    # 方法1:BPMM
    my_model = MultiBPNet(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
    # my_model = MultiBPNet(hidden_blocks=1, dim_u=dim_u, dim_x=dim_x, dim_y=dim_y,use_mask=True)
    model = my_model.build(units=32,time_step = 3 ,u_steps = 1, data_type='1D')

    # 方法2:BPNN1
    # input_x = keras.layers.Input(shape=(3, 3))
    # input_u = keras.layers.Input(shape=(1, 2))
    # model = BPNeuralNetwork1(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
    # output = model([input_x, input_u], data_type='1D')
    # model = keras.models.Model(inputs=[input_x, input_u], outputs=output, name='BP1')

    # # 方法3:BPNN1
    # model = BPNeuralNetwork(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
    # model = model.build(units=32, time_step=3, u_steps=1, data_type='1D')


    '''data_type can select: 1D, 2D, 2DT'''

    model_name = model.name
    print(model_name)

    # train model and load best model
    if train:
        TrainModel(model,lr = 0.01,epoch=200).train_model(train_data,valid_data,show_loss=True)
        model.summary()
        # 方法2注释以下两行
        model = load_model(f'models_save/{model_name}.h5',custom_objects={'MinMaxNormalization': MinMaxNormalization})
        model.save(f'models_save/{model_name}_predictor.h5')

    else:
        model = load_model(f'models_save/{model_name}_predictor.h5',custom_objects={'MinMaxNormalization': MinMaxNormalization})

    model.summary()
    # TODO: predict and plot
    keras.utils.plot_model(model, to_file=f'model_fig/{model_name}.png', show_shapes=True,
                           show_layer_names=True)
    y_pred_train = model.predict([train_data['train_x_sequences'], train_data['train_u_sequences']])
    y_pred_test = model.predict([test_data['test_x_sequences'], test_data['test_u_sequences']])
    print(model.evaluate([test_data['test_x_sequences'], test_data['test_u_sequences']], test_data['test_y_sequences']))

    train_result = Calculate_Regression_metrics(y_pred_train.flatten(), train_data['train_y_sequences'].reshape(-1, 1),
                                                 label=f'{model_name}_train')
    test_result = Calculate_Regression_metrics(y_pred_test.flatten(), test_data['test_y_sequences'].reshape(-1, 1),
                                                label=f'{model_name}_test')
    figure_property = {'title': model_name, 'X_label': 'Prediction set samples', 'Y_label': 'Prediction Value'}
    plot_line(y_pred_test.flatten(), test_data['test_y_sequences'].reshape(-1, 1), figure_property)

    print('train\n ', train_result)

    print('test:\n', test_result)
    # 设置Seaborn样式
    sns.set_style("whitegrid")
    # 创建一个Matplotlib图
    fig, ax = plt.subplots(figsize=(12, 2))
    # 移除坐标轴
    ax.axis('off')
    ax.set_title(f'{model_name}_predictor test result.png', fontsize=16, pad=2)  # pad参数可以调整标题与表格之间的距离
    # 将DataFrame转换为表格
    tab = ax.table(cellText=test_result.values, colLabels=test_result.columns, rowLabels=test_result.index, loc='center')
    # 可以为表格添加样式
    tab.auto_set_font_size(True)
    tab.set_fontsize(12)
    tab.scale(1.0, 1.0)
    # 保存图片
    plt.savefig(f'model_result/{model_name}_predictor_test_result.png', bbox_inches='tight', dpi=500)
    # 显示图片
    plt.show()

不同模型构建方式结果对比--2D--units=32
msermsemaer2mape
方法1(BPMM)0.0002  0.0125 0.0098  0.9909 0.0268
方法2(BPNN1) 0.0002 0.014  0.01150.99110.0309
方法3(BPNN) 0.0001  0.01070.0088 0.9933 0.0246
不同模型构建方式结果对比--1D--units=32
msermsemaer2mape
方法1(BPMM)0.0002 0.01390.011 0.98860.029
方法2(BPNN1)0.0002  0.0144 0.01150.9877 0.0302
方法3(BPNN)0.0001    0.01130.0091 0.99250.0251

2D相较于1D输入较好,方法3模型精度更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/907023.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安宝特案例 | AR技术在院外心脏骤停急救中的革命性应用

00 案例背景 在院外心脏骤停 (OHCA) 的突发救援中,时间与效率直接决定着患者的生命。传统急救模式下,急救人员常通过视频或电话与医院医生进行沟通,以描述患者状况并依照指令行动。然而,这种信息传递方式往往因信息不完整或传递延…

书生大模型第一关Linux基础知识

任务一:完成SSH连接与端口映射并运行hello_world.py 1.SSH及其端口映射 2.在VSCode中安装插件: 3.创建开发机 最后点击创建,然后可能需要等待一段较长的时间,大概需要5分钟左右,如果需要排队则更长时间 然后选择…

openGauss数据库-头歌实验1-5 修改数据库

一、查看表结构与修改表名 (一)任务描述 本关任务:修改表名,并能顺利查询到修改后表的结构。 (二)相关知识 为了完成本关任务,你需要掌握: 1.如何查看表的结构; 2.如…

【机器学习】26. 聚类评估方法

聚类评估方法 1. Unsupervised Measure1.1. Method 1: measure cohesion and separationSilhouette coefficient Method 2:Correlation between two similarity matricesMethod 3:Visual Inspection of similarity matrix 2. Supervised measures3. 决定…

基于stm32单片机的智能循迹小车

功能描述 STM32单片机循迹避障蓝牙控制温度采集烟雾采集火焰探测声光报警按键调节OLED显示 1. STM32单片机为控制核心 2. 通过ds18b20传感器测量环境温度 3. 通过mq-2烟雾传感器测量环境中的烟雾浓度 4. 温度阈值和烟雾浓度阈值可以通过按键进行调节 5. 当温度或者烟雾浓度超过…

【图解版】力扣第70题:爬楼梯

推理出状态表达式 f(5)表示到达第5层,所有可能的方法数。 到达第5层,有可能是从第4层走一步上来,也有可能是从第3层走两步上来。所以我们可以慢慢延伸,画出上面👆🏻的图。 从图中,我们可以看到…

MySQL基础(三)

目录 一. 插入内容insert 1.1 默认插入 1.2 指定某些列插入数据 1.3 一次插入多行 1.4 insert 插入时间 二. 查询数据select(比较复杂) 2.1 全列查询 2.2 指定列查询 2.3 查询字段为表达式 2.4 别名 as 2.5 去重查询 distinct 2.6 排序…

OpenAI 的 Whisper:盛名之下,其实难副?

OpenAI 的 Whisper:盛名之下,其实难副? Whisper 的崛起与承诺 严重缺陷的曝光 风险分析 应对措施 结论 在人工智能的浪潮中,OpenAI 一直以其创新性和强大的技术实力备受瞩目。然而,最近 OpenAI 的语音转写工具 Wh…

在kanzi 3.9.8里使用API创建自定义材质

1. kanzi studio设置 1.1 创建一个纹理贴图,起名Render Target Texture 1.2 创建一个Image节点,使用该贴图 2. 代码设置 2.1 创建一个自定义节点类 class mynode2d : public Node2D { public: virtual void renderOverride(Renderer3D& renderer…

音频中sample rate是什么意思?

‌sample rate‌在数字信号处理中,指的是‌采样频率‌,即每秒钟从连续信号中抽取的样本数量。采样频率越高,信号的还原度越高,但同时也会增加计算负担和存储需求‌。 实际应用场景 在音频处理中,设置合适的采样率可以…

杨辉三角形

大家好,今天给大家分享一下杨辉三角形是如何打印的,首先我们来看看它的原理。 我们先来看结果 1.如果把它看为一个二维数组(包括后面的空格),那么它数字的这边是一个直角三角形,它的第一列和对角线都为1&a…

详解ARM64可执行程序的生成过程

版权归作者所有,如有转发,请注明文章出处:https://cyrus-studio.github.io/blog/ ARM64可执行程序的生成过程 根据 ARM64 可执行程序生成的四个主要步骤:预处理、编译、汇编、链接,我们可以详细分解整个过程如下 1. …

DB-GPT系列(二):DB-GPT部署(镜像一键部署、源码部署)

一、简介 DB-GPT 是一个开源项目,其将大语言模型 LLM 与数据库紧密结合。该项目主要致力于探索如何让预训练的大规模语言模型(例如 GPT)能够直接与数据库进行交互,从而生成更为准确且信息丰富的回答。 DB-GPT部署后能否直接使用…

升序数组两两不相等

题目:给定一个排好升序的数组A[1],A[2],… A[n],其元素的值两两都不相等。请设计一个高效算法,找出其中所有A[]i的下标,并分析其复杂度。 算法分析:一个升序且值都不相等的数组,如果第一个数大于右下标&…

基于vue框架的的乐守护儿童成长记录系统b65tg(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能:用户,成长指标,疫苗接种,学业档案,课外活动,旅游经历,交流论坛 开题报告内容 基于Vue框架的乐守护儿童成长记录系统开题报告 一、研究背景与意义 随着科技的飞速发展和家庭对子女成长关注度的不断提升,如何科学、系统地记…

VSCode 设置环境变量(WSL 2)

环境:openEuler、Windows 11、WSL 2、python 3.12.3 背景:使用vscode连接Windows 的Linux子系统,开发python项目,获取环境变量失败 时间:20241029 说明:使用os.environ获取不到变量,设置/etc…

使用Git LFS管理大型文件

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 使用Git LFS管理大型文件 引言 Git LFS 简介 安装 Git LFS 安装 Git 安装 Git LFS 配置 Git LFS 初始化 Git 仓库 指定需要使用…

RHCE的练习(10)

实验1:反向解析 准备工作 [rootserver ~]# setenforce 0[rootserver ~]# systemctl stop firewalld# 服务端安装bind软件 [rootserver ~]# dnf install bind -y DNS配置 第一步:服务端操作,编辑bind主配置文件 [rootbogon ~]# cat /e…

Redis-结构化value对象的类型

文章目录 一、Redis的结构化value对象类型的介绍二、Redis的这些结构化value对象类型的通用操作查看指定key的数据类型查看所有的key判断指定key是否存在为已存在的key进行重命名为指定key设置存活时间pexpire与expire 查看指定Key的存活时间为指定key设置成永久存活 三、Redis…

产品结构设计(六):结构设计全过程

参考引用 产品结构设计实例教程 1. ID 图及 PCB 堆叠分析 1.1 产品说明及相关资料 1、新产品开发指令单 2、ID 图 3、产品功能规格书 1.2 ID 图分析 ID(Industrial Design,工业设计)是以工业产品为主要对象,综合运用工学、…