
I am trying to create a Celery chain:

chain(getAllProducts.s(shopname, hdrs),
    editOgTags.s(title, description, whichImage, readableShopname, currentThemeId),
    notifyBulkEditFinish.si(email, name, readableShopname, totalProducts),
    updateBulkEditTask.si(taskID))()

In editOgTags, there are 3 subtasks:

@shared_task(ignore_result=True)
def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId):
    for product in products:
        editOgTitle.delay(product, title, readableShopname)
        editOgDescription.delay(product, description, readableShopname)
        editOgImage.delay(product, int(whichImage), currentThemeId)

Each editOgXXX function calls a rate-limited task:

@shared_task(rate_limit='1/s')
def updateMetafield(index, loop_var, target_id, type_value):
    resource = type_value + 's'
    # print(f"loop_var key = {loop_var[index]['key']}")
    if type_value in ('product', 'collection', 'article', 'page'):
        meta = shopify.Metafield.find(resource=resource, resource_id=target_id, namespace='global', key=loop_var[index]['key'])
        checkAndWaitShopifyAPICallLimit()
    else:
        print("Not available metafield type! Cannot update.")
        return

    if meta:
        # meta[0].destroy()
        meta[0].value = loop_var[index]['value']
        meta[0].save()
    else:
        metafield = shopify.Metafield.create({
            'value_type': 'string',
            'namespace': 'global',
            'value': loop_var[index]['value'],
            'value-type': 'string',
            'key': loop_var[index]['key'],
            'resource': resource,
            'resource_id': target_id,
            })
        metafield.save()

Shopify uses a leaky bucket algorithm: it allows a burst of 40 API calls and replenishes at 2 requests per second. Because of that 2 requests/s limit, I set the task's rate limit to 1/s. When the API quota is used up, checkAndWaitShopifyAPICallLimit() calls time.sleep(20) to wait for replenishment.
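As a rough illustration of that arithmetic, here is a toy model of the bucket. It is only a sketch of the numbers quoted above (40 calls, 2 per second), not Shopify's implementation; the `LeakyBucket` class and its method names are hypothetical.

```python
class LeakyBucket:
    """Toy quota model: each call adds one unit, units leak out at a fixed rate."""

    def __init__(self, capacity=40, leak_per_second=2.0):
        self.capacity = capacity                # assumed burst size
        self.leak_per_second = leak_per_second  # assumed replenishment rate
        self.level = 0.0                        # units currently in the bucket

    def try_call(self):
        """Record one API call; return False if the bucket is already full."""
        if self.level + 1 > self.capacity:
            return False
        self.level += 1
        return True

    def advance(self, seconds):
        """Let `seconds` of time pass, draining the bucket."""
        self.level = max(0.0, self.level - seconds * self.leak_per_second)

    def seconds_until_empty(self):
        """Time needed for a full replenishment from the current level."""
        return self.level / self.leak_per_second


bucket = LeakyBucket()
# Fire 50 calls back to back: only the first 40 fit in the bucket.
accepted = sum(bucket.try_call() for _ in range(50))
# A full bucket needs 40 / 2 = 20 seconds to drain, hence time.sleep(20).
full_wait = bucket.seconds_until_empty()
```

Under these assumptions, a steady 1 call/s never fills the bucket, because it drains faster than it fills; the sleep only matters after a burst.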

The problem is that the email notification task (notifyBulkEditFinish) is called before all tasks have finished. How can I make sure the email function is called after all tasks finish?

I suspect the sleep call makes the tasks fall behind the email function in the queue.

2 Answers


  1. To expand on @bruno’s comment: use a chord, and modify the editOgTags function to build a group of tasks whose chord callback is the notification:

    from celery import chord
    
    @shared_task(ignore_result=True)
    def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId, name, email, totalProducts):
        tasks = []
        for product in products:
            tasks.append(editOgTitle.si(product, title, readableShopname))
            tasks.append(editOgDescription.si(product, description, readableShopname))
            tasks.append(editOgImage.si(product, int(whichImage), currentThemeId))
        # kick off the chord, notifyBulk... will be called after all of these 
        # edit... tasks complete.
        chord(tasks)(notifyBulkEditFinish.si(email, name, readableShopname, totalProducts))
    
  2. Your problem is with the definition of “after all tasks finish”.

    editOgTags launches len(products) * 3 subtasks, each of which apparently launches yet another async subtask. If you want to wait until all of those tasks have executed before sending the email, you need some synchronization mechanism. Celery’s built-in solution for this is the chord object. At the moment, your code waits for editOgTags to finish, but the only thing this task does is launch other subtasks; it then returns, whether or not those subtasks have themselves finished.

    In a comment, you asked: “A chord is just like a group but with a callback. The chain primitive lets us link together signatures so that one is called after the other, essentially forming a chain of callbacks. What is the difference if I change the chain to a chord?”

    Note that I’m not saying you should necessarily replace your whole chain with a chord. Hint: chains, groups and chords are tasks too, so you can create complex workflows by combining tasks, chains, groups and chords.

    And the difference, as mentioned above, is that a chord waits until all tasks in its header are finished before it executes the callback. This lets you run N async tasks in parallel and still wait for all of them to finish before running the callback. It will of course require some thinking and possibly some reorganisation of your code (so that sub-sub-tasks are taken into account if they need to be), but it does answer your question: “How can I make sure the email function is called after all tasks finish?”
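To make the difference concrete without a broker, here is a minimal sketch that imitates the two behaviours with Python's standard `concurrent.futures` instead of Celery. It is a stand-in for the idea only, not Celery code: `edit`, `fire_and_forget` and `header_then_callback` are hypothetical names, and the `"notify"` log entry plays the role of notifyBulkEditFinish.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

FIELDS = ("title", "description", "image")


def edit(log, product, field):
    """Stand-in for one editOgXXX subtask; the sleep simulates a slow API call."""
    time.sleep(0.05)
    log.append(f"edit {field} {product}")


def fire_and_forget(products):
    """Like calling .delay() in a loop: the parent does not wait for subtasks."""
    log = []
    pool = ThreadPoolExecutor(max_workers=4)
    for product in products:
        for field in FIELDS:
            pool.submit(edit, log, product, field)
    log.append("notify")      # runs while the edits are still in flight
    pool.shutdown(wait=True)  # only so the demo can inspect the complete log
    return log


def header_then_callback(products):
    """Like a chord: run the callback only after every header task is done."""
    log = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        header = [pool.submit(edit, log, p, f) for p in products for f in FIELDS]
        wait(header)          # the synchronization step the original chain lacks
        for future in header:
            future.result()   # re-raise any exception from a subtask
    log.append("notify")
    return log


early = fire_and_forget(["shirt", "mug"])
late = header_then_callback(["shirt", "mug"])
```

In the first function the notification is logged before any edit; in the second it is logged last. With real Celery chords, keep in mind that tasks used in a chord header must not ignore their results, and that a chord only waits for the tasks in its own header, so any task an editOgXXX function launches with .delay() still escapes the synchronization.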
