mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2025-12-30 07:20:35 -06:00
Make the buffer pool tiered
This commit is contained in:
committed by
Francesco Mazzoli
parent
74562dbe82
commit
255726f8bd
@@ -9,19 +9,38 @@ import (
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// An extremely dumb buffer pool. The assumption is that this is to be used in
|
||||
// cases where there is a fixed upper bound for the buffers, and most requests
|
||||
// are going to be for that upper bound.
|
||||
type BufPool sync.Pool
|
||||
const (
|
||||
minSize = 4096
|
||||
numBuckets = 20
|
||||
)
|
||||
|
||||
type BufPool struct {
|
||||
buckets [numBuckets]sync.Pool
|
||||
bucketsMaxSize [numBuckets-1]int
|
||||
}
|
||||
|
||||
func NewBufPool() *BufPool {
|
||||
pool := BufPool(sync.Pool{
|
||||
bp := &BufPool{}
|
||||
sz := minSize
|
||||
for i := range numBuckets-1 {
|
||||
sz = (sz * 3) / 2
|
||||
sz = ((sz + 4096 - 1) / 4096) * 4096 // page-align all sizes
|
||||
bp.bucketsMaxSize[i] = sz
|
||||
bucketIx := i
|
||||
bp.buckets[i] = sync.Pool{
|
||||
New: func() any {
|
||||
buf := make([]byte, bp.bucketsMaxSize[bucketIx])
|
||||
return &buf
|
||||
},
|
||||
}
|
||||
}
|
||||
bp.buckets[numBuckets-1] = sync.Pool{
|
||||
New: func() any {
|
||||
buf := []byte{}
|
||||
return &buf
|
||||
},
|
||||
})
|
||||
return &pool
|
||||
}
|
||||
return bp
|
||||
}
|
||||
|
||||
// Use with caution, if you then put the resulting buf in a bufpool: the buffer
|
||||
@@ -48,27 +67,54 @@ func (b *Buf) BytesPtr() *[]byte {
|
||||
return res
|
||||
}
|
||||
|
||||
func (b *Buf) Acquire() *Buf {
|
||||
res := b.b.Swap(nil)
|
||||
if res == nil {
|
||||
panic("accessing freed buffer")
|
||||
}
|
||||
return NewBuf(res)
|
||||
}
|
||||
|
||||
func (bpool *BufPool) bucket(l int) int {
|
||||
var i int
|
||||
for i = 0; i < numBuckets-1 && l > bpool.bucketsMaxSize[i]; i++ {
|
||||
}
|
||||
return i
|
||||
}
|
||||
|
||||
// This does _not_ zero the memory in the bufs -- i.e. there might
|
||||
// be garbage in it.
|
||||
func (pool *BufPool) Get(l int) *Buf {
|
||||
// be garbage in it. The user shouldn't extend the backing buffer,
|
||||
// although things will keep working if he does. TODO possibly
|
||||
// enforce this?
|
||||
func (bpool *BufPool) Get(l int) *Buf {
|
||||
res := &Buf{}
|
||||
if l == 0 {
|
||||
b := []byte{}
|
||||
res.b.Store(&b)
|
||||
return res
|
||||
}
|
||||
buf := (*sync.Pool)(pool).Get().(*[]byte)
|
||||
poolIx := bpool.bucket(l)
|
||||
pool := &bpool.buckets[poolIx]
|
||||
buf := pool.Get().(*[]byte)
|
||||
if cap(*buf) >= l {
|
||||
*buf = (*buf)[:l]
|
||||
} else {
|
||||
} else { // happens for last bucket, and if users change the buffer
|
||||
*buf = (*buf)[:cap(*buf)]
|
||||
*buf = append(*buf, make([]byte, l-len(*buf))...)
|
||||
if poolIx == numBuckets-1 { // just make it exactly l
|
||||
*buf = append(*buf, make([]byte, l-len(*buf))...)
|
||||
} else {
|
||||
if l > bpool.bucketsMaxSize[poolIx] {
|
||||
panic("impossible")
|
||||
}
|
||||
*buf = append(*buf, make([]byte, bpool.bucketsMaxSize[poolIx]-len(*buf))...)
|
||||
}
|
||||
*buf = (*buf)[:l]
|
||||
}
|
||||
res.b.Store(buf)
|
||||
return res
|
||||
}
|
||||
|
||||
func (pool *BufPool) Put(buf *Buf) {
|
||||
func (bpool *BufPool) Put(buf *Buf) {
|
||||
if buf == nil {
|
||||
return
|
||||
}
|
||||
@@ -79,5 +125,6 @@ func (pool *BufPool) Put(buf *Buf) {
|
||||
if cap(*ptr) == 0 {
|
||||
return
|
||||
}
|
||||
(*sync.Pool)(pool).Put(ptr)
|
||||
poolIx := bpool.bucket(cap(*ptr))
|
||||
bpool.buckets[poolIx].Put(ptr)
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func ternErrToErrno(err error) syscall.Errno {
|
||||
case msgs.FATAL_ERROR:
|
||||
return syscall.EIO
|
||||
case msgs.TIMEOUT:
|
||||
return syscall.EIO
|
||||
return syscall.ETIMEDOUT
|
||||
case msgs.NOT_AUTHORISED:
|
||||
return syscall.EACCES
|
||||
case msgs.UNRECOGNIZED_REQUEST:
|
||||
|
||||
Reference in New Issue
Block a user