From d96232ffd8d88c0e041ff6bb1dbba7eef87d1b44 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 20 Sep 2022 11:49:48 -0700 Subject: [PATCH] Go: Add support for metadata indexes to reader (#589) * Go: Add support for metadata indexes to reader Gather metadata index information when reading the summary section. Also, bump required go version to 1.18. * Upgrade golangci-lint --- .github/workflows/ci.yml | 2 +- go/mcap/go.mod | 2 +- go/mcap/indexed_message_iterator.go | 8 +- go/mcap/mcap.go | 1 + go/mcap/parse_test.go | 111 ++++++++++++++++++ go/mcap/reader.go | 1 + go/mcap/reader_test.go | 173 ++++++++++++++++++++++++---- 7 files changed, 274 insertions(+), 24 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3dbfeccee2..2d917f42fb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -301,7 +301,7 @@ jobs: with: lfs: true - name: install golangci-lint - run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.44.2 + run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.49.0 - run: make lint - run: make test diff --git a/go/mcap/go.mod b/go/mcap/go.mod index ef2597205f..76bde5e819 100644 --- a/go/mcap/go.mod +++ b/go/mcap/go.mod @@ -1,6 +1,6 @@ module github.com/foxglove/mcap/go/mcap -go 1.17 +go 1.18 require ( github.com/klauspost/compress v1.14.1 diff --git a/go/mcap/indexed_message_iterator.go b/go/mcap/indexed_message_iterator.go index 7a09717a1c..ffc5d8342d 100644 --- a/go/mcap/indexed_message_iterator.go +++ b/go/mcap/indexed_message_iterator.go @@ -26,6 +26,7 @@ type indexedMessageIterator struct { statistics *Statistics chunkIndexes []*ChunkIndex attachmentIndexes []*AttachmentIndex + metadataIndexes []*MetadataIndex indexHeap rangeIndexHeap @@ -89,6 +90,12 @@ func (it *indexedMessageIterator) parseSummarySection() error { return fmt.Errorf("failed to parse attachment index: %w", err) } it.attachmentIndexes = append(it.attachmentIndexes, idx) + case TokenMetadataIndex: + idx, err := ParseMetadataIndex(record) + if err != nil { + return fmt.Errorf("failed to parse metadata index: %w", err) + } + it.metadataIndexes = append(it.metadataIndexes, idx) case TokenChunkIndex: idx, err := ParseChunkIndex(record) if err != nil { @@ -211,7 +218,6 @@ func (it *indexedMessageIterator) Next(p []byte) (*Schema, *Channel, *Message, e return nil, nil, nil, err } } - for it.indexHeap.Len() > 0 { ri, err := it.indexHeap.HeapPop() if err != nil { diff --git a/go/mcap/mcap.go b/go/mcap/mcap.go index f8e42d0aea..e8f69a21ce 100644 --- a/go/mcap/mcap.go +++ b/go/mcap/mcap.go @@ -291,6 +291,7 @@ type Info struct { Channels map[uint16]*Channel Schemas map[uint16]*Schema ChunkIndexes []*ChunkIndex + MetadataIndexes []*MetadataIndex AttachmentIndexes []*AttachmentIndex Header *Header } diff --git a/go/mcap/parse_test.go b/go/mcap/parse_test.go index 91d6f02fb1..71c2b94331 100644 --- a/go/mcap/parse_test.go +++ b/go/mcap/parse_test.go @@ -45,6 +45,117 @@ func TestParseHeader(t *testing.T) { } } +func TestParseMetadata(t *testing.T) { + cases := []struct { + assertion string + input []byte + output *Metadata + err error + }{ + { + "empty input", + []byte{}, + nil, + io.ErrShortBuffer, + }, + { + "missing metadata", + prefixedString("metadata"), + nil, + io.ErrShortBuffer, + }, + { + "empty metadata", + flatten(prefixedString("metadata"), makePrefixedMap(map[string]string{})), + &Metadata{ + Name: "metadata", + Metadata: make(map[string]string), + }, + nil, + }, + { + "one value", + flatten(prefixedString("metadata"), makePrefixedMap(map[string]string{ + "foo": "bar", + })), + &Metadata{ + Name: "metadata", + Metadata: map[string]string{ + "foo": "bar", + }, + }, + nil, + }, + { + "two values", + flatten(prefixedString("metadata"), makePrefixedMap(map[string]string{ + "foo": "bar", + "spam": "eggs", + })), + &Metadata{ + Name: "metadata", + Metadata: map[string]string{ + "foo": "bar", + "spam": "eggs", + }, + }, + nil, + }, + } + for _, c := range cases { + t.Run(c.assertion, func(t *testing.T) { + output, err := ParseMetadata(c.input) + assert.ErrorIs(t, err, c.err) + assert.Equal(t, output, c.output) + }) + } +} + +func TestParseMetadataIndex(t *testing.T) { + cases := []struct { + assertion string + input []byte + output *MetadataIndex + err error + }{ + { + "empty input", + []byte{}, + nil, + io.ErrShortBuffer, + }, + { + "offset only", + encodedUint64(100), + nil, + io.ErrShortBuffer, + }, + { + "missing name", + flatten(encodedUint64(100), encodedUint64(1000)), + nil, + io.ErrShortBuffer, + }, + { + "well-formed index", + flatten(encodedUint64(100), encodedUint64(1000), prefixedString("metadata")), + &MetadataIndex{ + Name: "metadata", + Offset: 100, + Length: 1000, + }, + nil, + }, + } + for _, c := range cases { + t.Run(c.assertion, func(t *testing.T) { + output, err := ParseMetadataIndex(c.input) + assert.ErrorIs(t, err, c.err) + assert.Equal(t, output, c.output) + }) + } +} + func TestParseFooter(t *testing.T) { cases := []struct { assertion string diff --git a/go/mcap/reader.go b/go/mcap/reader.go index 968819910b..8f28e46e7a 100644 --- a/go/mcap/reader.go +++ b/go/mcap/reader.go @@ -178,6 +178,7 @@ func (r *Reader) Info() (*Info, error) { Channels: it.channels, ChunkIndexes: it.chunkIndexes, AttachmentIndexes: it.attachmentIndexes, + MetadataIndexes: it.metadataIndexes, Schemas: it.schemas, Header: header, }, nil diff --git a/go/mcap/reader_test.go b/go/mcap/reader_test.go index e8ac939c09..abe8ddb75a 100644 --- a/go/mcap/reader_test.go +++ b/go/mcap/reader_test.go @@ -396,29 +396,160 @@ func TestReaderCounting(t *testing.T) { } func TestMCAPInfo(t *testing.T) { - f, err := os.Open("../../testdata/mcap/demo.mcap") - assert.Nil(t, err) - defer f.Close() - assert.Nil(t, err) - r, err := NewReader(f) - assert.Nil(t, err) - info, err := r.Info() - assert.Nil(t, err) - assert.Equal(t, uint64(1606), info.Statistics.MessageCount) - assert.Equal(t, uint32(7), info.Statistics.ChannelCount) - assert.Equal(t, 14, int(info.Statistics.ChunkCount)) - expectedCounts := map[string]uint64{ - "/radar/points": 156, - "/radar/tracks": 156, - "/radar/range": 156, - "/velodyne_points": 78, - "/diagnostics": 52, - "/tf": 774, - "/image_color/compressed": 234, + cases := []struct { + assertion string + schemas []*Schema + channels []*Channel + messages []*Message + metadata []*Metadata + attachments []*Attachment + }{ + { + "no metadata or attachments", + []*Schema{ + { + ID: 1, + }, + { + ID: 2, + }, + }, + []*Channel{ + { + ID: 1, + SchemaID: 1, + Topic: "/foo", + }, + { + ID: 2, + SchemaID: 2, + Topic: "/bar", + }, + }, + []*Message{ + { + ChannelID: 1, + }, + { + ChannelID: 2, + }, + }, + []*Metadata{}, + []*Attachment{}, + }, + { + "no metadata or attachments", + []*Schema{ + { + ID: 1, + }, + { + ID: 2, + }, + }, + []*Channel{ + { + ID: 1, + SchemaID: 1, + Topic: "/foo", + }, + { + ID: 2, + SchemaID: 2, + Topic: "/bar", + }, + }, + []*Message{ + { + ChannelID: 1, + }, + { + ChannelID: 2, + }, + }, + []*Metadata{ + { + Name: "metadata1", + Metadata: map[string]string{ + "foo": "bar", + }, + }, + { + Name: "metadata2", + Metadata: map[string]string{ + "foo": "bar", + }, + }, + }, + []*Attachment{ + { + Name: "my attachment", + }, + }, + }, + } + for _, c := range cases { + t.Run(c.assertion, func(t *testing.T) { + buf := &bytes.Buffer{} + w, err := NewWriter(buf, &WriterOptions{ + Chunked: true, + ChunkSize: 1024, + Compression: CompressionLZ4, + }) + assert.Nil(t, err) + assert.Nil(t, w.WriteHeader(&Header{})) + for _, schema := range c.schemas { + assert.Nil(t, w.WriteSchema(schema)) + } + for _, channel := range c.channels { + assert.Nil(t, w.WriteChannel(channel)) + } + for _, message := range c.messages { + assert.Nil(t, w.WriteMessage(message)) + } + for _, metadata := range c.metadata { + assert.Nil(t, w.WriteMetadata(metadata)) + } + for _, attachment := range c.attachments { + assert.Nil(t, w.WriteAttachment(attachment)) + } + assert.Nil(t, w.Close()) + + reader := bytes.NewReader(buf.Bytes()) + r, err := NewReader(reader) + assert.Nil(t, err) + info, err := r.Info() + assert.Nil(t, err) + assert.Equal(t, uint64(len(c.messages)), info.Statistics.MessageCount, "unexpected message count") + assert.Equal(t, uint32(len(c.channels)), info.Statistics.ChannelCount, "unexpected channel count") + assert.Equal(t, uint32(len(c.metadata)), info.Statistics.MetadataCount, "unexpected metadata count") + assert.Equal( + t, + uint32(len(c.attachments)), + info.Statistics.AttachmentCount, + "unexpected attachment count", + ) + expectedTopicCounts := make(map[string]uint64) + for _, message := range c.messages { + channel, err := find(c.channels, func(channel *Channel) bool { + return channel.ID == message.ChannelID + }) + assert.Nil(t, err) + expectedTopicCounts[channel.Topic]++ + } + assert.Equal(t, expectedTopicCounts, info.ChannelCounts()) + }) } - for k, v := range info.ChannelCounts() { - assert.Equal(t, expectedCounts[k], v, "mismatch on %s - got %d", k, v) +} + +// find returns the first element in items that satisfies the given predicate. +func find[T any](items []T, f func(T) bool) (val T, err error) { + for _, v := range items { + if f(v) { + return v, nil + } } + return val, fmt.Errorf("not found") } func TestReadingDiagnostics(t *testing.T) {