2019年5月

Python高效处理GB级数据

最近需要处理车辆轨迹数据,数据源是
http://kolntrace.project.citi-lab.fr/ ,得到的CSV文件大小是18.9GB,大约有3亿5千万条记录。

现在的处理方式是读取源文件的数据,进行处理后将结果保存到另一个文件中。现在有四种不同的方式。

方式一

def init_csv(filename):
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    with open(filename, 'a+', encoding="utf-8") as f:
        f.writelines(csv_title)
        f.writelines('\n')


def write_to_csv(filename, vehicleID, time, x_coordinates, y_coordinates, speed):
    with open(filename, 'a+', encoding="utf-8") as f:
        f.writelines(str(vehicleID) +',' + str(time) + ','+ str(x_coordinates) + ','+ str(y_coordinates) + ',' + str(speed))
        f.writelines('\n')


def process_line(line, i, csvfilename):
    if i <= 50:
        print(line + '\n')
        info = line.split()
        time = info[0]
        id = info[1]
        x_coordinates = info[2]
        y_coordinates = info[3]
        speed = info[4]
        if id.isdigit():
            pass
        else:
            write_to_csv(csvfilename, id, time, x_coordinates, y_coordinates, speed)


def read_file_to_csv(filename, csvfilename):
    init_csv(csvfilename)
    with open(filename, 'r', encoding='utf-8') as f:
        i = 1
        for line in f:
            process_line(line, i, csvfilename)
            i = i + 1


def main():
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = 'koln.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)


if __name__ == '__main__':
    main()

第一种方式是我一开始使用,使用了很多子函数,逻辑清晰,但是效率极低,如果要处理完18.9G数据,时间估算是大约需要12天。

方式二

def read_file_to_csv(filename, csvfilename):
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    with open(csvfilename, 'a+', encoding="utf-8") as csvf:
        csvf.writelines(csv_title)
        csvf.writelines('\n')
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                info = line.split()
                time = info[0]
                id = info[1]
                x_coordinates = info[2]
                y_coordinates = info[3]
                speed = info[4]
                if float(speed) != 0:
                    if id.isdigit():
                        linedata = str(id) + ',' + str(time) + ',' + str(x_coordinates) + ',' + str(y_coordinates) + ',' + str(speed)
                        if linedata.count(',') == 4:
                            csvf.writelines(linedata)
                            csvf.writelines('\n')


if __name__ == '__main__':
    ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
    CSV_FILE_NAME = '../../koln.tr/readfiledata.csv'
    read_file_to_csv(ORIGIN_FILE_NAME, CSV_FILE_NAME)

第二种方式是将第一种方式进行改进,去掉了子函数,而且把文件的打开操作作为一步执行,大大减少了文件IO与函数调用的时间,但由于仍然是单进程在执行,所以时间也不是很短,处理完18.9G数据,大概需要3个小时。

方式三

import multiprocessing as mp
import os

ORIGIN_FILE_NAME = '../../koln.tr/koln.tr'
CSV_FILE_NAME = '../../koln.tr/data.csv'

global csvf
csvf = open(CSV_FILE_NAME, 'a+', encoding="utf-8")

def is_number(n):
    try:
        num = float(n)
        # 检查 "nan"
        is_number = num == num   # 或者使用 `math.isnan(num)`
    except ValueError:
        is_number = False
    return is_number


def not_comma_in(n):
    s = str(n)
    if s.find(',') == -1:
        return True
    else:
        return False


def process_wrapper(chunkStart, chunkSize):
    with open(ORIGIN_FILE_NAME) as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            info = line.split()
            time = info[0]
            id = info[1]
            x_coordinates = info[2]
            y_coordinates = info[3]
            speed = info[4]
            if is_number(id) and is_number(time) and is_number(x_coordinates) and is_number(y_coordinates) and is_number(speed)\
                    and not_comma_in(id) and not_comma_in(time) and not_comma_in(x_coordinates) and not_comma_in(y_coordinates) and not_comma_in(speed):
                linedata = str(id) + ',' + str(time) + ',' + str(x_coordinates) + ',' + str(y_coordinates) + ',' + str(speed)
                datas = linedata.split(',')
                if 5 == len(datas):
                    csvf.writelines(linedata)
                    csvf.writelines('\n')


def chunkify(fname, size=6138*10240):
    fileEnd = os.path.getsize(fname)
    with open(fname,'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size,1)
            f.readline()
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break


def main():
    # init objects
    pool = mp.Pool(processes=60)
    jobs = []

    # create jobs
    csv_title = 'vehicleID,time,x_coordinates,y_coordinates,speed'
    csvf.writelines(csv_title)
    csvf.writelines('\n')

    for chunkStart, chunkSize in chunkify(ORIGIN_FILE_NAME):
        jobs.append(pool.apply_async(process_wrapper, (chunkStart, chunkSize)))

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()


if __name__ == '__main__':
    main()

第三种方式使用了多进程,所以需要性能比较好的计算机才能运行,这也使得数据处理非常快速,处理完18.9G数据只需要不到10分钟。但是,这也导致了一个问题,由于它是将文件分块进行处理,导致了很多数据在分块的时候在每一块中变成了错误的数据。

方式四

import pandas as pd
import multiprocessing as mp

CSV_FILE_NAME = 'withspeed.csv'
TRACE_FILE = 'trace'


def process_wrapper(chunk, chunk_num):
    trace_num = 1
    with open(TRACE_FILE + '_' + str(chunk_num) + '.csv', 'a+', encoding='utf-8') as f:
        f.write('traceID,time,x,y')
        f.write('\n')
        vehicleID = chunk['vehicleID'].drop_duplicates()
        print(len(vehicleID))
        for id in vehicleID:
            trace = chunk[chunk['vehicleID'] == id].sort_values(by=['time'])
            if len(trace) >= 60:
                x = trace['x_coordinates']
                y = trace['y_coordinates']
                time = trace['time']
                for i in range(len(trace)):
                    if i + 1 < len(trace): # is not over bound
                        if time.values[i+1] == time.values[i] + 1:
                            f.write(str(trace_num) + ',' + str(time.values[i]) + ',' + str(x.values[i]) + ',' + str(y.values[i]))
                            f.write('\n')
                        elif time.values[i+1] - time.values[i] >= 30:
                            trace_num += 1
                            # f.write(str(trace_num) + ',' + str(time.values[i]) + ',' + str(x.values[i]) + ',' + str(y.values[i]))
                            # f.write('\n')
                        else:
                            pass
                trace_num += 1
        f.close()

def main():
    # init objects
    pool = mp.Pool(processes=20)
    jobs = []

    chunk_size = 5 ** 10

    chunk_num = 0
    for chunk in pd.read_csv(CSV_FILE_NAME, error_bad_lines=False, chunksize=chunk_size):
        jobs.append(pool.apply_async(process_wrapper, (chunk, chunk_num)))
        chunk_num += 1

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()


if __name__ == '__main__':
    main()

方式四是最终采用的方式,先使用panpads 读取CSV文件,就可以无损地对CSV文件进行分块,即通过行数进行分块,并将各个分块的结果保存到不同的文件中,这样必须是分块数据具有一定的独立性。最终的解决方法可以保证在数据正确性的前提下处理速度也是很快的。

开源车辆轨迹数据集

尽管如今可以生成车辆轨迹的仿真软件有很多,如SUMOVISSIMVanetMobiSim,这些软件都可以比较方便地生成车辆轨迹。但是,用软件人工合成的车辆轨迹终究不如在现实世界中采集的数据有说服力。不过,不管是人工合成的也好,还是真实世界采集的也好,都各有优缺点。人工合成的轨迹生成相对于真实轨迹更加方便,但是缺乏“真实性”,而真实采集的轨迹数据则相反,但真实轨迹也不可避免地会面临数据采集开销大,采集过程中的噪声导致错误数据,采集频率比较大等等的问题。

而对车辆进行建模的过程中,车辆轨迹就显得尤为重要,可能大部分的研究人员限于财力人力都没有办法亲自去采集大规模的数据,而人工合成数据也会有一些处理代价。可喜的是,现在网络上也是有一些优质的开源车辆轨迹数据的,现在,整理并分享到这里,希望能帮助各位减少在网络上查找的时间。

人工轨迹数据

1、 Bologna

地点:意大利博洛尼亚,频率:1秒, 持续时长:1小时,车辆数量:22000

2、Lust-scenario

地点:卢森堡,频率:1秒, 持续时长:24小时,区域大小:155.95平方千米

真实轨迹数据

1、 T-Drive

地点:中国北京,频率:180秒, 持续时长:7天,区域大小:750平方千米,车辆类型:出租车,车辆数目:10357

2、 Cabspotting

地点:美国旧金山,频率:60秒, 持续时长:30天,区域大小:18000平方千米,车辆类型:出租车,车辆数目:500

3、Scrg

地点:中国上海,频率:60秒, 持续时长:1天,车辆类型:出租车,车辆数目:4000

4、Roma

地点:意大利罗马,频率:7秒, 持续时长:30天,车辆类型:出租车,车辆数目:320

5、Creteil

地点:法国克雷泰伊,频率:1秒, 持续时长:4小时,车辆类型:普通车

6、Madrid

地点:西班牙马德里,频率:500米一个点, 持续时长:1天

7、King county

地点:美国西雅图国王郡,频率:120秒, 持续时长:长期更新, 车辆类型:公交车

8、CTA

地点:美国芝加哥,频率:40秒, 持续时长:长期更新, 车辆类型:公交车,车辆数量:1648

数据集网站:

参考资料:

  1. souvenir001.车辆运动轨迹数据集[Online].https://blog.csdn.net/souvenir001/article/details/52180335.
    2016年08月11日.
  2. Mobility datasets[Online].https://privamov.github.io/accio/docs/datasets.html.
  3. Uppoor, Sandesh, et al. “Generation and analysis of a large-scale urban vehicular mobility dataset.” IEEE Transactions on Mobile Computing 13.5 (2014): 1061-1075.