Remove dead iohelp code

This commit is contained in:
Neil Macneale IV
2025-12-15 20:56:07 +00:00
parent bb6b7e0a7e
commit f637baa3af
2 changed files with 0 additions and 437 deletions
-229
View File
@@ -16,12 +16,7 @@ package iohelp
import (
"bufio"
"encoding/binary"
"errors"
"io"
"sync"
"sync/atomic"
"time"
)
// ErrPreservingReader is a utility class that provides methods to read from a reader where errors can be ignored and
@@ -53,22 +48,6 @@ func (r *ErrPreservingReader) Read(p []byte) (int, error) {
return n, r.Err
}
// ReadUint32 reads a 4-byte unsigned integer from the underlying reader,
// decoding it with the supplied byte order. If a previous operation on the
// reader already failed, the stored error is returned immediately and
// nothing is read; a new read failure is recorded in r.Err before being
// returned.
func (r *ErrPreservingReader) ReadUint32(order binary.ByteOrder) (uint32, error) {
	if r.Err != nil {
		return 0, r.Err
	}

	data, err := ReadNBytes(r, 4)
	if err != nil {
		r.Err = err
		return 0, err
	}

	return order.Uint32(data), nil
}
// ReadNBytes will read n bytes from the given reader and return a new slice containing the data. ReadNBytes will always
// return n bytes, or it will return no data and an error (So if you request 100 bytes and there are only 99 left before
// the reader returns io.EOF you won't receive any of the data as this is considered an error as it can't read 100 bytes).
@@ -90,53 +69,6 @@ func ReadNBytes(r io.Reader, n int) ([]byte, error) {
return bytes, nil
}
// ReadLineNoBuf will read a line from an unbuffered io.Reader where it considers lines to be separated by newlines (\n).
// The data returned will be a string with \r\n characters removed from the end, a bool which says whether the end of
// the stream has been reached, and any errors that have been encountered (other than eof which is treated as the end of
// the final line). This isn't efficient, so you shouldn't do this if you can use a buffered reader and the
// iohelp.ReadLine method.
func ReadLineNoBuf(r io.Reader) (string, bool, error) {
var err error
var dest []byte
var oneByte [1]byte
for {
var n int
n, err = r.Read(oneByte[:])
if err != nil && err != io.EOF {
return "", true, err
}
if n == 1 {
c := oneByte[0]
if c == '\n' {
break
}
dest = append(dest, c)
}
if err == io.EOF {
break
}
}
crlfCount := 0
lineLen := len(dest)
for i := lineLen - 1; i >= 0; i-- {
ch := dest[i]
if ch == '\r' || ch == '\n' {
crlfCount++
} else {
break
}
}
return string(dest[:lineLen-crlfCount]), err != nil, nil
}
// ReadLine will read a line from an unbuffered io.Reader where it considers lines to be separated by newlines (\n).
// The data returned will be a string with \r\n characters removed from the end, a bool which says whether the end of
@@ -165,165 +97,4 @@ func ReadLine(br *bufio.Reader) (line string, done bool, err error) {
return line[:lineLen-crlfCount], err != nil, nil
}
/*func ReadLineFromJSON(br *bufio.Reader) (line map[string]interface{}, done bool, err error) {
line, err = br.ReadMap()
}*/
// ErrThroughput is the error that is returned by ReadWithMinThroughput if the throughput drops below the threshold
var ErrThroughput = errors.New("throughput below minimum allowable")

// MinThroughputCheckParams defines the minimum throughput, how often it should be checked, and what the time window
// size is.
type MinThroughputCheckParams struct {
	// MinBytesPerSec is the minimum throughput. If ReadWithMinThroughput drops below this value for the most recent
	// time window then it will fail.
	MinBytesPerSec int64

	// CheckInterval is how often the throughput should be checked.
	CheckInterval time.Duration

	// NumIntervals defines the number of intervals that should be considered when looking at the throughput.
	// NumIntervals*CheckInterval defines the window size.
	NumIntervals int
}
// datapoint records the cumulative byte count (val) observed at time ts.
type datapoint struct {
	ts  time.Time
	val int64
}

// datapoints is a time-ordered series of cumulative byte counts.
type datapoints []datapoint
// getThroughput trims datapoints older than the given duration (always
// keeping at least one) and returns the trimmed series together with the
// throughput, in bytes per second, observed across the remaining window.
// With fewer than two datapoints in the window the throughput is 0.
func (initialDps datapoints) getThroughput(duration time.Duration) (datapoints, int64) {
	now := time.Now()
	cutoff := now.Add(-duration)

	// Drop leading samples that fall outside the window, retaining at
	// least one so an elapsed time can still be computed.
	dps := initialDps
	for len(dps) > 1 && cutoff.After(dps[0].ts) {
		dps = dps[1:]
	}

	if len(dps) <= 1 {
		return dps, 0
	}

	elapsed := now.Sub(dps[0].ts)
	delta := dps[len(dps)-1].val - dps[0].val
	return dps, int64(float64(delta) / elapsed.Seconds())
}
// safeClose closes the provided closer recovering from any errors.
func safeClose(c io.Closer) {
defer func() {
recover()
}()
c.Close()
}
// readResults bundles the data and error produced by a background read so
// both can be delivered over a channel as a single value.
type readResults struct {
	bytes []byte
	err   error
}
// ReadNWithProgress reads n bytes from reader r. As it reads it atomically updates the value pointed at by
// bytesRead. In order to cancel this read the reader should be closed.
func ReadNWithProgress(r io.Reader, n int64, bytesRead *int64) ([]byte, error) {
var totalRead int64
bytes := make([]byte, n)
var err error
for totalRead < n && err == nil {
var read int
read, err = r.Read(bytes[totalRead:])
if err != nil && err != io.EOF {
break
}
totalRead += int64(read)
if bytesRead != nil {
atomic.StoreInt64(bytesRead, totalRead)
}
if err == io.EOF {
err = nil
if totalRead != n {
err = io.ErrUnexpectedEOF
}
}
}
return bytes[:totalRead], err
}
// ReadWithMinThroughput reads n bytes from reader r erroring if the throughput ever drops below the threshold
// defined by MinThroughputCheckParams.
//
// The read runs on a background goroutine while this goroutine samples the
// progress counter on a ticker; if the windowed throughput falls below the
// minimum, the reader is closed to abort the read and ErrThroughput is
// returned.
func ReadWithMinThroughput(r io.ReadCloser, n int64, mtcParams MinThroughputCheckParams) ([]byte, error) {
	resChan := make(chan readResults, 1)
	defer close(resChan)

	wg := &sync.WaitGroup{}

	// bytesReadSync is updated atomically by the reader goroutine and
	// sampled here to compute throughput.
	var bytesReadSync int64
	wg.Add(1)
	go func() {
		defer wg.Done()
		// resChan is closed when this function returns; recover so a send
		// on the already-closed channel cannot crash the process.
		defer func() { recover() }()
		bytes, err := ReadNWithProgress(r, n, &bytesReadSync)
		res := readResults{bytes, err}
		resChan <- res
	}()

	// The throughput window spans NumIntervals ticker periods.
	checkDuration := mtcParams.CheckInterval * time.Duration(mtcParams.NumIntervals)
	ticker := time.NewTicker(mtcParams.CheckInterval)
	defer ticker.Stop()

	var points datapoints
	var throughputErr bool
	for !throughputErr {
		select {
		case res := <-resChan:
			// The read finished (successfully or not) before any
			// throughput violation was detected.
			return res.bytes, res.err
		case <-ticker.C:
		}

		// Record a progress sample; once a full window of samples exists,
		// check the observed rate against the configured minimum.
		read := atomic.LoadInt64(&bytesReadSync)
		points = append(points, datapoint{time.Now(), read})

		if len(points) >= mtcParams.NumIntervals {
			var bps int64
			points, bps = points.getThroughput(checkDuration)

			if bps < mtcParams.MinBytesPerSec {
				// Close the reader to unblock the background read, then
				// fall out of the loop to collect its result.
				safeClose(r)
				throughputErr = true
			}
		}
	}

	wg.Wait()

	select {
	case res := <-resChan:
		err := res.err

		if throughputErr {
			err = ErrThroughput
		}

		return res.bytes, err
	default:
		// The goroutine always sends exactly one result before wg.Wait()
		// returns, so the channel must be ready here.
		panic("bug. Should never reach here.")
	}
}
-208
View File
@@ -17,15 +17,9 @@ package iohelp
import (
"bufio"
"bytes"
"errors"
"io"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/dolthub/dolt/go/libraries/utils/osutil"
"github.com/dolthub/dolt/go/libraries/utils/test"
)
@@ -77,10 +71,8 @@ var rlTests = []struct {
// TestReadReadLineFunctions runs every rlTests case through both the
// buffered and the unbuffered line-reading implementations and verifies
// each produces the expected lines.
func TestReadReadLineFunctions(t *testing.T) {
	for _, tc := range rlTests {
		testReadLineFunctions(t, "buffered", tc.expectedLines, getTestReadLineClosure(tc.inputStr))
		testReadLineFunctions(t, "unbuffered", tc.expectedLines, getTestReadLineNoBufClosure(tc.inputStr))
	}
}
@@ -93,13 +85,6 @@ func getTestReadLineClosure(inputStr string) func() (string, bool, error) {
}
}
// getTestReadLineNoBufClosure returns a closure that yields successive
// lines of inputStr using the unbuffered ReadLineNoBuf implementation.
func getTestReadLineNoBufClosure(inputStr string) func() (string, bool, error) {
	rd := bytes.NewReader([]byte(inputStr))

	return func() (string, bool, error) {
		return ReadLineNoBuf(rd)
	}
}
func testReadLineFunctions(t *testing.T, testType string, expected []string, rlFunc func() (string, bool, error)) {
var isDone bool
@@ -120,197 +105,4 @@ func testReadLineFunctions(t *testing.T, testType string, expected []string, rlF
}
}
// ErrClosed is the sentinel error returned by the test readers once they
// have been closed. Callers compare against it by identity; the message
// (previously empty, which violates Go error-string convention) is only
// ever shown in failure output.
var ErrClosed = errors.New("reader closed")

// FixedRateDataGenerator is an io.ReadCloser that reports BytesPerInterval
// bytes as read once per Interval, simulating a reader with a fixed
// throughput.
type FixedRateDataGenerator struct {
	// BytesPerInterval is the number of bytes produced by each Read.
	BytesPerInterval int

	// Interval is the minimum spacing between successive Reads.
	Interval time.Duration

	// lastRead records when data was last produced.
	lastRead time.Time

	// closeChan is closed by Close to unblock any pending Read.
	closeChan chan struct{}

	// dataGenerated accumulates the total number of bytes produced.
	dataGenerated uint64
}
// NewFixedRateDataGenerator returns a generator that emits bytesPerInterval
// bytes of data once per interval, starting its clock at the current time.
func NewFixedRateDataGenerator(bytesPerInterval int, interval time.Duration) *FixedRateDataGenerator {
	return &FixedRateDataGenerator{
		BytesPerInterval: bytesPerInterval,
		Interval:         interval,
		lastRead:         time.Now(),
		closeChan:        make(chan struct{}),
		dataGenerated:    0,
	}
}
// Read blocks until the remainder of the current interval has elapsed (or
// the generator is closed, in which case it returns ErrClosed), then
// reports BytesPerInterval bytes as read. Note that p's contents are never
// written — only the byte count matters to the consumers of this generator.
func (gen *FixedRateDataGenerator) Read(p []byte) (int, error) {
	// Time remaining until the next batch of data is due.
	nextRead := gen.Interval - (time.Now().Sub(gen.lastRead))

	select {
	case <-gen.closeChan:
		return 0, ErrClosed
	case <-time.After(nextRead):
		gen.dataGenerated += uint64(gen.BytesPerInterval)
		gen.lastRead = time.Now()
		return min(gen.BytesPerInterval, len(p)), nil
	}
}
// Close unblocks any pending Read, which will then return ErrClosed.
// Close must be called at most once; a second call would panic on the
// already-closed channel.
func (gen *FixedRateDataGenerator) Close() error {
	close(gen.closeChan)
	return nil
}
// ErroringReader is an io.ReadCloser whose Read always fails with Err.
type ErroringReader struct {
	Err error
}

// Read returns the configured error without producing any data.
func (er ErroringReader) Read(p []byte) (int, error) {
	return 0, er.Err
}

// Close is a no-op.
func (er ErroringReader) Close() error {
	return nil
}
// ReaderSizePair couples a reader with the number of bytes that should be
// consumed from it before moving on to the next reader.
type ReaderSizePair struct {
	Reader io.ReadCloser
	Size   int
}

// ReaderCollection is an io.ReadCloser that reads from a sequence of
// readers in order, taking Size bytes from each before advancing.
type ReaderCollection struct {
	ReadersAndSizes []ReaderSizePair

	// currIdx is the index of the reader currently being consumed.
	currIdx int

	// currReaderRead is the number of bytes consumed so far from the
	// current reader.
	currReaderRead int
}
// NewReaderCollection validates the given reader/size pairs and returns a
// ReaderCollection over them. It panics if no pairs are supplied, or if
// any pair has a nil reader or a non-positive size.
func NewReaderCollection(readerSizePair ...ReaderSizePair) *ReaderCollection {
	if len(readerSizePair) == 0 {
		panic("no readers")
	}

	for _, pair := range readerSizePair {
		switch {
		case pair.Size <= 0:
			panic("invalid size")
		case pair.Reader == nil:
			panic("invalid reader")
		}
	}

	return &ReaderCollection{ReadersAndSizes: readerSizePair}
}
// Read reads from the current reader in the sequence, advancing to the
// next reader once Size bytes have been attributed to it. When every
// reader is exhausted it returns io.EOF.
func (rc *ReaderCollection) Read(p []byte) (int, error) {
	if rc.currIdx < len(rc.ReadersAndSizes) {
		currReader := rc.ReadersAndSizes[rc.currIdx].Reader
		currSize := rc.ReadersAndSizes[rc.currIdx].Size

		// Bytes still owed by the current reader before advancing.
		remaining := currSize - rc.currReaderRead

		n, err := currReader.Read(p)

		if err != nil {
			// NOTE(review): any bytes returned alongside the error are
			// discarded here; acceptable for these synthetic test readers.
			return 0, err
		}

		if n >= remaining {
			// The current reader has met its quota: clamp the reported
			// count to the quota and move on to the next reader.
			n = remaining
			rc.currIdx++
			rc.currReaderRead = 0
		} else {
			rc.currReaderRead += n
		}

		return n, err
	}

	return 0, io.EOF
}
// Close closes every underlying reader in order, stopping at and returning
// the first error encountered.
func (rc *ReaderCollection) Close() error {
	for _, pair := range rc.ReadersAndSizes {
		if err := pair.Reader.Close(); err != nil {
			return err
		}
	}

	return nil
}
// TestReadWithMinThroughput exercises ReadWithMinThroughput against
// synthetic readers with known data rates: a fast reader (success), a
// mid-stream read error, and two ways of dropping below the minimum
// throughput (a slow reader and a stalled reader).
//
// NOTE(review): the assertions are timing-sensitive, which is why the test
// is skipped unconditionally.
func TestReadWithMinThroughput(t *testing.T) {
	t.Skip("Skipping test in all cases as it is inconsistent on Unix")
	if osutil.IsWindows {
		t.Skip("Skipping test as it is too inconsistent on Windows and will randomly pass or fail")
	}

	tests := []struct {
		name          string
		numBytes      int64
		reader        io.ReadCloser
		mtcp          MinThroughputCheckParams
		expErr        bool
		expThroughErr bool
	}{
		{
			// Data arrives at ~100MBps, comfortably above the 50MBps floor.
			"10MB @ max(100MBps) > 50MBps",
			10 * 1024 * 1024,
			NewReaderCollection(
				ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 10 * 1024 * 1024},
			),
			MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
			false,
			false,
		},
		{
			// A read error partway through should surface as a plain
			// error, not as ErrThroughput.
			"5MB then error",
			10 * 1024 * 1024,
			NewReaderCollection(
				ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
				ReaderSizePair{ErroringReader{errors.New("test err")}, 100 * 1024},
				ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
			),
			MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
			true,
			false,
		},
		{
			// Throughput drops just below the floor partway through.
			"5MB then slow < 50Mbps",
			10 * 1024 * 1024,
			NewReaderCollection(
				ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
				ReaderSizePair{NewFixedRateDataGenerator(49*1024, time.Millisecond), 5 * 1024 * 1024},
			),
			MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
			false,
			true,
		},
		{
			// The reader stalls entirely partway through.
			"5MB then stops",
			10 * 1024 * 1024,
			NewReaderCollection(
				ReaderSizePair{NewFixedRateDataGenerator(100*1024, time.Millisecond), 5 * 1024 * 1024},
				ReaderSizePair{NewFixedRateDataGenerator(0, 100*time.Second), 5 * 1024 * 1024},
			),
			MinThroughputCheckParams{50 * 1024 * 1024, 5 * time.Millisecond, 10},
			false,
			true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			data, err := ReadWithMinThroughput(test.reader, test.numBytes, test.mtcp)

			if test.expErr || test.expThroughErr {
				if test.expThroughErr {
					assert.Equal(t, err, ErrThroughput)
				} else {
					assert.Error(t, err)
					assert.NotEqual(t, err, ErrThroughput)
				}
			} else {
				assert.Equal(t, len(data), int(test.numBytes))
				assert.NoError(t, err)
			}
		})
	}
}