From fc00abe030c12c24acbb4e12080a5060b7fd9f66 Mon Sep 17 00:00:00 2001 From: projectmoon Date: Tue, 17 Sep 2024 22:30:27 +0000 Subject: [PATCH] OSM: support way results for more accurate search. --- CHANGELOG.md | 7 +++ osm.py | 156 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 98 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 686bdad..83e598e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # OpenStreetMap Tool +**0.5.0:** + - Support Way results. This makes searching much more accurate and + useful. Many map features are marked as "ways" (shapes on the map) + rather than specific points. + - Drop support for "secondary results," and instead return Ways that + have enough information to be useful. + **0.4.0:** - Complete rewrite of search result handling to prevent incorrect OSM map links being generated, and bad info given. diff --git a/osm.py b/osm.py index e4ced1c..de90ead 100644 --- a/osm.py +++ b/osm.py @@ -2,7 +2,7 @@ title: OpenStreetMap Tool author: projectmoon author_url: https://git.agnos.is/projectmoon/open-webui-filters -version: 0.4.0 +version: 0.5.0 license: AGPL-3.0+ required_open_webui_version: 0.3.21 """ @@ -101,19 +101,35 @@ def way_has_info(way): """ return len(way['tags']) > 1 and any('name' in tag for tag in way['tags']) -def strip_nodes_from_way(way): +def process_way_result(way) -> Optional[dict]: + """ + Post-process an OSM Way dict to remove the geometry and node + info, and calculate a single GPS coordinate from its bounding + box. + """ if 'nodes' in way: del way['nodes'] - return way + + if 'geometry' in way: + del way['geometry'] + + if 'bounds' in way: + way_center = get_bounding_box_center(way['bounds']) + way['lat'] = way_center['lat'] + way['lon'] = way_center['lon'] + del way['bounds'] + return way + + return None def get_bounding_box_center(bbox): def convert(bbox, key): return bbox[key] if isinstance(bbox[key], float) else float(bbox[key]) - min_lat = convert(bbox, 'min_lat') - min_lon = convert(bbox, 'min_lon') - max_lat = convert(bbox, 'max_lat') - max_lon = convert(bbox, 'max_lon') + min_lat = convert(bbox, 'minlat') + min_lon = convert(bbox, 'minlon') + max_lat = convert(bbox, 'maxlat') + max_lon = convert(bbox, 'maxlon') return { 'lon': (min_lon + max_lon) / 2, @@ -147,38 +163,56 @@ def sort_by_closeness(origin, points): point['distance'] = distance return [point for point, distance in points_with_distance] -def get_or_unknown(tags: dict, *keys: str) -> str: +def get_or_none(tags: dict, *keys: str) -> Optional[str]: """ Try to extract a value from a dict by trying keys in order, or - return unknown if none of the keys were found. + return None if none of the keys were found. """ for key in keys: if key in tags: return tags[key] - return "unknown" + return None -def parse_thing_address(tags: dict) -> str: - #'addr:city': 'Haarlem', 'addr:housenumber': '8', 'addr:postcode': '2012DG', 'addr:street' - house_number = get_or_unknown(tags, "addr:housenumber", "addr:house_number") - street = get_or_unknown(tags, "addr:street") - city = get_or_unknown(tags, "addr:city") - state = get_or_unknown(tags, "addr:state", "addr:province") - postal_code = get_or_unknown(tags, - "addr:postcode", "addr:post_code", "addr:postal_code", - "addr:zipcode", "addr:zip_code") +def all_are_none(*args) -> bool: + for arg in args: + if arg is not None: + return False - # Bit of a hack to make sure spacing of state is correct, whether - # or not it's present. - state = f" {state} " if state != "unknown" else " " + return True - return f"{street} {house_number}, {city}{state}{postal_code}" +def parse_thing_address(tags: dict) -> Optional[str]: + house_number = get_or_none(tags, "addr:housenumber", "addr:house_number") + street = get_or_none(tags, "addr:street") + city = get_or_none(tags, "addr:city") + state = get_or_none(tags, "addr:state", "addr:province") + postal_code = get_or_none( + tags, + "addr:postcode", "addr:post_code", "addr:postal_code", + "addr:zipcode", "addr:zip_code" + ) + + # if all are none, that means we don't know the address at all. + if all_are_none(house_number, street, city, state, postal_code): + return None + + # Handle missing values to create complete-ish addresses, even if + # we have missing data. We will get either a partly complete + # address, or None if all the values are missing. + line1 = filter(None, [street, house_number]) + line2 = filter(None, [city, state, postal_code]) + line1 = " ".join(line1).strip() + line2 = " ".join(line2).strip() + full_address = filter(None, [line1, line2]) + full_address = ", ".join(full_address).strip() + return full_address if len(full_address) > 0 else None def parse_and_validate_thing(thing: dict) -> Optional[dict]: """ - Parse an OSM result and make it more friendly to work with. - Helps remove ambiguity of the LLM interpreting the raw JSON data. - If there is not enough data, discard the result. + Parse an OSM result (node or post-processed way) and make it + more friendly to work with. Helps remove ambiguity of the LLM + interpreting the raw JSON data. If there is not enough data, + discard the result. """ tags: dict = thing['tags'] if 'tags' in thing else {} @@ -209,7 +243,10 @@ def parse_and_validate_thing(thing: dict) -> Optional[dict]: def create_osm_link(lat, lon): return EXAMPLE_OSM_LINK.replace("", str(lat)).replace("", str(lon)) -def convert_and_validate_results(original_location: str, things_nearby: List[dict]) -> Optional[str]: +def convert_and_validate_results( + original_location: str, + things_nearby: List[dict] +) -> Optional[str]: """ Converts the things_nearby JSON into Markdown-ish results to (hopefully) improve model understanding of the results. Intended @@ -268,10 +305,9 @@ class OsmSearcher: """Let user valve for instruction mode override the global setting.""" print(str(self.user_valves)) if self.user_valves: - print(f"Using user valve setting: {self.user_valves.instruction_oriented_interpretation}") return self.user_valves.instruction_oriented_interpretation else: - self.valves.instruction_oriented_interpretation + return self.valves.instruction_oriented_interpretation def get_result_instructions(self, tag_type_str: str) -> str: if self.use_detailed_interpretation_mode(): @@ -332,6 +368,11 @@ class OsmSearcher: def overpass_search( self, place, tags, bbox, limit=5, radius=4000 ) -> (List[dict], List[dict]): + """ + Return a list relevant of OSM nodes and ways. Some + post-processing is done on ways in order to add coordinates to + them. + """ headers = self.create_headers() if not headers: raise ValueError("Headers not set") @@ -354,23 +395,25 @@ class OsmSearcher: if len(search) > 0: search += ";" + # "out geom;" is needed to get bounding box info of ways, + # so we can calculate the coordinates. query = f""" [out:json]; ( {search} ); - out qt; + out geom; """ print(query) data = { "data": query } response = requests.get(url, params=data, headers=headers) if response.status_code == 200: - # nodes are prioritized because they have exact GPS - # coordinates. we also include useful way entries (without - # node list) as secondary results, because there are often - # useful results that don't have a node (e.g. building or - # whole area marked for the tag type). + # nodes have have exact GPS coordinates. we also include + # useful way entries, post-processed to remove extra data + # and add a centered calculation of their GPS coords. any + # way that doesn't have enough info for us to use is + # dropped. results = response.json() results = results['elements'] if 'elements' in results else [] nodes = [] @@ -382,7 +425,7 @@ class OsmSearcher: if res['type'] == 'node': nodes.append(res) elif res['type'] == 'way' and way_has_info(res): - ways.append(strip_nodes_from_way(res)) + ways.append(process_way_result(res)) return nodes, ways else: @@ -400,10 +443,10 @@ class OsmSearcher: if nominatim_result: nominatim_result = nominatim_result[0] bbox = { - 'min_lat': nominatim_result['boundingbox'][0], - 'max_lat': nominatim_result['boundingbox'][1], - 'min_lon': nominatim_result['boundingbox'][2], - 'max_lon': nominatim_result['boundingbox'][3] + 'minlat': nominatim_result['boundingbox'][0], + 'maxlat': nominatim_result['boundingbox'][1], + 'minlon': nominatim_result['boundingbox'][2], + 'maxlon': nominatim_result['boundingbox'][3] } nodes, ways = self.overpass_search(place, tags, bbox, limit, radius) @@ -411,48 +454,31 @@ class OsmSearcher: # use results from overpass, but if they do not exist, # fall back to the nominatim result. this may or may # not be a good idea. - things_nearby = (nodes - if len(nodes) > 0 + things_nearby = (nodes + ways + if len(nodes) > 0 or len(ways) > 0 else OsmSearcher.fallback(nominatim_result)) origin = get_bounding_box_center(bbox) things_nearby = sort_by_closeness(origin, things_nearby) - things_nearby = things_nearby[:limit] - primary_results = convert_and_validate_results(place, things_nearby) - other_results = ways[:(limit+5)] + things_nearby = things_nearby[:limit] # drop down to requested limit + search_results = convert_and_validate_results(place, things_nearby) if not things_nearby or len(things_nearby) == 0: return NO_RESULTS tag_type_str = ", ".join(tags) - if len(other_results) > 0: - extra_info = ( - "\n\n----------\n\n" - f"Additionally, here are some other results that might be useful. " - "The exact distance from the requested location is not known. " - "The seconary results are below." - "\n\n----------\n\n" - f"{str(other_results)}") - else: - extra_info = "" - # Only print the full result instructions if we # actually have something. - if primary_results: + if search_results: result_instructions = self.get_result_instructions(tag_type_str) else: - if len(other_results) > 0: - result_instructions = ("No primary results found, but there are secondary results." - "These results have less details, but still have useful " - "information.") - else: - result_instructions = "No results found at all. Tell the user there are no results." + result_instructions = ("No results found at all. " + "Tell the user there are no results.") resp = ( f"{result_instructions}\n\n" - f"{primary_results}" - f"{extra_info}" + f"{search_results}" ) print(resp)