From 35ecb7ab213a5c27c40d03da0f36170a20417f5f Mon Sep 17 00:00:00 2001 From: biersoeckli Date: Fri, 8 Nov 2024 15:48:49 +0000 Subject: [PATCH] rewrite logs to server events --- src/app/api/pod-logs/route.ts | 104 ++++++++++++++++++ .../[tabName]/overview/build-logs-overlay.tsx | 8 +- .../app/[tabName]/overview/logs-streamed.tsx | 76 +++++++------ src/server/services/deployment.service.ts | 3 +- src/server/services/pod.service.ts | 33 ++++++ src/server/utils/action-wrapper.utils.ts | 60 +++++----- 6 files changed, 213 insertions(+), 71 deletions(-) create mode 100644 src/app/api/pod-logs/route.ts create mode 100644 src/server/services/pod.service.ts diff --git a/src/app/api/pod-logs/route.ts b/src/app/api/pod-logs/route.ts new file mode 100644 index 0000000..5bf22b1 --- /dev/null +++ b/src/app/api/pod-logs/route.ts @@ -0,0 +1,104 @@ +import buildService, { buildNamespace } from "@/server/services/build.service"; +import deploymentService from "@/server/services/deployment.service"; +import { z } from "zod"; +import stream from "stream"; +import k3s from "@/server/adapter/kubernetes-api.adapter"; +import { simpleAction, simpleRoute } from "@/server/utils/action-wrapper.utils"; +import podService from "@/server/services/pod.service"; + +// Prevents this route's response from being cached +export const dynamic = "force-dynamic"; + +const zodInputModel = z.object({ + namespace: z.string().optional(), + podName: z.string().optional(), + buildJobName: z.string().optional(), +}); + +export async function POST(request: Request) { + return simpleRoute(async () => { + const input = await request.json(); + console.log(input) + const podInfo = zodInputModel.parse(input); + let { namespace, podName, buildJobName } = podInfo; + let pod; + let streamKey; + if (namespace && podName) { + pod = await deploymentService.getPodByName(namespace, podName); + streamKey = `${namespace}_${podName}`; + + } else if (buildJobName) { + namespace = buildNamespace; + pod = await buildService.getPodForJob(buildJobName); + streamKey = `${buildJobName}`; + + } else { + console.error('Invalid pod info for streaming logs', podInfo); + return new Response("Invalid pod info", { status: 400 }); + } + console.log('pod', pod) + + let k3sStreamRequest: any | undefined; + let logStream: stream.PassThrough | undefined; + let streamEndedByClient = false; + + const encoder = new TextEncoder(); + const customReadable = new ReadableStream({ + start(controller) { + const innerFunc = async () => { + console.log(`[CONNECT] Client joined log stream for ${streamKey}`); + controller.enqueue(encoder.encode('Connected\n')); + + if (namespace !== buildNamespace) { + // container logs and not build logs + await podService.waitUntilPodIsRunningFailedOrSucceded(namespace, pod.podName); // has timeout onfigured + } + logStream = new stream.PassThrough(); + + k3sStreamRequest = await k3s.log.log(namespace, pod.podName, pod.containerName, logStream, { + follow: true, + tailLines: namespace === buildNamespace ? undefined : 100, + timestamps: true, + pretty: false, + previous: false + }); + + logStream.on('data', (chunk) => { + controller.enqueue(encoder.encode(chunk.toString())); + }); + + logStream.on('error', (error) => { + controller.enqueue(encoder.encode('[ERROR] An unexpected error occurred while streaming logs.\n')); + console.error("Error in log stream:", error); + }); + + logStream.on('end', () => { + console.log(`[END] Log stream ended for ${streamKey} by ${streamEndedByClient ? 'client' : 'server'}`); + if (!streamEndedByClient) { + controller.enqueue(encoder.encode('[INFO] Log stream closed by Pod.')); + controller.close(); + } + }); + }; + innerFunc(); + }, + cancel() { + streamEndedByClient = true; + logStream?.end(); + k3sStreamRequest?.abort(); + console.log(`[DISCONNECTED] Client disconnected log stream for ${streamKey}`); + }, + + }) + + return new Response(customReadable, { + // Set the headers for Server-Sent Events (SSE) + headers: { + Connection: "keep-alive", + "Content-Encoding": "none", + "Cache-Control": "no-cache, no-transform", + "Content-Type": "text/event-stream; charset=utf-8", + }, + }) + }); +} diff --git a/src/app/project/app/[tabName]/overview/build-logs-overlay.tsx b/src/app/project/app/[tabName]/overview/build-logs-overlay.tsx index 60cfd69..80ec53f 100644 --- a/src/app/project/app/[tabName]/overview/build-logs-overlay.tsx +++ b/src/app/project/app/[tabName]/overview/build-logs-overlay.tsx @@ -14,7 +14,7 @@ import React, { useEffect } from "react"; import { set } from "date-fns"; import { DeploymentInfoModel } from "@/model/deployment-info.model"; import LogsStreamed from "./logs-streamed"; -import { formatDate } from "@/lib/format.utils"; +import { formatDate, formatDateTime } from "@/lib/format.utils"; import { podLogsSocket } from "@/lib/sockets"; export function BuildLogsDialog({ @@ -34,14 +34,14 @@ export function BuildLogsDialog({ podLogsSocket.emit('leavePodLog', { streamKey: deploymentInfo.buildJobName }); onClose(); }}> - + Build Logs - View the build logs for the selected deployment {formatDate(deploymentInfo.createdAt)}. + View the build logs for the selected deployment {formatDateTime(deploymentInfo.createdAt)}. -
+
{!deploymentInfo.buildJobName && 'For this build is no log available'} {deploymentInfo.buildJobName && }
diff --git a/src/app/project/app/[tabName]/overview/logs-streamed.tsx b/src/app/project/app/[tabName]/overview/logs-streamed.tsx index d5cbb63..e9dfcec 100644 --- a/src/app/project/app/[tabName]/overview/logs-streamed.tsx +++ b/src/app/project/app/[tabName]/overview/logs-streamed.tsx @@ -1,5 +1,4 @@ import { useEffect, useRef, useState } from "react"; -import { podLogsSocket } from "@/lib/sockets"; import { Textarea } from "@/components/ui/textarea"; import React from "react"; @@ -13,54 +12,59 @@ export default function LogsStreamed({ buildJobName?: string; }) { const [isConnected, setIsConnected] = useState(false); - const [transport, setTransport] = useState("N/A"); const [logs, setLogs] = useState(''); const textAreaRef = useRef(null); - function onConnect() { - setIsConnected(true); - setTransport(podLogsSocket.io.engine.transport.name); - podLogsSocket.io.engine.on("upgrade", (transport) => { - setTransport(transport.name); + + const initializeConnection = async (controller: AbortController) => { + // Initiate the first call to connect to SSE API + + setLogs('Loading...'); + + const signal = controller.signal; + const apiResponse = await fetch('/api/pod-logs', { + method: "POST", + headers: { + "Content-Type": "text/event-stream", + }, + body: JSON.stringify({ namespace, podName, buildJobName }), + signal: signal, }); - } - function onDisconnect() { - setIsConnected(false); - setTransport("N/A"); - } + if (!apiResponse.ok) return; + if (!apiResponse.body) return; + setIsConnected(true); - const myListener = (e: string) => { - setLogs((prevLogs) => prevLogs + e); + // To decode incoming data as a string + const reader = apiResponse.body + .pipeThrough(new TextDecoderStream()) + .getReader(); + + setLogs(''); + while (true) { + const { value, done } = await reader.read(); + if (done) { + setIsConnected(false); + break; + } + if (value) { + setLogs((prevLogs) => prevLogs + value); + } + } } useEffect(() => { if (!buildJobName && (!namespace || !podName)) { return; } - const streamKey = buildJobName ? buildJobName : `${namespace}_${podName}`; - console.log('Connecting to logs ' + streamKey); + const controller = new AbortController(); + initializeConnection(controller); - if (podLogsSocket.connected) { - onConnect(); - } - - podLogsSocket.emit('joinPodLog', { namespace, podName, buildJobName }); - - podLogsSocket.on("connect", onConnect); - podLogsSocket.on("disconnect", onDisconnect); - podLogsSocket.on(streamKey, myListener); return () => { - if (!podName) { - return; - } - console.log('Disconnecting from logs ' + streamKey); - podLogsSocket.emit('leavePodLog', { streamKey: streamKey }); + console.log('Disconnecting from logs'); setLogs(''); - podLogsSocket.off("connect", onConnect); - podLogsSocket.off("disconnect", onDisconnect); - podLogsSocket.off(streamKey, myListener); + controller.abort(); }; }, [namespace, podName, buildJobName]); @@ -72,7 +76,9 @@ export default function LogsStreamed({ }, [logs]); return <> -