迁移学习
Inceptipn-v3模型
Inception-v3模型中的Inception结构是将不同的卷积层通过并联的方式结合在一起。其卷积层使用了不同尺寸的过滤器,然后将得到的矩阵拼接起来。图一是一个Inception模块的一个单元结构图:
它会使用不同尺寸的过滤器处理输入矩阵,由图中可以看到,最上面的矩阵是使用了边长为1的过滤器的卷积层前向传播结果。类似的,中间矩阵使用的过滤器边长为3,下方矩阵使用的过滤器边长为5。不同的矩阵代表了Inception模块中的一条计算路径,虽然过滤器大小不同,但若所有的过滤器都是用全0填充且步长为1,那么前向传播得到的结果长和宽都与输入矩阵一致。这样经过不同过滤器处理的结果可以拼接成一个更深的矩阵,如上图所示,可以将它们在深度这个维度上组合起来。上图所示的Inception模块得到的结果矩阵的长和宽输入一样,深度为三个矩阵深度的和。
如上图所示,Inception-v3模型总共有46层,由11个Inception模块组成,共有96个卷积层,因此代码量较大,给出实现模型结构中红框处的实现代码。
with slim.arg_scope([slim.conv2d,slim.max_pool2d,slim.avg_pool2d], stride = 1 ,padding = 'SAME' ): ... net = 上一层的输出节点矩阵 with tf.variable_scope('last_inception' ): with tf.variable_scope('Branch_0' ): branch_0 = slim.conv2d(net,320 ,[1 ,1 ],scope = 'Conv2d_0a_lxl' ) with tf.variable_scope('Branch_1' ): branch_1 = slim.conv2d(net,384 ,[1 ,1 ],scope = 'Conv2d_0a_1x1' ) branch_1 = tf.concat(3 ,[ slim.conv2d(branch_1,384 ,[1 ,3 ],scope = 'Conv2d_0b_1x3' ), slim.conv2d(branch_1,384 ,[3 ,1 ],scope = 'Conv2d_0b_3x1' )]) with tf.variable_scope('Branch_2' ): branch_2 = slim.conv2d(net,448 ,[1 ,1 ],scope = 'Conv2d_0b_3x3' ) branch_2 = slim.conv2d(branch_2,384 ,[3 ,3 ],scope = 'Conv2d_0b_3x3' ) branch_2 = tf.concat(3 ,[ slim.conv2d(branch_2,384 ,[1 ,3 ],scope = 'Conv2d_0c_1x3' ), slim.conv2d(branch_2,384 ,[3 ,1 ],scope = 'Conv2d_0c_3x1' )]) with tf.variable_scope('Branch_3' ): branch_3 = slim.avg_pool2d(net,[3 ,3 ],scope = 'AvgPool_0a_3x3' ) branch_3 = slim.conv2d(branch_3,192 ,[1 ,1 ],scope = 'Conv2d_0b_1x1' ) net = tf.concat(3 ,[branch_0,branch_1,branch_2,branch_3])
模型的迁移学习
所谓迁移学习,就是将一个问题上训练好的模型通过简单的调整使其适用于一个新的问题。根据论文DeCAF中的结论,可以保留训练好的Inception-3模型中所有卷积层的参数,只是替换最后一层全连接层,在最后这一层全连接层之前的网络层称之为瓶颈层。
将新的图像通过训练好的卷积神经网络直到瓶颈层的过程可以看成是对图像进行特征提取的过程。在训练好的Inception-3模型中,因为将瓶颈层的输出再通过一个单层的全连接层神经网络可以很好的区分1000种类别的图像,所以瓶颈层输出的节点向量可以直接利用这个训练好的神经网络对图像进行特征提取,然后再将提取到的特征向量作为输入来训练一个新的单层全连接神经网络处理的分类问题。
但是,在数据量足够的情况下,迁移学习的效果远远不如完全重新训练。但迁移学习所需要的时间较短。
使用TensorFlow进行迁移学习
准备数据
当前实验所需要的数据集为花的图片,存放于“flower_photo”文件夹下,其中共有5个文件夹,每个文件夹对应一种花的图片,每个种类的图片数量均大于500。
每种花的图片数据如下图所示:
数据分类
初步处理完数据后,将数据分为训练集和测试集,并且将分类结果存储在result字典中。
def creat_image_list(testing_persentage, validation_persentage): # result为字典,存储所有图片,key:label_name,value:字典,存储图片名称、数据集属性等 result = {} # 获取当前目录的所有子目录 sub_dirs = [x[0] for x in os.walk(INPUT_DATA)] is_root_dir = True for sub_dir in sub_dirs: # 得到的第一个为当前目录,跳过 if is_root_dir: is_root_dir = False continue # 获取当前目录下所有有效图片文件 extensions = ['jpg', 'jpeg', 'JPG', 'JPEG'] file_list = [] dir_name = os.path.basename(sub_dir) # 文件名,如“daisy” for extension in extensions: file_glob = os.path.join(INPUT_DATA, dir_name, "*." + extension) file_list.extend(glob.glob(file_glob)) if not file_list: continue # 目录名为类的名称,如daisy label_name = dir_name.lower() # 初始化当前类的训练集、测试集、验证集 training_images = [] testing_images = [] validation_images = [] for file_name in file_list: base_name = os.path.basename(file_name) # base_name为图片的名字,如:‘5673551_01d1ea993e_n.jpg’ chance = np.random.randint(100) # 随机将图片分配给三个数据集 if chance < validation_persentage: validation_images.append(base_name) elif chance < (testing_persentage + validation_persentage): testing_images.append(base_name) else: training_images.append(base_name) # 将当前类别数据放入结果字典 result[label_name] = { 'dir': dir_name, 'training': training_images, 'testing': testing_images, 'validation': validation_images } return result
首先从数据文件夹中读取所有的图片列表,按照训练和测试数据分开,其中testing_percentage
和 validation_percentage
指定了测试数据和验证数据集的大小。
辅助函数
在将数据喂给神经网络之前,先定义一些辅助函数,有利于帮助简化程序 #### 获取图片地址 此函数可以通过类别名称、所属数据集和图片编号获取图片的地址。类别名称就是图片所属的文件夹名称,所属数据类别是其为训练集还是测试集,图片编号即为图片名字。
def get_image_path(image_lists, image_dir, label_name, index, category): label_lists = image_lists[label_name] # 获取label_name这个类别的所有图片信息 category_list = label_lists[category] # 获取指定类中三个数据集中指定的数据集,如:training中的图片信息 mod_index = index % len(category_list) base_name = category_list[mod_index] # 获取指定编号的图片的文件名 sub_dir = label_lists['dir'] full_path = os.path.join(image_dir, sub_dir, base_name) # 最终的path为image_dir/sub_dir/base_name return full_path
image_lists:所有图片信息
image_dir:根目录
label_name:类别名称
index:图片编号
category:图片所在集合为训练集、测试集还是验证集
获取特征向量地址
通过所有图片信息、所属类别(如daisy)、图片编号、所属数据集(如training)获取通过模型之后的特征向量地址。参数比get_image_path方法少了image_dir,替换成CACHE_DIR为保存模型变量的临时文件存储文件夹。
def get_bottleneck_path (image_lists, label_name, index, category) : return get_image_path(image_lists, CACHE_DIR, label_name, index, category) + '.txt'
加载Inception-V3模型
读取训练好的.pb格式的模型,并且返回数据输入所对应的张量及计算瓶颈层结果所对应的张量。
def load_model () : with gfile.FastGFile(os.path.join(MODEL_DIR, MODEL_FILE), 'rb' ) as f: graph_def = tf.GraphDef() graph_def.ParseFromString(f.read()) bottleneck_tensor, jpeg_data_tensor = tf.import_graph_def( graph_def, return_elements=[BOTTLENECK_TENSOR_NAME, JPEG_DATA_TENSOR_NAME]) return bottleneck_tensor, jpeg_data_tensor
判断当前文件夹是否存在
确保输入路径存在,若不存在则创建
def ensure_dir_exists (dir_name) : if not os.path.exists(dir_name): os.makedirs(dir_name)
训练过程
计算当前图片的瓶颈张量值
将当前图片作为输入计算bottleneck_tensor的值,返回值为特征向量。从此处开始需要用到session,因为这时已经开始进行前向传播的过程。
def run_bottleneck_on_image (sess, image_data, image_data_tensor, bottleneck_tensor) : bottleneck_values = sess.run(bottleneck_tensor, {image_data_tensor: image_data}) bottenneck_values = np.squeeze(bottleneck_values) return bottenneck_values
获取图片的特征向量
首先视图寻找已经计算且保存下来的特征向量,如果找不到该特征向量,再计算特征向量并保存到文件。
def get_or_create_bottleneck (sess, image_lists, label_name, index, category, jpeg_data_tensor, bottleneck_tensor) : label_lists = image_lists[label_name] sub_dir = label_lists['dir' ] sub_dir_path = os.path.join(CACHE_DIR, sub_dir) ensure_dir_exists(sub_dir_path) bottleneck_path = get_bottleneck_path(image_lists, label_name, index, category) if not os.path.exists(bottleneck_path): image_path = get_image_path(image_lists, INPUT_DATA, label_name, index, category) image_data = gfile.FastGFile(image_path, 'rb' ).read() bottelneck_values = run_bottleneck_on_image( sess, image_data, jpeg_data_tensor, bottleneck_tensor) bottleneck_string = ',' .join(str(x) for x in bottelneck_values) with open(bottleneck_path, 'w' ) as bottleneck_file: bottleneck_file.write(bottleneck_string) else : with open(bottleneck_path, 'r' ) as bottleneck_file: bottleneck_string = bottleneck_file.read() bottelneck_values = [float(x) for x in bottleneck_string.split(',' )] return bottelneck_values
小批量获取特征向量
随机获取一个batch的图片作为训练数据
def get_random_cached_bottlenecks (sess, image_lists, how_many, category, jpeg_data_tensor, bottleneck_tensor) : n_classes = len(image_lists.keys()) bottenecks = [] ground_truths = [] for _ in range(how_many): label_index = random.randrange(n_classes) label_name = list(image_lists.keys())[label_index] image_index = random.randrange(MAX_NUM_IMAGES_PER_CLASS + 1 ) botteneck = get_or_create_bottleneck( sess, image_lists, label_name, image_index, category, jpeg_data_tensor, bottleneck_tensor) ground_truth = np.zeros(n_classes, dtype=np.float32) ground_truth[label_index] = 1.0 bottenecks.append(botteneck) ground_truths.append(ground_truth) return bottenecks, ground_truths
定义全连接层
训练过程,增加最后一个全连接层进行训练。同时定义其损失函数为交叉熵函数。
def add_final_training_ops (class_count, final_tensor_name, bottleneck_tensor) : with tf.name_scope('input' ): bottleneck_input = tf.placeholder_with_default( bottleneck_tensor, shape=[None , BOTTLENECK_TENSOR_SIZE], name='BottleneckInputPlaceholder' ) ground_truth_input = tf.placeholder(tf.float32, [None , class_count], name='GroundTruthInput' ) layer_name = 'final_training_ops' with tf.name_scope(layer_name): with tf.name_scope('weights' ): layer_weights = tf.Variable(tf.truncated_normal([BOTTLENECK_TENSOR_SIZE, class_count], stddev=0.001 ), name='final_weights' ) with tf.name_scope('biases' ): layer_biases = tf.Variable(tf.zeros([class_count]), name='final_biases' ) with tf.name_scope('Wx_plus_b' ): logits = tf.matmul(bottleneck_input, layer_weights) + layer_biases final_tensor = tf.nn.softmax(logits, name=final_tensor_name) with tf.name_scope('cross_entropy' ): cross_entropy = tf.nn.softmax_cross_entropy_with_logits( logits=logits, labels=ground_truth_input) with tf.name_scope('total' ): cross_entropy_mean = tf.reduce_mean(cross_entropy) with tf.name_scope('train' ): train_step = tf.train.GradientDescentOptimizer(LEARNING_RATE).minimize( cross_entropy_mean) return train_step, cross_entropy_mean, bottleneck_input, \ ground_truth_input, final_tensor
计算正确率
evaluaion_step可以用于计算训练集和测试集的正确率。
def add_evaluation_step (result_tensor, ground_truth_tensor) : with tf.name_scope('accuracy' ): with tf.name_scope('correct_prediction' ): correct_prediction = tf.equal(tf.argmax(result_tensor, 1 ), tf.argmax(ground_truth_tensor, 1 )) with tf.name_scope('accuracy' ): evaluation_step = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) return evaluation_step
测试过程
在最终测试的时候需要在所有的测试数据上计算正确率,此需要获取测试数据特征值。
def get_test_bottenecks (sess, image_lists, n_classes, jpeg_data_tensor, botteneck_tensor) : bottenecks = [] ground_truths = [] label_name_list = list(image_lists.keys()) for label_index, label_name in enumerate(label_name_list): category = 'testing' for index, unused_base_name in enumerate(image_lists[label_name][category]): botteneck = get_or_create_bottleneck( sess, image_lists, label_name, index, category, jpeg_data_tensor, botteneck_tensor) ground_truth = np.zeros(n_classes, dtype=np.float32) ground_truth[label_index] = 1.0 bottenecks.append(botteneck) ground_truths.append(ground_truth) return bottenecks, ground_truths
主函数
创建函数框架,创建main函数,在主函数中的操作为:
加载训练数据
读取V3模型,获取瓶颈层参数
定义最后一层全连接层及损失函数
训练模型
测试模型
MODEL_INPUT_WIDTH = 299 MODEL_INPUT_HEIGHT = 299 MODEL_INPUT_DEPTH = 3 BOTTLENECK_TENSOR_SIZE = 2048 BOTTLENECK_TENSOR_NAME = "pool_3/_reshape:0" JPEG_DATA_TENSOR_NAME = "DecodeJpeg/contents:0" MODEL_DIR = "./model/inception_dec_2015" MODEL_FILE = "tensorflow_inception_graph.pb" CACHE_DIR = "./cache" INPUT_DATA = "./flower_photos" SAVED_MODEL_DIR = "./model/output_model/flower_recognition.pb" OUTPUT_LABEL = "./model/output_model/output_labels.txt" VALIDATION_PERCENTAGE = 10 TEST_PERCENTAGE = 10 LEARNING_RATE = 0.01 STEPS = 4000 BATCH = 128 MAX_NUM_IMAGES_PER_CLASS = 2 ** 27 - 1 def main (_) : image_lists = creat_image_list(TEST_PERCENTAGE, VALIDATION_PERCENTAGE) graph, bottleneck_tensor, jpeg_data_tensor = ( create_inception_graph()) with tf.Session() as sess: config = tf.ConfigProto(allow_soft_placement=True , log_device_placement=True ) config.gpu_options.allow_growth = True sess._config = config (train_step, cross_entropy, bottleneck_input, ground_truth_input, final_tensor) = add_final_training_ops(len(image_lists.keys()), "final_result" , bottleneck_tensor) evaluation_step = add_evaluation_step(final_tensor, ground_truth_input) tf.global_variables_initializer().run() for i in range(STEPS): train_bottlenecks, train_ground_truth = get_random_cached_bottlenecks( sess, image_lists, BATCH, 'training' , jpeg_data_tensor, bottleneck_tensor) sess.run(train_step, feed_dict={bottleneck_input: train_bottlenecks, ground_truth_input: train_ground_truth}) is_last_step = (i + 1 == STEPS) if is_last_step or (i % 100 ) == 0 : train_accuracy, cross_entropy_value = sess.run( [evaluation_step, cross_entropy], feed_dict={bottleneck_input: train_bottlenecks, ground_truth_input: train_ground_truth}) print('Step %d: Train accuracy = %.1f%%' % (i, train_accuracy * 100 )) validation_bottlenecks, validation_ground_truth = ( get_random_cached_bottlenecks( sess, image_lists, STEPS, 'validation' , jpeg_data_tensor, bottleneck_tensor)) validation_accuracy = sess.run(evaluation_step, feed_dict={bottleneck_input: validation_bottlenecks, ground_truth_input: validation_ground_truth}) print('Step %d: Validation accuracy = %.1f%%' % (i, validation_accuracy * 100 )) test_bottlenecks, test_ground_truth = get_random_cached_bottlenecks( sess, image_lists, BATCH, 'testing' , jpeg_data_tensor, bottleneck_tensor) test_accuracy = sess.run(evaluation_step, feed_dict={bottleneck_input: test_bottlenecks, ground_truth_input: test_ground_truth}) print('Final test accuracy = %.1f%%' % (test_accuracy * 100 ))
备注:
完整的代码保存在”/home/student/public/deep_learning/TensorFlow/class4“中。