
I have N tables (around 100), as shown below. These are the base tables:

CREATE TABLE table1 (
    SignalID NCHAR(36) NOT NULL,
    Timestamp VARCHAR(24) NOT NULL,
    Value DOUBLE PRECISION NOT NULL
);

CREATE TABLE table2 (
    SignalID NCHAR(36) NOT NULL,
    Timestamp VARCHAR(24) NOT NULL,
    Value DOUBLE PRECISION NOT NULL
);

table3, table4, and so on...

Now I have a final table as shown below

CREATE TABLE final_table (
    Timestamp VARCHAR(24) NOT NULL,
    table1_Value DOUBLE PRECISION NOT NULL,
    table2_Value DOUBLE PRECISION NOT NULL,
    table3_Value DOUBLE PRECISION NOT NULL,
    table4_Value DOUBLE PRECISION NOT NULL
    -- ...and so on, one value column per base table
);

Rows are inserted into the base tables (table1, table2, table3, and so on) in real time, at roughly 50 rows per second. What I want is to join all the base tables and copy the result into final_table whenever the timestamps of all the base tables are equal. This should happen in real time: whenever a new row is inserted into the base tables, final_table should also be updated with a new row.
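Roughly, this is the query I want to keep in sync (a sketch showing just the first three base tables):

INSERT INTO final_table (Timestamp, table1_Value, table2_Value, table3_Value)
SELECT t1.Timestamp, t1.Value, t2.Value, t3.Value
FROM table1 t1
JOIN table2 t2 ON t2.Timestamp = t1.Timestamp
JOIN table3 t3 ON t3.Timestamp = t1.Timestamp;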

Is it possible to implement this?

I want a final table (with all the value columns from the base tables joined and copied into it) because I want to access that table from Python code to process the values. Since 50 rows are inserted every second, I am planning to use NOTIFY/LISTEN with a trigger. Handling N triggers would be difficult; if I can create a final table, a single trigger on it is enough.

I have tried using a VIEW (with all the value columns from the base tables joined into it). But the problem is that I can't use an AFTER INSERT trigger on a VIEW; that only works on tables.

Or is there some kind of trigger on a VIEW that would send a pg_notify to the client when a new row appears in the VIEW?
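This is the kind of single trigger I have in mind for a real final_table (just a sketch; the channel name is an example):

CREATE FUNCTION notify_final_row() RETURNS trigger AS $$
BEGIN
    -- send the whole new row as JSON on an example channel
    PERFORM pg_notify('final_table_feed', to_jsonb(NEW)::text);
    RETURN NEW;
END $$ LANGUAGE plpgsql;

CREATE TRIGGER final_table_notify
AFTER INSERT ON final_table
FOR EACH ROW EXECUTE FUNCTION notify_final_row();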

2 Answers

  1. For storage and access, it's easier, more convenient, and more efficient to merge these tables into one and just save the source info in an additional field on each entry. Whenever you want to structure the information the way you showed in your example, you can use crosstab() from the tablefunc extension. Demo at db<>fiddle:

    create table table_all (
        source_id int not null,
        signal_id text not null,
        signal_timestamp timestamp not null,
        value numeric not null );
    
    create extension tablefunc;
    
    select * from crosstab($s1$
      -- cross join every timestamp with every source, then left join the data
      -- back in, so each row group feeds the same set of categories to crosstab()
      select a.signal_timestamp, b.source_id, round(avg(value),6)
      from (select distinct signal_timestamp from table_all) AS a
      join (select distinct source_id from table_all) AS b on true
      natural left join table_all
      group by 1,2 order by 1,2
    $s1$) AS ct(signal_timestamp timestamp, "source_0_value" numeric, "source_1_value" numeric, "source_2_value" numeric, "source_3_value" numeric, "source_4_value" numeric, "source_5_value" numeric);
    
    signal_timestamp            source_0_value  source_1_value  source_2_value  source_3_value  source_4_value  source_5_value
    2023-11-21 12:46:49.734515  435.360227      525.596913      668.464315      663.286502      379.664059      null
    2023-11-21 12:46:49.734516  null            null            845.618261      231.036637      449.268347      483.163038
    2023-11-21 12:46:49.734517  558.158229      null            377.138522      453.009545      474.747528      null
    2023-11-21 12:46:49.734518  699.251527      316.118611      396.480418      null            857.458398      503.021461
    2023-11-21 12:46:49.734519  830.661170      347.182544      931.305928      509.378886      632.748298      411.723710
    2023-11-21 12:46:49.73452   568.937359      451.818054      253.002697      359.698710      733.449772      null
    1. You can densify this using date_bin() (shown in the fiddle demo; see the sketch after this list).
    2. I'm rounding just for readability, but the avg() might be a good idea in case you ever get two signals from the same source with the same timestamp. You can swap it for any other aggregate function to process the peer group.
    3. If you want to broadcast something from a trigger, you only need a single one, on this one table.
    4. Whenever you need to search for something in all signals, here it's a single select from a single source. With multiple tables, you'd have to search each one separately, union the results, etc.
    5. In your proposed structure, a new source means adding another table and then altering all related structures and triggers, each time. Here, you can just start feeding the data into the main table and add the new column to the crosstab() output list.
    6. With partitioning, you can have your cake and eat it too: everything interfaces with the main table, but you can attach, detach, manipulate and move around its partitions.
    7. There are additional time-series features in TimescaleDB, which is built on PostgreSQL.
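
    As a minimal sketch of the date_bin() densification from point 1 (the 10-microsecond stride and the origin are assumptions, pick whatever grid fits your data), you can bucket signals onto a regular grid before pivoting, so near-simultaneous signals from different sources land in the same output row:

    select date_bin('10 microseconds', signal_timestamp, timestamp '2023-11-21') as binned_timestamp,
           source_id,
           round(avg(value),6)
    from table_all
    group by 1,2 order by 1,2;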

    Here’s how you can query one crosstab row.

    select * from crosstab($s1$ 
      select a.signal_timestamp, 
             b.source_id, 
             round(avg(value),6)
      from (select '2023-11-21 12:46:49.734436'::timestamp signal_timestamp) AS a 
      join (select distinct source_id from table_all) AS b on true
      natural left join table_all
      group by 1,2 order by 1,2
    $s1$) AS ct(signal_timestamp timestamp, "source_0_value" numeric, "source_1_value" numeric, "source_2_value" numeric, "source_3_value" numeric, "source_4_value" numeric, "source_5_value" numeric);
    
  2. Another take: you can keep feeding all signals into one common table and construct your "incremental materialized view" on the fly, in the same single trigger that broadcasts a NOTIFY to your Python code.

    create table table_all_signals (
        source_id int not null,
        signal_id text not null,
        signal_timestamp timestamp not null,
        value numeric not null );
    
    create table signals_per_source_over_time(
      signal_timestamp timestamp unique );  -- value columns are added dynamically by the trigger
    

    Whenever a signal arrives from any source, the trigger checks whether it's already collecting signals from that source and, if not, adds the new source dynamically. The same trigger relays the incoming information to your Python code via pg_notify().

    create function f_signal_collector() returns trigger as $f$
    declare target_value_column text := concat_ws('_','source',new.source_id,'value');
            new_signals_per_source_over_time record;
    begin
      -- first signal from this source? add its value column on the fly
      if not exists (select from information_schema.columns
                 where table_schema='public'
                 and table_name='signals_per_source_over_time'
                 and column_name=target_value_column)
      then
         execute format ($dynamic_sql$
                         alter table public.signals_per_source_over_time
                         add column %I numeric;
                         $dynamic_sql$, target_value_column);
      end if;
      -- upsert the value into this source's column on the row with the
      -- matching timestamp, creating the row if it doesn't exist yet
      execute format ($dynamic_sql$
                         insert into public.signals_per_source_over_time AS n
                         (signal_timestamp,%1$I)
                         values ($1,$2)
                         on conflict (signal_timestamp) do update
                         set %1$I=$2
                         returning n.*;
                      $dynamic_sql$, target_value_column)
               using new.signal_timestamp, new.value
               into new_signals_per_source_over_time;
      -- relay the merged row to listeners as JSON
      perform pg_notify('incoming_signals_feed',
                         to_jsonb(new_signals_per_source_over_time)::text);
      return new;
    end $f$ language plpgsql;
    
    create trigger t_signal_collector after insert or update on table_all_signals
    for each row execute function f_signal_collector();
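
    Quick way to verify the feed before wiring up Python: open a second session (e.g. psql) and subscribe to the channel the trigger uses, then insert a row from anywhere:

    -- in the listening session:
    LISTEN incoming_signals_feed;
    
    -- in any other session; this fires the trigger and notifies listeners:
    insert into table_all_signals
    values (0, gen_random_uuid()::text, now()::timestamp, 123.456);
    
    -- psql then prints something like:
    -- Asynchronous notification "incoming_signals_feed" with payload
    -- "{"signal_timestamp": ..., "source_0_value": 123.456, ...}" received from server process <PID>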
    

    Generating some sample signals:

    select setseeed(0.2);
    insert into table_all_signals
    select random()*5,                    -- random source_id, 0 through 5
           gen_random_uuid()::text,       -- random signal_id
           '2023-11-21 12:46:49.734534'::timestamp
             - '100 microseconds'::interval * random(),
           round((random()*1e3)::numeric,6)
    from generate_series(1,1e3,1);
    
    select * from table_all_signals order by signal_timestamp,source_id limit 6;
    
    source_id  signal_id                             signal_timestamp            value
    0          f07e06ad-9416-415c-9678-dd8228a292f5  2023-11-21 12:46:49.734434  796.485640
    1          9bbb6f5e-6c5c-4f30-a468-04bcbdad0488  2023-11-21 12:46:49.734434  711.898776
    1          308d53f9-5553-4085-b19a-c40bddd1afb8  2023-11-21 12:46:49.734434  621.605431
    1          ac4ec48f-62d6-40af-8970-636e7bc1858f  2023-11-21 12:46:49.734434  350.894326
    2          36b964a7-2f76-4bb2-9fd2-e3ecc372cebb  2023-11-21 12:46:49.734434  921.574251
    3          3f939a24-cde0-42f7-9d0f-c3fca29d4dd2  2023-11-21 12:46:49.734434  932.034978

    You can see they automatically populate your other table on the fly, adjusting its structure accordingly.

    select * from signals_per_source_over_time order by signal_timestamp limit 5;
    
    signal_timestamp            source_5_value  source_4_value  source_3_value  source_0_value  source_1_value  source_2_value
    2023-11-21 12:46:49.734434  null            null            484.759824      796.485640      621.605431      921.574251
    2023-11-21 12:46:49.734435  null            750.385100      833.484142      604.862822      530.592514      674.441018
    2023-11-21 12:46:49.734436  324.038718      45.646781       425.312724      127.251620      982.819011      828.340270
    2023-11-21 12:46:49.734437  68.402196       990.369557      null            499.607598      672.720661      null
    2023-11-21 12:46:49.734438  null            491.357430      406.220658      null            380.443741      455.597406

    fiddle

    1. The trigger keeps writing to and reshaping your output structure.
    2. Each change is published via NOTIFY for your Python code to listen to.
    3. Note that the wider the table grows, the longer each alter table takes, so if you don't ever need the full table, you can just keep a view that looks up the signals matching the new one's timestamp, and feed that into the notify (see the sketch after this list).
    4. You'll get multiple notifications for the same timestamp whenever a new signal matching an older timestamp arrives. Because of that, it might make more sense to relay just the timestamp, source and value in the notify and assemble/update your output structure entirely in Python.
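
    Here's a minimal sketch of the leaner variant from notes 3 and 4 (done with a subquery instead of a view; the function name is illustrative): drop the ever-widening table entirely and notify with the peer group of the incoming timestamp, leaving the pivoting to your Python code:

    create function f_signal_relay() returns trigger as $f$
    begin
      -- in an AFTER INSERT trigger the new row is already visible,
      -- so this collects the new signal together with its timestamp peers
      perform pg_notify('incoming_signals_feed',
          (select jsonb_agg(to_jsonb(t))::text
           from table_all_signals AS t
           where t.signal_timestamp = new.signal_timestamp));
      return new;
    end $f$ language plpgsql;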