From 2ffbe5be9403ff32422092925579dd8ccd51699a Mon Sep 17 00:00:00 2001 From: Jakub Sobon Date: Tue, 19 Feb 2019 00:12:59 -0500 Subject: [PATCH] An event distribution system. Allows multiple subscribers, supports filtering and doesn't block on slow subscribers. --- event/event.go | 175 +++++++++++++++++++++++++++++++ event/event_test.go | 245 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 420 insertions(+) create mode 100644 event/event.go create mode 100644 event/event_test.go diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..bd49afd --- /dev/null +++ b/event/event.go @@ -0,0 +1,175 @@ +// Package event provides a non-blocking event distribution and subscription +// system. +package event + +import ( + "context" + "reflect" + "sync" + + "github.com/mum4k/termdash/event/eventqueue" + "github.com/mum4k/termdash/terminalapi" +) + +// Callback is a function provided by an event subscriber. +// It gets called with each event that passed the subscription filter. +// Implementations must be thread-safe, events come from a separate goroutine. +// Implementation should be light-weight, otherwise a slow-processing +// subscriber can build a long tail of events. +type Callback func(terminalapi.Event) + +// subscriber represents a single subscriber. +type subscriber struct { + // cb is the callback the subscriber receives events on. + cb Callback + + // filter filters events towards the subscriber. + // An empty filter receives all events. + filter map[reflect.Type]bool + + // queue is a queue of events towards the subscriber. + queue *eventqueue.Unbound + + // cancel when called terminates the goroutine that forwards events towards + // this subscriber. + cancel context.CancelFunc +} + +// newSubscriber creates a new event subscriber. +func newSubscriber(filter []terminalapi.Event, cb Callback) *subscriber { + f := map[reflect.Type]bool{} + for _, ev := range filter { + f[reflect.TypeOf(ev)] = true + } + + ctx, cancel := context.WithCancel(context.Background()) + s := &subscriber{ + cb: cb, + filter: f, + queue: eventqueue.New(), + cancel: cancel, + } + + // Terminates when stop() is called. + go s.run(ctx) + return s +} + +// run periodically forwards events towards the subscriber. +// Terminates when the context expires. +func (s *subscriber) run(ctx context.Context) { + for { + e, err := s.queue.Pull(ctx) + if err != nil { + e = terminalapi.NewErrorf("failed to pull event off the queue: %v", err) + } + + switch ev := e.(type) { + case *terminalapi.Error: + // Don't forward the error if the context is closed. + select { + case <-ctx.Done(): + default: + s.cb(ev) + } + + default: + s.cb(ev) + } + + select { + case <-ctx.Done(): + return + default: + } + } +} + +// event forwards an event to the subscriber. +func (s *subscriber) event(ev terminalapi.Event) { + if len(s.filter) == 0 { + s.queue.Push(ev) + } + + var errT *terminalapi.Error + t := reflect.TypeOf(ev) + if t == reflect.TypeOf(errT) || s.filter[t] { + s.queue.Push(ev) + } +} + +// stop stops the event subscriber. +func (s *subscriber) stop() { + s.cancel() + s.queue.Close() +} + +// DistributionSystem distributes events to subscribers. +// +// Subscribers can request filtering of events they get based on event type or +// subscribe to all events. +// +// The distribution system maintains a queue towards each subscriber, making +// sure that a single slow subscriber only slows itself down, rather than the +// entire application. +// +// This object is thread-safe. +type DistributionSystem struct { + // subscribers subscribe to events. + // maps subscriber id to subscriber. + subscribers map[int]*subscriber + + // nextID is id for the next subscriber. + nextID int + + // mu protects the distribution system. + mu sync.RWMutex +} + +// NewDistributionSystem creates a new event distribution system. +func NewDistributionSystem() *DistributionSystem { + return &DistributionSystem{ + subscribers: map[int]*subscriber{}, + } +} + +// Event should be called with events coming from the terminal. +// The distribution system will distribute these to all the subscribers. +func (eds *DistributionSystem) Event(ev terminalapi.Event) { + eds.mu.RLock() + defer eds.mu.RUnlock() + + for _, sub := range eds.subscribers { + sub.event(ev) + } +} + +// StopFunc when called unsubscribes the subscriber from all events and +// releases resources tied to the subscriber. +type StopFunc func() + +// Subscribe subscribes to events according to the filter. +// An empty filter indicates that the subscriber wishes to receive events of +// all kinds. If the filter is non-empty, only events of the provided type will +// be sent to the subscriber. +// Returns a function that allows the subscriber to unsubscribe. +// +// Subscribers cannot filter out event type terminalapi.Error which is used to +// forward errors to the Callback. +func (eds *DistributionSystem) Subscribe(filter []terminalapi.Event, cb Callback) StopFunc { + eds.mu.Lock() + defer eds.mu.Unlock() + + id := eds.nextID + eds.nextID++ + sub := newSubscriber(filter, cb) + eds.subscribers[id] = sub + + return func() { + eds.mu.Lock() + defer eds.mu.Unlock() + + sub.stop() + delete(eds.subscribers, id) + } +} diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 0000000..ab17f98 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,245 @@ +package event + +import ( + "context" + "fmt" + "image" + "sync" + "testing" + "time" + + "github.com/kylelemons/godebug/pretty" + "github.com/mum4k/termdash/keyboard" + "github.com/mum4k/termdash/terminalapi" +) + +// receiver receives events from the distribution system. +type receiver struct { + mu sync.Mutex + + // events are the received events. + events []terminalapi.Event +} + +// newReceiver returns a new event receiver. +func newReceiver() *receiver { + return &receiver{} +} + +// receive receives an event. +func (r *receiver) receive(ev terminalapi.Event) { + r.mu.Lock() + defer r.mu.Unlock() + + r.events = append(r.events, ev) +} + +// getEvents returns the received events. +func (r *receiver) getEvents() map[terminalapi.Event]bool { + r.mu.Lock() + defer r.mu.Unlock() + + res := map[terminalapi.Event]bool{} + for _, ev := range r.events { + res[ev] = true + } + return res +} + +// waitFor waits until the receiver receives the specified number of events or +// the timeout. +// Returns the received events in an unspecified order. +func (r *receiver) waitFor(want int, timeout time.Duration) (map[terminalapi.Event]bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + tick := time.NewTimer(5 * time.Millisecond) + defer tick.Stop() + for { + select { + case <-tick.C: + ev := r.getEvents() + + switch got := len(ev); { + case got > want: + return nil, fmt.Errorf("got %d events %v, want %d", got, ev, want) + + case got == want: + return ev, nil + } + + case <-ctx.Done(): + ev := r.getEvents() + return nil, fmt.Errorf("while waiting for events, got %d so far: %v, err: %v", len(ev), ev, ctx.Err()) + } + } +} + +// subscriberCase holds test case specifics for one subscriber. +type subscriberCase struct { + // filter is the subscribers filter. + filter []terminalapi.Event + + // rec receives the events. + rec *receiver + + // want are the expected events that should be delivered to this subscriber. + want map[terminalapi.Event]bool + + // wantErr asserts whether we want an error from waitFor. + wantErr bool +} + +func TestDistributionSystem(t *testing.T) { + tests := []struct { + desc string + // events will be sent down the distribution system. + events []terminalapi.Event + + // subCase are the event subscribers and their expectations. + subCase []*subscriberCase + }{ + { + desc: "no events and no subscribers", + }, + { + desc: "events and no subscribers", + events: []terminalapi.Event{ + &terminalapi.Mouse{}, + &terminalapi.Keyboard{}, + }, + }, + { + desc: "single subscriber, wants all events and gets them", + events: []terminalapi.Event{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}, + &terminalapi.Mouse{Position: image.Point{1, 1}}, + &terminalapi.Resize{Size: image.Point{2, 2}}, + terminalapi.NewError("error"), + }, + subCase: []*subscriberCase{ + { + filter: nil, + rec: newReceiver(), + want: map[terminalapi.Event]bool{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}: true, + &terminalapi.Mouse{Position: image.Point{1, 1}}: true, + &terminalapi.Resize{Size: image.Point{2, 2}}: true, + terminalapi.NewError("error"): true, + }, + }, + }, + }, + { + desc: "single subscriber, filters events", + events: []terminalapi.Event{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}, + &terminalapi.Mouse{Position: image.Point{1, 1}}, + &terminalapi.Resize{Size: image.Point{2, 2}}, + }, + subCase: []*subscriberCase{ + { + filter: []terminalapi.Event{ + &terminalapi.Keyboard{}, + &terminalapi.Mouse{}, + }, + rec: newReceiver(), + want: map[terminalapi.Event]bool{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}: true, + &terminalapi.Mouse{Position: image.Point{1, 1}}: true, + }, + }, + }, + }, + { + desc: "single subscriber, errors are always received", + events: []terminalapi.Event{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}, + &terminalapi.Mouse{Position: image.Point{1, 1}}, + &terminalapi.Resize{Size: image.Point{2, 2}}, + terminalapi.NewError("error"), + }, + subCase: []*subscriberCase{ + { + filter: []terminalapi.Event{ + &terminalapi.Keyboard{}, + &terminalapi.Mouse{}, + }, + rec: newReceiver(), + want: map[terminalapi.Event]bool{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}: true, + &terminalapi.Mouse{Position: image.Point{1, 1}}: true, + terminalapi.NewError("error"): true, + }, + }, + }, + }, + { + desc: "multiple subscribers and events", + events: []terminalapi.Event{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}, + &terminalapi.Keyboard{Key: keyboard.KeyEsc}, + &terminalapi.Mouse{Position: image.Point{0, 0}}, + &terminalapi.Mouse{Position: image.Point{1, 1}}, + &terminalapi.Resize{Size: image.Point{1, 1}}, + &terminalapi.Resize{Size: image.Point{2, 2}}, + terminalapi.NewError("error1"), + terminalapi.NewError("error2"), + }, + subCase: []*subscriberCase{ + { + filter: []terminalapi.Event{ + &terminalapi.Keyboard{}, + }, + rec: newReceiver(), + want: map[terminalapi.Event]bool{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}: true, + &terminalapi.Keyboard{Key: keyboard.KeyEsc}: true, + terminalapi.NewError("error1"): true, + terminalapi.NewError("error2"): true, + }, + }, + { + filter: []terminalapi.Event{ + &terminalapi.Mouse{}, + &terminalapi.Resize{}, + }, + rec: newReceiver(), + want: map[terminalapi.Event]bool{ + &terminalapi.Mouse{Position: image.Point{0, 0}}: true, + &terminalapi.Mouse{Position: image.Point{1, 1}}: true, + &terminalapi.Resize{Size: image.Point{1, 1}}: true, + &terminalapi.Resize{Size: image.Point{2, 2}}: true, + terminalapi.NewError("error1"): true, + terminalapi.NewError("error2"): true, + }, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + eds := NewDistributionSystem() + for _, sc := range tc.subCase { + stop := eds.Subscribe(sc.filter, sc.rec.receive) + defer stop() + } + + for _, ev := range tc.events { + eds.Event(ev) + } + + for i, sc := range tc.subCase { + got, err := sc.rec.waitFor(len(sc.want), 5*time.Second) + if err != nil { + t.Fatalf("sc.rec.waitFor[%d] => unexpected error: %v", i, err) + } + + if diff := pretty.Compare(sc.want, got); diff != "" { + t.Errorf("sc.rec.waitFor[%d] => unexpected diff (-want, +got):\n%s", i, diff) + } + } + }) + } +}