Make migrations pretty.
This commit is contained in:
parent
082feeb847
commit
4e73f47f07
|
@ -1,67 +0,0 @@
|
|||
-- extensions
|
||||
CREATE EXTENSION IF NOT EXISTS "plv8" SCHEMA pg_catalog;
|
||||
CREATE EXTENSION IF NOT EXISTS "pg_cron" SCHEMA pg_catalog;
|
||||
CREATE EXTENSION IF NOT EXISTS "pg_net" SCHEMA extensions;
|
||||
|
||||
-- request ID is inserted by first cron job,
|
||||
-- then second cron job updates from net._http_response table.
|
||||
create table if not exists raw_bus_positions (
|
||||
id BIGINT GENERATED ALWAYS AS IDENTITY,
|
||||
request_id INT not null,
|
||||
created TIMESTAMP WITH TIME ZONE not null default now(),
|
||||
response_status INT null,
|
||||
response_json JSONB null
|
||||
);
|
||||
|
||||
-- used to help craft the API URL. Requires plv8 extension.
|
||||
-- because we are lazy. just use javascript in the database. why not?
|
||||
create or replace function to_straeto_querystring(jsonb) returns text
|
||||
language plv8 strict immutable as $$
|
||||
const obj = $1;
|
||||
return Object.keys(obj).map(function(variableName) {
|
||||
const variableValue = obj[variableName];
|
||||
if (typeof variableValue == 'object') {
|
||||
return encodeURIComponent(variableName) + '=' + encodeURIComponent(JSON.stringify(variableValue));
|
||||
} else {
|
||||
return encodeURIComponent(variableName) + '=' + encodeURIComponent(variableValue);
|
||||
}
|
||||
}).join('&');
|
||||
$$;
|
||||
|
||||
create or replace function create_straeto_parameters(bus_routes text[]) returns jsonb
|
||||
language plpgsql as $$
|
||||
declare query_params jsonb;
|
||||
begin
|
||||
query_params := '{
|
||||
"operationName": "BusLocationByRoute",
|
||||
"variables": { "trips":[], "routes": [] },
|
||||
"extensions": {
|
||||
"persistedQuery": {
|
||||
"version": 1,
|
||||
"sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8"
|
||||
}
|
||||
}
|
||||
}'::jsonb;
|
||||
|
||||
select into query_params
|
||||
jsonb_set(query_params, '{variables, routes}', array_to_json(bus_routes)::jsonb);
|
||||
return query_params;
|
||||
end
|
||||
$$;
|
||||
|
||||
create or replace function create_api_url(bus_routes text[]) returns text language sql as $$
|
||||
select concat(
|
||||
'https://straeto.is/graphql?',
|
||||
to_straeto_querystring(create_straeto_parameters(bus_routes))
|
||||
);
|
||||
$$;
|
||||
|
||||
create or replace function gather_bus_data()
|
||||
returns void language sql
|
||||
as $$
|
||||
insert into raw_bus_positions (request_id) select
|
||||
net.http_get(
|
||||
url := create_api_url(array['1', '2', '3']),
|
||||
headers := '{"Content-Type": "application/json"}'::jsonb
|
||||
);
|
||||
$$;
|
|
@ -0,0 +1,14 @@
|
|||
-- extensions
|
||||
CREATE EXTENSION IF NOT EXISTS "plv8" SCHEMA pg_catalog;
|
||||
CREATE EXTENSION IF NOT EXISTS "pg_cron" SCHEMA pg_catalog;
|
||||
CREATE EXTENSION IF NOT EXISTS "pg_net" SCHEMA extensions;
|
||||
|
||||
-- request ID is inserted by first cron job, then a trigger handles
|
||||
-- insertion of response.
|
||||
CREATE TABLE IF NOT EXISTS raw_bus_positions (
|
||||
id BIGINT GENERATED ALWAYS AS IDENTITY,
|
||||
request_id INT NOT NULL,
|
||||
created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||
response_status INT NULL,
|
||||
response_json JSONB NULL
|
||||
);
|
|
@ -0,0 +1,57 @@
|
|||
-- encode straeto API params into the query string by the graphql API.
|
||||
-- yes, javascript in the database: the lazy way.
|
||||
CREATE OR REPLACE FUNCTION encode_straeto_querystring(obj jsonb) RETURNS text
|
||||
LANGUAGE plv8 STRICT IMMUTABLE AS $$
|
||||
return Object.keys(obj).map(function(variableName) {
|
||||
const variableValue = obj[variableName];
|
||||
if (typeof variableValue == 'object') {
|
||||
return encodeURIComponent(variableName) + '=' + encodeURIComponent(JSON.stringify(variableValue));
|
||||
} else {
|
||||
return encodeURIComponent(variableName) + '=' + encodeURIComponent(variableValue);
|
||||
}
|
||||
}).join('&');
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION create_straeto_parameters(bus_routes text[]) RETURNS jsonb
|
||||
LANGUAGE plpgsql AS $$
|
||||
DECLARE query_params jsonb;
|
||||
BEGIN
|
||||
-- This is the unencoded GraphQL request payload.
|
||||
query_params := '{
|
||||
"operationName": "BusLocationByRoute",
|
||||
"variables": { "trips":[], "routes": [] },
|
||||
"extensions": {
|
||||
"persistedQuery": {
|
||||
"version": 1,
|
||||
"sha256Hash": "8f9ee84171961f8a3b9a9d1a7b2a7ac49e7e122e1ba1727e75cfe3a94ff3edb8"
|
||||
}
|
||||
}
|
||||
}'::jsonb;
|
||||
|
||||
-- Add requested bus routes to the `routes` GraphQL variable.
|
||||
SELECT INTO query_params
|
||||
jsonb_set(query_params, '{variables, routes}', array_to_json(bus_routes)::jsonb);
|
||||
RETURN query_params;
|
||||
END
|
||||
$$;
|
||||
|
||||
-- Builds the full request URL to download bus positions of the given
|
||||
-- route numbers.
|
||||
CREATE OR REPLACE FUNCTION build_api_url(bus_routes text[]) RETURNS text LANGUAGE sql AS $$
|
||||
SELECT concat(
|
||||
'https://straeto.is/graphql?',
|
||||
encode_straeto_querystring(create_straeto_parameters(bus_routes))
|
||||
);
|
||||
$$;
|
||||
|
||||
-- Called by a scheduled job to request data on a timer. A trigger
|
||||
-- handles insertion of the response after record initially inserted.
|
||||
CREATE OR REPLACE FUNCTION gather_bus_data()
|
||||
RETURNS void LANGUAGE sql
|
||||
as $$
|
||||
INSERT INTO raw_bus_positions (request_id) SELECT
|
||||
net.http_get(
|
||||
url := build_api_url(array['1', '2', '3']),
|
||||
headers := '{"Content-Type": "application/json"}'::jsonb
|
||||
);
|
||||
$$;
|
|
@ -1,5 +1,5 @@
|
|||
-- downloads data async into the http response table.
|
||||
select
|
||||
SELECT
|
||||
cron.schedule(
|
||||
'download-bus-data',
|
||||
'5 seconds',
|
||||
|
@ -8,9 +8,9 @@ select
|
|||
$$
|
||||
);
|
||||
|
||||
-- copies data into a more permanent tabe/more useful format. note:
|
||||
-- supabase has an issue where you cannot delete these triggers
|
||||
-- directly, but a cascading delete of the trigger function also
|
||||
-- copies data into a more permanent tabe/more useful format. note: in
|
||||
-- supabase, you cannot delete triggers on net._http_response
|
||||
-- directly. but a cascading delete of the trigger function also
|
||||
-- removes the trigger itself.
|
||||
CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
|
||||
BEGIN
|
||||
|
@ -21,14 +21,19 @@ CREATE FUNCTION copy_to_raw_table() RETURNS trigger LANGUAGE plpgsql AS $$
|
|||
-- with newer values difficult. The API also provides a
|
||||
-- lastUpdated value. we require that raw.created is at or after
|
||||
-- that value to be updated.
|
||||
update raw_bus_positions raw
|
||||
set response_json = NEW.content::jsonb,
|
||||
UPDATE raw_bus_positions raw
|
||||
SET response_json = NEW.content::jsonb->'data'->'BusLocationByRoute',
|
||||
response_status = NEW.status_code
|
||||
where raw.request_id = NEW.id
|
||||
WHERE raw.request_id = NEW.id
|
||||
-- fairly generous constraint to account for long requests.
|
||||
and raw.created >= (NEW.created - '30 seconds'::interval)
|
||||
and raw.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz
|
||||
and raw.response_json is null;
|
||||
AND raw.created >= (NEW.created - '30 seconds'::interval)
|
||||
-- the response must be at or after we actually sent the request.
|
||||
AND raw.created >= (NEW.content::jsonb->'data'->'BusLocationByRoute'->>'lastUpdate')::timestamptz
|
||||
AND NEW.status_code = 200
|
||||
AND raw.response_json IS NULL;
|
||||
|
||||
-- what if we want to stream other data? we can do multiple updates in
|
||||
-- different tables, where the request id is.
|
||||
RETURN NULL; -- this is an AFTER trigger
|
||||
END;
|
||||
$$;
|
Loading…
Reference in New Issue