multithreading - Writing Errors in Multiprocessing Python
I am trying to write files after editing them, using multiprocessing code in Python (2.7). It works like a charm for a small number of files (<20). When I try more files (20+), it goes berserk. I am using Python 2.7.5 on CentOS 6.5 with a 4-core processor.
import sys, os
import multiprocessing
import glob

list_files = glob.glob("protein/*.txt")

def some_func(some_file):
    with open(some_file) as some:
        with open(file_output, "a") as output:  # shared output file
            for lines in some:
                # do something
                # edited_lines = func(lines)
                output.write(edited_lines)

pool = multiprocessing.Pool(10)  # desired number of worker processes = 10
pool.map(some_func, list_files)
pool.close()
pool.join()
The final written files overlap each other:
File 1:

lines 1 .. file 1
lines 2 .. file 1
lines 3 .. file 1
lines 4 .. file 1
lines 5 .. file 1
lines 6 .. file 1
lines 7 .. file 1
lines 8 .. file 1
lines 9 .. file 1

File 2:

lines 1 .. file 2
lines 2 .. file 2
lines 3 .. file 2
lines 4 .. file 2
lines 5 .. file 2
lines 6 .. file 2
lines 7 .. file 2
lines 8 .. file 2
lines 9 .. file 2

Output:

lines 1 .. file 1
lines 2 .. file 1
lines 3 .. file 1
lines 1 .. file 2
lines 4 .. file 1
lines 5 .. file 1
lines 2 .. file 2
lines 3 .. file 2
lines 4 .. file 2
lines 6 .. file 1
The problem is that you're trying to write to a file from many processes in parallel, and those writes aren't synchronized. That means it's possible for different processes to write at the same time, leading to the oddities you're seeing.
You can solve this either by having a single writer, with each worker sending its lines to that one writing process, or by synchronizing the writes done by each process using a multiprocessing.Lock.
Using a single writer:
import glob
import multiprocessing
from functools import partial
from threading import Thread

list_files = glob.glob("protein/*.txt")

def some_func(out_q, some_file):
    with open(some_file) as some:
        for lines in some:
            # do something
            # edited_lines = func(lines)
            out_q.put(edited_lines)

def write_lines(q):
    with open(file_output, "w") as output:
        for line in iter(q.get, None):  # stop when None is received
            output.write(line)

pool = multiprocessing.Pool(10)  # desired number of worker processes = 10
m = multiprocessing.Manager()
q = m.Queue()
t = Thread(target=write_lines, args=(q,))
t.start()
pool.map(partial(some_func, q), list_files)
pool.close()
pool.join()
q.put(None)  # shut down the writer thread
t.join()
Using a multiprocessing.Lock:
import glob
import multiprocessing
from functools import partial

list_files = glob.glob("protein/*.txt")

def some_func(lock, some_file):
    with open(some_file) as some:
        with open(file_output, "a") as output:  # shared output file
            for lines in some:
                # do something
                # edited_lines = func(lines)
                with lock:
                    output.write(edited_lines)

pool = multiprocessing.Pool(10)  # desired number of worker processes = 10
m = multiprocessing.Manager()
lock = m.Lock()
pool.map(partial(some_func, lock), list_files)
pool.close()
pool.join()
Note that we need to use a Manager to create the shared objects, because we're passing them to a Pool worker, which requires pickling them. Normal multiprocessing.Lock/multiprocessing.Queue objects can only be passed to a multiprocessing.Process constructor, and will cause an exception when passed to a Pool method like map.
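For illustration, here is a minimal, self-contained sketch of that pickling difference (the touch worker below is hypothetical, only for demonstration, and not part of the code above): pickling a plain multiprocessing.Lock raises a RuntimeError, while a Manager lock is a picklable proxy that travels through Pool.map without trouble.

import multiprocessing
import pickle
from functools import partial

def touch(lock, item):
    # hypothetical worker: take the shared lock around a trivial operation
    with lock:
        return item * 2

if __name__ == "__main__":
    # Pool.map has to pickle every argument it ships to a worker.
    # A plain multiprocessing.Lock refuses to be pickled:
    try:
        pickle.dumps(multiprocessing.Lock())
    except RuntimeError as err:
        print(err)  # "Lock objects should only be shared between processes through inheritance"

    # A Manager lock is a proxy object, so it pickles and works with Pool.map:
    m = multiprocessing.Manager()
    lock = m.Lock()
    pool = multiprocessing.Pool(4)
    print(pool.map(partial(touch, lock), [1, 2, 3]))  # prints [2, 4, 6]
    pool.close()
    pool.join()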