mirror of
https://github.com/trycua/computer.git
synced 2026-02-24 07:19:30 -06:00
Merge pull request #102 from trycua/fix/lume/pull-cat-space
[Lume] Optimize VM image assembly
This commit is contained in:
@@ -48,16 +48,187 @@ actor ProgressTracker {
|
||||
private var progressLogger = ProgressLogger(threshold: 0.01)
|
||||
private var totalFiles: Int = 0
|
||||
private var completedFiles: Int = 0
|
||||
|
||||
|
||||
// Download speed tracking
|
||||
private var startTime: Date = Date()
|
||||
private var lastUpdateTime: Date = Date()
|
||||
private var lastUpdateBytes: Int64 = 0
|
||||
private var speedSamples: [Double] = []
|
||||
private var peakSpeed: Double = 0
|
||||
private var totalElapsedTime: TimeInterval = 0
|
||||
|
||||
func setTotal(_ total: Int64, files: Int) {
|
||||
totalBytes = total
|
||||
totalFiles = files
|
||||
startTime = Date()
|
||||
lastUpdateTime = startTime
|
||||
}
|
||||
|
||||
func addProgress(_ bytes: Int64) {
|
||||
downloadedBytes += bytes
|
||||
let progress = Double(downloadedBytes) / Double(totalBytes)
|
||||
progressLogger.logProgress(current: progress, context: "Downloading Image")
|
||||
let now = Date()
|
||||
let elapsed = now.timeIntervalSince(lastUpdateTime)
|
||||
|
||||
// Only update stats and display progress if enough time has passed (at least 0.5 seconds)
|
||||
if elapsed >= 0.5 {
|
||||
let currentSpeed = Double(downloadedBytes - lastUpdateBytes) / elapsed
|
||||
speedSamples.append(currentSpeed)
|
||||
|
||||
// Cap samples array to prevent memory growth
|
||||
if speedSamples.count > 20 {
|
||||
speedSamples.removeFirst(speedSamples.count - 20)
|
||||
}
|
||||
|
||||
// Update peak speed
|
||||
peakSpeed = max(peakSpeed, currentSpeed)
|
||||
|
||||
// Calculate average speed over the last few samples
|
||||
let recentAvgSpeed = calculateAverageSpeed()
|
||||
|
||||
// Calculate overall average
|
||||
let totalElapsed = now.timeIntervalSince(startTime)
|
||||
let overallAvgSpeed = totalElapsed > 0 ? Double(downloadedBytes) / totalElapsed : 0
|
||||
|
||||
let progress = Double(downloadedBytes) / Double(totalBytes)
|
||||
logSpeedProgress(
|
||||
current: progress,
|
||||
currentSpeed: currentSpeed,
|
||||
averageSpeed: recentAvgSpeed,
|
||||
overallSpeed: overallAvgSpeed,
|
||||
peakSpeed: peakSpeed,
|
||||
context: "Downloading Image"
|
||||
)
|
||||
|
||||
// Update tracking variables
|
||||
lastUpdateTime = now
|
||||
lastUpdateBytes = downloadedBytes
|
||||
totalElapsedTime = totalElapsed
|
||||
}
|
||||
}
|
||||
|
||||
private func calculateAverageSpeed() -> Double {
|
||||
guard !speedSamples.isEmpty else { return 0 }
|
||||
// Use the most recent samples (up to last 5)
|
||||
let samples = speedSamples.suffix(min(5, speedSamples.count))
|
||||
return samples.reduce(0, +) / Double(samples.count)
|
||||
}
|
||||
|
||||
func getDownloadStats() -> DownloadStats {
|
||||
let avgSpeed = totalElapsedTime > 0 ? Double(downloadedBytes) / totalElapsedTime : 0
|
||||
return DownloadStats(
|
||||
totalBytes: totalBytes,
|
||||
downloadedBytes: downloadedBytes,
|
||||
elapsedTime: totalElapsedTime,
|
||||
averageSpeed: avgSpeed,
|
||||
peakSpeed: peakSpeed
|
||||
)
|
||||
}
|
||||
|
||||
private func logSpeedProgress(
|
||||
current: Double,
|
||||
currentSpeed: Double,
|
||||
averageSpeed: Double,
|
||||
overallSpeed: Double,
|
||||
peakSpeed: Double,
|
||||
context: String
|
||||
) {
|
||||
let progressPercent = Int(current * 100)
|
||||
let currentSpeedStr = formatByteSpeed(currentSpeed)
|
||||
let avgSpeedStr = formatByteSpeed(averageSpeed)
|
||||
let peakSpeedStr = formatByteSpeed(peakSpeed)
|
||||
|
||||
// Calculate ETA based on overall average speed
|
||||
let remainingBytes = totalBytes - downloadedBytes
|
||||
let etaSeconds = overallSpeed > 0 ? Double(remainingBytes) / overallSpeed : 0
|
||||
let etaStr = formatTimeRemaining(etaSeconds)
|
||||
|
||||
let progressBar = createProgressBar(progress: current)
|
||||
|
||||
print("\r\(progressBar) \(progressPercent)% | Current: \(currentSpeedStr) | Avg: \(avgSpeedStr) | Peak: \(peakSpeedStr) | ETA: \(etaStr) ", terminator: "")
|
||||
fflush(stdout)
|
||||
}
|
||||
|
||||
private func createProgressBar(progress: Double, width: Int = 20) -> String {
|
||||
let completedWidth = Int(progress * Double(width))
|
||||
let remainingWidth = width - completedWidth
|
||||
|
||||
let completed = String(repeating: "█", count: completedWidth)
|
||||
let remaining = String(repeating: "░", count: remainingWidth)
|
||||
|
||||
return "[\(completed)\(remaining)]"
|
||||
}
|
||||
|
||||
private func formatByteSpeed(_ bytesPerSecond: Double) -> String {
|
||||
let units = ["B/s", "KB/s", "MB/s", "GB/s"]
|
||||
var speed = bytesPerSecond
|
||||
var unitIndex = 0
|
||||
|
||||
while speed > 1024 && unitIndex < units.count - 1 {
|
||||
speed /= 1024
|
||||
unitIndex += 1
|
||||
}
|
||||
|
||||
return String(format: "%.1f %@", speed, units[unitIndex])
|
||||
}
|
||||
|
||||
private func formatTimeRemaining(_ seconds: Double) -> String {
|
||||
if seconds.isNaN || seconds.isInfinite || seconds <= 0 {
|
||||
return "calculating..."
|
||||
}
|
||||
|
||||
let hours = Int(seconds) / 3600
|
||||
let minutes = (Int(seconds) % 3600) / 60
|
||||
let secs = Int(seconds) % 60
|
||||
|
||||
if hours > 0 {
|
||||
return String(format: "%d:%02d:%02d", hours, minutes, secs)
|
||||
} else {
|
||||
return String(format: "%d:%02d", minutes, secs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct DownloadStats {
|
||||
let totalBytes: Int64
|
||||
let downloadedBytes: Int64
|
||||
let elapsedTime: TimeInterval
|
||||
let averageSpeed: Double
|
||||
let peakSpeed: Double
|
||||
|
||||
func formattedSummary() -> String {
|
||||
let bytesStr = ByteCountFormatter.string(fromByteCount: downloadedBytes, countStyle: .file)
|
||||
let avgSpeedStr = formatSpeed(averageSpeed)
|
||||
let peakSpeedStr = formatSpeed(peakSpeed)
|
||||
let timeStr = formatTime(elapsedTime)
|
||||
|
||||
return """
|
||||
Download Statistics:
|
||||
- Total downloaded: \(bytesStr)
|
||||
- Elapsed time: \(timeStr)
|
||||
- Average speed: \(avgSpeedStr)
|
||||
- Peak speed: \(peakSpeedStr)
|
||||
"""
|
||||
}
|
||||
|
||||
private func formatSpeed(_ bytesPerSecond: Double) -> String {
|
||||
let formatter = ByteCountFormatter()
|
||||
formatter.countStyle = .file
|
||||
let bytesStr = formatter.string(fromByteCount: Int64(bytesPerSecond))
|
||||
return "\(bytesStr)/s"
|
||||
}
|
||||
|
||||
private func formatTime(_ seconds: TimeInterval) -> String {
|
||||
let hours = Int(seconds) / 3600
|
||||
let minutes = (Int(seconds) % 3600) / 60
|
||||
let secs = Int(seconds) % 60
|
||||
|
||||
if hours > 0 {
|
||||
return String(format: "%d hours, %d minutes, %d seconds", hours, minutes, secs)
|
||||
} else if minutes > 0 {
|
||||
return String(format: "%d minutes, %d seconds", minutes, secs)
|
||||
} else {
|
||||
return String(format: "%d seconds", secs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,17 +512,17 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
)
|
||||
var diskParts: [(Int, URL)] = []
|
||||
var totalParts = 0
|
||||
let maxConcurrentTasks = 5
|
||||
let counter = TaskCounter()
|
||||
|
||||
// Use a more efficient approach for memory-constrained systems
|
||||
|
||||
// Adaptive concurrency based on system capabilities
|
||||
let memoryConstrained = determineIfMemoryConstrained()
|
||||
// Adjust concurrency based on memory constraints
|
||||
let actualMaxConcurrentTasks = memoryConstrained ? 2 : maxConcurrentTasks
|
||||
let networkQuality = determineNetworkQuality()
|
||||
let maxConcurrentTasks = calculateOptimalConcurrency(memoryConstrained: memoryConstrained, networkQuality: networkQuality)
|
||||
|
||||
Logger.info(
|
||||
memoryConstrained
|
||||
? "Using memory-optimized mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))"
|
||||
: "Using standard mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))")
|
||||
"Using adaptive download configuration: Concurrency=\(maxConcurrentTasks), Memory-optimized=\(memoryConstrained)"
|
||||
)
|
||||
|
||||
let counter = TaskCounter()
|
||||
|
||||
try await withThrowingTaskGroup(of: Int64.self) { group in
|
||||
for layer in manifest.layers {
|
||||
@@ -359,7 +530,7 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
continue
|
||||
}
|
||||
|
||||
while await counter.current() >= actualMaxConcurrentTasks { // Use adjusted concurrency
|
||||
while await counter.current() >= maxConcurrentTasks {
|
||||
_ = try await group.next()
|
||||
await counter.decrement()
|
||||
}
|
||||
@@ -523,9 +694,13 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
}
|
||||
Logger.info("") // New line after progress
|
||||
|
||||
// Display download statistics
|
||||
let stats = await progress.getDownloadStats()
|
||||
Logger.info(stats.formattedSummary())
|
||||
|
||||
// Handle disk parts if present
|
||||
if !diskParts.isEmpty {
|
||||
Logger.info("Reassembling disk image using external 'cat' command...")
|
||||
Logger.info("Reassembling disk image using sparse file technique...")
|
||||
let outputURL = tempVMDir.appendingPathComponent("disk.img")
|
||||
try FileManager.default.createDirectory(
|
||||
at: outputURL.deletingLastPathComponent(), withIntermediateDirectories: true)
|
||||
@@ -534,79 +709,93 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
if FileManager.default.fileExists(atPath: outputURL.path) {
|
||||
try FileManager.default.removeItem(at: outputURL)
|
||||
}
|
||||
FileManager.default.createFile(atPath: outputURL.path, contents: nil)
|
||||
|
||||
// CORRECT LOGIC for new pull: Calculate expected size from the manifest layers
|
||||
|
||||
// Calculate expected size from the manifest layers
|
||||
let expectedTotalSize = UInt64(
|
||||
manifest.layers.filter { extractPartInfo(from: $0.mediaType) != nil }.reduce(0) { $0 + $1.size }
|
||||
)
|
||||
Logger.info(
|
||||
"Expected final size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))"
|
||||
)
|
||||
|
||||
// Prepare arguments for cat
|
||||
var catArgs: [String] = []
|
||||
var partURLsToDelete: [URL] = [] // Keep track of non-cached parts to delete
|
||||
|
||||
// Create sparse file of the required size
|
||||
FileManager.default.createFile(atPath: outputURL.path, contents: nil)
|
||||
let outputHandle = try FileHandle(forWritingTo: outputURL)
|
||||
|
||||
// Set the file size without writing data (creates a sparse file)
|
||||
try outputHandle.truncate(atOffset: expectedTotalSize)
|
||||
|
||||
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05)
|
||||
var processedSize: UInt64 = 0
|
||||
|
||||
// Process each part in order
|
||||
for partNum in 1...totalParts {
|
||||
guard let (_, partURL) = diskParts.first(where: { $0.0 == partNum }) else {
|
||||
throw PullError.missingPart(partNum)
|
||||
}
|
||||
catArgs.append(partURL.path)
|
||||
// Only mark for deletion if it's not in the main cache dir
|
||||
if noCache || !partURL.path.contains(cacheDirectory.path) {
|
||||
partURLsToDelete.append(partURL)
|
||||
}
|
||||
}
|
||||
|
||||
// Execute cat command asynchronously
|
||||
let process = Process()
|
||||
process.executableURL = URL(fileURLWithPath: "/bin/cat")
|
||||
process.arguments = catArgs
|
||||
|
||||
// Redirect output to the final disk image file
|
||||
let outputFileHandle = try FileHandle(forWritingTo: outputURL)
|
||||
process.standardOutput = outputFileHandle
|
||||
|
||||
try process.run()
|
||||
|
||||
// Monitor progress by checking file size
|
||||
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var
|
||||
var currentSize: UInt64 = 0
|
||||
while process.isRunning {
|
||||
// Get current file size
|
||||
currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize
|
||||
|
||||
// Calculate and log progress
|
||||
if expectedTotalSize > 0 {
|
||||
let progressValue = Double(currentSize) / Double(expectedTotalSize)
|
||||
reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image")
|
||||
Logger.info("Processing part \(partNum) of \(totalParts): \(partURL.lastPathComponent)")
|
||||
|
||||
// Get part file size
|
||||
let partAttributes = try FileManager.default.attributesOfItem(atPath: partURL.path)
|
||||
let partSize = partAttributes[.size] as? UInt64 ?? 0
|
||||
|
||||
// Calculate the offset in the final file (parts are sequential)
|
||||
let partOffset = processedSize
|
||||
|
||||
// Open input file
|
||||
let inputHandle = try FileHandle(forReadingFrom: partURL)
|
||||
defer { try? inputHandle.close() }
|
||||
|
||||
// Seek to the appropriate offset in output file
|
||||
try outputHandle.seek(toOffset: partOffset)
|
||||
|
||||
// Copy data in chunks to avoid memory issues
|
||||
let chunkSize: UInt64 = determineIfMemoryConstrained() ? 256 * 1024 : 1024 * 1024 // Use smaller chunks (256KB-1MB)
|
||||
var bytesWritten: UInt64 = 0
|
||||
|
||||
while bytesWritten < partSize {
|
||||
// Use Foundation's autoreleasepool for proper memory management
|
||||
Foundation.autoreleasepool {
|
||||
let readSize: UInt64 = min(UInt64(chunkSize), partSize - bytesWritten)
|
||||
if let chunk = try? inputHandle.read(upToCount: Int(readSize)) {
|
||||
if !chunk.isEmpty {
|
||||
try? outputHandle.write(contentsOf: chunk)
|
||||
bytesWritten += UInt64(chunk.count)
|
||||
|
||||
// Update progress less frequently to reduce overhead
|
||||
if bytesWritten % (chunkSize * 4) == 0 || bytesWritten == partSize {
|
||||
let totalProgress = Double(processedSize + bytesWritten) / Double(expectedTotalSize)
|
||||
reassemblyProgressLogger.logProgress(current: totalProgress, context: "Reassembling disk image")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add a small delay every few MB to allow memory cleanup
|
||||
if bytesWritten % (chunkSize * 16) == 0 && bytesWritten > 0 {
|
||||
Thread.sleep(forTimeInterval: 0.01)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait a bit before checking again
|
||||
try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds
|
||||
}
|
||||
// Log 100% completion explicitly
|
||||
if expectedTotalSize > 0 {
|
||||
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image")
|
||||
// Update processed size
|
||||
processedSize += partSize
|
||||
|
||||
// Delete part file if it's not from cache to save space immediately
|
||||
if noCache || !partURL.path.contains(cacheDirectory.path) {
|
||||
try? FileManager.default.removeItem(at: partURL)
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize progress
|
||||
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image")
|
||||
Logger.info("") // Newline after progress
|
||||
|
||||
// Ensure handle is closed AFTER process finishes
|
||||
try? outputFileHandle.close()
|
||||
|
||||
// Check termination status AFTER process finishes
|
||||
let terminationStatus = process.terminationStatus
|
||||
|
||||
// Clean up temporary part files
|
||||
for partURL in partURLsToDelete {
|
||||
try? FileManager.default.removeItem(at: partURL)
|
||||
}
|
||||
|
||||
guard terminationStatus == 0 else {
|
||||
throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)")
|
||||
}
|
||||
|
||||
// Verify final size (already done after loop essentially, but good to check)
|
||||
|
||||
// Close the output file
|
||||
try outputHandle.synchronize()
|
||||
try outputHandle.close()
|
||||
|
||||
// Verify final size
|
||||
let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0
|
||||
Logger.info(
|
||||
"Final disk image size: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))"
|
||||
@@ -618,7 +807,7 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
)
|
||||
}
|
||||
|
||||
Logger.info("Disk image reassembled successfully using 'cat'")
|
||||
Logger.info("Disk image reassembled successfully using sparse file technique")
|
||||
} else {
|
||||
// Copy single disk image if it exists
|
||||
let diskURL = tempDownloadDir.appendingPathComponent("disk.img")
|
||||
@@ -698,98 +887,109 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
// Reassemble disk parts if needed
|
||||
if !diskPartSources.isEmpty {
|
||||
Logger.info(
|
||||
"Reassembling disk image from cached parts using external 'cat' command (optimized storage)..."
|
||||
"Reassembling disk image from cached parts using sparse file technique..."
|
||||
)
|
||||
let outputURL = destination.appendingPathComponent("disk.img")
|
||||
|
||||
// Ensure the output file exists but is empty
|
||||
// Ensure the output file exists but is empty
|
||||
if FileManager.default.fileExists(atPath: outputURL.path) {
|
||||
try FileManager.default.removeItem(at: outputURL)
|
||||
}
|
||||
FileManager.default.createFile(atPath: outputURL.path, contents: nil)
|
||||
|
||||
// Explicitly type accumulator (acc: UInt64) and element in the closure
|
||||
|
||||
// Calculate expected total size from the cached files
|
||||
let expectedTotalSize: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in
|
||||
let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0
|
||||
return acc + fileSize
|
||||
}
|
||||
Logger.info(
|
||||
"Expected final size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))"
|
||||
)
|
||||
Logger.info(
|
||||
"Expected final size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))"
|
||||
)
|
||||
|
||||
|
||||
// Prepare arguments for cat, reading directly from cache
|
||||
var catArgs: [String] = []
|
||||
// Create sparse file of the required size
|
||||
FileManager.default.createFile(atPath: outputURL.path, contents: nil)
|
||||
let outputHandle = try FileHandle(forWritingTo: outputURL)
|
||||
|
||||
// Set the file size without writing data (creates a sparse file)
|
||||
try outputHandle.truncate(atOffset: expectedTotalSize)
|
||||
|
||||
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05)
|
||||
var processedSize: UInt64 = 0
|
||||
|
||||
// Process each part in order
|
||||
for partNum in 1...totalParts {
|
||||
guard let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum }) else {
|
||||
throw PullError.missingPart(partNum)
|
||||
}
|
||||
catArgs.append(sourceURL.path)
|
||||
}
|
||||
|
||||
// Execute cat command asynchronously
|
||||
let process = Process()
|
||||
process.executableURL = URL(fileURLWithPath: "/bin/cat")
|
||||
process.arguments = catArgs
|
||||
|
||||
// Redirect output to the final disk image file
|
||||
let outputFileHandle = try FileHandle(forWritingTo: outputURL)
|
||||
process.standardOutput = outputFileHandle
|
||||
|
||||
try process.run()
|
||||
|
||||
// Monitor progress by checking file size
|
||||
var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var
|
||||
var currentSize: UInt64 = 0
|
||||
while process.isRunning {
|
||||
// Get current file size
|
||||
currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize
|
||||
|
||||
// Calculate and log progress
|
||||
if expectedTotalSize > 0 {
|
||||
let progressValue = Double(currentSize) / Double(expectedTotalSize)
|
||||
reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image from cache")
|
||||
Logger.info("Processing part \(partNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent)")
|
||||
|
||||
// Get part file size
|
||||
let partAttributes = try FileManager.default.attributesOfItem(atPath: sourceURL.path)
|
||||
let partSize = partAttributes[.size] as? UInt64 ?? 0
|
||||
|
||||
// Calculate the offset in the final file (parts are sequential)
|
||||
let partOffset = processedSize
|
||||
|
||||
// Open input file
|
||||
let inputHandle = try FileHandle(forReadingFrom: sourceURL)
|
||||
defer { try? inputHandle.close() }
|
||||
|
||||
// Seek to the appropriate offset in output file
|
||||
try outputHandle.seek(toOffset: partOffset)
|
||||
|
||||
// Copy data in chunks to avoid memory issues
|
||||
let chunkSize: UInt64 = determineIfMemoryConstrained() ? 256 * 1024 : 1024 * 1024 // Use smaller chunks (256KB-1MB)
|
||||
var bytesWritten: UInt64 = 0
|
||||
|
||||
while bytesWritten < partSize {
|
||||
// Use Foundation's autoreleasepool for proper memory management
|
||||
Foundation.autoreleasepool {
|
||||
let readSize: UInt64 = min(UInt64(chunkSize), partSize - bytesWritten)
|
||||
if let chunk = try? inputHandle.read(upToCount: Int(readSize)) {
|
||||
if !chunk.isEmpty {
|
||||
try? outputHandle.write(contentsOf: chunk)
|
||||
bytesWritten += UInt64(chunk.count)
|
||||
|
||||
// Update progress less frequently to reduce overhead
|
||||
if bytesWritten % (chunkSize * 4) == 0 || bytesWritten == partSize {
|
||||
let totalProgress = Double(processedSize + bytesWritten) / Double(expectedTotalSize)
|
||||
reassemblyProgressLogger.logProgress(current: totalProgress, context: "Reassembling disk image from cache")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add a small delay every few MB to allow memory cleanup
|
||||
if bytesWritten % (chunkSize * 16) == 0 && bytesWritten > 0 {
|
||||
Thread.sleep(forTimeInterval: 0.01)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait a bit before checking again
|
||||
try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds
|
||||
// Update processed size
|
||||
processedSize += partSize
|
||||
}
|
||||
// Log 100% completion explicitly
|
||||
if expectedTotalSize > 0 {
|
||||
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image from cache")
|
||||
}
|
||||
Logger.info("") // Newline after progress
|
||||
|
||||
// Ensure handle is closed AFTER process finishes
|
||||
try? outputFileHandle.close()
|
||||
|
||||
// Check termination status AFTER process finishes
|
||||
let terminationStatus = process.terminationStatus
|
||||
|
||||
guard terminationStatus == 0 else {
|
||||
throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)")
|
||||
}
|
||||
|
||||
|
||||
// Verify final size (already done after loop essentially, but good to check)
|
||||
|
||||
// Finalize progress
|
||||
reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image from cache")
|
||||
Logger.info("") // Newline after progress
|
||||
|
||||
// Close the output file
|
||||
try outputHandle.synchronize()
|
||||
try outputHandle.close()
|
||||
|
||||
// Verify final size
|
||||
let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0
|
||||
Logger.info(
|
||||
"Final disk image size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))"
|
||||
)
|
||||
|
||||
if finalSize != expectedTotalSize {
|
||||
// Calculate expected size again for the warning message consistency
|
||||
// Explicitly type accumulator (acc: UInt64) and element in the closure
|
||||
let expectedSizeForWarning: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in
|
||||
let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0
|
||||
return acc + fileSize
|
||||
}
|
||||
Logger.info(
|
||||
"Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedSizeForWarning) bytes)"
|
||||
"Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedTotalSize) bytes)"
|
||||
)
|
||||
}
|
||||
|
||||
Logger.info("Disk image reassembled successfully from cache using 'cat'")
|
||||
Logger.info("Disk image reassembled successfully from cache using sparse file technique")
|
||||
}
|
||||
|
||||
Logger.info("Cache copy complete")
|
||||
@@ -849,12 +1049,22 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
request.addValue(mediaType, forHTTPHeaderField: "Accept")
|
||||
request.timeoutInterval = 60
|
||||
|
||||
// Configure session for better reliability
|
||||
// Optimized session configuration for speed
|
||||
let config = URLSessionConfiguration.default
|
||||
config.timeoutIntervalForRequest = 60
|
||||
config.timeoutIntervalForResource = 3600
|
||||
config.waitsForConnectivity = true
|
||||
config.httpMaximumConnectionsPerHost = 1
|
||||
|
||||
// Performance optimizations
|
||||
config.httpMaximumConnectionsPerHost = 6
|
||||
config.httpShouldUsePipelining = true
|
||||
config.requestCachePolicy = .reloadIgnoringLocalCacheData
|
||||
|
||||
// Network service type optimization
|
||||
if getTCPReceiveWindowSize() != nil {
|
||||
// If we can get TCP window size, the system supports advanced networking
|
||||
config.networkServiceType = .responsiveData
|
||||
}
|
||||
|
||||
let session = URLSession(configuration: config)
|
||||
|
||||
@@ -874,8 +1084,13 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
} catch {
|
||||
lastError = error
|
||||
if attempt < maxRetries {
|
||||
let delay = Double(attempt) * 5
|
||||
// Exponential backoff with jitter for retries
|
||||
let baseDelay = Double(attempt) * 2
|
||||
let jitter = Double.random(in: 0...1)
|
||||
let delay = baseDelay + jitter
|
||||
try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000))
|
||||
|
||||
Logger.info("Retrying download (attempt \(attempt+1)/\(maxRetries)): \(digest)")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1162,4 +1377,101 @@ class ImageContainerRegistry: @unchecked Sendable {
|
||||
// Consider memory constrained if less than 2GB free
|
||||
return freeMemory < 2_147_483_648 // 2GB
|
||||
}
|
||||
|
||||
// Helper method to determine network quality
|
||||
private func determineNetworkQuality() -> Int {
|
||||
// Default quality is medium (3)
|
||||
var quality = 3
|
||||
|
||||
// A simple ping test to determine network quality
|
||||
let process = Process()
|
||||
process.executableURL = URL(fileURLWithPath: "/sbin/ping")
|
||||
process.arguments = ["-c", "3", "-q", self.registry]
|
||||
|
||||
let outputPipe = Pipe()
|
||||
process.standardOutput = outputPipe
|
||||
process.standardError = outputPipe
|
||||
|
||||
do {
|
||||
try process.run()
|
||||
process.waitUntilExit()
|
||||
|
||||
let outputData = try outputPipe.fileHandleForReading.readToEnd() ?? Data()
|
||||
if let output = String(data: outputData, encoding: .utf8) {
|
||||
// Check for average ping time
|
||||
if let avgTimeRange = output.range(of: "= [0-9.]+/([0-9.]+)/", options: .regularExpression) {
|
||||
let avgSubstring = output[avgTimeRange]
|
||||
if let avgString = avgSubstring.split(separator: "/").dropFirst().first,
|
||||
let avgTime = Double(avgString) {
|
||||
|
||||
// Classify network quality based on ping time
|
||||
if avgTime < 50 {
|
||||
quality = 5 // Excellent
|
||||
} else if avgTime < 100 {
|
||||
quality = 4 // Good
|
||||
} else if avgTime < 200 {
|
||||
quality = 3 // Average
|
||||
} else if avgTime < 300 {
|
||||
quality = 2 // Poor
|
||||
} else {
|
||||
quality = 1 // Very poor
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Default to medium if ping fails
|
||||
Logger.info("Failed to determine network quality, using default settings")
|
||||
}
|
||||
|
||||
return quality
|
||||
}
|
||||
|
||||
// Helper method to calculate optimal concurrency based on system capabilities
|
||||
private func calculateOptimalConcurrency(memoryConstrained: Bool, networkQuality: Int) -> Int {
|
||||
// Base concurrency based on network quality (1-5)
|
||||
let baseThreads = min(networkQuality * 2, 8)
|
||||
|
||||
if memoryConstrained {
|
||||
// Reduce concurrency for memory-constrained systems
|
||||
return max(2, baseThreads / 2)
|
||||
}
|
||||
|
||||
// Physical cores available on the system
|
||||
let cores = ProcessInfo.processInfo.processorCount
|
||||
|
||||
// Adaptive approach: 1-2 threads per core depending on network quality
|
||||
let threadsPerCore = (networkQuality >= 4) ? 2 : 1
|
||||
let systemBasedThreads = min(cores * threadsPerCore, 12)
|
||||
|
||||
// Take the larger of network-based and system-based concurrency
|
||||
return max(baseThreads, systemBasedThreads)
|
||||
}
|
||||
|
||||
// Helper to get optimal TCP window size
|
||||
private func getTCPReceiveWindowSize() -> Int? {
|
||||
// Try to query system TCP window size
|
||||
let process = Process()
|
||||
process.executableURL = URL(fileURLWithPath: "/usr/sbin/sysctl")
|
||||
process.arguments = ["net.inet.tcp.recvspace"]
|
||||
|
||||
let outputPipe = Pipe()
|
||||
process.standardOutput = outputPipe
|
||||
|
||||
do {
|
||||
try process.run()
|
||||
process.waitUntilExit()
|
||||
|
||||
let outputData = try outputPipe.fileHandleForReading.readToEnd() ?? Data()
|
||||
if let output = String(data: outputData, encoding: .utf8),
|
||||
let valueStr = output.split(separator: ":").last?.trimmingCharacters(in: .whitespacesAndNewlines),
|
||||
let value = Int(valueStr) {
|
||||
return value
|
||||
}
|
||||
} catch {
|
||||
// Ignore errors, we'll use defaults
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user