OSM: support way results for more accurate search.

This commit is contained in:
projectmoon 2024-09-17 22:30:27 +00:00
parent b832c3c387
commit fc00abe030
2 changed files with 98 additions and 65 deletions

View File

@ -1,5 +1,12 @@
# OpenStreetMap Tool
**0.5.0:**
- Support Way results. This makes searching much more accurate and
useful. Many map features are marked as "ways" (shapes on the map)
rather than specific points.
- Drop support for "secondary results," and instead return Ways that
have enough information to be useful.
**0.4.0:**
- Complete rewrite of search result handling to prevent incorrect OSM
map links being generated, and bad info given.

156
osm.py
View File

@ -2,7 +2,7 @@
title: OpenStreetMap Tool
author: projectmoon
author_url: https://git.agnos.is/projectmoon/open-webui-filters
version: 0.4.0
version: 0.5.0
license: AGPL-3.0+
required_open_webui_version: 0.3.21
"""
@ -101,19 +101,35 @@ def way_has_info(way):
"""
return len(way['tags']) > 1 and any('name' in tag for tag in way['tags'])
def process_way_result(way) -> Optional[dict]:
    """
    Post-process an OSM Way dict to remove the geometry and node
    info, and calculate a single GPS coordinate from its bounding
    box.

    The dict is modified in place: the 'nodes', 'geometry' and
    'bounds' keys are removed, and 'lat' / 'lon' keys are added from
    the center of the bounding box.

    Returns the same (mutated) dict, or None when the way has no
    'bounds' entry and therefore cannot be given a coordinate. Note
    that 'nodes' and 'geometry' are stripped even in the None case.
    """
    # The node list and geometry are not used after this point;
    # drop them so they are not passed along with the result.
    if 'nodes' in way:
        del way['nodes']

    if 'geometry' in way:
        del way['geometry']

    # Without a bounding box there is nothing to derive a position
    # from, so the way is unusable as a search result.
    if 'bounds' not in way:
        return None

    way_center = get_bounding_box_center(way['bounds'])
    way['lat'] = way_center['lat']
    way['lon'] = way_center['lon']
    del way['bounds']
    return way
def get_bounding_box_center(bbox):
def convert(bbox, key):
return bbox[key] if isinstance(bbox[key], float) else float(bbox[key])
min_lat = convert(bbox, 'min_lat')
min_lon = convert(bbox, 'min_lon')
max_lat = convert(bbox, 'max_lat')
max_lon = convert(bbox, 'max_lon')
min_lat = convert(bbox, 'minlat')
min_lon = convert(bbox, 'minlon')
max_lat = convert(bbox, 'maxlat')
max_lon = convert(bbox, 'maxlon')
return {
'lon': (min_lon + max_lon) / 2,
@ -147,38 +163,56 @@ def sort_by_closeness(origin, points):
point['distance'] = distance
return [point for point, distance in points_with_distance]
def get_or_none(tags: dict, *keys: str) -> Optional[str]:
    """
    Try to extract a value from a dict by trying keys in order, or
    return None if none of the keys were found.

    The first key that is present wins, even if its value is falsy
    or None; later keys are only consulted when earlier ones are
    absent from the dict.
    """
    for key in keys:
        if key in tags:
            return tags[key]

    return None
def all_are_none(*args) -> bool:
    """
    Return True when every positional argument is None. Calling it
    with no arguments also returns True.
    """
    return all(arg is None for arg in args)


def parse_thing_address(tags: dict) -> Optional[str]:
    """
    Build a single-line address from the addr:* entries of an OSM
    tags dict.

    The result has the form "street house_number, city state
    postal_code", with any missing pieces simply left out. Returns
    None when no address information is available at all.
    """
    house_number = get_or_none(tags, "addr:housenumber", "addr:house_number")
    street = get_or_none(tags, "addr:street")
    city = get_or_none(tags, "addr:city")
    state = get_or_none(tags, "addr:state", "addr:province")
    postal_code = get_or_none(
        tags,
        "addr:postcode", "addr:post_code", "addr:postal_code",
        "addr:zipcode", "addr:zip_code"
    )

    # if all are none, that means we don't know the address at all.
    if all_are_none(house_number, street, city, state, postal_code):
        return None

    # Handle missing values to create complete-ish addresses, even if
    # we have missing data. filter(None, ...) drops both None and
    # empty strings, so no stray separators are produced.
    line1 = " ".join(filter(None, [street, house_number])).strip()
    line2 = " ".join(filter(None, [city, state, postal_code])).strip()

    full_address = ", ".join(filter(None, [line1, line2])).strip()

    # Tags that are present but blank can still leave us with
    # nothing; report that as "no address" rather than "".
    return full_address or None
def parse_and_validate_thing(thing: dict) -> Optional[dict]:
"""
Parse an OSM result and make it more friendly to work with.
Helps remove ambiguity of the LLM interpreting the raw JSON data.
If there is not enough data, discard the result.
Parse an OSM result (node or post-processed way) and make it
more friendly to work with. Helps remove ambiguity of the LLM
interpreting the raw JSON data. If there is not enough data,
discard the result.
"""
tags: dict = thing['tags'] if 'tags' in thing else {}
@ -209,7 +243,10 @@ def parse_and_validate_thing(thing: dict) -> Optional[dict]:
def create_osm_link(lat, lon):
    """
    Fill in the <lat> and <lon> placeholders of EXAMPLE_OSM_LINK
    with the string form of the given coordinates.
    """
    link_with_lat = EXAMPLE_OSM_LINK.replace("<lat>", str(lat))
    return link_with_lat.replace("<lon>", str(lon))
def convert_and_validate_results(original_location: str, things_nearby: List[dict]) -> Optional[str]:
def convert_and_validate_results(
original_location: str,
things_nearby: List[dict]
) -> Optional[str]:
"""
Converts the things_nearby JSON into Markdown-ish results to
(hopefully) improve model understanding of the results. Intended
@ -268,10 +305,9 @@ class OsmSearcher:
"""Let user valve for instruction mode override the global setting."""
print(str(self.user_valves))
if self.user_valves:
print(f"Using user valve setting: {self.user_valves.instruction_oriented_interpretation}")
return self.user_valves.instruction_oriented_interpretation
else:
self.valves.instruction_oriented_interpretation
return self.valves.instruction_oriented_interpretation
def get_result_instructions(self, tag_type_str: str) -> str:
if self.use_detailed_interpretation_mode():
@ -332,6 +368,11 @@ class OsmSearcher:
def overpass_search(
self, place, tags, bbox, limit=5, radius=4000
) -> (List[dict], List[dict]):
"""
Return a list of relevant OSM nodes and ways. Some
post-processing is done on ways in order to add coordinates to
them.
"""
headers = self.create_headers()
if not headers:
raise ValueError("Headers not set")
@ -354,23 +395,25 @@ class OsmSearcher:
if len(search) > 0:
search += ";"
# "out geom;" is needed to get bounding box info of ways,
# so we can calculate the coordinates.
query = f"""
[out:json];
(
{search}
);
out qt;
out geom;
"""
print(query)
data = { "data": query }
response = requests.get(url, params=data, headers=headers)
if response.status_code == 200:
# nodes are prioritized because they have exact GPS
# coordinates. we also include useful way entries (without
# node list) as secondary results, because there are often
# useful results that don't have a node (e.g. building or
# whole area marked for the tag type).
# nodes have exact GPS coordinates. we also include
# useful way entries, post-processed to remove extra data
# and add a centered calculation of their GPS coords. any
# way that doesn't have enough info for us to use is
# dropped.
results = response.json()
results = results['elements'] if 'elements' in results else []
nodes = []
@ -382,7 +425,7 @@ class OsmSearcher:
if res['type'] == 'node':
nodes.append(res)
elif res['type'] == 'way' and way_has_info(res):
ways.append(strip_nodes_from_way(res))
ways.append(process_way_result(res))
return nodes, ways
else:
@ -400,10 +443,10 @@ class OsmSearcher:
if nominatim_result:
nominatim_result = nominatim_result[0]
bbox = {
'min_lat': nominatim_result['boundingbox'][0],
'max_lat': nominatim_result['boundingbox'][1],
'min_lon': nominatim_result['boundingbox'][2],
'max_lon': nominatim_result['boundingbox'][3]
'minlat': nominatim_result['boundingbox'][0],
'maxlat': nominatim_result['boundingbox'][1],
'minlon': nominatim_result['boundingbox'][2],
'maxlon': nominatim_result['boundingbox'][3]
}
nodes, ways = self.overpass_search(place, tags, bbox, limit, radius)
@ -411,48 +454,31 @@ class OsmSearcher:
# use results from overpass, but if they do not exist,
# fall back to the nominatim result. this may or may
# not be a good idea.
things_nearby = (nodes
if len(nodes) > 0
things_nearby = (nodes + ways
if len(nodes) > 0 or len(ways) > 0
else OsmSearcher.fallback(nominatim_result))
origin = get_bounding_box_center(bbox)
things_nearby = sort_by_closeness(origin, things_nearby)
things_nearby = things_nearby[:limit]
primary_results = convert_and_validate_results(place, things_nearby)
other_results = ways[:(limit+5)]
things_nearby = things_nearby[:limit] # drop down to requested limit
search_results = convert_and_validate_results(place, things_nearby)
if not things_nearby or len(things_nearby) == 0:
return NO_RESULTS
tag_type_str = ", ".join(tags)
if len(other_results) > 0:
extra_info = (
"\n\n----------\n\n"
f"Additionally, here are some other results that might be useful. "
"The exact distance from the requested location is not known. "
"The seconary results are below."
"\n\n----------\n\n"
f"{str(other_results)}")
else:
extra_info = ""
# Only print the full result instructions if we
# actually have something.
if primary_results:
if search_results:
result_instructions = self.get_result_instructions(tag_type_str)
else:
if len(other_results) > 0:
result_instructions = ("No primary results found, but there are secondary results."
"These results have less details, but still have useful "
"information.")
else:
result_instructions = "No results found at all. Tell the user there are no results."
result_instructions = ("No results found at all. "
"Tell the user there are no results.")
resp = (
f"{result_instructions}\n\n"
f"{primary_results}"
f"{extra_info}"
f"{search_results}"
)
print(resp)