mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-23 18:58:50 -06:00
Rate-limit LevelDBStore read operations as well (#2239)
Under load, our server can exhaust the number of file descriptors it's allowed to have open at one time. Part of this is because of how many incoming connections it's handling, but we believe that handling lots of simultaneous reads to leveldb is the larger part of the issue. This patch applies the rate limit we were using for writing to both read and write operations. Fixes #2227
This commit is contained in:
@@ -146,9 +146,8 @@ func (l *LevelDBStore) setVersIfUnset() {
|
||||
}
|
||||
|
||||
type internalLevelDBStore struct {
|
||||
db *leveldb.DB
|
||||
mu *sync.Mutex
|
||||
concurrentWriteLimit chan struct{}
|
||||
db *rateLimitedLevelDB
|
||||
mu sync.Mutex
|
||||
getCount, hasCount, putCount, putBytes int64
|
||||
dumpStats bool
|
||||
}
|
||||
@@ -164,10 +163,8 @@ func newBackingStore(dir string, maxFileHandles int, dumpStats bool) *internalLe
|
||||
})
|
||||
d.Chk.NoError(err, "opening internalLevelDBStore in %s", dir)
|
||||
return &internalLevelDBStore{
|
||||
db: db,
|
||||
mu: &sync.Mutex{},
|
||||
concurrentWriteLimit: make(chan struct{}, maxFileHandles),
|
||||
dumpStats: dumpStats,
|
||||
db: &rateLimitedLevelDB{db, make(chan struct{}, maxFileHandles)},
|
||||
dumpStats: dumpStats,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,29 +220,23 @@ func (l *internalLevelDBStore) versByKey(key []byte) string {
|
||||
}
|
||||
|
||||
func (l *internalLevelDBStore) setVersByKey(key []byte) {
|
||||
l.concurrentWriteLimit <- struct{}{}
|
||||
err := l.db.Put(key, []byte(constants.NomsVersion), nil)
|
||||
d.Chk.NoError(err)
|
||||
<-l.concurrentWriteLimit
|
||||
}
|
||||
|
||||
func (l *internalLevelDBStore) putByKey(key []byte, c Chunk) {
|
||||
l.concurrentWriteLimit <- struct{}{}
|
||||
data := snappy.Encode(nil, c.Data())
|
||||
err := l.db.Put(key, data, nil)
|
||||
d.Chk.NoError(err)
|
||||
l.putCount++
|
||||
l.putBytes += int64(len(data))
|
||||
<-l.concurrentWriteLimit
|
||||
}
|
||||
|
||||
func (l *internalLevelDBStore) putBatch(b *leveldb.Batch, numBytes int) {
|
||||
l.concurrentWriteLimit <- struct{}{}
|
||||
err := l.db.Write(b, nil)
|
||||
d.Chk.NoError(err)
|
||||
l.putCount += int64(b.Len())
|
||||
l.putBytes += int64(numBytes)
|
||||
<-l.concurrentWriteLimit
|
||||
}
|
||||
|
||||
func (l *internalLevelDBStore) Close() error {
|
||||
|
||||
39
go/chunks/rate_limited_leveldb.go
Normal file
39
go/chunks/rate_limited_leveldb.go
Normal file
@@ -0,0 +1,39 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
)
|
||||
|
||||
type rateLimitedLevelDB struct {
|
||||
*leveldb.DB
|
||||
concurrentFileIOLimit chan struct{}
|
||||
}
|
||||
|
||||
func (db *rateLimitedLevelDB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
|
||||
db.concurrentFileIOLimit <- struct{}{}
|
||||
defer func() { <-db.concurrentFileIOLimit }()
|
||||
return db.DB.Get(key, ro)
|
||||
}
|
||||
|
||||
func (db *rateLimitedLevelDB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
|
||||
db.concurrentFileIOLimit <- struct{}{}
|
||||
defer func() { <-db.concurrentFileIOLimit }()
|
||||
return db.DB.Has(key, ro)
|
||||
}
|
||||
|
||||
func (db *rateLimitedLevelDB) Put(key, value []byte, wo *opt.WriteOptions) error {
|
||||
db.concurrentFileIOLimit <- struct{}{}
|
||||
defer func() { <-db.concurrentFileIOLimit }()
|
||||
return db.DB.Put(key, value, wo)
|
||||
}
|
||||
|
||||
func (db *rateLimitedLevelDB) Write(b *leveldb.Batch, wo *opt.WriteOptions) (err error) {
|
||||
db.concurrentFileIOLimit <- struct{}{}
|
||||
defer func() { <-db.concurrentFileIOLimit }()
|
||||
return db.DB.Write(b, wo)
|
||||
}
|
||||
Reference in New Issue
Block a user