#include #include #include #include "Exception.hpp" #include "Xmon.hpp" #include "Connect.hpp" static std::string explicitGenerateErrString(const std::string& what, int err, const char* str) { std::stringstream ss; ss << "could not " << what << ": " << err << "/" << str; return ss.str(); } static std::string generateErrString(const std::string& what, int err) { return explicitGenerateErrString(what, err, (std::string(translateErrno(err)) + "=" + safe_strerror(err)).c_str()); } Xmon::Xmon( Logger& logger, std::shared_ptr& agent, const XmonConfig& config, std::atomic& stop ) : _env(logger, agent, "xmon"), _stop(stop), _agent(agent), _appType("restech.info"), _xmonHost(config.prod ? "REDACTED" : "REDACTED"), _xmonPort(5004) { if (config.appInstance.empty()) { throw EGGS_EXCEPTION("empty app name"); } { char buf[HOST_NAME_MAX]; int res = gethostname(buf, HOST_NAME_MAX); if (res != 0) { throw SYSCALL_EXCEPTION("gethostname"); } _hostname = buf; _hostname = _hostname.substr(0, _hostname.find(".")); } _appInstance = config.appInstance + "@" + _hostname; } void Xmon::packLogon(XmonBuf& buf) { // // Name Type Description // Magic int16 always 'T' // Version int32 version number (latest version is 4) // Message type int32 0x0 // (unused) int32 must be 0 // Hostname string your hostname // Heartbeat interval byte in seconds, 0 for default (5s)) // (unused) byte must be 0 // (unused) int16 must be 0 // App type string defines rota and schedule // App inst string arbitrary identifier // Heap size int64 (deprecated) // Num threads int32 (deprecated) // Num errors int32 (deprecated) // Mood int32 current mood // Details JSON string (unused) buf.reset(); buf.packScalar('T'); // magic buf.packScalar(4); // version buf.packScalar(0x0); // message type buf.packScalar(0x0); // unused buf.packString(_hostname); // hostname buf.packScalar(0); // default interval buf.packScalar(0); // unused buf.packScalar(0); // unused buf.packString(_appType); // app type buf.packString(_appInstance); // app instance buf.packScalar(0); // heap size buf.packScalar(0); // num threads buf.packScalar(0); // num errors buf.packScalar(XmonMood::Happy); // mood std::string details; buf.packString(details); // details } void Xmon::packUpdate(XmonBuf& buf) { buf.reset(); // Message type int32 0x1 // (unused) int32 must be 0 // Heap size int64 (deprecated) // Num threads int32 (deprecated) // Num errors int32 (deprecated) // Mood int32 updated mood buf.packScalar(0x1); buf.packScalar(0); buf.packScalar(0); buf.packScalar(0); buf.packScalar(0); buf.packScalar(XmonMood::Happy); } void Xmon::packRequest(XmonBuf& buf, const XmonRequest& req) { buf.reset(); buf.packScalar(req.msgType); buf.packScalar(req.alertId); if (req.msgType == XmonRequestType::CREATE || req.msgType == XmonRequestType::UPDATE) { buf.packScalar(req.binnable); buf.packString(req.message); } } // Automatically keeps going on EAGAIN std::string XmonBuf::writeOut(int fd) { const char* curr = buf + cur; const char* end = buf + len; while (curr < end) { ssize_t written = write(fd, curr, end-curr); if (written < 0) { return generateErrString("writing out xmon buf", errno); } if (written == 0) { return "unexpected EOF"; } curr += written; } return {}; } bool XmonBuf::readIn(int fd, size_t sz, std::string& errString) { reset(); ensureUnpackSizeOrPanic(sz); size_t readSoFar = 0; while (readSoFar < sz) { ssize_t r = read(fd, buf+cur+readSoFar, sz-readSoFar); if (r < 0 && errno == EAGAIN && readSoFar == 0) { return false; } if (r < 0) { errString = generateErrString("read in xmon buf", errno); return false; } if (r == 0) { errString = "unexpected EOF"; return false; } readSoFar += r; } len += sz; return readSoFar; } void Xmon::run() { int numFailures = 0; int maxFailures = 3; XmonBuf buf; int sock = -1; std::vector requests; std::string errString; #define CHECK_ERR_STRING(__what) \ if (errString.size()) { \ if (numFailures >= maxFailures) { \ throw EGGS_EXCEPTION("could not %s: %s", __what, errString); \ } \ LOG_INFO(_env, "could not %s: %s, will reconnect", __what, errString); \ numFailures++; \ sleepFor(1_sec); \ goto reconnect; \ } reconnect: errString.clear(); sock = connectToHost(_xmonHost, _xmonPort, 3, errString); CHECK_ERR_STRING(errString); LOG_INFO(_env, "connected to xmon %s:%s", _xmonHost, _xmonPort); // We've got a socket, reset conn failures numFailures = 0; // 10ms timeout for prompt termination/processing { struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 10'000; if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,&tv,sizeof(tv)) < 0) { throw SYSCALL_EXCEPTION("setsockopt"); } } // Send logon message packLogon(buf); errString = buf.writeOut(sock); CHECK_ERR_STRING(errString); LOG_INFO(_env, "sent logon to xmon, appType=%s appInstance=%s", _appType, _appInstance); // Start recv loop bool gotHeartbeat = false; for (;;) { bool shutDown = _stop.load(); // send all requests if (gotHeartbeat) { _agent->getRequests(requests); for (const auto& req: requests) { if (req.msgType == XmonRequestType::CREATE) { LOG_DEBUG(_env, "creating alert, aid=%s binnable=%s message=%s", req.alertId, req.binnable, req.message); } else if (req.msgType == XmonRequestType::UPDATE) { LOG_DEBUG(_env, "updating alert, aid=%s binnable=%s message=%s", req.alertId, req.binnable, req.message); } else if (req.msgType == XmonRequestType::CLEAR) { LOG_DEBUG(_env, "clearing alert, aid=%s", req.alertId); } else { ALWAYS_ASSERT(false, "bad req type %s", (int)req.msgType); } packRequest(buf, req); errString = buf.writeOut(sock); CHECK_ERR_STRING(errString); LOG_DEBUG(_env, "sent request to xmon"); } requests.clear(); } // Check if we're done if (shutDown) { LOG_DEBUG(_env, "got told to stop, stopping"); close(sock); return; } errString.clear(); bool success = buf.readIn(sock, 4, errString); CHECK_ERR_STRING("read message type"); if (!success) { continue; } int32_t msgType = buf.unpackScalar(); switch (msgType) { case 0x0: { LOG_DEBUG(_env, "got xmon heartbeat"); gotHeartbeat = true; packUpdate(buf); errString = buf.writeOut(sock); CHECK_ERR_STRING("send heartbeat"); break; } case 0x1: { LOG_DEBUG(_env, "got alert binned from UI, ignoring"); bool success = false; do { errString.clear(); success = buf.readIn(sock, 4, errString); CHECK_ERR_STRING("reading alert binned id"); } while (!success); break; } default: throw EGGS_EXCEPTION("unknown message type %s", msgType); } } }