multithreading - Writing Errors in Multiprocessing Python
I am trying to write files after editing them, using multiprocessing code in Python (2.7). It works like a charm for a small number of files (<20). When I try more files (20+), it goes berserk. I am using Python 2.7.5 on CentOS 6.5 with a 4-core processor.
    import sys, os
    import multiprocessing
    import glob

    list_files = glob.glob("protein/*.txt")

    def some_func(some_file):
        with open(some_file) as some:
            with open(file_output) as output:
                for lines in some:
                    # do something
                    # edited_lines = func(lines)
                    output.write(edited_lines)

    pool = multiprocessing.Pool(10)  # desired number of threads = 10
    pool.map(some_func, list_files,)
    pool.close()
    pool.join()

The final written files overlap each other.
File 1:

    lines 1 .. file 1
    lines 2 .. file 1
    lines 3 .. file 1
    lines 4 .. file 1
    lines 5 .. file 1
    lines 6 .. file 1
    lines 7 .. file 1
    lines 8 .. file 1
    lines 9 .. file 1

File 2:

    lines 1 .. file 2
    lines 2 .. file 2
    lines 3 .. file 2
    lines 4 .. file 2
    lines 5 .. file 2
    lines 6 .. file 2
    lines 7 .. file 2
    lines 8 .. file 2
    lines 9 .. file 2

Output:

    lines 1 .. file 1
    lines 2 .. file 1
    lines 3 .. file 1
    lines 1 .. file 2
    lines 4 .. file 1
    lines 5 .. file 1lines 2 .. file 2
    lines 3 .. file 2
    lines 4 .. file 2
    lines 6 .. file 1
The problem is that you're trying to write to a file from many processes in parallel, and those writes aren't synchronized. That means it's possible for different processes to try to write at the same time, leading to the oddities you're seeing.
You can solve this either by having a single writer, with each worker sending the lines to write to that single writer, or by synchronizing the writes done by each process using a multiprocessing.Lock.
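The single-writer idea can be tried in isolation before wiring it into a Pool. The following is only a minimal sketch: threads stand in for the worker processes, a plain list stands in for the output file, and the names worker, writer and run_demo are made up for illustration. It shows the pattern of producers putting whole lines on a queue and one consumer draining it until it receives None.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2.7


def worker(out_q, name, n_lines):
    # Each worker only puts complete lines on the queue; it never
    # touches the shared output itself.
    for i in range(1, n_lines + 1):
        out_q.put("lines %d .. %s\n" % (i, name))


def writer(in_q, sink):
    # The single writer is the only code that appends to the sink.
    # iter(in_q.get, None) keeps calling get() until it returns None.
    for line in iter(in_q.get, None):
        sink.append(line)


def run_demo():
    q = queue.Queue()
    sink = []  # stands in for the output file (assumption for the demo)
    w = threading.Thread(target=writer, args=(q, sink))
    w.start()
    workers = [
        threading.Thread(target=worker, args=(q, "file %d" % k, 9))
        for k in (1, 2)
    ]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    q.put(None)  # sentinel: tell the writer there is nothing more to write
    w.join()
    return sink


result = run_demo()
```

The order in which lines from the two workers arrive may vary from run to run, but every entry in result is one complete line, because only the writer ever touches the sink.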
Using a single writer:
    import glob
    import multiprocessing
    from functools import partial
    from threading import Thread

    list_files = glob.glob("protein/*.txt")

    def some_func(out_q, some_file):
        with open(some_file) as some:
            for lines in some:
                # do something
                # edited_lines = func(lines)
                out_q.put(edited_lines)

    def write_lines(q):
        # "w" mode added here so the file can actually be written to
        with open(file_output, "w") as output:
            for line in iter(q.get, None):  # end when None is received
                output.write(line)

    pool = multiprocessing.Pool(10)  # desired number of worker processes = 10
    m = multiprocessing.Manager()
    q = m.Queue()
    t = Thread(target=write_lines, args=(q,))
    t.start()
    pool.map(partial(some_func, q), list_files)
    pool.close()
    pool.join()
    q.put(None)  # shut down the writer thread
    t.join()

Using multiprocessing.Lock:
    import glob
    import multiprocessing
    from functools import partial

    list_files = glob.glob("protein/*.txt")

    def some_func(lock, some_file):
        with open(some_file) as some:
            # "a" mode added here so that each worker appends instead of truncating
            with open(file_output, "a") as output:
                for lines in some:
                    # do something
                    # edited_lines = func(lines)
                    with lock:
                        output.write(edited_lines)
                        # flush while still holding the lock, so buffered data
                        # is not written out later, outside the locked section
                        output.flush()

    pool = multiprocessing.Pool(10)  # desired number of worker processes = 10
    m = multiprocessing.Manager()
    lock = m.Lock()
    pool.map(partial(some_func, lock), list_files)
    pool.close()
    pool.join()

We need to use a Manager to create the shared objects because you're passing them to a Pool, which requires pickling them. Normal multiprocessing.Lock/multiprocessing.Queue objects can only be passed to a multiprocessing.Process constructor, and will cause an exception when passed to a Pool method like map.
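The pickling restriction can be checked directly without starting a Pool. This is a small sketch, assuming only the standard library; is_picklable is a hypothetical helper name, not part of multiprocessing. It tries to pickle a plain multiprocessing.Lock and a plain multiprocessing.Queue, which is roughly what Pool.map has to do with its arguments before sending them to the workers.

```python
import multiprocessing
import pickle


def is_picklable(obj):
    # Returns True if pickle.dumps(obj) succeeds, False if it raises.
    # Plain multiprocessing locks and queues refuse to be pickled outside
    # of process start-up and raise RuntimeError.
    try:
        pickle.dumps(obj)
        return True
    except (RuntimeError, pickle.PicklingError, TypeError):
        return False


plain_lock = multiprocessing.Lock()
plain_queue = multiprocessing.Queue()

print("plain Lock picklable:  %s" % is_picklable(plain_lock))
print("plain Queue picklable: %s" % is_picklable(plain_queue))
```

The objects returned by m.Lock() and m.Queue() are proxies that talk to the Manager's server process, and those proxies can be pickled, which is why they can be passed through pool.map.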