当文件特别大的时候,逐行读取文件并处理是很废时间的,目前有两种解决办法
每次读取多行
1 | READ_SIZE是设置的读取多少个字节, 如果size小于缓冲区的大小,则读取缓冲区大小的数据; |
多线程读取
思路是,主线程负责往队列中放每一行, 很多自线程并行对数据进行处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27queue = Queue.Queue()
class MultiThreadFile(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
line = self.queue.get()
deal_line()
self.queue.task_done()
if __name__ == '__main__':
for i in range(30):
t = MultiThreadFile(queue)
t.setDaemon(True)
t.start()
f = open(USER_TERM_FILE)
while True:
lines = f.readlines(READ_SIZE)
if not lines:
break
for line in lines:
line = line.rstrip()
queue.put(line)
queue.join()
对数据切成batch处理
这个程序是将数据切成200个batch,每5个batch进行并行跑,最后将batch合并下
感觉和hadoop的思想是一样的1
2
3
4
5
6
7
8
9for((i = 1; i <= 200; i++))
do
sh your_program.sh $file_$i &
let mod=i%5
if [ $mod -eq 0 ]
then
wait
fi
done