-
Notifications
You must be signed in to change notification settings - Fork 6
/
SqsQueueConsumer.kt
29 lines (23 loc) · 948 Bytes
/
SqsQueueConsumer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package org.seekerwing.aws.sqsconsumer
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.seekerwing.aws.sqsconsumer.messageprovider.MessageProvider
/**
* Amazon SQS based implementation of [QueueConsumer].
*/
internal class SqsQueueConsumer(private val provider: MessageProvider, private val consumer: MessageConsumer) :
CoroutineScope, QueueConsumer {
private val supervisorJob = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + supervisorJob
override fun start() = launch {
val messageChannel = provider.provideMessages(CoroutineScope(coroutineContext))
consumer.launchConsumer(CoroutineScope(coroutineContext), messageChannel)
}
override fun stop() {
supervisorJob.cancel()
}
}