
Producer/Consumer GenStage pipeline with graceful shutdown (i.e. draining of buffered events)


rt-chang/genstage-draining

GenStage Draining

Background

This project demonstrates one approach to draining buffered events at shutdown in a GenStage pipeline. The pipeline processes numbers: the Consumer demands numbers from the Producer and outputs each one to the console.

Broadway can also drain buffered events at shutdown, but it did not fit my use case (or perhaps I was not using it correctly). I wanted to ensure that the Consumer has enough time to process in-flight data once shutdown is initiated.

About the Code

The Producer buffers events in an Erlang queue and tracks accumulated demand with a counter. This is modelled on the examples in the GenStage documentation.
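The queue-plus-counter bookkeeping can be sketched without any dependencies. The module below is a hypothetical illustration, not the repository's Producer: DemandBuffer and its function names are assumptions. In a real GenStage producer, the same logic would typically sit behind handle_demand and a cast or call that enqueues events, with the returned events emitted via a {:noreply, events, state} tuple.

```elixir
defmodule DemandBuffer do
  @moduledoc """
  Hypothetical, dependency-free sketch of a producer's bookkeeping:
  an Erlang :queue of buffered events plus a counter of unmet demand.
  State is the tuple {queue, pending_demand}.
  """

  def new, do: {:queue.new(), 0}

  # New events arrive: buffer them, then emit as many as pending demand allows.
  def enqueue({queue, pending}, events) when is_list(events) do
    queue = Enum.reduce(events, queue, &:queue.in/2)
    dispatch(queue, pending, [])
  end

  # A consumer asks for more: add to the counter, then emit what is buffered.
  def demand({queue, pending}, count) when is_integer(count) and count > 0 do
    dispatch(queue, pending + count, [])
  end

  # Pop events while both demand and buffered events remain.
  # Returns {events_to_emit, new_state}.
  defp dispatch(queue, 0, acc), do: {Enum.reverse(acc), {queue, 0}}

  defp dispatch(queue, pending, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> dispatch(rest, pending - 1, [event | acc])
      {:empty, queue} -> {Enum.reverse(acc), {queue, pending}}
    end
  end
end
```

Events that arrive with no outstanding demand stay in the queue, and demand that arrives with an empty queue stays in the counter. Whatever remains in the queue at shutdown is what needs draining.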

The Consumer traps exits so that its terminate callback runs at shutdown. In terminate, the Consumer requests the event buffer queue from the Producer and drains the events in order.
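The trap-exits-then-drain pattern can be sketched with a plain GenServer. This is an illustration under stated assumptions, not the repository's Consumer: DrainingWorker is a hypothetical name, and an Agent holding a :queue stands in for the Producer's event buffer. A GenStage consumer also has a terminate callback, so the same shape should carry over.

```elixir
defmodule DrainingWorker do
  @moduledoc """
  Hypothetical sketch using a plain GenServer. `buffer` is an Agent
  holding an Erlang :queue, standing in for the Producer's buffer.
  """
  use GenServer

  def start_link(buffer), do: GenServer.start_link(__MODULE__, buffer)

  @impl true
  def init(buffer) do
    # Without this flag, a shutdown exit signal from the supervisor kills
    # the process before terminate/2 gets a chance to run.
    Process.flag(:trap_exit, true)
    {:ok, buffer}
  end

  @impl true
  def terminate(_reason, buffer) do
    # Take everything still buffered, leaving an empty queue behind,
    # then process the events in FIFO order.
    queue = Agent.get_and_update(buffer, fn q -> {q, :queue.new()} end)
    Enum.each(:queue.to_list(queue), &IO.puts("drained #{&1}"))
  end
end
```

This only works if the buffer's owner is still alive when terminate runs. A supervisor stops its children in reverse start order, so starting the Producer before the Consumer keeps the Producer available during the drain.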

Note that the Consumer is given 10 seconds to finish draining any buffered events. Once the 10 seconds have elapsed, the application is forcibly shut down. The 10-second limit is set in the Application module.
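One common way to express such a limit is the shutdown option of a child specification, which controls how long a supervisor waits for a child to terminate before killing it. The sketch below assumes that mechanism. It may not match how the repository's Application module sets the value, and ShutdownSpecSketch and its arguments are hypothetical names.

```elixir
defmodule ShutdownSpecSketch do
  # Hypothetical helper: builds the child list a supervisor could start.
  # `producer` and `consumer` are placeholders for the real stage modules.
  def children(producer, consumer) do
    [
      # Started first, so stopped last: still alive while the consumer drains.
      %{id: producer, start: {producer, :start_link, [[]]}},
      # Allow up to 10_000 ms in terminate/2 before the supervisor kills
      # the process. Workers default to 5_000 ms when this key is absent.
      %{id: consumer, start: {consumer, :start_link, [[]]}, shutdown: 10_000}
    ]
  end
end
```

A list like this would be passed to Supervisor.start_link from the application's start callback.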

To Run

  1. Run mix deps.get to fetch all dependencies
  2. Run iex -S mix in a console
  3. Load n events into the Producer by calling the helper function GenStageDraining.Helper.enqueue_n_events(n) with a sufficiently large number (perhaps >5000)
  4. At any point while events are still being processed, call :init.stop() to shut down the application and trigger the draining of events from the buffer queue. Copy and paste the command into the console, as typing it out may be too slow
  5. Check the log file (located at ./log/info.log) to see the output. Log statements show when the application begins its termination routine, when the process's mailbox is emptied, and when events are drained from the pipeline
