From 7437b201d79e65cb07eddc3543ec0db6fd34ab33 Mon Sep 17 00:00:00 2001 From: Thomas Kowalski Date: Fri, 19 Jun 2026 09:23:50 +0200 Subject: [PATCH 1/3] perf: reuse contexts --- zstd.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/zstd.go b/zstd.go index 8499bf1..b0c4faf 100644 --- a/zstd.go +++ b/zstd.go @@ -13,6 +13,8 @@ import ( "bytes" "errors" "io/ioutil" + "runtime" + "sync" "unsafe" ) @@ -57,6 +59,38 @@ func cCompressBound(srcSize int) int { return int(C.ZSTD_compressBound(C.size_t(srcSize))) } +// Keep pools of reusable contexts and use them in calls to +// ZSTD_compressCCtx/ZSTD_decompressDCtx. Those functions reset +// session state so pooling context objects is safe and significantly +// reduces allocation churn. +type cctxWrapper struct { + cctx *C.ZSTD_CCtx +} + +type dctxWrapper struct { + dctx *C.ZSTD_DCtx +} + +var cctxPool = sync.Pool{ + New: func() interface{} { + w := &cctxWrapper{cctx: C.ZSTD_createCCtx()} + runtime.SetFinalizer(w, func(w *cctxWrapper) { + C.ZSTD_freeCCtx(w.cctx) + }) + return w + }, +} + +var dctxPool = sync.Pool{ + New: func() interface{} { + w := &dctxWrapper{dctx: C.ZSTD_createDCtx()} + runtime.SetFinalizer(w, func(w *dctxWrapper) { + C.ZSTD_freeDCtx(w.dctx) + }) + return w + }, +} + // decompressSizeHint tries to give a hint on how much of the output buffer size we should have // based on zstd frame descriptors. To prevent DOS from maliciously-created payloads, limit the size func decompressSizeHint(src []byte) int { @@ -100,19 +134,23 @@ func CompressLevel(dst, src []byte, level int) ([]byte, error) { dst = make([]byte, bound) } + w := cctxPool.Get().(*cctxWrapper) + // We need unsafe.Pointer(&src[0]) in the Cgo call to avoid "Go pointer to Go pointer" panics. // This means we need to special case empty input. See: // https://github.com/golang/go/issues/14210#issuecomment-346402945 var cWritten C.size_t if len(src) == 0 { - cWritten = C.ZSTD_compress( + cWritten = C.ZSTD_compressCCtx( + w.cctx, unsafe.Pointer(&dst[0]), C.size_t(len(dst)), unsafe.Pointer(nil), C.size_t(0), C.int(level)) } else { - cWritten = C.ZSTD_compress( + cWritten = C.ZSTD_compressCCtx( + w.cctx, unsafe.Pointer(&dst[0]), C.size_t(len(dst)), unsafe.Pointer(&src[0]), @@ -120,6 +158,8 @@ func CompressLevel(dst, src []byte, level int) ([]byte, error) { C.int(level)) } + cctxPool.Put(w) + written := int(cWritten) // Check if the return is an Error code if err := getError(written); err != nil { @@ -165,10 +205,13 @@ func Decompress(dst, src []byte) ([]byte, error) { // It returns the number of bytes copied and an error if any is encountered. If // dst is too small, DecompressInto errors. func DecompressInto(dst, src []byte) (int, error) { - written := int(C.ZSTD_decompress( + w := dctxPool.Get().(*dctxWrapper) + written := int(C.ZSTD_decompressDCtx( + w.dctx, unsafe.Pointer(&dst[0]), C.size_t(len(dst)), unsafe.Pointer(&src[0]), C.size_t(len(src)))) + dctxPool.Put(w) return written, getError(written) } From fd69ebc3cd94e1b81c7bf27f5036c2480c44575d Mon Sep 17 00:00:00 2001 From: Thomas Kowalski Date: Fri, 19 Jun 2026 09:53:21 +0200 Subject: [PATCH 2/3] test: add benchmarks --- zstd_perf_test.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 zstd_perf_test.go diff --git a/zstd_perf_test.go b/zstd_perf_test.go new file mode 100644 index 0000000..8b95d59 --- /dev/null +++ b/zstd_perf_test.go @@ -0,0 +1,116 @@ +package zstd + +import ( + "bytes" + "fmt" + "testing" +) + +// perfPayload builds a compressible, log/JSON-like payload of approximately +// size bytes, representative of the per-message data the DataDog services +// compress in practice. +func perfPayload(size int) []byte { + var b bytes.Buffer + i := 0 + for b.Len() < size { + fmt.Fprintf(&b, + `{"ts":%d,"level":"INFO","service":"trace-writer","host":"i-%08x","trace_id":%d,"duration_ms":%d,"msg":"request completed"}`+"\n", + 1718800000+i, i*2654435761, int64(i)*1099511628211, i%5000) + i++ + } + return b.Bytes()[:size] +} + +func benchmarkOneShotCompress(b *testing.B, size int) { + src := perfPayload(size) + dst := make([]byte, CompressBound(len(src))) + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := Compress(dst, src); err != nil { + b.Fatal(err) + } + } +} + +func benchmarkOneShotDecompress(b *testing.B, size int) { + src := perfPayload(size) + comp, err := Compress(nil, src) + if err != nil { + b.Fatal(err) + } + dst := make([]byte, len(src)) + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := DecompressInto(dst, comp); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkOneShotCompress1K(b *testing.B) { benchmarkOneShotCompress(b, 1024) } +func BenchmarkOneShotCompress8K(b *testing.B) { benchmarkOneShotCompress(b, 8*1024) } +func BenchmarkOneShotCompress64K(b *testing.B) { benchmarkOneShotCompress(b, 64*1024) } +func BenchmarkOneShotDecompress1K(b *testing.B) { benchmarkOneShotDecompress(b, 1024) } +func BenchmarkOneShotDecompress8K(b *testing.B) { benchmarkOneShotDecompress(b, 8*1024) } +func BenchmarkOneShotDecompress64K(b *testing.B) { benchmarkOneShotDecompress(b, 64*1024) } + +// The *Nil benchmarks model call sites that pass dst == nil (the common +// pattern in dd-go/dd-source), forcing a Go-heap allocation per call. Compare +// their B/op and ns/op against the reused-buffer benchmarks above to see the +// GC cost that a caller-side buffer pool would remove. + +func benchmarkOneShotCompressNil(b *testing.B, size int) { + src := perfPayload(size) + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := Compress(nil, src); err != nil { + b.Fatal(err) + } + } +} + +func benchmarkOneShotDecompressNil(b *testing.B, size int) { + src := perfPayload(size) + comp, err := Compress(nil, src) + if err != nil { + b.Fatal(err) + } + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := Decompress(nil, comp); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkOneShotCompressNil1K(b *testing.B) { benchmarkOneShotCompressNil(b, 1024) } +func BenchmarkOneShotCompressNil8K(b *testing.B) { benchmarkOneShotCompressNil(b, 8*1024) } +func BenchmarkOneShotCompressNil64K(b *testing.B) { benchmarkOneShotCompressNil(b, 64*1024) } +func BenchmarkOneShotDecompressNil1K(b *testing.B) { benchmarkOneShotDecompressNil(b, 1024) } +func BenchmarkOneShotDecompressNil8K(b *testing.B) { benchmarkOneShotDecompressNil(b, 8*1024) } +func BenchmarkOneShotDecompressNil64K(b *testing.B) { benchmarkOneShotDecompressNil(b, 64*1024) } + +// BenchmarkOneShotCompressParallel mirrors highly concurrent services that +// compress from many goroutines, exercising context-pool scalability. +func BenchmarkOneShotCompressParallel(b *testing.B) { + src := perfPayload(8 * 1024) + b.SetBytes(int64(len(src))) + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + dst := make([]byte, CompressBound(len(src))) + for pb.Next() { + if _, err := Compress(dst, src); err != nil { + b.Fatal(err) + } + } + }) +} From 30f7a4efa957a2a3db4ce2183c36ff2f37a16871 Mon Sep 17 00:00:00 2001 From: Thomas Kowalski Date: Fri, 19 Jun 2026 10:13:23 +0200 Subject: [PATCH 3/3] test: remove benchmarks --- zstd_perf_test.go | 116 ---------------------------------------------- 1 file changed, 116 deletions(-) delete mode 100644 zstd_perf_test.go diff --git a/zstd_perf_test.go b/zstd_perf_test.go deleted file mode 100644 index 8b95d59..0000000 --- a/zstd_perf_test.go +++ /dev/null @@ -1,116 +0,0 @@ -package zstd - -import ( - "bytes" - "fmt" - "testing" -) - -// perfPayload builds a compressible, log/JSON-like payload of approximately -// size bytes, representative of the per-message data the DataDog services -// compress in practice. -func perfPayload(size int) []byte { - var b bytes.Buffer - i := 0 - for b.Len() < size { - fmt.Fprintf(&b, - `{"ts":%d,"level":"INFO","service":"trace-writer","host":"i-%08x","trace_id":%d,"duration_ms":%d,"msg":"request completed"}`+"\n", - 1718800000+i, i*2654435761, int64(i)*1099511628211, i%5000) - i++ - } - return b.Bytes()[:size] -} - -func benchmarkOneShotCompress(b *testing.B, size int) { - src := perfPayload(size) - dst := make([]byte, CompressBound(len(src))) - b.SetBytes(int64(len(src))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - if _, err := Compress(dst, src); err != nil { - b.Fatal(err) - } - } -} - -func benchmarkOneShotDecompress(b *testing.B, size int) { - src := perfPayload(size) - comp, err := Compress(nil, src) - if err != nil { - b.Fatal(err) - } - dst := make([]byte, len(src)) - b.SetBytes(int64(len(src))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - if _, err := DecompressInto(dst, comp); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkOneShotCompress1K(b *testing.B) { benchmarkOneShotCompress(b, 1024) } -func BenchmarkOneShotCompress8K(b *testing.B) { benchmarkOneShotCompress(b, 8*1024) } -func BenchmarkOneShotCompress64K(b *testing.B) { benchmarkOneShotCompress(b, 64*1024) } -func BenchmarkOneShotDecompress1K(b *testing.B) { benchmarkOneShotDecompress(b, 1024) } -func BenchmarkOneShotDecompress8K(b *testing.B) { benchmarkOneShotDecompress(b, 8*1024) } -func BenchmarkOneShotDecompress64K(b *testing.B) { benchmarkOneShotDecompress(b, 64*1024) } - -// The *Nil benchmarks model call sites that pass dst == nil (the common -// pattern in dd-go/dd-source), forcing a Go-heap allocation per call. Compare -// their B/op and ns/op against the reused-buffer benchmarks above to see the -// GC cost that a caller-side buffer pool would remove. - -func benchmarkOneShotCompressNil(b *testing.B, size int) { - src := perfPayload(size) - b.SetBytes(int64(len(src))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - if _, err := Compress(nil, src); err != nil { - b.Fatal(err) - } - } -} - -func benchmarkOneShotDecompressNil(b *testing.B, size int) { - src := perfPayload(size) - comp, err := Compress(nil, src) - if err != nil { - b.Fatal(err) - } - b.SetBytes(int64(len(src))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - if _, err := Decompress(nil, comp); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkOneShotCompressNil1K(b *testing.B) { benchmarkOneShotCompressNil(b, 1024) } -func BenchmarkOneShotCompressNil8K(b *testing.B) { benchmarkOneShotCompressNil(b, 8*1024) } -func BenchmarkOneShotCompressNil64K(b *testing.B) { benchmarkOneShotCompressNil(b, 64*1024) } -func BenchmarkOneShotDecompressNil1K(b *testing.B) { benchmarkOneShotDecompressNil(b, 1024) } -func BenchmarkOneShotDecompressNil8K(b *testing.B) { benchmarkOneShotDecompressNil(b, 8*1024) } -func BenchmarkOneShotDecompressNil64K(b *testing.B) { benchmarkOneShotDecompressNil(b, 64*1024) } - -// BenchmarkOneShotCompressParallel mirrors highly concurrent services that -// compress from many goroutines, exercising context-pool scalability. -func BenchmarkOneShotCompressParallel(b *testing.B) { - src := perfPayload(8 * 1024) - b.SetBytes(int64(len(src))) - b.ReportAllocs() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - dst := make([]byte, CompressBound(len(src))) - for pb.Next() { - if _, err := Compress(dst, src); err != nil { - b.Fatal(err) - } - } - }) -}