Skip to content

Commit

Permalink
sebtopic/ReadRecords: stop when no more records available
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed Jul 5, 2024
1 parent b4afb96 commit 6bd0977
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
2 changes: 1 addition & 1 deletion internal/sebrecords/records.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (rb *Parser) Records(recordIndexStart uint32, recordIndexEnd uint32) ([]Rec
return nil, fmt.Errorf("%d records available, end record index %d does not exist: %w", rb.Header.NumRecords, recordIndexEnd, seb.ErrOutOfBounds)
}
if recordIndexStart >= recordIndexEnd {
return nil, fmt.Errorf("recordIndexStart must be lower than recordIndexEnd")
return nil, fmt.Errorf("recordIndexStart (%d) must be lower than recordIndexEnd (%d)", recordIndexStart, recordIndexEnd)
}

recordOffsetStart := rb.recordIndex[recordIndexStart]
Expand Down
5 changes: 5 additions & 0 deletions internal/sebtopic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ func (s *Topic) ReadRecords(ctx context.Context, offset uint64, maxRecords int,
}
}

// we read enough records to satisfy the request
if numRecords == 0 {
break
}

newRecords, err := rb.Records(batchRecordIndex, batchRecordIndex+numRecords)
if err != nil {
return records, fmt.Errorf("record batch '%s': %w", s.recordBatchPath(batchOffset), err)
Expand Down
5 changes: 3 additions & 2 deletions internal/sebtopic/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,9 @@ func TestTopicReadRecords(t *testing.T) {
"max bytes, 20": {offset: 10, maxRecords: 50, softMaxBytes: 20 * recordSize, expectedRecords: records[10:30]},
"max bytes, at least one record": {offset: 10, maxRecords: 50, softMaxBytes: 1, expectedRecords: records[10:11]},
"max bytes before max records": {offset: 10, maxRecords: 4, softMaxBytes: 5 * recordSize, expectedRecords: records[10:14]},
"max records, offset into middle of batch": {offset: 13, maxRecords: 13, expectedRecords: records[13:26]},
"max bytes, offset into middle of batch": {offset: 13, maxRecords: totalRecords, softMaxBytes: recordSize * 13, expectedRecords: records[13:26]},
"max records, offset into middle of batch": {offset: 13, maxRecords: 13, expectedRecords: records[13:26]},
"max bytes, offset into middle of batch": {offset: 13, maxRecords: totalRecords, softMaxBytes: recordSize * 13, expectedRecords: records[13:26]},
"last record of batch, not first of next batch": {offset: 9, maxRecords: 10, softMaxBytes: recordSize + 1, expectedRecords: records[9:10]},
}

for name, test := range tests {
Expand Down

0 comments on commit 6bd0977

Please sign in to comment.