OS and Python info:

uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2

Here is a simple class that runs tasks in a multiprocessing pool.

from multiprocessing.pool import Pool
import time

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        fh = open('test.txt', 'w')
    def run_task(self,i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))
    def run(self):
        pool = Pool(processes = self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task,args = (i,))
        pool.close()
        pool.join()

Instantiate the my_mp class, then start the processes.

ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end

Now replace fh = open('test.txt', 'w') with self.fh = open('test.txt', 'w') in the my_mp class and try again.

ins = my_mp()
ins.run()    

No output! Why does no process start?

Here is the full interpreter session:

>>> from multiprocessing.pool import Pool
>>> import time
>>> 
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         self.fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...         self.fh = fh
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> 

Why can't I store the file handle as self.fh in the __init__ method? I never use the file handle defined in __init__ in any of the processes.

2 Answers


  1. I did some investigation, but it does not fully answer the question. I am going to post the results here in case they help somebody else.

    First, if a subprocess fails, there is no traceback by default. So I added an extra line to fetch and print the result of each subprocess; it should be None if no error occurs. The new code:

            for i in range(3):
                res = pool.apply_async(self.run_task, args=(i,))
                print(res.get())
    

    The output

    Traceback (most recent call last):
      File "C:/temp/LeetCode-solutions/multithreading.py", line 43, in <module>
        mp.run()
      File "C:/temp/LeetCode-solutions/multithreading.py", line 19, in run
        self.multiprocessing()
      File "C:/temp/LeetCode-solutions/multithreading.py", line 30, in multiprocessing
        print(res.get())
      File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingpool.py", line 771, in get
        raise self._value
      File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingpool.py", line 537, in _handle_tasks
        put(task)
      File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingconnection.py", line 206, in send
        self._send_bytes(_ForkingPickler.dumps(obj))
      File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingreduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
    TypeError: cannot pickle '_io.TextIOWrapper' object
    

    It seems that the file object gets pickled along with self.run_task when the task is sent to the worker. The error has a long history on StackOverflow, but the best explanation IMO is here:
    https://discuss.python.org/t/use-multiprocessing-module-to-handle-a-large-file-in-python/6604

    I didn't find out exactly why a single file-object attribute makes the whole instance unpicklable, but I hope this unveils some of the mystery.
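
    One way to see that it is really the bound method dragging the instance (and its file handle) into pickling is to pickle it directly (a minimal check, assuming the my_mp class from the question, in the version that assigns self.fh in __init__, has already been defined), since that is what apply_async has to send to a worker:

    import pickle

    ins = my_mp()               # __init__ assigns self.fh = open('test.txt', 'w')
    pickle.dumps(ins)           # TypeError: cannot pickle '_io.TextIOWrapper' object
    pickle.dumps(ins.run_task)  # same error: pickling the bound method pickles the instance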

    Final test: the following code works as expected

    from multiprocessing.pool import Pool
    import time
    
    
    class MyMP(object):
        def __init__(self):
            self.process_num = 3
    
        def run(self):
            self.fh = open('test.txt', 'w')
            pool = Pool(processes=3)
            for i in range(3):
                res = pool.apply_async(run_task, args=(i,))
                print(res.get())
            pool.close()
            pool.join()
            self.fh.close()
    
    def run_task(i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))
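
    If you prefer to keep run_task as a method and still store the file handle on the instance, another possible workaround (a sketch, not part of the test above) is to exclude the handle from pickling with __getstate__/__setstate__, so the copy of the instance that is sent to each worker simply has no fh:

    from multiprocessing.pool import Pool
    import time


    class MyMP(object):
        def __init__(self):
            self.process_num = 3
            self.fh = open('test.txt', 'w')

        def __getstate__(self):
            # Drop the unpicklable file handle before the instance is shipped to a worker.
            state = self.__dict__.copy()
            del state['fh']
            return state

        def __setstate__(self, state):
            # The worker-side copy never gets a file handle; it only needs process_num.
            self.__dict__.update(state)

        def run_task(self, i):
            print('process {} start'.format(i))
            time.sleep(2)
            print('process {} end'.format(i))

        def run(self):
            pool = Pool(processes=self.process_num)
            for i in range(self.process_num):
                pool.apply_async(self.run_task, args=(i,))
            pool.close()
            pool.join()
            self.fh.close()

    Only the parent process keeps the real handle, so any actual writing to test.txt should still happen there.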
    
  2. The problem:

    Stdlib multiprocessing uses pickle to serialize objects. Anything which needs to be sent across the process boundary needs to be picklable.

    Custom class instances are generally picklable, as long as all their attributes are picklable – it works by importing the type within the subprocess and unpickling the attributes.

    The issue is that the object returned by open() is not picklable.

    >>> class A:
    ...     pass
    ... 
    >>> import pickle
    >>> pickle.dumps(A())
    b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08__main__\x94\x8c\x01A\x94\x93\x94)\x81\x94.'
    >>> class A:
    ...     def __init__(self):
    ...         self.fh = open("test.txt", "w")
    ... 
    >>> pickle.dumps(A())
    TypeError: cannot pickle '_io.TextIOWrapper' object
    

    In the first case, the multiprocessing pool still works because fh is just a local variable and it’s deleted as soon as it’s out of scope, i.e. when the __init__ method returns. But as soon as you save this handle into the instance’s namespace with self.fh = open(...), there will remain a reference and it will need to be sent over the process boundary.

    You might think that, since you’ve only scheduled the method self.run_task to execute in the pool, the state set in __init__ doesn’t matter, but that’s not the case. There is still a reference:

    >>> ins = my_mp()
    >>> ins.run_task.__self__.__dict__
    {'process_num': 3,
     'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}
    

    Note that calling ins = my_mp() runs the __init__ method in the main process, and ins.run_task is the object which gets sent over the process boundary.

    Solution:

    There is a third-party library which provides a drop-in replacement for the stdlib multiprocessing Pool – pip install pathos and replace the multiprocessing import with:

    from pathos.multiprocessing import Pool
    

    pathos uses dill, a more powerful serialization library than pickle, so it is able to serialize the objects returned by open(). Your code should work again without any other changes. However, be aware that each worker process gets its own copy of self.fh and knows nothing about the other processes writing to it, so whichever worker writes last may overwrite data written earlier by another process.
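
    If you would rather stay with the stdlib, an alternative sketch (assuming the tasks only need to report a result rather than share the handle) is to have the workers return strings and let the parent do all the writing, which sidesteps both the pickling error and the concurrent-write problem:

    from multiprocessing.pool import Pool
    import time


    def run_task(i):
        # Runs in a worker: no file handle ever crosses the process boundary.
        print('process {} start'.format(i))
        time.sleep(2)
        print('process {} end'.format(i))
        return 'result from process {}\n'.format(i)


    class MyMP(object):
        def __init__(self):
            self.process_num = 3

        def run(self):
            with open('test.txt', 'w') as fh, Pool(processes=self.process_num) as pool:
                results = [pool.apply_async(run_task, args=(i,))
                           for i in range(self.process_num)]
                for res in results:
                    # get() blocks until the worker finishes; only the parent writes.
                    fh.write(res.get())

    Because only the parent process ever touches test.txt, the workers cannot overwrite each other's output.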
