@author:wepon
@blog:
This article introduces the Softmax regression algorithm, with a detailed walkthrough of its implementation in Python and Theano. The code comes from the DeepLearning tutorial.
1. A brief introduction to Softmax regression

There is no need to restate the full theory here; see the UFLDL tutorial for details. Below is a short summary, just enough to make the code easier to follow.

Softmax regression is essentially logistic regression extended to the multi-class case. In logistic regression the model parameters are theta, and the hypothesis is the sigmoid function

h_\theta(x) = \frac{1}{1 + e^{-\theta^T x}},

whose output lies between 0 and 1 and is thresholded to one of two classes, so it is normally used for binary classification. For a concrete problem, the crucial step is finding the most suitable theta. This is an optimization problem, usually solved by defining a cost function and then minimizing it; the logistic regression cost function is

J(\theta) = -\frac{1}{m} \sum_{i=1}^{m} \left[ y^{(i)} \log h_\theta(x^{(i)}) + (1 - y^{(i)}) \log\left(1 - h_\theta(x^{(i)})\right) \right].

J(\theta) is typically minimized with gradient descent, iteratively computing the gradient and updating theta.

Logistic regression passes \theta^T x through the sigmoid (the exponent in the denominator is -\theta^T x) and obtains one of two classes. Softmax handles k classes: it uses \theta_j^T x as the exponent for each class j, giving the k terms e^{\theta_1^T x}, ..., e^{\theta_k^T x}, and divides each by their sum. This normalization makes the k outputs sum to 1, so each output can be read as the probability of the corresponding class. The softmax hypothesis therefore outputs a k-dimensional column vector, each component being the probability of one class:

h_\theta(x) = \frac{1}{\sum_{j=1}^{k} e^{\theta_j^T x}} \begin{bmatrix} e^{\theta_1^T x} \\ \vdots \\ e^{\theta_k^T x} \end{bmatrix}

(For k = 2 this reduces to the sigmoid: dividing numerator and denominator by e^{\theta_2^T x} leaves 1 / (1 + e^{-(\theta_1 - \theta_2)^T x}).)

The cost function is essentially the same as in logistic regression: the negative log-likelihood (NLL),

J(\theta) = -\frac{1}{m} \sum_{i=1}^{m} \sum_{j=1}^{k} 1\{y^{(i)} = j\} \log \frac{e^{\theta_j^T x^{(i)}}}{\sum_{l=1}^{k} e^{\theta_l^T x^{(i)}}},

and with a weight-decay (regularization) term added it becomes

J(\theta) = -\frac{1}{m} \sum_{i=1}^{m} \sum_{j=1}^{k} 1\{y^{(i)} = j\} \log \frac{e^{\theta_j^T x^{(i)}}}{\sum_{l=1}^{k} e^{\theta_l^T x^{(i)}}} + \frac{\lambda}{2} \sum_{i=1}^{k} \sum_{j=0}^{n} \theta_{ij}^2.

The cost is again minimized with simple, effective gradient descent. One point worth mentioning: in implementations we usually use minibatch stochastic gradient descent (MSGD), which computes the gradient and updates the parameters once per batch of samples; a batch typically contains a few dozen to a few hundred samples. (Plain stochastic gradient descent, by contrast, updates after every single sample.)
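Before moving on to Theano, a minimal NumPy sketch may help make the formulas above concrete. This is my own illustration, not part of the tutorial code; the toy shapes, the learning rate 0.13, and the helper names softmax/nll are chosen arbitrarily:

import numpy

def softmax(z):
    # subtract the row-wise max for numerical stability; softmax is
    # invariant to shifting each row by a constant
    z = z - z.max(axis=1, keepdims=True)
    e = numpy.exp(z)
    return e / e.sum(axis=1, keepdims=True)

def nll(p, y):
    # mean negative log-probability of the correct classes, the same
    # quantity negative_log_likelihood computes later in this article
    return -numpy.mean(numpy.log(p[numpy.arange(y.shape[0]), y]))

# a toy minibatch: 5 samples with 784 features (28x28 images), 10 classes
rng = numpy.random.RandomState(0)
x = rng.rand(5, 784)
y = rng.randint(0, 10, size=5)
W = numpy.zeros((784, 10))
b = numpy.zeros(10)

p = softmax(x.dot(W) + b)       # shape (5, 10); each row sums to 1
print nll(p, y)                 # all-zero weights give log(10) ~ 2.3026

# one MSGD step: the NLL gradient w.r.t. W has the closed form
# x^T (p - t) / m, where t holds one-hot targets
t = numpy.zeros_like(p)
t[numpy.arange(y.shape[0]), y] = 1.0
W -= 0.13 * x.T.dot(p - t) / x.shape[0]
b -= 0.13 * (p - t).mean(axis=0)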
2. A detailed walkthrough of the Softmax code

One thing first: the program below uses MSGD, and its cost function does not include the weight-decay term. The program applies Softmax regression to classify the MNIST dataset (recognizing handwritten digits 0~9). The commentary reflects my own understanding and is offered for reference only; it may contain mistakes, and corrections are welcome.
The original code and my annotated version:
A note on the parameters: in part 1 they were written as theta, while the program below uses the weights W; these are the same thing. Note also that part 1 writes the exponent as \theta^T x, while the program computes W \cdot x + b. These are equivalent as well: treat b as an extra weight W_0 multiplied by a constant input x_0 = 1, and the affine expression W \cdot x + b collapses into a single product \theta^T x over the augmented input.
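A quick sketch of that equivalence (my own illustration; all names are arbitrary):

import numpy

rng = numpy.random.RandomState(1)
x = rng.rand(3)                 # one sample with 3 features
W = rng.rand(3, 10)
b = rng.rand(10)

# absorb b into the weights: prepend a constant feature x0 = 1
x_aug = numpy.concatenate(([1.0], x))     # shape (4,)
W_aug = numpy.vstack([b, W])              # shape (4, 10); row 0 is the bias

print numpy.allclose(x.dot(W) + b, x_aug.dot(W_aug))   # True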
(1) Import the required modules
import cPickle
import gzip
import os
import sys
import time

import numpy

import theano
import theano.tensor as T
(2) Define the Softmax regression model
In the DeepLearning tutorial, LogisticRegression is treated directly as Softmax: the familiar two-class logistic regression is simply LogisticRegression with n_out = 2. The LogisticRegression class defined below is therefore the Softmax model.
class LogisticRegression(object):
    """Multi-class logistic regression, i.e. Softmax regression."""

    def __init__(self, input, n_in, n_out):
        # W: weight matrix of shape (n_in, n_out), initialized to zeros
        self.W = theano.shared(
            value=numpy.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        # b: bias vector of shape (n_out,), initialized to zeros
        self.b = theano.shared(
            value=numpy.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )

        # p_y_given_x: matrix of class probabilities, one row per sample;
        # row i is softmax(input[i] . W + b)
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)

        # y_pred: the predicted class of each sample, i.e. the index of
        # the largest probability in each row
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

        # the parameters of the model
        self.params = [self.W, self.b]

    def negative_log_likelihood(self, y):
        # the NLL cost: mean negative log-probability of the correct
        # labels y over the minibatch
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        # check that y has the same dimensionality as y_pred
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # check that y contains integer labels, then return the error
        # rate: the mean of T.neq(y_pred, y), i.e. the fraction of
        # misclassified samples in the minibatch
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()
The softmax model is now fully defined, including: the input minibatch `input`, the size n_in of each sample, the number n_out of output classes, the model parameters W and b, the model's predicted output y_pred, the NLL cost, and the error rate `errors`.
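As a small usage sketch of this class (my own illustration, not from the tutorial), here is how it can be instantiated on a symbolic matrix and its prediction compiled into a callable function:

x = T.matrix('x')
classifier = LogisticRegression(input=x, n_in=28 * 28, n_out=10)

# compile a function that maps a batch of images to predicted digits
predict = theano.function(inputs=[x], outputs=classifier.y_pred)

# with W and b still all zeros, every class is equally likely, so
# argmax returns class 0 for every sample
batch = numpy.zeros((5, 28 * 28), dtype=theano.config.floatX)
print predict(batch)    # [0 0 0 0 0]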
(3) Load the MNIST dataset
def load_data(dataset):
    # dataset: path to mnist.pkl.gz

    data_dir, data_file = os.path.split(dataset)
    if data_dir == "" and not os.path.isfile(dataset):
        # check whether the dataset sits in the ../data directory
        new_path = os.path.join(
            os.path.split(__file__)[0],
            "..",
            "data",
            dataset
        )
        if os.path.isfile(new_path) or data_file == 'mnist.pkl.gz':
            dataset = new_path

    # download MNIST if it is not present locally
    if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz':
        import urllib
        origin = (
            'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
        )
        print 'Downloading data from %s' % origin
        urllib.urlretrieve(origin, dataset)

    print '... loading data'

    # the pickle holds three (x, y) pairs: train, validation and test;
    # each x is a float matrix with one 784-pixel image per row, and
    # each y is the vector of corresponding integer labels
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    f.close()

    def shared_dataset(data_xy, borrow=True):
        # store the data in Theano shared variables so that the whole
        # dataset can live in GPU memory
        data_x, data_y = data_xy
        shared_x = theano.shared(numpy.asarray(data_x,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
        shared_y = theano.shared(numpy.asarray(data_y,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
        # labels are stored as floatX on the device but used as integer
        # indices, hence the cast back to int32
        return shared_x, T.cast(shared_y, 'int32')

    test_set_x, test_set_y = shared_dataset(test_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    train_set_x, train_set_y = shared_dataset(train_set)

    rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
    return rval
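A quick sketch of how the return value can be inspected (my own illustration, assuming mnist.pkl.gz is present locally or downloadable):

datasets = load_data('mnist.pkl.gz')
train_set_x, train_set_y = datasets[0]

# the x's are Theano shared variables; get_value() pulls them back into
# NumPy arrays: 50,000 training images of 784 pixels each
print train_set_x.get_value(borrow=True).shape   # (50000, 784)

# the y's are symbolic int32 tensors (because of T.cast), so they are
# evaluated rather than read with get_value()
print train_set_y.eval()[:10]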
(4) Apply the model to the MNIST dataset
def sgd_optimization_mnist(learning_rate=0.13, n_epochs=1000,
                           dataset='mnist.pkl.gz',
                           batch_size=600):

    datasets = load_data(dataset)
    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    # number of minibatches in each set (integer division in Python 2)
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
    n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size
    n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size

    print '... building the model'

    # index is the minibatch index; x is a batch of rasterized images,
    # y the corresponding vector of integer labels
    index = T.lscalar()
    x = T.matrix('x')
    y = T.ivector('y')

    # each MNIST image has 28*28 pixels; there are 10 classes (digits 0~9)
    classifier = LogisticRegression(input=x, n_in=28 * 28, n_out=10)

    # the cost to minimize: the NLL of the model on the minibatch
    cost = classifier.negative_log_likelihood(y)

    # compiled functions that report the error rate of the model on a
    # test / validation minibatch; `givens` substitutes the dataset
    # slice selected by `index` for the symbolic x and y
    test_model = theano.function(
        inputs=[index],
        outputs=classifier.errors(y),
        givens={
            x: test_set_x[index * batch_size: (index + 1) * batch_size],
            y: test_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    validate_model = theano.function(
        inputs=[index],
        outputs=classifier.errors(y),
        givens={
            x: valid_set_x[index * batch_size: (index + 1) * batch_size],
            y: valid_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    # gradients of the cost with respect to W and b
    g_W = T.grad(cost=cost, wrt=classifier.W)
    g_b = T.grad(cost=cost, wrt=classifier.b)

    # gradient-descent update rules: param := param - learning_rate * gradient
    updates = [(classifier.W, classifier.W - learning_rate * g_W),
               (classifier.b, classifier.b - learning_rate * g_b)]

    # train_model returns the cost of the minibatch and, as a side
    # effect, applies `updates`, i.e. performs one MSGD step
    train_model = theano.function(
        inputs=[index],
        outputs=cost,
        updates=updates,
        givens={
            x: train_set_x[index * batch_size: (index + 1) * batch_size],
            y: train_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    print '... training the model'

    # early-stopping parameters
    patience = 5000                  # look at at least this many minibatches
    patience_increase = 2            # wait this much longer after a new best
    improvement_threshold = 0.995    # a relative improvement of this much
                                     # counts as significant
    # check on the validation set this often; here min(...) picks
    # n_train_batches, i.e. validation happens once per epoch
    validation_frequency = min(n_train_batches, patience / 2)

    best_validation_loss = numpy.inf
    test_score = 0.
    start_time = time.clock()

    done_looping = False
    epoch = 0

    # training loop: for each epoch, sweep over all training minibatches;
    # every `validation_frequency` iterations, measure the validation
    # error; whenever it improves significantly, extend the patience and
    # record the test error of the best model so far; stop once `iter`
    # exceeds the patience
    while (epoch < n_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in xrange(n_train_batches):

            minibatch_avg_cost = train_model(minibatch_index)
            # iter counts the total number of minibatches trained so far
            iter = (epoch - 1) * n_train_batches + minibatch_index

            if (iter + 1) % validation_frequency == 0:
                # compute the zero-one loss on the validation set
                validation_losses = [validate_model(i)
                                     for i in xrange(n_valid_batches)]
                this_validation_loss = numpy.mean(validation_losses)

                print(
                    'epoch %i, minibatch %i/%i, validation error %f %%' %
                    (
                        epoch,
                        minibatch_index + 1,
                        n_train_batches,
                        this_validation_loss * 100.
                    )
                )

                # if we got the best validation score so far
                if this_validation_loss < best_validation_loss:
                    # extend patience if the improvement is good enough
                    if this_validation_loss < best_validation_loss * \
                            improvement_threshold:
                        patience = max(patience, iter * patience_increase)

                    best_validation_loss = this_validation_loss

                    # evaluate the new best model on the test set
                    test_losses = [test_model(i)
                                   for i in xrange(n_test_batches)]
                    test_score = numpy.mean(test_losses)

                    print(
                        (
                            '     epoch %i, minibatch %i/%i, test error of'
                            ' best model %f %%'
                        ) %
                        (
                            epoch,
                            minibatch_index + 1,
                            n_train_batches,
                            test_score * 100.
                        )
                    )

            if patience <= iter:
                done_looping = True
                break

    end_time = time.clock()
    print(
        (
            'Optimization complete with best validation score of %f %%,'
            ' with test performance %f %%'
        )
        % (best_validation_loss * 100., test_score * 100.)
    )
    print 'The code ran for %d epochs, with %f epochs/sec' % (
        epoch, 1. * epoch / (end_time - start_time))
    print >> sys.stderr, ('The code for file ' +
                          os.path.split(__file__)[1] +
                          ' ran for %.1fs' % ((end_time - start_time)))
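To actually run the program, the usual entry point would look like this (a sketch; the tutorial file ends with an equivalent guard):

if __name__ == '__main__':
    sgd_optimization_mnist()

With the default hyperparameters (learning_rate=0.13, batch_size=600), plain softmax regression on MNIST reaches a test error of roughly 7.5% according to the DeepLearning tutorial; exact numbers and timings will vary by machine.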