Skip to content

Commit

Permalink
store data as parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 12, 2024
1 parent 6006ab7 commit 0a80afe
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 32 deletions.
60 changes: 28 additions & 32 deletions db/persistance.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"time"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/ipc"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/compress"
"github.com/apache/arrow/go/v15/parquet/pqarrow"
"github.com/dgraph-io/badger/v4"
"github.com/oklog/ulid/v2"
"github.com/vinceanalytics/vince/buffers"
Expand Down Expand Up @@ -89,53 +90,48 @@ func (s *Store) Save(r arrow.Record, idx index.Full) (*v1.Granule, error) {

func (s *Store) SaveRecord(buf *buffers.BytesBuffer, base []byte, r arrow.Record) (n uint64, err error) {
schema := r.Schema()
var x uint64
b := buffers.Bytes()
defer b.Release()
props := []parquet.WriterProperty{
parquet.WithAllocator(s.mem),
parquet.WithBatchSize(r.NumRows()), // we save as a single row group
parquet.WithCompression(compress.Codecs.Zstd),
}
for i := 0; i < int(r.NumCols()); i++ {
x, err = s.SaveColumn(buf, base, r.ColumnName(i), schema.Field(i), r.Column(i))
if err != nil {
return
// save dictionaries
path := r.ColumnName(i)
if r.Column(i).DataType().ID() == arrow.DICTIONARY {
props = append(props, parquet.WithDictionaryFor(path, true))
}
n += x
}
return
}

func (s *Store) SaveColumn(buf *buffers.BytesBuffer, base []byte, key string, field arrow.Field, a arrow.Array) (n uint64, err error) {
r := array.NewRecord(
arrow.NewSchema([]arrow.Field{field}, nil),
[]arrow.Array{a},
int64(a.Len()),
)
defer r.Release()
b := buffers.Bytes()
defer func() {
n = uint64(b.Len())
b.Release()
}()
w := ipc.NewWriter(b,
ipc.WithSchema(r.Schema()),
ipc.WithAllocator(s.mem),
ipc.WithZstd(),
ipc.WithMinSpaceSavings(0.3), //at least 30% savings
)
w, err := pqarrow.NewFileWriter(schema, b, parquet.NewWriterProperties(props...),
pqarrow.NewArrowWriterProperties(
pqarrow.WithAllocator(s.mem),
pqarrow.WithStoreSchema(),
))
if err != nil {
return 0, err
}
err = w.Write(r)
if err != nil {
return
return 0, err
}
err = w.Close()
if err != nil {
return
return 0, err
}
buf.Reset()
buf.Write(base)
buf.Write(slash)
buf.WriteString(key)
buf.Write(dataPath)
err = s.db.Set(buf.Bytes(), b.Bytes(), s.ttl)
n = uint64(b.Len())
return
}

var (
slash = []byte("/")
slash = []byte("/")
dataPath = []byte("data")
)

type KV struct {
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ require (

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/blevesearch/mmap-go v1.0.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -51,7 +53,10 @@ require (
github.com/google/cel-go v0.18.2 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ github.com/alecthomas/chroma/v2 v2.2.0 h1:Aten8jfQwUqEdadVFFjNyjx7HTexhKP0XuqBG6
github.com/alecthomas/chroma/v2 v2.2.0/go.mod h1:vf4zrexSH54oEjJ7EdB65tGNHmH3pGZmVkgTP5RHvAs=
github.com/alecthomas/repr v0.0.0-20220113201626-b1b626ac65ae h1:zzGwJfFlFGD94CyyYwCJeSuD32Gj9GTaSi5y9hoVzdY=
github.com/alecthomas/repr v0.0.0-20220113201626-b1b626ac65ae/go.mod h1:2kn6fqh/zIyPLmm3ugklbEi5hg5wS435eygvNfaDQL8=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/apache/arrow/go/v15 v15.0.0 h1:1zZACWf85oEZY5/kd9dsQS7i+2G5zVQcbKTHgslqHNA=
github.com/apache/arrow/go/v15 v15.0.0/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
github.com/apache/thrift v0.17.0 h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
Expand Down Expand Up @@ -98,6 +102,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHg
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
Expand All @@ -111,6 +117,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
Expand All @@ -133,6 +143,7 @@ github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AV
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down

0 comments on commit 0a80afe

Please sign in to comment.