mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-30 11:31:37 -05:00
d8a2d285e9
This patch is unfortunately large, but it seemed necessary to make all these changes at once to transition away from having an HTTP ChunkStore that could allow for invalid state in the DB. Now, we have a RemoteDataStoreClient that allows for reading and writing of Values, and performs validation on the server side before persisting chunks. The semantics of DataStore are that written values can be read back out immediately, but are not guaranteed to be persistent until after Commit() The semantics are now that Put() blocks until the Chunk is persisted, and the new PutMany() can be used to write a number of Chunks all at once. From a command-line tool point of view, -h and -h-auth still work as expected.
132 lines
2.7 KiB
Go
132 lines
2.7 KiB
Go
package chunks
|
|
|
|
import "github.com/attic-labs/noms/ref"
|
|
|
|
type ReadRequest interface {
|
|
Ref() ref.Ref
|
|
Outstanding() OutstandingRequest
|
|
}
|
|
|
|
func NewGetRequest(r ref.Ref, ch chan Chunk) GetRequest {
|
|
return GetRequest{r, ch}
|
|
}
|
|
|
|
type GetRequest struct {
|
|
r ref.Ref
|
|
ch chan Chunk
|
|
}
|
|
|
|
func NewHasRequest(r ref.Ref, ch chan bool) HasRequest {
|
|
return HasRequest{r, ch}
|
|
}
|
|
|
|
type HasRequest struct {
|
|
r ref.Ref
|
|
ch chan bool
|
|
}
|
|
|
|
func (g GetRequest) Ref() ref.Ref {
|
|
return g.r
|
|
}
|
|
|
|
func (g GetRequest) Outstanding() OutstandingRequest {
|
|
return OutstandingGet(g.ch)
|
|
}
|
|
|
|
func (h HasRequest) Ref() ref.Ref {
|
|
return h.r
|
|
}
|
|
|
|
func (h HasRequest) Outstanding() OutstandingRequest {
|
|
return OutstandingHas(h.ch)
|
|
}
|
|
|
|
type OutstandingRequest interface {
|
|
Satisfy(c Chunk)
|
|
Fail()
|
|
}
|
|
|
|
type OutstandingGet chan Chunk
|
|
type OutstandingHas chan bool
|
|
|
|
func (r OutstandingGet) Satisfy(c Chunk) {
|
|
r <- c
|
|
close(r)
|
|
}
|
|
|
|
func (r OutstandingGet) Fail() {
|
|
r <- EmptyChunk
|
|
close(r)
|
|
}
|
|
|
|
func (h OutstandingHas) Satisfy(c Chunk) {
|
|
h <- true
|
|
close(h)
|
|
}
|
|
|
|
func (h OutstandingHas) Fail() {
|
|
h <- false
|
|
close(h)
|
|
}
|
|
|
|
// ReadBatch represents a set of queued Get/Has requests, each of which are blocking on a receive channel for a response.
|
|
type ReadBatch map[ref.Ref][]OutstandingRequest
|
|
|
|
// GetBatch represents a set of queued Get requests, each of which are blocking on a receive channel for a response.
|
|
type GetBatch map[ref.Ref][]chan Chunk
|
|
type HasBatch map[ref.Ref][]chan bool
|
|
|
|
// Close ensures that callers to Get() and Has() are failed correctly if the corresponding chunk wasn't in the response from the server (i.e. it wasn't found).
|
|
func (rb *ReadBatch) Close() error {
|
|
for _, reqs := range *rb {
|
|
for _, req := range reqs {
|
|
req.Fail()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Put is implemented so that ReadBatch implements the ChunkSink interface.
|
|
func (rb *ReadBatch) Put(c Chunk) {
|
|
for _, or := range (*rb)[c.Ref()] {
|
|
or.Satisfy(c)
|
|
}
|
|
|
|
delete(*rb, c.Ref())
|
|
}
|
|
|
|
// PutMany is implemented so that ReadBatch implements the ChunkSink interface.
|
|
func (rb *ReadBatch) PutMany(chunks []Chunk) (e BackpressureError) {
|
|
for _, c := range chunks {
|
|
rb.Put(c)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Close ensures that callers to Get() must receive nil if the corresponding chunk wasn't in the response from the server (i.e. it wasn't found).
|
|
func (gb *GetBatch) Close() error {
|
|
for _, chs := range *gb {
|
|
for _, ch := range chs {
|
|
ch <- EmptyChunk
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Put is implemented so that GetBatch implements the ChunkSink interface.
|
|
func (gb *GetBatch) Put(c Chunk) {
|
|
for _, ch := range (*gb)[c.Ref()] {
|
|
ch <- c
|
|
}
|
|
|
|
delete(*gb, c.Ref())
|
|
}
|
|
|
|
// PutMany is implemented so that GetBatch implements the ChunkSink interface.
|
|
func (gb *GetBatch) PutMany(chunks []Chunk) (e BackpressureError) {
|
|
for _, c := range chunks {
|
|
gb.Put(c)
|
|
}
|
|
return
|
|
}
|