feat,refactor: update notifications by filter & by id's

This commit is contained in:
Pujit Mehrotra
2024-09-26 19:31:11 -04:00
parent c84175e763
commit a1fa3462eb
4 changed files with 229 additions and 83 deletions

View File

@@ -622,8 +622,10 @@ export type Mutation = {
addDiskToArray?: Maybe<ArrayType>;
/** Add a new user */
addUser?: Maybe<User>;
archiveAll: NotificationOverview;
/** Marks a notification as archived. */
archiveNotification: NotificationOverview;
archiveNotifications: NotificationOverview;
/** Cancel parity check */
cancelParityCheck?: Maybe<Scalars['JSON']['output']>;
clearArrayDiskStatistics?: Maybe<Scalars['JSON']['output']>;
@@ -654,6 +656,8 @@ export type Mutation = {
startParityCheck?: Maybe<Scalars['JSON']['output']>;
/** Stop array */
stopArray?: Maybe<ArrayType>;
unarchiveAll: NotificationOverview;
unarchiveNotifications: NotificationOverview;
unmountArrayDisk?: Maybe<Disk>;
/** Marks a notification as unread. */
unreadNotification: NotificationOverview;
@@ -678,11 +682,21 @@ export type MutationaddUserArgs = {
};
export type MutationarchiveAllArgs = {
importance?: InputMaybe<Importance>;
};
export type MutationarchiveNotificationArgs = {
id: Scalars['String']['input'];
};
export type MutationarchiveNotificationsArgs = {
ids?: InputMaybe<Array<Scalars['String']['input']>>;
};
export type MutationclearArrayDiskStatisticsArgs = {
id: Scalars['ID']['input'];
};
@@ -751,6 +765,16 @@ export type MutationstartParityCheckArgs = {
};
export type MutationunarchiveAllArgs = {
importance?: InputMaybe<Importance>;
};
export type MutationunarchiveNotificationsArgs = {
ids?: InputMaybe<Array<Scalars['String']['input']>>;
};
export type MutationunmountArrayDiskArgs = {
id: Scalars['ID']['input'];
};
@@ -2319,7 +2343,9 @@ export type MutationResolvers<ContextType = Context, ParentType extends Resolver
addApikey?: Resolver<Maybe<ResolversTypes['ApiKey']>, ParentType, ContextType, RequireFields<MutationaddApikeyArgs, 'name'>>;
addDiskToArray?: Resolver<Maybe<ResolversTypes['Array']>, ParentType, ContextType, Partial<MutationaddDiskToArrayArgs>>;
addUser?: Resolver<Maybe<ResolversTypes['User']>, ParentType, ContextType, RequireFields<MutationaddUserArgs, 'input'>>;
archiveAll?: Resolver<ResolversTypes['NotificationOverview'], ParentType, ContextType, Partial<MutationarchiveAllArgs>>;
archiveNotification?: Resolver<ResolversTypes['NotificationOverview'], ParentType, ContextType, RequireFields<MutationarchiveNotificationArgs, 'id'>>;
archiveNotifications?: Resolver<ResolversTypes['NotificationOverview'], ParentType, ContextType, Partial<MutationarchiveNotificationsArgs>>;
cancelParityCheck?: Resolver<Maybe<ResolversTypes['JSON']>, ParentType, ContextType>;
clearArrayDiskStatistics?: Resolver<Maybe<ResolversTypes['JSON']>, ParentType, ContextType, RequireFields<MutationclearArrayDiskStatisticsArgs, 'id'>>;
connectSignIn?: Resolver<ResolversTypes['Boolean'], ParentType, ContextType, RequireFields<MutationconnectSignInArgs, 'input'>>;
@@ -2341,6 +2367,8 @@ export type MutationResolvers<ContextType = Context, ParentType extends Resolver
startArray?: Resolver<Maybe<ResolversTypes['Array']>, ParentType, ContextType>;
startParityCheck?: Resolver<Maybe<ResolversTypes['JSON']>, ParentType, ContextType, Partial<MutationstartParityCheckArgs>>;
stopArray?: Resolver<Maybe<ResolversTypes['Array']>, ParentType, ContextType>;
unarchiveAll?: Resolver<ResolversTypes['NotificationOverview'], ParentType, ContextType, Partial<MutationunarchiveAllArgs>>;
unarchiveNotifications?: Resolver<ResolversTypes['NotificationOverview'], ParentType, ContextType, Partial<MutationunarchiveNotificationsArgs>>;
unmountArrayDisk?: Resolver<Maybe<ResolversTypes['Disk']>, ParentType, ContextType, RequireFields<MutationunmountArrayDiskArgs, 'id'>>;
unreadNotification?: Resolver<ResolversTypes['NotificationOverview'], ParentType, ContextType, RequireFields<MutationunreadNotificationArgs, 'id'>>;
updateApikey?: Resolver<Maybe<ResolversTypes['ApiKey']>, ParentType, ContextType, RequireFields<MutationupdateApikeyArgs, 'name'>>;

View File

@@ -21,6 +21,10 @@ type Mutation {
archiveNotification(id: String!): NotificationOverview!
"""Marks a notification as unread."""
unreadNotification(id: String!): NotificationOverview!
archiveNotifications(ids: [String!]): NotificationOverview!
unarchiveNotifications(ids: [String!]): NotificationOverview!
archiveAll(importance: Importance): NotificationOverview!
unarchiveAll(importance: Importance): NotificationOverview!
}
type Subscription {

View File

@@ -2,12 +2,14 @@ import type {
NotificationData,
NotificationType,
NotificationFilter,
NotificationOverview,
} from '@app/graphql/generated/api/types';
import { Args, Mutation, Query, ResolveField, Resolver, Subscription } from '@nestjs/graphql';
import { UseRoles } from 'nest-access-control';
import { createSubscription, PUBSUB_CHANNEL } from '@app/core/pubsub';
import { NotificationsService } from './notifications.service';
import { getServerIdentifier } from '@app/core/utils/server-identifier';
import { Importance } from '@app/graphql/generated/client/graphql';
@Resolver('Notifications')
export class NotificationsResolver {
@@ -71,11 +73,35 @@ export class NotificationsResolver {
return this.notificationsService.archiveNotification({ id });
}
@Mutation()
public async archiveNotifications(@Args('ids') ids: string[]) {
await this.notificationsService.archiveIds(ids);
return this.notificationsService.getOverview();
}
@Mutation()
public async archiveAll(@Args('importance') importance?: Importance): Promise<NotificationOverview> {
const { overview } = await this.notificationsService.archiveAll(importance);
return overview;
}
@Mutation()
public unreadNotification(@Args('id') id: string) {
return this.notificationsService.markAsUnread({ id });
}
@Mutation()
public async unarchiveNotifications(@Args('ids') ids: string[]) {
await this.notificationsService.unarchiveIds(ids);
return this.notificationsService.getOverview();
}
@Mutation()
public async unarchiveAll(@Args('importance') importance?: Importance): Promise<NotificationOverview> {
const { overview } = await this.notificationsService.unarchiveAll(importance);
return overview;
}
/**============================================
* Subscriptions
*=============================================**/

View File

@@ -5,9 +5,10 @@ import {
Importance,
NotificationType,
type Notification,
NotificationFilter,
NotificationOverview,
NotificationData,
type NotificationFilter,
type NotificationOverview,
type NotificationData,
type NotificationCounts,
} from '@app/graphql/generated/api/types';
import { getters } from '@app/store';
import { Injectable } from '@nestjs/common';
@@ -97,30 +98,22 @@ export class NotificationsService {
// The path looks like /{notification base path}/{type}/{notification id}
const type = path.includes('/unread/') ? NotificationType.UNREAD : NotificationType.ARCHIVE;
this.logger.debug(`Adding ${type} Notification: ${path}`);
const notification = await this.loadNotificationFile(path, NotificationType[type]);
this.increment(notification.importance, NotificationsService.overview[type.toLowerCase()]);
NotificationsService.overview[type.toLowerCase()][notification.importance.toLowerCase()] += 1;
NotificationsService.overview[type.toLowerCase()]['total'] += 1;
this.publishOverview();
pubsub.publish(PUBSUB_CHANNEL.NOTIFICATION_ADDED, {
notificationAdded: notification,
});
pubsub.publish(PUBSUB_CHANNEL.NOTIFICATION_OVERVIEW, {
notificationsOverview: NotificationsService.overview,
});
}
private async removeFromOverview(notification: Notification) {
const { type, id, importance } = notification;
this.logger.debug(`Removing ${type} Notification: ${id}`);
NotificationsService.overview[type.toLowerCase()][importance.toLowerCase()] -= 1;
NotificationsService.overview[type.toLowerCase()]['total'] -= 1;
return pubsub.publish(PUBSUB_CHANNEL.NOTIFICATION_OVERVIEW, {
notificationsOverview: NotificationsService.overview,
});
this.decrement(importance, NotificationsService.overview[type.toLowerCase()]);
return this.publishOverview();
}
/**
@@ -135,6 +128,22 @@ export class NotificationsService {
return structuredClone(NotificationsService.overview);
}
private publishOverview(overview = NotificationsService.overview) {
return pubsub.publish(PUBSUB_CHANNEL.NOTIFICATION_OVERVIEW, {
notificationsOverview: overview,
});
}
private increment(importance: Importance, collector: NotificationCounts) {
collector[importance.toLowerCase()] += 1;
collector['total'] += 1;
}
private decrement(importance: Importance, collector: NotificationCounts) {
collector[importance.toLowerCase()] -= 1;
collector['total'] -= 1;
}
/**------------------------------------------------------------------------
* CRUD: Creating Notifications
*------------------------------------------------------------------------**/
@@ -222,10 +231,84 @@ export class NotificationsService {
* CRUD: Updating Notifications
*------------------------------------------------------------------------**/
/**
* Returns a function that:
* 1. moves a notification from one category to another.
* 2. updates stats overview
* 3. updates the stats snapshot if provided
*
* Note: the returned function implicitly triggers a pubsub event via `fs.rename`,
* which is expected to trigger `NOTIFICATION_ADDED` & `NOTIFICATION_OVERVIEW`.
*
* The published overview will include the update from this operation.
*
* @param params
* @returns lambda function
*/
private moveNotification(params: {
from: NotificationType;
to: NotificationType;
snapshot?: NotificationOverview;
}) {
const { from, to, snapshot } = params;
const paths = this.paths();
const fromStatKey = from.toLowerCase();
const toStatKey = to.toLowerCase();
return async (notification: Notification) => {
const currentPath = join(paths[from], notification.id);
const targetPath = join(paths[to], notification.id);
/**-----------------------
* Event, PubSub, & Overview Update logic
*
* We assume `rename` kicks off 'unlink' and 'add' events
* in the chokidar file watcher (see `getNotificationsWatcher`).
*
* We assume the 'add' handler publishes to
* NOTIFICATION_ADDED & NOTIFICATION_OVERVIEW,
* and that no pubsub or overview updates occur upon 'unlink'.
*
* Thus, we explicitly update our state here via `decrement` and implicitly expect
* it to be updated (i.e. incremented & pubsub'd) via our filesystem changes.
*
* The reasons for this discrepancy are:
* - Backwards compatibility: not every notification will be created through this API,
* so we track state by watching the store (i.e. the file system).
*
* - Technical Limitations: By the time the unlink event fires, the notification file
* can no longer be read. This means we can only track overview totals accurately;
* to track other stats, we have to update them manually, prior to file deletion.
*
* Note: this introduces a pubsub race condition between this `decrement` and the `rename`.
* To ensure correctness, re-publish the overview stats after running this function.
*------------------------**/
this.decrement(notification.importance, NotificationsService.overview[fromStatKey]);
try {
await rename(currentPath, targetPath);
} catch (err) {
// revert our earlier decrement
// we do it this way (decrement -> try rename -> revert if error) to avoid
// a race condition between `rename` and `decrement`
this.increment(notification.importance, NotificationsService.overview[fromStatKey]);
throw err;
}
if (snapshot) {
this.decrement(notification.importance, snapshot[fromStatKey]);
this.increment(notification.importance, snapshot[toStatKey]);
}
};
}
public async archiveNotification({ id }: Pick<Notification, 'id'>): Promise<NotificationOverview> {
const { UNREAD, ARCHIVE } = this.paths();
const unreadPath = join(UNREAD, id);
const archivePath = join(ARCHIVE, id);
const unreadPath = join(this.paths().UNREAD, id);
// We expect to only archive 'unread' notifications, but it's possible that the notification
// has already been archived or deleted (e.g. retry logic, spike in network latency).
if (!(await fileExists(unreadPath))) {
this.logger.warn(`[archiveNotification] Could not find notification in unreads: ${id}`);
return NotificationsService.overview;
}
/**-----------------------
* Why we use a snapshot
@@ -236,88 +319,88 @@ export class NotificationsService {
* So, we use & modify a snapshot of the overview to make sure we're returning accurate
* data to the client.
*------------------------**/
const archiveSnapshot = this.getOverview().archive;
// We expect to only archive 'unread' notifications, but it's possible that the notification
// has already been archived or deleted (e.g. retry logic, spike in network latency).
if (!(await fileExists(unreadPath))) {
this.logger.warn(`[archiveNotification] Could not find notification in unreads: ${id}`);
return NotificationsService.overview;
}
const snapshot = this.getOverview();
const notification = await this.loadNotificationFile(unreadPath, NotificationType.UNREAD);
await rename(unreadPath, archivePath);
await this.removeFromOverview(notification);
archiveSnapshot.total += 1;
archiveSnapshot[notification.importance.toLowerCase()] += 1;
/**-----------------------
* Event & PubSub logic
*
* We assume `rename` kicks off 'unlink' and 'add' events
* in the chokidar file watcher.
*
* We assume the 'add' handler publishes to
* NOTIFICATION_ADDED & NOTIFICATION_OVERVIEW,
* and that no pubsub or overview updates occur upon 'unlink'.
*
* Thus, we explicitly update our state & pubsub via `removeFromOverview`
* and implicitly expect it to be updated via our filesystem changes.
*
* The reasons for this discrepancy are:
* - Backwards compatibility: not every notification will be created through this API,
* so we track state by watching the store (i.e. the file system).
*
* - Technical Limitations: By the time the unlink event fires, the notification file
* can no longer be read. This means we can only track overview totals accurately;
* to track other stats, we have to update them manually, prior to file deletion.
*------------------------**/
const moveToArchive = this.moveNotification({
from: NotificationType.UNREAD,
to: NotificationType.ARCHIVE,
snapshot,
});
await moveToArchive(notification);
return {
...NotificationsService.overview,
archive: archiveSnapshot,
archive: snapshot.archive,
};
}
public async markAsUnread({ id }: Pick<Notification, 'id'>): Promise<NotificationOverview> {
const { UNREAD, ARCHIVE } = this.paths();
const unreadPath = join(UNREAD, id);
const archivePath = join(ARCHIVE, id);
// see `archiveNotification` for why we use a snapshot
// it's b/c of a race condition
const unreadSnapshot = this.getOverview().unread;
const archivePath = join(this.paths().ARCHIVE, id);
// the target notification might not be in the archive!
if (!(await fileExists(archivePath))) {
this.logger.warn(`[markAsUnread] Could not find notification in archive: ${id}`);
return NotificationsService.overview;
}
// we use a snapshot to provide an accurate overview update
// otherwise, we'd enter a race condition with the 'add' file watcher event handler
const snapshot = this.getOverview();
const notification = await this.loadNotificationFile(archivePath, NotificationType.ARCHIVE);
await rename(archivePath, unreadPath);
// see `archiveNotification` for why there are 2 ways of updating our overview state,
// and the implications it has for updating notifications.
await this.removeFromOverview(notification);
unreadSnapshot.total += 1;
unreadSnapshot[notification.importance.toLowerCase()] += 1;
const moveToUnread = this.moveNotification({
from: NotificationType.ARCHIVE,
to: NotificationType.UNREAD,
snapshot,
});
await moveToUnread(notification);
return {
...NotificationsService.overview,
unread: unreadSnapshot,
unread: snapshot.unread,
};
}
public async archiveAll() {
public async archiveAll(importance?: Importance) {
const { UNREAD } = this.paths();
return readdir(UNREAD).then(this.archiveIds);
if (!importance) {
// use arrow function to preserve `this`
await readdir(UNREAD).then((ids) => this.archiveIds(ids));
return { overview: NotificationsService.overview };
}
const overviewSnapshot = this.getOverview();
const unreads = await this.listFilesInFolder(UNREAD);
const [notifications] = await this.loadNotificationsFromPaths(unreads, { importance });
const archive = this.moveNotification({
from: NotificationType.UNREAD,
to: NotificationType.ARCHIVE,
snapshot: overviewSnapshot,
});
const stats = await this.updateMany(notifications, archive);
return { ...stats, overview: overviewSnapshot };
}
public async unarchiveAll() {
public async unarchiveAll(importance?: Importance) {
const { ARCHIVE } = this.paths();
return readdir(ARCHIVE).then(this.unarchiveIds);
if (!importance) {
// use arrow function to preserve `this`
await readdir(ARCHIVE).then((ids) => this.unarchiveIds(ids));
return { overview: NotificationsService.overview };
}
const overviewSnapshot = this.getOverview();
const archives = await this.listFilesInFolder(ARCHIVE);
const [notifications] = await this.loadNotificationsFromPaths(archives, { importance });
const unArchive = this.moveNotification({
from: NotificationType.ARCHIVE,
to: NotificationType.UNREAD,
snapshot: overviewSnapshot,
});
const stats = await this.updateMany(notifications, unArchive);
return { ...stats, overview: overviewSnapshot };
}
/**
@@ -356,14 +439,19 @@ export class NotificationsService {
* @param action
* @returns
*/
private async updateMany<T>(notificationIds: string[], action: (id: string) => Promise<T>) {
private async updateMany<Input, T>(notificationIds: Input[], action: (id: Input) => Promise<T>) {
const processes = notificationIds.map(action);
const results = await Promise.allSettled(processes);
const succeeded = results.filter(isFulfilled).length;
const failed = results.filter(isRejected).map((result) => result.reason);
const successes = results.filter(isFulfilled);
const errors = results.filter(isRejected).map((result) => result.reason);
return { succeeded, failed, error: failed.length > 0 };
return {
data: successes,
successes: successes.length,
errors: errors,
errorOccured: errors.length > 0,
};
}
/**------------------------------------------------------------------------
@@ -412,9 +500,9 @@ export class NotificationsService {
*/
private async loadNotificationsFromPaths(
files: string[],
filters: NotificationFilter
filters: Partial<NotificationFilter>
): Promise<[Notification[], unknown[]]> {
const { limit, importance, type, offset } = filters;
const { importance, type, offset = 0, limit = files.length } = filters;
const fileReads = files
.slice(offset, limit + offset)