Properly implement buffer pool
It wasn't reusing buffers, idiot idiot idiot...
This commit is contained in:
@@ -2,12 +2,20 @@ package pdu
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BufferPoolManager struct {
|
type BufferPoolManager struct {
|
||||||
pools map[int]*sync.Pool
|
pools map[int]*sync.Pool
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
debug bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bpm *BufferPoolManager) logf(format string, args ...interface{}) {
|
||||||
|
if bpm.debug {
|
||||||
|
log.Printf(format, args...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBufferPoolManager() *BufferPoolManager {
|
func NewBufferPoolManager() *BufferPoolManager {
|
||||||
@@ -26,9 +34,13 @@ func (bpm *BufferPoolManager) Get(size int) *bytes.Buffer {
|
|||||||
// Double-check if another goroutine added the pool while we were waiting
|
// Double-check if another goroutine added the pool while we were waiting
|
||||||
pool, exists = bpm.pools[size]
|
pool, exists = bpm.pools[size]
|
||||||
if !exists {
|
if !exists {
|
||||||
|
bpm.logf("Creating new pool for size %d\n", size)
|
||||||
pool = &sync.Pool{
|
pool = &sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
return bytes.NewBuffer(make([]byte, size))
|
bpm.logf("Creating new buffer of size %d\n", size)
|
||||||
|
buf := bytes.NewBuffer(make([]byte, size))
|
||||||
|
buf.Reset()
|
||||||
|
return buf
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
bpm.pools[size] = pool
|
bpm.pools[size] = pool
|
||||||
@@ -36,19 +48,23 @@ func (bpm *BufferPoolManager) Get(size int) *bytes.Buffer {
|
|||||||
bpm.mu.Unlock()
|
bpm.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
return pool.Get().(*bytes.Buffer);
|
buf := pool.Get().(*bytes.Buffer)
|
||||||
|
bpm.logf("Returning buffer of size %d: %p\n", buf.Cap(), buf)
|
||||||
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bpm *BufferPoolManager) Put(buf *bytes.Buffer) {
|
func (bpm *BufferPoolManager) Put(buf *bytes.Buffer) {
|
||||||
size := buf.Len()
|
size := buf.Cap()
|
||||||
bpm.mu.RLock()
|
bpm.mu.RLock()
|
||||||
pool, exists := bpm.pools[size]
|
pool, exists := bpm.pools[size]
|
||||||
bpm.mu.RUnlock()
|
bpm.mu.RUnlock()
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
|
bpm.logf("Cannot return %p, No pool for size %d\n", buf, size)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
buf.Reset()
|
buf.Reset()
|
||||||
pool.Put(buf)
|
pool.Put(buf)
|
||||||
|
bpm.logf("Returned buffer of size %d: %p\n", size, buf)
|
||||||
}
|
}
|
||||||
|
@@ -16,8 +16,8 @@ func TestRetrieveBufferOfRequestedSize(t *testing.T) {
|
|||||||
t.Fatalf("Expected buffer, got nil")
|
t.Fatalf("Expected buffer, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -31,8 +31,8 @@ func TestRequestBufferSizeZero(t *testing.T) {
|
|||||||
t.Fatalf("Expected buffer, got nil")
|
t.Fatalf("Expected buffer, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,8 +51,8 @@ func TestConcurrentAccessToBufferPool(t *testing.T) {
|
|||||||
if buffer == nil {
|
if buffer == nil {
|
||||||
t.Errorf("Expected buffer, got nil")
|
t.Errorf("Expected buffer, got nil")
|
||||||
}
|
}
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -70,8 +70,8 @@ func TestGetBufferLockUnlock(t *testing.T) {
|
|||||||
t.Fatalf("Expected buffer, got nil")
|
t.Fatalf("Expected buffer, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,8 +85,8 @@ func TestVerifyPoolCreationForNewSizes(t *testing.T) {
|
|||||||
t.Fatalf("Expected buffer, got nil")
|
t.Fatalf("Expected buffer, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -100,8 +100,8 @@ func TestBufferPoolManagerGetBuffer(t *testing.T) {
|
|||||||
t.Fatalf("Expected buffer, got nil")
|
t.Fatalf("Expected buffer, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -116,8 +116,8 @@ func TestGetBufferWithMultipleSizes(t *testing.T) {
|
|||||||
t.Fatalf("Expected buffer for size %d, got nil", size)
|
t.Fatalf("Expected buffer for size %d, got nil", size)
|
||||||
}
|
}
|
||||||
|
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -133,8 +133,8 @@ func TestGetBufferIsAlwaysZero(t *testing.T) {
|
|||||||
t.Fatalf("Expected buffer for size %d, got nil", size)
|
t.Fatalf("Expected buffer for size %d, got nil", size)
|
||||||
}
|
}
|
||||||
|
|
||||||
if buffer.Len() != size {
|
if buffer.Cap() != size {
|
||||||
t.Errorf("Expected buffer size %d, got %d", size, buffer.Len())
|
t.Errorf("Expected buffer size %d, got %d", size, buffer.Cap())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range buffer.Bytes() {
|
for _, b := range buffer.Bytes() {
|
||||||
@@ -147,6 +147,23 @@ func TestGetBufferIsAlwaysZero(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolReusesBuffers(t *testing.T) {
|
||||||
|
bpm := NewBufferPoolManager()
|
||||||
|
|
||||||
|
size := 1024
|
||||||
|
buffer := bpm.Get(size)
|
||||||
|
initialPtr := buffer
|
||||||
|
bpm.Put(buffer)
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
buffer = bpm.Get(size)
|
||||||
|
if buffer != initialPtr {
|
||||||
|
t.Errorf("Expected initial buffer (%p) to be reused, got %p", initialPtr, buffer)
|
||||||
|
}
|
||||||
|
bpm.Put(buffer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// region benchmark
|
||||||
func BenchmarkBufferPoolManager(b *testing.B) {
|
func BenchmarkBufferPoolManager(b *testing.B) {
|
||||||
bpm := NewBufferPoolManager()
|
bpm := NewBufferPoolManager()
|
||||||
bufSize := 128 * 1024 // a PDU should not be larger than this... Even this is way too large
|
bufSize := 128 * 1024 // a PDU should not be larger than this... Even this is way too large
|
||||||
@@ -195,7 +212,7 @@ func BenchmarkBufferPoolManager_Memory(b *testing.B) {
|
|||||||
|
|
||||||
// Simulate some work
|
// Simulate some work
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
for _ = range buf.Bytes() {
|
for range buf.Bytes() {
|
||||||
buf.WriteByte(i % 255)
|
buf.WriteByte(i % 255)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user