
Improve Feed PubSub: execute subscribers' blocking operations in separate goroutines #2208

Merged · 8 commits · Feb 4, 2022

Conversation

@leszko (Contributor) commented Jan 25, 2022

What does this pull request do? Explain your changes. (required)
Avoid blocking the Feed mechanism by executing every blocking operation in a separate goroutine. As a result, if any operation blocks, we end up with a blocked/leaked goroutine instead of stalling the whole livepeer node.

See #2207 for a detailed description.

Specific updates (required)

  • Serve each feed event in a separate goroutine (see the sketch after this list)
  • Add locks to avoid multiple goroutines performing the same operation more than once
  • Rename the feed sink variables to keep the same naming convention across all files
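
As a rough illustration of the first two points, a minimal sketch of the pattern (the watcher, handler, and event type names here are hypothetical, not the literal diff):

func (w *watcher) watch(sink <-chan types.Log) {
	var mu sync.Mutex
	for ev := range sink {
		ev := ev // capture the loop variable for the goroutine (pre-Go 1.22)
		go func() {
			// The mutex keeps a handler from running concurrently with
			// itself; note mutexes are not FIFO, so event order may change.
			mu.Lock()
			defer mu.Unlock()
			w.handleLog(ev) // may block; only this goroutine hangs, not the feed
		}()
	}
}

The loop keeps the sink drained, so the feed's sender never blocks even if handleLog hangs.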

How did you test each of these updates? (required)

Checked the same scenario using the code from master and the code from this PR.

  1. Insert time.Sleep(1 * time.Hour) into roundinitializer.go#TryInitialize() to simulate that this operation is blocked (see the sketch after this list).
  2. Start a local Geth node, an Orchestrator, a Broadcaster, and a video stream.
  3. From the Livepeer CLI, execute Initialize Round and Reward a few times.
  4. After 5 minutes, check the Orchestrator logs.
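
For reference, step 1 amounts to something like this (the TryInitialize signature shown is an assumption for illustration):

func (r *RoundInitializer) TryInitialize() error {
	time.Sleep(1 * time.Hour) // simulate the round-initialization path hanging
	return nil
}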

With master, the Orchestrator was blocked and stopped transcoding, and the Broadcaster failed transcoding with the following logs.

I0125 11:23:11.586426    6783 mediaserver.go:925] manifestID=movie nonce=17482076527360272128 seqNo=0 Finished push request at url=http://localhost:8935/live/movie/test197.ts ua=Go-http-client/1.1 addr=127.0.0.1:57864 bytes=995648 dur= resolution= took=26.432ms
I0125 11:23:11.586430    6783 broadcast.go:122] manifestID=movie nonce=17482076527360272128 seqNo=0 Starting session refresh
I0125 11:23:11.586697    6783 rpc.go:265] manifestID=movie nonce=17482076527360272128 Connecting RPC to uri=https://127.0.0.1:8936
I0125 11:23:11.588929    6783 db_discovery.go:129] manifestID=movie nonce=17482076527360272128 invalid ticket params orch=https://127.0.0.1:8936 err="TicketParams expired"

With the code from this PR, everything kept working even though the round initializer was blocked.

Does this pull request close any open issues?
fix #2168


@victorges (Member) left a comment

Reviewing only the Go logic for processing events in a separate goroutine. My understanding is that it works just because we never stop receiving from the src channel, so it can't fill up and block the sender. That looks good to me; just make sure that the order of the events really does not matter, as mutexes are not FIFO.

It would be good to avoid holding so many (locked) goroutines in the background. While you could use a channel for the locking instead, which would let you add rate-limiting and timeouts, that might be a premature optimization we don't really need right now. So if we do implement timeouts for all the internal operations of those routines (which I understood is the next step here), we should be good already.
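
For reference, the channel-based locking mentioned above could look roughly like this (doBlockingWork is a hypothetical stand-in for a subscriber's handler):

// A buffered channel of capacity 1 acts as a lock, but unlike sync.Mutex
// it composes with select, so acquisition can time out or be rate-limited.
sem := make(chan struct{}, 1)
select {
case sem <- struct{}{}:
	doBlockingWork()
	<-sem // release the lock
case <-time.After(30 * time.Second):
	// could not acquire the lock in time; skip or log the event
}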

Review threads on server/redeemer.go and pm/queue.go (outdated, resolved)
@victorges commented Jan 26, 2022

Thinking further about this, WDYT of instead providing a helper function somewhat like this? It basically works as a channel with a dynamically-sized, growing buffer.

func pipeBuffered(ctx context.Context, in <-chan interface{}) <-chan interface{} {
	out := make(chan interface{}, 1)
	go func() {
		defer close(out)
		buf := []interface{}{}
	pipe:
		for {
			if len(buf) == 0 {
				// Nothing buffered: just wait for the next event.
				select {
				case <-ctx.Done():
					return
				case val, ok := <-in:
					if !ok {
						break pipe
					}
					buf = append(buf, val)
				}
				continue
			}
			// Buffered events pending: receive new ones and forward the
			// oldest one, whichever is ready first.
			select {
			case <-ctx.Done():
				return
			case val, ok := <-in:
				if !ok {
					break pipe
				}
				buf = append(buf, val)
			case out <- buf[0]:
				buf = buf[1:]
			}
		}
		// in was closed: drain whatever is still buffered.
		for _, val := range buf {
			select {
			case <-ctx.Done():
				return
			case out <- val:
			}
		}
	}()
	return out
}

With it, you would pass the subscription channel and get back a channel which only has a "direct" buffer of 1, but keeps a resizable buffer in an off-channel slice. It would keep the event ordering while still giving more control over the parallelism, and simpler usage in the end (just the way it was before). For example, later we could define a hard limit for the slice buffer and drop events if we reach it, or add a log in case the buffer gets too big.
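
Hypothetical usage, assuming a handle function for the events:

events := make(chan interface{})
// ... subscribe events to the feed ...
for ev := range pipeBuffered(ctx, events) {
	handle(ev) // may block; the pipe keeps absorbing incoming events meanwhile
}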

The implementation could be optimized in how it grows/shrinks the slice, for example with a circular buffer, but I guess this is enough for now. Performance-wise, I think it would even be a little cheaper than maintaining N locked goroutines, given that each goroutine also has to hold at least one event on its stack, which is everything we hold in the slice buffer.

@leszko commented Jan 27, 2022

> Thinking further about this, WDYT of instead providing a helper function somewhat like this? It basically works as a channel with a dynamically-sized, growing buffer. [...]

This looks very nice at first, but... Go (at least the version we use in the project) does not have generics, so it won't bake in nicely into our watchers. We would need to create a similar function for each channel type we use (which is probably overkill).

At first, I thought we could change all the Subscribe functions to use sink chan<- interface{} instead of sink chan<- types.Log (or similar typed channels), but that does not work either, because you actually need to pass a typed channel into feed.Subscribe(); otherwise feed.Send() does not work.
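
To illustrate with go-ethereum's event.Feed (a minimal sketch, assuming types.Log events):

var feed event.Feed
logs := make(chan types.Log, 10)
sub := feed.Subscribe(logs) // fixes the feed's element type to types.Log
defer sub.Unsubscribe()
feed.Send(types.Log{}) // delivered to every typed subscriber
// Subscribing a chan interface{} instead would fix the element type to
// interface{}, and feed.Send(types.Log{}) would then panic on the type
// mismatch, so the sinks have to stay typed.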

@leszko commented Jan 27, 2022

> Reviewing only the Go logic for processing events in a separate goroutine. My understanding is that it works just because we never stop receiving from the src channel, so it can't fill up and block the sender. That looks good to me; just make sure that the order of the events really does not matter, as mutexes are not FIFO.

Yes, I think the order of events does not matter. Also, these locks are really a kind of optimization to avoid executing the same transaction at the same time: it's enough to execute it once, and trying to send it twice simultaneously could burn gas unnecessarily.

> It would be good to avoid holding so many (locked) goroutines in the background. While you could use a channel for the locking instead, which would let you add rate-limiting and timeouts, that might be a premature optimization we don't really need right now. So if we do implement timeouts for all the internal operations of those routines (which I understood is the next step here), we should be good already.

Yes, I think it's technically possible to convert this into a channel-based solution, I just find it more complicated. See my comment on your pipeBuffered() suggestion.

@victorges (Member)

> This looks very nice at first, but... Go (at least the version we use in the project) does not have generics, so it won't bake in nicely into our watchers. We would need to create a similar function for each channel type we use (which is probably overkill).

Yeah makes sense! Can't wait for generics to reach a stable version 🤩

@victorges (Member) left a comment

LGTM!

Review threads on eth/watchers/orchestratorwatcher.go and server/redeemer.go (resolved)
@leszko (Contributor, Author) left a comment

@yondonfu I addressed your comments, PTAL

@leszko requested a review from yondonfu on February 3, 2022 15:31
@yondonfu (Member) left a comment

LGTM! Looks like the commit structure could use a bit of cleanup before merging.

Successfully merging this pull request may close these issues:
Improve TimeWatcher PubSub mechanism