
I have a case where I have a table "tab" that stores data (keyed by symbol and time) and a second table "summ" that stores the exponentially weighted running average of the data in the first table. I have it set up so that when data is added to the first table, it triggers the calculation of the running average for the corresponding rows in the second table.

However, I can't figure out a good way to recover the state so that the running average continues. Instead, it starts anew each time the trigger is called. How do I save the state of the aggregation at the end of one trigger call so that the next call can continue from it?
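For reference, this is the smoothing that the state function below computes for one symbol. Here α is frac (0.3 in the trigger), x_t is the t-th value of something in time order, s_t is smoothed_something and n_t is number_of_periods. This is a restatement of the PL/pgSQL arithmetic, not an addition to it:

```latex
\begin{aligned}
s_1 &= x_1, & n_1 &= 1,\\
s_t &= (1-\alpha)\,s_{t-1} + \alpha\,x_t, & n_t &= n_{t-1} + 1 \qquad (t \ge 2),\\
s_t &= (1-\alpha)^{t-1}\,x_1 + \sum_{k=2}^{t} \alpha\,(1-\alpha)^{t-k}\,x_k .
\end{aligned}
```

Because s_t depends on earlier rows only through s_{t-1}, the pair (s_{t-1}, n_{t-1}) in the latest summ row for a symbol should be all the state needed to continue, provided new rows are always later in time.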

I have a minimal example at https://dbfiddle.uk/6dqXCwIQ which is also reproduced below.

First, I make a table that will contain the unsmoothed data, as well as a summary table that will hold the smoothed data. There is also a trigger that populates the second table from the first.

/* Making a table that will contain the unsmoothed data */
create table tab (
  symbol text,
  time_to timestamptz,
  something int4,
  PRIMARY KEY(time_to, symbol)
);
CREATE UNIQUE INDEX index_name2 ON tab USING btree (symbol, time_to);

/* Making a summarisation table too and a trigger to populate it when something is inserted into tab. */
CREATE OR REPLACE FUNCTION smoother_state(state double precision[],
                                          newval double precision, frac double precision)
 RETURNS double precision[]
 LANGUAGE plpgsql
 IMMUTABLE PARALLEL SAFE LEAKPROOF
AS $function$
  declare
      resul double precision := case when state[1] is null then newval else state[1] * (1-frac) + newval * frac end;
  begin
    return ARRAY[resul, coalesce(state[2] + 1, 1)];
  END;
 $function$
;
CREATE OR REPLACE AGGREGATE smoother(val double precision, frac double precision) (
    SFUNC = smoother_state,
    STYPE = double precision[2]
);

create table summ (
  symbol text,
  time_to timestamptz,
  smoothed_something double precision,
  number_of_periods double precision,
  PRIMARY KEY(time_to, symbol)
);
CREATE UNIQUE INDEX index_name3 ON summ USING btree (symbol, time_to);

/* Making a trigger.
I guess it should read the smoothed_something and number_of_periods values for the symbol
and use them as an initial state vector.
*/
CREATE OR REPLACE FUNCTION do_update()
 RETURNS trigger
 LANGUAGE plpgsql
 PARALLEL SAFE STRICT LEAKPROOF
AS $function$
  DECLARE
  BEGIN
    -- The window runs over newtab only, so every symbol starts from a NULL state.
    with a as (
      select symbol, time_to, something,
             smoother(something, 0.3) over (partition by symbol order by time_to) as smoo
      from newtab
    ), b as (
      select symbol, time_to, smoo[1] as smoothed_something, smoo[2] as number_of_periods
      from a
    )
    insert into summ (symbol, time_to, smoothed_something, number_of_periods)
    select * from b;
    RETURN null;
  END;
$function$
;

create trigger update_smoothed after
insert on tab
referencing new table as newtab
for each statement
execute function do_update();
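The setup above can be checked outside the database with a small Python model. This is a sketch under stated assumptions and not PostgreSQL's implementation. It mirrors smoother_state and the smoother(something, 0.3) OVER (PARTITION BY symbol ORDER BY time_to) window as the trigger uses it. The helpers window_smoother and parse_ts are hypothetical. Ties on time_to are not modelled, because the primary key already makes (symbol, time_to) unique. Running it on the first batch should give the values that look correct in summ.

```python
from datetime import datetime
from itertools import groupby

FRAC = 0.3  # smoothing fraction the trigger passes to smoother(something, 0.3)


def smoother_state(state, newval, frac):
    # Mirrors the PL/pgSQL state function: state is (smoothed, count) or None.
    if state is None:
        return (newval, 1)
    smoothed, count = state
    return (smoothed * (1 - frac) + newval * frac, count + 1)


def window_smoother(rows, frac=FRAC):
    # Hypothetical helper emulating
    #   smoother(v, frac) OVER (PARTITION BY symbol ORDER BY time_to):
    # the state restarts at None for each symbol, and each row receives the
    # running state after its own value has been folded in.
    out = []
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for symbol, group in groupby(ordered, key=lambda r: r[0]):
        state = None
        for _, time_to, value in group:
            state = smoother_state(state, value, frac)
            out.append((symbol, time_to, state[0], state[1]))
    return out


def parse_ts(text):
    # Hypothetical helper: parse the timestamptz literals used in the question.
    return datetime.fromisoformat(text)


batch1 = [
    ("a", parse_ts("2022-01-01 00:00:15+01:00"), 15),
    ("b", parse_ts("2021-01-01 00:00:15+01:00"), 18),
    ("b", parse_ts("2022-01-01 00:00:15+01:00"), 13),
    ("b", parse_ts("2023-01-01 00:00:15+01:00"), 11),
    ("b", parse_ts("2024-01-01 00:00:15+01:00"), 3),
    ("c", parse_ts("2022-01-01 00:00:16+01:00"), 15),
    ("c", parse_ts("2022-01-01 00:00:17+01:00"), 150),
]

summ = window_smoother(batch1)
for symbol, time_to, smoothed, periods in summ:
    print(symbol, time_to.isoformat(), smoothed, periods)
```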

When inserting data the first time, both tab and summ look correct.

insert into tab (symbol, time_to, something) values
  ('a', '2022-01-01 00:00:15+01:00'::timestamptz, 15),
  ('b', '2021-01-01 00:00:15+01:00'::timestamptz, 18),
  ('b', '2022-01-01 00:00:15+01:00'::timestamptz, 13),
  ('b', '2023-01-01 00:00:15+01:00'::timestamptz, 11),
  ('b', '2024-01-01 00:00:15+01:00'::timestamptz, 3),
  ('c', '2022-01-01 00:00:16+01:00'::timestamptz, 15),
  ('c', '2022-01-01 00:00:17+01:00'::timestamptz, 150);

When inserting data the second time, tab looks correct, but summ does not, because the aggregation starts from a null state.

insert into tab (symbol, time_to, something) values
  ('a', '2022-06-01 00:00:15+01:00'::timestamptz, 150),
  ('a', '2022-07-01 01:00:15+01:00'::timestamptz, 170),
  ('b', '2024-08-01 00:00:15+01:00'::timestamptz, 180),
  ('b', '2024-09-01 00:00:15+01:00'::timestamptz, 130);

You can see the resulting data in each table on the fiddle site.

[Image: the data in both tables, showing the errors]
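The effect of the null starting state can be reproduced with a small Python model of the state function. This is an assumption: it mirrors the PL/pgSQL arithmetic with frac = 0.3, and fold is a hypothetical helper. The model replays symbol b's second batch twice. The first run starts from a null state, as the trigger currently does. The second run starts from the last state stored after the first batch. Only the resumed run should match a recomputation over the whole history (about 61.9065 and 82.33455, periods 5 and 6).

```python
FRAC = 0.3


def smoother_state(state, newval, frac=FRAC):
    # Same arithmetic as the question's PL/pgSQL state function.
    if state is None:
        return (newval, 1)
    smoothed, count = state
    return (smoothed * (1 - frac) + newval * frac, count + 1)


def fold(values, state=None):
    # Hypothetical helper: fold values in time order, keeping every intermediate state.
    states = []
    for value in values:
        state = smoother_state(state, value)
        states.append(state)
    return states


b_first = [18, 13, 11, 3]   # symbol b, first insert, in time order
b_second = [180, 130]       # symbol b, second insert, in time order

buggy = fold(b_second)                        # current trigger: starts from None
resumed = fold(b_second, fold(b_first)[-1])   # continue from the last stored state
full = fold(b_first + b_second)[-2:]          # recompute over the whole history

print("from null:", buggy)
print("resumed:  ", resumed)
print("same as full recompute:", resumed == full)
```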

For this problem:

  • Is there a way to modify the trigger so that it supplies the correct state when the aggregation runs the second time? I am happy to impose that we only ever add data that is later in time than earlier observations.
  • Is there a better way to accomplish this? I have a lot of data, so I want to avoid using a materialised view (refreshing the whole thing takes too long).
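The restriction to later-in-time data in the first bullet is what makes resuming from a saved state valid. The following Python sketch is a model, not the database code. Timestamps are simplified to years, the value 50 is purely illustrative, and fold is a hypothetical helper. It compares resuming from the saved state with a full, time-ordered recomputation. The two agree for an appended row. They disagree for a backdated one, because resuming treats every new row as the newest.

```python
FRAC = 0.3


def smoother_state(state, newval, frac=FRAC):
    # Same arithmetic as the question's PL/pgSQL state function.
    if state is None:
        return (newval, 1)
    smoothed, count = state
    return (smoothed * (1 - frac) + newval * frac, count + 1)


def fold(rows, state=None):
    # Hypothetical helper: fold (time_key, value) pairs in time order.
    for _, value in sorted(rows):
        state = smoother_state(state, value)
    return state


history = [(2021, 18), (2022, 13), (2023, 11), (2024, 3)]  # symbol b
saved = fold(history)  # what the latest summ row for b holds

appended = [(2025, 50)]    # later than everything already smoothed
backdated = [(2020, 50)]   # earlier than rows already smoothed

# Appending in time order: resuming from the saved state matches a full recompute.
print(fold(appended, saved) == fold(history + appended))    # True
# A backdated row: resuming treats it as the newest value, a full recompute does not.
print(fold(backdated, saved) == fold(history + backdated))  # False
```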


Answers


  1. You could extend newtab with the latest stored row for each symbol, so that the window function sees the new values preceded by whatever came last. The trigger then filters those bootstrap rows out so that they are not re-inserted.

    The aggregate function could also be simplified to just do the thing it’s meant for, and leave counting to count(*).
    demo at db<>fiddle

    CREATE OR REPLACE FUNCTION smoother_state(state double precision,
                                              newval double precision,  
                                              frac double precision)
     RETURNS double precision
     LANGUAGE sql
     IMMUTABLE PARALLEL SAFE LEAKPROOF
     RETURN coalesce(state * (1-frac) + newval * frac,
                     newval);
    CREATE OR REPLACE AGGREGATE smoother(val double precision, 
                                         frac double precision) 
    (   SFUNC = smoother_state,
        STYPE = double precision
    );
    

    DISTINCT ON can pick the latest record per symbol.

    CREATE OR REPLACE FUNCTION do_update()
     RETURNS trigger PARALLEL SAFE STRICT LEAKPROOF
    AS $function$ BEGIN
    WITH bootstraped_newtab AS(
     (select distinct on (symbol)
             symbol,
             time_to,
             smoothed_something as something,
             number_of_periods,
             false as is_new
     from summ --where symbol in (select symbol from newtab)
     order by symbol, time_to desc)
     union all
     select*,1,true from newtab
    ),a AS (
      select symbol, 
             time_to, 
             smoother(something, 0.3)over w as smoothed_something,
             first_value(number_of_periods)over w
               + count(*)over w 
               - 1 as number_of_periods,
             is_new
      FROM bootstraped_newtab
      window w as (partition by symbol order by time_to)
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods) 
    select symbol, 
           time_to, 
           smoothed_something, 
           number_of_periods 
    from a
    where is_new;--filters out the initial values
    RETURN null; END $function$ LANGUAGE plpgsql;
    
    symbol  time_to                 smoothed_something  number_of_periods
    a       2021-12-31 23:00:15+00  15                  1
    b       2020-12-31 23:00:15+00  18                  1
    b       2021-12-31 23:00:15+00  16.5                2
    b       2022-12-31 23:00:15+00  14.849999999999998  3
    b       2023-12-31 23:00:15+00  11.294999999999998  4
    c       2021-12-31 23:00:16+00  15                  1
    c       2021-12-31 23:00:17+00  55.5                2
    a       2022-06-01 00:00:15+01  55.5                2
    a       2022-07-01 01:00:15+01  89.85               3
    b       2024-08-01 00:00:15+01  61.9065             5
    b       2024-09-01 00:00:15+01  82.33455000000001   6

    This seems to fix the problem, as long as you can rely on this assumption:

    we only ever add data that happens later in time than earlier observations

  2. An alternative, closer to the idea from the title: the function can accept an init_state parameter:
    demo at db<>fiddle

    CREATE OR REPLACE FUNCTION smoother_state
       (state double precision[],
        newval double precision, 
        frac double precision,
        init_state double precision[] default null::float[] )
     RETURNS double precision[]
     LANGUAGE sql
     IMMUTABLE PARALLEL SAFE LEAKPROOF
     RETURN ARRAY[ coalesce(state[1] * (1-frac) + newval * frac, 
                            init_state[1] * (1-frac) + newval * frac, 
                            newval)
                  ,coalesce(state[2] + 1, 
                            init_state[2] + 1,
                            1)];
    CREATE OR REPLACE AGGREGATE smoother(val double precision, 
                                         frac double precision, 
                                         init_state double precision[]) (
        SFUNC = smoother_state,
        STYPE = double precision[2]
    );
    

    You still need to look up the init state and inject it:

    CREATE OR REPLACE FUNCTION do_update()
     RETURNS trigger PARALLEL SAFE STRICT LEAKPROOF
    AS $function$ BEGIN
    WITH a AS (
      SELECT symbol, 
             time_to, 
             smoother(something, 0.3, init_state)OVER w AS smoo
      FROM newtab
      LEFT JOIN(SELECT DISTINCT ON(symbol)
                       symbol,
                       array[smoothed_something,number_of_periods] AS init_state
                FROM summ
                ORDER BY symbol, time_to DESC) AS init_states
      USING(symbol)
      WINDOW w AS(PARTITION BY symbol ORDER BY time_to)
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods) 
    SELECT symbol, 
           time_to, 
           smoo[1] as smoothed_something, 
           smoo[2] as number_of_periods 
    FROM a;
    RETURN null; END $function$ LANGUAGE plpgsql;
    
    symbol  time_to                 smoothed_something  number_of_periods
    a       2021-12-31 23:00:15+00  15                  1
    b       2020-12-31 23:00:15+00  18                  1
    b       2021-12-31 23:00:15+00  16.5                2
    b       2022-12-31 23:00:15+00  14.849999999999998  3
    b       2023-12-31 23:00:15+00  11.294999999999998  4
    c       2021-12-31 23:00:16+00  15                  1
    c       2021-12-31 23:00:17+00  55.5                2
    a       2022-06-01 00:00:15+01  55.5                2
    a       2022-07-01 01:00:15+01  89.85               3
    b       2024-08-01 00:00:15+01  61.9065             5
    b       2024-09-01 00:00:15+01  82.33455000000001   6
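The bootstrapping approach from the first answer can be traced in a Python sketch. It is a model, not PostgreSQL. The helper bootstrap_update is hypothetical, and timestamps are simplified to sortable date strings. Each step is commented with the SQL clause it stands for. DISTINCT ON picks the latest summ row per symbol, UNION ALL appends the new rows, and first_value(number_of_periods) + count(*) - 1 continues the period count. The bootstrap rows are then dropped by the is_new filter.

```python
from itertools import groupby

FRAC = 0.3


def smoother_state(state, newval, frac=FRAC):
    # First answer's state function: coalesce(state*(1-frac) + newval*frac, newval)
    return newval if state is None else state * (1 - frac) + newval * frac


def bootstrap_update(summ_rows, newtab):
    # Hypothetical model of the first answer's trigger body.
    # summ_rows: (symbol, time_to, smoothed, n); newtab: (symbol, time_to, value)
    # DISTINCT ON (symbol) ... ORDER BY symbol, time_to DESC
    latest = {}
    for sym, t, smoothed, n in summ_rows:
        if sym not in latest or t > latest[sym][1]:
            latest[sym] = (sym, t, smoothed, n, False)
    # UNION ALL SELECT *, 1, true FROM newtab
    rows = list(latest.values()) + [(s, t, v, 1, True) for s, t, v in newtab]
    rows.sort(key=lambda r: (r[0], r[1]))
    out = []
    # WINDOW w AS (PARTITION BY symbol ORDER BY time_to)
    for sym, group in groupby(rows, key=lambda r: r[0]):
        state, first_n = None, None
        for k, (_, t, value, n, is_new) in enumerate(group, start=1):
            state = smoother_state(state, value)
            if first_n is None:
                first_n = n  # first_value(number_of_periods) OVER w
            if is_new:  # WHERE is_new
                out.append((sym, t, state, first_n + k - 1))  # + count(*) OVER w - 1
    return out


# summ after the first insert (symbols a and b only; dates simplified)
summ_rows = [
    ("a", "2022-01-01", 15.0, 1),
    ("b", "2021-01-01", 18.0, 1),
    ("b", "2022-01-01", 16.5, 2),
    ("b", "2023-01-01", 14.849999999999998, 3),
    ("b", "2024-01-01", 11.294999999999998, 4),
]
newtab = [
    ("a", "2022-06-01", 150),
    ("a", "2022-07-01", 170),
    ("b", "2024-08-01", 180),
    ("b", "2024-09-01", 130),
]
for row in bootstrap_update(summ_rows, newtab):
    print(row)
```

A symbol with no earlier summ row gets no bootstrap row, so first_value is the new row's own 1 and the count starts at 1 as before.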
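The init_state variant from the second answer can be modelled the same way. Again, this is a hypothetical Python sketch with simplified timestamps, not the PostgreSQL code, and trigger_update is an illustrative name. The key point is that init_state is only consulted while the running state is still NULL, that is, for the first new row of each symbol. After that, the aggregate continues from its own state, which is why passing the same init_state on every row is harmless.

```python
FRAC = 0.3


def smoother_state(state, newval, frac=FRAC, init_state=None):
    # Second answer's state function: use the running state if there is one,
    # otherwise the injected init_state, otherwise start from newval.
    base = state if state is not None else init_state
    if base is None:
        return (newval, 1)
    smoothed, count = base
    return (smoothed * (1 - frac) + newval * frac, count + 1)


def trigger_update(summ_rows, newtab):
    # Hypothetical model of the second answer's trigger body.
    # LEFT JOIN (SELECT DISTINCT ON (symbol) ... ORDER BY symbol, time_to DESC)
    init_states = {}
    for sym, _, smoothed, n in sorted(summ_rows, key=lambda r: (r[0], r[1])):
        init_states[sym] = (smoothed, n)  # later rows overwrite: latest wins
    # smoother(something, 0.3, init_state) OVER (PARTITION BY symbol ORDER BY time_to)
    running, out = {}, []
    for sym, t, value in sorted(newtab, key=lambda r: (r[0], r[1])):
        running[sym] = smoother_state(running.get(sym), value,
                                      init_state=init_states.get(sym))
        out.append((sym, t, running[sym][0], running[sym][1]))
    return out


summ_rows = [
    ("a", "2022-01-01", 15.0, 1),
    ("b", "2021-01-01", 18.0, 1),
    ("b", "2024-01-01", 11.294999999999998, 4),
]
newtab = [
    ("a", "2022-06-01", 150),
    ("a", "2022-07-01", 170),
    ("b", "2024-08-01", 180),
    ("b", "2024-09-01", 130),
]
for row in trigger_update(summ_rows, newtab):
    print(row)
```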