OSM: Integrate OpenRouteService

This commit is contained in:
projectmoon 2024-09-27 23:23:56 +02:00
parent b3fd1586c6
commit 496000fc69
3 changed files with 174 additions and 33 deletions

View File

@ -1,5 +1,15 @@
# OpenStreetMap Tool
**0.9.0:**
- Integrate OpenRouteService to allow calculation of more accurate
distances based on the distance of travel rather than "as the crow
flies" distance.
- If OpenRouteService is not enabled, the original method will be
used for calculating distance.
- ORS is not enabled by default.
- Properly extract amenity type of leisure locations (playgrounds,
parks, etc) in search results.
**0.8.0:**
- Added ability to find specific stores and businesses near
coordinates. Helps LLM answer questions about arbitrary businesses

View File

@ -229,12 +229,12 @@ the LLM's reply.
## OpenStreetMap Tool
_Recommended models: Llama 3.1, Mistral Nemo Instruct._
_Recommended models: Llama 3.1+, Mistral Nemo, Mistral Small, Qwen 2.5._
A tool that can find certain points of interest (POIs) nearby a
requested address or place.
There are currently six settings:
These are the current settings:
- **User Agent:** The custom user agent to set for OSM and Overpass
Turbo API requests.
- **From Header:** The email address for the From header for OSM and
@ -252,6 +252,11 @@ There are currently six settings:
- **Status Indicators:** If enabled, emit update events to the web
UI, showing what the tool is doing and what search results it has
found, or if it has encountered an error.
- **ORS API Key:** Provide an API key for Open Route Service to
calculate navigational routes to nearby places, to provide more
accurate search results.
- **ORS Instance:** By default, use the public Open Route Service
instance. Can be changed to point to another ORS instance.
The tool **will not run** without the User Agent and From headers set.
This is because the public instance of the Nominatim API will block

188
osm.py
View File

@ -2,13 +2,19 @@
title: OpenStreetMap Tool
author: projectmoon
author_url: https://git.agnos.is/projectmoon/open-webui-filters
version: 0.8.0
version: 0.9.0
license: AGPL-3.0+
required_open_webui_version: 0.3.21
requirements: openrouteservice
"""
import json
import math
import requests
import openrouteservice
from openrouteservice.directions import directions as ors_directions
from operator import itemgetter
from typing import List, Optional, Callable, Any
from pydantic import BaseModel, Field
@ -60,8 +66,11 @@ def detailed_instructions(tag_type_str: str) -> str:
"These are the results known to be closest to the requested location. "
"When telling the user about them, make sure to report "
"all the information (address, contact info, website, etc).\n\n"
"Tell the user about ALL the results, and give the CLOSEST result "
"first. The results are ordered by closeness. "
"Tell the user about ALL the results, and give closer results "
"first. Closer results are higher in the list. When telling the "
"user the distance, use the TRAVEL DISTANCE. Do not say one "
"distance is farther away than another. Just say what the "
"distances are. "
f"{OSM_LINK_INSTRUCTIONS}"
"Give map links friendly, contextual labels. Don't just print "
f"the naked link:\n"
@ -93,7 +102,8 @@ def simple_instructions(tag_type_str: str) -> str:
" - OpenStreetMap Link (make it a human readable link like 'View on OpenStreetMap')\n"
" - Contact information (address, phone, website, email, etc)\n\n"
"Tell the user about ALL the results, and give the CLOSEST result "
"first. The results are ordered by closeness. "
"first. The results are ordered by closeness as the crow flies. "
"When telling the user about distances, use the TRAVEL DISTANCE only. "
"Only use relevant results. If there are no relevant results, "
"say so. Do not make up answers or hallucinate. "
"Make sure that your results are in the actual location the user is talking about, "
@ -159,17 +169,12 @@ def haversine_distance(point1, point2):
return R * c
def sort_by_closeness(origin, points):
def sort_by_closeness(origin, points, *keys: str):
"""
Sorts a list of { lat, lon }-like dicts by closeness to an origin point.
The origin is a dict with keys of { lat, lon }. This function adds the
distance as a dict value to the points.
The origin is a dict with keys of { lat, lon }.
"""
points_with_distance = [(point, haversine_distance(origin, point)) for point in points]
points_with_distance = sorted(points_with_distance, key=lambda pwd: pwd[1])
for point, distance in points_with_distance:
point['distance'] = distance
return [point for point, distance in points_with_distance]
return sorted(points, key=itemgetter(*keys))
def get_or_none(tags: dict, *keys: str) -> Optional[str]:
"""
@ -273,6 +278,8 @@ def parse_thing_amenity_type(thing: dict, tags: dict) -> Optional[dict]:
# fall back to tag categories, like shop=*
if 'shop' in tags:
return friendly_shop_name(tags['shop'])
if 'leisure' in tags:
return friendly_shop_name(tags['leisure'])
return None
@ -299,14 +306,23 @@ def parse_and_validate_thing(thing: dict) -> Optional[dict]:
else "unknown")
address: string = parse_thing_address(thing)
distance: Optional[float] = thing['distance'] if 'distance' in thing else None
distance: Optional[float] = thing.get('distance', None)
nav_distance: Optional[float] = thing.get('nav_distance', None)
lat: Optional[float] = thing['lat'] if 'lat' in thing else None
lon: Optional[float] = thing['lon'] if 'lon' in thing else None
lat: Optional[float] = thing.get('lat', None)
lon: Optional[float] = thing.get('lon', None)
amenity_type: Optional[str] = parse_thing_amenity_type(thing, tags)
friendly_thing['name'] = name if name else "unknown"
# use the navigation distance if it's present. but if not, set to
# the haversine distance so that we at least get coherent results
# for LLM.
friendly_thing['distance'] = "{:.3f}".format(distance) if distance else "unknown"
if nav_distance:
friendly_thing['nav_distance'] = "{:.3f}".format(nav_distance) + " km"
else:
friendly_thing['nav_distance'] = f"a bit more than {friendly_thing['distance']}km"
friendly_thing['name'] = name if name else "unknown"
friendly_thing['address'] = address if address else "unknown"
friendly_thing['lat'] = lat if lat else "unknown"
friendly_thing['lon'] = lon if lon else "unknown"
@ -338,8 +354,10 @@ def convert_and_validate_results(
if not friendly_thing:
continue
distance = (f" - Distance from Origin: {friendly_thing['distance']} km\n"
distance = (f" - Haversine Distance from Origin: {friendly_thing['distance']} km\n"
if use_distance else "")
travel_distance = (f" - Travel Distance from Origin: {friendly_thing['nav_distance']}\n"
if use_distance and 'nav_distance' in friendly_thing else "")
map_link = create_osm_link(friendly_thing['lat'], friendly_thing['lon'])
entry = (f"## {friendly_thing['name']}\n"
f" - Latitude: {friendly_thing['lat']}\n"
@ -347,6 +365,7 @@ def convert_and_validate_results(
f" - Address: {friendly_thing['address']}\n"
f" - Amenity Type: {friendly_thing['amenity_type']}\n"
f"{distance}"
f"{travel_distance}"
f" - OpenStreetMap link: {map_link}\n\n"
f"Raw JSON data:\n"
"```json\n"
@ -365,11 +384,64 @@ def convert_and_validate_results(
return f"{header}\n\n{result_text}"
class OrsRouter:
def __init__(
self, valves, user_valves: Optional[dict], event_emitter=None
):
self.valves = valves
self.event_emitter = event_emitter
self.user_valves = user_valves
if self.valves.ors_api_key is not None and self.valves.ors_api_key != "":
if self.valves.ors_instance is not None:
self._client = openrouteservice.Client(
base_url=self.valves.ors_instance,
key=self.valves.ors_api_key
)
else:
self._client = openrouteservice.Client(key=self.valves.ors_api_key)
else:
self._client = None
def calculate_distance(
self, from_thing: dict, to_thing: dict
) -> Optional[float]:
"""
Calculate navigation distance between A and B. Returns the
distance calculated, if successful, or None if the distance
could not be calculated, or if ORS is not configured.
"""
if not self._client:
return None
# select profile based on distance for more accurate
# measurements. very close haversine distances use the walking
# profile, which should (usually?) essentially cover walking
# and biking. further away = use car.
if to_thing.get('distance', 9000) <= 1.5:
profile = "foot-walking"
else:
profile = "driving-car"
coords = ((from_thing['lon'], from_thing['lat']),
(to_thing['lon'], to_thing['lat']))
resp = ors_directions(self._client, coords, profile=profile,
preference="fastest", units="km")
routes = resp.get('routes', [])
if len(routes) > 0:
return routes[0].get('summary', {}).get('distance', None)
else:
return None
class OsmSearcher:
def __init__(self, valves, user_valves: Optional[dict], event_emitter=None):
self.valves = valves
self.event_emitter = event_emitter
self.user_valves = user_valves
self._ors = OrsRouter(valves, user_valves, event_emitter)
def create_headers(self) -> Optional[dict]:
if len(self.valves.user_agent) == 0 or len(self.valves.from_header) == 0:
@ -435,9 +507,31 @@ class OsmSearcher:
},
})
def calculate_navigation_distance(self, start, destination) -> float:
"""Calculate real distance from A to B, instead of Haversine."""
return self._ors.calculate_distance(start, destination)
def attempt_ors(self, origin, things_nearby) -> bool:
"""Update distances to use ORS navigable distances, if ORS enabled."""
used_ors = False
for thing in things_nearby:
print(f"Checking ORS for {thing}")
nav_distance = self.calculate_navigation_distance(origin, thing)
if nav_distance:
used_ors = True
thing['nav_distance'] = nav_distance
return used_ors
def calculate_haversine(self, origin, things_nearby):
for thing in things_nearby:
if 'distance' not in thing:
thing['distance'] = haversine_distance(origin, thing)
def use_detailed_interpretation_mode(self) -> bool:
"""Let user valve for instruction mode override the global setting."""
# Let user valve for instruction mode override the global
# setting.
print(str(self.user_valves))
if self.user_valves:
return self.user_valves.instruction_oriented_interpretation
@ -571,6 +665,27 @@ class OsmSearcher:
print(response.text)
raise Exception(f"Error calling Overpass API: {response.text}")
async def get_things_nearby(self, nominatim_result, place, tags, bbox, limit, radius):
nodes, ways = self.overpass_search(place, tags, bbox, limit, radius)
# use results from overpass, but if they do not exist,
# fall back to the nominatim result. this may or may
# not be a good idea.
things_nearby = (nodes + ways
if len(nodes) > 0 or len(ways) > 0
else OsmSearcher.fallback(nominatim_result))
# in order to not spam ORS, we first sort by haversine
# distance and drop number of results to the limit. then, if
# enabled, we calculate ORS distances. then we sort again.
origin = get_bounding_box_center(bbox)
self.calculate_haversine(origin, things_nearby)
things_nearby = sort_by_closeness(origin, things_nearby, 'distance')
things_nearby = things_nearby[:limit] # drop down to requested limit
if self.attempt_ors(origin, things_nearby):
things_nearby = sort_by_closeness(origin, things_nearby, 'nav_distance', 'distance')
return things_nearby
async def search_nearby(
self, place: str, tags: List[str], limit: int=5, radius: int=4000,
@ -602,19 +717,8 @@ class OsmSearcher:
'maxlon': nominatim_result['boundingbox'][3]
}
nodes, ways = self.overpass_search(place, tags, bbox, limit, radius)
# use results from overpass, but if they do not exist,
# fall back to the nominatim result. this may or may
# not be a good idea.
things_nearby = (nodes + ways
if len(nodes) > 0 or len(ways) > 0
else OsmSearcher.fallback(nominatim_result))
origin = get_bounding_box_center(bbox)
things_nearby = sort_by_closeness(origin, things_nearby)
things_nearby = things_nearby[:limit] # drop down to requested limit
search_results = convert_and_validate_results(place, things_nearby)
things_nearby = await self.get_things_nearby(nominatim_result, place, tags,
bbox, limit, radius)
if not things_nearby or len(things_nearby) == 0:
await self.event_search_complete(category, place, 0)
@ -624,6 +728,7 @@ class OsmSearcher:
# Only print the full result instructions if we
# actually have something.
search_results = convert_and_validate_results(place, things_nearby)
if search_results:
result_instructions = self.get_result_instructions(tag_type_str)
else:
@ -678,6 +783,15 @@ class Tools:
description=("Give detailed result interpretation instructions to the model. "
"Switch this off if results are inconsistent, wrong, or missing.")
)
ors_api_key: Optional[str] = Field(
default=None,
description=("Provide an Open Route Service API key to calculate "
"more accurate distances (leave default to disable).")
)
ors_instance: Optional[str] = Field(
default=None,
description="Use a custom ORS instance (leave default to use public ORS instance)."
)
status_indicators: bool = Field(
default=True,
description=("Emit status update events to the web UI.")
@ -828,6 +942,18 @@ class Tools:
return await do_osm_search(valves=self.valves, user_valves=user_valves, category="swimming",
radius=10000, place=place, tags=tags, event_emitter=__event_emitter__)
async def find_playground_near_place(self, __user__: dict, place: str, __event_emitter__) -> str:
"""
Finds playgrounds on OpenStreetMap near a given place, address, or coordinates.
The location of the address or place is reverse geo-coded, then nearby results are fetched
from OpenStreetMap.
:param place: The name of a place, an address, or GPS coordinates. City and country must be specified, if known.
:return: A list of recreational places, if found.
"""
user_valves = __user__["valves"] if "valves" in __user__ else None
tags = ["leisure=playground"]
return await do_osm_search(valves=self.valves, user_valves=user_valves, category="playgrounds",
place=place, tags=tags, event_emitter=__event_emitter__)
async def find_recreation_near_place(self, __user__: dict, place: str, __event_emitter__) -> str:
"""