From 766f1377cae64cb71cf3f07a520137076027f0c2 Mon Sep 17 00:00:00 2001 From: NovaFox161 Date: Tue, 7 Dec 2021 10:42:35 -0600 Subject: [PATCH] This should improve announcement service performance --- .../client/service/AnnouncementService.kt | 33 ++++++++----- .../client/service/StaticMessageService.kt | 2 +- .../discal/core/database/DatabaseManager.kt | 47 +++++++++++++++++++ 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/client/src/main/kotlin/org/dreamexposure/discal/client/service/AnnouncementService.kt b/client/src/main/kotlin/org/dreamexposure/discal/client/service/AnnouncementService.kt index 804645ab..a4255915 100644 --- a/client/src/main/kotlin/org/dreamexposure/discal/client/service/AnnouncementService.kt +++ b/client/src/main/kotlin/org/dreamexposure/discal/client/service/AnnouncementService.kt @@ -8,6 +8,8 @@ import discord4j.core.`object`.entity.channel.GuildMessageChannel import discord4j.core.spec.MessageCreateSpec import discord4j.rest.http.client.ClientException import io.netty.handler.codec.http.HttpResponseStatus +import org.dreamexposure.discal.Application +import org.dreamexposure.discal.Application.Companion.getShardIndex import org.dreamexposure.discal.client.DisCalClient import org.dreamexposure.discal.client.message.embed.AnnouncementEmbed import org.dreamexposure.discal.core.`object`.announcement.Announcement @@ -38,7 +40,7 @@ class AnnouncementService : ApplicationRunner { // Start override fun run(args: ApplicationArguments?) { Flux.interval(Duration.ofMinutes(5)) - .onBackpressureBuffer() + .onBackpressureDrop() .flatMap { doAnnouncementCycle() } .doOnError { LOGGER.error(GlobalVal.DEFAULT, "!-Announcement run error-!", it) } .subscribe() @@ -49,18 +51,25 @@ class AnnouncementService : ApplicationRunner { //TODO: This should come in through DI once other legacy is removed/rewritten if (DisCalClient.client == null) return Mono.empty() - return DisCalClient.client!!.guilds.flatMap { guild -> - DatabaseManager.getEnabledAnnouncements(guild.id) - .flatMapMany { Flux.fromIterable(it) } - .flatMap { announcement -> - when (announcement.modifier) { - AnnouncementModifier.BEFORE -> handleBeforeModifier(guild, announcement) - AnnouncementModifier.DURING -> handleDuringModifier(guild, announcement) - AnnouncementModifier.END -> handleEndModifier(guild, announcement) + // Get announcements for this shard, then group by guild to make caching easier + return DatabaseManager.getAnnouncementsForShard(Application.getShardCount(), getShardIndex().toInt()).map { list -> + list.groupBy { it.guildId } + }.flatMapMany { groupedAnnouncements -> + Flux.fromIterable(groupedAnnouncements.entries) + .flatMap { DisCalClient.client!!.getGuildById(it.key) } + .flatMap { guild -> + val announcements = groupedAnnouncements[guild.id] ?: emptyList() + + Flux.fromIterable(announcements).flatMap { announcement -> + when (announcement.modifier) { + AnnouncementModifier.BEFORE -> handleBeforeModifier(guild, announcement) + AnnouncementModifier.DURING -> handleDuringModifier(guild, announcement) + AnnouncementModifier.END -> handleEndModifier(guild, announcement) + } + }.doOnError { + LOGGER.error(GlobalVal.DEFAULT, "Announcement error", it) + }.onErrorResume { Mono.empty() } } - }.doOnError { - LOGGER.error(GlobalVal.DEFAULT, "Announcement error", it) - }.onErrorResume { Mono.empty() } }.doOnError { LOGGER.error(GlobalVal.DEFAULT, "Announcement error", it) }.onErrorResume { diff --git a/client/src/main/kotlin/org/dreamexposure/discal/client/service/StaticMessageService.kt b/client/src/main/kotlin/org/dreamexposure/discal/client/service/StaticMessageService.kt index acc0e3b0..e3b8272a 100644 --- a/client/src/main/kotlin/org/dreamexposure/discal/client/service/StaticMessageService.kt +++ b/client/src/main/kotlin/org/dreamexposure/discal/client/service/StaticMessageService.kt @@ -31,7 +31,7 @@ class StaticMessageService : ApplicationRunner { override fun run(args: ApplicationArguments?) { Flux.interval(Duration.ofHours(1)) - .onBackpressureBuffer() + .onBackpressureDrop() .flatMap { doMessageUpdateLogic() } .doOnError { LOGGER.error(DEFAULT, "!-Static Message Service Error-!", it) } .subscribe() diff --git a/core/src/main/kotlin/org/dreamexposure/discal/core/database/DatabaseManager.kt b/core/src/main/kotlin/org/dreamexposure/discal/core/database/DatabaseManager.kt index ffddbdd8..73b37233 100644 --- a/core/src/main/kotlin/org/dreamexposure/discal/core/database/DatabaseManager.kt +++ b/core/src/main/kotlin/org/dreamexposure/discal/core/database/DatabaseManager.kt @@ -1398,6 +1398,48 @@ object DatabaseManager { }.defaultIfEmpty(emptyMap()) } } + + /* Announcement Data */ + + fun getAnnouncementsForShard(shardCount: Int, shardIndex: Int): Mono> { + return connect { c -> + Mono.from( + c.createStatement(Queries.SELECT_ANNOUNCEMENTS_FOR_SHARD) + .bind(0, shardCount) + .bind(1, shardIndex) + .execute() + ).flatMapMany { res -> + res.map { row, _ -> + val announcementId = UUID.fromString(row.get("ANNOUNCEMENT_ID", String::class.java)) + val guildId = Snowflake.of(row["GUILD_ID", String::class.java]!!) + + val a = Announcement(guildId, announcementId) + a.calendarNumber = row["CALENDAR_NUMBER", Int::class.java]!! + a.setSubscriberRoleIdsFromString(row["SUBSCRIBERS_ROLE", String::class.java]!!) + a.setSubscriberUserIdsFromString(row["SUBSCRIBERS_USER", String::class.java]!!) + a.announcementChannelId = row["CHANNEL_ID", String::class.java]!! + a.type = AnnouncementType.valueOf(row["ANNOUNCEMENT_TYPE", String::class.java]!!) + a.modifier = AnnouncementModifier.valueOf(row["MODIFIER", String::class.java]!!) + a.eventId = row["EVENT_ID", String::class.java]!! + a.eventColor = fromNameOrHexOrId(row["EVENT_COLOR", String::class.java]!!) + a.hoursBefore = row["HOURS_BEFORE", Int::class.java]!! + a.minutesBefore = row["MINUTES_BEFORE", Int::class.java]!! + a.info = row["INFO", String::class.java]!! + a.enabled = row["ENABLED", Boolean::class.java]!! + a.publish = row["PUBLISH", Boolean::class.java]!! + + a + } + }.retryWhen(Retry.max(3) + .filter(IllegalStateException::class::isInstance) + .filter { it.message != null && it.message!!.contains("Request queue was disposed") } + ).doOnError { + LOGGER.error(DEFAULT, "Failed to get announcements for shard", it) + }.onErrorResume { + Mono.empty() + }.collectList() + } + } } private object Queries { @@ -1606,6 +1648,11 @@ private object Queries { val SELECT_MANY_EVENT_DATA = """SELECT * FROM ${Tables.EVENTS} WHERE event_id in (?) """.trimMargin() + + @Language("MySQL") + val SELECT_ANNOUNCEMENTS_FOR_SHARD = """SELECT * FROM ${Tables.ANNOUNCEMENTS} + WHERE MOD(guild_id >> 22, ?) = ? + """.trimMargin() } private object Tables {