I have the following function that contains a CTE:
CREATE OR REPLACE FUNCTION create_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
in_name VARCHAR(255),
in_description TEXT,
in_data TEXT,
in_job_hash INT,
in_task_classifier VARCHAR(255),
in_task_api_version INT,
in_task_data BYTEA,
in_task_pipe VARCHAR(255),
in_target_pipe VARCHAR(255),
in_prerequisite_job_ids VARCHAR(128)[],
in_delay INT,
in_labels VARCHAR(255)[][] default null,
in_suspended_partition BOOLEAN default false
)
RETURNS TABLE(
job_created BOOLEAN
)
LANGUAGE plpgsql
AS $$
BEGIN
IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN
RETURN QUERY SELECT FALSE;
RETURN;
END IF;
WITH prereqs_with_opts(job_id_with_opts) AS
(
SELECT unnest(in_prerequisite_job_ids)::VARCHAR(128)
),
prereqs AS
(
-- Remove any duplicate pre-requisites, and if a pre-req is mentioned multiple times then merge the options
SELECT job_id, precreated FROM
(
SELECT ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY precreated DESC), job_id, precreated
FROM prereqs_with_opts
CROSS JOIN internal_get_prereq_job_id_options(job_id_with_opts)
) tbl
WHERE row_number = 1
),
locked_jobs AS
(
SELECT * FROM job
WHERE partition_id = in_partition_id
AND job_id IN (SELECT job_id FROM prereqs)
ORDER BY partition_id, job_id
FOR UPDATE
),
updated_jobs AS
(
SELECT * FROM internal_update_job_progress(in_partition_id, (SELECT ARRAY(SELECT job_id FROM locked_jobs)))
),
prereqs_created_but_not_complete AS
(
SELECT * FROM updated_jobs uj
WHERE uj.partition_id = in_partition_id
AND uj.job_id IN (SELECT job_id FROM prereqs)
AND uj.status <> 'Completed'
),
prereqs_not_created_yet AS
(
SELECT * FROM prereqs
WHERE NOT precreated AND job_id NOT IN (
SELECT job_id FROM job WHERE partition_id = in_partition_id
)
),
all_incomplete_prereqs(prerequisite_job_id) AS
(
SELECT job_id FROM prereqs_created_but_not_complete
UNION
SELECT job_id FROM prereqs_not_created_yet
)
INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
SELECT in_partition_id, in_job_id, prerequisite_job_id
FROM all_incomplete_prereqs;
IF FOUND OR in_delay > 0 OR in_suspended_partition THEN
INSERT INTO public.job_task_data(
partition_id,
job_id,
task_classifier,
task_api_version,
task_data,
task_pipe,
target_pipe,
eligible_to_run_date,
suspended
) VALUES (
in_partition_id,
in_job_id,
in_task_classifier,
in_task_api_version,
in_task_data,
in_task_pipe,
in_target_pipe,
CASE WHEN NOT FOUND THEN now() AT TIME ZONE 'UTC' + (in_delay * interval '1 second') END,
in_suspended_partition
);
END IF;
RETURN QUERY SELECT TRUE;
END
$$;
What I would like to do is, AFTER the lock is held on the jobs table (which happens in the CTE), and before the INSERT INTO public.job_dependency
statement is raise an exception if any jobs have Failed
status, for example something like this.
I don’t think creating a temporary table, and using that after the CTE terminates will work, as that will mean the lock is no longer held on the jobs table.
I tried adding a new check_for_failed_prereqs
CTE entry, which calls a function raise_exception_for_failed_prereqs
, and then referencing the check_for_failed_prereqs
CTE in a subsequent CTE using a CROSS JOIN (so that check_for_failed_prereqs
is actually executed) but did not see exception getting raised:
CREATE OR REPLACE FUNCTION raise_exception_for_failed_prereqs(arr text[]) RETURNS boolean
LANGUAGE plpgsql AS
$$BEGIN
IF array_length(arr, 1) > 0 THEN
RAISE EXCEPTION 'One or more prerequisite jobs have failed. Failed Job IDs: %', ARRAY_TO_STRING(arr, ', ') USING ERRCODE = '02000'; -- sqlstate no data
END IF;
RETURN TRUE;
END;
$$;
CREATE OR REPLACE FUNCTION create_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
in_name VARCHAR(255),
in_description TEXT,
in_data TEXT,
in_job_hash INT,
in_task_classifier VARCHAR(255),
in_task_api_version INT,
in_task_data BYTEA,
in_task_pipe VARCHAR(255),
in_target_pipe VARCHAR(255),
in_prerequisite_job_ids VARCHAR(128)[],
in_delay INT,
in_labels VARCHAR(255)[][] default null,
in_suspended_partition BOOLEAN default false
)
RETURNS TABLE(
job_created BOOLEAN
)
LANGUAGE plpgsql
AS $$
DECLARE
failed_prerequisite_job_ids VARCHAR[];
BEGIN
IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN
RETURN QUERY SELECT FALSE;
RETURN;
END IF;
WITH prereqs_with_opts(job_id_with_opts) AS
(
SELECT unnest(in_prerequisite_job_ids)::VARCHAR(128)
),
prereqs AS
(
-- Remove any duplicate pre-requisites, and if a pre-req is mentioned multiple times then merge the options
SELECT job_id, precreated FROM
(
SELECT ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY precreated DESC), job_id, precreated
FROM prereqs_with_opts
CROSS JOIN internal_get_prereq_job_id_options(job_id_with_opts)
) tbl
WHERE row_number = 1
),
locked_jobs AS
(
-- Lock table job for update
SELECT * FROM job
WHERE partition_id = in_partition_id
AND job_id IN (SELECT job_id FROM prereqs)
ORDER BY partition_id, job_id
FOR UPDATE
),
updated_jobs AS
(
-- Process outstanding job updates
SELECT * FROM internal_update_job_progress(in_partition_id, (SELECT ARRAY(SELECT job_id FROM locked_jobs)))
),
prereqs_created_but_not_complete AS
(
SELECT * FROM updated_jobs uj
WHERE uj.partition_id = in_partition_id
AND uj.job_id IN (SELECT job_id FROM prereqs)
AND uj.status <> 'Completed'
),
check_for_failed_prereqs AS
(
SELECT raise_exception_for_failed_prereqs(array_agg(job_id))
FROM updated_jobs uj
WHERE uj.partition_id = in_partition_id
AND uj.job_id IN (SELECT job_id FROM prereqs)
AND uj.status = 'Failed'
),
prereqs_not_created_yet AS
(
SELECT * FROM prereqs
CROSS JOIN check_for_failed_prereqs -- Has no effect but to make sure check_for_failed_prereqs is run
WHERE NOT precreated AND job_id NOT IN (
SELECT job_id FROM job WHERE partition_id = in_partition_id
)
),
all_incomplete_prereqs(prerequisite_job_id) AS
(
SELECT job_id FROM prereqs_created_but_not_complete
UNION
SELECT job_id FROM prereqs_not_created_yet
)
INSERT INTO public.job_dependency(partition_id, job_id, dependent_job_id)
SELECT in_partition_id, in_job_id, prerequisite_job_id
FROM all_incomplete_prereqs;
IF FOUND OR in_delay > 0 OR in_suspended_partition THEN
INSERT INTO public.job_task_data(
partition_id,
job_id,
task_classifier,
task_api_version,
task_data,
task_pipe,
target_pipe,
eligible_to_run_date,
suspended
) VALUES (
in_partition_id,
in_job_id,
in_task_classifier,
in_task_api_version,
in_task_data,
in_task_pipe,
in_target_pipe,
CASE WHEN NOT FOUND THEN now() AT TIME ZONE 'UTC' + (in_delay * interval '1 second') END,
in_suspended_partition
);
END IF;
RETURN QUERY SELECT TRUE;
END
$$;
2
Answers
You can write a PL/pgSQL function that raises the desired error and call that function in your SQL statement. Take care that the optimizer may choose to call the function at a different point than you expect — SQL is not a procedural language.
To provide a simplified example somewhat like yours:
While the above-mentioned solution with the helper PL/pgSQL function should work, it is a bit tricky to implement (as already noted, one must actually use the CTE in question by selecting from it later in the SQL) and is perhaps more complex then necessary.
So, let me suggest a simpler – if somewhat pedestrian – solution:
CREATE TEMP TABLE IF NOT EXISTS job_dependency_temp ... ON COMMIT DROP
job_dependency
, insert intojob_dependency_temp
job_dependency
fromjob_dependency_temp
There is a subtle trap here too: if you call
create_job()
multiple times from the same transaction,job_dependency_temp
will not be empty in subsequent calls, sodelete from job_dependency_temp
would also be needed as step 1a in the above list.