I am trying to create a celery chain here:
    chain(getAllProducts.s(shopname, hdrs),
          editOgTags.s(title, description, whichImage, readableShopname, currentThemeId),
          notifyBulkEditFinish.si(email, name, readableShopname, totalProducts),
          updateBulkEditTask.si(taskID))()
In editOgTags, there are 3 subtasks:
    @shared_task(ignore_result=True)
    def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId):
        for product in products:
            editOgTitle.delay(product, title, readableShopname)
            editOgDescription.delay(product, description, readableShopname)
            editOgImage.delay(product, int(whichImage), currentThemeId)
Each editOgXXX function calls a rate-limited function:
    @shared_task(rate_limit='1/s')
    def updateMetafield(index, loop_var, target_id, type_value):
        resource = type_value + 's'
        # print(f"loop_var key = {loop_var[index]['key']}")
        if type_value == 'product' or type_value == 'collection' or type_value == 'article' or type_value == 'page':
            meta = shopify.Metafield.find(resource=resource, resource_id=target_id, namespace='global', key=loop_var[index]['key'])
            checkAndWaitShopifyAPICallLimit()
        else:
            print("Not available metafield type! Cannot update.")
            return
        if meta:
            # meta[0].destroy()
            meta[0].value = loop_var[index]['value']
            meta[0].save()
        else:
            metafield = shopify.Metafield.create({
                'value_type': 'string',
                'namespace': 'global',
                'value': loop_var[index]['value'],
                'value-type': 'string',
                'key': loop_var[index]['key'],
                'resource': resource,
                'resource_id': target_id,
            })
            metafield.save()
Shopify uses a leaky bucket algorithm: it allows a burst of 40 API calls, replenished at 2 requests/s. Since the Shopify limit is 2 requests/s, I set the task's rate limit to 1/s. When the API quota is used up, checkAndWaitShopifyAPICallLimit() calls time.sleep(20) to wait for replenishment.
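For reference, the bucket arithmetic can be sketched as below. This is only a minimal model of the figures quoted above (capacity 40, leak rate 2 requests/s), assuming a single caller sending at a steady rate; it is not Shopify's implementation, and the function names are made up for illustration.

```python
def seconds_until_full(request_rate, capacity=40, leak_rate=2.0):
    """Seconds of sustained requests before the bucket overflows.

    Returns None when the bucket drains at least as fast as it fills.
    The defaults are the figures quoted in the question.
    """
    net_fill_rate = request_rate - leak_rate
    if net_fill_rate <= 0:
        return None
    return capacity / net_fill_rate


def seconds_to_drain(level, leak_rate=2.0):
    """Seconds needed for a bucket holding `level` calls to empty completely."""
    return level / leak_rate
```

Under this model a steady 1 request/s never fills the bucket, and a completely full bucket of 40 calls drains in 40 / 2 = 20 seconds, which is where the time.sleep(20) comes from.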
The problem is that the email notification function (notifyBulkEditFinish) is called before all the tasks have finished. How can I make sure the email function is called only after all tasks finish?
I suspect the sleep call makes the tasks fall behind the email function in the queue.
2 Answers
To expand on @bruno’s comment: use a chord, and modify the editOgTags function to create a group that chords to the notification.

Your problem is with the definition of “after all tasks finish”. The point is that editOgTags launches len(products) * 3 subtasks, which apparently each launch yet another async subtask. If you want to wait until all those tasks have been executed before sending the email, you need some synchronization mechanism. Celery’s built-in solution for this is the chord object. At the moment, your code waits for editOgTags to finish, but the only thing this task does is launch other subtasks; then it returns, whether or not those subtasks have themselves finished.

Note that I’m not saying you should necessarily replace your whole chain with a chord. Hint: chains, groups and chords are tasks too, so you can create complex workflows by combining tasks, chains, groups and chords.

The difference, as mentioned above, is that a chord will wait until all tasks in its header are finished before it executes the callback. This allows N async tasks to be executed in parallel while still waiting for all of them to finish before running the callback. This will of course require some thinking and possibly reorganisation in your code (so that sub-sub-tasks are taken into account if they need to be), but it does answer your question: “How can I make sure the email function is called after all tasks finish?”