terncli: add tag-files subcommand

Miroslav Crnic
2026-05-05 14:35:21 +00:00
parent 21385bfa20
commit 63aefdd5d7
5 changed files with 1112 additions and 0 deletions
@@ -0,0 +1,226 @@
// Copyright 2025 XTX Markets Technologies Limited
//
// SPDX-License-Identifier: GPL-2.0-or-later
package main
import (
"fmt"
"os"
"path"
"sync/atomic"
"time"
"xtx/ternfs/client"
"xtx/ternfs/core/log"
"xtx/ternfs/msgs"
)
type tagFilesParams struct {
rulesPath string
roots []string
outputDir string
outputOnTernFS bool
rotationRows int
rotationInterval time.Duration
workersPerShard int
dryRun bool
creationTimePreskip bool
}
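// tagFilesStats aggregates counters across walker goroutines. Both maps
// are fully populated in newTagFilesStats before the walk starts and only
// read afterwards, so the per-tag atomics need no extra locking.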
type tagFilesStats struct {
files atomic.Uint64
dirs atomic.Uint64
preskipped atomic.Uint64
dirsEmptyCheck atomic.Uint64
tags []string
rowsByTag map[string]*atomic.Uint64
bytesByTag map[string]*atomic.Uint64
statErrors atomic.Uint64
matchErrors atomic.Uint64
}
func newTagFilesStats(rules []*Rule) *tagFilesStats {
s := &tagFilesStats{
rowsByTag: make(map[string]*atomic.Uint64),
bytesByTag: make(map[string]*atomic.Uint64),
}
for _, r := range rules {
if _, ok := s.rowsByTag[r.Tag]; ok {
continue
}
s.tags = append(s.tags, r.Tag)
s.rowsByTag[r.Tag] = new(atomic.Uint64)
s.bytesByTag[r.Tag] = new(atomic.Uint64)
}
return s
}
func runTagFiles(l *log.Logger, c *client.Client, p *tagFilesParams) error {
rulesBytes, err := os.ReadFile(p.rulesPath)
if err != nil {
return fmt.Errorf("read rules: %w", err)
}
rules, err := LoadRules(rulesBytes)
if err != nil {
return fmt.Errorf("parse rules: %w", err)
}
fileRules := make([]*Rule, 0, len(rules))
dirRules := make([]*Rule, 0, len(rules))
for _, r := range rules {
if r.AppliesTo == "directory" {
dirRules = append(dirRules, r)
} else {
fileRules = append(fileRules, r)
}
}
l.Info("loaded %d rules (%d file, %d directory)", len(rules), len(fileRules), len(dirRules))
stats := newTagFilesStats(rules)
var bw *batchWriters
if !p.dryRun {
bw, err = newBatchWriters(p.outputDir, p.rotationRows, p.rotationInterval, p.outputOnTernFS, stats.tags)
if err != nil {
return fmt.Errorf("batch writer: %w", err)
}
}
now := msgs.Now()
startedAt := time.Now()
cb := func(parent msgs.InodeId, parentPath string, name string, creationTime msgs.TernTime, id msgs.InodeId, current bool, owned bool) error {
if id.Type() == msgs.DIRECTORY {
stats.dirs.Add(1)
if !owned || !current || len(dirRules) == 0 {
return nil
}
fullPath := path.Join(parentPath, name)
if !AnyMaybeMatches(dirRules, fullPath, creationTime, now) {
return nil
}
stats.dirsEmptyCheck.Add(1)
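// Only empty directories are candidates: list the directory and bail
// out as soon as it has any entry.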
rdResp := msgs.ReadDirResp{}
if err := c.ShardRequest(l, id.Shard(), &msgs.ReadDirReq{DirId: id}, &rdResp); err != nil {
l.ErrorNoAlert("readdir %s: %v", fullPath, err)
stats.statErrors.Add(1)
return nil
}
if len(rdResp.Results) > 0 {
return nil
}
statResp := msgs.StatDirectoryResp{}
if err := c.ShardRequest(l, id.Shard(), &msgs.StatDirectoryReq{Id: id}, &statResp); err != nil {
l.ErrorNoAlert("statdir %s: %v", fullPath, err)
stats.statErrors.Add(1)
return nil
}
// Directories have no atime; reuse mtime so workers can drift-
// check uniformly with file rows.
fired := FirstMatch(dirRules, fullPath, 0, statResp.Mtime, statResp.Mtime, now)
if fired == nil {
return nil
}
stats.rowsByTag[fired.Tag].Add(1)
if p.dryRun {
return nil
}
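// Same row schema as for file rows (see below): size is 0 and the
// atime column duplicates mtime.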
row := fmt.Sprintf(
"%s\t%d\t%d\t%d\t%s\t%s",
id.String(),
0,
uint64(statResp.Mtime),
uint64(statResp.Mtime),
fired.Name,
fullPath,
)
if err := bw.AppendRow(fired.Tag, uint8(id.Shard()), row); err != nil {
l.ErrorNoAlert("append row %s tag=%s: %v", fullPath, fired.Tag, err)
stats.matchErrors.Add(1)
}
return nil
}
if !owned || !current {
return nil
}
stats.files.Add(1)
fullPath := path.Join(parentPath, name)
if p.creationTimePreskip && !AnyMaybeMatches(fileRules, fullPath, creationTime, now) {
stats.preskipped.Add(1)
return nil
}
resp := msgs.StatFileResp{}
if err := c.ShardRequest(l, id.Shard(), &msgs.StatFileReq{Id: id}, &resp); err != nil {
l.ErrorNoAlert("stat %s: %v", fullPath, err)
stats.statErrors.Add(1)
return nil
}
fired := FirstMatch(fileRules, fullPath, resp.Size, resp.Atime, resp.Mtime, now)
if fired == nil {
return nil
}
stats.rowsByTag[fired.Tag].Add(1)
stats.bytesByTag[fired.Tag].Add(resp.Size)
if p.dryRun {
return nil
}
// Row schema: inode_hex \t size \t atime_ns \t mtime_ns \t rule \t path
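// e.g. (illustrative values; the inode column is whatever id.String()
// renders):
//   <inode>\t1048576\t1700000000000000000\t1700000000000000000\texample_delete\t/test/foo/x/bar/file.dat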
row := fmt.Sprintf(
"%s\t%d\t%d\t%d\t%s\t%s",
id.String(),
resp.Size,
uint64(resp.Atime),
uint64(resp.Mtime),
fired.Name,
fullPath,
)
if err := bw.AppendRow(fired.Tag, uint8(id.Shard()), row); err != nil {
l.ErrorNoAlert("append row %s tag=%s: %v", fullPath, fired.Tag, err)
stats.matchErrors.Add(1)
}
return nil
}
walkErr := client.ParwalkMany(
l,
c,
&client.ParwalkOptions{WorkersPerShard: p.workersPerShard},
p.roots,
cb,
)
if walkErr != nil {
l.ErrorNoAlert("walk: %v", walkErr)
}
var closeErr error
if bw != nil {
closeErr = bw.Close()
if closeErr != nil {
l.ErrorNoAlert("close batches: %v", closeErr)
}
}
elapsed := time.Since(startedAt)
files := stats.files.Load()
rate := float64(files) / elapsed.Seconds()
l.Info("tag-files done in %s: %d files visited (%d dirs), %.0f files/s, %d stat-preskipped, %d dir-empty-checks, %d stat errors, %d write errors",
elapsed.Truncate(time.Second), files, stats.dirs.Load(), rate, stats.preskipped.Load(), stats.dirsEmptyCheck.Load(), stats.statErrors.Load(), stats.matchErrors.Load())
fmt.Fprintln(os.Stderr, "tag\trows\ttotal_bytes")
for _, tag := range stats.tags {
rows := stats.rowsByTag[tag].Load()
bytes := stats.bytesByTag[tag].Load()
fmt.Fprintf(os.Stderr, "%s\t%d\t%d\n", tag, rows, bytes)
}
if walkErr != nil {
return walkErr
}
return closeErr
}
@@ -0,0 +1,217 @@
// Copyright 2025 XTX Markets Technologies Limited
//
// SPDX-License-Identifier: GPL-2.0-or-later
package main
import (
"bufio"
"fmt"
"os"
"path/filepath"
"time"
)
// batchWriter is a per-(tag, shard) rotating TSV writer. One goroutine
// per writer is the sole owner of the file/buffer state; AppendRow is a
// buffered channel send.
//
// When ternfsTransient is true, files are created under a TernFS path
// where an open file is not listed by readdir until close. Otherwise the
// writer creates "<name>.tmp" and renames to "<name>" on rotation.
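//
// A minimal sketch of the intended call pattern, based on the API below
// (error handling elided):
//
//	bw, _ := newBatchWriters(outDir, 100000, 10*time.Minute, false, tags)
//	_ = bw.AppendRow(tag, shard, row) // safe from many goroutines
//	_ = bw.Close()                    // drain, flush, rename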
type batchWriter struct {
dir string
tag string
shard uint8
rotationRows int
rotationAge time.Duration
ternfsTransient bool
rows chan string
done chan struct{}
// Owned by the loop goroutine.
w *bufio.Writer
f *os.File
openPath string
row int
openedAt time.Time
seq int
err error
}
const batchStreamQueueDepth = 1024
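// Once a writer's queue is full, AppendRow blocks, back-pressuring the
// walk instead of buffering rows without bound.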
type batchKey struct {
tag string
shard uint8
}
type batchWriters struct {
writers map[batchKey]*batchWriter
}
// newBatchWriters preallocates one writer per (tag, shard). AppendRow
// errors if it gets a tag that wasn't supplied here.
func newBatchWriters(outDir string, rotationRows int, rotationAge time.Duration, ternfsTransient bool, tags []string) (*batchWriters, error) {
if rotationRows <= 0 {
return nil, fmt.Errorf("rotationRows must be > 0")
}
if rotationAge <= 0 {
return nil, fmt.Errorf("rotationAge must be > 0")
}
if err := os.MkdirAll(outDir, 0755); err != nil {
return nil, fmt.Errorf("mkdir %q: %w", outDir, err)
}
bws := &batchWriters{writers: make(map[batchKey]*batchWriter, len(tags)*256)}
for _, tag := range tags {
for shard := 0; shard < 256; shard++ {
bw := &batchWriter{
dir: outDir,
tag: tag,
shard: uint8(shard),
rotationRows: rotationRows,
rotationAge: rotationAge,
ternfsTransient: ternfsTransient,
rows: make(chan string, batchStreamQueueDepth),
done: make(chan struct{}),
}
bws.writers[batchKey{tag: tag, shard: uint8(shard)}] = bw
go bw.loop()
}
}
return bws, nil
}
// AppendRow appends one row to the (tag, shard) writer. The row must not
// include a trailing newline.
func (b *batchWriters) AppendRow(tag string, shard uint8, row string) (err error) {
bw, ok := b.writers[batchKey{tag: tag, shard: shard}]
if !ok {
return fmt.Errorf("batchWriters: unknown tag %q", tag)
}
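// A send on a closed channel panics; recovering turns an AppendRow that
// races with Close into an ordinary error instead of a crash.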
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("batchWriters: AppendRow after Close (%v)", r)
}
}()
bw.rows <- row
return nil
}
// Close drains every writer and returns the first error.
func (b *batchWriters) Close() error {
for _, bw := range b.writers {
close(bw.rows)
}
var firstErr error
for _, bw := range b.writers {
<-bw.done
if bw.err != nil && firstErr == nil {
firstErr = bw.err
}
}
return firstErr
}
func (b *batchWriter) loop() {
defer close(b.done)
ticker := time.NewTicker(b.rotationAge)
defer ticker.Stop()
for {
select {
case row, ok := <-b.rows:
if !ok {
if err := b.closeFile(); err != nil {
b.recordErr(err)
}
return
}
if err := b.ensureOpen(); err != nil {
b.recordErr(err)
continue
}
if _, err := b.w.WriteString(row); err != nil {
b.recordErr(err)
continue
}
if err := b.w.WriteByte('\n'); err != nil {
b.recordErr(err)
continue
}
b.row++
if b.row >= b.rotationRows {
if err := b.closeFile(); err != nil {
b.recordErr(err)
}
}
case <-ticker.C:
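// Ticks arrive every rotationAge, so a batch opened just after a tick
// can stay open for up to ~2x rotationAge before age-based rotation.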
if b.f != nil && time.Since(b.openedAt) >= b.rotationAge {
if err := b.closeFile(); err != nil {
b.recordErr(err)
}
}
}
}
}
func (b *batchWriter) recordErr(err error) {
if b.err == nil {
b.err = err
}
}
func (b *batchWriter) ensureOpen() error {
if b.f != nil {
return nil
}
finalBase := batchFinalName(b.tag, b.shard, b.seq)
openBase := finalBase
if !b.ternfsTransient {
openBase = finalBase + ".tmp"
}
openPath := filepath.Join(b.dir, openBase)
f, err := os.OpenFile(openPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("open %q: %w", openPath, err)
}
b.f = f
b.w = bufio.NewWriterSize(f, 64*1024)
b.openPath = openPath
b.row = 0
b.openedAt = time.Now()
return nil
}
func (b *batchWriter) closeFile() error {
if b.f == nil {
return nil
}
flushErr := b.w.Flush()
closeErr := b.f.Close()
openPath := b.openPath
finalBase := batchFinalName(b.tag, b.shard, b.seq)
b.f = nil
b.w = nil
b.openPath = ""
if flushErr != nil {
return flushErr
}
if closeErr != nil {
return closeErr
}
if !b.ternfsTransient {
finalPath := filepath.Join(b.dir, finalBase)
if err := os.Rename(openPath, finalPath); err != nil {
return fmt.Errorf("rename %q -> %q: %w", openPath, finalPath, err)
}
}
b.seq++
return nil
}
func batchFinalName(tag string, shard uint8, seq int) string {
return fmt.Sprintf("%s-%02x-%06d.tsv", tag, shard, seq)
}
@@ -0,0 +1,229 @@
// Copyright 2025 XTX Markets Technologies Limited
//
// SPDX-License-Identifier: GPL-2.0-or-later
package main
import (
"encoding/json"
"fmt"
"math"
"path/filepath"
"regexp"
"time"
"xtx/ternfs/msgs"
)
// Rule is one entry in a tag-rules JSON file.
//
// Tag is an arbitrary string. When a rule fires, its tag selects the
// output bucket; tag values have no built-in meaning here.
//
// AppliesTo is "file" (default) or "directory".
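//
// A minimal rules file is a JSON array of such entries, e.g. (mirroring
// the test fixture):
//
//	[{"name": "example_delete", "tag": "DELETE", "applies_to": "file",
//	  "include_path_match": ["^/test/.*x/.*"], "exclude_path_match": [".*/skip/.*"],
//	  "file_suffix_match": ["\\.dat$"], "atime_days": 30, "mtime_days": 30,
//	  "size_bytes": 1048576, "extended_retention_bitfield_match": null}]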
type Rule struct {
Name string
Tag string
AppliesTo string
IncludePatterns []*regexp.Regexp
ExcludePatterns []*regexp.Regexp
SuffixPatterns []*regexp.Regexp
AtimeDays float64
MtimeDays float64
SizeBytes uint64
ExtendedRetentionBitfieldMatch *bool
}
// Sentinel checked against the seconds portion of mtime. When a rule sets
// ExtendedRetentionBitfieldMatch, the low 10 bits of (mtime / 1s) must
// match this pattern (true) or not (false).
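// E.g. the seconds value 1_600_000_682 (= 1_600_000_000 + 0b1010101010,
// and 1_600_000_000 is a multiple of 1024) carries the sentinel.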
const (
extendedRetentionBitfield uint64 = 0b1010101010
extendedRetentionBitmask uint64 = 0b1111111111
)
// threshold accepts either a JSON number or the strings "inf" / "Infinity".
type threshold float64
func (t *threshold) UnmarshalJSON(b []byte) error {
if len(b) > 0 && b[0] == '"' {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
if s == "inf" || s == "Infinity" {
*t = threshold(math.Inf(+1))
return nil
}
return fmt.Errorf("unexpected threshold string %q", s)
}
var f float64
if err := json.Unmarshal(b, &f); err != nil {
return err
}
*t = threshold(f)
return nil
}
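// A rule with "atime_days": "inf" is therefore permanently unsatisfiable:
// daysToDuration maps +Inf to the maximum duration, so the age check can
// never pass and the rule never fires.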
type ruleJSON struct {
Name string `json:"name"`
Tag string `json:"tag"`
AppliesTo string `json:"applies_to"`
IncludePathMatch []string `json:"include_path_match"`
ExcludePathMatch []string `json:"exclude_path_match"`
FileSuffixMatch []string `json:"file_suffix_match"`
AtimeDays threshold `json:"atime_days"`
MtimeDays threshold `json:"mtime_days"`
SizeBytes uint64 `json:"size_bytes"`
ExtendedRetentionBitfieldMatch *bool `json:"extended_retention_bitfield_match"`
}
func LoadRules(data []byte) ([]*Rule, error) {
var raw []ruleJSON
if err := json.Unmarshal(data, &raw); err != nil {
return nil, err
}
out := make([]*Rule, len(raw))
for i, r := range raw {
if r.Tag == "" {
return nil, fmt.Errorf("rule %q has empty tag", r.Name)
}
appliesTo := r.AppliesTo
if appliesTo == "" {
appliesTo = "file"
}
if appliesTo != "file" && appliesTo != "directory" {
return nil, fmt.Errorf("rule %q has invalid applies_to %q", r.Name, appliesTo)
}
inc, err := compileAll(r.IncludePathMatch)
if err != nil {
return nil, fmt.Errorf("rule %q include: %w", r.Name, err)
}
exc, err := compileAll(r.ExcludePathMatch)
if err != nil {
return nil, fmt.Errorf("rule %q exclude: %w", r.Name, err)
}
suf, err := compileAll(r.FileSuffixMatch)
if err != nil {
return nil, fmt.Errorf("rule %q suffix: %w", r.Name, err)
}
out[i] = &Rule{
Name: r.Name,
Tag: r.Tag,
AppliesTo: appliesTo,
IncludePatterns: inc,
ExcludePatterns: exc,
SuffixPatterns: suf,
AtimeDays: float64(r.AtimeDays),
MtimeDays: float64(r.MtimeDays),
SizeBytes: r.SizeBytes,
ExtendedRetentionBitfieldMatch: r.ExtendedRetentionBitfieldMatch,
}
}
return out, nil
}
func compileAll(patterns []string) ([]*regexp.Regexp, error) {
out := make([]*regexp.Regexp, len(patterns))
for i, p := range patterns {
// \A anchors at start-of-string; the rest of the pattern is left
// unanchored at end.
re, err := regexp.Compile(`\A(?:` + p + `)`)
if err != nil {
return nil, fmt.Errorf("compile %q: %w", p, err)
}
out[i] = re
}
return out, nil
}
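// E.g. the pattern "/test/.*" compiles to `\A(?:/test/.*)`, which matches
// "/test/a/b" but not "/var/test/a/b".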
func matchAny(patterns []*regexp.Regexp, s string) bool {
for _, re := range patterns {
if re.MatchString(s) {
return true
}
}
return false
}
// Matches reports whether the rule fires.
func (r *Rule) Matches(path string, size uint64, atime, mtime, now msgs.TernTime) bool {
if time.Duration(now-atime) < daysToDuration(r.AtimeDays) {
return false
}
if time.Duration(now-mtime) < daysToDuration(r.MtimeDays) {
return false
}
if size < r.SizeBytes {
return false
}
if r.ExtendedRetentionBitfieldMatch != nil {
mtimeSec := uint64(mtime) / uint64(time.Second)
hasBitfield := (mtimeSec & extendedRetentionBitmask) == extendedRetentionBitfield
if hasBitfield != *r.ExtendedRetentionBitfieldMatch {
return false
}
}
if !matchAny(r.IncludePatterns, path) {
return false
}
if matchAny(r.ExcludePatterns, path) {
return false
}
suffix := filepath.Ext(path)
if !matchAny(r.SuffixPatterns, suffix) {
return false
}
return true
}
// FirstMatch returns the first rule whose predicate holds, or nil.
func FirstMatch(rules []*Rule, path string, size uint64, atime, mtime, now msgs.TernTime) *Rule {
for _, r := range rules {
if r.Matches(path, size, atime, mtime, now) {
return r
}
}
return nil
}
// MaybeMatchesByCreationTime is a conservative prefilter. atime and mtime
// are always >= creationTime, so the creation age is an upper bound on the
// real atime/mtime ages: if even that bound falls short of the rule's age
// thresholds, the rule cannot fire. Size and the bitfield are not checked,
// so a true result only means "possibly matches".
func (r *Rule) MaybeMatchesByCreationTime(path string, creationTime, now msgs.TernTime) bool {
creationAge := time.Duration(now - creationTime)
if creationAge < daysToDuration(r.AtimeDays) {
return false
}
if creationAge < daysToDuration(r.MtimeDays) {
return false
}
if !matchAny(r.IncludePatterns, path) {
return false
}
if matchAny(r.ExcludePatterns, path) {
return false
}
suffix := filepath.Ext(path)
if !matchAny(r.SuffixPatterns, suffix) {
return false
}
return true
}
// AnyMaybeMatches returns true if any rule passes the prefilter.
func AnyMaybeMatches(rules []*Rule, path string, creationTime, now msgs.TernTime) bool {
for _, r := range rules {
if r.MaybeMatchesByCreationTime(path, creationTime, now) {
return true
}
}
return false
}
// daysToDuration treats +Inf as the largest representable duration.
func daysToDuration(days float64) time.Duration {
if math.IsInf(days, +1) {
return time.Duration(math.MaxInt64)
}
return time.Duration(days * 24 * float64(time.Hour))
}
@@ -0,0 +1,395 @@
// Copyright 2025 XTX Markets Technologies Limited
//
// SPDX-License-Identifier: GPL-2.0-or-later
package main
import (
"os"
"path/filepath"
"sort"
"strings"
"testing"
"time"
"xtx/ternfs/msgs"
)
const oneRuleJSON = `[
{
"name": "example_delete",
"tag": "DELETE",
"applies_to": "file",
"include_path_match": ["^/test/.*x/.*"],
"exclude_path_match": [".*/skip/.*"],
"file_suffix_match": ["\\.dat$"],
"atime_days": 30,
"mtime_days": 30,
"size_bytes": 1048576,
"extended_retention_bitfield_match": null
}
]`
func mustLoad(t *testing.T, data string) []*Rule {
t.Helper()
rules, err := LoadRules([]byte(data))
if err != nil {
t.Fatalf("LoadRules: %v", err)
}
return rules
}
func TestLoadRules_RequiresTag(t *testing.T) {
const j = `[{"name":"x","tag":"","include_path_match":[],"exclude_path_match":[],"file_suffix_match":[],"atime_days":0,"mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":null}]`
if _, err := LoadRules([]byte(j)); err == nil {
t.Fatal("expected empty-tag rule to be rejected")
}
}
func TestLoadRules_AppliesToDefaultsToFile(t *testing.T) {
const j = `[{"name":"x","tag":"DELETE","include_path_match":[".*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":0,"mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":null}]`
rules := mustLoad(t, j)
if rules[0].AppliesTo != "file" {
t.Fatalf("default applies_to = %q, want %q", rules[0].AppliesTo, "file")
}
}
func TestLoadRules_AppliesToInvalid(t *testing.T) {
const j = `[{"name":"x","tag":"DELETE","applies_to":"sock","include_path_match":[".*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":0,"mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":null}]`
if _, err := LoadRules([]byte(j)); err == nil {
t.Fatal("expected invalid applies_to to be rejected")
}
}
func TestRulePredicate_HappyPath(t *testing.T) {
rules := mustLoad(t, oneRuleJSON)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
older := now - msgs.TernTime(60*24*time.Hour)
if !rules[0].Matches("/test/foo/x/bar/file.dat", 2<<20, older, older, now) {
t.Fatal("expected match")
}
}
func TestRulePredicate_RejectsOnEachDimension(t *testing.T) {
rules := mustLoad(t, oneRuleJSON)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
older := now - msgs.TernTime(60*24*time.Hour)
tooFresh := now - msgs.TernTime(1*24*time.Hour)
good := "/test/foo/x/bar/file.dat"
cases := []struct {
name string
path string
size uint64
atime, mtime msgs.TernTime
wantFire bool
}{
{"happy", good, 2 << 20, older, older, true},
{"include_miss", "/etc/passwd", 2 << 20, older, older, false},
{"exclude_hit", "/test/foo/x/skip/file.dat", 2 << 20, older, older, false},
{"suffix_miss", "/test/foo/x/bar/file.txt", 2 << 20, older, older, false},
{"too_small", good, 1024, older, older, false},
{"too_fresh_atime", good, 2 << 20, tooFresh, older, false},
{"too_fresh_mtime", good, 2 << 20, older, tooFresh, false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := rules[0].Matches(tc.path, tc.size, tc.atime, tc.mtime, now)
if got != tc.wantFire {
t.Fatalf("Matches=%v want %v", got, tc.wantFire)
}
})
}
}
func TestRulePredicate_ExtendedRetentionBitfield(t *testing.T) {
const j = `[
{"name":"er_required","tag":"NOOP","include_path_match":[".*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":0,"mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":true},
{"name":"er_forbidden","tag":"NOOP","include_path_match":[".*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":0,"mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":false}
]`
rules := mustLoad(t, j)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
// Pick a seconds-aligned mtime whose low 10 bits match the sentinel.
baseSec := int64((uint64(1_600_000_000) & ^extendedRetentionBitmask) | extendedRetentionBitfield)
withER := msgs.MakeTernTime(time.Unix(baseSec, 0))
noER := msgs.MakeTernTime(time.Unix(baseSec+1, 0))
if !rules[0].Matches("x", 0, now, withER, now) {
t.Fatal("er_required should match when bitfield is present")
}
if rules[0].Matches("x", 0, now, noER, now) {
t.Fatal("er_required should not match without bitfield")
}
if rules[1].Matches("x", 0, now, withER, now) {
t.Fatal("er_forbidden should not match when bitfield is present")
}
if !rules[1].Matches("x", 0, now, noER, now) {
t.Fatal("er_forbidden should match when bitfield is absent")
}
}
func TestRulePredicate_InfThresholdNeverFires(t *testing.T) {
const j = `[
{"name":"disabled","tag":"NOOP","include_path_match":[".*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":"inf","mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":null}
]`
rules := mustLoad(t, j)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
ancient := msgs.MakeTernTime(time.Unix(0, 0))
if rules[0].Matches("x", 0, ancient, ancient, now) {
t.Fatal("atime_days=inf rule should never match")
}
}
func TestMaybeMatchesByCreationTime_RejectsTooYoung(t *testing.T) {
rules := mustLoad(t, oneRuleJSON)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
young := now - msgs.TernTime(10*24*time.Hour)
if rules[0].MaybeMatchesByCreationTime("/test/foo/x/bar/file.dat", young, now) {
t.Fatal("creation-time prefilter should reject when creationTime is too young for the rule")
}
}
func TestMaybeMatchesByCreationTime_AcceptsOldEnough(t *testing.T) {
rules := mustLoad(t, oneRuleJSON)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
old := now - msgs.TernTime(60*24*time.Hour)
if !rules[0].MaybeMatchesByCreationTime("/test/foo/x/bar/file.dat", old, now) {
t.Fatal("creation-time prefilter should keep the candidate when creationTime is old enough")
}
}
func TestMaybeMatchesByCreationTime_RegexStillFilters(t *testing.T) {
rules := mustLoad(t, oneRuleJSON)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
old := now - msgs.TernTime(60*24*time.Hour)
if rules[0].MaybeMatchesByCreationTime("/etc/passwd", old, now) {
t.Fatal("creation-time prefilter should still apply the include regex")
}
}
func TestAnyMaybeMatches_OneOldEnoughIsEnough(t *testing.T) {
const j = `[
{"name":"slow","tag":"NOOP","include_path_match":[".*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":365,"mtime_days":365,"size_bytes":0,"extended_retention_bitfield_match":null},
{"name":"fast","tag":"NOOP","include_path_match":[".*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":1,"mtime_days":1,"size_bytes":0,"extended_retention_bitfield_match":null}
]`
rules := mustLoad(t, j)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
young := now - msgs.TernTime(7*24*time.Hour)
if !AnyMaybeMatches(rules, "/x", young, now) {
t.Fatal("AnyMaybeMatches should be true when at least one rule is satisfiable")
}
veryYoung := now - msgs.TernTime(1*time.Hour)
if AnyMaybeMatches(rules, "/x", veryYoung, now) {
t.Fatal("AnyMaybeMatches should be false when no rule can possibly fire")
}
}
func TestFirstMatch_FirstWins(t *testing.T) {
const j = `[
{"name":"a","tag":"DELETE","include_path_match":["^/a/.*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":0,"mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":null},
{"name":"b","tag":"COMPRESS","include_path_match":["^/a/b/.*"],"exclude_path_match":[],"file_suffix_match":[".*"],"atime_days":0,"mtime_days":0,"size_bytes":0,"extended_retention_bitfield_match":null}
]`
rules := mustLoad(t, j)
now := msgs.MakeTernTime(time.Unix(1_700_000_000, 0))
got := FirstMatch(rules, "/a/b/x.dat", 0, now, now, now)
if got == nil || got.Name != "a" {
t.Fatalf("want rule a, got %+v", got)
}
}
// listFinal returns sorted dir entries excluding *.tmp.
func listFinal(t *testing.T, dir string) []string {
t.Helper()
entries, err := os.ReadDir(dir)
if err != nil {
t.Fatalf("ReadDir: %v", err)
}
var names []string
for _, e := range entries {
if strings.HasSuffix(e.Name(), ".tmp") {
continue
}
names = append(names, e.Name())
}
sort.Strings(names)
return names
}
func TestBatchWriter_RotatesByRows_LocalFs(t *testing.T) {
dir := t.TempDir()
bw, err := newBatchWriters(dir, 3, time.Hour, false, []string{"DELETE"})
if err != nil {
t.Fatalf("newBatchWriters: %v", err)
}
for i := 0; i < 7; i++ {
if err := bw.AppendRow("DELETE", 0x2a, "row-x"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
}
if err := bw.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
got := listFinal(t, dir)
want := []string{
"DELETE-2a-000000.tsv",
"DELETE-2a-000001.tsv",
"DELETE-2a-000002.tsv",
}
if !equalStringSlice(got, want) {
t.Fatalf("listFinal = %v\nwant %v", got, want)
}
for _, name := range got {
body, err := os.ReadFile(filepath.Join(dir, name))
if err != nil {
t.Fatalf("ReadFile %s: %v", name, err)
}
if len(body) == 0 || body[len(body)-1] != '\n' {
t.Fatalf("file %s does not end with newline (body=%q)", name, body)
}
}
}
func TestBatchWriter_RotatesByAge_LocalFs(t *testing.T) {
dir := t.TempDir()
bw, err := newBatchWriters(dir, 1_000_000, 10*time.Millisecond, false, []string{"DELETE"})
if err != nil {
t.Fatalf("newBatchWriters: %v", err)
}
if err := bw.AppendRow("DELETE", 1, "row1"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
time.Sleep(20 * time.Millisecond)
if err := bw.AppendRow("DELETE", 1, "row2"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
if err := bw.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
got := listFinal(t, dir)
want := []string{
"DELETE-01-000000.tsv",
"DELETE-01-000001.tsv",
}
if !equalStringSlice(got, want) {
t.Fatalf("listFinal = %v\nwant %v", got, want)
}
}
func TestBatchWriter_PerTagShardIndependent_LocalFs(t *testing.T) {
dir := t.TempDir()
bw, err := newBatchWriters(dir, 1_000_000, time.Hour, false, []string{"DELETE", "COMPRESS"})
if err != nil {
t.Fatalf("newBatchWriters: %v", err)
}
if err := bw.AppendRow("DELETE", 0, "a"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
if err := bw.AppendRow("COMPRESS", 0, "b"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
if err := bw.AppendRow("DELETE", 1, "c"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
if err := bw.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
got := listFinal(t, dir)
want := []string{
"COMPRESS-00-000000.tsv",
"DELETE-00-000000.tsv",
"DELETE-01-000000.tsv",
}
if !equalStringSlice(got, want) {
t.Fatalf("listFinal = %v\nwant %v", got, want)
}
}
func TestBatchWriter_LocalFsHidesTmp(t *testing.T) {
dir := t.TempDir()
bw, err := newBatchWriters(dir, 1_000_000, time.Hour, false, []string{"DELETE"})
if err != nil {
t.Fatalf("newBatchWriters: %v", err)
}
if err := bw.AppendRow("DELETE", 0, "a"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
// AppendRow is async; wait for the .tmp to appear.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if hasTmpFile(t, dir) {
break
}
time.Sleep(5 * time.Millisecond)
}
if names := listFinal(t, dir); len(names) != 0 {
t.Fatalf("expected no final batches while open, got %v", names)
}
if !hasTmpFile(t, dir) {
entries, _ := os.ReadDir(dir)
t.Fatalf("expected a .tmp file while open, dir=%v", entries)
}
if err := bw.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
got := listFinal(t, dir)
want := []string{"DELETE-00-000000.tsv"}
if !equalStringSlice(got, want) {
t.Fatalf("after close: %v want %v", got, want)
}
}
func hasTmpFile(t *testing.T, dir string) bool {
t.Helper()
entries, err := os.ReadDir(dir)
if err != nil {
t.Fatalf("ReadDir: %v", err)
}
for _, e := range entries {
if strings.HasSuffix(e.Name(), ".tmp") {
return true
}
}
return false
}
func TestBatchWriter_TernfsTransient_NoTmpSuffix(t *testing.T) {
dir := t.TempDir()
bw, err := newBatchWriters(dir, 1, time.Hour, true, []string{"DELETE"})
if err != nil {
t.Fatalf("newBatchWriters: %v", err)
}
if err := bw.AppendRow("DELETE", 0, "a"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
if err := bw.AppendRow("DELETE", 0, "b"); err != nil {
t.Fatalf("AppendRow: %v", err)
}
if err := bw.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
entries, err := os.ReadDir(dir)
if err != nil {
t.Fatalf("ReadDir: %v", err)
}
for _, e := range entries {
if strings.HasSuffix(e.Name(), ".tmp") {
t.Errorf("ternfs-transient mode should not create .tmp files, found %q", e.Name())
}
}
}
func equalStringSlice(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
@@ -2084,6 +2084,51 @@ func main() {
run: resolveSamplePathsRun,
}
tagFilesCmd := flag.NewFlagSet("tag-files", flag.ExitOnError)
tagFilesRules := tagFilesCmd.String("rules", "", "Path to tag-rules.json.")
var tagFilesRoots flags.StringArrayFlags
tagFilesCmd.Var(&tagFilesRoots, "root", "TernFS root path to walk. May be repeated; all roots share one worker pool.")
tagFilesOutput := tagFilesCmd.String("output", "", "Directory for per-(tag,shard) batch TSVs. Required unless -dry-run.")
tagFilesOnTernFS := tagFilesCmd.Bool("output-on-ternfs", false, "Output dir is on TernFS: rely on transient-file semantics (open files invisible until closed) instead of .tmp rename.")
tagFilesRotationRows := tagFilesCmd.Int("rotation-rows", 100000, "Roll a batch when it reaches this row count.")
tagFilesRotationInterval := tagFilesCmd.Duration("rotation-interval", 10*time.Minute, "Roll a batch when it has been open this long.")
tagFilesWorkersPerShard := tagFilesCmd.Int("workers-per-shard", 20, "Parwalk workers per shard.")
tagFilesDryRun := tagFilesCmd.Bool("dry-run", false, "Match and count, but do not write batch TSVs.")
tagFilesCreationTimePreskip := tagFilesCmd.Bool("creation-time-preskip", false, "Use the directory edge's creation time to short-circuit the stat call when no rule could possibly match: atime/mtime are >= creationTime, so if a rule's age window excludes creationTime it cannot fire. On a young tree this skips the stat for most files.")
tagFilesRun := func() {
if *tagFilesRules == "" {
fmt.Fprintln(os.Stderr, "tag-files: -rules is required")
os.Exit(2)
}
if len(tagFilesRoots) == 0 {
fmt.Fprintln(os.Stderr, "tag-files: at least one -root is required")
os.Exit(2)
}
if *tagFilesOutput == "" && !*tagFilesDryRun {
fmt.Fprintln(os.Stderr, "tag-files: -output is required (use -dry-run to skip writing batches)")
os.Exit(2)
}
err := runTagFiles(l, getClient(), &tagFilesParams{
rulesPath: *tagFilesRules,
roots: []string(tagFilesRoots),
outputDir: *tagFilesOutput,
outputOnTernFS: *tagFilesOnTernFS,
rotationRows: *tagFilesRotationRows,
rotationInterval: *tagFilesRotationInterval,
workersPerShard: *tagFilesWorkersPerShard,
dryRun: *tagFilesDryRun,
creationTimePreskip: *tagFilesCreationTimePreskip,
})
if err != nil {
fmt.Fprintf(os.Stderr, "tag-files: %v\n", err)
os.Exit(2)
}
}
commands["tag-files"] = commandSpec{
flags: tagFilesCmd,
run: tagFilesRun,
}
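// Example invocation (paths and global flags illustrative):
//
//	terncli tag-files -rules tag-rules.json -root /data -output /scratch/batches -creation-time-preskip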
flag.Parse()
if *mtu != "" {