diff --git a/src/app/project/app/[tabName]/overview/logs-streamed.tsx b/src/app/project/app/[tabName]/overview/logs-streamed.tsx index 8350e5f..6bdf42c 100644 --- a/src/app/project/app/[tabName]/overview/logs-streamed.tsx +++ b/src/app/project/app/[tabName]/overview/logs-streamed.tsx @@ -16,6 +16,9 @@ export default function LogsStreamed({ const [logs, setLogs] = useState(''); useEffect(() => { + + const logEventName = `${app.projectId}_${app.id}_${podName}`; + function onConnect() { setIsConnected(true); setTransport(podLogsSocket.io.engine.transport.name); @@ -42,12 +45,15 @@ export default function LogsStreamed({ podLogsSocket.on("connect", onConnect); podLogsSocket.on("disconnect", onDisconnect); - podLogsSocket.on(`logs_${app.projectId}_${app.id}_${podName}`, myListener); + + podLogsSocket.on(logEventName, myListener); return () => { + setLogs(''); podLogsSocket.off("connect", onConnect); podLogsSocket.off("disconnect", onDisconnect); - podLogsSocket.off(`logs_${app.projectId}_${app.id}_${podName}`, myListener); + podLogsSocket.off(logEventName, myListener); + podLogsSocket.disconnect(); }; }, [app, podName]); diff --git a/src/server.ts b/src/server.ts index 8308cc5..45e859d 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,8 +1,7 @@ import { createServer } from 'http' import { parse } from 'url' import next from 'next' -import webSocketHandler from './socket-io' -import { Server } from 'socket.io' +import socketIoServer from './socket-io.server' // Source: https://nextjs.org/docs/app/building-your-application/configuring/custom-server @@ -18,9 +17,7 @@ app.prepare().then(() => { handle(req, res, parsedUrl) }); - webSocketHandler.initializeSocketIo(server); - //const io = new Server(server); - + socketIoServer.initialize(server); server.listen(port) diff --git a/src/server/services/log-stream.service.ts b/src/server/services/log-stream.service.ts new file mode 100644 index 0000000..0dbaa68 --- /dev/null +++ b/src/server/services/log-stream.service.ts @@ -0,0 +1,96 @@ +import { revalidateTag, unstable_cache } from "next/cache"; +import dataAccess from "../adapter/db.client"; +import { Tags } from "../utils/cache-tag-generator.utils"; +import { App, Prisma, Project } from "@prisma/client"; +import { StringUtils } from "../utils/string.utils"; +import deploymentService from "./deployment.service"; +import k3s from "../adapter/kubernetes-api.adapter"; +import { DefaultEventsMap, Socket } from "socket.io"; +import stream from "stream"; +import { PodsInfoModel } from "@/model/pods-info.model"; + +class LogStreamService { + + activeStreams = new Map(); + + async streamLogs(socket: Socket) { + console.log('Client connected:', socket.id); + + socket.on('joinPodLog', async (podInfo) => { + const { appId, podName } = podInfo; + if (!appId || !podName) { + return; + } + const app = await dataAccess.client.app.findFirstOrThrow({ + where: { + id: appId + } + }); + + const pod = await deploymentService.getPodByName(app.projectId, podName); + + const streamKey = `${app.projectId}_${app.id}_${pod.podName}`; + const existingActiveStream = this.activeStreams.get(streamKey); + if (!existingActiveStream) { + // create stream if not existing + const retVal = await this.createLogStreamForPod(socket, streamKey, app, pod); + this.activeStreams.set(streamKey, retVal); + } + // Client dem Raum hinzufügen und Anzahl der Clients für diesen Pod erhöhen + socket.join(streamKey); + this.activeStreams.get(streamKey)!.clients += 1; + + console.log(`Client ${socket.id} joined log stream for ${streamKey}`); + }); + + socket.on('disconnecting', () => { + // Über alle Räume iterieren, die dieser Socket abonniert hat + for (const streamKey of Array.from(socket.rooms)) { + const existingActiveStream = this.activeStreams.get(streamKey); + if (existingActiveStream) { + // Anzahl der Clients für diesen Stream verringern + existingActiveStream.clients -= 1; + console.log(`Client ${socket.id} left log stream for ${streamKey}`); + + // Falls keine Clients mehr übrig sind, den Stream beenden + if (existingActiveStream.clients === 0) { + this.deleteLogStream(existingActiveStream, streamKey); + } + } + } + }); + } + + + private deleteLogStream(existingActiveStream: { logStream: stream.PassThrough; clients: number; k3sStreamRequest: any; }, streamKey: string) { + existingActiveStream.logStream.end(); + existingActiveStream.k3sStreamRequest.abort(); + this.activeStreams.delete(streamKey); + console.log(`Stopped log stream for ${streamKey} as no clients are listening.`); + } + + private async createLogStreamForPod(socket: Socket, streamKey: string, app: App, pod: PodsInfoModel) { + const logStream = new stream.PassThrough(); + logStream.on('data', (chunk) => { + socket.emit(streamKey, chunk.toString()); + }); + + logStream.on('data', (chunk) => { + socket.to(streamKey).emit(`${streamKey}`, chunk.toString()); + }); + + let k3sStreamRequest = await k3s.log.log(app.projectId, pod.podName, pod.containerName, logStream, { + follow: true, + pretty: false, + tailLines: 100, + }); /*.catch((err) => { + console.error(`Error streaming logs for ${streamKey}:`, err); + logStream.end(); + });*/ + const retVal = { logStream, clients: 0, k3sStreamRequest }; + return retVal; + } +} + +const logService = new LogStreamService(); +export default logService; diff --git a/src/server/services/log.service.ts b/src/server/services/log.service.ts deleted file mode 100644 index 5e4c3ef..0000000 --- a/src/server/services/log.service.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { revalidateTag, unstable_cache } from "next/cache"; -import dataAccess from "../adapter/db.client"; -import { Tags } from "../utils/cache-tag-generator.utils"; -import { Prisma, Project } from "@prisma/client"; -import { StringUtils } from "../utils/string.utils"; -import deploymentService from "./deployment.service"; -import k3s from "../adapter/kubernetes-api.adapter"; - -class LogService { -/* - async streamLogs(namespace: string, podName: string, containerName: string, logStream: NodeJS.WritableStream) { - const req = await k3s.log.log(namespace, podName, containerName, logStream, { - follow: true, - pretty: false, - tailLines: 100, - }); - - return req; - }*/ - -} - -const logService = new LogService(); -export default logService; diff --git a/src/socket-io.server.ts b/src/socket-io.server.ts new file mode 100644 index 0000000..91e3fc7 --- /dev/null +++ b/src/socket-io.server.ts @@ -0,0 +1,16 @@ +import type http from "node:http"; +import { Server } from "socket.io"; +import logService from "./server/services/log-stream.service"; + +class SocketIoServer { + initialize(server: http.Server) { + const io = new Server(server); + const podLogsNamespace = io.of("/pod-logs"); + podLogsNamespace.on("connection", (socket) => { + logService.streamLogs(socket); + }); + }; +} +const socketIoServer = new SocketIoServer(); +export default socketIoServer; + diff --git a/src/socket-io.ts b/src/socket-io.ts deleted file mode 100644 index 74cbb9a..0000000 --- a/src/socket-io.ts +++ /dev/null @@ -1,65 +0,0 @@ -import type http from "node:http"; -import { Server } from "socket.io"; -import k3s from "./server/adapter/kubernetes-api.adapter"; -import stream from "stream"; -import appService from "./server/services/app.service"; -import deploymentService from "./server/services/deployment.service"; -import dataAccess from "./server/adapter/db.client"; - -class WebSocketHandler { - - - - initializeSocketIo(server: http.Server) { - - const io = new Server(server); - - const podLogsNamespace = io.of("/pod-logs"); - podLogsNamespace.on("connection", (socket) => { - - let req: any; - - socket.on('joinPodLog', async (podInfo) => { - const { appId, podName } = podInfo; - if (!appId || !podName) { - return; - } - const app = await dataAccess.client.app.findFirstOrThrow({ - where: { - id: appId - } - }); - - const pod = await deploymentService.getPodByName(app.projectId, podName); - - const logStream = new stream.PassThrough(); - logStream.on('data', (chunk) => { - socket.emit(`logs_${app.projectId}_${app.id}_${pod.podName}`, chunk.toString()); - }); - - req = await k3s.log.log(app.projectId, pod.podName, pod.containerName, logStream, { - follow: true, - pretty: false, - tailLines: 100, - }); - - - - // on disconnect from client - - }); - - socket.on('disconnect', () => { - if (req) { - req.abort(); - console.log("Aborted request"); - } - }); - }); - - }; - -} -const webSocketHandler = new WebSocketHandler(); -export default webSocketHandler; -