diff --git a/src/server/services/log-stream.service.ts b/src/server/services/log-stream.service.ts deleted file mode 100644 index c63ec6b..0000000 --- a/src/server/services/log-stream.service.ts +++ /dev/null @@ -1,100 +0,0 @@ -import deploymentService from "./deployment.service"; -import k3s from "../adapter/kubernetes-api.adapter"; -import { DefaultEventsMap, Socket } from "socket.io"; -import stream from "stream"; -import { PodsInfoModel } from "@/model/pods-info.model"; -import buildService, { buildNamespace } from "./build.service"; - -class LogStreamService { - - async streamLogs(socket: Socket) { - console.log('[CONNECT] Client connected:', socket.id); - - type socketStreamsBody = { - logStream: stream.PassThrough, - k3sStreamRequest: any - }; - const socketStreams = new Map(); - - socket.on('joinPodLog', (podInfo) => this.streamWrapper(socket, async () => { - let { namespace, podName, buildJobName } = podInfo; - let pod; - let streamKey; - if (namespace && podName) { - pod = await deploymentService.getPodByName(namespace, podName); - streamKey = `${namespace}_${podName}`; - - } else if (buildJobName) { - namespace = buildNamespace; - pod = await buildService.getPodForJob(buildJobName); - streamKey = `${buildJobName}`; - - } else { - console.error('Invalid pod info for streaming logs', podInfo); - return; - } - - if (socketStreams.has(streamKey)) { - console.error(`[INFO] Client ${socket.id} already joined log stream for ${streamKey}`); - return; - } - - // create stream if not existing - const retVal = await this.createLogStreamForPod(socket, streamKey, namespace, pod); - socketStreams.set(streamKey, { - logStream: retVal.logStream, - k3sStreamRequest: retVal.k3sStreamRequest - }); - - console.log(`[JOIN] Client ${socket.id} joined log stream for ${streamKey}`); - })); - - socket.on('leavePodLog', (data) => { - const streamKey = data?.streamKey; - const socketInfo = socketStreams.get(streamKey); - socketStreams.delete(streamKey); - socketInfo?.logStream?.end(); - socketInfo?.k3sStreamRequest?.abort(); - console.log(`[LEAVE] Client ${socket.id} left log stream for ${streamKey}`); - }); - - socket.on('disconnect', () => { - const streamKeys = Array.from(socketStreams.keys()); - for (const [streamKey, socketInfo] of Array.from(socketStreams.entries())) { - socketInfo?.logStream?.end(); - socketInfo?.k3sStreamRequest?.abort(); - } - socketStreams.clear(); - console.log(`[DISCONNECTED] Client ${socket.id} disconnected log stream for ${streamKeys}`); - }); - } - - private async createLogStreamForPod(socket: Socket, streamKey: string, namespace: string, pod: PodsInfoModel) { - const logStream = new stream.PassThrough(); - logStream.on('data', (chunk) => { - socket.emit(streamKey, chunk.toString()); - }); - - let k3sStreamRequest = await k3s.log.log(namespace, pod.podName, pod.containerName, logStream, { - follow: true, - tailLines: namespace === buildNamespace ? undefined : 100, - previous: false, - timestamps: true, - pretty: false - }); - return { logStream, k3sStreamRequest }; - } - - private async streamWrapper(socket: Socket, - func: () => Promise) { - try { - return await func(); - } catch (ex) { - console.error(ex); - socket.emit('error', (ex as Error)?.message ?? 'An unknown error occurred.'); - } - } -} - -const logService = new LogStreamService(); -export default logService; diff --git a/src/server/utils/action-wrapper.utils.ts b/src/server/utils/action-wrapper.utils.ts index 1706249..a65d133 100644 --- a/src/server/utils/action-wrapper.utils.ts +++ b/src/server/utils/action-wrapper.utils.ts @@ -6,6 +6,7 @@ import { redirect } from "next/navigation"; import { ServerActionResult } from "@/model/server-action-error-return.model"; import { FormValidationException } from "@/model/form-validation-exception.model"; import { authOptions } from "@/lib/auth-options"; +import { NextResponse } from "next/server"; /** * THIS FUNCTION RETURNS NULL IF NO USER IS LOGGED IN @@ -107,21 +108,21 @@ export async function simpleRoute( funcResult = await func(); } catch (ex) { if (ex instanceof FormValidationException) { - return { + return NextResponse.json({ status: 'error', message: ex.message - }; + }); } else if (ex instanceof ServiceException) { - return { + return NextResponse.json({ status: 'error', message: ex.message - }; + }); } else { console.error(ex) - return { + return NextResponse.json({ status: 'error', message: 'An unknown error occurred.' - }; + }); } } return funcResult; diff --git a/src/socket-io.server.ts b/src/socket-io.server.ts index 91e3fc7..ac49b0a 100644 --- a/src/socket-io.server.ts +++ b/src/socket-io.server.ts @@ -1,13 +1,12 @@ import type http from "node:http"; import { Server } from "socket.io"; -import logService from "./server/services/log-stream.service"; class SocketIoServer { initialize(server: http.Server) { const io = new Server(server); const podLogsNamespace = io.of("/pod-logs"); podLogsNamespace.on("connection", (socket) => { - logService.streamLogs(socket); + //logService.streamLogs(socket); }); }; }