Files
dolt/go/libraries/utils/iohelp/read_with_stats.go
T
Dhruv Sringari 009bb85d8d fix bats
2022-02-17 14:22:46 -08:00

93 lines
2.2 KiB
Go

// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package iohelp
import (
"io"
"sync/atomic"
"time"
)
// updateFrequency is the delay between successive progress reports that a
// started ReaderWithStats hands to its update callback.
const updateFrequency time.Duration = 500 * time.Millisecond
// ReadStats is a point-in-time snapshot of the progress made by a
// ReaderWithStats.
type ReadStats struct {
	// Read is the total number of bytes read so far.
	Read uint64
	// Elapsed is the time that has passed since the reader was started.
	Elapsed time.Duration
	// Percent is the completed fraction, computed as bytes read divided by the
	// expected size. It is a ratio (1.0 means everything was read), not a
	// value scaled to 100, and it is 0 when the expected size is 0.
	Percent float64
}
// ReaderWithStats wraps an io.Reader and keeps a running count of the bytes
// read through it, so that progress can be reported while a read is in flight.
type ReaderWithStats struct {
	// read is the running total of bytes read. It is only accessed through
	// sync/atomic; keeping it as the first field keeps it 64-bit aligned on
	// 32-bit platforms, which those atomic operations require.
	read uint64
	// size is the expected total number of bytes, as supplied by the caller.
	size int64
	// rd is the wrapped reader that actually provides the data.
	rd io.Reader
	// start is the time at which reporting was started.
	start time.Time
	// closeCh is closed to tell the reporting goroutine to exit.
	closeCh chan struct{}
}
// NewReaderWithStats returns a ReaderWithStats that reads from rd and treats
// size as the expected total number of bytes. The byte counter starts at zero
// and no reporting happens until Start is called.
func NewReaderWithStats(rd io.Reader, size int64) *ReaderWithStats {
	rws := new(ReaderWithStats)
	rws.rd = rd
	rws.size = size
	// Unbuffered signalling channel; it is closed, never sent on.
	rws.closeCh = make(chan struct{})
	return rws
}
// Start records the start time and launches a background goroutine that
// reports progress through updateFunc every updateFrequency until Stop is
// called.
//
// Each report carries the number of bytes read so far, the time elapsed since
// Start was called, and Percent, which is the fraction read/size (0 when size
// is 0) rather than a value scaled to 100. updateFunc runs on the background
// goroutine, and the next interval only begins once it has returned.
func (rws *ReaderWithStats) Start(updateFunc func(ReadStats)) {
	rws.start = time.Now()
	go func() {
		timer := time.NewTimer(updateFrequency)
		// Release the timer as soon as the goroutine exits instead of leaving
		// it pending until it would have fired.
		defer timer.Stop()

		for {
			select {
			case <-rws.closeCh:
				return
			case <-timer.C:
				read := atomic.LoadUint64(&rws.read)
				elapsed := time.Since(rws.start)

				var percent float64
				if rws.size != 0 {
					percent = float64(read) / float64(rws.size)
				}
				updateFunc(ReadStats{Read: read, Elapsed: elapsed, Percent: percent})

				// The timer has fired and its channel was drained by this
				// case, so it can be re-armed directly.
				timer.Reset(updateFrequency)
			}
		}
	}()
}
// Stop "closes" the ReaderWithStats: it signals the reporting goroutine
// launched by Start to exit and, when the wrapped reader is an io.Closer,
// closes it and returns the result of that Close.
//
// The method is deliberately not named Close. A ReaderWithStats is sometimes
// passed as the body of an http.Request, and http.Request closes its body
// whenever the body is an io.Closer; staying out of that interface keeps Start
// and Stop fully under the caller's control.
//
// Calling Stop again after it has already run is a no-op that returns nil
// instead of panicking on a second close of the channel. Stop is still not
// meant to be called from several goroutines at the same time.
func (rws *ReaderWithStats) Stop() error {
	select {
	case <-rws.closeCh:
		// Already stopped: the channel is closed and the wrapped reader was
		// dealt with by the first call.
		return nil
	default:
	}
	close(rws.closeCh)

	if closer, ok := rws.rd.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}
// Read implements io.Reader. It forwards the call to the wrapped reader,
// atomically adds the number of bytes it returned to the running total, and
// hands the wrapped reader's result back unchanged.
func (rws *ReaderWithStats) Read(p []byte) (int, error) {
	count, readErr := rws.rd.Read(p)
	// Count the bytes even when an error accompanies them.
	atomic.AddUint64(&rws.read, uint64(count))
	return count, readErr
}
// Size reports the expected total number of bytes, exactly as it was stored
// when the ReaderWithStats was created. It does not change as data is read.
func (rws *ReaderWithStats) Size() int64 {
	return rws.size
}