feat: enhance database operations with new insert and count functions; update Nostr job to utilize batch inserts and pending counts

This commit is contained in:
besoeasy
2025-12-31 15:52:47 +05:30
parent 43f3271ba9
commit b0d80bac2c
3 changed files with 215 additions and 173 deletions
+76
View File
@@ -88,6 +88,23 @@ const getRecentPinsStmt = db.prepare(`
LIMIT ?
`);
const insertIfNotExistsStmt = db.prepare(`
INSERT OR IGNORE INTO pins (event_id, cid, size, timestamp, author, type, status)
VALUES (?, ?, 0, ?, ?, ?, 'pending')
`);
const getPendingByTypeStmt = db.prepare(`
SELECT * FROM pins
WHERE type = ? AND status = 'pending'
ORDER BY timestamp DESC
LIMIT ?
`);
const countByTypeAndStatusStmt = db.prepare(`
SELECT COUNT(*) as count FROM pins
WHERE type = ? AND status = ?
`);
// Functions
const recordPin = ({ eventId, cid, size = 0, timestamp, author, type, status = 'pinned' }) => {
try {
@@ -100,6 +117,42 @@ const recordPin = ({ eventId, cid, size = 0, timestamp, author, type, status = '
}
};
const insertCidIfNotExists = ({ eventId, cid, timestamp, author, type }) => {
try {
const result = insertIfNotExistsStmt.run(eventId, cid, timestamp, author, type);
return result.changes > 0; // Returns true if inserted, false if already exists
} catch (err) {
console.error(`[DB] Failed to insert CID:`, err.message);
return false;
}
};
const batchInsertCids = (cids) => {
const insertMany = db.transaction((cidList) => {
let inserted = 0;
for (const cidObj of cidList) {
const result = insertIfNotExistsStmt.run(
cidObj.eventId,
cidObj.cid,
cidObj.timestamp,
cidObj.author,
cidObj.type
);
if (result.changes > 0) inserted++;
}
return inserted;
});
try {
const inserted = insertMany(cids);
console.log(`[DB] Batch inserted ${inserted} new CIDs (${cids.length - inserted} duplicates ignored)`);
return inserted;
} catch (err) {
console.error(`[DB] Failed to batch insert:`, err.message);
return 0;
}
};
const updatePinSize = (cid, size, status = 'pinned') => {
try {
updatePinSizeStmt.run(size, status, cid);
@@ -166,6 +219,25 @@ const getRecentPins = (limit = 10) => {
}
};
const getPendingCidsByType = (type, limit = 1) => {
try {
return getPendingByTypeStmt.all(type, limit);
} catch (err) {
console.error(`[DB] Failed to get pending CIDs:`, err.message);
return [];
}
};
const countByTypeAndStatus = (type, status) => {
try {
const result = countByTypeAndStatusStmt.get(type, status);
return result ? result.count : 0;
} catch (err) {
console.error(`[DB] Failed to count:`, err.message);
return 0;
}
};
// Cleanup on exit
process.on('exit', () => {
db.close();
@@ -186,4 +258,8 @@ module.exports = {
getStats,
getTotalCount,
getRecentPins,
insertCidIfNotExists,
batchInsertCids,
getPendingCidsByType,
countByTypeAndStatus,
};
+119 -158
View File
@@ -2,19 +2,17 @@
const { syncNostrPins, syncFollowPins } = require("./nostr");
const {
getSelfQueue,
getFriendsQueue,
addToSelfQueue,
addToFriendsQueue,
removeFromSelfQueue,
incrementPinnedSelf,
incrementCachedFriends,
setLastPinnerActivity,
setLastNostrRun,
} = require("./queue");
const { isPinned, pinCid, addCid, getCidSize } = require("./nostr");
const { recordPin, updatePinSize, getPinByCid } = require("./database");
const {
batchInsertCids,
getPendingCidsByType,
updatePinSize,
countByTypeAndStatus,
} = require("./database");
// Nostr discovery job
const runNostrJob = async (NPUB) => {
@@ -34,51 +32,51 @@ const runNostrJob = async (NPUB) => {
const selfResult = await syncNostrPins({ npubOrPubkey: NPUB, dryRun: true });
const friendsResult = await syncFollowPins({ npubOrPubkey: NPUB, dryRun: true });
// Add discovered CIDs to queues (avoid duplicates)
const selfCids = selfResult.plannedPins || [];
const friendCids = friendsResult.plannedAdds || [];
// Prepare CIDs for database insertion
const selfCids = (selfResult.plannedPins || []).map(cidObj => ({
...cidObj,
type: 'self'
}));
const friendCids = (friendsResult.plannedAdds || []).map(cidObj => ({
...cidObj,
type: 'friend'
}));
const selfQueue = getSelfQueue();
const friendsQueue = getFriendsQueue();
// Batch insert to database (duplicates automatically ignored)
const allCids = [...selfCids, ...friendCids];
const insertedCount = batchInsertCids(allCids);
const selfCidSet = new Set(selfQueue.map((obj) => obj.cid));
const friendCidSet = new Set(friendsQueue.map((obj) => obj.cid));
const newSelfCids = selfCids.filter((cidObj) => !selfCidSet.has(cidObj.cid));
const newFriendCids = friendCids.filter((cidObj) => !friendCidSet.has(cidObj.cid));
addToSelfQueue(newSelfCids);
addToFriendsQueue(newFriendCids);
// Get current pending counts
const selfPending = countByTypeAndStatus('self', 'pending');
const friendsPending = countByTypeAndStatus('friend', 'pending');
setLastNostrRun({
at: new Date().toISOString(),
self: {
eventsScanned: selfResult.eventsScanned,
cidsFound: selfResult.cidsFound,
newCids: newSelfCids.length,
queueSize: selfQueue.length + newSelfCids.length,
newCids: selfCids.length,
pendingInDb: selfPending,
},
friends: {
eventsScanned: friendsResult.eventsScanned,
cidsFound: friendsResult.cidsFound,
newCids: newFriendCids.length,
queueSize: friendsQueue.length + newFriendCids.length,
newCids: friendCids.length,
pendingInDb: friendsPending,
},
error: null,
});
console.log("\n=== Discovery Summary ===");
console.log({
self: {
discovered: selfCids.length,
new: newSelfCids.length,
queueSize: getSelfQueue().length,
},
friends: {
discovered: friendCids.length,
new: newFriendCids.length,
queueSize: getFriendsQueue().length,
},
discovered: allCids.length,
inserted: insertedCount,
duplicates: allCids.length - insertedCount,
database: {
selfPending: selfPending,
friendsPending: friendsPending,
}
});
} catch (err) {
setLastNostrRun({
@@ -94,156 +92,119 @@ const runNostrJob = async (NPUB) => {
// Pinner job
const pinnerJob = async () => {
try {
const selfQueue = getSelfQueue();
const friendsQueue = getFriendsQueue();
console.log(`\n════ Pinner Job Started ════`);
console.log(`Queue Status: Self=${selfQueue.length}, Friends=${friendsQueue.length}`);
// Get counts
const selfPending = countByTypeAndStatus('self', 'pending');
const friendsPending = countByTypeAndStatus('friend', 'pending');
console.log(`Database Status: Self Pending=${selfPending}, Friends Pending=${friendsPending}`);
let didWork = false;
// Process self queue: pin CID
if (selfQueue.length > 0) {
let cidToPinIndex = -1;
let cidToPin = null;
const checkedIndices = new Set();
// Keep trying random CIDs until we find one that's not pinned
while (checkedIndices.size < selfQueue.length) {
const randomIndex = Math.floor(Math.random() * selfQueue.length);
if (checkedIndices.has(randomIndex)) {
continue;
}
checkedIndices.add(randomIndex);
const cidObj = selfQueue[randomIndex];
// Process self queue: pin CID (permanent)
if (selfPending > 0) {
const pendingCids = getPendingCidsByType('self', 1);
if (pendingCids.length > 0) {
const cidObj = pendingCids[0];
const cid = cidObj.cid;
const primalLink = `https://primal.net/e/${cidObj.eventId}`;
console.log(`\n[Self] Checking CID (${selfQueue.length} in queue): ${cid}`);
const primalLink = `https://primal.net/e/${cidObj.event_id}`;
console.log(`\n[Self] Processing CID: ${cid}`);
console.log(` Event: ${primalLink}`);
console.log(` Author: ${cidObj.author} | Time: ${new Date(cidObj.timestamp * 1000).toISOString()}`);
// Check if already pinned in IPFS
const alreadyPinned = await isPinned(cid);
if (alreadyPinned) {
console.log(`⏭️ Already pinned, removing from queue: ${cid}`);
removeFromSelfQueue(randomIndex);
incrementPinnedSelf();
console.log(`⏭️ Already pinned in IPFS, updating database`);
try {
const size = await getCidSize(cid);
updatePinSize(cid, size, "pinned");
} catch (err) {
updatePinSize(cid, 0, "pinned");
}
didWork = true;
// Adjust checked indices after splice
const newCheckedIndices = new Set();
checkedIndices.forEach((idx) => {
if (idx < randomIndex) {
newCheckedIndices.add(idx);
} else if (idx > randomIndex) {
newCheckedIndices.add(idx - 1);
}
});
checkedIndices.clear();
newCheckedIndices.forEach((idx) => checkedIndices.add(idx));
} else {
// Found an unpinned CID
cidToPinIndex = randomIndex;
cidToPin = cid;
break;
console.log(`\n[Self] Pinning CID: ${cid}`);
// Fire-and-forget: start pinning without waiting
pinCid(cid)
.then(async () => {
console.log(`✓ Successfully pinned: ${cid}`);
try {
const size = await getCidSize(cid);
updatePinSize(cid, size, "pinned");
} catch (err) {
updatePinSize(cid, 0, "pinned");
}
})
.catch((err) => {
console.error(`❌ Failed to pin ${cid}:`, err.message);
updatePinSize(cid, 0, "failed");
});
didWork = true;
}
}
if (cidToPin) {
const cidObj = selfQueue[cidToPinIndex];
console.log(`\n[Self] Pinning CID: ${cidToPin}`);
// Record to database first (as pending)
recordPin({
eventId: cidObj.eventId,
cid: cidToPin,
size: 0,
timestamp: cidObj.timestamp,
author: cidObj.author,
type: "self",
status: "pending",
});
// Fire-and-forget: start pinning without waiting
pinCid(cidToPin)
.then(async () => {
console.log(`✓ Successfully pinned: ${cidToPin}`);
// Try to get size after pinning
try {
const size = await getCidSize(cidToPin);
updatePinSize(cidToPin, size, "pinned");
} catch (err) {
updatePinSize(cidToPin, 0, "pinned");
}
})
.catch((err) => {
console.error(`❌ Failed to pin ${cidToPin}:`, err.message);
updatePinSize(cidToPin, 0, "failed");
});
removeFromSelfQueue(cidToPinIndex);
incrementPinnedSelf();
console.log(`📊 Counter updated: totalPinnedSelf = ${incrementPinnedSelf.length}`);
console.log(`📋 Queue updated: ${getSelfQueue().length} CIDs remaining`);
didWork = true;
} else if (checkedIndices.size > 0) {
console.log(`✓ All checked CIDs were already pinned and removed`);
}
} else {
console.log(`[Self] Queue empty, nothing to process`);
console.log(`[Self] No pending CIDs in database`);
}
// Process friends queue: cache CID
if (friendsQueue.length > 0) {
const randomIndex = Math.floor(Math.random() * friendsQueue.length);
const cidObj = friendsQueue[randomIndex];
const cid = cidObj.cid;
// Process friends queue: cache CID (ephemeral)
if (friendsPending > 0) {
const pendingCids = getPendingCidsByType('friend', 1);
if (pendingCids.length > 0) {
const cidObj = pendingCids[0];
const cid = cidObj.cid;
const primalLink = `https://primal.net/e/${cidObj.eventId}`;
console.log(`\n[Friend] Caching CID (${friendsQueue.length} in queue): ${cid}`);
console.log(` Event: ${primalLink}`);
console.log(` Author: ${cidObj.author} | Time: ${new Date(cidObj.timestamp * 1000).toISOString()}`);
const primalLink = `https://primal.net/e/${cidObj.event_id}`;
console.log(`\n[Friend] Processing CID: ${cid}`);
console.log(` Event: ${primalLink}`);
console.log(` Author: ${cidObj.author} | Time: ${new Date(cidObj.timestamp * 1000).toISOString()}`);
// Record to database first (as pending)
recordPin({
eventId: cidObj.eventId,
cid: cid,
size: 0,
timestamp: cidObj.timestamp,
author: cidObj.author,
type: "friend",
status: "pending",
});
// Fire-and-forget: start caching without waiting
addCid(cid)
.then((result) => {
console.log(`✓ Successfully cached: ${cid}`);
// Update with actual size from result
const size = result?.size || 0;
updatePinSize(cid, size, "cached");
})
.catch((err) => {
console.error(`❌ Failed to cache ${cid}:`, err.message);
updatePinSize(cid, 0, "failed");
});
removeFromFriendsQueue(randomIndex);
incrementCachedFriends();
console.log(`📊 Counter updated: totalCachedFriends = ${incrementCachedFriends.length}`);
console.log(`📋 Queue updated: ${getFriendsQueue().length} CIDs remaining`);
didWork = true;
// Check if already available locally in IPFS
const alreadyAvailable = await isPinned(cid);
if (alreadyAvailable) {
console.log(`⏭️ Already available locally, updating database`);
try {
const size = await getCidSize(cid);
updatePinSize(cid, size, "cached");
} catch (err) {
updatePinSize(cid, 0, "cached");
}
didWork = true;
} else {
console.log(`\n[Friend] Caching CID: ${cid}`);
// Fire-and-forget: start caching without waiting
addCid(cid)
.then((result) => {
console.log(`✓ Successfully cached: ${cid}`);
const size = result?.size || 0;
updatePinSize(cid, size, "cached");
})
.catch((err) => {
console.error(`❌ Failed to cache ${cid}:`, err.message);
updatePinSize(cid, 0, "failed");
});
didWork = true;
}
}
} else {
console.log(`[Friend] Queue empty, nothing to process`);
console.log(`[Friend] No pending CIDs in database`);
}
if (didWork) {
setLastPinnerActivity(new Date().toISOString());
console.log(`\n⏰ Activity timestamp updated`);
} else {
console.log(`\n⏸ No work performed - all queues empty`);
console.log(`\n⏸ No work performed - no pending CIDs`);
}
console.log(`════ Pinner Job Complete ════\n`);
+20 -15
View File
@@ -8,10 +8,6 @@ const mime = require("mime-types");
const { IPFS_API, STORAGE_MAX, FILE_LIMIT, formatBytes } = require("./config");
const { getPinnedSize, checkIPFSHealth, getIPFSStats } = require("./ipfs");
const {
getSelfQueue,
getFriendsQueue,
getTotalPinnedSelf,
getTotalCachedFriends,
getLastPinnerActivity,
getLastNostrRun,
} = require("./queue");
@@ -29,6 +25,7 @@ const {
getStats,
getTotalCount,
getRecentPins,
countByTypeAndStatus,
} = require("./database");
const unlinkAsync = promisify(fs.unlink);
@@ -128,6 +125,15 @@ const nostrHandler = async (req, res, NPUB) => {
const operatorNpub = NPUB.startsWith("npub") ? NPUB : toNpub(NPUB);
// Get counts from database
const selfPinned = countByTypeAndStatus('self', 'pinned');
const selfPending = countByTypeAndStatus('self', 'pending');
const selfFailed = countByTypeAndStatus('self', 'failed');
const friendsCached = countByTypeAndStatus('friend', 'cached');
const friendsPending = countByTypeAndStatus('friend', 'pending');
const friendsFailed = countByTypeAndStatus('friend', 'failed');
// Build lastRun object
let lastRun = null;
if (lastNostrRun?.at) {
@@ -146,21 +152,20 @@ const nostrHandler = async (req, res, NPUB) => {
friends: friendsList,
repo,
pins: {
self: getTotalPinnedSelf(),
friends: getTotalCachedFriends(),
total: getTotalPinnedSelf() + getTotalCachedFriends(),
totalSize: pinnedStats.totalSize,
pinnedCount: pinnedStats.count,
},
queues: {
self: {
pending: getSelfQueue().length,
processed: getTotalPinnedSelf(),
pinned: selfPinned,
pending: selfPending,
failed: selfFailed,
total: selfPinned + selfPending + selfFailed,
},
friends: {
pending: getFriendsQueue().length,
processed: getTotalCachedFriends(),
cached: friendsCached,
pending: friendsPending,
failed: friendsFailed,
total: friendsCached + friendsPending + friendsFailed,
},
totalSize: pinnedStats.totalSize,
pinnedCount: pinnedStats.count,
},
activity: {
lastDiscovery: lastNostrRun?.at || null,