switch js to use buffer reads

Rafael Weinstein
2015-09-11 14:41:51 -07:00
parent aad09b7ff8
commit 10350dbd7e
3 changed files with 152 additions and 56 deletions
+19 -6
@@ -2,6 +2,7 @@ package chunks
 import (
 	"bytes"
+	"crypto/sha1"
 	"encoding/binary"
 	"io"
@@ -45,6 +46,7 @@ type ChunkSink interface {
 	Chunk N
 	Chunk:
+		Ref  // 20-byte sha1 hash
 		Len  // 4-byte int
 		Data // len(Data) == Len
 */
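To make the new format concrete: a chunk whose data is the five-byte string "hello" (sha1 aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d) serializes to 29 bytes on the wire, laid out as:

    ref:  aa f4 c6 1d dc c5 e8 a2 da be de 0f 3b 48 2c d9 ae a9 43 4d
    len:  05 00 00 00   (uint32, little-endian)
    data: 68 65 6c 6c 6f ("hello")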
@@ -54,12 +56,17 @@ func Serialize(w io.Writer, refs map[ref.Ref]bool, cs ChunkSource) {
 	chunk := &bytes.Buffer{}
 	for r, _ := range refs {
 		chunk.Reset()
-		r := cs.Get(r)
-		if r == nil {
+		reader := cs.Get(r)
+		if reader == nil {
 			continue
 		}
-		_, err := io.Copy(chunk, r)
+		digest := r.Digest()
+		n, err := io.Copy(w, bytes.NewReader(digest[:]))
+		d.Chk.NoError(err)
+		d.Chk.Equal(int64(sha1.Size), n)
+		_, err = io.Copy(chunk, reader)
 		d.Chk.NoError(err)
 		// Because of chunking at higher levels, no chunk should ever be more than 4GB
@@ -67,7 +74,7 @@ func Serialize(w io.Writer, refs map[ref.Ref]bool, cs ChunkSource) {
 		err = binary.Write(w, binary.LittleEndian, chunkSize)
 		d.Chk.NoError(err)
-		n, err := io.Copy(w, chunk)
+		n, err = io.Copy(w, chunk)
 		d.Chk.NoError(err)
 		d.Chk.Equal(uint32(n), chunkSize)
 	}
@@ -75,17 +82,23 @@ func Serialize(w io.Writer, refs map[ref.Ref]bool, cs ChunkSource) {
 func Deserialize(r io.Reader, cs ChunkSink) {
 	for {
-		chunkSize := uint32(0)
-		err := binary.Read(r, binary.LittleEndian, &chunkSize)
+		digest := ref.Sha1Digest{}
+		n, err := io.ReadFull(r, digest[:])
 		if err == io.EOF {
 			break
 		}
 		d.Chk.NoError(err)
+		d.Chk.Equal(int(sha1.Size), n)
+		chunkSize := uint32(0)
+		err = binary.Read(r, binary.LittleEndian, &chunkSize)
+		d.Chk.NoError(err)
 		w := cs.Put()
 		_, err = io.CopyN(w, r, int64(chunkSize))
 		d.Chk.NoError(err)
 		w.Close()
+		d.Chk.Equal(w.Ref(), ref.New(digest))
 	}
 }
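Deserialize now verifies that each chunk's bytes hash back to the digest that preceded them (d.Chk.Equal(w.Ref(), ref.New(digest))). The JS client below does not yet mirror that check; a minimal sketch of one, not part of this commit, assuming chunks arrive as ArrayBuffers and borrowing the uint8ArrayToRef hex helper from the fetch module further down:

    // Sketch only: recompute the sha1 of a chunk's bytes and compare it
    // to the ref it was requested under. crypto.subtle.digest resolves
    // to an ArrayBuffer holding the 20-byte digest.
    function verifyChunk(expectedRef, chunk) {
    	return crypto.subtle.digest('SHA-1', chunk).then(digest => {
    		if (uint8ArrayToRef(new Uint8Array(digest)) !== expectedRef) {
    			throw Error('chunk does not hash to its ref: ' + expectedRef);
    		}
    		return chunk;
    	});
    }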
+14 -24
@@ -162,33 +162,23 @@ function decodeValue(value, ref, getChunk) {
 	return decodeTaggedValue(value, ref, getChunk);
 }
-function readBlobAsText(blob) {
-	return new Promise((res, rej) => {
-		var reader = new FileReader();
-		reader.addEventListener('loadend',
-			() => res(reader.result));
-		reader.addEventListener('error', rej);
-		reader.readAsText(blob);
-	});
-}
+var textDecoder = new TextDecoder();
 function readValue(ref, getChunk) {
 	return getChunk(ref)
-		.then(response => response.blob())
-		.then(blob => {
-			return readBlobAsText(blob.slice(0, 2))
-				.then(header => {
-					var body = blob.slice(2);
-					switch (header) {
-					case 'j ':
-						return readBlobAsText(body)
-							.then(data => decodeValue(JSON.parse(data), ref, getChunk));
-					case 'b ':
-						return Promise.resolve(body);
-					default:
-						throw Error('Unsupported encoding: ' + header);
-					}
-				});
+		.then(chunk => {
+			var hBytes = new Uint8Array(chunk.slice(0, 2));
+			var header = String.fromCharCode(hBytes[0], hBytes[1]);
+			var body = chunk.slice(2);
+			switch (header) {
+			case 'j ':
+				return decodeValue(JSON.parse(textDecoder.decode(body)), ref, getChunk);
+			case 'b ':
+				return body;
+			default:
+				throw Error('Unsupported encoding: ' + header);
+			}
 		});
 }
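Since getChunk now resolves to an ArrayBuffer instead of a Response, the two-byte type tag can be split off synchronously. A quick sanity check of that split, with a hand-built chunk (contents hypothetical):

    // Sketch only: build a 'j '-tagged chunk and split it the way
    // readValue does. TextEncoder().encode returns a Uint8Array; its
    // .buffer is the underlying ArrayBuffer.
    var chunk = new TextEncoder().encode('j {"answer":42}').buffer;
    var hBytes = new Uint8Array(chunk.slice(0, 2));
    var header = String.fromCharCode(hBytes[0], hBytes[1]); // 'j '
    var body = chunk.slice(2);
    console.log(header, JSON.parse(new TextDecoder().decode(body))); // j  {answer: 42}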
+119 -26
@@ -6,52 +6,145 @@ var host = function(host) {
 	var i = host.indexOf(':');
 	return i < 0 ? host : host.substring(0, i);
 }(location.host);
-var nomsPort = "8000";
-var nomsServer = location.protocol + '//' + host + ":" + nomsPort;
+var nomsPort = '8000';
+var nomsServer = location.protocol + '//' + host + ':' + nomsPort;
 var rpc = {
-	ref: nomsServer + '/ref',
-	root: nomsServer + '/root',
+	getRefs: nomsServer + '/getRefs/',
+	root: nomsServer + '/root/',
 };
-// Note that chrome limits the number of active xhrs to the same security origin to 6, more than that just sit in the "stalled" state.
-var maxConnections = 8;
-var activeFetches = 0;
-var pendingFetches = [];
+// Note that chrome limits the number of active xhrs to the same security origin to 6; more than that just sit in the 'stalled' state.
+var maxReads = 3;
+var activeReads = 0;
+var bufferedReads = Object.create(null);
+var anyPending = false;
+var fetchScheduled = false;
-function requestFetch(url) {
-	return new Promise((resolve, reject) => {
-		pendingFetches.push({ url, resolve, reject });
-		pumpFetchQueue();
-	});
-}
+function beginFetch() {
+	activeReads++;
+	var reqs = bufferedReads;
+	bufferedReads = Object.create(null);
+	anyPending = false;
+	fetchScheduled = false;
+	var refs = Object.keys(reqs);
+	var body = refs.map(r => 'ref=' + r).join('&');
+	fetch(rpc.getRefs, {
+		method: 'post',
+		body: body,
+		headers: {
+			'Content-Type': 'application/x-www-form-urlencoded'
+		}
+	}).then(r => r.blob()).then(deserializeChunks).then((chunks) => {
+		// Return success: resolve every caller waiting on a ref that came back
+		Object.keys(chunks).forEach(r => {
+			var chunk = chunks[r];
+			var callers = reqs[r];
+			callers.forEach(c => {
+				c.resolve(chunk);
+			});
+			delete reqs[r];
+		});
+		// Report failure: any refs left in reqs were not in the response
+		Object.keys(reqs).forEach(r => {
+			var callers = reqs[r];
+			callers.forEach(c => {
+				c.resolve(null);
+			});
+		});
+		endFetch();
+	}).catch((err) => {
+		Object.keys(reqs).forEach(r => {
+			var callers = reqs[r];
+			callers.forEach(c => {
+				c.reject(err);
+			});
+		});
+		endFetch(); // still release the read slot on failure
+	});
+}
-function beginFetch(req) {
-	activeFetches++;
-	fetch(req.url).then((r) => {
-		// TODO: The caller should be able to do additional async work before endFetch() is called.
-		req.resolve(r);
-		endFetch();
-	}).catch(req.reject);
-}
 function endFetch() {
-	activeFetches--;
+	activeReads--;
 	pumpFetchQueue();
 }
 function pumpFetchQueue() {
-	while (pendingFetches.length && activeFetches < maxConnections) {
-		beginFetch(pendingFetches.shift());
+	if (!fetchScheduled && anyPending && activeReads < maxReads) {
+		fetchScheduled = true;
+		setTimeout(beginFetch, 0); // send all requests from this Task in a single request
 	}
 }
+const sha1Size = 20;
+const chunkLengthSize = 4; // uint32
+const chunkHeaderSize = sha1Size + chunkLengthSize;
+function uint8ArrayToRef(a) {
+	var ref = 'sha1-';
+	for (var i = 0; i < a.length; i++) {
+		var v = a[i].toString(16);
+		if (v.length == 1) {
+			ref += '0' + v;
+		} else {
+			ref += v;
+		}
+	}
+	return ref;
+}
+function deserializeChunks(blob) {
+	return new Promise((resolve, reject) => {
+		var reader = new FileReader();
+		reader.addEventListener('loadend', () => {
+			var buffer = reader.result;
+			var totalLength = buffer.byteLength;
+			var chunks = {};
+			for (var i = 0; i < totalLength;) {
+				if (buffer.byteLength - i < chunkHeaderSize) {
+					reject('Invalid chunk buffer');
+					return;
+				}
+				var sha1Bytes = new Uint8Array(buffer.slice(i, i + sha1Size));
+				i += sha1Size;
+				var ref = uint8ArrayToRef(sha1Bytes);
+				var length = new Uint32Array(buffer.slice(i, i + chunkLengthSize))[0];
+				i += chunkLengthSize;
+				if (i + length > totalLength) {
+					reject('Invalid chunk buffer');
+					return;
+				}
+				var chunk = buffer.slice(i, i + length);
+				i += length;
+				chunks[ref] = chunk;
+			}
+			resolve(chunks);
+		});
+		reader.readAsArrayBuffer(blob);
+	});
+}
 function getChunk(ref) {
-	return requestFetch(rpc.ref + '/' + ref);
+	return new Promise((resolve, reject) => {
+		var callers = bufferedReads[ref] || [];
+		callers.push({ resolve, reject });
+		bufferedReads[ref] = callers;
+		anyPending = true;
+		pumpFetchQueue();
+	});
 }
 function getRoot() {
-	return requestFetch(rpc.root).then(res => res.text());
+	return new Promise((resolve, reject) => {
+		fetch(rpc.root).then(r => r.text()).then(resolve).catch(reject);
+	});
 }
 module.exports = {
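With the buffered reader in place, every getChunk call made within a single event-loop task lands in bufferedReads before the zero-delay timer fires, so all of them travel in one POST to /getRefs/. A usage sketch (refs hypothetical; they are the sha1s of "hello" and of the empty string):

    // Sketch only: both reads queue in the same task, so one beginFetch()
    // sends 'ref=...&ref=...' in a single request body; each promise
    // resolves with its chunk's ArrayBuffer, or null if the server did
    // not return that ref.
    Promise.all([
    	getChunk('sha1-aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d'),
    	getChunk('sha1-da39a3ee5e6b4b0d3255bfef95601890afd80709'),
    ]).then(([a, b]) => console.log(a, b));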