
I have 3 operators imported from airflow.providers.google.cloud.operators.dataproc

  1. DataprocCreateBatchOperator
  2. DataprocDeleteBatchOperator
  3. DataprocGetBatchOperator

I need the same kind of operators for Azure.
Can someone please look into this, or do I have to create a new operator?

2 Answers


  1. Chosen as BEST ANSWER

    @Mazlum Tosun

    For GCP, DataprocCreateBatchOperator is used in my code like this:

    create_batch = DataprocCreateBatchOperator(
        task_id="CREATE_BATCH",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": f"gs://{ARTIFACT_BUCKET}/spark-jobs/main.py",
                "args": app_args,
                "python_file_uris": [
                    f"gs://{ARTIFACT_BUCKET}/spark-jobs/jobs.zip",
                    f"gs://{ARTIFACT_BUCKET}/spark-jobs/libs.zip",
                ],
                "jar_file_uris": test_jars,
                "file_uris": [
                    f"gs://{ARTIFACT_BUCKET}/config/params.yaml"
                ],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {}
                }
            },
        },
        region=REGION,
        batch_id=batch_id_str,
    )
    

  2. I believe the apache-airflow-providers-microsoft-azure provider package equivalent for Dataproc operators would be Azure Synapse Operators.

    Specifically, the AzureSynapseRunSparkBatchOperator allows users to "execute a spark application within Synapse Analytics".
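
    A rough, untested sketch of what that could look like is below; the connection ID, Spark pool name, and abfss:// URIs are placeholders, and I'm only filling in the SparkBatchJobOptions fields that roughly correspond to the pyspark_batch fields in a Dataproc batch config:

    from airflow.providers.microsoft.azure.operators.synapse import AzureSynapseRunSparkBatchOperator
    from azure.synapse.spark.models import SparkBatchJobOptions

    run_spark_job = AzureSynapseRunSparkBatchOperator(
        task_id="RUN_SPARK_BATCH",
        azure_synapse_conn_id="your_conn_id",  # placeholder connection ID
        spark_pool="your_spark_pool",          # placeholder Synapse Spark pool name
        payload=SparkBatchJobOptions(
            name="pyspark-batch-job",
            # Placeholder ADLS Gen2 paths standing in for the gs:// URIs used on GCP
            file="abfss://jobs@yourstorageaccount.dfs.core.windows.net/spark-jobs/main.py",
            arguments=app_args,  # same runtime args list as in the Dataproc batch
            python_files=[
                "abfss://jobs@yourstorageaccount.dfs.core.windows.net/spark-jobs/jobs.zip",
                "abfss://jobs@yourstorageaccount.dfs.core.windows.net/spark-jobs/libs.zip",
            ],
            files=[
                "abfss://jobs@yourstorageaccount.dfs.core.windows.net/config/params.yaml",
            ],
        ),
    )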

    If you’re running Spark jobs on Azure Databricks, there are also several Databricks Operators that might be able to help.
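
    If Databricks is where the jobs run, a minimal untested sketch with DatabricksSubmitRunOperator could look like this; the connection ID, cluster spec, and dbfs:/ path are placeholders:

    from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

    submit_spark_job = DatabricksSubmitRunOperator(
        task_id="SUBMIT_SPARK_JOB",
        databricks_conn_id="databricks_default",  # placeholder connection ID
        json={
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "Standard_DS3_v2",
                "num_workers": 2,
            },
            "spark_python_task": {
                "python_file": "dbfs:/FileStore/spark-jobs/main.py",
                "parameters": app_args,  # same runtime args list as in the Dataproc batch
            },
        },
    )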


    Here's an example PythonOperator (via the TaskFlow API) that uses the AzureSynapseHook. Note that I didn't test this; it's just a demonstration of what it might look like:

    from airflow.decorators import task
    from airflow.providers.microsoft.azure.hooks.synapse import AzureSynapseHook

    @task()
    def cancel_spark_job(job_id: str):
        hook = AzureSynapseHook(azure_synapse_conn_id="your_conn_id")
        # Wait until the job reaches one of the terminal failure statuses, then cancel it
        if hook.wait_for_job_run_status(job_id, expected_statuses=("error", "dead", "killed")):
            hook.cancel_job_run(job_id)


    This task will wait for a Spark job to enter a status of "error", "dead", or "killed", or time out. If the Spark job enters one of those statuses, the task cancels it. Again, this is just a demonstration of how to use the AzureSynapseHook within a PythonOperator; I'm not sure whether it would work or whether it even makes sense to implement it this way.
