Implement live progress tracking for bulk metadata refresh (remaining books, status, cancel support)

This commit is contained in:
aditya.chandel
2025-08-09 23:47:47 -06:00
committed by Aditya Chandel
parent 2c078716bf
commit b1508493cf
23 changed files with 586 additions and 112 deletions

View File

@@ -0,0 +1,30 @@
package com.adityachandel.booklore.controller;
import com.adityachandel.booklore.quartz.JobSchedulerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@Slf4j
@RestController
@RequestMapping("/api/v1/tasks")
@RequiredArgsConstructor
public class TaskController {
private final JobSchedulerService jobSchedulerService;
@DeleteMapping("/{taskId}")
public ResponseEntity<?> cancelTask(@PathVariable String taskId) {
log.info("Received request to cancel task: {}", taskId);
boolean cancelled = jobSchedulerService.cancelJob(taskId);
if (cancelled) {
return ResponseEntity.ok(Map.of("message", "Task cancellation scheduled"));
} else {
return ResponseEntity.badRequest().body(Map.of("error", "Failed to cancel task or task not found"));
}
}
}

View File

@@ -85,6 +85,14 @@ public class GlobalExceptionHandler {
return new ResponseEntity<>(errorResponse, HttpStatus.OK);
}
@ExceptionHandler(InterruptedException.class)
public ResponseEntity<ErrorResponse> handleInterruptedException(InterruptedException ex) {
log.info("Request was interrupted: {}", ex.getMessage() != null ? ex.getMessage() : "Thread interrupted");
Thread.currentThread().interrupt();
ErrorResponse errorResponse = new ErrorResponse(HttpStatus.OK.value(), "Request was cancelled.");
return new ResponseEntity<>(errorResponse, HttpStatus.OK);
}
@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleGenericException(Exception ex) {
ErrorResponse errorResponse = new ErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), "An unexpected error occurred.");

View File

@@ -0,0 +1,36 @@
package com.adityachandel.booklore.model.dto;
import com.adityachandel.booklore.model.enums.EventTaskType;
import com.adityachandel.booklore.model.enums.TaskStatus;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskMessage {
private String taskId;
@Builder.Default
private Instant timestamp = Instant.now();
private String message;
private EventTaskType taskType;
private TaskStatus status;
@JsonProperty
public String getTitle() {
return taskType != null ? taskType.getTitle() : null;
}
@JsonProperty
public boolean isCancellable() {
return taskType != null && taskType.isCancellable();
}
}

View File

@@ -0,0 +1,13 @@
package com.adityachandel.booklore.model.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@Getter
@RequiredArgsConstructor
public enum EventTaskType {
METADATA_REFRESH("Metadata Refresh", true);
private final String title;
private final boolean cancellable;
}

View File

@@ -0,0 +1,9 @@
package com.adityachandel.booklore.model.enums;
public enum TaskStatus {
IN_PROGRESS,
CANCELLED,
COMPLETED,
FAILED
}

View File

@@ -10,7 +10,7 @@ public enum Topic {
BOOK_METADATA_BATCH_UPDATE("/topic/book-metadata-batch-update"),
BOOK_METADATA_BATCH_PROGRESS("/topic/book-metadata-batch-progress"),
BOOKDROP_FILE("/topic/bookdrop-file"),
TASK("/topic/task"),
LOG("/topic/log");
private final String path;

View File

@@ -9,7 +9,9 @@ import org.quartz.*;
import org.quartz.impl.matchers.KeyMatcher;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@Slf4j
@@ -20,6 +22,7 @@ public class JobSchedulerService {
private final Scheduler scheduler;
private final BlockingQueue<RefreshJobWrapper> jobQueue = new LinkedBlockingQueue<>();
private final Map<String, String> runningJobs = new ConcurrentHashMap<>();
private boolean isJobRunning = false;
public synchronized void scheduleMetadataRefresh(MetadataRefreshRequest request, Long userId) {
@@ -47,7 +50,6 @@ public class JobSchedulerService {
log.info("Processing job from queue. Remaining queue size: {}", jobQueue.size());
String jobId = generateUniqueJobId(request);
try {
log.info("Scheduling job with ID: {}", jobId);
scheduleJob(request, userId, jobId);
} catch (Exception e) {
isJobRunning = false;
@@ -57,11 +59,39 @@ public class JobSchedulerService {
}
}
public synchronized boolean cancelJob(String jobId) {
try {
String quartzJobId = runningJobs.get(jobId);
if (quartzJobId == null) {
log.warn("Job with ID {} not found in running jobs", jobId);
return false;
}
JobKey jobKey = new JobKey(quartzJobId, "metadataRefreshJobGroup");
boolean cancelled = scheduler.interrupt(jobKey);
if (cancelled) {
runningJobs.remove(jobId);
isJobRunning = false;
log.info("Job {} cancellation scheduled", jobId);
processQueue();
} else {
log.warn("Failed to cancel job {}", jobId);
}
return cancelled;
} catch (UnableToInterruptJobException e) {
log.error("Job {} cannot be interrupted: {}", jobId, e.getMessage());
return false;
}
}
private void scheduleJob(MetadataRefreshRequest request, Long userId, String jobId) {
try {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("request", request);
jobDataMap.put("userId", userId);
jobDataMap.put("jobId", jobId);
JobDetail jobDetail = JobBuilder.newJob(RefreshMetadataJob.class)
.withIdentity(jobId, "metadataRefreshJobGroup")
@@ -74,6 +104,8 @@ public class JobSchedulerService {
.startNow()
.build();
runningJobs.put(jobId, jobId);
scheduler.scheduleJob(jobDetail, trigger);
log.info("Job scheduled successfully with ID: {}", jobId);
@@ -99,6 +131,7 @@ public class JobSchedulerService {
if (jobException != null) {
log.error("Job execution encountered an error: {}", jobException.getMessage(), jobException);
}
runningJobs.remove(jobId);
isJobRunning = false;
log.debug("Job completion handled. Processing next job in the queue.");
processQueue();

View File

@@ -1,32 +1,57 @@
package com.adityachandel.booklore.quartz;
import com.adityachandel.booklore.model.dto.request.MetadataRefreshRequest;
import com.adityachandel.booklore.service.metadata.BookMetadataService;
import com.adityachandel.booklore.service.metadata.MetadataRefreshService;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@AllArgsConstructor
@RequiredArgsConstructor
@DisallowConcurrentExecution
public class RefreshMetadataJob implements Job {
public class RefreshMetadataJob implements InterruptableJob {
private MetadataRefreshService metadataRefreshService;
private final MetadataRefreshService metadataRefreshService;
private volatile Thread executionThread;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
executionThread = Thread.currentThread();
try {
MetadataRefreshRequest request = (MetadataRefreshRequest) context.getMergedJobDataMap().get("request");
Long userId = (Long) context.getMergedJobDataMap().get("userId");
metadataRefreshService.refreshMetadata(request, userId);
} catch (Exception e) {
String jobId = (String) context.getMergedJobDataMap().get("jobId");
log.info("Starting metadata refresh job with ID: {}", jobId);
metadataRefreshService.refreshMetadata(request, userId, jobId);
log.info("Completed metadata refresh job with ID: {}", jobId);
} catch (RuntimeException e) {
if (e.getCause() instanceof InterruptedException) {
log.info("Metadata refresh job with ID was interrupted: {}", context.getMergedJobDataMap().get("jobId"));
Thread.currentThread().interrupt();
return;
}
throw new JobExecutionException("Error occurred while executing metadata refresh job", e);
} catch (Exception e) {
if (Thread.currentThread().isInterrupted()) {
log.info("Metadata refresh job was interrupted");
Thread.currentThread().interrupt();
return;
}
throw new JobExecutionException("Error occurred while executing metadata refresh job", e);
} finally {
executionThread = null;
}
}
@Override
public void interrupt() throws UnableToInterruptJobException {
Thread thread = executionThread;
if (thread != null) {
thread.interrupt();
} else {
log.warn("No execution thread found to interrupt");
}
}
}

View File

@@ -5,6 +5,7 @@ import com.adityachandel.booklore.mapper.BookMapper;
import com.adityachandel.booklore.model.MetadataUpdateWrapper;
import com.adityachandel.booklore.model.dto.Book;
import com.adityachandel.booklore.model.dto.BookMetadata;
import com.adityachandel.booklore.model.dto.TaskMessage;
import com.adityachandel.booklore.model.dto.MetadataBatchProgressNotification;
import com.adityachandel.booklore.model.dto.request.FetchMetadataRequest;
import com.adityachandel.booklore.model.dto.request.MetadataRefreshOptions;
@@ -17,12 +18,13 @@ import com.adityachandel.booklore.model.entity.MetadataFetchProposalEntity;
import com.adityachandel.booklore.model.enums.FetchedMetadataProposalStatus;
import com.adityachandel.booklore.model.enums.MetadataFetchTaskStatus;
import com.adityachandel.booklore.model.enums.MetadataProvider;
import com.adityachandel.booklore.model.enums.EventTaskType;
import com.adityachandel.booklore.model.enums.TaskStatus;
import com.adityachandel.booklore.model.websocket.Topic;
import com.adityachandel.booklore.repository.BookRepository;
import com.adityachandel.booklore.repository.LibraryRepository;
import com.adityachandel.booklore.repository.MetadataFetchJobRepository;
import com.adityachandel.booklore.repository.MetadataFetchProposalRepository;
import com.adityachandel.booklore.service.BookQueryService;
import com.adityachandel.booklore.service.NotificationService;
import com.adityachandel.booklore.service.appsettings.AppSettingService;
import com.adityachandel.booklore.service.metadata.parser.BookParser;
@@ -41,7 +43,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static com.adityachandel.booklore.model.enums.MetadataProvider.*;
import static com.adityachandel.booklore.model.websocket.LogNotification.createLogNotification;
@Slf4j
@AllArgsConstructor
@@ -61,85 +62,187 @@ public class MetadataRefreshService {
private final PlatformTransactionManager transactionManager;
public void refreshMetadata(MetadataRefreshRequest request, Long userId) {
log.info("Refresh Metadata task started!");
if (Boolean.TRUE.equals(request.getQuick())) {
AppSettings appSettings = appSettingService.getAppSettings();
request.setRefreshOptions(appSettings.getMetadataRefreshOptions());
}
List<MetadataProvider> providers = prepareProviders(request);
Set<Long> bookIds = getBookEntities(request);
boolean isReviewMode = Boolean.TRUE.equals(request.getRefreshOptions().getReviewBeforeApply());
MetadataFetchJobEntity task;
String taskId;
if (isReviewMode) {
taskId = UUID.randomUUID().toString();
task = MetadataFetchJobEntity.builder()
.taskId(taskId)
.userId(userId)
.status(MetadataFetchTaskStatus.IN_PROGRESS)
.startedAt(Instant.now())
.totalBooksCount(bookIds.size())
.completedBooks(0)
.build();
metadataFetchJobRepository.save(task);
} else {
taskId = null;
task = null;
}
TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
int completedCount = 0;
public void refreshMetadata(MetadataRefreshRequest request, Long userId, String jobId) {
try {
if (Boolean.TRUE.equals(request.getQuick())) {
AppSettings appSettings = appSettingService.getAppSettings();
request.setRefreshOptions(appSettings.getMetadataRefreshOptions());
}
List<MetadataProvider> providers = prepareProviders(request);
Set<Long> bookIds = getBookEntities(request);
boolean isReviewMode = Boolean.TRUE.equals(request.getRefreshOptions().getReviewBeforeApply());
MetadataFetchJobEntity task;
if (isReviewMode) {
task = MetadataFetchJobEntity.builder()
.taskId(jobId)
.userId(userId)
.status(MetadataFetchTaskStatus.IN_PROGRESS)
.startedAt(Instant.now())
.totalBooksCount(bookIds.size())
.completedBooks(0)
.build();
metadataFetchJobRepository.save(task);
} else {
task = null;
}
TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
int completedCount = 0;
for (Long bookId : bookIds) {
checkForInterruption(jobId, task, bookIds.size());
int finalCompletedCount = completedCount;
txTemplate.execute(status -> {
BookEntity book = bookRepository.findAllWithMetadataByIds(Collections.singleton(bookId))
.stream().findFirst()
.orElseThrow(() -> ApiError.BOOK_NOT_FOUND.createException(bookId));
try {
checkForInterruption(jobId, task, bookIds.size());
if (book.getMetadata().areAllFieldsLocked()) {
log.info("Skipping locked book: {}", book.getFileName());
notificationService.sendMessage(Topic.LOG,
createLogNotification("Book '" + book.getMetadata().getTitle() + "' is locked."));
sendTaskNotification(jobId, "Skipped locked book: " + book.getMetadata().getTitle(), TaskStatus.IN_PROGRESS);
return null;
}
reportProgressIfNeeded(task, taskId, finalCompletedCount, bookIds.size(), book);
reportProgressIfNeeded(task, jobId, finalCompletedCount, bookIds.size(), book);
Map<MetadataProvider, BookMetadata> metadataMap = fetchMetadataForBook(providers, book);
if (providers.contains(GoodReads)) Thread.sleep(ThreadLocalRandom.current().nextLong(500, 1500));
if (providers.contains(GoodReads)) {
try {
Thread.sleep(ThreadLocalRandom.current().nextLong(500, 1500));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
status.setRollbackOnly();
return null;
}
}
BookMetadata fetched = buildFetchMetadata(book.getId(), request, metadataMap);
if (isReviewMode) saveProposal(taskId, book.getId(), fetched);
else updateBookMetadata(book, fetched, request.getRefreshOptions().isRefreshCovers(),
request.getRefreshOptions().isMergeCategories());
if (isReviewMode) {
saveProposal(task.getTaskId(), book.getId(), fetched);
} else {
updateBookMetadata(book, fetched, request.getRefreshOptions().isRefreshCovers(), request.getRefreshOptions().isMergeCategories());
sendTaskProgressNotification(jobId, finalCompletedCount + 1, bookIds.size(), "Metadata updated: " + book.getMetadata().getTitle());
}
} catch (Exception e) {
if (Thread.currentThread().isInterrupted()) {
log.info("Processing interrupted for book: {}", book.getFileName());
status.setRollbackOnly();
return null;
}
log.error("Metadata update failed for book: {}", book.getFileName(), e);
sendTaskNotification(jobId, String.format("Failed to process: %s - %s", book.getMetadata().getTitle(), e.getMessage()), TaskStatus.FAILED);
}
bookRepository.saveAndFlush(book);
return null;
});
completedCount++;
}
if (isReviewMode) completeTask(task, completedCount, bookIds.size());
log.info("Metadata refresh task {} completed successfully", jobId);
sendTaskNotification(jobId, String.format("Metadata refresh completed successfully - processed %d books", completedCount), TaskStatus.COMPLETED);
} catch (RuntimeException e) {
if (e.getCause() instanceof InterruptedException) {
log.info("Metadata refresh task {} cancelled successfully", jobId);
return;
}
log.error("Fatal error during metadata refresh", e);
sendTaskNotification(jobId, "Fatal error during metadata refresh: " + e.getMessage(), TaskStatus.FAILED);
throw e;
} catch (Exception fatal) {
log.error("Fatal error during metadata refresh", fatal);
if (isReviewMode) failTask(task, bookIds.size(), fatal.getMessage());
sendTaskNotification(jobId, "Fatal error during metadata refresh: " + fatal.getMessage(), TaskStatus.FAILED);
throw fatal;
}
log.info("Metadata refresh task completed!");
}
public Map<MetadataProvider, BookMetadata> fetchMetadataForBook(List<MetadataProvider> providers, Book book) {
return providers.stream()
.map(provider -> createInterruptibleMetadataFuture(() -> fetchTopMetadataFromAProvider(provider, book)))
.map(this::joinFutureSafely)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
BookMetadata::getProvider,
metadata -> metadata,
(existing, replacement) -> existing
));
}
protected Map<MetadataProvider, BookMetadata> fetchMetadataForBook(List<MetadataProvider> providers, BookEntity bookEntity) {
Book book = bookMapper.toBook(bookEntity);
return providers.stream()
.map(provider -> createInterruptibleMetadataFuture(() -> fetchTopMetadataFromAProvider(provider, book)))
.map(this::joinFutureSafely)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
BookMetadata::getProvider,
metadata -> metadata,
(existing, replacement) -> existing
));
}
private CompletableFuture<BookMetadata> createInterruptibleMetadataFuture(java.util.function.Supplier<BookMetadata> metadataSupplier) {
return CompletableFuture.supplyAsync(() -> {
if (Thread.currentThread().isInterrupted()) {
log.info("Skipping metadata fetch due to interruption");
return null;
}
return metadataSupplier.get();
}).exceptionally(e -> {
if (e.getCause() instanceof InterruptedException) {
log.info("Metadata fetch was interrupted");
Thread.currentThread().interrupt();
return null;
}
log.error("Error fetching metadata from provider", e);
return null;
});
}
private BookMetadata joinFutureSafely(CompletableFuture<BookMetadata> future) {
try {
return future.join();
} catch (Exception e) {
if (Thread.currentThread().isInterrupted()) {
log.info("Future join interrupted");
return null;
}
throw e;
}
}
private void checkForInterruption(String jobId, MetadataFetchJobEntity task, int totalBooks) {
if (Thread.currentThread().isInterrupted()) {
log.info("Metadata refresh task {} cancelled by user request", jobId);
sendTaskNotification(jobId, "Task cancelled by user", TaskStatus.CANCELLED);
if (task != null) {
failTask(task, totalBooks);
}
throw new RuntimeException(new InterruptedException("Task was cancelled"));
}
}
private void sendTaskNotification(String taskId, String message, TaskStatus status) {
notificationService.sendMessage(Topic.TASK, TaskMessage.builder()
.taskId(taskId)
.taskType(EventTaskType.METADATA_REFRESH)
.message(message)
.status(status)
.build());
}
private void sendTaskProgressNotification(String taskId, int current, int total, String message) {
sendTaskNotification(taskId, String.format("(%d/%d) %s", current, total, message), TaskStatus.IN_PROGRESS);
}
private void reportProgressIfNeeded(MetadataFetchJobEntity task, String taskId, int completedCount, int total, BookEntity book) {
if (task == null) return;
task.setCompletedBooks(completedCount);
metadataFetchJobRepository.save(task);
String message = String.format("Processing '%s'", book.getMetadata().getTitle());
notificationService.sendMessage(Topic.BOOK_METADATA_BATCH_PROGRESS,
new MetadataBatchProgressNotification(
taskId, completedCount, total, message, MetadataFetchTaskStatus.IN_PROGRESS.name()
@@ -159,14 +262,14 @@ public class MetadataRefreshService {
));
}
private void failTask(MetadataFetchJobEntity task, int total, String errorMessage) {
private void failTask(MetadataFetchJobEntity task, int total) {
task.setStatus(MetadataFetchTaskStatus.ERROR);
task.setCompletedAt(Instant.now());
metadataFetchJobRepository.save(task);
notificationService.sendMessage(Topic.BOOK_METADATA_BATCH_PROGRESS,
new MetadataBatchProgressNotification(
task.getTaskId(), 0, total, "Error: " + errorMessage,
task.getTaskId(), 0, total, "Error: " + "Task was cancelled",
MetadataFetchTaskStatus.ERROR.name()
));
}
@@ -196,7 +299,6 @@ public class MetadataRefreshService {
Book book = bookMapper.toBook(bookEntity);
notificationService.sendMessage(Topic.BOOK_METADATA_UPDATE, book);
notificationService.sendMessage(Topic.LOG, createLogNotification("Book metadata updated: " + book.getMetadata().getTitle()));
}
}
@@ -229,37 +331,6 @@ public class MetadataRefreshService {
}
}
public Map<MetadataProvider, BookMetadata> fetchMetadataForBook(List<MetadataProvider> providers, Book book) {
return providers.stream()
.map(provider -> CompletableFuture.supplyAsync(() -> fetchTopMetadataFromAProvider(provider, book))
.exceptionally(e -> {
log.error("Error fetching metadata from provider: {}", provider, e);
return null;
}))
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
BookMetadata::getProvider,
metadata -> metadata,
(existing, replacement) -> existing
));
}
protected Map<MetadataProvider, BookMetadata> fetchMetadataForBook(List<MetadataProvider> providers, BookEntity bookEntity) {
return providers.stream()
.map(provider -> CompletableFuture.supplyAsync(() -> fetchTopMetadataFromAProvider(provider, bookMapper.toBook(bookEntity)))
.exceptionally(e -> {
log.error("Error fetching metadata from provider: {}", provider, e);
return null;
}))
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
BookMetadata::getProvider,
metadata -> metadata,
(existing, replacement) -> existing
));
}
public BookMetadata fetchTopMetadataFromAProvider(MetadataProvider provider, Book book) {
return getParser(provider).fetchTopMetadata(book, buildFetchMetadataRequestFromBook(book));
@@ -444,3 +515,4 @@ public class MetadataRefreshService {
};
}
}

View File

@@ -3,7 +3,7 @@ import {RxStompService} from './shared/websocket/rx-stomp.service';
import {Message} from '@stomp/stompjs';
import {BookService} from './book/service/book.service';
import {NotificationEventService} from './shared/websocket/notification-event.service';
import {parseLogNotification} from './shared/websocket/model/log-notification.model';
import {parseLogNotification, parseTaskMessage, TaskMessage} from './shared/websocket/model/log-notification.model';
import {ConfirmDialog} from 'primeng/confirmdialog';
import {Toast} from 'primeng/toast';
import {RouterOutlet} from '@angular/router';
@@ -12,6 +12,7 @@ import {AppConfigService} from './core/service/app-config.service';
import {MetadataBatchProgressNotification} from './core/model/metadata-batch-progress.model';
import {MetadataProgressService} from './core/service/metadata-progress-service';
import {BookdropFileService, BookdropFileNotification} from './bookdrop/bookdrop-file.service';
import {TaskEventService} from './shared/websocket/task-event.service';
@Component({
selector: 'app-root',
@@ -29,6 +30,7 @@ export class AppComponent implements OnInit {
private notificationEventService = inject(NotificationEventService);
private metadataProgressService = inject(MetadataProgressService);
private bookdropFileService = inject(BookdropFileService);
private taskEventService = inject(TaskEventService);
private appConfigService = inject(AppConfigService);
ngOnInit(): void {
@@ -64,6 +66,11 @@ export class AppComponent implements OnInit {
this.notificationEventService.handleNewNotification(logNotification);
});
this.rxStompService.watch('/topic/task').subscribe((message: Message) => {
const taskMessage: TaskMessage = parseTaskMessage(message.body);
this.taskEventService.handleTaskMessage(taskMessage);
});
this.rxStompService.watch('/topic/bookdrop-file').subscribe((message: Message) => {
const notification = JSON.parse(message.body) as BookdropFileNotification;
this.bookdropFileService.handleIncomingFile(notification);

View File

@@ -1,23 +1,24 @@
<div class="staging-border p-4 mt-6 rounded">
<div class="staging-border p-4 rounded">
<div class="flex justify-between items-center">
<div class="flex flex-col justify-center">
<p class="text-sm font-medium text-gray-200">Pending Bookdrop Files</p>
<p class="text font-bold text-zinc-300">Pending Bookdrop Files</p>
<p class="text-lg font-bold text-primary">{{ pendingCount }}</p>
@if (lastUpdatedAt) {
<p class="text-xs text-gray-400 mt-1">
<p class="text-xs text-zinc-400 mt-1">
Last updated: {{ lastUpdatedAt | date: 'short' }}
</p>
}
</div>
<div class="flex items-center">
<button
type="button"
class="p-button p-button-sm p-button-outlined p-button-info"
<p-button
size="small"
outlined
severity="info"
(click)="openReviewDialog()">
Review
</button>
</p-button>
</div>
</div>
</div>

View File

@@ -4,6 +4,7 @@ import {takeUntil} from 'rxjs/operators';
import {BookdropFileNotification, BookdropFileService} from '../bookdrop-file.service';
import {DatePipe} from '@angular/common';
import {Router} from '@angular/router';
import {Button} from 'primeng/button';
@Component({
selector: 'app-bookdrop-files-widget-component',
@@ -11,7 +12,8 @@ import {Router} from '@angular/router';
templateUrl: './bookdrop-files-widget.component.html',
styleUrl: './bookdrop-files-widget.component.scss',
imports: [
DatePipe
DatePipe,
Button
]
})
export class BookdropFilesWidgetComponent implements OnInit, OnDestroy {

View File

@@ -1,4 +1,4 @@
<div class="flex flex-col px-4 py-6 live-border">
<p class="text-sm text-gray-400">{{ latestNotification.timestamp }}</p>
<p>{{ latestNotification.message }}</p>
<div class="flex flex-col p-4 space-y-2 live-border">
<p class="text-xs text-zinc-400">{{ latestNotification.timestamp }}</p>
<p class="font-normal text-zinc-200">{{ latestNotification.message }}</p>
</div>

View File

@@ -1,6 +1,6 @@
import {Component, inject} from '@angular/core';
import {LogNotification} from '../../../shared/websocket/model/log-notification.model';
import {NotificationEventService} from '../../../shared/websocket/notification-event.service';
import {LogNotification} from '../../../shared/websocket/model/log-notification.model';
@Component({
selector: 'app-live-notification-box',

View File

@@ -0,0 +1,43 @@
<div class="flex flex-col p-4 live-border">
@if (tasks$ | async; as tasks) {
@if (tasks.length === 0) {
<div class="text-center text-zinc-400">
<p class="text-sm">No active tasks</p>
</div>
} @else {
@for (task of tasks; track task.taskId; let i = $index) {
@if (i > 0) {
<div class="border-t border-zinc-700 my-4"></div>
}
<div class="flex flex-col space-y-1.5">
<p class="text-xs text-zinc-400">{{ task.timestamp }}</p>
<div class="flex flex-col space-y-0.5">
<div class="flex justify-between items-center">
<div class="flex items-center space-x-1.5">
<span class="px-1.5 py-0.5 text-xs rounded-xl font-bold" [class]="getStatusClasses(task.status)">
{{ getStatusText(task.status) }}
</span>
@if (task.title) {
<h4 class="font-bold text-zinc-200">Task: {{ task.title }}</h4>
}
</div>
<div class="flex items-center space-x-1">
@if (task.status === TaskStatus.IN_PROGRESS && task.cancellable) {
<p-button
(onClick)="cancelTask(task.taskId)" icon="pi pi-ban" severity="danger" size="small" rounded text>
</p-button>
}
<p-button
(onClick)="removeTask(task.taskId)" icon="pi pi-times" severity="secondary" size="small" rounded text>
</p-button>
</div>
</div>
<p class="font-normal text-zinc-300">{{ task.message }}</p>
</div>
</div>
}
}
}
</div>

View File

@@ -0,0 +1,6 @@
.live-border {
background: var(--card-background);
border: 1px solid var(--primary-color);
border-radius: 0.5rem;
}

View File

@@ -0,0 +1,84 @@
import {Component, inject} from '@angular/core';
import {CommonModule} from '@angular/common';
import {TaskMessage, TaskStatus} from '../../../shared/websocket/model/log-notification.model';
import {TaskEventService} from '../../../shared/websocket/task-event.service';
import {TaskService} from '../../../shared/services/task.service';
import {Observable, take} from 'rxjs';
import {Button} from 'primeng/button';
import {MessageService} from 'primeng/api';
@Component({
selector: 'app-live-task-event-box',
standalone: true,
imports: [CommonModule, Button],
templateUrl: './live-task-event-box.component.html',
styleUrls: ['./live-task-event-box.component.scss'],
host: {
class: 'config-panel'
}
})
export class LiveTaskEventBoxComponent {
tasks$: Observable<TaskMessage[]>;
TaskStatus = TaskStatus;
private taskEventService = inject(TaskEventService);
private taskService = inject(TaskService);
private messageService = inject(MessageService);
constructor() {
this.tasks$ = this.taskEventService.tasks$;
}
cancelTask(taskId: string): void {
this.taskService.cancelTask(taskId).pipe(take(1)).subscribe({
next: (response) => {
this.messageService.add({
severity: 'success',
summary: 'Success',
detail: `Task cancellation scheduled`
});
},
error: (error) => {
this.messageService.add({
severity: 'error',
summary: 'Cancellation failed',
detail: error.error?.error || 'Failed to cancel task'
});
}
});
}
removeTask(taskId: string): void {
this.taskEventService.removeTask(taskId);
}
getStatusClasses(status: TaskStatus): string {
switch (status) {
case TaskStatus.IN_PROGRESS:
return 'bg-blue-700 text-zinc-100';
case TaskStatus.COMPLETED:
return 'bg-green-700 text-zinc-100';
case TaskStatus.FAILED:
return 'bg-red-700 text-zinc-100';
case TaskStatus.CANCELLED:
return 'bg-gray-700 text-zinc-100';
default:
return 'bg-zinc-700 text-zinc-100';
}
}
getStatusText(status: TaskStatus): string {
switch (status) {
case TaskStatus.IN_PROGRESS:
return 'Running';
case TaskStatus.COMPLETED:
return 'Completed';
case TaskStatus.FAILED:
return 'Failed';
case TaskStatus.CANCELLED:
return 'Cancelled';
default:
return 'Unknown';
}
}
}

View File

@@ -1,5 +1,8 @@
<div class="metadata-progress-box flex flex-col w-[25rem] max-h-[60vh] overflow-y-auto">
<div class="metadata-progress-box flex gap-4 flex-col w-[25rem] max-h-[60vh] overflow-y-auto">
<app-live-notification-box/>
@if (hasActiveTasks$ | async) {
<app-live-task-event-box/>
}
@if (hasMetadataTasks$ | async) {
<app-metadata-progress-widget/>
}

View File

@@ -1,7 +1,9 @@
import {Component, inject} from '@angular/core';
import {LiveNotificationBoxComponent} from '../live-notification-box/live-notification-box.component';
import {LiveTaskEventBoxComponent} from '../live-task-event-box/live-task-event-box.component';
import {MetadataProgressWidgetComponent} from '../metadata-progress-widget-component/metadata-progress-widget-component';
import {MetadataProgressService} from '../../service/metadata-progress-service';
import {TaskEventService} from '../../../shared/websocket/task-event.service';
import {map} from 'rxjs/operators';
import {AsyncPipe} from '@angular/common';
import {BookdropFilesWidgetComponent} from '../../../bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component';
@@ -11,6 +13,7 @@ import {BookdropFileService} from '../../../bookdrop/bookdrop-file.service';
selector: 'app-unified-notification-popover-component',
imports: [
LiveNotificationBoxComponent,
LiveTaskEventBoxComponent,
MetadataProgressWidgetComponent,
AsyncPipe,
BookdropFilesWidgetComponent
@@ -22,10 +25,15 @@ import {BookdropFileService} from '../../../bookdrop/bookdrop-file.service';
export class UnifiedNotificationBoxComponent {
metadataProgressService = inject(MetadataProgressService);
bookdropFileService = inject(BookdropFileService);
taskEventService = inject(TaskEventService);
hasMetadataTasks$ = this.metadataProgressService.activeTasks$.pipe(
map(tasks => Object.keys(tasks).length > 0)
);
hasPendingBookdropFiles$ = this.bookdropFileService.hasPendingFiles$;
hasActiveTasks$ = this.taskEventService.tasks$.pipe(
map(tasks => tasks.length > 0)
);
}

View File

@@ -23,6 +23,7 @@ import {MetadataBatchProgressNotification} from '../../../core/model/metadata-ba
import {UnifiedNotificationBoxComponent} from '../../../core/component/unified-notification-popover-component/unified-notification-popover-component';
import {BookdropFileService} from '../../../bookdrop/bookdrop-file.service';
import {DialogLauncherService} from '../../../dialog-launcher.service';
import {TaskEventService} from '../../../shared/websocket/task-event.service';
@Component({
selector: 'app-topbar',
@@ -77,10 +78,12 @@ export class AppTopBarComponent implements OnDestroy {
protected userService: UserService,
private metadataProgressService: MetadataProgressService,
private bookdropFileService: BookdropFileService,
private dialogLauncher: DialogLauncherService
private dialogLauncher: DialogLauncherService,
private taskEventService: TaskEventService
) {
this.subscribeToMetadataProgress();
this.subscribeToNotifications();
this.subscribeToTaskEvents();
this.metadataProgressService.activeTasks$
.pipe(takeUntil(this.destroy$))
@@ -153,6 +156,16 @@ export class AppTopBarComponent implements OnDestroy {
});
}
private subscribeToTaskEvents() {
this.taskEventService.tasks$
.pipe(takeUntil(this.destroy$))
.subscribe((tasks) => {
if (tasks.length > 0) {
this.triggerPulseEffect();
}
});
}
private triggerPulseEffect() {
this.showPulse = true;
clearTimeout(this.eventTimer);

View File

@@ -0,0 +1,17 @@
import {Injectable, inject} from '@angular/core';
import {HttpClient} from '@angular/common/http';
import {Observable} from 'rxjs';
import {API_CONFIG} from '../../config/api-config';
@Injectable({
providedIn: 'root'
})
export class TaskService {
private http = inject(HttpClient);
private readonly url = `${API_CONFIG.BASE_URL}/api/v1/tasks`;
cancelTask(taskId: string): Observable<{ message: string }> {
return this.http.delete<{ message: string }>(`${this.url}/${taskId}`);
}
}

View File

@@ -1,3 +1,10 @@
export enum TaskStatus {
IN_PROGRESS = 'IN_PROGRESS',
CANCELLED = 'CANCELLED',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED'
}
export interface LogNotification {
timestamp?: string;
message: string;
@@ -5,10 +12,25 @@ export interface LogNotification {
export function parseLogNotification(messageBody: string): LogNotification {
const raw = JSON.parse(messageBody);
const localTime = new Date(raw.timestamp).toLocaleTimeString();
return {
timestamp: localTime,
timestamp: raw.timestamp ? new Date(raw.timestamp).toLocaleTimeString() : undefined,
message: raw.message,
};
}
export interface TaskMessage {
taskId: string;
timestamp: string;
title?: string;
message: string;
cancellable: boolean;
status: TaskStatus;
}
export function parseTaskMessage(messageBody: string): TaskMessage {
const raw = JSON.parse(messageBody) as TaskMessage;
return {
...raw,
timestamp: new Date(raw.timestamp).toLocaleTimeString()
};
}

View File

@@ -0,0 +1,42 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { TaskMessage } from './model/log-notification.model';
@Injectable({
providedIn: 'root'
})
export class TaskEventService {
private tasksSubject = new BehaviorSubject<Map<string, TaskMessage>>(new Map());
tasks$: Observable<TaskMessage[]> = this.tasksSubject.asObservable().pipe(
map(taskMap => Array.from(taskMap.values()))
);
handleTaskMessage(taskMessage: TaskMessage): void {
const currentTasks = new Map(this.tasksSubject.value);
currentTasks.set(taskMessage.taskId, taskMessage);
this.tasksSubject.next(currentTasks);
}
removeTask(taskId: string): void {
const currentTasks = new Map(this.tasksSubject.value);
currentTasks.delete(taskId);
this.tasksSubject.next(currentTasks);
}
getTask(taskId: string): TaskMessage | undefined {
return this.tasksSubject.value.get(taskId);
}
getTaskById$(taskId: string): Observable<TaskMessage | undefined> {
return this.tasksSubject.asObservable().pipe(
map(taskMap => taskMap.get(taskId))
);
}
clearAllTasks(): void {
this.tasksSubject.next(new Map());
}
}