I’ve been using josiahcarlson’s Redis Object Mapper (ROM) to work with my data in Redis. I need certain operations to be done as transactions, i.e. either all writes are done or none are done (auto rollback in case of any error or shutdown).

From what I could find online, Josiah had a conversation with the maintainer of Redis, who declined to accept the PR for transactions back in 2015. What is the situation now? I couldn’t find anything about this in the docs or on Google.

How do I implement atomicity for my operations with ROM? Is there a way?

2 Answers


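  1. ROM is built on the redis-py client, and several redis-py functions are atomic. Here is a link to the redis-py documentation, which lists all of them:

    https://readthedocs.org/projects/redis-py/downloads/pdf/latest/

    For example:

    getset()
    pipeline()
    rpoplpush()
    smove()
    

    The single commands (getset(), rpoplpush(), smove()) each run atomically on the server, and pipeline() can group several commands into one MULTI/EXEC transaction, so you can use them for atomicity alongside the Redis Object Mapper.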

    Hope this helps!
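
As a minimal sketch of the pipeline() approach, assuming `conn` is a redis-py client and that both records are stored as Redis hashes (the name `move_credits` and the key layout are hypothetical, not part of ROM):

```python
def move_credits(conn, source_key, dest_key, field, amount):
    """Queue both increments in one MULTI/EXEC block and send them together."""
    # transaction=True wraps the queued commands in MULTI ... EXEC
    pipe = conn.pipeline(transaction=True)
    pipe.hincrby(source_key, field, -amount)
    pipe.hincrby(dest_key, field, amount)
    # execute() returns one result per queued command, in order
    return pipe.execute()
```

Note that MULTI/EXEC guarantees the commands run without interleaving, but Redis does not roll back commands that already ran if a later one fails at EXEC time, so any precondition checks need WATCH or a Lua script.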

  2. This was not built into earlier versions of rom. If you want to go fast and loose, you can keep a temporary transaction id in a common place (which you can verify afterwards, if you need to). If you want something more official, you will want a transaction history / ledger, which takes a little more work (you can use a ledger model to store the value, then mark it as done as part of the transaction).
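
As a rough sketch of the "fast and loose" option, assuming a redis-py style connection and a Redis set that records the ids of applied transactions (`new_txn_id` and `was_applied` are hypothetical helper names, not part of rom):

```python
import uuid


def new_txn_id():
    """Return a random id used to tag one logical transfer."""
    return uuid.uuid4().hex


def was_applied(conn, txn_set, txn_id):
    """Check afterwards whether the transfer tagged txn_id was recorded in txn_set."""
    # SISMEMBER is truthy only if the id was added to the set
    return bool(conn.sismember(txn_set, txn_id))
```

The idea is to generate the id once, before the first attempt, and reuse it on every retry, so that a retry after a crash or timeout can detect that the write already happened.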

    If you want to use built-in functionality plus your own model data to transfer, you can use something like we have in the tests: https://github.com/josiahcarlson/rom/blob/1.0.0/test/test_rom.py#L1759

    The function below will transfer a value, and refresh your local models. Returns (successful_transfer, optional_message).

    import time

    import redis


    def transfer(source, dest, column, value, txn_set, txn_id, timeout=1, column_indexed=False):
        source_key = source._pk
        dest_key = dest._pk
        # assume source and dest have same connection
        c = source._connection.pipeline(True)

        assert value > 0, value

        end = time.time() + timeout
        try:
            while time.time() < end:
                # optimistic locking: EXEC aborts if a watched key changes
                c.watch(source_key, dest_key, txn_set)
                if c.sismember(txn_set, txn_id):
                    return False, "already sent"

                # read from the source hash key, not the model object
                v = type(value)(c.hget(source_key, column) or 0)
                if v < value:
                    return False, "not enough credit"

                try:
                    c.multi()
                    c.hincrby(source_key, column, -value)
                    c.hincrby(dest_key, column, value)
                    c.sadd(txn_set, txn_id)
                    r = c.execute()
                except redis.exceptions.WatchError:
                    # a watched key changed; retry until the timeout
                    continue

                # SADD returns 1 when the txn id was newly recorded
                assert r[-1], r
                source.refresh()
                dest.refresh()
                bd = []
                if column_indexed:
                    try:
                        source.save(full=True)
                    except Exception:
                        bd.append("source index update failed")
                    try:
                        dest.save(full=True)
                    except Exception:
                        bd.append("dest index update failed")
                if bd:
                    bd.append("value transferred")

                return True, ", ".join(bd)

            return False, "timed out from data race condition"
        finally:
            # always release the WATCH and return the connection to the pool
            c.reset()
    
    