mirror of
https://github.com/hybridgroup/gobot.git
synced 2025-04-24 13:48:49 +08:00
aio(analog sensor): fix deadlock in cyclic reading (#1042)
This commit is contained in:
parent
39808451cf
commit
916c2bad18
@ -56,8 +56,7 @@ func NewAnalogSensorDriver(a AnalogReader, pin string, opts ...interface{}) *Ana
|
||||
driver: newDriver(a, "AnalogSensor"),
|
||||
sensorCfg: &sensorConfiguration{scale: func(input int) float64 { return float64(input) }},
|
||||
pin: pin,
|
||||
Eventer: gobot.NewEventer(),
|
||||
halt: make(chan bool),
|
||||
Eventer: gobot.NewEventer(), // needed early due to grove vibration sensor driver
|
||||
}
|
||||
d.afterStart = d.initialize
|
||||
d.beforeHalt = d.shutdown
|
||||
@ -76,10 +75,6 @@ func NewAnalogSensorDriver(a AnalogReader, pin string, opts ...interface{}) *Ana
|
||||
}
|
||||
}
|
||||
|
||||
d.AddEvent(Data)
|
||||
d.AddEvent(Value)
|
||||
d.AddEvent(Error)
|
||||
|
||||
d.AddCommand("Read", func(params map[string]interface{}) interface{} {
|
||||
val, err := d.Read()
|
||||
return map[string]interface{}{"val": val, "err": err}
|
||||
@ -112,59 +107,6 @@ func (a *AnalogSensorDriver) SetScaler(scaler func(int) float64) {
|
||||
WithSensorScaler(scaler).apply(a.sensorCfg)
|
||||
}
|
||||
|
||||
// initialize the AnalogSensorDriver and if the cyclic reading is active, reads the sensor at the given interval.
|
||||
// Emits the Events:
|
||||
//
|
||||
// Data int - Event is emitted on change and represents the current raw reading from the sensor.
|
||||
// Value float64 - Event is emitted on change and represents the current reading from the sensor.
|
||||
// Error error - Event is emitted on error reading from the sensor.
|
||||
func (a *AnalogSensorDriver) initialize() error {
|
||||
if a.sensorCfg.readInterval == 0 {
|
||||
// cyclic reading deactivated
|
||||
return nil
|
||||
}
|
||||
oldRawValue := 0
|
||||
oldValue := 0.0
|
||||
go func() {
|
||||
timer := time.NewTimer(a.sensorCfg.readInterval)
|
||||
timer.Stop()
|
||||
for {
|
||||
rawValue, value, err := a.analogRead()
|
||||
if err != nil {
|
||||
a.Publish(a.Event(Error), err)
|
||||
} else {
|
||||
if rawValue != oldRawValue && rawValue != -1 {
|
||||
a.Publish(a.Event(Data), rawValue)
|
||||
oldRawValue = rawValue
|
||||
}
|
||||
if value != oldValue && value != -1 {
|
||||
a.Publish(a.Event(Value), value)
|
||||
oldValue = value
|
||||
}
|
||||
}
|
||||
|
||||
timer.Reset(a.sensorCfg.readInterval)
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-a.halt:
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// shutdown stops polling the analog sensor for new information
|
||||
func (a *AnalogSensorDriver) shutdown() error {
|
||||
if a.sensorCfg.readInterval == 0 {
|
||||
// cyclic reading deactivated
|
||||
return nil
|
||||
}
|
||||
a.halt <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pin returns the AnalogSensorDrivers pin
|
||||
func (a *AnalogSensorDriver) Pin() string { return a.pin }
|
||||
|
||||
@ -196,6 +138,73 @@ func (a *AnalogSensorDriver) RawValue() int {
|
||||
return a.lastRawValue
|
||||
}
|
||||
|
||||
// initialize the AnalogSensorDriver and if the cyclic reading is active, reads the sensor at the given interval.
|
||||
// Emits the Events:
|
||||
//
|
||||
// Data int - Event is emitted on change and represents the current raw reading from the sensor.
|
||||
// Value float64 - Event is emitted on change and represents the current reading from the sensor.
|
||||
// Error error - Event is emitted on error reading from the sensor.
|
||||
func (a *AnalogSensorDriver) initialize() error {
|
||||
if a.sensorCfg.readInterval == 0 {
|
||||
// cyclic reading deactivated
|
||||
return nil
|
||||
}
|
||||
|
||||
a.AddEvent(Data)
|
||||
a.AddEvent(Value)
|
||||
a.AddEvent(Error)
|
||||
|
||||
// A small buffer is needed to prevent mutex-channel-deadlock between Halt() and analogRead().
|
||||
// This can happen, if the shutdown is in progress (mutex passed) and the go routine is calling
|
||||
// the analogRead() in between, before the halt can be evaluated by the select statement.
|
||||
// In this case the mutex of analogRead() blocks the reading of the halt channel and, without a small buffer,
|
||||
// the writing to halt is blocked because there is no immediate read from channel.
|
||||
// Please note, that this is special behavior caused by the first read is done immediately before the select
|
||||
// statement.
|
||||
a.halt = make(chan bool, 1)
|
||||
|
||||
oldRawValue := 0
|
||||
oldValue := 0.0
|
||||
go func() {
|
||||
timer := time.NewTimer(a.sensorCfg.readInterval)
|
||||
timer.Stop()
|
||||
for {
|
||||
// please note, that this ensures the first read is done immediately, but has drawbacks, see notes above
|
||||
rawValue, value, err := a.analogRead()
|
||||
if err != nil {
|
||||
a.Publish(a.Event(Error), err)
|
||||
} else {
|
||||
if rawValue != oldRawValue && rawValue != -1 {
|
||||
a.Publish(a.Event(Data), rawValue)
|
||||
oldRawValue = rawValue
|
||||
}
|
||||
if value != oldValue && value != -1 {
|
||||
a.Publish(a.Event(Value), value)
|
||||
oldValue = value
|
||||
}
|
||||
}
|
||||
timer.Reset(a.sensorCfg.readInterval) // ensure that after each read is a wait, independent of duration of read
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-a.halt:
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// shutdown stops polling the analog sensor for new information
|
||||
func (a *AnalogSensorDriver) shutdown() error {
|
||||
if a.sensorCfg.readInterval == 0 || a.halt == nil {
|
||||
// cyclic reading deactivated
|
||||
return nil
|
||||
}
|
||||
a.halt <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
// analogRead performs an reading from the sensor and sets the internal attributes and returns the raw and scaled value
|
||||
func (a *AnalogSensorDriver) analogRead() (int, float64, error) {
|
||||
a.mutex.Lock()
|
||||
|
@ -34,7 +34,7 @@ func TestNewAnalogSensorDriver(t *testing.T) {
|
||||
assert.Equal(t, pin, d.Pin())
|
||||
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
|
||||
assert.Equal(t, 0, d.lastRawValue)
|
||||
assert.NotNil(t, d.halt)
|
||||
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
|
||||
assert.NotNil(t, d.Eventer)
|
||||
require.NotNil(t, d.sensorCfg)
|
||||
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
|
||||
@ -171,6 +171,9 @@ func TestAnalogSensor_WithSensorCyclicRead(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// act (start cyclic reading)
|
||||
require.NoError(t, d.Start())
|
||||
|
||||
// arrange: expect raw value to be received
|
||||
_ = d.Once(d.Event(Data), func(data interface{}) {
|
||||
assert.Equal(t, 100, data.(int))
|
||||
@ -185,9 +188,6 @@ func TestAnalogSensor_WithSensorCyclicRead(t *testing.T) {
|
||||
nextVal <- -1 // arrange: error in read function
|
||||
})
|
||||
|
||||
// act (start cyclic reading)
|
||||
require.NoError(t, d.Start())
|
||||
|
||||
// assert: both events within timeout
|
||||
select {
|
||||
case <-semDone:
|
||||
@ -233,13 +233,14 @@ func TestAnalogSensor_WithSensorCyclicRead(t *testing.T) {
|
||||
func TestAnalogSensorHalt_WithSensorCyclicRead(t *testing.T) {
|
||||
// arrange
|
||||
d := NewAnalogSensorDriver(newAioTestAdaptor(), "1", WithSensorCyclicRead(10*time.Millisecond))
|
||||
require.NoError(t, d.Start())
|
||||
done := make(chan struct{})
|
||||
// act & assert
|
||||
go func() {
|
||||
<-d.halt
|
||||
require.NoError(t, d.Halt())
|
||||
close(done)
|
||||
}()
|
||||
// act & assert
|
||||
require.NoError(t, d.Halt())
|
||||
// test that the halt is not blocked by any deadlock with mutex and/or channel
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
|
@ -39,7 +39,7 @@ func TestNewGroveRotaryDriver(t *testing.T) {
|
||||
assert.Equal(t, pin, d.Pin())
|
||||
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
|
||||
assert.Equal(t, 0, d.lastRawValue)
|
||||
assert.NotNil(t, d.halt)
|
||||
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
|
||||
assert.NotNil(t, d.Eventer)
|
||||
require.NotNil(t, d.sensorCfg)
|
||||
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
|
||||
@ -65,7 +65,7 @@ func TestNewGroveLightSensorDriver(t *testing.T) {
|
||||
assert.Equal(t, pin, d.Pin())
|
||||
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
|
||||
assert.Equal(t, 0, d.lastRawValue)
|
||||
assert.NotNil(t, d.halt)
|
||||
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
|
||||
assert.NotNil(t, d.Eventer)
|
||||
require.NotNil(t, d.sensorCfg)
|
||||
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
|
||||
@ -91,7 +91,7 @@ func TestNewGrovePiezoVibrationSensorDriver(t *testing.T) {
|
||||
assert.Equal(t, pin, d.Pin())
|
||||
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
|
||||
assert.Equal(t, 0, d.lastRawValue)
|
||||
assert.NotNil(t, d.halt)
|
||||
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
|
||||
assert.NotNil(t, d.Eventer)
|
||||
require.NotNil(t, d.sensorCfg)
|
||||
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
|
||||
@ -117,7 +117,7 @@ func TestNewGroveSoundSensorDriver(t *testing.T) {
|
||||
assert.Equal(t, pin, d.Pin())
|
||||
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
|
||||
assert.Equal(t, 0, d.lastRawValue)
|
||||
assert.NotNil(t, d.halt)
|
||||
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
|
||||
assert.NotNil(t, d.Eventer)
|
||||
require.NotNil(t, d.sensorCfg)
|
||||
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
|
||||
@ -151,8 +151,9 @@ func TestGroveDriverHalt_WithSensorCyclicRead(t *testing.T) {
|
||||
lastCallCount := atomic.LoadInt32(&callCount)
|
||||
// If driver was not halted, digital reads would still continue
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
if atomic.LoadInt32(&callCount) != lastCallCount {
|
||||
t.Errorf("AnalogRead was called after driver was halted")
|
||||
// note: if a reading is already in progress, it will be finished before halt have an impact
|
||||
if atomic.LoadInt32(&callCount) > lastCallCount+1 {
|
||||
t.Errorf("AnalogRead was called more than once after driver was halted")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ func TestNewGroveTemperatureSensorDriver(t *testing.T) {
|
||||
assert.Equal(t, pin, d.Pin())
|
||||
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
|
||||
assert.Equal(t, 0, d.lastRawValue)
|
||||
assert.NotNil(t, d.halt)
|
||||
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
|
||||
assert.NotNil(t, d.Eventer)
|
||||
require.NotNil(t, d.sensorCfg)
|
||||
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
|
||||
@ -103,14 +103,15 @@ func TestGroveTemperatureSensor_publishesTemperatureInCelsius(t *testing.T) {
|
||||
a.analogReadFunc = func() (int, error) {
|
||||
return 585, nil
|
||||
}
|
||||
|
||||
// act: start cyclic reading
|
||||
require.NoError(t, d.Start())
|
||||
|
||||
_ = d.Once(d.Event(Value), func(data interface{}) {
|
||||
assert.Equal(t, "31.62", fmt.Sprintf("%.2f", data.(float64)))
|
||||
sem <- true
|
||||
})
|
||||
|
||||
// act: start cyclic reading
|
||||
require.NoError(t, d.Start())
|
||||
|
||||
// assert: value was published
|
||||
select {
|
||||
case <-sem:
|
||||
|
@ -31,7 +31,7 @@ func TestNewTemperatureSensorDriver(t *testing.T) {
|
||||
assert.Equal(t, pin, d.Pin())
|
||||
assert.InDelta(t, 0.0, d.lastValue, 0, 0)
|
||||
assert.Equal(t, 0, d.lastRawValue)
|
||||
assert.NotNil(t, d.halt)
|
||||
assert.Nil(t, d.halt) // will be created on initialize, if cyclic reading is on
|
||||
assert.NotNil(t, d.Eventer)
|
||||
require.NotNil(t, d.sensorCfg)
|
||||
assert.Equal(t, time.Duration(0), d.sensorCfg.readInterval)
|
||||
@ -139,11 +139,12 @@ func TestTemperatureSensorPublishesTemperatureInCelsius(t *testing.T) {
|
||||
a.analogReadFunc = func() (int, error) {
|
||||
return 585, nil
|
||||
}
|
||||
|
||||
require.NoError(t, d.Start())
|
||||
_ = d.Once(d.Event(Value), func(data interface{}) {
|
||||
assert.Equal(t, "31.62", fmt.Sprintf("%.2f", data.(float64)))
|
||||
sem <- true
|
||||
})
|
||||
require.NoError(t, d.Start())
|
||||
|
||||
select {
|
||||
case <-sem:
|
||||
@ -184,16 +185,17 @@ func TestTemperatureSensorHalt_WithSensorCyclicRead(t *testing.T) {
|
||||
// arrange
|
||||
d := NewTemperatureSensorDriver(newAioTestAdaptor(), "1", WithSensorCyclicRead(10*time.Millisecond))
|
||||
done := make(chan struct{})
|
||||
require.NoError(t, d.Start())
|
||||
// act & assert
|
||||
go func() {
|
||||
<-d.halt
|
||||
require.NoError(t, d.Halt())
|
||||
close(done)
|
||||
}()
|
||||
// act & assert
|
||||
require.NoError(t, d.Halt())
|
||||
// test that the halt is not blocked by any deadlock with mutex and/or channel
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Errorf(" Temperature Sensor was not halted")
|
||||
t.Errorf("Temperature Sensor was not halted")
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user