在Kaggle比赛TalkingData AdTracking Fraud Detection Challenge里遇到了一个问题,数据集的size太大了,用Pandas不能一下全部加载进来。根据forum里面的内容总结一下几种解决方式。
方法1:预先设定数据类型
有的数据用int32
才能存储,而有的数据用int16
就可以。这样可以先分别调查每个feature的数据类型,之后在读取的时候预先设定数据类型,来避免内存溢出。
import numpy as np
import pandas as pd
import datetime
import os
import time
import matplotlib.pyplot as plt
import seaborn as sns
import gc
%matplotlib inline
dtypes = {
'ip' : 'uint32',
'app' : 'uint16',
'device' : 'uint16',
'os' : 'uint16',
'channel' : 'uint16',
'is_attributed' : 'uint8',
}
train = pd.read_csv('./train.csv', dtype=dtypes)
train.info()
方法2:随机抽样
假设要在原始数据集中随机抽样,来训练多个模型,那么可以每次做一次随机抽样训练一个模型,这也和LightGBM
与Random Forest
等算法的思想类似。
首先看一下数据集一共多少行:
import subprocess
# https://stackoverflow.com/questions/845058/how-to-get-line-count-cheaply-in-python
def file_len(fname):
p = subprocess.Popen(['wc', '-l', fname], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result, err = p.communicate()
if p.returncode != 0:
raise IOError(err)
return int(result.strip().split()[0])
lines = file_len('./train.csv')
print('Number of lines in "train.csv" is:', lines)
假设要选择1,000,000
个样本,可以先随机选择lines-1-1000000
个样本来skip掉(这个需要花费的时间较长):
# generate list of lines to skip
skiplines = np.random.choice(np.arange(1, lines), size=lines-1-1000000, replace=False)
# sort the list
skiplines=np.sort(skiplines)
train = pd.read_csv('./train.csv', skiprows=skiplines, dtype=dtypes)
这个中间变量比较大,需要删除:
del skiplines
gc.collect()
train['click_time'] = pd.to_datetime(train['click_time'])
train['attributed_time'] = pd.to_datetime(train['attributed_time'])
方法3:chunk分段读取
这个数据集里,正样本的比例很小,所以用降采样思想,把正样本取出来,然后再取一些负样本。但直接读取数据集来选择正样本是不可能的,所以可以通过分段chunk
来一段段的把正样本找到,并放到一个DataFrame
里。
df_converted = pd.DataFrame()
#酶促取10^6个样本
chunksize = 10 ** 6
for chunk in pd.read_csv('./train.csv', chunksize=chunksize, dtype=dtypes):
filtered = (chunk[(np.where(chunk['is_attributed']==1, True, False))])
df_converted = pd.concat([df_converted, filtered], ignore_index=True, )
方法4:只读入部分列
之前的方法都是以行为思想,实际上在大数据里,以列为读取单位的方式也很多,像HBase
等等数据库都是列式存储的。在pandas里也可以以列为单位读入一部分列。
columns = ['ip', 'click_time', 'is_attributed']
dtypes = {
'ip' : 'uint32',
'is_attributed' : 'uint8',
}
ips_df = pd.read_csv('./train.csv', usecols=columns, dtype=dtypes)
方法5:分而治之
对整个数据集做groupby
会导致计算问题,但可以考虑用deep learning batch的思想分而治之,比如要做count
,可以每次算一部分count
,然后把他们加到一起:
ips_df[0:100][['ip', 'is_attributed']].groupby('ip', as_index=False).count()[:10]
size=100000
all_rows = len(ips_df)
num_parts = all_rows // size
# 第一个batch
ip_counts = ips_df[0:size][['ip', 'is_attributed']].groupby('ip', as_index=False).count()
# 剩下的batch
for p in range(1,num_parts):
start = p * size
end = p * size + size
if end < all_rows:
group = ips_df[start:end][['ip', 'is_attributed']].groupby('ip', as_index=False).count()
else:
group = ips_df[start:][['ip', 'is_attributed']].groupby('ip', as_index=False).count()
ip_counts = ip_counts.merge(group, on='ip', how='outer')
ip_counts.columns = ['ip', 'count1','count2']
ip_counts['counts'] = np.nansum((ip_counts['count1'], ip_counts['count2']), axis = 0)
ip_counts.drop(columns=['count1', 'count2'], axis = 0, inplace=True)
方法6:Dask
可以用Dask来做处理,Dask会把超过内存的内容自动分成子任务来处理。
首先导入dask.dataframe
,然后剩下的操作和pandas.DataFrame
就差不多了:
import dask
import dask.dataframe as dd
加载训练数据
dtypes = {'ip':'uint32',
'app': 'uint16',
'device': 'uint16',
'os': 'uint16',
'channel': 'uint16',
'is_attributed': 'uint8'}
train = dd.read_csv("./train.csv", dtype=dtypes, parse_dates=['click_time', 'attributed_time'])
train.head()
计算len
的时间会比较长:
len(train)
184903890
其他计算需要用compute
来触发:
train['ip'].mean().compute()
90876.04196792723
也可以进行复杂一些的计算,只不过时间会毕竟长:
channel_means = train[['channel','is_attributed']].groupby('channel').mean().compute()
channel_means[:20]
Dask不支持的一些操作,比如:as_index=False