My Celery process always crashes around 00:00:00, and I don't know why. I have not defined an app called LogAnalysis, and I never defined a task called reindex_log, but the log says that task cannot be found.
Error Log:
[2019-07-29 23:50:02,949: INFO/ForkPoolWorker-5] changed: [192.168.4.233]
[2019-07-29 23:50:02,982: INFO/ForkPoolWorker-5] Task automation.tasks.run_ansible[3973e84f-046d-4428-b788-de222e747645] succeeded in 2.9768258426338434s: '获取docker-1564415400006:结束执行任务'
[2019-07-29 23:52:55,232: INFO/Beat] Writing entries...
[2019-07-29 23:55:55,480: INFO/Beat] Writing entries...
[2019-07-29 23:58:55,727: INFO/Beat] Writing entries...
[2019-07-30 00:00:00,000: INFO/Beat] Scheduler: Sending due task 10000047 (QualityControl.tasks.run_quality_control)
[2019-07-30 00:00:00,003: INFO/MainProcess] Received task: QualityControl.tasks.run_quality_control[a5a7bfa8-5f8b-48a1-82e1-4bc9f3bcb4b2]
[2019-07-30 00:00:10,506: INFO/ForkPoolWorker-4] Task QualityControl.tasks.run_quality_control[a5a7bfa8-5f8b-48a1-82e1-4bc9f3bcb4b2] succeeded in 10.502024855464697s: None
[2019-07-30 00:02:00,162: INFO/Beat] Writing entries...
[2019-07-30 00:05:00,415: INFO/Beat] Writing entries...
[2019-07-30 00:08:00,655: INFO/Beat] Writing entries...
[2019-07-30 00:11:00,895: INFO/Beat] Writing entries...
[2019-07-30 00:14:01,129: INFO/Beat] Writing entries...
[2019-07-30 00:17:01,362: INFO/Beat] Writing entries...
[2019-07-30 00:19:00,001: INFO/Beat] Scheduler: Sending due task reindex_log (LogAnalysis.tasks.reindex_log)
[2019-07-30 00:19:00,003: ERROR/MainProcess] Received unregistered task of type 'LogAnalysis.tasks.reindex_log'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[["01_01_00001-*", "safe_log-analysis"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (113b)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
strategy = strategies[type_]
KeyError: 'LogAnalysis.tasks.reindex_log'
[2019-07-30 00:19:00,005: CRITICAL/MainProcess] Unrecoverable error: InterfaceError("(0, '')",)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
strategy = strategies[type_]
KeyError: 'LogAnalysis.tasks.reindex_log'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
return self.cursor.execute(sql, params)
File "/usr/local/lib/python3.6/site-packages/django/db/backends/mysql/base.py", line 71, in execute
return self.cursor.execute(query, args)
File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 170, in execute
result = self._query(query)
File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
conn.query(q)
File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
self._execute_command(COMMAND.COM_QUERY, sql)
File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 750, in _execute_command
raise err.InterfaceError("(0, '')")
pymysql.err.InterfaceError: (0, '')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
return self.obj.start()
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "/usr/local/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "/usr/local/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
cb(*cbargs)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/redis.py", line 1052, in on_readable
self.cycle.on_readable(fileno)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/redis.py", line 348, in on_readable
chan.handlers[type]()
File "/usr/local/lib/python3.6/site-packages/kombu/transport/redis.py", line 736, in _brpop_read
self.connection._deliver(loads(bytes_to_str(item)), dest)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 983, in _deliver
callback(message)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 633, in _callback
return callback(message)
File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
return on_unknown_task(None, message, exc)
File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 510, in on_unknown_task
id_, NotRegistered(name), request=request,
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 160, in mark_as_failure
traceback=traceback, request=request)
File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 342, in store_result
request=request, **kwargs)
File "/usr/local/lib/python3.6/site-packages/django_celery_results/backends/database.py", line 35, in _store_result
task_kwargs=task_kwargs,
File "/usr/local/lib/python3.6/site-packages/django_celery_results/managers.py", line 50, in _inner
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/django_celery_results/managers.py", line 126, in store_result
obj, created = self.get_or_create(task_id=task_id, defaults=fields)
File "/usr/local/lib/python3.6/site-packages/django/db/models/manager.py", line 82, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 486, in get_or_create
return self.get(**lookup), False
File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 393, in get
num = len(clone)
File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 250, in __len__
self._fetch_all()
File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 1186, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "/usr/local/lib/python3.6/site-packages/django/db/models/query.py", line 54, in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "/usr/local/lib/python3.6/site-packages/django/db/models/sql/compiler.py", line 1065, in execute_sql
cursor.execute(sql, params)
File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 100, in execute
return super().execute(sql, params)
File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 68, in execute
return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 77, in _execute_with_wrappers
return executor(sql, params, many, context)
File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
return self.cursor.execute(sql, params)
File "/usr/local/lib/python3.6/site-packages/django/db/utils.py", line 89, in __exit__
raise dj_exc_value.with_traceback(traceback) from exc_value
File "/usr/local/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
return self.cursor.execute(sql, params)
File "/usr/local/lib/python3.6/site-packages/django/db/backends/mysql/base.py", line 71, in execute
return self.cursor.execute(query, args)
File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 170, in execute
result = self._query(query)
File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
conn.query(q)
File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
self._execute_command(COMMAND.COM_QUERY, sql)
File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 750, in _execute_command
raise err.InterfaceError("(0, '')")
django.db.utils.InterfaceError: (0, '')
[2019-07-30 00:19:01,025: INFO/MainProcess] beat: Shutting down...
[2019-07-30 00:19:01,025: INFO/Beat] Writing entries...
[2019-07-30 00:19:01,069: INFO/Beat] Writing entries...
-------------- celery@form-yunwei-02 v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-3.10.0-957.5.1.el7.x86_64-x86_64-with-centos-7.6.1810-Core 2019-07-29 11:11:42
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: AutomatedOperation:0x7f00d35cae10
- ** ---------- .> transport: redis://:**@192.168.4.87:6688/2
- ** ---------- .> results:
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. AutomatedOperation.celery.debug_task
. QualityControl.tasks.run_quality_control
. automation.tasks.run_ansible
. automation.tasks.run_ansible_schedule
. automation.tasks.run_playbook
My celery.py file:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'AutomatedOperation.settings')
from dotenv import load_dotenv
load_dotenv()
redis_host = os.getenv('REDIS_HOST')
redis_port = os.getenv('REDIS_PORT')
redis_password = os.getenv('REDIS_PASSWORD')
redis_broker_library = os.getenv('REDIS_BROKER_LIBRARY')
app = Celery('AutomatedOperation',backend='redis',broker="redis://:"+redis_password+"@"+redis_host+':'+redis_port+'/'+redis_broker_library)
# celery_once configuration
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': "redis://:"+redis_password+"@"+redis_host+':'+redis_port+'/5',
        'default_timeout': 60 * 60
    }
}
# Lets you configure Celery in Django's settings.py
app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.update(
    result_expires=18000,
)
# Automatically look for tasks.py in the registered apps, so tasks.py must live in each app's directory and cannot be renamed arbitrarily
app.autodiscover_tasks(['automation','QualityControl'])
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
My two apps, automation and QualityControl, each define a tasks.py:
automation.tasks
@shared_task(base=QueueOnce,once={'graceful': True})
@timeout_decorator.timeout(600)
def run_ansible(multiple_server,operation_id,script_content,task_name,unique_tag,task_id=None,task_operator='root'):
QualityControl.tasks
@shared_task(base=QueueOnce,once={'graceful': True})
def run_quality_control(taskID,timingTaskName,timingSceneList,operator):
Celery configuration in the Django settings file:
CELERY_RESULT_SERIALIZER = 'json' # result serialization format
CELERY_BROKER_URL = "redis://:"+redis_password+"@"+redis_host+':'+redis_port+'/'+redis_broker_library # broker configuration; Redis is used as the message broker
# CELERY_RESULT_BACKEND = 'redis://'+redis_host+':'+ redis_port +'/4' # result backend configuration; Redis was used here
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_TASK_RESULT_EXPIRES = 300 # task execution timeout
CELERYD_CONCURRENCY = 50 # concurrency of the Celery worker
CELERYD_PREFETCH_MULTIPLIER = 8 # number of tasks fetched from the queue at a time
CELERYD_MAX_TASKS_PER_CHILD = 2000 # number of tasks a worker process executes before it is killed
CELERY_DEFAULT_QUEUE = 'default_dongwm' # default queue; a message that matches no other queue is placed here
# Celery beat configuration
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
2 Answers
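Your worker is being asked to execute the
LogAnalysis.tasks.reindex_log
task, which is not registered. You probably forgot to deploy new code to your Celery server(s). These are the registered tasks (from the log you pasted):
. AutomatedOperation.celery.debug_task
. QualityControl.tasks.run_quality_control
. automation.tasks.run_ansible
. automation.tasks.run_ansible_schedule
. automation.tasks.run_playbook
As you can see, there is no
LogAnalysis.tasks.reindex_log
there.

I'm late to the party, but this can happen when there are instances of processes running old or different code that you may not be aware of.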
There are two ways this can be really confusing:
For case 1, run
ps A | grep celery
and you may find there are processes trying to process something from the queue, but they don't have the method you are calling, and hence you get the NotRegistered error:
{"status": "FAILURE", "result": {"exc_type": "NotRegistered", "exc_message": ["my.thing.execute_task"], "exc_module": "celery.exceptions"}
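Besides looking at the process list, it can help to compare what each running worker has registered. The sketch below is only an illustration: `workers_missing_task` is a hypothetical helper name, and it assumes its input is a mapping of worker name to registered task names, which is the shape I would expect from `app.control.inspect().registered()` (that call returns None when no worker replies, and entries can carry a bracketed annotation after the task name).

```python
from typing import Dict, List, Optional


def workers_missing_task(
    registered_by_worker: Optional[Dict[str, List[str]]],
    task_name: str,
) -> List[str]:
    """Return the sorted names of workers that do not have task_name registered.

    Hypothetical helper: the input is assumed to look like the mapping
    returned by app.control.inspect().registered().
    """
    if not registered_by_worker:
        # No worker replied (or the mapping is empty): nothing to report.
        return []

    missing = []
    for worker, entries in registered_by_worker.items():
        # An entry may look like "pkg.tasks.name [rate_limit=10/m]";
        # keep only the task name before the first space.
        names = {entry.split(" ", 1)[0] for entry in entries}
        if task_name not in names:
            missing.append(worker)
    return sorted(missing)


# Made-up snapshot for illustration: one up-to-date worker, one stale worker.
snapshot = {
    "celery@new-host": [
        "automation.tasks.run_ansible",
        "LogAnalysis.tasks.reindex_log [rate_limit=10/m]",
    ],
    "celery@old-host": ["automation.tasks.run_ansible"],
}
stale = workers_missing_task(snapshot, "LogAnalysis.tasks.reindex_log")
```

If every worker shows up as missing the task, the stale party is more likely whatever is sending the message (for example a schedule entry) than the workers themselves.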
Another strange indicator is when you add a new parameter to a method and you may sporadically get a message like:
TypeError: execute_task() takes 6 positional arguments but 7 were given
If sometimes it works, and sometimes it doesn’t, you likely have an old instance of your code grabbing items off your queue and failing.
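The intermittent TypeError can be reproduced without Celery at all. The sketch below assumes two hypothetical versions of the same function, `execute_task_old` (6 parameters) and `execute_task_new` (7 parameters), and uses the standard library's `inspect.signature(...).bind` to check whether a given set of message arguments fits each one. It only illustrates the mismatch; it is not how Celery itself dispatches tasks.

```python
import inspect
from typing import Any, Callable, Dict, Optional, Sequence


def accepts_call(
    func: Callable[..., Any],
    args: Sequence[Any],
    kwargs: Optional[Dict[str, Any]] = None,
) -> bool:
    """Return True if func could be called with these args/kwargs."""
    try:
        # bind() raises TypeError when the arguments do not fit the signature.
        inspect.signature(func).bind(*args, **(kwargs or {}))
    except TypeError:
        return False
    return True


# Hypothetical old version, still loaded by a stale worker.
def execute_task_old(a, b, c, d, e, f):
    return "old"


# Hypothetical new version, with one extra parameter.
def execute_task_new(a, b, c, d, e, f, g):
    return "new"


# Arguments as a caller running the new code would send them.
message_args = (1, 2, 3, 4, 5, 6, 7)

fits_old = accepts_call(execute_task_old, message_args)
fits_new = accepts_call(execute_task_new, message_args)
```

Whichever worker picks the message off the queue decides the outcome, which is why the same call can succeed on one run and fail on the next.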