added first version of terminal backend

This commit is contained in:
biersoeckli
2024-11-27 16:59:44 +00:00
parent 8a3e4dbbda
commit 65abd04ee7
9 changed files with 190 additions and 74 deletions

BIN
bun.lockb

Binary file not shown.

View File

@@ -35,6 +35,8 @@
"@tanstack/react-table": "^8.20.5",
"@types/bcrypt": "^5.0.2",
"@types/qrcode": "^1.5.5",
"@types/ws": "^8.5.13",
"@xterm/xterm": "^5.5.0",
"bcrypt": "^5.1.1",
"class-variance-authority": "^0.7.0",
"clsx": "^2.1.1",
@@ -65,6 +67,7 @@
"ts-node": "^10.9.2",
"typedi": "^0.10.0",
"vaul": "^1.1.0",
"ws": "^8.18.0",
"zod": "^3.23.8",
"zustand": "^5.0.1"
},

View File

@@ -4,5 +4,4 @@ import { Manager } from "socket.io-client";
const manager = new Manager();
export const podLogsSocket = manager.socket("/pod-logs");
//export const deploymentStatusSocket = manager.socket("/deployment-status");
export const podTerminalSocket = manager.socket("/pod-terminal");

View File

@@ -1,74 +1,4 @@
import * as k8s from '@kubernetes/client-node';
/*
const getKubeConfig = () => {
const kc = new k8s.KubeConfig();
if (process.env.NODE_ENV === 'production') {
kc.loadFromCluster();
} else {
kc.loadFromFile('/workspace/kube-config.config');
}
return kc;
}
const getK8sCoreApiClient = () => {
const kc = getKubeConfig()
const k8sCoreClient = kc.makeApiClient(k8s.CoreV1Api);
return k8sCoreClient;
}
const k8sCoreClient = globalThis.k8sCoreGlobal ?? getK8sCoreApiClient()
if (process.env.NODE_ENV !== 'production') globalThis.k8sCoreGlobal = k8sCoreClient
const getK8sAppsApiClient = () => {
const kc = getKubeConfig()
const k8sCoreClient = kc.makeApiClient(k8s.AppsV1Api);
return k8sCoreClient;
}
const k8sAppsClient = globalThis.k8sAppsGlobal ?? getK8sAppsApiClient()
if (process.env.NODE_ENV !== 'production') globalThis.k8sAppsGlobal = k8sAppsClient
const getK8sBatchApiClient = () => {
const kc = getKubeConfig()
const k8sJobClient = kc.makeApiClient(k8s.BatchV1Api);
return k8sJobClient;
}
const k8sJobClient = globalThis.k8sJobGlobal ?? getK8sBatchApiClient()
if (process.env.NODE_ENV !== 'production') globalThis.k8sJobGlobal = k8sJobClient
const getK8sLogApiClient = () => {
const kc = getKubeConfig()
const logClient = new k8s.Log(kc)
return logClient;
}
const k8sLogClient = globalThis.k8sLogGlobal ?? getK8sLogApiClient()
if (process.env.NODE_ENV !== 'production') globalThis.k8sLogGlobal = k8sLogClient
const getK8sCustomObjectsApiClient = () => {
const kc = getKubeConfig()
const client = kc.makeApiClient(k8s.CustomObjectsApi);
return client;
}
const k8sCustomObjectsClient = globalThis.k8sCustomObjectsGlobal ?? getK8sCustomObjectsApiClient()
if (process.env.NODE_ENV !== 'production') globalThis.k8sCustomObjectsGlobal = k8sCustomObjectsClient
const getK8sNetworkApiClient = () => {
const kc = getKubeConfig()
const networkClient = kc.makeApiClient(k8s.NetworkingV1Api);
return networkClient;
}
const k8sNetworkClient = globalThis.k8sNetworkGlobal ?? getK8sNetworkApiClient()
if (process.env.NODE_ENV !== 'production') globalThis.k8sNetworkGlobal = k8sNetworkClient
declare const globalThis: {
k8sCoreGlobal: ReturnType<typeof getK8sCoreApiClient>;
k8sAppsGlobal: ReturnType<typeof getK8sAppsApiClient>;
k8sJobGlobal: ReturnType<typeof getK8sBatchApiClient>;
k8sLogGlobal: ReturnType<typeof getK8sLogApiClient>;
k8sNetworkGlobal: ReturnType<typeof getK8sNetworkApiClient>;
k8sCustomObjectsGlobal: ReturnType<typeof getK8sCustomObjectsApiClient>;
} & typeof global;
*/
class K3sApiAdapter {

View File

@@ -0,0 +1,131 @@
import { TerminalSetupInfoModel, terminalSetupInfoZodModel } from "../../shared/model/terminal-setup-info.model";
import { DefaultEventsMap, Socket } from "socket.io";
import setupPodService from "./setup-services/setup-pod.service";
import k3s from "../adapter/kubernetes-api.adapter";
import * as k8s from '@kubernetes/client-node';
import stream from 'stream';
import { StreamUtils } from "@/shared/utils/stream.utils";
import WebSocket from "ws";
interface TerminalStrean {
stdoutStream: stream.PassThrough;
stderrStream: stream.PassThrough;
stdinStream: stream.PassThrough;
streamInputKey: string;
streamOutputKey: string;
websocket: WebSocket.WebSocket;
}
export class TerminalService {
activeStreams = new Map<string, { logStream: stream.PassThrough, clients: number, k3sStreamRequest: any }>();
async streamLogs(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, any>) {
console.log('Client connected:', socket.id);
const streamsOfSocket: TerminalStrean[] = [];
socket.on('openTerminal', async (podInfo) => {
const terminalInfo = terminalSetupInfoZodModel.parse(podInfo);
const streamInputKey = StreamUtils.getInputStreamName(terminalInfo);
const streamOutputKey = StreamUtils.getOutputStreamName(terminalInfo);
const podReachable = await setupPodService.waitUntilPodIsRunningFailedOrSucceded(terminalInfo.namespace, terminalInfo.podName);
if (!podReachable) {
socket.emit(streamOutputKey);
return;
}
const exec = new k8s.Exec(k3s.getKubeConfig());
const stdoutStream = new stream.PassThrough();
const stderrStream = new stream.PassThrough();
const stdinStream = new stream.PassThrough();
const websocket = await exec.exec(
terminalInfo.namespace,
terminalInfo.podName,
terminalInfo.containerName,
['/bin/sh'],
stdoutStream,
stderrStream,
stdinStream,
true /* tty */,
(status: k8s.V1Status) => {
console.log('Exited with status:');
console.log(JSON.stringify(status, null, 2));
stderrStream!.end();
stdoutStream!.end();
stdinStream!.end();
},
);
stdoutStream.on('data', (chunk) => {
socket.emit(streamOutputKey, chunk.toString());
});
stderrStream.on('data', (chunk) => {
socket.emit(streamOutputKey, chunk.toString());
});
socket.on(streamInputKey, (data) => {
stdinStream!.write(data);
});
streamsOfSocket.push({ stdoutStream, stderrStream, stdinStream, streamInputKey, streamOutputKey, websocket });
console.log(`Client ${socket.id} joined log stream for ${stdoutStream}`);
});
socket.on('closeTerminal', (podInfo) => {
const terminalInfo = terminalSetupInfoZodModel.parse(podInfo);
const streamInputKey = StreamUtils.getInputStreamName(terminalInfo);
const streams = streamsOfSocket.find(stream => stream.streamInputKey === streamInputKey);
if (streams) {
this.deleteLogStream(streams);
}
});
socket.on('disconnecting', () => {
// Stop all log streams for this client
for (const stream of streamsOfSocket) {
this.deleteLogStream(stream);
}
});
}
private deleteLogStream(streams: TerminalStrean) {
streams.stderrStream.end();
streams.stdoutStream.end();
streams.stdinStream.end();
streams.websocket.close();
console.log(`Stopped log stream for ${streams.streamInputKey}.`);
}
/*
private async createLogStreamForPod(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap, any>,
streamKey: string, inputInfo: TerminalSetupInfoModel) {
logStream.on('data', (chunk) => {
socket.emit(streamKey, chunk.toString());
});
logStream.on('data', (chunk) => {
socket.to(streamKey).emit(`${streamKey}`, chunk.toString());
});
let k3sStreamRequest = await k3s.log.log(app.projectId, pod.podName, pod.containerName, logStream, {
follow: true,
pretty: false,
tailLines: 100,
});
const retVal = { logStream, clients: 0, k3sStreamRequest };
return retVal;
}*/
}
const terminalService = new TerminalService();
export default terminalService;

View File

@@ -0,0 +1,9 @@
import { z } from "zod";
export const terminalSetupInfoZodModel = z.object({
namespace: z.string().min(1),
podName: z.string().min(1),
containerName: z.string().min(1),
});
export type TerminalSetupInfoModel = z.infer<typeof terminalSetupInfoZodModel>;

View File

@@ -0,0 +1,12 @@
import { TerminalSetupInfoModel } from "../model/terminal-setup-info.model";
export class StreamUtils {
static getInputStreamName(terminalInfo: TerminalSetupInfoModel) {
return `${terminalInfo.namespace}_${terminalInfo.podName}_${terminalInfo.containerName}_input`;
}
static getOutputStreamName(terminalInfo: TerminalSetupInfoModel) {
return `${terminalInfo.namespace}_${terminalInfo.podName}_${terminalInfo.containerName}_output`;
}
}

View File

@@ -1,12 +1,13 @@
import type http from "node:http";
import { Server } from "socket.io";
import terminalService from "./server/services/terminal.service";
class SocketIoServer {
initialize(server: http.Server<typeof http.IncomingMessage, typeof http.ServerResponse>) {
const io = new Server(server);
const podLogsNamespace = io.of("/pod-logs");
const podLogsNamespace = io.of("/pod-terminal");
podLogsNamespace.on("connection", (socket) => {
//logService.streamLogs(socket);
terminalService.streamLogs(socket);
});
};
}

31
src/websocket.server.ts Normal file
View File

@@ -0,0 +1,31 @@
import { WebSocket } from "ws";
import type http from "node:http";
export default async function initializeWebsocket(server: http.Server<typeof http.IncomingMessage, typeof http.ServerResponse>) {
// Create a WebSocket server by passing the HTTP server
const wss = new WebSocket.Server({ server });
// Event handler for WebSocket connections
wss.on('connection', (ws) => {
console.log('A new client has connected.');
// Event handler for incoming messages from clients
ws.on('message', (message) => {
console.log(`Received: ${message}`);
// Broadcast the received message to all connected clients
wss.clients.forEach((client) => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
});
// Event handler for WebSocket connection closing
ws.on('close', () => {
console.log('A client has disconnected.');
});
});
}