rewrite logs to server events

This commit is contained in:
biersoeckli
2024-11-08 15:48:49 +00:00
parent e3b7d82d09
commit 35ecb7ab21
6 changed files with 213 additions and 71 deletions

View File

@@ -0,0 +1,104 @@
import buildService, { buildNamespace } from "@/server/services/build.service";
import deploymentService from "@/server/services/deployment.service";
import { z } from "zod";
import stream from "stream";
import k3s from "@/server/adapter/kubernetes-api.adapter";
import { simpleAction, simpleRoute } from "@/server/utils/action-wrapper.utils";
import podService from "@/server/services/pod.service";
// Prevents this route's response from being cached
export const dynamic = "force-dynamic";
const zodInputModel = z.object({
namespace: z.string().optional(),
podName: z.string().optional(),
buildJobName: z.string().optional(),
});
export async function POST(request: Request) {
return simpleRoute(async () => {
const input = await request.json();
console.log(input)
const podInfo = zodInputModel.parse(input);
let { namespace, podName, buildJobName } = podInfo;
let pod;
let streamKey;
if (namespace && podName) {
pod = await deploymentService.getPodByName(namespace, podName);
streamKey = `${namespace}_${podName}`;
} else if (buildJobName) {
namespace = buildNamespace;
pod = await buildService.getPodForJob(buildJobName);
streamKey = `${buildJobName}`;
} else {
console.error('Invalid pod info for streaming logs', podInfo);
return new Response("Invalid pod info", { status: 400 });
}
console.log('pod', pod)
let k3sStreamRequest: any | undefined;
let logStream: stream.PassThrough | undefined;
let streamEndedByClient = false;
const encoder = new TextEncoder();
const customReadable = new ReadableStream({
start(controller) {
const innerFunc = async () => {
console.log(`[CONNECT] Client joined log stream for ${streamKey}`);
controller.enqueue(encoder.encode('Connected\n'));
if (namespace !== buildNamespace) {
// container logs and not build logs
await podService.waitUntilPodIsRunningFailedOrSucceded(namespace, pod.podName); // has timeout onfigured
}
logStream = new stream.PassThrough();
k3sStreamRequest = await k3s.log.log(namespace, pod.podName, pod.containerName, logStream, {
follow: true,
tailLines: namespace === buildNamespace ? undefined : 100,
timestamps: true,
pretty: false,
previous: false
});
logStream.on('data', (chunk) => {
controller.enqueue(encoder.encode(chunk.toString()));
});
logStream.on('error', (error) => {
controller.enqueue(encoder.encode('[ERROR] An unexpected error occurred while streaming logs.\n'));
console.error("Error in log stream:", error);
});
logStream.on('end', () => {
console.log(`[END] Log stream ended for ${streamKey} by ${streamEndedByClient ? 'client' : 'server'}`);
if (!streamEndedByClient) {
controller.enqueue(encoder.encode('[INFO] Log stream closed by Pod.'));
controller.close();
}
});
};
innerFunc();
},
cancel() {
streamEndedByClient = true;
logStream?.end();
k3sStreamRequest?.abort();
console.log(`[DISCONNECTED] Client disconnected log stream for ${streamKey}`);
},
})
return new Response(customReadable, {
// Set the headers for Server-Sent Events (SSE)
headers: {
Connection: "keep-alive",
"Content-Encoding": "none",
"Cache-Control": "no-cache, no-transform",
"Content-Type": "text/event-stream; charset=utf-8",
},
})
});
}

View File

@@ -14,7 +14,7 @@ import React, { useEffect } from "react";
import { set } from "date-fns";
import { DeploymentInfoModel } from "@/model/deployment-info.model";
import LogsStreamed from "./logs-streamed";
import { formatDate } from "@/lib/format.utils";
import { formatDate, formatDateTime } from "@/lib/format.utils";
import { podLogsSocket } from "@/lib/sockets";
export function BuildLogsDialog({
@@ -34,14 +34,14 @@ export function BuildLogsDialog({
podLogsSocket.emit('leavePodLog', { streamKey: deploymentInfo.buildJobName });
onClose();
}}>
<DialogContent className="w-[70%]">
<DialogContent className="sm:max-w-[1300px]">
<DialogHeader>
<DialogTitle>Build Logs</DialogTitle>
<DialogDescription>
View the build logs for the selected deployment {formatDate(deploymentInfo.createdAt)}.
View the build logs for the selected deployment {formatDateTime(deploymentInfo.createdAt)}.
</DialogDescription>
</DialogHeader>
<div className="grid gap-4 py-4">
<div >
{!deploymentInfo.buildJobName && 'For this build is no log available'}
{deploymentInfo.buildJobName && <LogsStreamed buildJobName={deploymentInfo.buildJobName} />}
</div>

View File

@@ -1,5 +1,4 @@
import { useEffect, useRef, useState } from "react";
import { podLogsSocket } from "@/lib/sockets";
import { Textarea } from "@/components/ui/textarea";
import React from "react";
@@ -13,54 +12,59 @@ export default function LogsStreamed({
buildJobName?: string;
}) {
const [isConnected, setIsConnected] = useState(false);
const [transport, setTransport] = useState("N/A");
const [logs, setLogs] = useState<string>('');
const textAreaRef = useRef<HTMLTextAreaElement>(null);
function onConnect() {
setIsConnected(true);
setTransport(podLogsSocket.io.engine.transport.name);
podLogsSocket.io.engine.on("upgrade", (transport) => {
setTransport(transport.name);
const initializeConnection = async (controller: AbortController) => {
// Initiate the first call to connect to SSE API
setLogs('Loading...');
const signal = controller.signal;
const apiResponse = await fetch('/api/pod-logs', {
method: "POST",
headers: {
"Content-Type": "text/event-stream",
},
body: JSON.stringify({ namespace, podName, buildJobName }),
signal: signal,
});
}
function onDisconnect() {
setIsConnected(false);
setTransport("N/A");
}
if (!apiResponse.ok) return;
if (!apiResponse.body) return;
setIsConnected(true);
const myListener = (e: string) => {
setLogs((prevLogs) => prevLogs + e);
// To decode incoming data as a string
const reader = apiResponse.body
.pipeThrough(new TextDecoderStream())
.getReader();
setLogs('');
while (true) {
const { value, done } = await reader.read();
if (done) {
setIsConnected(false);
break;
}
if (value) {
setLogs((prevLogs) => prevLogs + value);
}
}
}
useEffect(() => {
if (!buildJobName && (!namespace || !podName)) {
return;
}
const streamKey = buildJobName ? buildJobName : `${namespace}_${podName}`;
console.log('Connecting to logs ' + streamKey);
const controller = new AbortController();
initializeConnection(controller);
if (podLogsSocket.connected) {
onConnect();
}
podLogsSocket.emit('joinPodLog', { namespace, podName, buildJobName });
podLogsSocket.on("connect", onConnect);
podLogsSocket.on("disconnect", onDisconnect);
podLogsSocket.on(streamKey, myListener);
return () => {
if (!podName) {
return;
}
console.log('Disconnecting from logs ' + streamKey);
podLogsSocket.emit('leavePodLog', { streamKey: streamKey });
console.log('Disconnecting from logs');
setLogs('');
podLogsSocket.off("connect", onConnect);
podLogsSocket.off("disconnect", onDisconnect);
podLogsSocket.off(streamKey, myListener);
controller.abort();
};
}, [namespace, podName, buildJobName]);
@@ -72,7 +76,9 @@ export default function LogsStreamed({
}, [logs]);
return <>
<Textarea ref={textAreaRef} value={logs} readOnly className="h-[400px] bg-slate-900 text-white" />
<div className="text-sm pl-1">Status: {isConnected ? 'Connected' : 'Disconnected'}</div>
<div className="space-y-4">
<Textarea ref={textAreaRef} value={logs} readOnly className="h-[400px] bg-slate-900 text-white" />
<div className="text-sm pl-1">Status: {isConnected ? 'Connected' : 'Disconnected'}</div>
</div>
</>;
}

View File

@@ -209,6 +209,7 @@ class DeploymentService {
async getDeploymentHistory(projectId: string, appId: string): Promise<DeploymentInfoModel[]> {
const replicasetRevisions = await this.getReplicasetRevisionHistory(projectId, appId);
const builds = await buildService.getBuildsForApp(appId);
// adding running or failed builds as "Deployment" to the list
const runningOrFailedBuilds = builds
.filter((build) => ['RUNNING', 'FAILED', 'UNKNOWN'].includes(build.status))
.map((build) => {
@@ -265,7 +266,7 @@ class DeploymentService {
return {
replicasetName: rs.metadata?.name!,
createdAt: rs.metadata?.creationTimestamp!,
buildJobName: rs.metadata?.annotations?.buildJobName!,
buildJobName: rs.spec?.template?.metadata?.annotations?.buildJobName!,
status: status
}
});

View File

@@ -0,0 +1,33 @@
import k3s from "../adapter/kubernetes-api.adapter";
import { ServiceException } from "@/model/service.exception.model";
class PodService {
async waitUntilPodIsRunningFailedOrSucceded(projectId: string, podName: string) {
const timeout = 120000;
const interval = 1000;
const maxTries = timeout / interval;
let tries = 0;
while (tries < maxTries) {
const pod = await this.getPodOrUndefined(projectId, podName);
if (pod && ['Running', 'Failed', 'Succeeded'].includes(pod.status?.phase!)) {
return;
}
await new Promise(resolve => setTimeout(resolve, interval));
tries++;
}
throw new ServiceException(`Pod ${podName} did not become ready in time (${timeout}ms).`);
}
async getPodOrUndefined(projectId: string, podName: string) {
const res = await k3s.core.readNamespacedPod(podName, projectId);
return res.body;
}
}
const podService = new PodService();
export default podService;

View File

@@ -28,22 +28,7 @@ export async function getAuthUserSession(): Promise<UserSession> {
}
return session;
}
/*
export async function checkIfCurrentUserHasAccessToContract(contractId: string | null | undefined) {
const session = await getLandlordSession();
if (!contractId) {
return { ...session };
}
const currentLandlordIdIfExists = await rentalContractService.getCurrentLandlordIdForContract(contractId);
if (!currentLandlordIdIfExists) {
throw new ServiceException('Objekt nicht gefunden.');
}
if (currentLandlordIdIfExists !== session.landlordId) {
throw new ServiceException('Sie haben keine Berechtigung, dieses Objekt zu bearbeiten.');
}
return { ...session };
}
*/
export async function saveFormAction<ReturnType, TInputData, ZodType extends ZodRawShape>(
inputData: TInputData,
validationModel: ZodObject<ZodType>,
@@ -72,21 +57,6 @@ export async function saveFormAction<ReturnType, TInputData, ZodType extends Zod
}, redirectOnSuccessPath);
}
function convertFormDataToJson(formData: FormData) {
const jsonObject: { [key: string]: any } = {};
formData.forEach((value, key) => {
if (key.startsWith('$ACTION')) {
return;
}
if (value === '') {
jsonObject[key] = null;
} else {
jsonObject[key] = value;
}
});
return jsonObject;
}
export async function simpleAction<ReturnType, ValidationCallbackType>(
func: () => Promise<ReturnType>,
redirectOnSuccessPath?: string) {
@@ -127,4 +97,32 @@ export async function simpleAction<ReturnType, ValidationCallbackType>(
status: 'success',
data: funcResult ?? undefined
} as ServerActionResult<ValidationCallbackType, ReturnType>;
}
export async function simpleRoute<ReturnType>(
func: () => Promise<ReturnType>) {
let funcResult: ReturnType;
try {
funcResult = await func();
} catch (ex) {
if (ex instanceof FormValidationException) {
return {
status: 'error',
message: ex.message
};
} else if (ex instanceof ServiceException) {
return {
status: 'error',
message: ex.message
};
} else {
console.error(ex)
return {
status: 'error',
message: 'An unknown error occurred.'
};
}
}
return funcResult;
}