
I have a parameter called --file_delimiter in my Dataflow flex template job. This parameter takes ',' or '|' as its value.

In my Beam pipeline, I pass it as the sep argument of the read_csv transform.

df = p | read_csv(input_file,sep=known_args.file_delimiter)

argument parser code:

parser.add_argument(
        "--file_delimiter",
        default=",",
    )

When I run my Dataflow job with the following command, it works fine:

python test.py --output_table $PROJECT:$Dataset.$table --input_file $file --runner=DataflowRunner --project=$PROJECT --job_name=titles-df --temp_location=gs://ingest-test1/temp --region=us-central1 --delimiter ,

But when I create a flex template and run the command below, the job fails:

gcloud dataflow flex-template run "titles-template-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location "$TEMPLATE_PATH" \
--parameters input_file="gs://ingest-test1/titles.csv" \
--parameters output_table="$PROJECT:templateOutput.titles" \
--parameters file_delimiter="," --region "$REGION"

job logs:

Error occurred in the launcher container: Template launch failed. See console logs.

console logs:

"message":"ValueError: only single character unicode strings can be converted to Py_UCS4, got length 0"}

I don't understand why it works for a normal Dataflow job but not for the flex template job. Am I supposed to pass "," to the --file_delimiter parameter? Why does it report length 0 when I did pass the string ","?

I also want to mention that even when I don't pass anything for --file_delimiter, the flex template job throws the same error. But when I don't pass anything for the normal Dataflow job, it uses the parameter's default value, which is ",", and runs successfully.

Complete Console logs:

{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.386919","line":"python_template_launcher.go:40","message":"Started template launcher."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.387097","line":"python_template_launcher.go:44","message":"Initialize Python template."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.387111","line":"python_template.go:93","message":"Falling back to using template-container args from metadata: template-container-args"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.388666","line":"python_template.go:102","message":"Validating metadata template-container-args: {"consoleLogsLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/console_logs","environment":{"region":"us-central1","serviceAccountEmail":"1075620756053-compute@developer.gserviceaccount.com","stagingLocation":"gs://dataflow-staging-us-central1-1075620756053/staging","tempLocation":"gs://dataflow-staging-us-central1-1075620756053/tmp"},"jobId":"2022-06-11_23_41_36-12248159446928913945","jobName":"titles-template-default-20220612-064135","jobObjectLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object","operationResultLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/operation_result","parameters":{"file_delimiter":"","input_file":"gs://ingest-test1/titles.csv","output_table":"hidden-mapper-351214:templateOutput.titles-default","staging_location":"gs://dataflow-staging-us-central1-1075620756053/staging","temp_location":"gs://dataflow-staging-us-central1-1075620756053/tmp"},"projectId":"hidden-mapper-351214"}"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389043","line":"python_template.go:111","message":"Extracting operation result location."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389065","line":"python_template.go:119","message":"Operation result location: gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/operation_result"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389081","line":"python_template.go:122","message":"Extracting console log location."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389091","line":"python_template.go:130","message":"Console logs location: gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/console_logs"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389106","line":"python_template.go:133","message":"Extracting Python command specs."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389640","line":"python_template.go:142","message":"Generating launch args."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389767","line":"python_args.go:236","message":"Overriding staging_location with value: gs://dataflow-staging-us-central1-1075620756053/staging (previous value: gs://dataflow-staging-us-central1-1075620756053/staging)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389823","line":"python_args.go:236","message":"Overriding temp_location with value: gs://dataflow-staging-us-central1-1075620756053/tmp (previous value: gs://dataflow-staging-us-central1-1075620756053/tmp)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389879","line":"launch.go:47","message":"Validating ExpectedFeatures."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389896","line":"launch.go:72","message":"Launching Python template."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389914","line":"python_template.go:64","message":"Using launch args: [/template/ingest-file-bq.py --requirements_file=/template/requirements.txt --runner=DataflowRunner --project=hidden-mapper-351214 --template_location=gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object --temp_location=gs://dataflow-staging-us-central1-1075620756053/tmp --staging_location=gs://dataflow-staging-us-central1-1075620756053/staging --input_file=gs://ingest-test1/titles.csv --job_name=titles-template-default-20220612-064135 --region=us-central1 --service_account_email=1075620756053-compute@developer.gserviceaccount.com --file_delimiter= --output_table=hidden-mapper-351214:templateOutput.titles-default]"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:52.389964","line":"exec.go:38","message":"Executing: python /template/ingest-file-bq.py --requirements_file=/template/requirements.txt --runner=DataflowRunner --project=hidden-mapper-351214 --template_location=gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object --temp_location=gs://dataflow-staging-us-central1-1075620756053/tmp --staging_location=gs://dataflow-staging-us-central1-1075620756053/staging --input_file=gs://ingest-test1/titles.csv --job_name=titles-template-default-20220612-064135 --region=us-central1 --service_account_email=1075620756053-compute@developer.gserviceaccount.com --file_delimiter= --output_table=hidden-mapper-351214:templateOutput.titles-default"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.308089","line":"exec.go:66","message":"INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.308476","line":"exec.go:66","message":"INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds."}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.312666","line":"exec.go:66","message":"INFO:oauth2client.transport:Attempting refresh to obtain initial access_token"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644630","line":"exec.go:66","message":"Traceback (most recent call last):"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644687","line":"exec.go:66","message":"  File "/template/ingest-file-bq.py", line 96, in <module>"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644715","line":"exec.go:66","message":"    run()"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644725","line":"exec.go:66","message":"  File "/template/ingest-file-bq.py", line 83, in run"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644737","line":"exec.go:66","message":"    df = p | read_csv(input_file,sep=known_args.file_delimiter,dtype=object,header=0,names=headers)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644760","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 614, in __ror__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644773","line":"exec.go:66","message":"    result = p.apply(self, pvalueish, label)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644796","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 708, in apply"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644813","line":"exec.go:66","message":"    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644859","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644873","line":"exec.go:66","message":"    return super().apply(transform, input, options)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644884","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644896","line":"exec.go:66","message":"    return m(transform, input, options)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644905","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644924","line":"exec.go:66","message":"    return transform.expand(input)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644934","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/apache_beam/dataframe/io.py", line 250, in expand"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644946","line":"exec.go:66","message":"    self.reader(handle, *self.args, **dict(self.kwargs, chunksize=100)))"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644975","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/util/_decorators.py", line 311, in wrapper"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644987","line":"exec.go:66","message":"    return func(*args, **kwargs)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.644996","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 586, in read_csv"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645007","line":"exec.go:66","message":"    return _read(filepath_or_buffer, kwds)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645021","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 482, in _read"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645033","line":"exec.go:66","message":"    parser = TextFileReader(filepath_or_buffer, **kwds)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645043","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 811, in __init__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645054","line":"exec.go:66","message":"    self._engine = self._make_engine(self.engine)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645064","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/readers.py", line 1040, in _make_engine"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645075","line":"exec.go:66","message":"    return mapping[engine](self.f, **self.options)  # type: ignore[call-arg]"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645086","line":"exec.go:66","message":"  File "/usr/local/lib/python3.7/site-packages/pandas/io/parsers/c_parser_wrapper.py", line 69, in __init__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645098","line":"exec.go:66","message":"    self._reader = parsers.TextReader(self.handles.handle, **kwds)"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645108","line":"exec.go:66","message":"  File "pandas/_libs/parsers.pyx", line 401, in pandas._libs.parsers.TextReader.__cinit__"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.645119","line":"exec.go:66","message":"ValueError: only single character unicode strings can be converted to Py_UCS4, got length 0"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.881335","line":"exec.go:52","message":"python failed with exit status 1"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.881396","line":"launch.go:77","message":"Template launch failed: exit status 1"}
{"container_id":"96e296b95468afac85863625bb00daf291ce6f448adab9620461e4cb468e1e4d","severity":"INFO","time":"2022/06/12 06:43:54.881414","line":"launch.go:99","message":"Uploading console logs to gcs location: gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/console_logs"}

metadata.json

{
    "name": "CSV-BQ beam Python flex template",
    "description": "flex template to ingest files into BQ",
    "parameters": [
      {
        "name": "input_file",
        "label": "Input csv file gcs path",
        "helpText": "GCS path of the file"
      },
      {
        "name": "output_table",
        "label": "BigQuery output table name.",
        "helpText": "Name of the BigQuery output table name.",
        "isOptional": true,
        "regexes": [
          "([^:]+:)?[^.]+[.].+"
        ]
      },
      {
        "name": "file_delimiter",
        "label": "delimiter used in the file",
        "helpText": "pass the character used as the delimiter, e.g. , or |",
        "isOptional": true
      }
    ]
  }

2 Answers


  1. In the provided logs, these lines are helpful clues (formatted for easy reading):

    
    {
      "time":"2022/06/12 06:43:52.388666",
      "line":"python_template.go:102",
      "message":"Validating metadata template-container-args:
        {
          "consoleLogsLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/console_logs",
          "environment":
            {
              "region":"us-central1",
              "serviceAccountEmail":"1075620756053-compute@developer.gserviceaccount.com",
              "stagingLocation":"gs://dataflow-staging-us-central1-1075620756053/staging",
              "tempLocation":"gs://dataflow-staging-us-central1-1075620756053/tmp"
            },
          "jobId":"2022-06-11_23_41_36-12248159446928913945",
          "jobName":"titles-template-default-20220612-064135",
          "jobObjectLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object",
          "operationResultLocation":"gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/operation_result",
          "parameters":
            {
              "file_delimiter":"",
              "input_file":"gs://ingest-test1/titles.csv",
              "output_table":"hidden-mapper-351214:templateOutput.titles-default",
              "staging_location":"gs://dataflow-staging-us-central1-1075620756053/staging",
              "temp_location":"gs://dataflow-staging-us-central1-1075620756053/tmp"
            },
          "projectId":"hidden-mapper-351214"
        }"
      }
    
    {
      "time":"2022/06/12 06:43:52.389914",
      "line":"python_template.go:64",
      "message":
        "Using launch args: [
          /template/ingest-file-bq.py 
            --requirements_file=/template/requirements.txt
            --runner=DataflowRunner
            --project=hidden-mapper-351214
            --template_location=gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object
            --temp_location=gs://dataflow-staging-us-central1-1075620756053/tmp
            --staging_location=gs://dataflow-staging-us-central1-1075620756053/staging
            --input_file=gs://ingest-test1/titles.csv
            --job_name=titles-template-default-20220612-064135
            --region=us-central1
            --service_account_email=1075620756053-compute@developer.gserviceaccount.com
            --file_delimiter=
            --output_table=hidden-mapper-351214:templateOutput.titles-default
      ]"
    }
    

    and

    {
      "time":"2022/06/12 06:43:52.389964",
      "line":"exec.go:38",
      "message":
        "Executing:
           python /template/ingest-file-bq.py
             --requirements_file=/template/requirements.txt
             --runner=DataflowRunner
             --project=hidden-mapper-351214
             --template_location=gs://dataflow-staging-us-central1-1075620756053/staging/template_launches/2022-06-11_23_41_36-12248159446928913945/job_object
             --temp_location=gs://dataflow-staging-us-central1-1075620756053/tmp
             --staging_location=gs://dataflow-staging-us-central1-1075620756053/staging
             --input_file=gs://ingest-test1/titles.csv
             --job_name=titles-template-default-20220612-064135
             --region=us-central1
             --service_account_email=1075620756053-compute@developer.gserviceaccount.com
             --file_delimiter=
             --output_table=hidden-mapper-351214:templateOutput.titles-default"
    }
    

    So the --file_delimiter parameter is being passed (hence the default is not used) but is set to the empty string.

    We do see the other parameters --input_file and --output_table being passed to the Python program as expected. This suggests that there is a mismatch between your metadata.json and the parameters you are passing. The documentation is at https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#metadata.
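The empty-string behavior described above can be reproduced with argparse alone, outside Dataflow. The sketch below is only an illustration: the helper names delimiter_arg and build_parser and the DEFAULT_DELIMITER constant are hypothetical and not part of the original pipeline. It shows that an explicit --file_delimiter= overrides the default with an empty string, and one possible guard that treats an empty value as "not set".

```python
import argparse

DEFAULT_DELIMITER = ","  # assumed default, mirroring the question's parser


def delimiter_arg(value: str) -> str:
    """Hypothetical argparse type: map an empty value back to the default."""
    if value == "":
        return DEFAULT_DELIMITER
    if len(value) != 1:
        raise argparse.ArgumentTypeError(
            f"delimiter must be a single character, got {value!r}"
        )
    return value


def build_parser(guarded: bool) -> argparse.ArgumentParser:
    """Build a parser with or without the empty-value guard."""
    parser = argparse.ArgumentParser()
    if guarded:
        parser.add_argument(
            "--file_delimiter", default=DEFAULT_DELIMITER, type=delimiter_arg
        )
    else:
        parser.add_argument("--file_delimiter", default=DEFAULT_DELIMITER)
    return parser


plain = build_parser(guarded=False)
guarded = build_parser(guarded=True)

# Flag omitted: the default applies.
print(repr(plain.parse_known_args([])[0].file_delimiter))  # ','

# Flag present but empty, as in the launcher log: the default is NOT used.
print(repr(plain.parse_known_args(["--file_delimiter="])[0].file_delimiter))  # ''

# With the guard, an empty value falls back to the default.
print(repr(guarded.parse_known_args(["--file_delimiter="])[0].file_delimiter))  # ','
```

A guard like this only hides the symptom in the pipeline; it does not explain why the launcher received an empty value in the first place.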

  2. Having dug into this from the templates side of things, it looks like the gcloud CLI is treating your comma as a separator between --parameters entries rather than as the value itself, so file_delimiter arrives empty. That is why submitting the job directly to Dataflow works and launching the template through gcloud does not. You should be able to avoid this by using the gcloud CLI's escaping syntax (https://cloud.google.com/sdk/gcloud/reference/topic/escaping), which lets you choose a different separator so that your comma is parsed as the value you want.
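The comma handling described in this answer can be illustrated with a small model. This is a simplified sketch, not gcloud's actual implementation: split_dict_flag is a hypothetical function that assumes a dictionary-style flag is split on commas by default and that a leading ^DELIM^ prefix, as described on the escaping page linked above, selects another separator.

```python
def split_dict_flag(raw: str) -> dict:
    """Simplified model of a KEY=VALUE list flag with ^DELIM^ escaping."""
    delimiter = ","
    # A leading ^DELIM^ prefix replaces the default comma separator.
    if raw.startswith("^"):
        end = raw.find("^", 1)
        if end > 1:
            delimiter, raw = raw[1:end], raw[end + 1:]
    result = {}
    for item in raw.split(delimiter):
        if not item:
            continue  # skip empty fragments left over from splitting
        key, _, value = item.partition("=")
        result[key] = value
    return result


# The comma is consumed as a separator, so the value ends up empty.
print(split_dict_flag("file_delimiter=,"))  # {'file_delimiter': ''}

# With '~' chosen as the separator, the comma survives as the value.
print(split_dict_flag("^~^file_delimiter=,"))  # {'file_delimiter': ','}
```

Under that syntax the flag would be written as --parameters "^~^file_delimiter=,". Whether this resolves the template launch in this case is not confirmed by the logs in the question.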
