Streaming potential arrivals

This commit is contained in:
projectmoon 2024-04-10 17:00:22 +02:00
parent 3d8d461dfd
commit f2bf0bd150
6 changed files with 367 additions and 132 deletions

View File

@ -4,7 +4,8 @@ How to track how late buses are:
- [x] Download bus route data
- [x] Insert raw bus positions into timescaledb/postGIS table?
- [x] "Fan-out" JSON responses into one DB row per datapoint.
- [ ] Download routes.
- [ ] Download routes/stops.
- There is an endpoint called Stops.
- There is an endpoint called Timetable, which takes a route number
(along with day of week etc).
- The timetable response includes a list of stops in the route,
@ -38,3 +39,13 @@ Arrival computation:
- These are arrival times for this bus ID, at each stop.
- Insert into arrivals.
- After all arrivals computed, drop all raw positions for this bus from DB.
Useful stuff:
```
SELECT buses_at_stops.* FROM (SELECT DISTINCT ON (s.id, h.measured_at) s.id, s.stop_name, ST_AsText(s.coords) as stop_coords, h.measured_at, h.bus_id, ST_AsText(h.coords) as bus_coords, ST_Distance(s.coords, h.coords) as distance
FROM bus_stops s
JOIN raw_bus_positions h ON ST_DWithin(s.coords, h.coords, 30)) AS buses_at_stops
WHERE buses_at_stops.bus_id like '1%'
ORDER BY buses_at_stops.measured_at desc;
```

View File

@ -1,7 +1,8 @@
{
"version": "3",
"redirects": {
"https://deno.land/x/base64/base64url.ts": "https://deno.land/x/base64@v0.2.1/base64url.ts"
"https://deno.land/x/base64/base64url.ts": "https://deno.land/x/base64@v0.2.1/base64url.ts",
"https://esm.sh/@supabase/supabase-js@2": "https://esm.sh/@supabase/supabase-js@2.42.0"
},
"remote": {
"https://deno.land/std@0.177.0/async/abortable.ts": "73acfb3ed7261ce0d930dbe89e43db8d34e017b063cf0eaa7d215477bf53442e",
@ -150,6 +151,18 @@
"https://deno.land/x/upstash_redis@v1.19.3/pkg/types.ts": "f9363a02964f5c2af8f4f054cba2cfde222af8eb9e908c37ca508a6399ac9e29",
"https://deno.land/x/upstash_redis@v1.19.3/pkg/util.ts": "bda4f5eb90ff82d2443ec8908a376079a44af328ee64390d2e5ee7687a171556",
"https://deno.land/x/upstash_redis@v1.19.3/version.ts": "fb553d493437bc431a81483e2940d14fc1a31e476128ef89e1e77db317ea4baf",
"https://denopkg.com/chiefbiiko/std-encoding@v1.0.0/mod.ts": "4a927e5cd1d9b080d72881eb285b3b94edb6dadc1828aeb194117645f4481ac0"
"https://denopkg.com/chiefbiiko/std-encoding@v1.0.0/mod.ts": "4a927e5cd1d9b080d72881eb285b3b94edb6dadc1828aeb194117645f4481ac0",
"https://esm.sh/@supabase/supabase-js@2.42.0": "4828e9cc4af2357c70f1943a243f3235031432ed41f758c30655be969512575e",
"https://esm.sh/v135/@supabase/auth-js@2.63.0/denonext/auth-js.mjs": "626ae8dd2177c4437d6ba38f9aa1f97bd8e19c59386bd7edda465cc71d34e3f1",
"https://esm.sh/v135/@supabase/functions-js@2.2.2/denonext/functions-js.mjs": "2115a8e67eb5058e0633fcea1bc304d8b1a7a8534ac194e18caf67f3a112a5e4",
"https://esm.sh/v135/@supabase/node-fetch@2.6.15/denonext/node-fetch.mjs": "efad00ea3d4cbe1bee688836ef75339d49a1981bc5728a13e9ad5f26791d5efb",
"https://esm.sh/v135/@supabase/postgrest-js@1.15.0/denonext/postgrest-js.mjs": "824ad68b13c6232f3e4520eae5a2ca08c937d43c62485df69dc6520062dbd0de",
"https://esm.sh/v135/@supabase/realtime-js@2.9.3/denonext/realtime-js.mjs": "22f025f3a7f744aad39f538fdac296095eb8ec33974c24dc7168571493c707ec",
"https://esm.sh/v135/@supabase/storage-js@2.5.5/denonext/storage-js.mjs": "66e29e4e55c7d396503e2d1d9376cfbc34f046b962bce4524c2d80b209fff413",
"https://esm.sh/v135/@supabase/supabase-js@2.42.0/denonext/supabase-js.mjs": "0269571ba1b3e42fc36456b8c74fe24b0dd2f29e68e8e76e019704f9506fdd0c",
"https://esm.sh/v135/bufferutil@4.0.8/denonext/bufferutil.mjs": "60a4618cbd1a5cb24935c55590b793d4ecb33862357d32e1d4614a0bbb90947f",
"https://esm.sh/v135/node-gyp-build@4.6.1/denonext/node-gyp-build.mjs": "5d28b312f145a6cb2ec0dbdd80a7d34c0e0e6b5dcada65411d8bcff6c8991cc6",
"https://esm.sh/v135/utf-8-validate@6.0.3/denonext/utf-8-validate.mjs": "410c48d66840e987e474a4849cd25829817415cedd25466280effb1287d05aa5",
"https://esm.sh/v135/ws@8.16.0/denonext/ws.mjs": "0fa0c00b69577ba36d0a36001329b2cec91498cb2e33e329fc76aa6d51a0d54d"
}
}

View File

@ -0,0 +1,145 @@
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
/** GraphQL variables sent along with a Straeto query, keyed by variable name. */
interface StraetoVariables {
  [name: string]: string | number;
}
/**
 * Top-level parameters of a Straeto GraphQL request. Each property is
 * serialised into one query-string entry by `toQuerystring`.
 */
interface StraetoQuery {
  /** Name of the GraphQL operation to run, e.g. "Stops". */
  operationName: string;
  /** Variables passed to the operation. */
  variables: StraetoVariables;
  /** Protocol extensions; carries the persisted-query reference. */
  extensions: StraetoQueryExtensions;
}
/** The `extensions` part of a Straeto GraphQL request. */
interface StraetoQueryExtensions {
  /** Identifies the persisted query to execute. */
  persistedQuery: StraetoPersistedQuery;
}
/** Reference to a persisted GraphQL query: a version number plus the query's SHA-256 hash. */
interface StraetoPersistedQuery {
  version: number;
  /** SHA-256 hash string identifying the persisted query. */
  sha256Hash: string;
}
/**
 * A bus stop in the shape this module reads from the Straeto GraphQL
 * response (`data.GtfsStops.results`).
 */
interface StraetoBusStop {
  id: number;
  name: string;
  /** Latitude of the stop. */
  lat: number;
  /** Longitude of the stop. */
  lon: number;
  type: number;
  rotation: number;
  code: string | null;
  isTerminal: boolean;
  routes: string[];
  alerts: string[];
  /** GraphQL type name attached to the result. */
  __typename: string;
}
/** Row shape written to the `bus_stops` table (snake_case column names). */
interface DbBusStop {
  id: number;
  stop_name: string;
  /** Stop position as a WKT `POINT(...)` string. TODO better geometry type representation */
  coords: string;
  type: number;
  rotation: number;
  code: string | null;
  is_terminal: boolean;
  routes: string[];
}
/**
 * Converts a stop from the Straeto API shape into a `bus_stops` row.
 *
 * Fields are renamed to their snake_case column names and the position is
 * rendered as a WKT point string. `alerts` and `__typename` are dropped.
 *
 * @param straetoStop Stop as returned by the Straeto API.
 * @returns The equivalent database row.
 */
function toDbBusStop(straetoStop: StraetoBusStop): DbBusStop {
  const { id, code, name, lat, lon, type, rotation, isTerminal, routes } =
    straetoStop;
  // NOTE(review): the point is written latitude-first. WKT is normally read
  // as "POINT(x y)", i.e. longitude-first — confirm this order is intended
  // before changing it, since stored bus positions must use the same order.
  const coords = `POINT(${lat} ${lon})`;
  return {
    id,
    code,
    stop_name: name,
    coords,
    type,
    rotation,
    is_terminal: isTerminal,
    routes,
  };
}
/**
 * Serialises the top-level properties of a Straeto query into a URL query
 * string (`key=value` pairs joined by `&`).
 *
 * Object-valued properties are JSON-encoded first; everything is then
 * percent-encoded with `encodeURIComponent`.
 *
 * @param obj Query parameters to encode.
 * @returns The encoded query string, without a leading `?`.
 */
function toQuerystring(obj: StraetoQuery) {
  const pairs: string[] = [];
  for (const [key, value] of Object.entries(obj)) {
    const text = typeof value === "object" ? JSON.stringify(value) : value;
    pairs.push(`${encodeURIComponent(key)}=${encodeURIComponent(text)}`);
  }
  return pairs.join("&");
}
/**
 * Request parameters for the Straeto `Stops` operation: no variables, and a
 * version-1 persisted query identified by its SHA-256 hash.
 */
const graphqlParams: StraetoQuery = {
  operationName: "Stops",
  variables: {},
  extensions: {
    persistedQuery: { version: 1, sha256Hash: "6303de055cc42db47a4e1f1cc941b8c86d11c147903bed231e6d4bddcf0e1312" },
  },
};
/**
 * Downloads the list of bus stops from the Straeto GraphQL endpoint.
 *
 * Issues a GET request whose query string is built from `graphqlParams`
 * and returns the array found at `data.GtfsStops.results`.
 *
 * @returns The stops contained in the response.
 * @throws Error if the HTTP status is not successful, or if the response
 *   body does not contain an array at `data.GtfsStops.results`.
 */
async function getStops(): Promise<Array<StraetoBusStop>> {
  const apiUrl = `https://straeto.is/graphql?${toQuerystring(graphqlParams)}`;
  const opts = {
    method: "GET",
    headers: {
      "Content-Type": "application/json",
    },
  };

  const resp = await fetch(apiUrl, opts);
  if (!resp.ok) {
    // Fail with the status instead of trying to parse an error page as JSON.
    throw new Error(
      `Straeto stops request failed: ${resp.status} ${resp.statusText}`,
    );
  }

  // The payload is external input: treat it as unknown and check the one
  // path we depend on before handing it to callers.
  const body: unknown = await resp.json();
  const results =
    (body as { data?: { GtfsStops?: { results?: unknown } } } | null)
      ?.data?.GtfsStops?.results;
  if (!Array.isArray(results)) {
    throw new Error(
      "Straeto stops response did not contain data.GtfsStops.results",
    );
  }
  return results as Array<StraetoBusStop>;
}
/**
 * HTTP entry point: downloads all bus stops from Straeto and upserts them
 * into the `bus_stops` table, using the caller's Authorization header for
 * the Supabase client.
 *
 * Responds with JSON `{ status, inserted, error }`:
 * - 401 when the request has no Authorization header,
 * - 502 when the stops could not be downloaded,
 * - 500 when the upsert reports an error,
 * - 200 otherwise.
 * `inserted` is the number of rows submitted to the upsert; existing rows
 * are skipped by the database because of `ignoreDuplicates`.
 */
Deno.serve(async (req) => {
  const jsonHeaders = { "Content-Type": "application/json" };
  const respond = (status: number, payload: unknown) =>
    new Response(JSON.stringify(payload), { status, headers: jsonHeaders });

  // Reject early instead of forwarding a null Authorization header.
  const authHeader = req.headers.get("Authorization");
  if (authHeader === null) {
    return respond(401, {
      "status": "Unauthorized",
      "inserted": 0,
      "error": { message: "Missing Authorization header" },
    });
  }

  let busStops: Array<StraetoBusStop>;
  try {
    busStops = await getStops();
  } catch (e: unknown) {
    const message = e instanceof Error ? e.message : String(e);
    console.error("failed to download bus stops:", message);
    return respond(502, {
      "status": "Bad Gateway",
      "inserted": 0,
      "error": { message },
    });
  }

  const supabaseClient = createClient(
    Deno.env.get("SUPABASE_URL") ?? "",
    Deno.env.get("SUPABASE_ANON_KEY") ?? "",
    { global: { headers: { Authorization: authHeader } } },
  );

  console.log("inserting", busStops.length, "rows");
  const toInsert = busStops.map(toDbBusStop);
  // Request an exact count so the "rows inserted" log line below is not null.
  const insert = await supabaseClient.from("bus_stops")
    .upsert(toInsert, { ignoreDuplicates: true, count: "exact" });

  console.log(insert.statusText, insert.error?.code, insert.error?.message);
  console.log("rows inserted", insert.count);

  return respond(insert.error ? 500 : 200, {
    "status": insert.statusText,
    "inserted": toInsert.length,
    "error": insert.error,
  });
});
/* To invoke locally:
1. Run `supabase start` (see: https://supabase.com/docs/reference/cli/supabase-start)
2. Make an HTTP request:
curl -i --location --request POST 'http://127.0.0.1:54321/functions/v1/download-bus-stops' \
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6ImFub24iLCJleHAiOjE5ODM4MTI5OTZ9.CRXP1A7WOeoJeXxjNni43kdQwgnWNReilDMblYTn_I0' \
--header 'Content-Type: application/json' \
--data '{"name":"Functions"}'
*/

View File

@ -5,6 +5,7 @@ CREATE EXTENSION IF NOT EXISTS "pg_net" SCHEMA extensions;
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
CREATE EXTENSION IF NOT EXISTS postgis;
-- Raw bus positions time series table.
CREATE TABLE IF NOT EXISTS raw_bus_positions (
id BIGINT GENERATED ALWAYS AS IDENTITY,
measured_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
@ -15,12 +16,15 @@ CREATE TABLE IF NOT EXISTS raw_bus_positions (
headsign text,
tag text,
direction int,
coords geometry(Point, 4326)
coords geography(Point, 4326)
);
-- Catalog descriptions for the table and its coordinate column.
COMMENT ON TABLE raw_bus_positions IS 'Recordings of bus positions. Mostly transient data.';
COMMENT ON COLUMN raw_bus_positions.coords IS 'GPS coords of bus. PostGIS Point.';
-- TimescaleDB Hypertable creation: partitioned on measured_at in
-- 10-minute chunks.
SELECT create_hypertable('raw_bus_positions', 'measured_at', chunk_time_interval => INTERVAL '10 minutes');
-- Index for reading one bus's measurements, newest first.
CREATE INDEX bus_position_measures ON raw_bus_positions (bus_id, measured_at DESC);
-- GiST index on the coordinates for spatial queries.
CREATE INDEX raw_bus_positions_spatial ON raw_bus_positions USING GIST (coords);
SELECT
cron.schedule(
@ -30,3 +34,192 @@ SELECT
SELECT drop_chunks('raw_bus_positions', INTERVAL '24 hours');
$$
);
-- Record of all bus stops: one row per stop, keyed by the stop id supplied
-- on insert, with its position stored as a PostGIS geography point.
CREATE TABLE bus_stops (
    id          BIGINT PRIMARY KEY,
    stop_name   TEXT,
    coords      GEOGRAPHY(Point, 4326),
    "type"      INTEGER,
    rotation    INTEGER,
    code        TEXT,
    is_terminal BOOLEAN,
    routes      TEXT[]
);

-- GiST index on the stop coordinates for spatial queries.
CREATE INDEX bus_stops_spatial
    ON bus_stops
    USING GIST (coords);
-- Bus Position Related Functions
-- Encodes Straeto API parameters into the query string expected by the
-- GraphQL API: each top-level key of `obj` becomes one `key=value` pair,
-- object values are JSON-encoded first, and pairs are joined with '&'.
-- yes, javascript in the database: the lazy way.
CREATE OR REPLACE FUNCTION encode_straeto_querystring(obj jsonb) RETURNS text
LANGUAGE plv8 STRICT IMMUTABLE AS $$
    const encodePair = (key) => {
        const value = obj[key];
        const text = (typeof value === 'object') ? JSON.stringify(value) : value;
        return encodeURIComponent(key) + '=' + encodeURIComponent(text);
    };
    return Object.keys(obj).map(encodePair).join('&');
$$;
-- Builds the unencoded GraphQL request payload for the BusLocationByRoute
-- persisted query, with `bus_routes` placed in the `routes` variable.
CREATE OR REPLACE FUNCTION create_straeto_parameters(bus_routes text[]) RETURNS jsonb
LANGUAGE plpgsql AS $$
DECLARE
    -- Payload template; `variables.routes` is filled in below.
    base_payload CONSTANT jsonb := '{
        "operationName": "BusLocationByRoute",
        "variables": { "trips":[], "routes": [] },
        "extensions": {
            "persistedQuery": {
                "version": 1,
                "sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8"
            }
        }
    }'::jsonb;
BEGIN
    -- Add requested bus routes to the `routes` GraphQL variable.
    RETURN jsonb_set(
        base_payload,
        '{variables, routes}',
        array_to_json(bus_routes)::jsonb
    );
END
$$;
-- Returns the complete request URL for downloading the bus positions of
-- the given route numbers: the GraphQL endpoint followed by the encoded
-- query parameters.
CREATE OR REPLACE FUNCTION build_api_url(bus_routes text[]) RETURNS text
LANGUAGE sql AS $$
    WITH encoded AS (
        SELECT encode_straeto_querystring(create_straeto_parameters(bus_routes)) AS querystring
    )
    SELECT concat('https://straeto.is/graphql?', encoded.querystring)
    FROM encoded;
$$;
-- Bookkeeping for outstanding position downloads. gather_bus_data() is
-- called by a scheduled job and records the id of each HTTP request here;
-- a trigger on the response table handles insertion of the response once
-- it arrives and then removes the matching row.
CREATE TABLE IF NOT EXISTS raw_bus_position_requests(
    -- Request id returned by net.http_get. pg_net request ids are bigint,
    -- so the column is bigint too rather than int.
    request_id BIGINT,
    created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Issues an HTTP GET for the positions of routes 1, 2 and 3 via net.http_get
-- and records the returned request id in raw_bus_position_requests.
CREATE OR REPLACE FUNCTION gather_bus_data()
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO raw_bus_position_requests (request_id)
    VALUES (
        net.http_get(
            url     := build_api_url(ARRAY['1', '2', '3']),
            headers := '{"Content-Type": "application/json"}'::jsonb
        )
    );
$$;
-- Configuration of streaming bus positions into Postgres
-- Registers the cron job 'download-bus-data', which runs gather_bus_data()
-- on a '5 seconds' schedule. It downloads data async into the http
-- response table.
SELECT
cron.schedule(
'download-bus-data',
'5 seconds',
$$
select gather_bus_data();
$$
);
-- copies data into a more permanent table/more useful format. note: in
-- supabase, you cannot delete triggers on net._http_response
-- directly. but a cascading delete of the trigger function also
-- removes the trigger itself.
--
-- Trigger function: fans the JSON body of the new response row (NEW) out
-- into one raw_bus_positions row per entry of
-- data.BusLocationByRoute.results, then deletes the matching row from
-- raw_bus_position_requests.
CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
-- NOTE: the http_response sequence resets on DB restart, so there
-- is potential for old responses to have duplicated ids. we use a
-- constraint to only accept requests recorded recently (no more than
-- 30 seconds before the response was created). this should make
-- accidentally mixing older data with newer values difficult. The API
-- also provides a lastUpdate value. we require that the request's
-- created timestamp is at or after that value for rows to be inserted.
WITH mapped_response_rows as (
select x.*
from jsonb_to_recordset(NEW.content::jsonb->'data'->'BusLocationByRoute'->'results') x (
"busId" text,
"tripId" text,
"routeNr" text,
headsign text,
tag text,
direction int,
lat decimal,
lng decimal
)
)
INSERT INTO raw_bus_positions
(response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, coords)
SELECT
NEW.status_code,
-- every row of one response shares the response-level lastUpdate timestamp.
(NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz,
mr."busId", mr."tripId", mr."routeNr", mr.headsign, mr.tag, mr.direction,
ST_SetSRID(ST_MakePoint(mr.lat, mr.lng), 4326) -- PostGIS coordinates. NOTE(review): ST_MakePoint takes (x, y), i.e. (longitude, latitude) -- confirm the lat-first order here is intended.
FROM mapped_response_rows mr
-- only responses that belong to a request we recorded are copied.
JOIN raw_bus_position_requests raw_request ON raw_request.request_id = NEW.id
WHERE
-- fairly generous constraint to account for long requests.
raw_request.created >= (NEW.created - '30 seconds'::interval)
-- the request must have been recorded at or after the API's lastUpdate.
AND raw_request.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz
AND NEW.status_code = 200;
-- the request record is removed whether or not any rows were copied.
DELETE FROM raw_bus_position_requests where request_id = NEW.id;
-- what if we want to stream other data? we can do multiple updates in
-- different tables, where the request id is.
RETURN NULL; -- this is an AFTER trigger
END;
$$;
-- Runs copy_to_raw_table() once for every row inserted into
-- net._http_response. Declared as a constraint trigger that is deferred by
-- default (INITIALLY DEFERRED).
CREATE CONSTRAINT TRIGGER copy_http_response
AFTER INSERT ON net._http_response
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE FUNCTION copy_to_raw_table();
-- A trigger on insert into raw_bus_positions runs a spatial query against
-- bus stops to find out whether the measurement is within X meters of a
-- bus stop. If so, it is queued here for the sanity check pipeline.
-- Otherwise, the measurement is deleted from raw_bus_positions.
--
-- Each row pairs one raw position with one nearby stop and the distance
-- between them.
CREATE TABLE potential_arrivals (
    id          BIGINT GENERATED ALWAYS AS IDENTITY,
    raw_pos_id  BIGINT NOT NULL,
    bus_id      TEXT NOT NULL,
    bus_coords  GEOGRAPHY(Point, 4326) NOT NULL,
    measured_at TIMESTAMP WITH TIME ZONE NOT NULL,
    stop_id     BIGINT NOT NULL,
    distance    DECIMAL NOT NULL
);
-- Trigger function for inserts into raw_bus_positions.
--
-- If the new position lies within 30 (ST_DWithin on geography columns) of
-- at least one bus stop, one potential_arrivals row is recorded for the
-- NEAREST such stop. Otherwise the raw position is deleted again.
CREATE FUNCTION check_near_stop() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
    INSERT INTO potential_arrivals (raw_pos_id, bus_id, bus_coords, measured_at, stop_id, distance)
    SELECT DISTINCT ON (pos.id)
        pos.id,
        pos.bus_id,
        pos.coords,
        pos.measured_at,
        stops.id,
        ST_Distance(pos.coords, stops.coords) AS distance
    FROM raw_bus_positions pos
    JOIN bus_stops stops ON ST_DWithin(pos.coords, stops.coords, 30)
    WHERE pos.id = NEW.id
    -- DISTINCT ON keeps the first row per position; without an ORDER BY
    -- that row is unpredictable. Sort by distance so the closest stop wins,
    -- with the stop id as a deterministic tie-break.
    ORDER BY pos.id, ST_Distance(pos.coords, stops.coords), stops.id;

    -- Remove any raw positions that are not potentially a bus
    -- stopping at a bus stop. The correlated NOT EXISTS checks only this
    -- position instead of reading every raw_pos_id in potential_arrivals.
    DELETE FROM raw_bus_positions pos
    WHERE pos.id = NEW.id
      AND NOT EXISTS (
          SELECT 1 FROM potential_arrivals pa WHERE pa.raw_pos_id = pos.id
      );

    RETURN NULL; -- this is an AFTER trigger
END;
$$;
-- Runs check_near_stop() once for every row inserted into
-- raw_bus_positions. Declared as a constraint trigger that is deferred by
-- default (INITIALLY DEFERRED).
CREATE CONSTRAINT TRIGGER add_potential_stop
AFTER INSERT ON raw_bus_positions
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE FUNCTION check_near_stop();

View File

@ -1,62 +0,0 @@
-- Encodes Straeto API parameters into the query string expected by the
-- GraphQL API: each top-level key of `obj` becomes one `key=value` pair,
-- object values are JSON-encoded first, and pairs are joined with '&'.
-- yes, javascript in the database: the lazy way.
CREATE OR REPLACE FUNCTION encode_straeto_querystring(obj jsonb) RETURNS text
LANGUAGE plv8 STRICT IMMUTABLE AS $$
    const encodePair = (key) => {
        const value = obj[key];
        const text = (typeof value === 'object') ? JSON.stringify(value) : value;
        return encodeURIComponent(key) + '=' + encodeURIComponent(text);
    };
    return Object.keys(obj).map(encodePair).join('&');
$$;
-- Builds the unencoded GraphQL request payload for the BusLocationByRoute
-- persisted query, with `bus_routes` placed in the `routes` variable.
CREATE OR REPLACE FUNCTION create_straeto_parameters(bus_routes text[]) RETURNS jsonb
LANGUAGE plpgsql AS $$
DECLARE
    -- Payload template; `variables.routes` is filled in below.
    base_payload CONSTANT jsonb := '{
        "operationName": "BusLocationByRoute",
        "variables": { "trips":[], "routes": [] },
        "extensions": {
            "persistedQuery": {
                "version": 1,
                "sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8"
            }
        }
    }'::jsonb;
BEGIN
    -- Add requested bus routes to the `routes` GraphQL variable.
    RETURN jsonb_set(
        base_payload,
        '{variables, routes}',
        array_to_json(bus_routes)::jsonb
    );
END
$$;
-- Returns the complete request URL for downloading the bus positions of
-- the given route numbers: the GraphQL endpoint followed by the encoded
-- query parameters.
CREATE OR REPLACE FUNCTION build_api_url(bus_routes text[]) RETURNS text
LANGUAGE sql AS $$
    WITH encoded AS (
        SELECT encode_straeto_querystring(create_straeto_parameters(bus_routes)) AS querystring
    )
    SELECT concat('https://straeto.is/graphql?', encoded.querystring)
    FROM encoded;
$$;
-- Bookkeeping for outstanding position downloads. gather_bus_data() is
-- called by a scheduled job and records the id of each HTTP request here;
-- a trigger on the response table handles insertion of the response once
-- it arrives and then removes the matching row.
CREATE TABLE IF NOT EXISTS raw_bus_position_requests(
    -- Request id returned by net.http_get. pg_net request ids are bigint,
    -- so the column is bigint too rather than int.
    request_id BIGINT,
    created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Issues an HTTP GET for the positions of routes 1, 2 and 3 via net.http_get
-- and records the returned request id in raw_bus_position_requests.
CREATE OR REPLACE FUNCTION gather_bus_data()
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO raw_bus_position_requests (request_id)
    VALUES (
        net.http_get(
            url     := build_api_url(ARRAY['1', '2', '3']),
            headers := '{"Content-Type": "application/json"}'::jsonb
        )
    );
$$;

View File

@ -1,65 +0,0 @@
-- Registers the cron job 'download-bus-data', which runs gather_bus_data()
-- on a '5 seconds' schedule. It downloads data async into the http
-- response table.
SELECT
cron.schedule(
'download-bus-data',
'5 seconds',
$$
select gather_bus_data();
$$
);
-- copies data into a more permanent table/more useful format. note: in
-- supabase, you cannot delete triggers on net._http_response
-- directly. but a cascading delete of the trigger function also
-- removes the trigger itself.
--
-- Trigger function: fans the JSON body of the new response row (NEW) out
-- into one raw_bus_positions row per entry of
-- data.BusLocationByRoute.results, then deletes the matching row from
-- raw_bus_position_requests.
CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
-- NOTE: the http_response sequence resets on DB restart, so there
-- is potential for old responses to have duplicated ids. we use a
-- constraint to only accept requests recorded recently (no more than
-- 30 seconds before the response was created). this should make
-- accidentally mixing older data with newer values difficult. The API
-- also provides a lastUpdate value. we require that the request's
-- created timestamp is at or after that value for rows to be inserted.
WITH mapped_response_rows as (
select x.*
from jsonb_to_recordset(NEW.content::jsonb->'data'->'BusLocationByRoute'->'results') x (
"busId" text,
"tripId" text,
"routeNr" text,
headsign text,
tag text,
direction int,
lat decimal,
lng decimal
)
)
INSERT INTO raw_bus_positions
(response_status, measured_at, bus_id, trip_id, route_number, headsign, tag, direction, coords)
SELECT
NEW.status_code,
-- every row of one response shares the response-level lastUpdate timestamp.
(NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz,
mr."busId", mr."tripId", mr."routeNr", mr.headsign, mr.tag, mr.direction,
ST_SetSRID(ST_MakePoint(mr.lat, mr.lng), 4326) -- PostGIS coordinates. NOTE(review): ST_MakePoint takes (x, y), i.e. (longitude, latitude) -- confirm the lat-first order here is intended.
FROM mapped_response_rows mr
-- only responses that belong to a request we recorded are copied.
JOIN raw_bus_position_requests raw_request ON raw_request.request_id = NEW.id
WHERE
-- fairly generous constraint to account for long requests.
raw_request.created >= (NEW.created - '30 seconds'::interval)
-- the request must have been recorded at or after the API's lastUpdate.
AND raw_request.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz
AND NEW.status_code = 200;
-- the request record is removed whether or not any rows were copied.
DELETE FROM raw_bus_position_requests where request_id = NEW.id;
-- what if we want to stream other data? we can do multiple updates in
-- different tables, where the request id is.
RETURN NULL; -- this is an AFTER trigger
END;
$$;
-- Runs copy_to_raw_table() once for every row inserted into
-- net._http_response. Declared as a constraint trigger that is deferred by
-- default (INITIALLY DEFERRED).
CREATE CONSTRAINT TRIGGER copy_http_response
AFTER INSERT ON net._http_response
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
EXECUTE FUNCTION copy_to_raw_table();