1
0
mirror of https://github.com/mum4k/termdash.git synced 2025-04-27 13:48:49 +08:00

The EDS now reports the number of processed events.

Useful for synchronization in tests.
This commit is contained in:
Jakub Sobon 2019-02-20 23:36:17 -05:00
parent d100f6fc24
commit 0026a67307
No known key found for this signature in database
GPG Key ID: F2451A77FB05D3B7
2 changed files with 143 additions and 38 deletions

View File

@ -47,6 +47,13 @@ type subscriber struct {
// cancel when called terminates the goroutine that forwards events towards // cancel when called terminates the goroutine that forwards events towards
// this subscriber. // this subscriber.
cancel context.CancelFunc cancel context.CancelFunc
// processes is the number of events that were fully processed, i.e.
// delivered to the callback.
processed int
// mu protects busy.
mu sync.Mutex
} }
// newSubscriber creates a new event subscriber. // newSubscriber creates a new event subscriber.
@ -69,13 +76,24 @@ func newSubscriber(filter []terminalapi.Event, cb Callback) *subscriber {
return s return s
} }
// callback sends the event to the callback.
func (s *subscriber) callback(ev terminalapi.Event) {
s.cb(ev)
func() {
s.mu.Lock()
defer s.mu.Unlock()
s.processed++
}()
}
// run periodically forwards events towards the subscriber. // run periodically forwards events towards the subscriber.
// Terminates when the context expires. // Terminates when the context expires.
func (s *subscriber) run(ctx context.Context) { func (s *subscriber) run(ctx context.Context) {
for { for {
ev := s.queue.Pull(ctx) ev := s.queue.Pull(ctx)
if ev != nil { if ev != nil {
s.cb(ev) s.callback(ev)
} }
select { select {
@ -98,6 +116,13 @@ func (s *subscriber) event(ev terminalapi.Event) {
} }
} }
// processedEvents returns the number of events processed by this subscriber.
func (s *subscriber) processedEvents() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.processed
}
// stop stops the event subscriber. // stop stops the event subscriber.
func (s *subscriber) stop() { func (s *subscriber) stop() {
s.cancel() s.cancel()
@ -123,7 +148,7 @@ type DistributionSystem struct {
nextID int nextID int
// mu protects the distribution system. // mu protects the distribution system.
mu sync.RWMutex mu sync.Mutex
} }
// NewDistributionSystem creates a new event distribution system. // NewDistributionSystem creates a new event distribution system.
@ -136,8 +161,8 @@ func NewDistributionSystem() *DistributionSystem {
// Event should be called with events coming from the terminal. // Event should be called with events coming from the terminal.
// The distribution system will distribute these to all the subscribers. // The distribution system will distribute these to all the subscribers.
func (eds *DistributionSystem) Event(ev terminalapi.Event) { func (eds *DistributionSystem) Event(ev terminalapi.Event) {
eds.mu.RLock() eds.mu.Lock()
defer eds.mu.RUnlock() defer eds.mu.Unlock()
for _, sub := range eds.subscribers { for _, sub := range eds.subscribers {
sub.event(ev) sub.event(ev)
@ -170,3 +195,16 @@ func (eds *DistributionSystem) Subscribe(filter []terminalapi.Event, cb Callback
delete(eds.subscribers, id) delete(eds.subscribers, id)
} }
} }
// Processed returns the number of events that were fully processed, i.e.
// delivered to all the subscribers and their callbacks returned.
func (eds *DistributionSystem) Processed() int {
eds.mu.Lock()
defer eds.mu.Unlock()
var res int
for _, sub := range eds.subscribers {
res += sub.processedEvents()
}
return res
}

View File

@ -15,7 +15,7 @@
package event package event
import ( import (
"context" "errors"
"fmt" "fmt"
"image" "image"
"sync" "sync"
@ -23,6 +23,7 @@ import (
"time" "time"
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"github.com/mum4k/termdash/event/testevent"
"github.com/mum4k/termdash/keyboard" "github.com/mum4k/termdash/keyboard"
"github.com/mum4k/termdash/terminalapi" "github.com/mum4k/termdash/terminalapi"
) )
@ -82,34 +83,6 @@ func (r *receiver) getEvents() map[terminalapi.Event]bool {
return res return res
} }
// waitFor waits until the receiver receives the specified number of events or
// the timeout.
// Returns the received events in an unspecified order.
func (r *receiver) waitFor(want int, timeout time.Duration) (map[terminalapi.Event]bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
tick := time.NewTimer(5 * time.Millisecond)
select {
case <-tick.C:
ev := r.getEvents()
switch got := len(ev); {
case got > want:
return nil, fmt.Errorf("got %d events %v, want %d", got, ev, want)
case got == want:
return ev, nil
}
case <-ctx.Done():
ev := r.getEvents()
return nil, fmt.Errorf("while waiting for events, got %d so far: %v, want %d, err: %v", len(ev), ev, want, ctx.Err())
}
}
}
// subscriberCase holds test case specifics for one subscriber. // subscriberCase holds test case specifics for one subscriber.
type subscriberCase struct { type subscriberCase struct {
// filter is the subscribers filter. // filter is the subscribers filter.
@ -121,7 +94,7 @@ type subscriberCase struct {
// want are the expected events that should be delivered to this subscriber. // want are the expected events that should be delivered to this subscriber.
want map[terminalapi.Event]bool want map[terminalapi.Event]bool
// wantErr asserts whether we want an error from waitFor. // wantErr asserts whether we want an error from testevent.WaitFor.
wantErr bool wantErr bool
} }
@ -291,18 +264,112 @@ func TestDistributionSystem(t *testing.T) {
} }
for i, sc := range tc.subCase { for i, sc := range tc.subCase {
got, err := sc.rec.waitFor(len(sc.want), 5*time.Second) gotEv := map[terminalapi.Event]bool{}
err := testevent.WaitFor(5*time.Second, func() error {
ev := sc.rec.getEvents()
want := len(sc.want)
switch got := len(ev); {
case got == want:
gotEv = ev
return nil
default:
return fmt.Errorf("got %d events %v, want %d", got, ev, want)
}
})
if (err != nil) != sc.wantErr { if (err != nil) != sc.wantErr {
t.Errorf("sc.rec.waitFor[%d] => unexpected error: %v, wantErr:%v", i, err, sc.wantErr) t.Errorf("testevent.WaitFor subscriber[%d] => unexpected error: %v, wantErr: %v", i, err, sc.wantErr)
} }
if err != nil { if err != nil {
continue continue
} }
if diff := pretty.Compare(sc.want, got); diff != "" { if diff := pretty.Compare(sc.want, gotEv); diff != "" {
t.Errorf("sc.rec.waitFor[%d] => unexpected diff (-want, +got):\n%s", i, diff) t.Errorf("testevent.WaitFor subscriber[%d] => unexpected diff (-want, +got):\n%s", i, diff)
} }
} }
}) })
} }
} }
func TestProcessed(t *testing.T) {
tests := []struct {
desc string
// events will be sent down the distribution system.
events []terminalapi.Event
// subCase are the event subscribers and their expectations.
subCase []*subscriberCase
want int
}{
{
desc: "zero without events",
want: 0,
},
{
desc: "zero without subscribers",
events: []terminalapi.Event{
&terminalapi.Keyboard{Key: keyboard.KeyEnter},
},
want: 0,
},
{
desc: "zero when a receiver blocks",
events: []terminalapi.Event{
&terminalapi.Keyboard{Key: keyboard.KeyEnter},
},
subCase: []*subscriberCase{
{
filter: []terminalapi.Event{
&terminalapi.Keyboard{},
},
rec: newReceiver(receiverModeBlock),
},
},
want: 0,
},
{
desc: "counts processed events",
events: []terminalapi.Event{
&terminalapi.Keyboard{Key: keyboard.KeyEnter},
},
subCase: []*subscriberCase{
{
filter: []terminalapi.Event{
&terminalapi.Keyboard{},
},
rec: newReceiver(receiverModeReceive),
},
},
want: 1,
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
eds := NewDistributionSystem()
for _, sc := range tc.subCase {
stop := eds.Subscribe(sc.filter, sc.rec.receive)
defer stop()
}
for _, ev := range tc.events {
eds.Event(ev)
}
for _, sc := range tc.subCase {
testevent.WaitFor(5*time.Second, func() error {
if len(sc.rec.getEvents()) > 0 {
return nil
}
return errors.New("the receiver got no events")
})
}
if got := eds.Processed(); got != tc.want {
t.Errorf("Processed => %v, want %d", got, tc.want)
}
})
}
}