This commit is contained in:
f-trycua
2025-04-19 23:04:40 -07:00
committed by Dillon DuPont
parent 48d495490b
commit d6d36ecb94

View File

@@ -102,6 +102,28 @@ struct ImageMetadata: Codable {
let timestamp: Date
}
// Actor to safely collect disk part information from concurrent tasks
// Thread-safe accumulator for disk-part file URLs produced by concurrent download tasks.
// Each registered URL receives a monotonically increasing sequence number.
actor DiskPartsCollector {
    // Collected (sequence number, file URL) pairs, in arrival order.
    private var diskParts: [(Int, URL)] = []
    // Monotonically increasing counter; its latest value equals the part count.
    private var partCounter = 0

    // Registers a part URL and returns the sequential number assigned to it (1-based).
    func addPart(url: URL) -> Int {
        partCounter += 1
        diskParts.append((partCounter, url))
        return partCounter
    }

    // Returns all registered parts ordered by their assigned sequence number (ascending).
    func getSortedParts() -> [(Int, URL)] {
        diskParts.sorted { lhs, rhs in lhs.0 < rhs.0 }
    }

    // Returns the total number of parts registered so far.
    func getTotalParts() -> Int {
        partCounter
    }
}
actor ProgressTracker {
private var totalBytes: Int64 = 0
private var downloadedBytes: Int64 = 0
@@ -716,8 +738,8 @@ class ImageContainerRegistry: @unchecked Sendable {
"[░░░░░░░░░░░░░░░░░░░░] 0% | Initializing downloads... | ETA: calculating... ")
fflush(stdout)
var diskParts: [(Int, URL)] = []
var totalParts = 0
// Instantiate the collector
let diskPartsCollector = DiskPartsCollector()
// Adaptive concurrency based on system capabilities
let memoryConstrained = determineIfMemoryConstrained()
@@ -742,85 +764,97 @@ class ImageContainerRegistry: @unchecked Sendable {
await counter.decrement()
}
if let partInfo = extractPartInfo(from: layer.mediaType) {
let (partNum, total) = partInfo
totalParts = total
// Check both media type and safely unwrap part info
if layer.mediaType == "application/octet-stream+lz4" {
let size = layer.size
// Declare cachedLayer and digest here
let cachedLayer = getCachedLayerPath(
manifestId: manifestId, digest: layer.digest)
let digest = layer.digest
let size = layer.size
// For memory-optimized mode - point directly to cache when possible
if memoryConstrained
&& FileManager.default.fileExists(atPath: cachedLayer.path)
// For memory-constrained mode - point directly to cache when possible
if memoryConstrained // Line 777
&& FileManager.default.fileExists(atPath: cachedLayer.path)
{
// Use the cached file directly
diskParts.append((partNum, cachedLayer))
// Use the cached file *directly* without copying to temp
// Add the *cached* layer path to the collector
let partNum = await diskPartsCollector.addPart(url: cachedLayer) // Use the collector
Logger.info("Using cached layer directly for part #\(partNum): \(cachedLayer.lastPathComponent)")
// Still need to account for progress
group.addTask { [self] in
await counter.increment()
await downloadProgress.addProgress(Int64(size))
await counter.decrement()
return Int64(size)
}
continue
} else {
let partURL = tempDownloadDir.appendingPathComponent(
"disk.img.part.\(partNum)")
diskParts.append((partNum, partURL))
group.addTask { [self] in
await counter.increment()
// Account for progress directly, no need for a separate task
await downloadProgress.addProgress(Int64(size))
// No task was added, so no need to increment/decrement counter here
continue // Skip the download task group logic below
} else {
// Not memory constrained OR file not cached
// Add a task to handle copy/download and adding to collector
group.addTask { [self] in
await counter.increment() // Increment counter for the task
let finalPath: URL
if FileManager.default.fileExists(atPath: cachedLayer.path) {
try FileManager.default.copyItem(at: cachedLayer, to: partURL)
await downloadProgress.addProgress(Int64(size))
} else {
// Check if this layer is already being downloaded and we're not skipping cache
if isDownloading(digest) {
try await waitForExistingDownload(
digest, cachedLayer: cachedLayer)
if FileManager.default.fileExists(atPath: cachedLayer.path)
{
try FileManager.default.copyItem(
at: cachedLayer, to: partURL)
await downloadProgress.addProgress(Int64(size))
return Int64(size)
}
}
// If cached, copy to temp and use temp path for reassembly later
let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)") // Unique temp path
try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
await downloadProgress.addProgress(Int64(size)) // Update progress after copy
finalPath = tempPartURL
} else {
// If not cached, download to temp path
let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)") // Unique temp path
// Check if this layer is already being downloaded
if isDownloading(digest) {
try await waitForExistingDownload(digest, cachedLayer: cachedLayer)
// If it finished downloading while waiting, copy from cache to temp
if FileManager.default.fileExists(atPath: cachedLayer.path) {
try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
await downloadProgress.addProgress(Int64(size)) // Update progress
finalPath = tempPartURL
} else {
// If still not available after waiting (should be rare), proceed to download
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest,
mediaType: layer.mediaType, // Use correct mediaType
token: token,
to: tempPartURL,
maxRetries: 5,
progress: downloadProgress, // Progress updated inside downloadLayer
manifestId: manifestId
)
// downloadLayer handles caching and markDownloadComplete
finalPath = tempPartURL
}
} else {
// Start new download
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest,
mediaType: layer.mediaType, // Use correct mediaType
token: token,
to: tempPartURL,
maxRetries: 5,
progress: downloadProgress, // Progress updated inside downloadLayer
manifestId: manifestId
)
// downloadLayer handles caching and markDownloadComplete
finalPath = tempPartURL
}
}
// Add the final determined path (temp path) to the collector
let partNum = await diskPartsCollector.addPart(url: finalPath)
Logger.info("Assigned part #\(partNum) for path: \(finalPath.lastPathComponent)")
// Start new download
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest,
mediaType: layer.mediaType,
token: token,
to: partURL,
maxRetries: 5,
progress: downloadProgress,
manifestId: manifestId
)
// Cache the downloaded layer if caching is enabled
if cachingEnabled {
if FileManager.default.fileExists(atPath: cachedLayer.path)
{
try FileManager.default.removeItem(at: cachedLayer)
}
try FileManager.default.copyItem(
at: partURL, to: cachedLayer)
}
markDownloadComplete(digest)
}
await counter.decrement()
return Int64(size)
}
continue
await counter.decrement() // Decrement counter
return Int64(size)
}
continue // Ensure we move to the next layer after adding task
}
} else {
let mediaType = layer.mediaType
@@ -894,7 +928,14 @@ class ImageContainerRegistry: @unchecked Sendable {
// Wait for remaining tasks
for try await _ in group {}
}
} // End TaskGroup
// --- Safely retrieve parts AFTER TaskGroup ---
let diskParts = await diskPartsCollector.getSortedParts()
let totalParts = await diskPartsCollector.getTotalParts()
Logger.info("Finished processing layers. Found \(totalParts) disk parts.")
// --- End retrieving parts ---
Logger.info("") // New line after progress
// Display download statistics
@@ -1464,23 +1505,22 @@ class ImageContainerRegistry: @unchecked Sendable {
{
Logger.info("Copying from cache...")
var diskPartSources: [(Int, URL)] = []
var totalParts = 0
// Instantiate collector
let diskPartsCollector = DiskPartsCollector()
// First identify disk parts and non-disk files
for layer in manifest.layers {
let cachedLayer = getCachedLayerPath(manifestId: manifestId, digest: layer.digest)
if let partInfo = extractPartInfo(from: layer.mediaType) {
let (partNum, total) = partInfo
totalParts = total
// Just store the reference to source instead of copying
diskPartSources.append((partNum, cachedLayer))
} else {
// Check if it's a disk chunk layer based on media type
if layer.mediaType == "application/octet-stream+lz4" {
// It's a disk chunk - Add to collector
_ = await diskPartsCollector.addPart(url: cachedLayer) // Ignore return value
}
else {
// Handle non-disk layers (config, nvram)
let fileName: String
switch layer.mediaType {
case "application/vnd.oci.image.layer.v1.tar", "application/octet-stream+gzip":
fileName = "disk.img"
case "application/vnd.oci.image.config.v1+json":
fileName = "config.json"
case "application/octet-stream":
@@ -1496,8 +1536,14 @@ class ImageContainerRegistry: @unchecked Sendable {
}
}
// --- Safely retrieve parts AFTER loop ---
let diskPartSources = await diskPartsCollector.getSortedParts()
let totalParts = await diskPartsCollector.getTotalParts()
Logger.info("Found \(totalParts) disk parts in cache.")
// --- End retrieving parts ---
// Reassemble disk parts if needed
if !diskPartSources.isEmpty {
if !diskPartSources.isEmpty { // Use the retrieved array
// Get the uncompressed size from cached config
let configDigest = manifest.config?.digest
let cachedConfigPath =
@@ -1588,277 +1634,29 @@ class ImageContainerRegistry: @unchecked Sendable {
for partNum in 1...totalParts {
// Find the original layer info for this part number
guard
let layer = manifest.layers.first(where: { layer in
if let info = extractPartInfo(from: layer.mediaType) {
return info.partNum == partNum
}
return false
}),
// Find layer by index approximated during collection, not media type parts
let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum })
else {
throw PullError.missingPart(partNum)
}
let layerMediaType = layer.mediaType // Extract mediaType here
Logger.info(
"Processing part \(partNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent)"
"Decompressing part \(partNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent) at offset \(currentOffset)..."
)
let inputHandle = try FileHandle(forReadingFrom: sourceURL)
defer { try? inputHandle.close() }
// Seek to the correct offset in the output sparse file
try outputHandle.seek(toOffset: currentOffset)
if let decompressCmd = getDecompressionCommand(for: layerMediaType) { // Use extracted mediaType
Logger.info("Decompressing part \(partNum) with media type: \(layerMediaType)")
// Handle Apple Archive format
let toolPath = String(decompressCmd.dropFirst("apple_archive:".count))
let tempOutputPath = FileManager.default.temporaryDirectory
.appendingPathComponent(UUID().uuidString)
// Check input file size before decompression
let inputFileSize =
(try? FileManager.default.attributesOfItem(atPath: sourceURL.path)[.size]
as? UInt64) ?? 0
Logger.info(
"Part \(partNum) input size: \(ByteCountFormatter.string(fromByteCount: Int64(inputFileSize), countStyle: .file))"
)
// Create a process that decompresses to a temporary file
let process = Process()
process.executableURL = URL(fileURLWithPath: toolPath)
process.arguments = [
"extract", "-i", sourceURL.path, "-o", tempOutputPath.path,
]
// Add error output capture
let errorPipe = Pipe()
process.standardError = errorPipe
Logger.info(
"Decompressing Apple Archive format with: \(toolPath) \(process.arguments?.joined(separator: " ") ?? "")"
)
try process.run()
process.waitUntilExit()
// Check error output if any
let errorData = errorPipe.fileHandleForReading.readDataToEndOfFile()
if !errorData.isEmpty,
let errorString = String(data: errorData, encoding: .utf8)
{
Logger.error("Decompression error output: \(errorString)")
}
if process.terminationStatus != 0 {
Logger.error(
"Apple Archive decompression failed with status: \(process.terminationStatus), falling back to direct copy"
)
// Fall back to direct copying (uncompressed)
Logger.info("Copying part \(partNum) directly without decompression...")
try outputHandle.seek(toOffset: currentOffset)
let inputHandle = try FileHandle(forReadingFrom: sourceURL)
defer { try? inputHandle.close() }
var bytesWritten: UInt64 = 0
let chunkSize = 1024 * 1024 // 1MB chunks
var chunkCount = 0
while true {
let data = autoreleasepool {
try! inputHandle.read(upToCount: chunkSize) ?? Data()
}
if data.isEmpty { break }
try outputHandle.write(contentsOf: data)
bytesWritten += UInt64(data.count)
chunkCount += 1
// Update progress
let totalProgress =
Double(currentOffset + bytesWritten) / Double(expectedTotalSize)
let progressBar = createProgressBar(progress: totalProgress, width: 30)
let progressPercent = Int(totalProgress * 100)
let currentSpeed =
ByteCountFormatter.string(
fromByteCount: Int64(Double(bytesWritten) / 0.5),
countStyle: .file) + "/s"
print(
"\r\(progressBar) \(progressPercent)% | Speed: \(currentSpeed) | Part \(partNum) | \(ByteCountFormatter.string(fromByteCount: Int64(currentOffset + bytesWritten), countStyle: .file)) ",
terminator: "")
fflush(stdout)
// Also log to the progress logger for consistency
reassemblyProgressLogger.logProgress(
current: totalProgress,
context: "Direct copying")
}
Logger.info(
"Part \(partNum) - Direct copy: wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(bytesWritten), countStyle: .file))"
)
currentOffset += bytesWritten
continue
}
// Check if the output file exists and has content
let outputExists = FileManager.default.fileExists(atPath: tempOutputPath.path)
let outputFileSize =
outputExists
? ((try? FileManager.default.attributesOfItem(atPath: tempOutputPath.path)[
.size] as? UInt64) ?? 0) : 0
Logger.info(
"Part \(partNum) - Decompressed output exists: \(outputExists), size: \(ByteCountFormatter.string(fromByteCount: Int64(outputFileSize), countStyle: .file))"
)
// If decompression produced an empty file, fall back to direct copy
if outputFileSize == 0 {
Logger.info(
"Decompression resulted in empty file, falling back to direct copy for part \(partNum)"
)
try? FileManager.default.removeItem(at: tempOutputPath)
// Fall back to direct copying (uncompressed)
Logger.info("Copying part \(partNum) directly without decompression...")
try outputHandle.seek(toOffset: currentOffset)
let inputHandle = try FileHandle(forReadingFrom: sourceURL)
defer { try? inputHandle.close() }
var bytesWritten: UInt64 = 0
let chunkSize = 1024 * 1024 // 1MB chunks
var chunkCount = 0
while true {
let data = autoreleasepool {
try! inputHandle.read(upToCount: chunkSize) ?? Data()
}
if data.isEmpty { break }
try outputHandle.write(contentsOf: data)
bytesWritten += UInt64(data.count)
chunkCount += 1
// Update progress
let totalProgress =
Double(currentOffset + bytesWritten) / Double(expectedTotalSize)
let progressBar = createProgressBar(progress: totalProgress, width: 30)
let progressPercent = Int(totalProgress * 100)
let currentSpeed =
ByteCountFormatter.string(
fromByteCount: Int64(Double(bytesWritten) / 0.5),
countStyle: .file) + "/s"
print(
"\r\(progressBar) \(progressPercent)% | Speed: \(currentSpeed) | Part \(partNum) | \(ByteCountFormatter.string(fromByteCount: Int64(currentOffset + bytesWritten), countStyle: .file)) ",
terminator: "")
fflush(stdout)
// Also log to the progress logger for consistency
reassemblyProgressLogger.logProgress(
current: totalProgress,
context: "Direct copying")
}
Logger.info(
"Part \(partNum) - Direct copy: wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(bytesWritten), countStyle: .file))"
)
currentOffset += bytesWritten
continue
}
// Read the decompressed file and write to our output
let tempInputHandle = try FileHandle(forReadingFrom: tempOutputPath)
defer {
try? tempInputHandle.close()
try? FileManager.default.removeItem(at: tempOutputPath)
}
// Read decompressed data in chunks and write to sparse file
var partDecompressedSize: UInt64 = 0
let chunkSize = 1024 * 1024 // 1MB chunks
var chunkCount = 0
while true {
let data = autoreleasepool { // Help manage memory with large files
try! tempInputHandle.read(upToCount: chunkSize) ?? Data()
}
if data.isEmpty { break } // End of stream
try outputHandle.write(contentsOf: data)
partDecompressedSize += UInt64(data.count)
chunkCount += 1
// Update progress based on decompressed size written
let totalProgress =
Double(currentOffset + partDecompressedSize)
/ Double(expectedTotalSize)
reassemblyProgressLogger.logProgress(
current: totalProgress,
context: "Reassembling")
}
Logger.info(
"Part \(partNum) - Wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(partDecompressedSize), countStyle: .file))"
)
currentOffset += partDecompressedSize // Advance offset by decompressed size
} else {
// No decompression command available, try direct copy
Logger.info(
"Copying part \(partNum) directly..."
)
try outputHandle.seek(toOffset: currentOffset)
let inputHandle = try FileHandle(forReadingFrom: sourceURL)
defer { try? inputHandle.close() }
// Get part size
let partSize =
(try? FileManager.default.attributesOfItem(atPath: sourceURL.path)[.size]
as? UInt64) ?? 0
Logger.info(
"Direct copy of part \(partNum) with size: \(ByteCountFormatter.string(fromByteCount: Int64(partSize), countStyle: .file))"
)
var bytesWritten: UInt64 = 0
let chunkSize = 1024 * 1024 // 1MB chunks
var chunkCount = 0
while true {
let data = autoreleasepool {
try! inputHandle.read(upToCount: chunkSize) ?? Data()
}
if data.isEmpty { break }
try outputHandle.write(contentsOf: data)
bytesWritten += UInt64(data.count)
chunkCount += 1
// Update progress
let totalProgress =
Double(currentOffset + bytesWritten)
/ Double(expectedTotalSize)
reassemblyProgressLogger.logProgress(
current: totalProgress,
context: "Direct copying")
}
Logger.info(
"Part \(partNum) - Direct copy: wrote \(chunkCount) chunks, total bytes: \(ByteCountFormatter.string(fromByteCount: Int64(bytesWritten), countStyle: .file))"
)
currentOffset += bytesWritten
}
// Ensure data is written before processing next part (optional but safer)
try outputHandle.synchronize()
// Use the correct sparse decompression function
let decompressedBytesWritten = try decompressChunkAndWriteSparse(
inputPath: sourceURL.path,
outputHandle: outputHandle,
startOffset: currentOffset
)
currentOffset += decompressedBytesWritten
try outputHandle.synchronize() // Optional: Synchronize after each chunk
}
// Finalize progress, close handle (done by defer)
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembly Complete")
Logger.info("") // Newline
// Ensure output handle is closed before post-processing
try outputHandle.close()