导入GoogleClusterData到MySQL

  本篇随笔记录如何导入google-cluster-data-2011-1-2的

job_events和task_events到MySQL

1. 下载数据

download_job_events:

import urllib2

url = 'https://commondatastorage.googleapis.com/clusterdata-2011-2/'
f = open('C:\SHA256SUM')
l = f.readlines()
f.close()
for i in l:
    if i.count('job_events')>0:
        fileAddr = i.split()[1][1:]
        fileName = fileAddr.split('/')[1]
        print 'downloading', fileName
        data = urllib2.urlopen(url+fileAddr).read()
        print 'saving', fileName
        fileDown = open('C:\job_events\'+fileName, 'wb')
        fileDown.write(data)
        fileDown.close()

download_task_events:

import urllib2

url = 'https://commondatastorage.googleapis.com/clusterdata-2011-2/'
f = open('C:\SHA256SUM')
l = f.readlines()
f.close()
for i in l:
    if i.count('task_events')>0:
        fileAddr = i.split()[1][1:]
        fileName = fileAddr.split('/')[1]
        print 'downloading', fileName
        data = urllib2.urlopen(url+fileAddr).read()
        print 'saving', fileName
        fileDown = open('C:\task_events\'+fileName, 'wb')
        fileDown.write(data)
        fileDown.close()

注意:这次用的数据是

clusterdata-2011-2

不同于之前重画GoogleCLusterData中的

clusterdata-2011-1

2. 解压缩

由于不能直接导入压缩包里的数据到mysql,故先将它们解压缩

unzip_job_events:

import gzip
import os

fileNames = os.listdir('C:\task_events')

for l in fileNames:
    print 'now at: '+ l
    f = gzip.open('C:\job_events\'+l)
    fOut = open('C:\job_events_unzip\'+l[:-3], 'w')
    content = f.read()
    fOut.write(content)
    f.close()
    fOut.close()
    #raw_input()

unzip_task_events:

import gzip
import os

fileNames = os.listdir('C:\task_events')

for l in fileNames:
    print 'now at: '+ l
    f = gzip.open('C:\task_events\'+l)
    fOut = open('C:\task_events_unzip\'+l[:-3], 'w')
    content = f.read()
    fOut.write(content)
    f.close()
    fOut.close()

3. 建数据库

create_job_events:

create table job_events(
time bigint,
missing_info int,
job_id bigint,
event_type int,
user text,
scheduling_class int,
job_name text,
logical_job_name text)
engine = myisam;

create_task_events:

create table task_events(
time bigint,
missing_info int,
job_id bigint,
task_index bigint,
machine_id bigint,
event_type int,
user text,
scheduling_class int,
priority int,
cpu_request float,
memory_request float,
disk_space_request float,
difference_machine_restriction boolean
)engine = myisam;

注意:由于数据量非常大,这里一定要选择myisam作为engine。

4. 导入数据

由于数据中有部分为空的值,需要先设定mysql使其能够导入空值。

具体方法为:

在mysql的控制台输入

SET @@GLOBAL.sql_mode="NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION";

之后就可以开始导入数据了。

注意!!以下代码在导入类似2.3e-10的数据会产生严重问题,具体为导入的数据在MySQL中变为负数,而且绝对值不小!!!

loadJobEvents2MySQL.py

import os
import MySQLdb

fileNames = os.listdir('C:\task_events_unzip')

conn=MySQLdb.connect(host="localhost",user="root",passwd="123456",db="googleclusterdata",charset="utf8")
cursor = conn.cursor()
cursor.execute('truncate job_events')

for f in fileNames:
    print 'now at: '+ f
    order = "load data infile 'C:/job_events_unzip/%s' into table job_events fields terminated by ',' lines terminated by '
'" %f
    print order
    cursor.execute(order)
    conn.commit()

loadTaskEvents2MySQL.py

import os
import MySQLdb

fileNames = os.listdir('C:\task_events_unzip')

conn=MySQLdb.connect(host="localhost",user="root",passwd="123456",db="googleclusterdata",charset="utf8")
cursor = conn.cursor()
cursor.execute('truncate task_events')

for f in fileNames:
    print 'now at: '+ f
    order = "load data infile 'C:/task_events_unzip/%s' into table task_events fields terminated by ',' lines terminated by '
'" %f
    print order
    cursor.execute(order)
    conn.commit()

注意:这里需要相应的修改密码和使用的数据库名(db)