From 2079142e0e86f0926564adb86d6d95d3f40b057a Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Thu, 9 Jun 2022 14:19:38 -0700 Subject: [PATCH] CLI: Add support for merge subcommand (#418) Adds a merge subcommand that merges multiple mcap files into one. Examples of usage: mcap merge file1.mcap file2.mcap -o output.mcap mcap merge *.mcap -o output.mcap mcap merge *.mcap | mcap cat --json --- go/cli/mcap/Makefile | 2 + go/cli/mcap/cmd/merge.go | 324 +++++++++++++++++++++++ go/cli/mcap/cmd/merge_test.go | 94 +++++++ go/cli/mcap/utils/priority_queue.go | 63 +++++ go/cli/mcap/utils/priority_queue_test.go | 24 ++ go/cli/mcap/utils/utils.go | 15 ++ go/mcap/reader.go | 17 ++ go/mcap/writer.go | 11 +- 8 files changed, 545 insertions(+), 5 deletions(-) create mode 100644 go/cli/mcap/cmd/merge.go create mode 100644 go/cli/mcap/cmd/merge_test.go create mode 100644 go/cli/mcap/utils/priority_queue.go create mode 100644 go/cli/mcap/utils/priority_queue_test.go diff --git a/go/cli/mcap/Makefile b/go/cli/mcap/Makefile index 67fc1cbf0a..c5d3bf4199 100644 --- a/go/cli/mcap/Makefile +++ b/go/cli/mcap/Makefile @@ -1,3 +1,5 @@ +SHELL := /bin/bash + # enable cgo to build sqlite export CGO_ENABLED = 1 diff --git a/go/cli/mcap/cmd/merge.go b/go/cli/mcap/cmd/merge.go new file mode 100644 index 0000000000..2c61176164 --- /dev/null +++ b/go/cli/mcap/cmd/merge.go @@ -0,0 +1,324 @@ +package cmd + +import ( + "container/heap" + "errors" + "fmt" + "io" + "math" + "os" + + "github.com/foxglove/mcap/go/cli/mcap/utils" + "github.com/foxglove/mcap/go/mcap" + "github.com/spf13/cobra" +) + +var ( + mergeProfile string + mergeCompression string + mergeChunkSize int64 + mergeIncludeCRC bool + mergeChunked bool + mergeOutputFile string +) + +type mergeOpts struct { + profile string + compression string + chunkSize int64 + includeCRC bool + chunked bool +} + +// schemaID uniquely identifies a schema across the inputs +type schemaID struct { + inputID int + schemaID uint16 +} + +// channelID uniquely identifies a channel across the inputs +type channelID struct { + inputID int + channelID uint16 +} + +type mcapMerger struct { + schemas map[schemaID]*mcap.Schema + channels map[channelID]*mcap.Channel + schemaIDs map[schemaID]uint16 + channelIDs map[channelID]uint16 + + outputChannelSchemas map[uint16]uint16 + + nextChannelID uint16 + nextSchemaID uint16 + opts mergeOpts +} + +func newMCAPMerger(opts mergeOpts) *mcapMerger { + return &mcapMerger{ + schemas: make(map[schemaID]*mcap.Schema), + channels: make(map[channelID]*mcap.Channel), + schemaIDs: make(map[schemaID]uint16), + channelIDs: make(map[channelID]uint16), + outputChannelSchemas: make(map[uint16]uint16), + nextChannelID: 1, + nextSchemaID: 1, + opts: opts, + } +} + +func (m *mcapMerger) outputChannelID(inputID int, inputChannelID uint16) (uint16, bool) { + v, ok := m.channelIDs[channelID{ + inputID: inputID, + channelID: inputChannelID, + }] + return v, ok +} + +func (m *mcapMerger) outputSchemaID(inputID int, inputSchemaID uint16) (uint16, bool) { + v, ok := m.schemaIDs[schemaID{ + inputID: inputID, + schemaID: inputSchemaID, + }] + return v, ok +} + +func (m *mcapMerger) addChannel(w *mcap.Writer, inputID int, channel *mcap.Channel) (uint16, error) { + outputSchemaID, ok := m.outputSchemaID(inputID, channel.SchemaID) + if !ok { + return 0, fmt.Errorf("unknown schema on channel %d for input %d topic %s", channel.ID, inputID, channel.Topic) + } + key := channelID{inputID, channel.ID} + newChannel := &mcap.Channel{ + ID: m.nextChannelID, // substitute the next output channel 
ID + SchemaID: outputSchemaID, // substitute the output schema ID + Topic: channel.Topic, + MessageEncoding: channel.MessageEncoding, + Metadata: channel.Metadata, + } + m.channels[key] = channel + m.channelIDs[key] = m.nextChannelID + err := w.WriteChannel(newChannel) + if err != nil { + return 0, fmt.Errorf("failed to write channel: %w", err) + } + m.nextChannelID++ + return newChannel.ID, nil +} + +func (m *mcapMerger) addSchema(w *mcap.Writer, inputID int, schema *mcap.Schema) (uint16, error) { + key := schemaID{inputID, schema.ID} + newSchema := &mcap.Schema{ + ID: m.nextSchemaID, // substitute the next output schema ID + Name: schema.Name, + Encoding: schema.Encoding, + Data: schema.Data, + } + m.schemas[key] = newSchema + m.schemaIDs[key] = m.nextSchemaID + err := w.WriteSchema(newSchema) + if err != nil { + return 0, fmt.Errorf("failed to write schema: %w", err) + } + m.nextSchemaID++ + return newSchema.ID, nil +} + +func buildIterator(r io.Reader) (mcap.MessageIterator, error) { + reader, err := mcap.NewReader(r) + if err != nil { + return nil, err + } + iterator, err := reader.Messages(0, math.MaxInt64, nil, false) + if err != nil { + return nil, err + } + return iterator, nil +} + +func (m *mcapMerger) mergeInputs(w io.Writer, inputs []io.Reader) error { + writer, err := mcap.NewWriter(w, &mcap.WriterOptions{ + Chunked: m.opts.chunked, + ChunkSize: m.opts.chunkSize, + Compression: mcap.CompressionFormat(m.opts.compression), + IncludeCRC: m.opts.includeCRC, + }) + if err != nil { + return fmt.Errorf("failed to create writer: %w", err) + } + err = writer.WriteHeader(&mcap.Header{ + Profile: m.opts.profile, + }) + if err != nil { + return fmt.Errorf("failed to write header: %w", err) + } + + iterators := make([]mcap.MessageIterator, len(inputs)) + pq := utils.NewPriorityQueue(nil) + + // for each input reader, initialize an mcap reader and read the first + // message off. Insert the schema and channel into the output with + // renumbered IDs, and load the message (with renumbered IDs) into the + // priority queue. + for inputID, inputReader := range inputs { + iterator, err := buildIterator(inputReader) + if err != nil { + return fmt.Errorf("failed to build iterator for input %d: %w", inputID, err) + } + schema, channel, message, err := iterator.Next(nil) + if err != nil { + if errors.Is(err, io.EOF) { + // the file may be an empty mcap. if so, just ignore it. + continue + } + return fmt.Errorf("failed to read first message on input %d: %w", inputID, err) + } + schema.ID, err = m.addSchema(writer, inputID, schema) + if err != nil { + return fmt.Errorf("failed to add initial schema for input %d: %w", inputID, err) + } + message.ChannelID, err = m.addChannel(writer, inputID, channel) + if err != nil { + return fmt.Errorf("failed to add initial channel for input %d: %w", inputID, err) + } + iterators[inputID] = iterator + + // push the first message onto the priority queue + heap.Push(pq, utils.NewTaggedMessage(inputID, message)) + } + // there's one message per input on the heap now. Pop messages off, + // replacing them with the next message from the corresponding input. + for pq.Len() > 0 { + // the message to be written. This is numbered with the correct channel + // ID for the output, and schemas + channels for it have already been + // written, so it can be written straight to the output. 
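+ // Pop returns the message with the lowest log time across all inputs
+ // (ties broken by input ID, then channel ID - see utils.PriorityQueue.Less),
+ // so writing messages in pop order yields a time-ordered output.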
+ msg := heap.Pop(pq).(utils.TaggedMessage) + err = writer.WriteMessage(msg.Message) + if err != nil { + return fmt.Errorf("failed to write message: %w", err) + } + + // Pull the next message off the iterator, to replace the one just + // popped from the queue. Before pushing this message, it must be + // renumbered and the related channels/schemas may need to be inserted. + newSchema, newChannel, newMessage, err := iterators[msg.InputID].Next(nil) + if err != nil { + if errors.Is(err, io.EOF) { + // if the iterator is empty, skip this read. No further messages + // on the input will be drawn from the heap, so we will not hit + // this code on behalf of the same iterator again. Once this + // happens for each input the queue will be empty and the loop + // will break. + continue + } + return fmt.Errorf("failed to pull next message: %w", err) + } + + // if the channel is unknown, need to add it to the output + var ok bool + newMessage.ChannelID, ok = m.outputChannelID(msg.InputID, newChannel.ID) + if !ok { + _, ok := m.outputSchemaID(msg.InputID, newSchema.ID) + if !ok { + // if the schema is unknown, add it to the output + m.addSchema(writer, msg.InputID, newSchema) + } + newMessage.ChannelID, err = m.addChannel(writer, msg.InputID, newChannel) + if err != nil { + return fmt.Errorf("failed to add channel: %w", err) + } + } + heap.Push(pq, utils.NewTaggedMessage(msg.InputID, newMessage)) + } + return writer.Close() +} + +// mergeCmd represents the merge command +var mergeCmd = &cobra.Command{ + Use: "merge file1.mcap [file2.mcap] [file3.mcap]...", + Short: "Merge a selection of mcap files by record timestamp", + Run: func(cmd *cobra.Command, args []string) { + if mergeOutputFile == "" && !utils.StdoutRedirected() { + die("Binary output can screw up your terminal. 
Supply -o or redirect to a file or pipe.") + } + var readers []io.Reader + for _, arg := range args { + f, err := os.Open(arg) + if err != nil { + die("failed to open %s: %s\n", arg, err) + } + defer f.Close() + readers = append(readers, f) + } + opts := mergeOpts{ + profile: mergeProfile, + compression: mergeCompression, + chunkSize: mergeChunkSize, + includeCRC: mergeIncludeCRC, + chunked: mergeChunked, + } + merger := newMCAPMerger(opts) + var writer io.Writer + if mergeOutputFile == "" { + writer = os.Stdout + } else { + f, err := os.Create(mergeOutputFile) + if err != nil { + die("failed to open output file %s: %s\n", mergeOutputFile, err) + } + defer f.Close() + writer = f + } + err := merger.mergeInputs(writer, readers) + if err != nil { + die(err.Error()) + } + }, +} + +func init() { + rootCmd.AddCommand(mergeCmd) + mergeCmd.PersistentFlags().StringVarP( + &mergeCompression, + "compression", + "", + "zstd", + "chunk compression algorithm (supported: zstd, lz4, none)", + ) + mergeCmd.PersistentFlags().StringVarP( + &mergeOutputFile, + "output-file", + "o", + "", + "output file", + ) + mergeCmd.PersistentFlags().Int64VarP( + &mergeChunkSize, + "chunk-size", + "", + 8*1024*1024, + "chunk size to target", + ) + mergeCmd.PersistentFlags().BoolVarP( + &mergeIncludeCRC, + "include-crc", + "", + true, + "include chunk CRC checksums in output", + ) + mergeCmd.PersistentFlags().BoolVarP( + &mergeChunked, + "chunked", + "", + true, + "chunk the output file", + ) + mergeCmd.PersistentFlags().StringVarP( + &mergeProfile, + "profile", + "", + "", + "profile to record in output header (default: empty string)", + ) +} diff --git a/go/cli/mcap/cmd/merge_test.go b/go/cli/mcap/cmd/merge_test.go new file mode 100644 index 0000000000..4b73ffd015 --- /dev/null +++ b/go/cli/mcap/cmd/merge_test.go @@ -0,0 +1,94 @@ +package cmd + +import ( + "bytes" + "io" + "math" + "testing" + + "github.com/foxglove/mcap/go/mcap" + "github.com/stretchr/testify/assert" +) + +func prepInput(t *testing.T, w io.Writer, schemaID uint16, channelID uint16, topic string) { + writer, err := mcap.NewWriter(w, &mcap.WriterOptions{ + Chunked: true, + }) + assert.Nil(t, err) + + assert.Nil(t, writer.WriteHeader(&mcap.Header{})) + assert.Nil(t, writer.WriteSchema(&mcap.Schema{ + ID: schemaID, + })) + assert.Nil(t, writer.WriteChannel(&mcap.Channel{ + ID: channelID, + SchemaID: schemaID, + Topic: topic, + })) + for i := 0; i < 100; i++ { + assert.Nil(t, writer.WriteMessage(&mcap.Message{ + ChannelID: channelID, + LogTime: uint64(i), + })) + } + assert.Nil(t, writer.Close()) +} + +func TestMCAPMerging(t *testing.T) { + for _, chunked := range []bool{true, false} { + buf1 := &bytes.Buffer{} + buf2 := &bytes.Buffer{} + buf3 := &bytes.Buffer{} + prepInput(t, buf1, 1, 1, "/foo") + prepInput(t, buf2, 1, 1, "/bar") + prepInput(t, buf3, 1, 1, "/baz") + merger := newMCAPMerger(mergeOpts{ + chunked: chunked, + }) + output := &bytes.Buffer{} + assert.Nil(t, merger.mergeInputs(output, []io.Reader{buf1, buf2, buf3})) + + // output should now be a well-formed mcap + reader, err := mcap.NewReader(output) + assert.Nil(t, err) + it, err := reader.Messages(0, math.MaxInt64, nil, false) + assert.Nil(t, err) + + messages := make(map[string]int) + err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error { + messages[channel.Topic]++ + return nil + }) + assert.Nil(t, err) + assert.Equal(t, 100, messages["/foo"]) + assert.Equal(t, 100, messages["/bar"]) + assert.Equal(t, 100, messages["/baz"]) + } +} + +func 
TestMultiChannelInput(t *testing.T) { + buf1 := &bytes.Buffer{} + buf2 := &bytes.Buffer{} + prepInput(t, buf1, 1, 1, "/foo") + prepInput(t, buf2, 1, 1, "/bar") + merger := newMCAPMerger(mergeOpts{}) + multiChannelInput := &bytes.Buffer{} + assert.Nil(t, merger.mergeInputs(multiChannelInput, []io.Reader{buf1, buf2})) + buf3 := &bytes.Buffer{} + prepInput(t, buf3, 2, 2, "/baz") + output := &bytes.Buffer{} + assert.Nil(t, merger.mergeInputs(output, []io.Reader{multiChannelInput, buf3})) + reader, err := mcap.NewReader(output) + assert.Nil(t, err) + it, err := reader.Messages(0, math.MaxInt64, nil, false) + assert.Nil(t, err) + messages := make(map[string]int) + err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error { + messages[channel.Topic]++ + return nil + }) + assert.Nil(t, err) + assert.Equal(t, 100, messages["/foo"]) + assert.Equal(t, 100, messages["/bar"]) + assert.Equal(t, 100, messages["/baz"]) +} diff --git a/go/cli/mcap/utils/priority_queue.go b/go/cli/mcap/utils/priority_queue.go new file mode 100644 index 0000000000..21b8296dfb --- /dev/null +++ b/go/cli/mcap/utils/priority_queue.go @@ -0,0 +1,63 @@ +package utils + +import ( + "container/heap" + + "github.com/foxglove/mcap/go/mcap" +) + +type PriorityQueue []TaggedMessage + +func (pq PriorityQueue) Len() int { + return len(pq) +} + +func (pq PriorityQueue) Less(i, j int) bool { + if pq[i].Message.LogTime != pq[j].Message.LogTime { + return pq[i].Message.LogTime < pq[j].Message.LogTime + } + if pq[i].InputID != pq[j].InputID { + return pq[i].InputID < pq[j].InputID + } + return pq[i].Message.ChannelID < pq[j].Message.ChannelID +} + +func (pq PriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] +} + +func (pq *PriorityQueue) Push(x any) { + msg := x.(TaggedMessage) + *pq = append(*pq, msg) +} + +func (pq *PriorityQueue) Pop() any { + old := *pq + n := len(old) + if n == 0 { + return nil + } + msg := old[n-1] + *pq = old[0 : n-1] + return msg +} + +// TaggedMessage is an mcap message, tagged with an identifier for the input it +// came from. 
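+// The input ID also serves as a tie-breaker in PriorityQueue.Less, so messages
+// with identical log times are popped in a deterministic order.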
+type TaggedMessage struct { + Message *mcap.Message + InputID int +} + +func NewTaggedMessage(inputID int, msg *mcap.Message) TaggedMessage { + return TaggedMessage{msg, inputID} +} + +func NewPriorityQueue(msgs []TaggedMessage) *PriorityQueue { + pq := &PriorityQueue{} + heap.Init(pq) + for _, msg := range msgs { + heap.Push(pq, msg) + } + return pq +} diff --git a/go/cli/mcap/utils/priority_queue_test.go b/go/cli/mcap/utils/priority_queue_test.go new file mode 100644 index 0000000000..4c4ee382af --- /dev/null +++ b/go/cli/mcap/utils/priority_queue_test.go @@ -0,0 +1,24 @@ +package utils + +import ( + "container/heap" + "testing" + + "github.com/foxglove/mcap/go/mcap" + "github.com/stretchr/testify/assert" +) + +func TestPriorityQueue(t *testing.T) { + a := NewTaggedMessage(1, &mcap.Message{LogTime: 1}) + b := NewTaggedMessage(2, &mcap.Message{LogTime: 2}) + c := NewTaggedMessage(3, &mcap.Message{LogTime: 3}) + t.Run("initialized with messages", func(t *testing.T) { + pq := NewPriorityQueue([]TaggedMessage{c, b, a}) + for _, expectedTime := range []uint64{1, 2, 3} { + msg, ok := heap.Pop(pq).(TaggedMessage) + assert.True(t, ok) + assert.Equal(t, expectedTime, msg.Message.LogTime) + } + assert.Panics(t, func() { heap.Pop(pq) }, "expected Pop on empty heap to panic") + }) +} diff --git a/go/cli/mcap/utils/utils.go b/go/cli/mcap/utils/utils.go index 5cc07d1002..2f69ce4ea9 100644 --- a/go/cli/mcap/utils/utils.go +++ b/go/cli/mcap/utils/utils.go @@ -22,6 +22,21 @@ func GetScheme(filename string) (string, string, string) { return match[1], match[2], match[3] } +func ReadingStdin() (bool, error) { + stat, err := os.Stdin.Stat() + if err != nil { + return false, err + } + return stat.Mode()&os.ModeCharDevice == 0, nil +} + +func StdoutRedirected() bool { + if fi, _ := os.Stdout.Stat(); (fi.Mode() & os.ModeCharDevice) == os.ModeCharDevice { + return false + } + return true +} + func GetReader(ctx context.Context, filename string) (func() error, io.ReadSeekCloser, error) { var rs io.ReadSeekCloser var err error diff --git a/go/mcap/reader.go b/go/mcap/reader.go index 71dcfa3244..3206f6475c 100644 --- a/go/mcap/reader.go +++ b/go/mcap/reader.go @@ -2,6 +2,7 @@ package mcap import ( "encoding/binary" + "errors" "fmt" "io" "math" @@ -62,6 +63,22 @@ type MessageIterator interface { Next([]byte) (*Schema, *Channel, *Message, error) } +func Range(it MessageIterator, f func(*Schema, *Channel, *Message) error) error { + for { + schema, channel, message, err := it.Next(nil) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("failed to read record: %w", err) + } + err = f(schema, channel, message) + if err != nil { + return fmt.Errorf("failed to process record: %w", err) + } + } +} + func (r *Reader) unindexedIterator(topics []string, start uint64, end uint64) *unindexedMessageIterator { topicMap := make(map[string]bool) for _, topic := range topics { diff --git a/go/mcap/writer.go b/go/mcap/writer.go index 57a39fa7b8..dcd4bff957 100644 --- a/go/mcap/writer.go +++ b/go/mcap/writer.go @@ -44,6 +44,8 @@ type Writer struct { currentChunkEndTime uint64 opts *WriterOptions + + closed bool } // WriteHeader writes a header record to the output. 
@@ -108,7 +110,7 @@ func (w *Writer) WriteSchema(s *Schema) (err error) { offset += putPrefixedString(w.msg[offset:], s.Name) offset += putPrefixedString(w.msg[offset:], s.Encoding) offset += putPrefixedBytes(w.msg[offset:], s.Data) - if w.opts.Chunked { + if w.opts.Chunked && !w.closed { _, err = w.writeRecord(w.compressedWriter, OpSchema, w.msg[:offset]) } else { _, err = w.writeRecord(w.w, OpSchema, w.msg[:offset]) @@ -147,7 +149,7 @@ func (w *Writer) WriteChannel(c *Channel) error { offset += putPrefixedString(w.msg[offset:], c.MessageEncoding) offset += copy(w.msg[offset:], userdata) var err error - if w.opts.Chunked { + if w.opts.Chunked && !w.closed { _, err = w.writeRecord(w.compressedWriter, OpChannel, w.msg[:offset]) if err != nil { return err @@ -183,7 +185,7 @@ func (w *Writer) WriteMessage(m *Message) error { offset += copy(w.msg[offset:], m.Data) w.Statistics.ChannelMessageCounts[m.ChannelID]++ w.Statistics.MessageCount++ - if w.opts.Chunked { + if w.opts.Chunked && !w.closed { idx, ok := w.messageIndexes[m.ChannelID] if !ok { idx = &MessageIndex{ @@ -628,8 +630,7 @@ func (w *Writer) Close() error { return fmt.Errorf("failed to flush active chunks: %w", err) } } - w.opts.Chunked = false - + w.closed = true err := w.WriteDataEnd(&DataEnd{ DataSectionCRC: 0, })
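A minimal sketch of the new mcap.Range helper added in go/mcap/reader.go, reading a merged file back and counting messages per topic. The file name output.mcap and the log.Fatal error handling are illustrative only; the NewReader, Messages, and Range calls mirror those used in merge_test.go above.

package main

import (
	"fmt"
	"log"
	"math"
	"os"

	"github.com/foxglove/mcap/go/mcap"
)

func main() {
	// Open a merged file, e.g. one produced by `mcap merge a.mcap b.mcap -o output.mcap`.
	f, err := os.Open("output.mcap")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	reader, err := mcap.NewReader(f)
	if err != nil {
		log.Fatal(err)
	}
	it, err := reader.Messages(0, math.MaxInt64, nil, false)
	if err != nil {
		log.Fatal(err)
	}

	// Range drives the iterator until io.EOF, invoking the callback once per message.
	counts := make(map[string]int)
	err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error {
		counts[channel.Topic]++
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(counts)
}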