From 0026a673079bc48c8bb8455399b38279c3345317 Mon Sep 17 00:00:00 2001 From: Jakub Sobon Date: Wed, 20 Feb 2019 23:36:17 -0500 Subject: [PATCH] The EDS now reports the number of processed events. Useful for synchronization in tests. --- event/event.go | 46 +++++++++++++-- event/event_test.go | 135 +++++++++++++++++++++++++++++++++----------- 2 files changed, 143 insertions(+), 38 deletions(-) diff --git a/event/event.go b/event/event.go index 58399e4..539abb1 100644 --- a/event/event.go +++ b/event/event.go @@ -47,6 +47,13 @@ type subscriber struct { // cancel when called terminates the goroutine that forwards events towards // this subscriber. cancel context.CancelFunc + + // processes is the number of events that were fully processed, i.e. + // delivered to the callback. + processed int + + // mu protects busy. + mu sync.Mutex } // newSubscriber creates a new event subscriber. @@ -69,13 +76,24 @@ func newSubscriber(filter []terminalapi.Event, cb Callback) *subscriber { return s } +// callback sends the event to the callback. +func (s *subscriber) callback(ev terminalapi.Event) { + s.cb(ev) + + func() { + s.mu.Lock() + defer s.mu.Unlock() + s.processed++ + }() +} + // run periodically forwards events towards the subscriber. // Terminates when the context expires. func (s *subscriber) run(ctx context.Context) { for { ev := s.queue.Pull(ctx) if ev != nil { - s.cb(ev) + s.callback(ev) } select { @@ -98,6 +116,13 @@ func (s *subscriber) event(ev terminalapi.Event) { } } +// processedEvents returns the number of events processed by this subscriber. +func (s *subscriber) processedEvents() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.processed +} + // stop stops the event subscriber. func (s *subscriber) stop() { s.cancel() @@ -123,7 +148,7 @@ type DistributionSystem struct { nextID int // mu protects the distribution system. - mu sync.RWMutex + mu sync.Mutex } // NewDistributionSystem creates a new event distribution system. @@ -136,8 +161,8 @@ func NewDistributionSystem() *DistributionSystem { // Event should be called with events coming from the terminal. // The distribution system will distribute these to all the subscribers. func (eds *DistributionSystem) Event(ev terminalapi.Event) { - eds.mu.RLock() - defer eds.mu.RUnlock() + eds.mu.Lock() + defer eds.mu.Unlock() for _, sub := range eds.subscribers { sub.event(ev) @@ -170,3 +195,16 @@ func (eds *DistributionSystem) Subscribe(filter []terminalapi.Event, cb Callback delete(eds.subscribers, id) } } + +// Processed returns the number of events that were fully processed, i.e. +// delivered to all the subscribers and their callbacks returned. +func (eds *DistributionSystem) Processed() int { + eds.mu.Lock() + defer eds.mu.Unlock() + + var res int + for _, sub := range eds.subscribers { + res += sub.processedEvents() + } + return res +} diff --git a/event/event_test.go b/event/event_test.go index 1f6e47a..2e68afb 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -15,7 +15,7 @@ package event import ( - "context" + "errors" "fmt" "image" "sync" @@ -23,6 +23,7 @@ import ( "time" "github.com/kylelemons/godebug/pretty" + "github.com/mum4k/termdash/event/testevent" "github.com/mum4k/termdash/keyboard" "github.com/mum4k/termdash/terminalapi" ) @@ -82,34 +83,6 @@ func (r *receiver) getEvents() map[terminalapi.Event]bool { return res } -// waitFor waits until the receiver receives the specified number of events or -// the timeout. -// Returns the received events in an unspecified order. -func (r *receiver) waitFor(want int, timeout time.Duration) (map[terminalapi.Event]bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - for { - tick := time.NewTimer(5 * time.Millisecond) - select { - case <-tick.C: - ev := r.getEvents() - - switch got := len(ev); { - case got > want: - return nil, fmt.Errorf("got %d events %v, want %d", got, ev, want) - - case got == want: - return ev, nil - } - - case <-ctx.Done(): - ev := r.getEvents() - return nil, fmt.Errorf("while waiting for events, got %d so far: %v, want %d, err: %v", len(ev), ev, want, ctx.Err()) - } - } -} - // subscriberCase holds test case specifics for one subscriber. type subscriberCase struct { // filter is the subscribers filter. @@ -121,7 +94,7 @@ type subscriberCase struct { // want are the expected events that should be delivered to this subscriber. want map[terminalapi.Event]bool - // wantErr asserts whether we want an error from waitFor. + // wantErr asserts whether we want an error from testevent.WaitFor. wantErr bool } @@ -291,18 +264,112 @@ func TestDistributionSystem(t *testing.T) { } for i, sc := range tc.subCase { - got, err := sc.rec.waitFor(len(sc.want), 5*time.Second) + gotEv := map[terminalapi.Event]bool{} + err := testevent.WaitFor(5*time.Second, func() error { + ev := sc.rec.getEvents() + want := len(sc.want) + switch got := len(ev); { + case got == want: + gotEv = ev + return nil + + default: + return fmt.Errorf("got %d events %v, want %d", got, ev, want) + } + }) if (err != nil) != sc.wantErr { - t.Errorf("sc.rec.waitFor[%d] => unexpected error: %v, wantErr:%v", i, err, sc.wantErr) + t.Errorf("testevent.WaitFor subscriber[%d] => unexpected error: %v, wantErr: %v", i, err, sc.wantErr) } if err != nil { continue } - if diff := pretty.Compare(sc.want, got); diff != "" { - t.Errorf("sc.rec.waitFor[%d] => unexpected diff (-want, +got):\n%s", i, diff) + if diff := pretty.Compare(sc.want, gotEv); diff != "" { + t.Errorf("testevent.WaitFor subscriber[%d] => unexpected diff (-want, +got):\n%s", i, diff) } } }) } } + +func TestProcessed(t *testing.T) { + tests := []struct { + desc string + // events will be sent down the distribution system. + events []terminalapi.Event + + // subCase are the event subscribers and their expectations. + subCase []*subscriberCase + + want int + }{ + { + desc: "zero without events", + want: 0, + }, + { + desc: "zero without subscribers", + events: []terminalapi.Event{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}, + }, + want: 0, + }, + { + desc: "zero when a receiver blocks", + events: []terminalapi.Event{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}, + }, + subCase: []*subscriberCase{ + { + filter: []terminalapi.Event{ + &terminalapi.Keyboard{}, + }, + rec: newReceiver(receiverModeBlock), + }, + }, + want: 0, + }, + { + desc: "counts processed events", + events: []terminalapi.Event{ + &terminalapi.Keyboard{Key: keyboard.KeyEnter}, + }, + subCase: []*subscriberCase{ + { + filter: []terminalapi.Event{ + &terminalapi.Keyboard{}, + }, + rec: newReceiver(receiverModeReceive), + }, + }, + want: 1, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + eds := NewDistributionSystem() + for _, sc := range tc.subCase { + stop := eds.Subscribe(sc.filter, sc.rec.receive) + defer stop() + } + + for _, ev := range tc.events { + eds.Event(ev) + } + + for _, sc := range tc.subCase { + testevent.WaitFor(5*time.Second, func() error { + if len(sc.rec.getEvents()) > 0 { + return nil + } + return errors.New("the receiver got no events") + }) + } + + if got := eds.Processed(); got != tc.want { + t.Errorf("Processed => %v, want %d", got, tc.want) + } + }) + } +}