
I use Celery tasks with external modules that send information to stdout with print(). How can I save all the prints of a Celery task to a predefined file? Ideally, each task would have its own separate file.

Example main.py, with Redis running on localhost:6379:

import time

from celery import Celery

def long_run_func():
    print('>>> Start running long_run_func()')
    time.sleep(5)
    print('>>> End running long_run_func()')

celery = Celery('celery_task', broker='redis://localhost:6379')

@celery.task(name="long_run_celery_task")
def long_run_celery_task():
    long_run_func()

long_run_celery_task.delay()

Now run the Celery worker:

celery -A main:celery worker --loglevel=INFO

If you run main.py, you can see the print() lines from long_run_func() in the Celery worker output:

[2024-01-11 17:30:52,746: WARNING/ForkPoolWorker-7] >>> Start running long_run_func()
[2024-01-11 17:30:57,751: WARNING/ForkPoolWorker-7] >>> End running long_run_func()

Is it possible to set up @celery.task to dump all these logs to some string variable or file? I mean, if I have several such tasks running at the same time, I want to be able to separate these outputs by task, not mix everything into a single log.

2 Answers


    1. Is it possible to set up @celery.task to dump all these logs to some string var or file?

    • You can use the -f/--logfile command option to dump all logs
      to the file you want.
    • example:
    command:
    celery -A main:celery worker --loglevel=INFO -f test.log
    
    test.log:
    [2024-01-13 09:56:04,119: INFO/MainProcess] Task long_run_celery_task[bae57432-18de-4f00-8227-cdf34856cd15] received
    [2024-01-13 09:56:04,121: WARNING/ForkPoolWorker-7] >>> Start running long_run_func()
    [2024-01-13 09:56:09,123: WARNING/ForkPoolWorker-7] >>> End running long_run_func()
    [2024-01-13 09:56:09,127: INFO/ForkPoolWorker-7] Task long_run_celery_task[bae57432-18de-4f00-8227-cdf34856cd15] succeeded in 5.006108791014412s: None
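    
    • Note: per the Celery workers guide, --logfile also supports file name
      format expansions such as %n (node name) and %I (pool process index),
      so each pool process can write to its own file:
    celery -A main:celery worker --loglevel=INFO --logfile=%n%I.log
    
    This splits output per worker process, though, not per task.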
    

    2. If I have several such tasks and they run at the same time, how can I separate these outputs depending on the task, instead of mixing everything into a single log?

    simple but dirty way

    • If you only have a few tasks to run, put them in separate Python modules and run each in its own worker command with its own log file.
    • example:
    celery -A main1:celery worker --loglevel=INFO -f test1.log
    celery -A main2:celery worker --loglevel=INFO -f test2.log
    celery -A main3:celery worker --loglevel=INFO -f test3.log
    

    better way

    • Set up a logger by name and attach a file handler to it.
    • example:
    import time
    import logging
    from celery import Celery
    
    def long_run_func():
        log = custom_logger("long")
        log.info('>>> Start running long_run_func()')
        time.sleep(5)
        log.info('>>> End running long_run_func()')
    
    def short_run_func():
        log = custom_logger("short")
        log.info('>>> Start running short_run_func()')
        time.sleep(3)
        log.info('>>> End running short_run_func()')
    
    celery = Celery(__name__, broker='redis://localhost:6379')
    
    @celery.task(name="long_run_celery_task")
    def long_run_celery_task():
        long_run_func()
    
    @celery.task(name="short_run_celery_task")
    def short_run_celery_task():
        short_run_func()
    
    def custom_logger(name):
        # One logger (and one file) per name. The handlers guard keeps
        # repeated calls in the same worker process from attaching
        # duplicate handlers (which would double every line).
        logger = logging.getLogger(name)
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            handler = logging.FileHandler(name + '.log', 'w')
            logger.addHandler(handler)
        return logger
    
    long_run_celery_task.delay()
    short_run_celery_task.delay()
    
    • result:
    long.log
    >>> Start running long_run_func()
    >>> End running long_run_func()
    short.log
    >>> Start running short_run_func()
    >>> End running short_run_func()
    
    • The custom_logger entries above carry no timestamp or level; if you
      want them formatted like Celery's own log lines, set a formatter on
      the handler:
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')  # custom formatter
    handler.setFormatter(formatter)
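    
    • If you literally need the print() output of third-party code captured,
      one file per task run, you can also redirect stdout inside the task
      itself. A minimal sketch (the bind=True / self.request.id file naming
      is just one illustration, not the only option):
    import contextlib
    
    @celery.task(name="long_run_celery_task", bind=True)
    def long_run_celery_task(self):
        # bind=True exposes the task instance; self.request.id is the
        # unique id of this run, used here as the log file name.
        with open(f"{self.request.id}.log", "w") as f:
            with contextlib.redirect_stdout(f):
                long_run_func()
    
    Since redirect_stdout swaps the process-global sys.stdout, this is safe
    with the default prefork pool (one task at a time per process), but not
    with the threaded or gevent pools.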
    
  2. All you have to do is write a logger class and redirect the stdout and stderr streams to it. I use the pandas library here for its timestamp handling, which I recommend.
    Code as follows:

    import sys
    import pandas as pd
    
    class Logger:
        """Tee stream: echoes to the original stream and to a log file
        that rotates every `interval` (a pandas offset alias, e.g. "15min")."""
    
        def __init__(self, pipe, suffix: str, interval: str):
            self.terminal = pipe
            self.file_suffix = suffix
            self.interval = interval
            self.log_time = pd.Timestamp.now().floor(self.interval)
            self.log_file = open(str(self.log_time) + self.file_suffix, "a")
    
        def write(self, message):
            self.terminal.write(message)
            self.terminal.flush()
            # Rotate to a new file when we cross an interval boundary,
            # closing the old one instead of leaking it.
            now = pd.Timestamp.now().floor(self.interval)
            if now > self.log_time:
                self.log_file.close()
                self.log_time = now
                self.log_file = open(str(self.log_time) + self.file_suffix, "a")
            self.log_file.write(message)
            self.log_file.flush()
    
        def flush(self):
            # Writes are flushed eagerly in write(), so nothing to do here.
            pass
    
    
    sys.stdout = Logger(sys.stdout, "_system_stdout.log", "15min")
    sys.stderr = Logger(sys.stderr, "_system_stderr.log", "15min")
    
    print("test")
    