mirror of https://github.com/trycua/computer.git, synced 2026-03-03 04:19:27 -06:00

Optimize disk and memory during pull
@@ -10,7 +10,6 @@
[](#)
[](#)
[](#install)
[](https://discord.com/invite/mVnXXpdE85)
</h1>
</div>
@@ -1,4 +1,5 @@
 import ArgumentParser
+import Darwin
 import Foundation
 import Swift
 
@@ -343,6 +344,13 @@ class ImageContainerRegistry: @unchecked Sendable {
         let maxConcurrentTasks = 5
         let counter = TaskCounter()
 
+        // Use a more efficient approach for memory-constrained systems
+        let memoryConstrained = determineIfMemoryConstrained()
+        Logger.info(
+            memoryConstrained
+                ? "Using memory-optimized mode for disk parts"
+                : "Using standard mode for disk parts")
+
         try await withThrowingTaskGroup(of: Int64.self) { group in
             for layer in manifest.layers {
                 if layer.mediaType == "application/vnd.oci.empty.v1+json" {
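The download fan-out in the hunks below is throttled by `maxConcurrentTasks` and a `TaskCounter`, whose definition is not part of this diff. A minimal sketch of an actor that would satisfy the `await counter.increment()` / `await counter.decrement()` call sites, offered as an assumption rather than the project's actual type:

```swift
// Hypothetical TaskCounter: serializes access to a running task count.
// The real type in the repository may track more state than this.
actor TaskCounter {
    private var count = 0

    func increment() { count += 1 }
    func decrement() { count -= 1 }
    func current() -> Int { count }
}
```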
@@ -354,15 +362,96 @@ class ImageContainerRegistry: @unchecked Sendable {
                         await counter.decrement()
                     }
 
-                let outputURL: URL
                 if let partInfo = extractPartInfo(from: layer.mediaType) {
                     let (partNum, total) = partInfo
                     totalParts = total
-                    outputURL = tempDownloadDir.appendingPathComponent(
-                        "disk.img.part.\(partNum)")
-                    diskParts.append((partNum, outputURL))
+
+                    let cachedLayer = getCachedLayerPath(
+                        manifestId: manifestId, digest: layer.digest)
+                    let digest = layer.digest
+                    let size = layer.size
+
+                    // For memory-optimized mode - point directly to cache when possible
+                    if !noCache && memoryConstrained
+                        && FileManager.default.fileExists(atPath: cachedLayer.path)
+                    {
+                        // Use the cached file directly
+                        diskParts.append((partNum, cachedLayer))
+
+                        // Still need to account for progress
+                        group.addTask { @Sendable [self] in
+                            await counter.increment()
+                            await progress.addProgress(Int64(size))
+                            await counter.decrement()
+                            return Int64(size)
+                        }
+                        continue
+                    } else {
+                        let partURL = tempDownloadDir.appendingPathComponent(
+                            "disk.img.part.\(partNum)")
+                        diskParts.append((partNum, partURL))
+
+                        group.addTask { @Sendable [self] in
+                            await counter.increment()
+
+                            if !noCache
+                                && FileManager.default.fileExists(atPath: cachedLayer.path)
+                            {
+                                try FileManager.default.copyItem(at: cachedLayer, to: partURL)
+                                await progress.addProgress(Int64(size))
+                            } else {
+                                // Check if this layer is already being downloaded and we're not skipping cache
+                                if !noCache && isDownloading(digest) {
+                                    try await waitForExistingDownload(
+                                        digest, cachedLayer: cachedLayer)
+                                    if FileManager.default.fileExists(atPath: cachedLayer.path)
+                                    {
+                                        try FileManager.default.copyItem(
+                                            at: cachedLayer, to: partURL)
+                                        await progress.addProgress(Int64(size))
+                                        return Int64(size)
+                                    }
+                                }
+
+                                // Start new download
+                                if !noCache {
+                                    markDownloadStarted(digest)
+                                }
+
+                                try await self.downloadLayer(
+                                    repository: "\(self.organization)/\(imageName)",
+                                    digest: digest,
+                                    mediaType: layer.mediaType,
+                                    token: token,
+                                    to: partURL,
+                                    maxRetries: 5,
+                                    progress: progress
+                                )
+
+                                // Cache the downloaded layer if not in noCache mode
+                                if !noCache {
+                                    if FileManager.default.fileExists(atPath: cachedLayer.path)
+                                    {
+                                        try FileManager.default.removeItem(at: cachedLayer)
+                                    }
+                                    try FileManager.default.copyItem(
+                                        at: partURL, to: cachedLayer)
+                                    markDownloadComplete(digest)
+                                }
+                            }
+
+                            await counter.decrement()
+                            return Int64(size)
+                        }
+                        continue
+                    }
                 } else {
-                    switch layer.mediaType {
+                    let mediaType = layer.mediaType
+                    let digest = layer.digest
+                    let size = layer.size
+
+                    let outputURL: URL
+                    switch mediaType {
                     case "application/vnd.oci.image.layer.v1.tar":
                         outputURL = tempDownloadDir.appendingPathComponent("disk.img")
                     case "application/vnd.oci.image.config.v1+json":
@@ -372,55 +461,58 @@ class ImageContainerRegistry: @unchecked Sendable {
                     default:
                         continue
                     }
-                }
 
-                group.addTask { @Sendable [self] in
-                    await counter.increment()
+                    group.addTask { @Sendable [self] in
+                        await counter.increment()
 
-                    let cachedLayer = getCachedLayerPath(
-                        manifestId: manifestId, digest: layer.digest)
+                        let cachedLayer = getCachedLayerPath(
+                            manifestId: manifestId, digest: digest)
 
-                    if !noCache && FileManager.default.fileExists(atPath: cachedLayer.path) {
-                        try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
-                        await progress.addProgress(Int64(layer.size))
-                    } else {
-                        // Check if this layer is already being downloaded and we're not skipping cache
-                        if !noCache && isDownloading(layer.digest) {
-                            try await waitForExistingDownload(
-                                layer.digest, cachedLayer: cachedLayer)
-                            if FileManager.default.fileExists(atPath: cachedLayer.path) {
-                                try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
-                                await progress.addProgress(Int64(layer.size))
-                                return Int64(layer.size)
+                        if !noCache && FileManager.default.fileExists(atPath: cachedLayer.path)
+                        {
+                            try FileManager.default.copyItem(at: cachedLayer, to: outputURL)
+                            await progress.addProgress(Int64(size))
+                        } else {
+                            // Check if this layer is already being downloaded and we're not skipping cache
+                            if !noCache && isDownloading(digest) {
+                                try await waitForExistingDownload(
+                                    digest, cachedLayer: cachedLayer)
+                                if FileManager.default.fileExists(atPath: cachedLayer.path) {
+                                    try FileManager.default.copyItem(
+                                        at: cachedLayer, to: outputURL)
+                                    await progress.addProgress(Int64(size))
+                                    return Int64(size)
+                                }
                             }
-                        }
 
-                        // Start new download
-                        if !noCache {
-                            markDownloadStarted(layer.digest)
-                        }
+                            // Start new download
+                            if !noCache {
+                                markDownloadStarted(digest)
+                            }
 
-                        try await self.downloadLayer(
-                            repository: "\(self.organization)/\(imageName)",
-                            digest: layer.digest,
-                            mediaType: layer.mediaType,
-                            token: token,
-                            to: outputURL,
-                            maxRetries: 5,
-                            progress: progress
-                        )
+                            try await self.downloadLayer(
+                                repository: "\(self.organization)/\(imageName)",
+                                digest: digest,
+                                mediaType: mediaType,
+                                token: token,
+                                to: outputURL,
+                                maxRetries: 5,
+                                progress: progress
+                            )
 
-                        // Cache the downloaded layer if not in noCache mode
-                        if !noCache {
-                            if FileManager.default.fileExists(atPath: cachedLayer.path) {
-                                try FileManager.default.removeItem(at: cachedLayer)
-                            }
-                            try FileManager.default.copyItem(at: outputURL, to: cachedLayer)
-                            markDownloadComplete(layer.digest)
+                            // Cache the downloaded layer if not in noCache mode
+                            if !noCache {
+                                if FileManager.default.fileExists(atPath: cachedLayer.path) {
+                                    try FileManager.default.removeItem(at: cachedLayer)
+                                }
+                                try FileManager.default.copyItem(at: outputURL, to: cachedLayer)
+                                markDownloadComplete(digest)
+                            }
                         }
-                    }
 
-                    return Int64(layer.size)
+                        await counter.decrement()
+                        return Int64(size)
+                    }
                 }
             }
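Both branches above push work through the same `withThrowingTaskGroup(of: Int64.self)` shape: each child task returns the byte count it handled, and the group sums them. Stripped of caching, retries, and progress reporting, the skeleton looks roughly like this sketch, where `Layer` and `download` are stand-ins, not the repository's types:

```swift
import Foundation

struct Layer { let digest: String; let size: Int }

// Placeholder for the real network download.
func download(_ layer: Layer) async throws {
    try await Task.sleep(nanoseconds: 1_000_000)
}

// Downloads all layers concurrently; each task reports its byte count.
func pull(_ layers: [Layer]) async throws -> Int64 {
    try await withThrowingTaskGroup(of: Int64.self) { group in
        for layer in layers {
            group.addTask {
                try await download(layer)
                return Int64(layer.size)
            }
        }
        var total: Int64 = 0
        for try await bytes in group { total += bytes }
        return total
    }
}
```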
@@ -455,18 +547,44 @@ class ImageContainerRegistry: @unchecked Sendable {
                 let inputHandle = try FileHandle(forReadingFrom: partURL)
                 defer {
                     try? inputHandle.close()
-                    try? FileManager.default.removeItem(at: partURL)
+                    // Don't delete the part file if we're in cache mode and the part is from cache
+                    if noCache || !partURL.path.contains(cacheDirectory.path) {
+                        try? FileManager.default.removeItem(at: partURL)
+                    }
                 }
 
-                // Read and write in chunks to minimize memory usage
-                let chunkSize = 10 * 1024 * 1024  // 10MB chunks
-                while let chunk = try inputHandle.read(upToCount: chunkSize) {
-                    try outputHandle.write(contentsOf: chunk)
-                    totalWritten += UInt64(chunk.count)
-                    let progress: Double =
-                        Double(totalWritten) / Double(expectedTotalSize) * 100
-                    Logger.info("Reassembling disk image: \(Int(progress))%")
+                // On low memory systems, be more aggressive with releasing memory
+                let memoryConstrained = determineIfMemoryConstrained()
+                var chunksProcessed = 0
+
+                while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) {
+                    try autoreleasepool {
+                        try outputHandle.write(contentsOf: data)
+                        totalWritten += UInt64(data.count)
+
+                        // Only log progress every 5% to reduce log noise
+                        let progress: Double =
+                            Double(totalWritten) / Double(expectedTotalSize) * 100
+                        let roundedProgress = Int(progress / 5) * 5
+                        if roundedProgress != Int(
+                            (Double(totalWritten - UInt64(data.count))
+                                / Double(expectedTotalSize) * 100)
+                                / 5) * 5
+                        {
+                            Logger.info("Reassembling disk image: \(roundedProgress)%")
+                        }
+
+                        // Force more frequent autoreleases on memory-constrained systems
+                        chunksProcessed += 1
+                        if memoryConstrained && chunksProcessed % 10 == 0 {
+                            try outputHandle.synchronize()
+                        }
+                    }
                 }
+
+                // Make sure we explicitly close handles after each part to free resources
+                try? inputHandle.synchronize()
+                try inputHandle.close()
             }
 
             // Verify final size
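The reassembly loop above leans on three techniques: bounded reads sized by `getOptimalChunkSize()`, an `autoreleasepool` around each write so chunk buffers are released promptly, and a periodic `synchronize()` on memory-constrained hosts. A condensed, self-contained restatement of that pattern, with illustrative names and constants:

```swift
import Foundation

// Condensed restatement of the reassembly copy loop.
// The chunk size and flush interval here are illustrative, not the repo's values.
func appendChunked(from source: URL, to output: FileHandle,
                   chunkSize: Int = 512 * 1024, flushEvery: Int = 10) throws {
    let input = try FileHandle(forReadingFrom: source)
    defer { try? input.close() }

    var chunksProcessed = 0
    while let data = try input.read(upToCount: chunkSize) {
        try autoreleasepool {
            // One bounded write per pool pass; the chunk's backing buffer
            // is released before the next read, keeping peak memory near chunkSize.
            try output.write(contentsOf: data)
        }
        chunksProcessed += 1
        if chunksProcessed % flushEvery == 0 {
            try output.synchronize()  // flush dirty pages to disk
        }
    }
}
```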
@@ -526,19 +644,19 @@ class ImageContainerRegistry: @unchecked Sendable {
         async throws
     {
         Logger.info("Copying from cache...")
-        var diskParts: [(Int, URL)] = []
+        var diskPartSources: [(Int, URL)] = []
         var totalParts = 0
         var expectedTotalSize: UInt64 = 0
 
+        // First identify disk parts and non-disk files
         for layer in manifest.layers {
             let cachedLayer = getCachedLayerPath(manifestId: manifestId, digest: layer.digest)
 
             if let partInfo = extractPartInfo(from: layer.mediaType) {
                 let (partNum, total) = partInfo
                 totalParts = total
-                let partURL = destination.appendingPathComponent("disk.img.part.\(partNum)")
-                try FileManager.default.copyItem(at: cachedLayer, to: partURL)
-                diskParts.append((partNum, partURL))
+                // Just store the reference to source instead of copying
+                diskPartSources.append((partNum, cachedLayer))
                 expectedTotalSize += UInt64(layer.size)
             } else {
                 let fileName: String
@@ -552,6 +670,7 @@ class ImageContainerRegistry: @unchecked Sendable {
                 default:
                     continue
                 }
+                // Only non-disk files are copied
                 try FileManager.default.copyItem(
                     at: cachedLayer,
                     to: destination.appendingPathComponent(fileName)
@@ -560,8 +679,8 @@ class ImageContainerRegistry: @unchecked Sendable {
         }
 
         // Reassemble disk parts if needed
-        if !diskParts.isEmpty {
-            Logger.info("Reassembling disk image from cached parts...")
+        if !diskPartSources.isEmpty {
+            Logger.info("Reassembling disk image from cached parts (optimized storage)...")
             let outputURL = destination.appendingPathComponent("disk.img")
             FileManager.default.createFile(atPath: outputURL.path, contents: nil)
             let outputHandle = try FileHandle(forWritingTo: outputURL)
@@ -569,18 +688,48 @@ class ImageContainerRegistry: @unchecked Sendable {
 
             var totalWritten: UInt64 = 0
 
+            // Process parts in order, reading directly from cache
             for partNum in 1...totalParts {
-                guard let (_, partURL) = diskParts.first(where: { $0.0 == partNum }) else {
+                guard let (_, sourceURL) = diskPartSources.first(where: { $0.0 == partNum }) else {
                     throw PullError.missingPart(partNum)
                 }
 
-                let inputHandle = try FileHandle(forReadingFrom: partURL)
-                while let data = try inputHandle.read(upToCount: 1024 * 1024 * 10) {
-                    try outputHandle.write(contentsOf: data)
-                    totalWritten += UInt64(data.count)
+                // Read directly from the cached part
+                let inputHandle = try FileHandle(forReadingFrom: sourceURL)
+                defer { try? inputHandle.close() }
+
+                // On low memory systems, be more aggressive with releasing memory
+                let memoryConstrained = determineIfMemoryConstrained()
+                var chunksProcessed = 0
+
+                while let data = try inputHandle.read(upToCount: getOptimalChunkSize()) {
+                    try autoreleasepool {
+                        try outputHandle.write(contentsOf: data)
+                        totalWritten += UInt64(data.count)
+
+                        // Only log progress every 5% to reduce log noise
+                        let progress: Double =
+                            Double(totalWritten) / Double(expectedTotalSize) * 100
+                        let roundedProgress = Int(progress / 5) * 5
+                        if roundedProgress != Int(
+                            (Double(totalWritten - UInt64(data.count)) / Double(expectedTotalSize)
+                                * 100)
+                                / 5) * 5
+                        {
+                            Logger.info("Reassembling disk image from cache: \(roundedProgress)%")
+                        }
+
+                        // Force more frequent autoreleases on memory-constrained systems
+                        chunksProcessed += 1
+                        if memoryConstrained && chunksProcessed % 10 == 0 {
+                            try outputHandle.synchronize()
+                        }
+                    }
                 }
-                try FileManager.default.removeItem(at: partURL)
+
+                // Make sure we explicitly close handles after each part to free resources
+                try? inputHandle.synchronize()
+                try inputHandle.close()
             }
 
             // Verify final size
@@ -716,16 +865,18 @@ class ImageContainerRegistry: @unchecked Sendable {
         // Create the output file
         FileManager.default.createFile(atPath: destination.path, contents: nil)
 
-        // Process in 10MB chunks
-        let chunkSize = 10 * 1024 * 1024
+        // Process with optimal chunk size
+        let chunkSize = getOptimalChunkSize()
         while let chunk = try inputHandle.read(upToCount: chunkSize) {
-            try inputPipe.fileHandleForWriting.write(contentsOf: chunk)
+            try autoreleasepool {
+                try inputPipe.fileHandleForWriting.write(contentsOf: chunk)
 
-            // Read and write output in chunks as well
-            while let decompressedChunk = try outputPipe.fileHandleForReading.read(
-                upToCount: chunkSize)
-            {
-                try outputHandle.write(contentsOf: decompressedChunk)
+                // Read and write output in chunks as well
+                while let decompressedChunk = try outputPipe.fileHandleForReading.read(
+                    upToCount: chunkSize)
+                {
+                    try outputHandle.write(contentsOf: decompressedChunk)
+                }
             }
         }
 
@@ -734,7 +885,9 @@ class ImageContainerRegistry: @unchecked Sendable {
         // Read any remaining output
         while let decompressedChunk = try outputPipe.fileHandleForReading.read(upToCount: chunkSize)
         {
-            try outputHandle.write(contentsOf: decompressedChunk)
+            try autoreleasepool {
+                try outputHandle.write(contentsOf: decompressedChunk)
+            }
         }
 
         process.waitUntilExit()
@@ -890,4 +1043,69 @@ class ImageContainerRegistry: @unchecked Sendable {
         let repoTags = try JSONDecoder().decode(RepositoryTags.self, from: data)
         return repoTags.tags
     }
+
+    // Determine appropriate chunk size based on available system memory on macOS
+    private func getOptimalChunkSize() -> Int {
+        // Try to get system memory info
+        var stats = vm_statistics64_data_t()
+        var size = mach_msg_type_number_t(
+            MemoryLayout<vm_statistics64_data_t>.size / MemoryLayout<integer_t>.size)
+        let hostPort = mach_host_self()
+
+        let result = withUnsafeMutablePointer(to: &stats) { statsPtr in
+            statsPtr.withMemoryRebound(to: integer_t.self, capacity: Int(size)) { ptr in
+                host_statistics64(hostPort, HOST_VM_INFO64, ptr, &size)
+            }
+        }
+
+        // Default to 512KB as a safe minimum
+        let defaultChunkSize = 512 * 1024
+
+        // If we can't get memory info, return conservative default
+        guard result == KERN_SUCCESS else {
+            return defaultChunkSize
+        }
+
+        // Calculate free memory in bytes using a fixed page size
+        // Standard page size on macOS is 4KB or 16KB
+        let pageSize = 4096  // Use a constant instead of vm_kernel_page_size
+        let freeMemory = UInt64(stats.free_count) * UInt64(pageSize)
+
+        // On very memory-constrained systems (< 1GB free), use the minimum
+        if freeMemory < 1_073_741_824 {  // 1GB
+            return defaultChunkSize
+        }
+
+        // For systems with adequate memory, use a smarter sizing approach:
+        // - Use 0.1% of free memory, with limits
+        let adaptiveSize = min(max(Int(freeMemory / 1000), defaultChunkSize), 2 * 1024 * 1024)
+        return adaptiveSize
+    }
+
+    // Check if system is memory constrained for more aggressive memory management
+    private func determineIfMemoryConstrained() -> Bool {
+        var stats = vm_statistics64_data_t()
+        var size = mach_msg_type_number_t(
+            MemoryLayout<vm_statistics64_data_t>.size / MemoryLayout<integer_t>.size)
+        let hostPort = mach_host_self()
+
+        let result = withUnsafeMutablePointer(to: &stats) { statsPtr in
+            statsPtr.withMemoryRebound(to: integer_t.self, capacity: Int(size)) { ptr in
+                host_statistics64(hostPort, HOST_VM_INFO64, ptr, &size)
+            }
+        }
+
+        guard result == KERN_SUCCESS else {
+            // If we can't determine, assume constrained for safety
+            return true
+        }
+
+        // Calculate free memory in bytes using a fixed page size
+        // Standard page size on macOS is 4KB or 16KB
+        let pageSize = 4096  // Use a constant instead of vm_kernel_page_size
+        let freeMemory = UInt64(stats.free_count) * UInt64(pageSize)
+
+        // Consider memory constrained if less than 2GB free
+        return freeMemory < 2_147_483_648  // 2GB
+    }
 }
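For reference, the sizing rule in `getOptimalChunkSize()` reduces to a floor guard plus a clamp. Restated as a pure function, with made-up free-memory figures to show the behavior in each regime:

```swift
// Pure restatement of the adaptive sizing rule above; inputs are examples only.
func chunkSize(forFreeMemory freeMemory: UInt64) -> Int {
    let minimum = 512 * 1024        // 512 KiB floor
    let maximum = 2 * 1024 * 1024   // 2 MiB ceiling
    if freeMemory < 1_073_741_824 { // under 1 GiB free: take the floor
        return minimum
    }
    // 0.1% of free memory, clamped to [512 KiB, 2 MiB]
    return min(max(Int(freeMemory / 1000), minimum), maximum)
}

// chunkSize(forFreeMemory:   512 << 20) == 524_288    (floor: under 1 GiB free)
// chunkSize(forFreeMemory: 1_536 << 20) == 1_610_612  (~1.5 MiB: 0.1% of free)
// chunkSize(forFreeMemory:     8 << 30) == 2_097_152  (ceiling)
```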