feat(msg-handler): update message types

This commit is contained in:
ItzCrazyKns 2024-06-29 11:09:31 +05:30
parent d806c7e581
commit 61044715e9
No known key found for this signature in database
GPG Key ID: 8162927C7CCE3065
1 changed files with 73 additions and 11 deletions

View File

@ -9,11 +9,21 @@ import handleRedditSearch from '../agents/redditSearchAgent';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { Embeddings } from '@langchain/core/embeddings'; import type { Embeddings } from '@langchain/core/embeddings';
import logger from '../utils/logger'; import logger from '../utils/logger';
import db from '../db';
import { chats, messages } from '../db/schema';
import { eq } from 'drizzle-orm';
import crypto from 'crypto';
type Message = { type Message = {
type: string; messageId: string;
chatId: string;
content: string; content: string;
};
type WSMessage = {
message: Message;
copilot: boolean; copilot: boolean;
type: string;
focusMode: string; focusMode: string;
history: Array<[string, string]>; history: Array<[string, string]>;
}; };
@ -30,8 +40,12 @@ const searchHandlers = {
const handleEmitterEvents = ( const handleEmitterEvents = (
emitter: EventEmitter, emitter: EventEmitter,
ws: WebSocket, ws: WebSocket,
id: string, messageId: string,
chatId: string,
) => { ) => {
let recievedMessage = '';
let sources = [];
emitter.on('data', (data) => { emitter.on('data', (data) => {
const parsedData = JSON.parse(data); const parsedData = JSON.parse(data);
if (parsedData.type === 'response') { if (parsedData.type === 'response') {
@ -39,21 +53,36 @@ const handleEmitterEvents = (
JSON.stringify({ JSON.stringify({
type: 'message', type: 'message',
data: parsedData.data, data: parsedData.data,
messageId: id, messageId: messageId,
}), }),
); );
recievedMessage += parsedData.data;
} else if (parsedData.type === 'sources') { } else if (parsedData.type === 'sources') {
ws.send( ws.send(
JSON.stringify({ JSON.stringify({
type: 'sources', type: 'sources',
data: parsedData.data, data: parsedData.data,
messageId: id, messageId: messageId,
}), }),
); );
sources = parsedData.data;
} }
}); });
emitter.on('end', () => { emitter.on('end', () => {
ws.send(JSON.stringify({ type: 'messageEnd', messageId: id })); ws.send(JSON.stringify({ type: 'messageEnd', messageId: messageId }));
db.insert(messages)
.values({
content: recievedMessage,
chatId: chatId,
messageId: messageId,
role: 'assistant',
metadata: JSON.stringify({
createdAt: new Date(),
...(sources && sources.length > 0 && { sources }),
}),
})
.execute();
}); });
emitter.on('error', (data) => { emitter.on('error', (data) => {
const parsedData = JSON.parse(data); const parsedData = JSON.parse(data);
@ -74,8 +103,10 @@ export const handleMessage = async (
embeddings: Embeddings, embeddings: Embeddings,
) => { ) => {
try { try {
const parsedMessage = JSON.parse(message) as Message; const parsedWSMessage = JSON.parse(message) as WSMessage;
const id = Math.random().toString(36).substring(7); const parsedMessage = parsedWSMessage.message;
const id = crypto.randomBytes(7).toString('hex');
if (!parsedMessage.content) if (!parsedMessage.content)
return ws.send( return ws.send(
@ -86,7 +117,7 @@ export const handleMessage = async (
}), }),
); );
const history: BaseMessage[] = parsedMessage.history.map((msg) => { const history: BaseMessage[] = parsedWSMessage.history.map((msg) => {
if (msg[0] === 'human') { if (msg[0] === 'human') {
return new HumanMessage({ return new HumanMessage({
content: msg[1], content: msg[1],
@ -98,8 +129,9 @@ export const handleMessage = async (
} }
}); });
if (parsedMessage.type === 'message') { if (parsedWSMessage.type === 'message') {
const handler = searchHandlers[parsedMessage.focusMode]; const handler = searchHandlers[parsedWSMessage.focusMode];
if (handler) { if (handler) {
const emitter = handler( const emitter = handler(
parsedMessage.content, parsedMessage.content,
@ -107,7 +139,37 @@ export const handleMessage = async (
llm, llm,
embeddings, embeddings,
); );
handleEmitterEvents(emitter, ws, id);
handleEmitterEvents(emitter, ws, id, parsedMessage.chatId);
const chat = await db.query.chats.findFirst({
where: eq(chats.id, parsedMessage.chatId),
});
if (!chat) {
await db
.insert(chats)
.values({
id: parsedMessage.chatId,
title: parsedMessage.content,
createdAt: new Date().toString(),
focusMode: parsedWSMessage.focusMode,
})
.execute();
}
await db
.insert(messages)
.values({
content: parsedMessage.content,
chatId: parsedMessage.chatId,
messageId: id,
role: 'user',
metadata: JSON.stringify({
createdAt: new Date(),
}),
})
.execute();
} else { } else {
ws.send( ws.send(
JSON.stringify({ JSON.stringify({