diff --git a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift
index 3ba7d543..8b668db7 100644
--- a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift
+++ b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift
@@ -102,6 +102,28 @@ struct ImageMetadata: Codable {
     let timestamp: Date
 }
 
+// Actor to safely collect disk part information from concurrent tasks
+actor DiskPartsCollector {
+    private var diskParts: [(Int, URL)] = []
+    private var partCounter = 0
+
+    // Adds a part and returns its assigned sequential number
+    func addPart(url: URL) -> Int {
+        partCounter += 1
+        let partNum = partCounter
+        diskParts.append((partNum, url))
+        return partNum
+    }
+
+    func getSortedParts() -> [(Int, URL)] {
+        return diskParts.sorted { $0.0 < $1.0 }
+    }
+
+    func getTotalParts() -> Int {
+        return partCounter
+    }
+}
+
 actor ProgressTracker {
     private var totalBytes: Int64 = 0
     private var downloadedBytes: Int64 = 0
@@ -716,8 +738,8 @@ class ImageContainerRegistry: @unchecked Sendable {
                 "[░░░░░░░░░░░░░░░░░░░░] 0% | Initializing downloads... | ETA: calculating... ")
             fflush(stdout)
 
-            var diskParts: [(Int, URL)] = []
-            var totalParts = 0
+            // Instantiate the collector
+            let diskPartsCollector = DiskPartsCollector()
 
             // Adaptive concurrency based on system capabilities
             let memoryConstrained = determineIfMemoryConstrained()
@@ -742,85 +764,97 @@ class ImageContainerRegistry: @unchecked Sendable {
                         await counter.decrement()
                     }
 
-                    if let partInfo = extractPartInfo(from: layer.mediaType) {
-                        let (partNum, total) = partInfo
-                        totalParts = total
+                    // Identify disk-chunk layers by media type
+                    if layer.mediaType == "application/octet-stream+lz4" {
+                        let size = layer.size
+                        // Declare cachedLayer and digest here
                         let cachedLayer = getCachedLayerPath(
                             manifestId: manifestId, digest: layer.digest)
                         let digest = layer.digest
-                        let size = layer.size
 
-                        // For memory-optimized mode - point directly to cache when possible
-                        if memoryConstrained
-                            && FileManager.default.fileExists(atPath: cachedLayer.path)
+                        // For memory-constrained mode - point directly to cache when possible
+                        if memoryConstrained
+                            && FileManager.default.fileExists(atPath: cachedLayer.path)
                         {
-                            // Use the cached file directly
-                            diskParts.append((partNum, cachedLayer))
+                            // Use the cached file directly without copying to temp:
+                            // add the cached layer path to the collector
+                            let partNum = await diskPartsCollector.addPart(url: cachedLayer)
+                            Logger.info("Using cached layer directly for part #\(partNum): \(cachedLayer.lastPathComponent)")
 
-                            // Still need to account for progress
-                            group.addTask { [self] in
-                                await counter.increment()
-                                await downloadProgress.addProgress(Int64(size))
-                                await counter.decrement()
-                                return Int64(size)
-                            }
-                            continue
-                        } else {
-                            let partURL = tempDownloadDir.appendingPathComponent(
-                                "disk.img.part.\(partNum)")
-                            diskParts.append((partNum, partURL))
-
-                            group.addTask { [self] in
-                                await counter.increment()
+                            // Account for progress directly, no need for a separate task
+                            await downloadProgress.addProgress(Int64(size))
+
+                            // No task was added, so no need to increment/decrement counter here
+
+                            continue  // Skip the download task group logic below
+                        } else {
+                            // Not memory constrained OR file not cached:
+                            // add a task to handle copy/download and adding to collector
+                            group.addTask { [self] in
+                                await counter.increment()  // Increment counter for the task
+                                let finalPath: URL
                                if FileManager.default.fileExists(atPath: cachedLayer.path) {
-                                    try FileManager.default.copyItem(at: cachedLayer, to: partURL)
-                                    await downloadProgress.addProgress(Int64(size))
-                                } else {
-                                    // Check if this layer is already being downloaded and we're not skipping cache
-                                    if isDownloading(digest) {
-                                        try await waitForExistingDownload(
-                                            digest, cachedLayer: cachedLayer)
-                                        if FileManager.default.fileExists(atPath: cachedLayer.path)
-                                        {
-                                            try FileManager.default.copyItem(
-                                                at: cachedLayer, to: partURL)
-                                            await downloadProgress.addProgress(Int64(size))
-                                            return Int64(size)
-                                        }
-                                    }
+                                    // If cached, copy to temp and use temp path for reassembly later
+                                    let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)")  // Unique temp path
+                                    try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
+                                    await downloadProgress.addProgress(Int64(size))  // Update progress after copy
+                                    finalPath = tempPartURL
+                                } else {
+                                    // If not cached, download to temp path
+                                    let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)")  // Unique temp path
+
+                                    // Check if this layer is already being downloaded
+                                    if isDownloading(digest) {
+                                        try await waitForExistingDownload(digest, cachedLayer: cachedLayer)
+                                        // If it finished downloading while waiting, copy from cache to temp
+                                        if FileManager.default.fileExists(atPath: cachedLayer.path) {
+                                            try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
+                                            await downloadProgress.addProgress(Int64(size))  // Update progress
+                                            finalPath = tempPartURL
+                                        } else {
+                                            // If still not available after waiting (should be rare), proceed to download
+                                            markDownloadStarted(digest)
+                                            try await self.downloadLayer(
+                                                repository: "\(self.organization)/\(imageName)",
+                                                digest: digest,
+                                                mediaType: layer.mediaType,  // Use correct mediaType
+                                                token: token,
+                                                to: tempPartURL,
+                                                maxRetries: 5,
+                                                progress: downloadProgress,  // Progress updated inside downloadLayer
+                                                manifestId: manifestId
+                                            )
+                                            // downloadLayer handles caching and markDownloadComplete
+                                            finalPath = tempPartURL
+                                        }
+                                    } else {
+                                        // Start new download
+                                        markDownloadStarted(digest)
+                                        try await self.downloadLayer(
+                                            repository: "\(self.organization)/\(imageName)",
+                                            digest: digest,
+                                            mediaType: layer.mediaType,  // Use correct mediaType
+                                            token: token,
+                                            to: tempPartURL,
+                                            maxRetries: 5,
+                                            progress: downloadProgress,  // Progress updated inside downloadLayer
+                                            manifestId: manifestId
+                                        )
+                                        // downloadLayer handles caching and markDownloadComplete
+                                        finalPath = tempPartURL
+                                    }
+                                }
+
+                                // Add the final determined path (temp path) to the collector
+                                let partNum = await diskPartsCollector.addPart(url: finalPath)
+                                Logger.info("Assigned part #\(partNum) for path: \(finalPath.lastPathComponent)")
-                                    // Start new download
-                                    markDownloadStarted(digest)
-
-                                    try await self.downloadLayer(
-                                        repository: "\(self.organization)/\(imageName)",
-                                        digest: digest,
-                                        mediaType: layer.mediaType,
-                                        token: token,
-                                        to: partURL,
-                                        maxRetries: 5,
-                                        progress: downloadProgress,
-                                        manifestId: manifestId
-                                    )
-
-                                    // Cache the downloaded layer if caching is enabled
-                                    if cachingEnabled {
-                                        if FileManager.default.fileExists(atPath: cachedLayer.path)
-                                        {
-                                            try FileManager.default.removeItem(at: cachedLayer)
-                                        }
-                                        try FileManager.default.copyItem(
-                                            at: partURL, to: cachedLayer)
-                                    }
-                                    markDownloadComplete(digest)
-                                }
-
-                                await counter.decrement()
-                                return Int64(size)
-                            }
-                            continue
+                                await counter.decrement()  // Decrement counter
+                                return Int64(size)
+                            }
+                            continue  // Ensure we move to the next layer after adding task
                        }
                    } else {
                        let mediaType = layer.mediaType
@@ -894,7 +928,14 @@ class ImageContainerRegistry: @unchecked Sendable {
 
                 // Wait for remaining tasks
                 for try await _ in group {}
-            }
+            }  // End TaskGroup
+
+            // --- Safely retrieve parts AFTER TaskGroup ---
+            let diskParts = await diskPartsCollector.getSortedParts()
+            let totalParts = await diskPartsCollector.getTotalParts()
+            Logger.info("Finished processing layers. Found \(totalParts) disk parts.")
+            // --- End retrieving parts ---
+
             Logger.info("")  // New line after progress
 
             // Display download statistics
@@ -1464,23 +1505,22 @@ class ImageContainerRegistry: @unchecked Sendable {
         {
             Logger.info("Copying from cache...")
 
-            var diskPartSources: [(Int, URL)] = []
-            var totalParts = 0
+            // Instantiate collector
+            let diskPartsCollector = DiskPartsCollector()
 
             // First identify disk parts and non-disk files
             for layer in manifest.layers {
                 let cachedLayer = getCachedLayerPath(manifestId: manifestId, digest: layer.digest)
-                if let partInfo = extractPartInfo(from: layer.mediaType) {
-                    let (partNum, total) = partInfo
-                    totalParts = total
-                    // Just store the reference to source instead of copying
-                    diskPartSources.append((partNum, cachedLayer))
-                } else {
+                // Check if it's a disk chunk layer based on media type
+                if layer.mediaType == "application/octet-stream+lz4" {
+                    // It's a disk chunk - add it to the collector
+                    _ = await diskPartsCollector.addPart(url: cachedLayer)  // Ignore return value
+                }
+                else {
+                    // Handle non-disk layers (config, nvram)
                    let fileName: String
                    switch layer.mediaType {
-                    case "application/vnd.oci.image.layer.v1.tar", "application/octet-stream+gzip":
-                        fileName = "disk.img"
                    case "application/vnd.oci.image.config.v1+json":
                        fileName = "config.json"
                    case "application/octet-stream":
@@ -1496,8 +1536,14 @@ class ImageContainerRegistry: @unchecked Sendable {
                }
            }
 
+            // --- Safely retrieve parts AFTER loop ---
+            let diskPartSources = await diskPartsCollector.getSortedParts()
+            let totalParts = await diskPartsCollector.getTotalParts()
+            Logger.info("Found \(totalParts) disk parts in cache.")
+            // --- End retrieving parts ---
+
             // Reassemble disk parts if needed
-            if !diskPartSources.isEmpty {
+            if !diskPartSources.isEmpty {  // Use the retrieved array
                 // Get the uncompressed size from cached config
                 let configDigest = manifest.config?.digest
                 let cachedConfigPath =
@@ -1588,277 +1634,29 @@ class ImageContainerRegistry: @unchecked Sendable {
                for partNum in 1...totalParts {
                    // Find the original layer info for this part number
                    guard
-                        let layer = manifest.layers.first(where: { layer in
-                            if let info = extractPartInfo(from: layer.mediaType) {
-                                return info.partNum == partNum
-                            }
-                            return false
-                        }),
+                        // Look up the source by its collector-assigned part number, not media-type part info
                        let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum })
                    else {
                        throw PullError.missingPart(partNum)
                    }
 
-                    let layerMediaType = layer.mediaType  // Extract mediaType here
 
                    Logger.info(
-                        "Processing part \(partNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent)"
+                        "Decompressing part \(partNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent) at offset \(currentOffset)..."
                    )
 
-                    let inputHandle = try FileHandle(forReadingFrom: sourceURL)
-                    defer { try? inputHandle.close() }
-
-                    // Seek to the correct offset in the output sparse file
-                    try outputHandle.seek(toOffset: currentOffset)
-
-                    if let decompressCmd = getDecompressionCommand(for: layerMediaType) {  // Use extracted mediaType
-                        Logger.info("Decompressing part \(partNum) with media type: \(layerMediaType)")
-
-                        // Handle Apple Archive format
-                        let toolPath = String(decompressCmd.dropFirst("apple_archive:".count))
-                        let tempOutputPath = FileManager.default.temporaryDirectory
-                            .appendingPathComponent(UUID().uuidString)
-
-                        // Check input file size before decompression
-                        let inputFileSize =
-                            (try? FileManager.default.attributesOfItem(atPath: sourceURL.path)[.size]
-                                as? UInt64) ?? 0
-                        Logger.info(
-                            "Part \(partNum) input size: \(ByteCountFormatter.string(fromByteCount: Int64(inputFileSize), countStyle: .file))"
-                        )
-
-                        // Create a process that decompresses to a temporary file
-                        let process = Process()
-                        process.executableURL = URL(fileURLWithPath: toolPath)
-                        process.arguments = [
-                            "extract", "-i", sourceURL.path, "-o", tempOutputPath.path,
-                        ]
-
-                        // Add error output capture
-                        let errorPipe = Pipe()
-                        process.standardError = errorPipe
-
-                        Logger.info(
-                            "Decompressing Apple Archive format with: \(toolPath) \(process.arguments?.joined(separator: " ") ?? "")"
-                        )
-                        try process.run()
-                        process.waitUntilExit()
-
-                        // Check error output if any
-                        let errorData = errorPipe.fileHandleForReading.readDataToEndOfFile()
-                        if !errorData.isEmpty,
-                            let errorString = String(data: errorData, encoding: .utf8)
-                        {
-                            Logger.error("Decompression error output: \(errorString)")
-                        }
-
-                        if process.terminationStatus != 0 {
-                            Logger.error(
-                                "Apple Archive decompression failed with status: \(process.terminationStatus), falling back to direct copy"
-                            )
-                            // Fall back to direct copying (uncompressed)
-                            Logger.info("Copying part \(partNum) directly without decompression...")
-                            try outputHandle.seek(toOffset: currentOffset)
-
-                            let inputHandle = try FileHandle(forReadingFrom: sourceURL)
-                            defer { try? inputHandle.close() }
-
-                            var bytesWritten: UInt64 = 0
-                            let chunkSize = 1024 * 1024  // 1MB chunks
-                            var chunkCount = 0
-
-                            while true {
-                                let data = autoreleasepool {
-                                    try! inputHandle.read(upToCount: chunkSize) ?? Data()
-                                }
-                                if data.isEmpty { break }
-
-                                try outputHandle.write(contentsOf: data)
-                                bytesWritten += UInt64(data.count)
-                                chunkCount += 1
-
-                                // Update progress
-                                let totalProgress =
-                                    Double(currentOffset + bytesWritten) / Double(expectedTotalSize)
-                                let progressBar = createProgressBar(progress: totalProgress, width: 30)
-                                let progressPercent = Int(totalProgress * 100)
-                                let currentSpeed =
-                                    ByteCountFormatter.string(
-                                        fromByteCount: Int64(Double(bytesWritten) / 0.5),
-                                        countStyle: .file) + "/s"
-
-                                print(
-                                    "\r\(progressBar) \(progressPercent)% | Speed: \(currentSpeed) | Part \(partNum) | \(ByteCountFormatter.string(fromByteCount: Int64(currentOffset + bytesWritten), countStyle: .file)) ",
-                                    terminator: "")
-                                fflush(stdout)
-
-                                // Also log to the progress logger for consistency
-                                reassemblyProgressLogger.logProgress(
-                                    current: totalProgress,
-                                    context: "Direct copying")
-                            }
-
-                            Logger.info(
-                                "Part \(partNum) - Direct copy: wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(bytesWritten), countStyle: .file))"
-                            )
-                            currentOffset += bytesWritten
-                            continue
-                        }
-
-                        // Check if the output file exists and has content
-                        let outputExists = FileManager.default.fileExists(atPath: tempOutputPath.path)
-                        let outputFileSize =
-                            outputExists
-                                ? ((try? FileManager.default.attributesOfItem(atPath: tempOutputPath.path)[
-                                    .size] as? UInt64) ?? 0) : 0
-                        Logger.info(
-                            "Part \(partNum) - Decompressed output exists: \(outputExists), size: \(ByteCountFormatter.string(fromByteCount: Int64(outputFileSize), countStyle: .file))"
-                        )
-
-                        // If decompression produced an empty file, fall back to direct copy
-                        if outputFileSize == 0 {
-                            Logger.info(
-                                "Decompression resulted in empty file, falling back to direct copy for part \(partNum)"
-                            )
-                            try? FileManager.default.removeItem(at: tempOutputPath)
-
-                            // Fall back to direct copying (uncompressed)
-                            Logger.info("Copying part \(partNum) directly without decompression...")
-                            try outputHandle.seek(toOffset: currentOffset)
-
-                            let inputHandle = try FileHandle(forReadingFrom: sourceURL)
-                            defer { try? inputHandle.close() }
-
-                            var bytesWritten: UInt64 = 0
-                            let chunkSize = 1024 * 1024  // 1MB chunks
-                            var chunkCount = 0
-
-                            while true {
-                                let data = autoreleasepool {
-                                    try! inputHandle.read(upToCount: chunkSize) ?? Data()
-                                }
-                                if data.isEmpty { break }
-
-                                try outputHandle.write(contentsOf: data)
-                                bytesWritten += UInt64(data.count)
-                                chunkCount += 1
-
-                                // Update progress
-                                let totalProgress =
-                                    Double(currentOffset + bytesWritten) / Double(expectedTotalSize)
-                                let progressBar = createProgressBar(progress: totalProgress, width: 30)
-                                let progressPercent = Int(totalProgress * 100)
-                                let currentSpeed =
-                                    ByteCountFormatter.string(
-                                        fromByteCount: Int64(Double(bytesWritten) / 0.5),
-                                        countStyle: .file) + "/s"
-
-                                print(
-                                    "\r\(progressBar) \(progressPercent)% | Speed: \(currentSpeed) | Part \(partNum) | \(ByteCountFormatter.string(fromByteCount: Int64(currentOffset + bytesWritten), countStyle: .file)) ",
-                                    terminator: "")
-                                fflush(stdout)
-
-                                // Also log to the progress logger for consistency
-                                reassemblyProgressLogger.logProgress(
-                                    current: totalProgress,
-                                    context: "Direct copying")
-                            }
-
-                            Logger.info(
-                                "Part \(partNum) - Direct copy: wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(bytesWritten), countStyle: .file))"
-                            )
-                            currentOffset += bytesWritten
-                            continue
-                        }
-
-                        // Read the decompressed file and write to our output
-                        let tempInputHandle = try FileHandle(forReadingFrom: tempOutputPath)
-                        defer {
-                            try? tempInputHandle.close()
-                            try? FileManager.default.removeItem(at: tempOutputPath)
-                        }
-
-                        // Read decompressed data in chunks and write to sparse file
-                        var partDecompressedSize: UInt64 = 0
-                        let chunkSize = 1024 * 1024  // 1MB chunks
-                        var chunkCount = 0
-
-                        while true {
-                            let data = autoreleasepool {  // Help manage memory with large files
-                                try! tempInputHandle.read(upToCount: chunkSize) ?? Data()
-                            }
-                            if data.isEmpty { break }  // End of stream
-
-                            try outputHandle.write(contentsOf: data)
-                            partDecompressedSize += UInt64(data.count)
-                            chunkCount += 1
-
-                            // Update progress based on decompressed size written
-                            let totalProgress =
-                                Double(currentOffset + partDecompressedSize)
-                                / Double(expectedTotalSize)
-                            reassemblyProgressLogger.logProgress(
-                                current: totalProgress,
-                                context: "Reassembling")
-                        }
-
-                        Logger.info(
-                            "Part \(partNum) - Wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(partDecompressedSize), countStyle: .file))"
-                        )
-                        currentOffset += partDecompressedSize  // Advance offset by decompressed size
-                    } else {
-                        // No decompression command available, try direct copy
-                        Logger.info(
-                            "Copying part \(partNum) directly..."
-                        )
-                        try outputHandle.seek(toOffset: currentOffset)
-
-                        let inputHandle = try FileHandle(forReadingFrom: sourceURL)
-                        defer { try? inputHandle.close() }
-
-                        // Get part size
-                        let partSize =
-                            (try? FileManager.default.attributesOfItem(atPath: sourceURL.path)[.size]
-                                as? UInt64) ?? 0
-                        Logger.info(
-                            "Direct copy of part \(partNum) with size: \(ByteCountFormatter.string(fromByteCount: Int64(partSize), countStyle: .file))"
-                        )
-
-                        var bytesWritten: UInt64 = 0
-                        let chunkSize = 1024 * 1024  // 1MB chunks
-                        var chunkCount = 0
-
-                        while true {
-                            let data = autoreleasepool {
-                                try! inputHandle.read(upToCount: chunkSize) ?? Data()
-                            }
-                            if data.isEmpty { break }
-
-                            try outputHandle.write(contentsOf: data)
-                            bytesWritten += UInt64(data.count)
-                            chunkCount += 1
-
-                            // Update progress
-                            let totalProgress =
-                                Double(currentOffset + bytesWritten)
-                                / Double(expectedTotalSize)
-                            reassemblyProgressLogger.logProgress(
-                                current: totalProgress,
-                                context: "Direct copying")
-                        }
-
-                        Logger.info(
-                            "Part \(partNum) - Direct copy: wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(bytesWritten), countStyle: .file))"
-                        )
-                        currentOffset += bytesWritten
-                    }
-
-                    // Ensure data is written before processing next part (optional but safer)
-                    try outputHandle.synchronize()
+                    // Use the correct sparse decompression function
+                    let decompressedBytesWritten = try decompressChunkAndWriteSparse(
+                        inputPath: sourceURL.path,
+                        outputHandle: outputHandle,
+                        startOffset: currentOffset
+                    )
+                    currentOffset += decompressedBytesWritten
+
+                    try outputHandle.synchronize()  // Optional: synchronize after each chunk
                }
 
                // Finalize progress, close handle (done by defer)
                reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembly Complete")
-                Logger.info("")  // Newline
 
                // Ensure output handle is closed before post-processing
                try outputHandle.close()
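
Note on the DiskPartsCollector pattern: the sketch below is a standalone illustration, not part of the patch. The actor is copied from the hunk above; the demo harness (DiskPartsCollectorDemo, the dummy part files) is hypothetical. It shows the property the patch relies on: because an actor processes one call at a time, concurrent tasks can register parts without data races. One caveat the diff itself hints at ("Find layer by index approximated during collection"): addPart numbers parts in registration order, so in the concurrent download path the assigned numbers follow task completion order rather than manifest order.

import Foundation

// DiskPartsCollector as introduced in the patch above.
actor DiskPartsCollector {
    private var diskParts: [(Int, URL)] = []
    private var partCounter = 0

    // Adds a part and returns its assigned sequential number
    func addPart(url: URL) -> Int {
        partCounter += 1
        let partNum = partCounter
        diskParts.append((partNum, url))
        return partNum
    }

    func getSortedParts() -> [(Int, URL)] {
        return diskParts.sorted { $0.0 < $1.0 }
    }

    func getTotalParts() -> Int {
        return partCounter
    }
}

// Hypothetical demo harness, not part of the patch: four concurrent tasks
// register dummy part files; the actor serializes the mutations of
// partCounter and diskParts, so no counter update or append is ever lost.
@main
struct DiskPartsCollectorDemo {
    static func main() async throws {
        let collector = DiskPartsCollector()
        try await withThrowingTaskGroup(of: Void.self) { group in
            for i in 1...4 {
                group.addTask {
                    // Stand-in for downloadLayer(...): write a dummy part file
                    let url = FileManager.default.temporaryDirectory
                        .appendingPathComponent("disk.img.part.\(UUID().uuidString)")
                    try Data("part \(i)".utf8).write(to: url)
                    let partNum = await collector.addPart(url: url)
                    print("Registered part #\(partNum): \(url.lastPathComponent)")
                }
            }
            try await group.waitForAll()
        }
        let total = await collector.getTotalParts()
        let parts = await collector.getSortedParts()
        print("Collected \(total) parts: \(parts.map { $0.0 })")
    }
}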