Happy Eyeballs
Happy Eyeballs is defined by the IETF (RFC 6555, since superseded by RFC 8305) and is, according to Wikipedia,
"an algorithm published by the IETF that makes dual-stack applications (those that understand both IPv4 and IPv6) more responsive to users by attempting to connect using both IPv4 and IPv6 at the same time (preferring IPv6 ..."
Having seen this post by Nathaniel J. Smith, and then watched his video where he codes up Happy Eyeballs, I thought I'd give it a go in Kotlin using coroutines.
Nathaniel's blog post is his introduction to Trio, an async library for Python that is all about structured concurrency. It is an excellent introduction to structured concurrency and lays out the benefits of this approach when building, using, or writing asynchronous APIs.
Coroutines in Kotlin also follow the principle of structured concurrency and so seem like a natural fit for this algorithm.
The algorithm for Happy Eyeballs is roughly this:
- Sort the IP addresses of the endpoint you are trying to connect to
- Try to connect to the first address in the list
- If this fails, or it's taking too long, try the next address in the list
- Keep trying until one address connects or they all fail
- Cancel all outstanding attempts when one succeeds
'Taking too long' means waiting about 250 ms, although the RFC does describe more nuanced approaches.
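The "wait for the previous attempt, but give up after 250 ms" step maps naturally onto withTimeoutOrNull from kotlinx.coroutines. Here's a minimal sketch of just that step, assuming kotlinx-coroutines-core is on the classpath; waitForTurn is a hypothetical helper name, and the channel stands in for "the previous attempt has failed".

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: suspends until the previous attempt reports a failure on
// `previousFailed` or until `delayMs` has elapsed, whichever comes first.
// Returns true if it was signalled, false if it timed out.
suspend fun waitForTurn(previousFailed: Channel<Boolean>, delayMs: Long = 250): Boolean =
    withTimeoutOrNull(delayMs) { previousFailed.receive() } ?: false

fun main() = runBlocking {
    // Nobody signals, so we give up waiting after 250 ms.
    val quiet = Channel<Boolean>(1)
    println("signalled=${waitForTurn(quiet)}")   // signalled=false

    // The "previous attempt" fails after 50 ms, so we start early.
    val failing = Channel<Boolean>(1)
    launch { delay(50); failing.send(true) }
    println("signalled=${waitForTurn(failing)}") // signalled=true
}
```

withTimeoutOrNull returns null on timeout rather than throwing, which is why the two outcomes can be told apart with a simple null check.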
I'm not going to implement the full RFC, as that would involve sorting addresses according to RFC 6724, which I haven't done. This is Kotlin, which runs on the JVM, so I'm using Java APIs to get at some of the data: I rely on them to resolve a domain name into a list of IP addresses, and then connect to those addresses in turn.
The RFC also says "Next, the client SHOULD modify the ordered list to interleave address families". This means we should try an IPv6 address first, then an IPv4 address, then IPv6 again, and so on until the address list is exhausted. That we can do.
So, the code. The first thing to think about is that we can't really use Java's Socket class to create the connections: its connect call blocks the calling thread, and there's no clean way to cancel that connection attempt from a coroutine. Instead we use Java's NIO support, specifically the java.nio.channels.AsynchronousSocketChannel class. I have to wrap this up so that it integrates with the coroutine infrastructure and so that I can cancel it from within a coroutine. To do that I create a TcpSocket class that acts as a wrapper around the AsynchronousSocketChannel.
The code for that looks like this:
class TcpSocket(private val socket: AsynchronousSocketChannel) {
    suspend fun connect(addr: InetSocketAddress): Void? {
        return socket.asyncConnect(addr)
    }
    ...
    suspend fun AsynchronousSocketChannel.asyncConnect(address: InetSocketAddress): Void? {
        return suspendCoroutine { continuation ->
            this.connect(address, continuation, ConnectCompletionHandler)
        }
    }
    ...
    object ConnectCompletionHandler : CompletionHandler<Void?, Continuation<Void?>> {
        override fun completed(result: Void?, attachment: Continuation<Void?>) {
            attachment.resume(result)
        }
        override fun failed(exc: Throwable, attachment: Continuation<Void?>) {
            attachment.resumeWithException(exc)
        }
    }
}
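Notice that I provide a connect function that wraps asyncConnect, an extension function on AsynchronousSocketChannel that in turn wraps the channel's callback-based connect call. These are suspend functions: the coroutine suspends when the connection starts, and ConnectCompletionHandler resumes it when the connection completes or fails.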
That's the infrastructure part of the code.
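One caveat worth knowing: suspendCoroutine is not cancellable. If the scope is cancelled while a connect is pending, the coroutine stays suspended until the connect completes or fails on its own. kotlinx.coroutines provides suspendCancellableCoroutine for this case, and invokeOnCancellation lets us close the channel, which (per the AsynchronousSocketChannel docs) completes any outstanding operation with AsynchronousCloseException. The sketch below is a hedged alternative to asyncConnect, not the code from the repository; connectCancellable and ConnectHandler are hypothetical names.

```kotlin
import java.io.IOException
import java.net.InetSocketAddress
import java.net.ServerSocket
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical handler: resumes the suspended coroutine when the connect finishes.
// Resuming a continuation that has already been cancelled is ignored.
object ConnectHandler : CompletionHandler<Void?, CancellableContinuation<Unit>> {
    override fun completed(result: Void?, attachment: CancellableContinuation<Unit>) {
        attachment.resume(Unit)
    }
    override fun failed(exc: Throwable, attachment: CancellableContinuation<Unit>) {
        attachment.resumeWithException(exc)
    }
}

// Hypothetical cancellable counterpart to asyncConnect.
suspend fun AsynchronousSocketChannel.connectCancellable(address: InetSocketAddress) =
    suspendCancellableCoroutine<Unit> { cont ->
        // If the coroutine is cancelled mid-connect, close the channel: the pending
        // connect then fails with AsynchronousCloseException and the socket is freed.
        cont.invokeOnCancellation {
            try { close() } catch (e: IOException) { /* already closing; ignore */ }
        }
        connect(address, cont, ConnectHandler)
    }

fun main() = runBlocking {
    // Connect to a throwaway local listener to show the happy path.
    ServerSocket(0).use { server ->
        AsynchronousSocketChannel.open().use { channel ->
            channel.connectCancellable(InetSocketAddress("127.0.0.1", server.localPort))
            println("connected to ${channel.remoteAddress}")
        }
    }
}
```

With this version, cancelling the scope really does stop a slow connection attempt immediately, rather than leaving it to time out in the background.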
For the actual algorithm I followed Nathaniel's code as closely as I could.
There's a class called HappyEyeballs with a property named winningSocket that holds the connected socket, or null if nothing has connected yet.
The openTcpSocket method gathers the IP addresses and then fires off the first attempt. It first calls getSortedAddress, passing in the hostname; that code looks like this:
private fun getSortedAddress(hostname: String): List<InetAddress> {
    val addresses = InetAddress.getAllByName(hostname).toList()
    val inet4Addresses = mutableListOf<Inet4Address>()
    val inet6Addresses = mutableListOf<Inet6Address>()
    addresses.forEach {
        when (it) {
            is Inet4Address -> {
                inet4Addresses.add(it)
            }
            is Inet6Address -> {
                inet6Addresses.add(it)
            }
            else -> {}
        }
    }
    // interleave IPv6, IPv4, IPv6, ...; note that zip stops at the end of the
    // shorter list, so any extra addresses in the longer list are dropped
    val sortedAddress = inet6Addresses.zip(inet4Addresses) { i6, i4 -> listOf(i6, i4) }.flatten()
    return sortedAddress
}
I grab all the addresses associated with this host, separate out the IPv4 and IPv6 addresses, and then zip them together so I get IPv6 followed by IPv4. There's some other logic in the actual code to make sure the IPv4 and IPv6 lists are the same length, but essentially that's it.
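Because zip stops at the end of the shorter list, a host with only IPv4 (or only IPv6) addresses would produce an empty list without that extra logic. Here's one way to interleave without dropping anything. It's a sketch: interleave and interleaveFamilies are hypothetical names, not functions from the repository.

```kotlin
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress

// Interleave two lists, starting with `first`. When one list runs out, the rest
// of the other is appended, so (unlike zip) no element is dropped.
fun <T> interleave(first: List<T>, second: List<T>): List<T> {
    val result = ArrayList<T>(first.size + second.size)
    for (i in 0 until maxOf(first.size, second.size)) {
        if (i < first.size) result.add(first[i])
        if (i < second.size) result.add(second[i])
    }
    return result
}

// Hypothetical alternative to the zip step in getSortedAddress: IPv6 first,
// keeping the resolver's order within each address family.
fun interleaveFamilies(addresses: List<InetAddress>): List<InetAddress> =
    interleave(
        addresses.filterIsInstance<Inet6Address>(),
        addresses.filterIsInstance<Inet4Address>()
    )

fun main() {
    println(interleave(listOf("v6a", "v6b", "v6c"), listOf("v4a"))) // [v6a, v4a, v6b, v6c]
    println(interleave(emptyList(), listOf("v4a", "v4b")))          // [v4a, v4b]
}
```

filterIsInstance replaces the manual when/forEach split; the output order is the same as the zip version whenever both families have the same number of addresses.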
Once we have the IP addresses we can think about the actual algorithm. Again taking this from Nathaniel's talk, we have to do the following:
- Run an attempt to connect
- If it's not the first attempt, first wait for the previous attempt to fail or for the timeout to expire; whichever happens first, start this attempt
- In the current attempt, try to connect asynchronously (so that we can cancel if necessary)
- If an attempt connects, cancel all the other attempts
Coroutines lend themselves to this sort of work because of structured concurrency: it's easy to cancel everything once any attempt succeeds.
The attempt function signature looks like this:
private suspend fun attempt(
    scope: CoroutineScope,
    which: Int,
    maxWaitTime: Long,
    addresses: List<InetAddress>,
    port: Int,
    channels: List<Channel<Boolean>>
) {
We pass in the list of addresses, the port, the attempt number (from zero up to the size of the list minus one), and the maximum time to wait before starting this attempt.
We also pass in the CoroutineScope that we are running inside. We use it for two things: to launch the next attempt asynchronously and, if any attempt succeeds, to cancel the scope and with it all the other attempts.
I use Channels to communicate between attempts: each attempt listens on one channel and sends on another. Attempts communicate only when one fails. To do this I pass around a list of channels, one per attempt.
Attempt 0 doesn't listen on a channel, as it's the first attempt and so there are no previous attempts that can fail.
Each subsequent attempt listens on the 'previous' channel: attempt 1 listens on channel 0, attempt 2 listens on channel 1, and so on.
If an attempt fails (maybe the socket cannot connect for some reason), it signals on its own channel. So if attempt 0 fails it signals on channel 0, which attempt 1 is listening to, and attempt 1 starts straight away; if attempt 1 fails it signals on channel 1, which attempt 2 is listening to. Each attempt communicates via one channel with only one other attempt.
Once any attempt succeeds, it sets the socket held by the HappyEyeballs class and cancels the scope so that everything else finishes. This is the beauty of structured concurrency and coroutine scopes: it's easy to manage all the coroutines within a scope.
The code looks like this:
private suspend fun attempt(
    scope: CoroutineScope,
    which: Int,
    maxWaitTime: Long,
    addresses: List<InetAddress>,
    port: Int,
    channels: List<Channel<Boolean>>
) {
    // wait for the previous attempt to fail or for the timeout to expire
    if (which > 0) {
        val res = withTimeoutOrNull(maxWaitTime) {
            channels[which - 1].receive()
        }
        if (res == null) {
            println("Attempt $which timed-out")
        } else {
            println("Attempt $which signalled $res")
        }
    }
    // start the next attempt
    if (which + 1 < channels.size) {
        scope.launch { attempt(scope, which + 1, maxWaitTime, addresses, port, channels) }
    }
    // try connecting
    try {
        val tcpsocket = TcpSocket(AsynchronousSocketChannel.open())
        tcpsocket.connect(InetSocketAddress(addresses[which], port))
        lock.lock()
        try {
            // the first attempt to connect wins; a later one closes its own
            // socket rather than overwriting the winner with null
            if (winningSocket == null) {
                winningSocket = tcpsocket
            } else {
                tcpsocket.close()
            }
        } finally {
            lock.unlock() // always release the lock, even if close() throws
        }
    } catch (e: Throwable) {
        println("Try connecting failed on atttempt $which with exception $e")
        // tell the next attempt it can start now
        channels[which].send(true)
        channels[which].close()
    }
    if (winningSocket != null) {
        println("On $which, socket is $winningSocket")
        scope.cancel()
    }
}
Notice the calls to lock and unlock: we need them because several attempts may try to update the shared winningSocket.
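The post doesn't show how lock is declared. Assuming it's a kotlinx.coroutines Mutex (a java.util.concurrent ReentrantLock would also work, but blocks a thread while waiting), one way to make this update hard to get wrong is to wrap it in withLock, which always releases the lock, and to have a late success report that it lost, so that the caller closes its own socket instead of touching the shared winner. Winner and claim are hypothetical names; this is a sketch, not code from the repository.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical "first one wins" holder, standing in for winningSocket + lock.
class Winner<T : Any> {
    private val mutex = Mutex()
    private var value: T? = null

    // Returns true if `candidate` became the winner, false if someone else got
    // there first. withLock releases the mutex even if the block throws.
    suspend fun claim(candidate: T): Boolean = mutex.withLock {
        if (value == null) {
            value = candidate
            true
        } else {
            false
        }
    }

    suspend fun get(): T? = mutex.withLock { value }
}

fun main() = runBlocking {
    val winner = Winner<Int>()
    // 100 concurrent "connections" on a multi-threaded dispatcher; only one can win.
    val jobs = List(100) { i -> launch(Dispatchers.Default) { winner.claim(i) } }
    jobs.forEach { it.join() }
    println("winner = ${winner.get()}")
}
```

In the attempt function, a false return from claim would be the cue to call tcpsocket.close().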
The attempt function is called from openTcpSocket like this:
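suspend fun openTcpSocket(hostname: String, port: Int, maxWaitTime: Long = 250) {
    val sortedAddress = getSortedAddress(hostname)
    if (sortedAddress.isEmpty()) return // nothing to connect to
    // capacity 1: a failing attempt's send() never suspends, even when the next
    // attempt has already timed out and stopped listening, or there is no next attempt
    val channels = List(sortedAddress.size) { Channel<Boolean>(1) }
    coroutineScope {
        attempt(this, 0, maxWaitTime, sortedAddress, port, channels)
    }
}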
If I run this for debian.org, I get this:
Try connecting failed on atttempt 0 with exception java.net.NoRouteToHostException: No route to host
Attempt 1 signalled true
On 1, socket is sun.nio.ch.UnixAsynchronousSocketChannelImpl[connected local=/192.168.86.24:54125 remote=debian.org/151.101.66.132:443]
The code for this is on github at https://github.com/kevinrjones/happyeyeballs
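To see the relay and the cancellation in action without touching the network, here's a small simulation with the same shape as attempt, assuming kotlinx-coroutines-core. FakeTarget, fakeAttempt and race are hypothetical names; delay() stands in for the socket connect, an AtomicInteger records the winner instead of a lock, and each channel has capacity 1 so a failing attempt never blocks in send() when nobody is listening any more.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// A pretend address: "connecting" takes `millis`, then succeeds or fails.
data class FakeTarget(val millis: Long, val succeeds: Boolean)

private suspend fun fakeAttempt(
    scope: CoroutineScope,
    which: Int,
    targets: List<FakeTarget>,
    failed: List<Channel<Unit>>,
    staggerMs: Long,
    winner: AtomicInteger
) {
    if (which > 0) {
        // Start when the previous attempt fails or the stagger delay expires.
        withTimeoutOrNull(staggerMs) { failed[which - 1].receive() }
    }
    if (which + 1 < targets.size) {
        scope.launch { fakeAttempt(scope, which + 1, targets, failed, staggerMs, winner) }
    }
    delay(targets[which].millis) // stands in for the socket connect
    if (!targets[which].succeeds) {
        failed[which].send(Unit) // capacity 1, so this never suspends
    } else if (winner.compareAndSet(-1, which)) {
        // First success: cancel every attempt still running (including this one,
        // which is about to return anyway). The scope itself stays active.
        scope.coroutineContext.cancelChildren()
    }
}

// Returns the index of the first target to "connect", or null if all fail.
suspend fun race(targets: List<FakeTarget>, staggerMs: Long = 250): Int? {
    if (targets.isEmpty()) return null
    val winner = AtomicInteger(-1)
    val failed = List(targets.size) { Channel<Unit>(1) }
    coroutineScope {
        val scope = this
        // Launch attempt 0 as a child too, so cancelChildren() reaches every attempt.
        scope.launch { fakeAttempt(scope, 0, targets, failed, staggerMs, winner) }
    }
    return winner.get().takeIf { it >= 0 }
}

fun main() = runBlocking {
    // Attempt 0 hangs, attempt 1 starts after 250 ms and connects 100 ms later,
    // attempt 2 is still waiting for its turn when it gets cancelled.
    val targets = listOf(FakeTarget(1000, false), FakeTarget(100, true), FakeTarget(50, true))
    println("winner = ${race(targets)}") // winner = 1
}
```

Cancelling only the children, rather than the scope itself, is a deliberate choice in this sketch: it lets coroutineScope return normally instead of completing as cancelled, so race can simply return the winner's index.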