
I am creating and updating a set in DynamoDB from multiple threads. This is the code I am using:

# sends a request to add or update
def update(key, value_to_be_added_to_set):
    # creates a key and add value to the mySet column if it doesn't exist
    # else it will just add value to mySet
    response = table.update_item(
        Key={
            'key_name': key
        },
        UpdateExpression='ADD mySet :val',
        ExpressionAttributeValues={
            ':val': {value_to_be_added_to_set}
        },
        ReturnConsumedCapacity='INDEXES'
    )
    return response

I couldn’t find anything in the AWS documentation on whether this operation is thread-safe. That is, if one thread adds the value 1 and another adds the value 2 to the same set, the result should always be {1, 2}.

So I wrote this script to test it.

import threading
from random import randrange

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('my-test')
key = f'test{randrange(1, 1000000000)}'
max_concurrency = 50
multiplier = 10

# sends a request to add or update
def update(key, value_to_be_added_to_set):
    # this call will create a key and add value to the mySet column if it doesn't exist
    # else it will add value to mySet
    response = table.update_item(
        Key={
            'key_name': key
        },
        UpdateExpression='ADD mySet :val',
        ExpressionAttributeValues={
            ':val': {value_to_be_added_to_set}
        },
        ReturnConsumedCapacity='INDEXES'
    )
    return response


# this method will be called by every thread
# every thread receives a unique number from 0 to 49
def create_or_update_values(num):
    start = num * multiplier
    # thread 0 adds the values 1 to 10 to the set
    # thread 1 adds the values 11 to 20
    # ..
    # thread 49 adds the values 491 to 500
    for i in range(start + 1, start + multiplier + 1):
        update(key, i)
    print(f"Thread {num} has finished")


threads = []

# spin up threads
for i in range(0, max_concurrency):
    t = threading.Thread(target=create_or_update_values, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("All threads have finished.")

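# get mySet with a strongly consistent read, then sort it into a list
item = table.get_item(Key={'key_name': key}, ConsistentRead=True)['Item']
l = sorted(item['mySet'])

# verify that the list contains exactly the values 1 to 500
assert len(l) == max_concurrency * multiplier
for i in range(1, max_concurrency * multiplier + 1):
    # the list is 0-indexed, so value i sits at position i - 1
    assert int(l[i - 1]) == i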

This test passes every time it is run.
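
A positional check like the one above only says that something is off, not which values were lost. Here is a minimal sketch of a set-based check that reports the missing values instead. It assumes that `mySet` comes back from the boto3 resource API as a Python set of `Decimal` values; the helper name `missing_values` is hypothetical, and the data below is built locally rather than read from a table.

```python
from decimal import Decimal


def missing_values(stored, expected_count):
    """Return the sorted values in 1..expected_count that are absent from stored."""
    present = {int(v) for v in stored}
    return sorted(set(range(1, expected_count + 1)) - present)


# Locally built sets standing in for the item read back from the table.
complete = {Decimal(i) for i in range(1, 501)}
lost_write = complete - {Decimal(42)}

print(missing_values(complete, 500))    # []
print(missing_values(lost_write, 500))  # [42]
```

An empty list means every expected value is present; anything else names the writes that went missing.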

Assuming 50 threads update the same key at the same time, can I safely rely on this operation being thread-safe?

2 Answers


  1. Yes, individual item updates are serialized.

  2. DynamoDB Architecture

    DynamoDB stores items in partitions, which are located on servers known as storage nodes.

    DynamoDB follows a leader/follower architecture in which all writes (and strongly consistent reads) are served by the leader node for that partition group.

    Serialized Writes

    All writes are serialized by the leader node, meaning all updates will happen in order as they are received by the node. The changes are then replicated to the follower nodes in an eventually consistent manner.

    "Serializable isolation ensures that the results of multiple concurrent operations are the same as if no operation begins until the previous one has finished."


    For more information on DynamoDB Architecture, please refer to this YouTube Video
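
To illustrate why serialized writes make the `ADD` in the question safe, here is a purely local sketch that never talks to DynamoDB. It assumes a lock can stand in for the leader node applying one update at a time; `SimulatedItem` and `run` are hypothetical names. Because adding to a set is commutative and idempotent, any serial order of the 500 updates ends in the same set.

```python
import threading


class SimulatedItem:
    """Local stand-in for a single item; not the real service."""

    def __init__(self):
        # Assumption: the lock models the leader applying writes one at a time.
        self._lock = threading.Lock()
        self.my_set = set()

    def add(self, value):
        # Read-modify-write happens as one indivisible step under the lock.
        with self._lock:
            self.my_set = self.my_set | {value}


def run(threads=50, per_thread=10):
    item = SimulatedItem()

    def worker(num):
        start = num * per_thread
        for v in range(start + 1, start + per_thread + 1):
            item.add(v)

    pool = [threading.Thread(target=worker, args=(n,)) for n in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    return item.my_set


result = run()
print(result == set(range(1, 501)))  # True
```

Without the lock, the read-modify-write in `add` could interleave between threads and drop values, which is the lost-update problem that serialization rules out.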
