
I’m trying to use Celery with SQS using the config below, but I receive an InvalidClientTokenId error. Could someone help me resolve this?

I’m using celery[sqs]==5.3.6, Kombu==5.3.7

# Celery configuration
SQS_QUEUE_URL = f'https://sqs.us-east-2.amazonaws.com/{settings.aws_account}/{settings.celery_worker_queue}' 
result_backend = f'db+postgresql://{settings.pg_db_config.user}:{settings.pg_db_config.pwd}@{settings.pg_db_config.host}:5432/{settings.pg_db_config.db_name}'
pg_db_conn_string = f"postgresql://{settings.pg_db_config.user}:{settings.pg_db_config.pwd}@{settings.pg_db_config.host}:{settings.pg_db_config.port}/{settings.pg_db_config.db_name}"

session = boto3.Session(profile_name='ar-se-sbx')

app = Celery(
    'worker',
    broker_url=f"sqs://",
    backend=result_backend,
    task_default_queue=settings.celery_worker_queue,
    broker_connection_retry_on_startup=True,
    broker_transport_options={
        "region": "us-east-2",
        'predefined_queues': {
            'celery-worker-1':{
                'url': SQS_QUEUE_URL,
                'access_key_id': session.get_credentials().access_key,
                'secret_access_key': session.get_credentials().secret_key,
            }
        }
    },
    task_create_missing_queues=False,
)

Error:

--------------- [email protected] (emerald-rush)
--- ***** ----- 
-- ******* ---- Windows-10-10.0.22631-SP0 2024-09-21 17:40:48
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         worker:0x1f0861a8370
- ** ---------- .> transport:   sqs://localhost//
- ** ---------- .> results:     postgresql://root:**@127.0.0.1:5432/postgres
- *** --- * --- .> concurrency: 12 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery-worker-1  exchange=celery-worker-1(direct) key=celery-worker-1


[tasks]
  . LANG_GUARD

[2024-09-21 17:40:48,995: INFO/MainProcess] Connected to sqs://localhost//
[2024-09-21 17:40:49,832: CRITICAL/MainProcess] Unrecoverable error: ClientError('An error occurred (InvalidClientTokenId) when calling the GetQueueAttributes operation: The security token included in the request is invalid.')

2 Answers


  1. Based on IAM identifiers – AWS Identity and Access Management:

    ASIA: Temporary (AWS STS) access key IDs use this prefix, but are unique only in combination with the secret access key and the session token.

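    An InvalidClientTokenId error when only the access key and secret key are passed suggests that your profile provides temporary credentials (an access key ID starting with ASIA), typically obtained when assuming an IAM Role. Temporary credentials have three parts: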

    • Access Key (like a username)
    • Secret Key (like a password)
    • Session Token (used only with temporary credentials)

    Therefore, you should use something like:

                    'access_key_id': session.get_credentials().access_key,
                    'secret_access_key': session.get_credentials().secret_key,
                    'session_token': session.get_credentials().token
    

    (I’m not sure which key name the Celery SQS transport expects for the session token, so 'session_token' might need to be slightly different. On the boto3 side, the credentials object exposes the session token as the attribute token.)

    Alternatively, you can store credentials from an IAM User on your local machine, which use only an Access Key and Secret Key (without the Session Token).
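To check which of the two cases applies, the access key ID prefix and the presence of a session token can be inspected. The following is a minimal sketch, not part of boto3 or Celery: the function name `credential_kind` and its return labels are made up for illustration, and only the documented `ASIA` (temporary) and `AKIA` (long-term) prefixes are recognised.

```python
from typing import Optional


def credential_kind(access_key_id: str, session_token: Optional[str]) -> str:
    """Classify an AWS key pair by its access key ID prefix.

    Hypothetical helper for illustration only.
    """
    if access_key_id.startswith("ASIA"):
        # Temporary (STS) keys are only valid together with a session token.
        return "temporary" if session_token else "temporary-missing-token"
    if access_key_id.startswith("AKIA"):
        # Long-term IAM User keys do not use a session token.
        return "long-term"
    return "unknown"


# Possible usage with the boto3 session from the question (assumes the
# 'ar-se-sbx' profile exists locally):
#   creds = session.get_credentials().get_frozen_credentials()
#   print(credential_kind(creds.access_key, creds.token))
```

A result of `temporary-missing-token` would match the situation described above, where the access key and secret key are sent without the session token.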

  2. You need to move all access credentials up one level, out of predefined_queues and into broker_transport_options itself. For example:

    from celery import Celery
    from kombu import Queue
    
    app = Celery(
        "worker",
        broker_url="sqs://",
        broker_connection_retry_on_startup=True,
        broker_transport_options={
            "region": "us-east-2",
            # session, settings and SQS_QUEUE_URL are defined as in the question
            "aws_access_key_id": session.get_credentials().access_key,
            "aws_secret_access_key": session.get_credentials().secret_key,
            # botocore exposes the session token as .token
            "aws_session_token": session.get_credentials().token,
            "predefined_queues": {
                settings.celery_worker_queue: {
                    "url": SQS_QUEUE_URL,
                }
            },
        },
        task_queues=[
            Queue(settings.celery_worker_queue, routing_key=settings.celery_worker_queue),
        ],
        task_create_missing_queues=False,
    )
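The options dictionary in this answer could also be built by a small helper that includes the session token only when one is present, so the same code works for both temporary and long-term credentials. This is a sketch under assumptions: `build_transport_options` is a hypothetical name, and the `aws_*` key names are copied from the answer above rather than verified against Kombu.

```python
from typing import Any, Dict, Optional


def build_transport_options(
    region: str,
    queue_name: str,
    queue_url: str,
    access_key: str,
    secret_key: str,
    token: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble broker_transport_options with top-level credentials.

    Hypothetical helper; key names follow the answer above.
    """
    options: Dict[str, Any] = {
        "region": region,
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "predefined_queues": {queue_name: {"url": queue_url}},
    }
    if token:
        # Only temporary (STS) credentials carry a session token.
        options["aws_session_token"] = token
    return options
```

The result would be passed as `broker_transport_options=build_transport_options(...)` when constructing the `Celery` app.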
    