python多线程处理文件

当文件特别大的时候,逐行读取文件并处理是很废时间的,目前有两种解决办法

每次读取多行

1
2
3
4
5
6
7
8
9
10
11
READ_SIZE是设置的读取多少个字节, 如果size小于缓冲区的大小,则读取缓冲区大小的数据;
如果size大于缓冲区的大小,读取size大小的数据,但不是完全的等于size的大小;
一般读取比size大的整行的数据, size最小值是1024*8. readlines不会丢失行,当size大小不处在行边界当时候。

f = open(file_name)
while True:
lines = f.readlines(READ_SIZE)
if not lines:
break
for line in lines:
deal_one_line()

多线程读取

思路是,主线程负责往队列中放每一行, 很多自线程并行对数据进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
queue = Queue.Queue()
class MultiThreadFile(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue

def run(self):
while True:
line = self.queue.get()
deal_line()
self.queue.task_done()

if __name__ == '__main__':
for i in range(30):
t = MultiThreadFile(queue)
t.setDaemon(True)
t.start()

f = open(USER_TERM_FILE)
while True:
lines = f.readlines(READ_SIZE)
if not lines:
break
for line in lines:
line = line.rstrip()
queue.put(line)
queue.join()

对数据切成batch处理

这个程序是将数据切成200个batch,每5个batch进行并行跑,最后将batch合并下
感觉和hadoop的思想是一样的

1
2
3
4
5
6
7
8
9
for((i = 1; i <= 200; i++))
do
sh your_program.sh $file_$i &
let mod=i%5
if [ $mod -eq 0 ]
then
wait
fi
done

------ 本文结束 ------
k