I have a table "tab" that stores data (keyed by symbol and time) and a second table "summ" that stores the exponentially weighted running average of that data. When data is inserted into the first table, a trigger calculates the running average for the corresponding rows in the second table.
What I can't figure out is a good way to recover the state so that the running average continues; instead it starts anew each time the trigger is called. How do I save the state of the aggregation at the end of one trigger invocation so that the next one can pick it up?
I have a minimal example at https://dbfiddle.uk/6dqXCwIQ which is also reproduced below.
First, I make a table that will contain the unsmoothed data, as well as a summary table that will hold the smoothed data. There is also a trigger to populate the second table from the first.
/* Making a table that will contain the unsmoothed data */
create table tab (
    symbol text,
    time_to timestamptz,
    something int4,
    PRIMARY KEY (time_to, symbol)
);
CREATE UNIQUE INDEX index_name2 ON tab USING btree (symbol, time_to);
/* Making a summarisation table too and a trigger to populate it when something is inserted into tab. */
CREATE OR REPLACE FUNCTION smoother_state(state double precision[],
    newval double precision, frac double precision)
RETURNS double precision[]
LANGUAGE plpgsql
IMMUTABLE PARALLEL SAFE LEAKPROOF
AS $function$
declare
    resul double precision := case when state[1] is null then newval
                                   else state[1] * (1 - frac) + newval * frac end;
begin
    return ARRAY[resul, coalesce(state[2] + 1, 1)];
END;
$function$;
CREATE OR REPLACE AGGREGATE smoother(val double precision, frac double precision) (
    SFUNC = smoother_state,
    STYPE = double precision[2]
);
create table summ (
    symbol text,
    time_to timestamptz,
    smoothed_something double precision,
    number_of_periods double precision,
    PRIMARY KEY (time_to, symbol)
);
CREATE UNIQUE INDEX index_name3 ON summ USING btree (symbol, time_to);
/* Making a trigger.
I guess it should read the smoothed_something and number_of_periods values for the symbol
and use them as an initial state vector.
*/
CREATE OR REPLACE FUNCTION do_update()
RETURNS trigger
LANGUAGE plpgsql
PARALLEL SAFE STRICT LEAKPROOF
AS $function$
BEGIN
    with a as (
        select symbol, time_to, something,
               smoother(something, 0.3) over (partition by symbol order by time_to) as smoo
        FROM newtab
    ), b as (
        select symbol, time_to, smoo[1] as smoothed_something, smoo[2] as number_of_periods
        from a
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods)
    select * from b;
    RETURN null;
END;
$function$;
create trigger update_smoothed
after insert on tab
referencing new table as newtab
for each statement
execute function do_update();
Inserting data the first time, both tab and summ look correct.
insert into tab (symbol, time_to, something) values
('a', '2022-01-01 00:00:15+01:00'::timestamptz, 15),
('b', '2021-01-01 00:00:15+01:00'::timestamptz, 18),
('b', '2022-01-01 00:00:15+01:00'::timestamptz, 13),
('b', '2023-01-01 00:00:15+01:00'::timestamptz, 11),
('b', '2024-01-01 00:00:15+01:00'::timestamptz, 3),
('c', '2022-01-01 00:00:16+01:00'::timestamptz, 15),
('c', '2022-01-01 00:00:17+01:00'::timestamptz, 150);
Inserting data a second time, tab looks correct but summ does not, because the aggregation starts again from a null state.
insert into tab (symbol, time_to, something) values
('a', '2022-06-01 00:00:15+01:00'::timestamptz, 150),
('a', '2022-07-01 01:00:15+01:00'::timestamptz, 170),
('b', '2024-08-01 00:00:15+01:00'::timestamptz, 180),
('b', '2024-09-01 00:00:15+01:00'::timestamptz, 130);
You can see the resulting data in each table in the fiddle linked above.
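For example, tracing the aggregate by hand for symbol 'a' with frac = 0.3: after the first insert, summ holds 15 with 1 period. If the state carried over, the second insert should produce 15 * 0.7 + 150 * 0.3 = 55.5 and then 55.5 * 0.7 + 170 * 0.3 = 89.85, with 2 and 3 periods. Instead the trigger restarts from a null state and produces 150 and then 150 * 0.7 + 170 * 0.3 = 156, with the period counter reset to 1 and 2.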
For this problem:
- Is there a way to modify the trigger to feed in the correct state when the aggregation runs the second time? I am happy to impose that new data is always later in time than the existing observations.
- Is there a better way to accomplish this? I have a lot of data, so I want to avoid a materialised view (refreshing the whole thing takes too long).
2 Answers
You could extend the newtab with the latest value for each symbol, so that the window function sees the new values preceded by whatever came last. The trigger then remembers not to re-insert those initial values. The aggregate function could also be simplified to just do the thing it's meant for, and leave counting to count(*).
demo at db<>fiddle
DISTINCT ON can pick the latest record per symbol. This seems to fix the problem, as long as you can rely on the assumption that new data is always later in time than what is already stored.
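A rough sketch of what that trigger function could look like, assuming the seed row per symbol is taken from summ with DISTINCT ON and flagged so it is not inserted again (the seed CTE and is_seed column are illustrative names, not necessarily the exact code in the fiddle):

CREATE OR REPLACE FUNCTION do_update()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
    WITH seed AS (
        -- latest smoothed value per affected symbol, used to prime the window
        SELECT DISTINCT ON (s.symbol)
               s.symbol, s.time_to, s.smoothed_something AS something, true AS is_seed
        FROM summ s
        WHERE s.symbol IN (SELECT symbol FROM newtab)
        ORDER BY s.symbol, s.time_to DESC
    ), extended AS (
        SELECT symbol, time_to, something::double precision AS something, is_seed FROM seed
        UNION ALL
        SELECT symbol, time_to, something::double precision AS something, false AS is_seed FROM newtab
    ), a AS (
        SELECT symbol, time_to, is_seed,
               smoother(something, 0.3) OVER (PARTITION BY symbol ORDER BY time_to) AS smoo
        FROM extended
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods)
    SELECT symbol, time_to, smoo[1], smoo[2]
    FROM a
    WHERE NOT is_seed;  -- the seed rows are already in summ
    -- caveat: smoo[2] restarts counting at the seed row, so for pre-existing symbols
    -- it would still need to be offset by the stored number_of_periods
    RETURN NULL;
END;
$function$;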
An alternative, closer to the idea from the title: the state function can accept an init_state parameter.
demo at db<>fiddle
You still need to look up the init state and inject it:
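Roughly along these lines (a sketch under that assumption, not the fiddle's exact code: the state function and aggregate gain an extra init_state argument, and the trigger joins the stored state in):

-- state function with an extra init_state argument; when there is no
-- accumulated state yet, it falls back to the injected initial state
CREATE OR REPLACE FUNCTION smoother_state(state double precision[],
    newval double precision, frac double precision, init_state double precision[])
RETURNS double precision[]
LANGUAGE plpgsql
IMMUTABLE
AS $function$
declare
    prev double precision[] := coalesce(state, init_state);
    resul double precision;
begin
    resul := case when prev[1] is null then newval
                  else prev[1] * (1 - frac) + newval * frac end;
    return ARRAY[resul, coalesce(prev[2], 0) + 1];
END;
$function$;

CREATE OR REPLACE AGGREGATE smoother(val double precision, frac double precision,
                                     init_state double precision[]) (
    SFUNC = smoother_state,
    STYPE = double precision[]
);

-- inside do_update(): look up the stored state per symbol and pass it in
WITH init AS (
    SELECT DISTINCT ON (symbol)
           symbol, ARRAY[smoothed_something, number_of_periods] AS init_state
    FROM summ
    ORDER BY symbol, time_to DESC
), a AS (
    SELECT n.symbol, n.time_to,
           smoother(n.something, 0.3, i.init_state)
               OVER (PARTITION BY n.symbol ORDER BY n.time_to) AS smoo
    FROM newtab n
    LEFT JOIN init i USING (symbol)
)
INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods)
SELECT symbol, time_to, smoo[1], smoo[2]
FROM a;

With this variant both smoothed_something and number_of_periods continue from the stored values, and a symbol with no prior rows simply gets a NULL init_state and starts fresh.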