Events¶
UCloud supports event streams, an event stream provides a message pipe. Producers send messages into the pipe and
consumers read and process the messages. We support two types of event streams: EventStream
s and
BroadcastingStream
s. UCloud implements both types using Redis.
Ordinary Event Streams¶
An ordinary EventStream
provides a way for services to load-balance events between instances of a given micro-service.
As shown in the figure, the example system contains to micro-services A and B. Both of these micro-services are running
in three instances of the same code. Both micro-services have subscribed to a the same EventStream
. When the producer
starts producing messages, those messages are load-balanced across the individual instances of a micro-services. This
means that both service A and service B receives all the messages but the individual messages are load balanced between
the instances of a micro-service.
A consumer does not need to be live when the message is sent for it to be received. Instead, the messages are kept in a persistent list which lives for some time. This list is pruned every once in a while.
Example: Consuming a message
object JobEvents : EventStreamContainer() {
val events = stream<JobEvent>("app-kubernetes-job-events", { it.jobName })
}
suspend fun initializeConsumer() {
micro.eventStreamService.subscribe(JobEvents.events, EventConsumer.Immediate { ev ->
println("received $ev")
})
}
Example: Consuming a batch of messages
suspend fun initializeConsumer() {
micro.eventStreamService.subscribe(
JobEvents.events,
EventConsumer.Batched(maxLatency = 500, maxBatchSize = 1000) { batch ->
println("received $batch")
}
)
}
Example: Producing a message
val eventProducer = micro.eventStreamService.createProducer(ProjectEvents.events)
eventProducer.produce(ProjectEvent.Created("foobar"))
Broadcasting Streams¶
The BroadcastingStream
works differently from the EventStream
. In this case all consumers receive every message
which is produced.
Only the consumers which are live will receive messages.
Example: Broadcasting a message
val broadcastingStream = RedisBroadcastingStream(micro.redisConnectionManager)
broadcastingStream.broadcast(MyMessage(42), MyStreams.stream)
Example: Subscribing to a stream
broadcastStream.subscribe(CancelWSStream.events) { (id) ->
streams.remove(id)?.close()
}