implemented logs

This commit is contained in:
biersoeckli
2024-11-02 11:10:43 +00:00
parent 51628314fc
commit a45b854c32
6 changed files with 122 additions and 96 deletions

View File

@@ -16,6 +16,9 @@ export default function LogsStreamed({
const [logs, setLogs] = useState<string>('');
useEffect(() => {
const logEventName = `${app.projectId}_${app.id}_${podName}`;
function onConnect() {
setIsConnected(true);
setTransport(podLogsSocket.io.engine.transport.name);
@@ -42,12 +45,15 @@ export default function LogsStreamed({
podLogsSocket.on("connect", onConnect);
podLogsSocket.on("disconnect", onDisconnect);
podLogsSocket.on(`logs_${app.projectId}_${app.id}_${podName}`, myListener);
podLogsSocket.on(logEventName, myListener);
return () => {
setLogs('');
podLogsSocket.off("connect", onConnect);
podLogsSocket.off("disconnect", onDisconnect);
podLogsSocket.off(`logs_${app.projectId}_${app.id}_${podName}`, myListener);
podLogsSocket.off(logEventName, myListener);
podLogsSocket.disconnect();
};
}, [app, podName]);

View File

@@ -1,8 +1,7 @@
import { createServer } from 'http'
import { parse } from 'url'
import next from 'next'
import webSocketHandler from './socket-io'
import { Server } from 'socket.io'
import socketIoServer from './socket-io.server'
// Source: https://nextjs.org/docs/app/building-your-application/configuring/custom-server
@@ -18,9 +17,7 @@ app.prepare().then(() => {
handle(req, res, parsedUrl)
});
webSocketHandler.initializeSocketIo(server);
//const io = new Server(server);
socketIoServer.initialize(server);
server.listen(port)

View File

@@ -0,0 +1,96 @@
import { revalidateTag, unstable_cache } from "next/cache";
import dataAccess from "../adapter/db.client";
import { Tags } from "../utils/cache-tag-generator.utils";
import { App, Prisma, Project } from "@prisma/client";
import { StringUtils } from "../utils/string.utils";
import deploymentService from "./deployment.service";
import k3s from "../adapter/kubernetes-api.adapter";
import { DefaultEventsMap, Socket } from "socket.io";
import stream from "stream";
import { PodsInfoModel } from "@/model/pods-info.model";
class LogStreamService {
activeStreams = new Map<string, { logStream: stream.PassThrough, clients: number, k3sStreamRequest: any }>();
async streamLogs(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, any>) {
console.log('Client connected:', socket.id);
socket.on('joinPodLog', async (podInfo) => {
const { appId, podName } = podInfo;
if (!appId || !podName) {
return;
}
const app = await dataAccess.client.app.findFirstOrThrow({
where: {
id: appId
}
});
const pod = await deploymentService.getPodByName(app.projectId, podName);
const streamKey = `${app.projectId}_${app.id}_${pod.podName}`;
const existingActiveStream = this.activeStreams.get(streamKey);
if (!existingActiveStream) {
// create stream if not existing
const retVal = await this.createLogStreamForPod(socket, streamKey, app, pod);
this.activeStreams.set(streamKey, retVal);
}
// Client dem Raum hinzufügen und Anzahl der Clients für diesen Pod erhöhen
socket.join(streamKey);
this.activeStreams.get(streamKey)!.clients += 1;
console.log(`Client ${socket.id} joined log stream for ${streamKey}`);
});
socket.on('disconnecting', () => {
// Über alle Räume iterieren, die dieser Socket abonniert hat
for (const streamKey of Array.from(socket.rooms)) {
const existingActiveStream = this.activeStreams.get(streamKey);
if (existingActiveStream) {
// Anzahl der Clients für diesen Stream verringern
existingActiveStream.clients -= 1;
console.log(`Client ${socket.id} left log stream for ${streamKey}`);
// Falls keine Clients mehr übrig sind, den Stream beenden
if (existingActiveStream.clients === 0) {
this.deleteLogStream(existingActiveStream, streamKey);
}
}
}
});
}
private deleteLogStream(existingActiveStream: { logStream: stream.PassThrough; clients: number; k3sStreamRequest: any; }, streamKey: string) {
existingActiveStream.logStream.end();
existingActiveStream.k3sStreamRequest.abort();
this.activeStreams.delete(streamKey);
console.log(`Stopped log stream for ${streamKey} as no clients are listening.`);
}
private async createLogStreamForPod(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, any>, streamKey: string, app: App, pod: PodsInfoModel) {
const logStream = new stream.PassThrough();
logStream.on('data', (chunk) => {
socket.emit(streamKey, chunk.toString());
});
logStream.on('data', (chunk) => {
socket.to(streamKey).emit(`${streamKey}`, chunk.toString());
});
let k3sStreamRequest = await k3s.log.log(app.projectId, pod.podName, pod.containerName, logStream, {
follow: true,
pretty: false,
tailLines: 100,
}); /*.catch((err) => {
console.error(`Error streaming logs for ${streamKey}:`, err);
logStream.end();
});*/
const retVal = { logStream, clients: 0, k3sStreamRequest };
return retVal;
}
}
const logService = new LogStreamService();
export default logService;

View File

@@ -1,24 +0,0 @@
import { revalidateTag, unstable_cache } from "next/cache";
import dataAccess from "../adapter/db.client";
import { Tags } from "../utils/cache-tag-generator.utils";
import { Prisma, Project } from "@prisma/client";
import { StringUtils } from "../utils/string.utils";
import deploymentService from "./deployment.service";
import k3s from "../adapter/kubernetes-api.adapter";
class LogService {
/*
async streamLogs(namespace: string, podName: string, containerName: string, logStream: NodeJS.WritableStream) {
const req = await k3s.log.log(namespace, podName, containerName, logStream, {
follow: true,
pretty: false,
tailLines: 100,
});
return req;
}*/
}
const logService = new LogService();
export default logService;

16
src/socket-io.server.ts Normal file
View File

@@ -0,0 +1,16 @@
import type http from "node:http";
import { Server } from "socket.io";
import logService from "./server/services/log-stream.service";
class SocketIoServer {
initialize(server: http.Server<typeof http.IncomingMessage, typeof http.ServerResponse>) {
const io = new Server(server);
const podLogsNamespace = io.of("/pod-logs");
podLogsNamespace.on("connection", (socket) => {
logService.streamLogs(socket);
});
};
}
const socketIoServer = new SocketIoServer();
export default socketIoServer;

View File

@@ -1,65 +0,0 @@
import type http from "node:http";
import { Server } from "socket.io";
import k3s from "./server/adapter/kubernetes-api.adapter";
import stream from "stream";
import appService from "./server/services/app.service";
import deploymentService from "./server/services/deployment.service";
import dataAccess from "./server/adapter/db.client";
class WebSocketHandler {
initializeSocketIo(server: http.Server<typeof http.IncomingMessage, typeof http.ServerResponse>) {
const io = new Server(server);
const podLogsNamespace = io.of("/pod-logs");
podLogsNamespace.on("connection", (socket) => {
let req: any;
socket.on('joinPodLog', async (podInfo) => {
const { appId, podName } = podInfo;
if (!appId || !podName) {
return;
}
const app = await dataAccess.client.app.findFirstOrThrow({
where: {
id: appId
}
});
const pod = await deploymentService.getPodByName(app.projectId, podName);
const logStream = new stream.PassThrough();
logStream.on('data', (chunk) => {
socket.emit(`logs_${app.projectId}_${app.id}_${pod.podName}`, chunk.toString());
});
req = await k3s.log.log(app.projectId, pod.podName, pod.containerName, logStream, {
follow: true,
pretty: false,
tailLines: 100,
});
// on disconnect from client
});
socket.on('disconnect', () => {
if (req) {
req.abort();
console.log("Aborted request");
}
});
});
};
}
const webSocketHandler = new WebSocketHandler();
export default webSocketHandler;