My Celery process always crashes at 00:00:00, and I don't know why. I never defined an app called LogAnalysis, and I never defined a task called reindex_log, but the log says it can't find that task.

Error Log:
[2019-07-29 23:50:02,949: INFO/ForkPoolWorker-5] changed: [192.168.4.233]
[2019-07-29 23:50:02,982: INFO/ForkPoolWorker-5] Task automation.tasks.run_ansible[3973e84f-046d-4428-b788-de222e747645] succeeded in 2.9768258426338434s: 'Fetch docker-1564415400006: finished executing task'
[2019-07-29 23:52:55,232: INFO/Beat] Writing entries...
[2019-07-29 23:55:55,480: INFO/Beat] Writing entries...
[2019-07-29 23:58:55,727: INFO/Beat] Writing entries...
[2019-07-30 00:00:00,000: INFO/Beat] Scheduler: Sending due task 10000047 (QualityControl.tasks.run_quality_control)
[2019-07-30 00:00:00,003: INFO/MainProcess] Received task: QualityControl.tasks.run_quality_control[a5a7bfa8-5f8b-48a1-82e1-4bc9f3bcb4b2]  
[2019-07-30 00:00:10,506: INFO/ForkPoolWorker-4] Task QualityControl.tasks.run_quality_control[a5a7bfa8-5f8b-48a1-82e1-4bc9f3bcb4b2] succeeded in 10.502024855464697s: None
[2019-07-30 00:02:00,162: INFO/Beat] Writing entries...
[2019-07-30 00:05:00,415: INFO/Beat] Writing entries...
[2019-07-30 00:08:00,655: INFO/Beat] Writing entries...
[2019-07-30 00:11:00,895: INFO/Beat] Writing entries...
[2019-07-30 00:14:01,129: INFO/Beat] Writing entries...
[2019-07-30 00:17:01,362: INFO/Beat] Writing entries...
[2019-07-30 00:19:00,001: INFO/Beat] Scheduler: Sending due task reindex_log (LogAnalysis.tasks.reindex_log)
[2019-07-30 00:19:00,003: ERROR/MainProcess] Received unregistered task of type 'LogAnalysis.tasks.reindex_log'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[["01_01_00001-*", "safe_log-analysis"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (113b)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: 'LogAnalysis.tasks.reindex_log'
[2019-07-30 00:19:00,005: CRITICAL/MainProcess] Unrecoverable error: InterfaceError("(0, '')",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: 'LogAnalysis.tasks.reindex_log'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/mysql/base.py", line 71, in execute
    return self.cursor.execute(query, args)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
    self._execute_command(COMMAND.COM_QUERY, sql)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 750, in _execute_command
    raise err.InterfaceError("(0, '')")
pymysql.err.InterfaceError: (0, '')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/usr/local/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/redis.py", line 1052, in on_readable
    self.cycle.on_readable(fileno)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/redis.py", line 348, in on_readable
    chan.handlers[type]()
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/redis.py", line 736, in _brpop_read
    self.connection._deliver(loads(bytes_to_str(item)), dest)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 983, in _deliver
    callback(message)
  File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 633, in _callback
    return callback(message)
  File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
    return on_unknown_task(None, message, exc)
  File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 510, in on_unknown_task
    id_, NotRegistered(name), request=request,
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 160, in mark_as_failure
    traceback=traceback, request=request)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 342, in store_result
    request=request, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/django_celery_results/backends/database.py", line 35, in _store_result
    task_kwargs=task_kwargs,
  File "/usr/local/lib/python3.6/site-packages/django_celery_results/managers.py", line 50, in _inner
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/django_celery_results/managers.py", line 126, in store_result
    obj, created = self.get_or_create(task_id=task_id, defaults=fields)
  File "/usr/local/lib/python3.6/site-packages/django/db/models/manager.py", line 82, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 486, in get_or_create
    return self.get(**lookup), False
  File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 393, in get
    num = len(clone)
  File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 250, in __len__
    self._fetch_all()
  File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 1186, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
  File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 54, in __iter__
    results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
  File "/usr/local/lib/python3.6/site-packages/django/db/models/sql/compiler.py", line 1065, in execute_sql
    cursor.execute(sql, params)
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 100, in execute
    return super().execute(sql, params)
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 68, in execute
    return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 77, in _execute_with_wrappers
    return executor(sql, params, many, context)
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
  File "/usr/local/lib/python3.6/site-packages/django/db/utils.py", line 89, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
  File "/usr/local/lib/python3.6/site-packages/django/db/backends/mysql/base.py", line 71, in execute
    return self.cursor.execute(query, args)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
    self._execute_command(COMMAND.COM_QUERY, sql)
  File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 750, in _execute_command
    raise err.InterfaceError("(0, '')")
django.db.utils.InterfaceError: (0, '')
[2019-07-30 00:19:01,025: INFO/MainProcess] beat: Shutting down...
[2019-07-30 00:19:01,025: INFO/Beat] Writing entries...
[2019-07-30 00:19:01,069: INFO/Beat] Writing entries...

 -------------- celery@form-yunwei-02 v4.3.0 (rhubarb)
---- **** ----- 
--- * ***  * -- Linux-3.10.0-957.5.1.el7.x86_64-x86_64-with-centos-7.6.1810-Core 2019-07-29 11:11:42
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         AutomatedOperation:0x7f00d35cae10
- ** ---------- .> transport:   redis://:**@192.168.4.87:6688/2
- ** ---------- .> results:     
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . AutomatedOperation.celery.debug_task
  . QualityControl.tasks.run_quality_control
  . automation.tasks.run_ansible
  . automation.tasks.run_ansible_schedule
  . automation.tasks.run_playbook

My celery.py file:

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings

import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'AutomatedOperation.settings')
from dotenv import load_dotenv
load_dotenv()

redis_host = os.getenv('REDIS_HOST')
redis_port = os.getenv('REDIS_PORT')
redis_password = os.getenv('REDIS_PASSWORD')
redis_broker_library = os.getenv('REDIS_BROKER_LIBRARY')

app = Celery('AutomatedOperation', backend='redis',
             broker="redis://:" + redis_password + "@" + redis_host + ':' + redis_port + '/' + redis_broker_library)

# celery_once configuration
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': "redis://:"+redis_password+"@"+redis_host+':'+redis_port+'/5',
        'default_timeout': 60 * 60
      }
}


# Lets you configure Celery from Django's settings.py
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
    result_expires=18000,
)

# Automatically look for tasks.py in the listed apps; each tasks.py must live in its app's directory and keep that name
app.autodiscover_tasks(['automation','QualityControl'])

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

My two apps, automation and QualityControl, define these tasks in their tasks.py:
automation.tasks

from celery import shared_task
from celery_once import QueueOnce
import timeout_decorator

@shared_task(base=QueueOnce, once={'graceful': True})
@timeout_decorator.timeout(600)
def run_ansible(multiple_server, operation_id, script_content, task_name, unique_tag, task_id=None, task_operator='root'):
    ...

QualityControl.tasks

from celery import shared_task
from celery_once import QueueOnce

@shared_task(base=QueueOnce, once={'graceful': True})
def run_quality_control(taskID, timingTaskName, timingSceneList, operator):
    ...

Celery configuration in the Django settings file:

CELERY_RESULT_SERIALIZER = 'json'  # result serialization format

CELERY_BROKER_URL = "redis://:"+redis_password+"@"+redis_host+':'+redis_port+'/'+redis_broker_library  # broker configuration; Redis is the message broker

# CELERY_RESULT_BACKEND = 'redis://'+redis_host+':'+ redis_port +'/4'  # result backend using Redis
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'

CELERY_TASK_RESULT_EXPIRES = 300  # how long task results are kept (seconds)
CELERYD_CONCURRENCY = 50  # number of concurrent worker processes
CELERYD_PREFETCH_MULTIPLIER = 8  # number of messages each worker prefetches from the queue
CELERYD_MAX_TASKS_PER_CHILD = 2000  # recycle a worker child process after it has executed this many tasks
CELERY_DEFAULT_QUEUE = 'default_dongwm'  # default queue; messages that match no other queue go here

# celery beat configuration
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

2 Answers


  1. Your code tries to execute the LogAnalysis.tasks.reindex_log task, which is not registered. You probably forgot to deploy the new code to your Celery server(s).

    These are registered tasks (from the log you pasted):

    . AutomatedOperation.celery.debug_task
    . QualityControl.tasks.run_quality_control
    . automation.tasks.run_ansible
    . automation.tasks.run_ansible_schedule
    . automation.tasks.run_playbook
    

    As you can see, LogAnalysis.tasks.reindex_log is not among them.
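
    If the task genuinely belongs to your project, the worker needs the module that defines it, and the app has to discover it. A minimal sketch, assuming the module would live at LogAnalysis/tasks.py (the module path, signature, and body are guesses based only on the task name and the two positional arguments visible in the message body):

    # LogAnalysis/tasks.py  (hypothetical)
    from celery import shared_task

    @shared_task
    def reindex_log(index_pattern, target_index):
        # real reindexing logic would go here
        ...

    The app would also need to pick it up, e.g. app.autodiscover_tasks(['automation', 'QualityControl', 'LogAnalysis']).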

  2. I’m late to the party, but this can happen when there are instances of processes running old/different code that you may not be aware of.

    There are two ways this can be really confusing:

    1. when you place something on the queue and some mystery celery worker tries to process it rather than the process you expect.
    2. or when your celery process finds strange things on the queue because some unknown process is placing things on it.

    For case 1, run ps aux | grep celery and you may find there are processes trying to consume from the queue that don't have the task you are calling, and hence you get the NotRegistered error:

    {"status": "FAILURE", "result": {"exc_type": "NotRegistered", "exc_message": ["my.thing.execute_task"], "exc_module": "celery.exceptions"}

    Another strange indicator: after you add a new parameter to a task, you may sporadically get a message like:

    TypeError: execute_task() takes 6 positional arguments but 7 were given

    If sometimes it works, and sometimes it doesn’t, you likely have an old instance of your code grabbing items off your queue and failing.
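
    A quick way to see what each live worker actually has registered is Celery's inspect API. A minimal sketch, assuming your Celery app instance is importable (the import path below mirrors the question's project layout and is an assumption):

    # list registered tasks per live worker (hypothetical helper script)
    from AutomatedOperation.celery import app

    def show_registered_tasks():
        # registered() returns {worker_name: [task names]} for every worker that responds
        replies = app.control.inspect().registered() or {}
        for worker, tasks in sorted(replies.items()):
            print(worker)
            for name in sorted(tasks):
                print('  .', name)

    if __name__ == '__main__':
        show_registered_tasks()

    Any worker missing the task you are sending is the stale process to redeploy or kill.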
