最近想用tensorflow做点实际的东西,最后选取了12306的验证码的识别。网上有很多这种实战,但都是识别26个字母或者10个数字的小项目。当然,在csdn上也有一位博主说他用简单的cnn将12306的验证码识别准确率提升到了95%;相关网络结构和代码没有公布,在他的回答中,都说的是最简单的cnn处理的验证码和汉字。于是,我有了自己实践的念头。
数据:
一开始我是自己做的爬虫,爬取了大约10000张验证码。当时心里美滋滋的,但是随后的标记工作苦不堪言,我标记了大约100张就放弃了这个工作。在网上查到的方法有人工打码,当然,这是要钱的,100张1元左右,让我心生退意。我不得已疯狂浏览相关博文……最后终于下载了一万一千多张验证码和相应标签。
处理:
在阅读csdn大神博主的博文后,我想当然的以为最简单的cnn必然有不错的效果,但是我尝试了很多次,修改了很多的网络结构,均以失败告终。正确率都在10%到20%左右。
我开始借鉴一些成熟的网络结构,首先是lenet5,一开始我没有改变图片的大小,自己先用一层卷积将输出大小变为lenet的输入大小,但是没有效果。不得已,我将图片直接转为32*32的大小,最后迭代20多万次,才有50%的正确率(100个分类)。这也是我最后的结果了。
我当然感到不满意,尝试使用AlexNet的网络结构,但是第一步就把我难住了:AlexNet的输入是227*227的,而我下载的数据才66*66,直接将图片变大肯定是不行的,我直接放弃了这个网络结构。(后来在其它的分类项目中,证实AlexNet结构不是我这个垃圾PC能运行的)。所以,最后的结果也就50%多一点。
我不知道csdn大神的网络结构是怎样的,只能说,他们真的很nb。
唉,我还是个入门小白啊……
学到的东西吧:
1.自己的网络结构大多数是不成熟的,往往得不到很好的效果,这时候直接借鉴是不错的选择。
2.学习率衰减的问题。一开始我设置的基础学习率为0.0001,衰减率0.5,300轮更新一次,训练了几千轮后loss一直摆动,正确率也特别低。一直找不到原因,后来发现是学习率衰减得太快:每300轮减半,3000轮后只剩约 $10^{-4}\times0.5^{10}\approx10^{-7}$。学习率降到10的负6次方以下之后,按6位小数打印出来就是0了,在之后的训练中,几乎没有学习。
3.batch_size的大小设置。这个我一直不知道怎么设置合适的值,一开始没注意,一直是固定的。后来在找原因的时候,一个个查找相关参数的影响。网上说的是太小不好,太大也不好,只能一个个地试。然而还是硬件的问题,自己的电脑让我没心情去试验到底哪个数值比较好。
4.修改已知的网络结构。经典的网络结构往往是不适合自己的数据的,怎么修改成适合自己的成了最大的难题。在北京大学曹健老师的课件里也对lenet网络结构进行了修改以适应mnist的数据。但只是给出了修改的网络结构,没有具体的分析,我捯饬了很久也没找出其中的规律。我一开始自己修改的lenet结构在验证码数据的效果不是很好。我在想,经典的网络结构该怎么修改,这一定是有经验可循的;或者说,这本身就是一个金标准,不可动摇。
generate_tfrecord
import tensorflow as tf
from PIL import Image
import os
# TFRecord file written by write_tfRecord and read back by get_tfrecord.
tfRecord_train = "./data/train1.tfrecords"
# Directory prefix that write_tfRecord prepends to each image file name.
image_train_path = "./data/data/"
# Text file with one "<image file name> <label name>" pair per line.
label_train_path = "./data/label.txt"
# Root folder of the raw captchas; every entry in it is treated as a label.
file_dir = "C:/Users/Lenovo/Desktop/python/pachong_test/trian_12306/data/captcha"
# os.listdir returns entries in arbitrary order. Sort them so that the
# label name -> one-hot index mapping used by write_tfRecord is the same on
# every run and every machine.
label_names = sorted(os.listdir(file_dir))
def write_tfRecord(tfRecordName, image_path, label_path):
    """Serialize every labelled image into one TFRecord file.

    Each record stores the raw image bytes under ``'img_raw'`` and a
    100-element one-hot vector under ``'label'``.

    Args:
        tfRecordName: Path of the TFRecord file to create.
        image_path: Prefix that is concatenated (not joined) with each image
            file name, so it must end with a path separator.
        label_path: Text file whose lines are "<image file name> <label name>".

    Raises:
        ValueError: If a label name is not found in ``label_names``.
        IndexError: If a non-blank line has fewer than two fields, or the
            label index does not fit in the 100-element one-hot vector.
    """
    # Read the label list first so a missing label file does not leave an
    # empty TFRecord file behind.
    with open(label_path, 'r') as f:
        contents = f.readlines()

    writer = tf.python_io.TFRecordWriter(tfRecordName)
    num_pic = 0
    try:
        for content in contents:
            value = content.split()
            if not value:
                # Blank line (e.g. a trailing newline at the end of the file).
                continue
            img_path = image_path + value[0]
            # NOTE(review): the image mode is not normalised here, while
            # read_tfRecord assumes 32 * 32 * 3 bytes per image - confirm that
            # every input file is RGB.
            with Image.open(img_path) as img:
                img_raw = img.tobytes()
            # One-hot encode the label by its position in label_names.
            labels = [0] * 100
            labels[label_names.index(value[1])] = 1
            # Wrap the image bytes and the label into a single Example.
            example = tf.train.Example(features=tf.train.Features(feature={
                'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
                'label': tf.train.Feature(int64_list=tf.train.Int64List(value=labels))
            }))
            writer.write(example.SerializeToString())
            num_pic += 1
            print("the number of picture:", num_pic)
    finally:
        # Always close the writer, even when an image fails to load, so the
        # records written so far reach the file.
        writer.close()
    print("write tfrecord successful")
def generate_tfRecord(data_path):
    """Make sure ``data_path`` exists, then build the training TFRecord file.

    Args:
        data_path: Directory to create when it does not exist yet.
    """
    if os.path.exists(data_path):
        print('directory already exists')
    else:
        os.makedirs(data_path)
        print('The directory was created successfully')
    # The input and output locations come from the module-level constants.
    write_tfRecord(tfRecord_train, image_train_path, label_train_path)
def read_tfRecord(tfRecord_path):
    """Build the graph ops that read and decode one example from a TFRecord.

    Args:
        tfRecord_path: Path of the TFRecord file to read.

    Returns:
        A ``(img, label)`` pair of tensors: ``img`` is a flat float32 tensor
        of 1024 * 3 values scaled by 1/255, ``label`` is the 100-element
        label vector cast to float32.
    """
    # File-name queue that the reader consumes.
    filename_queue = tf.train.string_input_producer([tfRecord_path], shuffle=True)
    _, serialized_example = tf.TFRecordReader().read(filename_queue)
    # The keys must match the ones used when the records were written.
    feature_spec = {
        'label': tf.FixedLenFeature([100], tf.int64),
        'img_raw': tf.FixedLenFeature([], tf.string)
    }
    parsed = tf.parse_single_example(serialized_example, features=feature_spec)
    # Raw bytes -> uint8 vector with a fixed static length.
    pixels = tf.decode_raw(parsed['img_raw'], tf.uint8)
    pixels.set_shape([1024 * 3])
    # Scale the byte values into [0, 1].
    pixels = tf.cast(pixels, tf.float32) * (1. / 255)
    target = tf.cast(parsed['label'], tf.float32)
    return pixels, target
def get_tfrecord(num):
    """Return shuffled batches of images and labels from the training TFRecord.

    Args:
        num: Batch size.

    Returns:
        An ``(img_batch, label_batch)`` tuple of tensors holding ``num``
        randomly drawn examples each.
    """
    img, label = read_tfRecord(tfRecord_train)
    print(img)
    # Draw the batch from a shuffling queue filled by two reader threads.
    batch = tf.train.shuffle_batch(
        [img, label],
        batch_size=num,
        num_threads=2,
        capacity=1000,
        min_after_dequeue=500,
    )
    img_batch, label_batch = batch
    return img_batch, label_batch
def create_label(file_dir):
    """Resize all captcha images to 32x32, gather them and write the label file.

    Every entry of ``file_dir`` is treated as a label folder. Each image in
    it is resized, saved under its original file name into one common
    folder, and recorded as "<file name> <label>" in ``./data/label.txt``.

    Args:
        file_dir: Root directory that contains one sub-folder per label.
    """
    # NOTE(review): the output folder is a hard-coded absolute path and files
    # that share a name across label folders overwrite each other - confirm
    # both are intended.
    labels = os.listdir(file_dir)
    with open("./data/label.txt", "w") as f:
        for label in labels:
            label_dir = file_dir + "/" + label
            for file_name in os.listdir(label_dir):
                # Close each source image as soon as the resized copy exists
                # instead of relying on garbage collection.
                with Image.open(label_dir + "/" + file_name) as img:
                    resized = img.resize((32, 32))
                resized.save("C:/Users/Lenovo/Desktop/python/pachong_test/trian_12306/data/data/" + file_name)
                f.write(file_name + " " + label + "\n")
# 生成label文件和合并图片在同一个文件夹下
# create_label(file_dir)
# 生成tfrecord文件
# generate_tfRecord("./data/")
forward and backward
import os
import numpy as np
import tensorflow as tf
from trian_12306 import generate_tfrecord
from tensorflow.examples.tutorials.mnist import input_data
# Width of the network output and of the one-hot label placeholder.
OUTPUT_NODE = 100
# Height and width of the input images fed to the network.
IMAGE_SIZE = 32
# Number of colour channels of the input images.
NUM_CHANNELS = 3
# First convolution layer: kernel side length and number of kernels.
CONV1_SIZE = 5
CONV1_KERNEL_NUM = 16
# Second convolution layer: kernel side length and number of kernels.
CONV2_SIZE = 5
CONV2_KERNEL_NUM = 16
# Number of units in the first fully connected layer.
FC_SIZE = 1000
# Scale handed to the L2 regularizer in get_weight.
REGULARIZER = 0.0001
# Initial learning rate for the exponential decay schedule.
LEARNING_RATE_BASE = 0.001
# Number of examples per training batch (also the fixed placeholder batch size).
BATCH_SIZE = 100
# Multiplicative learning-rate decay factor.
LEARNING_RATE_DECAY = 0.99
# Decay used by the exponential moving average of the trainable variables.
MOVING_AVERAGE_DECAY = 0.99
# Directory where checkpoints are saved and restored from.
MODEL_SAVE_PATH = "./model2/"
# Number of training iterations.
STEPS = 100000
# File-name prefix of the saved checkpoints.
MODEL_NAME = "DNN"
def get_weight(shape, regularizer):
    """Create a weight variable and optionally register its L2 penalty.

    Args:
        shape: Shape of the weight tensor.
        regularizer: L2 scale, or ``None`` to skip regularization.

    Returns:
        The new variable, initialised from a truncated normal (stddev 0.1).
    """
    initial = tf.truncated_normal(shape, stddev=0.1)
    w = tf.Variable(initial)
    if regularizer is None:
        return w
    # Collect the penalty so the training code can add it to the loss.
    penalty = tf.contrib.layers.l2_regularizer(regularizer)(w)
    tf.add_to_collection("losses", penalty)
    return w
def get_bias(shape):
    """Return a bias variable of the given shape, initialised to zeros."""
    return tf.Variable(tf.zeros(shape))
def conv2d(x, w, isPad="SAME"):
    """Apply a stride-1 2-D convolution.

    Args:
        x: Input tensor.
        w: Convolution kernel.
        isPad: ``"SAME"`` selects SAME padding; any other value selects VALID.

    Returns:
        The convolution output tensor.
    """
    padding = 'SAME' if isPad == "SAME" else 'VALID'
    return tf.nn.conv2d(x, w, strides=[1, 1, 1, 1], padding=padding)
def conv2d1(x, w, isPad="SAME"):
    """Apply a stride-2 2-D convolution.

    Args:
        x: Input tensor.
        w: Convolution kernel.
        isPad: ``"SAME"`` selects SAME padding; any other value selects VALID.

    Returns:
        The convolution output tensor.
    """
    padding = 'SAME' if isPad == "SAME" else 'VALID'
    return tf.nn.conv2d(x, w, strides=[1, 2, 2, 1], padding=padding)
def max_pool_2x2(x):
    """Apply 2x2 max pooling with stride 2 and VALID padding."""
    window = [1, 2, 2, 1]
    stride = [1, 2, 2, 1]
    return tf.nn.max_pool(x, ksize=window, strides=stride, padding='VALID')
def forward(x, train, regularizer):
    """Build the forward pass: two conv/pool stages and two dense layers.

    Args:
        x: Image batch of shape ``[batch, height, width, NUM_CHANNELS]``;
            height and width must be statically known.
        train: Currently unused; dropout on the first dense layer is disabled.
        regularizer: L2 scale forwarded to ``get_weight`` (``None`` disables it).

    Returns:
        Logits tensor of shape ``[batch, OUTPUT_NODE]`` (no softmax applied).
    """
    # Stage 1: VALID convolution + ReLU + 2x2 max pooling.
    conv1_w = get_weight([CONV1_SIZE, CONV1_SIZE, NUM_CHANNELS, CONV1_KERNEL_NUM], regularizer)
    conv1_b = get_bias([CONV1_KERNEL_NUM])
    conv1 = conv2d(x, conv1_w, isPad="VALID")
    relu1 = tf.nn.relu(tf.nn.bias_add(conv1, conv1_b))
    pool1 = max_pool_2x2(relu1)

    # Stage 2: same structure, fed with the output of stage 1.
    conv2_w = get_weight([CONV2_SIZE, CONV2_SIZE, CONV1_KERNEL_NUM, CONV2_KERNEL_NUM], regularizer)
    conv2_b = get_bias([CONV2_KERNEL_NUM])
    conv2 = conv2d(pool1, conv2_w, isPad="VALID")
    relu2 = tf.nn.relu(tf.nn.bias_add(conv2, conv2_b))
    pool2 = max_pool_2x2(relu2)

    # Flatten the feature maps. Using -1 for the batch dimension keeps the
    # result identical for a fixed batch size and also works when the batch
    # dimension of x is unknown (None).
    pool_shape = pool2.get_shape().as_list()
    nodes = pool_shape[1] * pool_shape[2] * pool_shape[3]
    reshaped = tf.reshape(pool2, [-1, nodes])

    # Dense layer 1 with ReLU.
    fc1_w = get_weight([nodes, FC_SIZE], regularizer)
    fc1_b = get_bias([FC_SIZE])
    fc1 = tf.nn.relu(tf.matmul(reshaped, fc1_w) + fc1_b)

    # Dense layer 2 produces the raw class scores.
    fc2_w = get_weight([FC_SIZE, OUTPUT_NODE], regularizer)
    fc2_b = get_bias([OUTPUT_NODE])
    y = tf.matmul(fc1, fc2_w) + fc2_b
    return y
def backward():
    """Build the training graph and run the training loop.

    The loss is the mean softmax cross-entropy plus the terms collected in
    the ``'losses'`` collection. It is minimised by gradient descent with a
    staircase exponential learning-rate decay, and an exponential moving
    average of the trainable variables is updated with every step. Training
    resumes from the latest checkpoint in ``MODEL_SAVE_PATH`` when one
    exists, and a checkpoint is saved every 100 iterations.
    """
    x = tf.placeholder(tf.float32, [BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS])
    y_ = tf.placeholder(tf.float32, [None, OUTPUT_NODE])
    y = forward(x, True, REGULARIZER)
    global_step = tf.Variable(0, trainable=False)

    # The labels arrive one-hot encoded; the sparse loss needs class indices.
    ce = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
    cem = tf.reduce_mean(ce)
    loss = cem + tf.add_n(tf.get_collection('losses'))

    # Multiply the rate by LEARNING_RATE_DECAY once every 500 steps.
    learning_rate = tf.train.exponential_decay(
        LEARNING_RATE_BASE,
        global_step,
        500,
        LEARNING_RATE_DECAY,
        staircase=True)
    train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)

    ema = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
    ema_op = ema.apply(tf.trainable_variables())
    # Running train_op runs both the gradient step and the average update.
    with tf.control_dependencies([train_step, ema_op]):
        train_op = tf.no_op(name='train')

    saver = tf.train.Saver()
    img_batch, label_batch = generate_tfrecord.get_tfrecord(BATCH_SIZE)

    # Create the checkpoint directory up front so that saving cannot fail
    # because of a missing parent directory.
    if not os.path.exists(MODEL_SAVE_PATH):
        os.makedirs(MODEL_SAVE_PATH)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        # Resume from the most recent checkpoint when one is available.
        ckpt = tf.train.get_checkpoint_state(MODEL_SAVE_PATH)
        if ckpt and ckpt.model_checkpoint_path:
            saver.restore(sess, ckpt.model_checkpoint_path)

        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            for i in range(STEPS):
                # Pull one batch from the input queue.
                xs, ys = sess.run([img_batch, label_batch])
                reshaped_xs = np.reshape(xs, (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, NUM_CHANNELS))
                sess.run(train_op, feed_dict={x: reshaped_xs, y_: ys})
                if i % 100 == 0:
                    loss_value, step, learning_out = sess.run([loss, global_step, learning_rate],
                                                              feed_dict={x: reshaped_xs, y_: ys})
                    print("After %d training step(s), loss on training batch is %g. learning rate:%20f" % (step, loss_value, learning_out))
                    saver.save(sess, os.path.join(MODEL_SAVE_PATH, MODEL_NAME), global_step=global_step)
        finally:
            # Stop the queue-runner threads even when training is interrupted
            # or raises; otherwise they keep running against a closed session.
            coord.request_stop()
            coord.join(threads)
# Script entry point: build the graph and start training when run directly.
if __name__ == "__main__":
    backward()
Twitter
Facebook
Reddit
LinkedIn
StumbleUpon
Pinterest
Email