Skip to content

Commit

Permalink
cluster: add transport
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 23, 2024
1 parent a1fa5a2 commit 93ce1d2
Show file tree
Hide file tree
Showing 8 changed files with 774 additions and 1 deletion.
11 changes: 10 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.32.0-20231115204500-e097f827e652.1
github.com/Depado/bfchroma/v2 v2.0.0
github.com/RoaringBitmap/roaring v1.9.0
github.com/VictoriaMetrics/metrics v1.32.0
github.com/alecthomas/chroma/v2 v2.2.0
github.com/apache/arrow/go/v15 v15.0.0
github.com/blevesearch/vellum v1.0.10
Expand All @@ -19,6 +20,7 @@ require (
github.com/fsnotify/fsnotify v1.7.0
github.com/gorilla/websocket v1.5.1
github.com/gosimple/slug v1.13.1
github.com/hashicorp/raft v1.6.1
github.com/klauspost/compress v1.16.7
github.com/oklog/ulid/v2 v2.1.0
github.com/oschwald/maxminddb-golang v1.12.0
Expand All @@ -35,15 +37,16 @@ require (

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/VictoriaMetrics/metrics v1.32.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/blevesearch/mmap-go v1.0.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand All @@ -55,9 +58,15 @@ require (
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
github.com/hashicorp/go-hclog v1.6.2 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mschoch/smat v0.2.0 // indirect
Expand Down
101 changes: 101 additions & 0 deletions go.sum

Large diffs are not rendered by default.

78 changes: 78 additions & 0 deletions internal/closter/store/gzip/compressor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package gzip

import (
"bytes"
"compress/gzip"
"io"
)

// DefaultBufferSize is the default buffer size used by the Compressor.
const DefaultBufferSize = 65536

// Compressor is a wrapper around a gzip.Writer that reads from an io.Reader
// and writes to an internal buffer. The internal buffer is used to store
// compressed data until it is read by the caller.
type Compressor struct {
r io.Reader
bufSz int
buf *bytes.Buffer
gzw *gzip.Writer

nRx int64
nTx int64
}

// NewCompressor returns an instantiated Compressor that reads from r and
// compresses the data using gzip.
func NewCompressor(r io.Reader, bufSz int) (*Compressor, error) {
buf := new(bytes.Buffer)
gzw, err := gzip.NewWriterLevel(buf, gzip.DefaultCompression)
if err != nil {
return nil, err
}
return &Compressor{
r: r,
bufSz: bufSz,
buf: buf,
gzw: gzw,
}, nil
}

// Read reads compressed data.
func (c *Compressor) Read(p []byte) (n int, err error) {
if c.buf.Len() == 0 && c.gzw != nil {
nn, err := io.CopyN(c.gzw, c.r, int64(c.bufSz))
c.nRx += nn
c.nTx += int64(len(p))
if err != nil {
// Time to write the footer.
if err := c.Close(); err != nil {
return 0, err
}
if err != io.EOF {
// Actual error, let caller handle
return 0, err
}
} else if nn > 0 {
// We read some data, but didn't hit any error.
// Just flush the data in the buffer, ready
// to be read.
if err := c.gzw.Flush(); err != nil {
return 0, err
}
}
}
return c.buf.Read(p)
}

// Close closes the Compressor.
func (c *Compressor) Close() error {
if c.gzw == nil {
return nil
}
if err := c.gzw.Close(); err != nil {
return err
}
c.gzw = nil
return nil
}
Loading

0 comments on commit 93ce1d2

Please sign in to comment.