
I am currently working on a multiprocessing Python program in which each process writes its index into its own 4-byte slot of a shared memory buffer (the slots are consecutive), and a reader process reads the other processes’ indexes without any locks (due to speed constraints). Because I am not using any synchronization primitives, I am worried that the reader might read torn data (e.g., the first 2 bytes of one index and the last 2 bytes of another) if the memory is written byte by byte.

Reading a stale (previous) index is not a problem in my case; I just need to ensure that any value I read was actually written at some point, i.e. is historically valid.

I have observed that on macOS, this issue does not occur (perhaps due to Python or macOS-specific behavior). However, the program needs to run on other operating systems such as Windows and Ubuntu as well.

I created a small example below.

import time
import random
import struct
import multiprocessing
from multiprocessing import shared_memory

def writer(rank, shared_memory):
    index = 0
    while True:
        byte_array = struct.pack("i", index)
        shared_memory.buf[rank * 4: rank * 4 + 4] = byte_array
        index += 1
        time.sleep(random.random())

def reader(shared_memory, size):
    while True:
        indexes = struct.unpack(f"{size}i", shared_memory.buf[:size*4])
        print(f"Read indexes: {indexes}")
        time.sleep(1)

if __name__ == "__main__":
    size = 10
    # Use a separate name so the shared_memory module is not shadowed.
    shm = shared_memory.SharedMemory(create=True, size=4 * size)

    ctx = multiprocessing.get_context("spawn")

    processes = []
    for i in range(size):
        p = ctx.Process(target=writer, args=(i, shm))
        processes.append(p)
        p.start()

    reader_process = ctx.Process(target=reader, args=(shm, size))
    reader_process.start()

    for p in processes:
        p.join()
    reader_process.join()

Any advice or examples to ensure data integrity would be appreciated!

2 Answers


  1. Usually, memory transfers happen in rather large chunks, e.g. 64 KB blocks. I would strongly assume that your 4 bytes are written/read in a single atomic operation.

    I’d recommend doing a stress test with lots of random writes and reads of known values; a rough sketch of such a test is at the end of this answer. If you don’t see any problems there, you’re probably good.

    Other than that, you can only roll some kind of custom locking. E.g. reserve the top bit of the last byte for locking: when writing, first write 0x80 to the last byte of the slot, then write the payload data. When reading, if (last byte read) & 0x80 != 0, you know that you read a half-written index and need to retry.

    In the usual little-endian byte order of integers, this top bit is the sign bit – i.e. your indexes must not be negative, so that they cannot interfere with the "lock" marking.

    Code example:

    def writer(rank, shared_memory):
        index = 0
        while True:
            byte_array = struct.pack("i", index)
            # Set "Locked" flag for this index
            shared_memory.buf[rank * 4 + 3] = 0x80
            shared_memory.buf[rank * 4: rank * 4 + 4] = byte_array
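            # The full 4-byte write clears the flag again, since index >= 0.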
            index += 1
            time.sleep(random.random())
    
    def reader(shared_memory, size):
        while True:
            indexes = struct.unpack(f"{size}i", shared_memory.buf[:size*4])
            # "Optimistic" locking: assumes that indexes are not locked most of the time.
            if any((index & 0x80000000) for index in indexes):
                print("Read while locked, retry.")
                continue
            print(f"Read indexes: {indexes}")
            time.sleep(1)
    
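    A quick stress test could look like the rough sketch below (it reuses the buffer layout from the question; stress_writer and stress_reader are made-up names for this illustration). Each writer stores values whose four bytes are identical, so the reader can detect a torn read by checking whether the bytes of a slot agree:

    import random

    def stress_writer(rank, shm):
        # Hammer the slot with values whose four bytes are all equal,
        # so a torn (mixed) read is detectable on the reader side.
        while True:
            b = random.randrange(256)
            shm.buf[rank * 4: rank * 4 + 4] = bytes([b, b, b, b])

    def stress_reader(shm, size):
        while True:
            for rank in range(size):
                chunk = bytes(shm.buf[rank * 4: rank * 4 + 4])
                if len(set(chunk)) != 1:
                    print(f"Torn read in slot {rank}: {chunk.hex()}")

    If this runs for a while on each target platform without reporting torn reads, the 4-byte writes are most likely atomic there.
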
  2. Added a couple of log lines to assist in ‘debugging’

    import time
    import datetime
    import random
    import struct
    import multiprocessing
    from multiprocessing import shared_memory
    
    def writer(sz, rank, shared_memory):
        index = 0
        while True:
            byte_array = struct.pack("i", index)
            shared_memory.buf[rank * 4: rank * 4 + 4] = byte_array
            print( f'WRITER:{datetime.datetime.now()} rank:{rank} {struct.unpack(f"{sz}i", shared_memory.buf[:sz*4])}' )
            index += 1
            time.sleep(random.random())
    
    def reader(shared_memory, size):
        while True:
            indexes = struct.unpack(f"{size}i", shared_memory.buf[:size*4])
            print(f"READER:{datetime.datetime.now()} size:{size} Read indexes: {indexes}")
            time.sleep(1)
    
    if __name__ == "__main__":
        size = 10
        # Use a separate name so the shared_memory module is not shadowed.
        shm = shared_memory.SharedMemory(create=True, size=4 * size)

        ctx = multiprocessing.get_context("spawn")

        processes = []
        for i in range(size):
            p = ctx.Process(target=writer, args=(size, i, shm))
            processes.append(p)
            p.start()

        reader_process = ctx.Process(target=reader, args=(shm, size))
        reader_process.start()

        for p in processes:
            p.join()
        reader_process.join()
    

    Redirect the output to a file when testing. You’ll also do well to sort the log by time (sort -k2 should be enough); that will hopefully show/highlight any issues. I think the 1-second sleep is probably part of the reason why I didn’t find any noticeable corruption (I ran it for ~30 minutes).
    Still, doing ‘proper synchronisation’ is the way to go, as sketched below.

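    For reference, a minimal sketch of such synchronisation, with a lock created in the parent (e.g. lock = ctx.Lock()) and passed to each process via args; locked_writer and locked_reader are made-up names for this illustration:

    import random
    import struct
    import time

    def locked_writer(rank, shm, lock):
        index = 0
        while True:
            with lock:  # exclusive access while the 4 bytes are written
                shm.buf[rank * 4: rank * 4 + 4] = struct.pack("i", index)
            index += 1
            time.sleep(random.random())

    def locked_reader(shm, size, lock):
        while True:
            with lock:  # no writer can interleave while the buffer is copied
                data = bytes(shm.buf[:size * 4])
            print(f"Read indexes: {struct.unpack(f'{size}i', data)}")
            time.sleep(1)

    A single lock serialises all writers against the reader; if that costs too much throughput, one lock per slot narrows the contention.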