这篇文章记录的是使用PySpark调用Spark MLlib的Decision Tree在 KDD Cup 1999 上训练。
建模训练
首先,初始化环境
# Locate the local Spark installation and put pyspark on sys.path.
import findspark
findspark.init()
import pyspark
import random
# Driver entry point; only one SparkContext may exist per application.
sc = pyspark.SparkContext(appName="DecisionTree")
下载训练集
# FIX: in Python 3 `import urllib` alone does not guarantee the
# `urllib.request` submodule is loaded; import it explicitly.
import urllib.request

# Download the full KDD Cup 1999 training set into the working directory.
f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")
读取训练集
# Read the gzipped training file as an RDD of raw CSV lines;
# Spark decompresses .gz input transparently.
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)
# count() forces evaluation of the whole file.
print( "Train data size is {}".format(raw_data.count()))
Train data size is 4898431
下载测试集
# FIX: import the submodule explicitly so this cell is runnable on its
# own under Python 3 (`import urllib` alone is not sufficient).
import urllib.request

# Download the labeled test set ("corrected") into the working directory.
ft = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")
读取测试集
# Read the test file as an RDD of raw CSV lines.
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)
print ("Test data size is {}".format(test_raw_data.count()))
取出分类数据,以及它们的类别
from pyspark.mllib.regression import LabeledPoint
from numpy import array
# Split every record into its comma-separated fields.
csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))
# Collect the distinct values of the three categorical columns
# (columns 1, 2, 3); each value's list index becomes its numeric
# encoding in create_labeled_point below.
protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()
# Notebook-style peek at the collected category values.
protocols, services[:5], flags[:5]
(['tcp', 'udp', 'icmp'],
['http', 'smtp', 'domain_u', 'auth', 'finger'],
['SF', 'S2', 'S1', 'S3', 'OTH'])
构建LabeledPoint

create_labeled_point 函数做了3件事:
- 取出Feature,并把分类数据转换为数字
- 取出Label
- 组合放到 LabeledPoint(label, features) 里面
def create_labeled_point(line_split):
    """Convert one raw KDD record (a list of 42 string fields) into a
    LabeledPoint.

    Three steps:
    - keep the first 41 columns as features, encoding the categorical
      columns (protocol, service, flag) as their index in the lists
      collected from the training data;
    - derive the binary label from column 41 ('normal.' -> 0.0,
      anything else -> 1.0);
    - wrap them in LabeledPoint(label, features).
    """
    # Column 41 is the label, so keep only the 41 columns before it.
    clean_line_split = line_split[0:41]
    # Encode protocol as a numeric categorical value; values unseen in
    # the training data (possible in the test set) get the fallback
    # index len(protocols).  list.index raises ValueError on a miss —
    # catch exactly that instead of a bare `except:`.
    try:
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except ValueError:
        clean_line_split[1] = len(protocols)
    # Encode service the same way.
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except ValueError:
        clean_line_split[2] = len(services)
    # Encode flag the same way.
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except ValueError:
        clean_line_split[3] = len(flags)
    # Binary label: 0.0 for normal traffic, 1.0 for any attack.
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    # Final result is LabeledPoint(label, features).
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))
# Lazily build the LabeledPoint RDDs for training and evaluation.
training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)
训练决策树,决策树需要输入参数的包括:
- training_data:训练集
- numClasses:训练类别数
- categoricalFeaturesInfo:分类特征格式为{第i列:分类数,...}
- impurity:不纯度,一般都是用CART的
gini
- maxDepth:树的最大深度
- maxBins:树的最大bins
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time
# Build the model and measure wall-clock training time.
t0 = time()
# categoricalFeaturesInfo = {column index: number of categories, ...}
# NOTE(review): create_labeled_point can emit len(protocols) (etc.) for
# test-set categories unseen in training, which is one past the arity
# declared here — confirm MLlib tolerates this at predict time.
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2,
categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0
print("Classifier trained in {} seconds".format(round(tt,3)))
Classifier trained in 124.439 seconds
使用决策树模型,在测试集做推理
# Predict on the test features, then pair each true label with its prediction.
predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)
计算测试准确率
t0 = time()
# Accuracy = fraction of (label, prediction) pairs that match.
test_accuracy = labels_and_preds.filter(lambda v: v[0] == v[1]).count() / float(test_data.count())
tt = time() - t0
print ("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))
显示决策树的逻辑
# Dump the learned tree structure in human-readable form.
print ("Learned classification tree model:")
print (tree_model.toDebugString())
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 25 nodes
If (feature 22 <= 35.5)
If (feature 38 <= 0.875)
If (feature 36 <= 0.445)
If (feature 34 <= 0.925)
Predict: 0.0
Else (feature 34 > 0.925)
Predict: 1.0
Else (feature 36 > 0.445)
If (feature 2 in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
Predict: 0.0
Else (feature 2 not in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
Predict: 1.0
Else (feature 38 > 0.875)
If (feature 3 in {0.0,1.0,2.0})
Predict: 0.0
Else (feature 3 not in {0.0,1.0,2.0})
If (feature 36 <= 0.255)
Predict: 1.0
Else (feature 36 > 0.255)
Predict: 0.0
Else (feature 22 > 35.5)
If (feature 5 <= 2.0)
If (feature 2 in {11.0,66.0})
Predict: 0.0
Else (feature 2 not in {11.0,66.0})
If (feature 11 <= 0.5)
Predict: 1.0
Else (feature 11 > 0.5)
Predict: 0.0
Else (feature 5 > 2.0)
If (feature 29 <= 0.08499999999999999)
If (feature 2 in {0.0,5.0,10.0,1.0,2.0,22.0,8.0})
Predict: 0.0
Else (feature 2 not in {0.0,5.0,10.0,1.0,2.0,22.0,8.0})
Predict: 1.0
Else (feature 29 > 0.08499999999999999)
Predict: 1.0
# Map encoded service indices appearing in the tree back to their names.
print ("Service 0 is {}".format(services[0]))
print ("Service 52 is {}".format(services[52]))
Service 0 is http
Service 52 is pop_2
只用部分重要特征做训练
def create_labeled_point_minimal(line_split):
    """Build a LabeledPoint from only the three columns (3, 5, 22) that
    dominated the splits of the full tree; column 3 (flag) is the sole
    categorical feature."""
    clean_line_split = line_split[3:4] + line_split[5:6] + line_split[22:23]
    # Encode flag numerically; list.index raises ValueError for values
    # unseen in training — catch exactly that (not a bare `except:`)
    # and map them to the fallback index len(flags).
    try:
        clean_line_split[0] = flags.index(clean_line_split[0])
    except ValueError:
        clean_line_split[0] = len(flags)
    # Binary label: 0.0 for normal traffic, 1.0 for any attack.
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))
# Minimal-feature RDDs for training and evaluation.
training_data_minimal = csv_data.map(create_labeled_point_minimal)
test_data_minimal = test_csv_data.map(create_labeled_point_minimal)
构建模型
t0 = time()
# Only feature 0 (flag) is categorical in the minimal feature set.
tree_model_minimal = DecisionTree.trainClassifier(training_data_minimal, numClasses=2,
categoricalFeaturesInfo={0: len(flags)},
impurity='gini', maxDepth=3, maxBins=32)
tt = time() - t0
print ("Classifier trained in {} seconds".format(round(tt,3)))
Classifier trained in 84.475 seconds
# Predict with the minimal model and pair labels with predictions.
predictions_minimal = tree_model_minimal.predict(test_data_minimal.map(lambda p: p.features))
labels_and_preds_minimal = test_data_minimal.map(lambda p: p.label).zip(predictions_minimal)
查看准确率
t0 = time()
# Fraction of test records whose prediction matches the true label.
test_accuracy = labels_and_preds_minimal.filter(lambda v: v[0] == v[1]).count() / float(test_data_minimal.count())
tt = time() - t0
print ("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))
Prediction made in 5.193 seconds. Test accuracy is 0.9156
持久化
把模型保存起来
# Persist the minimal model to the given path.
tree_model_minimal.save(sc, "tree_model_minimal.model")
读取模型
from pyspark.mllib.tree import DecisionTreeModel
# Reload the saved model (notebook-style: result displayed, not bound).
DecisionTreeModel.load(sc, "tree_model_minimal.model")