mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2025-12-18 01:05:38 -06:00
Kill all references to internal services
This commit is contained in:
@@ -29,8 +29,6 @@ Finally, TernFS can be replicated to multiple data centres to make it resilient
|
||||
|
||||
## Components
|
||||
|
||||

|
||||
|
||||
TODO decorate list below with links drilling down on specific concepts.
|
||||
|
||||
* **servers**
|
||||
|
||||
4
ci.py
4
ci.py
@@ -31,7 +31,7 @@ if args.functional:
|
||||
bold_print('functional tests')
|
||||
if args.docker:
|
||||
bold_print('starting functional tests in docker')
|
||||
container = 'REDACTED'
|
||||
container = 'ghcr.io/xtxmarkets/ternfs-ubuntu-build:2025-09-03'
|
||||
# See <https://groups.google.com/g/seastar-dev/c/r7W-Kqzy9O4>
|
||||
# for motivation for `--security-opt seccomp=unconfined`,
|
||||
# the `--pids-limit -1` is not something I hit but it seems
|
||||
@@ -53,7 +53,7 @@ if args.functional:
|
||||
if args.integration:
|
||||
if args.docker:
|
||||
bold_print('starting integration tests in docker')
|
||||
container = 'REDACTED'
|
||||
container = 'ghcr.io/xtxmarkets/ternfs-ubuntu-build:2025-09-03'
|
||||
# See <https://groups.google.com/g/seastar-dev/c/r7W-Kqzy9O4>
|
||||
# for motivation for `--security-opt seccomp=unconfined`,
|
||||
# the `--pids-limit -1` is not something I hit but it seems
|
||||
|
||||
17
cpp/Dockerfile-alpine
Normal file
17
cpp/Dockerfile-alpine
Normal file
@@ -0,0 +1,17 @@
|
||||
# The image we use to build the static "alpine" binaries
|
||||
# that we deploy.
|
||||
# We are staying on 3.18 and patching in go1.22 instead of moving to 3.20
|
||||
# due to issues with compiling the go-sqlite3 lib on 3.20.
|
||||
# We intend to drop the alpine build entirely, see https://internal-repo/issues/336.
|
||||
FROM alpine:3.18
|
||||
RUN set -eu
|
||||
RUN apk update
|
||||
RUN apk add ca-certificates
|
||||
RUN /usr/sbin/update-ca-certificates
|
||||
RUN apk add --no-cache bash perl coreutils python3 musl gcc g++ clang lld make cmake ninja mandoc linux-headers patch wget
|
||||
# Explicitly install go outside of apk since the default version is 1.18
|
||||
RUN wget https://go.dev/dl/go1.22.3.linux-amd64.tar.gz
|
||||
RUN echo 8920ea521bad8f6b7bc377b4824982e011c19af27df88a815e3586ea895f1b36 go1.22.3.linux-amd64.tar.gz | sha256sum --check
|
||||
RUN tar -C /usr/local -xzf go1.22.3.linux-amd64.tar.gz; rm go1.22.3.linux-amd64.tar.gz
|
||||
ENV PATH="${PATH}:/usr/local/go/bin"
|
||||
ENV IN_TERN_BUILD_CONTAINER Y
|
||||
14
cpp/Dockerfile-ubuntu
Normal file
14
cpp/Dockerfile-ubuntu
Normal file
@@ -0,0 +1,14 @@
|
||||
FROM ubuntu:22.04
|
||||
RUN set -eu
|
||||
# See <https://stackoverflow.com/questions/71941032/why-i-cannot-run-apt-update-inside-a-fresh-ubuntu22-04>
|
||||
RUN rm -f /etc/apt/apt.conf.d/docker-clean
|
||||
RUN apt-get update
|
||||
RUN apt-get install -y python3 gcc g++ clang lld make cmake ninja-build mandoc build-essential git fuse valgrind llvm ca-certificates wget
|
||||
RUN apt-get clean
|
||||
# Explicitly install go outside of apt since the default version is 1.18
|
||||
RUN /usr/sbin/update-ca-certificates
|
||||
RUN wget https://go.dev/dl/go1.22.3.linux-amd64.tar.gz
|
||||
RUN echo 8920ea521bad8f6b7bc377b4824982e011c19af27df88a815e3586ea895f1b36 go1.22.3.linux-amd64.tar.gz | sha256sum --check
|
||||
RUN tar -C /usr/local -xzf go1.22.3.linux-amd64.tar.gz; rm go1.22.3.linux-amd64.tar.gz
|
||||
ENV PATH="${PATH}:/usr/local/go/bin"
|
||||
ENV IN_TERN_BUILD_CONTAINER Y
|
||||
@@ -19,17 +19,17 @@ repo_dir = cpp_dir.parent
|
||||
build_dir = cpp_dir / 'build' / build_type
|
||||
build_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if build_type in ('ubuntu', 'ubuntudebug', 'ubuntusanitized', 'ubuntuvalgrind', 'alpine', 'alpinedebug') and 'IN_EGGS_BUILD_CONTAINER' not in os.environ:
|
||||
if build_type in ('ubuntu', 'ubuntudebug', 'ubuntusanitized', 'ubuntuvalgrind', 'alpine', 'alpinedebug') and 'IN_TERN_BUILD_CONTAINER' not in os.environ:
|
||||
if build_type.startswith('alpine'):
|
||||
container = 'REDACTED'
|
||||
container = 'ghcr.io/xtxmarkets/ternfs-alpine-build:2025-09-03'
|
||||
else:
|
||||
container = 'REDACTED'
|
||||
container = 'ghcr.io/xtxmarkets/ternfs-ubuntu-build:2025-09-03'
|
||||
# See <https://groups.google.com/g/seastar-dev/c/r7W-Kqzy9O4>
|
||||
# for motivation for `--security-opt seccomp=unconfined`,
|
||||
# the `--pids-limit -1` is not something I hit but it seems
|
||||
# like a good idea.
|
||||
subprocess.run(
|
||||
['docker', 'run', '--network', 'host', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--rm', '-i', '--mount', f'type=bind,src={repo_dir},dst=/eggsfs', '-u', f'{os.getuid()}:{os.getgid()}', container, '/eggsfs/cpp/build.py', build_type] + sys.argv[2:],
|
||||
['docker', 'run', '--network', 'host', '-e', 'http_proxy', '-e', 'https_proxy', '-e', 'no_proxy', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--rm', '-i', '--mount', f'type=bind,src={repo_dir},dst=/ternfs', '-u', f'{os.getuid()}:{os.getgid()}', container, '/ternfs/cpp/build.py', build_type] + sys.argv[2:],
|
||||
check=True,
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -1066,6 +1066,7 @@ static void logsDBstatsToMetrics(struct MetricsBuilder& metricsBuilder, const Lo
|
||||
|
||||
struct CDCMetricsInserter : PeriodicLoop {
|
||||
private:
|
||||
InfluxDB _influxDB;
|
||||
CDCShared& _shared;
|
||||
ReplicaId _replicaId;
|
||||
XmonNCAlert _sendMetricsAlert;
|
||||
@@ -1073,8 +1074,9 @@ private:
|
||||
std::unordered_map<std::string, uint64_t> _rocksDBStats;
|
||||
XmonNCAlert _updateSizeAlert;
|
||||
public:
|
||||
CDCMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, CDCShared& shared, ReplicaId replicaId):
|
||||
CDCMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const InfluxDB& influxDB, CDCShared& shared, ReplicaId replicaId):
|
||||
PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}),
|
||||
_influxDB(influxDB),
|
||||
_shared(shared),
|
||||
_replicaId(replicaId),
|
||||
_sendMetricsAlert(XmonAppType::DAYTIME, 1_mins),
|
||||
@@ -1143,7 +1145,7 @@ public:
|
||||
}
|
||||
}
|
||||
logsDBstatsToMetrics(_metricsBuilder, _shared.logsDB.getStats(), _replicaId, now);
|
||||
std::string err = sendMetrics(10_sec, _metricsBuilder.payload());
|
||||
std::string err = sendMetrics(_influxDB, 10_sec, _metricsBuilder.payload());
|
||||
_metricsBuilder.reset();
|
||||
if (err.empty()) {
|
||||
LOG_INFO(_env, "Sent metrics to influxdb");
|
||||
@@ -1168,7 +1170,7 @@ void runCDC(CDCOptions& options) {
|
||||
Logger logger(options.logLevel, logOutFd, options.syslog, true);
|
||||
|
||||
std::shared_ptr<XmonAgent> xmon;
|
||||
if (options.xmon) {
|
||||
if (!options.xmonAddr.empty()) {
|
||||
xmon = std::make_shared<XmonAgent>();
|
||||
}
|
||||
|
||||
@@ -1197,7 +1199,7 @@ void runCDC(CDCOptions& options) {
|
||||
config.appInstance = ss.str();
|
||||
}
|
||||
config.appType = XmonAppType::CRITICAL;
|
||||
config.prod = options.xmonProd;
|
||||
config.addr = options.xmonAddr;
|
||||
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<Xmon>(logger, xmon, config)));
|
||||
}
|
||||
@@ -1227,8 +1229,8 @@ void runCDC(CDCOptions& options) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCShardUpdater>(logger, xmon, options, shared)));
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCServer>(logger, xmon, options, shared)));
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCRegisterer>(logger, xmon, options, shared)));
|
||||
if (options.metrics) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCMetricsInserter>(logger, xmon, shared, options.replicaId)));
|
||||
if (options.influxDB) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCMetricsInserter>(logger, xmon, *options.influxDB, shared, options.replicaId)));
|
||||
}
|
||||
|
||||
LoopThread::waitUntilStopped(threads);
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
|
||||
#include "Env.hpp"
|
||||
#include "Msgs.hpp"
|
||||
#include "Shard.hpp"
|
||||
#include "Time.hpp"
|
||||
#include <cstdint>
|
||||
#include "Metrics.hpp"
|
||||
|
||||
struct CDCOptions {
|
||||
LogLevel logLevel = LogLevel::LOG_INFO;
|
||||
@@ -17,9 +20,8 @@ struct CDCOptions {
|
||||
AddrsInfo cdcToShardAddress = {};
|
||||
bool syslog = false;
|
||||
Duration shardTimeout = 100_ms;
|
||||
bool xmon = false;
|
||||
bool xmonProd = false;
|
||||
bool metrics = false;
|
||||
std::string xmonAddr;
|
||||
std::optional<InfluxDB> influxDB;
|
||||
ReplicaId replicaId;
|
||||
uint8_t location;
|
||||
|
||||
|
||||
@@ -23,9 +23,11 @@ void usage(const char* binary) {
|
||||
fprintf(stderr, " If not provided, stdout.\n");
|
||||
fprintf(stderr, " -shard-timeout-ms milliseconds\n");
|
||||
fprintf(stderr, " How much to wait for shard responses. Right now this is a simple loop.\n");
|
||||
fprintf(stderr, " -xmon qa|prod\n");
|
||||
fprintf(stderr, " -xmon host:port\n");
|
||||
fprintf(stderr, " Enable Xmon alerts.\n");
|
||||
fprintf(stderr, " -metrics\n");
|
||||
fprintf(stderr, " -influx-db-origin\n");
|
||||
fprintf(stderr, " -influx-db-org\n");
|
||||
fprintf(stderr, " -influx-db-bucket\n");
|
||||
fprintf(stderr, " Enable metrics.\n");
|
||||
fprintf(stderr, " -logsdb-leader\n");
|
||||
fprintf(stderr, " Allow replica to become leader. Default is false\n");
|
||||
@@ -102,6 +104,9 @@ int main(int argc, char** argv) {
|
||||
CDCOptions options;
|
||||
std::vector<std::string> args;
|
||||
std::string shuckleAddress;
|
||||
std::string influxDBOrigin;
|
||||
std::string influxDBOrg;
|
||||
std::string influxDBBucket;
|
||||
uint8_t numAddressesFound = 0;
|
||||
for (int i = 1; i < argc; i++) {
|
||||
const auto getNextArg = [argc, &argv, &dieWithUsage, &i]() {
|
||||
@@ -156,18 +161,13 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
options.shardTimeout = Duration(ms * 1'000'000);
|
||||
} else if (arg == "-xmon") {
|
||||
options.xmon = true;
|
||||
std::string xmonEnv = getNextArg();
|
||||
if (xmonEnv == "qa") {
|
||||
options.xmonProd = false;
|
||||
} else if (xmonEnv == "prod") {
|
||||
options.xmonProd = true;
|
||||
} else {
|
||||
fprintf(stderr, "Invalid xmon env %s", xmonEnv.c_str());
|
||||
dieWithUsage();
|
||||
}
|
||||
} else if (arg == "-metrics") {
|
||||
options.metrics = true;
|
||||
options.xmonAddr = getNextArg();
|
||||
} else if (arg == "-influx-db-origin") {
|
||||
influxDBOrigin = getNextArg();
|
||||
} else if (arg == "-influx-db-org") {
|
||||
influxDBOrg = getNextArg();
|
||||
} else if (arg == "-influx-db-bucket") {
|
||||
influxDBBucket = getNextArg();
|
||||
} else if (arg == "-logsdb-leader") {
|
||||
options.avoidBeingLeader = false;
|
||||
} else if (arg == "-logsdb-no-replication") {
|
||||
@@ -177,6 +177,18 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
}
|
||||
|
||||
if (influxDBOrigin.empty() != influxDBOrg.empty() || influxDBOrigin.empty() != influxDBBucket.empty()) {
|
||||
fprintf(stderr, "Either all or none of the -influx-db flags must be provided\n");
|
||||
dieWithUsage();
|
||||
}
|
||||
if (influxDBOrigin.empty()) {
|
||||
options.influxDB = InfluxDB{
|
||||
.origin = influxDBOrigin,
|
||||
.org = influxDBOrg,
|
||||
.bucket = influxDBBucket,
|
||||
};
|
||||
}
|
||||
|
||||
if (args.size() < 2 || args.size() > 3) {
|
||||
fprintf(stderr, "Expecting two or three positional argument (DIRECTORY REPLICA_ID [LOCATION_ID]), got %ld.\n", args.size());
|
||||
dieWithUsage();
|
||||
|
||||
@@ -6,30 +6,6 @@
|
||||
#define CPPHTTPLIB_USE_POLL
|
||||
#include "httplib.h"
|
||||
|
||||
#if 0
|
||||
|
||||
curl -vvv --request POST \
|
||||
"http://REDACTED?org=restech&bucket=metrics&precision=ns" \
|
||||
--header "Content-Type: text/plain; charset=utf-8" \
|
||||
--header "Accept: application/json" \
|
||||
--data-binary 'eggsfs_fmazzol_test,sensor_id=TLM0201 temperature=73.97038159354763,humidity=35.23103248356096,co=0.48445310567793615 1690998953988422912'
|
||||
|
||||
curl -G 'http://REDACTED?pretty=true' --data-urlencode "db=metrics" --data-urlencode "q=SELECT \"value\" FROM \"disk_ecn_hostmon_disk_write_bytes\""
|
||||
|
||||
var TeamRestech = &Team{
|
||||
Name: "restech",
|
||||
Domain: "REDACTED",
|
||||
ManagementDomain: "REDACTED",
|
||||
XmonRota: "restech",
|
||||
BootServerURLs: []string{"https://REDACTED"},
|
||||
BuildServerURL: "https://REDACTED",
|
||||
MetricsPrefix: "restech",
|
||||
InfluxOrg: "restech",
|
||||
InfluxDBURL: "http://REDACTED",
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
// We're being overly conservative here, see
|
||||
// <https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/#special-characters>,
|
||||
// but better being safe than sorry.
|
||||
@@ -88,23 +64,19 @@ void MetricsBuilder::timestamp(TernTime t) {
|
||||
_state = State::TIMESTAMP;
|
||||
}
|
||||
|
||||
const std::string influxDBUrl = "http://REDACTED";
|
||||
const std::string influxDBOrg = "restech";
|
||||
const std::string influxDBBucket = "metrics";
|
||||
|
||||
std::string sendMetrics(Duration timeout, const std::string& payload) {
|
||||
httplib::Client cli(influxDBUrl);
|
||||
std::string sendMetrics(const InfluxDB& idb, Duration timeout, const std::string& payload) {
|
||||
httplib::Client cli(idb.origin);
|
||||
time_t ds = timeout.ns / 1'000'000'000ull;
|
||||
time_t dus = (timeout.ns % 1'000'000'000ull) / 1'000ull;
|
||||
cli.set_connection_timeout(ds, dus);
|
||||
cli.set_read_timeout(ds, dus);
|
||||
cli.set_write_timeout(ds, dus);
|
||||
|
||||
std::string path = "/api/v2/write?org=" + influxDBOrg + "&bucket=" + influxDBBucket + "&precision=ns";
|
||||
std::string path = "/api/v2/write?org=" + idb.org + "&bucket=" + idb.bucket + "&precision=ns";
|
||||
const auto res = cli.Post(path, payload, "text/plain");
|
||||
if (res.error() != httplib::Error::Success) {
|
||||
std::ostringstream ss;
|
||||
ss << "Could not insert metrics to " << influxDBUrl << path << ": " << res.error();
|
||||
ss << "Could not insert metrics to " << idb.origin << path << ": " << res.error();
|
||||
return ss.str();
|
||||
}
|
||||
return {};
|
||||
|
||||
@@ -54,5 +54,11 @@ public:
|
||||
void timestamp(TernTime t);
|
||||
};
|
||||
|
||||
struct InfluxDB {
|
||||
std::string origin;
|
||||
std::string org;
|
||||
std::string bucket;
|
||||
};
|
||||
|
||||
// error string on error
|
||||
std::string sendMetrics(Duration timeout, const std::string& payload);
|
||||
std::string sendMetrics(const InfluxDB& idb, Duration timeout, const std::string& payload);
|
||||
@@ -6,6 +6,7 @@
|
||||
#include <sys/socket.h>
|
||||
#include <unordered_set>
|
||||
#include <sys/timerfd.h>
|
||||
#include <charconv>
|
||||
|
||||
#include "Common.hpp"
|
||||
#include "Env.hpp"
|
||||
@@ -55,13 +56,36 @@ Xmon::Xmon(
|
||||
Loop(logger, agent, "xmon"),
|
||||
_agent(agent),
|
||||
_appInstance(config.appInstance),
|
||||
_parent(config.appType),
|
||||
_xmonHost(config.prod ? "REDACTED" : "REDACTED"),
|
||||
_xmonPort(5004)
|
||||
_parent(config.appType)
|
||||
{
|
||||
if (_appInstance.empty()) {
|
||||
throw TERN_EXCEPTION("empty app name");
|
||||
}
|
||||
|
||||
{
|
||||
size_t colon = config.addr.find_last_of(':');
|
||||
if (colon == std::string_view::npos || colon == 0 || colon == config.addr.size()-1) {
|
||||
throw TERN_EXCEPTION("invalid xmon addr %s", config.addr);
|
||||
}
|
||||
|
||||
std::string host = config.addr.substr(0, colon);
|
||||
std::string portString = config.addr.substr(colon + 1);
|
||||
|
||||
uint64_t port64;
|
||||
auto [ptr, ec] = std::from_chars(portString.data(), portString.data() + portString.size(), port64);
|
||||
|
||||
if (ec != std::errc() || ptr != portString.data() + portString.size()) {
|
||||
throw TERN_EXCEPTION("invalid xmon addr %s", config.addr);
|
||||
}
|
||||
|
||||
if (port64 > ~(uint16_t)0) {
|
||||
throw TERN_EXCEPTION("invalid port %s", port64);
|
||||
}
|
||||
|
||||
_xmonHost = host;
|
||||
_xmonPort = port64;
|
||||
}
|
||||
|
||||
{
|
||||
char buf[HOST_NAME_MAX];
|
||||
int res = gethostname(buf, HOST_NAME_MAX);
|
||||
@@ -136,7 +160,6 @@ const uint8_t HEARTBEAT_INTERVAL_SECS = HEARTBEAT_INTERVAL.ns/1'000'000'000ull;
|
||||
void Xmon::_packLogon(XmonBuf& buf) {
|
||||
buf.reset();
|
||||
|
||||
// <https://REDACTED>
|
||||
// Name Type Description
|
||||
// Magic int16 always 'T'
|
||||
// Version int32 version number (latest version is 4)
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
#include "Loop.hpp"
|
||||
|
||||
struct XmonConfig {
|
||||
bool prod = false;
|
||||
std::string addr;
|
||||
XmonAppType appType = XmonAppType::NEVER;
|
||||
std::string appInstance = "";
|
||||
};
|
||||
|
||||
@@ -2000,6 +2000,7 @@ static void logsDBstatsToMetrics(struct MetricsBuilder& metricsBuilder, const Lo
|
||||
|
||||
struct ShardMetricsInserter : PeriodicLoop {
|
||||
private:
|
||||
InfluxDB _influxDB;
|
||||
ShardShared& _shared;
|
||||
ShardReplicaId _shrid;
|
||||
uint8_t _location;
|
||||
@@ -2009,8 +2010,9 @@ private:
|
||||
std::array<XmonNCAlert, 2> _sockQueueAlerts;
|
||||
XmonNCAlert _writeQueueAlert;
|
||||
public:
|
||||
ShardMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardShared& shared):
|
||||
ShardMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const InfluxDB& influxDB, ShardShared& shared):
|
||||
PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}),
|
||||
_influxDB(influxDB),
|
||||
_shared(shared),
|
||||
_shrid(_shared.options.shrid),
|
||||
_location(_shared.options.location),
|
||||
@@ -2103,7 +2105,7 @@ public:
|
||||
}
|
||||
}
|
||||
logsDBstatsToMetrics(_metricsBuilder, _shared.logsDB.getStats(), _shrid, _location, now);
|
||||
std::string err = sendMetrics(10_sec, _metricsBuilder.payload());
|
||||
std::string err = sendMetrics(_influxDB, 10_sec, _metricsBuilder.payload());
|
||||
_metricsBuilder.reset();
|
||||
if (err.empty()) {
|
||||
LOG_INFO(_env, "Sent metrics to influxdb");
|
||||
@@ -2127,7 +2129,7 @@ void runShard(ShardOptions& options) {
|
||||
Logger logger(options.logLevel, logOutFd, options.syslog, true);
|
||||
|
||||
std::shared_ptr<XmonAgent> xmon;
|
||||
if (options.xmon) {
|
||||
if (!options.xmonAddr.empty()) {
|
||||
xmon = std::make_shared<XmonAgent>();
|
||||
}
|
||||
|
||||
@@ -2158,7 +2160,7 @@ void runShard(ShardOptions& options) {
|
||||
ss << "eggsshard_" << std::setfill('0') << std::setw(3) << options.shrid.shardId() << "_" << options.shrid.replicaId();
|
||||
config.appInstance = ss.str();
|
||||
}
|
||||
config.prod = options.xmonProd;
|
||||
config.addr = options.xmonAddr;
|
||||
config.appType = XmonAppType::CRITICAL;
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<Xmon>(logger, xmon, config)));
|
||||
}
|
||||
@@ -2197,8 +2199,8 @@ void runShard(ShardOptions& options) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardWriter>(logger, xmon, shared)));
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardRegisterer>(logger, xmon, shared)));
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardBlockServiceUpdater>(logger, xmon, shared)));
|
||||
if (options.metrics) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardMetricsInserter>(logger, xmon, shared)));
|
||||
if (options.influxDB) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardMetricsInserter>(logger, xmon, *options.influxDB, shared)));
|
||||
}
|
||||
|
||||
// from this point on termination on SIGINT/SIGTERM will be graceful
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
|
||||
#include "Env.hpp"
|
||||
#include "Msgs.hpp"
|
||||
#include "MsgsGen.hpp"
|
||||
#include "ShardDB.hpp"
|
||||
#include <cstdint>
|
||||
#include "Metrics.hpp"
|
||||
|
||||
struct ShardOptions {
|
||||
ShardReplicaId shrid;
|
||||
@@ -22,9 +25,8 @@ struct ShardOptions {
|
||||
// resilience of the system.
|
||||
double simulateOutgoingPacketDrop = 0.0;
|
||||
bool syslog = false;
|
||||
bool xmon = false;
|
||||
bool xmonProd = false;
|
||||
bool metrics = false;
|
||||
std::string xmonAddr;
|
||||
std::optional<InfluxDB> influxDB;
|
||||
Duration transientDeadlineInterval = DEFAULT_DEADLINE_INTERVAL;
|
||||
|
||||
// LogsDB settings
|
||||
|
||||
@@ -30,9 +30,11 @@ static void usage(const char* binary) {
|
||||
fprintf(stderr, " If not provided, stdout.\n");
|
||||
fprintf(stderr, " -outgoing-packet-drop [0, 1)\n");
|
||||
fprintf(stderr, " Drop given ratio of packets after processing them.\n");
|
||||
fprintf(stderr, " -xmon qa|prod\n");
|
||||
fprintf(stderr, " -xmon host:port\n");
|
||||
fprintf(stderr, " Enable Xmon alerts.\n");
|
||||
fprintf(stderr, " -metrics\n");
|
||||
fprintf(stderr, " -influx-db-origin\n");
|
||||
fprintf(stderr, " -influx-db-org\n");
|
||||
fprintf(stderr, " -influx-db-bucket\n");
|
||||
fprintf(stderr, " Enable metrics.\n");
|
||||
fprintf(stderr, " -transient-deadline-interval\n");
|
||||
fprintf(stderr, " Tweaks the interval with wich the deadline for transient file gets bumped.\n");
|
||||
@@ -144,6 +146,9 @@ int main(int argc, char** argv) {
|
||||
ShardOptions options;
|
||||
std::vector<std::string> args;
|
||||
std::string shuckleAddress;
|
||||
std::string influxDBOrigin;
|
||||
std::string influxDBOrg;
|
||||
std::string influxDBBucket;
|
||||
uint8_t numAddressesFound = 0;
|
||||
for (int i = 1; i < argc; i++) {
|
||||
const auto getNextArg = [argc, &argv, &dieWithUsage, &i]() {
|
||||
@@ -189,18 +194,13 @@ int main(int argc, char** argv) {
|
||||
} else if (arg == "-syslog") {
|
||||
options.syslog = true;
|
||||
} else if (arg == "-xmon") {
|
||||
options.xmon = true;
|
||||
std::string xmonEnv = getNextArg();
|
||||
if (xmonEnv == "qa") {
|
||||
options.xmonProd = false;
|
||||
} else if (xmonEnv == "prod") {
|
||||
options.xmonProd = true;
|
||||
} else {
|
||||
fprintf(stderr, "Invalid xmon env %s", xmonEnv.c_str());
|
||||
dieWithUsage();
|
||||
}
|
||||
} else if (arg == "-metrics") {
|
||||
options.metrics = true;
|
||||
options.xmonAddr = getNextArg();
|
||||
} else if (arg == "-influx-db-origin") {
|
||||
influxDBOrigin = getNextArg();
|
||||
} else if (arg == "-influx-db-org") {
|
||||
influxDBOrg = getNextArg();
|
||||
} else if (arg == "-influx-db-bucket") {
|
||||
influxDBBucket = getNextArg();
|
||||
} else if (arg == "-transient-deadline-interval") {
|
||||
options.transientDeadlineInterval = parseDuration(getNextArg());
|
||||
} else if (arg == "-logsdb-leader") {
|
||||
@@ -212,6 +212,18 @@ int main(int argc, char** argv) {
|
||||
}
|
||||
}
|
||||
|
||||
if (influxDBOrigin.empty() != influxDBOrg.empty() || influxDBOrigin.empty() != influxDBBucket.empty()) {
|
||||
fprintf(stderr, "Either all or none of the -influx-db flags must be provided\n");
|
||||
dieWithUsage();
|
||||
}
|
||||
if (influxDBOrigin.empty()) {
|
||||
options.influxDB = InfluxDB{
|
||||
.origin = influxDBOrigin,
|
||||
.org = influxDBOrg,
|
||||
.bucket = influxDBBucket,
|
||||
};
|
||||
}
|
||||
|
||||
if (options.noReplication && options.avoidBeingLeader) {
|
||||
fprintf(stderr, "-logsdb-leader needs to be set if -logsdb-no-replication is set\n");
|
||||
dieWithUsage();
|
||||
|
||||
10
go/build.py
10
go/build.py
@@ -27,21 +27,21 @@ if not args.generate and len(paths) == 0:
|
||||
if os.path.isdir(os.path.join(str(go_dir), path)):
|
||||
paths.append(path)
|
||||
|
||||
if 'IN_EGGS_BUILD_CONTAINER' not in os.environ:
|
||||
container = 'REDACTED'
|
||||
if 'IN_TERN_BUILD_CONTAINER' not in os.environ:
|
||||
container = 'ghcr.io/xtxmarkets/ternfs-alpine-build:2025-09-03'
|
||||
# See <https://groups.google.com/g/seastar-dev/c/r7W-Kqzy9O4>
|
||||
# for motivation for `--security-opt seccomp=unconfined`,
|
||||
# the `--pids-limit -1` is not something I hit but it seems
|
||||
# like a good idea.
|
||||
subprocess.run(
|
||||
['docker', 'run', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--rm', '-i', '--mount', f'type=bind,src={repo_dir},dst=/eggsfs', '-u', f'{os.getuid()}:{os.getgid()}', container, '/eggsfs/go/build.py'] + sys.argv[1:],
|
||||
['docker', 'run', '--pids-limit', '-1', '--security-opt', 'seccomp=unconfined', '--rm', '-i', '--mount', f'type=bind,src={repo_dir},dst=/ternfs', '-u', f'{os.getuid()}:{os.getgid()}', container, '/ternfs/go/build.py'] + sys.argv[1:],
|
||||
check=True,
|
||||
)
|
||||
else:
|
||||
# Otherwise go will try to create the cache in /.cache, which won't work
|
||||
# since we're not running as root.
|
||||
os.environ['GOCACHE'] = '/eggsfs/.cache'
|
||||
os.environ['GOMODCACHE'] = '/eggsfs/.go-cache'
|
||||
os.environ['GOCACHE'] = '/ternfs/.cache'
|
||||
os.environ['GOMODCACHE'] = '/ternfs/.go-cache'
|
||||
if args.generate:
|
||||
subprocess.run(['go', 'generate', './...'], cwd=go_dir, check=True)
|
||||
else:
|
||||
|
||||
@@ -52,7 +52,7 @@ type LoggerOptions struct {
|
||||
Level LogLevel
|
||||
Syslog bool
|
||||
AppInstance string
|
||||
Xmon string // "dev", "qa", empty string for no xmon
|
||||
XmonAddr string // address to xmon endpoint, empty string for no xmon
|
||||
HardwareEventServerURL string // URL of the server you want to send hardware events OR empty for no logging
|
||||
AppType XmonAppType // only used for xmon
|
||||
PrintQuietAlerts bool // whether to print alerts in quiet period
|
||||
@@ -153,14 +153,11 @@ func NewLogger(
|
||||
xmonConfig := XmonConfig{
|
||||
PrintQuietAlerts: options.PrintQuietAlerts,
|
||||
}
|
||||
if options.Xmon != "" {
|
||||
if options.Xmon != "prod" && options.Xmon != "qa" {
|
||||
panic(fmt.Errorf("invalid xmon environment %q", options.Xmon))
|
||||
}
|
||||
if options.XmonAddr != "" {
|
||||
if options.AppInstance == "" {
|
||||
panic(fmt.Errorf("empty app instance"))
|
||||
}
|
||||
xmonConfig.Prod = options.Xmon == "prod"
|
||||
xmonConfig.Addr = options.XmonAddr
|
||||
xmonConfig.AppInstance = options.AppInstance
|
||||
xmonConfig.AppType = options.AppType
|
||||
} else {
|
||||
@@ -285,9 +282,9 @@ type blockServiceErrorMessage struct {
|
||||
|
||||
// Asynchronously sends a hardware event to the server or simply logs
|
||||
// if hardware event logging is not enabled.
|
||||
func (l *Logger) RaiseHardwareEvent(failureDomain string, blockServiceID string, msg string) {
|
||||
func (l *Logger) RaiseHardwareEvent(hostname string, blockServiceID string, msg string) {
|
||||
if l.heClient == nil {
|
||||
l.Log(syslogError, "Hardware event on %s for block service %s: %s", failureDomain, blockServiceID, msg)
|
||||
l.Log(syslogError, "Hardware event on %s for block service %s: %s", hostname, blockServiceID, msg)
|
||||
return
|
||||
}
|
||||
f := func() {
|
||||
@@ -303,10 +300,10 @@ func (l *Logger) RaiseHardwareEvent(failureDomain string, blockServiceID string,
|
||||
return
|
||||
}
|
||||
evt := HardwareEvent{
|
||||
Hostname: fmt.Sprintf("%REDACTED", failureDomain),
|
||||
Hostname: hostname,
|
||||
Timestamp: time.Now(),
|
||||
Component: DiskComponent,
|
||||
Location: "TernFS",
|
||||
Location: "EggsFS",
|
||||
Message: string(msgData),
|
||||
}
|
||||
err = l.heClient.SendHardwareEvent(evt)
|
||||
|
||||
@@ -99,14 +99,14 @@ func (b *MetricsBuilder) Timestamp(t time.Time) {
|
||||
b.state = METRICS_TIMESTAMP
|
||||
}
|
||||
|
||||
const (
|
||||
influxDBUrl = "http://REDACTED"
|
||||
influxDBOrg = "restech"
|
||||
influxDBBucket = "metrics"
|
||||
)
|
||||
type InfluxDB struct {
|
||||
Origin string
|
||||
Org string
|
||||
Bucket string
|
||||
}
|
||||
|
||||
func SendMetrics(payload io.Reader) error {
|
||||
url := influxDBUrl + "/api/v2/write?org=" + influxDBOrg + "&bucket=" + influxDBBucket + "&precision=ns"
|
||||
func (idb *InfluxDB) SendMetrics(payload io.Reader) error {
|
||||
url := idb.Origin + "/api/v2/write?org=" + idb.Org + "&bucket=" + idb.Bucket + "&precision=ns"
|
||||
resp, err := http.Post(url, "text/plain", payload)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -64,7 +64,7 @@ type XmonConfig struct {
|
||||
// If this is true, the alerts won't actually be sent,
|
||||
// but it'll still log alert creation etc
|
||||
OnlyLogging bool
|
||||
Prod bool
|
||||
Addr string
|
||||
// This is the "default" app instance/app type. We then
|
||||
// implicitly create instances for all the other app types,
|
||||
// and with the same app instance, so that we can send
|
||||
@@ -100,7 +100,6 @@ const heartbeatIntervalSecs uint8 = 5
|
||||
func (x *Xmon) packLogon(buf *bytes.Buffer) {
|
||||
buf.Reset()
|
||||
|
||||
// <https://REDACTED>
|
||||
// Name Type Description
|
||||
// Magic int16 always 'T'
|
||||
// Version int32 version number (latest version is 4)
|
||||
@@ -521,11 +520,7 @@ func NewXmon(log *Logger, config *XmonConfig) (*Xmon, error) {
|
||||
AppInstance: config.AppInstance + "@" + hostname,
|
||||
})
|
||||
}
|
||||
if config.Prod {
|
||||
x.xmonAddr = "REDACTED"
|
||||
} else {
|
||||
x.xmonAddr = "REDACTED"
|
||||
}
|
||||
x.xmonAddr = config.Addr
|
||||
}
|
||||
go x.run(log)
|
||||
return x, nil
|
||||
|
||||
@@ -123,6 +123,7 @@ type env struct {
|
||||
eraseLocks map[msgs.BlockServiceId]*sync.Mutex
|
||||
shuckleConn *client.ShuckleConn
|
||||
failureDomain string
|
||||
hostname string
|
||||
pathPrefix string
|
||||
readWholeFile bool
|
||||
ioAlertPercent uint8
|
||||
@@ -137,8 +138,8 @@ func BlockWriteProof(blockServiceId msgs.BlockServiceId, blockId msgs.BlockId, k
|
||||
return lib.CBCMAC(key, buf.Bytes())
|
||||
}
|
||||
|
||||
func raiseAlertAndHardwareEvent(logger *lib.Logger, failureDomain string, blockServiceId string, msg string) {
|
||||
logger.RaiseHardwareEvent(failureDomain, blockServiceId, msg)
|
||||
func raiseAlertAndHardwareEvent(logger *lib.Logger, hostname string, blockServiceId string, msg string) {
|
||||
logger.RaiseHardwareEvent(hostname, blockServiceId, msg)
|
||||
}
|
||||
|
||||
func blockServiceIdFromKey(secretKey [16]byte) msgs.BlockServiceId {
|
||||
@@ -262,7 +263,6 @@ var minimumRegisterInterval time.Duration = time.Second * 60
|
||||
var maximumRegisterInterval time.Duration = minimumRegisterInterval * 2
|
||||
var variantRegisterInterval time.Duration = maximumRegisterInterval - minimumRegisterInterval
|
||||
|
||||
|
||||
func registerPeriodically(
|
||||
log *lib.Logger,
|
||||
blockServices map[msgs.BlockServiceId]*blockService,
|
||||
@@ -286,7 +286,7 @@ func registerPeriodically(
|
||||
continue
|
||||
}
|
||||
log.ClearNC(alert)
|
||||
waitFor := minimumRegisterInterval + time.Duration(mrand.Uint64() % uint64(variantRegisterInterval.Nanoseconds()))
|
||||
waitFor := minimumRegisterInterval + time.Duration(mrand.Uint64()%uint64(variantRegisterInterval.Nanoseconds()))
|
||||
log.Info("registered with %v (%v alive), waiting %v", env.shuckleConn.ShuckleAddress(), len(blockServices), waitFor)
|
||||
time.Sleep(waitFor)
|
||||
}
|
||||
@@ -458,7 +458,7 @@ func sendFetchBlock(log *lib.Logger, env *env, blockServiceId msgs.BlockServiceI
|
||||
|
||||
if errors.Is(err, syscall.ENODATA) {
|
||||
// see <https://internal-repo/issues/106>
|
||||
raiseAlertAndHardwareEvent(log, env.failureDomain, blockServiceId.String(),
|
||||
raiseAlertAndHardwareEvent(log, env.hostname, blockServiceId.String(),
|
||||
fmt.Sprintf("could not open block %v, got ENODATA, this probably means that the block/disk is gone", blockPath))
|
||||
// return io error, downstream code will pick it up
|
||||
return syscall.EIO
|
||||
@@ -495,7 +495,7 @@ func sendFetchBlock(log *lib.Logger, env *env, blockServiceId msgs.BlockServiceI
|
||||
if withCrc {
|
||||
offset = offsetPageCount * msgs.TERN_PAGE_WITH_CRC_SIZE
|
||||
count = pageCount * msgs.TERN_PAGE_WITH_CRC_SIZE
|
||||
unix.Fadvise(int(f.Fd()), int64(offset), preReadSize, unix.FADV_SEQUENTIAL | unix.FADV_WILLNEED)
|
||||
unix.Fadvise(int(f.Fd()), int64(offset), preReadSize, unix.FADV_SEQUENTIAL|unix.FADV_WILLNEED)
|
||||
|
||||
if _, err := reader.Seek(int64(offset), 0); err != nil {
|
||||
return err
|
||||
@@ -524,7 +524,7 @@ func sendFetchBlock(log *lib.Logger, env *env, blockServiceId msgs.BlockServiceI
|
||||
} else {
|
||||
// the only remaining case is that we have a file in new format and client wants old format
|
||||
offset = offsetPageCount * msgs.TERN_PAGE_WITH_CRC_SIZE
|
||||
unix.Fadvise(int(f.Fd()), int64(offset), preReadSize, unix.FADV_SEQUENTIAL | unix.FADV_WILLNEED)
|
||||
unix.Fadvise(int(f.Fd()), int64(offset), preReadSize, unix.FADV_SEQUENTIAL|unix.FADV_WILLNEED)
|
||||
if _, err := reader.Seek(int64(offset), 0); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1208,7 +1208,7 @@ func raiseAlerts(log *lib.Logger, env *env, blockServices map[msgs.BlockServiceI
|
||||
}
|
||||
}
|
||||
|
||||
func sendMetrics(log *lib.Logger, env *env, blockServices map[msgs.BlockServiceId]*blockService, failureDomain string) {
|
||||
func sendMetrics(log *lib.Logger, env *env, influxDB *lib.InfluxDB, blockServices map[msgs.BlockServiceId]*blockService, failureDomain string) {
|
||||
metrics := lib.MetricsBuilder{}
|
||||
rand := wyhash.New(mrand.Uint64())
|
||||
alert := log.NewNCAlert(10 * time.Second)
|
||||
@@ -1271,7 +1271,7 @@ func sendMetrics(log *lib.Logger, env *env, blockServices map[msgs.BlockServiceI
|
||||
}
|
||||
metrics.Timestamp(now)
|
||||
}
|
||||
err = lib.SendMetrics(metrics.Payload())
|
||||
err = influxDB.SendMetrics(metrics.Payload())
|
||||
if err == nil {
|
||||
log.ClearNC(alert)
|
||||
sleepFor := time.Minute + time.Duration(rand.Uint64() & ^(uint64(1)<<63))%time.Minute
|
||||
@@ -1328,13 +1328,14 @@ func getMountsInfo(log *lib.Logger, mountsPath string) (map[string]string, error
|
||||
func main() {
|
||||
flag.Usage = usage
|
||||
failureDomainStr := flag.String("failure-domain", "", "Failure domain")
|
||||
hostname := flag.String("hostname", "", "Hostname (for hardware event reporting)")
|
||||
pathPrefixStr := flag.String("path-prefix", "", "We filter our block service not only by failure domain but also by path prefix")
|
||||
|
||||
futureCutoff := flag.Duration("future-cutoff", DEFAULT_FUTURE_CUTOFF, "")
|
||||
var addresses lib.StringArrayFlags
|
||||
flag.Var(&addresses, "addr", "Addresses (up to two) to bind to, and that will be advertised to shuckle.")
|
||||
verbose := flag.Bool("verbose", false, "")
|
||||
xmon := flag.String("xmon", "", "Xmon environment (empty, prod, qa)")
|
||||
xmon := flag.String("xmon", "", "Xmon address (empty for no xmon)")
|
||||
trace := flag.Bool("trace", false, "")
|
||||
logFile := flag.String("log-file", "", "If empty, stdout")
|
||||
shuckleAddress := flag.String("shuckle", "", "Shuckle address (host:port).")
|
||||
@@ -1343,7 +1344,9 @@ func main() {
|
||||
syslog := flag.Bool("syslog", false, "")
|
||||
connectionTimeout := flag.Duration("connection-timeout", 10*time.Minute, "")
|
||||
reservedStorage := flag.Uint64("reserved-storage", 100<<30, "How many bytes to reserve and under-report capacity")
|
||||
metrics := flag.Bool("metrics", false, "")
|
||||
influxDBOrigin := flag.String("influx-db-origin", "", "Base URL to InfluxDB endpoint")
|
||||
influxDBOrg := flag.String("influx-db-org", "", "InfluxDB org")
|
||||
influxDBBucket := flag.String("influx-db-bucket", "", "InfluxDB bucket")
|
||||
locationId := flag.Uint("location", 10000, "Location ID")
|
||||
readWholeFile := flag.Bool("read-whole-file", false, "")
|
||||
ioAlertPercent := flag.Uint("io-alert-percent", 10, "Threshold percent of I/O errors over which we alert")
|
||||
@@ -1388,6 +1391,29 @@ func main() {
|
||||
*pathPrefixStr = *failureDomainStr
|
||||
}
|
||||
|
||||
if *hardwareEventAddress != "" && *hostname == "" {
|
||||
fmt.Fprintf(os.Stderr, "-hostname must be provided if you need hardware event reporting\n")
|
||||
flagErrors = true
|
||||
}
|
||||
|
||||
var influxDB *lib.InfluxDB
|
||||
if *influxDBOrigin == "" {
|
||||
if *influxDBOrg != "" || *influxDBBucket != "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
flagErrors = true
|
||||
}
|
||||
} else {
|
||||
if *influxDBOrg == "" || *influxDBBucket == "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
flagErrors = true
|
||||
}
|
||||
influxDB = &lib.InfluxDB{
|
||||
Origin: *influxDBOrigin,
|
||||
Org: *influxDBOrg,
|
||||
Bucket: *influxDBBucket,
|
||||
}
|
||||
}
|
||||
|
||||
if flagErrors {
|
||||
usage()
|
||||
os.Exit(2)
|
||||
@@ -1441,7 +1467,7 @@ func main() {
|
||||
log := lib.NewLogger(logOut, &lib.LoggerOptions{
|
||||
Level: level,
|
||||
Syslog: *syslog,
|
||||
Xmon: *xmon,
|
||||
XmonAddr: *xmon,
|
||||
HardwareEventServerURL: *hardwareEventAddress,
|
||||
AppInstance: "eggsblocks",
|
||||
AppType: "restech_eggsfs.daytime",
|
||||
@@ -1652,10 +1678,10 @@ func main() {
|
||||
updateBlockServiceInfoCapacityForever(log, blockServices, *reservedStorage)
|
||||
}()
|
||||
|
||||
if *metrics {
|
||||
if influxDB != nil {
|
||||
go func() {
|
||||
defer func() { lib.HandleRecoverChan(log, terminateChan, recover()) }()
|
||||
sendMetrics(log, env, blockServices, *failureDomainStr)
|
||||
sendMetrics(log, env, influxDB, blockServices, *failureDomainStr)
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
@@ -75,7 +75,7 @@ type CountState struct {
|
||||
|
||||
func main() {
|
||||
verbose := flag.Bool("verbose", false, "Enables debug logging.")
|
||||
xmon := flag.String("xmon", "", "Xmon environment (empty, prod, qa)")
|
||||
xmon := flag.String("xmon", "", "Xmon address (empty for no xmon)")
|
||||
appInstance := flag.String("app-instance", "eggsgc", "")
|
||||
trace := flag.Bool("trace", false, "Enables debug logging.")
|
||||
logFile := flag.String("log-file", "", "File to log to, stdout if not provided.")
|
||||
@@ -97,7 +97,9 @@ func main() {
|
||||
defragMinSpanSize := flag.Uint("defrag-min-span-size", 0, "")
|
||||
defragStorageClass := flag.String("defrag-storage-class", "", "If present, will only defrag spans in this storage class.")
|
||||
zeroBlockServices := flag.Bool("zero-block-services", false, "")
|
||||
metrics := flag.Bool("metrics", false, "Send metrics")
|
||||
influxDBOrigin := flag.String("influx-db-origin", "", "Base URL to InfluxDB endpoint")
|
||||
influxDBOrg := flag.String("influx-db-org", "", "InfluxDB org")
|
||||
influxDBBucket := flag.String("influx-db-bucket", "", "InfluxDB bucket")
|
||||
countMetrics := flag.Bool("count-metrics", false, "Compute and send count metrics")
|
||||
migrate := flag.Bool("migrate", false, "migrate")
|
||||
numMigrators := flag.Int("num-migrators", 1, "How many migrate instances are running. 1 by default")
|
||||
@@ -150,6 +152,29 @@ func main() {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
var influxDB *lib.InfluxDB
|
||||
if *influxDBOrigin == "" {
|
||||
if *influxDBOrg != "" || *influxDBBucket != "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
} else {
|
||||
if *influxDBOrg == "" || *influxDBBucket == "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
influxDB = &lib.InfluxDB{
|
||||
Origin: *influxDBOrigin,
|
||||
Org: *influxDBOrg,
|
||||
Bucket: *influxDBBucket,
|
||||
}
|
||||
}
|
||||
|
||||
if *countMetrics && influxDB == nil {
|
||||
fmt.Fprintf(os.Stderr, "-count-metrics requires -influx-db info\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
logOut := os.Stdout
|
||||
if *logFile != "" {
|
||||
var err error
|
||||
@@ -168,7 +193,7 @@ func main() {
|
||||
level = lib.TRACE
|
||||
}
|
||||
|
||||
log := lib.NewLogger(logOut, &lib.LoggerOptions{Level: level, Syslog: *syslog, Xmon: *xmon, AppType: lib.XMON_DAYTIME, AppInstance: *appInstance})
|
||||
log := lib.NewLogger(logOut, &lib.LoggerOptions{Level: level, Syslog: *syslog, XmonAddr: *xmon, AppType: lib.XMON_DAYTIME, AppInstance: *appInstance})
|
||||
|
||||
if *mtu != 0 {
|
||||
client.SetMTU(*mtu)
|
||||
@@ -403,7 +428,7 @@ func main() {
|
||||
}()
|
||||
}
|
||||
|
||||
if *metrics && (*destructFiles || *collectDirectories || *zeroBlockServices || *scrub || *defrag) {
|
||||
if influxDB != nil && (*destructFiles || *collectDirectories || *zeroBlockServices || *scrub || *defrag) {
|
||||
// one thing just pushing the stats every minute
|
||||
go func() {
|
||||
metrics := lib.MetricsBuilder{}
|
||||
@@ -510,7 +535,7 @@ func main() {
|
||||
metrics.FieldU64("lookups", dirInfoCache.Lookups())
|
||||
metrics.Timestamp(now)
|
||||
}
|
||||
err := lib.SendMetrics(metrics.Payload())
|
||||
err := influxDB.SendMetrics(metrics.Payload())
|
||||
if err == nil {
|
||||
log.ClearNC(alert)
|
||||
sleepFor := time.Second * 30
|
||||
@@ -628,7 +653,7 @@ func main() {
|
||||
metrics.FieldU64("count", countState.Directories.Counts[i])
|
||||
metrics.Timestamp(now)
|
||||
}
|
||||
err = lib.SendMetrics(metrics.Payload())
|
||||
err = influxDB.SendMetrics(metrics.Payload())
|
||||
if err == nil {
|
||||
log.ClearNC(alert)
|
||||
sleepFor := time.Minute + time.Duration(rand.Uint64() & ^(uint64(1)<<63))%time.Minute
|
||||
|
||||
@@ -2781,7 +2781,7 @@ func setupRouting(log *lib.Logger, st *state, scriptsJsFile string) {
|
||||
}
|
||||
|
||||
// Writes stats to influx db.
|
||||
func sendMetrics(log *lib.Logger, st *state) error {
|
||||
func sendMetrics(log *lib.Logger, st *state, influxDB *lib.InfluxDB) error {
|
||||
metrics := lib.MetricsBuilder{}
|
||||
rand := wyhash.New(rand.Uint64())
|
||||
alert := log.NewNCAlert(10 * time.Second)
|
||||
@@ -2796,7 +2796,7 @@ func sendMetrics(log *lib.Logger, st *state) error {
|
||||
metrics.FieldU64("count", t.Count())
|
||||
metrics.Timestamp(now)
|
||||
}
|
||||
err := lib.SendMetrics(metrics.Payload())
|
||||
err := influxDB.SendMetrics(metrics.Payload())
|
||||
if err == nil {
|
||||
log.ClearNC(alert)
|
||||
sleepFor := time.Minute + time.Duration(rand.Uint64() & ^(uint64(1)<<63))%time.Minute
|
||||
@@ -3303,9 +3303,11 @@ func main() {
|
||||
logFile := flag.String("log-file", "", "File in which to write logs (or stdout)")
|
||||
verbose := flag.Bool("verbose", false, "")
|
||||
trace := flag.Bool("trace", false, "")
|
||||
xmon := flag.String("xmon", "", "Xmon environment (empty, prod, qa)")
|
||||
xmon := flag.String("xmon", "", "Xmon address (empty for no xmon)")
|
||||
syslog := flag.Bool("syslog", false, "")
|
||||
metrics := flag.Bool("metrics", false, "")
|
||||
influxDBOrigin := flag.String("influx-db-origin", "", "Base URL to InfluxDB endpoint")
|
||||
influxDBOrg := flag.String("influx-db-org", "", "InfluxDB org")
|
||||
influxDBBucket := flag.String("influx-db-bucket", "", "InfluxDB bucket")
|
||||
dataDir := flag.String("data-dir", "", "Where to store the shuckle files")
|
||||
maxConnections := flag.Uint("max-connections", 4000, "Maximum number of connections to accept.")
|
||||
mtu := flag.Uint64("mtu", 0, "")
|
||||
@@ -3359,13 +3361,31 @@ func main() {
|
||||
if *trace {
|
||||
level = lib.TRACE
|
||||
}
|
||||
log := lib.NewLogger(logOut, &lib.LoggerOptions{Level: level, Syslog: *syslog, Xmon: *xmon, AppInstance: "eggsshuckle", AppType: "restech_eggsfs.critical"})
|
||||
log := lib.NewLogger(logOut, &lib.LoggerOptions{Level: level, Syslog: *syslog, XmonAddr: *xmon, AppInstance: "eggsshuckle", AppType: "restech_eggsfs.critical"})
|
||||
|
||||
if *dataDir == "" {
|
||||
fmt.Fprintf(os.Stderr, "You need to specify a -data-dir\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
var influxDB *lib.InfluxDB
|
||||
if *influxDBOrigin == "" {
|
||||
if *influxDBOrg != "" || *influxDBBucket != "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
} else {
|
||||
if *influxDBOrg == "" || *influxDBBucket == "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
influxDB = &lib.InfluxDB{
|
||||
Origin: *influxDBOrigin,
|
||||
Org: *influxDBOrg,
|
||||
Bucket: *influxDBBucket,
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("Running shuckle with options:")
|
||||
log.Info(" addr = %v", addresses)
|
||||
log.Info(" httpPort = %v", *httpPort)
|
||||
@@ -3504,10 +3524,10 @@ func main() {
|
||||
checkBlockServiceFilePresence(log, state)
|
||||
}()
|
||||
|
||||
if *metrics {
|
||||
if influxDB != nil {
|
||||
go func() {
|
||||
defer func() { lib.HandleRecoverPanic(log, recover()) }()
|
||||
sendMetrics(log, state)
|
||||
sendMetrics(log, state, influxDB)
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ type shuckleProxyConfig struct {
|
||||
func newState(
|
||||
log *lib.Logger,
|
||||
conf *shuckleProxyConfig,
|
||||
idb *lib.InfluxDB,
|
||||
) *state {
|
||||
st := &state{
|
||||
config: conf,
|
||||
@@ -91,7 +92,6 @@ func handleShuckle(log *lib.Logger, s *state) (msgs.ShuckleResponse, error) {
|
||||
return &msgs.ShuckleResp{s.config.addrs}, nil
|
||||
}
|
||||
|
||||
|
||||
func handleRequestParsed(log *lib.Logger, s *state, req msgs.ShuckleRequest) (msgs.ShuckleResponse, error) {
|
||||
t0 := time.Now()
|
||||
defer func() {
|
||||
@@ -376,7 +376,7 @@ func noRunawayArgs() {
|
||||
}
|
||||
|
||||
// Writes stats to influx db.
|
||||
func sendMetrics(log *lib.Logger, st *state) error {
|
||||
func sendMetrics(log *lib.Logger, st *state, influxDB *lib.InfluxDB) error {
|
||||
metrics := lib.MetricsBuilder{}
|
||||
rand := wyhash.New(rand.Uint64())
|
||||
alert := log.NewNCAlert(10 * time.Second)
|
||||
@@ -391,7 +391,7 @@ func sendMetrics(log *lib.Logger, st *state) error {
|
||||
metrics.FieldU64("count", t.Count())
|
||||
metrics.Timestamp(now)
|
||||
}
|
||||
err := lib.SendMetrics(metrics.Payload())
|
||||
err := influxDB.SendMetrics(metrics.Payload())
|
||||
if err == nil {
|
||||
log.ClearNC(alert)
|
||||
sleepFor := time.Minute + time.Duration(rand.Uint64() & ^(uint64(1)<<63))%time.Minute
|
||||
@@ -411,9 +411,11 @@ func main() {
|
||||
logFile := flag.String("log-file", "", "File in which to write logs (or stdout)")
|
||||
verbose := flag.Bool("verbose", false, "")
|
||||
trace := flag.Bool("trace", false, "")
|
||||
xmon := flag.String("xmon", "", "Xmon environment (empty, prod, qa)")
|
||||
xmon := flag.String("xmon", "", "Xmon address (empty for no xmon)")
|
||||
syslog := flag.Bool("syslog", false, "")
|
||||
metrics := flag.Bool("metrics", false, "")
|
||||
influxDBOrigin := flag.String("influx-db-origin", "", "Base URL to InfluxDB endpoint")
|
||||
influxDBOrg := flag.String("influx-db-org", "", "InfluxDB org")
|
||||
influxDBBucket := flag.String("influx-db-bucket", "", "InfluxDB bucket")
|
||||
shuckleAddress := flag.String("shuckle-address", "", "Shuckle address to connect to.")
|
||||
location := flag.Uint("location", 0, "Location id for this shuckle proxy.")
|
||||
numHandlers := flag.Uint("num-handlers", 100, "Number of shuckle connections to open.")
|
||||
@@ -438,6 +440,24 @@ func main() {
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
var influxDB *lib.InfluxDB
|
||||
if *influxDBOrigin == "" {
|
||||
if *influxDBOrg != "" || *influxDBBucket != "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
} else {
|
||||
if *influxDBOrg == "" || *influxDBBucket == "" {
|
||||
fmt.Fprintf(os.Stderr, "Either all or none of the -influx-db flags must be passed\n")
|
||||
os.Exit(2)
|
||||
}
|
||||
influxDB = &lib.InfluxDB{
|
||||
Origin: *influxDBOrigin,
|
||||
Org: *influxDBOrg,
|
||||
Bucket: *influxDBBucket,
|
||||
}
|
||||
}
|
||||
|
||||
ownIp1, ownPort1, err := lib.ParseIPV4Addr(addresses[0])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@@ -468,7 +488,7 @@ func main() {
|
||||
if *trace {
|
||||
level = lib.TRACE
|
||||
}
|
||||
log := lib.NewLogger(logOut, &lib.LoggerOptions{Level: level, Syslog: *syslog, Xmon: *xmon, AppInstance: "eggsshuckleproxy", AppType: "restech_eggsfs.critical"})
|
||||
log := lib.NewLogger(logOut, &lib.LoggerOptions{Level: level, Syslog: *syslog, XmonAddr: *xmon, AppInstance: "eggsshuckleproxy", AppType: "restech_eggsfs.critical"})
|
||||
|
||||
log.Info("Running shuckle proxy with options:")
|
||||
log.Info(" addr = %v", addresses)
|
||||
@@ -479,12 +499,10 @@ func main() {
|
||||
log.Info(" maxConnections = %d", *maxConnections)
|
||||
log.Info(" mtu = %v", *mtu)
|
||||
|
||||
|
||||
if *mtu != 0 {
|
||||
client.SetMTU(*mtu)
|
||||
}
|
||||
|
||||
|
||||
bincodeListener1, err := net.Listen("tcp", fmt.Sprintf("%v:%v", net.IP(ownIp1[:]), ownPort1))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@@ -501,7 +519,6 @@ func main() {
|
||||
defer bincodeListener2.Close()
|
||||
}
|
||||
|
||||
|
||||
if bincodeListener2 == nil {
|
||||
log.Info("running on %v (bincode)", bincodeListener1.Addr())
|
||||
} else {
|
||||
@@ -514,7 +531,7 @@ func main() {
|
||||
numHandlers: *numHandlers,
|
||||
shuckleAddress: *shuckleAddress,
|
||||
}
|
||||
state := newState(log, config)
|
||||
state := newState(log, config, influxDB)
|
||||
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT, syscall.SIGSTKFLT, syscall.SIGSYS)
|
||||
@@ -536,7 +553,7 @@ func main() {
|
||||
terminateChan <- err
|
||||
return
|
||||
}
|
||||
if (atomic.AddInt64(&activeConnections, 1) > int64(*maxConnections)) {
|
||||
if atomic.AddInt64(&activeConnections, 1) > int64(*maxConnections) {
|
||||
conn.Close()
|
||||
atomic.AddInt64(&activeConnections, -1)
|
||||
continue
|
||||
@@ -557,10 +574,10 @@ func main() {
|
||||
startBincodeHandler(bincodeListener2)
|
||||
}
|
||||
|
||||
if *metrics {
|
||||
if influxDB != nil {
|
||||
go func() {
|
||||
defer func() { lib.HandleRecoverPanic(log, recover()) }()
|
||||
sendMetrics(log, state)
|
||||
sendMetrics(log, state, influxDB)
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ while ! scp -v -P 2223 -o StrictHostKeyChecking=no -i image-key ternfs.ko fmazzo
|
||||
done
|
||||
|
||||
# Deploy ternfs
|
||||
../deploy/vm_deploy.py
|
||||
./vm_deploy.py
|
||||
|
||||
# Insert module
|
||||
ssh -p 2223 -i image-key fmazzol@localhost "sudo insmod ternfs.ko"
|
||||
|
||||
@@ -27,8 +27,6 @@ echo "Preparing kmod CI environment with base image $base_img"
|
||||
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
|
||||
cd $SCRIPT_DIR
|
||||
|
||||
export https_proxy=http://REDACTED
|
||||
|
||||
# prepare linux sources
|
||||
./fetchlinux.sh
|
||||
|
||||
|
||||
@@ -4,9 +4,7 @@ set -eu -o pipefail
|
||||
version=linux-5.4.237
|
||||
|
||||
# Download or resume
|
||||
# use artifactory so that this works in iceland as well
|
||||
# curl -C - -O "https://cdn.kernel.org/pub/linux/kernel/v5.x/${version}.tar.gz"
|
||||
curl -C - -O "https://REDACTED${version}.tar.gz"
|
||||
curl -C - -O "https://cdn.kernel.org/pub/linux/kernel/v5.x/${version}.tar.gz"
|
||||
|
||||
# Check
|
||||
sha512sum -c "${version}.tar.gz.sha512"
|
||||
|
||||
@@ -66,7 +66,7 @@ ssh -i image-key -p 2223 fmazzol@localhost 'sudo insmod ternfs.ko'
|
||||
|
||||
if [[ "$deploy" = true ]]; then
|
||||
# Deploy binaries
|
||||
(cd ../deploy && $(pwd)/vm_deploy.py --build-type "$build_type")
|
||||
./vm_deploy.py --build-type "$build_type"
|
||||
fi
|
||||
|
||||
# Create shells
|
||||
|
||||
47
kmod/vm_deploy.py
Executable file
47
kmod/vm_deploy.py
Executable file
@@ -0,0 +1,47 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Utility for importing binaries into vm for tests
|
||||
"""
|
||||
import argparse
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
REPO_DIR = Path(__file__).parent.parent
|
||||
|
||||
|
||||
def build_and_upload(build_type: str) -> None:
|
||||
"""Builds and uploads binaries to the vm."""
|
||||
subprocess.run([str(REPO_DIR / "build.sh"), build_type], check=True)
|
||||
test_binaries = [
|
||||
f"build/{build_type}/ternshard",
|
||||
f"build/{build_type}/terncdc",
|
||||
f"build/{build_type}/ternshuckle",
|
||||
f"build/{build_type}/ternrun",
|
||||
f"build/{build_type}/ternblocks",
|
||||
f"build/{build_type}/ternfuse",
|
||||
f"build/{build_type}/terngc",
|
||||
f"build/{build_type}/terntests",
|
||||
f"build/{build_type}/terndbtools",
|
||||
]
|
||||
|
||||
subprocess.run(
|
||||
[
|
||||
"rsync",
|
||||
"-p",
|
||||
"--quiet",
|
||||
"-e",
|
||||
"ssh -p 2223 -i ../kmod/image-key -l fmazzol",
|
||||
]
|
||||
+ [str(REPO_DIR / f) for f in test_binaries]
|
||||
+ ["localhost:tern/"],
|
||||
check=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--build-type", choices=["alpine", "alpinedebug"], default="alpine"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
build_and_upload(args.build_type)
|
||||
Reference in New Issue
Block a user