-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Streaming Client interaction test #4132
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
With this change, we are adding tests for interaction between the producer and consumer client. The test scenarion being added is where we kill the consumer client and make sure that producer client kills the stream as well
442ce6f
to
f87a33e
Compare
pkg/nats/stream/producer_client.go
Outdated
log.Ctx(ctx).Err(err).Msg("heartbeat request to consumer client timed out") | ||
nonActiveStreamIds[c] = append(nonActiveStreamIds[c], activeStreamIds...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this mean we close all streams the first time we fail to heartbeat? There can be an area of improvement here we should only close streams that fail N consecutive heartbeats, where N can equal to 3 by default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. I already have an issue opened for optimising this ! Will solve it as part of that ! #4026
pkg/nats/stream/producer_client.go
Outdated
for streamId, streamInfo := range streamInfoMap { | ||
activeStreamIds = append(activeStreamIds, streamId) | ||
activeStreamIdsByReqSubj[streamInfo.RequestSub] = append(activeStreamIdsByReqSubj[streamInfo.RequestSub], streamInfo.ID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is streamInfo.ID
the same as streamId
?
There are too many maps in ProducerClient
, activeStreamInfo
and activeConnHeartBeatRequestSubjects
, but still they are not enough as we have to construct this activeStreamIdsByReqSubj
with every heartbeat. Is there anything we can do to make things more readable? If having two maps already are not making things more readable or efficient as we still have to loop here, does it make sense to just have one activeStreamInfo
type and have helper methods that return grouped by streams on-demand?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me revisit it !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made it more readable!
pkg/nats/stream/producer_client.go
Outdated
nonActiveStreamIdsFromConsumer := getStringList(heartBeatResponse.NonActiveStreamIds) | ||
if len(nonActiveStreamIdsFromConsumer) != 0 { | ||
nonActiveStreamIds[c] = append(nonActiveStreamIds[c], nonActiveStreamIdsFromConsumer...) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we expect nonActiveStreamIds
to have values for the same consumer from a previous loop iteration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand. This is still happening in the same loop right ! We are just flattening the map. The loop is for different consumers!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is nonActiveStreamIds[c] = append(nonActiveStreamIds[c], nonActiveStreamIdsFromConsumer...)
effectively the same as nonActiveStreamIds[c] = nonActiveStreamIdsFromConsumer
because nonActiveStreamIds[c]
is only populated once
Config: StreamProducerClientConfig{ | ||
HeartBeatIntervalDuration: 2 * time.Second, | ||
HeartBeatRequestTimeout: 1 * time.Second, | ||
StreamCancellationBufferDuration: 2 * time.Second, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feel free to adjust these values to speed up the test if they are too slow
pkg/nats/stream/producer_client.go
Outdated
activeStreamIdsForConn := pc.activeConsumers[consumerID].ActiveStreamInfo | ||
if activeStreamIdsForConn == nil { | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this return nil pointer exception if no entry for for consumerID
is found?
pkg/nats/stream/producer_client.go
Outdated
nonActiveStreamIdsFromConsumer := getStringList(heartBeatResponse.NonActiveStreamIds) | ||
if len(nonActiveStreamIdsFromConsumer) != 0 { | ||
nonActiveStreamIds[c] = append(nonActiveStreamIds[c], nonActiveStreamIdsFromConsumer...) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is nonActiveStreamIds[c] = append(nonActiveStreamIds[c], nonActiveStreamIdsFromConsumer...)
effectively the same as nonActiveStreamIds[c] = nonActiveStreamIdsFromConsumer
because nonActiveStreamIds[c]
is only populated once
With this change, we are adding tests for interaction between the producer and consumer client.
The test scenarion being added is where we kill the consumer client and make sure that producer client kills the stream as well