diff --git a/libs/lume/README.md b/libs/lume/README.md
index a7305c74..c329cf42 100644
--- a/libs/lume/README.md
+++ b/libs/lume/README.md
@@ -10,7 +10,6 @@
 [![Swift 6](https://img.shields.io/badge/Swift_6-F54A2A?logo=swift&logoColor=white&labelColor=F54A2A)](#)
 [![macOS](https://img.shields.io/badge/macOS-000000?logo=apple&logoColor=F0F0F0)](#)
-[![Homebrew](https://img.shields.io/badge/Homebrew-FBB040?logo=homebrew&logoColor=fff)](#install)
 [![Discord](https://img.shields.io/badge/Discord-%235865F2.svg?&logo=discord&logoColor=white)](https://discord.com/invite/mVnXXpdE85)
diff --git a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift
index d2028827..a412d8b1 100644
--- a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift
+++ b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift
@@ -1,4 +1,5 @@
 import ArgumentParser
+import Darwin
 import Foundation
 import Swift
@@ -343,6 +344,13 @@ class ImageContainerRegistry: @unchecked Sendable {
         let maxConcurrentTasks = 5
         let counter = TaskCounter()
 
+        // Use a more efficient approach for memory-constrained systems
+        let memoryConstrained = determineIfMemoryConstrained()
+        Logger.info(
+            memoryConstrained
+                ? "Using memory-optimized mode for disk parts"
+                : "Using standard mode for disk parts")
+
         try await withThrowingTaskGroup(of: Int64.self) { group in
             for layer in manifest.layers {
                 if layer.mediaType == "application/vnd.oci.empty.v1+json" {
@@ -354,15 +362,96 @@ class ImageContainerRegistry: @unchecked Sendable {
                         await counter.decrement()
                     }
 
-                    let outputURL: URL
                     if let partInfo = extractPartInfo(from: layer.mediaType) {
                         let (partNum, total) = partInfo
                         totalParts = total
-                        outputURL = tempDownloadDir.appendingPathComponent(
-                            "disk.img.part.\(partNum)")
-                        diskParts.append((partNum, outputURL))
+
+                        let cachedLayer = getCachedLayerPath(
+                            manifestId: manifestId, digest: layer.digest)
+                        let digest = layer.digest
+                        let size = layer.size
+
+                        // For memory-optimized mode - point directly to cache when possible
+                        if !noCache && memoryConstrained
+                            && FileManager.default.fileExists(atPath: cachedLayer.path)
+                        {
+                            // Use the cached file directly
+                            diskParts.append((partNum, cachedLayer))
+
+                            // Still need to account for progress
+                            group.addTask { @Sendable [self] in
+                                await counter.increment()
+                                await progress.addProgress(Int64(size))
+                                await counter.decrement()
+                                return Int64(size)
+                            }
+                            continue
+                        } else {
+                            let partURL = tempDownloadDir.appendingPathComponent(
+                                "disk.img.part.\(partNum)")
+                            diskParts.append((partNum, partURL))
+
+                            group.addTask { @Sendable [self] in
+                                await counter.increment()
+
+                                if !noCache
+                                    && FileManager.default.fileExists(atPath: cachedLayer.path)
+                                {
+                                    try FileManager.default.copyItem(at: cachedLayer, to: partURL)
+                                    await progress.addProgress(Int64(size))
+                                } else {
+                                    // Check if this layer is already being downloaded and we're not skipping cache
+                                    if !noCache && isDownloading(digest) {
+                                        try await waitForExistingDownload(
+                                            digest, cachedLayer: cachedLayer)
+                                        if FileManager.default.fileExists(atPath: cachedLayer.path)
+                                        {
+                                            try FileManager.default.copyItem(
+                                                at: cachedLayer, to: partURL)
+                                            await progress.addProgress(Int64(size))
+                                            return Int64(size)
+                                        }
+                                    }
+
+                                    // Start new download
+                                    if !noCache {
+                                        markDownloadStarted(digest)
+                                    }
+
+                                    try await self.downloadLayer(
+                                        repository: "\(self.organization)/\(imageName)",
+                                        digest: digest,
+                                        mediaType: layer.mediaType,
+                                        token: token,
+                                        to: partURL,
+                                        maxRetries: 5,
+                                        progress: progress
+                                    )
+
+                                    // Cache the downloaded layer if not in noCache mode
+                                    if !noCache {
+                                        if FileManager.default.fileExists(atPath: cachedLayer.path)
+                                        {
+                                            try FileManager.default.removeItem(at: cachedLayer)
+                                        }
+                                        try FileManager.default.copyItem(
+                                            at: partURL, to: cachedLayer)
+                                        markDownloadComplete(digest)
+                                    }
+                                }
+
+                                await counter.decrement()
+                                return Int64(size)
+                            }
+                            continue
+                        }
                     } else {
-                        switch layer.mediaType {
+                        let mediaType = layer.mediaType
+                        let digest = layer.digest
+                        let size = layer.size
+
+                        let outputURL: URL
+                        switch mediaType {
                         case "application/vnd.oci.image.layer.v1.tar":
                             outputURL = tempDownloadDir.appendingPathComponent("disk.img")
                         case "application/vnd.oci.image.config.v1+json":
@@ -372,55 +461,58 @@ class ImageContainerRegistry: @unchecked Sendable {
                         default:
                             continue
                         }
-                    }
 
-                    group.addTask { @Sendable [self] in
-                        await counter.increment()
+                        group.addTask { @Sendable [self] in
+                            await counter.increment()
 
-                        let cachedLayer = getCachedLayerPath(
-                            manifestId: manifestId, digest: layer.digest)
+                            let cachedLayer = getCachedLayerPath(
+                                manifestId: manifestId, digest: digest)
 
-                        if !noCache && FileManager.default.fileExists(atPath: cachedLayer.path) {
-                            try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
-                            await progress.addProgress(Int64(layer.size))
-                        } else {
-                            // Check if this layer is already being downloaded and we're not skipping cache
-                            if !noCache && isDownloading(layer.digest) {
-                                try await waitForExistingDownload(
-                                    layer.digest, cachedLayer: cachedLayer)
-                                if FileManager.default.fileExists(atPath: cachedLayer.path) {
-                                    try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
-                                    await progress.addProgress(Int64(layer.size))
-                                    return Int64(layer.size)
+                            if !noCache && FileManager.default.fileExists(atPath: cachedLayer.path)
+                            {
+                                try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
+                                await progress.addProgress(Int64(size))
+                            } else {
+                                // Check if this layer is already being downloaded and we're not skipping cache
+                                if !noCache && isDownloading(digest) {
+                                    try await waitForExistingDownload(
+                                        digest, cachedLayer: cachedLayer)
+                                    if FileManager.default.fileExists(atPath: cachedLayer.path) {
+                                        try FileManager.default.copyItem(
+                                            at: cachedLayer, to: outputURL)
+                                        await progress.addProgress(Int64(size))
+                                        return Int64(size)
+                                    }
+                                }
+
+                                // Start new download
+                                if !noCache {
+                                    markDownloadStarted(digest)
+                                }
+
+                                try await self.downloadLayer(
+                                    repository: "\(self.organization)/\(imageName)",
+                                    digest: digest,
+                                    mediaType: mediaType,
+                                    token: token,
+                                    to: outputURL,
+                                    maxRetries: 5,
+                                    progress: progress
+                                )
+
+                                // Cache the downloaded layer if not in noCache mode
+                                if !noCache {
+                                    if FileManager.default.fileExists(atPath: cachedLayer.path) {
+                                        try FileManager.default.removeItem(at: cachedLayer)
+                                    }
+                                    try FileManager.default.copyItem(at: outputURL, to: cachedLayer)
+                                    markDownloadComplete(digest)
                                 }
                             }
 
-                            // Start new download
-                            if !noCache {
-                                markDownloadStarted(layer.digest)
-                            }
-
-                            try await self.downloadLayer(
-                                repository: "\(self.organization)/\(imageName)",
-                                digest: layer.digest,
-                                mediaType: layer.mediaType,
-                                token: token,
-                                to: outputURL,
-                                maxRetries: 5,
-                                progress: progress
-                            )
-
-                            // Cache the downloaded layer if not in noCache mode
-                            if !noCache {
-                                if FileManager.default.fileExists(atPath: cachedLayer.path) {
-                                    try FileManager.default.removeItem(at: cachedLayer)
-                                }
-                                try FileManager.default.copyItem(at: outputURL, to: cachedLayer)
-                                markDownloadComplete(layer.digest)
-                            }
+                            await counter.decrement()
+                            return Int64(size)
                         }
-
-                        return Int64(layer.size)
                     }
                 }
@@ -455,18 +547,44 @@ class ImageContainerRegistry: @unchecked Sendable {
                 let inputHandle = try FileHandle(forReadingFrom: partURL)
                 defer {
                     try? inputHandle.close()
-                    try? FileManager.default.removeItem(at: partURL)
+                    // Don't delete the part file if we're in cache mode and the part is from cache
+                    if noCache || !partURL.path.contains(cacheDirectory.path) {
+                        try? FileManager.default.removeItem(at: partURL)
+                    }
                 }
 
-                // Read and write in chunks to minimize memory usage
-                let chunkSize = 10 * 1024 * 1024  // 10MB chunks
-                while let chunk = try inputHandle.read(upToCount: chunkSize) {
-                    try outputHandle.write(contentsOf: chunk)
-                    totalWritten += UInt64(chunk.count)
-                    let progress: Double =
-                        Double(totalWritten) / Double(expectedTotalSize) * 100
-                    Logger.info("Reassembling disk image: \(Int(progress))%")
+                // On low memory systems, be more aggressive with releasing memory
+                let memoryConstrained = determineIfMemoryConstrained()
+                var chunksProcessed = 0
+
+                while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) {
+                    try autoreleasepool {
+                        try outputHandle.write(contentsOf: data)
+                        totalWritten += UInt64(data.count)
+
+                        // Only log progress every 5% to reduce log noise
+                        let progress: Double =
+                            Double(totalWritten) / Double(expectedTotalSize) * 100
+                        let roundedProgress = Int(progress / 5) * 5
+                        if roundedProgress != Int(
+                            (Double(totalWritten - UInt64(data.count))
+                                / Double(expectedTotalSize) * 100)
+                                / 5) * 5
+                        {
+                            Logger.info("Reassembling disk image: \(roundedProgress)%")
+                        }
+
+                        // Force more frequent autoreleases on memory-constrained systems
+                        chunksProcessed += 1
+                        if memoryConstrained && chunksProcessed % 10 == 0 {
+                            try outputHandle.synchronize()
+                        }
+                    }
                 }
+
+                // Make sure we explicitly close handles after each part to free resources
+                try? inputHandle.synchronize()
+                try inputHandle.close()
             }
 
             // Verify final size
@@ -526,19 +644,19 @@ class ImageContainerRegistry: @unchecked Sendable {
         async throws
     {
         Logger.info("Copying from cache...")
 
-        var diskParts: [(Int, URL)] = []
+        var diskPartSources: [(Int, URL)] = []
         var totalParts = 0
         var expectedTotalSize: UInt64 = 0
 
+        // First identify disk parts and non-disk files
        for layer in manifest.layers {
             let cachedLayer = getCachedLayerPath(manifestId: manifestId, digest: layer.digest)
             if let partInfo = extractPartInfo(from: layer.mediaType) {
                 let (partNum, total) = partInfo
                 totalParts = total
-                let partURL = destination.appendingPathComponent("disk.img.part.\(partNum)")
-                try FileManager.default.copyItem(at: cachedLayer, to: partURL)
-                diskParts.append((partNum, partURL))
+                // Just store the reference to source instead of copying
+                diskPartSources.append((partNum, cachedLayer))
                 expectedTotalSize += UInt64(layer.size)
             } else {
                 let fileName: String
@@ -552,6 +670,7 @@ class ImageContainerRegistry: @unchecked Sendable {
                 default:
                     continue
                 }
+                // Only non-disk files are copied
                 try FileManager.default.copyItem(
                     at: cachedLayer,
                     to: destination.appendingPathComponent(fileName)
@@ -560,8 +679,8 @@ class ImageContainerRegistry: @unchecked Sendable {
         }
 
         // Reassemble disk parts if needed
-        if !diskParts.isEmpty {
-            Logger.info("Reassembling disk image from cached parts...")
+        if !diskPartSources.isEmpty {
+            Logger.info("Reassembling disk image from cached parts (optimized storage)...")
             let outputURL = destination.appendingPathComponent("disk.img")
             FileManager.default.createFile(atPath: outputURL.path, contents: nil)
             let outputHandle = try FileHandle(forWritingTo: outputURL)
@@ -569,18 +688,48 @@ class ImageContainerRegistry: @unchecked Sendable {
 
             var totalWritten: UInt64 = 0
 
+            // Process parts in order, reading directly from cache
             for partNum in 1...totalParts {
-                guard let (_, partURL) = diskParts.first(where: { $0.0 == partNum }) else {
+                guard let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum }) else {
                     throw PullError.missingPart(partNum)
                 }
 
-                let inputHandle = try FileHandle(forReadingFrom: partURL)
-                while let data = try inputHandle.read(upToCount: 1024 * 1024 * 10) {
-                    try outputHandle.write(contentsOf: data)
-                    totalWritten += UInt64(data.count)
+                // Read directly from the cached part
+                let inputHandle = try FileHandle(forReadingFrom: sourceURL)
+                defer { try? inputHandle.close() }
+
+                // On low memory systems, be more aggressive with releasing memory
+                let memoryConstrained = determineIfMemoryConstrained()
+                var chunksProcessed = 0
+
+                while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) {
+                    try autoreleasepool {
+                        try outputHandle.write(contentsOf: data)
+                        totalWritten += UInt64(data.count)
+
+                        // Only log progress every 5% to reduce log noise
+                        let progress: Double =
+                            Double(totalWritten) / Double(expectedTotalSize) * 100
+                        let roundedProgress = Int(progress / 5) * 5
+                        if roundedProgress != Int(
+                            (Double(totalWritten - UInt64(data.count)) / Double(expectedTotalSize)
+                                * 100)
+                                / 5) * 5
+                        {
+                            Logger.info("Reassembling disk image from cache: \(roundedProgress)%")
+                        }
+
+                        // Force more frequent autoreleases on memory-constrained systems
+                        chunksProcessed += 1
+                        if memoryConstrained && chunksProcessed % 10 == 0 {
+                            try outputHandle.synchronize()
+                        }
+                    }
                 }
+
+                // Make sure we explicitly close handles after each part to free resources
+                try? inputHandle.synchronize()
                 try inputHandle.close()
-                try FileManager.default.removeItem(at: partURL)
             }
 
             // Verify final size
@@ -716,16 +865,18 @@ class ImageContainerRegistry: @unchecked Sendable {
         // Create the output file
         FileManager.default.createFile(atPath: destination.path, contents: nil)
 
-        // Process in 10MB chunks
-        let chunkSize = 10 * 1024 * 1024
+        // Process with optimal chunk size
+        let chunkSize = getOptimalChunkSize()
 
         while let chunk = try inputHandle.read(upToCount: chunkSize) {
-            try inputPipe.fileHandleForWriting.write(contentsOf: chunk)
+            try autoreleasepool {
+                try inputPipe.fileHandleForWriting.write(contentsOf: chunk)
 
-            // Read and write output in chunks as well
-            while let decompressedChunk = try outputPipe.fileHandleForReading.read(
-                upToCount: chunkSize)
-            {
-                try outputHandle.write(contentsOf: decompressedChunk)
+                // Read and write output in chunks as well
+                while let decompressedChunk = try outputPipe.fileHandleForReading.read(
+                    upToCount: chunkSize)
+                {
+                    try outputHandle.write(contentsOf: decompressedChunk)
+                }
             }
         }
@@ -734,7 +885,9 @@ class ImageContainerRegistry: @unchecked Sendable {
         // Read any remaining output
         while let decompressedChunk = try outputPipe.fileHandleForReading.read(upToCount: chunkSize)
         {
-            try outputHandle.write(contentsOf: decompressedChunk)
+            try autoreleasepool {
+                try outputHandle.write(contentsOf: decompressedChunk)
+            }
         }
 
         process.waitUntilExit()
@@ -890,4 +1043,69 @@ class ImageContainerRegistry: @unchecked Sendable {
         let repoTags = try JSONDecoder().decode(RepositoryTags.self, from: data)
         return repoTags.tags
     }
+
+    // Determine appropriate chunk size based on available system memory on macOS
+    private func getOptimalChunkSize() -> Int {
+        // Try to get system memory info
+        var stats = vm_statistics64_data_t()
+        var size = mach_msg_type_number_t(
+            MemoryLayout<vm_statistics64_data_t>.size / MemoryLayout<integer_t>.size)
+        let hostPort = mach_host_self()
+
+        let result = withUnsafeMutablePointer(to: &stats) { statsPtr in
+            statsPtr.withMemoryRebound(to: integer_t.self, capacity: Int(size)) { ptr in
+                host_statistics64(hostPort, HOST_VM_INFO64, ptr, &size)
+            }
+        }
+
+        // Default to 512KB as a safe minimum
+        let defaultChunkSize = 512 * 1024
+
+        // If we can't get memory info, return conservative default
+        guard result == KERN_SUCCESS else {
+            return defaultChunkSize
+        }
+
+        // Calculate free memory in bytes using a fixed page size
+        // Standard page size on macOS is 4KB or 16KB
+        let pageSize = 4096  // Use a constant instead of vm_kernel_page_size
+        let freeMemory = UInt64(stats.free_count) * UInt64(pageSize)
+
+        // On very memory-constrained systems (< 1GB free), use the minimum
+        if freeMemory < 1_073_741_824 {  // 1GB
+            return defaultChunkSize
+        }
+
+        // For systems with adequate memory, use a smarter sizing approach:
+        // - Use 0.1% of free memory, with limits
+        let adaptiveSize = min(max(Int(freeMemory / 1000), defaultChunkSize), 2 * 1024 * 1024)
+        return adaptiveSize
+    }
+
+    // Check if system is memory constrained for more aggressive memory management
+    private func determineIfMemoryConstrained() -> Bool {
+        var stats = vm_statistics64_data_t()
+        var size = mach_msg_type_number_t(
+            MemoryLayout<vm_statistics64_data_t>.size / MemoryLayout<integer_t>.size)
+        let hostPort = mach_host_self()
+
+        let result = withUnsafeMutablePointer(to: &stats) { statsPtr in
+            statsPtr.withMemoryRebound(to: integer_t.self, capacity: Int(size)) { ptr in
+                host_statistics64(hostPort, HOST_VM_INFO64, ptr, &size)
+            }
+        }
+
+        guard result == KERN_SUCCESS else {
+            // If we can't determine, assume constrained for safety
+            return true
+        }
+
+        // Calculate free memory in bytes using a fixed page size
+        // Standard page size on macOS is 4KB or 16KB
+        let pageSize = 4096  // Use a constant instead of vm_kernel_page_size
+        let freeMemory = UInt64(stats.free_count) * UInt64(pageSize)
+
+        // Consider memory constrained if less than 2GB free
+        return freeMemory < 2_147_483_648  // 2GB
+    }
 }
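
Note: the two helpers added at the bottom of the diff share the same host_statistics64 probe and differ only in how they interpret the free-page count. The following is a minimal standalone sketch of that probe and of the chunk-size policy, assuming the fixed 4KB page size the diff itself uses instead of querying vm_kernel_page_size; the names freeMemoryBytes and assumedPageSize are illustrative and not part of the lume codebase.

import Darwin
import Foundation

// Probe free memory via Mach host statistics (macOS).
// Returns nil if the kernel call fails.
func freeMemoryBytes() -> UInt64? {
    var stats = vm_statistics64_data_t()
    var count = mach_msg_type_number_t(
        MemoryLayout<vm_statistics64_data_t>.size / MemoryLayout<integer_t>.size)

    let result = withUnsafeMutablePointer(to: &stats) { statsPtr in
        statsPtr.withMemoryRebound(to: integer_t.self, capacity: Int(count)) { ptr in
            host_statistics64(mach_host_self(), HOST_VM_INFO64, ptr, &count)
        }
    }
    guard result == KERN_SUCCESS else { return nil }

    // The diff assumes a fixed 4KB page size rather than vm_kernel_page_size.
    let assumedPageSize: UInt64 = 4096
    return UInt64(stats.free_count) * assumedPageSize
}

// Chunk-size policy mirroring getOptimalChunkSize():
// 512KB floor, 0.1% of free memory, 2MB ceiling, floor on failure.
let chunk = freeMemoryBytes().map { free in
    min(max(Int(free / 1000), 512 * 1024), 2 * 1024 * 1024)
} ?? 512 * 1024
print("chunk size: \(chunk) bytes")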
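Note: the progress-throttling condition inside the two reassembly loops is easy to misread. It compares the 5% bucket of the running total before and after the current chunk and logs only when the bucket changes. An equivalent flattened form follows; the helper name and parameters are illustrative only, not part of the diff.

// Returns true only when the write crosses a 5% progress boundary,
// which is when the loops above emit a log line.
func crossedFivePercentBoundary(bytesBefore: UInt64, bytesAfter: UInt64, totalBytes: UInt64) -> Bool {
    // Round each percentage down to its 5% bucket.
    let bucketBefore = Int(Double(bytesBefore) / Double(totalBytes) * 100) / 5 * 5
    let bucketAfter = Int(Double(bytesAfter) / Double(totalBytes) * 100) / 5 * 5
    return bucketAfter != bucketBefore
}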