diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index ef89efbd..f588ab81 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -586,7 +586,7 @@ private: // we restart everything while under load, it's not great to block here // but it's probably OK to do so in those cases. We should also automatically // clear the alert when done with this. - XmonAlert alert = -1; + XmonNCAlert alert; for (;;) { if (sendto(sock, data, len, 0, (struct sockaddr*)&dest, sizeof(dest)) == len) { break; @@ -594,7 +594,7 @@ private: int err = errno; // Note that we get EPERM on `sendto` when nf drops packets. if (err == EAGAIN || err == EPERM) { - _env.raiseAlert(alert, false, "we got %s/%s=%s when trying to send shard message, will wait and retry", err, translateErrno(err), safe_strerror(err)); + _env.updateAlert(alert, "we got %s/%s=%s when trying to send shard message, will wait and retry", err, translateErrno(err), safe_strerror(err)); sleepFor(100_ms); } else { _env.clearAlert(alert); @@ -642,8 +642,8 @@ public: void run() { EggsTime successfulIterationAt = 0; auto shards = std::make_unique>(); - XmonAlert alert = -1; - _env.raiseAlert(alert, false, "Waiting to get shards"); + XmonNCAlert alert; + _env.updateAlert(alert, "Waiting to get shards"); for (;;) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (_stopper.shouldStop()) { @@ -658,7 +658,7 @@ public: LOG_INFO(_env, "Last successful shard fetch was at %s, now we're at %s, fetching again", successfulIterationAt, now); std::string err = fetchShards(_shuckleHost, _shucklePort, 100_ms, *shards); if (!err.empty()) { - _env.raiseAlert(alert, false, "failed to reach shuckle at %s:%s to fetch shards, will retry: %s", _shuckleHost, _shucklePort, err); + _env.updateAlert(alert, "failed to reach shuckle at %s:%s to fetch shards, will retry: %s", _shuckleHost, _shucklePort, err); EggsTime successfulIterationAt = 0; continue; } @@ -671,7 +671,7 @@ public: } if (badShard) { EggsTime successfulIterationAt = 0; - _env.raiseAlert(alert, false, "Shard info is still not present in shuckle, will keep trying"); + _env.updateAlert(alert, "Shard info is still not present in shuckle, will keep trying"); continue; } { @@ -726,8 +726,8 @@ public: void run() { uint64_t rand = eggsNow().ns; EggsTime nextRegister = 0; // when 0, it means that the last one wasn't successful - XmonAlert alert = -1; - _env.raiseAlert(alert, false, "Waiting to register ourselves for the first time"); + XmonNCAlert alert; + _env.updateAlert(alert, "Waiting to register ourselves for the first time"); for (;;) { std::this_thread::sleep_for(std::chrono::milliseconds(100 + (wyhash64(&rand)%100))); // fuzz the startup busy loop if (_stopper.shouldStop()) { @@ -748,7 +748,7 @@ public: LOG_DEBUG(_env, "Registering ourselves (CDC, %s:%s, %s:%s) with shuckle", in_addr{htonl(_ownIp1)}, port1, in_addr{htonl(_ownIp2)}, port2); std::string err = registerCDC(_shuckleHost, _shucklePort, 100_ms, _ownIp1, port1, _ownIp2, port2); if (!err.empty()) { - _env.raiseAlert(alert, false, "Couldn't register ourselves with shuckle: %s", err); + _env.updateAlert(alert, "Couldn't register ourselves with shuckle: %s", err); nextRegister = 0; continue; } @@ -797,8 +797,8 @@ public: bool lastRequestSuccessful = false; std::vector stats; std::string prefix = "cdc"; - XmonAlert alert = -1; - _env.raiseAlert(alert, false, "Waiting to insert stats for the first time"); + XmonNCAlert alert; + _env.updateAlert(alert, "Waiting to insert stats for the first time"); const auto insertCDCStats = [this, &stats, &alert]() { std::string err; @@ -825,7 +825,7 @@ public: _shared.timingsProcess[(int)kind].reset(); } } else { - _env.raiseAlert(alert, false, "Could not insert stats: %s", err); + _env.updateAlert(alert, "Could not insert stats: %s", err); } return err; }; diff --git a/cpp/core/Env.hpp b/cpp/core/Env.hpp index d99f34a7..e1207125 100644 --- a/cpp/core/Env.hpp +++ b/cpp/core/Env.hpp @@ -103,24 +103,31 @@ public: } template - void raiseAlert(XmonAlert& alert, bool binnable, const char* fmt, Args&&... args) { + void raiseAlert(const char* fmt, Args&&... args) { std::stringstream ss; format_pack(ss, fmt, args...); std::string line; std::string s = ss.str(); _log(LogLevel::LOG_ERROR, s.c_str()); if (_xmon) { - if (alert < 0) { - alert = _xmon->createAlert(binnable, s); - } else { - _xmon->updateAlert(alert, binnable, s); - } + _xmon->raiseAlert(s); } } - void clearAlert(XmonAlert& alert) { - if (_xmon && alert >= 0) { _xmon->clearAlert(alert); } - alert = -1; + template + void updateAlert(XmonNCAlert& alert, const char* fmt, Args&&... args) { + std::stringstream ss; + format_pack(ss, fmt, args...); + std::string line; + std::string s = ss.str(); + _log(LogLevel::LOG_ERROR, s.c_str()); + if (_xmon) { + _xmon->updateAlert(alert, s); + } + } + + void clearAlert(XmonNCAlert& alert) { + if (_xmon) { _xmon->clearAlert(alert); } } bool _shouldLog(LogLevel level) { @@ -166,6 +173,5 @@ public: #define RAISE_ALERT(env, ...) \ do { \ - XmonAlert alert = -1; \ - (env).raiseAlert(alert, true, VALIDATE_FORMAT(__VA_ARGS__)); \ + (env).raiseAlert(VALIDATE_FORMAT(__VA_ARGS__)); \ } while (false) diff --git a/cpp/core/Xmon.cpp b/cpp/core/Xmon.cpp index 4bf2b174..66a3d405 100644 --- a/cpp/core/Xmon.cpp +++ b/cpp/core/Xmon.cpp @@ -1,10 +1,12 @@ #include #include #include +#include #include "Exception.hpp" #include "Xmon.hpp" #include "Connect.hpp" +#include "XmonAgent.hpp" enum struct XmonMood : int32_t { Happy = 0, @@ -148,15 +150,18 @@ bool XmonBuf::readIn(int fd, size_t sz, std::string& errString) { readSoFar += r; } len += sz; - return readSoFar; + return true; } +constexpr int MAX_BINNABLE_ALERTS = 20; + void Xmon::run() { int numFailures = 0; XmonBuf buf; int sock = -1; std::deque requests; + std::unordered_set binnableAlerts; std::string errString; @@ -177,11 +182,11 @@ reconnect: // We've got a socket, reset conn failures numFailures = 0; - // 10ms timeout for prompt termination/processing + // 100ms timeout for prompt termination/processing { struct timeval tv; tv.tv_sec = 0; - tv.tv_usec = 10'000; + tv.tv_usec = 100'000; if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,&tv,sizeof(tv)) < 0) { throw SYSCALL_EXCEPTION("setsockopt"); } @@ -204,20 +209,46 @@ reconnect: _agent->getRequests(requests); while (!requests.empty()) { const auto& req = requests.front(); + int64_t alertIdToInsert = -1; if (req.msgType == XmonRequestType::CREATE) { + if (req.binnable) { + alertIdToInsert = req.alertId; + if (binnableAlerts.size() > MAX_BINNABLE_ALERTS) { + LOG_ERROR(_env, "not creating alert, aid=%s binnable=%s message=%s, we're full", req.alertId, req.binnable, req.message); + if (binnableAlerts.count(XmonAgent::TOO_MANY_ALERTS_ALERT_ID) == 0) { + XmonRequest req; + req.msgType = XmonRequestType::CREATE; + req.alertId = XmonAgent::TOO_MANY_ALERTS_ALERT_ID; + req.message = "too many alerts, alerts dropped"; + req.binnable = true; + packRequest(buf, req); + alertIdToInsert = XmonAgent::TOO_MANY_ALERTS_ALERT_ID; + goto write_request; + } else { + goto skip_request; + } + } + } LOG_INFO(_env, "creating alert, aid=%s binnable=%s message=%s", req.alertId, req.binnable, req.message); } else if (req.msgType == XmonRequestType::UPDATE) { + ALWAYS_ASSERT(!req.binnable); LOG_INFO(_env, "updating alert, aid=%s binnable=%s message=%s", req.alertId, req.binnable, req.message); } else if (req.msgType == XmonRequestType::CLEAR) { + ALWAYS_ASSERT(!req.binnable); LOG_INFO(_env, "clearing alert, aid=%s", req.alertId); } else { ALWAYS_ASSERT(false, "bad req type %s", (int)req.msgType); } packRequest(buf, req); + write_request: errString = buf.writeOut(sock); CHECK_ERR_STRING(errString); - requests.pop_front(); LOG_DEBUG(_env, "sent request to xmon"); + if (alertIdToInsert >= 0) { + binnableAlerts.insert(alertIdToInsert); + } + skip_request: + requests.pop_front(); } } @@ -247,13 +278,15 @@ reconnect: CHECK_ERR_STRING("send heartbeat"); break; } case 0x1: { - LOG_INFO(_env, "got alert binned from UI, ignoring"); bool success = false; do { errString.clear(); success = buf.readIn(sock, 8, errString); CHECK_ERR_STRING("reading alert binned id"); } while (!success); + int64_t alertId = buf.unpackScalar(); + LOG_INFO(_env, "got alert %s binned from UI", alertId); + binnableAlerts.erase(alertId); break; } default: throw EGGS_EXCEPTION("unknown message type %s", msgType); diff --git a/cpp/core/Xmon.hpp b/cpp/core/Xmon.hpp index da343787..9d2568a4 100644 --- a/cpp/core/Xmon.hpp +++ b/cpp/core/Xmon.hpp @@ -93,7 +93,7 @@ struct XmonBuf { // returns an error string if it failed std::string writeOut(int fd); - // if false, we got EAGAIN immediately. + // if false with empty error string, we got EAGAIN immediately. bool readIn(int fd, size_t sz, std::string& errString); }; diff --git a/cpp/core/XmonAgent.hpp b/cpp/core/XmonAgent.hpp index 2f7fea49..55ece983 100644 --- a/cpp/core/XmonAgent.hpp +++ b/cpp/core/XmonAgent.hpp @@ -19,8 +19,11 @@ struct XmonRequest { std::string message; }; -// We use -1 for "nothing" sometimes. -using XmonAlert = int64_t; +struct XmonNCAlert { + int64_t alertId; + + XmonNCAlert() : alertId(-1) {} +}; struct XmonAgent { private: @@ -28,41 +31,53 @@ private: std::deque _requests; std::atomic _alertId; -public: - XmonAgent() : _alertId(0) {} - - void addRequest(XmonRequest&& req) { + void _addRequest(XmonRequest&& req) { std::lock_guard lock(_mu); _requests.emplace_back(req); } - XmonAlert createAlert(bool binnable, const std::string& message) { - XmonAlert aid = _alertId.fetch_add(1); + int64_t _createAlert(bool binnable, const std::string& message) { + int64_t aid = _alertId.fetch_add(1); XmonRequest req; req.msgType = XmonRequestType::CREATE; req.alertId = aid; req.binnable = binnable; req.message = message; - addRequest(std::move(req)); + _addRequest(std::move(req)); return aid; } - void updateAlert(XmonAlert aid, bool binnable, const std::string& message) { - XmonRequest req; - req.msgType = XmonRequestType::UPDATE; - req.alertId = aid; - req.binnable = binnable; - req.message = message; - addRequest(std::move(req)); +public: + static constexpr int64_t TOO_MANY_ALERTS_ALERT_ID = 0; + + XmonAgent() : _alertId(1) {} + + void raiseAlert(const std::string& message) { + _createAlert(true, message); } - void clearAlert(XmonAlert aid) { + void updateAlert(XmonNCAlert& aid, const std::string& message) { + if (aid.alertId < 0) { + aid.alertId = _createAlert(false, message); + } else { + XmonRequest req; + req.msgType = XmonRequestType::UPDATE; + req.alertId = aid.alertId; + req.binnable = false; + req.message = message; + _addRequest(std::move(req)); + } + } + + void clearAlert(XmonNCAlert& aid) { + if (aid.alertId < 0) { return; } XmonRequest req; req.msgType = XmonRequestType::CLEAR; - req.alertId = aid; + req.alertId = aid.alertId; req.binnable = false; req.message = {}; - addRequest(std::move(req)); + _addRequest(std::move(req)); + aid.alertId = -1; } void getRequests(std::deque& reqs) { diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 96eec036..cc15eb2a 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -366,8 +366,8 @@ public: void run() { uint64_t rand = eggsNow().ns; EggsTime nextRegister = 0; // when 0, it means that the last one wasn't successful - XmonAlert alert = -1; - _env.raiseAlert(alert, false, "Waiting to register ourselves for the first time"); + XmonNCAlert alert; + _env.updateAlert(alert, "Waiting to register ourselves for the first time"); for (;;) { std::this_thread::sleep_for(std::chrono::milliseconds(100 + (wyhash64(&rand)%100))); // fuzz the startup busy loop if (_stopper.shouldStop()) { @@ -391,7 +391,7 @@ public: LOG_DEBUG(_env, "Registering ourselves (shard %s, %s:%s, %s:%s) with shuckle", _shid, in_addr{htonl(ip1)}, port1, in_addr{htonl(ip2)}, port2); std::string err = registerShard(_shuckleHost, _shucklePort, 100_ms, _shid, ip1, port1, ip2, port2); if (!err.empty()) { - _env.raiseAlert(alert, false, "Couldn't register ourselves with shuckle: %s", err); + _env.updateAlert(alert, "Couldn't register ourselves with shuckle: %s", err); continue; } _env.clearAlert(alert); @@ -441,8 +441,8 @@ public: bool lastRequestSuccessful = false; auto respContainer = std::make_unique(); auto logEntry = std::make_unique(); - XmonAlert shuckleAlert = -1; - _env.raiseAlert(shuckleAlert, false, "Waiting to fetch block services for the first time"); + XmonNCAlert shuckleAlert; + _env.updateAlert(shuckleAlert, "Waiting to fetch block services for the first time"); #define GO_TO_NEXT_ITERATION \ std::this_thread::sleep_for(std::chrono::milliseconds(10)); \ @@ -477,7 +477,7 @@ public: } lastRequestT = t; if (!lastRequestSuccessful) { - _env.raiseAlert(shuckleAlert, false, "could not reach shuckle: %s", err); + _env.updateAlert(shuckleAlert, "could not reach shuckle: %s", err); GO_TO_NEXT_ITERATION } @@ -536,7 +536,7 @@ public: EggsTime lastRequestT = 0; bool lastRequestSuccessful = false; std::vector stats; - XmonAlert alert = -1; + XmonNCAlert alert; const auto insertShardStats = [this, &stats, &alert]() { std::string err; @@ -555,7 +555,7 @@ public: _shared.errors[(int)kind].reset(); } } else { - _env.raiseAlert(alert, false, "Could not insert stats: %s", err); + _env.updateAlert(alert, "Could not insert stats: %s", err); } return err; }; diff --git a/go/eggsblocks/eggsblocks.go b/go/eggsblocks/eggsblocks.go index 8c9fa5a3..f6ee21e4 100644 --- a/go/eggsblocks/eggsblocks.go +++ b/go/eggsblocks/eggsblocks.go @@ -118,8 +118,8 @@ func initBlockServicesInfo( var wg sync.WaitGroup wg.Add(len(blockServices)) failed := int32(0) - alert := log.NewAlert() - log.Raise(alert, false, "getting info for %v block services", len(blockServices)) + alert := log.NewNCAlert() + log.RaiseNC(alert, "getting info for %v block services", len(blockServices)) for id, bs := range blockServices { bs.cachedInfo.Id = id bs.cachedInfo.Ip1 = ip1 @@ -143,7 +143,7 @@ func initBlockServicesInfo( if failed == 1 { return fmt.Errorf("some block service infos failed to update") } - log.Clear(alert) + log.ClearNC(alert) return nil } @@ -153,7 +153,7 @@ func registerPeriodically( shuckleAddress string, ) { req := msgs.RegisterBlockServicesReq{} - alert := log.NewAlert() + alert := log.NewNCAlert() for { req.BlockServices = req.BlockServices[:0] for _, bs := range blockServices { @@ -162,11 +162,11 @@ func registerPeriodically( log.Trace("registering with %+v", req) _, err := lib.ShuckleRequest(log, nil, shuckleAddress, &req) if err != nil { - log.Raise(alert, false, "could not register block services with %+v: %v", shuckleAddress, err) + log.RaiseNC(alert, "could not register block services with %+v: %v", shuckleAddress, err) time.Sleep(100 * time.Millisecond) continue } - log.Clear(alert) + log.ClearNC(alert) waitForRange := time.Minute * 2 waitFor := time.Duration(mrand.Uint64() % uint64(waitForRange.Nanoseconds())) log.Info("registered with %v, waiting %v", shuckleAddress, waitFor) @@ -532,7 +532,7 @@ NextRequest: futureCutoffTime := msgs.EggsTime(uint64(whichReq.BlockId)).Time().Add(FUTURE_CUTOFF) now := time.Now() if timeCheck && (now.Before(pastCutoffTime) || now.After(futureCutoffTime)) { - log.RaiseAlert("block %v is too old or too new to be deleted (now=%v, pastCutoffTime=%v, futureCutoffTime=%v)", whichReq.BlockId, now, pastCutoffTime, futureCutoffTime) + log.RaiseAlert("block %v is too old or too new to be written (now=%v, pastCutoffTime=%v, futureCutoffTime=%v)", whichReq.BlockId, now, pastCutoffTime, futureCutoffTime) lib.WriteBlocksResponseError(log, conn, msgs.BLOCK_TOO_RECENT_FOR_DELETION) continue NextRequest } diff --git a/go/eggsshuckle/eggsshuckle.go b/go/eggsshuckle/eggsshuckle.go index 25a0d203..7fa4a011 100644 --- a/go/eggsshuckle/eggsshuckle.go +++ b/go/eggsshuckle/eggsshuckle.go @@ -1747,7 +1747,7 @@ func statsWriter(ll *lib.Logger, st *state) { func serviceMonitor(ll *lib.Logger, st *state, staleDelta time.Duration) error { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - staleAlert := ll.NewAlert() + staleAlert := ll.NewNCAlert() for { <-ticker.C @@ -1837,11 +1837,11 @@ func serviceMonitor(ll *lib.Logger, st *state, staleDelta time.Duration) error { }() if len(alerts) == 0 { - ll.Clear(staleAlert) + ll.ClearNC(staleAlert) continue } msg := strings.Join(alerts, "\n") - ll.Raise(staleAlert, false, msg) + ll.RaiseNC(staleAlert, msg) } } diff --git a/go/lib/blocksreq.go b/go/lib/blocksreq.go index 2e910a86..fcff2573 100644 --- a/go/lib/blocksreq.go +++ b/go/lib/blocksreq.go @@ -33,7 +33,7 @@ func BlockServiceConnection(log *Logger, ip1 [4]byte, port1 uint16, ip2 [4]byte, if errs[i%2] == nil { return sock, nil } - log.RaiseAlert("could not connect to block service %v:%v: %w, might try other ip/port", ip, port, errs[i%2]) + log.RaiseAlert("could not connect to block service %v:%v: %v, might try other ip/port", ip, port, errs[i%2]) } // return one of the two errors, we don't want to mess with them too much and they are alerts for _, err := range errs { diff --git a/go/lib/cdcreq.go b/go/lib/cdcreq.go index 21f8dfdd..2714cdc0 100644 --- a/go/lib/cdcreq.go +++ b/go/lib/cdcreq.go @@ -79,7 +79,7 @@ func (c *Client) CDCRequest( if c.counters != nil { counters = c.counters.CDC } - return c.metadataRequestInternal(logger, -1, c.CDCAddrs(), uint8(msgKind), reqBody, respBody, counters, false) + return c.metadataRequest(logger, -1, c.CDCAddrs(), uint8(msgKind), reqBody, respBody, counters, false) } func CreateCDCSocket(ip [4]byte, port uint16) (*net.UDPConn, error) { diff --git a/go/lib/client.go b/go/lib/client.go index 0bf13993..3f72a1cf 100644 --- a/go/lib/client.go +++ b/go/lib/client.go @@ -661,7 +661,7 @@ func (f *blocksConnFactory) getBlocksConns(log *Logger, blockServiceId msgs.Bloc return sock, nil } if errs[i&1] != nil { - log.RaiseAlert("could not connect to block service %v:%v: %w, might try other ip/port", ip, port, errs[i&1]) + log.RaiseAlert("could not connect to block service %v:%v: %v, might try other ip/port", ip, port, errs[i&1]) } } // if we got two errors, return one of them @@ -677,7 +677,7 @@ func (f *blocksConnFactory) getBlocksConns(log *Logger, blockServiceId msgs.Bloc if errs[i&1] == nil { return sock, nil } - log.RaiseAlert("could not connect to block service %v:%v: %w, might try other ip/port", ip, port, errs[i&1]) + log.RaiseAlert("could not connect to block service %v:%v: %v, might try other ip/port", ip, port, errs[i&1]) } for _, err := range errs { if err != nil { diff --git a/go/lib/log.go b/go/lib/log.go index d83fde2a..4f1fcb7a 100644 --- a/go/lib/log.go +++ b/go/lib/log.go @@ -287,15 +287,6 @@ func (l *Logger) ErrorNoAlert(format string, v ...any) { l.LogStack(1, ERROR, format, v...) } -func (l *Logger) RaiseAlert(format string, v ...any) { - l.RaiseAlertStack(1, format, v...) -} - -func (l *Logger) RaiseAlertStack(calldepth int, format string, v ...any) { - alert := l.NewAlert() - l.RaiseStack(1+calldepth, alert, true, format, v...) -} - func (l *Logger) Metric(category, name string, help string, value uint64, extralabels map[string]string) { if l.mw == nil { return @@ -306,27 +297,39 @@ func (l *Logger) Metric(category, name string, help string, value uint64, extral l.mw.AddMetric(hostmon.NewMetricNowWithLabels(category, name, help, float64(value), extralabels)) } -func (l *Logger) NewAlert() *XmonAlert { +func (l *Logger) NewNCAlert() *XmonNCAlert { if l.xmon == nil { - return &XmonAlert{} + return &XmonNCAlert{} } else { - return l.xmon.NewAlert() + return l.xmon.NewNCAlert() } } -func (l *Logger) RaiseStack(calldepth int, alert *XmonAlert, binnable bool, format string, v ...any) { +func (l *Logger) RaiseAlertStack(calldepth int, format string, v ...any) { l.LogStack(1+calldepth, ERROR, "ALERT "+format, v...) if l.xmon == nil { return } - alert.RaiseStack(l, l.xmon, 1+calldepth, binnable, format, v...) + l.xmon.RaiseStack(l, l.xmon, 1+calldepth, format, v...) } -func (l *Logger) Raise(alert *XmonAlert, binnable bool, format string, v ...any) { - l.RaiseStack(1, alert, binnable, format, v...) +func (l *Logger) RaiseAlert(format string, v ...any) { + l.RaiseAlertStack(1, format, v...) } -func (l *Logger) Clear(alert *XmonAlert) { +func (l *Logger) RaiseNCStack(alert *XmonNCAlert, calldepth int, format string, v ...any) { + l.LogStack(1+calldepth, ERROR, "ALERT "+format, v...) + if l.xmon == nil { + return + } + alert.RaiseStack(l, l.xmon, 1+calldepth, format, v...) +} + +func (l *Logger) RaiseNC(alert *XmonNCAlert, format string, v ...any) { + l.RaiseNCStack(alert, 1, format, v...) +} + +func (l *Logger) ClearNC(alert *XmonNCAlert) { if alert.lastMessage != "" { l.LogStack(1, INFO, "clearing alert, last message %q", alert.lastMessage) } diff --git a/go/lib/metadatareq.go b/go/lib/metadatareq.go index e03ca428..301c907c 100644 --- a/go/lib/metadatareq.go +++ b/go/lib/metadatareq.go @@ -111,7 +111,7 @@ func (requestId *unpackedMetadataRequestId) Unpack(r io.Reader) error { return nil } -func (c *Client) metadataRequestInternal( +func (c *Client) metadataRequest( log *Logger, shid int16, // -1 for cdc addrs *[2]net.UDPAddr, @@ -136,11 +136,11 @@ func (c *Client) metadataRequestInternal( startedAt := time.Now() requestId := c.newRequestId() // will keep trying as long as we get timeouts - timeoutAlert := log.NewAlert() + timeoutAlert := log.NewNCAlert() // We return the error anyway, so once we exit this function this alert must be gone - defer log.Clear(timeoutAlert) + defer log.ClearNC(timeoutAlert) timeoutAlertAndStandalone := func(f string, v ...any) { - log.Raise(timeoutAlert, false, f, v...) + log.RaiseNC(timeoutAlert, f, v...) log.RaiseAlert(f, v...) } timeouts := c.shardTimeout @@ -174,7 +174,7 @@ func (c *Client) metadataRequestInternal( log.Info("dontWait is on, we couldn't send the request due to EPERM %v, goodbye", err) return nil } else { - log.Raise(timeoutAlert, false, "got possibly transient EPERM when sending to shard %v, might retry after waiting for %v: %v", shid, timeout, err) + log.RaiseNC(timeoutAlert, "got possibly transient EPERM when sending to shard %v, might retry after waiting for %v: %v", shid, timeout, err) time.Sleep(timeout) attempts++ continue @@ -208,7 +208,7 @@ func (c *Client) metadataRequestInternal( shouldRetry = true } if shouldRetry { - log.Raise(timeoutAlert, false, "got network error %v to shard %v for req id %v of type %T, will try to retry", err, shid, requestId, reqBody) + log.RaiseNC(timeoutAlert, "got network error %v to shard %v for req id %v of type %T, will try to retry", err, shid, requestId, reqBody) // make sure we've waited as much as the expected timeout, otherwise we might // call in a busy loop due to the server just not being up. time.Sleep(time.Until(readLoopDeadline)) @@ -224,7 +224,7 @@ func (c *Client) metadataRequestInternal( } if err := (&respRequestId).Unpack(respReader); err != nil { if protocolError, ok := err.(*badRespProtocol); ok && (protocolError.receivedProtocol == msgs.CDC_RESP_PROTOCOL_VERSION || protocolError.receivedProtocol == msgs.SHARD_RESP_PROTOCOL_VERSION) { - log.Raise(timeoutAlert, false, "received shard/CDC protocol, probably a late shard/CDC request: %v", err) + log.RaiseNC(timeoutAlert, "received shard/CDC protocol, probably a late shard/CDC request: %v", err) } else { timeoutAlertAndStandalone("could not decode Shard response header for request %v (%T) from shard %v, will continue waiting for responses: %w", req.requestId, req.body, shid, err) } @@ -232,7 +232,7 @@ func (c *Client) metadataRequestInternal( } // Check if we're interested in the request id we got if respRequestId.requestId != requestId { - log.Raise(timeoutAlert, false, "dropping response %v from shard %v, since we expected one of %v", respRequestId.requestId, shid, requestId) + log.RaiseNC(timeoutAlert, "dropping response %v from shard %v, since we expected one of %v", respRequestId.requestId, shid, requestId) continue } // We are interested, parse the kind @@ -268,7 +268,7 @@ func (c *Client) metadataRequestInternal( } // If we've got a timeout, keep trying if eggsError != nil && *eggsError == msgs.TIMEOUT { - log.Raise(timeoutAlert, false, "got resp timeout error %v from shard %v, will try to retry", err, shid) + log.RaiseNC(timeoutAlert, "got resp timeout error %v from shard %v, will try to retry", err, shid) break // keep trying } // At this point, we know we've got a response diff --git a/go/lib/shardreq.go b/go/lib/shardreq.go index 2e0b97a3..c5f38fa9 100644 --- a/go/lib/shardreq.go +++ b/go/lib/shardreq.go @@ -138,7 +138,7 @@ func (c *Client) shardRequestInternal( if c.counters != nil { counters = c.counters.Shard } - return c.metadataRequestInternal(logger, int16(shid), c.ShardAddrs(shid), uint8(msgKind), reqBody, respBody, counters, dontWait) + return c.metadataRequest(logger, int16(shid), c.ShardAddrs(shid), uint8(msgKind), reqBody, respBody, counters, dontWait) } func (c *Client) ShardRequestDontWait( diff --git a/go/lib/shucklereq.go b/go/lib/shucklereq.go index 6addfe48..c7d949bc 100644 --- a/go/lib/shucklereq.go +++ b/go/lib/shucklereq.go @@ -214,8 +214,8 @@ func ShuckleRequest( timeout = &DefaultShuckleTimeout } - alert := log.NewAlert() - defer log.Clear(alert) + alert := log.NewNCAlert() + defer log.ClearNC(alert) var err error var conn net.Conn @@ -230,7 +230,7 @@ Reconnect: log.Info("could not connect to shuckle and we're out of attempts: %v", err) return nil, err } - log.RaiseStack(1, alert, false, "could not connect to shuckle, will retry in %v: %v", delay, err) + log.RaiseNCStack(alert, 1, "could not connect to shuckle, will retry in %v: %v", delay, err) time.Sleep(delay) ReconnectBegin: diff --git a/go/lib/xmon.go b/go/lib/xmon.go index 0895299b..69e2436b 100644 --- a/go/lib/xmon.go +++ b/go/lib/xmon.go @@ -17,24 +17,19 @@ const ( XMON_CLEAR int32 = 0x3 ) -type XmonRequest struct { +type xmonRequest struct { msgType int32 alertId int64 binnable bool message string } -type XmonAlert struct { - alertId int64 // -1 if we haven't got one yet - lastMessage string -} - type Xmon struct { appType string appInstance string hostname string xmonAddr string - requests chan XmonRequest + requests chan xmonRequest } type XmonConfig struct { @@ -102,7 +97,7 @@ func (x *Xmon) packUpdate(buf *bytes.Buffer) { binary.Write(buf, binary.BigEndian, int32(0)) // happy mood } -func (x *Xmon) packRequest(buf *bytes.Buffer, req *XmonRequest) { +func (x *Xmon) packRequest(buf *bytes.Buffer, req *xmonRequest) { if req.alertId < 0 { panic(fmt.Errorf("bad alert id %v", req.alertId)) } @@ -115,13 +110,16 @@ func (x *Xmon) packRequest(buf *bytes.Buffer, req *XmonRequest) { } } +const maxBinnableAlerts int = 20 + func (x *Xmon) run(log *Logger) { buffer := bytes.NewBuffer([]byte{}) requestsCap := uint64(4096) requestsMask := requestsCap - 1 - requests := make([]XmonRequest, requestsCap) + requests := make([]xmonRequest, requestsCap) requestsHead := uint64(0) requestsTail := uint64(0) + binnableAlerts := make(map[int64]struct{}) var conn *net.TCPConn defer func() { @@ -179,10 +177,31 @@ Reconnect: req := &requests[requestsHead&requestsMask] switch req.msgType { case XMON_CREATE: - log.Info("sending create alert, alertId=%v binnable=%v message=%v", req.alertId, req.binnable, req.message) + if req.binnable && len(binnableAlerts) > maxBinnableAlerts { + log.ErrorNoAlert("skipping create alert, alertId=%v binnable=%v message=%v, too many already", req.alertId, req.binnable, req.message) + // if we don't have the "too many alerts" alert, create it + if _, ok := binnableAlerts[tooManyAlertsAlertId]; !ok { + req = &xmonRequest{ + msgType: XMON_CREATE, + alertId: tooManyAlertsAlertId, + message: "too many alerts, alerts dropped", + binnable: true, + } + } else { + goto SkipRequest + } + } else { + log.Info("sending create alert, alertId=%v binnable=%v message=%v", req.alertId, req.binnable, req.message) + } case XMON_UPDATE: + if req.binnable { + panic(fmt.Errorf("unexpected update to non-binnable alert")) + } log.Info("sending update alert, alertId=%v binnable=%v message=%v", req.alertId, req.binnable, req.message) case XMON_CLEAR: + if req.binnable { + panic(fmt.Errorf("unexpected clear to non-binnable alert")) + } log.Info("sending clear alert, alertId=%v", req.alertId) default: panic(fmt.Errorf("bad req type %v", req.msgType)) @@ -193,12 +212,19 @@ Reconnect: // note that we haven't removed the request from the ring buffer yet goto Reconnect } + switch req.msgType { + case XMON_CREATE: + if req.binnable { + binnableAlerts[req.alertId] = struct{}{} + } + } + SkipRequest: requestsHead++ } } // read all responses, not waiting too long for { - if err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 10)); err != nil { + if err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)); err != nil { log.ErrorNoAlert("could not set deadline: %v", err) goto Reconnect } @@ -232,6 +258,7 @@ Reconnect: log.ErrorNoAlert("could not read alert id: %v", err) goto Reconnect } + delete(binnableAlerts, alertId) log.Info("UI cleared alert alertId=%v", alertId) default: panic(fmt.Errorf("bad message type %v", respType)) @@ -253,7 +280,7 @@ func NewXmon(log *Logger, config *XmonConfig) (*Xmon, error) { appType: "restech.info", appInstance: config.AppInstance + "@" + hostname, hostname: hostname, - requests: make(chan XmonRequest, 4096), + requests: make(chan xmonRequest, 4096), } if config.Prod { x.xmonAddr = "REDACTED" @@ -264,48 +291,69 @@ func NewXmon(log *Logger, config *XmonConfig) (*Xmon, error) { return x, nil } -func (x *Xmon) NewAlert() *XmonAlert { - return &XmonAlert{ +type XmonNCAlert struct { + alertId int64 // -1 if we haven't got one yet + lastMessage string +} + +func (x *Xmon) NewNCAlert() *XmonNCAlert { + return &XmonNCAlert{ alertId: -1, } } -var alertIdCount = int64(0) +const tooManyAlertsAlertId = int64(0) -func (a *XmonAlert) RaiseStack(log *Logger, xmon *Xmon, calldepth int, binnable bool, format string, v ...any) { +var alertIdCount = int64(1) + +func xmonRaiseStack(log *Logger, xmon *Xmon, calldepth int, alertId *int64, binnable bool, format string, v ...any) string { file, line := getFileLine(1 + calldepth) message := fmt.Sprintf("%s:%d "+format, append([]any{file, line}, v...)...) - if a.alertId < 0 { - a.alertId = atomic.AddInt64(&alertIdCount, 1) - log.LogStack(1, INFO, "creating alert alertId=%v binnable=%v message=%v", a.alertId, binnable, message) - xmon.requests <- XmonRequest{ + if *alertId < 0 { + *alertId = atomic.AddInt64(&alertIdCount, 1) + log.LogStack(1, INFO, "creating alert alertId=%v binnable=%v message=%v", *alertId, binnable, message) + xmon.requests <- xmonRequest{ msgType: XMON_CREATE, - alertId: a.alertId, + alertId: *alertId, binnable: binnable, message: message, } } else { - log.LogStack(1, INFO, "updating alert alertId=%v binnable=%v message=%v", a.alertId, binnable, message) - xmon.requests <- XmonRequest{ + log.LogStack(1, INFO, "updating alert alertId=%v binnable=%v message=%v", *alertId, binnable, message) + xmon.requests <- xmonRequest{ msgType: XMON_UPDATE, - alertId: a.alertId, + alertId: *alertId, binnable: binnable, message: message, } } - a.lastMessage = message + return message } -func (a *XmonAlert) Raise(log *Logger, xmon *Xmon, binnable bool, format string, v ...any) { - a.RaiseStack(log, xmon, 1, binnable, format, v...) +func (x *Xmon) RaiseStack(log *Logger, xmon *Xmon, calldepth int, format string, v ...any) { + alertId := int64(-1) + xmonRaiseStack(log, x, 1+calldepth, &alertId, true, format, v...) } -func (a *XmonAlert) Clear(log *Logger, xmon *Xmon) { +func (x *Xmon) Raise(log *Logger, xmon *Xmon, format string, v ...any) { + alertId := int64(-1) + xmonRaiseStack(log, x, 1, &alertId, true, format, v...) +} + +func (a *XmonNCAlert) RaiseStack(log *Logger, xmon *Xmon, calldepth int, format string, v ...any) { + a.lastMessage = xmonRaiseStack(log, xmon, 1+calldepth, &a.alertId, false, format, v...) +} + +func (a *XmonNCAlert) Raise(log *Logger, xmon *Xmon, format string, v ...any) { + a.lastMessage = xmonRaiseStack(log, xmon, 1, &a.alertId, false, format, v...) +} + +func (a *XmonNCAlert) Clear(log *Logger, xmon *Xmon) { if a.alertId < 0 { return } log.LogStack(1, INFO, "clearing alert alertId=%v lastMessage=%v", a.alertId, a.lastMessage) - xmon.requests <- XmonRequest{ + xmon.requests <- xmonRequest{ msgType: XMON_CLEAR, alertId: a.alertId, }