diff --git a/eslint.config.js b/eslint.config.js index 5f10d324..99ea1ec2 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -55,6 +55,9 @@ export const rules = { '@stylistic/type-annotation-spacing': 'error', '@stylistic/type-generic-spacing': 'error', '@stylistic/type-named-tuple-spacing': ['error'], + 'no-use-before-define': ['error', { + 'functions': false, + }], }; export default defineConfig([ @@ -65,6 +68,7 @@ export default defineConfig([ }, rules: { '@typescript-eslint/no-unused-vars': 'off', // Disable rules requiring type checking + 'no-use-before-define': 'off', }, }, // TypeScript support for tests diff --git a/eslint/mandatory.eslint.config.js b/eslint/mandatory.eslint.config.js index e3036508..b3d13991 100644 --- a/eslint/mandatory.eslint.config.js +++ b/eslint/mandatory.eslint.config.js @@ -20,6 +20,13 @@ const backendLanguageOptions = { }, }; +const mandatoryRules = { + 'no-undef': 'error', + 'no-use-before-define': ['error', { + 'functions': false, + }], +}; + export default defineConfig([ { ignores: [ @@ -47,9 +54,7 @@ export default defineConfig([ ignores: [ 'src/backend/src/services/database/sqlite_setup/**/*.js', ], - rules: { - 'no-undef': 'error', - }, + rules: mandatoryRules, languageOptions: { ...backendLanguageOptions, }, @@ -58,9 +63,7 @@ export default defineConfig([ files: [ 'src/backend/src/services/database/sqlite_setup/**/*.js', ], - rules: { - 'no-undef': 'error', - }, + rules: mandatoryRules, languageOptions: { globals: { read: 'readonly', @@ -76,9 +79,7 @@ export default defineConfig([ 'extensions/**/*.{ts}', 'src/backend-core-0/**/*.{ts}', ], - rules: { - 'no-undef': 'error', - }, + rules: mandatoryRules, languageOptions: { ...backendLanguageOptions, }, diff --git a/src/backend/src/CoreModule.js b/src/backend/src/CoreModule.js index f4dcee21..3176a051 100644 --- a/src/backend/src/CoreModule.js +++ b/src/backend/src/CoreModule.js @@ -30,45 +30,6 @@ const { TDetachable } = require('@heyputer/putility/src/traits/traits.js'); const { MultiDetachable } = require('@heyputer/putility/src/libs/listener.js'); const { OperationFrame } = require('./services/OperationTraceService'); -/** - * Core module for the Puter platform that includes essential services including - * authentication, filesystems, rate limiting, permissions, and various API endpoints. - * - * This is a monolithic module. Incrementally, services should be migrated to - * Core2Module and other modules instead. Core2Module has a smaller scope, and each - * new module will be a cohesive concern. Once CoreModule is empty, it will be removed - * and Core2Module will take on its name. - */ -class CoreModule extends AdvancedBase { - dirname () { - return __dirname; - } - async install (context) { - const services = context.get('services'); - const app = context.get('app'); - const useapi = context.get('useapi'); - const modapi = context.get('modapi'); - await install({ context, services, app, useapi, modapi }); - } - - /** - * Installs legacy services that don't extend BaseService and require special handling. - * These services were created before the BaseService class existed and don't listen - * to the init event. They need to be installed after the init event is dispatched - * due to initialization order dependencies. - * - * @param {Object} context - The context object containing service references - * @param {Object} context.services - Service registry for registering legacy services - * @returns {Promise} Resolves when legacy services are installed - */ - async install_legacy (context) { - const services = context.get('services'); - await install_legacy({ services }); - } -} - -module.exports = CoreModule; - /** * @footgun - real install method is defined above */ @@ -146,7 +107,6 @@ const install = async ({ context, services, app, useapi, modapi }) => { const { NAPIThumbnailService } = require('./services/thumbnails/NAPIThumbnailService'); const { RateLimitService } = require('./services/sla/RateLimitService'); const { AuthService } = require('./services/auth/AuthService'); - const { PreAuthService } = require('./services/auth/PreAuthService'); const { SLAService } = require('./services/sla/SLAService'); const { PermissionService } = require('./services/auth/PermissionService'); const { ACLService } = require('./services/auth/ACLService'); @@ -450,3 +410,42 @@ const install_legacy = async ({ services }) => { services.registerService('engineering-portal', EngPortalService); }; + +/** + * Core module for the Puter platform that includes essential services including + * authentication, filesystems, rate limiting, permissions, and various API endpoints. + * + * This is a monolithic module. Incrementally, services should be migrated to + * Core2Module and other modules instead. Core2Module has a smaller scope, and each + * new module will be a cohesive concern. Once CoreModule is empty, it will be removed + * and Core2Module will take on its name. + */ +class CoreModule extends AdvancedBase { + dirname () { + return __dirname; + } + async install (context) { + const services = context.get('services'); + const app = context.get('app'); + const useapi = context.get('useapi'); + const modapi = context.get('modapi'); + await install({ context, services, app, useapi, modapi }); + } + + /** + * Installs legacy services that don't extend BaseService and require special handling. + * These services were created before the BaseService class existed and don't listen + * to the init event. They need to be installed after the init event is dispatched + * due to initialization order dependencies. + * + * @param {Object} context - The context object containing service references + * @param {Object} context.services - Service registry for registering legacy services + * @returns {Promise} Resolves when legacy services are installed + */ + async install_legacy (context) { + const services = context.get('services'); + await install_legacy({ services }); + } +} + +module.exports = CoreModule; \ No newline at end of file diff --git a/src/backend/src/helpers.js b/src/backend/src/helpers.js index 15297301..e72f7075 100644 --- a/src/backend/src/helpers.js +++ b/src/backend/src/helpers.js @@ -262,76 +262,6 @@ function invalidate_cached_user_by_id (id) { */ async function refresh_apps_cache (options, override) { return; - /** @type BaseDatabaseAccessService */ - const db = services.get('database').get(DB_READ, 'apps'); - const svc_event = services.get('event'); - - const log = services.get('log-service').create('refresh_apps_cache'); - log.tick('refresh apps cache'); - // if options is not provided, refresh all apps - if ( ! options ) { - let apps = await db.read('SELECT * FROM apps'); - for ( let index = 0; index < apps.length; index++ ) { - const app = apps[index]; - kv.set(`apps:name:${ app.name}`, app); - kv.set(`apps:id:${ app.id}`, app); - kv.set(`apps:uid:${ app.uid}`, app); - } - svc_event.emit('apps.invalidate', { - options, apps, - }); - } - // refresh only apps that are approved for listing - else if ( options.only_approved_for_listing ) { - let apps = await db.read('SELECT * FROM apps WHERE approved_for_listing = 1'); - for ( let index = 0; index < apps.length; index++ ) { - const app = apps[index]; - kv.set(`apps:name:${ app.name}`, app); - kv.set(`apps:id:${ app.id}`, app); - kv.set(`apps:uid:${ app.uid}`, app); - } - svc_event.emit('apps.invalidate', { - options, apps, - }); - } - // if options is provided, refresh only the app specified - else { - let app; - - if ( options.name ) - { - app = await db.pread('SELECT * FROM apps WHERE name = ?', [options.name]); - } - else if ( options.uid ) - { - app = await db.pread('SELECT * FROM apps WHERE uid = ?', [options.uid]); - } - else if ( options.id ) - { - app = await db.pread('SELECT * FROM apps WHERE id = ?', [options.id]); - } - else { - log.error('invalid options to refresh_apps_cache'); - throw new Error('Invalid options provided'); - } - - if ( !app || !app[0] ) { - log.error('refresh_apps_cache could not find the app'); - return; - } else { - app = app[0]; - if ( override ) { - Object.assign(app, override); - } - kv.set(`apps:name:${ app.name}`, app); - kv.set(`apps:id:${ app.id}`, app); - kv.set(`apps:uid:${ app.uid}`, app); - } - - svc_event.emit('apps.invalidate', { - options, app, - }); - } } async function refresh_associations_cache () { @@ -664,18 +594,6 @@ function byte_format (bytes) { return `${Math.round(bytes / Math.pow(1024, i), 2) } ${ sizes[i]}`; }; -const get_dir_size = async (path, user) => { - let size = 0; - const descendants = await get_descendants(path, user); - for ( let i = 0; i < descendants.length; i++ ) { - if ( ! descendants[i].is_dir ) { - size += descendants[i].size; - } - } - - return size; -}; - /** * Recursively retrieve all files, directories, and subdirectories under `path`. * Optionally the `depth` can be set. @@ -685,7 +603,7 @@ const get_dir_size = async (path, user) => { * @param {integer} depth * @returns */ -const get_descendants_0 = async (path, user, depth, return_thumbnail = false) => { +async function getDescendantsHelper (path, user, depth, return_thumbnail = false) { const log = services.get('log-service').create('get_descendants'); log.called(); @@ -865,22 +783,34 @@ const get_descendants_0 = async (path, user, depth, return_thumbnail = false) => return ret.flat(); }; -const get_descendants = async (...args) => { +async function get_descendants (...args) { const tracer = services.get('traceService').tracer; let ret; await tracer.startActiveSpan('get_descendants', async span => { - ret = await get_descendants_0(...args); + ret = await getDescendantsHelper(...args); span.end(); }); return ret; }; +const get_dir_size = async (path, user) => { + let size = 0; + const descendants = await get_descendants(path, user); + for ( let i = 0; i < descendants.length; i++ ) { + if ( ! descendants[i].is_dir ) { + size += descendants[i].size; + } + } + + return size; +}; + /** * * @param {integer} entry_id * @returns */ -const id2path = async (entry_uid) => { +async function id2path (entry_uid) { if ( entry_uid == null ) { throw new Error('got null or undefined entry id'); } diff --git a/src/backend/src/modules/core/ParameterService.js b/src/backend/src/modules/core/ParameterService.js index f011cd81..ffe7e435 100644 --- a/src/backend/src/modules/core/ParameterService.js +++ b/src/backend/src/modules/core/ParameterService.js @@ -20,6 +20,64 @@ const BaseService = require('../../services/BaseService'); +/** +* @class Parameter +* @description Represents a configurable parameter with value management, constraints, and change notification capabilities. +* Provides functionality for setting/getting values, binding to object instances, and subscribing to value changes. +* Supports validation through configurable constraints and maintains a list of value change listeners. +*/ +class Parameter { + constructor (spec) { + this.spec_ = spec; + this.valueListeners_ = []; + + if ( spec.default ) { + this.value_ = spec.default; + } + } + + /** + * Sets a new value for the parameter after validating against constraints + * @param {*} value - The new value to set for the parameter + * @throws {Error} If the value fails any constraint checks + * @fires valueListeners with new value and old value + * @async + */ + async set (value) { + for ( const constraint of (this.spec_.constraints ?? []) ) { + if ( ! await constraint.check(value) ) { + throw new Error(`value ${value} does not satisfy constraint ${constraint.id}`); + } + } + + const old = this.value_; + this.value_ = value; + for ( const listener of this.valueListeners_ ) { + listener(value, { old }); + } + } + + /** + * Gets the current value of this parameter + * @returns {Promise<*>} The parameter's current value + */ + async get () { + return this.value_; + } + + bindToInstance (instance, name) { + const value = this.value_; + instance[name] = value; + this.valueListeners_.push((value) => { + instance[name] = value; + }); + } + + subscribe (listener) { + this.valueListeners_.push(listener); + } +} + /** * @class ParameterService * @extends BaseService @@ -154,64 +212,6 @@ class ParameterService extends BaseService { } } -/** -* @class Parameter -* @description Represents a configurable parameter with value management, constraints, and change notification capabilities. -* Provides functionality for setting/getting values, binding to object instances, and subscribing to value changes. -* Supports validation through configurable constraints and maintains a list of value change listeners. -*/ -class Parameter { - constructor (spec) { - this.spec_ = spec; - this.valueListeners_ = []; - - if ( spec.default ) { - this.value_ = spec.default; - } - } - - /** - * Sets a new value for the parameter after validating against constraints - * @param {*} value - The new value to set for the parameter - * @throws {Error} If the value fails any constraint checks - * @fires valueListeners with new value and old value - * @async - */ - async set (value) { - for ( const constraint of (this.spec_.constraints ?? []) ) { - if ( ! await constraint.check(value) ) { - throw new Error(`value ${value} does not satisfy constraint ${constraint.id}`); - } - } - - const old = this.value_; - this.value_ = value; - for ( const listener of this.valueListeners_ ) { - listener(value, { old }); - } - } - - /** - * Gets the current value of this parameter - * @returns {Promise<*>} The parameter's current value - */ - async get () { - return this.value_; - } - - bindToInstance (instance, name) { - const value = this.value_; - instance[name] = value; - this.valueListeners_.push((value) => { - instance[name] = value; - }); - } - - subscribe (listener) { - this.valueListeners_.push(listener); - } -} - module.exports = { ParameterService, }; diff --git a/src/backend/src/modules/puterai/TogetherAIService.js b/src/backend/src/modules/puterai/TogetherAIService.js index df12eb27..ef8b050d 100644 --- a/src/backend/src/modules/puterai/TogetherAIService.js +++ b/src/backend/src/modules/puterai/TogetherAIService.js @@ -113,7 +113,7 @@ class TogetherAIService extends BaseService { // Metering integration const actor = Context.get('actor'); - const modelDetails = (await this.models_()).find(m => m.id === modelId || m.aliases?.include(modelId)); + const modelDetails = (await this.models_()).find(m => m.id === model || m.aliases?.include(model)); const modelId = modelDetails ?? this.get_default_model(); if ( stream ) { diff --git a/src/backend/src/routers/drivers/call.js b/src/backend/src/routers/drivers/call.js index 4bbb977e..855d52d8 100644 --- a/src/backend/src/routers/drivers/call.js +++ b/src/backend/src/routers/drivers/call.js @@ -27,6 +27,27 @@ const { TeePromise } = require('@heyputer/putility').libs.promise; const { valid_file_size } = require('../../util/validutil'); let _handle_multipart; +const responseHelper = (res, result) => { + if ( result.result instanceof TypedValue ) { + const tv = result.result; + if ( TypeSpec.adapt({ $: 'stream' }).equals(tv.type) ) { + res.set('Content-Type', tv.type.raw.content_type); + if ( tv.type.raw.chunked ) { + res.set('Transfer-Encoding', 'chunked'); + } + tv.value.pipe(res); + return; + } + + // This is the + if ( typeof tv.value === 'object' ) { + tv.value.type_fallback = true; + } + res.json(tv.value); + return; + } + res.json(result); +}; /** * POST /drivers/call @@ -56,7 +77,7 @@ module.exports = eggspress('/drivers/call', { // noReallyItsJson: true, jsonCanBeLarge: true, allowedMethods: ['POST'], -}, async (req, res, next) => { +}, async (req, res) => { const x = Context.get(); const svc_driver = x.get('services').get('driver'); @@ -87,35 +108,13 @@ module.exports = eggspress('/drivers/call', { // stream transformation, thus the stream from the request isn't // consumed until the response is being sent. - _respond(res, result); + responseHelper(res, result); // What we _can_ do is await the request promise while responding // to ensure errors are caught here. await p_request; }); -const _respond = (res, result) => { - if ( result.result instanceof TypedValue ) { - const tv = result.result; - if ( TypeSpec.adapt({ $: 'stream' }).equals(tv.type) ) { - res.set('Content-Type', tv.type.raw.content_type); - if ( tv.type.raw.chunked ) { - res.set('Transfer-Encoding', 'chunked'); - } - tv.value.pipe(res); - return; - } - - // This is the - if ( typeof tv.value === 'object' ) { - tv.value.type_fallback = true; - } - res.json(tv.value); - return; - } - res.json(result); -}; - _handle_multipart = async (req) => { const Busboy = require('busboy'); const { PassThrough } = require('stream'); @@ -130,7 +129,7 @@ _handle_multipart = async (req) => { const p_data_end = new TeePromise(); const p_nonfile_data_end = new TeePromise(); - bb.on('file', (fieldname, stream, details) => { + bb.on('file', (fieldname, stream, _details) => { p_nonfile_data_end.resolve(); const fileinfo = files[file_index++]; stream.pipe(fileinfo.stream); @@ -141,7 +140,7 @@ _handle_multipart = async (req) => { const last_key = key_parts.pop(); let dst = params; for ( let i = 0; i < key_parts.length; i++ ) { - if ( ! dst.hasOwnProperty(key_parts[i]) ) { + if ( ! Object.prototype.hasOwnProperty.call(dst, key_parts[i]) ) { dst[key_parts[i]] = {}; } if ( whatis(dst[key_parts[i]]) !== 'object' ) { @@ -164,7 +163,7 @@ _handle_multipart = async (req) => { files.push(fileinfo); value = file_facade; } - if ( dst.hasOwnProperty(last_key) ) { + if ( Object.prototype.hasOwnProperty.call(dst, last_key) ) { if ( ! Array.isArray(dst[last_key]) ) { dst[last_key] = [dst[last_key]]; } @@ -174,7 +173,7 @@ _handle_multipart = async (req) => { } }; - bb.on('field', (fieldname, value, details) => { + bb.on('field', (fieldname, value, _details) => { const o = JSON.parse(value); for ( const k in o ) { on_field(k, o[k]); diff --git a/src/backend/src/routers/filesystem_api/batch/all.js b/src/backend/src/routers/filesystem_api/batch/all.js index 9bf208f6..188928c2 100644 --- a/src/backend/src/routers/filesystem_api/batch/all.js +++ b/src/backend/src/routers/filesystem_api/batch/all.js @@ -37,7 +37,7 @@ module.exports = eggspress('/batch', { // multest: true, // multipart_jsons: ['operation'], allowedMethods: ['POST'], -}, async (req, res, next) => { +}, async (req, res, _next) => { const log = req.services.get('log-service').create('batch'); const errors = req.services.get('error-service').create(log); @@ -46,6 +46,7 @@ module.exports = eggspress('/batch', { let app; if ( req.body.app_uid ) { + // eslint-disable-next-line no-unused-vars app = await get_app({ uid: req.body.app_uid }); } @@ -135,6 +136,7 @@ module.exports = eggspress('/batch', { const pending_operations = []; const response_promises = []; const fileinfos = []; + let request_error = null; const on_nonfile_data_end = OnlyOnceFn(() => { if ( request_error ) { @@ -150,6 +152,7 @@ module.exports = eggspress('/batch', { log.debug(`executing ${op_spec.op}`); response_promises[i] = batch_exe.exec_op(req, op_spec); } else { + // no handler } } @@ -167,7 +170,6 @@ module.exports = eggspress('/batch', { }); const still_reading = new TeePromise(); - let request_error = null; busboy.on('field', (fieldname, value, details) => { try { @@ -178,7 +180,7 @@ module.exports = eggspress('/batch', { throw new Error('valueTruncated'); } - if ( expected_metadata.hasOwnProperty(fieldname) ) { + if ( Object.prototype.hasOwnProperty.call(expected_metadata, fieldname) ) { expected_metadata[fieldname] = value; req.body[fieldname] = value; return; @@ -216,7 +218,7 @@ module.exports = eggspress('/batch', { } }); - busboy.on('file', async (fieldname, stream, detais) => { + busboy.on('file', async (fieldname, stream ) => { if ( batch_exe.total_tbd ) { batch_exe.total_tbd = false; batch_widget.ic = pending_operations.length; @@ -281,7 +283,9 @@ module.exports = eggspress('/batch', { frame.done(); if ( pending_operations.length ) { - for ( const op_spec of pending_operations ) { + + // eslint-disable-next-line no-unused-vars + for ( const _op_spec of pending_operations ) { const err = new APIError('batch_missing_file'); request_errors_.push(err); } diff --git a/src/backend/src/services/auth/Actor.js b/src/backend/src/services/auth/Actor.js index da48aa73..fc4d6642 100644 --- a/src/backend/src/services/auth/Actor.js +++ b/src/backend/src/services/auth/Actor.js @@ -29,6 +29,54 @@ const PRIVATE_UID_NAMESPACE = config.private_uid_namespace const PRIVATE_UID_SECRET = config.private_uid_secret ?? require('crypto').randomBytes(24).toString('hex'); +/** + * Base class for all actor types in the system. + * Provides common initialization functionality for actor type instances. + */ +class ActorType { + /** + * Initializes the ActorType with the provided properties. + * + * @param {Object} o - Object containing properties to assign to this instance. + */ + constructor (o) { + for ( const k in o ) { + this[k] = o[k]; + } + } +} + +/** + * Class representing the system actor type within the actor framework. + * This type serves as a specific implementation of an actor that + * represents a system-level entity and provides methods for UID retrieval + * and related type management. + */ +class SystemActorType extends ActorType { + /** + * Gets the unique identifier for the system actor. + * + * @returns {string} Always returns 'system'. + */ + get uid () { + return 'system'; + } + + /** + * Gets a related actor type for the system actor. + * + * @param {Function} type_class - The ActorType class to get a related type for. + * @returns {SystemActorType} Returns this instance if type_class is SystemActorType. + * @throws {Error} If the requested type_class is not supported. + */ + get_related_type (type_class) { + if ( type_class === SystemActorType ) { + return this; + } + throw new Error(`cannot get ${type_class.name} from ${this.constructor.name}`); + } +} + /** * Represents an Actor in the system, extending functionality from AdvancedBase. * The Actor class is responsible for managing actor instances, including @@ -173,54 +221,6 @@ class Actor extends AdvancedBase { } } -/** - * Base class for all actor types in the system. - * Provides common initialization functionality for actor type instances. - */ -class ActorType { - /** - * Initializes the ActorType with the provided properties. - * - * @param {Object} o - Object containing properties to assign to this instance. - */ - constructor (o) { - for ( const k in o ) { - this[k] = o[k]; - } - } -} - -/** - * Class representing the system actor type within the actor framework. - * This type serves as a specific implementation of an actor that - * represents a system-level entity and provides methods for UID retrieval - * and related type management. - */ -class SystemActorType extends ActorType { - /** - * Gets the unique identifier for the system actor. - * - * @returns {string} Always returns 'system'. - */ - get uid () { - return 'system'; - } - - /** - * Gets a related actor type for the system actor. - * - * @param {Function} type_class - The ActorType class to get a related type for. - * @returns {SystemActorType} Returns this instance if type_class is SystemActorType. - * @throws {Error} If the requested type_class is not supported. - */ - get_related_type (type_class) { - if ( type_class === SystemActorType ) { - return this; - } - throw new Error(`cannot get ${type_class.name} from ${this.constructor.name}`); - } -} - /** * Represents the type of a User Actor in the system, allowing operations and relations * specific to user actors. This class extends the base functionality to uniquely identify @@ -334,7 +334,7 @@ class SiteActorType { * @param {Object} o - The properties to initialize the SiteActorType with. * @param {...*} a - Additional arguments. */ - constructor (o, ...a) { + constructor (o, ..._a) { for ( const k in o ) { this[k] = o[k]; } diff --git a/src/backend/src/util/streamutil.js b/src/backend/src/util/streamutil.js index 2b7aab84..c51935c3 100644 --- a/src/backend/src/util/streamutil.js +++ b/src/backend/src/util/streamutil.js @@ -145,33 +145,64 @@ const offset_write_stream = ({ originalDataStream, newDataStream, offset, replace_length = 0, }) => { + const passThrough = new PassThrough(); let remaining = offset; let new_end = false; let org_end = false; let replaced_bytes = 0; - - let last_state = null; - const implied = { - get state () { - const state = - remaining > 0 ? STATE_ORIGINAL_STREAM : - new_end && org_end ? STATE_END : - new_end ? STATE_CONTINUE : - STATE_NEW_STREAM ; - // (comment to reset indentation) - if ( state !== last_state ) { - last_state = state; - if ( state.on_enter ) state.on_enter(); - } - return state; + let defer_buffer = Buffer.alloc(0); + let new_stream_early_buffer = Buffer.alloc(0); + let implied; + const STATE_ORIGINAL_STREAM = { + on_enter: () => { + console.log('STATE_ORIGINAL_STREAM'); + newDataStream.pause(); }, }; - let defer_buffer = Buffer.alloc(0); - let new_stream_early_buffer = Buffer.alloc(0); + const STATE_NEW_STREAM = { + on_enter: () => { + console.log('STATE_NEW_STREAM'); + originalDataStream.pause(); + originalDataStream.off('data', original_stream_on_data); + newDataStream.resume(); + }, + }; - const original_stream_on_data = chunk => { + const STATE_END = { + on_enter: () => { + console.log('STATE_END'); + passThrough.end(); + }, + }; + + const STATE_CONTINUE = { + on_enter: () => { + console.log('STATE_CONTINUE'); + if ( defer_buffer.length > 0 ) { + const remaining_replacement = replace_length - replaced_bytes; + if ( replaced_bytes < replace_length ) { + if ( defer_buffer.length <= remaining_replacement ) { + console.log('skipping deferred', defer_buffer.toString()); + replaced_bytes += defer_buffer.length; + defer_buffer = Buffer.alloc(0); + } else { + console.log('skipping deferred', defer_buffer.slice(0, remaining_replacement).toString()); + defer_buffer = defer_buffer.slice(remaining_replacement); + replaced_bytes += remaining_replacement; + } + } + console.log('pushing deferred:', defer_buffer.toString()); + passThrough.push(defer_buffer); + } + // originalDataStream.pipe(passThrough); + originalDataStream.on('data', original_stream_on_data); + originalDataStream.resume(); + }, + }; + + function original_stream_on_data (chunk) { console.log('original stream data', chunk.length, implied.state); console.log('received from original:', chunk.toString()); @@ -214,48 +245,20 @@ const offset_write_stream = ({ implied.state; }; - const STATE_ORIGINAL_STREAM = { - on_enter: () => { - console.log('STATE_ORIGINAL_STREAM'); - newDataStream.pause(); - }, - }; - const STATE_NEW_STREAM = { - on_enter: () => { - console.log('STATE_NEW_STREAM'); - originalDataStream.pause(); - originalDataStream.off('data', original_stream_on_data); - newDataStream.resume(); - }, - }; - const STATE_CONTINUE = { - on_enter: () => { - console.log('STATE_CONTINUE'); - if ( defer_buffer.length > 0 ) { - const remaining_replacement = replace_length - replaced_bytes; - if ( replaced_bytes < replace_length ) { - if ( defer_buffer.length <= remaining_replacement ) { - console.log('skipping deferred', defer_buffer.toString()); - replaced_bytes += defer_buffer.length; - defer_buffer = Buffer.alloc(0); - } else { - console.log('skipping deferred', defer_buffer.slice(0, remaining_replacement).toString()); - defer_buffer = defer_buffer.slice(remaining_replacement); - replaced_bytes += remaining_replacement; - } - } - console.log('pushing deferred:', defer_buffer.toString()); - passThrough.push(defer_buffer); + let last_state = null; + implied = { + get state () { + const state = + remaining > 0 ? STATE_ORIGINAL_STREAM : + new_end && org_end ? STATE_END : + new_end ? STATE_CONTINUE : + STATE_NEW_STREAM ; + // (comment to reset indentation) + if ( state !== last_state ) { + last_state = state; + if ( state.on_enter ) state.on_enter(); } - // originalDataStream.pipe(passThrough); - originalDataStream.on('data', original_stream_on_data); - originalDataStream.resume(); - }, - }; - const STATE_END = { - on_enter: () => { - console.log('STATE_END'); - passThrough.end(); + return state; }, };