Merge pull request #99 from trycua/fix/lume/pull-cat

[Lume] Fix reassemble kill
This commit is contained in:
f-trycua
2025-04-11 09:37:43 +02:00
committed by GitHub
2 changed files with 178 additions and 124 deletions

View File

@@ -346,10 +346,12 @@ class ImageContainerRegistry: @unchecked Sendable {
// Use a more efficient approach for memory-constrained systems
let memoryConstrained = determineIfMemoryConstrained()
// Adjust concurrency based on memory constraints
let actualMaxConcurrentTasks = memoryConstrained ? 2 : maxConcurrentTasks
Logger.info(
memoryConstrained
? "Using memory-optimized mode for disk parts"
: "Using standard mode for disk parts")
? "Using memory-optimized mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))"
: "Using standard mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))")
try await withThrowingTaskGroup(of: Int64.self) { group in
for layer in manifest.layers {
@@ -357,7 +359,7 @@ class ImageContainerRegistry: @unchecked Sendable {
continue
}
while await counter.current() >= maxConcurrentTasks {
while await counter.current() >= actualMaxConcurrentTasks { // Use adjusted concurrency
_ = try await group.next()
await counter.decrement()
}
@@ -523,80 +525,92 @@ class ImageContainerRegistry: @unchecked Sendable {
// Handle disk parts if present
if !diskParts.isEmpty {
Logger.info("Reassembling disk image...")
Logger.info("Reassembling disk image using external 'cat' command...")
let outputURL = tempVMDir.appendingPathComponent("disk.img")
try FileManager.default.createDirectory(
at: outputURL.deletingLastPathComponent(), withIntermediateDirectories: true)
// Create empty output file
// Ensure the output file exists but is empty
if FileManager.default.fileExists(atPath: outputURL.path) {
try FileManager.default.removeItem(at: outputURL)
}
FileManager.default.createFile(atPath: outputURL.path, contents: nil)
let outputHandle = try FileHandle(forWritingTo: outputURL)
defer { try? outputHandle.close() }
var totalWritten: UInt64 = 0
// Calculate the expected size by summing the sizes of the manifest layers that are disk parts
let expectedTotalSize = UInt64(
manifest.layers.filter { extractPartInfo(from: $0.mediaType) != nil }.reduce(0)
{ $0 + $1.size })
manifest.layers.filter { extractPartInfo(from: $0.mediaType) != nil }.reduce(0) { $0 + $1.size }
)
Logger.info(
"Expected final size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))"
)
// Process parts in order
// Prepare arguments for cat
var catArgs: [String] = []
var partURLsToDelete: [URL] = [] // Keep track of non-cached parts to delete
for partNum in 1...totalParts {
guard let (_, partURL) = diskParts.first(where: { $0.0 == partNum }) else {
throw PullError.missingPart(partNum)
}
let inputHandle = try FileHandle(forReadingFrom: partURL)
defer {
try? inputHandle.close()
// Don't delete the part file if we're in cache mode and the part is from cache
if noCache || !partURL.path.contains(cacheDirectory.path) {
try? FileManager.default.removeItem(at: partURL)
}
}
// On low memory systems, be more aggressive with releasing memory
let memoryConstrained = determineIfMemoryConstrained()
var chunksProcessed = 0
while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) {
try autoreleasepool {
try outputHandle.write(contentsOf: data)
totalWritten += UInt64(data.count)
// Only log progress every 5% to reduce log noise
let progress: Double =
Double(totalWritten) / Double(expectedTotalSize) * 100
let roundedProgress = Int(progress / 5) * 5
if roundedProgress != Int(
(Double(totalWritten - UInt64(data.count))
/ Double(expectedTotalSize) * 100)
/ 5) * 5
{
Logger.info("Reassembling disk image: \(roundedProgress)%")
}
// Force more frequent autoreleases on memory-constrained systems
chunksProcessed += 1
if memoryConstrained && chunksProcessed % 10 == 0 {
try outputHandle.synchronize()
}
}
}
// Make sure we explicitly close handles after each part to free resources
try? inputHandle.synchronize()
try inputHandle.close()
catArgs.append(partURL.path)
// Only mark for deletion if it's not in the main cache dir
if noCache || !partURL.path.contains(cacheDirectory.path) {
partURLsToDelete.append(partURL)
}
}
// Verify final size
let finalSize =
try FileManager.default.attributesOfItem(atPath: outputURL.path)[.size]
as? UInt64 ?? 0
// Execute cat command asynchronously
let process = Process()
process.executableURL = URL(fileURLWithPath: "/bin/cat")
process.arguments = catArgs
// Redirect output to the final disk image file
let outputFileHandle = try FileHandle(forWritingTo: outputURL)
process.standardOutput = outputFileHandle
try process.run()
// Monitor progress by checking file size
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var
var currentSize: UInt64 = 0
while process.isRunning {
// Get current file size
currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize
// Calculate and log progress
if expectedTotalSize > 0 {
let progressValue = Double(currentSize) / Double(expectedTotalSize)
reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image")
}
// Wait a bit before checking again
try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds
}
// Log 100% completion explicitly
if expectedTotalSize > 0 {
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image")
}
Logger.info("") // Newline after progress
// Ensure handle is closed AFTER process finishes
try? outputFileHandle.close()
// Check termination status AFTER process finishes
let terminationStatus = process.terminationStatus
// Clean up temporary part files
for partURL in partURLsToDelete {
try? FileManager.default.removeItem(at: partURL)
}
guard terminationStatus == 0 else {
throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)")
}
// Verify the final size of the reassembled image against the expected total
let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0
Logger.info(
"Final disk image size: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))"
)
Logger.info(
"Expected size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))"
)
if finalSize != expectedTotalSize {
Logger.info(
@@ -604,7 +618,7 @@ class ImageContainerRegistry: @unchecked Sendable {
)
}
Logger.info("Disk image reassembled successfully")
Logger.info("Disk image reassembled successfully using 'cat'")
} else {
// Copy single disk image if it exists
let diskURL = tempDownloadDir.appendingPathComponent("disk.img")
@@ -683,74 +697,99 @@ class ImageContainerRegistry: @unchecked Sendable {
// Reassemble disk parts if needed
if !diskPartSources.isEmpty {
Logger.info("Reassembling disk image from cached parts (optimized storage)...")
Logger.info(
"Reassembling disk image from cached parts using external 'cat' command (optimized storage)..."
)
let outputURL = destination.appendingPathComponent("disk.img")
// Ensure the output file exists but is empty
if FileManager.default.fileExists(atPath: outputURL.path) {
try FileManager.default.removeItem(at: outputURL)
}
FileManager.default.createFile(atPath: outputURL.path, contents: nil)
let outputHandle = try FileHandle(forWritingTo: outputURL)
defer { try? outputHandle.close() }
var totalWritten: UInt64 = 0
// Explicitly type accumulator (acc: UInt64) and element in the closure
let expectedTotalSize: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in
let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0
return acc + fileSize
}
Logger.info(
"Expected final size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))"
)
// Process parts in order, reading directly from cache
// Prepare arguments for cat, reading directly from cache
var catArgs: [String] = []
for partNum in 1...totalParts {
guard let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum }) else {
throw PullError.missingPart(partNum)
}
// Read directly from the cached part
let inputHandle = try FileHandle(forReadingFrom: sourceURL)
defer { try? inputHandle.close() }
// On low memory systems, be more aggressive with releasing memory
let memoryConstrained = determineIfMemoryConstrained()
var chunksProcessed = 0
while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) {
try autoreleasepool {
try outputHandle.write(contentsOf: data)
totalWritten += UInt64(data.count)
// Only log progress every 5% to reduce log noise
let progress: Double =
Double(totalWritten) / Double(expectedTotalSize) * 100
let roundedProgress = Int(progress / 5) * 5
if roundedProgress != Int(
(Double(totalWritten - UInt64(data.count)) / Double(expectedTotalSize)
* 100)
/ 5) * 5
{
Logger.info("Reassembling disk image from cache: \(roundedProgress)%")
}
// Force more frequent autoreleases on memory-constrained systems
chunksProcessed += 1
if memoryConstrained && chunksProcessed % 10 == 0 {
try outputHandle.synchronize()
}
}
}
// Make sure we explicitly close handles after each part to free resources
try? inputHandle.synchronize()
try inputHandle.close()
catArgs.append(sourceURL.path)
}
// Verify final size
let finalSize =
try FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64
?? 0
// Execute cat command asynchronously
let process = Process()
process.executableURL = URL(fileURLWithPath: "/bin/cat")
process.arguments = catArgs
// Redirect output to the final disk image file
let outputFileHandle = try FileHandle(forWritingTo: outputURL)
process.standardOutput = outputFileHandle
try process.run()
// Monitor progress by checking file size
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var
var currentSize: UInt64 = 0
while process.isRunning {
// Get current file size
currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize
// Calculate and log progress
if expectedTotalSize > 0 {
let progressValue = Double(currentSize) / Double(expectedTotalSize)
reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image from cache")
}
// Wait a bit before checking again
try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds
}
// Log 100% completion explicitly
if expectedTotalSize > 0 {
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image from cache")
}
Logger.info("") // Newline after progress
// Ensure handle is closed AFTER process finishes
try? outputFileHandle.close()
// Check termination status AFTER process finishes
let terminationStatus = process.terminationStatus
guard terminationStatus == 0 else {
throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)")
}
// Verify the final size of the reassembled image against the expected total
let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0
Logger.info(
"Final disk image size: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))"
)
Logger.info(
"Expected size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))"
"Final disk image size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))"
)
if finalSize != expectedTotalSize {
// Recalculate the expected size from the cached parts for use in the warning message
// Explicitly type accumulator (acc: UInt64) and element in the closure
let expectedSizeForWarning: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in
let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0
return acc + fileSize
}
Logger.info(
"Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedTotalSize) bytes)"
"Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedSizeForWarning) bytes)"
)
}
Logger.info("Disk image reassembled successfully from cache using 'cat'")
}
Logger.info("Cache copy complete")
@@ -1061,27 +1100,39 @@ class ImageContainerRegistry: @unchecked Sendable {
}
}
// Default to 512KB as a safe minimum
let defaultChunkSize = 512 * 1024
// Define chunk size parameters
let safeMinimumChunkSize = 128 * 1024 // Reduced minimum for constrained systems
let defaultChunkSize = 512 * 1024 // Standard default / minimum for non-constrained
let constrainedCap = 512 * 1024 // Lower cap for constrained systems
let standardCap = 2 * 1024 * 1024 // Standard cap for non-constrained systems
// If we can't get memory info, return conservative default
// If we can't get memory info, return a reasonable default
guard result == KERN_SUCCESS else {
Logger.info("Could not get VM statistics, using default chunk size: \(defaultChunkSize) bytes")
return defaultChunkSize
}
// Calculate free memory in bytes using a fixed page size
// Standard page size on macOS is 4KB or 16KB
let pageSize = 4096 // Use a constant instead of vm_kernel_page_size
// Calculate free memory in bytes
let pageSize = 4096 // Use a constant page size assumption
let freeMemory = UInt64(stats.free_count) * UInt64(pageSize)
let isConstrained = determineIfMemoryConstrained() // Check if generally constrained
// On very memory-constrained systems (< 1GB free), use the minimum
if freeMemory < 1_073_741_824 { // 1GB
return defaultChunkSize
// Extremely constrained (< 512MB free) -> use absolute minimum
if freeMemory < 536_870_912 { // 512MB
Logger.info("System extremely memory constrained (<512MB free), using minimum chunk size: \(safeMinimumChunkSize) bytes")
return safeMinimumChunkSize
}
// For systems with adequate memory, use a smarter sizing approach:
// - Use 0.1% of free memory, with limits
let adaptiveSize = min(max(Int(freeMemory / 1000), defaultChunkSize), 2 * 1024 * 1024)
// Generally constrained -> use adaptive size with lower cap
if isConstrained {
let adaptiveSize = min(max(Int(freeMemory / 1000), safeMinimumChunkSize), constrainedCap)
Logger.info("System memory constrained, using adaptive chunk size capped at \(constrainedCap) bytes: \(adaptiveSize) bytes")
return adaptiveSize
}
// Not constrained -> use original adaptive logic with standard cap
let adaptiveSize = min(max(Int(freeMemory / 1000), defaultChunkSize), standardCap)
Logger.info("System has sufficient memory, using adaptive chunk size capped at \(standardCap) bytes: \(adaptiveSize) bytes")
return adaptiveSize
}

View File

@@ -27,6 +27,7 @@ enum PullError: Error, LocalizedError {
case layerDownloadFailed(String)
case missingPart(Int)
case decompressionFailed(String)
case reassemblyFailed(String)
var errorDescription: String? {
switch self {
@@ -42,6 +43,8 @@ enum PullError: Error, LocalizedError {
return "Missing disk image part \(number)"
case .decompressionFailed(let filename):
return "Failed to decompress file: \(filename)"
case .reassemblyFailed(let reason):
return "Disk image reassembly failed: \(reason)."
}
}
}