Gemini: emit events

This commit is contained in:
projectmoon 2024-12-13 17:07:46 +01:00
parent 8c7ec60633
commit 2e8ba67086
2 changed files with 175 additions and 80 deletions

View File

@ -218,6 +218,9 @@
# Gemini Tool
**0.2.0:**
- Emit events and citations.
**0.1.2:**
- Check MIME type of response (and only handle it if it's gemtext).

252
gemini.py
View File

@ -2,7 +2,7 @@
title: Gemini Protocol Tool
author: projectmoon
author_url: https://git.agnos.is/projectmoon/open-webui-filters
version: 0.1.2
version: 0.2.0
license: AGPL-3.0+
required_open_webui_version: 0.4.3
requirements: ignition-gemini
@ -13,6 +13,24 @@ from ignition import RedirectResponse, SuccessResponse
from pydantic import BaseModel, Field
from typing import Optional
# From https://stackoverflow.com/questions/33404752/removing-emojis-from-a-string-in-python
def remove_emojis(text):
pattern = re.compile(pattern = "["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
"]+", flags = re.UNICODE)
return pattern.sub(r'',text)
def extract_title(content: str, fallback: str="") -> Optional[str]:
lines = [line.strip() for line in content.splitlines()]
title = next(filter(lambda line: line.startswith("#"), lines), None)
if title:
return remove_emojis(title.lstrip('#'))
else:
return fallback
def result_instructions(url: str, redirect: bool=False) -> str:
content_instructions = (
"Report the content to the user and answer their question."
@ -44,94 +62,162 @@ def instructions(url: str, redirect: bool=False) -> str:
"The Gemtext content is below in the code block."
)
def correct_url(url: str) -> str:
if url.startswith("gemini://http://"):
match = re.match(r'gemini://http://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
class GeminiFetcher:
def __init__(self, gemini_url: str, correct_urls: bool=False, event_emitter=None):
self.original_url = gemini_url
self.current_url = None
self.correct_urls = correct_urls
self.event_emitter = event_emitter
async def fetch_event(self, done: bool):
if not self.event_emitter:
return
description = (f"Fetched: {self.original_url}" if done
else f"Fetching: {self.original_url}")
await self.event_emitter({
"type": "status",
"data": {
"status": "complete" if done else "in_progress",
"description": description,
"done": done,
},
})
async def error_event(self, message: str="There was an error"):
if not self.event_emitter:
return
await self.event_emitter({
"type": "status",
"data": {
"status": "error",
"description": message,
"done": True,
},
})
async def create_citation(self, title: str, content: str):
if not self.event_emitter:
return
document = content
await self.event_emitter({
"type": "source",
"data": {
"document": [document],
"metadata": [{"source": title, "html": False }],
"source": {"name": title, "url": self.original_url},
}
})
async def complete_event(self, content: str):
title = extract_title(content, self.original_url)
await self.fetch_event(done=True)
await self.create_citation(title, content)
def correct_url(self, url: str) -> str:
if url.startswith("gemini://http://"):
match = re.match(r'gemini://http://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
return url
if url.startswith("gemini://https://"):
match = re.match(r'gemini://https://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
return url
if url.startswith("https://"):
match = re.match(r'https://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
return url
if url.startswith("http://"):
match = re.match(r'http://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
return url
if not url.startswith("gemini://"):
return f"gemini://{url}"
return url
if url.startswith("gemini://https://"):
match = re.match(r'gemini://https://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
return url
if url.startswith("https://"):
match = re.match(r'https://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
return url
async def fetch(self, prev_url: Optional[str]=None, redirects: int=0) -> dict:
await self.fetch_event(done=False)
gemini_url = (self.current_url
if self.current_url is not None
else self.original_url)
if url.startswith("http://"):
match = re.match(r'http://(.+)', url)
if match:
return f"gemini://{match.group(1)}"
return url
if not url.startswith("gemini://"):
return f"gemini://{url}"
return url
def fetch(gemini_url: str, correct_urls: bool=False, prev_url: Optional[str]=None, redirects: int=0) -> dict:
if redirects > 5:
return {
"success": False,
"content": f"Too many redirects (ended at {gemini_url})",
"redirected": prev_url is not None
}
if correct_urls and not prev_url:
corrected_url = correct_url(gemini_url)
if corrected_url != gemini_url:
print(f"[Gemini] URL '{gemini_url}' corrected to '{corrected_url}'")
gemini_url = corrected_url
if not prev_url:
print(f"[Gemini] Fetching: {gemini_url}")
else:
print(f"[Gemini] Fetching: {gemini_url} (redirected from {prev_url})")
try:
response = ignition.request(gemini_url, raise_errors=True, referer=prev_url)
if isinstance(response, SuccessResponse):
if redirects > 5:
return {
"success": True,
"content": handle_content(response),
"success": False,
"content": f"Too many redirects (ended at {gemini_url})",
"redirected": prev_url is not None
}
elif isinstance(response, RedirectResponse):
redirect_url = response.data()
return fetch(redirect_url, correct_urls, gemini_url, redirects + 1)
if self.correct_urls and not prev_url:
corrected_url = self.correct_url(gemini_url)
if corrected_url != gemini_url:
print(f"[Gemini] URL '{gemini_url}' corrected to '{corrected_url}'")
gemini_url = corrected_url
if not prev_url:
print(f"[Gemini] Fetching: {gemini_url}")
else:
print(f"[Gemini] Unhandled {response.status} code for '{gemini_url}'")
message = (f"Tell the user there was a {response.status} status code. "
f"Support for handling {response.status} is not implemented yet.")
return { "success": False, "content": message, "redirected": prev_url is not None }
except Exception as e:
print(f"[Gemini] error: {e}")
message = f"Tell the user there was an error fetching the page: {e}"
return {
"success": False,
"content": message,
"redirected": prev_url is not None
}
print(f"[Gemini] Fetching: {gemini_url} (redirected from {prev_url})")
def handle_content(resp: SuccessResponse) -> str:
try:
mime_type, encoding = resp.meta.split(";")
except ValueError:
mime_type = resp.meta
try:
response = ignition.request(gemini_url, raise_errors=True, referer=prev_url)
mime_type = mime_type.strip()
if isinstance(response, SuccessResponse):
content = self.handle_content(response)
await self.complete_event(content)
return {
"success": True,
"content": content,
"redirected": prev_url is not None
}
elif isinstance(response, RedirectResponse):
prev_url = self.current_url
self.current_url = response.data()
return await self.fetch(prev_url, redirects + 1)
else:
await self.error_event(f"Unsupported status code {response.status}")
print(f"[Gemini] Unsupported {response.status} code for '{gemini_url}'")
message = (f"Tell the user there was a {response.status} status code. "
f"Support for handling {response.status} is not implemented yet.")
return { "success": False, "content": message, "redirected": prev_url is not None }
except Exception as e:
print(f"[Gemini] error: {e}")
message = f"Tell the user there was an error fetching the page: {e}"
await self.error_event(str(e))
return {
"success": False,
"content": message,
"redirected": prev_url is not None
}
if mime_type == "text/gemini":
return resp.data().strip()
else:
raise ValueError(f"Not yet able to handle MIME type {mime_type}")
def handle_content(self, resp: SuccessResponse) -> str:
try:
mime_type, encoding = resp.meta.split(";")
except ValueError:
mime_type = resp.meta
mime_type = mime_type.strip()
if mime_type == "text/gemini":
return resp.data().strip()
else:
raise ValueError(f"Not yet able to handle MIME type {mime_type}")
class Tools:
class Valves(BaseModel):
@ -144,14 +230,20 @@ class Tools:
self.valves = self.Valves()
self.user_valves = None
def get_gemini_page(self, gemini_url: str, __event_emitter__) -> str:
async def get_gemini_page(self, gemini_url: str, __event_emitter__) -> str:
"""
Fetches Gemini capsules, content, and web pages over Gemini Protocol.
Use this if the user requests a gemini:// URL.
:param gemini_url: The URL to fetch. The URL MUST begin with gemini://.
:return: The fetched data as Markdown.
"""
resp = fetch(gemini_url, correct_urls=self.valves.attempt_url_correction)
fetcher = GeminiFetcher(
gemini_url=gemini_url,
correct_urls=self.valves.attempt_url_correction,
event_emitter=__event_emitter__
)
resp = await fetcher.fetch()
if resp["success"]:
result_instructions = instructions(gemini_url, redirect=resp["redirected"])
return f"{result_instructions}\n\n```\n{resp['content']}\n```"