Files
dolt/chunks/chunk_serializer.go
T
Chris Masone 1316d19462 Enable clients/server to address multiple DataStores
Add DataStore.Factory and ChunkStore.Factory so that client programs
that need to create multiple namespaced {Chunk,Data}Stores of the kind
indicated by command line flags have a convenient way to do so. The
details of how this is implemented are mostly contained in the various
ChunkStore.Factory implementations:

1) MemoryStore, TestStore - no change
2) LevelDBStore - the namespace is used as a subdirectory of the path
                  provided by the user
3) HTTPStore - the namespace is used as a path prefix for the endpoints
               supported by DataStoreServer
4) DynamoStore - the namespace is used as a prefix on all keys

This change also required that DataStoreServer be able to handle URLs
of the form "http://server:port/namespace/endpoint", e.g.
"http://localhost:8000/mydatastore/getRefs". It currently still handles
the non-namespaced endpoints as well.

In order to make code from DataStoreServer more reusable in other
contexts, the functions that handle calls to the server's various
endpoints are now standalone and live in datas/handlers.go.
DataStoreServer just contains logic for dispatch and server lifetime
management.
2016-01-26 13:58:57 -08:00

116 lines
2.3 KiB
Go

package chunks
import (
"bytes"
"crypto/sha1"
"encoding/binary"
"io"
"sync"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
)
/*
Chunk Serialization:
Chunk 0
Chunk 1
..
Chunk N
Chunk:
Ref // 20-byte sha1 hash
Len // 4-byte int
Data // len(Data) == Len
*/
// NewSerializer creates a serializer which is a ChunkSink. Put() chunks will be serialized to |writer|. Close() must be called when no more chunks will be serialized.
func NewSerializer(writer io.Writer) ChunkSink {
	s := &serializer{
		writer,
		make(chan Chunk, 64), // buffered so Put() rarely blocks the producer
		make(chan struct{}),
	}
	// Single background goroutine drains the channel and owns all writes, so
	// chunks from concurrent producers are serialized to |writer| one at a time.
	go func() {
		for chunk := range s.chs {
			// BUGFIX: the original asserted on chunk.Data (the method value),
			// which is never nil; assert on the data the method returns instead.
			d.Chk.NotNil(chunk.Data())
			digest := chunk.Ref().Digest()
			n, err := io.Copy(s.writer, bytes.NewReader(digest[:]))
			d.Chk.NoError(err)
			d.Chk.Equal(int64(sha1.Size), n)
			// Because of chunking at higher levels, no chunk should ever be more
			// than 4GB, so a uint32 length prefix is sufficient.
			chunkSize := uint32(len(chunk.Data()))
			err = binary.Write(s.writer, binary.LittleEndian, chunkSize)
			d.Chk.NoError(err)
			n, err = io.Copy(s.writer, bytes.NewReader(chunk.Data()))
			d.Chk.NoError(err)
			d.Chk.Equal(uint32(n), chunkSize)
		}
		// chs was closed by Close(); tell it the last chunk has been flushed.
		s.done <- struct{}{}
	}()
	return s
}
// serializer implements ChunkSink by queuing chunks on a channel that a
// background goroutine (started in NewSerializer) drains to |writer|.
type serializer struct {
writer io.Writer // destination stream for the serialized chunk records
chs chan Chunk // chunks queued by Put(), drained by the writer goroutine
done chan struct{} // the goroutine sends once after chs is closed and drained
}
// Put queues |c| for serialization to the underlying writer. It may block once
// the internal buffer (64 chunks) is full, and must not be called after
// Close() — the channel is closed there, so a later send would panic.
func (sz *serializer) Put(c Chunk) {
sz.chs <- c
}
// Close signals the writer goroutine that no more chunks are coming, then
// blocks until every queued chunk has been written out. Always returns nil;
// the error result only exists to satisfy the ChunkSink interface.
func (sz *serializer) Close() error {
close(sz.chs) // stops the goroutine's range loop after the queue drains
<-sz.done // wait for the goroutine to confirm the final flush
return nil
}
// Deserialize reads off of |reader| until EOF, sending chunks to |cs|. If |rateLimit| is non-nil, concurrency will be limited to the available capacity of the channel.
func Deserialize(reader io.Reader, cs ChunkSink, rateLimit chan struct{}) {
wg := sync.WaitGroup{}
for {
// Wire format per chunk (see comment near the top of the file):
// 20-byte sha1 digest, 4-byte little-endian length, then the data.
digest := ref.Sha1Digest{}
n, err := io.ReadFull(reader, digest[:])
if err == io.EOF {
// Clean EOF exactly at a chunk boundary ends the stream. A partial
// digest read yields io.ErrUnexpectedEOF and fails the check below.
break
}
d.Chk.NoError(err)
d.Chk.Equal(int(sha1.Size), n)
r := ref.New(digest)
chunkSize := uint32(0)
err = binary.Read(reader, binary.LittleEndian, &chunkSize)
d.Chk.NoError(err)
w := NewChunkWriter()
n2, err := io.CopyN(w, reader, int64(chunkSize))
d.Chk.NoError(err)
d.Chk.Equal(int64(chunkSize), n2)
c := w.Chunk()
// Integrity check: the data must hash back to the digest that prefixed it.
d.Chk.Equal(r, c.Ref())
wg.Add(1)
if rateLimit != nil {
rateLimit <- struct{}{} // acquire a slot before spawning; blocks at capacity
}
// |c| is declared fresh each iteration, so each goroutine captures its own chunk.
go func() {
cs.Put(c)
wg.Done()
if rateLimit != nil {
<-rateLimit // release the slot for the next pending Put
}
}()
}
// Block until every spawned Put has completed.
wg.Wait()
}