
I am trying to use Celery with RabbitMQ and Eventlet to handle asynchronous tasks in my Python application. One of my tasks initializes an Amazon Timestream database using the Boto3 client. However, the task fails with the error message: "Endpoint Discovery failed to refresh the required endpoints."

My Setup:

  • Celery: 5.2.7
  • Eventlet: 0.31.1
  • Boto3: 1.24.13
  • RabbitMQ: 3.8.9

Task Definition (tasks.py):

import eventlet
eventlet.monkey_patch()

import logging

import boto3
from celery import Celery

logger = logging.getLogger(__name__)

app = Celery(
    "myapp", broker="pyamqp://guest@localhost//", backend="redis://127.0.0.1:6379/0"
)

@app.task
def extract_performance_data():
    TimestreamDbInitializer()

class TimestreamDbInitializer:

    # Initialize the Timestream Write client (TIMESTREAM_WRITE, AWS_REGION,
    # DATABASE_NAME and the retention constants are defined elsewhere in my settings)
    timestream_write = boto3.client(
        TIMESTREAM_WRITE, region_name=AWS_REGION
    )

    def __init__(self):

        self.memory_store_retention_period_in_hours = (
            MEMORY_STORE_RETENTION_PERIOD_IN_HOURS_VALUE
        )
        self.magnetic_store_retention_period_in_days = (
            MAGNETIC_STORE_RETENTION_PERIOD_IN_DAYS_VALUE
        )

        self.create_db()

    def create_db(self):
        """
        Create the database if it does not exist
        """
        try:
            response = self.timestream_write.create_database(DatabaseName=DATABASE_NAME)
            logger.debug("DB created => %s", response)
        except self.timestream_write.exceptions.ConflictException:
            logger.debug("Database '%s' already exists.", DATABASE_NAME)
        except Exception as err:
            logger.error("create_db Error: %s", err)

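As an aside on the `except` branch above: calling `logger.error("create_db Error:", err)` passes `err` as a lazy formatting argument, but the message has no `%s` placeholder, so the exception text never makes it into the log (the logging module reports an internal formatting error instead). A minimal standalone illustration of the `%s` form (not tied to the Timestream code):

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("demo")

err = ValueError("boom")

# Correct form: the logging module interpolates %s lazily, only when the
# record is actually emitted.
logger.error("create_db Error: %s", err)

# The interpolation it performs is plain %-formatting:
message = "create_db Error: %s" % err
print(message)
```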
Running the Celery Worker:
I start the Celery worker using the following command:

celery -A tasks worker -P eventlet --loglevel=info

Test Script (test_task.py):

from tasks import extract_performance_data

if __name__ == "__main__":
    extract_performance_data.delay()

Error:
When I run the test script, the Celery worker logs the following error:

An error occurred: Endpoint Discovery failed to refresh the required
endpoints

Things I’ve Tried:

  1. Verified that RabbitMQ is running and accessible.
  2. Ensured AWS credentials are correctly configured and available.
  3. Set up logging to see detailed error messages.
  4. Verified network connectivity and ensured there are no firewall issues.
  5. Ensured Eventlet is monkey-patching properly by placing it at the top of the script.
Despite these efforts, I am still encountering the same issue. I suspect it might be related to Eventlet's compatibility with Boto3 or some configuration I am missing.

Questions:

  1. Has anyone encountered a similar issue with Celery, Eventlet, and Boto3?
  2. Are there any specific configurations needed to make Boto3 work with Eventlet?
  3. Is there an alternative approach to handle asynchronous tasks with Celery and Boto3 that might avoid this issue?
  4. Any insights or suggestions would be greatly appreciated.

2 Answers


  1. Chosen as BEST ANSWER

    So, it seems like boto3 does not work with eventlet. I ended up using gevent and it worked.

    You can install gevent using

    pip install gevent
    

    Then start the Celery worker with the gevent pool:

    celery -A tasks worker -P gevent --loglevel=info
    

  2. One thing to check is the permissions of the IAM role the code runs under. Since you are using timestream-write, it certainly needs the following permission:

    timestream:DescribeEndpoints

    just in case it is not the Boto3 endpoint but the Timestream endpoint that is failing.
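
    Along those lines, an IAM policy statement granting that action could look like the sketch below (per the AWS docs, `DescribeEndpoints` does not support resource-level scoping, so `Resource` is `"*"`):

    ```json
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "timestream:DescribeEndpoints",
          "Resource": "*"
        }
      ]
    }
    ```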
