OS and Python info:
uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2
Here is a simple class that starts tasks with a multiprocessing pool.
import time
from multiprocessing.pool import Pool

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        fh = open('test.txt', 'w')

    def run_task(self, i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))

    def run(self):
        pool = Pool(processes=self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task, args=(i,))
        pool.close()
        pool.join()
Instantiate the my_mp class, then start the processes:
ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end
Now replace fh = open('test.txt', 'w') with self.fh = open('test.txt', 'w') in the my_mp class and try again:
ins = my_mp()
ins.run()
No output! Why does no process start?
>>> import time
>>> from multiprocessing.pool import Pool
>>>
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...     def run_task(self, i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes=self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task, args=(i,))
...         pool.close()
...         pool.join()
...
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         self.fh = open('test.txt', 'w')
...     def run_task(self, i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes=self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task, args=(i,))
...         pool.close()
...         pool.join()
...
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...         self.fh = fh
...     def run_task(self, i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes=self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task, args=(i,))
...         pool.close()
...         pool.join()
...
>>> x = my_mp()
>>> x.run()
>>>
Why can't I store the file handle as self.fh in the __init__ method? I never call the file handle defined in __init__ from any process.
2 Answers
I did some investigating, but it does not fully answer the question. I am posting the results here in case they help somebody else.
First, if a subprocess fails, there is no traceback. So I added an extra line to display the return value of each subprocess; it should be None if no errors occur.
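A minimal sketch of that idea, assuming the results are collected with AsyncResult.get(), which re-raises any worker-side error in the parent process:

```python
import time
from multiprocessing.pool import Pool

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        self.fh = open('test.txt', 'w')  # unpicklable instance attribute

    def run_task(self, i):
        print('process {} start'.format(i))
        time.sleep(2)
        print('process {} end'.format(i))

    def run(self):
        pool = Pool(processes=self.process_num)
        # Keep the AsyncResult objects so errors can be surfaced afterwards.
        results = [pool.apply_async(self.run_task, args=(i,))
                   for i in range(self.process_num)]
        pool.close()
        pool.join()
        for r in results:
            print(r.get())  # re-raises the serialization error here
```

With this change, my_mp().run() fails loudly with something like TypeError: cannot pickle '_io.TextIOWrapper' object instead of silently printing nothing.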
It seems that the program sends the file object to the subprocess as part of the argument bundle for self.run_task. The error has a long history on Stack Overflow, but the best explanation IMO is here: https://discuss.python.org/t/use-multiprocessing-module-to-handle-a-large-file-in-python/6604
I didn't find out why a single file-object attribute makes the whole instance unserializable, but I hope that unveils some of the mystery.
Final test: the following code works as expected
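A plausible reconstruction of such a final test, assuming the fix is simply not to keep the file object on self and to return the task results (the sleep is shortened so the demo runs quickly):

```python
import time
from multiprocessing.pool import Pool

class my_mp(object):
    def __init__(self):
        self.process_num = 3  # no file object stored on self

    def run_task(self, i):
        print('process {} start'.format(i))
        time.sleep(0.2)  # shortened from 2 s for a quick demo
        print('process {} end'.format(i))
        return i

    def run(self):
        pool = Pool(processes=self.process_num)
        results = [pool.apply_async(self.run_task, args=(i,))
                   for i in range(self.process_num)]
        pool.close()
        pool.join()
        # get() preserves submission order, so this returns [0, 1, 2]
        return [r.get() for r in results]
```

Because nothing unpicklable hangs off the instance, the bound method serializes cleanly and all three tasks run.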
The problem:
Stdlib multiprocessing uses pickle to serialize objects. Anything which needs to be sent across the process boundary needs to be picklable.
Custom class instances are generally picklable, as long as all their attributes are picklable – it works by importing the type within the subprocess and unpickling the attributes.
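That rule is easy to check directly with pickle, outside any pool. A minimal sketch (Holder is a hypothetical stand-in for my_mp):

```python
import pickle

class Holder(object):
    def __init__(self):
        self.process_num = 3

    def run_task(self, i):
        return i

h = Holder()
# A plain instance round-trips through pickle just fine...
assert pickle.loads(pickle.dumps(h)).process_num == 3

# ...but adding a file-object attribute breaks it, and since a bound
# method drags its instance along, h.run_task becomes unpicklable too.
h.fh = open('test.txt', 'w')
for obj in (h, h.run_task):
    try:
        pickle.dumps(obj)
    except TypeError as e:
        print(e)  # e.g. "cannot pickle '_io.TextIOWrapper' object"
```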
The issue is that the object returned by open() is not picklable.

In the first case, the multiprocessing pool still works because fh is just a local variable: it is deleted as soon as it goes out of scope, i.e. when the __init__ method returns. But as soon as you save this handle in the instance's namespace with self.fh = open(...), a reference remains, and the handle will need to be sent over the process boundary.

You might think that since you've only scheduled the method self.run_task to execute in the pool, the state set in __init__ doesn't matter, but that's not the case. There is still a reference: calling ins = my_mp() runs the __init__ method in the main process, and the bound method ins.run_task
is the object which gets sent over the process boundary.

Solution:
There is a third-party library which provides a drop-in replacement for the stdlib multiprocessing Pool: pip install pathos and replace the multiprocessing import with the pathos equivalent.

pathos uses dill, a more powerful serialization library than pickle, so it is able to serialize the objects returned by open(). Your code should work again without any other changes. However, beware that each worker process knows nothing about the others writing bytes to self.fh, so whichever worker writes last may overwrite data written earlier by another process.
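The import swap might look like the following sketch. Using the multiprocess package (a dill-backed mirror of the stdlib API that pathos depends on) is an assumption here; a stdlib fallback keeps the snippet runnable either way:

```python
try:
    # `multiprocess` mirrors the stdlib multiprocessing API exactly,
    # so only the import line changes; it serializes with dill.
    from multiprocess.pool import Pool
except ImportError:
    # Fall back to the stdlib if the package is not installed.
    from multiprocessing.pool import Pool
```

Everything else in the original class stays the same; only the source of Pool changes.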