From f2bf0bd150653f4e5da12799ea694eedefa32b59 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Wed, 10 Apr 2024 17:00:22 +0200 Subject: [PATCH] Streaming potential arrivals --- README.md | 13 +- supabase/functions/deno.lock | 17 +- .../functions/download-bus-stops/index.ts | 145 +++++++++++++ .../migrations/20240409190001_initial.sql | 197 +++++++++++++++++- .../migrations/20240409210612_bus_funcs.sql | 62 ------ .../20240410070316_scheduled-jobs.sql | 65 ------ 6 files changed, 367 insertions(+), 132 deletions(-) create mode 100644 supabase/functions/download-bus-stops/index.ts delete mode 100644 supabase/migrations/20240409210612_bus_funcs.sql delete mode 100644 supabase/migrations/20240410070316_scheduled-jobs.sql diff --git a/README.md b/README.md index 5f049d6..9bb88c3 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ How to track how late buses are: - [x] Download bus route data - [x] Insert raw bus positions into timescaledb/postGIS table? - [x] "Fan-out" JSON responses into one DB row per datapoint. - - [ ] Download routes. + - [ ] Download routes/stops. + - There is an endpoint called Stops. - There is an endpoint called Timetable, which takes a route number (along with day of week etc). - The timetable response includes a list of stops in the route, @@ -38,3 +39,13 @@ Arrival computation: - These are arrival times for this bus ID, at each stop. - Insert into arrivals. - After all arrivals computed, drop all raw positions for this bus from DB. + +Useful stuff: + +``` +SELECT buses_at_stops.* FROM (SELECT DISTINCT ON (s.id, h.measured_at) s.id, s.stop_name, ST_AsText(s.coords) as stop_coords, h.measured_at, h.bus_id, ST_AsText(h.coords) as bus_coords, ST_Distance(s.coords, h.coords) as distance + FROM bus_stops s + JOIN raw_bus_positions h ON ST_DWithin(s.coords, h.coords, 30)) AS buses_at_stops + WHERE buses_at_stops.bus_id like '1%' + ORDER BY buses_at_stops.measured_at desc; +``` diff --git a/supabase/functions/deno.lock b/supabase/functions/deno.lock index 7d48b18..cb57190 100644 --- a/supabase/functions/deno.lock +++ b/supabase/functions/deno.lock @@ -1,7 +1,8 @@ { "version": "3", "redirects": { - "https://deno.land/x/base64/base64url.ts": "https://deno.land/x/base64@v0.2.1/base64url.ts" + "https://deno.land/x/base64/base64url.ts": "https://deno.land/x/base64@v0.2.1/base64url.ts", + "https://esm.sh/@supabase/supabase-js@2": "https://esm.sh/@supabase/supabase-js@2.42.0" }, "remote": { "https://deno.land/std@0.177.0/async/abortable.ts": "73acfb3ed7261ce0d930dbe89e43db8d34e017b063cf0eaa7d215477bf53442e", @@ -150,6 +151,18 @@ "https://deno.land/x/upstash_redis@v1.19.3/pkg/types.ts": "f9363a02964f5c2af8f4f054cba2cfde222af8eb9e908c37ca508a6399ac9e29", "https://deno.land/x/upstash_redis@v1.19.3/pkg/util.ts": "bda4f5eb90ff82d2443ec8908a376079a44af328ee64390d2e5ee7687a171556", "https://deno.land/x/upstash_redis@v1.19.3/version.ts": "fb553d493437bc431a81483e2940d14fc1a31e476128ef89e1e77db317ea4baf", - "https://denopkg.com/chiefbiiko/std-encoding@v1.0.0/mod.ts": "4a927e5cd1d9b080d72881eb285b3b94edb6dadc1828aeb194117645f4481ac0" + "https://denopkg.com/chiefbiiko/std-encoding@v1.0.0/mod.ts": "4a927e5cd1d9b080d72881eb285b3b94edb6dadc1828aeb194117645f4481ac0", + "https://esm.sh/@supabase/supabase-js@2.42.0": "4828e9cc4af2357c70f1943a243f3235031432ed41f758c30655be969512575e", + "https://esm.sh/v135/@supabase/auth-js@2.63.0/denonext/auth-js.mjs": "626ae8dd2177c4437d6ba38f9aa1f97bd8e19c59386bd7edda465cc71d34e3f1", + "https://esm.sh/v135/@supabase/functions-js@2.2.2/denonext/functions-js.mjs": "2115a8e67eb5058e0633fcea1bc304d8b1a7a8534ac194e18caf67f3a112a5e4", + "https://esm.sh/v135/@supabase/node-fetch@2.6.15/denonext/node-fetch.mjs": "efad00ea3d4cbe1bee688836ef75339d49a1981bc5728a13e9ad5f26791d5efb", + "https://esm.sh/v135/@supabase/postgrest-js@1.15.0/denonext/postgrest-js.mjs": "824ad68b13c6232f3e4520eae5a2ca08c937d43c62485df69dc6520062dbd0de", + "https://esm.sh/v135/@supabase/realtime-js@2.9.3/denonext/realtime-js.mjs": "22f025f3a7f744aad39f538fdac296095eb8ec33974c24dc7168571493c707ec", + "https://esm.sh/v135/@supabase/storage-js@2.5.5/denonext/storage-js.mjs": "66e29e4e55c7d396503e2d1d9376cfbc34f046b962bce4524c2d80b209fff413", + "https://esm.sh/v135/@supabase/supabase-js@2.42.0/denonext/supabase-js.mjs": "0269571ba1b3e42fc36456b8c74fe24b0dd2f29e68e8e76e019704f9506fdd0c", + "https://esm.sh/v135/bufferutil@4.0.8/denonext/bufferutil.mjs": "60a4618cbd1a5cb24935c55590b793d4ecb33862357d32e1d4614a0bbb90947f", + "https://esm.sh/v135/node-gyp-build@4.6.1/denonext/node-gyp-build.mjs": "5d28b312f145a6cb2ec0dbdd80a7d34c0e0e6b5dcada65411d8bcff6c8991cc6", + "https://esm.sh/v135/utf-8-validate@6.0.3/denonext/utf-8-validate.mjs": "410c48d66840e987e474a4849cd25829817415cedd25466280effb1287d05aa5", + "https://esm.sh/v135/ws@8.16.0/denonext/ws.mjs": "0fa0c00b69577ba36d0a36001329b2cec91498cb2e33e329fc76aa6d51a0d54d" } } diff --git a/supabase/functions/download-bus-stops/index.ts b/supabase/functions/download-bus-stops/index.ts new file mode 100644 index 0000000..9fd8ca1 --- /dev/null +++ b/supabase/functions/download-bus-stops/index.ts @@ -0,0 +1,145 @@ +import { createClient } from "https://esm.sh/@supabase/supabase-js@2"; + +interface StraetoVariables { + [key: string]: number | string; +} + +interface StraetoQuery { + operationName: string; + variables: StraetoVariables; + extensions: StraetoQueryExtensions; +} + +interface StraetoQueryExtensions { + persistedQuery: StraetoPersistedQuery; +} + +interface StraetoPersistedQuery { + version: number; + sha256Hash: string; +} + +interface StraetoBusStop { + id: number; + name: string; + lat: number; + lon: number; + type: number; + rotation: number; + code: string | null; + isTerminal: boolean; + routes: Array; + alerts: Array; + __typename: string; +} + +interface DbBusStop { + id: number; + stop_name: string; + coords: string; // TODO better geometry type representation + type: number; + rotation: number; + code: string | null; + is_terminal: boolean; + routes: Array; +} + +function toDbBusStop(straetoStop: StraetoBusStop): DbBusStop { + return { + id: straetoStop.id, + code: straetoStop.code, + stop_name: straetoStop.name, + coords: `POINT(${straetoStop.lat} ${straetoStop.lon})`, + type: straetoStop.type, + rotation: straetoStop.rotation, + is_terminal: straetoStop.isTerminal, + routes: straetoStop.routes, + }; +} + +function toQuerystring(obj: StraetoQuery) { + return Object.keys(obj).map(function (variableName) { + const variableValue = obj[variableName as keyof typeof obj]; + if (typeof variableValue == "object") { + return encodeURIComponent(variableName) + "=" + + encodeURIComponent(JSON.stringify(variableValue)); + } else { + return encodeURIComponent(variableName) + "=" + + encodeURIComponent(variableValue); + } + }).join("&"); +} + +const graphqlParams: StraetoQuery = { + operationName: "Stops", + variables: {}, + extensions: { + persistedQuery: { + version: 1, + sha256Hash: + "6303de055cc42db47a4e1f1cc941b8c86d11c147903bed231e6d4bddcf0e1312", + }, + }, +}; + +async function getStops(): Promise> { + const apiUrl = `https://straeto.is/graphql?${toQuerystring(graphqlParams)}`; + + const opts = { + method: "GET", + headers: { + "Content-Type": "application/json", + }, + }; + + const resp = await fetch(apiUrl, opts); + const body = await resp.json(); + return body.data.GtfsStops.results; +} + +Deno.serve(async (req) => { + const busStops = await getStops(); + + const authHeader = req.headers.get("Authorization")!; + const supabaseClient = createClient( + Deno.env.get("SUPABASE_URL") ?? "", + Deno.env.get("SUPABASE_ANON_KEY") ?? "", + { global: { headers: { Authorization: authHeader } } }, + ); + + console.log("inserting", busStops.length, "rows"); + const toInsert = busStops.map(toDbBusStop); + + const insert = await supabaseClient.from("bus_stops") + .upsert(toInsert, { ignoreDuplicates: true }); + + console.log(insert.statusText, insert.error?.code, insert.error?.message); + + console.log("rows inserted", insert.count); + + // const { name } = await req.json() + // const data = { + // message: `Hello ${name}!`, + // } + + return new Response( + JSON.stringify({ + "status": insert.statusText, + "inserted": toInsert.length, + "error": insert.error, + }), + { headers: { "Content-Type": "application/json" } }, + ); +}); + +/* To invoke locally: + + 1. Run `supabase start` (see: https://supabase.com/docs/reference/cli/supabase-start) + 2. Make an HTTP request: + + curl -i --location --request POST 'http://127.0.0.1:54321/functions/v1/download-bus-stops' \ + --header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0' \ + --header 'Content-Type: application/json' \ + --data '{"name":"Functions"}' + +*/ diff --git a/supabase/migrations/20240409190001_initial.sql b/supabase/migrations/20240409190001_initial.sql index 5cdd411..d772729 100644 --- a/supabase/migrations/20240409190001_initial.sql +++ b/supabase/migrations/20240409190001_initial.sql @@ -5,6 +5,7 @@ CREATE EXTENSION IF NOT EXISTS "pg_net" SCHEMA extensions; CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; CREATE EXTENSION IF NOT EXISTS postgis; +-- Raw bus positions time series table. CREATE TABLE IF NOT EXISTS raw_bus_positions ( id BIGINT GENERATED ALWAYS AS IDENTITY, measured_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), @@ -15,12 +16,15 @@ CREATE TABLE IF NOT EXISTS raw_bus_positions ( headsign text, tag text, direction int, - coords geometry(Point, 4326) + coords geography(Point, 4326) ); --- TimescaleDB Hypertable creation +COMMENT ON TABLE raw_bus_positions IS 'Recordings of bus positions. Mostly transient data.'; +COMMENT ON COLUMN raw_bus_positions.coords IS 'GPS coords of bus. PostGIS Point.'; + SELECT create_hypertable('raw_bus_positions', 'measured_at', chunk_time_interval => INTERVAL '10 minutes'); CREATE INDEX bus_position_measures ON raw_bus_positions (bus_id, measured_at DESC); +CREATE INDEX raw_bus_positions_spatial ON raw_bus_positions USING GIST (coords); SELECT cron.schedule( @@ -30,3 +34,192 @@ SELECT SELECT drop_chunks('raw_bus_positions', INTERVAL '24 hours'); $$ ); + +-- Record of all bus stops. +create table bus_stops( + id BIGINT PRIMARY KEY, + stop_name TEXT, + coords geography(Point, 4326), + "type" INT, + rotation INT, + code TEXT, + is_terminal BOOL, + routes TEXT[] +); + +CREATE INDEX bus_stops_spatial ON bus_stops USING GIST (coords); + +-- Bus Position Related Functions +-- encode straeto API params into the query string by the graphql API. +-- yes, javascript in the database: the lazy way. +CREATE OR REPLACE FUNCTION encode_straeto_querystring(obj jsonb) RETURNS text +LANGUAGE plv8 STRICT IMMUTABLE AS $$ + return Object.keys(obj).map(function(variableName) { + const variableValue = obj[variableName]; + if (typeof variableValue == 'object') { + return encodeURIComponent(variableName) + '=' + encodeURIComponent(JSON.stringify(variableValue)); + } else { + return encodeURIComponent(variableName) + '=' + encodeURIComponent(variableValue); + } + }).join('&'); +$$; + +CREATE OR REPLACE FUNCTION create_straeto_parameters(bus_routes text[]) RETURNS jsonb +LANGUAGE plpgsql AS $$ + DECLARE query_params jsonb; + BEGIN + -- This is the unencoded GraphQL request payload. + query_params := '{ + "operationName": "BusLocationByRoute", + "variables": { "trips":[], "routes": [] }, + "extensions": { + "persistedQuery": { + "version": 1, + "sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8" + } + } + }'::jsonb; + + -- Add requested bus routes to the `routes` GraphQL variable. + SELECT INTO query_params + jsonb_set(query_params, '{variables, routes}', array_to_json(bus_routes)::jsonb); + RETURN query_params; + END +$$; + +-- Builds the full request URL to download bus positions of the given +-- route numbers. +CREATE OR REPLACE FUNCTION build_api_url(bus_routes text[]) RETURNS text LANGUAGE sql AS $$ + SELECT concat( + 'https://straeto.is/graphql?', + encode_straeto_querystring(create_straeto_parameters(bus_routes)) + ); +$$; + +-- Called by a scheduled job to request data on a timer. A trigger +-- handles insertion of the response after record initially inserted. +CREATE TABLE IF NOT EXISTS raw_bus_position_requests( + request_id int, + created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + +CREATE OR REPLACE FUNCTION gather_bus_data() +RETURNS void LANGUAGE sql +as $$ + INSERT INTO raw_bus_position_requests (request_id) SELECT + net.http_get( + url := build_api_url(array['1', '2', '3']), + headers := '{"Content-Type": "application/json"}'::jsonb + ); +$$; + +-- Configuration of streaming bus positions into Postgres +-- downloads data async into the http response table. +SELECT + cron.schedule( + 'download-bus-data', + '5 seconds', + $$ + select gather_bus_data(); + $$ + ); + +-- copies data into a more permanent tabe/more useful format. note: in +-- supabase, you cannot delete triggers on net._http_response +-- directly. but a cascading delete of the trigger function also +-- removes the trigger itself. +CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$ + BEGIN + -- NOTE: the http_response sequence resets on DB restart, so there + -- is potential for old responses to have duplicated ids. we use a + -- constraint to only update newer entries (within the last 30 + -- seconds). this should make accidentally overwriting older data + -- with newer values difficult. The API also provides a + -- lastUpdated value. we require that raw.created is at or after + -- that value to be updated. + WITH mapped_response_rows as ( + select x.* + from jsonb_to_recordset(NEW.content::jsonb->'data'->'BusLocationByRoute'->'results') x ( + "busId" text, + "tripId" text, + "routeNr" text, + headsign text, + tag text, + direction int, + lat decimal, + lng decimal + ) + ) + INSERT INTO raw_bus_positions + (response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, coords) + SELECT + NEW.status_code, + (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz, + mr."busId", mr."tripId", mr."routeNr", mr.headsign, mr.tag, mr.direction, + ST_SetSRID(ST_MakePoint(mr.lat, mr.lng), 4326) -- PostGIS coordinates + FROM mapped_response_rows mr + JOIN raw_bus_position_requests raw_request ON raw_request.request_id = NEW.id + WHERE + -- fairly generous constraint to account for long requests. + raw_request.created >= (NEW.created - '30 seconds'::interval) + -- the response must be at or after we actually sent the request. + AND raw_request.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz + AND NEW.status_code = 200; + + DELETE FROM raw_bus_position_requests where request_id = NEW.id; + + -- what if we want to stream other data? we can do multiple updates in + -- different tables, where the request id is. + RETURN NULL; -- this is an AFTER trigger + END; +$$; + +CREATE CONSTRAINT TRIGGER copy_http_response + AFTER INSERT ON net._http_response + DEFERRABLE INITIALLY DEFERRED + FOR EACH ROW + EXECUTE FUNCTION copy_to_raw_table(); + + +-- Create a trigger on insert into raw_bus_positions. It runs a +-- spatial query against bus stops and finds out if this measurement +-- is within X meters of a bus stop. If so, queue up to sanity check +-- pipeline. Otherwise, delete from table. +CREATE TABLE potential_arrivals( + id BIGINT GENERATED ALWAYS AS IDENTITY, + raw_pos_id BIGINT NOT NULL, + bus_id text NOT NULL, + bus_coords GEOGRAPHY(Point, 4326) NOT NULL, + measured_at TIMESTAMP WITH TIME ZONE NOT NULL, + stop_id bigint NOT NULL, + distance decimal NOT NULL +); + +CREATE FUNCTION check_near_stop() RETURNS trigger LANGUAGE plpgsql AS $$ + BEGIN + INSERT INTO potential_arrivals (raw_pos_id, bus_id, bus_coords, measured_at, stop_id, distance) + SELECT DISTINCT ON (pos.id) + pos.id, + pos.bus_id, + pos.coords, + pos.measured_at, + stops.id, + ST_Distance(pos.coords, stops.coords) as distance + FROM raw_bus_positions pos + join bus_stops stops on ST_DWithin(pos.coords, stops.coords, 30) + WHERE pos.id = NEW.id; + + -- Remove any raw positions that are not potentially a bus + -- stopping at a bus stop. + DELETE FROM raw_bus_positions pos + WHERE pos.id = NEW.id and pos.id not in (select raw_pos_id from potential_arrivals); + + RETURN NULL; -- this is an AFTER trigger + END; +$$; + +CREATE CONSTRAINT TRIGGER add_potential_stop + AFTER INSERT ON raw_bus_positions + DEFERRABLE INITIALLY DEFERRED + FOR EACH ROW + EXECUTE FUNCTION check_near_stop(); diff --git a/supabase/migrations/20240409210612_bus_funcs.sql b/supabase/migrations/20240409210612_bus_funcs.sql deleted file mode 100644 index 7f89b18..0000000 --- a/supabase/migrations/20240409210612_bus_funcs.sql +++ /dev/null @@ -1,62 +0,0 @@ --- encode straeto API params into the query string by the graphql API. --- yes, javascript in the database: the lazy way. -CREATE OR REPLACE FUNCTION encode_straeto_querystring(obj jsonb) RETURNS text -LANGUAGE plv8 STRICT IMMUTABLE AS $$ - return Object.keys(obj).map(function(variableName) { - const variableValue = obj[variableName]; - if (typeof variableValue == 'object') { - return encodeURIComponent(variableName) + '=' + encodeURIComponent(JSON.stringify(variableValue)); - } else { - return encodeURIComponent(variableName) + '=' + encodeURIComponent(variableValue); - } - }).join('&'); -$$; - -CREATE OR REPLACE FUNCTION create_straeto_parameters(bus_routes text[]) RETURNS jsonb -LANGUAGE plpgsql AS $$ - DECLARE query_params jsonb; - BEGIN - -- This is the unencoded GraphQL request payload. - query_params := '{ - "operationName": "BusLocationByRoute", - "variables": { "trips":[], "routes": [] }, - "extensions": { - "persistedQuery": { - "version": 1, - "sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8" - } - } - }'::jsonb; - - -- Add requested bus routes to the `routes` GraphQL variable. - SELECT INTO query_params - jsonb_set(query_params, '{variables, routes}', array_to_json(bus_routes)::jsonb); - RETURN query_params; - END -$$; - --- Builds the full request URL to download bus positions of the given --- route numbers. -CREATE OR REPLACE FUNCTION build_api_url(bus_routes text[]) RETURNS text LANGUAGE sql AS $$ - SELECT concat( - 'https://straeto.is/graphql?', - encode_straeto_querystring(create_straeto_parameters(bus_routes)) - ); -$$; - --- Called by a scheduled job to request data on a timer. A trigger --- handles insertion of the response after record initially inserted. -CREATE TABLE IF NOT EXISTS raw_bus_position_requests( - request_id int, - created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() -); - -CREATE OR REPLACE FUNCTION gather_bus_data() -RETURNS void LANGUAGE sql -as $$ - INSERT INTO raw_bus_position_requests (request_id) SELECT - net.http_get( - url := build_api_url(array['1', '2', '3']), - headers := '{"Content-Type": "application/json"}'::jsonb - ); -$$; diff --git a/supabase/migrations/20240410070316_scheduled-jobs.sql b/supabase/migrations/20240410070316_scheduled-jobs.sql deleted file mode 100644 index c74c6a6..0000000 --- a/supabase/migrations/20240410070316_scheduled-jobs.sql +++ /dev/null @@ -1,65 +0,0 @@ --- downloads data async into the http response table. -SELECT - cron.schedule( - 'download-bus-data', - '5 seconds', - $$ - select gather_bus_data(); - $$ - ); - --- copies data into a more permanent tabe/more useful format. note: in --- supabase, you cannot delete triggers on net._http_response --- directly. but a cascading delete of the trigger function also --- removes the trigger itself. -CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$ - BEGIN - -- NOTE: the http_response sequence resets on DB restart, so there - -- is potential for old responses to have duplicated ids. we use a - -- constraint to only update newer entries (within the last 30 - -- seconds). this should make accidentally overwriting older data - -- with newer values difficult. The API also provides a - -- lastUpdated value. we require that raw.created is at or after - -- that value to be updated. - WITH mapped_response_rows as ( - select x.* - from jsonb_to_recordset(NEW.content::jsonb->'data'->'BusLocationByRoute'->'results') x ( - "busId" text, - "tripId" text, - "routeNr" text, - headsign text, - tag text, - direction int, - lat decimal, - lng decimal - ) - ) - INSERT INTO raw_bus_positions - (response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, coords) - SELECT - NEW.status_code, - (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz, - mr."busId", mr."tripId", mr."routeNr", mr.headsign, mr.tag, mr.direction, - ST_SetSRID(ST_MakePoint(mr.lat, mr.lng), 4326) -- PostGIS coordinates - FROM mapped_response_rows mr - JOIN raw_bus_position_requests raw_request ON raw_request.request_id = NEW.id - WHERE - -- fairly generous constraint to account for long requests. - raw_request.created >= (NEW.created - '30 seconds'::interval) - -- the response must be at or after we actually sent the request. - AND raw_request.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz - AND NEW.status_code = 200; - - DELETE FROM raw_bus_position_requests where request_id = NEW.id; - - -- what if we want to stream other data? we can do multiple updates in - -- different tables, where the request id is. - RETURN NULL; -- this is an AFTER trigger - END; -$$; - -CREATE CONSTRAINT TRIGGER copy_http_response - AFTER INSERT ON net._http_response - DEFERRABLE INITIALLY DEFERRED - FOR EACH ROW - EXECUTE FUNCTION copy_to_raw_table();