1
0
mirror of https://github.com/hybridgroup/gobot.git synced 2025-04-24 13:48:49 +08:00

system(gpio): add edge polling function (#1015)

This commit is contained in:
Thomas Kohler 2023-10-26 20:41:41 +02:00 committed by GitHub
parent 002c75ce88
commit 1f09353831
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 737 additions and 83 deletions

View File

@ -21,12 +21,18 @@ type DigitalPinOptioner interface {
SetDrive(drive int) (changed bool)
// SetDebounce initializes the input pin with the given debounce period.
SetDebounce(period time.Duration) (changed bool)
// SetEventHandlerForEdge initializes the input pin for edge detection and to call the event handler on specified edge.
// SetEventHandlerForEdge initializes the input pin for edge detection to call the event handler on specified edge.
// lineOffset is within the GPIO chip (needs to transformed to the pin id), timestamp is the detection time,
// detectedEdge contains the direction of the pin changes, seqno is the sequence number for this event in the sequence
// of events for all the lines in this line request, lseqno is the same but for this line
SetEventHandlerForEdge(handler func(lineOffset int, timestamp time.Duration, detectedEdge string, seqno uint32,
lseqno uint32), edge int) (changed bool)
// SetPollForEdgeDetection use a discrete input polling method to detect edges. A poll interval of zero or smaller
// will deactivate this function. Please note: Using this feature is CPU consuming and less accurate than using cdev
// event handler (gpiod implementation) and should be done only if the former is not implemented or not working for
// the adaptor. E.g. sysfs driver in gobot has not implemented edge detection yet. The function is only useful
// together with SetEventHandlerForEdge() and its corresponding With*() functions.
SetPollForEdgeDetection(pollInterval time.Duration, pollQuitChan chan struct{}) (changed bool)
}
// DigitalPinOptionApplier is the interface to apply options to change pin behavior immediately

View File

@ -142,6 +142,28 @@ Connect the input header pin26 to +3.3V with an resistor (e.g. 1kOhm).
1
```
### Test edge detection behavior of gpio251 (sysfs Tinkerboard)
investigate status:
```sh
# cat /sys/class/gpio/gpio251/edge
none
```
The file exists only if the pin can be configured as an interrupt generating input pin. To activate edge detection,
"rising", "falling", or "both" needs to be set.
```sh
# cat /sys/class/gpio/gpio251/value
1
```
If edge detection is activated, a poll will return only when the interrupt was triggered. The new value is written to
the beginning of the file.
> Not tested yet, not supported by gobot yet.
### Test output behavior of gpio251 (sysfs Tinkerboard)
Connect the output header pin26 to +3.3V with an resistor (e.g. 1kOhm leads to ~0.3mA, 300Ohm leads to ~10mA).

View File

@ -53,6 +53,8 @@ type digitalPinConfig struct {
debouncePeriod time.Duration
edge int
edgeEventHandler func(lineOffset int, timestamp time.Duration, detectedEdge string, seqno uint32, lseqno uint32)
pollInterval time.Duration
pollQuitChan chan struct{}
}
func newDigitalPinConfig(label string, options ...func(gobot.DigitalPinOptioner) bool) *digitalPinConfig {
@ -140,6 +142,16 @@ func WithPinEventOnBothEdges(handler func(lineOffset int, timestamp time.Duratio
}
}
// WithPinPollForEdgeDetection initializes a discrete input pin polling function to use for edge detection.
func WithPinPollForEdgeDetection(
pollInterval time.Duration,
pollQuitChan chan struct{},
) func(gobot.DigitalPinOptioner) bool {
return func(d gobot.DigitalPinOptioner) bool {
return d.SetPollForEdgeDetection(pollInterval, pollQuitChan)
}
}
// SetLabel sets the label to use for next reconfigure. The function is intended to use by WithPinLabel().
func (d *digitalPinConfig) SetLabel(label string) bool {
if d.label == label {
@ -211,9 +223,12 @@ func (d *digitalPinConfig) SetDebounce(period time.Duration) bool {
return true
}
// SetEventHandlerForEdge sets the input pin to edge detection and to call the event handler on specified edge. The
// SetEventHandlerForEdge sets the input pin to edge detection to call the event handler on specified edge. The
// function is intended to use by WithPinEventOnFallingEdge(), WithPinEventOnRisingEdge() and WithPinEventOnBothEdges().
func (d *digitalPinConfig) SetEventHandlerForEdge(handler func(int, time.Duration, string, uint32, uint32), edge int) bool {
func (d *digitalPinConfig) SetEventHandlerForEdge(
handler func(int, time.Duration, string, uint32, uint32),
edge int,
) bool {
if d.edge == edge {
return false
}
@ -221,3 +236,21 @@ func (d *digitalPinConfig) SetEventHandlerForEdge(handler func(int, time.Duratio
d.edgeEventHandler = handler
return true
}
// SetPollForEdgeDetection use a discrete input polling method to detect edges. A poll interval of zero or smaller
// will deactivate this function. Please note: Using this feature is CPU consuming and less accurate than using cdev
// event handler (gpiod implementation) and should be done only if the former is not implemented or not working for
// the adaptor. E.g. sysfs driver in gobot has not implemented edge detection yet. The function is only useful
// together with SetEventHandlerForEdge() and its corresponding With*() functions.
// The function is intended to use by WithPinPollForEdgeDetection().
func (d *digitalPinConfig) SetPollForEdgeDetection(
pollInterval time.Duration,
pollQuitChan chan struct{},
) (changed bool) {
if d.pollInterval == pollInterval {
return false
}
d.pollInterval = pollInterval
d.pollQuitChan = pollQuitChan
return true
}

View File

@ -413,3 +413,36 @@ func TestWithPinEventOnBothEdges(t *testing.T) {
})
}
}
func TestWithPinPollForEdgeDetection(t *testing.T) {
const (
oldVal = time.Duration(1)
newVal = time.Duration(3)
)
tests := map[string]struct {
oldPollInterval time.Duration
want bool
wantVal time.Duration
}{
"no_change": {
oldPollInterval: newVal,
},
"change": {
oldPollInterval: oldVal,
want: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
dpc := &digitalPinConfig{pollInterval: tc.oldPollInterval}
stopChan := make(chan struct{})
defer close(stopChan)
// act
got := WithPinPollForEdgeDetection(newVal, stopChan)(dpc)
// assert
assert.Equal(t, tc.want, got)
assert.Equal(t, newVal, dpc.pollInterval)
})
}
}

View File

@ -215,7 +215,8 @@ func digitalPinGpiodReconfigureLine(d *digitalPinGpiod, forceInput bool) error {
opts = append(opts, gpiod.WithDebounce(d.debouncePeriod))
}
// edge detection
if d.edgeEventHandler != nil {
if d.edgeEventHandler != nil && d.pollInterval <= 0 {
// use edge detection provided by gpiod
wrappedHandler := digitalPinGpiodGetWrappedEventHandler(d.edgeEventHandler)
switch d.edge {
case digitalPinEventOnFallingEdge:
@ -277,10 +278,20 @@ func digitalPinGpiodReconfigureLine(d *digitalPinGpiod, forceInput bool) error {
}
d.line = gpiodLine
// start discrete polling function and wait for first read is done
if (d.direction == IN || forceInput) && d.pollInterval > 0 {
if err := startEdgePolling(d.label, d.Read, d.pollInterval, d.edge, d.edgeEventHandler,
d.pollQuitChan); err != nil {
return err
}
}
return nil
}
func digitalPinGpiodGetWrappedEventHandler(handler func(int, time.Duration, string, uint32, uint32)) func(gpiod.LineEvent) {
func digitalPinGpiodGetWrappedEventHandler(
handler func(int, time.Duration, string, uint32, uint32),
) func(gpiod.LineEvent) {
return func(evt gpiod.LineEvent) {
detectedEdge := "none"
switch evt.Type {

View File

@ -114,7 +114,7 @@ func TestApplyOptions(t *testing.T) {
}
}
func TestExport(t *testing.T) {
func TestExportGpiod(t *testing.T) {
tests := map[string]struct {
simErr error
wantReconfigured int
@ -155,7 +155,7 @@ func TestExport(t *testing.T) {
}
}
func TestUnexport(t *testing.T) {
func TestUnexportGpiod(t *testing.T) {
tests := map[string]struct {
simNoLine bool
simReconfErr error
@ -217,7 +217,7 @@ func TestUnexport(t *testing.T) {
}
}
func TestWrite(t *testing.T) {
func TestWriteGpiod(t *testing.T) {
tests := map[string]struct {
val int
simErr error
@ -266,7 +266,7 @@ func TestWrite(t *testing.T) {
}
}
func TestRead(t *testing.T) {
func TestReadGpiod(t *testing.T) {
tests := map[string]struct {
simVal int
simErr error

81
system/digitalpin_poll.go Normal file
View File

@ -0,0 +1,81 @@
package system
import (
"fmt"
"sync"
"time"
)
func startEdgePolling(
pinLabel string,
pinReadFunc func() (int, error),
pollInterval time.Duration,
wantedEdge int,
eventHandler func(offset int, t time.Duration, et string, sn uint32, lsn uint32),
quitChan chan struct{},
) error {
if eventHandler == nil {
return fmt.Errorf("an event handler is mandatory for edge polling")
}
if quitChan == nil {
return fmt.Errorf("the quit channel is mandatory for edge polling")
}
const allEdges = "all"
triggerEventOn := "none"
switch wantedEdge {
case digitalPinEventOnFallingEdge:
triggerEventOn = DigitalPinEventFallingEdge
case digitalPinEventOnRisingEdge:
triggerEventOn = DigitalPinEventRisingEdge
case digitalPinEventOnBothEdges:
triggerEventOn = allEdges
default:
return fmt.Errorf("unsupported edge type %d for edge polling", wantedEdge)
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
var oldState int
var readStart time.Time
var firstLoopDone bool
for {
select {
case <-quitChan:
return
default:
// note: pure reading takes between 30us and 1ms on rasperry Pi1, typically 50us, with sysfs also 500us
// can happen, so we use the time stamp before start of reading to reduce random duration offset
readStart = time.Now()
readValue, err := pinReadFunc()
if err != nil {
fmt.Printf("edge polling error occurred while reading the pin %s: %v", pinLabel, err)
readValue = oldState // keep the value
}
if readValue != oldState {
detectedEdge := DigitalPinEventRisingEdge
if readValue < oldState {
detectedEdge = DigitalPinEventFallingEdge
}
if firstLoopDone && (triggerEventOn == allEdges || triggerEventOn == detectedEdge) {
eventHandler(0, time.Duration(readStart.UnixNano()), detectedEdge, 0, 0)
}
oldState = readValue
}
// the real poll interval is increased by the reading time, see also note above
// negative or zero duration causes no sleep
time.Sleep(pollInterval - time.Since(readStart))
if !firstLoopDone {
wg.Done()
firstLoopDone = true
}
}
}
}()
wg.Wait()
return nil
}

View File

@ -0,0 +1,175 @@
package system
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func Test_startEdgePolling(t *testing.T) {
type readValue struct {
value int
err string
}
tests := map[string]struct {
eventOnEdge int
simulateReadValues []readValue
simulateNoEventHandler bool
simulateNoQuitChan bool
wantEdgeTypes []string
wantErr string
}{
"edge_falling": {
eventOnEdge: digitalPinEventOnFallingEdge,
simulateReadValues: []readValue{
{value: 1},
{value: 0},
{value: 1},
{value: 0},
{value: 0},
},
wantEdgeTypes: []string{DigitalPinEventFallingEdge, DigitalPinEventFallingEdge},
},
"no_edge_falling": {
eventOnEdge: digitalPinEventOnFallingEdge,
simulateReadValues: []readValue{
{value: 0},
{value: 1},
{value: 1},
},
wantEdgeTypes: nil,
},
"edge_rising": {
eventOnEdge: digitalPinEventOnRisingEdge,
simulateReadValues: []readValue{
{value: 0},
{value: 1},
{value: 0},
{value: 1},
{value: 1},
},
wantEdgeTypes: []string{DigitalPinEventRisingEdge, DigitalPinEventRisingEdge},
},
"no_edge_rising": {
eventOnEdge: digitalPinEventOnRisingEdge,
simulateReadValues: []readValue{
{value: 1},
{value: 0},
{value: 0},
},
wantEdgeTypes: nil,
},
"edge_both": {
eventOnEdge: digitalPinEventOnBothEdges,
simulateReadValues: []readValue{
{value: 0},
{value: 1},
{value: 0},
{value: 1},
{value: 1},
},
wantEdgeTypes: []string{DigitalPinEventRisingEdge, DigitalPinEventFallingEdge, DigitalPinEventRisingEdge},
},
"no_edges_low": {
eventOnEdge: digitalPinEventOnBothEdges,
simulateReadValues: []readValue{
{value: 0},
{value: 0},
{value: 0},
},
wantEdgeTypes: nil,
},
"no_edges_high": {
eventOnEdge: digitalPinEventOnBothEdges,
simulateReadValues: []readValue{
{value: 1},
{value: 1},
{value: 1},
},
wantEdgeTypes: nil,
},
"read_error_keep_state": {
eventOnEdge: digitalPinEventOnBothEdges,
simulateReadValues: []readValue{
{value: 0},
{value: 1, err: "read error suppress rising and falling edge"},
{value: 0},
{value: 1},
{value: 1},
},
wantEdgeTypes: []string{DigitalPinEventRisingEdge},
},
"error_no_eventhandler": {
simulateNoEventHandler: true,
wantErr: "event handler is mandatory",
},
"error_no_quitchannel": {
simulateNoQuitChan: true,
wantErr: "quit channel is mandatory",
},
"error_unsupported_edgetype_none": {
eventOnEdge: digitalPinEventNone,
wantErr: "unsupported edge type 0",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
pinLabel := "test_pin"
pollInterval := time.Microsecond // zero is possible, just to show usage
// arrange event handler
var edgeTypes []string
var eventHandler func(int, time.Duration, string, uint32, uint32)
if !tc.simulateNoEventHandler {
eventHandler = func(offset int, t time.Duration, et string, sn uint32, lsn uint32) {
edgeTypes = append(edgeTypes, et)
}
}
// arrange quit channel
var quitChan chan struct{}
if !tc.simulateNoQuitChan {
quitChan = make(chan struct{})
}
defer func() {
if quitChan != nil {
close(quitChan)
}
}()
// arrange reads
numCallsRead := 0
wg := sync.WaitGroup{}
if tc.simulateReadValues != nil {
wg.Add(1)
}
readFunc := func() (int, error) {
numCallsRead++
readVal := tc.simulateReadValues[numCallsRead-1]
var err error
if readVal.err != "" {
err = fmt.Errorf(readVal.err)
}
if numCallsRead >= len(tc.simulateReadValues) {
close(quitChan) // ensure no further read call
quitChan = nil // lets skip defer routine
wg.Done() // release assertions
}
return readVal.value, err
}
// act
err := startEdgePolling(pinLabel, readFunc, pollInterval, tc.eventOnEdge, eventHandler, quitChan)
wg.Wait()
// assert
if tc.wantErr != "" {
assert.ErrorContains(t, err, tc.wantErr)
} else {
assert.NoError(t, err)
}
assert.Equal(t, len(tc.simulateReadValues), numCallsRead)
assert.Equal(t, tc.wantEdgeTypes, edgeTypes)
})
}
}

View File

@ -47,7 +47,7 @@ func newDigitalPinSysfs(fs filesystem, pin string, options ...func(gobot.Digital
func (d *digitalPinSysfs) ApplyOptions(options ...func(gobot.DigitalPinOptioner) bool) error {
anyChange := false
for _, option := range options {
anyChange = anyChange || option(d)
anyChange = option(d) || anyChange
}
if anyChange {
return d.reconfigure()
@ -87,7 +87,7 @@ func (d *digitalPinSysfs) Unexport() error {
d.activeLowFile = nil
}
_, err = writeFile(unexport, []byte(d.pin))
err = writeFile(unexport, []byte(d.pin))
if err != nil {
// If EINVAL then the pin is reserved in the system and can't be unexported
e, ok := err.(*os.PathError)
@ -101,7 +101,7 @@ func (d *digitalPinSysfs) Unexport() error {
// Write writes the given value to the character device
func (d *digitalPinSysfs) Write(b int) error {
_, err := writeFile(d.valFile, []byte(strconv.Itoa(b)))
err := writeFile(d.valFile, []byte(strconv.Itoa(b)))
return err
}
@ -121,7 +121,7 @@ func (d *digitalPinSysfs) reconfigure() error {
}
defer exportFile.Close()
_, err = writeFile(exportFile, []byte(d.pin))
err = writeFile(exportFile, []byte(d.pin))
if err != nil {
// If EBUSY then the pin has already been exported
e, ok := err.(*os.PathError)
@ -142,7 +142,7 @@ func (d *digitalPinSysfs) reconfigure() error {
break
}
if attempt > 10 {
return err
break
}
time.Sleep(10 * time.Millisecond)
}
@ -164,45 +164,59 @@ func (d *digitalPinSysfs) reconfigure() error {
if d.activeLow {
d.activeLowFile, err = d.fs.openFile(fmt.Sprintf("%s/%s/active_low", gpioPath, d.label), os.O_RDWR, 0o644)
if err == nil {
_, err = writeFile(d.activeLowFile, []byte("1"))
err = writeFile(d.activeLowFile, []byte("1"))
}
}
}
// configure bias (unsupported)
// configure bias (inputs and outputs, unsupported)
if err == nil {
if d.bias != digitalPinBiasDefault && systemSysfsDebug {
log.Printf("bias options (%d) are not supported by sysfs, please use hardware resistors instead\n", d.bias)
}
}
// configure drive (unsupported)
if d.drive != digitalPinDrivePushPull && systemSysfsDebug {
log.Printf("drive options (%d) are not supported by sysfs\n", d.drive)
}
// configure debounce period (inputs only), edge detection (inputs only) and drive (outputs only)
if d.direction == IN {
// configure debounce (unsupported)
if d.debouncePeriod != 0 && systemSysfsDebug {
log.Printf("debounce period option (%d) is not supported by sysfs\n", d.debouncePeriod)
}
// configure debounce (unsupported)
if d.debouncePeriod != 0 && systemSysfsDebug {
log.Printf("debounce period option (%d) is not supported by sysfs\n", d.debouncePeriod)
}
// configure edge detection
if err == nil {
if d.edge != 0 && d.pollInterval <= 0 {
err = fmt.Errorf("edge detect option (%d) is not implemented for sysfs without discrete polling", d.edge)
}
}
// configure edge detection (not implemented)
if d.edge != 0 && systemSysfsDebug {
log.Printf("edge detect option (%d) is not implemented for sysfs\n", d.edge)
// start discrete polling function and wait for first read is done
if err == nil {
if d.pollInterval > 0 {
err = startEdgePolling(d.label, d.Read, d.pollInterval, d.edge, d.edgeEventHandler, d.pollQuitChan)
}
}
} else {
// configure drive (unsupported)
if d.drive != digitalPinDrivePushPull && systemSysfsDebug {
log.Printf("drive options (%d) are not supported by sysfs\n", d.drive)
}
}
if err != nil {
return d.Unexport()
if e := d.Unexport(); e != nil {
err = fmt.Errorf("unexport error '%v' after '%v'", e, err)
}
}
return err
}
func (d *digitalPinSysfs) writeDirectionWithInitialOutput() error {
if _, err := writeFile(d.dirFile, []byte(d.direction)); err != nil || d.direction == IN {
if err := writeFile(d.dirFile, []byte(d.direction)); err != nil || d.direction == IN {
return err
}
_, err := writeFile(d.valFile, []byte(strconv.Itoa(d.outInitialState)))
err := writeFile(d.valFile, []byte(strconv.Itoa(d.outInitialState)))
return err
}
@ -210,9 +224,9 @@ func (d *digitalPinSysfs) writeDirectionWithInitialOutput() error {
// https://www.kernel.org/doc/Documentation/filesystems/sysfs.txt
// https://www.kernel.org/doc/Documentation/gpio/sysfs.txt
var writeFile = func(f File, data []byte) (i int, err error) {
var writeFile = func(f File, data []byte) error {
if f == nil {
return 0, errNotExported
return errNotExported
}
// sysfs docs say:
@ -221,8 +235,9 @@ var writeFile = func(f File, data []byte) (i int, err error) {
// > entire buffer back.
// however, this seems outdated/inaccurate (docs are from back in the Kernel BitKeeper days).
i, err = f.Write(data)
return i, err
// Write() returns already a non-nil error when n != len(b).
_, err := f.Write(data)
return err
}
var readFile = func(f File) ([]byte, error) {

View File

@ -4,8 +4,10 @@ import (
"errors"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gobot.io/x/gobot/v2"
)
@ -16,20 +18,333 @@ var (
_ gobot.DigitalPinOptionApplier = (*digitalPinSysfs)(nil)
)
func initTestDigitalPinSysFsWithMockedFilesystem(mockPaths []string) (*digitalPinSysfs, *MockFilesystem) {
func initTestDigitalPinSysfsWithMockedFilesystem(mockPaths []string) (*digitalPinSysfs, *MockFilesystem) {
fs := newMockFilesystem(mockPaths)
pin := newDigitalPinSysfs(fs, "10")
return pin, fs
}
func TestDigitalPin(t *testing.T) {
func Test_newDigitalPinSysfs(t *testing.T) {
// arrange
m := &MockFilesystem{}
const pinID = "1"
// act
pin := newDigitalPinSysfs(m, pinID, WithPinOpenDrain())
// assert
assert.Equal(t, pinID, pin.pin)
assert.Equal(t, m, pin.fs)
assert.Equal(t, "gpio"+pinID, pin.label)
assert.Equal(t, "in", pin.direction)
assert.Equal(t, 1, pin.drive)
}
func TestApplyOptionsSysfs(t *testing.T) {
tests := map[string]struct {
changed []bool
simErr bool
wantExport string
wantErr string
}{
"both_changed": {
changed: []bool{true, true},
wantExport: "10",
},
"first_changed": {
changed: []bool{true, false},
wantExport: "10",
},
"second_changed": {
changed: []bool{false, true},
wantExport: "10",
},
"none_changed": {
changed: []bool{false, false},
wantExport: "",
},
"error_on_change": {
changed: []bool{false, true},
simErr: true,
wantExport: "10",
wantErr: "gpio10/direction: no such file",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
mockPaths := []string{
"/sys/class/gpio/export",
"/sys/class/gpio/gpio10/value",
}
if !tc.simErr {
mockPaths = append(mockPaths, "/sys/class/gpio/gpio10/direction")
}
pin, fs := initTestDigitalPinSysfsWithMockedFilesystem(mockPaths)
optionFunction1 := func(gobot.DigitalPinOptioner) bool {
pin.digitalPinConfig.direction = OUT
return tc.changed[0]
}
optionFunction2 := func(gobot.DigitalPinOptioner) bool {
pin.digitalPinConfig.drive = 15
return tc.changed[1]
}
// act
err := pin.ApplyOptions(optionFunction1, optionFunction2)
// assert
if tc.wantErr != "" {
assert.ErrorContains(t, err, tc.wantErr)
} else {
assert.NoError(t, err)
}
assert.Equal(t, OUT, pin.digitalPinConfig.direction)
assert.Equal(t, 15, pin.digitalPinConfig.drive)
// marker for call of reconfigure, correct reconfigure is tested independently
assert.Equal(t, tc.wantExport, fs.Files["/sys/class/gpio/export"].Contents)
})
}
}
func TestDirectionBehaviorSysfs(t *testing.T) {
// arrange
pin := newDigitalPinSysfs(nil, "1")
require.Equal(t, "in", pin.direction)
pin.direction = "test"
// act && assert
assert.Equal(t, "test", pin.DirectionBehavior())
}
func TestDigitalPinExportSysfs(t *testing.T) {
// this tests mainly the function reconfigure()
const (
exportPath = "/sys/class/gpio/export"
dirPath = "/sys/class/gpio/gpio10/direction"
valuePath = "/sys/class/gpio/gpio10/value"
inversePath = "/sys/class/gpio/gpio10/active_low"
unexportPath = "/sys/class/gpio/unexport"
)
allMockPaths := []string{exportPath, dirPath, valuePath, inversePath, unexportPath}
tests := map[string]struct {
mockPaths []string
changeDirection string
changeOutInitialState int
changeActiveLow bool
changeBias int
changeDrive int
changeDebouncePeriod time.Duration
changeEdge int
changePollInterval time.Duration
simEbusyOnWrite int
wantWrites int
wantExport string
wantUnexport string
wantDirection string
wantValue string
wantInverse string
wantErr string
}{
"ok_without_option": {
mockPaths: allMockPaths,
wantWrites: 2,
wantExport: "10",
wantDirection: "in",
},
"ok_input_bias_dropped": {
mockPaths: allMockPaths,
changeBias: 3,
wantWrites: 2,
wantExport: "10",
wantDirection: "in",
},
"ok_input_drive_dropped": {
mockPaths: allMockPaths,
changeDrive: 2,
wantWrites: 2,
wantExport: "10",
wantDirection: "in",
},
"ok_input_debounce_dropped": {
mockPaths: allMockPaths,
changeDebouncePeriod: 2 * time.Second,
wantWrites: 2,
wantExport: "10",
wantDirection: "in",
},
"ok_input_inverse": {
mockPaths: allMockPaths,
changeActiveLow: true,
wantWrites: 3,
wantExport: "10",
wantDirection: "in",
wantInverse: "1",
},
"ok_output": {
mockPaths: allMockPaths,
changeDirection: "out",
changeOutInitialState: 4,
wantWrites: 3,
wantExport: "10",
wantDirection: "out",
wantValue: "4",
},
"ok_output_bias_dropped": {
mockPaths: allMockPaths,
changeDirection: "out",
changeBias: 3,
wantWrites: 3,
wantExport: "10",
wantDirection: "out",
wantValue: "0",
},
"ok_output_drive_dropped": {
mockPaths: allMockPaths,
changeDirection: "out",
changeDrive: 2,
wantWrites: 3,
wantExport: "10",
wantDirection: "out",
wantValue: "0",
},
"ok_output_debounce_dropped": {
mockPaths: allMockPaths,
changeDirection: "out",
changeDebouncePeriod: 2 * time.Second,
wantWrites: 3,
wantExport: "10",
wantDirection: "out",
wantValue: "0",
},
"ok_output_inverse": {
mockPaths: allMockPaths,
changeDirection: "out",
changeActiveLow: true,
wantWrites: 4,
wantExport: "10",
wantDirection: "out",
wantInverse: "1",
wantValue: "0",
},
"ok_already_exported": {
mockPaths: allMockPaths,
wantWrites: 2,
wantExport: "10",
wantDirection: "in",
simEbusyOnWrite: 1, // just means "already exported"
},
"error_no_eventhandler_for_polling": { // this only tests the call of function, all other is tested separately
mockPaths: allMockPaths,
changePollInterval: 3 * time.Second,
wantWrites: 3,
wantUnexport: "10",
wantDirection: "in",
wantErr: "event handler is mandatory",
},
"error_no_export_file": {
mockPaths: []string{unexportPath},
wantErr: "/export: no such file",
},
"error_no_direction_file": {
mockPaths: []string{exportPath, unexportPath},
wantWrites: 2,
wantUnexport: "10",
wantErr: "gpio10/direction: no such file",
},
"error_write_direction_file": {
mockPaths: allMockPaths,
wantWrites: 3,
wantUnexport: "10",
simEbusyOnWrite: 2,
wantErr: "device or resource busy",
},
"error_no_value_file": {
mockPaths: []string{exportPath, dirPath, unexportPath},
wantWrites: 2,
wantUnexport: "10",
wantErr: "gpio10/value: no such file",
},
"error_no_inverse_file": {
mockPaths: []string{exportPath, dirPath, valuePath, unexportPath},
changeActiveLow: true,
wantWrites: 3,
wantUnexport: "10",
wantErr: "gpio10/active_low: no such file",
},
"error_input_edge_without_poll": {
mockPaths: allMockPaths,
changeEdge: 2,
wantWrites: 3,
wantUnexport: "10",
wantErr: "not implemented for sysfs without discrete polling",
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
// arrange
fs := newMockFilesystem(tc.mockPaths)
pin := newDigitalPinSysfs(fs, "10")
if tc.changeDirection != "" {
pin.direction = tc.changeDirection
}
if tc.changeOutInitialState != 0 {
pin.outInitialState = tc.changeOutInitialState
}
if tc.changeActiveLow {
pin.activeLow = tc.changeActiveLow
}
if tc.changeBias != 0 {
pin.bias = tc.changeBias
}
if tc.changeDrive != 0 {
pin.drive = tc.changeDrive
}
if tc.changeDebouncePeriod != 0 {
pin.debouncePeriod = tc.changeDebouncePeriod
}
if tc.changeEdge != 0 {
pin.edge = tc.changeEdge
}
if tc.changePollInterval != 0 {
pin.pollInterval = tc.changePollInterval
}
// arrange write function
oldWriteFunc := writeFile
numCallsWrite := 0
writeFile = func(f File, data []byte) error {
numCallsWrite++
require.NoError(t, oldWriteFunc(f, data))
if numCallsWrite == tc.simEbusyOnWrite {
return &os.PathError{Err: Syscall_EBUSY}
}
return nil
}
defer func() { writeFile = oldWriteFunc }()
// act
err := pin.Export()
// assert
if tc.wantErr != "" {
assert.ErrorContains(t, err, tc.wantErr)
} else {
assert.NoError(t, err)
assert.NotNil(t, pin.valFile)
assert.NotNil(t, pin.dirFile)
assert.Equal(t, tc.wantDirection, fs.Files[dirPath].Contents)
assert.Equal(t, tc.wantExport, fs.Files[exportPath].Contents)
assert.Equal(t, tc.wantValue, fs.Files[valuePath].Contents)
assert.Equal(t, tc.wantInverse, fs.Files[inversePath].Contents)
}
assert.Equal(t, tc.wantUnexport, fs.Files[unexportPath].Contents)
assert.Equal(t, tc.wantWrites, numCallsWrite)
})
}
}
func TestDigitalPinSysfs(t *testing.T) {
mockPaths := []string{
"/sys/class/gpio/export",
"/sys/class/gpio/unexport",
"/sys/class/gpio/gpio10/value",
"/sys/class/gpio/gpio10/direction",
}
pin, fs := initTestDigitalPinSysFsWithMockedFilesystem(mockPaths)
pin, fs := initTestDigitalPinSysfsWithMockedFilesystem(mockPaths)
assert.Equal(t, "10", pin.pin)
assert.Equal(t, "gpio10", pin.label)
@ -39,10 +354,7 @@ func TestDigitalPin(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "10", fs.Files["/sys/class/gpio/unexport"].Contents)
err = pin.Export()
assert.NoError(t, err)
assert.Equal(t, "10", fs.Files["/sys/class/gpio/export"].Contents)
assert.NotNil(t, pin.valFile)
require.NoError(t, pin.Export())
err = pin.Write(1)
assert.NoError(t, err)
@ -63,63 +375,29 @@ func TestDigitalPin(t *testing.T) {
assert.ErrorContains(t, err, "pin has not been exported")
assert.Equal(t, 0, data)
writeFile = func(File, []byte) (int, error) {
return 0, &os.PathError{Err: Syscall_EINVAL}
writeFile = func(File, []byte) error {
return &os.PathError{Err: Syscall_EINVAL}
}
err = pin.Unexport()
assert.NoError(t, err)
writeFile = func(File, []byte) (int, error) {
return 0, &os.PathError{Err: errors.New("write error")}
writeFile = func(File, []byte) error {
return &os.PathError{Err: errors.New("write error")}
}
err = pin.Unexport()
assert.ErrorContains(t, err.(*os.PathError).Err, "write error")
// assert a busy error is dropped (just means "already exported")
cnt := 0
writeFile = func(File, []byte) (int, error) {
cnt++
if cnt == 1 {
return 0, &os.PathError{Err: Syscall_EBUSY}
}
return 0, nil
}
err = pin.Export()
assert.NoError(t, err)
// assert write error on export
writeFile = func(File, []byte) (int, error) {
return 0, &os.PathError{Err: errors.New("write error")}
}
err = pin.Export()
assert.ErrorContains(t, err.(*os.PathError).Err, "write error")
}
func TestDigitalPinExportError(t *testing.T) {
mockPaths := []string{
"/sys/class/gpio/export",
"/sys/class/gpio/gpio11/direction",
}
pin, _ := initTestDigitalPinSysFsWithMockedFilesystem(mockPaths)
writeFile = func(File, []byte) (int, error) {
return 0, &os.PathError{Err: Syscall_EBUSY}
}
err := pin.Export()
assert.ErrorContains(t, err, " : /sys/class/gpio/gpio10/direction: no such file")
}
func TestDigitalPinUnexportError(t *testing.T) {
func TestDigitalPinUnexportErrorSysfs(t *testing.T) {
mockPaths := []string{
"/sys/class/gpio/unexport",
}
pin, _ := initTestDigitalPinSysFsWithMockedFilesystem(mockPaths)
pin, _ := initTestDigitalPinSysfsWithMockedFilesystem(mockPaths)
writeFile = func(File, []byte) (int, error) {
return 0, &os.PathError{Err: Syscall_EBUSY}
writeFile = func(File, []byte) error {
return &os.PathError{Err: Syscall_EBUSY}
}
err := pin.Unexport()