diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/controller/TaskController.java b/booklore-api/src/main/java/com/adityachandel/booklore/controller/TaskController.java new file mode 100644 index 000000000..ccd21ebaa --- /dev/null +++ b/booklore-api/src/main/java/com/adityachandel/booklore/controller/TaskController.java @@ -0,0 +1,30 @@ +package com.adityachandel.booklore.controller; + +import com.adityachandel.booklore.quartz.JobSchedulerService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.Map; + +@Slf4j +@RestController +@RequestMapping("/api/v1/tasks") +@RequiredArgsConstructor +public class TaskController { + + private final JobSchedulerService jobSchedulerService; + + @DeleteMapping("/{taskId}") + public ResponseEntity cancelTask(@PathVariable String taskId) { + log.info("Received request to cancel task: {}", taskId); + boolean cancelled = jobSchedulerService.cancelJob(taskId); + if (cancelled) { + return ResponseEntity.ok(Map.of("message", "Task cancellation scheduled")); + } else { + return ResponseEntity.badRequest().body(Map.of("error", "Failed to cancel task or task not found")); + } + } +} + diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/exception/GlobalExceptionHandler.java b/booklore-api/src/main/java/com/adityachandel/booklore/exception/GlobalExceptionHandler.java index 850211ca2..0a8debfb8 100644 --- a/booklore-api/src/main/java/com/adityachandel/booklore/exception/GlobalExceptionHandler.java +++ b/booklore-api/src/main/java/com/adityachandel/booklore/exception/GlobalExceptionHandler.java @@ -85,6 +85,14 @@ public class GlobalExceptionHandler { return new ResponseEntity<>(errorResponse, HttpStatus.OK); } + @ExceptionHandler(InterruptedException.class) + public ResponseEntity handleInterruptedException(InterruptedException ex) { + log.info("Request was interrupted: {}", ex.getMessage() != null ? ex.getMessage() : "Thread interrupted"); + Thread.currentThread().interrupt(); + ErrorResponse errorResponse = new ErrorResponse(HttpStatus.OK.value(), "Request was cancelled."); + return new ResponseEntity<>(errorResponse, HttpStatus.OK); + } + @ExceptionHandler(Exception.class) public ResponseEntity handleGenericException(Exception ex) { ErrorResponse errorResponse = new ErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), "An unexpected error occurred."); diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/model/dto/TaskMessage.java b/booklore-api/src/main/java/com/adityachandel/booklore/model/dto/TaskMessage.java new file mode 100644 index 000000000..a9a7c665f --- /dev/null +++ b/booklore-api/src/main/java/com/adityachandel/booklore/model/dto/TaskMessage.java @@ -0,0 +1,36 @@ +package com.adityachandel.booklore.model.dto; + +import com.adityachandel.booklore.model.enums.EventTaskType; +import com.adityachandel.booklore.model.enums.TaskStatus; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Instant; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TaskMessage { + private String taskId; + + @Builder.Default + private Instant timestamp = Instant.now(); + + private String message; + private EventTaskType taskType; + private TaskStatus status; + + @JsonProperty + public String getTitle() { + return taskType != null ? taskType.getTitle() : null; + } + + @JsonProperty + public boolean isCancellable() { + return taskType != null && taskType.isCancellable(); + } +} \ No newline at end of file diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/model/enums/EventTaskType.java b/booklore-api/src/main/java/com/adityachandel/booklore/model/enums/EventTaskType.java new file mode 100644 index 000000000..f3a942bf8 --- /dev/null +++ b/booklore-api/src/main/java/com/adityachandel/booklore/model/enums/EventTaskType.java @@ -0,0 +1,13 @@ +package com.adityachandel.booklore.model.enums; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public enum EventTaskType { + METADATA_REFRESH("Metadata Refresh", true); + + private final String title; + private final boolean cancellable; +} \ No newline at end of file diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/model/enums/TaskStatus.java b/booklore-api/src/main/java/com/adityachandel/booklore/model/enums/TaskStatus.java new file mode 100644 index 000000000..c2d9243dd --- /dev/null +++ b/booklore-api/src/main/java/com/adityachandel/booklore/model/enums/TaskStatus.java @@ -0,0 +1,9 @@ +package com.adityachandel.booklore.model.enums; + +public enum TaskStatus { + IN_PROGRESS, + CANCELLED, + COMPLETED, + FAILED +} + diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/model/websocket/Topic.java b/booklore-api/src/main/java/com/adityachandel/booklore/model/websocket/Topic.java index 3ef47a9a9..24228b624 100644 --- a/booklore-api/src/main/java/com/adityachandel/booklore/model/websocket/Topic.java +++ b/booklore-api/src/main/java/com/adityachandel/booklore/model/websocket/Topic.java @@ -10,7 +10,7 @@ public enum Topic { BOOK_METADATA_BATCH_UPDATE("/topic/book-metadata-batch-update"), BOOK_METADATA_BATCH_PROGRESS("/topic/book-metadata-batch-progress"), BOOKDROP_FILE("/topic/bookdrop-file"), - + TASK("/topic/task"), LOG("/topic/log"); private final String path; diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/quartz/JobSchedulerService.java b/booklore-api/src/main/java/com/adityachandel/booklore/quartz/JobSchedulerService.java index 1ecd8762f..74fc64b55 100644 --- a/booklore-api/src/main/java/com/adityachandel/booklore/quartz/JobSchedulerService.java +++ b/booklore-api/src/main/java/com/adityachandel/booklore/quartz/JobSchedulerService.java @@ -9,7 +9,9 @@ import org.quartz.*; import org.quartz.impl.matchers.KeyMatcher; import org.springframework.stereotype.Service; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @Slf4j @@ -20,6 +22,7 @@ public class JobSchedulerService { private final Scheduler scheduler; private final BlockingQueue jobQueue = new LinkedBlockingQueue<>(); + private final Map runningJobs = new ConcurrentHashMap<>(); private boolean isJobRunning = false; public synchronized void scheduleMetadataRefresh(MetadataRefreshRequest request, Long userId) { @@ -47,7 +50,6 @@ public class JobSchedulerService { log.info("Processing job from queue. Remaining queue size: {}", jobQueue.size()); String jobId = generateUniqueJobId(request); try { - log.info("Scheduling job with ID: {}", jobId); scheduleJob(request, userId, jobId); } catch (Exception e) { isJobRunning = false; @@ -57,11 +59,39 @@ public class JobSchedulerService { } } + public synchronized boolean cancelJob(String jobId) { + try { + String quartzJobId = runningJobs.get(jobId); + if (quartzJobId == null) { + log.warn("Job with ID {} not found in running jobs", jobId); + return false; + } + + JobKey jobKey = new JobKey(quartzJobId, "metadataRefreshJobGroup"); + boolean cancelled = scheduler.interrupt(jobKey); + + if (cancelled) { + runningJobs.remove(jobId); + isJobRunning = false; + log.info("Job {} cancellation scheduled", jobId); + processQueue(); + } else { + log.warn("Failed to cancel job {}", jobId); + } + + return cancelled; + } catch (UnableToInterruptJobException e) { + log.error("Job {} cannot be interrupted: {}", jobId, e.getMessage()); + return false; + } + } + private void scheduleJob(MetadataRefreshRequest request, Long userId, String jobId) { try { JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put("request", request); jobDataMap.put("userId", userId); + jobDataMap.put("jobId", jobId); JobDetail jobDetail = JobBuilder.newJob(RefreshMetadataJob.class) .withIdentity(jobId, "metadataRefreshJobGroup") @@ -74,6 +104,8 @@ public class JobSchedulerService { .startNow() .build(); + runningJobs.put(jobId, jobId); + scheduler.scheduleJob(jobDetail, trigger); log.info("Job scheduled successfully with ID: {}", jobId); @@ -99,6 +131,7 @@ public class JobSchedulerService { if (jobException != null) { log.error("Job execution encountered an error: {}", jobException.getMessage(), jobException); } + runningJobs.remove(jobId); isJobRunning = false; log.debug("Job completion handled. Processing next job in the queue."); processQueue(); diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/quartz/RefreshMetadataJob.java b/booklore-api/src/main/java/com/adityachandel/booklore/quartz/RefreshMetadataJob.java index 0a89b2b98..af060613c 100644 --- a/booklore-api/src/main/java/com/adityachandel/booklore/quartz/RefreshMetadataJob.java +++ b/booklore-api/src/main/java/com/adityachandel/booklore/quartz/RefreshMetadataJob.java @@ -1,32 +1,57 @@ package com.adityachandel.booklore.quartz; import com.adityachandel.booklore.model.dto.request.MetadataRefreshRequest; -import com.adityachandel.booklore.service.metadata.BookMetadataService; import com.adityachandel.booklore.service.metadata.MetadataRefreshService; -import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.quartz.DisallowConcurrentExecution; -import org.quartz.Job; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; +import org.quartz.*; import org.springframework.stereotype.Component; @Slf4j @Component -@AllArgsConstructor +@RequiredArgsConstructor @DisallowConcurrentExecution -public class RefreshMetadataJob implements Job { +public class RefreshMetadataJob implements InterruptableJob { - private MetadataRefreshService metadataRefreshService; + private final MetadataRefreshService metadataRefreshService; + private volatile Thread executionThread; @Override public void execute(JobExecutionContext context) throws JobExecutionException { + executionThread = Thread.currentThread(); try { MetadataRefreshRequest request = (MetadataRefreshRequest) context.getMergedJobDataMap().get("request"); Long userId = (Long) context.getMergedJobDataMap().get("userId"); - metadataRefreshService.refreshMetadata(request, userId); - } catch (Exception e) { + String jobId = (String) context.getMergedJobDataMap().get("jobId"); + log.info("Starting metadata refresh job with ID: {}", jobId); + metadataRefreshService.refreshMetadata(request, userId, jobId); + log.info("Completed metadata refresh job with ID: {}", jobId); + } catch (RuntimeException e) { + if (e.getCause() instanceof InterruptedException) { + log.info("Metadata refresh job with ID was interrupted: {}", context.getMergedJobDataMap().get("jobId")); + Thread.currentThread().interrupt(); + return; + } throw new JobExecutionException("Error occurred while executing metadata refresh job", e); + } catch (Exception e) { + if (Thread.currentThread().isInterrupted()) { + log.info("Metadata refresh job was interrupted"); + Thread.currentThread().interrupt(); + return; + } + throw new JobExecutionException("Error occurred while executing metadata refresh job", e); + } finally { + executionThread = null; + } + } + + @Override + public void interrupt() throws UnableToInterruptJobException { + Thread thread = executionThread; + if (thread != null) { + thread.interrupt(); + } else { + log.warn("No execution thread found to interrupt"); } } } diff --git a/booklore-api/src/main/java/com/adityachandel/booklore/service/metadata/MetadataRefreshService.java b/booklore-api/src/main/java/com/adityachandel/booklore/service/metadata/MetadataRefreshService.java index 7c502c7aa..c734aa867 100644 --- a/booklore-api/src/main/java/com/adityachandel/booklore/service/metadata/MetadataRefreshService.java +++ b/booklore-api/src/main/java/com/adityachandel/booklore/service/metadata/MetadataRefreshService.java @@ -5,6 +5,7 @@ import com.adityachandel.booklore.mapper.BookMapper; import com.adityachandel.booklore.model.MetadataUpdateWrapper; import com.adityachandel.booklore.model.dto.Book; import com.adityachandel.booklore.model.dto.BookMetadata; +import com.adityachandel.booklore.model.dto.TaskMessage; import com.adityachandel.booklore.model.dto.MetadataBatchProgressNotification; import com.adityachandel.booklore.model.dto.request.FetchMetadataRequest; import com.adityachandel.booklore.model.dto.request.MetadataRefreshOptions; @@ -17,12 +18,13 @@ import com.adityachandel.booklore.model.entity.MetadataFetchProposalEntity; import com.adityachandel.booklore.model.enums.FetchedMetadataProposalStatus; import com.adityachandel.booklore.model.enums.MetadataFetchTaskStatus; import com.adityachandel.booklore.model.enums.MetadataProvider; +import com.adityachandel.booklore.model.enums.EventTaskType; +import com.adityachandel.booklore.model.enums.TaskStatus; import com.adityachandel.booklore.model.websocket.Topic; import com.adityachandel.booklore.repository.BookRepository; import com.adityachandel.booklore.repository.LibraryRepository; import com.adityachandel.booklore.repository.MetadataFetchJobRepository; import com.adityachandel.booklore.repository.MetadataFetchProposalRepository; -import com.adityachandel.booklore.service.BookQueryService; import com.adityachandel.booklore.service.NotificationService; import com.adityachandel.booklore.service.appsettings.AppSettingService; import com.adityachandel.booklore.service.metadata.parser.BookParser; @@ -41,7 +43,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import static com.adityachandel.booklore.model.enums.MetadataProvider.*; -import static com.adityachandel.booklore.model.websocket.LogNotification.createLogNotification; @Slf4j @AllArgsConstructor @@ -61,85 +62,187 @@ public class MetadataRefreshService { private final PlatformTransactionManager transactionManager; - public void refreshMetadata(MetadataRefreshRequest request, Long userId) { - log.info("Refresh Metadata task started!"); - - if (Boolean.TRUE.equals(request.getQuick())) { - AppSettings appSettings = appSettingService.getAppSettings(); - request.setRefreshOptions(appSettings.getMetadataRefreshOptions()); - } - - List providers = prepareProviders(request); - Set bookIds = getBookEntities(request); - - boolean isReviewMode = Boolean.TRUE.equals(request.getRefreshOptions().getReviewBeforeApply()); - MetadataFetchJobEntity task; - String taskId; - - if (isReviewMode) { - taskId = UUID.randomUUID().toString(); - task = MetadataFetchJobEntity.builder() - .taskId(taskId) - .userId(userId) - .status(MetadataFetchTaskStatus.IN_PROGRESS) - .startedAt(Instant.now()) - .totalBooksCount(bookIds.size()) - .completedBooks(0) - .build(); - metadataFetchJobRepository.save(task); - } else { - taskId = null; - task = null; - } - - TransactionTemplate txTemplate = new TransactionTemplate(transactionManager); - int completedCount = 0; + public void refreshMetadata(MetadataRefreshRequest request, Long userId, String jobId) { try { + if (Boolean.TRUE.equals(request.getQuick())) { + AppSettings appSettings = appSettingService.getAppSettings(); + request.setRefreshOptions(appSettings.getMetadataRefreshOptions()); + } + + List providers = prepareProviders(request); + Set bookIds = getBookEntities(request); + + boolean isReviewMode = Boolean.TRUE.equals(request.getRefreshOptions().getReviewBeforeApply()); + MetadataFetchJobEntity task; + + if (isReviewMode) { + task = MetadataFetchJobEntity.builder() + .taskId(jobId) + .userId(userId) + .status(MetadataFetchTaskStatus.IN_PROGRESS) + .startedAt(Instant.now()) + .totalBooksCount(bookIds.size()) + .completedBooks(0) + .build(); + metadataFetchJobRepository.save(task); + } else { + task = null; + } + + TransactionTemplate txTemplate = new TransactionTemplate(transactionManager); + int completedCount = 0; + for (Long bookId : bookIds) { + checkForInterruption(jobId, task, bookIds.size()); int finalCompletedCount = completedCount; txTemplate.execute(status -> { BookEntity book = bookRepository.findAllWithMetadataByIds(Collections.singleton(bookId)) .stream().findFirst() .orElseThrow(() -> ApiError.BOOK_NOT_FOUND.createException(bookId)); try { + checkForInterruption(jobId, task, bookIds.size()); if (book.getMetadata().areAllFieldsLocked()) { log.info("Skipping locked book: {}", book.getFileName()); - notificationService.sendMessage(Topic.LOG, - createLogNotification("Book '" + book.getMetadata().getTitle() + "' is locked.")); + sendTaskNotification(jobId, "Skipped locked book: " + book.getMetadata().getTitle(), TaskStatus.IN_PROGRESS); return null; } - reportProgressIfNeeded(task, taskId, finalCompletedCount, bookIds.size(), book); + reportProgressIfNeeded(task, jobId, finalCompletedCount, bookIds.size(), book); Map metadataMap = fetchMetadataForBook(providers, book); - if (providers.contains(GoodReads)) Thread.sleep(ThreadLocalRandom.current().nextLong(500, 1500)); + if (providers.contains(GoodReads)) { + try { + Thread.sleep(ThreadLocalRandom.current().nextLong(500, 1500)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + status.setRollbackOnly(); + return null; + } + } BookMetadata fetched = buildFetchMetadata(book.getId(), request, metadataMap); - if (isReviewMode) saveProposal(taskId, book.getId(), fetched); - else updateBookMetadata(book, fetched, request.getRefreshOptions().isRefreshCovers(), - request.getRefreshOptions().isMergeCategories()); + if (isReviewMode) { + saveProposal(task.getTaskId(), book.getId(), fetched); + } else { + updateBookMetadata(book, fetched, request.getRefreshOptions().isRefreshCovers(), request.getRefreshOptions().isMergeCategories()); + sendTaskProgressNotification(jobId, finalCompletedCount + 1, bookIds.size(), "Metadata updated: " + book.getMetadata().getTitle()); + } } catch (Exception e) { + if (Thread.currentThread().isInterrupted()) { + log.info("Processing interrupted for book: {}", book.getFileName()); + status.setRollbackOnly(); + return null; + } log.error("Metadata update failed for book: {}", book.getFileName(), e); + sendTaskNotification(jobId, String.format("Failed to process: %s - %s", book.getMetadata().getTitle(), e.getMessage()), TaskStatus.FAILED); } bookRepository.saveAndFlush(book); return null; }); completedCount++; } + if (isReviewMode) completeTask(task, completedCount, bookIds.size()); + log.info("Metadata refresh task {} completed successfully", jobId); + + sendTaskNotification(jobId, String.format("Metadata refresh completed successfully - processed %d books", completedCount), TaskStatus.COMPLETED); + + } catch (RuntimeException e) { + if (e.getCause() instanceof InterruptedException) { + log.info("Metadata refresh task {} cancelled successfully", jobId); + return; + } + log.error("Fatal error during metadata refresh", e); + sendTaskNotification(jobId, "Fatal error during metadata refresh: " + e.getMessage(), TaskStatus.FAILED); + throw e; } catch (Exception fatal) { log.error("Fatal error during metadata refresh", fatal); - if (isReviewMode) failTask(task, bookIds.size(), fatal.getMessage()); + sendTaskNotification(jobId, "Fatal error during metadata refresh: " + fatal.getMessage(), TaskStatus.FAILED); throw fatal; } - log.info("Metadata refresh task completed!"); + } + + public Map fetchMetadataForBook(List providers, Book book) { + return providers.stream() + .map(provider -> createInterruptibleMetadataFuture(() -> fetchTopMetadataFromAProvider(provider, book))) + .map(this::joinFutureSafely) + .filter(Objects::nonNull) + .collect(Collectors.toMap( + BookMetadata::getProvider, + metadata -> metadata, + (existing, replacement) -> existing + )); + } + + protected Map fetchMetadataForBook(List providers, BookEntity bookEntity) { + Book book = bookMapper.toBook(bookEntity); + return providers.stream() + .map(provider -> createInterruptibleMetadataFuture(() -> fetchTopMetadataFromAProvider(provider, book))) + .map(this::joinFutureSafely) + .filter(Objects::nonNull) + .collect(Collectors.toMap( + BookMetadata::getProvider, + metadata -> metadata, + (existing, replacement) -> existing + )); + } + + private CompletableFuture createInterruptibleMetadataFuture(java.util.function.Supplier metadataSupplier) { + return CompletableFuture.supplyAsync(() -> { + if (Thread.currentThread().isInterrupted()) { + log.info("Skipping metadata fetch due to interruption"); + return null; + } + return metadataSupplier.get(); + }).exceptionally(e -> { + if (e.getCause() instanceof InterruptedException) { + log.info("Metadata fetch was interrupted"); + Thread.currentThread().interrupt(); + return null; + } + log.error("Error fetching metadata from provider", e); + return null; + }); + } + + private BookMetadata joinFutureSafely(CompletableFuture future) { + try { + return future.join(); + } catch (Exception e) { + if (Thread.currentThread().isInterrupted()) { + log.info("Future join interrupted"); + return null; + } + throw e; + } + } + + private void checkForInterruption(String jobId, MetadataFetchJobEntity task, int totalBooks) { + if (Thread.currentThread().isInterrupted()) { + log.info("Metadata refresh task {} cancelled by user request", jobId); + sendTaskNotification(jobId, "Task cancelled by user", TaskStatus.CANCELLED); + if (task != null) { + failTask(task, totalBooks); + } + throw new RuntimeException(new InterruptedException("Task was cancelled")); + } + } + + private void sendTaskNotification(String taskId, String message, TaskStatus status) { + notificationService.sendMessage(Topic.TASK, TaskMessage.builder() + .taskId(taskId) + .taskType(EventTaskType.METADATA_REFRESH) + .message(message) + .status(status) + .build()); + } + + private void sendTaskProgressNotification(String taskId, int current, int total, String message) { + sendTaskNotification(taskId, String.format("(%d/%d) %s", current, total, message), TaskStatus.IN_PROGRESS); } private void reportProgressIfNeeded(MetadataFetchJobEntity task, String taskId, int completedCount, int total, BookEntity book) { if (task == null) return; - task.setCompletedBooks(completedCount); metadataFetchJobRepository.save(task); - String message = String.format("Processing '%s'", book.getMetadata().getTitle()); - notificationService.sendMessage(Topic.BOOK_METADATA_BATCH_PROGRESS, new MetadataBatchProgressNotification( taskId, completedCount, total, message, MetadataFetchTaskStatus.IN_PROGRESS.name() @@ -159,14 +262,14 @@ public class MetadataRefreshService { )); } - private void failTask(MetadataFetchJobEntity task, int total, String errorMessage) { + private void failTask(MetadataFetchJobEntity task, int total) { task.setStatus(MetadataFetchTaskStatus.ERROR); task.setCompletedAt(Instant.now()); metadataFetchJobRepository.save(task); notificationService.sendMessage(Topic.BOOK_METADATA_BATCH_PROGRESS, new MetadataBatchProgressNotification( - task.getTaskId(), 0, total, "Error: " + errorMessage, + task.getTaskId(), 0, total, "Error: " + "Task was cancelled", MetadataFetchTaskStatus.ERROR.name() )); } @@ -196,7 +299,6 @@ public class MetadataRefreshService { Book book = bookMapper.toBook(bookEntity); notificationService.sendMessage(Topic.BOOK_METADATA_UPDATE, book); - notificationService.sendMessage(Topic.LOG, createLogNotification("Book metadata updated: " + book.getMetadata().getTitle())); } } @@ -229,37 +331,6 @@ public class MetadataRefreshService { } } - public Map fetchMetadataForBook(List providers, Book book) { - return providers.stream() - .map(provider -> CompletableFuture.supplyAsync(() -> fetchTopMetadataFromAProvider(provider, book)) - .exceptionally(e -> { - log.error("Error fetching metadata from provider: {}", provider, e); - return null; - })) - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toMap( - BookMetadata::getProvider, - metadata -> metadata, - (existing, replacement) -> existing - )); - } - - protected Map fetchMetadataForBook(List providers, BookEntity bookEntity) { - return providers.stream() - .map(provider -> CompletableFuture.supplyAsync(() -> fetchTopMetadataFromAProvider(provider, bookMapper.toBook(bookEntity))) - .exceptionally(e -> { - log.error("Error fetching metadata from provider: {}", provider, e); - return null; - })) - .map(CompletableFuture::join) - .filter(Objects::nonNull) - .collect(Collectors.toMap( - BookMetadata::getProvider, - metadata -> metadata, - (existing, replacement) -> existing - )); - } public BookMetadata fetchTopMetadataFromAProvider(MetadataProvider provider, Book book) { return getParser(provider).fetchTopMetadata(book, buildFetchMetadataRequestFromBook(book)); @@ -444,3 +515,4 @@ public class MetadataRefreshService { }; } } + diff --git a/booklore-ui/src/app/app.component.ts b/booklore-ui/src/app/app.component.ts index 2d0c9d76c..f5a5d2e94 100644 --- a/booklore-ui/src/app/app.component.ts +++ b/booklore-ui/src/app/app.component.ts @@ -3,7 +3,7 @@ import {RxStompService} from './shared/websocket/rx-stomp.service'; import {Message} from '@stomp/stompjs'; import {BookService} from './book/service/book.service'; import {NotificationEventService} from './shared/websocket/notification-event.service'; -import {parseLogNotification} from './shared/websocket/model/log-notification.model'; +import {parseLogNotification, parseTaskMessage, TaskMessage} from './shared/websocket/model/log-notification.model'; import {ConfirmDialog} from 'primeng/confirmdialog'; import {Toast} from 'primeng/toast'; import {RouterOutlet} from '@angular/router'; @@ -12,6 +12,7 @@ import {AppConfigService} from './core/service/app-config.service'; import {MetadataBatchProgressNotification} from './core/model/metadata-batch-progress.model'; import {MetadataProgressService} from './core/service/metadata-progress-service'; import {BookdropFileService, BookdropFileNotification} from './bookdrop/bookdrop-file.service'; +import {TaskEventService} from './shared/websocket/task-event.service'; @Component({ selector: 'app-root', @@ -29,6 +30,7 @@ export class AppComponent implements OnInit { private notificationEventService = inject(NotificationEventService); private metadataProgressService = inject(MetadataProgressService); private bookdropFileService = inject(BookdropFileService); + private taskEventService = inject(TaskEventService); private appConfigService = inject(AppConfigService); ngOnInit(): void { @@ -64,6 +66,11 @@ export class AppComponent implements OnInit { this.notificationEventService.handleNewNotification(logNotification); }); + this.rxStompService.watch('/topic/task').subscribe((message: Message) => { + const taskMessage: TaskMessage = parseTaskMessage(message.body); + this.taskEventService.handleTaskMessage(taskMessage); + }); + this.rxStompService.watch('/topic/bookdrop-file').subscribe((message: Message) => { const notification = JSON.parse(message.body) as BookdropFileNotification; this.bookdropFileService.handleIncomingFile(notification); diff --git a/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.html b/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.html index 80a886bbe..c2d974a9e 100644 --- a/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.html +++ b/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.html @@ -1,23 +1,24 @@ -
+
-

Pending Bookdrop Files

+

Pending Bookdrop Files

{{ pendingCount }}

@if (lastUpdatedAt) { -

+

Last updated: {{ lastUpdatedAt | date: 'short' }}

}
- +
diff --git a/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.ts b/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.ts index ad71c3093..9bd2cc73e 100644 --- a/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.ts +++ b/booklore-ui/src/app/bookdrop/bookdrop-files-widget-component/bookdrop-files-widget.component.ts @@ -4,6 +4,7 @@ import {takeUntil} from 'rxjs/operators'; import {BookdropFileNotification, BookdropFileService} from '../bookdrop-file.service'; import {DatePipe} from '@angular/common'; import {Router} from '@angular/router'; +import {Button} from 'primeng/button'; @Component({ selector: 'app-bookdrop-files-widget-component', @@ -11,7 +12,8 @@ import {Router} from '@angular/router'; templateUrl: './bookdrop-files-widget.component.html', styleUrl: './bookdrop-files-widget.component.scss', imports: [ - DatePipe + DatePipe, + Button ] }) export class BookdropFilesWidgetComponent implements OnInit, OnDestroy { diff --git a/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.html b/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.html index 59f219e81..175e51877 100644 --- a/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.html +++ b/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.html @@ -1,4 +1,4 @@ -
-

{{ latestNotification.timestamp }}

-

{{ latestNotification.message }}

+
+

{{ latestNotification.timestamp }}

+

{{ latestNotification.message }}

diff --git a/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.ts b/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.ts index 2696a23ed..750fef8ca 100644 --- a/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.ts +++ b/booklore-ui/src/app/core/component/live-notification-box/live-notification-box.component.ts @@ -1,6 +1,6 @@ import {Component, inject} from '@angular/core'; -import {LogNotification} from '../../../shared/websocket/model/log-notification.model'; import {NotificationEventService} from '../../../shared/websocket/notification-event.service'; +import {LogNotification} from '../../../shared/websocket/model/log-notification.model'; @Component({ selector: 'app-live-notification-box', diff --git a/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.html b/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.html new file mode 100644 index 000000000..6c249b429 --- /dev/null +++ b/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.html @@ -0,0 +1,43 @@ +
+ @if (tasks$ | async; as tasks) { + @if (tasks.length === 0) { +
+

No active tasks

+
+ } @else { + @for (task of tasks; track task.taskId; let i = $index) { + @if (i > 0) { +
+ } +
+

{{ task.timestamp }}

+ +
+
+
+ + {{ getStatusText(task.status) }} + + @if (task.title) { +

Task: {{ task.title }}

+ } +
+
+ @if (task.status === TaskStatus.IN_PROGRESS && task.cancellable) { + + + } + + +
+
+

{{ task.message }}

+
+ +
+ } + } + } +
diff --git a/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.scss b/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.scss new file mode 100644 index 000000000..76336d9e6 --- /dev/null +++ b/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.scss @@ -0,0 +1,6 @@ +.live-border { + background: var(--card-background); + border: 1px solid var(--primary-color); + border-radius: 0.5rem; +} + diff --git a/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.ts b/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.ts new file mode 100644 index 000000000..b47029c45 --- /dev/null +++ b/booklore-ui/src/app/core/component/live-task-event-box/live-task-event-box.component.ts @@ -0,0 +1,84 @@ +import {Component, inject} from '@angular/core'; +import {CommonModule} from '@angular/common'; +import {TaskMessage, TaskStatus} from '../../../shared/websocket/model/log-notification.model'; +import {TaskEventService} from '../../../shared/websocket/task-event.service'; +import {TaskService} from '../../../shared/services/task.service'; +import {Observable, take} from 'rxjs'; +import {Button} from 'primeng/button'; +import {MessageService} from 'primeng/api'; + +@Component({ + selector: 'app-live-task-event-box', + standalone: true, + imports: [CommonModule, Button], + templateUrl: './live-task-event-box.component.html', + styleUrls: ['./live-task-event-box.component.scss'], + host: { + class: 'config-panel' + } +}) +export class LiveTaskEventBoxComponent { + tasks$: Observable; + TaskStatus = TaskStatus; + + private taskEventService = inject(TaskEventService); + private taskService = inject(TaskService); + private messageService = inject(MessageService); + + constructor() { + this.tasks$ = this.taskEventService.tasks$; + } + + cancelTask(taskId: string): void { + this.taskService.cancelTask(taskId).pipe(take(1)).subscribe({ + next: (response) => { + this.messageService.add({ + severity: 'success', + summary: 'Success', + detail: `Task cancellation scheduled` + }); + }, + error: (error) => { + this.messageService.add({ + severity: 'error', + summary: 'Cancellation failed', + detail: error.error?.error || 'Failed to cancel task' + }); + } + }); + } + + removeTask(taskId: string): void { + this.taskEventService.removeTask(taskId); + } + + getStatusClasses(status: TaskStatus): string { + switch (status) { + case TaskStatus.IN_PROGRESS: + return 'bg-blue-700 text-zinc-100'; + case TaskStatus.COMPLETED: + return 'bg-green-700 text-zinc-100'; + case TaskStatus.FAILED: + return 'bg-red-700 text-zinc-100'; + case TaskStatus.CANCELLED: + return 'bg-gray-700 text-zinc-100'; + default: + return 'bg-zinc-700 text-zinc-100'; + } + } + + getStatusText(status: TaskStatus): string { + switch (status) { + case TaskStatus.IN_PROGRESS: + return 'Running'; + case TaskStatus.COMPLETED: + return 'Completed'; + case TaskStatus.FAILED: + return 'Failed'; + case TaskStatus.CANCELLED: + return 'Cancelled'; + default: + return 'Unknown'; + } + } +} diff --git a/booklore-ui/src/app/core/component/unified-notification-popover-component/unified-notification-popover-component.html b/booklore-ui/src/app/core/component/unified-notification-popover-component/unified-notification-popover-component.html index 36547be20..1d1c30865 100644 --- a/booklore-ui/src/app/core/component/unified-notification-popover-component/unified-notification-popover-component.html +++ b/booklore-ui/src/app/core/component/unified-notification-popover-component/unified-notification-popover-component.html @@ -1,5 +1,8 @@ -