This post documents training a Decision Tree with Spark MLlib, called from PySpark, on the KDD Cup 1999 dataset.

Model Training

First, initialize the environment:

import findspark
findspark.init()

import pyspark
import random

sc = pyspark.SparkContext(appName="DecisionTree")

Download the training set:

import urllib.request
f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

Read the training set:

data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print( "Train data size is {}".format(raw_data.count()))
Train data size is 4898431
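Each record in the KDD Cup 1999 data is a comma-separated line of 41 features followed by a label. A quick plain-Python sketch of that layout, using a fabricated record (the values are illustrative placeholders, not real dataset rows):

```python
# A fabricated line in the KDD Cup 1999 format: 41 features + 1 label.
sample_line = ",".join(["0", "tcp", "http", "SF"] + ["0"] * 37 + ["normal."])

fields = sample_line.split(",")
print(len(fields))   # 42 fields: indices 0..40 are features, index 41 is the label
print(fields[1:4])   # protocol, service, flag -- the three categorical columns
print(fields[41])    # the label
```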

Download the test set:

ft = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")

Read the test set:

test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print ("Test data size is {}".format(test_raw_data.count()))

Extract the categorical features and collect their distinct values:

from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()
protocols, services[:5], flags[:5]
(['tcp', 'udp', 'icmp'],
 ['http', 'smtp', 'domain_u', 'auth', 'finger'],
 ['SF', 'S2', 'S1', 'S3', 'OTH'])
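The encoding used below maps each category to its position in these collected lists, and reserves one extra index for categories that appear only in the test set. A minimal plain-Python sketch of that lookup (the example list mirrors the `protocols` collected above):

```python
protocols = ["tcp", "udp", "icmp"]  # example list, as collected above

def encode(value, categories):
    # Map a category to its index; unseen values get a dedicated extra index.
    try:
        return categories.index(value)
    except ValueError:
        return len(categories)

print(encode("udp", protocols))    # 1
print(encode("sctp", protocols))   # 3 -- not in the training vocabulary
```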

Build the LabeledPoints. The create_labeled_point function does three things:

  • extracts the features and converts the categorical columns to numeric indices
  • extracts the label
  • combines them into a LabeledPoint(label, features)
def create_labeled_point(line_split):
    # Column 41 is the label, so keep only the 41 columns before it
    clean_line_split = line_split[0:41]

    # Convert protocol to a numeric categorical value
    try:
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except ValueError:
        clean_line_split[1] = len(protocols)

    # Convert service to a numeric categorical value
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except ValueError:
        clean_line_split[2] = len(services)

    # Convert flag to a numeric categorical value
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except ValueError:
        clean_line_split[3] = len(flags)

    # Convert the label to a binary 0/1 variable
    attack = 1.0
    if line_split[41] == 'normal.':
        attack = 0.0
    # Return a LabeledPoint(label, features)
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

Train the decision tree. The trainClassifier parameters include:

  • training_data: the training set
  • numClasses: the number of classes
  • categoricalFeaturesInfo: the categorical features, in the form {column i: number of categories, ...}
  • impurity: the impurity measure; CART's gini is the usual choice
  • maxDepth: the maximum depth of the tree
  • maxBins: the maximum number of bins
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time

# Build the model
t0 = time()
# categoricalFeaturesInfo={column i: number of categories}
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print("Classifier trained in {} seconds".format(round(tt,3)))
Classifier trained in 124.439 seconds
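The 'gini' impurity passed above is CART's splitting criterion: for class proportions p_i in a node, Gini = 1 - Σ p_i². A small plain-Python sketch of the formula (not MLlib's internals):

```python
def gini(labels):
    # Gini impurity: 1 minus the sum of squared class proportions.
    n = len(labels)
    if n == 0:
        return 0.0
    counts = {}
    for y in labels:
        counts[y] = counts.get(y, 0) + 1
    return 1.0 - sum((c / n) ** 2 for c in counts.values())

print(gini([0, 0, 1, 1]))   # 0.5 -- maximally impure for two classes
print(gini([0, 0, 0, 0]))   # 0.0 -- pure node
```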

Use the decision tree model to make predictions on the test set:

predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

Compute the test accuracy:

t0 = time()
test_accuracy = labels_and_preds.filter(lambda v: v[0] == v[1]).count() / float(test_data.count())
tt = time() - t0

print ("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))
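The accuracy above is just the fraction of matching (label, prediction) pairs. On intrusion-detection data it is also worth checking precision and recall for the attack class; a plain-Python sketch over a small hypothetical pair list (the pairs are made up for illustration):

```python
# Hypothetical (label, prediction) pairs; 1.0 = attack, 0.0 = normal.
pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0)]

tp = sum(1 for y, p in pairs if y == 1.0 and p == 1.0)  # true positives
fp = sum(1 for y, p in pairs if y == 0.0 and p == 1.0)  # false positives
fn = sum(1 for y, p in pairs if y == 1.0 and p == 0.0)  # false negatives

accuracy = sum(1 for y, p in pairs if y == p) / len(pairs)
precision = tp / (tp + fp)
recall = tp / (tp + fn)

print(accuracy, precision, recall)   # 0.6, 2/3, 2/3
```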

Print the logic the decision tree learned:

print ("Learned classification tree model:")
print (tree_model.toDebugString())
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 25 nodes
  If (feature 22 <= 35.5)
   If (feature 38 <= 0.875)
    If (feature 36 <= 0.445)
     If (feature 34 <= 0.925)
      Predict: 0.0
     Else (feature 34 > 0.925)
      Predict: 1.0
    Else (feature 36 > 0.445)
     If (feature 2 in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
      Predict: 1.0
   Else (feature 38 > 0.875)
    If (feature 3 in {0.0,1.0,2.0})
     Predict: 0.0
    Else (feature 3 not in {0.0,1.0,2.0})
     If (feature 36 <= 0.255)
      Predict: 1.0
     Else (feature 36 > 0.255)
      Predict: 0.0
  Else (feature 22 > 35.5)
   If (feature 5 <= 2.0)
    If (feature 2 in {11.0,66.0})
     Predict: 0.0
    Else (feature 2 not in {11.0,66.0})
     If (feature 11 <= 0.5)
      Predict: 1.0
     Else (feature 11 > 0.5)
      Predict: 0.0
   Else (feature 5 > 2.0)
    If (feature 29 <= 0.08499999999999999)
     If (feature 2 in {0.0,5.0,10.0,1.0,2.0,22.0,8.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,5.0,10.0,1.0,2.0,22.0,8.0})
      Predict: 1.0
    Else (feature 29 > 0.08499999999999999)
     Predict: 1.0
print ("Service 0 is {}".format(services[0]))
print ("Service 52 is {}".format(services[52]))
Service 0 is http
Service 52 is pop_2

Train with only a few important features:

def create_labeled_point_minimal(line_split):
    # Keep only columns 3 (flag), 5 and 22
    clean_line_split = line_split[3:4] + line_split[5:6] + line_split[22:23]

    # Convert flag to a numeric categorical value
    try:
        clean_line_split[0] = flags.index(clean_line_split[0])
    except ValueError:
        clean_line_split[0] = len(flags)

    # Convert the label to a binary 0/1 variable
    attack = 1.0
    if line_split[41] == 'normal.':
        attack = 0.0

    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data_minimal = csv_data.map(create_labeled_point_minimal)
test_data_minimal = test_csv_data.map(create_labeled_point_minimal)

Build the model:

t0 = time()
tree_model_minimal = DecisionTree.trainClassifier(training_data_minimal, numClasses=2, 
                                          categoricalFeaturesInfo={0: len(flags)},
                                          impurity='gini', maxDepth=3, maxBins=32)
tt = time() - t0

print ("Classifier trained in {} seconds".format(round(tt,3)))
Classifier trained in 84.475 seconds
predictions_minimal = tree_model_minimal.predict(test_data_minimal.map(lambda p: p.features))
labels_and_preds_minimal = test_data_minimal.map(lambda p: p.label).zip(predictions_minimal)

Check the accuracy:

t0 = time()
test_accuracy = labels_and_preds_minimal.filter(lambda v: v[0] == v[1]).count() / float(test_data_minimal.count())
tt = time() - t0

print ("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))
Prediction made in 5.193 seconds. Test accuracy is 0.9156

Persistence

Save the model:

tree_model_minimal.save(sc, "tree_model_minimal.model")

Load the model:

from pyspark.mllib.tree import DecisionTreeModel

loaded_model = DecisionTreeModel.load(sc, "tree_model_minimal.model")
Last modified: July 15, 2021, 5:47 PM