I am trying to use Celery with RabbitMQ and Eventlet to handle asynchronous tasks in my Python application. One of my tasks involves uploading files to an S3 bucket using the Boto3 client. However, I am encountering an issue where the task fails with the error message: "Endpoint Discovery failed to refresh the required endpoints."
My Setup:
- Celery: 5.2.7
- Eventlet: 0.31.1
- Boto3: 1.24.13
- RabbitMQ: 3.8.9
Task Definition (tasks.py):
import eventlet
eventlet.monkey_patch()  # must run before anything else imports socket/ssl

import logging

import boto3
from celery import Celery

logger = logging.getLogger(__name__)

# TIMESTREAM_WRITE, AWS_REGION, DATABASE_NAME and the *_VALUE retention
# constants are defined elsewhere in the project (not shown here).

app = Celery(
    "myapp", broker="pyamqp://guest@localhost//", backend="redis://127.0.0.1:6379//0"
)


@app.task
def extract_performance_data():
    TimestreamDbInitializer()


class TimestreamDbInitializer:
    # Initialize the Timestream Write client (created once, at import time)
    timestream_write = boto3.client(
        TIMESTREAM_WRITE, region_name=AWS_REGION
    )

    def __init__(self):
        self.memory_store_retention_period_in_hours = (
            MEMORY_STORE_RETENTION_PERIOD_IN_HOURS_VALUE
        )
        self.magnetic_store_retention_period_in_days = (
            MAGNETIC_STORE_RETENTION_PERIOD_IN_DAYS_VALUE
        )
        self.create_db()

    def create_db(self):
        """
        Create the database if it does not exist
        """
        try:
            response = self.timestream_write.create_database(DatabaseName=DATABASE_NAME)
            logger.debug("DB created => %s", response)
        except self.timestream_write.exceptions.ConflictException:
            logger.debug("Database '%s' already exists.", DATABASE_NAME)
        except Exception as err:
            # %s placeholder added so the exception is actually rendered in the log
            logger.error("create_db Error: %s", err)
Running the Celery Worker:
I start the Celery worker using the following command:
celery -A tasks worker -P eventlet --loglevel=info
Test Script (test_task.py):
from tasks import extract_performance_data

if __name__ == "__main__":
    extract_performance_data.delay()
Error:
When I run the test script, the Celery worker logs the following error:
An error occurred: Endpoint Discovery failed to refresh the required endpoints
Things I’ve Tried:
- Verified that RabbitMQ is running and accessible.
- Ensured AWS credentials are correctly configured and available.
- Set up logging to see detailed error messages.
- Verified network connectivity and ensured there are no firewall issues.
- Ensured Eventlet is monkey-patching properly by placing it at the top of the script.
Despite these efforts, I am still encountering the same issue. I suspect it might be related to Eventlet’s compatibility with Boto3 or to some configuration I am missing.
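For anyone trying to reproduce this, here is a minimal sketch of how more detailed AWS logging could be switched on inside the worker. It uses only the standard `logging` module; the helper name `enable_aws_debug_logging` is made up for illustration, and it assumes that `boto3` and `botocore` log under loggers named after their packages.

```python
import logging


def enable_aws_debug_logging(level=logging.DEBUG):
    """Send boto3/botocore log records to stderr (hypothetical helper)."""
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    for name in ("boto3", "botocore"):
        aws_logger = logging.getLogger(name)
        aws_logger.setLevel(level)
        aws_logger.addHandler(handler)
    return handler


# Call once at worker start-up, before the task creates its client.
enable_aws_debug_logging()
```

botocore's DEBUG output is verbose, but it typically shows the requests being attempted, which may help tell a permissions problem apart from a networking one.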
Questions:
- Has anyone encountered a similar issue with Celery, Eventlet, and Boto3?
- Are there any specific configurations needed to make Boto3 work with Eventlet?
- Is there an alternative approach to handle asynchronous tasks with Celery and Boto3 that might avoid this issue?
Any insights or suggestions would be greatly appreciated.
2 Answers
So, it seems like boto3 does not work with eventlet. I ended up using gevent and it worked.
You can install gevent with pip (pip install gevent), then start the Celery worker with the gevent pool instead of Eventlet (-P gevent in place of -P eventlet).
One thing to check is the permissions of the IAM role (or user) that the code runs as. Since I see you are using timestream-write, it would certainly need the following permission:
timestream:describeEndpoints
This matters if the failure comes from the Timestream endpoint-discovery call rather than from Boto3 itself.
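To test this theory in isolation, the endpoint-discovery call can be made directly, outside Celery and Eventlet. The sketch below relies on the `describe_endpoints` operation that Boto3 exposes on its `timestream-write` client; the function name `check_describe_endpoints` is hypothetical, and the client is passed in as an argument so the helper does not depend on how it was created.

```python
def check_describe_endpoints(client):
    """Call DescribeEndpoints directly and report whether it succeeded.

    Returns (True, [addresses]) on success, or (False, "reason") on failure.
    """
    try:
        response = client.describe_endpoints()
    except Exception as err:  # broad on purpose: this is a diagnostic helper
        # botocore's ClientError carries the AWS error code in err.response.
        error = (getattr(err, "response", None) or {}).get("Error", {})
        code = error.get("Code", type(err).__name__)
        return False, f"{code}: {err}"
    addresses = [ep["Address"] for ep in response.get("Endpoints", [])]
    return True, addresses
```

For example, run `check_describe_endpoints(boto3.client("timestream-write", region_name=AWS_REGION))` in a plain Python shell with the same credentials as the worker. An `AccessDeniedException` there would point to the missing permission, whereas a successful call would suggest the problem lies in the Eventlet worker rather than in IAM.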