Parallel processing with Worker Pools in Kotlin

Worker pools

While working on a Go project recently, I came across the Worker Pools example on Go by Example, which uses a pool of goroutines for parallel processing. I didn't find many resources on implementing worker pools in Kotlin, but the pattern seemed like a good fit for my current Spring Boot + Kotlin application.

Kotlin

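Kotlin uses coroutines for concurrency. They are fairly similar to goroutines: lightweight tasks that are scheduled onto a small pool of threads rather than each getting a thread of their own.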

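Coroutines follow structured concurrency: every coroutine is launched inside a scope, and its lifetime is bounded by that scope. A scope does not complete until all of its children have completed, and cancelling a scope cancels its children.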
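To illustrate, here is a minimal sketch using kotlinx.coroutines (the function names `runChildren` and `runChildrenBlocking` are made up for this example). It launches many short-lived coroutines inside `coroutineScope`; because the scope waits for its children, the counter is only read after every child has finished. It also shows that, like goroutines, coroutines are cheap enough to start by the thousand.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `count` children inside coroutineScope.
// Structured concurrency guarantees that coroutineScope returns only
// after every child launched inside it has finished.
suspend fun runChildren(count: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(10) // stand-in for a short piece of suspending work
                finished.incrementAndGet()
            }
        }
    } // suspends here until all children have completed
    return finished.get() // therefore always equals count
}

// Entry point for ordinary, non-suspending code
fun runChildrenBlocking(count: Int): Int = runBlocking { runChildren(count) }
```

Calling `runChildrenBlocking(10_000)` returns 10000: all ten thousand children run concurrently, and none is left behind when the function returns.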

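To build a worker group, we need a coroutine scope that lives for the whole lifetime of the application rather than for a single request. We get that by creating our own CoroutineScope and keeping it in a field of the service. Adding SupervisorJob() to its context means that a failing child coroutine does not cancel the scope or its other children.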

// Dispatchers.IO suits blocking network calls such as sardine.put()
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
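The effect of SupervisorJob can be seen in the following sketch (the function `siblingSurvivesFailure` and its two coroutines are hypothetical). One child throws, yet its sibling in the same supervisor scope runs to completion. With a plain Job() instead, the failure would cancel the whole scope, including the healthy child. The CoroutineExceptionHandler only logs the failure.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns true if the healthy child survived
// the failure of its sibling.
fun siblingSurvivesFailure(): Boolean = runBlocking {
    // Log uncaught failures instead of passing them to the default handler
    val handler = CoroutineExceptionHandler { _, e -> println("Worker failed: ${e.message}") }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)

    val failing = scope.launch { throw IllegalStateException("upload failed") }
    val healthy = scope.launch {
        delay(100) // still running when the sibling fails
        println("Healthy worker finished")
    }

    failing.join() // join() does not rethrow the child's exception
    healthy.join()
    val survived = !healthy.isCancelled
    scope.cancel() // release the scope once we are done with it
    survived
}
```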

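Next, we create a buffered channel that serves as a queue of upload jobs; each job pairs the target URL with the image bytes. With a capacity of 10,000, send() only suspends once 10,000 jobs are waiting.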

val channel = Channel<Pair<String, ByteArray>>(10000)
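A small sketch of what the buffer provides (the helper `bufferThenDrain` is hypothetical): as long as the buffer has room, send() returns without any receiver running, and items come out in the order they were sent. Only once the buffer is full would send() suspend.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends every item into a buffered channel before
// anything receives, then drains it. With capacity >= items.size,
// send() never suspends.
fun bufferThenDrain(items: List<String>): List<String> = runBlocking {
    val channel = Channel<String>(capacity = items.size)
    for (item in items) channel.send(item) // fits in the buffer, no receiver needed yet
    channel.close() // no more items; lets the loop below terminate after draining
    val received = mutableListOf<String>()
    for (item in channel) received += item // FIFO order
    received
}
```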

I use Spring's @PostConstruct annotation to start the worker group once the service bean has been initialized. Each worker waits on the channel: when an item is available, it passes it to the upload function; when the queue is empty, the worker suspends until new data arrives.

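@PostConstruct
fun createWorkerGroup() {
    // Launch each worker directly in the supervisor scope so that
    // one failing worker does not cancel the others
    repeat(5) { i ->
        coroutineScope.launch {
            println("Create Worker ${i + 1}")
            // Suspends while the channel is empty; ends once it is closed
            for (item in channel) {
                try {
                    uploadImage(item)
                } catch (e: Exception) {
                    // Keep the worker alive if a single upload fails
                    println("Upload to ${item.first} failed: ${e.message}")
                }
            }
        }
    }
}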
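For reference, the whole pattern can be condensed into a standalone sketch that mirrors the Go by Example worker pool, without Spring or WebDAV. The names `runWorkerPool` and `handledBy` are hypothetical, and `delay(10)` stands in for the real upload. Unlike the Spring service, which keeps its workers alive for the application's lifetime, these workers run inside `coroutineScope`, so the function returns once the channel has been closed and drained.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch: `workerCount` workers drain a shared job channel.
// Returns which worker (0 until workerCount) handled each job.
fun runWorkerPool(jobs: List<String>, workerCount: Int): Map<String, Int> = runBlocking {
    val channel = Channel<String>(capacity = 100)
    val handledBy = ConcurrentHashMap<String, Int>()

    coroutineScope {
        repeat(workerCount) { id ->
            launch(Dispatchers.Default) {
                // Suspends while the channel is empty, ends when it is closed and drained
                for (job in channel) {
                    delay(10) // stand-in for the real upload
                    handledBy[job] = id
                }
            }
        }
        for (job in jobs) channel.send(job) // producer; suspends only if the buffer is full
        channel.close() // signal that no more jobs will arrive
    } // waits until every worker has finished
    handledBy
}
```

Calling `runWorkerPool((1..20).map { "job-$it" }, 5)` returns a map from each of the 20 jobs to the id of the worker that handled it; which worker gets which job varies between runs.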

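Finally, we send data to the channel from ordinary, non-suspending code (for example a Spring controller) by wrapping send() in runBlocking. Because the channel is buffered, this returns almost immediately unless the queue is full, in which case it blocks the calling thread until a worker makes room: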

runBlocking {
  uploadService.channel.send(Pair(url, image.bytes))
}
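If blocking a request thread is undesirable, `trySend()` offers a non-blocking alternative. The sketch below (the class `UploadQueue` and its methods are hypothetical) wraps both options: `enqueueBlocking` behaves like the `runBlocking` call above, while `tryEnqueue` returns `false` instead of waiting when the queue is full, so the caller could, for example, answer with an error.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical queue wrapper for callers that are not coroutines.
class UploadQueue(capacity: Int) {
    val channel = Channel<Pair<String, ByteArray>>(capacity)

    // Blocks the calling thread only while the buffer is full
    fun enqueueBlocking(url: String, bytes: ByteArray) {
        runBlocking { channel.send(url to bytes) }
    }

    // Never blocks: returns false if the buffer is full or the channel is closed
    fun tryEnqueue(url: String, bytes: ByteArray): Boolean =
        channel.trySend(url to bytes).isSuccess
}
```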

WebDAV

In my web application, users upload images from their mobile phones to my web server. Afterwards, I want to upload these pictures over WebDAV to a Hetzner Storage Box, a cheap alternative to S3 object storage.

I use the Sardine Java WebDAV client library because it is simple to use.

Usage is straightforward; you configure the client with your WebDAV credentials:

val sardine = SardineFactory.begin("webDavUsername", "webDavPassword")

The uploadImage function is called for every image the workers take from the channel we created earlier. Inside it, we call sardine.put() to save the image file:

sardine.put("https://username.your-storagebox.de/foldername/imagename.jpg", imageBytes)
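A minimal sketch of the `uploadImage` function, assuming the workers pass it one `(url, bytes)` pair at a time as in the worker loop. The class name `ImageUploader` is hypothetical, and the `put` function is injected rather than calling Sardine directly so that the code can run without a server; in the real service it would be `{ url, bytes -> sardine.put(url, bytes) }`.

```kotlin
// Hypothetical service: `put` is injected so the upload target can be swapped,
// e.g. { url, bytes -> sardine.put(url, bytes) } in production or a fake in tests.
class ImageUploader(private val put: (String, ByteArray) -> Unit) {
    // Called by a worker for each (url, bytes) pair taken from the channel
    fun uploadImage(item: Pair<String, ByteArray>) {
        val (url, bytes) = item
        put(url, bytes)
    }
}
```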

That is all we need for parallel file uploads: five workers drain the shared queue, so up to five images are uploaded at the same time.