Add sequential ordering

This commit is contained in:
f-trycua
2025-04-19 23:33:02 -07:00
committed by Dillon DuPont
parent d6d36ecb94
commit b791324a91
2 changed files with 262 additions and 225 deletions

View File

@@ -26,6 +26,18 @@ enum PushError: Error {
case missingToken
case invalidURL
case lz4NotFound // Added error case
case invalidMediaType // Added during part refactoring
case missingUncompressedSizeAnnotation // Added for sparse file handling
case fileCreationFailed(String) // Added for sparse file handling
case reassemblySetupFailed(path: String, underlyingError: Error?) // Added for sparse file handling
case missingPart(Int) // Added for sparse file handling
case layerDownloadFailed(String) // Added for download retries
case manifestFetchFailed // Added for manifest fetching
}
// Define a specific error type for when no underlying error exists
struct NoSpecificUnderlyingError: Error, CustomStringConvertible {
var description: String { "No specific underlying error was provided." }
}
struct ChunkMetadata: Codable {
@@ -104,21 +116,25 @@ struct ImageMetadata: Codable {
// Actor to safely collect disk part information from concurrent tasks
actor DiskPartsCollector {
// Store tuples of (sequentialPartNum, url)
private var diskParts: [(Int, URL)] = []
private var partCounter = 0
// Restore internal counter
private var partCounter = 0
// Adds a part and returns its assigned sequential number
func addPart(url: URL) -> Int {
partCounter += 1
partCounter += 1 // Use counter logic
let partNum = partCounter
diskParts.append((partNum, url))
return partNum
diskParts.append((partNum, url)) // Store sequential number
return partNum // Return assigned sequential number
}
// Sort by the sequential part number (index 0 of tuple)
func getSortedParts() -> [(Int, URL)] {
return diskParts.sorted { $0.0 < $1.0 }
}
// Restore getTotalParts
func getTotalParts() -> Int {
return partCounter
}
@@ -752,6 +768,9 @@ class ImageContainerRegistry: @unchecked Sendable {
)
let counter = TaskCounter()
// Remove totalDiskParts
// var totalDiskParts: Int? = nil
var lz4LayerCount = 0 // Count lz4 layers found
try await withThrowingTaskGroup(of: Int64.self) { group in
for layer in manifest.layers {
@@ -764,176 +783,151 @@ class ImageContainerRegistry: @unchecked Sendable {
await counter.decrement()
}
// Check both media type and safely unwrap part info
// Identify disk parts by media type
if layer.mediaType == "application/octet-stream+lz4" {
let size = layer.size
// Declare cachedLayer and digest here
// --- Handle LZ4 Disk Part Layer ---
lz4LayerCount += 1 // Increment count
let currentPartNum = lz4LayerCount // Use the current count as the logical number for logging
let cachedLayer = getCachedLayerPath(
manifestId: manifestId, digest: layer.digest)
let digest = layer.digest
let size = layer.size
// For memory-constrained mode - point directly to cache when possible
if memoryConstrained // Line 777
&& FileManager.default.fileExists(atPath: cachedLayer.path)
{
// Use the cached file *directly* without copying to temp
// Add the *cached* layer path to the collector
let partNum = await diskPartsCollector.addPart(url: cachedLayer) // Use the collector
Logger.info("Using cached layer directly for part #\(partNum): \(cachedLayer.lastPathComponent)")
// Account for progress directly, no need for a separate task
await downloadProgress.addProgress(Int64(size))
// No task was added, so no need to increment/decrement counter here
continue // Skip the download task group logic below
} else {
// Not memory constrained OR file not cached
// Add a task to handle copy/download and adding to collector
group.addTask { [self] in
await counter.increment() // Increment counter for the task
let finalPath: URL
if memoryConstrained && FileManager.default.fileExists(atPath: cachedLayer.path) {
// Add to collector, get sequential number assigned by collector
let collectorPartNum = await diskPartsCollector.addPart(url: cachedLayer)
// Log using the sequential number from collector for clarity if needed, or the lz4LayerCount
Logger.info("Using cached lz4 layer (part \(currentPartNum)) directly: \(cachedLayer.lastPathComponent) -> Collector #\(collectorPartNum)")
await downloadProgress.addProgress(Int64(size))
continue
} else {
// Download/Copy Path (Task Group)
group.addTask { [self] in
await counter.increment()
let finalPath: URL
if FileManager.default.fileExists(atPath: cachedLayer.path) {
// If cached, copy to temp and use temp path for reassembly later
let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)") // Unique temp path
try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
await downloadProgress.addProgress(Int64(size)) // Update progress after copy
finalPath = tempPartURL
} else {
// If not cached, download to temp path
let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)") // Unique temp path
// Check if this layer is already being downloaded
if isDownloading(digest) {
try await waitForExistingDownload(digest, cachedLayer: cachedLayer)
// If it finished downloading while waiting, copy from cache to temp
if FileManager.default.fileExists(atPath: cachedLayer.path) {
try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
await downloadProgress.addProgress(Int64(size)) // Update progress
finalPath = tempPartURL
} else {
// If still not available after waiting (should be rare), proceed to download
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest,
mediaType: layer.mediaType, // Use correct mediaType
token: token,
to: tempPartURL,
maxRetries: 5,
progress: downloadProgress, // Progress updated inside downloadLayer
manifestId: manifestId
)
// downloadLayer handles caching and markDownloadComplete
finalPath = tempPartURL
}
} else {
// Start new download
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest,
mediaType: layer.mediaType, // Use correct mediaType
token: token,
to: tempPartURL,
maxRetries: 5,
progress: downloadProgress, // Progress updated inside downloadLayer
manifestId: manifestId
)
// downloadLayer handles caching and markDownloadComplete
finalPath = tempPartURL
}
}
// Add the final determined path (temp path) to the collector
let partNum = await diskPartsCollector.addPart(url: finalPath)
Logger.info("Assigned part #\(partNum) for path: \(finalPath.lastPathComponent)")
await counter.decrement() // Decrement counter
return Int64(size)
}
continue // Ensure we move to the next layer after adding task
let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)")
try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
await downloadProgress.addProgress(Int64(size))
finalPath = tempPartURL
} else {
let tempPartURL = tempDownloadDir.appendingPathComponent("disk.img.part.\(UUID().uuidString)")
if isDownloading(digest) {
try await waitForExistingDownload(digest, cachedLayer: cachedLayer)
if FileManager.default.fileExists(atPath: cachedLayer.path) {
try FileManager.default.copyItem(at: cachedLayer, to: tempPartURL)
await downloadProgress.addProgress(Int64(size))
finalPath = tempPartURL
} else {
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest, mediaType: layer.mediaType, token: token,
to: tempPartURL, maxRetries: 5,
progress: downloadProgress, manifestId: manifestId
)
finalPath = tempPartURL
}
} else {
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest, mediaType: layer.mediaType, token: token,
to: tempPartURL, maxRetries: 5,
progress: downloadProgress, manifestId: manifestId
)
finalPath = tempPartURL
}
}
// Add to collector, get sequential number assigned by collector
let collectorPartNum = await diskPartsCollector.addPart(url: finalPath)
// Log using the sequential number from collector
Logger.info("Assigned path for lz4 layer (part \(currentPartNum)): \(finalPath.lastPathComponent) -> Collector #\(collectorPartNum)")
await counter.decrement()
return Int64(size)
}
}
} else {
// --- Handle Non-Disk-Part Layer ---
let mediaType = layer.mediaType
let digest = layer.digest
let size = layer.size
// Determine output path based on media type
let outputURL: URL
switch mediaType {
case "application/vnd.oci.image.layer.v1.tar",
"application/octet-stream+gzip":
outputURL = tempDownloadDir.appendingPathComponent("disk.img")
"application/octet-stream+gzip": // Might be compressed disk.img single file?
outputURL = tempDownloadDir.appendingPathComponent("disk.img")
case "application/vnd.oci.image.config.v1+json":
outputURL = tempDownloadDir.appendingPathComponent("config.json")
case "application/octet-stream":
outputURL = tempDownloadDir.appendingPathComponent("nvram.bin")
case "application/octet-stream": // Could be nvram or uncompressed single disk.img
// Heuristic: If a config.json already exists or is expected, assume this is nvram.
// This might need refinement if single disk images use octet-stream.
if manifest.config != nil {
outputURL = tempDownloadDir.appendingPathComponent("nvram.bin")
} else {
// Assume it's a single-file disk image if no config layer is present
outputURL = tempDownloadDir.appendingPathComponent("disk.img")
}
default:
continue
Logger.info("Skipping unsupported layer media type: \(mediaType)")
continue // Skip to the next layer
}
// Add task to download/copy the non-disk-part layer
group.addTask { [self] in
await counter.increment()
let cachedLayer = getCachedLayerPath(
manifestId: manifestId, digest: digest)
let cachedLayer = getCachedLayerPath(manifestId: manifestId, digest: digest)
if FileManager.default.fileExists(atPath: cachedLayer.path) {
try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
await downloadProgress.addProgress(Int64(size))
} else {
// Check if this layer is already being downloaded and we're not skipping cache
if isDownloading(digest) {
try await waitForExistingDownload(
digest, cachedLayer: cachedLayer)
try await waitForExistingDownload(digest, cachedLayer: cachedLayer)
if FileManager.default.fileExists(atPath: cachedLayer.path) {
try FileManager.default.copyItem(
at: cachedLayer, to: outputURL)
try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
await downloadProgress.addProgress(Int64(size))
await counter.decrement() // Decrement before returning
return Int64(size)
}
}
// Start new download
markDownloadStarted(digest)
try await self.downloadLayer(
repository: "\(self.organization)/\(imageName)",
digest: digest,
mediaType: mediaType,
token: token,
to: outputURL,
maxRetries: 5,
progress: downloadProgress,
manifestId: manifestId
digest: digest, mediaType: mediaType, token: token,
to: outputURL, maxRetries: 5,
progress: downloadProgress, manifestId: manifestId
)
// Cache the downloaded layer if caching is enabled
if cachingEnabled {
if FileManager.default.fileExists(atPath: cachedLayer.path) {
try FileManager.default.removeItem(at: cachedLayer)
}
try FileManager.default.copyItem(at: outputURL, to: cachedLayer)
}
markDownloadComplete(digest)
// Note: downloadLayer handles caching and marking download complete
}
await counter.decrement()
return Int64(size)
}
}
}
} // End for layer in manifest.layers
// Wait for remaining tasks
for try await _ in group {}
} // End TaskGroup
// --- Safely retrieve parts AFTER TaskGroup ---
let diskParts = await diskPartsCollector.getSortedParts()
let totalParts = await diskPartsCollector.getTotalParts()
Logger.info("Finished processing layers. Found \(totalParts) disk parts.")
let diskParts = await diskPartsCollector.getSortedParts() // Already sorted by logicalPartNum
// Check if totalDiskParts was set (meaning at least one lz4 layer was processed)
// Get total parts from the collector
let totalPartsFromCollector = await diskPartsCollector.getTotalParts()
// Change guard to if for logging only, as the later if condition handles the logic
if totalPartsFromCollector == 0 {
// If totalParts is 0, it means no layers matched the lz4 format.
Logger.info("No lz4 disk part layers found. Assuming single-part image or non-lz4 parts.")
// Reassembly logic below will be skipped if diskParts is empty.
// Explicitly set totalParts to 0 to prevent entering the reassembly block if diskParts might somehow be non-empty but totalParts was 0
// This ensures consistency if the collector logic changes.
}
Logger.info("Finished processing layers. Found \(diskParts.count) disk parts to reassemble (Total Lz4 Layers: \(totalPartsFromCollector)).")
// --- End retrieving parts ---
Logger.info("") // New line after progress
@@ -974,8 +968,9 @@ class ImageContainerRegistry: @unchecked Sendable {
}
// Handle disk parts if present
if !diskParts.isEmpty {
Logger.info("Reassembling disk image using sparse file technique...")
if !diskParts.isEmpty && totalPartsFromCollector > 0 {
// Use totalPartsFromCollector here
Logger.info("Reassembling \(totalPartsFromCollector) disk image parts using sparse file technique...")
let outputURL = tempVMDir.appendingPathComponent("disk.img")
// Wrap setup in do-catch for better error reporting
@@ -1008,8 +1003,9 @@ class ImageContainerRegistry: @unchecked Sendable {
}
// Calculate expected size from the manifest layers (sum of compressed parts - for logging only now)
// Filter based on the correct media type now
let expectedCompressedTotalSize = UInt64(
manifest.layers.filter { extractPartInfo(from: $0.mediaType) != nil }.reduce(0)
manifest.layers.filter { $0.mediaType == "application/octet-stream+lz4" }.reduce(0)
{ $0 + $1.size }
)
Logger.info(
@@ -1067,23 +1063,39 @@ class ImageContainerRegistry: @unchecked Sendable {
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05)
var currentOffset: UInt64 = 0 // Track position in the final *decompressed* file
for partNum in 1...totalParts {
// Iterate using the reliable totalParts count from media type
// Use totalPartsFromCollector for the loop range
for partNum in 1...totalPartsFromCollector {
// Find the original layer info for this part number
guard
let layer = manifest.layers.first(where: { layer in
if let info = extractPartInfo(from: layer.mediaType) {
return info.partNum == partNum
}
return false
}),
let (_, partURL) = diskParts.first(where: { $0.0 == partNum })
else {
throw PullError.missingPart(partNum)
// Find the part URL from our collected parts using the logical partNum
guard let partInfo = diskParts.first(where: { $0.0 == partNum }) else {
// This error should now be less likely, but good to keep
Logger.error("Missing required part number \(partNum) in collected parts during reassembly.")
throw PullError.missingPart(partNum)
}
let layerMediaType = layer.mediaType // Extract mediaType here
let partURL = partInfo.1 // Get the URL from the tuple
// We no longer need to find the original manifest layer here,
// as all parts collected by the collector should be the lz4 type.
// Remove the block that used extractPartInfo:
/*
guard let layer = manifest.layers.first(where: { layer in
if let info = extractPartInfo(from: layer.mediaType) {
return info.partNum == partNum
}
return false
}) else {
// Should not happen if totalParts was derived correctly
Logger.error("Could not find manifest layer for logical part number \(partNum).")
throw PullError.missingPart(partNum) // Or a different error
}
let layerMediaType = layer.mediaType
*/
// Assume the media type for decompression purposes
let layerMediaType = "application/octet-stream+lz4"
Logger.info(
"Processing part \(partNum) of \(totalParts): \(partURL.lastPathComponent)")
"Processing part \(partNum) of \(totalPartsFromCollector): \(partURL.lastPathComponent)")
let inputHandle = try FileHandle(forReadingFrom: partURL)
defer {
@@ -1504,31 +1516,46 @@ class ImageContainerRegistry: @unchecked Sendable {
async throws
{
Logger.info("Copying from cache...")
// Define output URL and expected size variable scope here
let outputURL = destination.appendingPathComponent("disk.img")
var expectedTotalSize: UInt64? = nil // Use optional to handle missing config
// Instantiate collector
let diskPartsCollector = DiskPartsCollector()
// Remove totalDiskParts
// var totalDiskParts: Int? = nil
var lz4LayerCount = 0 // Count lz4 layers found
// First identify disk parts and non-disk files
for layer in manifest.layers {
let cachedLayer = getCachedLayerPath(manifestId: manifestId, digest: layer.digest)
// Check if it's a disk chunk layer based on media type
// Identify disk parts simply by media type
if layer.mediaType == "application/octet-stream+lz4" {
// It's a disk chunk - Add to collector
_ = await diskPartsCollector.addPart(url: cachedLayer) // Ignore return value
}
lz4LayerCount += 1 // Increment count
// Add to collector. It will assign the sequential part number.
let collectorPartNum = await diskPartsCollector.addPart(url: cachedLayer)
Logger.info("Adding cached lz4 layer (part \(lz4LayerCount)) -> Collector #\(collectorPartNum): \(cachedLayer.lastPathComponent)")
}
else {
// Handle non-disk layers (config, nvram)
// --- Handle Non-Disk-Part Layer (from cache) ---
let fileName: String
switch layer.mediaType {
case "application/vnd.oci.image.config.v1+json":
fileName = "config.json"
case "application/octet-stream":
fileName = "nvram.bin"
// Assume nvram if config layer exists, otherwise assume single disk image
fileName = manifest.config != nil ? "nvram.bin" : "disk.img"
case "application/vnd.oci.image.layer.v1.tar",
"application/octet-stream+gzip":
// Assume disk image for these types as well if encountered in cache scenario
fileName = "disk.img"
default:
Logger.info("Skipping unsupported cached layer media type: \(layer.mediaType)")
continue
}
// Only non-disk files are copied
// Copy the non-disk file directly from cache to destination
try FileManager.default.copyItem(
at: cachedLayer,
to: destination.appendingPathComponent(fileName)
@@ -1537,111 +1564,113 @@ class ImageContainerRegistry: @unchecked Sendable {
}
// --- Safely retrieve parts AFTER loop ---
let diskPartSources = await diskPartsCollector.getSortedParts()
let totalParts = await diskPartsCollector.getTotalParts()
Logger.info("Found \(totalParts) disk parts in cache.")
let diskPartSources = await diskPartsCollector.getSortedParts() // Sorted by assigned sequential number
let totalParts = await diskPartsCollector.getTotalParts() // Get total count from collector
// Remove old guard check
/*
guard let totalParts = totalDiskParts else {
Logger.info("No cached layers with valid part information found. Assuming single-part image or non-lz4 parts.")
}
*/
Logger.info("Found \(totalParts) lz4 disk parts in cache to reassemble.")
// --- End retrieving parts ---
// Reassemble disk parts if needed
if !diskPartSources.isEmpty { // Use the retrieved array
// Get the uncompressed size from cached config
let configDigest = manifest.config?.digest
let cachedConfigPath =
configDigest != nil
? getCachedLayerPath(manifestId: manifestId, digest: configDigest!) : nil
let uncompressedSize = cachedConfigPath.flatMap {
getUncompressedSizeFromConfig(configPath: $0)
}
// Use the count from the collector
if !diskPartSources.isEmpty {
// Use totalParts from collector directly
Logger.info("Reassembling \(totalParts) disk image parts using sparse file technique...")
// Get uncompressed size from cached config file (needs to be copied first)
let configURL = destination.appendingPathComponent("config.json")
// Parse config.json to get uncompressed size *before* reassembly
let uncompressedSize = getUncompressedSizeFromConfig(configPath: configURL)
// Try to get disk size from VM config if OCI annotation not found
// Now also try to get disk size from VM config if OCI annotation not found
var vmConfigDiskSize: UInt64? = nil
if uncompressedSize == nil {
// Find config.json in the copied files
let vmConfigPath = destination.appendingPathComponent("config.json")
if FileManager.default.fileExists(atPath: vmConfigPath.path) {
do {
let configData = try Data(contentsOf: vmConfigPath)
let decoder = JSONDecoder()
if let vmConfig = try? decoder.decode(VMConfig.self, from: configData) {
vmConfigDiskSize = vmConfig.diskSize
if let size = vmConfigDiskSize {
Logger.info(
"Found diskSize from cached VM config.json: \(size) bytes")
}
if uncompressedSize == nil && FileManager.default.fileExists(atPath: configURL.path) {
do {
let configData = try Data(contentsOf: configURL)
let decoder = JSONDecoder()
if let vmConfig = try? decoder.decode(VMConfig.self, from: configData) {
vmConfigDiskSize = vmConfig.diskSize
if let size = vmConfigDiskSize {
Logger.info("Found diskSize from VM config.json: \(size) bytes")
}
} catch {
Logger.error("Failed to parse cached VM config.json for diskSize: \(error)")
}
} catch {
Logger.error("Failed to parse VM config.json for diskSize: \(error)")
}
}
// Force explicit use
if uncompressedSize != nil {
Logger.info(
"Will use uncompressed size from annotation for sparse file: \(uncompressedSize!) bytes"
// Determine the size to use for the sparse file
// Use: annotation size > VM config diskSize > fallback (error)
if let size = uncompressedSize {
Logger.info("Using uncompressed size from annotation: \(size) bytes")
expectedTotalSize = size
} else if let size = vmConfigDiskSize {
Logger.info("Using diskSize from VM config: \(size) bytes")
expectedTotalSize = size
} else {
// If neither is found in cache scenario, throw error as we cannot determine the size
Logger.error(
"Missing both uncompressed size annotation and VM config diskSize for cached multi-part image."
+ " Cannot reassemble."
)
} else if vmConfigDiskSize != nil {
Logger.info(
"Will use diskSize from VM config for sparse file: \(vmConfigDiskSize!) bytes")
throw PullError.missingUncompressedSizeAnnotation
}
Logger.info(
"Reassembling disk image from cached parts using sparse file technique..."
)
let outputURL = destination.appendingPathComponent("disk.img")
// Now that expectedTotalSize is guaranteed to be non-nil, proceed with setup
guard let sizeForTruncate = expectedTotalSize else {
// This should not happen due to the checks above, but safety first
let nilError: Error? = nil
// Use nil-coalescing to provide a default error, appeasing the compiler
throw PullError.reassemblySetupFailed(path: outputURL.path, underlyingError: nilError ?? NoSpecificUnderlyingError())
}
// Wrap setup in do-catch for better error reporting
// Wrap file handle setup and sparse file creation within this block
let outputHandle: FileHandle
do {
// 1. Ensure parent directory exists
try FileManager.default.createDirectory(
at: outputURL.deletingLastPathComponent(), withIntermediateDirectories: true)
// 2. Explicitly create the file first, removing old one if needed
// Ensure parent directory exists
try FileManager.default.createDirectory(at: outputURL.deletingLastPathComponent(), withIntermediateDirectories: true)
// Explicitly create the file first, removing old one if needed
if FileManager.default.fileExists(atPath: outputURL.path) {
try FileManager.default.removeItem(at: outputURL)
}
guard FileManager.default.createFile(atPath: outputURL.path, contents: nil) else {
throw PullError.fileCreationFailed(outputURL.path)
}
// 3. Now open the handle for writing
// Open handle for writing
outputHandle = try FileHandle(forWritingTo: outputURL)
// Set the file size (creates sparse file)
try outputHandle.truncate(atOffset: sizeForTruncate)
Logger.info("Sparse file initialized for cache reassembly with size: \(ByteCountFormatter.string(fromByteCount: Int64(sizeForTruncate), countStyle: .file))")
} catch {
// Catch errors during directory/file creation or handle opening
Logger.error(
"Failed during setup for disk image reassembly: \(error.localizedDescription)",
metadata: ["path": outputURL.path])
throw PullError.reassemblySetupFailed(path: outputURL.path, underlyingError: error)
Logger.error("Failed during setup for cached disk image reassembly: \(error.localizedDescription)", metadata: ["path": outputURL.path])
throw PullError.reassemblySetupFailed(path: outputURL.path, underlyingError: error)
}
// Calculate expected total size from the cached files
let expectedTotalSize: UInt64 = diskPartSources.reduce(UInt64(0)) {
(acc: UInt64, element) -> UInt64 in
let fileSize =
(try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size]
as? UInt64 ?? 0) ?? 0
return acc + fileSize
}
Logger.info(
"Expected download size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file)) (actual disk usage will be lower)"
)
// Ensure handle is closed when exiting this scope
defer { try? outputHandle.close() }
// ... (Get uncompressed size etc.) ...
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05)
var currentOffset: UInt64 = 0 // Track position in the final *decompressed* file
var currentOffset: UInt64 = 0
for partNum in 1...totalParts {
// Find the original layer info for this part number
guard
// Find layer by index approximated during collection, not media type parts
let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum })
else {
throw PullError.missingPart(partNum)
// Iterate from 1 up to the total number of parts found by the collector
for collectorPartNum in 1...totalParts {
// Find the source URL from our collected parts using the sequential collectorPartNum
guard let sourceInfo = diskPartSources.first(where: { $0.0 == collectorPartNum }) else {
Logger.error("Missing required cached part number \(collectorPartNum) in collected parts during reassembly.")
throw PullError.missingPart(collectorPartNum)
}
let sourceURL = sourceInfo.1 // Get URL from tuple
// Log using the sequential collector part number
Logger.info(
"Decompressing part \(partNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent) at offset \(currentOffset)..."
"Decompressing part \(collectorPartNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent) at offset \(currentOffset)..."
)
// Use the correct sparse decompression function
@@ -1659,7 +1688,8 @@ class ImageContainerRegistry: @unchecked Sendable {
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembly Complete")
// Ensure output handle is closed before post-processing
try outputHandle.close()
// No need for explicit close here, defer handles it
// try outputHandle.close()
// Verify final size
let finalSize =
@@ -1669,9 +1699,10 @@ class ImageContainerRegistry: @unchecked Sendable {
"Final disk image size from cache (before sparse file optimization): \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))"
)
if finalSize != expectedTotalSize {
// Use the calculated sizeForTruncate for comparison
if finalSize != sizeForTruncate {
Logger.info(
"Warning: Final reported size (\(finalSize) bytes) differs from expected size (\(expectedTotalSize) bytes), but this doesn't affect functionality"
"Warning: Final reported size (\(finalSize) bytes) differs from expected size (\(sizeForTruncate) bytes), but this doesn't affect functionality"
)
}
@@ -1912,8 +1943,10 @@ class ImageContainerRegistry: @unchecked Sendable {
throw lastError ?? PullError.layerDownloadFailed(digest)
}
// Function removed as it's not applicable to the observed manifest format
/*
private func extractPartInfo(from mediaType: String) -> (partNum: Int, total: Int)? {
let pattern = #"part\.number=(\d+);part\.total=(\d+)"#
let pattern = #"part\\.number=(\\d+);part\\.total=(\\d+)"#
guard let regex = try? NSRegularExpression(pattern: pattern),
let match = regex.firstMatch(
in: mediaType,
@@ -1928,6 +1961,7 @@ class ImageContainerRegistry: @unchecked Sendable {
}
return (partNum, total)
}
*/
private func listRepositories() async throws -> [String] {
var request = URLRequest(

View File

@@ -58,6 +58,7 @@ enum PullError: Error, LocalizedError {
case fileCreationFailed(String)
case reassemblySetupFailed(path: String, underlyingError: Error)
case missingUncompressedSizeAnnotation
case invalidMediaType
var errorDescription: String? {
switch self {
@@ -81,6 +82,8 @@ enum PullError: Error, LocalizedError {
return "Failed to set up for reassembly at path: \(path). Underlying error: \(underlyingError.localizedDescription)"
case .missingUncompressedSizeAnnotation:
return "Could not find the required uncompressed disk size annotation in the image config.json."
case .invalidMediaType:
return "Invalid media type"
}
}
}