From 18a787c35a6863a6b713459780852e47f525b34f Mon Sep 17 00:00:00 2001 From: f-trycua Date: Fri, 11 Apr 2025 09:34:30 +0200 Subject: [PATCH] Fix reassemble kill --- .../ImageContainerRegistry.swift | 299 ++++++++++-------- libs/lume/src/Errors/Errors.swift | 3 + 2 files changed, 178 insertions(+), 124 deletions(-) diff --git a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift index 83d1d996..25271641 100644 --- a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift +++ b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift @@ -346,10 +346,12 @@ class ImageContainerRegistry: @unchecked Sendable { // Use a more efficient approach for memory-constrained systems let memoryConstrained = determineIfMemoryConstrained() + // Adjust concurrency based on memory constraints + let actualMaxConcurrentTasks = memoryConstrained ? 2 : maxConcurrentTasks Logger.info( memoryConstrained - ? "Using memory-optimized mode for disk parts" - : "Using standard mode for disk parts") + ? "Using memory-optimized mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))" + : "Using standard mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))") try await withThrowingTaskGroup(of: Int64.self) { group in for layer in manifest.layers { @@ -357,7 +359,7 @@ class ImageContainerRegistry: @unchecked Sendable { continue } - while await counter.current() >= maxConcurrentTasks { + while await counter.current() >= actualMaxConcurrentTasks { // Use adjusted concurrency _ = try await group.next() await counter.decrement() } @@ -523,80 +525,92 @@ class ImageContainerRegistry: @unchecked Sendable { // Handle disk parts if present if !diskParts.isEmpty { - Logger.info("Reassembling disk image...") + Logger.info("Reassembling disk image using external 'cat' command...") let outputURL = tempVMDir.appendingPathComponent("disk.img") try FileManager.default.createDirectory( at: outputURL.deletingLastPathComponent(), withIntermediateDirectories: true) - // Create empty output file + // Ensure the output file exists but is empty + if FileManager.default.fileExists(atPath: outputURL.path) { + try FileManager.default.removeItem(at: outputURL) + } FileManager.default.createFile(atPath: outputURL.path, contents: nil) - let outputHandle = try FileHandle(forWritingTo: outputURL) - defer { try? outputHandle.close() } - var totalWritten: UInt64 = 0 + // CORRECT LOGIC for new pull: Calculate expected size from the manifest layers let expectedTotalSize = UInt64( - manifest.layers.filter { extractPartInfo(from: $0.mediaType) != nil }.reduce(0) - { $0 + $1.size }) + manifest.layers.filter { extractPartInfo(from: $0.mediaType) != nil }.reduce(0) { $0 + $1.size } + ) + Logger.info( + "Expected final size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))" + ) - // Process parts in order + // Prepare arguments for cat + var catArgs: [String] = [] + var partURLsToDelete: [URL] = [] // Keep track of non-cached parts to delete for partNum in 1...totalParts { guard let (_, partURL) = diskParts.first(where: { $0.0 == partNum }) else { throw PullError.missingPart(partNum) } - - let inputHandle = try FileHandle(forReadingFrom: partURL) - defer { - try? inputHandle.close() - // Don't delete the part file if we're in cache mode and the part is from cache - if noCache || !partURL.path.contains(cacheDirectory.path) { - try? FileManager.default.removeItem(at: partURL) - } - } - - // On low memory systems, be more aggressive with releasing memory - let memoryConstrained = determineIfMemoryConstrained() - var chunksProcessed = 0 - - while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) { - try autoreleasepool { - try outputHandle.write(contentsOf: data) - totalWritten += UInt64(data.count) - - // Only log progress every 5% to reduce log noise - let progress: Double = - Double(totalWritten) / Double(expectedTotalSize) * 100 - let roundedProgress = Int(progress / 5) * 5 - if roundedProgress != Int( - (Double(totalWritten - UInt64(data.count)) - / Double(expectedTotalSize) * 100) - / 5) * 5 - { - Logger.info("Reassembling disk image: \(roundedProgress)%") - } - - // Force more frequent autoreleases on memory-constrained systems - chunksProcessed += 1 - if memoryConstrained && chunksProcessed % 10 == 0 { - try outputHandle.synchronize() - } - } - } - - // Make sure we explicitly close handles after each part to free resources - try? inputHandle.synchronize() - try inputHandle.close() + catArgs.append(partURL.path) + // Only mark for deletion if it's not in the main cache dir + if noCache || !partURL.path.contains(cacheDirectory.path) { + partURLsToDelete.append(partURL) + } } - // Verify final size - let finalSize = - try FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] - as? UInt64 ?? 0 + // Execute cat command asynchronously + let process = Process() + process.executableURL = URL(fileURLWithPath: "/bin/cat") + process.arguments = catArgs + + // Redirect output to the final disk image file + let outputFileHandle = try FileHandle(forWritingTo: outputURL) + process.standardOutput = outputFileHandle + + try process.run() + + // Monitor progress by checking file size + var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var + var currentSize: UInt64 = 0 + while process.isRunning { + // Get current file size + currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize + + // Calculate and log progress + if expectedTotalSize > 0 { + let progressValue = Double(currentSize) / Double(expectedTotalSize) + reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image") + } + + // Wait a bit before checking again + try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds + } + // Log 100% completion explicitly + if expectedTotalSize > 0 { + reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image") + } + Logger.info("") // Newline after progress + + // Ensure handle is closed AFTER process finishes + try? outputFileHandle.close() + + // Check termination status AFTER process finishes + let terminationStatus = process.terminationStatus + + // Clean up temporary part files + for partURL in partURLsToDelete { + try? FileManager.default.removeItem(at: partURL) + } + + guard terminationStatus == 0 else { + throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)") + } + + // Verify final size (already done after loop essentially, but good to check) + let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0 Logger.info( "Final disk image size: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))" ) - Logger.info( - "Expected size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))" - ) if finalSize != expectedTotalSize { Logger.info( @@ -604,7 +618,7 @@ class ImageContainerRegistry: @unchecked Sendable { ) } - Logger.info("Disk image reassembled successfully") + Logger.info("Disk image reassembled successfully using 'cat'") } else { // Copy single disk image if it exists let diskURL = tempDownloadDir.appendingPathComponent("disk.img") @@ -683,74 +697,99 @@ class ImageContainerRegistry: @unchecked Sendable { // Reassemble disk parts if needed if !diskPartSources.isEmpty { - Logger.info("Reassembling disk image from cached parts (optimized storage)...") + Logger.info( + "Reassembling disk image from cached parts using external 'cat' command (optimized storage)..." + ) let outputURL = destination.appendingPathComponent("disk.img") + + // Ensure the output file exists but is empty + if FileManager.default.fileExists(atPath: outputURL.path) { + try FileManager.default.removeItem(at: outputURL) + } FileManager.default.createFile(atPath: outputURL.path, contents: nil) - let outputHandle = try FileHandle(forWritingTo: outputURL) - defer { try? outputHandle.close() } - var totalWritten: UInt64 = 0 + // Explicitly type accumulator (acc: UInt64) and element in the closure + let expectedTotalSize: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in + let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0 + return acc + fileSize + } + Logger.info( + "Expected final size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))" + ) - // Process parts in order, reading directly from cache + + // Prepare arguments for cat, reading directly from cache + var catArgs: [String] = [] for partNum in 1...totalParts { guard let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum }) else { throw PullError.missingPart(partNum) } - - // Read directly from the cached part - let inputHandle = try FileHandle(forReadingFrom: sourceURL) - defer { try? inputHandle.close() } - - // On low memory systems, be more aggressive with releasing memory - let memoryConstrained = determineIfMemoryConstrained() - var chunksProcessed = 0 - - while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) { - try autoreleasepool { - try outputHandle.write(contentsOf: data) - totalWritten += UInt64(data.count) - - // Only log progress every 5% to reduce log noise - let progress: Double = - Double(totalWritten) / Double(expectedTotalSize) * 100 - let roundedProgress = Int(progress / 5) * 5 - if roundedProgress != Int( - (Double(totalWritten - UInt64(data.count)) / Double(expectedTotalSize) - * 100) - / 5) * 5 - { - Logger.info("Reassembling disk image from cache: \(roundedProgress)%") - } - - // Force more frequent autoreleases on memory-constrained systems - chunksProcessed += 1 - if memoryConstrained && chunksProcessed % 10 == 0 { - try outputHandle.synchronize() - } - } - } - - // Make sure we explicitly close handles after each part to free resources - try? inputHandle.synchronize() - try inputHandle.close() + catArgs.append(sourceURL.path) } - // Verify final size - let finalSize = - try FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64 - ?? 0 + // Execute cat command asynchronously + let process = Process() + process.executableURL = URL(fileURLWithPath: "/bin/cat") + process.arguments = catArgs + + // Redirect output to the final disk image file + let outputFileHandle = try FileHandle(forWritingTo: outputURL) + process.standardOutput = outputFileHandle + + try process.run() + + // Monitor progress by checking file size + var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var + var currentSize: UInt64 = 0 + while process.isRunning { + // Get current file size + currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize + + // Calculate and log progress + if expectedTotalSize > 0 { + let progressValue = Double(currentSize) / Double(expectedTotalSize) + reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image from cache") + } + + // Wait a bit before checking again + try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds + } + // Log 100% completion explicitly + if expectedTotalSize > 0 { + reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image from cache") + } + Logger.info("") // Newline after progress + + // Ensure handle is closed AFTER process finishes + try? outputFileHandle.close() + + // Check termination status AFTER process finishes + let terminationStatus = process.terminationStatus + + guard terminationStatus == 0 else { + throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)") + } + + + // Verify final size (already done after loop essentially, but good to check) + let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0 Logger.info( - "Final disk image size: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))" - ) - Logger.info( - "Expected size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))" + "Final disk image size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))" ) if finalSize != expectedTotalSize { + // Calculate expected size again for the warning message consistency + // Explicitly type accumulator (acc: UInt64) and element in the closure + let expectedSizeForWarning: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in + let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0 + return acc + fileSize + } Logger.info( - "Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedTotalSize) bytes)" + "Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedSizeForWarning) bytes)" ) } + + Logger.info("Disk image reassembled successfully from cache using 'cat'") } Logger.info("Cache copy complete") @@ -1061,27 +1100,39 @@ class ImageContainerRegistry: @unchecked Sendable { } } - // Default to 512KB as a safe minimum - let defaultChunkSize = 512 * 1024 + // Define chunk size parameters + let safeMinimumChunkSize = 128 * 1024 // Reduced minimum for constrained systems + let defaultChunkSize = 512 * 1024 // Standard default / minimum for non-constrained + let constrainedCap = 512 * 1024 // Lower cap for constrained systems + let standardCap = 2 * 1024 * 1024 // Standard cap for non-constrained systems - // If we can't get memory info, return conservative default + // If we can't get memory info, return a reasonable default guard result == KERN_SUCCESS else { + Logger.info("Could not get VM statistics, using default chunk size: \(defaultChunkSize) bytes") return defaultChunkSize } - // Calculate free memory in bytes using a fixed page size - // Standard page size on macOS is 4KB or 16KB - let pageSize = 4096 // Use a constant instead of vm_kernel_page_size + // Calculate free memory in bytes + let pageSize = 4096 // Use a constant page size assumption let freeMemory = UInt64(stats.free_count) * UInt64(pageSize) + let isConstrained = determineIfMemoryConstrained() // Check if generally constrained - // On very memory-constrained systems (< 1GB free), use the minimum - if freeMemory < 1_073_741_824 { // 1GB - return defaultChunkSize + // Extremely constrained (< 512MB free) -> use absolute minimum + if freeMemory < 536_870_912 { // 512MB + Logger.info("System extremely memory constrained (<512MB free), using minimum chunk size: \(safeMinimumChunkSize) bytes") + return safeMinimumChunkSize } - // For systems with adequate memory, use a smarter sizing approach: - // - Use 0.1% of free memory, with limits - let adaptiveSize = min(max(Int(freeMemory / 1000), defaultChunkSize), 2 * 1024 * 1024) + // Generally constrained -> use adaptive size with lower cap + if isConstrained { + let adaptiveSize = min(max(Int(freeMemory / 1000), safeMinimumChunkSize), constrainedCap) + Logger.info("System memory constrained, using adaptive chunk size capped at \(constrainedCap) bytes: \(adaptiveSize) bytes") + return adaptiveSize + } + + // Not constrained -> use original adaptive logic with standard cap + let adaptiveSize = min(max(Int(freeMemory / 1000), defaultChunkSize), standardCap) + Logger.info("System has sufficient memory, using adaptive chunk size capped at \(standardCap) bytes: \(adaptiveSize) bytes") return adaptiveSize } diff --git a/libs/lume/src/Errors/Errors.swift b/libs/lume/src/Errors/Errors.swift index d52a8e47..91b04950 100644 --- a/libs/lume/src/Errors/Errors.swift +++ b/libs/lume/src/Errors/Errors.swift @@ -27,6 +27,7 @@ enum PullError: Error, LocalizedError { case layerDownloadFailed(String) case missingPart(Int) case decompressionFailed(String) + case reassemblyFailed(String) var errorDescription: String? { switch self { @@ -42,6 +43,8 @@ enum PullError: Error, LocalizedError { return "Missing disk image part \(number)" case .decompressionFailed(let filename): return "Failed to decompress file: \(filename)" + case .reassemblyFailed(let reason): + return "Disk image reassembly failed: \(reason)." } } }