Basic Concepts
Deferred
- Single future result (like Promise)
- Call await() to get the value
- Best for one-time async operations
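A minimal sketch of the Deferred pattern described above, assuming kotlinx.coroutines is on the classpath. The helper name computeAnswer and the arithmetic inside it are hypothetical stand-ins for real work:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts one async computation and awaits its single result.
fun computeAnswer(): Int = runBlocking {
    val deferred = async {
        delay(50) // stand-in for real work
        6 * 7
    }
    deferred.await() // suspends until the value is ready
}

fun main() {
    println(computeAnswer()) // prints 42
}
```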
Channel
- Stream of values between coroutines
- Use send() and receive() for communication
- Perfect for continuous data flow
Channel Types & Capacity
// Unlimited buffer: send() never suspends, memory use is unbounded
val unlimited = Channel<Int>(Channel.UNLIMITED)
// No buffer (the default): send() suspends until a receiver takes the value
val rendezvous = Channel<Int>(Channel.RENDEZVOUS)
// Conflated: keeps only the latest value, send() never suspends
val conflated = Channel<Int>(Channel.CONFLATED)
// Fixed buffer size: send() suspends once 5 values are waiting
val buffered = Channel<Int>(5)
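To see how the capacities differ in practice, here is a small sketch. The helper drainAfterThreeSends is hypothetical; it offers three values with trySend (which never suspends), closes the channel, and returns whatever was actually kept:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: offers 1, 2, 3 without suspending, then drains the channel.
fun drainAfterThreeSends(capacity: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity)
    for (i in 1..3) channel.trySend(i) // fails silently when there is no room
    channel.close()                    // buffered values stay receivable after close()
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() {
    println(drainAfterThreeSends(Channel.UNLIMITED))  // [1, 2, 3]
    println(drainAfterThreeSends(2))                  // [1, 2]
    println(drainAfterThreeSends(Channel.CONFLATED))  // [3]
    println(drainAfterThreeSends(Channel.RENDEZVOUS)) // []
}
```

The rendezvous case comes back empty because no receiver is waiting at the moment trySend is called.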
Channel Lifecycle
// Create a channel (rendezvous by default)
val channel = Channel<Int>()
// Send values
launch {
    try {
        for (i in 1..5) channel.send(i)
    } finally {
        channel.close() // Important: otherwise the receiving loop never ends
    }
}
// Receive until closed
for (value in channel) {
    println(value)
}
// Alternative: channel.consumeEach { println(it) }
Complete Pipeline Example
// Stage 1: Generate numbers
fun CoroutineScope.produceNumbers() = produce<Int> {
    for (i in 1..10) {
        send(i)
        delay(100)
    }
}
// Stage 2: Square the numbers
fun CoroutineScope.squareNumbers(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}
// Use the pipeline (inside a coroutine scope, e.g. runBlocking)
val numbers = produceNumbers()
val squares = squareNumbers(numbers)
squares.consumeEach { println(it) }
Prime Number Sieve Example
// Generate sequential numbers
fun CoroutineScope.generateNumbers() = produce<Int> {
    var x = 2
    while (true) send(x++)
}
// Filter out multiples of a prime
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
// Build sieve of prime numbers
fun CoroutineScope.findPrimes() = produce<Int> {
    var numbers = generateNumbers() // var: reassigned for every new filter stage
    while (true) {
        val prime = numbers.receive()
        send(prime)
        numbers = filter(numbers, prime)
    }
}
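The sieve never terminates on its own, so a caller has to take what it needs and then cancel the producers. A self-contained sketch of that usage follows; the names numbersFrom, withoutMultiplesOf and firstPrimes are hypothetical, and it cancels the stages with cancelChildren() once enough primes are collected:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // endless stream; stopped only by cancellation
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.withoutMultiplesOf(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

// Hypothetical helper: collects the first `count` primes, then stops the pipeline.
fun firstPrimes(count: Int): List<Int> = runBlocking {
    var current = numbersFrom(2)
    val primes = mutableListOf<Int>()
    repeat(count) {
        val prime = current.receive()
        primes += prime
        current = withoutMultiplesOf(current, prime) // add one filter stage per prime
    }
    coroutineContext.cancelChildren() // cancel all producer coroutines
    primes
}

fun main() {
    println(firstPrimes(5)) // [2, 3, 5, 7, 11]
}
```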
Error Handling
try {
    channel.send(value) // Might throw if channel is closed
} catch (e: ClosedSendChannelException) {
    // Handle channel closure
}
try {
    val value = channel.receive() // Might throw if empty & closed
} catch (e: ClosedReceiveChannelException) {
    // Handle channel closure
}
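If exceptions are not wanted, trySend() and receiveCatching() report closure through a ChannelResult instead of throwing. A small sketch, where the helper name describeClosedChannel and its string labels are made up for illustration:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what happens around close() without any try/catch.
fun describeClosedChannel(): List<String> = runBlocking {
    val channel = Channel<Int>(1)
    channel.send(1)   // fits in the buffer, does not suspend
    channel.close()

    val log = mutableListOf<String>()

    // The buffered value is still delivered after close()
    val first = channel.receiveCatching()
    log += if (first.isSuccess) "value ${first.getOrNull()}" else "closed"

    // Now the channel is closed and empty
    val second = channel.receiveCatching()
    log += if (second.isClosed) "closed" else "value ${second.getOrNull()}"

    // Sending to a closed channel fails without throwing
    val sendResult = channel.trySend(2)
    log += if (sendResult.isClosed) "send rejected" else "sent"

    log
}

fun main() {
    println(describeClosedChannel()) // [value 1, closed, send rejected]
}
```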
Backpressure & Buffering
- Unbuffered channels suspend senders until receivers collect
- Buffered channels store values up to capacity
- Use buffering to handle speed mismatches:
// Fast producer, slow consumer
val channel = Channel<Int>(50) // Buffer 50 values
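The sketch below makes the suspension visible. It counts how many send() calls finish before the consumer receives anything. The helper name is hypothetical, and the counts assume the single-threaded runBlocking event loop, where yield() lets the producer run until it has to suspend:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: how many sends complete before the first receive?
fun sendsCompletedBeforeFirstReceive(capacity: Int): Int = runBlocking {
    val channel = Channel<Int>(capacity)
    var completedSends = 0
    launch {
        for (i in 1..5) {
            channel.send(i) // suspends when the buffer is full
            completedSends++
        }
        channel.close()
    }
    yield() // let the producer run until it suspends
    val snapshot = completedSends
    for (value in channel) { /* drain so the producer can finish */ }
    snapshot
}

fun main() {
    println(sendsCompletedBeforeFirstReceive(Channel.RENDEZVOUS)) // 0
    println(sendsCompletedBeforeFirstReceive(2))                  // 2
}
```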
Working with Multiple Channels
// Select from multiple channels
select<Unit> {
    channel1.onReceive { value ->
        println("Channel 1 received: $value")
    }
    channel2.onReceive { value ->
        println("Channel 2 received: $value")
    }
}
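A runnable variant of the select call, as a sketch. The helper receiveTwoWithSelect is hypothetical. Both channels already hold a value, so the result relies on select preferring the first clause that is ready:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper: receives twice via select from two pre-filled channels.
fun receiveTwoWithSelect(): List<String> = runBlocking {
    val channel1 = Channel<String>(1)
    val channel2 = Channel<String>(1)
    channel1.send("a")
    channel2.send("b")

    val received = mutableListOf<String>()
    repeat(2) {
        // select is biased: when several clauses are ready, the first one wins
        received += select<String> {
            channel1.onReceive { value -> "channel1: $value" }
            channel2.onReceive { value -> "channel2: $value" }
        }
    }
    received
}

fun main() {
    println(receiveTwoWithSelect()) // [channel1: a, channel2: b]
}
```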
BroadcastChannel
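// One sender, multiple receivers
// Note: BroadcastChannel is deprecated in current kotlinx.coroutines releases; SharedFlow is the suggested replacement
val broadcast = BroadcastChannel<Int>(10)
// Subscribe first: a subscription only sees values sent after it was opened
val subscription1 = broadcast.openSubscription()
val subscription2 = broadcast.openSubscription()
// Each subscription receives independently
launch { subscription1.consumeEach { println("Sub1: $it") } }
launch { subscription2.consumeEach { println("Sub2: $it") } }
// Send to all subscribers
broadcast.send(42)
// Close when done so the consumeEach loops complete
broadcast.close()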
Structured Concurrency with Channels
coroutineScope {
    // Producer in child scope
    val channel = produce<Int> {
        for (i in 1..5) send(i)
    }
    // Consumer in same scope
    channel.consumeEach { println(it) }
    // produce closes the channel when its block finishes;
    // coroutineScope returns only after all children complete
}
Practical Tips
- Close channels when done sending
- Use consumeEach for automatic handling of channel closure
- Choose buffering strategy based on producer/consumer speeds
- For simple one-off results, prefer Deferred over Channel
- For complex processing pipelines, use channels to connect stages
- Use Fan-out for load distribution, Fan-in for aggregation
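Fan-out and fan-in from the last tip can be sketched as follows. The helper squareWithWorkers and the squaring workload are hypothetical. Several workers read from one shared channel (fan-out) and write into one result channel (fan-in):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: distributes inputs across workers and gathers the squares.
fun squareWithWorkers(inputs: List<Int>, workerCount: Int): List<Int> = runBlocking {
    require(workerCount > 0) { "need at least one worker" }
    val jobs = Channel<Int>()    // fan-out: every worker receives from this channel
    val results = Channel<Int>() // fan-in: every worker sends to this channel

    launch {
        for (x in inputs) jobs.send(x)
        jobs.close() // lets the worker loops end
    }
    launch {
        coroutineScope {
            repeat(workerCount) {
                launch { for (x in jobs) results.send(x * x) }
            }
        }
        results.close() // reached only after all workers have finished
    }

    val collected = mutableListOf<Int>()
    for (r in results) collected += r
    collected.sorted() // arrival order depends on scheduling, so sort for a stable result
}

fun main() {
    println(squareWithWorkers(listOf(1, 2, 3, 4), workerCount = 3)) // [1, 4, 9, 16]
}
```

Each input goes to exactly one worker, which is what separates fan-out over a channel from a broadcast.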
Performance Considerations
- Channels have overhead - don’t use for trivial operations
- Buffered channels consume memory - size appropriately
- Consider Flow for cold streams with less overhead
- Use Channel.CONFLATED when only the latest value matters
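For comparison, a cold stream written with Flow might look like the sketch below. The function names numbers and collectSquares are hypothetical. Nothing runs until a terminal operator such as toList() collects the flow, and no channel has to be closed:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical cold source: the block runs anew for every collector.
fun numbers(): Flow<Int> = flow {
    for (i in 1..5) emit(i)
}

// Hypothetical helper: squares the values and collects them into a list.
fun collectSquares(): List<Int> = runBlocking {
    numbers().map { it * it }.toList()
}

fun main() {
    println(collectSquares()) // [1, 4, 9, 16, 25]
}
```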
Remember: Channels are for communication, Deferred is for computation results!