Skip to content

Commit

Permalink
Go: Add support for metadata indexes to reader (#589)
Browse files Browse the repository at this point in the history
* Go: Add support for metadata indexes to reader

Gather metadata index information when reading the summary section.
Also, bump required go version to 1.18.

* Upgrade golangci-lint
  • Loading branch information
wkalt committed Sep 20, 2022
1 parent dd26169 commit d96232f
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ jobs:
with:
lfs: true
- name: install golangci-lint
run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.44.2
run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.49.0
- run: make lint
- run: make test

Expand Down
2 changes: 1 addition & 1 deletion go/mcap/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/foxglove/mcap/go/mcap

go 1.17
go 1.18

require (
github.com/klauspost/compress v1.14.1
Expand Down
8 changes: 7 additions & 1 deletion go/mcap/indexed_message_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type indexedMessageIterator struct {
statistics *Statistics
chunkIndexes []*ChunkIndex
attachmentIndexes []*AttachmentIndex
metadataIndexes []*MetadataIndex

indexHeap rangeIndexHeap

Expand Down Expand Up @@ -89,6 +90,12 @@ func (it *indexedMessageIterator) parseSummarySection() error {
return fmt.Errorf("failed to parse attachment index: %w", err)
}
it.attachmentIndexes = append(it.attachmentIndexes, idx)
case TokenMetadataIndex:
idx, err := ParseMetadataIndex(record)
if err != nil {
return fmt.Errorf("failed to parse metadata index: %w", err)
}
it.metadataIndexes = append(it.metadataIndexes, idx)
case TokenChunkIndex:
idx, err := ParseChunkIndex(record)
if err != nil {
Expand Down Expand Up @@ -211,7 +218,6 @@ func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, e
return nil, nil, nil, err
}
}

for it.indexHeap.Len() > 0 {
ri, err := it.indexHeap.HeapPop()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go/mcap/mcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ type Info struct {
Channels map[uint16]*Channel
Schemas map[uint16]*Schema
ChunkIndexes []*ChunkIndex
MetadataIndexes []*MetadataIndex
AttachmentIndexes []*AttachmentIndex
Header *Header
}
Expand Down
111 changes: 111 additions & 0 deletions go/mcap/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,117 @@ func TestParseHeader(t *testing.T) {
}
}

func TestParseMetadata(t *testing.T) {
cases := []struct {
assertion string
input []byte
output *Metadata
err error
}{
{
"empty input",
[]byte{},
nil,
io.ErrShortBuffer,
},
{
"missing metadata",
prefixedString("metadata"),
nil,
io.ErrShortBuffer,
},
{
"empty metadata",
flatten(prefixedString("metadata"), makePrefixedMap(map[string]string{})),
&Metadata{
Name: "metadata",
Metadata: make(map[string]string),
},
nil,
},
{
"one value",
flatten(prefixedString("metadata"), makePrefixedMap(map[string]string{
"foo": "bar",
})),
&Metadata{
Name: "metadata",
Metadata: map[string]string{
"foo": "bar",
},
},
nil,
},
{
"two values",
flatten(prefixedString("metadata"), makePrefixedMap(map[string]string{
"foo": "bar",
"spam": "eggs",
})),
&Metadata{
Name: "metadata",
Metadata: map[string]string{
"foo": "bar",
"spam": "eggs",
},
},
nil,
},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
output, err := ParseMetadata(c.input)
assert.ErrorIs(t, err, c.err)
assert.Equal(t, output, c.output)
})
}
}

func TestParseMetadataIndex(t *testing.T) {
cases := []struct {
assertion string
input []byte
output *MetadataIndex
err error
}{
{
"empty input",
[]byte{},
nil,
io.ErrShortBuffer,
},
{
"offset only",
encodedUint64(100),
nil,
io.ErrShortBuffer,
},
{
"missing name",
flatten(encodedUint64(100), encodedUint64(1000)),
nil,
io.ErrShortBuffer,
},
{
"well-formed index",
flatten(encodedUint64(100), encodedUint64(1000), prefixedString("metadata")),
&MetadataIndex{
Name: "metadata",
Offset: 100,
Length: 1000,
},
nil,
},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
output, err := ParseMetadataIndex(c.input)
assert.ErrorIs(t, err, c.err)
assert.Equal(t, output, c.output)
})
}
}

func TestParseFooter(t *testing.T) {
cases := []struct {
assertion string
Expand Down
1 change: 1 addition & 0 deletions go/mcap/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (r *Reader) Info() (*Info, error) {
Channels: it.channels,
ChunkIndexes: it.chunkIndexes,
AttachmentIndexes: it.attachmentIndexes,
MetadataIndexes: it.metadataIndexes,
Schemas: it.schemas,
Header: header,
}, nil
Expand Down
173 changes: 152 additions & 21 deletions go/mcap/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,29 +396,160 @@ func TestReaderCounting(t *testing.T) {
}

func TestMCAPInfo(t *testing.T) {
f, err := os.Open("../../testdata/mcap/demo.mcap")
assert.Nil(t, err)
defer f.Close()
assert.Nil(t, err)
r, err := NewReader(f)
assert.Nil(t, err)
info, err := r.Info()
assert.Nil(t, err)
assert.Equal(t, uint64(1606), info.Statistics.MessageCount)
assert.Equal(t, uint32(7), info.Statistics.ChannelCount)
assert.Equal(t, 14, int(info.Statistics.ChunkCount))
expectedCounts := map[string]uint64{
"/radar/points": 156,
"/radar/tracks": 156,
"/radar/range": 156,
"/velodyne_points": 78,
"/diagnostics": 52,
"/tf": 774,
"/image_color/compressed": 234,
cases := []struct {
assertion string
schemas []*Schema
channels []*Channel
messages []*Message
metadata []*Metadata
attachments []*Attachment
}{
{
"no metadata or attachments",
[]*Schema{
{
ID: 1,
},
{
ID: 2,
},
},
[]*Channel{
{
ID: 1,
SchemaID: 1,
Topic: "/foo",
},
{
ID: 2,
SchemaID: 2,
Topic: "/bar",
},
},
[]*Message{
{
ChannelID: 1,
},
{
ChannelID: 2,
},
},
[]*Metadata{},
[]*Attachment{},
},
{
"no metadata or attachments",
[]*Schema{
{
ID: 1,
},
{
ID: 2,
},
},
[]*Channel{
{
ID: 1,
SchemaID: 1,
Topic: "/foo",
},
{
ID: 2,
SchemaID: 2,
Topic: "/bar",
},
},
[]*Message{
{
ChannelID: 1,
},
{
ChannelID: 2,
},
},
[]*Metadata{
{
Name: "metadata1",
Metadata: map[string]string{
"foo": "bar",
},
},
{
Name: "metadata2",
Metadata: map[string]string{
"foo": "bar",
},
},
},
[]*Attachment{
{
Name: "my attachment",
},
},
},
}
for _, c := range cases {
t.Run(c.assertion, func(t *testing.T) {
buf := &bytes.Buffer{}
w, err := NewWriter(buf, &WriterOptions{
Chunked: true,
ChunkSize: 1024,
Compression: CompressionLZ4,
})
assert.Nil(t, err)
assert.Nil(t, w.WriteHeader(&Header{}))
for _, schema := range c.schemas {
assert.Nil(t, w.WriteSchema(schema))
}
for _, channel := range c.channels {
assert.Nil(t, w.WriteChannel(channel))
}
for _, message := range c.messages {
assert.Nil(t, w.WriteMessage(message))
}
for _, metadata := range c.metadata {
assert.Nil(t, w.WriteMetadata(metadata))
}
for _, attachment := range c.attachments {
assert.Nil(t, w.WriteAttachment(attachment))
}
assert.Nil(t, w.Close())

reader := bytes.NewReader(buf.Bytes())
r, err := NewReader(reader)
assert.Nil(t, err)
info, err := r.Info()
assert.Nil(t, err)
assert.Equal(t, uint64(len(c.messages)), info.Statistics.MessageCount, "unexpected message count")
assert.Equal(t, uint32(len(c.channels)), info.Statistics.ChannelCount, "unexpected channel count")
assert.Equal(t, uint32(len(c.metadata)), info.Statistics.MetadataCount, "unexpected metadata count")
assert.Equal(
t,
uint32(len(c.attachments)),
info.Statistics.AttachmentCount,
"unexpected attachment count",
)
expectedTopicCounts := make(map[string]uint64)
for _, message := range c.messages {
channel, err := find(c.channels, func(channel *Channel) bool {
return channel.ID == message.ChannelID
})
assert.Nil(t, err)
expectedTopicCounts[channel.Topic]++
}
assert.Equal(t, expectedTopicCounts, info.ChannelCounts())
})
}
for k, v := range info.ChannelCounts() {
assert.Equal(t, expectedCounts[k], v, "mismatch on %s - got %d", k, v)
}

// find returns the first element in items that satisfies the given predicate.
func find[T any](items []T, f func(T) bool) (val T, err error) {
for _, v := range items {
if f(v) {
return v, nil
}
}
return val, fmt.Errorf("not found")
}

func TestReadingDiagnostics(t *testing.T) {
Expand Down

0 comments on commit d96232f

Please sign in to comment.