jokull/supabase/migrations/20240410070316_scheduled-jo...

66 lines
2.5 KiB
PL/PgSQL

-- downloads data async into the http response table.
SELECT
cron.schedule(
'download-bus-data',
'5 seconds',
$$
select gather_bus_data();
$$
);
-- copies data into a more permanent tabe/more useful format. note: in
-- supabase, you cannot delete triggers on net._http_response
-- directly. but a cascading delete of the trigger function also
-- removes the trigger itself.
CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
-- NOTE: the http_response sequence resets on DB restart, so there
-- is potential for old responses to have duplicated ids. we use a
-- constraint to only update newer entries (within the last 30
-- seconds). this should make accidentally overwriting older data
-- with newer values difficult. The API also provides a
-- lastUpdated value. we require that raw.created is at or after
-- that value to be updated.
WITH mapped_response_rows as (
select x.*
from jsonb_to_recordset(NEW.content::jsonb->'data'->'BusLocationByRoute'->'results') x (
"busId" text,
"tripId" text,
"routeNr" text,
headsign text,
tag text,
direction int,
lat decimal,
lng decimal
)
)
INSERT INTO raw_bus_positions
(response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, coords)
SELECT
NEW.status_code,
(NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz,
mr."busId", mr."tripId", mr."routeNr", mr.headsign, mr.tag, mr.direction,
ST_SetSRID(ST_MakePoint(mr.lat, mr.lng), 4326) -- PostGIS coordinates
FROM mapped_response_rows mr
JOIN raw_bus_position_requests raw_request ON raw_request.request_id = NEW.id
WHERE
-- fairly generous constraint to account for long requests.
raw_request.created >= (NEW.created - '30 seconds'::interval)
-- the response must be at or after we actually sent the request.
AND raw_request.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz
AND NEW.status_code = 200;
DELETE FROM raw_bus_position_requests where request_id = NEW.id;
-- what if we want to stream other data? we can do multiple updates in
-- different tables, where the request id is.
RETURN NULL; -- this is an AFTER trigger
END;
$$;
CREATE CONSTRAINT TRIGGER copy_http_response
AFTER INSERT ON net._http_response
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE FUNCTION copy_to_raw_table();