Tensorflow2.0学习——自动求导机制和tf2.0自定义模型训练方式

思维导图

自动求导机制

这里开始就有点像Pytorch的感觉了,和后面的tf2.0自定义模型训练方式一样,和Pytorch的感觉太像了。
GradientTape是eager模式下计算梯度用的

  • watch(tensor)

    作用:确保某个tensor被tape追踪

    参数: tensor: 一个Tensor或者一个Tensor列表

  • gradient(target, sources)

    作用:根据tape上面的上下文来计算某个或者某些tensor的梯度参数

    target: 被微分的Tensor或者Tensor列表,你可以理解为经过某个函数之后的值

    sources: Tensors 或者Variables列表(当然可以只有一个值). 你可以理解为函数的某个变量

    返回:

    一个列表表示各个变量的梯度值,和source中的变量列表一一对应,表明这个变量的梯度。

    上面的例子中的梯度计算部分可以更直观的理解这个函数的用法。

x = tf.constant(3.0)

with tf.GradientTape() as g:
    g.watch(x)
    y = x * x
dy_dx = g.gradient(y, x)  # y’ = 2*x = 2*3 = 6

dy_dx
# 输出:
#<tf.Tensor: shape=(), dtype=float32, numpy=6.0>

tf.GradientTape详解

tf.GradientTape(
persistent=False,
watch_accessed_variables
=True
)
  • persistent : 用来指定新创建的 gradient tape 是否是可持续性的。默认是 False意味着只能够调用一次 gradient ()函数。
  • watch_accessed_variables : 表明这个 GradientTape 是不是会自动追踪任何能被训练( trainable )的变量。默认是True 。要是为 False 的话,意味着你需要手动去指定你想追踪的那些变量。
gradient(target, sources)

作用:根据tape 上面的上下文来计算某个或者某些 tensor 的梯度参数
– target:被微分的 Tensor ,你可以理解为 loss 值 针对深度学习训练来说
– sources:Tensors 或者 Variables 列表(当然可以只有一个值) ).
– 返回
一个列表表示各个变量的梯度值,和source 中的变量列表一一对应,表明这个变量的梯度。


自定义模型训练方式

案例1 模型自动求导

构建模型(神经网络的前向传播) –> 定义损失函数 –> 定义优化函数 –> 定义tape –> 模型得到预测值 –> 前向传播得到loss –> 反向传播 –> 用优化函数将计算出来的梯度更新到变量上面去
先定义一个网络

class MyModel(tf.keras.Model):

    def __init__(self, num_classes=10):
        super(MyModel, self).__init__(name='my_model')
        self.num_classes = num_classes
        # 定义自己需要的层
        self.dense_1 = tf.keras.layers.Dense(32, activation='relu') #隐藏层
        self.dense_2 = tf.keras.layers.Dense(num_classes)#输出层

    def call(self, inputs):
        #定义前向传播
        # 使用在 (in `__init__`)定义的层
        x = self.dense_1(inputs)
        return self.dense_2(x)

随机构建数据集

import numpy as np
# 10分类问题
data = np.random.random((1000, 32))
labels = np.random.random((1000, 10))

自定义训练过程

model = MyModel(num_classes=10)

loss_object = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()

with tf.GradientTape() as tape:
    predictions = model(data)
    loss = loss_object(labels, predictions)

gradients = tape.gradient(loss, model.trainable_variables) #求梯度

optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # 
# 可以查看模型中训练的变量
model.trainable_variables

解释一下上面的程序

loss_object = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

with tf.GradientTape as tape:
    predictions = model(data)
    loss = loss_object ( predictions)
gradients = tape.gradient (loss, model.trainable_variables)
optimizer.apply_gradients(zip (gradients, model.trainable_variables))

一般在网络中使用时, 不需要显式调用 watch 函数 ,使用默认设置,GradientTape 会监控可训练变量。

with tf.GradientTape as tape:

利用这个语句可以自定义我们的模型训练过程,它的作用相当于model.fit(),但是显然,我们自定义的模型训练过程要具有更高的自由度,更加的灵活。

apply_gradients(grads_and_vars,name = None)
作用:把计算出来的梯度更新到变量上面去。
参数含义
grads_and_vars : (gradient, variable) 对的列表
name:操作名


案例 使用GradientTape自定义训练模型

class MyModel(tf.keras.Model):

    def __init__(self, num_classes=10):
        super(MyModel, self).__init__(name='my_model')
        self.num_classes = num_classes
        # 定义自己需要的层
        self.dense_1 = tf.keras.layers.Dense(32, activation='relu')
        self.dense_2 = tf.keras.layers.Dense(num_classes)

    def call(self, inputs):
        #定义前向传播
        # 使用在 (in `__init__`)定义的层
        x = self.dense_1(inputs)
        return self.dense_2(x)

import numpy as np

data = np.random.random((1000, 32))
labels = np.random.random((1000, 10))

model = MyModel(num_classes=10)

# Instantiate an optimizer.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = tf.keras.losses.CategoricalCrossentropy()

# Prepare the training dataset.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((data, labels))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

前面是构建模型、构建数据集、选择优化器和loss等步骤。下面是自定义模型训练的代码。

epochs = 3
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))

    # 遍历数据集的batch_size
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):


        # 打开GradientTape以记录正向传递期间运行的操作,这将启用自动区分。
        with tf.GradientTape() as tape:

            # 运行该模型的前向传播。 模型应用于其输入的操作将记录在GradientTape上。
            logits = model(x_batch_train, training=True)  # 这个minibatch的预测值

            # 计算这个minibatch的损失值
            loss_value = loss_fn(y_batch_train, logits)

        # 使用GradientTape自动获取可训练变量相对于损失的梯度。
        grads = tape.gradient(loss_value, model.trainable_weights)

        # 通过更新变量的值来最大程度地减少损失,从而执行梯度下降的一步。
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # 每200 batches打印一次.
        if step % 200 == 0:
            print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
            print('Seen so far: %s samples' % ((step + 1) * 64))

上面的过程相当于model.fit(),只不过model.fit()中还有评估函数,下面的案例将加入metrics


案例:使用GradientTape自定义训练模型进阶(加入评估函数)

让我们将metric添加到组合中。下面可以在从头开始编写的训练循环中随时使用内置指标(或编写的自定义指标)。流程如下:
– 在循环开始时初始化metrics
– metric.update_state():每个batch之后更新
– metric.result():需要显示metrics的当前值时调用
– metric.reset_states():需要清除metrics状态时重置(通常在每个epoch的结尾)

class MyModel(tf.keras.Model):

    def __init__(self, num_classes=10):
        super(MyModel, self).__init__(name='my_model')
        self.num_classes = num_classes
        # 定义自己需要的层
        self.dense_1 = tf.keras.layers.Dense(32, activation='relu')
        self.dense_2 = tf.keras.layers.Dense(num_classes)

    def call(self, inputs):
        #定义前向传播
        # 使用在 (in `__init__`)定义的层
        x = self.dense_1(inputs)
        return self.dense_2(x)

import numpy as np
x_train = np.random.random((1000, 32))
y_train = np.random.random((1000, 10))
x_val = np.random.random((200, 32))
y_val = np.random.random((200, 10))
x_test = np.random.random((200, 32))
y_test = np.random.random((200, 10))

# 优化器
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
# 损失函数
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)

# 准备metrics函数
train_acc_metric = tf.keras.metrics.CategoricalAccuracy()
val_acc_metric = tf.keras.metrics.CategoricalAccuracy()

# 准备训练数据集
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# 准备测试数据集
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)

进行几个epoch运行训练循环:

model = MyModel(num_classes=10)
epochs = 3
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))

    # 遍历数据集的batch_size
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        #一个batch
        with tf.GradientTape() as tape:
            logits = model(x_batch_train)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))####

        # 更新训练集的metrics
        train_acc_metric(y_batch_train, logits)     

    # 在每个epoch结束时显示metrics。
    train_acc = train_acc_metric.result()
    print('Training acc over epoch: %s' % (float(train_acc),))
    # 在每个epoch结束时重置训练指标
    train_acc_metric.reset_states()#!!!!!!!!!!!!!!!

    # 在每个epoch结束时运行一个验证集。
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val)
        # 更新验证集merics
        val_acc_metric(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    print('Validation acc: %s' % (float(val_acc),))
    val_acc_metric.reset_states()

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注