One row per bus position entry. TimescaleDB!
This commit is contained in:
parent
4e73f47f07
commit
f2f5236e2e
|
@ -0,0 +1,25 @@
|
||||||
|
# Jokullbase
|
||||||
|
|
||||||
|
How to track how late buses are:
|
||||||
|
- [ ] Download bus route data
|
||||||
|
- [ ] Insert raw bus positions into a TimescaleDB/PostGIS table?
|
||||||
|
- [ ] "Fan-out" JSON responses into one DB row per datapoint.
|
||||||
|
- [ ] Download routes.
|
||||||
|
- There is an endpoint called Timetable, which takes a route number
|
||||||
|
(along with day of week etc).
|
||||||
|
- The timetable response includes a list of stops in the route,
|
||||||
|
along with the route ID.
|
||||||
|
- There is an endpoint called **BusLocationByStop**, which takes a
|
||||||
|
stop ID as a parameter.
|
||||||
|
- This response lists upcoming arrivals, including the estimated
|
||||||
|
arrival time, along with the current position of the bus.
|
||||||
|
- Record stop info every few minutes, notably the arrivals.
|
||||||
|
- [ ] Analyze actual arrival vs what the stop endpoint said about the
|
||||||
|
arrival.
|
||||||
|
- Use PostGIS to compute a "bus arrived" event: when a bus is within X
|
||||||
|
meters of a stop on its route, mark that as an arrival event.
|
||||||
|
- Once we have bus arrival events, we can compare them to predicted arrivals
|
||||||
|
throughout the day.
|
||||||
|
- We can then discard the raw bus position data, as there is no need
|
||||||
|
to store it: delete every raw data point between the last arrival
|
||||||
|
and the newly computed one.
|
|
@ -2,13 +2,31 @@
|
||||||
CREATE EXTENSION IF NOT EXISTS "plv8" SCHEMA pg_catalog;
|
CREATE EXTENSION IF NOT EXISTS "plv8" SCHEMA pg_catalog;
|
||||||
CREATE EXTENSION IF NOT EXISTS "pg_cron" SCHEMA pg_catalog;
|
CREATE EXTENSION IF NOT EXISTS "pg_cron" SCHEMA pg_catalog;
|
||||||
CREATE EXTENSION IF NOT EXISTS "pg_net" SCHEMA extensions;
|
CREATE EXTENSION IF NOT EXISTS "pg_net" SCHEMA extensions;
|
||||||
|
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
|
||||||
|
|
||||||
-- request ID is inserted by first cron job, then a trigger handles
|
|
||||||
-- insertion of response.
|
|
||||||
CREATE TABLE IF NOT EXISTS raw_bus_positions (
|
CREATE TABLE IF NOT EXISTS raw_bus_positions (
|
||||||
id BIGINT GENERATED ALWAYS AS IDENTITY,
|
id BIGINT GENERATED ALWAYS AS IDENTITY,
|
||||||
request_id INT NOT NULL,
|
measured_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||||
created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
||||||
response_status INT NULL,
|
response_status INT NULL,
|
||||||
response_json JSONB NULL
|
bus_id text,
|
||||||
|
trip_id text,
|
||||||
|
route_number text,
|
||||||
|
headsign text,
|
||||||
|
tag text,
|
||||||
|
direction int,
|
||||||
|
lat decimal,
|
||||||
|
lon decimal
|
||||||
|
);
|
||||||
|
|
||||||
|
-- TimescaleDB Hypertable creation
|
||||||
|
-- Partition raw_bus_positions on measured_at in 10-minute chunks.
-- if_not_exists => TRUE turns a repeat run into a no-op (with a notice)
-- instead of an error, matching the IF NOT EXISTS used for the table itself.
SELECT create_hypertable('raw_bus_positions', 'measured_at', chunk_time_interval => INTERVAL '10 minutes', if_not_exists => TRUE);
|
||||||
|
-- Index on (bus_id, measured_at DESC). IF NOT EXISTS keeps a repeat run of
-- this script from failing once the index has already been created.
CREATE INDEX IF NOT EXISTS bus_position_measures ON raw_bus_positions (bus_id, measured_at DESC);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
cron.schedule(
|
||||||
|
'drop-old-chunks',
|
||||||
|
'*/10 * * * *',
|
||||||
|
$$
|
||||||
|
SELECT drop_chunks('raw_bus_positions', INTERVAL '24 hours');
|
||||||
|
$$
|
||||||
);
|
);
|
||||||
|
|
|
@ -46,10 +46,15 @@ $$;
|
||||||
|
|
||||||
-- Called by a scheduled job to request data on a timer. A trigger
|
-- Called by a scheduled job to request data on a timer. A trigger
|
||||||
-- handles insertion of the response after record initially inserted.
|
-- handles insertion of the response after record initially inserted.
|
||||||
|
CREATE TABLE IF NOT EXISTS raw_bus_position_requests(
|
||||||
|
request_id int,
|
||||||
|
created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION gather_bus_data()
|
CREATE OR REPLACE FUNCTION gather_bus_data()
|
||||||
RETURNS void LANGUAGE sql
|
RETURNS void LANGUAGE sql
|
||||||
as $$
|
as $$
|
||||||
INSERT INTO raw_bus_positions (request_id) SELECT
|
INSERT INTO raw_bus_position_requests (request_id) SELECT
|
||||||
net.http_get(
|
net.http_get(
|
||||||
url := build_api_url(array['1', '2', '3']),
|
url := build_api_url(array['1', '2', '3']),
|
||||||
headers := '{"Content-Type": "application/json"}'::jsonb
|
headers := '{"Content-Type": "application/json"}'::jsonb
|
||||||
|
|
|
@ -21,16 +21,36 @@ CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
|
||||||
-- with newer values difficult. The API also provides a
|
-- with newer values difficult. The API also provides a
|
||||||
-- lastUpdated value. we require that raw.created is at or after
|
-- lastUpdated value. we require that raw.created is at or after
|
||||||
-- that value to be updated.
|
-- that value to be updated.
|
||||||
UPDATE raw_bus_positions raw
|
WITH mapped_response_rows as (
|
||||||
SET response_json = NEW.content::jsonb->'data'->'BusLocationByRoute',
|
select x.*
|
||||||
response_status = NEW.status_code
|
from jsonb_to_recordset(NEW.content::jsonb->'data'->'BusLocationByRoute'->'results') x (
|
||||||
WHERE raw.request_id = NEW.id
|
"busId" text,
|
||||||
|
"tripId" text,
|
||||||
|
"routeNr" text,
|
||||||
|
headsign text,
|
||||||
|
tag text,
|
||||||
|
direction int,
|
||||||
|
lat decimal,
|
||||||
|
lng decimal
|
||||||
|
)
|
||||||
|
)
|
||||||
|
INSERT INTO raw_bus_positions
|
||||||
|
-- This needs to be in the same order as the jsonb_to_recordset call to work properly.
|
||||||
|
(response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, lat, lon)
|
||||||
|
SELECT
|
||||||
|
NEW.status_code,
|
||||||
|
(NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz,
|
||||||
|
mr.*
|
||||||
|
FROM mapped_response_rows mr
|
||||||
|
JOIN raw_bus_position_requests raw_request ON raw_request.request_id = NEW.id
|
||||||
|
WHERE
|
||||||
-- fairly generous constraint to account for long requests.
|
-- fairly generous constraint to account for long requests.
|
||||||
AND raw.created >= (NEW.created - '30 seconds'::interval)
|
raw_request.created >= (NEW.created - '30 seconds'::interval)
|
||||||
-- the response must be at or after we actually sent the request.
|
-- the response must be at or after we actually sent the request.
|
||||||
AND raw.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz
|
AND raw_request.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz
|
||||||
AND NEW.status_code = 200
|
AND NEW.status_code = 200;
|
||||||
AND raw.response_json IS NULL;
|
|
||||||
|
DELETE FROM raw_bus_position_requests where request_id = NEW.id;
|
||||||
|
|
||||||
-- what if we want to stream other data? we can do multiple updates in
|
-- what if we want to stream other data? we can do multiple updates in
|
||||||
-- different tables, where the request id is.
|
-- different tables, where the request id is.
|
||||||
|
|
Loading…
Reference in New Issue