基于 TensorFlow Eager 进行深度学习
简介
任务时间:时间未知
1.1. 描述
使用 tensorflow eager execution 演示如何搭建:
- 线性模型
- 深度神经网络
1.2. 导图
以下的所有视频都在 10 分钟内, 线性代数和微积分也通俗易懂。
- 机器学习: 讲解视频 是用 数据 学习 模型 再 应用。
- 线性代数 + 微积分: 讲解视频。
- 深度学习: 讲解视频 模型 与 学习。
1.3. 准备
实验所需库包,在终端运行以下指令。
sudo apt install -y python-pip
安装 tensorflow
pip --no-cache-dir install tensorflow
安装 Ipython
sudo apt install -y ipython
安装 scikit
pip install -U scikit-learn
由于意为练习,将使用 Ipython 来一行行的指令进行演示。
打开 Ipython
ipython
线性模型
任务时间:时间未知
2.1. 数据
导入 TensorFlow
import tensorflow as tf
即显模式
tf.enable_eager_execution()
生成 100 个输入数据 X
eager execution 下 numpy
的 array
数据可以直接送入网络
n_example = 100
X = tf.random_normal([n_example])
增加称重误差得出 Y,其中底层规律是 Y = 2*X。
noise = tf.random_uniform([n_example], -0.5, 0.5)
Y = X * 2 + noise
80 个样本的训练集
train_x = X[:80]
train_y = Y[:80]
20 个样本的测试集
test_x = X[80:]
test_y = Y[80:]
tensorflow
的 tensor
数据类型可以直接 print
。
print(train_x)
使用 .numpy()
转变为 numpy
的 array
类型。
查看数据集形状及单个样本。逗号前代表样本数。
print('训练样本数: %s' %train_x.numpy().shape)
print('第2个训练样本: x = %s, y = %s' %(train_x[1].numpy(),train_y[1].numpy()))
print('测试样本数: %s' %test_x.numpy().shape)
print('第2个训练样本: x = %s, y = %s' %(test_x[1].numpy(),test_y[1].numpy()))
数据形状大致为[?]:
完整代码,请在 /home/ubuntu
目录下创建 linear.py,参考下面的内容。
示例代码:/home/ubuntu/linear.py
#!/usr/bin/python
#coding:utf-8
# 导入 TensorFlow
import tensorflow as tf
# 即显模式
tf.enable_eager_execution()
# 生成数据
n_example = 100
X = tf.random_normal([n_example])
# 称重误差
noise = tf.random_uniform([n_example], -0.5, 0.5)
Y = X * 2 + noise
# 训练集
train_x = X[:80]
train_y = Y[:80]
# 测试集
test_x = X[80:]
test_y = Y[80:]
由于是随机生成的 X,每次会有些许差异。
2.2. 模型
导入用于定义参数的方法。使用 tfe.Variable()
定义用于更新的参数。
import tensorflow.contrib.eager as tfe
直接复制粘贴带有换行的代码到 ipython 下,格式会不正确。
需要使用 %cpaste
来开始复制内容,用 --
来结束复制内容(可直接复制粘贴下面内容到 Ipython)。
%cpaste
class Model(object):
def __init__(self):
# 定义参数 w 和 b,以及如何初始化参数,这里都初始化为 1.0.
# 初始化:随便给 参数 W 和 b 两个值
self.W = tfe.Variable(1.)
self.b = tfe.Variable(1.)
def __call__(self, x):
# 定义正向传递,即如何利用参数 w 和 b 从 x 获得 y。
y = self.W * x + self.b
return y
--
实例化一个模型。
model = Model()
你可以实例化多个不同名字的模型,每个实例化的模型的参数都是独立的。
model2 = Model()
完整代码,请打开 linear.py 插入下面的代码。
示例代码:/home/ubuntu/linear.py
import tensorflow.contrib.eager as tfe
# 定义模型
class Model(object):
def __init__(self):
# 参数
# 初始化:随便给 参数 W 和 b 两个值
self.W = tfe.Variable(1.)
self.b = tfe.Variable(1.)
def __call__(self, x):
# 正向传递
# 如何利用 参数 W 和 b 运算
y = self.W * x + self.b
return y
# 实例模型
model = Model()
2.3. 学习
定义误差函数
%cpaste
# 误差函数
def loss(prediction, label):
# 回归任务一般用 MSE
loss = tf.reduce_mean(tf.square(prediction - label))
return loss
--
反向传递
%cpaste
# 定义训练函数
def train(model, x, y, learning_rate, batch_size, epoch):
# 定义训练次数的循环
for e in range(epoch):
# 定义批量的循环
for b in range(0,len(x.numpy()),batch_size):
# 计算梯度
with tf.GradientTape() as tape:
loss_value = loss(model(x[b:b+batch_size]), y[b:b+batch_size])
dW, db = tape.gradient(loss_value, [model.W, model.b])
# 可以取消掉 # 来在训练过程中显示梯度
#print(dW, db)
# 更新参数
model.W.assign_sub(dW * learning_rate)
model.b.assign_sub(db * learning_rate)
# 显示训练动态
print("Epoch: %03d | Loss: %.3f | W: %.3f | b: %.3f" %(e, loss(model(x), y), model.W.numpy(), model.b.numpy()))
--
执行训练函数
# 请自行更改 学习速率、批量值、次数来体会差异
learning_rate = 0.01
batch_size = 2
epoch = 10
# 训练
train(model, train_x, train_y, learning_rate = learning_rate, batch_size = batch_size, epoch = epoch)
评估训练的模型
# 评估
test_p = model(test_x)
print("Final Test MSE: %s" %loss(test_p, test_y).numpy())
使用已训练的模型批量预测 1 斤 和 2 斤 的西瓜要多少钱。
会得到差不多是斤数的 2 倍的价格。
# 预测
test_p = model([1,2])
print(test_p.numpy())
完整代码,请打开 linear.py 插入下面的代码。
示例代码:/home/ubuntu/linear.py
# 误差
def loss(prediction, label):
loss = tf.reduce_mean(tf.square(prediction - label))
return loss
# 反向传递
def train(model, x, y, learning_rate, batch_size, epoch):
# 次数
for e in range(epoch):
# 批量
for b in range(0,len(x.numpy()),batch_size):
# 梯度
with tf.GradientTape() as tape:
loss_value = loss(model(x[b:b+batch_size]), y[b:b+batch_size])
dW, db = tape.gradient(loss_value, [model.W, model.b])
#print(dW, db)
# 更新参数
model.W.assign_sub(dW * learning_rate)
model.b.assign_sub(db * learning_rate)
# 显示
print("Epoch: %03d | Loss: %.3f | W: %.3f | b: %.3f" %(e, loss(model(x), y), model.W.numpy(), model.b.numpy()))
# 训练
train(model, train_x, train_y, learning_rate = 0.01, batch_size = 2, epoch = 10)
# 评估
test_p = model(test_x)
print("Final Test Loss: %s" %loss(test_p, test_y).numpy())
# 预测
test_p = model([1,2])
print(test_p.numpy())
退出 Ipython
exit
使用 python 文件来实行训练
python linear.py
深层模型
任务时间:时间未知
3.1. 数据
打开 Ipython
ipython
下载数字识别数据
%cpaste
# 国内用户
import subprocess
import tensorflow as tf
import numpy as np
tf.enable_eager_execution()
from tensorflow.examples.tutorials.mnist import input_data
# 下载 mnist 数据集
commands = ['wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/t10k-images-idx3-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/t10k-labels-idx1-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/train-images-idx3-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/train-labels-idx1-ubyte.gz']
for c in commands:
subprocess.call(c, shell=True)
mnist = input_data.read_data_sets('./', one_hot=False)
train_x = mnist.train.images.astype('float32')
train_y = mnist.train.labels.astype('int32')
test_x = mnist.test.images.astype('float32')
test_y = mnist.test.labels.astype('int32')
--
显示第 1 个训练样本
print('数字形式的图片:%s' %train_x[0].reshape([28,28]))
print('图片数字:%s' %train_y[0])
显示样本数量
print('训练数据量 %s, %s' %(train_x.shape,train_y.shape))
print('测试数据量 %s, %s' %(test_x.shape,test_y.shape))
完整代码,请在 /home/ubuntu
目录下创建 mnist_nn.py,参考下面的内容。
示例代码:/home/ubuntu/mnist_nn.py
#!/usr/bin/python
#coding:utf-8
# 国内用户
import subprocess
import tensorflow as tf
import numpy as np
tf.enable_eager_execution()
from tensorflow.examples.tutorials.mnist import input_data
# 下载 mnist 数据集
commands = ['wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/t10k-images-idx3-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/t10k-labels-idx1-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/train-images-idx3-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/train-labels-idx1-ubyte.gz']
#for c in commands:
#subprocess.call(c, shell=True)
mnist = input_data.read_data_sets('./', one_hot=False)
train_x = mnist.train.images.astype('float32')
train_y = mnist.train.labels.astype('int32')
test_x = mnist.test.images.astype('float32')
test_y = mnist.test.labels.astype('int32')
print('训练数据量 %s, %s' %(train_x.shape,train_y.shape))
print('测试数据量 %s, %s' %(test_x.shape,test_y.shape))
3.2. 模型
创建深层模型类
%cpaste
import tensorflow.contrib.eager as tfe
# 定义模型类
class Model(object):
# 初始化方法
def inits(self, shape):
return tf.random_uniform(shape,
minval=-np.sqrt(5) * np.sqrt(1.0 / shape[0]),
maxval=np.sqrt(5) * np.sqrt(1.0 / shape[0]))
def __init__(self):
# 参数初始化
self.W1 = tfe.Variable(self.inits([784,256]))
self.b1 = tfe.Variable(self.inits([256]))
self.W2 = tfe.Variable(self.inits([256,128]))
self.b2 = tfe.Variable(self.inits([128]))
self.W = tfe.Variable(self.inits([128,10]))
self.b = tfe.Variable(self.inits([10]))
def __call__(self, x):
# 正向传递
# tf.nn.relu 是非线性函数
# tf.matmul 是矩阵乘法,输入放在前,参数放在后
h1 = tf.nn.relu(tf.matmul(x, self.W1) + self.b1)
h2 = tf.nn.relu(tf.matmul(h1, self.W2) + self.b2)
y = tf.matmul(h2, self.W) + self.b
return y
# 实例模型
model = Model()
--
完整代码,请打开 mnist_nn.py 插入下面的代码。
示例代码:/home/ubuntu/mnist_nn.py
import tensorflow.contrib.eager as tfe
# 定义模型类
class Model(object):
# 初始化方法
def inits(self, shape):
return tf.random_uniform(shape,
minval=-np.sqrt(5) * np.sqrt(1.0 / shape[0]),
maxval=np.sqrt(5) * np.sqrt(1.0 / shape[0]))
def __init__(self):
# 参数初始化
self.W1 = tfe.Variable(self.inits([784,256]))
self.b1 = tfe.Variable(self.inits([256]))
self.W2 = tfe.Variable(self.inits([256,128]))
self.b2 = tfe.Variable(self.inits([128]))
self.W = tfe.Variable(self.inits([128,10]))
self.b = tfe.Variable(self.inits([10]))
def __call__(self, x):
# 正向传递
# tf.nn.relu 是非线性函数
# tf.matmul 是矩阵乘法,输入放在前,参数放在后
h1 = tf.nn.relu(tf.matmul(x, self.W1) + self.b1)
h2 = tf.nn.relu(tf.matmul(h1, self.W2) + self.b2)
y = tf.matmul(h2, self.W) + self.b
return y
# 实例模型
model = Model()
3.3. 学习
定义误差函数
%cpaste
# 误差函数
def loss(logits, label):
# 交叉熵函数
loss = tf.losses.sparse_softmax_cross_entropy(labels=label, logits=logits)
return loss
--
定义训练函数
%cpaste
from sklearn.metrics import accuracy_score
# 日志消息按问题严重性升序排列分别是 DEBUG,INFO,WARN,ERROR,FATAL 。
tf.logging.set_verbosity(tf.logging.ERROR)
# 更新方式
def train(model, x, y, learning_rate, batch_size, epoch):
# 更新次数
for e in range(epoch):
# 批量更新
for b in range(0,len(x),batch_size):
# 计算梯度
with tf.GradientTape() as tape:
loss_value = loss(model(np.array(x[b:b+batch_size])), np.array(y[b:b+batch_size]))
dW1, db1, dW2, db2, dW, db = tape.gradient(loss_value,
[model.W1, model.b1, model.W2, model.b2, model.W, model.b])
# 训练更新
model.W1.assign_sub(dW1 * learning_rate)
model.b1.assign_sub(db1 * learning_rate)
model.W2.assign_sub(dW2 * learning_rate)
model.b2.assign_sub(db2 * learning_rate)
model.W.assign_sub(dW * learning_rate)
model.b.assign_sub(db * learning_rate)
# 显示
# 训练集太大,内存不够,所以每次以 batch size 个来的计算预测值
train_p = tf.concat([model(train_x[b:b+batch_size]) for b in range(0,len(train_x),batch_size)],axis=0)
test_p = model(test_x)
print("Epoch: %03d | train loss: %.3f | train acc: %.3f | test loss: %.3f | test acc: %.3f"
%(e, loss(train_p, train_y), accuracy_score(tf.argmax(train_p,1), train_y),
loss(test_p, test_y), accuracy_score(tf.argmax(test_p,1), test_y)))
--
执行训练
# 请自行更改 学习速率、批量值、次数来体会差异
learning_rate = 0.01
batch_size = 128
epoch = 10
# 训练
# 打乱顺序
r = np.random.permutation(len(train_y))
# 送入打乱顺序的 train_x 和 train_y
train(model, [train_x[i] for i in r], [train_y[i] for i in r], learning_rate = learning_rate, batch_size = batch_size, epoch = epoch)
评估
test_p = model(test_x)
# 使用
print("Final Test Loss: %s" %accuracy_score(tf.argmax(test_p,1), test_y))
完整代码,请打开 mnist_nn.py 插入下面的代码。
示例代码:/home/ubuntu/mnist_nn.py
from sklearn.metrics import accuracy_score
# 误差函数
def loss(logits, label):
loss = tf.losses.sparse_softmax_cross_entropy(labels=label, logits=logits)
return loss
# 日志消息按问题严重性升序排列分别是 DEBUG,INFO,WARN,ERROR,FATAL 。
tf.logging.set_verbosity(tf.logging.ERROR)
from sklearn.metrics import accuracy_score
# 日志消息按问题严重性升序排列分别是 DEBUG,INFO,WARN,ERROR,FATAL 。
tf.logging.set_verbosity(tf.logging.ERROR)
# 更新方式
def train(model, x, y, learning_rate, batch_size, epoch):
# 更新次数
for e in range(epoch):
# 批量更新
for b in range(0,len(x),batch_size):
# 计算梯度
with tf.GradientTape() as tape:
loss_value = loss(model(np.array(x[b:b+batch_size])), np.array(y[b:b+batch_size]))
dW1, db1, dW2, db2, dW, db = tape.gradient(loss_value,
[model.W1, model.b1, model.W2, model.b2, model.W, model.b])
# 训练更新
model.W1.assign_sub(dW1 * learning_rate)
model.b1.assign_sub(db1 * learning_rate)
model.W2.assign_sub(dW2 * learning_rate)
model.b2.assign_sub(db2 * learning_rate)
model.W.assign_sub(dW * learning_rate)
model.b.assign_sub(db * learning_rate)
# 显示
# 训练集太大,内存不够,所以每次以 batch size 个来的计算预测值
train_p = tf.concat([model(train_x[b:b+batch_size]) for b in range(0,len(train_x),batch_size)],axis=0)
test_p = model(test_x)
print("Epoch: %03d | train loss: %.3f | train acc: %.3f | test loss: %.3f | test acc: %.3f"
%(e, loss(train_p, train_y), accuracy_score(tf.argmax(train_p,1), train_y),
loss(test_p, test_y), accuracy_score(tf.argmax(test_p,1), test_y)))
# 训练
r = np.random.permutation(len(train_y))
# 送入打乱顺序的 train_x 和 train_y
train(model, [train_x[i] for i in r], [train_y[i] for i in r], learning_rate = 0.001, batch_size = 128, epoch = 10)
# 评估
test_p = model(test_x)
print("Final Test Loss: %s" %accuracy_score(tf.argmax(test_p,1), test_y))
推出 Ipython
exit
使用 python 文件来实行训练
python mnist_nn.py
3.4. keras
随着网络的加深,自己不断的定义和更新参数会很麻烦、易出错。
所以可以使用 keras API 来替换以下的部分,所完成的工作都是相同的。
下面的代码不需要运行,观察差别就可以。
3.4.1. 模型部分替换
%cpaste
# 初始化方法
#def inits(shape):
# return tf.random_uniform(shape,
# minval=-np.sqrt(5) * np.sqrt(1.0 / shape[0]),
# maxval=np.sqrt(5) * np.sqrt(1.0 / shape[0]))
# 定义模型
#class Model(object):
# def __init__(self):
# # 参数初始化
# self.W1 = tfe.Variable(inits([784,512]))
# self.b1 = tfe.Variable(inits([512]))
# self.W2 = tfe.Variable(inits([512,256]))
# self.b2 = tfe.Variable(inits([256]))
# self.W3 = tfe.Variable(inits([256,10]))
# self.b3 = tfe.Variable(inits([10]))
# def __call__(self, x):
# # 正向传递
# y = tf.nn.relu(tf.matmul(x, self.W1) + self.b1)
# y = tf.nn.relu(tf.matmul(y, self.W2) + self.b2)
# y = tf.matmul(y, self.W3) + self.b3
# return y
# 实例模型
#model = Model()
# 以下的代码等价于以上被注释的代码
model = tf.keras.Sequential([
tf.keras.layers.Dense(512, activation=tf.nn.relu, input_shape=(784,)),
tf.keras.layers.Dense(256, activation=tf.nn.relu),
tf.keras.layers.Dense(10)
])
--
3.4.2. 参数更新部分替换
%cpaste
# 训练更新
# model.W1.assign_sub(dW1 * learning_rate)
# model.b1.assign_sub(db1 * learning_rate)
# model.W2.assign_sub(dW2 * learning_rate)
# model.b2.assign_sub(db2 * learning_rate)
# model.W3.assign_sub(dW3 * learning_rate)
# model.b3.assign_sub(db3 * learning_rate)
# 以下的代码等价于以上被注释的代码
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
optimizer.apply_gradients(zip(grads, model.variables),
global_step=tf.train.get_or_create_global_step())
--
3.4.2. 记录和显示部分替换
%cpaste
# 训练集太大,内存不够,所以每次以 batch size 个来的计算预测值
#train_p = tf.concat([model(train_x[b:b+batch_size]) for b in range(0,len(train_x),batch_size)],axis=0)
#test_p = model(test_x)
#print("Epoch: %03d | train loss: %.3f | train acc: %.3f | test loss: %.3f | test acc: %.3f"
# %(e, loss(train_p, train_y), accuracy_score(tf.argmax(train_p,1), train_y),
# loss(test_p, test_y), accuracy_score(tf.argmax(test_p,1), test_y)))
# 以下的代码等价于以上被注释的代码
# 创建用于记录训练集误差和准确率的方法
epoch_loss_avg = tfe.metrics.Mean()
epoch_accuracy = tfe.metrics.Accuracy()
# 批量更新
for b in range(0,len(x),batch_size):
# ... 略
# 在每个 batch 循环内部,记录训练集每个 batch 的误差和准确率
epoch_loss_avg(loss_value)
epoch_accuracy(tf.argmax(model(np.array(x[b:b+batch_size])), axis=1, output_type=tf.int32), np.array(y[b:b+batch_size]))
每个 batch 的平均误差和准确率就是
epoch_loss_avg.result()
epoch_accuracy.result()
--
最后使用 API 来进行训练,请在 /home/ubuntu
目录下创建 API.py,参考下面的内容。
示例代码:/home/ubuntu/API.py
#!/usr/bin/python
#coding:utf-8
# 国内用户
import subprocess
import tensorflow as tf
import numpy as np
tf.enable_eager_execution()
from tensorflow.examples.tutorials.mnist import input_data
# 下载 mnist 数据集
commands = ['wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/t10k-images-idx3-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/t10k-labels-idx1-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/train-images-idx3-ubyte.gz',
'wget https://devlab-1251520893.cos.ap-guangzhou.myqcloud.com/train-labels-idx1-ubyte.gz']
#for c in commands:
# subprocess.call(c, shell=True)
mnist = input_data.read_data_sets('./', one_hot=False)
train_x = mnist.train.images.astype('float32')
train_y = mnist.train.labels.astype('int32')
test_x = mnist.test.images.astype('float32')
test_y = mnist.test.labels.astype('int32')
print('训练数据量 %s, %s' %(train_x.shape,train_y.shape))
print('测试数据量 %s, %s' %(test_x.shape,test_y.shape))
import tensorflow.contrib.eager as tfe
# 使用 keras 定义模型(与之前的代码等价)
model = tf.keras.Sequential([
tf.keras.layers.Dense(512, activation=tf.nn.relu, input_shape=(784,)),
tf.keras.layers.Dense(256, activation=tf.nn.relu),
tf.keras.layers.Dense(10)
])
from sklearn.metrics import accuracy_score
# 误差函数
def loss(logits, label):
loss = tf.losses.sparse_softmax_cross_entropy(labels=label, logits=logits)
return loss
# 日志消息按问题严重性升序排列分别是 DEBUG,INFO,WARN,ERROR,FATAL 。
tf.logging.set_verbosity(tf.logging.ERROR)
# 更新方式
def train(model, x, y, learning_rate, batch_size, epoch):
# 更新次数
for e in range(epoch):
# 创建用于记录训练集误差和准确率的方法
epoch_loss_avg = tfe.metrics.Mean()
epoch_accuracy = tfe.metrics.Accuracy()
# 批量更新
for b in range(0,len(x),batch_size):
# 计算梯度
with tf.GradientTape() as tape:
loss_value = loss(model(np.array(x[b:b+batch_size])), np.array(y[b:b+batch_size]))
grads = tape.gradient(loss_value, model.variables)
# 训练更新
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
optimizer.apply_gradients(zip(grads, model.variables),
global_step=tf.train.get_or_create_global_step())
# 因为记忆量限制,记录训练集每个 batch 的误差和准确率
epoch_loss_avg(loss_value)
epoch_accuracy(tf.argmax(model(np.array(x[b:b+batch_size])), axis=1, output_type=tf.int32), np.array(y[b:b+batch_size]))
# 显示
test_p = model(test_x)
print("Epoch: %03d | train loss: %.3f | train acc: %.3f | test loss: %.3f | test acc: %.3f"
%(e, epoch_loss_avg.result(), epoch_accuracy.result(),
loss(test_p, test_y), accuracy_score(tf.argmax(test_p,1), test_y)))
# 训练
r = np.random.permutation(len(train_y))
# 送入打乱顺序的 train_x 和 train_y
train(model, [train_x[i] for i in r], [train_y[i] for i in r], learning_rate = 0.001, batch_size = 128, epoch = 10)
# 评估
test_p = model(test_x)
print("Final Test Loss: %s" %accuracy_score(tf.argmax(test_p,1), test_y))
使用 keras API 来训练
python API.py