
I’m working with Airflow DAGs and Kubernetes for the first time.
I have a Python script that connects to AWS S3 and reads some files. It works fine when I run it in a Docker container from bash. But when I run the same Docker image from an Airflow task in a Kubernetes pod, I get the following error (I replaced some sensitive values with XXXXX):

    [2022-02-08 22:48:55,795] {kubernetes_pod.py:365} INFO - creating pod with labels {'dag_id': 'ECO_CELLS_POLYGON_STORES', 'task_id': 'process_AR', 'execution_date': '2022-02-08T224216.4628350000-e866f2011', 'try_number': '1'} and launcher <airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher object at 0x7f649be71410>
    [2022-02-08 22:48:55,812] {pod_launcher.py:93} ERROR - Exception when attempting to create Namespaced Pod: {
      "apiVersion": "v1",
      "kind": "Pod",
      "metadata": {
        "annotations": {},
        "labels": {
          "airflow_version": "2.0.0-astro.8",
          "kubernetes_pod_operator": "True",
          "dag_id": "ECO_CELLS_POLYGON_STORES",
          "task_id": "process_AR",
          "execution_date": "2022-02-08T224216.4628350000-e866f2011",
          "try_number": "1"
        },
        "name": "k8s-pod-ml-operator.3aada8ada8df491ea63e9319bf779d10",
        "namespace": "default"
      },
      "spec": {
        "affinity": {},
        "containers": [
          {
            "args": [],
            "command": [
              "python",
              "main.py"
            ],
            "env": {
              "AWS_ACCESS_KEY_ID": "XXXXXX",
              "AWS_SECRET_ACCESS_KEY": "***",
              "AWS_BUCKET_NAME": "XXXXXX-dev",
              "SNOWFLAKE_SERVER": "XXXXXX",
              "SNOWFLAKE_LOGIN": "XXXXXX",
              "SNOWFLAKE_PASSWORD": "***",
              "SNOWFLAKE_ACCOUNT": "XXXXXX",
              "SNOWFLAKE_DATABASE": "XXXXXX",
              "SNOWFLAKE_WAREHOUSE": "XXXXXX",
              "COUNTRY": "AR",
              "S3_PROJECT": "ecom_polygon_stores",
              "S3_TEAM_VERTICAL": "ecommerce"
            },
            "envFrom": [],
            "image": "ecom_polygon_stores:v1.0.7",
            "imagePullPolicy": "Never",
            "name": "base",
            "ports": [],
            "resources": {},
            "volumeMounts": []
          }
        ],
        "hostNetwork": false,
        "imagePullSecrets": [],
        "initContainers": [],
        "restartPolicy": "Never",
        "securityContext": {},
        "serviceAccountName": "default",
        "tolerations": [],
        "volumes": []
      }
    }
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 89, in run_pod_async
        body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
        (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 6265, in create_namespaced_pod_with_http_info
        collection_formats=collection_formats)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 345, in call_api
        _preload_content, _request_timeout)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 176, in __call_api
        _request_timeout=_request_timeout)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 388, in request
        body=body)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 278, in POST
        body=body)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 231, in request
        raise ApiException(http_resp=r)
    kubernetes.client.rest.ApiException: (400)
    Reason: Bad Request
    HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '319700db-6333-4a4d-885c-1f45a0cd13a3', 'X-Kubernetes-Pf-Prioritylevel-Uid': '4d5b12e4-65e9-4ab9-ad63-de6f29ca0b6d', 'Date': 'Tue, 08 Feb 2022 22:48:55 GMT', 'Content-Length': '487'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Env: []v1.EnvVar: decode slice: expect [ or n, but found {, error found in #10 byte of ...|, "env": {"AWS_ACCES|..., bigger context ...|s": [], "command": ["python", "main.py"], "env": {"AWS_ACCESS_KEY_ID": "AXXXXXXXXXXXXXXXXXX6", "AWS_|...","reason":"BadRequest","code":400}

I’m not sure where to go from here. As I read it, the error says the API expected a [ instead of a { at "env": {"AWS_ACCESS_KEY_ID"… but I don’t know how to fix that, since I pass those parameters as a dict, like this:

        self.env_vars = {
            'AWS_ACCESS_KEY_ID': s3_connection.login,
            'AWS_SECRET_ACCESS_KEY': s3_connection.password,
            'AWS_BUCKET_NAME': bucket_name,
            'SNOWFLAKE_SERVER': str(snowflake_connection.host),
            'SNOWFLAKE_LOGIN': str(snowflake_connection.login),
            'SNOWFLAKE_PASSWORD': str(snowflake_connection.password),
            'SNOWFLAKE_ACCOUNT': str(snowflake_connection.extra_dejson['account']),
            'SNOWFLAKE_DATABASE': str(snowflake_connection.extra_dejson['database']),
            'SNOWFLAKE_WAREHOUSE': str(snowflake_connection.extra_dejson['warehouse']),
            'COUNTRY': code,
            'S3_PROJECT': s3_project,
            'S3_TEAM_VERTICAL': s3_team_vertical
        }

Any suggestions?


Answers


  1. Chosen as BEST ANSWER

    Indeed, the env var format is wrong. With Airflow 2.0.0 and the Kubernetes pod operator, env_vars needs to be a list of k8s.V1EnvVar objects, not a dict.

    Each V1EnvVar is built like this:

    from kubernetes.client import models as k8s

    # Kubernetes env var values must be strings, so pass '1234', not 1234
    my_var = k8s.V1EnvVar(name='my_var_name', value='1234')
    

  2. Your env: is malformed, and you can see this in two ways: (1) env: in the PodSpec is a list of {name: "", value: ""} items; (2) the error message shows what was actually sent, "env": {"AWS_ACCESS_KEY_ID": "XXXXXX", "AWS_|..., which is a single object of name/value pairs where the API expects that list.
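The difference is visible in the serialized JSON: the decoder message "expect [ or n, but found {" means that at env it accepts a JSON array ([) or null (n), and a Python dict serializes as an object ({). A minimal stdlib sketch, using plain dicts as a stand-in for what the Kubernetes client sends; the helper name env_as_pod_spec is hypothetical:

```python
import json


def env_as_pod_spec(env: dict) -> list:
    """Hypothetical helper: env vars in the PodSpec shape, a list of name/value objects."""
    return [{"name": name, "value": str(value)} for name, value in env.items()]


env = {"COUNTRY": "AR", "S3_TEAM_VERTICAL": "ecommerce"}

wrong = json.dumps({"env": env})                   # JSON object: what the failing pod sent
right = json.dumps({"env": env_as_pod_spec(env)})  # JSON array: what the API expects

print(wrong)  # {"env": {"COUNTRY": "AR", "S3_TEAM_VERTICAL": "ecommerce"}}
print(right)  # {"env": [{"name": "COUNTRY", "value": "AR"}, {"name": "S3_TEAM_VERTICAL", "value": "ecommerce"}]}
```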

    I don’t have any Airflow reference documentation links handy, but you’d want to check them to make sure self.env_vars is what Airflow expects it to be: Python doesn’t enforce types at runtime, so the burden of correctness falls on the programmer.
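One way to catch this before the request reaches the API server is a small pre-flight check on env_vars. This is a hypothetical sketch, not an Airflow feature: env_var_problems is an illustrative name, it only assumes each item exposes .name and .value (as k8s.V1EnvVar does), and it allows value to be None for entries that use value_from. SimpleNamespace stands in for V1EnvVar so the sketch runs without the kubernetes package.

```python
from types import SimpleNamespace


def env_var_problems(env_vars) -> list:
    """Hypothetical check: return a list of problems with env_vars (empty if none)."""
    if not isinstance(env_vars, list):
        return [f"env_vars must be a list, got {type(env_vars).__name__}"]
    problems = []
    for i, var in enumerate(env_vars):
        name = getattr(var, "name", None)
        value = getattr(var, "value", None)
        if not isinstance(name, str) or not name:
            problems.append(f"item {i} has no valid name: {var!r}")
        elif value is not None and not isinstance(value, str):
            problems.append(f"{name}: value must be a str, got {type(value).__name__}")
    return problems


print(env_var_problems({"COUNTRY": "AR"}))
# ['env_vars must be a list, got dict']
print(env_var_problems([SimpleNamespace(name="COUNTRY", value="AR"),
                        SimpleNamespace(name="PORT", value=1234)]))
# ['PORT: value must be a str, got int']
```

Raising an error in the DAG when this list is non-empty gives a clearer message than the 400 from the Kubernetes API.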
