I’m working with Airflow DAGs and Kubernetes for the first time.
I have a Python script that connects to AWS S3 and reads some files. It works fine when I run it in a Docker container from bash, but when I run the same image from an Airflow task using a Kubernetes pod, I get the following error (I replaced some sensitive values with XXXXX):
[2022-02-08 22:48:55,795] {kubernetes_pod.py:365} INFO - creating pod with labels {'dag_id': 'ECO_CELLS_POLYGON_STORES', 'task_id': 'process_AR', 'execution_date': '2022-02-08T224216.4628350000-e866f2011', 'try_number': '1'} and launcher <airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher object at 0x7f649be71410>
[2022-02-08 22:48:55,812] {pod_launcher.py:93} ERROR - Exception when attempting to create Namespaced Pod: {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"annotations": {},
"labels": {
"airflow_version": "2.0.0-astro.8",
"kubernetes_pod_operator": "True",
"dag_id": "ECO_CELLS_POLYGON_STORES",
"task_id": "process_AR",
"execution_date": "2022-02-08T224216.4628350000-e866f2011",
"try_number": "1"
},
"name": "k8s-pod-ml-operator.3aada8ada8df491ea63e9319bf779d10",
"namespace": "default"
},
"spec": {
"affinity": {},
"containers": [
{
"args": [],
"command": [
"python",
"main.py"
],
"env": {
"AWS_ACCESS_KEY_ID": "XXXXXX",
"AWS_SECRET_ACCESS_KEY": "***",
"AWS_BUCKET_NAME": "XXXXXX-dev",
"SNOWFLAKE_SERVER": "XXXXXX",
"SNOWFLAKE_LOGIN": "XXXXXX",
"SNOWFLAKE_PASSWORD": "***",
"SNOWFLAKE_ACCOUNT": "XXXXXX",
"SNOWFLAKE_DATABASE": "XXXXXX",
"SNOWFLAKE_WAREHOUSE": "XXXXXX",
"COUNTRY": "AR",
"S3_PROJECT": "ecom_polygon_stores",
"S3_TEAM_VERTICAL": "ecommerce"
},
"envFrom": [],
"image": "ecom_polygon_stores:v1.0.7",
"imagePullPolicy": "Never",
"name": "base",
"ports": [],
"resources": {},
"volumeMounts": []
}
],
"hostNetwork": false,
"imagePullSecrets": [],
"initContainers": [],
"restartPolicy": "Never",
"securityContext": {},
"serviceAccountName": "default",
"tolerations": [],
"volumes": []
}
}
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 89, in run_pod_async
body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
(data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs) # noqa: E501
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 6265, in create_namespaced_pod_with_http_info
collection_formats=collection_formats)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 345, in call_api
_preload_content, _request_timeout)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 176, in __call_api
_request_timeout=_request_timeout)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 388, in request
body=body)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 278, in POST
body=body)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 231, in request
raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '319700db-6333-4a4d-885c-1f45a0cd13a3', 'X-Kubernetes-Pf-Prioritylevel-Uid': '4d5b12e4-65e9-4ab9-ad63-de6f29ca0b6d', 'Date': 'Tue, 08 Feb 2022 22:48:55 GMT', 'Content-Length': '487'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Env: []v1.EnvVar: decode slice: expect [ or n, but found {, error found in #10 byte of ...|, "env": {"AWS_ACCES|..., bigger context ...|s": [], "command": ["python", "main.py"], "env": {"AWS_ACCESS_KEY_ID": "AXXXXXXXXXXXXXXXXXX6", "AWS_|...","reason":"BadRequest","code":400}
I’m not sure where to go from here. From what I’m reading, the error says it expected a [ instead of a { in "env": {"AWS_ACCESS_KEY_ID"…, but I’m not sure how to fix that, since I pass those parameters like this:
self.env_vars = {
'AWS_ACCESS_KEY_ID': s3_connection.login,
'AWS_SECRET_ACCESS_KEY': s3_connection.password,
'AWS_BUCKET_NAME': bucket_name,
'SNOWFLAKE_SERVER': str(snowflake_connection.host),
'SNOWFLAKE_LOGIN': str(snowflake_connection.login),
'SNOWFLAKE_PASSWORD': str(snowflake_connection.password),
'SNOWFLAKE_ACCOUNT': str(snowflake_connection.extra_dejson['account']),
'SNOWFLAKE_DATABASE': str(snowflake_connection.extra_dejson['database']),
'SNOWFLAKE_WAREHOUSE': str(snowflake_connection.extra_dejson['warehouse']),
'COUNTRY': code,
'S3_PROJECT': s3_project,
'S3_TEAM_VERTICAL': s3_team_vertical
}
Any suggestions?
2 Answers
Indeed, the env var format is wrong. For Airflow version 2.0.0 with the Kubernetes provider, env_vars needs to be a list of k8s.V1EnvVar objects (since I'm using k8s), each carrying a name and a value field, rather than a plain dict.
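For illustration, a minimal sketch of the conversion (the to_env_list helper and the sample keys are mine, not from Airflow; the commented lines assume the kubernetes client package is installed, as it is in an Airflow Kubernetes-provider environment):

```python
# With the kubernetes client available, env_vars would be built like:
#
#   from kubernetes.client import models as k8s
#   env_vars = [k8s.V1EnvVar(name=k, value=str(v)) for k, v in d.items()]
#
# Each V1EnvVar serializes to {"name": ..., "value": ...}, which is the
# list-item shape the PodSpec's `env` field expects. The same reshaping
# on plain dicts, without any dependency:
def to_env_list(env_dict):
    """Turn {'KEY': 'value'} into [{'name': 'KEY', 'value': 'value'}]."""
    return [{"name": name, "value": str(value)} for name, value in env_dict.items()]

env = to_env_list({"COUNTRY": "AR", "S3_TEAM_VERTICAL": "ecommerce"})
print(env)
```

Passing the dict from the question through a conversion like this (or building the k8s.V1EnvVar list directly) produces the [ ... ] structure the API server is asking for.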
Your env: is malformed, and one can see this in two different ways: (1) env: in the PodSpec is a list of {name: "", value: ""} items, and (2) the structure emitted in the error message is malformed regardless: "env": {"AWS_ACCESS_KEY_ID": "AXXXXXXXXXXXXXXXXXX6", "AWS_|..., since there is no such JSON shape as {"":"","".
I don’t have any Airflow reference documentation links handy, but you’d want to check them to ensure self.env_vars is what Airflow expects it to be, since Python places the entire burden of correctness on the programmer.
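One way to do that check early, before the pod is ever submitted (a sketch; validate_env_vars is a hypothetical helper of mine, not an Airflow or kubernetes-client API):

```python
def validate_env_vars(env_vars):
    """Raise early if env_vars is not the list-of-name/value shape the
    Kubernetes PodSpec's `env` field requires. Passing a dict is the usual
    mistake: it serializes to the '{...}' shape the API server rejects."""
    if not isinstance(env_vars, list):
        raise TypeError(
            f"env_vars must be a list, got {type(env_vars).__name__}"
        )
    for item in env_vars:
        # Accept either plain dicts or V1EnvVar-like objects with .name/.value
        ok = (
            isinstance(item, dict) and {"name", "value"} <= item.keys()
        ) or (hasattr(item, "name") and hasattr(item, "value"))
        if not ok:
            raise TypeError(f"each env var needs 'name' and 'value': {item!r}")

validate_env_vars([{"name": "COUNTRY", "value": "AR"}])  # passes silently
```

Calling this on the dict from the question would raise a TypeError at task-definition time, which is a much friendlier failure than a 400 from the Kubernetes API server.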