-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserialstream.go
80 lines (72 loc) · 2.4 KB
/
serialstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package event
import (
"reflect"
)
// SerialStream is an event stream that implements Publisher and Subscribable
// for publishing events and subscribing to them.
// Handler.HandleEvent method calls are done synchronously,
// meaning that Publish will only return after all event handlers have been called
// in the order they have been subscribed.
//
// Use Stream instead if the events should be published
// in parallel Go routines.
//
// SerialStream is threadsafe.
type SerialStream struct {
subscribable
}
// NewSerialStream returns a new Stream with optional RepublishHandler
// subscriptions to the passed subscribeTo Subscribable implementations.
func NewSerialStream(subscribeTo ...Subscribable) *SerialStream {
stream := new(SerialStream)
for _, source := range subscribeTo {
source.Subscribe(RepublishHandler(stream))
}
return stream
}
// Publish calls Handler.HandleEvent(event) for all subscribed event handlers.
// First all type specific handlers are called in the order
// they have been subscribed for the type of the event.
// Then all non type specific handlers are called in the order
// they have been subscribed.
//
// Use Stream instead if the events should be published
// asynchronously in parallel Go routines.
func (stream *SerialStream) Publish(event interface{}) error {
return stream.PublishAwait(event)
}
// PublishAsync publishes an event asynchronousely
// using one or more go routines.
// Exactly one error or nil will be written to
// the returned channel when the event has been
// handled by the subsribed handlers.
// The error can be a combination of multiple
// errors from multiple event handlers.
func (stream *SerialStream) PublishAsync(event interface{}) <-chan error {
errChan := make(chan error, 1)
go func() {
errChan <- stream.PublishAwait(event)
}()
return errChan
}
// PublishAwait publishes an event and waits
// for all handlers to return an error or nil.
// The error can be a combination of multiple
// errors from multiple event handlers.
func (stream *SerialStream) PublishAwait(event interface{}) error {
stream.handlerMtx.RLock()
defer stream.handlerMtx.RUnlock()
for _, handler := range stream.eventTypeHandlers[reflect.TypeOf(event)] {
err := safelyHandleEvent(handler, event)
if err != nil {
return err
}
}
for _, handler := range stream.anyEventHandlers {
err := safelyHandleEvent(handler, event)
if err != nil {
return err
}
}
return nil
}