mirror of
https://github.com/biersoeckli/QuickStack.git
synced 2026-01-05 02:59:54 -06:00
removed socket.io log stream
This commit is contained in:
@@ -1,100 +0,0 @@
|
||||
import deploymentService from "./deployment.service";
|
||||
import k3s from "../adapter/kubernetes-api.adapter";
|
||||
import { DefaultEventsMap, Socket } from "socket.io";
|
||||
import stream from "stream";
|
||||
import { PodsInfoModel } from "@/model/pods-info.model";
|
||||
import buildService, { buildNamespace } from "./build.service";
|
||||
|
||||
class LogStreamService {
|
||||
|
||||
async streamLogs(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, any>) {
|
||||
console.log('[CONNECT] Client connected:', socket.id);
|
||||
|
||||
type socketStreamsBody = {
|
||||
logStream: stream.PassThrough,
|
||||
k3sStreamRequest: any
|
||||
};
|
||||
const socketStreams = new Map<string, socketStreamsBody>();
|
||||
|
||||
socket.on('joinPodLog', (podInfo) => this.streamWrapper(socket, async () => {
|
||||
let { namespace, podName, buildJobName } = podInfo;
|
||||
let pod;
|
||||
let streamKey;
|
||||
if (namespace && podName) {
|
||||
pod = await deploymentService.getPodByName(namespace, podName);
|
||||
streamKey = `${namespace}_${podName}`;
|
||||
|
||||
} else if (buildJobName) {
|
||||
namespace = buildNamespace;
|
||||
pod = await buildService.getPodForJob(buildJobName);
|
||||
streamKey = `${buildJobName}`;
|
||||
|
||||
} else {
|
||||
console.error('Invalid pod info for streaming logs', podInfo);
|
||||
return;
|
||||
}
|
||||
|
||||
if (socketStreams.has(streamKey)) {
|
||||
console.error(`[INFO] Client ${socket.id} already joined log stream for ${streamKey}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// create stream if not existing
|
||||
const retVal = await this.createLogStreamForPod(socket, streamKey, namespace, pod);
|
||||
socketStreams.set(streamKey, {
|
||||
logStream: retVal.logStream,
|
||||
k3sStreamRequest: retVal.k3sStreamRequest
|
||||
});
|
||||
|
||||
console.log(`[JOIN] Client ${socket.id} joined log stream for ${streamKey}`);
|
||||
}));
|
||||
|
||||
socket.on('leavePodLog', (data) => {
|
||||
const streamKey = data?.streamKey;
|
||||
const socketInfo = socketStreams.get(streamKey);
|
||||
socketStreams.delete(streamKey);
|
||||
socketInfo?.logStream?.end();
|
||||
socketInfo?.k3sStreamRequest?.abort();
|
||||
console.log(`[LEAVE] Client ${socket.id} left log stream for ${streamKey}`);
|
||||
});
|
||||
|
||||
socket.on('disconnect', () => {
|
||||
const streamKeys = Array.from(socketStreams.keys());
|
||||
for (const [streamKey, socketInfo] of Array.from(socketStreams.entries())) {
|
||||
socketInfo?.logStream?.end();
|
||||
socketInfo?.k3sStreamRequest?.abort();
|
||||
}
|
||||
socketStreams.clear();
|
||||
console.log(`[DISCONNECTED] Client ${socket.id} disconnected log stream for ${streamKeys}`);
|
||||
});
|
||||
}
|
||||
|
||||
private async createLogStreamForPod(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, any>, streamKey: string, namespace: string, pod: PodsInfoModel) {
|
||||
const logStream = new stream.PassThrough();
|
||||
logStream.on('data', (chunk) => {
|
||||
socket.emit(streamKey, chunk.toString());
|
||||
});
|
||||
|
||||
let k3sStreamRequest = await k3s.log.log(namespace, pod.podName, pod.containerName, logStream, {
|
||||
follow: true,
|
||||
tailLines: namespace === buildNamespace ? undefined : 100,
|
||||
previous: false,
|
||||
timestamps: true,
|
||||
pretty: false
|
||||
});
|
||||
return { logStream, k3sStreamRequest };
|
||||
}
|
||||
|
||||
private async streamWrapper<T>(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, any>,
|
||||
func: () => Promise<T>) {
|
||||
try {
|
||||
return await func();
|
||||
} catch (ex) {
|
||||
console.error(ex);
|
||||
socket.emit('error', (ex as Error)?.message ?? 'An unknown error occurred.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const logService = new LogStreamService();
|
||||
export default logService;
|
||||
@@ -6,6 +6,7 @@ import { redirect } from "next/navigation";
|
||||
import { ServerActionResult } from "@/model/server-action-error-return.model";
|
||||
import { FormValidationException } from "@/model/form-validation-exception.model";
|
||||
import { authOptions } from "@/lib/auth-options";
|
||||
import { NextResponse } from "next/server";
|
||||
|
||||
/**
|
||||
* THIS FUNCTION RETURNS NULL IF NO USER IS LOGGED IN
|
||||
@@ -107,21 +108,21 @@ export async function simpleRoute<ReturnType>(
|
||||
funcResult = await func();
|
||||
} catch (ex) {
|
||||
if (ex instanceof FormValidationException) {
|
||||
return {
|
||||
return NextResponse.json({
|
||||
status: 'error',
|
||||
message: ex.message
|
||||
};
|
||||
});
|
||||
} else if (ex instanceof ServiceException) {
|
||||
return {
|
||||
return NextResponse.json({
|
||||
status: 'error',
|
||||
message: ex.message
|
||||
};
|
||||
});
|
||||
} else {
|
||||
console.error(ex)
|
||||
return {
|
||||
return NextResponse.json({
|
||||
status: 'error',
|
||||
message: 'An unknown error occurred.'
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
return funcResult;
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import type http from "node:http";
|
||||
import { Server } from "socket.io";
|
||||
import logService from "./server/services/log-stream.service";
|
||||
|
||||
class SocketIoServer {
|
||||
initialize(server: http.Server<typeof http.IncomingMessage, typeof http.ServerResponse>) {
|
||||
const io = new Server(server);
|
||||
const podLogsNamespace = io.of("/pod-logs");
|
||||
podLogsNamespace.on("connection", (socket) => {
|
||||
logService.streamLogs(socket);
|
||||
//logService.streamLogs(socket);
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user