jokull/supabase/migrations/20240409190001_initial.sql

232 lines
8.0 KiB
PL/PgSQL

-- Extensions required by this migration.
--   plv8        : JavaScript procedural language (encode_straeto_querystring).
--   pg_cron     : job scheduler behind the cron.schedule(...) calls.
--   pg_net      : asynchronous HTTP client (net.http_get / net._http_response).
--   timescaledb : hypertables (create_hypertable / drop_chunks).
--   postgis     : geography type and the ST_* spatial functions.
CREATE EXTENSION IF NOT EXISTS plv8 WITH SCHEMA pg_catalog;
CREATE EXTENSION IF NOT EXISTS pg_cron WITH SCHEMA pg_catalog;
CREATE EXTENSION IF NOT EXISTS pg_net WITH SCHEMA extensions;
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
CREATE EXTENSION IF NOT EXISTS postgis;
-- Raw bus positions time series table.
-- One row per bus per API response, written by copy_to_raw_table().
-- Every statement in this block is guarded so the block can be re-run
-- without failing once the table, hypertable and indexes already exist.
CREATE TABLE IF NOT EXISTS raw_bus_positions (
    id              BIGINT GENERATED ALWAYS AS IDENTITY,
    -- Hypertable time dimension.
    measured_at     TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    -- HTTP status code of the response the row was extracted from.
    response_status INT NULL,
    bus_id          text,
    trip_id         text,
    route_number    text,
    headsign        text,
    tag             text,
    direction       int,
    coords          geography(Point, 4326)
);
COMMENT ON TABLE raw_bus_positions IS 'Recordings of bus positions. Mostly transient data.';
COMMENT ON COLUMN raw_bus_positions.coords IS 'GPS coords of bus. PostGIS Point.';

-- Partition on measured_at in 10 minute chunks. if_not_exists keeps the
-- call from raising when the table has already been converted.
SELECT create_hypertable(
    'raw_bus_positions',
    'measured_at',
    chunk_time_interval => INTERVAL '10 minutes',
    if_not_exists       => TRUE
);

-- Per-bus lookups, newest measurement first.
CREATE INDEX IF NOT EXISTS bus_position_measures
    ON raw_bus_positions (bus_id, measured_at DESC);
-- Spatial index on the bus coordinates.
CREATE INDEX IF NOT EXISTS raw_bus_positions_spatial
    ON raw_bus_positions USING GIST (coords);
-- Retention for raw_bus_positions: registers the pg_cron job
-- 'drop-old-chunks', scheduled every 10 minutes ('*/10 * * * *'), which
-- calls drop_chunks on the hypertable with a 24 hour interval.
-- NOTE(review): the interval is passed positionally; presumably it is the
-- "older than" bound -- confirm against the installed TimescaleDB version.
SELECT
cron.schedule(
'drop-old-chunks',
'*/10 * * * *',
$$
SELECT drop_chunks('raw_bus_positions', INTERVAL '24 hours');
$$
);
-- Record of all bus stops.
-- check_near_stop() joins raw bus positions against `coords` to find
-- measurements taken close to a stop.
CREATE TABLE bus_stops (
    id          bigint PRIMARY KEY,
    stop_name   text,
    -- Location of the stop as a PostGIS geography point, SRID 4326.
    coords      geography(Point, 4326),
    "type"      integer,
    rotation    integer,
    code        text,
    is_terminal boolean,
    -- NOTE(review): presumably the route numbers serving this stop -- confirm.
    routes      text[]
);

-- Spatial index backing the proximity join on coords.
CREATE INDEX bus_stops_spatial
    ON bus_stops
    USING gist (coords);
-- Bus Position Related Functions
-- Encodes the straeto API parameters into the URL query string sent to the
-- GraphQL API. Each top-level key of `obj` becomes one `key=value` pair and
-- the pairs are joined with `&`. Keys and values are percent-encoded with
-- encodeURIComponent; values whose JavaScript type is 'object' are
-- JSON-stringified first.
-- Declared STRICT, so a NULL argument yields NULL without running the body.
-- Yes, JavaScript in the database: the lazy way.
CREATE OR REPLACE FUNCTION encode_straeto_querystring(obj jsonb) RETURNS text
LANGUAGE plv8 STRICT IMMUTABLE AS $$
  // Turns one top-level key into its encoded `key=value` pair.
  const toPair = (key) => {
    const raw = obj[key];
    const printable = (typeof raw == 'object') ? JSON.stringify(raw) : raw;
    return encodeURIComponent(key) + '=' + encodeURIComponent(printable);
  };
  return Object.keys(obj).map(toPair).join('&');
$$;
-- Builds the unencoded GraphQL request payload for the persisted
-- `BusLocationByRoute` query.
--   bus_routes : route numbers to request; stored as the JSON array at
--                variables.routes.
-- Returns the payload as jsonb. A NULL `bus_routes` yields NULL, because
-- jsonb_set returns NULL when its new value is NULL.
CREATE OR REPLACE FUNCTION create_straeto_parameters(bus_routes text[]) RETURNS jsonb
LANGUAGE plpgsql AS $$
DECLARE
  -- Request skeleton; `routes` is filled in per call.
  request_template CONSTANT jsonb := '{
"operationName": "BusLocationByRoute",
"variables": { "trips":[], "routes": [] },
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8"
}
}
}'::jsonb;
BEGIN
  -- Put the requested bus routes into the `routes` GraphQL variable.
  RETURN jsonb_set(
    request_template,
    '{variables, routes}',
    array_to_json(bus_routes)::jsonb
  );
END
$$;
-- Returns the complete request URL for downloading the bus positions of the
-- given route numbers: the fixed straeto GraphQL endpoint followed by the
-- encoded query string.
-- coalesce mirrors concat()'s NULL-skipping: if the query string is NULL the
-- bare endpoint prefix is returned rather than NULL.
CREATE OR REPLACE FUNCTION build_api_url(bus_routes text[]) RETURNS text LANGUAGE sql AS $$
  SELECT 'https://straeto.is/graphql?'
      || coalesce(
           encode_straeto_querystring(create_straeto_parameters(bus_routes)),
           ''
         );
$$;
-- Outstanding bus position requests.
-- gather_bus_data(), called by a scheduled job on a timer, inserts one row
-- per HTTP request it sends. A trigger on the response table handles
-- insertion of the response once it arrives and then removes the matching
-- row from this table.
CREATE TABLE IF NOT EXISTS raw_bus_position_requests (
    -- Id returned by net.http_get and matched against net._http_response.id.
    -- pg_net ids are bigint, so the column is bigint as well to avoid an
    -- out-of-range failure on insert.
    request_id BIGINT,
    -- When the request was sent.
    created    TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Sends one HTTP GET for the positions of routes '1', '2' and '3' through
-- pg_net and records the id returned by net.http_get in
-- raw_bus_position_requests, so the response can be matched to this request
-- later. Returns nothing.
CREATE OR REPLACE FUNCTION gather_bus_data()
RETURNS void
LANGUAGE sql
AS $$
  INSERT INTO raw_bus_position_requests (request_id)
  VALUES (
    net.http_get(
      url     := build_api_url(ARRAY['1', '2', '3']),
      headers := '{"Content-Type": "application/json"}'::jsonb
    )
  );
$$;
-- Configuration of streaming bus positions into Postgres.
-- Registers the pg_cron job 'download-bus-data', which calls
-- gather_bus_data() on a '5 seconds' schedule. The download itself is
-- asynchronous: the data arrives later in the http response table.
SELECT
cron.schedule(
'download-bus-data',
'5 seconds',
$$
select gather_bus_data();
$$
);
-- Copies data into a more permanent table / more useful format. Note: in
-- Supabase, you cannot delete triggers on net._http_response
-- directly, but a cascading delete of the trigger function also
-- removes the trigger itself.
--
-- Trigger function for AFTER INSERT on net._http_response. For a response
-- that belongs to a row in raw_bus_position_requests it:
--   1. on HTTP 200, expands data.BusLocationByRoute.results into one
--      raw_bus_positions row per bus, and
--   2. deletes the matching raw_bus_position_requests row.
-- Responses with no matching request row are ignored without parsing their
-- body. Always returns NULL (the return value of an AFTER trigger is unused).
CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
DECLARE
  -- The `data.BusLocationByRoute` object of the response body.
  route_payload jsonb;
  -- The API's `lastUpdate` value; stored as measured_at.
  last_update timestamptz;
BEGIN
  -- The trigger fires for every row inserted into net._http_response. Skip
  -- rows that do not answer one of our requests, so an unrelated
  -- (possibly non-JSON) body can never make this trigger raise.
  IF NOT EXISTS (
    SELECT 1
    FROM raw_bus_position_requests
    WHERE request_id = NEW.id
  ) THEN
    RETURN NULL;
  END IF;

  -- Only parse the body of a successful response; error bodies are not
  -- guaranteed to be JSON.
  IF NEW.status_code = 200 THEN
    route_payload := NEW.content::jsonb -> 'data' -> 'BusLocationByRoute';
    last_update := (route_payload ->> 'lastUpdate')::timestamptz;

    -- NOTE: the http_response sequence resets on DB restart, so there
    -- is potential for old responses to have duplicated ids. We use a
    -- constraint to only accept recent requests (created within 30
    -- seconds of the response). This should make accidentally mixing up
    -- older requests with newer responses difficult. The API also
    -- provides a lastUpdate value; we require that the request's
    -- created time is at or after that value.
    INSERT INTO raw_bus_positions
      (response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, coords)
    SELECT
      NEW.status_code,
      last_update,
      mr."busId",
      mr."tripId",
      mr."routeNr",
      mr.headsign,
      mr.tag,
      mr.direction,
      -- PostGIS coordinates. ST_MakePoint takes (x, y) = (longitude, latitude).
      -- NOTE(review): previously called as (lat, lng); if bus_stops.coords was
      -- loaded with the same swapped order, it needs correcting too.
      ST_SetSRID(ST_MakePoint(mr.lng, mr.lat), 4326)
    FROM jsonb_to_recordset(route_payload -> 'results') AS mr (
      "busId" text,
      "tripId" text,
      "routeNr" text,
      headsign text,
      tag text,
      direction int,
      lat decimal,
      lng decimal
    )
    INNER JOIN raw_bus_position_requests raw_request
      ON raw_request.request_id = NEW.id
    WHERE
      -- fairly generous constraint to account for long requests.
      raw_request.created >= (NEW.created - '30 seconds'::interval)
      -- the API's lastUpdate must not be later than the time we sent
      -- the request.
      AND raw_request.created >= last_update;
  END IF;

  -- The request has been answered (successfully or not): forget it.
  DELETE FROM raw_bus_position_requests WHERE request_id = NEW.id;

  -- what if we want to stream other data? we can do multiple updates in
  -- different tables, where the request id is.
  RETURN NULL; -- this is an AFTER trigger
END;
$$;
-- Runs copy_to_raw_table() once for every row inserted into
-- net._http_response. Declared as a constraint trigger that is
-- DEFERRABLE INITIALLY DEFERRED, so by default it fires at the end of the
-- inserting transaction rather than right after the INSERT statement.
CREATE CONSTRAINT TRIGGER copy_http_response
AFTER INSERT ON net._http_response
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE FUNCTION copy_to_raw_table();
-- Potential arrivals: raw bus positions measured close to a bus stop.
-- A trigger on insert into raw_bus_positions runs a spatial query against
-- bus stops and finds out if the measurement is within X meters of a bus
-- stop. If so, it is queued here for the sanity check pipeline. Otherwise,
-- it is deleted from the raw table.
CREATE TABLE potential_arrivals (
    id          bigint GENERATED ALWAYS AS IDENTITY,
    -- id of the raw_bus_positions row this entry was derived from.
    raw_pos_id  bigint NOT NULL,
    bus_id      text NOT NULL,
    bus_coords  geography(Point, 4326) NOT NULL,
    -- Hypertable time dimension; copied from the raw position.
    measured_at timestamptz NOT NULL,
    -- id of the matched bus_stops row.
    stop_id     bigint NOT NULL,
    -- ST_Distance between the bus and the stop.
    distance    numeric NOT NULL
);

-- Partition on measured_at in 10 minute chunks.
SELECT create_hypertable(
    'potential_arrivals',
    'measured_at',
    chunk_time_interval => INTERVAL '10 minutes'
);

-- Lookup by originating raw position.
CREATE INDEX potential_arrives_raw_pos_id
    ON potential_arrivals (raw_pos_id);
-- Per-bus lookups, newest measurement first.
CREATE INDEX potential_arrivals_measures
    ON potential_arrivals (bus_id, measured_at DESC);
-- Spatial index on the bus coordinates.
CREATE INDEX potential_arrivals_spatial
    ON potential_arrivals
    USING gist (bus_coords);
-- Trigger function for AFTER INSERT on raw_bus_positions.
-- If the inserted position lies within 30 (ST_DWithin on geography) of at
-- least one bus stop, records it in potential_arrivals together with the
-- nearest such stop and the distance to it. Otherwise the raw position is
-- deleted again. Always returns NULL (the return value of an AFTER trigger
-- is unused).
CREATE FUNCTION check_near_stop() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
  INSERT INTO potential_arrivals (raw_pos_id, bus_id, bus_coords, measured_at, stop_id, distance)
  SELECT DISTINCT ON (pos.id)
    pos.id,
    pos.bus_id,
    pos.coords,
    pos.measured_at,
    stops.id,
    ST_Distance(pos.coords, stops.coords) AS distance
  FROM raw_bus_positions pos
  INNER JOIN bus_stops stops
    ON ST_DWithin(pos.coords, stops.coords, 30)
  WHERE pos.id = NEW.id
    -- measured_at is the hypertable's time column; constraining it lets
    -- TimescaleDB skip chunks that cannot contain the new row.
    AND pos.measured_at = NEW.measured_at
  -- DISTINCT ON keeps the first row per position: order by distance so
  -- that row is the nearest stop (stops.id breaks ties deterministically)
  -- instead of an arbitrary one.
  ORDER BY pos.id, ST_Distance(pos.coords, stops.coords), stops.id;

  -- Remove any raw positions that are not potentially a bus
  -- stopping at a bus stop.
  DELETE FROM raw_bus_positions pos
  WHERE pos.id = NEW.id
    AND pos.measured_at = NEW.measured_at
    -- Correlated lookup for this one position instead of materialising
    -- every raw_pos_id in potential_arrivals. Rows written above carry the
    -- position's measured_at, so matching on it keeps the probe to the
    -- relevant chunk.
    AND NOT EXISTS (
      SELECT 1
      FROM potential_arrivals pa
      WHERE pa.raw_pos_id = pos.id
        AND pa.measured_at = pos.measured_at
    );

  RETURN NULL; -- this is an AFTER trigger
END;
$$;
-- Runs check_near_stop() once for every row inserted into
-- raw_bus_positions. Declared as a constraint trigger that is
-- DEFERRABLE INITIALLY DEFERRED, so by default it fires at the end of the
-- inserting transaction rather than right after the INSERT statement.
CREATE CONSTRAINT TRIGGER add_potential_stop
AFTER INSERT ON raw_bus_positions
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE FUNCTION check_near_stop();