skip to Main Content

My Code

I am using my personal access token with full access still getting error, tell me other possible ways to achieve this so I can add tasks to multiple azure pipeline through automation.

import requests
import base64

# Azure DevOps organization URL
org_url = "https://dev.azure.com/my-org"  # Replace with your organization URL

# Personal access token (PAT) with necessary permissions (Read and Write) for pipelines
pat = "my-pat" 

# Encode the PAT to Base64
credentials = base64.b64encode(f":{pat}".encode()).decode()

# Project name and pipeline ID where you want to add the tasks
project_name = "My Project"
pipeline_id = 12

# Tasks to be added
tasks = [
    {
        "taskReference": {
            "id": "mspremier.PostBuildCleanup.PostBuildCleanup-task.PostBuildCleanup@3",
            "name": "Clean Agent Directories"
        },
        "enabled": True
    },
    {
        "taskReference": {
            "id": "NodeTool@0",
            "name": "Use Node 6.x"
        },
        "enabled": True
    }
]

def update_pipeline_definition():
    url = f"{org_url}/{project_name}/_apis/pipelines/{pipeline_id}?api-version=6.0-preview.1"
    headers = {"Authorization": f"Basic {credentials}", "Content-Type": "application/json"}

    # Get the current pipeline definition
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        pipeline_data = response.json()

        # Modify the pipeline definition to include the tasks
        if "phases" in pipeline_data["configuration"]:
            for phase in pipeline_data["configuration"]["phases"]:
                if "steps" in phase:
                    for task in tasks:
                        phase["steps"].append(task)

        # Update the pipeline definition
        response = requests.put(url, headers=headers, json=pipeline_data)

        if response.status_code == 200:
            print(f"Tasks added to pipeline {pipeline_id} successfully.")
        else:
            print(f"Failed to update pipeline {pipeline_id}. Status code: {response.status_code}")
            print(response.text)
    else:
        print(f"Failed to get pipeline {pipeline_id}. Status code: {response.status_code}")
        print(response.text)

if __name__ == "__main__":
    update_pipeline_definition()

Following error I am getting:

Failed to update pipeline 42. Status code: 405
{"count":1,"value":{"Message":"The requested resource does not support http method 'PUT'."}}

It should update/add the tasks to multiple azure pipelines, tell me other possible way to automate this.

2

Answers


  1. I tried making some changes to your code and ran the pipeline by updating its tasks, But Only the pipeline ran and the tasks did not get updated as steps, jobs and for classic pipeline phases, steps are not supported in the Python.

    In your existing code replace put with post or use the modified code below which runs thee pipeline but fails to create the task as it is not supported as mentioned above.

    My modified code:-

    import requests
    from requests.auth import HTTPBasicAuth
    
    # Replace with your Azure DevOps organization URL
    organization_url = "https://dev.azure.com/org-name"
    
    # Replace with your Personal Access Token
    personal_access_token = "xxxxxj4gskzhnjunhtxdpdfi3kaygq2a"
    
    # Replace with your project name and pipeline ID
    project_name = "devopsprojectname"
    pipeline_id = 122
    
    # Task definition for the sample task
    new_task = {
        "task": {
            "id": "CmdLine",              # ID of the task type (CmdLine is a simple command line task)
            "versionSpec": "2.*",         # Task version (2.* is the latest version)
            "inputs": {
                "script": "echo Hello",   # Sample command to be executed by the task
            }
        }
    }
    
    # URL to create a new version of the pipeline
    api_url = f"{organization_url}/{project_name}/_apis/pipelines/{pipeline_id}/runs?api-version=6.0-preview.1"
    
    headers = {
        "Content-Type": "application/json",
    }
    
    auth = HTTPBasicAuth("", personal_access_token)
    
    response = requests.post(api_url, json={}, headers=headers, auth=auth)
    
    if response.status_code == 202:
        run_id = response.json()["id"]
    
        # Get the run details to modify the new pipeline
        api_url = f"{organization_url}/{project_name}/_apis/pipelines/{pipeline_id}/runs/{run_id}?api-version=6.0-preview.1"
        response = requests.get(api_url, headers=headers, auth=auth)
    
        if response.status_code == 200:
            pipeline_run = response.json()
    
            # Add the new task definition to the existing tasks in the new pipeline version
            pipeline_run["phases"][0]["steps"].append(new_task)
    
            # Update the pipeline run with the modified definition
            response = requests.patch(api_url, json=pipeline_run, headers=headers, auth=auth)
    
            if response.status_code == 200:
                print("Task added successfully to the pipeline.")
            else:
                print(f"Failed to add task to the new pipeline. Status code: {response.status_code}, Error: {response.text}")
        else:
            print(f"Failed to get pipeline run details. Status code: {response.status_code}, Error: {response.text}")
    else:
        print(f"Failed to create a new version of the pipeline. Status code: {response.status_code}, Error: {response.text}")
    

    Pipeline ran but task failed:-

    enter image description here

    The recommended way is to upgrade your YAML file content in your Azure Repository and then use the code below to trigger the pipeline with Python like below:-

    Python code:-

    import requests
    from requests.auth import HTTPBasicAuth
    
    # Replace with your Azure DevOps organization URL
    organization_url = "https://dev.azure.com/org-name"
    
    # Replace with your Personal Access Token
    personal_access_token = "xxxxixxxhtxdpdfi3kaygq2a"
    
    # Replace with your project name and pipeline ID
    project_name = "devopsprojectname"
    pipeline_id = 122
    
    # URL to trigger a new pipeline run
    api_url = f"{organization_url}/{project_name}/_apis/pipelines/{pipeline_id}/runs?api-version=7.1-preview"
    
    headers = {
        "Content-Type": "application/json",
    }
    
    auth = HTTPBasicAuth("", personal_access_token)
    
    # Trigger a new pipeline run
    response = requests.post(api_url, json={}, headers=headers, auth=auth)
    
    if response.status_code == 200:
        print("Pipeline run triggered successfully.")
    else:
        print(f"Failed to trigger the pipeline run. Status code: {response.status_code}, Error: {response.text}")
    

    Output:-

    enter image description here

    enter image description here

    Login or Signup to reply.
  2. Can you check whether the endpoint method is right, As 405 status code means the "method is Note Allowed" for that particular endpoint (PUT in this case)

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search