terncli: estimate-file-age

This commit is contained in:
Miroslav Crnic
2025-07-23 10:43:07 +00:00
parent d5c40f7e74
commit 8db4389872
2 changed files with 183 additions and 19 deletions

View File

@@ -140,6 +140,26 @@ func prepareSpanInitiateReq(
offset uint64,
sizeWithZeros uint32,
data *[]byte,
) *msgs.AddSpanInitiateReq {
return prepareSpanInitiateReqWithParams(
blacklist,
ComputeSpanParameters(spanPolicies, blockPolicies, stripePolicy, uint32(len(*data))),
id,
cookie,
offset,
sizeWithZeros,
data,
)
}
func prepareSpanInitiateReqWithParams(
blacklist []msgs.BlacklistEntry,
spanParameters *SpanParameters,
id msgs.InodeId,
cookie [8]byte,
offset uint64,
sizeWithZeros uint32,
data *[]byte,
) *msgs.AddSpanInitiateReq {
if int(sizeWithZeros) < len(*data) {
panic(fmt.Errorf("sizeWithZeros=%v < len(data)=%v", sizeWithZeros, len(*data)))
@@ -147,8 +167,6 @@ func prepareSpanInitiateReq(
crc := crc32c.Sum(0, *data)
crc = crc32c.ZeroExtend(crc, int(sizeWithZeros)-len(*data))
spanParameters := ComputeSpanParameters(spanPolicies, blockPolicies, stripePolicy, uint32(len(*data)))
S := int(spanParameters.Stripes)
D := spanParameters.Parity.DataBlocks()
P := spanParameters.Parity.ParityBlocks()
@@ -274,6 +292,34 @@ func (c *Client) CreateSpan(
// The contents of this pointer might be modified by this function (we might have to extend the
// buffer), the intention is that if you're using a buf pool to get this you can put it back after.
data *[]byte,
) ([]msgs.BlockId, error) {
return c.CreateSpanWithParams(
log,
blacklist,
ComputeSpanParameters(spanPolicies, blockPolicies, stripePolicy, uint32(len(*data))),
id,
reference,
cookie,
offset,
spanSize,
data,
)
}
func (c *Client) CreateSpanWithParams(
log *log.Logger,
blacklist []msgs.BlacklistEntry,
spanParams *SpanParameters,
id msgs.InodeId,
reference msgs.InodeId,
cookie [8]byte,
offset uint64,
// The span size might be greater than `len(*data)`, in which case we have trailing
// zeros (this allows us to cheaply store zero sections).
spanSize uint32,
// The contents of this pointer might be modified by this function (we might have to extend the
// buffer), the intention is that if you're using a buf pool to get this you can put it back after.
data *[]byte,
) ([]msgs.BlockId, error) {
if reference == msgs.NULL_INODE_ID {
reference = id
@@ -288,7 +334,7 @@ func (c *Client) CreateSpan(
}
// initiate span add
bareInitiateReq := prepareSpanInitiateReq(append([]msgs.BlacklistEntry{}, blacklist...), spanPolicies, blockPolicies, stripePolicy, id, cookie, offset, spanSize, data)
bareInitiateReq := prepareSpanInitiateReqWithParams(append([]msgs.BlacklistEntry{}, blacklist...), spanParams, id, cookie, offset, spanSize, data)
{
expectedSize := float64(spanSize) * float64(bareInitiateReq.Parity.Blocks()) / float64(bareInitiateReq.Parity.DataBlocks())
actualSize := bareInitiateReq.CellSize * uint32(bareInitiateReq.Stripes) * uint32(bareInitiateReq.Parity.Blocks())
@@ -298,47 +344,113 @@ func (c *Client) CreateSpan(
Req: *bareInitiateReq,
Reference: reference,
}
initiateResp := &msgs.AddSpanInitiateWithReferenceResp{}
return c.writeSpanBlocks(log, id, initiateReq, initiateResp, &initiateReq.Req, &initiateResp.Resp, cookie, offset, spanSize, data)
}
// CreateSpanWithParamsAtLocation adds a span to file `id` at `offset`, using the
// caller-supplied span parameters and asking the shard to initiate the span at
// `location`.
//
// Payloads shorter than 256 bytes are handed to createInlineSpan and yield an
// empty (non-nil) list of block IDs. Anything else goes through an
// AddSpanAtLocationInitiateReq and is then written by writeSpanBlocks, whose
// result is returned as is.
func (c *Client) CreateSpanWithParamsAtLocation(
	log *log.Logger,
	blacklist []msgs.BlacklistEntry,
	location msgs.Location,
	spanParams *SpanParameters,
	id msgs.InodeId,
	// When this is msgs.NULL_INODE_ID, `id` is used as the reference.
	reference msgs.InodeId,
	cookie [8]byte,
	offset uint64,
	// The span size might be greater than `len(*data)`, in which case we have trailing
	// zeros (this allows us to cheaply store zero sections).
	spanSize uint32,
	// The contents of this pointer might be modified by this function (we might have to extend the
	// buffer), the intention is that if you're using a buf pool to get this you can put it back after.
	data *[]byte,
) ([]msgs.BlockId, error) {
	if reference == msgs.NULL_INODE_ID {
		reference = id
	}
	log.Debug("writing span spanSize=%v len=%v", spanSize, len(*data))

	// Short payloads take the inline path and never allocate blocks.
	if len(*data) < 256 {
		inlineErr := c.createInlineSpan(log, id, cookie, offset, spanSize, *data)
		if inlineErr != nil {
			return nil, inlineErr
		}
		return []msgs.BlockId{}, nil
	}

	// Build the bare initiate request. The blacklist is copied so that later
	// appends to the request's blacklist cannot touch the caller's slice.
	blacklistCopy := append([]msgs.BlacklistEntry{}, blacklist...)
	bareReq := prepareSpanInitiateReqWithParams(blacklistCopy, spanParams, id, cookie, offset, spanSize, data)

	// Purely informational: compare the physical footprint with the ideal one.
	idealSize := float64(spanSize) * float64(bareReq.Parity.Blocks()) / float64(bareReq.Parity.DataBlocks())
	physicalSize := bareReq.CellSize * uint32(bareReq.Stripes) * uint32(bareReq.Parity.Blocks())
	wastePercent := 100.0 * (float64(physicalSize) - idealSize) / float64(physicalSize)
	log.Debug("span logical size: %v, span physical size: %v, waste: %v%%", spanSize, physicalSize, wastePercent)

	// Wrap the bare request with the reference and the target location.
	withReference := msgs.AddSpanInitiateWithReferenceReq{
		Req:       *bareReq,
		Reference: reference,
	}
	atLocationReq := &msgs.AddSpanAtLocationInitiateReq{
		LocationId: location,
		Req:        withReference,
	}
	atLocationResp := new(msgs.AddSpanAtLocationInitiateResp)

	// writeSpanBlocks gets both the wrappers (sent on the wire) and pointers to
	// the inner request/response embedded in those same wrappers.
	return c.writeSpanBlocks(
		log,
		id,
		atLocationReq,
		atLocationResp,
		&atLocationReq.Req.Req,
		&atLocationResp.Resp,
		cookie,
		offset,
		spanSize,
		data,
	)
}
func (c *Client) writeSpanBlocks(
log *log.Logger,
id msgs.InodeId,
initiateReqWrapper msgs.ShardRequest,
initiateRespWrapper msgs.ShardResponse,
addSpanInitiateReq *msgs.AddSpanInitiateReq,
addSpanInitiateResp *msgs.AddSpanInitiateResp,
cookie [8]byte,
offset uint64,
// The span size might be greater than `len(*data)`, in which case we have trailing
// zeros (this allows us to cheaply store zero sections).
spanSize uint32,
// The contents of this pointer might be modified by this function (we might have to extend the
// buffer), the intention is that if you're using a buf pool to get this you can put it back after.
data *[]byte,
) ([]msgs.BlockId, error) {
maxAttempts := 5 // 4 = number of block services that can be down at once in the tests right now
var err error
var blocks []msgs.BlockId
for attempt := 0; ; attempt++ {
log.Debug("span writing attempt %v", attempt+1)
initiateResp := msgs.AddSpanInitiateWithReferenceResp{}
if err = c.ShardRequest(log, id.Shard(), initiateReq, &initiateResp); err != nil {
if err = c.ShardRequest(log, id.Shard(), initiateReqWrapper, initiateRespWrapper); err != nil {
return nil, err
}
blocks = []msgs.BlockId{}
for _, block := range initiateResp.Resp.Blocks {
for _, block := range addSpanInitiateResp.Blocks {
blocks = append(blocks, block.BlockId)
}
certifyReq := msgs.AddSpanCertifyReq{
FileId: id,
Cookie: cookie,
ByteOffset: offset,
Proofs: make([]msgs.BlockProof, len(initiateResp.Resp.Blocks)),
Proofs: make([]msgs.BlockProof, len(addSpanInitiateResp.Blocks)),
}
writeCh := make(chan *blockCompletion, len(initiateResp.Resp.Blocks))
for i, block := range initiateResp.Resp.Blocks {
blockCrc, blockReader := mkBlockReader(&initiateReq.Req, *data, i)
writeCh := make(chan *blockCompletion, len(addSpanInitiateResp.Blocks))
for i, block := range addSpanInitiateResp.Blocks {
blockCrc, blockReader := mkBlockReader(addSpanInitiateReq, *data, i)
// Start writing block asynchronously
if startErr := c.StartWriteBlock(log, &block, blockReader, initiateReq.Req.CellSize*uint32(initiateReq.Req.Stripes), blockCrc, i, writeCh); startErr != nil {
initiateReq.Req.Blacklist = append(initiateReq.Req.Blacklist, msgs.BlacklistEntry{FailureDomain: block.BlockServiceFailureDomain})
if startErr := c.StartWriteBlock(log, &block, blockReader, addSpanInitiateReq.CellSize*uint32(addSpanInitiateReq.Stripes), blockCrc, i, writeCh); startErr != nil {
addSpanInitiateReq.Blacklist = append(addSpanInitiateReq.Blacklist, msgs.BlacklistEntry{FailureDomain: block.BlockServiceFailureDomain})
log.Info("failed to start write block to %+v: %v, might retry without failure domain %q", block, startErr, string(block.BlockServiceFailureDomain.Name[:]))
goto FailedAttempt
}
}
for range initiateResp.Resp.Blocks {
for range addSpanInitiateResp.Blocks {
result := <-writeCh
blockIdx := result.Extra.(int)
if result.Error != nil {
block := initiateResp.Resp.Blocks[blockIdx]
initiateReq.Req.Blacklist = append(initiateReq.Req.Blacklist, msgs.BlacklistEntry{FailureDomain: block.BlockServiceFailureDomain})
block := addSpanInitiateResp.Blocks[blockIdx]
addSpanInitiateReq.Blacklist = append(addSpanInitiateReq.Blacklist, msgs.BlacklistEntry{FailureDomain: block.BlockServiceFailureDomain})
log.Info("failed to write block to %+v: %v, might retry without failure domain %q", block, result.Error, string(block.BlockServiceFailureDomain.Name[:]))
err = result.Error
goto FailedAttempt
}
certifyReq.Proofs[blockIdx].BlockId = initiateResp.Resp.Blocks[blockIdx].BlockId
certifyReq.Proofs[blockIdx].BlockId = addSpanInitiateResp.Blocks[blockIdx].BlockId
certifyReq.Proofs[blockIdx].Proof = result.Resp.(*msgs.WriteBlockResp).Proof
}
if err = c.ShardRequest(log, id.Shard(), &certifyReq, &msgs.AddSpanCertifyResp{}); err != nil {
@@ -349,7 +461,7 @@ func (c *Client) CreateSpan(
FailedAttempt:
if attempt >= maxAttempts { // too many failures
break
return nil, err
}
err = nil
// create temp file, move the bad span there, then we can restart
@@ -370,8 +482,7 @@ func (c *Client) CreateSpan(
return nil, err
}
}
return blocks, err
return blocks, nil
}
func (c *Client) WriteFile(

View File

@@ -14,6 +14,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net"
"os"
"path"
@@ -1145,6 +1146,58 @@ func main() {
run: fileLocationsRun,
}
// estimate-file-age: walks every span of a file, takes the smallest block ID
// found and logs it as a msgs.TernTime. If the file has no blocks at all
// (e.g. only inline spans), the file's mtime is logged instead.
// NOTE(review): this presumes block IDs are comparable to / encode creation
// time; confirm against the block ID allocation scheme.
estimateFileAgeCmd := flag.NewFlagSet("estimate-file-age", flag.ExitOnError)
estimateFileAgeId := estimateFileAgeCmd.Uint64("id", 0, "ID of the file to estimate age for")
estimateFileAgeRun := func() {
	id := msgs.InodeId(*estimateFileAgeId)
	c := getClient()
	fileSpansReq := msgs.FileSpansReq{
		FileId:     id,
		ByteOffset: 0,
	}
	fileSpansResp := msgs.FileSpansResp{}
	// Sentinel: if this is still MaxUint64 after the scan, no block was seen.
	var oldestBlock uint64 = math.MaxUint64
	// Page through the spans; a NextOffset of 0 marks the last page.
	for {
		if err := c.ShardRequest(l, id.Shard(), &fileSpansReq, &fileSpansResp); err != nil {
			panic(err)
		}
		for spanIx := range fileSpansResp.Spans {
			span := &fileSpansResp.Spans[spanIx]
			// Inline spans carry no blocks, so they cannot contribute.
			if span.Header.IsInline {
				continue
			}
			// NOTE(review): unchecked assertion, panics if a non-inline span
			// body is ever something other than *msgs.FetchedLocations.
			locBody := span.Body.(*msgs.FetchedLocations)
			for _, loc := range locBody.Locations {
				for _, block := range loc.Blocks {
					oldestBlock = min(oldestBlock, uint64(block.BlockId))
				}
			}
		}
		if fileSpansResp.NextOffset == 0 {
			break
		}
		fileSpansReq.ByteOffset = fileSpansResp.NextOffset
	}
	if oldestBlock == math.MaxUint64 {
		// No blocks found: fall back to the file's mtime.
		statFileReq := msgs.StatFileReq{Id: id}
		statFileResp := msgs.StatFileResp{}
		if err := c.ShardRequest(l, id.Shard(), &statFileReq, &statFileResp); err != nil {
			panic(err)
		}
		mtime := statFileResp.Mtime
		l.Info("File %v has no blocks to use in file age estimation, returning mtime %v", id, msgs.TernTime(mtime))
		return
	}
	l.Info("Estimated file age %v, %v", id, msgs.TernTime(oldestBlock))
}
commands["estimate-file-age"] = commandSpec{
	flags: estimateFileAgeCmd,
	run:   estimateFileAgeRun,
}
findCmd := flag.NewFlagSet("find", flag.ExitOnError)
findDir := findCmd.String("path", "/", "")
findName := findCmd.String("name", "", "Regex to match the name against.")