mirror of
https://github.com/HeyPuter/puter.git
synced 2026-01-03 03:30:26 -06:00
clean: remove dead code from ll_write
The `_storage_upload` method in ll_write.js is no longer used, which also means the LLWriteBase class is no longer necessary.
This commit is contained in:
@@ -16,159 +16,17 @@
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
const { Context } = require("../../util/context");
|
||||
const { LLFilesystemOperation } = require("./definitions");
|
||||
const { RESOURCE_STATUS_PENDING_CREATE } = require("../../modules/puterfs/ResourceService.js");
|
||||
const { NodeUIDSelector } = require("../node/selectors");
|
||||
const { UploadProgressTracker } = require("../storage/UploadProgressTracker");
|
||||
const FSNodeContext = require("../FSNodeContext");
|
||||
const APIError = require("../../api/APIError");
|
||||
const { stuck_detector_stream, hashing_stream } = require("../../util/streamutil");
|
||||
const { OperationFrame } = require("../../services/OperationTraceService");
|
||||
const { DB_WRITE } = require("../../services/database/consts");
|
||||
|
||||
const crypto = require('crypto');
|
||||
|
||||
const STUCK_STATUS_TIMEOUT = 10 * 1000;
|
||||
const STUCK_ALARM_TIMEOUT = 20 * 1000;
|
||||
|
||||
/**
|
||||
* Base class for low-level write operations providing common storage upload functionality.
|
||||
* @extends LLFilesystemOperation
|
||||
*/
|
||||
class LLWriteBase extends LLFilesystemOperation {
|
||||
static MODULES = {
|
||||
config: require('../../config.js'),
|
||||
simple_retry: require('../../util/retryutil.js').simple_retry,
|
||||
}
|
||||
|
||||
/**
|
||||
* Uploads a file to storage with progress tracking and error handling.
|
||||
* @param {Object} params - Upload parameters
|
||||
* @param {string} params.uuid - Unique identifier for the file
|
||||
* @param {string} [params.bucket] - Storage bucket name
|
||||
* @param {string} [params.bucket_region] - Storage bucket region
|
||||
* @param {Object} params.file - File object containing stream or buffer
|
||||
* @param {Object} params.tmp - Temporary file information
|
||||
* @returns {Promise<Object>} The upload state object
|
||||
* @throws {APIError} When upload fails
|
||||
*/
|
||||
async _storage_upload ({
|
||||
uuid,
|
||||
bucket, bucket_region, file,
|
||||
tmp,
|
||||
}) {
|
||||
const { config } = this.modules;
|
||||
|
||||
const svc = Context.get('services');
|
||||
const log = svc.get('log-service').create('fs._storage_upload');
|
||||
const errors = svc.get('error-service').create(log);
|
||||
const svc_event = svc.get('event');
|
||||
|
||||
const svc_mountpoint = svc.get('mountpoint');
|
||||
// TODO (xiaochen): what if the provider is not PuterFSProvider?
|
||||
const storage = svc_mountpoint.get_storage(PuterFSProvider.name);
|
||||
|
||||
bucket ??= config.s3_bucket;
|
||||
bucket_region ??= config.s3_region ?? config.region;
|
||||
|
||||
let upload_tracker = new UploadProgressTracker();
|
||||
|
||||
svc_event.emit('fs.storage.upload-progress', {
|
||||
upload_tracker,
|
||||
context: Context.get(),
|
||||
meta: {
|
||||
item_uid: uuid,
|
||||
item_path: tmp.path,
|
||||
}
|
||||
})
|
||||
|
||||
if ( ! file.buffer ) {
|
||||
let stream = file.stream;
|
||||
let alarm_timeout = null;
|
||||
stream = stuck_detector_stream(stream, {
|
||||
timeout: STUCK_STATUS_TIMEOUT,
|
||||
on_stuck: () => {
|
||||
this.frame.status = OperationFrame.FRAME_STATUS_STUCK;
|
||||
log.warn('Upload stream stuck might be stuck', {
|
||||
bucket_region,
|
||||
bucket,
|
||||
uuid,
|
||||
});
|
||||
alarm_timeout = setTimeout(() => {
|
||||
errors.report('fs.write.s3-upload', {
|
||||
message: 'Upload stream stuck for too long',
|
||||
alarm: true,
|
||||
extra: {
|
||||
bucket_region,
|
||||
bucket,
|
||||
uuid,
|
||||
},
|
||||
});
|
||||
}, STUCK_ALARM_TIMEOUT);
|
||||
},
|
||||
on_unstuck: () => {
|
||||
clearTimeout(alarm_timeout);
|
||||
this.frame.status = OperationFrame.FRAME_STATUS_WORKING;
|
||||
}
|
||||
});
|
||||
file = { ...file, stream, };
|
||||
}
|
||||
|
||||
let hashPromise;
|
||||
if ( file.buffer ) {
|
||||
const hash = crypto.createHash('sha256');
|
||||
hash.update(file.buffer);
|
||||
hashPromise = Promise.resolve(hash.digest('hex'));
|
||||
} else {
|
||||
const hs = hashing_stream(file.stream);
|
||||
file.stream = hs.stream;
|
||||
hashPromise = hs.hashPromise;
|
||||
}
|
||||
|
||||
hashPromise.then(hash => {
|
||||
const svc_event = Context.get('services').get('event');
|
||||
this.log.debug('<fs.write>', { uuid, hash });
|
||||
svc_event.emit('outer.fs.write-hash', {
|
||||
hash, uuid,
|
||||
});
|
||||
});
|
||||
|
||||
const state_upload = storage.create_upload();
|
||||
|
||||
try {
|
||||
await state_upload.run({
|
||||
uid: uuid,
|
||||
file,
|
||||
storage_meta: { bucket, bucket_region },
|
||||
storage_api: { progress_tracker: upload_tracker },
|
||||
});
|
||||
} catch (e) {
|
||||
errors.report('fs.write.storage-upload', {
|
||||
source: e || new Error('unknown'),
|
||||
trace: true,
|
||||
alarm: true,
|
||||
extra: {
|
||||
bucket_region,
|
||||
bucket,
|
||||
uuid,
|
||||
},
|
||||
});
|
||||
throw APIError.create('upload_failed');
|
||||
}
|
||||
|
||||
return state_upload;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The "overwrite" write operation.
|
||||
*
|
||||
* This operation is used to write a file to an existing path.
|
||||
*
|
||||
* @extends LLWriteBase
|
||||
* @extends LLFilesystemOperation
|
||||
*/
|
||||
class LLOWrite extends LLWriteBase {
|
||||
class LLOWrite extends LLFilesystemOperation {
|
||||
/**
|
||||
* Executes the overwrite operation by writing to an existing file node.
|
||||
* @returns {Promise<Object>} Result of the write operation
|
||||
@@ -204,9 +62,9 @@ class LLOWrite extends LLWriteBase {
|
||||
*
|
||||
* This operation is used to write a file to a non-existent path.
|
||||
*
|
||||
* @extends LLWriteBase
|
||||
* @extends LLFilesystemOperation
|
||||
*/
|
||||
class LLCWrite extends LLWriteBase {
|
||||
class LLCWrite extends LLFilesystemOperation {
|
||||
static MODULES = {
|
||||
_path: require('path'),
|
||||
uuidv4: require('uuid').v4,
|
||||
|
||||
Reference in New Issue
Block a user