Python Implementations of Machine Learning Algorithms (3): CART Decision Trees and Pruning
The data in this post comes from the decision-tree chapter of *Machine Learning* by Zhou Zhihua (机器学习 - 周志华); the result can serve as an answer to Exercise 4 at the end of that chapter.
The code is adapted from *Machine Learning in Action* (《机器学习实战》), with some modifications.
A CART decision tree uses the Gini index to select the splitting attribute. For a data set $D$ with class proportions $p_k$ and a discrete attribute $a$ with $V$ possible values, the Gini value and the Gini index are

$$\mathrm{Gini}(D) = 1 - \sum_{k=1}^{|\mathcal{Y}|} p_k^2, \qquad \mathrm{Gini\_index}(D, a) = \sum_{v=1}^{V} \frac{|D^v|}{|D|}\,\mathrm{Gini}(D^v),$$

where $D^v$ is the subset of $D$ whose value on attribute $a$ is $a^v$. The attribute with the smallest Gini index is chosen for the split; a pure node has a Gini value of 0, and a 50/50 binary node has a Gini value of 0.5.
This post covers the unpruned CART decision tree, the pre-pruned CART decision tree, and the post-pruned decision tree.
The Python libraries used in this post are:
- numpy
- pandas
- math
- operator
- matplotlib
- copy
- re
The data used in this post is as follows:
| Idx | color | root | knocks | texture | navel | touch | label |
|-----|-------|------|--------|---------|-------|-------|-------|
| 1 | dark_green | curl_up | little_heavily | distinct | sinking | hard_smooth | 1 |
| 2 | black | curl_up | heavily | distinct | sinking | hard_smooth | 1 |
| 3 | black | curl_up | little_heavily | distinct | sinking | hard_smooth | 1 |
| 6 | dark_green | little_curl_up | little_heavily | distinct | little_sinking | soft_stick | 1 |
| 7 | black | little_curl_up | little_heavily | little_blur | little_sinking | soft_stick | 1 |
| 10 | dark_green | stiff | clear | distinct | even | soft_stick | 0 |
| 14 | light_white | little_curl_up | heavily | little_blur | sinking | hard_smooth | 0 |
| 15 | black | little_curl_up | little_heavily | distinct | little_sinking | soft_stick | 0 |
| 16 | light_white | curl_up | little_heavily | blur | even | hard_smooth | 0 |
| 17 | dark_green | curl_up | heavily | little_blur | little_sinking | hard_smooth | 0 |
| 4 | dark_green | curl_up | heavily | distinct | sinking | hard_smooth | 1 |
| 5 | light_white | curl_up | little_heavily | distinct | sinking | hard_smooth | 1 |
| 8 | black | little_curl_up | little_heavily | distinct | little_sinking | hard_smooth | 1 |
| 9 | black | little_curl_up | heavily | little_blur | little_sinking | hard_smooth | 0 |
| 11 | light_white | stiff | clear | blur | even | hard_smooth | 0 |
| 12 | light_white | curl_up | little_heavily | blur | even | soft_stick | 0 |
| 13 | dark_green | little_curl_up | little_heavily | little_blur | sinking | hard_smooth | 0 |
The first 11 rows (1, 2, 3, 6, 7, 10, 14, 15, 16, 17, 4) are used as the training set and the last 6 rows (5, 8, 9, 11, 12, 13) as the test (validation) set.
Note: the book states that the first 10 samples form the training set and the remaining 7 the test set, but that split does not reproduce the decision trees shown in the book, whereas the split above does. The split the book actually used therefore appears to differ from the one it states; to allow a direct comparison, this post adopts the split that matches the trees in the book's figures.
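As a quick check of the Gini formula on this data: the root node of the 11-sample training set contains 6 positive and 5 negative examples, so its Gini value is

$$\mathrm{Gini}(D) = 1 - \left(\tfrac{6}{11}\right)^2 - \left(\tfrac{5}{11}\right)^2 = \tfrac{60}{121} \approx 0.496.$$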
The unpruned decision tree:
The code is close to the ID3 decision tree; the main change is that the information-gain computation is replaced by the Gini index.
```python
# -*- coding: utf-8 -*-
from numpy import *
import numpy as np
import pandas as pd
from math import log
import operator

# Compute the Gini value of a data set
def calcGini(dataSet):
    numEntries = len(dataSet)
    labelCounts = {}
    # Count the samples of each class
    for featVec in dataSet:
        currentLabel = featVec[-1]
        if currentLabel not in labelCounts.keys():
            labelCounts[currentLabel] = 0
        labelCounts[currentLabel] += 1
    # Gini(D) = 1 - sum(p_k^2)
    Gini = 1.0
    for key in labelCounts:
        prob = float(labelCounts[key]) / numEntries
        Gini -= prob * prob
    return Gini

# Split on a discrete feature: keep the samples whose feature `axis` equals `value`
def splitDataSet(dataSet, axis, value):
    retDataSet = []
    for featVec in dataSet:
        if featVec[axis] == value:
            reducedFeatVec = featVec[:axis]
            reducedFeatVec.extend(featVec[axis + 1:])
            retDataSet.append(reducedFeatVec)
    return retDataSet

# Split on a continuous feature: `direction` decides whether to keep the samples
# greater than `value` (direction == 0) or less than or equal to `value` (direction == 1)
def splitContinuousDataSet(dataSet, axis, value, direction):
    retDataSet = []
    for featVec in dataSet:
        if direction == 0:
            if featVec[axis] > value:
                reducedFeatVec = featVec[:axis]
                reducedFeatVec.extend(featVec[axis + 1:])
                retDataSet.append(reducedFeatVec)
        else:
            if featVec[axis] <= value:
                reducedFeatVec = featVec[:axis]
                reducedFeatVec.extend(featVec[axis + 1:])
                retDataSet.append(reducedFeatVec)
    return retDataSet

# Choose the best feature to split on (smallest Gini index)
def chooseBestFeatureToSplit(dataSet, labels):
    numFeatures = len(dataSet[0]) - 1
    bestGiniIndex = 100000.0
    bestFeature = -1
    bestSplitDict = {}
    for i in range(numFeatures):
        featList = [example[i] for example in dataSet]
        # Continuous feature
        if type(featList[0]).__name__ == 'float' or type(featList[0]).__name__ == 'int':
            # Generate the n-1 candidate split points
            sortfeatList = sorted(featList)
            splitList = []
            for j in range(len(sortfeatList) - 1):
                splitList.append((sortfeatList[j] + sortfeatList[j + 1]) / 2.0)
            bestSplitGini = 10000
            slen = len(splitList)
            # Compute the Gini index for each candidate split point and keep the best one
            for j in range(slen):
                value = splitList[j]
                newGiniIndex = 0.0
                subDataSet0 = splitContinuousDataSet(dataSet, i, value, 0)
                subDataSet1 = splitContinuousDataSet(dataSet, i, value, 1)
                prob0 = len(subDataSet0) / float(len(dataSet))
                newGiniIndex += prob0 * calcGini(subDataSet0)
                prob1 = len(subDataSet1) / float(len(dataSet))
                newGiniIndex += prob1 * calcGini(subDataSet1)
                if newGiniIndex < bestSplitGini:
                    bestSplitGini = newGiniIndex
                    bestSplit = j
            # Record the best split point of this feature
            bestSplitDict[labels[i]] = splitList[bestSplit]
            GiniIndex = bestSplitGini
        # Discrete feature
        else:
            uniqueVals = set(featList)
            newGiniIndex = 0.0
            # Compute the Gini index of the split induced by this feature
            for value in uniqueVals:
                subDataSet = splitDataSet(dataSet, i, value)
                prob = len(subDataSet) / float(len(dataSet))
                newGiniIndex += prob * calcGini(subDataSet)
            GiniIndex = newGiniIndex
        if GiniIndex < bestGiniIndex:
            bestGiniIndex = GiniIndex
            bestFeature = i
    # If the best feature is continuous, binarize it at the recorded split point
    # (is the value <= bestSplitValue?) and rename the feature to the form  name<=value
    if type(dataSet[0][bestFeature]).__name__ == 'float' or type(dataSet[0][bestFeature]).__name__ == 'int':
        bestSplitValue = bestSplitDict[labels[bestFeature]]
        labels[bestFeature] = labels[bestFeature] + '<=' + str(bestSplitValue)
        for i in range(shape(dataSet)[0]):
            if dataSet[i][bestFeature] <= bestSplitValue:
                dataSet[i][bestFeature] = 1
            else:
                dataSet[i][bestFeature] = 0
    return bestFeature

# When the features are exhausted but a node's samples still have mixed labels,
# decide the class by majority vote
def majorityCnt(classList):
    classCount = {}
    for vote in classList:
        if vote not in classCount.keys():
            classCount[vote] = 0
        classCount[vote] += 1
    return max(classCount, key=classCount.get)

# Main routine: build the decision tree recursively
def createTree(dataSet, labels, data_full, labels_full):
    classList = [example[-1] for example in dataSet]
    if classList.count(classList[0]) == len(classList):
        return classList[0]
    if len(dataSet[0]) == 1:
        return majorityCnt(classList)
    bestFeat = chooseBestFeatureToSplit(dataSet, labels)
    bestFeatLabel = labels[bestFeat]
    myTree = {bestFeatLabel: {}}
    featValues = [example[bestFeat] for example in dataSet]
    uniqueVals = set(featValues)
    if type(dataSet[0][bestFeat]).__name__ == 'str':
        currentlabel = labels_full.index(labels[bestFeat])
        featValuesFull = [example[currentlabel] for example in data_full]
        uniqueValsFull = set(featValuesFull)
    del(labels[bestFeat])
    # Build one subtree per value of bestFeat
    for value in uniqueVals:
        subLabels = labels[:]
        if type(dataSet[0][bestFeat]).__name__ == 'str':
            uniqueValsFull.remove(value)
        myTree[bestFeatLabel][value] = createTree(
            splitDataSet(dataSet, bestFeat, value), subLabels, data_full, labels_full)
    # Values of bestFeat that occur in the full data set but not at this node
    # become leaves labelled with this node's majority class
    if type(dataSet[0][bestFeat]).__name__ == 'str':
        for value in uniqueValsFull:
            myTree[bestFeatLabel][value] = majorityCnt(classList)
    return myTree


df = pd.read_csv('watermelon_4_2.csv')
data = df.values[:11, 1:].tolist()
data_full = data[:]
labels = df.columns.values[1:-1].tolist()
labels_full = labels[:]
myTree = createTree(data, labels, data_full, labels_full)

import plotTree
plotTree.createPlot(myTree)
```
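The script reads the data from `watermelon_4_2.csv`. If that file is not at hand, the same training and test sets can be entered directly from the table above; the sketch below simply mirrors what `df.values[:11, 1:]` and `df.values[11:, 1:]` produce (column order and values exactly as in the table; the feature names are taken from the table header).

```python
# Feature names, in the column order assumed by the script above
labels = ['color', 'root', 'knocks', 'texture', 'navel', 'touch']
labels_full = labels[:]

# Training set: rows 1, 2, 3, 6, 7, 10, 14, 15, 16, 17, 4 (last element is the label)
data = [
    ['dark_green',  'curl_up',        'little_heavily', 'distinct',    'sinking',        'hard_smooth', 1],
    ['black',       'curl_up',        'heavily',        'distinct',    'sinking',        'hard_smooth', 1],
    ['black',       'curl_up',        'little_heavily', 'distinct',    'sinking',        'hard_smooth', 1],
    ['dark_green',  'little_curl_up', 'little_heavily', 'distinct',    'little_sinking', 'soft_stick',  1],
    ['black',       'little_curl_up', 'little_heavily', 'little_blur', 'little_sinking', 'soft_stick',  1],
    ['dark_green',  'stiff',          'clear',          'distinct',    'even',           'soft_stick',  0],
    ['light_white', 'little_curl_up', 'heavily',        'little_blur', 'sinking',        'hard_smooth', 0],
    ['black',       'little_curl_up', 'little_heavily', 'distinct',    'little_sinking', 'soft_stick',  0],
    ['light_white', 'curl_up',        'little_heavily', 'blur',        'even',           'hard_smooth', 0],
    ['dark_green',  'curl_up',        'heavily',        'little_blur', 'little_sinking', 'hard_smooth', 0],
    ['dark_green',  'curl_up',        'heavily',        'distinct',    'sinking',        'hard_smooth', 1],
]
data_full = data[:]

# Test (validation) set: rows 5, 8, 9, 11, 12, 13
data_test = [
    ['light_white', 'curl_up',        'little_heavily', 'distinct',    'sinking',        'hard_smooth', 1],
    ['black',       'little_curl_up', 'little_heavily', 'distinct',    'little_sinking', 'hard_smooth', 1],
    ['black',       'little_curl_up', 'heavily',        'little_blur', 'little_sinking', 'hard_smooth', 0],
    ['light_white', 'stiff',          'clear',          'blur',        'even',           'hard_smooth', 0],
    ['light_white', 'curl_up',        'little_heavily', 'blur',        'even',           'soft_stick',  0],
    ['dark_green',  'little_curl_up', 'little_heavily', 'little_blur', 'sinking',        'hard_smooth', 0],
]
```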
The plotTree plotting module is given in my earlier post on the ID3 decision tree:
ID3 decision tree
The resulting decision tree is:
It matches Figure 4.5 in the book.
Next comes pruning.
Pre-pruned decision tree:
In pre-pruning, each candidate split is evaluated while the tree is being grown: if splitting a node does not improve accuracy on the validation set, the node is not split and is turned into a leaf. (The decision rule used in the code is sketched below.)
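In the implementation, this check boils down to comparing error counts on the validation set: `testing` counts the errors made by the tree with the split, and `testingMajor` counts the errors made by a single majority-vote leaf. A minimal sketch of the decision, lifted from the `createTree` function in the full listing that follows:

```python
# Pre-pruning decision inside createTree (sketch):
# keep the split only if it makes strictly fewer validation errors
# than simply predicting this node's majority class.
if testing(myTree, data_test, temp_labels) < testingMajor(majorityCnt(classList), data_test):
    return myTree                   # the split helps on the validation set: keep it
return majorityCnt(classList)       # otherwise do not split: return the majority class
```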
The full code is as follows:
```python
# -*- coding: utf-8 -*-
from numpy import *
import numpy as np
import pandas as pd
from math import log
import operator
import copy
import re

# Compute the Gini value of a data set
def calcGini(dataSet):
    numEntries = len(dataSet)
    labelCounts = {}
    # Count the samples of each class
    for featVec in dataSet:
        currentLabel = featVec[-1]
        if currentLabel not in labelCounts.keys():
            labelCounts[currentLabel] = 0
        labelCounts[currentLabel] += 1
    # Gini(D) = 1 - sum(p_k^2)
    Gini = 1.0
    for key in labelCounts:
        prob = float(labelCounts[key]) / numEntries
        Gini -= prob * prob
    return Gini

# Split on a discrete feature: keep the samples whose feature `axis` equals `value`
def splitDataSet(dataSet, axis, value):
    retDataSet = []
    for featVec in dataSet:
        if featVec[axis] == value:
            reducedFeatVec = featVec[:axis]
            reducedFeatVec.extend(featVec[axis + 1:])
            retDataSet.append(reducedFeatVec)
    return retDataSet

# Split on a continuous feature: `direction` decides whether to keep the samples
# greater than `value` (direction == 0) or less than or equal to `value` (direction == 1)
def splitContinuousDataSet(dataSet, axis, value, direction):
    retDataSet = []
    for featVec in dataSet:
        if direction == 0:
            if featVec[axis] > value:
                reducedFeatVec = featVec[:axis]
                reducedFeatVec.extend(featVec[axis + 1:])
                retDataSet.append(reducedFeatVec)
        else:
            if featVec[axis] <= value:
                reducedFeatVec = featVec[:axis]
                reducedFeatVec.extend(featVec[axis + 1:])
                retDataSet.append(reducedFeatVec)
    return retDataSet

# Choose the best feature to split on (smallest Gini index)
def chooseBestFeatureToSplit(dataSet, labels):
    numFeatures = len(dataSet[0]) - 1
    bestGiniIndex = 100000.0
    bestFeature = -1
    bestSplitDict = {}
    for i in range(numFeatures):
        featList = [example[i] for example in dataSet]
        # Continuous feature
        if type(featList[0]).__name__ == 'float' or type(featList[0]).__name__ == 'int':
            # Generate the n-1 candidate split points
            sortfeatList = sorted(featList)
            splitList = []
            for j in range(len(sortfeatList) - 1):
                splitList.append((sortfeatList[j] + sortfeatList[j + 1]) / 2.0)
            bestSplitGini = 10000
            slen = len(splitList)
            # Compute the Gini index for each candidate split point and keep the best one
            for j in range(slen):
                value = splitList[j]
                newGiniIndex = 0.0
                subDataSet0 = splitContinuousDataSet(dataSet, i, value, 0)
                subDataSet1 = splitContinuousDataSet(dataSet, i, value, 1)
                prob0 = len(subDataSet0) / float(len(dataSet))
                newGiniIndex += prob0 * calcGini(subDataSet0)
                prob1 = len(subDataSet1) / float(len(dataSet))
                newGiniIndex += prob1 * calcGini(subDataSet1)
                if newGiniIndex < bestSplitGini:
                    bestSplitGini = newGiniIndex
                    bestSplit = j
            # Record the best split point of this feature
            bestSplitDict[labels[i]] = splitList[bestSplit]
            GiniIndex = bestSplitGini
        # Discrete feature
        else:
            uniqueVals = set(featList)
            newGiniIndex = 0.0
            # Compute the Gini index of the split induced by this feature
            for value in uniqueVals:
                subDataSet = splitDataSet(dataSet, i, value)
                prob = len(subDataSet) / float(len(dataSet))
                newGiniIndex += prob * calcGini(subDataSet)
            GiniIndex = newGiniIndex
        if GiniIndex < bestGiniIndex:
            bestGiniIndex = GiniIndex
            bestFeature = i
    # If the best feature is continuous, binarize it at the recorded split point
    # (is the value <= bestSplitValue?) and rename the feature to the form  name<=value
    if type(dataSet[0][bestFeature]).__name__ == 'float' or type(dataSet[0][bestFeature]).__name__ == 'int':
        bestSplitValue = bestSplitDict[labels[bestFeature]]
        labels[bestFeature] = labels[bestFeature] + '<=' + str(bestSplitValue)
        for i in range(shape(dataSet)[0]):
            if dataSet[i][bestFeature] <= bestSplitValue:
                dataSet[i][bestFeature] = 1
            else:
                dataSet[i][bestFeature] = 0
    return bestFeature

# When the features are exhausted but a node's samples still have mixed labels,
# decide the class by majority vote
def majorityCnt(classList):
    classCount = {}
    for vote in classList:
        if vote not in classCount.keys():
            classCount[vote] = 0
        classCount[vote] += 1
    return max(classCount, key=classCount.get)

# In the tree, continuous features have been renamed to the form  feature<=value,
# so for those features a regular expression is used to recover the feature name
# and the split threshold
def classify(inputTree, featLabels, testVec):
    firstStr = inputTree.keys()[0]
    if '<=' in firstStr:
        featvalue = float(re.compile("(<=.+)").search(firstStr).group()[2:])
        featkey = re.compile("(.+<=)").search(firstStr).group()[:-2]
        secondDict = inputTree[firstStr]
        featIndex = featLabels.index(featkey)
        if testVec[featIndex] <= featvalue:
            judge = 1
        else:
            judge = 0
        for key in secondDict.keys():
            if judge == int(key):
                if type(secondDict[key]).__name__ == 'dict':
                    classLabel = classify(secondDict[key], featLabels, testVec)
                else:
                    classLabel = secondDict[key]
    else:
        secondDict = inputTree[firstStr]
        featIndex = featLabels.index(firstStr)
        for key in secondDict.keys():
            if testVec[featIndex] == key:
                if type(secondDict[key]).__name__ == 'dict':
                    classLabel = classify(secondDict[key], featLabels, testVec)
                else:
                    classLabel = secondDict[key]
    return classLabel

# Number of errors the tree makes on the validation set
def testing(myTree, data_test, labels):
    error = 0.0
    for i in range(len(data_test)):
        if classify(myTree, labels, data_test[i]) != data_test[i][-1]:
            error += 1
    print 'myTree %d' % error
    return float(error)

# Number of errors a majority-vote leaf makes on the validation set
def testingMajor(major, data_test):
    error = 0.0
    for i in range(len(data_test)):
        if major != data_test[i][-1]:
            error += 1
    print 'major %d' % error
    return float(error)

# Main routine: build the decision tree recursively, with pre-pruning
def createTree(dataSet, labels, data_full, labels_full, data_test):
    classList = [example[-1] for example in dataSet]
    if classList.count(classList[0]) == len(classList):
        return classList[0]
    if len(dataSet[0]) == 1:
        return majorityCnt(classList)
    temp_labels = copy.deepcopy(labels)
    bestFeat = chooseBestFeatureToSplit(dataSet, labels)
    bestFeatLabel = labels[bestFeat]
    myTree = {bestFeatLabel: {}}
    if type(dataSet[0][bestFeat]).__name__ == 'str':
        currentlabel = labels_full.index(labels[bestFeat])
        featValuesFull = [example[currentlabel] for example in data_full]
        uniqueValsFull = set(featValuesFull)
    featValues = [example[bestFeat] for example in dataSet]
    uniqueVals = set(featValues)
    del(labels[bestFeat])
    # Build one subtree per value of bestFeat
    for value in uniqueVals:
        subLabels = labels[:]
        if type(dataSet[0][bestFeat]).__name__ == 'str':
            uniqueValsFull.remove(value)
        myTree[bestFeatLabel][value] = createTree(
            splitDataSet(dataSet, bestFeat, value), subLabels, data_full, labels_full,
            splitDataSet(data_test, bestFeat, value))
    if type(dataSet[0][bestFeat]).__name__ == 'str':
        for value in uniqueValsFull:
            myTree[bestFeatLabel][value] = majorityCnt(classList)
    # Pre-pruning test: keep the split only if it makes fewer errors on the
    # validation set than a majority-vote leaf; otherwise return the majority class
    if testing(myTree, data_test, temp_labels) < testingMajor(majorityCnt(classList), data_test):
        return myTree
    return majorityCnt(classList)


df = pd.read_csv('watermelon_4_2.csv')
data = df.values[:11, 1:].tolist()
data_full = data[:]
data_test = df.values[11:, 1:].tolist()
labels = df.columns.values[1:-1].tolist()
labels_full = labels[:]
myTree = createTree(data, labels, data_full, labels_full, data_test)

import plotTree
plotTree.createPlot(myTree)
```
The result is shown below:
It matches Figure 4.6 in the book.
Post-pruned decision tree:
Post-pruning first grows a complete decision tree and then prunes it from the bottom up. The code below does this with a depth-first (post-order) recursion: the subtrees of a node are pruned first, and only then is the node itself examined (see the sketch below).
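The node-level decision mirrors the pre-pruning test, except that the subtree is kept when it is at least as good as a majority-vote leaf on the validation set (`<=` rather than `<`), i.e. ties do not trigger pruning. A minimal sketch of the decision, taken from the `postPruningTree` function below:

```python
# Post-pruning decision at one node (sketch), applied after all of the node's
# subtrees have already been pruned recursively:
# keep the subtree if it makes no more validation errors than a majority-vote leaf,
# otherwise replace the whole subtree by the majority class of its training samples.
if testing(inputTree, data_test, temp_labels) <= testingMajor(majorityCnt(classList), data_test):
    return inputTree
return majorityCnt(classList)
```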
The tree-building part is the same as for the unpruned CART tree; simply append the following post-pruning code to the end of that script (it reuses the `df` and `myTree` built there):
```python
import copy
import re

# In the tree, continuous features have been renamed to the form  feature<=value,
# so for those features a regular expression is used to recover the feature name
# and the split threshold
def classify(inputTree, featLabels, testVec):
    firstStr = inputTree.keys()[0]
    if '<=' in firstStr:
        featvalue = float(re.compile("(<=.+)").search(firstStr).group()[2:])
        featkey = re.compile("(.+<=)").search(firstStr).group()[:-2]
        secondDict = inputTree[firstStr]
        featIndex = featLabels.index(featkey)
        if testVec[featIndex] <= featvalue:
            judge = 1
        else:
            judge = 0
        for key in secondDict.keys():
            if judge == int(key):
                if type(secondDict[key]).__name__ == 'dict':
                    classLabel = classify(secondDict[key], featLabels, testVec)
                else:
                    classLabel = secondDict[key]
    else:
        secondDict = inputTree[firstStr]
        featIndex = featLabels.index(firstStr)
        for key in secondDict.keys():
            if testVec[featIndex] == key:
                if type(secondDict[key]).__name__ == 'dict':
                    classLabel = classify(secondDict[key], featLabels, testVec)
                else:
                    classLabel = secondDict[key]
    return classLabel

# Number of errors the tree makes on the validation set
def testing(myTree, data_test, labels):
    error = 0.0
    for i in range(len(data_test)):
        if classify(myTree, labels, data_test[i]) != data_test[i][-1]:
            error += 1
    # print 'myTree %d' % error
    return float(error)

# Number of errors a majority-vote leaf makes on the validation set
def testingMajor(major, data_test):
    error = 0.0
    for i in range(len(data_test)):
        if major != data_test[i][-1]:
            error += 1
    # print 'major %d' % error
    return float(error)

# Post-pruning: prune the subtrees depth-first, then decide whether to keep this node
def postPruningTree(inputTree, dataSet, data_test, labels):
    firstStr = inputTree.keys()[0]
    secondDict = inputTree[firstStr]
    classList = [example[-1] for example in dataSet]
    featkey = copy.deepcopy(firstStr)
    if '<=' in firstStr:
        featkey = re.compile("(.+<=)").search(firstStr).group()[:-2]
        featvalue = float(re.compile("(<=.+)").search(firstStr).group()[2:])
    labelIndex = labels.index(featkey)
    temp_labels = copy.deepcopy(labels)
    del(labels[labelIndex])
    # Recursively prune every non-leaf child first
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            if type(dataSet[0][labelIndex]).__name__ == 'str':
                inputTree[firstStr][key] = postPruningTree(
                    secondDict[key],
                    splitDataSet(dataSet, labelIndex, key),
                    splitDataSet(data_test, labelIndex, key),
                    copy.deepcopy(labels))
            else:
                inputTree[firstStr][key] = postPruningTree(
                    secondDict[key],
                    splitContinuousDataSet(dataSet, labelIndex, featvalue, key),
                    splitContinuousDataSet(data_test, labelIndex, featvalue, key),
                    copy.deepcopy(labels))
    # Keep the subtree if it makes no more validation errors than a majority-vote leaf;
    # otherwise replace it by the majority class
    if testing(inputTree, data_test, temp_labels) <= testingMajor(majorityCnt(classList), data_test):
        return inputTree
    return majorityCnt(classList)


data = df.values[:11, 1:].tolist()
data_test = df.values[11:, 1:].tolist()
labels = df.columns.values[1:-1].tolist()
myTree = postPruningTree(myTree, data, data_test, labels)

import plotTree
plotTree.createPlot(myTree)
```
The resulting decision tree is shown below:
It matches Figure 4.7 on page 83 of the textbook.
Summary:
Compared with pre-pruning, post-pruning keeps more branches. In general a post-pruned tree runs a much smaller risk of underfitting, and its generalization performance is usually better than that of a pre-pruned tree; the price is a larger training cost, since a complete tree has to be grown first and then examined node by node from the bottom up.