skip to Main Content

I have the following function that contains a CTE:

CREATE OR REPLACE FUNCTION create_job(
    in_partition_id VARCHAR(40),
    in_job_id VARCHAR(48),
    in_name VARCHAR(255),
    in_description TEXT,
    in_data TEXT,
    in_job_hash INT,
    in_task_classifier VARCHAR(255),
    in_task_api_version INT,
    in_task_data BYTEA,
    in_task_pipe VARCHAR(255),
    in_target_pipe VARCHAR(255),
    in_prerequisite_job_ids VARCHAR(128)[],
    in_delay INT,
    in_labels VARCHAR(255)[][] default null,
    in_suspended_partition BOOLEAN default false
)
RETURNS TABLE(
    job_created BOOLEAN
)
LANGUAGE plpgsql
AS $$
BEGIN
    IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN
        RETURN QUERY SELECT FALSE;
        RETURN;
    END IF;

    WITH prereqs_with_opts(job_id_with_opts) AS
    (
        SELECT unnest(in_prerequisite_job_ids)::VARCHAR(128)
    ),
    prereqs AS
    (
        -- Remove any duplicate pre-requisites, and if a pre-req is mentioned multiple times then merge the options
        SELECT job_id, precreated FROM
        (
            SELECT ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY precreated DESC), job_id, precreated
            FROM prereqs_with_opts
            CROSS JOIN internal_get_prereq_job_id_options(job_id_with_opts)
        ) tbl
        WHERE row_number = 1
    ),
    locked_jobs AS
    (
        SELECT * FROM job
        WHERE partition_id = in_partition_id
            AND job_id IN (SELECT job_id FROM prereqs)
        ORDER BY partition_id, job_id
        FOR UPDATE
    ),
    updated_jobs AS
    (
        SELECT * FROM internal_update_job_progress(in_partition_id, (SELECT ARRAY(SELECT job_id FROM locked_jobs)))
    ),
    prereqs_created_but_not_complete AS
    (
        SELECT * FROM updated_jobs uj
        WHERE uj.partition_id = in_partition_id
            AND uj.job_id IN (SELECT job_id FROM prereqs)
            AND uj.status <> 'Completed'
    ),
    prereqs_not_created_yet AS
    (
        SELECT * FROM prereqs
        WHERE NOT precreated AND job_id NOT IN (
            SELECT job_id FROM job WHERE partition_id = in_partition_id
        )
    ),
    all_incomplete_prereqs(prerequisite_job_id) AS
    (
        SELECT job_id FROM prereqs_created_but_not_complete
        UNION
        SELECT job_id FROM prereqs_not_created_yet
    )

    INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
    SELECT in_partition_id, in_job_id, prerequisite_job_id
    FROM all_incomplete_prereqs;

    IF FOUND OR in_delay > 0 OR in_suspended_partition THEN
        INSERT INTO public.job_task_data(
            partition_id,
            job_id,
            task_classifier,
            task_api_version,
            task_data,
            task_pipe,
            target_pipe,
            eligible_to_run_date,
            suspended
        ) VALUES (
            in_partition_id,
            in_job_id,
            in_task_classifier,
            in_task_api_version,
            in_task_data,
            in_task_pipe,
            in_target_pipe,
            CASE WHEN NOT FOUND THEN now() AT TIME ZONE 'UTC' + (in_delay * interval '1 second') END,
            in_suspended_partition
        );
    END IF;

    RETURN QUERY SELECT TRUE;
END
$$;

What I would like to do is, AFTER the lock is held on the jobs table (which happens in the CTE), and before the INSERT INTO public.job_dependency statement is raise an exception if any jobs have Failed status, for example something like this.

I don’t think creating a temporary table, and using that after the CTE terminates will work, as that will mean the lock is no longer held on the jobs table.

I tried adding a new check_for_failed_prereqs CTE entry, which calls a function raise_exception_for_failed_prereqs, and then referencing the check_for_failed_prereqs CTE in a subsequent CTE using a CROSS JOIN (so that check_for_failed_prereqs is actually executed) but did not see exception getting raised:

CREATE OR REPLACE FUNCTION raise_exception_for_failed_prereqs(arr text[]) RETURNS boolean
    LANGUAGE plpgsql AS
$$BEGIN
    IF array_length(arr, 1) > 0 THEN
        RAISE EXCEPTION 'One or more prerequisite jobs have failed. Failed Job IDs: %', ARRAY_TO_STRING(arr, ', ') USING ERRCODE = '02000'; -- sqlstate no data
    END IF;
    RETURN TRUE;
END;
$$;

CREATE OR REPLACE FUNCTION create_job(
    in_partition_id VARCHAR(40),
    in_job_id VARCHAR(48),
    in_name VARCHAR(255),
    in_description TEXT,
    in_data TEXT,
    in_job_hash INT,
    in_task_classifier VARCHAR(255),
    in_task_api_version INT,
    in_task_data BYTEA,
    in_task_pipe VARCHAR(255),
    in_target_pipe VARCHAR(255),
    in_prerequisite_job_ids VARCHAR(128)[],
    in_delay INT,
    in_labels VARCHAR(255)[][] default null,
    in_suspended_partition BOOLEAN default false
)
RETURNS TABLE(
    job_created BOOLEAN
)
LANGUAGE plpgsql
AS $$
DECLARE
    failed_prerequisite_job_ids VARCHAR[];
BEGIN

        IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN
        RETURN QUERY SELECT FALSE;
        RETURN;
    END IF;

    WITH prereqs_with_opts(job_id_with_opts) AS
    (
        SELECT unnest(in_prerequisite_job_ids)::VARCHAR(128)
    ),
    prereqs AS
    (
        -- Remove any duplicate pre-requisites, and if a pre-req is mentioned multiple times then merge the options
        SELECT job_id, precreated FROM
        (
            SELECT ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY precreated DESC), job_id, precreated
            FROM prereqs_with_opts
            CROSS JOIN internal_get_prereq_job_id_options(job_id_with_opts)
        ) tbl
        WHERE row_number = 1
    ),
    locked_jobs AS
    (
        -- Lock table job for update
        SELECT * FROM job
        WHERE partition_id = in_partition_id
            AND job_id IN (SELECT job_id FROM prereqs)
        ORDER BY partition_id, job_id
        FOR UPDATE
    ),
    updated_jobs AS
    (
        -- Process outstanding job updates
        SELECT * FROM internal_update_job_progress(in_partition_id, (SELECT ARRAY(SELECT job_id FROM locked_jobs)))
    ),
    prereqs_created_but_not_complete AS
    (
        SELECT * FROM updated_jobs uj
        WHERE uj.partition_id = in_partition_id
            AND uj.job_id IN (SELECT job_id FROM prereqs)
            AND uj.status <> 'Completed'
    ),
    check_for_failed_prereqs AS
    (
        SELECT raise_exception_for_failed_prereqs(array_agg(job_id))
        FROM updated_jobs uj
        WHERE uj.partition_id = in_partition_id
          AND uj.job_id IN (SELECT job_id FROM prereqs)
          AND uj.status = 'Failed'
    ),
    prereqs_not_created_yet AS
    (
        SELECT * FROM prereqs
        CROSS JOIN check_for_failed_prereqs -- Has no effect but to make sure check_for_failed_prereqs  is run
        WHERE NOT precreated AND job_id NOT IN (
            SELECT job_id FROM job WHERE partition_id = in_partition_id
        )
    ),
    all_incomplete_prereqs(prerequisite_job_id) AS
    (
        SELECT job_id FROM prereqs_created_but_not_complete
        UNION
        SELECT job_id FROM prereqs_not_created_yet
    )

    INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
    SELECT in_partition_id, in_job_id, prerequisite_job_id
    FROM all_incomplete_prereqs;

    IF FOUND OR in_delay > 0 OR in_suspended_partition THEN
        INSERT INTO public.job_task_data(
            partition_id,
            job_id,
            task_classifier,
            task_api_version,
            task_data,
            task_pipe,
            target_pipe,
            eligible_to_run_date,
            suspended
        ) VALUES (
            in_partition_id,
            in_job_id,
            in_task_classifier,
            in_task_api_version,
            in_task_data,
            in_task_pipe,
            in_target_pipe,
            CASE WHEN NOT FOUND THEN now() AT TIME ZONE 'UTC' + (in_delay * interval '1 second') END,
            in_suspended_partition
        );
    END IF;

    RETURN QUERY SELECT TRUE;
END
$$;

2

Answers


  1. You can write a PL/pgSQL function that raises the desired error and call that function in your SQL statement. Take care that the optimizer may choose to call the function at a different point than you expect — SQL is not a procedural language.

    To provide a simplified example somewhat like yours:

    CREATE FUNCTION check_array_empty(arr text[]) RETURNS boolean
       LANGUAGE plpgsql AS
    $$BEGIN
       IF cardinality(arr) > 0 THEN
          RAISE EXCEPTION 'boom!';
       END IF;
       RETURN TRUE;
    END;$$;
    
    WITH things AS (
       SELECT ...
    ), error_check AS (
       SELECT check_array_empty(array_agg(col))
       FROM things
       WHERE ...
    )
    SELECT ...
    FROM things
       /* has no effect except to make sure "check_array_empty" is executed */
       CROSS JOIN error_check
    WHERE ...;
    
    Login or Signup to reply.
  2. While the above-mentioned solution with the helper PL/pgSQL function should work, it is a bit tricky to implement (as already noted, one must actually use the CTE in question by selecting from it later in the SQL) and is perhaps more complex then necessary.

    So, let me suggest a simpler – if somewhat pedestrian – solution:

    1. Before the big SQL, CREATE TEMP TABLE IF NOT EXISTS job_dependency_temp ... ON COMMIT DROP
    2. Instead of job_dependency, insert into job_dependency_temp
    3. Do the check and raise the exception if necessary
    4. If everything is fine, insert into job_dependency from job_dependency_temp

    There is a subtle trap here too: if you call create_job() multiple times from the same transaction, job_dependency_temp will not be empty in subsequent calls, so delete from job_dependency_temp would also be needed as step 1a in the above list.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search