
PublisherActor - Failed to enqueue errors #89

Open
pbuz opened this issue Dec 23, 2019 · 6 comments

pbuz commented Dec 23, 2019

We are trying to use KMS to schedule messages from a topic on which we produce 500 messages/second. Unfortunately, our test fails with version 0.22.0 of the KMS Docker image: the pod repeatedly restarts and this error pours into the logs:

[2019-12-23 14:02:36,814] alto-kms ERROR [kafka-message-scheduler-akka.actor.default-dispatcher-21] com.sky.kms.actors.PublisherActor - Failed to enqueue 8450874c-a1a2-4895-9fc1-4f098d699a38
java.lang.IllegalStateException: You have to wait for the previous offer to be resolved to send another request
	at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:101)
	at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1(QueueSource.scala:114)
	at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1$adapted(QueueSource.scala:108)
	at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:452)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:481)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:581)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:749)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:764)
	at akka.actor.Actor.aroundReceive(Actor.scala:539)
	at akka.actor.Actor.aroundReceive$(Actor.scala:537)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:671)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
	at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:581)
	at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1)
	at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
	at akka.kamon.instrumentation.ActorMonitors$$anon$1.$anonfun$processMessage$1(ActorMonitor.scala:134)
	at kamon.Kamon$.withContext(Kamon.scala:120)
	at akka.kamon.instrumentation.ActorMonitors$$anon$1.processMessage(ActorMonitor.scala:134)
	at akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorInstrumentation.scala:45)
	at akka.actor.ActorCell.invoke(ActorCell.scala:574)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
	at kamon.executors.Executors$InstrumentedExecutorService$$anon$7.run(Executors.scala:270)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

For reference, the same test runs successfully at 250 messages/second.

@manuelcueto
Contributor

Thanks. We'll take a look.

@pbuz
Author

pbuz commented Jan 3, 2020

Hi @manuelcueto, do you have any updates on this?

@manuelcueto
Contributor

Some thoughts on it:

private def receiveWithQueue(queue: ScheduleQueue): Receive = {
  case Trigger(scheduleId, schedule) =>
    queue.offer((scheduleId, messageFrom(schedule))) onComplete {
      case Success(QueueOfferResult.Enqueued) =>
        log.debug(ScheduleQueueOfferResult(scheduleId, QueueOfferResult.Enqueued).show)
      case Success(res) =>
        log.warning(ScheduleQueueOfferResult(scheduleId, res).show)
      case Failure(t) =>
        log.error(t, s"Failed to enqueue $scheduleId")
        self ! DownstreamFailure(t)
    }
}

This is the conflicting piece of code: we offer to the queue and get a Future back, which is how Akka Streams handles backpressure. That Future will not complete until the buffer can hold another element. Since we are not waiting for the Future to complete here, calling offer again while the buffer is full fails that offer, and we are currently not handling it gracefully.
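
To make the failure mode concrete, here is a standalone sketch against a plain Akka Streams queue source (not KMS code; the buffer size, throttle rate, and names like OfferBackpressureDemo are made up for illustration):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.duration._

object OfferBackpressureDemo extends App {
  implicit val system: ActorSystem = ActorSystem("offer-demo")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Small buffer plus a slow consumer so the buffer is full almost immediately.
  val queue = Source
    .queue[Int](bufferSize = 1, OverflowStrategy.backpressure)
    .throttle(1, 1.second, 1, ThrottleMode.Shaping)
    .to(Sink.ignore)
    .run()

  // Offering again while a previous offer's Future is still unresolved fails
  // that Future with the same IllegalStateException seen in the stack trace.
  (1 to 10).foreach { i =>
    queue.offer(i).failed.foreach(t => println(s"offer $i failed: ${t.getMessage}"))
  }
}
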
A solution would be to context.become into another receive that waits for the offer to complete while stashing incoming Trigger messages; once it completes, unstash them and go back to the 'available' state.
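
A rough sketch of that approach, reusing the names from the snippet above; the type aliases, the OfferResolved message, and the overall wiring are simplified stand-ins rather than the actual PublisherActor:

import akka.actor.{Actor, ActorLogging, Stash}
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueueWithComplete

import scala.util.{Failure, Success, Try}

object PublisherSketch {
  // Simplified stand-ins for the real KMS types.
  type ScheduleId    = String
  type Schedule      = String
  type ScheduleQueue = SourceQueueWithComplete[(ScheduleId, String)]

  final case class Trigger(scheduleId: ScheduleId, schedule: Schedule)
  final case class DownstreamFailure(t: Throwable)
  private final case class OfferResolved(scheduleId: ScheduleId, result: Try[QueueOfferResult])

  def messageFrom(schedule: Schedule): String = schedule

  class PublisherActor(queue: ScheduleQueue) extends Actor with ActorLogging with Stash {
    import context.dispatcher

    override def receive: Receive = available

    // 'Available' state: fire the offer, then refuse to offer again until it resolves.
    private def available: Receive = {
      case Trigger(scheduleId, schedule) =>
        queue.offer((scheduleId, messageFrom(schedule))).onComplete { result =>
          self ! OfferResolved(scheduleId, result)
        }
        context.become(waitingForOffer)
    }

    // Waiting state: stash new Triggers until the in-flight offer has resolved.
    private def waitingForOffer: Receive = {
      case OfferResolved(scheduleId, Success(result)) =>
        log.debug(s"Offer for $scheduleId resolved: $result")
        unstashAll()
        context.become(available)

      case OfferResolved(scheduleId, Failure(t)) =>
        log.error(t, s"Failed to enqueue $scheduleId")
        self ! DownstreamFailure(t) // failure handling/supervision is out of scope for this sketch
        unstashAll()
        context.become(available)

      case _: Trigger => stash()
    }
  }
}
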

@pbuz
Author

pbuz commented Jan 7, 2020

@manuelcueto we experimented with the value of scheduler.publisher.queue-buffer-size to see whether we could get this to pass, but it still fails at 500 messages/second.
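
For completeness, the knob we were tuning is scheduler.publisher.queue-buffer-size; assuming it is read through Typesafe Config, a programmatic override would look roughly like this (the value is just one of the sizes we tried):

import com.typesafe.config.ConfigFactory

// Overrides scheduler.publisher.queue-buffer-size ahead of the bundled defaults.
val config = ConfigFactory
  .parseString("scheduler.publisher.queue-buffer-size = 100000")
  .withFallback(ConfigFactory.load())

Passing the same key as a JVM system property should also take precedence over the bundled defaults when ConfigFactory.load() is used.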

@lacarvalho91
Contributor

Have you tried setting the buffer size to max int? I believe that's how it is configured in MAP @manuelcueto

@paulaylingdev
Contributor

@pbuz could you post the deployment configuration where you are seeing the issues?
