diff --git a/src/backend/src/filesystem/ll_operations/ll_write.js b/src/backend/src/filesystem/ll_operations/ll_write.js index 0b592274..b566dd4f 100644 --- a/src/backend/src/filesystem/ll_operations/ll_write.js +++ b/src/backend/src/filesystem/ll_operations/ll_write.js @@ -16,159 +16,17 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -const { Context } = require("../../util/context"); const { LLFilesystemOperation } = require("./definitions"); -const { RESOURCE_STATUS_PENDING_CREATE } = require("../../modules/puterfs/ResourceService.js"); -const { NodeUIDSelector } = require("../node/selectors"); -const { UploadProgressTracker } = require("../storage/UploadProgressTracker"); -const FSNodeContext = require("../FSNodeContext"); const APIError = require("../../api/APIError"); -const { stuck_detector_stream, hashing_stream } = require("../../util/streamutil"); -const { OperationFrame } = require("../../services/OperationTraceService"); -const { DB_WRITE } = require("../../services/database/consts"); - -const crypto = require('crypto'); - -const STUCK_STATUS_TIMEOUT = 10 * 1000; -const STUCK_ALARM_TIMEOUT = 20 * 1000; - -/** - * Base class for low-level write operations providing common storage upload functionality. - * @extends LLFilesystemOperation - */ -class LLWriteBase extends LLFilesystemOperation { - static MODULES = { - config: require('../../config.js'), - simple_retry: require('../../util/retryutil.js').simple_retry, - } - - /** - * Uploads a file to storage with progress tracking and error handling. - * @param {Object} params - Upload parameters - * @param {string} params.uuid - Unique identifier for the file - * @param {string} [params.bucket] - Storage bucket name - * @param {string} [params.bucket_region] - Storage bucket region - * @param {Object} params.file - File object containing stream or buffer - * @param {Object} params.tmp - Temporary file information - * @returns {Promise} The upload state object - * @throws {APIError} When upload fails - */ - async _storage_upload ({ - uuid, - bucket, bucket_region, file, - tmp, - }) { - const { config } = this.modules; - - const svc = Context.get('services'); - const log = svc.get('log-service').create('fs._storage_upload'); - const errors = svc.get('error-service').create(log); - const svc_event = svc.get('event'); - - const svc_mountpoint = svc.get('mountpoint'); - // TODO (xiaochen): what if the provider is not PuterFSProvider? - const storage = svc_mountpoint.get_storage(PuterFSProvider.name); - - bucket ??= config.s3_bucket; - bucket_region ??= config.s3_region ?? config.region; - - let upload_tracker = new UploadProgressTracker(); - - svc_event.emit('fs.storage.upload-progress', { - upload_tracker, - context: Context.get(), - meta: { - item_uid: uuid, - item_path: tmp.path, - } - }) - - if ( ! file.buffer ) { - let stream = file.stream; - let alarm_timeout = null; - stream = stuck_detector_stream(stream, { - timeout: STUCK_STATUS_TIMEOUT, - on_stuck: () => { - this.frame.status = OperationFrame.FRAME_STATUS_STUCK; - log.warn('Upload stream stuck might be stuck', { - bucket_region, - bucket, - uuid, - }); - alarm_timeout = setTimeout(() => { - errors.report('fs.write.s3-upload', { - message: 'Upload stream stuck for too long', - alarm: true, - extra: { - bucket_region, - bucket, - uuid, - }, - }); - }, STUCK_ALARM_TIMEOUT); - }, - on_unstuck: () => { - clearTimeout(alarm_timeout); - this.frame.status = OperationFrame.FRAME_STATUS_WORKING; - } - }); - file = { ...file, stream, }; - } - - let hashPromise; - if ( file.buffer ) { - const hash = crypto.createHash('sha256'); - hash.update(file.buffer); - hashPromise = Promise.resolve(hash.digest('hex')); - } else { - const hs = hashing_stream(file.stream); - file.stream = hs.stream; - hashPromise = hs.hashPromise; - } - - hashPromise.then(hash => { - const svc_event = Context.get('services').get('event'); - this.log.debug('', { uuid, hash }); - svc_event.emit('outer.fs.write-hash', { - hash, uuid, - }); - }); - - const state_upload = storage.create_upload(); - - try { - await state_upload.run({ - uid: uuid, - file, - storage_meta: { bucket, bucket_region }, - storage_api: { progress_tracker: upload_tracker }, - }); - } catch (e) { - errors.report('fs.write.storage-upload', { - source: e || new Error('unknown'), - trace: true, - alarm: true, - extra: { - bucket_region, - bucket, - uuid, - }, - }); - throw APIError.create('upload_failed'); - } - - return state_upload; - } -} /** * The "overwrite" write operation. * * This operation is used to write a file to an existing path. * - * @extends LLWriteBase + * @extends LLFilesystemOperation */ -class LLOWrite extends LLWriteBase { +class LLOWrite extends LLFilesystemOperation { /** * Executes the overwrite operation by writing to an existing file node. * @returns {Promise} Result of the write operation @@ -204,9 +62,9 @@ class LLOWrite extends LLWriteBase { * * This operation is used to write a file to a non-existent path. * - * @extends LLWriteBase + * @extends LLFilesystemOperation */ -class LLCWrite extends LLWriteBase { +class LLCWrite extends LLFilesystemOperation { static MODULES = { _path: require('path'), uuidv4: require('uuid').v4,