From f2f5236e2ee1d253f149566e4cb6dde5175ab7f7 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Wed, 10 Apr 2024 10:38:35 +0200 Subject: [PATCH] One row per bus position entry. TimescaleDB! --- README.md | 25 +++++++++++++ .../migrations/20240409190001_initial.sql | 28 ++++++++++++--- .../migrations/20240409210612_bus_funcs.sql | 7 +++- .../20240410070316_scheduled-jobs.sql | 36 ++++++++++++++----- 4 files changed, 82 insertions(+), 14 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..476905a --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# Jokullbase + +How to track how late buses are: + - [ ] Download bus route data + - [ ] Insert raw bus positions into timescaledb/postGIS table? + - [ ] "Fan-out" JSON responses into one DB row per datapoint. + - [ ] Download routes. + - There is an endpoint called Timetable, which takes a route number + (along with day of week etc). + - The timetable response includes a list of stops in the route, + along with the route ID. + - There is an endpoint called **BusLocationByStop**, which takes a + stop ID as a parameter. + - This information has upcoming arrivals, including estimated + arrival time, along with the current position of the bus. + - Record stop info every few minutes, notably the arrivals. + - [ ] Analyze actual arrival vs what the stop endpoint said about the + arrival. + - Use PostGIS to compute "bus arrived" event: When bus is within X + meters of a stop on its route, mark that as an arrival event. + - Once we have bus arrival events, we can compare them to arrivals + throughout the day. + - We can then discard the raw bus position data, as it doesn't need + to be stored: delete every raw data point between the last arrival + and the newly computed one. 
diff --git a/supabase/migrations/20240409190001_initial.sql b/supabase/migrations/20240409190001_initial.sql index 8377c34..a86c25c 100644 --- a/supabase/migrations/20240409190001_initial.sql +++ b/supabase/migrations/20240409190001_initial.sql @@ -2,13 +2,31 @@ CREATE EXTENSION IF NOT EXISTS "plv8" SCHEMA pg_catalog; CREATE EXTENSION IF NOT EXISTS "pg_cron" SCHEMA pg_catalog; CREATE EXTENSION IF NOT EXISTS "pg_net" SCHEMA extensions; +CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE; --- request ID is inserted by first cron job, then a trigger handles --- insertion of response. CREATE TABLE IF NOT EXISTS raw_bus_positions ( id BIGINT GENERATED ALWAYS AS IDENTITY, - request_id INT NOT NULL, - created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + measured_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), response_status INT NULL, - response_json JSONB NULL + bus_id text, + trip_id text, + route_number text, + headsign text, + tag text, + direction int, + lat decimal, + lon decimal +); + +-- TimescaleDB Hypertable creation +SELECT create_hypertable('raw_bus_positions', 'measured_at', chunk_time_interval => INTERVAL '10 minutes'); +CREATE INDEX bus_position_measures ON raw_bus_positions (bus_id, measured_at DESC); + +SELECT + cron.schedule( + 'drop-old-chunks', + '*/10 * * * *', + $$ + SELECT drop_chunks('raw_bus_positions', INTERVAL '24 hours'); + $$ ); diff --git a/supabase/migrations/20240409210612_bus_funcs.sql b/supabase/migrations/20240409210612_bus_funcs.sql index 988b87a..7f89b18 100644 --- a/supabase/migrations/20240409210612_bus_funcs.sql +++ b/supabase/migrations/20240409210612_bus_funcs.sql @@ -46,10 +46,15 @@ $$; -- Called by a scheduled job to request data on a timer. A trigger -- handles insertion of the response after record initially inserted. 
+CREATE TABLE IF NOT EXISTS raw_bus_position_requests( + request_id int, + created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() +); + CREATE OR REPLACE FUNCTION gather_bus_data() RETURNS void LANGUAGE sql as $$ - INSERT INTO raw_bus_positions (request_id) SELECT + INSERT INTO raw_bus_position_requests (request_id) SELECT net.http_get( url := build_api_url(array['1', '2', '3']), headers := '{"Content-Type": "application/json"}'::jsonb diff --git a/supabase/migrations/20240410070316_scheduled-jobs.sql b/supabase/migrations/20240410070316_scheduled-jobs.sql index 787d990..4abd2fc 100644 --- a/supabase/migrations/20240410070316_scheduled-jobs.sql +++ b/supabase/migrations/20240410070316_scheduled-jobs.sql @@ -21,16 +21,36 @@ CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$ -- with newer values difficult. The API also provides a -- lastUpdated value. we require that raw.created is at or after -- that value to be updated. - UPDATE raw_bus_positions raw - SET response_json = NEW.content::jsonb->'data'->'BusLocationByRoute', - response_status = NEW.status_code - WHERE raw.request_id = NEW.id + WITH mapped_response_rows as ( + select x.* + from jsonb_to_recordset(NEW.content::jsonb->'data'->'BusLocationByRoute'->'results') x ( + "busId" text, + "tripId" text, + "routeNr" text, + headsign text, + tag text, + direction int, + lat decimal, + lng decimal + ) + ) + INSERT INTO raw_bus_positions + -- This needs to be in the same order as the jsonb_to_recordset call to work properly. + (response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, lat, lon) + SELECT + NEW.status_code, + (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz, + mr.* + FROM mapped_response_rows mr + JOIN raw_bus_position_requests raw_request ON raw_request.request_id = NEW.id + WHERE -- fairly generous constraint to account for long requests. 
- AND raw.created >= (NEW.created - '30 seconds'::interval) + raw_request.created >= (NEW.created - '30 seconds'::interval) -- the response must be at or after we actually sent the request. - AND raw.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz - AND NEW.status_code = 200 - AND raw.response_json IS NULL; + AND raw_request.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz + AND NEW.status_code = 200; + + DELETE FROM raw_bus_position_requests where request_id = NEW.id; -- what if we want to stream other data? we can do multiple updates in -- different tables, where the request id is.