目录
一.线性模型及回归:
1.一维数据线性模型:
2.多维数据:
二.对数线性回归:
三.Logistic回归:
1.极大似然估计:
2.梯度下降:
三.本次实验数据集介绍:
1.数据集信息介绍:
2.采用算法:
3.训练数据:(其中第二栏的性别男性为1,女性为 0)
4.代码和代码实现:
一.线性模型及回归:
线性模型的一般形式: 其中 x =( x 1 , x 2 , ..., x d ) 是由 d 维属性描述的样本,其中 x i 是 x 在第 i 个属性上的取值。 向量形式可记为:1.一维数据线性模型:
给定数据集:
一维数据线性模型最终需要得到:
这样可以准确的预测出该线性模型在xi处的值;
即我们需要将误差尽可能的减少;
下面我们将用到最小二乘法,即利用均方误差最小化来求解参数
该函数已被证明为凸函数,由于凸函数的性质我们可以通过求取极值点来求取极小值
分别对这两个参数求偏导数,并且令这两个偏导数为0;
最终得到w和b的值:
通过上述公式,我们可以通过已知的数值算出最终一维数据线性模型的w和b的值。
2.多维数据:
多维数据集:
我们想通过:
在多维数据集中,我们可以将其表示为向量模式:
同样通过最小二乘法,求取最小误差:
二.对数线性回归:
通过上面的线性模型回归,我们知道可以通过
来得到接近真实数据的预测值,那么对于非线性的函数我们是怎样进行预测的呢?
我们可以将线性回归推广为:
如下图,其中g是单调可微函数:
三.Logistic回归:
logistic回归就是一种广义线性回归:
单调可微、任意阶可导;(Sigmoid 函数)
为了实现Logistic回归分类器,我们可以将每个属性乘上一个回归系数,再把所有结果之相加,将这个综合代入Sigmoid函数中,得到值域为[0,1]。
1.极大似然估计:
求解过程:
确定待求解的未知参数 ,如均值、方差或特定 分布函数的参数等 计算每个样本 X 1 , X 2 ,..., X n 的概率密度 f ( X i ) 样本作为正例的相对可能性的对数:所以有:
假定样本,则可根据样本的概率密度累乘构造似然函数:
通过似然函数最大化(求导为零),最终求解出未知参数.
2.梯度下降:
基本思想:沿着某函数的梯度方向探寻找到该函数的最大值。
梯度下降算法到达每个点后都会重新估计移动的方向。从 P0 开始,计算完该点的梯度,函数就根据梯度移动到下一点 P1。在 P1 点,梯度再次被重新计算,并沿着新的梯度方向移动到 P2 。如此循环迭代,直到满足停止条件。迭代过程中,梯度算子总是保证我们能选取到最佳的移动方向。梯度算子总是指向函数值下降最快的方向。
三.本次实验数据集介绍:
1.数据集信息介绍:
本次数据集是印度肝脏病人数据,该数据集包含10个变量,即年龄、性别、总胆红素、直接胆红素、总蛋白、白蛋白、A/G比值、SGPT、SGOT和碱性磷酸酶。
该数据集包含416份肝脏病人(1)记录和167份非肝脏病人(0)记录,数据集来自印度安得拉邦东北部。选择器是一个类标签,用于分组(不管是否有肝脏病人)。这套数据包括441份男性病历和142份女性病历。任何年龄超过89岁的病人都被列为“90岁”。数据来源于UCI Machine Learning Repository: Data Sets。
2.采用算法:
使用梯度上升找到最佳参数,通过梯度上升法找到最佳回归系数,也就是拟合出Logistic回归模型的最佳参数。
设数据集有100个样本点,每个点包含两个数值型特征:X1和X2。在此数据集上通过梯度上升算法找到最佳回归系数,也就是拟合出Logstic回归模型的最佳参数。
#Logistic回归梯度上升优化算法
def loadDataSet():
'''
函数说明:打开文件并逐行读取
:return: 数据集列表和类标签列表
'''
dataMat = []
labelMat = []
fr = open('./data/testSet.txt')
for line in fr.readlines():
lineArr = line.strip().split()
#每行的前两个值是X1和X2
dataMat.append([1.0, float(lineArr[0]), float(lineArr[1])])
#对应的类标签
labelMat.append(int(lineArr[2]))
return dataMat,labelMat
#sigmoid()函数定义,x=0时,Sigmoid函数值为0.5;
#随着x的增大,Sigmoid函数值将逼近于1,随着x的减小,Sigmoid函数值将逼近于0
def sigmoid(inX):
return 1.0/(1+exp(-inX))
#Logistic回归梯度上升优化算法
'''
梯度上升法的伪代码:
每个回归系数初始化为1
重复R次:
计算整个数据集的梯度
使用alpha*gradient更新回归系数的向量
返回回归系数
dataMatIn:2维NumPy数组,每列分别代表每个不同的特征,每行则代表每个训练样本
classLabels:数据集类别标签向量
'''
def gradAscent(dataMatIn, classLabels):
#将输入数据转换成矩阵
dataMatrix = mat(dataMatIn)
#将类别标签矩阵进行转置
labelMat = mat(classLabels).transpose()
#获得dataMatrix矩阵大小
m,n = shape(dataMatrix)
#向目标移动的步长
alpha = 0.001
#迭代次数,for循环迭代完成后返回训练好的回归系数
maxCycles = 500
weights = ones((n,1))
for k in range(maxCycles):
#h为一个列向量,元素个数等于样本个数;[m,n]*[n,1] = [m,1],包含了m*n次乘积
h = sigmoid(dataMatrix*weights)
#真实值与预测值之差
error = (labelMat - h)
weights = weights + alpha * dataMatrix.transpose()* error
return weights
画出数据集和Logistic回归最佳拟合直线的函数:
#画出数据集和Logistic回归最佳拟合直线的函数(输入wei为矩阵)
def plotBestFit(weights):
# 矩阵.getA()是把矩阵变成数组array类型
#weights = wei.getA()
#得到数据集列表和类标签列表
dataMat,labelMat=loadDataSet()
print("dataMat:",dataMat)
#将由列表存储的数据集转换为array(数组)类型,方便后面数组访问 dataArr[i,j]
dataArr = array(dataMat)
print("dataArr:",dataArr)
#dataArr数据集的行数,即样本的个数
n = shape(dataArr)[0]
# 定义两个空数组,用来存放不同label的x1和x2值
xcord1 = []; ycord1 = []
xcord2 = []; ycord2 = []
for i in range(n):
if int(labelMat[i])== 1:
xcord1.append(dataArr[i,1]); ycord1.append(dataArr[i,2])
else:
xcord2.append(dataArr[i,1]); ycord2.append(dataArr[i,2])
fig = plt.figure()
ax = fig.add_subplot(111)
ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
ax.scatter(xcord2, ycord2, s=30, c='green')
# 画出最佳拟合直线
x = arange(-3.0, 3.0, 0.1)
# 设置sigmoid函数为0,0=W0X0+W1X1+W2X2,解出X1,X2的关系式得到分割线方程(为方便计算,X0=1)
y = (-weights[0]-weights[1]*x)/weights[2]
ax.plot(x, y)
plt.xlabel('X1'); plt.ylabel('X2');
plt.show()
拟合结果如下:
在上图中,分错了差不多两个点,算法实现分类比较好,但是梯度上升算法在每次更新回归系数时都要遍历整个数据集,当样本比较大时,方法的计算复杂度就太高了。
改进:随机梯度上升算法:一次仅用一个样本点更新回归系数。
#随机梯度上升算法
'''
伪代码:
所有回归系数初始化为1
对数据集中每个样本
计算该样本的梯度
使用alpha*gradient更新回归系数值
返回回归系数值
问题:存在每次迭代会引发系数的剧烈改变,效果不够好
'''
def stocGradAscent0(dataMatIn, classLabels):
#将输入数据转换为矩阵
dataMatrix = array(dataMatIn)
#得到矩阵大小
m,n = shape(dataMatrix)
alpha = 0.01
weights = ones(n) #创建n行1列的全为1的数组
for i in range(m):
h = sigmoid(sum(dataMatrix[i]*weights))
error = classLabels[i] - h
weights = weights + alpha * error * dataMatrix[i]
return weights
拟合结果:
该图,分类器错分了大概三分之一的样本, 拟合效果没有梯度上升算法完美。原因是在非随机梯度上升算法,整个数据集上迭代了500次才得到结果,随机梯度上升只是迭代了m(矩阵)行数,非随机梯度上升算法迭代次数要远大于随机梯度上升方法,而判断优化算法优劣的可靠方法:看它是否收敛,也就是说参数是否达到稳定值,是否不断地变化。
3.训练数据:(其中第二栏的性别男性为1,女性为 0)
测试数据集:
第一列数据为X1轴上的值,第二列数据为X2轴上的值。而最后一列数据即为分类标签,利用训练数据来训练逻辑回归分类器,进而用训练好的模型来预测新的样本,即测试数据集,比对正确率;
4.代码和代码实现:
Logistic回归分类函数:
#Logistic回归分类函数
def classifyVector(inX, weights):
prob = sigmoid(sum(inX*weights))
if prob > 0.5: return 1.0
else: return 0.0
Sigmoid函数:
#sigmoid()函数定义,x=0时,Sigmoid函数值为0.5;
#随着x的增大,Sigmoid函数值将逼近于1,随着x的减小,Sigmoid函数值将逼近于0
def sigmoid(inX):
if inX>=0: #对sigmoid函数的优化,避免了出现极大的数据溢出
return 1.0/(1+exp(-inX))
else: #这样做可以保证exp(inx)值始终小于1,避免极大溢出
return exp(inX) / (1 + exp(inX))
读入训练集和测试集,并用测试集:
#读取数据集和测试集
def colicTest():
trainingSet = []; trainingLabels = []
TrainData = pd.read_csv('Indian Liver Patient Dataset (ILPD).csv')
TrainData = TrainData.values.tolist()
TestData = pd.read_csv('test.csv')
TestData = TestData.values.tolist()
for i in range(len(TrainData)):
lineArr =[]
for j in range(10):
lineArr.append(TrainData[i][j])
trainingSet.append(lineArr)
trainingSet = array(trainingSet) #将列表转换成数组
trainingSet = trainingSet.astype(float).tolist() #将数组中元素转换成float
trainingLabels.append(TrainData[i][10]) #将数组重新转为列表
print(trainingSet)
trainWeights = stocGradAscent1(array(trainingSet), trainingLabels, 1000)
errorCount = 0; numTestVec = 0.0
for i in range(len(TestData)):
numTestVec += 1.0
lineArr =[]
for j in range(10):
lineArr.append(TestData[i][j])
if int(classifyVector(array(lineArr), trainWeights))!= int(TestData[i][10]):
errorCount += 1
errorRate = (float(errorCount)/numTestVec)
print("此测试的错误率为: %f" % errorRate)
return errorRate
测试次数调用函数:
#multiTest()调用colicTest()10次求结果的平均值
def multiTest():
numTests = 10; errorSum=0.0
for k in range(numTests):
resultCount = colicTest()
print("第%d次测试的错误率为: %f" % (k+1,resultCount))
errorSum += resultCount
print("在%d次迭代之后,平均错误率为: %f" % (numTests, errorSum/float(numTests)))
#主函数
import logRegres
# 按间距中的绿色按钮以运行脚本。
if __name__ == '__main__':
logRegres.multiTest()
运行结果:
无论使用哪种方法,错误率都为75%,可能是数据集选取有错;
下面重新选取了数据集:(UCI Machine Learning Repository: Heart failure clinical records Data Set)
新的数据集中包含299条心力衰竭临床记录数据;
全批量梯度上升算法测试结果:
随机梯度上升算法:
改进后随机梯度上升测试结果:
迭代1000次的结果:
通过上面的运行结果来看,未改进的随机梯度上升算法错误率较高,改进后的随机梯度上升算法很较为理想的,但是它的运行速度非常慢,这与赋给它的迭代次数和数据集相关。
整体代码:
from numpy import *
import matplotlib.pyplot as plt
import pandas as pd
# Logistic回归梯度上升优化算法
def loadDataSet():
'''
函数说明:打开文件并逐行读取
:return: 数据集列表和类标签列表
'''
dataMat = []
labelMat = []
fr = open('./data/testSet.txt')
for line in fr.readlines():
lineArr = line.strip().split()
# 每行的前两个值是X1和X2
dataMat.append([1.0, float(lineArr[0]), float(lineArr[1])])
# 对应的类标签
labelMat.append(int(lineArr[2]))
return dataMat, labelMat
# 用于全批量随机梯度上升算法的Sigmoid函数,因为传入值是一个向量,不能比较大小
def gradAscentSigmoid(inX):
return 1.0 / (1 + exp(-inX))
# sigmoid()函数定义,x=0时,Sigmoid函数值为0.5;
# 随着x的增大,Sigmoid函数值将逼近于1,随着x的减小,Sigmoid函数值将逼近于0
def sigmoid(inX):
if inX >= 0: # 对sigmoid函数的优化,避免了出现极大的数据溢出
return 1.0 / (1 + exp(-inX))
else: # 这样做可以保证exp(inx)值始终小于1,避免极大溢出
return exp(inX) / (1 + exp(inX))
# 全批量梯度上升算法
'''
梯度上升法的伪代码:
每个回归系数初始化为1
重复R次:
计算整个数据集的梯度
使用alpha*gradient更新回归系数的向量
返回回归系数
dataMatIn:2维NumPy数组,每列分别代表每个不同的特征,每行则代表每个训练样本
classLabels:数据集类别标签向量
'''
def gradAscent(dataMatIn, classLabels):
# 将输入数据转换成矩阵
dataMatrix = mat(dataMatIn)
# 将类别标签矩阵进行转置
labelMat = mat(classLabels).transpose()
# 获得dataMatrix矩阵大小
m, n = shape(dataMatrix)
# 向目标移动的步长
alpha = 0.001
# 迭代次数,for循环迭代完成后返回训练好的回归系数
maxCycles = 500
weights = ones((n, 1))
for k in range(maxCycles):
# h为一个列向量,元素个数等于样本个数;[m,n]*[n,1] = [m,1],包含了m*n次乘积
h = gradAscentSigmoid(dataMatrix * weights)
# 真实值与预测值之差
error = (labelMat - h)
weights = weights + alpha * dataMatrix.transpose() * error
return weights
'''.......#分析数据:画出决策边界...... '''
# 画出数据集和Logistic回归最佳拟合直线的函数(输入wei为矩阵)
def plotBestFit(weights):
# 矩阵.getA()是把矩阵变成数组array类型
# weights = wei.getA()
# 得到数据集列表和类标签列表
dataMat, labelMat = loadDataSet()
print("dataMat:", dataMat)
# 将由列表存储的数据集转换为array(数组)类型,方便后面数组访问 dataArr[i,j]
dataArr = array(dataMat)
print("dataArr:", dataArr)
# dataArr数据集的行数,即样本的个数
n = shape(dataArr)[0]
# 定义两个空数组,用来存放不同label的x1和x2值
xcord1 = [];
ycord1 = []
xcord2 = [];
ycord2 = []
for i in range(n):
if int(labelMat[i]) == 1:
xcord1.append(dataArr[i, 1]);
ycord1.append(dataArr[i, 2])
else:
xcord2.append(dataArr[i, 1]);
ycord2.append(dataArr[i, 2])
fig = plt.figure()
ax = fig.add_subplot(111)
ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
ax.scatter(xcord2, ycord2, s=30, c='green')
# 画出最佳拟合直线
x = arange(-3.0, 3.0, 0.1)
# 设置sigmoid函数为0,0=W0X0+W1X1+W2X2,解出X1,X2的关系式得到分割线方程(为方便计算,X0=1)
y = (-weights[0] - weights[1] * x) / weights[2]
ax.plot(x, y)
plt.xlabel('X1');
plt.ylabel('X2');
plt.show()
# 随机梯度上升算法
'''
伪代码:
所有回归系数初始化为1
对数据集中每个样本
计算该样本的梯度
使用alpha*gradient更新回归系数值
返回回归系数值
问题:存在每次迭代会引发系数的剧烈改变,效果不够好
'''
def stocGradAscent0(dataMatIn, classLabels):
# 将输入数据转换为矩阵
dataMatrix = array(dataMatIn)
# 得到矩阵大小
m, n = shape(dataMatrix)
alpha = 0.01
weights = ones(n) # 创建n行1列的全为1的数组
for i in range(m):
h = sigmoid(sum(dataMatrix[i] * weights))
error = classLabels[i] - h
weights = weights + alpha * error * dataMatrix[i]
return weights
# 改进的随机梯度上升算法
def stocGradAscent1(dataMatIn, classLabels, numIter=150):
# 迭代次数若未给定,则默认迭代150次
dataMatrix = array(dataMatIn)
m, n = shape(dataMatrix)
# print("m=:",m)
weights = ones(n)
for j in range(numIter):
dataIndex = list(range(m))
for i in range(m):
# alpha每次迭代时需要调整
alpha = 4 / (1.0 + j + i) + 0.0001
# 随机选取样本更新回归系数
# uniform() 方法将随机生成下一个实数,它在[0,len(dataIndex)]范围内
randIndex = int(random.uniform(0, len(dataIndex)))
h = sigmoid(sum(dataMatrix[randIndex] * weights))
error = classLabels[randIndex] - h # 计算误差
weights = weights + alpha * error * dataMatrix[randIndex]
# 删除已使用的样本
del (dataIndex[randIndex])
return weights
# Logistic回归分类函数
def classifyVector(inX, weights):
prob = sigmoid(sum(inX * weights))
if prob > 0.5:
return 1.0
else:
return 0.0
# 读取数据集和测试集
def colicTest():
trainingSet = [];
trainingLabels = []
TrainData = pd.read_csv('heart_failure_clinical_records_dataset.csv')
TrainData = TrainData.values.tolist()
TestData = pd.read_csv('tttw.csv')
TestData = TestData.values.tolist()
for i in range(len(TrainData)):
lineArr = []
for j in range(12):
lineArr.append(TrainData[i][j])
trainingSet.append(lineArr)
trainingSet = array(trainingSet) # 将列表转换成数组
trainingSet = trainingSet.astype(float).tolist() # 将数组中元素转换成float
trainingLabels.append(TrainData[i][12]) # 将数组重新转为列表
# trainWeights = gradAscent(trainingSet, trainingLabels)
# trainWeights = stocGradAscent0(array(trainingSet), trainingLabels)
trainWeights = stocGradAscent1(array(trainingSet), trainingLabels, 500)
errorCount = 0;
numTestVec = 0.0
for i in range(len(TestData)):
numTestVec += 1.0
lineArr = []
for j in range(12):
lineArr.append(TestData[i][j])
if int(classifyVector(array(lineArr), trainWeights)) != int(TestData[i][12]):
errorCount += 1
errorRate = (float(errorCount) / numTestVec)
# print("此测试的错误率为: %f" % errorRate)
return errorRate
# multiTest()调用colicTest()10次求结果的平均值
def multiTest():
numTests = 10;
errorSum = 0.0
for k in range(numTests):
resultCount = colicTest()
print("第%d次测试的错误率为: %f" % (k + 1, resultCount))
errorSum += resultCount
print("在%d次测试之后,平均错误率为: %f" % (numTests, errorSum / float(numTests)))
更多推荐
机器学习 Logistic回归
发布评论