Skip to content

Commit

Permalink
store index without compression
Browse files Browse the repository at this point in the history
  • Loading branch information
gernest committed Feb 10, 2024
1 parent 31ce3e4 commit fd00296
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 129 deletions.
2 changes: 2 additions & 0 deletions db/persistance.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -77,6 +78,7 @@ func (s *Store) Save(r arrow.Record, idx index.Full) (*v1.Granule, error) {
if err != nil {
return nil, err
}
os.WriteFile("index/testdata/"+id, buf.Bytes(), 0600)
return &v1.Granule{
Min: int64(idx.Min()),
Max: int64(idx.Max()),
Expand Down
88 changes: 35 additions & 53 deletions gen/go/staples/v1/scan.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 35 additions & 72 deletions index/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"slices"
"strings"
"sync"
"unsafe"

Expand Down Expand Up @@ -112,25 +113,28 @@ func (f *FileIndex) get(name string) (*FullColumn, error) {
func readColumn(r ReaderAtSeeker, meta *v1.Metadata_Column) (*FullColumn, error) {
buf := get()
defer buf.Release()
data, err := readChunk(r, meta.Offset, buf)
data := buf.get(int(meta.Size))
n, err := r.ReadAt(data, int64(meta.Offset))
if err != nil {
return nil, err
}
raw := get()
defer raw.Release()
rawData, err := ZSTDDecompress(raw.get(int(meta.RawSize)), data)
if err != nil {
return nil, err
if n != int(meta.Size) {
return nil, fmt.Errorf("index: Too little data read want=%d got %d", meta.Size, n)
}
o := &FullColumn{
name: meta.Name,
numRows: meta.NumRows,
fst: bytes.Clone(chuckFromRaw(rawData, meta.FstOffset)),
fst: bytes.Clone(data[:meta.FstOffset]),
}
for _, bm := range meta.BitmapsOffset {
data = data[meta.FstOffset:]
rd := bytes.NewReader(data)
for {
b := new(roaring.Bitmap)
err := b.UnmarshalBinary(chuckFromRaw(rawData, bm))
_, err := b.ReadFrom(rd)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
break
}
return nil, err
}
o.bitmaps = append(o.bitmaps, b)
Expand All @@ -153,13 +157,23 @@ func WriteFull(w io.Writer, full Full, id string) error {
}
var startOffset uint64
err := full.Columns(func(column Column) (err error) {
var col *v1.Metadata_Column
col, startOffset, err = writeColumn(w, column, startOffset, b)
if err == nil {
meta.Columns = append(meta.Columns, col)
data, offset, err := writeColumn(column, b)
if err != nil {
return err
}
b.Reset()
return err
n, err := w.Write(data)
if err != nil {
return err
}
meta.Columns = append(meta.Columns, &v1.Metadata_Column{
Name: column.Name(),
NumRows: column.NumRows(),
FstOffset: uint32(offset),
Offset: startOffset,
Size: uint32(n),
})
startOffset += uint64(n)
return
})
if err != nil {
return err
Expand All @@ -178,73 +192,22 @@ func WriteFull(w io.Writer, full Full, id string) error {
return err
}

func writeColumn(out io.Writer, column Column, startOffset uint64, w *buffers.BytesBuffer) (meta *v1.Metadata_Column, offset uint64, err error) {
meta = &v1.Metadata_Column{
Name: column.Name(),
NumRows: column.NumRows(),
Offset: &v1.Metadata_Chunk{
Offset: startOffset,
},
}
func writeColumn(column Column, w *buffers.BytesBuffer) (data []byte, offset int, err error) {
w.Reset()
// fst is the first chunk
n, err := w.Write(column.Fst())
offset, err = w.Write(column.Fst())
if err != nil {
return nil, 0, err
}
meta.FstOffset = &v1.Metadata_Chunk{
Offset: 0,
Size: uint64(n),
}
offset += uint64(n)

column.Bitmaps(func(i int, b *roaring.Bitmap) error {
data, err := b.MarshalBinary()
if err != nil {
return err
}
n, err := w.Write(data)
if err != nil {
return err
}

meta.BitmapsOffset = append(meta.BitmapsOffset, &v1.Metadata_Chunk{
Offset: offset,
Size: uint64(n),
})
offset += uint64(n)
return nil
_, err := b.WriteTo(w)
return err
})
if err != nil {
return nil, 0, err
}

meta.RawSize = uint32(w.Len())
cb := get()
defer cb.Release()
o, err := ZSTDCompress(cb.dst(w.Len()), w.Bytes(), ZSTDCompressionLevel)
if err != nil {
return nil, 0, err
}
n, err = out.Write(o)
if err != nil {
return nil, 0, err
}
meta.Offset.Size = uint64(n)
offset += uint64(n)
return meta, startOffset + offset, nil
}

func readChunk(r ReaderAtSeeker, chunk *v1.Metadata_Chunk, b *compressBuf) ([]byte, error) {
o := b.get(int(chunk.Size))
n, err := r.ReadAt(o, int64(chunk.Offset))
if err != nil {
return nil, err
}
if n != int(chunk.Size) {
return nil, fmt.Errorf("index: Too little data read want=%d got %d", chunk.Size, n)
}
return o, nil
data = w.Bytes()
return
}

func readMetadata(r ReaderAtSeeker) (*v1.Metadata, error) {
Expand Down
27 changes: 27 additions & 0 deletions index/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package index

import (
"os"
"testing"

"github.com/stretchr/testify/require"
)

func TestReadIndexFile(t *testing.T) {
f, err := os.Open("testdata/01HPA7QZNP1E8DP8H1SKK253HQ")
if err != nil {
t.Fatal(err)
}
defer f.Close()

m, err := NewFileIndex(f)
if err != nil {
t.Fatal(err)
}
col, err := m.get("path")
if err != nil {
t.Fatal(err)
}
require.Equal(t, "path", col.name)
require.Equal(t, 1, len(col.bitmaps))
}
Binary file added index/testdata/01HPA7QZNP1E8DP8H1SKK253HQ
Binary file not shown.
7 changes: 3 additions & 4 deletions proto/staples/v1/scan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ message Metadata {
message Column {
string name = 1;
uint32 num_rows = 2;
repeated Chunk bitmaps_offset = 3;
Chunk fst_offset = 4;
uint32 raw_size = 5;
Chunk offset = 6;
uint32 fst_offset = 4;
uint64 offset = 5;
uint32 size = 6;
}

message Chunk {
Expand Down

0 comments on commit fd00296

Please sign in to comment.