2012年4月29日星期日

Python多进程处理文件

有一天,QA要我写一个程序,处理一个很大的文本文件,通常几百MB,有时几个G。
这个本文本件是一些顺序打印的JSON,需要解析出来,然后按照他们喜欢的格式重新打印。

先写了一个单线程版本,发现很慢,CPU 100%。估计是parse json太慢了。

buf = ""
for line in sys.stdin:
if line[0] == "}":
line = "}"
buf += line
handle(buf)
buf = "{"
else:
buf += line


def handle(buf, m):
o = json.loads(buf)
s = ""
...
sys.stdout.write(s)


记得python的多线程是伪多线程,在多核的机器上CPU还是100%。于是直接采用多进程。

buf_queue = multiprocessing.Queue()

def process(q):
for buf in iter(q.get, None):
handle(buf)
q.put(None)

for _i in xrange(23):
multiprocessing.Process(target=process, args=(buf_queue)).start()

buf = ""
for line in sys.stdin:
if line[0] == "}":
line = "}"
buf += line
buf_queue.put(buf)
buf = "{"
else:
buf += line
buf_queue.put(None)

主进程不断向一个队列里丢字符串,23个子进程谁拿到谁处理。主进程在队列末端放一个None,子进程发现None就退出。
当然,子进程在退出的时候还要往里面放一个None,这样后面的子进程也就可以退出了。
也可以让子进程往队列里放23个None,这样所有子进程也都可以退出了。
iter(q.get, None)
是一个很神奇的语法糖:每次执行q.get()的返回值,作为buf在for代码块中处理。直到q.get()返回了None,for就退出了。

这样有一个问题,多个进程都在stdout上写,会写乱。于是加了一个mutex,multiprocessing.Lock。Lock支持with语法。

然后觉得自己搞fork进程、put/get队列什么的太底层太�嗦了,于是改成了Pool

pool = multiprocessing.Pool()
buf = ""
for line in sys.stdin:
if line[0] == "}":
line = "}"
buf += line
pool.apply_async(handle, (buf, ))
buf = "{"
else:
buf += line

但是multiprocessing.Lock不能pickle串行化,只好改成multiprocessing.Manager.Lock,生成一个委托的锁,在多进程中共享。

俗话说,共享状态和锁是并发程序的天敌。于是继续寻求更好的办法。想,如果每个进程不要自己打印,而是把要打印的字符串扔到一个的队列里面,然后有一个单独的线程打印,就可以避免自己用锁了。不过最后使用了更紧凑的写法:
pool.apply_async(handle, (buf,), callback = sys.stdout.write)
当然handle结尾改成return s
看了源代码,Pool会有一个fork出来的线程负责在所有子进程外面顺序执行callback,从而避免了应用程序显性用锁。当然Pool本身有一个输入队列,一个输出队列,各自有锁,躲不掉的。

2012年4月1日星期日

线程和进程的区别

进程好像有一种copy on write的性质,线程总是共享内存。

from multiprocessing import Process
from threading import Thread

class A(Process):
    def __init__(self):
        Process.__init__(self)
        self.done = 1

    def run(self):
        print self.done, id(self.done)
        self.done += 1
        print self.done, id(self.done)


class B(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.done = 1

    def run(self):
        print self.done, id(self.done)
        self.done += 1
        print self.done, id(self.done)

if __name__ == '__main__':    a=A()
    a.start()
    a.join()
    print a.done, id(a.done)
    b=B()
    b.start()
    b.join()
    print b.done, id(b.done)


结果:
1 90612360
2 90612336
1 90612360
1 90612360
2 90612336
2 90612336

坑啊。。(每当此时,我就格外怀念Erlang。。。)