From b5f8a647b0ccea9b717e14e1690f36055879759c Mon Sep 17 00:00:00 2001 From: f-trycua Date: Sat, 12 Apr 2025 20:35:26 +0200 Subject: [PATCH] Optimize VM image assembly --- .../ImageContainerRegistry.swift | 590 +++++++++++++----- 1 file changed, 451 insertions(+), 139 deletions(-) diff --git a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift index 25271641..880a8a0a 100644 --- a/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift +++ b/libs/lume/src/ContainerRegistry/ImageContainerRegistry.swift @@ -48,16 +48,187 @@ actor ProgressTracker { private var progressLogger = ProgressLogger(threshold: 0.01) private var totalFiles: Int = 0 private var completedFiles: Int = 0 - + + // Download speed tracking + private var startTime: Date = Date() + private var lastUpdateTime: Date = Date() + private var lastUpdateBytes: Int64 = 0 + private var speedSamples: [Double] = [] + private var peakSpeed: Double = 0 + private var totalElapsedTime: TimeInterval = 0 + func setTotal(_ total: Int64, files: Int) { totalBytes = total totalFiles = files + startTime = Date() + lastUpdateTime = startTime } func addProgress(_ bytes: Int64) { downloadedBytes += bytes - let progress = Double(downloadedBytes) / Double(totalBytes) - progressLogger.logProgress(current: progress, context: "Downloading Image") + let now = Date() + let elapsed = now.timeIntervalSince(lastUpdateTime) + + // Only update stats and display progress if enough time has passed (at least 0.5 seconds) + if elapsed >= 0.5 { + let currentSpeed = Double(downloadedBytes - lastUpdateBytes) / elapsed + speedSamples.append(currentSpeed) + + // Cap samples array to prevent memory growth + if speedSamples.count > 20 { + speedSamples.removeFirst(speedSamples.count - 20) + } + + // Update peak speed + peakSpeed = max(peakSpeed, currentSpeed) + + // Calculate average speed over the last few samples + let recentAvgSpeed = calculateAverageSpeed() + + // Calculate overall average + let totalElapsed = now.timeIntervalSince(startTime) + let overallAvgSpeed = totalElapsed > 0 ? Double(downloadedBytes) / totalElapsed : 0 + + let progress = Double(downloadedBytes) / Double(totalBytes) + logSpeedProgress( + current: progress, + currentSpeed: currentSpeed, + averageSpeed: recentAvgSpeed, + overallSpeed: overallAvgSpeed, + peakSpeed: peakSpeed, + context: "Downloading Image" + ) + + // Update tracking variables + lastUpdateTime = now + lastUpdateBytes = downloadedBytes + totalElapsedTime = totalElapsed + } + } + + private func calculateAverageSpeed() -> Double { + guard !speedSamples.isEmpty else { return 0 } + // Use the most recent samples (up to last 5) + let samples = speedSamples.suffix(min(5, speedSamples.count)) + return samples.reduce(0, +) / Double(samples.count) + } + + func getDownloadStats() -> DownloadStats { + let avgSpeed = totalElapsedTime > 0 ? Double(downloadedBytes) / totalElapsedTime : 0 + return DownloadStats( + totalBytes: totalBytes, + downloadedBytes: downloadedBytes, + elapsedTime: totalElapsedTime, + averageSpeed: avgSpeed, + peakSpeed: peakSpeed + ) + } + + private func logSpeedProgress( + current: Double, + currentSpeed: Double, + averageSpeed: Double, + overallSpeed: Double, + peakSpeed: Double, + context: String + ) { + let progressPercent = Int(current * 100) + let currentSpeedStr = formatByteSpeed(currentSpeed) + let avgSpeedStr = formatByteSpeed(averageSpeed) + let peakSpeedStr = formatByteSpeed(peakSpeed) + + // Calculate ETA based on overall average speed + let remainingBytes = totalBytes - downloadedBytes + let etaSeconds = overallSpeed > 0 ? Double(remainingBytes) / overallSpeed : 0 + let etaStr = formatTimeRemaining(etaSeconds) + + let progressBar = createProgressBar(progress: current) + + print("\r\(progressBar) \(progressPercent)% | Current: \(currentSpeedStr) | Avg: \(avgSpeedStr) | Peak: \(peakSpeedStr) | ETA: \(etaStr) ", terminator: "") + fflush(stdout) + } + + private func createProgressBar(progress: Double, width: Int = 20) -> String { + let completedWidth = Int(progress * Double(width)) + let remainingWidth = width - completedWidth + + let completed = String(repeating: "█", count: completedWidth) + let remaining = String(repeating: "░", count: remainingWidth) + + return "[\(completed)\(remaining)]" + } + + private func formatByteSpeed(_ bytesPerSecond: Double) -> String { + let units = ["B/s", "KB/s", "MB/s", "GB/s"] + var speed = bytesPerSecond + var unitIndex = 0 + + while speed > 1024 && unitIndex < units.count - 1 { + speed /= 1024 + unitIndex += 1 + } + + return String(format: "%.1f %@", speed, units[unitIndex]) + } + + private func formatTimeRemaining(_ seconds: Double) -> String { + if seconds.isNaN || seconds.isInfinite || seconds <= 0 { + return "calculating..." + } + + let hours = Int(seconds) / 3600 + let minutes = (Int(seconds) % 3600) / 60 + let secs = Int(seconds) % 60 + + if hours > 0 { + return String(format: "%d:%02d:%02d", hours, minutes, secs) + } else { + return String(format: "%d:%02d", minutes, secs) + } + } +} + +struct DownloadStats { + let totalBytes: Int64 + let downloadedBytes: Int64 + let elapsedTime: TimeInterval + let averageSpeed: Double + let peakSpeed: Double + + func formattedSummary() -> String { + let bytesStr = ByteCountFormatter.string(fromByteCount: downloadedBytes, countStyle: .file) + let avgSpeedStr = formatSpeed(averageSpeed) + let peakSpeedStr = formatSpeed(peakSpeed) + let timeStr = formatTime(elapsedTime) + + return """ + Download Statistics: + - Total downloaded: \(bytesStr) + - Elapsed time: \(timeStr) + - Average speed: \(avgSpeedStr) + - Peak speed: \(peakSpeedStr) + """ + } + + private func formatSpeed(_ bytesPerSecond: Double) -> String { + let formatter = ByteCountFormatter() + formatter.countStyle = .file + let bytesStr = formatter.string(fromByteCount: Int64(bytesPerSecond)) + return "\(bytesStr)/s" + } + + private func formatTime(_ seconds: TimeInterval) -> String { + let hours = Int(seconds) / 3600 + let minutes = (Int(seconds) % 3600) / 60 + let secs = Int(seconds) % 60 + + if hours > 0 { + return String(format: "%d hours, %d minutes, %d seconds", hours, minutes, secs) + } else if minutes > 0 { + return String(format: "%d minutes, %d seconds", minutes, secs) + } else { + return String(format: "%d seconds", secs) + } } } @@ -341,17 +512,17 @@ class ImageContainerRegistry: @unchecked Sendable { ) var diskParts: [(Int, URL)] = [] var totalParts = 0 - let maxConcurrentTasks = 5 - let counter = TaskCounter() - - // Use a more efficient approach for memory-constrained systems + + // Adaptive concurrency based on system capabilities let memoryConstrained = determineIfMemoryConstrained() - // Adjust concurrency based on memory constraints - let actualMaxConcurrentTasks = memoryConstrained ? 2 : maxConcurrentTasks + let networkQuality = determineNetworkQuality() + let maxConcurrentTasks = calculateOptimalConcurrency(memoryConstrained: memoryConstrained, networkQuality: networkQuality) + Logger.info( - memoryConstrained - ? "Using memory-optimized mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))" - : "Using standard mode for disk parts (Concurrency: \(actualMaxConcurrentTasks))") + "Using adaptive download configuration: Concurrency=\(maxConcurrentTasks), Memory-optimized=\(memoryConstrained)" + ) + + let counter = TaskCounter() try await withThrowingTaskGroup(of: Int64.self) { group in for layer in manifest.layers { @@ -359,7 +530,7 @@ class ImageContainerRegistry: @unchecked Sendable { continue } - while await counter.current() >= actualMaxConcurrentTasks { // Use adjusted concurrency + while await counter.current() >= maxConcurrentTasks { _ = try await group.next() await counter.decrement() } @@ -523,9 +694,13 @@ class ImageContainerRegistry: @unchecked Sendable { } Logger.info("") // New line after progress + // Display download statistics + let stats = await progress.getDownloadStats() + Logger.info(stats.formattedSummary()) + // Handle disk parts if present if !diskParts.isEmpty { - Logger.info("Reassembling disk image using external 'cat' command...") + Logger.info("Reassembling disk image using sparse file technique...") let outputURL = tempVMDir.appendingPathComponent("disk.img") try FileManager.default.createDirectory( at: outputURL.deletingLastPathComponent(), withIntermediateDirectories: true) @@ -534,79 +709,93 @@ class ImageContainerRegistry: @unchecked Sendable { if FileManager.default.fileExists(atPath: outputURL.path) { try FileManager.default.removeItem(at: outputURL) } - FileManager.default.createFile(atPath: outputURL.path, contents: nil) - - // CORRECT LOGIC for new pull: Calculate expected size from the manifest layers + + // Calculate expected size from the manifest layers let expectedTotalSize = UInt64( manifest.layers.filter { extractPartInfo(from: $0.mediaType) != nil }.reduce(0) { $0 + $1.size } ) Logger.info( "Expected final size: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))" ) - - // Prepare arguments for cat - var catArgs: [String] = [] - var partURLsToDelete: [URL] = [] // Keep track of non-cached parts to delete + + // Create sparse file of the required size + FileManager.default.createFile(atPath: outputURL.path, contents: nil) + let outputHandle = try FileHandle(forWritingTo: outputURL) + + // Set the file size without writing data (creates a sparse file) + try outputHandle.truncate(atOffset: expectedTotalSize) + + var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) + var processedSize: UInt64 = 0 + + // Process each part in order for partNum in 1...totalParts { guard let (_, partURL) = diskParts.first(where: { $0.0 == partNum }) else { throw PullError.missingPart(partNum) } - catArgs.append(partURL.path) - // Only mark for deletion if it's not in the main cache dir - if noCache || !partURL.path.contains(cacheDirectory.path) { - partURLsToDelete.append(partURL) - } - } - - // Execute cat command asynchronously - let process = Process() - process.executableURL = URL(fileURLWithPath: "/bin/cat") - process.arguments = catArgs - - // Redirect output to the final disk image file - let outputFileHandle = try FileHandle(forWritingTo: outputURL) - process.standardOutput = outputFileHandle - - try process.run() - - // Monitor progress by checking file size - var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var - var currentSize: UInt64 = 0 - while process.isRunning { - // Get current file size - currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize - // Calculate and log progress - if expectedTotalSize > 0 { - let progressValue = Double(currentSize) / Double(expectedTotalSize) - reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image") + Logger.info("Processing part \(partNum) of \(totalParts): \(partURL.lastPathComponent)") + + // Get part file size + let partAttributes = try FileManager.default.attributesOfItem(atPath: partURL.path) + let partSize = partAttributes[.size] as? UInt64 ?? 0 + + // Calculate the offset in the final file (parts are sequential) + let partOffset = processedSize + + // Open input file + let inputHandle = try FileHandle(forReadingFrom: partURL) + defer { try? inputHandle.close() } + + // Seek to the appropriate offset in output file + try outputHandle.seek(toOffset: partOffset) + + // Copy data in chunks to avoid memory issues + let chunkSize: UInt64 = determineIfMemoryConstrained() ? 256 * 1024 : 1024 * 1024 // Use smaller chunks (256KB-1MB) + var bytesWritten: UInt64 = 0 + + while bytesWritten < partSize { + // Use Foundation's autoreleasepool for proper memory management + Foundation.autoreleasepool { + let readSize: UInt64 = min(UInt64(chunkSize), partSize - bytesWritten) + if let chunk = try? inputHandle.read(upToCount: Int(readSize)) { + if !chunk.isEmpty { + try? outputHandle.write(contentsOf: chunk) + bytesWritten += UInt64(chunk.count) + + // Update progress less frequently to reduce overhead + if bytesWritten % (chunkSize * 4) == 0 || bytesWritten == partSize { + let totalProgress = Double(processedSize + bytesWritten) / Double(expectedTotalSize) + reassemblyProgressLogger.logProgress(current: totalProgress, context: "Reassembling disk image") + } + } + } + + // Add a small delay every few MB to allow memory cleanup + if bytesWritten % (chunkSize * 16) == 0 && bytesWritten > 0 { + Thread.sleep(forTimeInterval: 0.01) + } + } } - // Wait a bit before checking again - try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds - } - // Log 100% completion explicitly - if expectedTotalSize > 0 { - reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image") + // Update processed size + processedSize += partSize + + // Delete part file if it's not from cache to save space immediately + if noCache || !partURL.path.contains(cacheDirectory.path) { + try? FileManager.default.removeItem(at: partURL) + } } + + // Finalize progress + reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image") Logger.info("") // Newline after progress - - // Ensure handle is closed AFTER process finishes - try? outputFileHandle.close() - - // Check termination status AFTER process finishes - let terminationStatus = process.terminationStatus - - // Clean up temporary part files - for partURL in partURLsToDelete { - try? FileManager.default.removeItem(at: partURL) - } - - guard terminationStatus == 0 else { - throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)") - } - - // Verify final size (already done after loop essentially, but good to check) + + // Close the output file + try outputHandle.synchronize() + try outputHandle.close() + + // Verify final size let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0 Logger.info( "Final disk image size: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))" @@ -618,7 +807,7 @@ class ImageContainerRegistry: @unchecked Sendable { ) } - Logger.info("Disk image reassembled successfully using 'cat'") + Logger.info("Disk image reassembled successfully using sparse file technique") } else { // Copy single disk image if it exists let diskURL = tempDownloadDir.appendingPathComponent("disk.img") @@ -698,98 +887,109 @@ class ImageContainerRegistry: @unchecked Sendable { // Reassemble disk parts if needed if !diskPartSources.isEmpty { Logger.info( - "Reassembling disk image from cached parts using external 'cat' command (optimized storage)..." + "Reassembling disk image from cached parts using sparse file technique..." ) let outputURL = destination.appendingPathComponent("disk.img") - // Ensure the output file exists but is empty + // Ensure the output file exists but is empty if FileManager.default.fileExists(atPath: outputURL.path) { try FileManager.default.removeItem(at: outputURL) } - FileManager.default.createFile(atPath: outputURL.path, contents: nil) - - // Explicitly type accumulator (acc: UInt64) and element in the closure + + // Calculate expected total size from the cached files let expectedTotalSize: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0 return acc + fileSize } - Logger.info( - "Expected final size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))" - ) + Logger.info( + "Expected final size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(expectedTotalSize), countStyle: .file))" + ) - - // Prepare arguments for cat, reading directly from cache - var catArgs: [String] = [] + // Create sparse file of the required size + FileManager.default.createFile(atPath: outputURL.path, contents: nil) + let outputHandle = try FileHandle(forWritingTo: outputURL) + + // Set the file size without writing data (creates a sparse file) + try outputHandle.truncate(atOffset: expectedTotalSize) + + var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) + var processedSize: UInt64 = 0 + + // Process each part in order for partNum in 1...totalParts { guard let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum }) else { throw PullError.missingPart(partNum) } - catArgs.append(sourceURL.path) - } - - // Execute cat command asynchronously - let process = Process() - process.executableURL = URL(fileURLWithPath: "/bin/cat") - process.arguments = catArgs - - // Redirect output to the final disk image file - let outputFileHandle = try FileHandle(forWritingTo: outputURL) - process.standardOutput = outputFileHandle - - try process.run() - - // Monitor progress by checking file size - var reassemblyProgressLogger = ProgressLogger(threshold: 0.05) // Change let to var - var currentSize: UInt64 = 0 - while process.isRunning { - // Get current file size - currentSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? currentSize - // Calculate and log progress - if expectedTotalSize > 0 { - let progressValue = Double(currentSize) / Double(expectedTotalSize) - reassemblyProgressLogger.logProgress(current: progressValue, context: "Reassembling disk image from cache") + Logger.info("Processing part \(partNum) of \(totalParts) from cache: \(sourceURL.lastPathComponent)") + + // Get part file size + let partAttributes = try FileManager.default.attributesOfItem(atPath: sourceURL.path) + let partSize = partAttributes[.size] as? UInt64 ?? 0 + + // Calculate the offset in the final file (parts are sequential) + let partOffset = processedSize + + // Open input file + let inputHandle = try FileHandle(forReadingFrom: sourceURL) + defer { try? inputHandle.close() } + + // Seek to the appropriate offset in output file + try outputHandle.seek(toOffset: partOffset) + + // Copy data in chunks to avoid memory issues + let chunkSize: UInt64 = determineIfMemoryConstrained() ? 256 * 1024 : 1024 * 1024 // Use smaller chunks (256KB-1MB) + var bytesWritten: UInt64 = 0 + + while bytesWritten < partSize { + // Use Foundation's autoreleasepool for proper memory management + Foundation.autoreleasepool { + let readSize: UInt64 = min(UInt64(chunkSize), partSize - bytesWritten) + if let chunk = try? inputHandle.read(upToCount: Int(readSize)) { + if !chunk.isEmpty { + try? outputHandle.write(contentsOf: chunk) + bytesWritten += UInt64(chunk.count) + + // Update progress less frequently to reduce overhead + if bytesWritten % (chunkSize * 4) == 0 || bytesWritten == partSize { + let totalProgress = Double(processedSize + bytesWritten) / Double(expectedTotalSize) + reassemblyProgressLogger.logProgress(current: totalProgress, context: "Reassembling disk image from cache") + } + } + } + + // Add a small delay every few MB to allow memory cleanup + if bytesWritten % (chunkSize * 16) == 0 && bytesWritten > 0 { + Thread.sleep(forTimeInterval: 0.01) + } + } } - // Wait a bit before checking again - try await Task.sleep(nanoseconds: 500_000_000) // 0.5 seconds + // Update processed size + processedSize += partSize } - // Log 100% completion explicitly - if expectedTotalSize > 0 { - reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image from cache") - } - Logger.info("") // Newline after progress - - // Ensure handle is closed AFTER process finishes - try? outputFileHandle.close() - - // Check termination status AFTER process finishes - let terminationStatus = process.terminationStatus - - guard terminationStatus == 0 else { - throw PullError.reassemblyFailed("cat command failed with status \(terminationStatus)") - } - - - // Verify final size (already done after loop essentially, but good to check) + + // Finalize progress + reassemblyProgressLogger.logProgress(current: 1.0, context: "Reassembling disk image from cache") + Logger.info("") // Newline after progress + + // Close the output file + try outputHandle.synchronize() + try outputHandle.close() + + // Verify final size let finalSize = (try? FileManager.default.attributesOfItem(atPath: outputURL.path)[.size] as? UInt64) ?? 0 Logger.info( "Final disk image size from cache: \(ByteCountFormatter.string(fromByteCount: Int64(finalSize), countStyle: .file))" ) if finalSize != expectedTotalSize { - // Calculate expected size again for the warning message consistency - // Explicitly type accumulator (acc: UInt64) and element in the closure - let expectedSizeForWarning: UInt64 = diskPartSources.reduce(UInt64(0)) { (acc: UInt64, element) -> UInt64 in - let fileSize = (try? FileManager.default.attributesOfItem(atPath: element.1.path)[.size] as? UInt64 ?? 0) ?? 0 - return acc + fileSize - } Logger.info( - "Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedSizeForWarning) bytes)" + "Warning: Final size (\(finalSize) bytes) differs from expected size (\(expectedTotalSize) bytes)" ) } - Logger.info("Disk image reassembled successfully from cache using 'cat'") + Logger.info("Disk image reassembled successfully from cache using sparse file technique") } Logger.info("Cache copy complete") @@ -849,12 +1049,22 @@ class ImageContainerRegistry: @unchecked Sendable { request.addValue(mediaType, forHTTPHeaderField: "Accept") request.timeoutInterval = 60 - // Configure session for better reliability + // Optimized session configuration for speed let config = URLSessionConfiguration.default config.timeoutIntervalForRequest = 60 config.timeoutIntervalForResource = 3600 config.waitsForConnectivity = true - config.httpMaximumConnectionsPerHost = 1 + + // Performance optimizations + config.httpMaximumConnectionsPerHost = 6 + config.httpShouldUsePipelining = true + config.requestCachePolicy = .reloadIgnoringLocalCacheData + + // Network service type optimization + if getTCPReceiveWindowSize() != nil { + // If we can get TCP window size, the system supports advanced networking + config.networkServiceType = .responsiveData + } let session = URLSession(configuration: config) @@ -874,8 +1084,13 @@ class ImageContainerRegistry: @unchecked Sendable { } catch { lastError = error if attempt < maxRetries { - let delay = Double(attempt) * 5 + // Exponential backoff with jitter for retries + let baseDelay = Double(attempt) * 2 + let jitter = Double.random(in: 0...1) + let delay = baseDelay + jitter try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) + + Logger.info("Retrying download (attempt \(attempt+1)/\(maxRetries)): \(digest)") } } } @@ -1162,4 +1377,101 @@ class ImageContainerRegistry: @unchecked Sendable { // Consider memory constrained if less than 2GB free return freeMemory < 2_147_483_648 // 2GB } + + // Helper method to determine network quality + private func determineNetworkQuality() -> Int { + // Default quality is medium (3) + var quality = 3 + + // A simple ping test to determine network quality + let process = Process() + process.executableURL = URL(fileURLWithPath: "/sbin/ping") + process.arguments = ["-c", "3", "-q", self.registry] + + let outputPipe = Pipe() + process.standardOutput = outputPipe + process.standardError = outputPipe + + do { + try process.run() + process.waitUntilExit() + + let outputData = try outputPipe.fileHandleForReading.readToEnd() ?? Data() + if let output = String(data: outputData, encoding: .utf8) { + // Check for average ping time + if let avgTimeRange = output.range(of: "= [0-9.]+/([0-9.]+)/", options: .regularExpression) { + let avgSubstring = output[avgTimeRange] + if let avgString = avgSubstring.split(separator: "/").dropFirst().first, + let avgTime = Double(avgString) { + + // Classify network quality based on ping time + if avgTime < 50 { + quality = 5 // Excellent + } else if avgTime < 100 { + quality = 4 // Good + } else if avgTime < 200 { + quality = 3 // Average + } else if avgTime < 300 { + quality = 2 // Poor + } else { + quality = 1 // Very poor + } + } + } + } + } catch { + // Default to medium if ping fails + Logger.info("Failed to determine network quality, using default settings") + } + + return quality + } + + // Helper method to calculate optimal concurrency based on system capabilities + private func calculateOptimalConcurrency(memoryConstrained: Bool, networkQuality: Int) -> Int { + // Base concurrency based on network quality (1-5) + let baseThreads = min(networkQuality * 2, 8) + + if memoryConstrained { + // Reduce concurrency for memory-constrained systems + return max(2, baseThreads / 2) + } + + // Physical cores available on the system + let cores = ProcessInfo.processInfo.processorCount + + // Adaptive approach: 1-2 threads per core depending on network quality + let threadsPerCore = (networkQuality >= 4) ? 2 : 1 + let systemBasedThreads = min(cores * threadsPerCore, 12) + + // Take the larger of network-based and system-based concurrency + return max(baseThreads, systemBasedThreads) + } + + // Helper to get optimal TCP window size + private func getTCPReceiveWindowSize() -> Int? { + // Try to query system TCP window size + let process = Process() + process.executableURL = URL(fileURLWithPath: "/usr/sbin/sysctl") + process.arguments = ["net.inet.tcp.recvspace"] + + let outputPipe = Pipe() + process.standardOutput = outputPipe + + do { + try process.run() + process.waitUntilExit() + + let outputData = try outputPipe.fileHandleForReading.readToEnd() ?? Data() + if let output = String(data: outputData, encoding: .utf8), + let valueStr = output.split(separator: ":").last?.trimmingCharacters(in: .whitespacesAndNewlines), + let value = Int(valueStr) { + return value + } + } catch { + // Ignore errors, we'll use defaults + } + + return nil + } }