From 2e8ba67086ebcd599d0e3aabc5e7f0f76e9fa5b7 Mon Sep 17 00:00:00 2001 From: projectmoon <projectmoon@agnos.is> Date: Fri, 13 Dec 2024 17:07:46 +0100 Subject: [PATCH] Gemini: emit events --- CHANGELOG.md | 3 + gemini.py | 252 +++++++++++++++++++++++++++++++++++---------------- 2 files changed, 175 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12dd2c3..16ee645 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -218,6 +218,9 @@ # Gemini Tool +**0.2.0:** + - Emit events and citations. + **0.1.2:** - Check MIME type of response (and only handle it if it's gemtext). diff --git a/gemini.py b/gemini.py index d3c69d0..b6c4dd2 100644 --- a/gemini.py +++ b/gemini.py @@ -2,7 +2,7 @@ title: Gemini Protocol Tool author: projectmoon author_url: https://git.agnos.is/projectmoon/open-webui-filters -version: 0.1.2 +version: 0.2.0 license: AGPL-3.0+ required_open_webui_version: 0.4.3 requirements: ignition-gemini @@ -13,6 +13,24 @@ from ignition import RedirectResponse, SuccessResponse from pydantic import BaseModel, Field from typing import Optional +# From https://stackoverflow.com/questions/33404752/removing-emojis-from-a-string-in-python +def remove_emojis(text): + pattern = re.compile(pattern = "[" + u"\U0001F600-\U0001F64F" # emoticons + u"\U0001F300-\U0001F5FF" # symbols & pictographs + u"\U0001F680-\U0001F6FF" # transport & map symbols + u"\U0001F1E0-\U0001F1FF" # flags (iOS) + "]+", flags = re.UNICODE) + return pattern.sub(r'',text) + +def extract_title(content: str, fallback: str="") -> Optional[str]: + lines = [line.strip() for line in content.splitlines()] + title = next(filter(lambda line: line.startswith("#"), lines), None) + if title: + return remove_emojis(title.lstrip('#')) + else: + return fallback + def result_instructions(url: str, redirect: bool=False) -> str: content_instructions = ( "Report the content to the user and answer their question." @@ -44,94 +62,162 @@ def instructions(url: str, redirect: bool=False) -> str: "The Gemtext content is below in the code block." ) -def correct_url(url: str) -> str: - if url.startswith("gemini://http://"): - match = re.match(r'gemini://http://(.+)', url) - if match: - return f"gemini://{match.group(1)}" + + +class GeminiFetcher: + def __init__(self, gemini_url: str, correct_urls: bool=False, event_emitter=None): + self.original_url = gemini_url + self.current_url = None + self.correct_urls = correct_urls + self.event_emitter = event_emitter + + async def fetch_event(self, done: bool): + if not self.event_emitter: + return + + description = (f"Fetched: {self.original_url}" if done + else f"Fetching: {self.original_url}") + + await self.event_emitter({ + "type": "status", + "data": { + "status": "complete" if done else "in_progress", + "description": description, + "done": done, + }, + }) + + async def error_event(self, message: str="There was an error"): + if not self.event_emitter: + return + + await self.event_emitter({ + "type": "status", + "data": { + "status": "error", + "description": message, + "done": True, + }, + }) + + async def create_citation(self, title: str, content: str): + if not self.event_emitter: + return + + document = content + + await self.event_emitter({ + "type": "source", + "data": { + "document": [document], + "metadata": [{"source": title, "html": False }], + "source": {"name": title, "url": self.original_url}, + } + }) + + async def complete_event(self, content: str): + title = extract_title(content, self.original_url) + await self.fetch_event(done=True) + await self.create_citation(title, content) + + def correct_url(self, url: str) -> str: + if url.startswith("gemini://http://"): + match = re.match(r'gemini://http://(.+)', url) + if match: + return f"gemini://{match.group(1)}" + return url + + if url.startswith("gemini://https://"): + match = re.match(r'gemini://https://(.+)', url) + if match: + return f"gemini://{match.group(1)}" + return url + + if url.startswith("https://"): + match = re.match(r'https://(.+)', url) + if match: + return f"gemini://{match.group(1)}" + return url + + if url.startswith("http://"): + match = re.match(r'http://(.+)', url) + if match: + return f"gemini://{match.group(1)}" + return url + + if not url.startswith("gemini://"): + return f"gemini://{url}" + return url - if url.startswith("gemini://https://"): - match = re.match(r'gemini://https://(.+)', url) - if match: - return f"gemini://{match.group(1)}" - return url - if url.startswith("https://"): - match = re.match(r'https://(.+)', url) - if match: - return f"gemini://{match.group(1)}" - return url + async def fetch(self, prev_url: Optional[str]=None, redirects: int=0) -> dict: + await self.fetch_event(done=False) + gemini_url = (self.current_url + if self.current_url is not None + else self.original_url) - if url.startswith("http://"): - match = re.match(r'http://(.+)', url) - if match: - return f"gemini://{match.group(1)}" - return url - - if not url.startswith("gemini://"): - return f"gemini://{url}" - - return url - - -def fetch(gemini_url: str, correct_urls: bool=False, prev_url: Optional[str]=None, redirects: int=0) -> dict: - if redirects > 5: - return { - "success": False, - "content": f"Too many redirects (ended at {gemini_url})", - "redirected": prev_url is not None - } - - if correct_urls and not prev_url: - corrected_url = correct_url(gemini_url) - if corrected_url != gemini_url: - print(f"[Gemini] URL '{gemini_url}' corrected to '{corrected_url}'") - gemini_url = corrected_url - - if not prev_url: - print(f"[Gemini] Fetching: {gemini_url}") - else: - print(f"[Gemini] Fetching: {gemini_url} (redirected from {prev_url})") - - try: - response = ignition.request(gemini_url, raise_errors=True, referer=prev_url) - - if isinstance(response, SuccessResponse): + if redirects > 5: return { - "success": True, - "content": handle_content(response), + "success": False, + "content": f"Too many redirects (ended at {gemini_url})", "redirected": prev_url is not None } - elif isinstance(response, RedirectResponse): - redirect_url = response.data() - return fetch(redirect_url, correct_urls, gemini_url, redirects + 1) + + if self.correct_urls and not prev_url: + corrected_url = self.correct_url(gemini_url) + if corrected_url != gemini_url: + print(f"[Gemini] URL '{gemini_url}' corrected to '{corrected_url}'") + gemini_url = corrected_url + + if not prev_url: + print(f"[Gemini] Fetching: {gemini_url}") else: - print(f"[Gemini] Unhandled {response.status} code for '{gemini_url}'") - message = (f"Tell the user there was a {response.status} status code. " - f"Support for handling {response.status} is not implemented yet.") - return { "success": False, "content": message, "redirected": prev_url is not None } - except Exception as e: - print(f"[Gemini] error: {e}") - message = f"Tell the user there was an error fetching the page: {e}" - return { - "success": False, - "content": message, - "redirected": prev_url is not None - } + print(f"[Gemini] Fetching: {gemini_url} (redirected from {prev_url})") -def handle_content(resp: SuccessResponse) -> str: - try: - mime_type, encoding = resp.meta.split(";") - except ValueError: - mime_type = resp.meta + try: + response = ignition.request(gemini_url, raise_errors=True, referer=prev_url) - mime_type = mime_type.strip() + if isinstance(response, SuccessResponse): + content = self.handle_content(response) + await self.complete_event(content) + return { + "success": True, + "content": content, + "redirected": prev_url is not None + } + elif isinstance(response, RedirectResponse): + prev_url = self.current_url + self.current_url = response.data() + return await self.fetch(prev_url, redirects + 1) + else: + await self.error_event(f"Unsupported status code {response.status}") + print(f"[Gemini] Unsupported {response.status} code for '{gemini_url}'") + message = (f"Tell the user there was a {response.status} status code. " + f"Support for handling {response.status} is not implemented yet.") + return { "success": False, "content": message, "redirected": prev_url is not None } + except Exception as e: + print(f"[Gemini] error: {e}") + message = f"Tell the user there was an error fetching the page: {e}" + await self.error_event(str(e)) + return { + "success": False, + "content": message, + "redirected": prev_url is not None + } - if mime_type == "text/gemini": - return resp.data().strip() - else: - raise ValueError(f"Not yet able to handle MIME type {mime_type}") + def handle_content(self, resp: SuccessResponse) -> str: + try: + mime_type, encoding = resp.meta.split(";") + except ValueError: + mime_type = resp.meta + + mime_type = mime_type.strip() + + if mime_type == "text/gemini": + return resp.data().strip() + else: + raise ValueError(f"Not yet able to handle MIME type {mime_type}") class Tools: class Valves(BaseModel): @@ -144,14 +230,20 @@ class Tools: self.valves = self.Valves() self.user_valves = None - def get_gemini_page(self, gemini_url: str, __event_emitter__) -> str: + async def get_gemini_page(self, gemini_url: str, __event_emitter__) -> str: """ Fetches Gemini capsules, content, and web pages over Gemini Protocol. Use this if the user requests a gemini:// URL. :param gemini_url: The URL to fetch. The URL MUST begin with gemini://. :return: The fetched data as Markdown. """ - resp = fetch(gemini_url, correct_urls=self.valves.attempt_url_correction) + fetcher = GeminiFetcher( + gemini_url=gemini_url, + correct_urls=self.valves.attempt_url_correction, + event_emitter=__event_emitter__ + ) + + resp = await fetcher.fetch() if resp["success"]: result_instructions = instructions(gemini_url, redirect=resp["redirected"]) return f"{result_instructions}\n\n```\n{resp['content']}\n```"