1
0
mirror of https://github.com/mum4k/termdash.git synced 2025-04-27 13:48:49 +08:00

An event distribution system.

Allows multiple subscribers, supports filtering and doesn't block on
slow subscribers.
This commit is contained in:
Jakub Sobon 2019-02-19 00:12:59 -05:00
parent 772fdae7cc
commit 2ffbe5be94
No known key found for this signature in database
GPG Key ID: F2451A77FB05D3B7
2 changed files with 420 additions and 0 deletions

175
event/event.go Normal file
View File

@ -0,0 +1,175 @@
// Package event provides a non-blocking event distribution and subscription
// system.
package event
import (
"context"
"reflect"
"sync"
"github.com/mum4k/termdash/event/eventqueue"
"github.com/mum4k/termdash/terminalapi"
)
// Callback is a function provided by an event subscriber.
// It gets called with each event that passed the subscription filter.
// Implementations must be thread-safe, events come from a separate goroutine.
// Implementation should be light-weight, otherwise a slow-processing
// subscriber can build a long tail of events.
type Callback func(terminalapi.Event)
// subscriber represents a single subscriber.
type subscriber struct {
// cb is the callback the subscriber receives events on.
cb Callback
// filter filters events towards the subscriber.
// An empty filter receives all events.
filter map[reflect.Type]bool
// queue is a queue of events towards the subscriber.
queue *eventqueue.Unbound
// cancel when called terminates the goroutine that forwards events towards
// this subscriber.
cancel context.CancelFunc
}
// newSubscriber creates a new event subscriber.
func newSubscriber(filter []terminalapi.Event, cb Callback) *subscriber {
f := map[reflect.Type]bool{}
for _, ev := range filter {
f[reflect.TypeOf(ev)] = true
}
ctx, cancel := context.WithCancel(context.Background())
s := &subscriber{
cb: cb,
filter: f,
queue: eventqueue.New(),
cancel: cancel,
}
// Terminates when stop() is called.
go s.run(ctx)
return s
}
// run periodically forwards events towards the subscriber.
// Terminates when the context expires.
func (s *subscriber) run(ctx context.Context) {
for {
e, err := s.queue.Pull(ctx)
if err != nil {
e = terminalapi.NewErrorf("failed to pull event off the queue: %v", err)
}
switch ev := e.(type) {
case *terminalapi.Error:
// Don't forward the error if the context is closed.
select {
case <-ctx.Done():
default:
s.cb(ev)
}
default:
s.cb(ev)
}
select {
case <-ctx.Done():
return
default:
}
}
}
// event forwards an event to the subscriber.
func (s *subscriber) event(ev terminalapi.Event) {
if len(s.filter) == 0 {
s.queue.Push(ev)
}
var errT *terminalapi.Error
t := reflect.TypeOf(ev)
if t == reflect.TypeOf(errT) || s.filter[t] {
s.queue.Push(ev)
}
}
// stop stops the event subscriber.
func (s *subscriber) stop() {
s.cancel()
s.queue.Close()
}
// DistributionSystem distributes events to subscribers.
//
// Subscribers can request filtering of events they get based on event type or
// subscribe to all events.
//
// The distribution system maintains a queue towards each subscriber, making
// sure that a single slow subscriber only slows itself down, rather than the
// entire application.
//
// This object is thread-safe.
type DistributionSystem struct {
// subscribers subscribe to events.
// maps subscriber id to subscriber.
subscribers map[int]*subscriber
// nextID is id for the next subscriber.
nextID int
// mu protects the distribution system.
mu sync.RWMutex
}
// NewDistributionSystem creates a new event distribution system.
func NewDistributionSystem() *DistributionSystem {
return &DistributionSystem{
subscribers: map[int]*subscriber{},
}
}
// Event should be called with events coming from the terminal.
// The distribution system will distribute these to all the subscribers.
func (eds *DistributionSystem) Event(ev terminalapi.Event) {
eds.mu.RLock()
defer eds.mu.RUnlock()
for _, sub := range eds.subscribers {
sub.event(ev)
}
}
// StopFunc when called unsubscribes the subscriber from all events and
// releases resources tied to the subscriber.
type StopFunc func()
// Subscribe subscribes to events according to the filter.
// An empty filter indicates that the subscriber wishes to receive events of
// all kinds. If the filter is non-empty, only events of the provided type will
// be sent to the subscriber.
// Returns a function that allows the subscriber to unsubscribe.
//
// Subscribers cannot filter out event type terminalapi.Error which is used to
// forward errors to the Callback.
func (eds *DistributionSystem) Subscribe(filter []terminalapi.Event, cb Callback) StopFunc {
eds.mu.Lock()
defer eds.mu.Unlock()
id := eds.nextID
eds.nextID++
sub := newSubscriber(filter, cb)
eds.subscribers[id] = sub
return func() {
eds.mu.Lock()
defer eds.mu.Unlock()
sub.stop()
delete(eds.subscribers, id)
}
}

245
event/event_test.go Normal file
View File

@ -0,0 +1,245 @@
package event
import (
"context"
"fmt"
"image"
"sync"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
"github.com/mum4k/termdash/keyboard"
"github.com/mum4k/termdash/terminalapi"
)
// receiver receives events from the distribution system.
type receiver struct {
mu sync.Mutex
// events are the received events.
events []terminalapi.Event
}
// newReceiver returns a new event receiver.
func newReceiver() *receiver {
return &receiver{}
}
// receive receives an event.
func (r *receiver) receive(ev terminalapi.Event) {
r.mu.Lock()
defer r.mu.Unlock()
r.events = append(r.events, ev)
}
// getEvents returns the received events.
func (r *receiver) getEvents() map[terminalapi.Event]bool {
r.mu.Lock()
defer r.mu.Unlock()
res := map[terminalapi.Event]bool{}
for _, ev := range r.events {
res[ev] = true
}
return res
}
// waitFor waits until the receiver receives the specified number of events or
// the timeout.
// Returns the received events in an unspecified order.
func (r *receiver) waitFor(want int, timeout time.Duration) (map[terminalapi.Event]bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
tick := time.NewTimer(5 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-tick.C:
ev := r.getEvents()
switch got := len(ev); {
case got > want:
return nil, fmt.Errorf("got %d events %v, want %d", got, ev, want)
case got == want:
return ev, nil
}
case <-ctx.Done():
ev := r.getEvents()
return nil, fmt.Errorf("while waiting for events, got %d so far: %v, err: %v", len(ev), ev, ctx.Err())
}
}
}
// subscriberCase holds test case specifics for one subscriber.
type subscriberCase struct {
// filter is the subscribers filter.
filter []terminalapi.Event
// rec receives the events.
rec *receiver
// want are the expected events that should be delivered to this subscriber.
want map[terminalapi.Event]bool
// wantErr asserts whether we want an error from waitFor.
wantErr bool
}
func TestDistributionSystem(t *testing.T) {
tests := []struct {
desc string
// events will be sent down the distribution system.
events []terminalapi.Event
// subCase are the event subscribers and their expectations.
subCase []*subscriberCase
}{
{
desc: "no events and no subscribers",
},
{
desc: "events and no subscribers",
events: []terminalapi.Event{
&terminalapi.Mouse{},
&terminalapi.Keyboard{},
},
},
{
desc: "single subscriber, wants all events and gets them",
events: []terminalapi.Event{
&terminalapi.Keyboard{Key: keyboard.KeyEnter},
&terminalapi.Mouse{Position: image.Point{1, 1}},
&terminalapi.Resize{Size: image.Point{2, 2}},
terminalapi.NewError("error"),
},
subCase: []*subscriberCase{
{
filter: nil,
rec: newReceiver(),
want: map[terminalapi.Event]bool{
&terminalapi.Keyboard{Key: keyboard.KeyEnter}: true,
&terminalapi.Mouse{Position: image.Point{1, 1}}: true,
&terminalapi.Resize{Size: image.Point{2, 2}}: true,
terminalapi.NewError("error"): true,
},
},
},
},
{
desc: "single subscriber, filters events",
events: []terminalapi.Event{
&terminalapi.Keyboard{Key: keyboard.KeyEnter},
&terminalapi.Mouse{Position: image.Point{1, 1}},
&terminalapi.Resize{Size: image.Point{2, 2}},
},
subCase: []*subscriberCase{
{
filter: []terminalapi.Event{
&terminalapi.Keyboard{},
&terminalapi.Mouse{},
},
rec: newReceiver(),
want: map[terminalapi.Event]bool{
&terminalapi.Keyboard{Key: keyboard.KeyEnter}: true,
&terminalapi.Mouse{Position: image.Point{1, 1}}: true,
},
},
},
},
{
desc: "single subscriber, errors are always received",
events: []terminalapi.Event{
&terminalapi.Keyboard{Key: keyboard.KeyEnter},
&terminalapi.Mouse{Position: image.Point{1, 1}},
&terminalapi.Resize{Size: image.Point{2, 2}},
terminalapi.NewError("error"),
},
subCase: []*subscriberCase{
{
filter: []terminalapi.Event{
&terminalapi.Keyboard{},
&terminalapi.Mouse{},
},
rec: newReceiver(),
want: map[terminalapi.Event]bool{
&terminalapi.Keyboard{Key: keyboard.KeyEnter}: true,
&terminalapi.Mouse{Position: image.Point{1, 1}}: true,
terminalapi.NewError("error"): true,
},
},
},
},
{
desc: "multiple subscribers and events",
events: []terminalapi.Event{
&terminalapi.Keyboard{Key: keyboard.KeyEnter},
&terminalapi.Keyboard{Key: keyboard.KeyEsc},
&terminalapi.Mouse{Position: image.Point{0, 0}},
&terminalapi.Mouse{Position: image.Point{1, 1}},
&terminalapi.Resize{Size: image.Point{1, 1}},
&terminalapi.Resize{Size: image.Point{2, 2}},
terminalapi.NewError("error1"),
terminalapi.NewError("error2"),
},
subCase: []*subscriberCase{
{
filter: []terminalapi.Event{
&terminalapi.Keyboard{},
},
rec: newReceiver(),
want: map[terminalapi.Event]bool{
&terminalapi.Keyboard{Key: keyboard.KeyEnter}: true,
&terminalapi.Keyboard{Key: keyboard.KeyEsc}: true,
terminalapi.NewError("error1"): true,
terminalapi.NewError("error2"): true,
},
},
{
filter: []terminalapi.Event{
&terminalapi.Mouse{},
&terminalapi.Resize{},
},
rec: newReceiver(),
want: map[terminalapi.Event]bool{
&terminalapi.Mouse{Position: image.Point{0, 0}}: true,
&terminalapi.Mouse{Position: image.Point{1, 1}}: true,
&terminalapi.Resize{Size: image.Point{1, 1}}: true,
&terminalapi.Resize{Size: image.Point{2, 2}}: true,
terminalapi.NewError("error1"): true,
terminalapi.NewError("error2"): true,
},
},
},
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
eds := NewDistributionSystem()
for _, sc := range tc.subCase {
stop := eds.Subscribe(sc.filter, sc.rec.receive)
defer stop()
}
for _, ev := range tc.events {
eds.Event(ev)
}
for i, sc := range tc.subCase {
got, err := sc.rec.waitFor(len(sc.want), 5*time.Second)
if err != nil {
t.Fatalf("sc.rec.waitFor[%d] => unexpected error: %v", i, err)
}
if diff := pretty.Compare(sc.want, got); diff != "" {
t.Errorf("sc.rec.waitFor[%d] => unexpected diff (-want, +got):\n%s", i, diff)
}
}
})
}
}