All Articles

Kotlin Coroutines and Synchronisation: An Example of using Mutex

I’ve recently got the chance to work on a piece of logic in an application that handles syncing a local version of a shopping basket.

We will start by creating a queue of items to be synced, a caller adds to it if the local basket changes and BasketSynchroniser executes the sync logic at specific time intervals before clearing the queue.

class BasketSynchroniser {
	private val itemsQueue = mutableListOf<BasketItem>()
	
	fun addItems(updatedItems: List<BasketItem>) {
		itemsQueue.addAll(updatedItems)
	}
	
	fun startSync() {
		// TODO: Execute network request at time interval
		itemsQueue.clear()
	}
}

We can use TickerChannel to execute code at specific time intervals which means adding coroutines and concurrency to our simple BasketSynchroniser. Coroutines will also help us perform the network request so it’s really helpful to use it.

class BasketSynchroniser(
    private val timer: ReceiveChannel<Unit> =
        ticker(
            delayMillis = TimeUnit.SECONDS.toMillis(1),
            initialDelayMillis = 0
        )
) : CoroutineScope {
    override val coroutineContext = Dispatchers.IO
    private val itemsQueue = mutableListOf<BasketItem>()
	
    fun addItems(updatedItems: List<BasketItem>){
        itemsQueue.addAll(updatedItems)
    }

    fun startSync() {
        launch {
            for (event in timer) {
                // TODO: Execute network request
                itemsQueue.clear()
            }
        }
    }
}

At first glance, this code should “work” and it will compile but it actually has some issues. Since we are reading and clearing itemsQueue from a coroutine, a caller can call addItems and add new items in the queue in the middle of network requests. Even worse, an item can be added to itemsQueue after processing it but before we clear it which means that this item will be lost and never be synced.

To fix this problem we need to synchronise operations on itemsQueue. We can do that using Mutex. An initial approach starts with wrapping addItems() and startSync() with Mutex.withLock{} which will guarantee that the operations on itemsQueue are synchronised.

class BasketSynchroniser(
    ...
) : CoroutineScope {

    private val mutex = Mutex()
    ...

    suspend fun addItems(updatedItems: List<BasketItem>) {
        mutex.withLock { itemsQueue.addAll(updatedItems) }
    }

    fun startSync() {
        launch {
            for (event in timer) {
                mutex.withLock {
                    // TODO: Execute network request
                    itemsQueue.clear()
                }
            }
        }
    }
}

Mutex.withLock is a suspending call which makes fun addItems a suspending function as well. Every time a caller is adding items to the queue, they will have to launch a coroutine and this coroutine will suspend, or wait, until the lock can be owned and withLock block is executed. This means that holding a lock for a long time will block any caller from adding items to itemsQueue.

Initially we wrapped both the synchronisation network request and clearing the queue with Mutex.withLock{}. This works but IO operations can take a long time which means that we will be blocking addItems callers until network requests are complete. We can fix this by only using locking on the parts where it is needed. Those parts are

  • Accessing the items in the queue to process them.
  • Clearing the queue after processing is done.

The process & clear operation has to be an atomic one to avoid a caller adding more items in-between processing and clearing. One way to achieve is by first making a deep copy of the queue then clearing it before processing starts.

fun startSync() {
    launch {
        for (event in timer) {
            val queueCopy = mutex.withLock {
                val copy = itemsQueue.map { it.copy() }
                itemsQueue.clear()
                copy
            }
            // TODO: Execute network request
        }
    }
}

By doing copy-clear-process, we preserved the state of the queue for processing, avoided owning the lock for a long time and avoided the loss of any items between processing and clearing the queue.

The final version of BasketSynchroniser can now safely handle its own shared mutable state using locking to synchronise access to it and it will remain safe as long as itemsQueue is not published outside of BasketSynchroniser. You can read more about shared mutable state and concurrency on the official coroutines docs.