mirror of
https://github.com/HeyPuter/puter.git
synced 2026-01-05 20:50:22 -06:00
refactor(backend): migrate FileCacheService
FileCacheService was still a legacy service extending AdvancedBase. This commit changes it to extend BaseService and implement the lifecycle methods consistent with other services. This is a prerequisite for moving write_overwrite which needs to import the service via `extension.import` (not possible for legacy services) so that it may invalidate file cache when a file is written to.
This commit is contained in:
@@ -433,18 +433,19 @@ const install = async ({ context, services, app, useapi, modapi }) => {
|
||||
|
||||
const { PermissionShortcutService } = require('./services/auth/PermissionShortcutService');
|
||||
services.registerService('permission-shortcut', PermissionShortcutService);
|
||||
|
||||
const { FileCacheService } = require('./services/file-cache/FileCacheService');
|
||||
services.registerService('file-cache', FileCacheService);
|
||||
};
|
||||
|
||||
const install_legacy = async ({ services }) => {
|
||||
const { OperationTraceService } = require('./services/OperationTraceService');
|
||||
const { ClientOperationService } = require('./services/ClientOperationService');
|
||||
const { EngPortalService } = require('./services/EngPortalService');
|
||||
const { FileCacheService } = require('./services/file-cache/FileCacheService');
|
||||
|
||||
// === Services which do not yet extend BaseService ===
|
||||
// services.registerService('filesystem', FilesystemService);
|
||||
services.registerService('operationTrace', OperationTraceService);
|
||||
services.registerService('file-cache', FileCacheService);
|
||||
services.registerService('client-operation', ClientOperationService);
|
||||
services.registerService('engineering-portal', EngPortalService);
|
||||
|
||||
|
||||
@@ -127,7 +127,18 @@ class CommandService extends BaseService {
|
||||
}
|
||||
|
||||
registerCommands(serviceName, commands) {
|
||||
if ( ! this.log ) process.exit(1);
|
||||
if ( ! this.log ) {
|
||||
/* eslint-disable */
|
||||
console.error(
|
||||
'CommandService.registerCommands was called before a logger ' +
|
||||
'was initialied. This happens when calling registerCommands ' +
|
||||
'in the "construct" phase instead of the "init" phase. If ' +
|
||||
'you are migrating a legacy service that does not extend ' +
|
||||
'BaseService, maybe the _construct hook is calling init()'
|
||||
);
|
||||
/* eslint-enable */
|
||||
process.exit(1);
|
||||
}
|
||||
for (const command of commands) {
|
||||
this.log.debug(`registering command ${serviceName}:${command.id}`);
|
||||
this.commands_.push(new Command({
|
||||
|
||||
@@ -17,27 +17,27 @@
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
const { AdvancedBase } = require("@heyputer/putility");
|
||||
const { FileTracker } = require("./FileTracker");
|
||||
const { pausing_tee } = require("../../util/streamutil");
|
||||
const putility = require("@heyputer/putility");
|
||||
const { EWMA } = require("../../util/opmath");
|
||||
const { FileTracker } = require('./FileTracker');
|
||||
const { pausing_tee } = require('../../util/streamutil');
|
||||
const putility = require('@heyputer/putility');
|
||||
const { EWMA } = require('../../util/opmath');
|
||||
|
||||
const crypto = require('crypto');
|
||||
const BaseService = require('../BaseService');
|
||||
|
||||
/**
|
||||
* @class FileCacheService
|
||||
* @extends AdvancedBase
|
||||
* @description
|
||||
|
||||
* The FileCacheService class manages a cache for file storage and retrieval in the Puter system.
|
||||
* The FileCacheService class manages a cache for file storage and retrieval in the Puter system.
|
||||
* This service provides functionalities to:
|
||||
* - Cache files either in memory (precache) or on disk.
|
||||
* - Track file usage with FileTracker instances to manage cache eviction policies.
|
||||
* - Ensure files are stored within configured limits for both disk and memory usage.
|
||||
* - Provide methods for initializing the cache, storing, retrieving, and invalidating cached files.
|
||||
* - Register commands for managing and inspecting the cache status.
|
||||
*
|
||||
*
|
||||
* @property {Object} MODULES - Static property containing module dependencies.
|
||||
* @property {number} disk_limit - The maximum size allowed for disk storage of cached files.
|
||||
* @property {number} disk_max_size - The maximum size of a file that can be cached on disk.
|
||||
@@ -47,25 +47,19 @@ const crypto = require('crypto');
|
||||
* @property {Map} precache - A Map to hold files in memory.
|
||||
* @property {Map} uid_to_tracker - A Map to track each file with its FileTracker instance.
|
||||
*/
|
||||
class FileCacheService extends AdvancedBase {
|
||||
class FileCacheService extends BaseService {
|
||||
static MODULES = {
|
||||
fs: require('fs'),
|
||||
path_: require('path'),
|
||||
}
|
||||
};
|
||||
|
||||
constructor ({ services, my_config, config: global_config }) {
|
||||
super({ services });
|
||||
_construct () {
|
||||
this.disk_limit = this.config.disk_limit;
|
||||
this.disk_max_size = this.config.disk_max_size;
|
||||
this.precache_size = this.config.precache_size;
|
||||
this.path = this.config.path;
|
||||
|
||||
this.log = services.get('log-service').create(this.constructor.name);
|
||||
this.errors = services.get('error-service').create(this.log);
|
||||
this.services = services;
|
||||
|
||||
this.disk_limit = my_config.disk_limit;
|
||||
this.disk_max_size = my_config.disk_max_size;
|
||||
this.precache_size = my_config.precache_size;
|
||||
this.path = my_config.path;
|
||||
|
||||
this.ttl = my_config.ttl || (60 * 1000);
|
||||
this.ttl = this.config.ttl || (60 * 1000);
|
||||
|
||||
this.precache = new Map();
|
||||
this.uid_to_tracker = new Map();
|
||||
@@ -74,61 +68,56 @@ class FileCacheService extends AdvancedBase {
|
||||
initial: 0.5,
|
||||
alpha: 0.2,
|
||||
});
|
||||
|
||||
this.logging_enabled = (global_config.logging ?? [])
|
||||
|
||||
this.logging_enabled = (this.global_config.logging ?? [])
|
||||
.includes('file-cache');
|
||||
|
||||
this.init();
|
||||
|
||||
this._register_commands(services.get('commands'));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Retrieves the amount of precache space currently used.
|
||||
*
|
||||
*
|
||||
* @returns {number} The total size in bytes of files stored in the precache.
|
||||
*/
|
||||
get _precache_used () {
|
||||
let used = 0;
|
||||
|
||||
// Iterate over file trackers in PHASE_PRECACHE
|
||||
for (const tracker of this.uid_to_tracker.values()) {
|
||||
if (tracker.phase !== FileTracker.PHASE_PRECACHE) continue;
|
||||
for ( const tracker of this.uid_to_tracker.values() ) {
|
||||
if ( tracker.phase !== FileTracker.PHASE_PRECACHE ) continue;
|
||||
used += tracker.size;
|
||||
}
|
||||
|
||||
return used;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Calculates the total disk space used by files in the PHASE_DISK phase.
|
||||
*
|
||||
*
|
||||
* @returns {number} The total size of all files currently stored on disk.
|
||||
*/
|
||||
get _disk_used () {
|
||||
let used = 0;
|
||||
|
||||
// Iterate over file trackers in PHASE_DISK
|
||||
for (const tracker of this.uid_to_tracker.values()) {
|
||||
if (tracker.phase !== FileTracker.PHASE_DISK) continue;
|
||||
for ( const tracker of this.uid_to_tracker.values() ) {
|
||||
if ( tracker.phase !== FileTracker.PHASE_DISK ) continue;
|
||||
used += tracker.size;
|
||||
}
|
||||
|
||||
return used;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initializes the cache by ensuring the storage directory exists.
|
||||
*
|
||||
*
|
||||
* @async
|
||||
* @method init
|
||||
* @returns {Promise<void>} A promise that resolves when the initialization is complete.
|
||||
* @throws {Error} If there's an error creating the directory.
|
||||
*/
|
||||
async init () {
|
||||
async _init () {
|
||||
this._register_commands(this.services.get('commands'));
|
||||
|
||||
const { fs } = this.modules;
|
||||
// Ensure storage path exists
|
||||
await fs.promises.mkdir(this.path, { recursive: true });
|
||||
@@ -147,7 +136,7 @@ class FileCacheService extends AdvancedBase {
|
||||
|
||||
/**
|
||||
* Get the file path for a given file UID.
|
||||
*
|
||||
*
|
||||
* @param {string} uid - The unique identifier of the file.
|
||||
* @returns {string} The full path where the file is stored on disk.
|
||||
*/
|
||||
@@ -158,7 +147,7 @@ class FileCacheService extends AdvancedBase {
|
||||
|
||||
/**
|
||||
* Attempts to retrieve a cached file.
|
||||
*
|
||||
*
|
||||
* This method first checks if the file exists in the cache by its UID.
|
||||
* If found, it verifies the file's age against the TTL (time-to-live).
|
||||
* If the file is expired, it invalidates the cache entry. Otherwise,
|
||||
@@ -168,7 +157,7 @@ class FileCacheService extends AdvancedBase {
|
||||
* @param {Object} [opt_log] - Optional logging service to log cache hits.
|
||||
* @returns {Promise<Buffer|null>} - The file data if found, or null.
|
||||
*/
|
||||
async try_get(fsNode, opt_log) {
|
||||
async try_get (fsNode, opt_log) {
|
||||
const result = await this.try_get_(fsNode, opt_log);
|
||||
this.cache_hit_rate.put(result ? 1 : 0);
|
||||
return result;
|
||||
@@ -192,7 +181,7 @@ class FileCacheService extends AdvancedBase {
|
||||
if ( tracker.phase === FileTracker.PHASE_PENDING ) {
|
||||
Promise.race([
|
||||
tracker.p_ready,
|
||||
new Promise(resolve => setTimeout(resolve, 2000))
|
||||
new Promise(resolve => setTimeout(resolve, 2000)),
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -209,12 +198,12 @@ class FileCacheService extends AdvancedBase {
|
||||
}
|
||||
|
||||
if ( tracker.phase === FileTracker.PHASE_PRECACHE ) {
|
||||
if ( opt_log ) opt_log.debug('obtained from precache');
|
||||
this.log.noticeme('obtained from precache');
|
||||
return this.precache.get(await fsNode.get('uid'));
|
||||
}
|
||||
|
||||
if ( tracker.phase === FileTracker.PHASE_DISK ) {
|
||||
if ( opt_log ) opt_log.debug('obtained from disk');
|
||||
this.log.noticeme('obtained from disk');
|
||||
|
||||
const { fs } = this.modules;
|
||||
const path = this._get_path(await fsNode.get('uid'));
|
||||
@@ -236,7 +225,7 @@ class FileCacheService extends AdvancedBase {
|
||||
alarm: true,
|
||||
extra: {
|
||||
phase: tracker.phase?.label,
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
return null;
|
||||
@@ -245,22 +234,22 @@ class FileCacheService extends AdvancedBase {
|
||||
/**
|
||||
* Stores a file in the cache if it's "important enough"
|
||||
* to be in the cache (i.e. wouldn't get immediately evicted).
|
||||
* @param {*} fsNode
|
||||
* @param {*} stream
|
||||
* @returns
|
||||
* @param {*} fsNode
|
||||
* @param {*} stream
|
||||
* @returns
|
||||
*/
|
||||
async maybe_store (fsNode, stream) {
|
||||
const size = await fsNode.get('size');
|
||||
|
||||
// If the file is too big, don't cache it
|
||||
if (size > this.disk_max_size) {
|
||||
if ( size > this.disk_max_size ) {
|
||||
return { cached: false };
|
||||
}
|
||||
|
||||
const key = await fsNode.get('uid');
|
||||
|
||||
// If the file is already cached, don't cache it again
|
||||
if (this.uid_to_tracker.has(key)) {
|
||||
if ( this.uid_to_tracker.has(key) ) {
|
||||
return { cached: true };
|
||||
}
|
||||
|
||||
@@ -270,7 +259,6 @@ class FileCacheService extends AdvancedBase {
|
||||
tracker.p_ready = new putility.libs.promise.TeePromise();
|
||||
tracker.touch();
|
||||
|
||||
|
||||
// Store binary data in memory (precache)
|
||||
const data = Buffer.alloc(size);
|
||||
|
||||
@@ -279,7 +267,7 @@ class FileCacheService extends AdvancedBase {
|
||||
(async () => {
|
||||
let offset = 0;
|
||||
const hash = crypto.createHash('sha256');
|
||||
for await (const chunk of store_stream) {
|
||||
for await ( const chunk of store_stream ) {
|
||||
chunk.copy(data, offset);
|
||||
hash.update(chunk);
|
||||
offset += chunk.length;
|
||||
@@ -290,18 +278,17 @@ class FileCacheService extends AdvancedBase {
|
||||
tracker.hash = hash.digest('hex');
|
||||
tracker.phase = FileTracker.PHASE_PRECACHE;
|
||||
tracker.p_ready.resolve();
|
||||
})()
|
||||
})();
|
||||
|
||||
return { cached: true, stream: replace_stream };
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Invalidates a file from the cache.
|
||||
*
|
||||
*
|
||||
* @param {FsNode} fsNode - The file system node to invalidate.
|
||||
* @returns {Promise<void>} A promise that resolves when the file has been invalidated.
|
||||
*
|
||||
*
|
||||
* @description
|
||||
* This method checks if the given file is in the cache, and if so, removes it from both
|
||||
* the precache and disk storage, ensuring that any references to this file are cleaned up.
|
||||
@@ -323,25 +310,21 @@ class FileCacheService extends AdvancedBase {
|
||||
this.uid_to_tracker.delete(key);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Evicts files from precache until there's enough room for a new file.
|
||||
* @param {*} size - The size of the file to be stored.
|
||||
*/
|
||||
async _precache_make_room (size) {
|
||||
if (this._precache_used + size > this.precache_size) {
|
||||
await this._precache_evict(
|
||||
this._precache_used + size - this.precache_size
|
||||
);
|
||||
if ( this._precache_used + size > this.precache_size ) {
|
||||
await this._precache_evict(this._precache_used + size - this.precache_size);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Evicts files from precache to make room for new files.
|
||||
* This method sorts all trackers by score and evicts the lowest scoring
|
||||
* files in precache phase until the specified capacity is freed.
|
||||
*
|
||||
*
|
||||
* @param {number} capacity_needed - The amount of capacity (in bytes) that needs to be freed in precache.
|
||||
*/
|
||||
async _precache_evict (capacity_needed) {
|
||||
@@ -350,15 +333,14 @@ class FileCacheService extends AdvancedBase {
|
||||
.sort((a, b) => b.score - a.score);
|
||||
|
||||
let capacity = 0;
|
||||
for (const tracker of sorted) {
|
||||
if (tracker.phase !== FileTracker.PHASE_PRECACHE) continue;
|
||||
for ( const tracker of sorted ) {
|
||||
if ( tracker.phase !== FileTracker.PHASE_PRECACHE ) continue;
|
||||
capacity += tracker.size;
|
||||
await this._maybe_promote_to_disk(tracker);
|
||||
if (capacity >= capacity_needed) break;
|
||||
if ( capacity >= capacity_needed ) break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Promotes a file from precache to disk if it has a higher score than the files that would be evicted.
|
||||
*
|
||||
@@ -369,10 +351,10 @@ class FileCacheService extends AdvancedBase {
|
||||
* while before writing it to disk.
|
||||
*
|
||||
* @param {*} tracker - The FileTracker instance representing the file to be promoted.
|
||||
* @returns
|
||||
* @returns
|
||||
*/
|
||||
async _maybe_promote_to_disk (tracker) {
|
||||
if (tracker.phase !== FileTracker.PHASE_PRECACHE) return;
|
||||
if ( tracker.phase !== FileTracker.PHASE_PRECACHE ) return;
|
||||
|
||||
// It's important to check that the score of this file is
|
||||
// higher than the combined score of the N files that
|
||||
@@ -383,45 +365,44 @@ class FileCacheService extends AdvancedBase {
|
||||
let capacity = 0;
|
||||
let score_needed = 0;
|
||||
const capacity_needed = this._disk_used + tracker.size - this.disk_limit;
|
||||
for (const tracker of sorted) {
|
||||
if (tracker.phase !== FileTracker.PHASE_DISK) continue;
|
||||
for ( const tracker of sorted ) {
|
||||
if ( tracker.phase !== FileTracker.PHASE_DISK ) continue;
|
||||
capacity += tracker.size;
|
||||
score_needed += tracker.score;
|
||||
if (capacity >= capacity_needed) break;
|
||||
if ( capacity >= capacity_needed ) break;
|
||||
}
|
||||
|
||||
if (tracker.score < score_needed) return;
|
||||
if ( tracker.score < score_needed ) return;
|
||||
|
||||
// Now we can remove the lowest scoring files
|
||||
// to make room for this file.
|
||||
capacity = 0;
|
||||
for (const tracker of sorted) {
|
||||
if (tracker.phase !== FileTracker.PHASE_DISK) continue;
|
||||
for ( const tracker of sorted ) {
|
||||
if ( tracker.phase !== FileTracker.PHASE_DISK ) continue;
|
||||
capacity += tracker.size;
|
||||
await this._disk_evict(tracker);
|
||||
if (capacity >= capacity_needed) break;
|
||||
if ( capacity >= capacity_needed ) break;
|
||||
}
|
||||
|
||||
const { fs } = this.modules;
|
||||
const path = this._get_path(tracker.key);
|
||||
console.log(`precache fetch key`, tracker.key);
|
||||
console.log('precache fetch key', tracker.key);
|
||||
const data = this.precache.get(tracker.key);
|
||||
await fs.promises.writeFile(path, data);
|
||||
this.precache.delete(tracker.key);
|
||||
tracker.phase = FileTracker.PHASE_DISK;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Evicts a file from disk cache.
|
||||
*
|
||||
*
|
||||
* @param {FileTracker} tracker - The FileTracker instance representing the file to be evicted.
|
||||
* @returns {Promise<void>} A promise that resolves when the file is evicted or if the tracker is not in the disk phase.
|
||||
*
|
||||
*
|
||||
* @note This method ensures that the file is removed from the disk cache and the tracker's phase is updated to GONE.
|
||||
*/
|
||||
async _disk_evict (tracker) {
|
||||
if (tracker.phase !== FileTracker.PHASE_DISK) return;
|
||||
if ( tracker.phase !== FileTracker.PHASE_DISK ) return;
|
||||
|
||||
const { fs } = this.modules;
|
||||
const path = this._get_path(tracker.key);
|
||||
@@ -448,18 +429,18 @@ class FileCacheService extends AdvancedBase {
|
||||
};
|
||||
|
||||
log.log(JSON.stringify(status, null, 2));
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
id: 'hitrate',
|
||||
handler: async (args, log) => {
|
||||
log.log(this.cache_hit_rate.get());
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
FileCacheService
|
||||
FileCacheService,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user