feat: implement continuous pinner job and add getRandomCid utility

This commit is contained in:
besoeasy
2026-01-01 20:16:23 +05:30
parent 24e8be18f0
commit d8b7e0e021
3 changed files with 50 additions and 49 deletions
+2 -3
View File
@@ -73,8 +73,8 @@ let nostrTimers = { discovery: null, pinner: null };
if (NPUB) {
runNostrJob(NPUB); // Initial run
nostrTimers.discovery = setInterval(() => runNostrJob(NPUB), NOSTR_CHECK_INTERVAL_MS);
nostrTimers.pinner = setInterval(pinnerJob, PINNER_INTERVAL_MS);
console.log(`[STARTUP] NOSTR_ENABLED npub=${NPUB} discovery_interval_ms=${NOSTR_CHECK_INTERVAL_MS} pinner_interval_ms=${PINNER_INTERVAL_MS}`);
pinnerJob(); // Start continuous pinner loop
console.log(`[STARTUP] NOSTR_ENABLED npub=${NPUB} discovery_interval_ms=${NOSTR_CHECK_INTERVAL_MS} pinner_mode=continuous`);
} else {
console.log(`[STARTUP] NOSTR_DISABLED reason=no_npub_configured`);
}
@@ -90,7 +90,6 @@ const gracefulShutdown = (signal) => {
// Clear Nostr timers
if (nostrTimers.discovery) clearInterval(nostrTimers.discovery);
if (nostrTimers.pinner) clearInterval(nostrTimers.pinner);
console.log(`[SHUTDOWN] NOSTR_TIMERS_CLEARED`);
// Give active requests 5 seconds to complete
+1
View File
@@ -371,4 +371,5 @@ module.exports = {
// Utility
getStoreStats,
getRandomCid,
};
+47 -46
View File
@@ -4,13 +4,13 @@ const { syncNostrPins } = require("./nostr");
const { pinCid } = require("./ipfs");
const {
batchInsertCids,
getPendingCidsByType,
markInProgress,
clearInProgress,
updatePinSize,
countByTypeAndStatus,
setLastPinnerActivity,
setLastNostrRun,
getRandomCid,
} = require("./database");
// Nostr discovery job
@@ -65,54 +65,55 @@ const runNostrJob = async (NPUB) => {
}
};
// Pinner job - checks if discovered CIDs are pinned, one at a time
// Pinner job - continuously picks random CIDs and ensures they're pinned
const pinnerJob = async () => {
try {
console.log(`[JOB] PINNER_START`);
const selfPending = countByTypeAndStatus('self', 'pending');
console.log(`[JOB] PINNER_QUEUE_STATUS self_pending=${selfPending}`);
let didWork = false;
if (selfPending > 0) {
const [cidObj] = getPendingCidsByType('self', 1);
if (cidObj) {
const cid = cidObj.cid;
console.log(`[JOB] PINNER_CHECKING_SELF cid=${cid} event_id=${cidObj.event_id} author=${cidObj.author} timestamp=${new Date(cidObj.timestamp * 1000).toISOString()} `);
markInProgress(cid, 'self');
didWork = true;
setLastPinnerActivity(new Date().toISOString());
pinCid(cid)
.then((result) => {
if (result.success && result.alreadyPinned) {
const sizeMB = (result.size / 1024 / 1024).toFixed(2);
updatePinSize(cid, result.size, "pinned");
console.log(`[JOB] PINNER_SELF_ALREADY_PINNED cid=${cid} status=pinned size_mb=${sizeMB} message="${result.message}"`);
} else {
updatePinSize(cid, 0, "failed");
console.log(`[JOB] PINNER_SELF_NOT_PINNED cid=${cid} status=failed message="${result.message}"`);
}
})
.catch((err) => {
updatePinSize(cid, 0, "failed");
console.error(`[JOB] PINNER_SELF_CHECK_FAILED cid=${cid} status=failed error="${err.message}"`);
})
.finally(() => {
clearInProgress(cid);
});
console.log(`[JOB] PINNER_LOOP_START continuous_mode=true`);
while (true) {
try {
// Get a random CID from database
const cidObj = getRandomCid();
if (!cidObj) {
console.log(`[JOB] PINNER_NO_CIDS waiting=5s`);
await new Promise(resolve => setTimeout(resolve, 5000));
continue;
}
const cid = cidObj.cid;
console.log(`[JOB] PINNER_SELECTED cid=${cid} type=${cidObj.type} status=${cidObj.status} event_id=${cidObj.event_id}`);
markInProgress(cid, cidObj.type);
setLastPinnerActivity(new Date().toISOString());
try {
console.log(`[JOB] PINNER_CHECKING cid=${cid}`);
const result = await pinCid(cid);
if (result.success) {
const sizeMB = (result.size / 1024 / 1024).toFixed(2);
updatePinSize(cid, result.size, "pinned");
console.log(`[JOB] PINNER_SUCCESS cid=${cid} status=pinned size_mb=${sizeMB} newly_pinned=${!result.alreadyPinned}`);
} else {
updatePinSize(cid, 0, "failed");
console.log(`[JOB] PINNER_FAILED cid=${cid} status=failed message="${result.message}"`);
}
} catch (err) {
updatePinSize(cid, 0, "failed");
console.error(`[JOB] PINNER_ERROR cid=${cid} error="${err.message}"`);
} finally {
clearInProgress(cid);
}
// Random delay between 30-100 seconds before next iteration
const delaySeconds = 30 + Math.floor(Math.random() * 71); // 30-100 seconds
console.log(`[JOB] PINNER_WAITING delay_sec=${delaySeconds}`);
await new Promise(resolve => setTimeout(resolve, delaySeconds * 1000));
} catch (err) {
console.error(`[JOB] PINNER_LOOP_ERROR error="${err.message}"`);
await new Promise(resolve => setTimeout(resolve, 5000));
}
console.log(`[JOB] PINNER_COMPLETE work_done=${didWork}\n`);
} catch (err) {
console.error(`[JOB] PINNER_ERROR error="${err.message}"`);
console.error(`[JOB] PINNER_ERROR_STACK stack="${err.stack}"`);
}
};