1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-05-01 13:48:56 +08:00
Dejan Mijic 481b5b230d Validate incoming requests
All validation has been moved to the API resources layer, i.e. the
domain objects provide validation method, but the service itself assumes
no corrupted values are passed to it.

Signed-off-by: Dejan Mijic <dejan@mainflux.com>
2017-10-01 01:12:06 +02:00

164 lines
3.8 KiB
Go

// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"errors"
"io"
"sync"
)
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered)
type pipe struct {
mu sync.Mutex
c sync.Cond // c.L lazily initialized to &p.mu
b pipeBuffer // nil when done reading
err error // read error once empty. non-nil means closed.
breakErr error // immediate read error (caller doesn't see rest of b)
donec chan struct{} // closed on error
readFn func() // optional code to run in Read before error
}
type pipeBuffer interface {
Len() int
io.Writer
io.Reader
}
func (p *pipe) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
if p.b == nil {
return 0
}
return p.b.Len()
}
// Read waits until data is available and copies bytes
// from the buffer into p.
func (p *pipe) Read(d []byte) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.c.L == nil {
p.c.L = &p.mu
}
for {
if p.breakErr != nil {
return 0, p.breakErr
}
if p.b != nil && p.b.Len() > 0 {
return p.b.Read(d)
}
if p.err != nil {
if p.readFn != nil {
p.readFn() // e.g. copy trailers
p.readFn = nil // not sticky like p.err
}
p.b = nil
return 0, p.err
}
p.c.Wait()
}
}
var errClosedPipeWrite = errors.New("write on closed buffer")
// Write copies bytes from p into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
func (p *pipe) Write(d []byte) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.c.L == nil {
p.c.L = &p.mu
}
defer p.c.Signal()
if p.err != nil {
return 0, errClosedPipeWrite
}
if p.breakErr != nil {
return len(d), nil // discard when there is no reader
}
return p.b.Write(d)
}
// CloseWithError causes the next Read (waking up a current blocked
// Read if needed) to return the provided err after all data has been
// read.
//
// The error must be non-nil.
func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
// BreakWithError causes the next Read (waking up a current blocked
// Read if needed) to return the provided err immediately, without
// waiting for unread data.
func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
// closeWithErrorAndCode is like CloseWithError but also sets some code to run
// in the caller's goroutine before returning the error.
func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
func (p *pipe) closeWithError(dst *error, err error, fn func()) {
if err == nil {
panic("err must be non-nil")
}
p.mu.Lock()
defer p.mu.Unlock()
if p.c.L == nil {
p.c.L = &p.mu
}
defer p.c.Signal()
if *dst != nil {
// Already been done.
return
}
p.readFn = fn
if dst == &p.breakErr {
p.b = nil
}
*dst = err
p.closeDoneLocked()
}
// requires p.mu be held.
func (p *pipe) closeDoneLocked() {
if p.donec == nil {
return
}
// Close if unclosed. This isn't racy since we always
// hold p.mu while closing.
select {
case <-p.donec:
default:
close(p.donec)
}
}
// Err returns the error (if any) first set by BreakWithError or CloseWithError.
func (p *pipe) Err() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.breakErr != nil {
return p.breakErr
}
return p.err
}
// Done returns a channel which is closed if and when this pipe is closed
// with CloseWithError.
func (p *pipe) Done() <-chan struct{} {
p.mu.Lock()
defer p.mu.Unlock()
if p.donec == nil {
p.donec = make(chan struct{})
if p.err != nil || p.breakErr != nil {
// Already hit an error.
p.closeDoneLocked()
}
}
return p.donec
}