feat(messageHandler): add new emitter event handling

This commit is contained in:
ItzCrazyKns 2024-04-17 11:27:05 +05:30
parent 89340fcfff
commit 90edd98613
No known key found for this signature in database
GPG Key ID: 8162927C7CCE3065
2 changed files with 51 additions and 187 deletions

View File

@ -9,7 +9,7 @@ import {
RunnableMap, RunnableMap,
RunnableLambda, RunnableLambda,
} from '@langchain/core/runnables'; } from '@langchain/core/runnables';
import { ChatOpenAI, OpenAI, OpenAIEmbeddings } from '@langchain/openai'; import { ChatOpenAI, OpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers'; import { StringOutputParser } from '@langchain/core/output_parsers';
import { Document } from '@langchain/core/documents'; import { Document } from '@langchain/core/documents';
import { searchSearxng } from '../core/searxng'; import { searchSearxng } from '../core/searxng';

View File

@ -1,4 +1,4 @@
import { WebSocket } from 'ws'; import { EventEmitter, WebSocket } from 'ws';
import { BaseMessage, AIMessage, HumanMessage } from '@langchain/core/messages'; import { BaseMessage, AIMessage, HumanMessage } from '@langchain/core/messages';
import handleWebSearch from '../agents/webSearchAgent'; import handleWebSearch from '../agents/webSearchAgent';
import handleAcademicSearch from '../agents/academicSearchAgent'; import handleAcademicSearch from '../agents/academicSearchAgent';
@ -15,6 +15,49 @@ type Message = {
history: Array<[string, string]>; history: Array<[string, string]>;
}; };
const searchHandlers = {
webSearch: handleWebSearch,
academicSearch: handleAcademicSearch,
writingAssistant: handleWritingAssistant,
wolframAlphaSearch: handleWolframAlphaSearch,
youtubeSearch: handleYoutubeSearch,
redditSearch: handleRedditSearch,
};
const handleEmitterEvents = (
emitter: EventEmitter,
ws: WebSocket,
id: string,
) => {
emitter.on('data', (data) => {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
ws.send(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: id,
}),
);
} else if (parsedData.type === 'sources') {
ws.send(
JSON.stringify({
type: 'sources',
data: parsedData.data,
messageId: id,
}),
);
}
});
emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id }));
});
emitter.on('error', (data) => {
const parsedData = JSON.parse(data);
ws.send(JSON.stringify({ type: 'error', data: parsedData.data }));
});
};
export const handleMessage = async (message: string, ws: WebSocket) => { export const handleMessage = async (message: string, ws: WebSocket) => {
try { try {
const parsedMessage = JSON.parse(message) as Message; const parsedMessage = JSON.parse(message) as Message;
@ -38,191 +81,12 @@ export const handleMessage = async (message: string, ws: WebSocket) => {
}); });
if (parsedMessage.type === 'message') { if (parsedMessage.type === 'message') {
switch (parsedMessage.focusMode) { const handler = searchHandlers[parsedMessage.focusMode];
case 'webSearch': { if (handler) {
const emitter = handleWebSearch(parsedMessage.content, history); const emitter = handler(parsedMessage.content, history);
emitter.on('data', (data) => { handleEmitterEvents(emitter, ws, id);
const parsedData = JSON.parse(data); } else {
if (parsedData.type === 'response') { ws.send(JSON.stringify({ type: 'error', data: 'Invalid focus mode' }));
ws.send(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: id,
}),
);
} else if (parsedData.type === 'sources') {
ws.send(
JSON.stringify({
type: 'sources',
data: parsedData.data,
messageId: id,
}),
);
}
});
emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id }));
});
emitter.on('error', (data) => {
const parsedData = JSON.parse(data);
ws.send(JSON.stringify({ type: 'error', data: parsedData.data }));
});
break;
}
case 'academicSearch': {
const emitter = handleAcademicSearch(parsedMessage.content, history);
emitter.on('data', (data) => {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
ws.send(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: id,
}),
);
} else if (parsedData.type === 'sources') {
ws.send(
JSON.stringify({
type: 'sources',
data: parsedData.data,
messageId: id,
}),
);
}
});
emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id }));
});
emitter.on('error', (data) => {
const parsedData = JSON.parse(data);
ws.send(JSON.stringify({ type: 'error', data: parsedData.data }));
});
break;
}
case 'writingAssistant': {
const emitter = handleWritingAssistant(
parsedMessage.content,
history,
);
emitter.on('data', (data) => {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
ws.send(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: id,
}),
);
}
});
emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id }));
});
emitter.on('error', (data) => {
const parsedData = JSON.parse(data);
ws.send(JSON.stringify({ type: 'error', data: parsedData.data }));
});
break;
}
case 'wolframAlphaSearch': {
const emitter = handleWolframAlphaSearch(
parsedMessage.content,
history,
);
emitter.on('data', (data) => {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
ws.send(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: id,
}),
);
} else if (parsedData.type === 'sources') {
ws.send(
JSON.stringify({
type: 'sources',
data: parsedData.data,
messageId: id,
}),
);
}
});
emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id }));
});
emitter.on('error', (data) => {
const parsedData = JSON.parse(data);
ws.send(JSON.stringify({ type: 'error', data: parsedData.data }));
});
break;
}
case 'youtubeSearch': {
const emitter = handleYoutubeSearch(parsedMessage.content, history);
emitter.on('data', (data) => {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
ws.send(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: id,
}),
);
} else if (parsedData.type === 'sources') {
ws.send(
JSON.stringify({
type: 'sources',
data: parsedData.data,
messageId: id,
}),
);
}
});
emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id }));
});
emitter.on('error', (data) => {
const parsedData = JSON.parse(data);
ws.send(JSON.stringify({ type: 'error', data: parsedData.data }));
});
break;
}
case 'redditSearch': {
const emitter = handleRedditSearch(parsedMessage.content, history);
emitter.on('data', (data) => {
const parsedData = JSON.parse(data);
if (parsedData.type === 'response') {
ws.send(
JSON.stringify({
type: 'message',
data: parsedData.data,
messageId: id,
}),
);
} else if (parsedData.type === 'sources') {
ws.send(
JSON.stringify({
type: 'sources',
data: parsedData.data,
messageId: id,
}),
);
}
});
emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id }));
});
emitter.on('error', (data) => {
const parsedData = JSON.parse(data);
ws.send(JSON.stringify({ type: 'error', data: parsedData.data }));
});
break;
}
} }
} }
} catch (error) { } catch (error) {