mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-25 19:50:32 -05:00
Merge pull request #956 from cmasone-attic/issue220
MakeSeekable(): layer io.Seeker on an arbitrary io.Reader
This commit is contained in:
@@ -0,0 +1,89 @@
|
||||
package base
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/attic-labs/noms/d"
|
||||
)
|
||||
|
||||
// ReadSeekCloser unifies io.Reader, io.Seeker and io.Closer
|
||||
type ReadSeekCloser interface {
|
||||
io.ReadSeeker
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// MakeSeekable adds the io.Seeker interface to r. The caller is responsible for calling Close() on the returned object when done reading data.
|
||||
func MakeSeekable(r io.Reader, length int64) ReadSeekCloser {
|
||||
// It might be a nice optimization to buffer small objects in memory, but bytes.Buffer doesn't implement io.Seeker, and bytes.Reader doesn't implement io.Writer.
|
||||
cache, err := ioutil.TempFile("", "seekable-reader-")
|
||||
d.Chk.NoError(err)
|
||||
return &seekableReader{r: r, cache: cache, length: length}
|
||||
}
|
||||
|
||||
type seekableReader struct {
|
||||
r io.Reader
|
||||
cache *os.File
|
||||
|
||||
length, cached, pos int64
|
||||
}
|
||||
|
||||
func (s *seekableReader) Read(b []byte) (n int, err error) {
|
||||
if s.pos < s.cached {
|
||||
// Caller sought backwards, so current position is somewhere in the cached data. Satisfy the Read() from the cache as much as possible. If that doesn't fill b, the caller will see that n < len(b) and try again.
|
||||
n, err = io.ReadAtLeast(s.cache, b, int(s.cached-s.pos))
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
s.pos += int64(n)
|
||||
return
|
||||
}
|
||||
d.Chk.Equal(s.cached, s.pos, "Position is somehow _after_ the cached data!")
|
||||
if n, err = io.ReadFull(s.r, b); err != nil {
|
||||
return
|
||||
}
|
||||
if _, werr := s.cache.Write(b); werr != nil {
|
||||
return 0, werr
|
||||
}
|
||||
s.pos += int64(n)
|
||||
s.cached = s.pos
|
||||
return
|
||||
}
|
||||
|
||||
func (s *seekableReader) Seek(offset int64, whence int) (ret int64, err error) {
|
||||
if offset < 0 {
|
||||
return -1, fmt.Errorf("Cannot seek to negative offset %d", offset)
|
||||
}
|
||||
|
||||
switch whence {
|
||||
default:
|
||||
return -1, fmt.Errorf("whence must be one of 0, 1, or 2; not %d", whence)
|
||||
case 0:
|
||||
ret = offset
|
||||
case 1:
|
||||
ret = s.pos + offset
|
||||
case 2:
|
||||
ret = s.length - offset
|
||||
}
|
||||
if ret < s.cached {
|
||||
if _, err = s.cache.Seek(ret, 0); err != nil {
|
||||
return
|
||||
}
|
||||
} else if ret > s.cached {
|
||||
var n int64
|
||||
if n, err = io.CopyN(s.cache, s.r, ret-s.cached); err != nil {
|
||||
return
|
||||
}
|
||||
s.cached += n
|
||||
d.Chk.Equal(ret, s.cached)
|
||||
}
|
||||
s.pos = ret
|
||||
return
|
||||
}
|
||||
|
||||
func (s *seekableReader) Close() error {
|
||||
defer func() { d.Chk.NoError(s.cache.Close()) }()
|
||||
return os.Remove(s.cache.Name())
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
package base
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/Godeps/_workspace/src/github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
func TestSeekableReaderTestSuite(t *testing.T) {
|
||||
suite.Run(t, &SeekableReaderTestSuite{})
|
||||
}
|
||||
|
||||
type SeekableReaderTestSuite struct {
|
||||
suite.Suite
|
||||
dir string
|
||||
content []byte
|
||||
contentLen int64
|
||||
contentRSC ReadSeekCloser
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) SetupTest() {
|
||||
suite.content = []byte("0123456789")
|
||||
suite.contentLen = int64(len(suite.content))
|
||||
|
||||
var err error
|
||||
suite.dir, err = ioutil.TempDir("", "")
|
||||
suite.NoError(err)
|
||||
cache, err := ioutil.TempFile(suite.dir, "")
|
||||
suite.NoError(err)
|
||||
|
||||
suite.contentRSC = &seekableReader{
|
||||
r: bytes.NewReader(suite.content),
|
||||
cache: cache,
|
||||
length: suite.contentLen,
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) TearDownTest() {
|
||||
suite.contentRSC.Close()
|
||||
os.Remove(suite.dir)
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) TestRead() {
|
||||
suite.readAndExpect(suite.content)
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) TestSeekFromStart() {
|
||||
offset := suite.contentLen - 2
|
||||
ret, err := suite.contentRSC.Seek(offset, 0)
|
||||
suite.NoError(err)
|
||||
suite.EqualValues(offset, ret)
|
||||
suite.readAndExpect(suite.content[offset:])
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) TestSeekFromEnd() {
|
||||
// Seek to last two bytes
|
||||
offset := suite.contentLen - 2
|
||||
ret, err := suite.contentRSC.Seek(2, 2)
|
||||
suite.NoError(err)
|
||||
suite.EqualValues(offset, ret)
|
||||
suite.readAndExpect(suite.content[offset:])
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) TestSeekFromCur() {
|
||||
// Seek to last two bytes
|
||||
offset := suite.contentLen - 2
|
||||
ret, err := suite.contentRSC.Seek(2, 1)
|
||||
suite.NoError(err)
|
||||
ret, err = suite.contentRSC.Seek(offset-2, 1)
|
||||
suite.NoError(err)
|
||||
suite.EqualValues(offset, ret)
|
||||
suite.readAndExpect(suite.content[offset:])
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) TestReadSeekRead() {
|
||||
suite.readAndExpect(suite.content[:2])
|
||||
suite.contentRSC.Seek(2, 2)
|
||||
suite.readAndExpect(suite.content[suite.contentLen-2:])
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) TestReadSeekBackRead() {
|
||||
suite.readAndExpect(suite.content[:2])
|
||||
suite.contentRSC.Seek(0, 0)
|
||||
suite.readAndExpect(suite.content)
|
||||
}
|
||||
|
||||
func (suite *SeekableReaderTestSuite) readAndExpect(expected []byte) {
|
||||
expectedLen := len(expected)
|
||||
p := make([]byte, expectedLen)
|
||||
n, err := io.ReadFull(suite.contentRSC, p)
|
||||
|
||||
suite.NoError(err)
|
||||
suite.EqualValues(expectedLen, n, "Didn't read all the data")
|
||||
suite.True(bytes.Equal(expected, p), "%s != %s", string(expected), string(p))
|
||||
}
|
||||
Reference in New Issue
Block a user