230 lines
7.9 KiB
PL/PgSQL
230 lines
7.9 KiB
PL/PgSQL
-- Extensions this schema relies on:
--   plv8        JavaScript procedural language (encode_straeto_querystring)
--   pg_cron     job scheduler (the cron.schedule calls)
--   pg_net      asynchronous HTTP client (net.http_get, net._http_response)
--   timescaledb hypertables (create_hypertable, drop_chunks)
--   postgis     geography type and ST_* spatial functions
CREATE EXTENSION IF NOT EXISTS plv8 WITH SCHEMA pg_catalog;
CREATE EXTENSION IF NOT EXISTS pg_cron WITH SCHEMA pg_catalog;
CREATE EXTENSION IF NOT EXISTS pg_net WITH SCHEMA extensions;
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
CREATE EXTENSION IF NOT EXISTS postgis;
|
|
|
|
-- Time series of raw bus position samples, one row per entry of an API
-- response. Rows are short-lived (see the table comment below).
CREATE TABLE IF NOT EXISTS raw_bus_positions (
    id              bigint GENERATED ALWAYS AS IDENTITY,
    measured_at     timestamptz NOT NULL DEFAULT now(),
    response_status integer,
    bus_id          text,
    trip_id         text,
    route_number    text,
    headsign        text,
    tag             text,
    direction       integer,
    coords          geography(Point, 4326)
);

COMMENT ON TABLE raw_bus_positions IS 'Recordings of bus positions. Mostly transient data.';
COMMENT ON COLUMN raw_bus_positions.coords IS 'GPS coords of bus. PostGIS Point.';

-- Partition on measurement time using 10-minute chunks.
SELECT create_hypertable(
    'raw_bus_positions',
    'measured_at',
    chunk_time_interval => INTERVAL '10 minutes'
);

-- Latest-sample-per-bus lookups.
CREATE INDEX bus_position_measures
    ON raw_bus_positions USING btree (bus_id, measured_at DESC);

-- Spatial lookups against bus stop coordinates.
CREATE INDEX raw_bus_positions_spatial
    ON raw_bus_positions USING gist (coords);
|
|
|
|
-- Retention job: every 10 minutes, drop raw_bus_positions chunks that are
-- older than 24 hours.
SELECT cron.schedule(
    'drop-old-chunks',
    '*/10 * * * *',
    $$
    SELECT drop_chunks('raw_bus_positions', INTERVAL '24 hours');
    $$
);
|
|
|
|
-- Reference table of every bus stop.
CREATE TABLE bus_stops (
    id          bigint PRIMARY KEY,
    stop_name   text,
    coords      geography(Point, 4326),  -- stop location; matched against bus positions
    "type"      integer,
    rotation    integer,
    code        text,
    is_terminal boolean,
    routes      text[]
);

-- Supports the ST_DWithin proximity join in check_near_stop().
CREATE INDEX bus_stops_spatial
    ON bus_stops USING gist (coords);
|
|
|
|
-- Bus position related functions.

-- Turns the top-level keys of a JSON object into a URL query string of the
-- form "key1=value1&key2=value2", as expected by the Straeto GraphQL API.
-- Values that JavaScript reports as 'object' (objects, arrays, null) are
-- sent as URL-encoded JSON text; everything else is URL-encoded as-is.
-- Written in JavaScript (plv8) because encodeURIComponent and
-- JSON.stringify make this trivial: the lazy way.
CREATE OR REPLACE FUNCTION encode_straeto_querystring(obj jsonb) RETURNS text
LANGUAGE plv8 STRICT IMMUTABLE AS $$
    const encodePair = (key) => {
        const raw = obj[key];
        // Nested values are serialised to JSON before being URL-encoded.
        const text = (typeof raw === 'object') ? JSON.stringify(raw) : raw;
        return encodeURIComponent(key) + '=' + encodeURIComponent(text);
    };

    return Object.keys(obj).map(encodePair).join('&');
$$;
|
|
|
|
-- Builds the unencoded GraphQL request payload for the persisted
-- "BusLocationByRoute" query, with `variables.routes` set to the given
-- route numbers. Returns NULL when bus_routes is NULL.
CREATE OR REPLACE FUNCTION create_straeto_parameters(bus_routes text[]) RETURNS jsonb
LANGUAGE plpgsql AS $$
DECLARE
    -- Request template; only `variables.routes` is filled in per call.
    request_template CONSTANT jsonb := '{
        "operationName": "BusLocationByRoute",
        "variables": { "trips":[], "routes": [] },
        "extensions": {
            "persistedQuery": {
                "version": 1,
                "sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8"
            }
        }
    }'::jsonb;
BEGIN
    -- Substitute the requested routes into the `routes` GraphQL variable.
    RETURN jsonb_set(
        request_template,
        ARRAY['variables', 'routes'],
        CAST(array_to_json(bus_routes) AS jsonb)
    );
END
$$;
|
|
|
|
-- Returns the complete request URL for downloading the current positions
-- of buses on the given route numbers: the GraphQL endpoint followed by
-- the URL-encoded request parameters.
CREATE OR REPLACE FUNCTION build_api_url(bus_routes text[]) RETURNS text
LANGUAGE sql AS $$
    SELECT concat('https://straeto.is/graphql?', encoded.query_string)
    FROM (
        SELECT encode_straeto_querystring(create_straeto_parameters(bus_routes)) AS query_string
    ) AS encoded;
$$;
|
|
|
|
-- Tracks HTTP requests that have been issued but not yet processed. A
-- scheduled job inserts the pg_net request id here when it fires a
-- request; the trigger on the HTTP response table matches the response to
-- this row and then removes it.
--
-- request_id is BIGINT because pg_net request ids (and
-- net._http_response.id) are bigint; a 4-byte int would eventually
-- overflow. Note that IF NOT EXISTS will not widen an already-created
-- table; that needs an explicit ALTER TABLE.
CREATE TABLE IF NOT EXISTS raw_bus_position_requests (
    request_id BIGINT,
    -- When the request was issued; used to reject stale or reused ids.
    created    TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
|
|
|
|
-- Fires one asynchronous HTTP GET for the positions of buses on routes
-- 1, 2 and 3, and records the returned request id in
-- raw_bus_position_requests so the response can be matched up later.
CREATE OR REPLACE FUNCTION gather_bus_data()
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO raw_bus_position_requests (request_id)
    VALUES (
        net.http_get(
            url := build_api_url(ARRAY['1', '2', '3']),
            headers := '{"Content-Type": "application/json"}'::jsonb
        )
    );
$$;
|
|
|
|
-- Streaming of bus positions into Postgres: every 5 seconds, issue a
-- download request. The data arrives asynchronously in the HTTP response
-- table.
SELECT cron.schedule(
    'download-bus-data',
    '5 seconds',
    $$
    select gather_bus_data();
    $$
);
|
|
|
|
-- Copies data from an HTTP response into a more permanent table in a more
-- useful format. Note: in Supabase you cannot delete triggers on
-- net._http_response directly, but a cascading delete of the trigger
-- function also removes the trigger itself.
--
-- Trigger function for AFTER INSERT on net._http_response. For a 200
-- response that matches a pending row in raw_bus_position_requests, each
-- element of data.BusLocationByRoute.results becomes one row in
-- raw_bus_positions. The pending request row is removed in every case.
CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
DECLARE
    bus_payload jsonb;        -- data.BusLocationByRoute of the response body
    last_update timestamptz;  -- the API's lastUpdate value for this response
BEGIN
    -- Only parse the body of successful responses, so that an error page
    -- that is not valid JSON cannot raise from the jsonb cast.
    IF NEW.status_code = 200 THEN
        bus_payload := NEW.content::jsonb -> 'data' -> 'BusLocationByRoute';
        last_update := (bus_payload ->> 'lastUpdate')::timestamptz;

        -- NOTE: the http_response sequence resets on DB restart, so there
        -- is potential for old responses to have duplicated ids. We only
        -- accept a response whose matching request row is recent (created
        -- no more than 30 seconds before the response), which makes it
        -- difficult to attribute a response to an unrelated older request.
        INSERT INTO raw_bus_positions
            (response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, coords)
        SELECT
            NEW.status_code,
            last_update,
            mr."busId",
            mr."tripId",
            mr."routeNr",
            mr.headsign,
            mr.tag,
            mr.direction,
            -- PostGIS points are (x, y) = (longitude, latitude).
            -- NOTE(review): rows stored before this fix have the axes
            -- swapped; confirm bus_stops.coords uses (lng, lat) as well.
            ST_SetSRID(ST_MakePoint(mr.lng, mr.lat), 4326)
        FROM jsonb_to_recordset(bus_payload -> 'results') AS mr (
            "busId" text,
            "tripId" text,
            "routeNr" text,
            headsign text,
            tag text,
            direction int,
            lat decimal,
            lng decimal
        )
        INNER JOIN raw_bus_position_requests AS raw_request
            ON raw_request.request_id = NEW.id
        WHERE
            -- fairly generous constraint to account for long requests.
            raw_request.created >= (NEW.created - INTERVAL '30 seconds')
            -- the request must have been issued at or after the API's
            -- lastUpdate value for the data.
            AND raw_request.created >= last_update;
    END IF;

    -- The request has been answered (successfully or not); stop tracking it.
    DELETE FROM raw_bus_position_requests WHERE request_id = NEW.id;

    -- What if we want to stream other data? We can do multiple updates in
    -- different tables, keyed on the request id.
    RETURN NULL; -- this is an AFTER trigger
END;
$$;
|
|
|
|
-- Runs copy_to_raw_table() once for every row inserted into the pg_net
-- response table. The trigger is deferred, so it fires at the end of the
-- inserting transaction rather than immediately after each row.
CREATE CONSTRAINT TRIGGER copy_http_response
    AFTER INSERT
    ON net._http_response
    DEFERRABLE INITIALLY DEFERRED
    FOR EACH ROW
    EXECUTE FUNCTION copy_to_raw_table();
|
|
|
|
|
|
-- Bus positions that were measured close to a bus stop. A trigger on
-- insert into raw_bus_positions runs a spatial query against bus_stops to
-- find out whether the measurement is within X meters of a stop. If so, it
-- is queued here for the sanity check pipeline; otherwise the raw row is
-- deleted.
CREATE TABLE potential_arrivals (
    id          bigint GENERATED ALWAYS AS IDENTITY,
    raw_pos_id  bigint NOT NULL,                  -- raw_bus_positions.id of the sample
    bus_id      text NOT NULL,
    bus_coords  geography(Point, 4326) NOT NULL,
    measured_at timestamptz NOT NULL,
    stop_id     bigint NOT NULL,                  -- bus_stops.id of the nearby stop
    distance    decimal NOT NULL                  -- ST_Distance between bus and stop
);

-- Partition on measurement time using 10-minute chunks.
SELECT create_hypertable(
    'potential_arrivals',
    'measured_at',
    chunk_time_interval => INTERVAL '10 minutes'
);

CREATE INDEX potential_arrivals_measures
    ON potential_arrivals USING btree (bus_id, measured_at DESC);

CREATE INDEX potential_arrivals_spatial
    ON potential_arrivals USING gist (bus_coords);
|
|
|
|
-- Trigger function for AFTER INSERT on raw_bus_positions. If the new
-- position lies within 30 (ST_DWithin distance units on geography, i.e.
-- meters) of a bus stop, records it in potential_arrivals against the
-- nearest such stop. Otherwise the raw position is deleted.
CREATE FUNCTION check_near_stop() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
    INSERT INTO potential_arrivals (raw_pos_id, bus_id, bus_coords, measured_at, stop_id, distance)
    SELECT DISTINCT ON (pos.id)
        pos.id,
        pos.bus_id,
        pos.coords,
        pos.measured_at,
        stops.id,
        ST_Distance(pos.coords, stops.coords) AS distance
    FROM raw_bus_positions AS pos
    INNER JOIN bus_stops AS stops
        ON ST_DWithin(pos.coords, stops.coords, 30)
    WHERE pos.id = NEW.id
    -- DISTINCT ON keeps the first row per position. Without an ORDER BY the
    -- chosen stop is arbitrary when several are in range, so sort by
    -- distance (stop id as a deterministic tie-breaker) to keep the nearest.
    ORDER BY
        pos.id,
        ST_Distance(pos.coords, stops.coords),
        stops.id;

    -- Remove the raw position if it is not potentially a bus stopping at a
    -- bus stop. The correlated NOT EXISTS checks only this position rather
    -- than materialising every raw_pos_id in potential_arrivals.
    DELETE FROM raw_bus_positions AS pos
    WHERE pos.id = NEW.id
      AND NOT EXISTS (
          SELECT 1
          FROM potential_arrivals AS pa
          WHERE pa.raw_pos_id = NEW.id
      );

    RETURN NULL; -- this is an AFTER trigger
END;
$$;
|
|
|
|
-- Runs check_near_stop() once for every row inserted into
-- raw_bus_positions. The trigger is deferred, so it fires at the end of
-- the inserting transaction rather than immediately after each row.
CREATE CONSTRAINT TRIGGER add_potential_stop
    AFTER INSERT
    ON raw_bus_positions
    DEFERRABLE INITIALLY DEFERRED
    FOR EACH ROW
    EXECUTE FUNCTION check_near_stop();
|