This should improve announcement service performance

This commit is contained in:
NovaFox161
2021-12-07 10:42:35 -06:00
parent be41cdb52d
commit 766f1377ca
3 changed files with 69 additions and 13 deletions

View File

@@ -8,6 +8,8 @@ import discord4j.core.`object`.entity.channel.GuildMessageChannel
import discord4j.core.spec.MessageCreateSpec
import discord4j.rest.http.client.ClientException
import io.netty.handler.codec.http.HttpResponseStatus
import org.dreamexposure.discal.Application
import org.dreamexposure.discal.Application.Companion.getShardIndex
import org.dreamexposure.discal.client.DisCalClient
import org.dreamexposure.discal.client.message.embed.AnnouncementEmbed
import org.dreamexposure.discal.core.`object`.announcement.Announcement
@@ -38,7 +40,7 @@ class AnnouncementService : ApplicationRunner {
// Start
override fun run(args: ApplicationArguments?) {
Flux.interval(Duration.ofMinutes(5))
.onBackpressureBuffer()
.onBackpressureDrop()
.flatMap { doAnnouncementCycle() }
.doOnError { LOGGER.error(GlobalVal.DEFAULT, "!-Announcement run error-!", it) }
.subscribe()
@@ -49,18 +51,25 @@ class AnnouncementService : ApplicationRunner {
//TODO: This should come in through DI once other legacy is removed/rewritten
if (DisCalClient.client == null) return Mono.empty()
return DisCalClient.client!!.guilds.flatMap { guild ->
DatabaseManager.getEnabledAnnouncements(guild.id)
.flatMapMany { Flux.fromIterable(it) }
.flatMap { announcement ->
when (announcement.modifier) {
AnnouncementModifier.BEFORE -> handleBeforeModifier(guild, announcement)
AnnouncementModifier.DURING -> handleDuringModifier(guild, announcement)
AnnouncementModifier.END -> handleEndModifier(guild, announcement)
// Get announcements for this shard, then group by guild to make caching easier
return DatabaseManager.getAnnouncementsForShard(Application.getShardCount(), getShardIndex().toInt()).map { list ->
list.groupBy { it.guildId }
}.flatMapMany { groupedAnnouncements ->
Flux.fromIterable(groupedAnnouncements.entries)
.flatMap { DisCalClient.client!!.getGuildById(it.key) }
.flatMap { guild ->
val announcements = groupedAnnouncements[guild.id] ?: emptyList()
Flux.fromIterable(announcements).flatMap { announcement ->
when (announcement.modifier) {
AnnouncementModifier.BEFORE -> handleBeforeModifier(guild, announcement)
AnnouncementModifier.DURING -> handleDuringModifier(guild, announcement)
AnnouncementModifier.END -> handleEndModifier(guild, announcement)
}
}.doOnError {
LOGGER.error(GlobalVal.DEFAULT, "Announcement error", it)
}.onErrorResume { Mono.empty() }
}
}.doOnError {
LOGGER.error(GlobalVal.DEFAULT, "Announcement error", it)
}.onErrorResume { Mono.empty() }
}.doOnError {
LOGGER.error(GlobalVal.DEFAULT, "Announcement error", it)
}.onErrorResume {

View File

@@ -31,7 +31,7 @@ class StaticMessageService : ApplicationRunner {
override fun run(args: ApplicationArguments?) {
Flux.interval(Duration.ofHours(1))
.onBackpressureBuffer()
.onBackpressureDrop()
.flatMap { doMessageUpdateLogic() }
.doOnError { LOGGER.error(DEFAULT, "!-Static Message Service Error-!", it) }
.subscribe()

View File

@@ -1398,6 +1398,48 @@ object DatabaseManager {
}.defaultIfEmpty(emptyMap())
}
}
/* Announcement Data */
fun getAnnouncementsForShard(shardCount: Int, shardIndex: Int): Mono<List<Announcement>> {
return connect { c ->
Mono.from(
c.createStatement(Queries.SELECT_ANNOUNCEMENTS_FOR_SHARD)
.bind(0, shardCount)
.bind(1, shardIndex)
.execute()
).flatMapMany { res ->
res.map { row, _ ->
val announcementId = UUID.fromString(row.get("ANNOUNCEMENT_ID", String::class.java))
val guildId = Snowflake.of(row["GUILD_ID", String::class.java]!!)
val a = Announcement(guildId, announcementId)
a.calendarNumber = row["CALENDAR_NUMBER", Int::class.java]!!
a.setSubscriberRoleIdsFromString(row["SUBSCRIBERS_ROLE", String::class.java]!!)
a.setSubscriberUserIdsFromString(row["SUBSCRIBERS_USER", String::class.java]!!)
a.announcementChannelId = row["CHANNEL_ID", String::class.java]!!
a.type = AnnouncementType.valueOf(row["ANNOUNCEMENT_TYPE", String::class.java]!!)
a.modifier = AnnouncementModifier.valueOf(row["MODIFIER", String::class.java]!!)
a.eventId = row["EVENT_ID", String::class.java]!!
a.eventColor = fromNameOrHexOrId(row["EVENT_COLOR", String::class.java]!!)
a.hoursBefore = row["HOURS_BEFORE", Int::class.java]!!
a.minutesBefore = row["MINUTES_BEFORE", Int::class.java]!!
a.info = row["INFO", String::class.java]!!
a.enabled = row["ENABLED", Boolean::class.java]!!
a.publish = row["PUBLISH", Boolean::class.java]!!
a
}
}.retryWhen(Retry.max(3)
.filter(IllegalStateException::class::isInstance)
.filter { it.message != null && it.message!!.contains("Request queue was disposed") }
).doOnError {
LOGGER.error(DEFAULT, "Failed to get announcements for shard", it)
}.onErrorResume {
Mono.empty()
}.collectList()
}
}
}
private object Queries {
@@ -1606,6 +1648,11 @@ private object Queries {
val SELECT_MANY_EVENT_DATA = """SELECT * FROM ${Tables.EVENTS}
WHERE event_id in (?)
""".trimMargin()
@Language("MySQL")
val SELECT_ANNOUNCEMENTS_FOR_SHARD = """SELECT * FROM ${Tables.ANNOUNCEMENTS}
WHERE MOD(guild_id >> 22, ?) = ?
""".trimMargin()
}
private object Tables {