mirror of
https://github.com/mainflux/mainflux.git
synced 2025-05-02 22:17:10 +08:00

* Update dependencies Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Fix compose files and configs Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Upgrade image versions Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Update Postgres version Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Update test dependencies Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Fix fkey error handling Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
2346 lines
59 KiB
Go
2346 lines
59 KiB
Go
// Copyright 2020-2021 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package nats
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/nats-io/nuid"
|
|
)
|
|
|
|
// Request API subjects for JetStream.
|
|
const (
|
|
// defaultAPIPrefix is the default prefix for the JetStream API.
|
|
defaultAPIPrefix = "$JS.API."
|
|
|
|
// apiAccountInfo is for obtaining general information about JetStream.
|
|
apiAccountInfo = "INFO"
|
|
|
|
// apiConsumerCreateT is used to create consumers.
|
|
apiConsumerCreateT = "CONSUMER.CREATE.%s"
|
|
|
|
// apiDurableCreateT is used to create durable consumers.
|
|
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"
|
|
|
|
// apiConsumerInfoT is used to create consumers.
|
|
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
|
|
|
|
// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
|
|
apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
|
|
|
|
// apiDeleteConsumerT is used to delete consumers.
|
|
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
|
|
|
|
// apiConsumerListT is used to return all detailed consumer information
|
|
apiConsumerListT = "CONSUMER.LIST.%s"
|
|
|
|
// apiConsumerNamesT is used to return a list with all consumer names for the stream.
|
|
apiConsumerNamesT = "CONSUMER.NAMES.%s"
|
|
|
|
// apiStreams can lookup a stream by subject.
|
|
apiStreams = "STREAM.NAMES"
|
|
|
|
// apiStreamCreateT is the endpoint to create new streams.
|
|
apiStreamCreateT = "STREAM.CREATE.%s"
|
|
|
|
// apiStreamInfoT is the endpoint to get information on a stream.
|
|
apiStreamInfoT = "STREAM.INFO.%s"
|
|
|
|
// apiStreamUpdate is the endpoint to update existing streams.
|
|
apiStreamUpdateT = "STREAM.UPDATE.%s"
|
|
|
|
// apiStreamDeleteT is the endpoint to delete streams.
|
|
apiStreamDeleteT = "STREAM.DELETE.%s"
|
|
|
|
// apiPurgeStreamT is the endpoint to purge streams.
|
|
apiStreamPurgeT = "STREAM.PURGE.%s"
|
|
|
|
// apiStreamListT is the endpoint that will return all detailed stream information
|
|
apiStreamList = "STREAM.LIST"
|
|
|
|
// apiMsgGetT is the endpoint to get a message.
|
|
apiMsgGetT = "STREAM.MSG.GET.%s"
|
|
|
|
// apiMsgDeleteT is the endpoint to remove a message.
|
|
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
|
|
)
|
|
|
|
// JetStream allows persistent messaging through JetStream.
|
|
type JetStream interface {
|
|
// Publish publishes a message to JetStream.
|
|
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
|
|
|
|
// PublishMsg publishes a Msg to JetStream.
|
|
PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)
|
|
|
|
// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
|
|
// The data should not be changed until the PubAckFuture has been processed.
|
|
PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
|
|
|
|
// PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
|
|
// The message should not be changed until the PubAckFuture has been processed.
|
|
PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
|
|
|
|
// PublishAsyncPending returns the number of async publishes outstanding for this context.
|
|
PublishAsyncPending() int
|
|
|
|
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
|
|
PublishAsyncComplete() <-chan struct{}
|
|
|
|
// Subscribe creates an async Subscription for JetStream.
|
|
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
|
|
|
|
// SubscribeSync creates a Subscription that can be used to process messages synchronously.
|
|
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
|
|
|
|
// ChanSubscribe creates channel based Subscription.
|
|
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
|
|
|
|
// QueueSubscribe creates a Subscription with a queue group.
|
|
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
|
|
|
|
// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
|
|
QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
|
|
|
|
// PullSubscribe creates a Subscription that can fetch messages.
|
|
PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
|
|
}
|
|
|
|
// JetStreamContext allows JetStream messaging and stream management.
|
|
type JetStreamContext interface {
|
|
JetStream
|
|
JetStreamManager
|
|
}
|
|
|
|
// js is an internal struct from a JetStreamContext.
|
|
type js struct {
|
|
nc *Conn
|
|
opts *jsOpts
|
|
|
|
// For async publish context.
|
|
mu sync.RWMutex
|
|
rpre string
|
|
rsub *Subscription
|
|
pafs map[string]*pubAckFuture
|
|
stc chan struct{}
|
|
dch chan struct{}
|
|
rr *rand.Rand
|
|
}
|
|
|
|
type jsOpts struct {
|
|
ctx context.Context
|
|
// For importing JetStream from other accounts.
|
|
pre string
|
|
// Amount of time to wait for API requests.
|
|
wait time.Duration
|
|
// For async publish error handling.
|
|
aecb MsgErrHandler
|
|
// Maximum in flight.
|
|
maxap int
|
|
}
|
|
|
|
const (
|
|
defaultRequestWait = 5 * time.Second
|
|
defaultAccountCheck = 20 * time.Second
|
|
)
|
|
|
|
// JetStream returns a JetStreamContext for messaging and stream management.
|
|
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
|
|
js := &js{
|
|
nc: nc,
|
|
opts: &jsOpts{
|
|
pre: defaultAPIPrefix,
|
|
wait: defaultRequestWait,
|
|
},
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
if err := opt.configureJSContext(js.opts); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// If we have check recently we can avoid another account lookup here.
|
|
// We want these to be lighweight and created at will.
|
|
nc.mu.Lock()
|
|
now := time.Now()
|
|
checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
|
|
if checkAccount {
|
|
nc.jsLastCheck = now
|
|
}
|
|
nc.mu.Unlock()
|
|
|
|
if checkAccount {
|
|
if _, err := js.AccountInfo(); err != nil {
|
|
if err == ErrNoResponders {
|
|
err = ErrJetStreamNotEnabled
|
|
}
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return js, nil
|
|
}
|
|
|
|
// JSOpt configures a JetStreamContext.
|
|
type JSOpt interface {
|
|
configureJSContext(opts *jsOpts) error
|
|
}
|
|
|
|
// jsOptFn configures an option for the JetStreamContext.
|
|
type jsOptFn func(opts *jsOpts) error
|
|
|
|
func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
|
|
return opt(opts)
|
|
}
|
|
|
|
// APIPrefix changes the default prefix used for the JetStream API.
|
|
func APIPrefix(pre string) JSOpt {
|
|
return jsOptFn(func(js *jsOpts) error {
|
|
js.pre = pre
|
|
if !strings.HasSuffix(js.pre, ".") {
|
|
js.pre = js.pre + "."
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (js *js) apiSubj(subj string) string {
|
|
if js.opts.pre == _EMPTY_ {
|
|
return subj
|
|
}
|
|
var b strings.Builder
|
|
b.WriteString(js.opts.pre)
|
|
b.WriteString(subj)
|
|
return b.String()
|
|
}
|
|
|
|
// PubOpt configures options for publishing JetStream messages.
|
|
type PubOpt interface {
|
|
configurePublish(opts *pubOpts) error
|
|
}
|
|
|
|
// pubOptFn is a function option used to configure JetStream Publish.
|
|
type pubOptFn func(opts *pubOpts) error
|
|
|
|
func (opt pubOptFn) configurePublish(opts *pubOpts) error {
|
|
return opt(opts)
|
|
}
|
|
|
|
type pubOpts struct {
|
|
ctx context.Context
|
|
ttl time.Duration
|
|
id string
|
|
lid string // Expected last msgId
|
|
str string // Expected stream name
|
|
seq uint64 // Expected last sequence
|
|
}
|
|
|
|
// pubAckResponse is the ack response from the JetStream API when publishing a message.
|
|
type pubAckResponse struct {
|
|
apiResponse
|
|
*PubAck
|
|
}
|
|
|
|
// PubAck is an ack received after successfully publishing a message.
|
|
type PubAck struct {
|
|
Stream string `json:"stream"`
|
|
Sequence uint64 `json:"seq"`
|
|
Duplicate bool `json:"duplicate,omitempty"`
|
|
}
|
|
|
|
// Headers for published messages.
|
|
const (
|
|
MsgIdHdr = "Nats-Msg-Id"
|
|
ExpectedStreamHdr = "Nats-Expected-Stream"
|
|
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
|
|
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
|
|
)
|
|
|
|
// PublishMsg publishes a Msg to a stream from JetStream.
|
|
func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
|
|
var o pubOpts
|
|
if len(opts) > 0 {
|
|
if m.Header == nil {
|
|
m.Header = Header{}
|
|
}
|
|
for _, opt := range opts {
|
|
if err := opt.configurePublish(&o); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
// Check for option collisions. Right now just timeout and context.
|
|
if o.ctx != nil && o.ttl != 0 {
|
|
return nil, ErrContextAndTimeout
|
|
}
|
|
if o.ttl == 0 && o.ctx == nil {
|
|
o.ttl = js.opts.wait
|
|
}
|
|
|
|
if o.id != _EMPTY_ {
|
|
m.Header.Set(MsgIdHdr, o.id)
|
|
}
|
|
if o.lid != _EMPTY_ {
|
|
m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
|
|
}
|
|
if o.str != _EMPTY_ {
|
|
m.Header.Set(ExpectedStreamHdr, o.str)
|
|
}
|
|
if o.seq > 0 {
|
|
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
|
|
}
|
|
|
|
var resp *Msg
|
|
var err error
|
|
|
|
if o.ttl > 0 {
|
|
resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
|
|
} else {
|
|
resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
|
|
}
|
|
|
|
if err != nil {
|
|
if err == ErrNoResponders {
|
|
err = ErrNoStreamResponse
|
|
}
|
|
return nil, err
|
|
}
|
|
var pa pubAckResponse
|
|
if err := json.Unmarshal(resp.Data, &pa); err != nil {
|
|
return nil, ErrInvalidJSAck
|
|
}
|
|
if pa.Error != nil {
|
|
return nil, fmt.Errorf("nats: %s", pa.Error.Description)
|
|
}
|
|
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
|
|
return nil, ErrInvalidJSAck
|
|
}
|
|
return pa.PubAck, nil
|
|
}
|
|
|
|
// Publish publishes a message to a stream from JetStream.
|
|
func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) {
|
|
return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
|
|
}
|
|
|
|
// PubAckFuture is a future for a PubAck.
|
|
type PubAckFuture interface {
|
|
// Ok returns a receive only channel that can be used to get a PubAck.
|
|
Ok() <-chan *PubAck
|
|
|
|
// Err returns a receive only channel that can be used to get the error from an async publish.
|
|
Err() <-chan error
|
|
|
|
// Msg returns the message that was sent to the server.
|
|
Msg() *Msg
|
|
}
|
|
|
|
type pubAckFuture struct {
|
|
js *js
|
|
msg *Msg
|
|
pa *PubAck
|
|
st time.Time
|
|
err error
|
|
errCh chan error
|
|
doneCh chan *PubAck
|
|
}
|
|
|
|
func (paf *pubAckFuture) Ok() <-chan *PubAck {
|
|
paf.js.mu.Lock()
|
|
defer paf.js.mu.Unlock()
|
|
|
|
if paf.doneCh == nil {
|
|
paf.doneCh = make(chan *PubAck, 1)
|
|
if paf.pa != nil {
|
|
paf.doneCh <- paf.pa
|
|
}
|
|
}
|
|
|
|
return paf.doneCh
|
|
}
|
|
|
|
func (paf *pubAckFuture) Err() <-chan error {
|
|
paf.js.mu.Lock()
|
|
defer paf.js.mu.Unlock()
|
|
|
|
if paf.errCh == nil {
|
|
paf.errCh = make(chan error, 1)
|
|
if paf.err != nil {
|
|
paf.errCh <- paf.err
|
|
}
|
|
}
|
|
|
|
return paf.errCh
|
|
}
|
|
|
|
func (paf *pubAckFuture) Msg() *Msg {
|
|
paf.js.mu.RLock()
|
|
defer paf.js.mu.RUnlock()
|
|
return paf.msg
|
|
}
|
|
|
|
// pullSubscribe creates the wildcard subscription used per pull subscriber
|
|
// to make fetch requests.
|
|
func (js *js) pullSubscribe(subj string) (*Subscription, error) {
|
|
jsi := &jsSub{js: js, pull: true}
|
|
|
|
// Similar to async request handler we create a wildcard subscription for making requests,
|
|
// though we do not use the token based approach since we cannot match the response to
|
|
// the requestor due to JS subject being remapped on delivery. Instead, we just use an array
|
|
// of channels similar to how ping/pong interval is handled and send the message to the first
|
|
// available requestor via a channel.
|
|
jsi.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
jsi.rpre = fmt.Sprintf("%s.", NewInbox())
|
|
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", jsi.rpre), jsi.handleFetch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jsi.psub = sub
|
|
|
|
return &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: jsi}, nil
|
|
}
|
|
|
|
// For quick token lookup etc.
|
|
const aReplyPreLen = 14
|
|
const aReplyTokensize = 6
|
|
|
|
func (js *js) newAsyncReply() string {
|
|
js.mu.Lock()
|
|
if js.rsub == nil {
|
|
// Create our wildcard reply subject.
|
|
sha := sha256.New()
|
|
sha.Write([]byte(nuid.Next()))
|
|
b := sha.Sum(nil)
|
|
for i := 0; i < aReplyTokensize; i++ {
|
|
b[i] = rdigits[int(b[i]%base)]
|
|
}
|
|
js.rpre = fmt.Sprintf("%s%s.", InboxPrefix, b[:aReplyTokensize])
|
|
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
|
|
if err != nil {
|
|
js.mu.Unlock()
|
|
return _EMPTY_
|
|
}
|
|
js.rsub = sub
|
|
js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
}
|
|
var sb strings.Builder
|
|
sb.WriteString(js.rpre)
|
|
rn := js.rr.Int63()
|
|
var b [aReplyTokensize]byte
|
|
for i, l := 0, rn; i < len(b); i++ {
|
|
b[i] = rdigits[l%base]
|
|
l /= base
|
|
}
|
|
sb.Write(b[:])
|
|
js.mu.Unlock()
|
|
return sb.String()
|
|
}
|
|
|
|
// registerPAF will register for a PubAckFuture.
|
|
func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
|
|
js.mu.Lock()
|
|
if js.pafs == nil {
|
|
js.pafs = make(map[string]*pubAckFuture)
|
|
}
|
|
paf.js = js
|
|
js.pafs[id] = paf
|
|
np := len(js.pafs)
|
|
maxap := js.opts.maxap
|
|
js.mu.Unlock()
|
|
return np, maxap
|
|
}
|
|
|
|
// Lock should be held.
|
|
func (js *js) getPAF(id string) *pubAckFuture {
|
|
if js.pafs == nil {
|
|
return nil
|
|
}
|
|
return js.pafs[id]
|
|
}
|
|
|
|
// clearPAF will remove a PubAckFuture that was registered.
|
|
func (js *js) clearPAF(id string) {
|
|
js.mu.Lock()
|
|
delete(js.pafs, id)
|
|
js.mu.Unlock()
|
|
}
|
|
|
|
// PublishAsyncPending returns how many PubAckFutures are pending.
|
|
func (js *js) PublishAsyncPending() int {
|
|
js.mu.RLock()
|
|
defer js.mu.RUnlock()
|
|
return len(js.pafs)
|
|
}
|
|
|
|
func (js *js) asyncStall() <-chan struct{} {
|
|
js.mu.Lock()
|
|
if js.stc == nil {
|
|
js.stc = make(chan struct{})
|
|
}
|
|
stc := js.stc
|
|
js.mu.Unlock()
|
|
return stc
|
|
}
|
|
|
|
// Handle an async reply from PublishAsync.
|
|
func (js *js) handleAsyncReply(m *Msg) {
|
|
if len(m.Subject) <= aReplyPreLen {
|
|
return
|
|
}
|
|
id := m.Subject[aReplyPreLen:]
|
|
|
|
js.mu.Lock()
|
|
paf := js.getPAF(id)
|
|
if paf == nil {
|
|
js.mu.Unlock()
|
|
return
|
|
}
|
|
// Remove
|
|
delete(js.pafs, id)
|
|
|
|
// Check on anyone stalled and waiting.
|
|
if js.stc != nil && len(js.pafs) < js.opts.maxap {
|
|
close(js.stc)
|
|
js.stc = nil
|
|
}
|
|
// Check on anyone one waiting on done status.
|
|
if js.dch != nil && len(js.pafs) == 0 {
|
|
dch := js.dch
|
|
js.dch = nil
|
|
// Defer here so error is processed and can be checked.
|
|
defer close(dch)
|
|
}
|
|
|
|
doErr := func(err error) {
|
|
paf.err = err
|
|
if paf.errCh != nil {
|
|
paf.errCh <- paf.err
|
|
}
|
|
cb := js.opts.aecb
|
|
js.mu.Unlock()
|
|
if cb != nil {
|
|
cb(paf.js, paf.msg, err)
|
|
}
|
|
}
|
|
|
|
// Process no responders etc.
|
|
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
|
|
doErr(ErrNoResponders)
|
|
return
|
|
}
|
|
|
|
var pa pubAckResponse
|
|
if err := json.Unmarshal(m.Data, &pa); err != nil {
|
|
doErr(ErrInvalidJSAck)
|
|
return
|
|
}
|
|
if pa.Error != nil {
|
|
doErr(fmt.Errorf("nats: %s", pa.Error.Description))
|
|
return
|
|
}
|
|
if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
|
|
doErr(ErrInvalidJSAck)
|
|
return
|
|
}
|
|
|
|
// So here we have received a proper puback.
|
|
paf.pa = pa.PubAck
|
|
if paf.doneCh != nil {
|
|
paf.doneCh <- paf.pa
|
|
}
|
|
js.mu.Unlock()
|
|
}
|
|
|
|
// MsgErrHandler is used to process asynchronous errors from
|
|
// JetStream PublishAsync and PublishAsynMsg. It will return the original
|
|
// message sent to the server for possible retransmitting and the error encountered.
|
|
type MsgErrHandler func(JetStream, *Msg, error)
|
|
|
|
// PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
|
|
func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt {
|
|
return jsOptFn(func(js *jsOpts) error {
|
|
js.aecb = cb
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
|
|
func PublishAsyncMaxPending(max int) JSOpt {
|
|
return jsOptFn(func(js *jsOpts) error {
|
|
if max < 1 {
|
|
return errors.New("nats: max ack pending should be >= 1")
|
|
}
|
|
js.maxap = max
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// PublishAsync publishes a message to JetStream and returns a PubAckFuture
|
|
func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
|
|
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
|
|
}
|
|
|
|
func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
|
|
var o pubOpts
|
|
if len(opts) > 0 {
|
|
if m.Header == nil {
|
|
m.Header = Header{}
|
|
}
|
|
for _, opt := range opts {
|
|
if err := opt.configurePublish(&o); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Timeouts and contexts do not make sense for these.
|
|
if o.ttl != 0 || o.ctx != nil {
|
|
return nil, ErrContextAndTimeout
|
|
}
|
|
|
|
// FIXME(dlc) - Make common.
|
|
if o.id != _EMPTY_ {
|
|
m.Header.Set(MsgIdHdr, o.id)
|
|
}
|
|
if o.lid != _EMPTY_ {
|
|
m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
|
|
}
|
|
if o.str != _EMPTY_ {
|
|
m.Header.Set(ExpectedStreamHdr, o.str)
|
|
}
|
|
if o.seq > 0 {
|
|
m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
|
|
}
|
|
|
|
// Reply
|
|
if m.Reply != _EMPTY_ {
|
|
return nil, errors.New("nats: reply subject should be empty")
|
|
}
|
|
m.Reply = js.newAsyncReply()
|
|
if m.Reply == _EMPTY_ {
|
|
return nil, errors.New("nats: error creating async reply handler")
|
|
}
|
|
id := m.Reply[aReplyPreLen:]
|
|
paf := &pubAckFuture{msg: m, st: time.Now()}
|
|
numPending, maxPending := js.registerPAF(id, paf)
|
|
|
|
if maxPending > 0 && numPending >= maxPending {
|
|
select {
|
|
case <-js.asyncStall():
|
|
case <-time.After(200 * time.Millisecond):
|
|
js.clearPAF(id)
|
|
return nil, errors.New("nats: stalled with too many outstanding async published messages")
|
|
}
|
|
}
|
|
|
|
if err := js.nc.PublishMsg(m); err != nil {
|
|
js.clearPAF(id)
|
|
return nil, err
|
|
}
|
|
|
|
return paf, nil
|
|
}
|
|
|
|
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages have been ack'd.
|
|
func (js *js) PublishAsyncComplete() <-chan struct{} {
|
|
js.mu.Lock()
|
|
defer js.mu.Unlock()
|
|
if js.dch == nil {
|
|
js.dch = make(chan struct{})
|
|
}
|
|
dch := js.dch
|
|
if len(js.pafs) == 0 {
|
|
close(js.dch)
|
|
js.dch = nil
|
|
}
|
|
return dch
|
|
}
|
|
|
|
// MsgId sets the message ID used for de-duplication.
|
|
func MsgId(id string) PubOpt {
|
|
return pubOptFn(func(opts *pubOpts) error {
|
|
opts.id = id
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ExpectStream sets the expected stream to respond from the publish.
|
|
func ExpectStream(stream string) PubOpt {
|
|
return pubOptFn(func(opts *pubOpts) error {
|
|
opts.str = stream
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ExpectLastSequence sets the expected sequence in the response from the publish.
|
|
func ExpectLastSequence(seq uint64) PubOpt {
|
|
return pubOptFn(func(opts *pubOpts) error {
|
|
opts.seq = seq
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ExpectLastSequence sets the expected sequence in the response from the publish.
|
|
func ExpectLastMsgId(id string) PubOpt {
|
|
return pubOptFn(func(opts *pubOpts) error {
|
|
opts.lid = id
|
|
return nil
|
|
})
|
|
}
|
|
|
|
type ackOpts struct {
|
|
ttl time.Duration
|
|
ctx context.Context
|
|
}
|
|
|
|
// AckOpt are the options that can be passed when acknowledge a message.
|
|
type AckOpt interface {
|
|
configureAck(opts *ackOpts) error
|
|
}
|
|
|
|
// MaxWait sets the maximum amount of time we will wait for a response.
|
|
type MaxWait time.Duration
|
|
|
|
func (ttl MaxWait) configureJSContext(js *jsOpts) error {
|
|
js.wait = time.Duration(ttl)
|
|
return nil
|
|
}
|
|
|
|
func (ttl MaxWait) configurePull(opts *pullOpts) error {
|
|
opts.ttl = time.Duration(ttl)
|
|
return nil
|
|
}
|
|
|
|
// AckWait sets the maximum amount of time we will wait for an ack.
|
|
type AckWait time.Duration
|
|
|
|
func (ttl AckWait) configurePublish(opts *pubOpts) error {
|
|
opts.ttl = time.Duration(ttl)
|
|
return nil
|
|
}
|
|
|
|
func (ttl AckWait) configureSubscribe(opts *subOpts) error {
|
|
opts.cfg.AckWait = time.Duration(ttl)
|
|
return nil
|
|
}
|
|
|
|
func (ttl AckWait) configureAck(opts *ackOpts) error {
|
|
opts.ttl = time.Duration(ttl)
|
|
return nil
|
|
}
|
|
|
|
// ContextOpt is an option used to set a context.Context.
|
|
type ContextOpt struct {
|
|
context.Context
|
|
}
|
|
|
|
func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
|
|
opts.ctx = ctx
|
|
return nil
|
|
}
|
|
|
|
func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
|
|
opts.ctx = ctx
|
|
return nil
|
|
}
|
|
|
|
func (ctx ContextOpt) configurePull(opts *pullOpts) error {
|
|
opts.ctx = ctx
|
|
return nil
|
|
}
|
|
|
|
func (ctx ContextOpt) configureAck(opts *ackOpts) error {
|
|
opts.ctx = ctx
|
|
return nil
|
|
}
|
|
|
|
// Context returns an option that can be used to configure a context for APIs
|
|
// that are context aware such as those part of the JetStream interface.
|
|
func Context(ctx context.Context) ContextOpt {
|
|
return ContextOpt{ctx}
|
|
}
|
|
|
|
// Subscribe
|
|
|
|
// ConsumerConfig is the configuration of a JetStream consumer.
|
|
type ConsumerConfig struct {
|
|
Durable string `json:"durable_name,omitempty"`
|
|
DeliverSubject string `json:"deliver_subject,omitempty"`
|
|
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
|
|
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
|
|
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
|
|
AckPolicy AckPolicy `json:"ack_policy"`
|
|
AckWait time.Duration `json:"ack_wait,omitempty"`
|
|
MaxDeliver int `json:"max_deliver,omitempty"`
|
|
FilterSubject string `json:"filter_subject,omitempty"`
|
|
ReplayPolicy ReplayPolicy `json:"replay_policy"`
|
|
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
|
|
SampleFrequency string `json:"sample_freq,omitempty"`
|
|
MaxWaiting int `json:"max_waiting,omitempty"`
|
|
MaxAckPending int `json:"max_ack_pending,omitempty"`
|
|
FlowControl bool `json:"flow_control,omitempty"`
|
|
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
|
|
}
|
|
|
|
// ConsumerInfo is the info from a JetStream consumer.
|
|
type ConsumerInfo struct {
|
|
Stream string `json:"stream_name"`
|
|
Name string `json:"name"`
|
|
Created time.Time `json:"created"`
|
|
Config ConsumerConfig `json:"config"`
|
|
Delivered SequencePair `json:"delivered"`
|
|
AckFloor SequencePair `json:"ack_floor"`
|
|
NumAckPending int `json:"num_ack_pending"`
|
|
NumRedelivered int `json:"num_redelivered"`
|
|
NumWaiting int `json:"num_waiting"`
|
|
NumPending uint64 `json:"num_pending"`
|
|
Cluster *ClusterInfo `json:"cluster,omitempty"`
|
|
}
|
|
|
|
// SequencePair includes the consumer and stream sequence info from a JetStream consumer.
|
|
type SequencePair struct {
|
|
Consumer uint64 `json:"consumer_seq"`
|
|
Stream uint64 `json:"stream_seq"`
|
|
}
|
|
|
|
// nextRequest is for getting next messages for pull based consumers from JetStream.
|
|
type nextRequest struct {
|
|
Expires time.Duration `json:"expires,omitempty"`
|
|
Batch int `json:"batch,omitempty"`
|
|
NoWait bool `json:"no_wait,omitempty"`
|
|
}
|
|
|
|
// jsSub includes JetStream subscription info.
|
|
type jsSub struct {
|
|
js *js
|
|
|
|
// To setup request mux handler for pull subscribers.
|
|
mu sync.RWMutex
|
|
psub *Subscription
|
|
rpre string
|
|
rr *rand.Rand
|
|
freqs []chan *Msg
|
|
|
|
consumer string
|
|
stream string
|
|
deliver string
|
|
pull bool
|
|
durable bool
|
|
attached bool
|
|
|
|
// Heartbeats and Flow Control handling from push consumers.
|
|
hbs bool
|
|
fc bool
|
|
cmeta string
|
|
fcs map[uint64]string
|
|
}
|
|
|
|
// newFetchReply generates a unique inbox used for a fetch request.
|
|
func (jsi *jsSub) newFetchReply() string {
|
|
jsi.mu.Lock()
|
|
rpre := jsi.rpre
|
|
rn := jsi.rr.Int63()
|
|
jsi.mu.Unlock()
|
|
var sb strings.Builder
|
|
sb.WriteString(rpre)
|
|
var b [aReplyTokensize]byte
|
|
for i, l := 0, rn; i < len(b); i++ {
|
|
b[i] = rdigits[l%base]
|
|
l /= base
|
|
}
|
|
sb.Write(b[:])
|
|
return sb.String()
|
|
}
|
|
|
|
// handleFetch is delivered a message requested by pull subscribers
|
|
// when calling Fetch.
|
|
func (jsi *jsSub) handleFetch(m *Msg) {
|
|
jsi.mu.Lock()
|
|
if len(jsi.freqs) == 0 {
|
|
nc := jsi.js.nc
|
|
sub := jsi.psub
|
|
nc.mu.Lock()
|
|
errCB := nc.Opts.AsyncErrorCB
|
|
err := fmt.Errorf("nats: fetch response delivered but requestor has gone away")
|
|
if errCB != nil {
|
|
nc.ach.push(func() { errCB(nc, sub, err) })
|
|
}
|
|
nc.mu.Unlock()
|
|
jsi.mu.Unlock()
|
|
return
|
|
}
|
|
mch := jsi.freqs[0]
|
|
if len(jsi.freqs) > 1 {
|
|
jsi.freqs = append(jsi.freqs[:0], jsi.freqs[1:]...)
|
|
} else {
|
|
jsi.freqs = jsi.freqs[:0]
|
|
}
|
|
jsi.mu.Unlock()
|
|
mch <- m
|
|
}
|
|
|
|
// fetchNoWait makes a request to get a single message using no wait.
|
|
func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) (*Msg, error) {
|
|
nc := jsi.js.nc
|
|
m := NewMsg(subj)
|
|
m.Reply = jsi.newFetchReply()
|
|
m.Data = payload
|
|
|
|
mch := make(chan *Msg, 1)
|
|
jsi.mu.Lock()
|
|
jsi.freqs = append(jsi.freqs, mch)
|
|
jsi.mu.Unlock()
|
|
if err := nc.PublishMsg(m); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var ok bool
|
|
var msg *Msg
|
|
|
|
select {
|
|
case msg, ok = <-mch:
|
|
if !ok {
|
|
return nil, ErrConnectionClosed
|
|
}
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
func (jsi *jsSub) unsubscribe(drainMode bool) error {
|
|
if drainMode && (jsi.durable || jsi.attached) {
|
|
// Skip deleting consumer for durables/attached
|
|
// consumers when using drain mode.
|
|
return nil
|
|
}
|
|
// Clear the extra async pull subscription used for fetch requests.
|
|
if jsi.psub != nil {
|
|
jsi.psub.Drain()
|
|
}
|
|
|
|
js := jsi.js
|
|
return js.DeleteConsumer(jsi.stream, jsi.consumer)
|
|
}
|
|
|
|
// SubOpt configures options for subscribing to JetStream consumers.
|
|
type SubOpt interface {
|
|
configureSubscribe(opts *subOpts) error
|
|
}
|
|
|
|
// subOptFn is a function option used to configure a JetStream Subscribe.
|
|
type subOptFn func(opts *subOpts) error
|
|
|
|
func (opt subOptFn) configureSubscribe(opts *subOpts) error {
|
|
return opt(opts)
|
|
}
|
|
|
|
// Subscribe will create a subscription to the appropriate stream and consumer.
|
|
func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
|
|
if cb == nil {
|
|
return nil, ErrBadSubscription
|
|
}
|
|
return js.subscribe(subj, _EMPTY_, cb, nil, false, opts)
|
|
}
|
|
|
|
// SubscribeSync will create a sync subscription to the appropriate stream and consumer.
|
|
func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
|
|
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
|
|
return js.subscribe(subj, _EMPTY_, nil, mch, true, opts)
|
|
}
|
|
|
|
// QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics.
|
|
func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
|
|
if cb == nil {
|
|
return nil, ErrBadSubscription
|
|
}
|
|
return js.subscribe(subj, queue, cb, nil, false, opts)
|
|
}
|
|
|
|
// QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics.
|
|
func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
|
|
mch := make(chan *Msg, js.nc.Opts.SubChanLen)
|
|
return js.subscribe(subj, queue, nil, mch, true, opts)
|
|
}
|
|
|
|
// Subscribe will create a subscription to the appropriate stream and consumer.
|
|
func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
|
|
return js.subscribe(subj, _EMPTY_, nil, ch, false, opts)
|
|
}
|
|
|
|
// PullSubscribe creates a pull subscriber.
|
|
func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
|
|
return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable)))
|
|
}
|
|
|
|
func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) {
|
|
cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
|
|
o := subOpts{cfg: &cfg}
|
|
if len(opts) > 0 {
|
|
for _, opt := range opts {
|
|
if err := opt.configureSubscribe(&o); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
isPullMode := ch == nil && cb == nil
|
|
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
|
|
hasHeartbeats := o.cfg.Heartbeat > 0
|
|
hasFC := o.cfg.FlowControl
|
|
if isPullMode && badPullAck {
|
|
return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
|
|
}
|
|
|
|
var (
|
|
err error
|
|
shouldCreate bool
|
|
ccfg *ConsumerConfig
|
|
info *ConsumerInfo
|
|
deliver string
|
|
attached bool
|
|
stream = o.stream
|
|
consumer = o.consumer
|
|
isDurable = o.cfg.Durable != _EMPTY_
|
|
)
|
|
|
|
// Find the stream mapped to the subject if not bound to a stream already.
|
|
if o.stream == _EMPTY_ {
|
|
stream, err = js.lookupStreamBySubject(subj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
stream = o.stream
|
|
}
|
|
|
|
// With an explicit durable name, then can lookup
|
|
// the consumer to which it should be attaching to.
|
|
consumer = o.cfg.Durable
|
|
if consumer != _EMPTY_ {
|
|
// Only create in case there is no consumer already.
|
|
info, err = js.ConsumerInfo(stream, consumer)
|
|
if err != nil && err.Error() != "nats: consumer not found" {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if info != nil {
|
|
// Attach using the found consumer config.
|
|
ccfg = &info.Config
|
|
attached = true
|
|
|
|
// Make sure this new subject matches or is a subset.
|
|
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
|
|
return nil, ErrSubjectMismatch
|
|
}
|
|
|
|
if ccfg.DeliverSubject != _EMPTY_ {
|
|
deliver = ccfg.DeliverSubject
|
|
} else {
|
|
deliver = NewInbox()
|
|
}
|
|
} else {
|
|
shouldCreate = true
|
|
deliver = NewInbox()
|
|
if !isPullMode {
|
|
cfg.DeliverSubject = deliver
|
|
}
|
|
// Do filtering always, server will clear as needed.
|
|
cfg.FilterSubject = subj
|
|
}
|
|
|
|
var sub *Subscription
|
|
|
|
// Check if we are manual ack.
|
|
if cb != nil && !o.mack {
|
|
ocb := cb
|
|
cb = func(m *Msg) { ocb(m); m.Ack() }
|
|
}
|
|
|
|
if isPullMode {
|
|
sub, err = js.pullSubscribe(subj)
|
|
} else {
|
|
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// With flow control enabled async subscriptions we will disable msgs
|
|
// limits, and set a larger pending bytes limit by default.
|
|
if !isPullMode && cb != nil && hasFC {
|
|
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
|
|
}
|
|
|
|
// If we are creating or updating let's process that request.
|
|
if shouldCreate {
|
|
// If not set default to ack explicit.
|
|
if cfg.AckPolicy == ackPolicyNotSet {
|
|
cfg.AckPolicy = AckExplicitPolicy
|
|
}
|
|
// If we have acks at all and the MaxAckPending is not set go ahead
|
|
// and set to the internal max.
|
|
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
|
|
if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
|
|
maxMsgs, _, _ := sub.PendingLimits()
|
|
cfg.MaxAckPending = maxMsgs
|
|
}
|
|
|
|
req := &createConsumerRequest{
|
|
Stream: stream,
|
|
Config: &cfg,
|
|
}
|
|
|
|
j, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var ccSubj string
|
|
if isDurable {
|
|
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
|
|
} else {
|
|
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
|
|
}
|
|
|
|
resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
|
|
if err != nil {
|
|
sub.Drain()
|
|
if err == ErrNoResponders {
|
|
err = ErrJetStreamNotEnabled
|
|
}
|
|
return nil, err
|
|
}
|
|
var cinfo consumerResponse
|
|
err = json.Unmarshal(resp.Data, &cinfo)
|
|
if err != nil {
|
|
sub.Drain()
|
|
return nil, err
|
|
}
|
|
info = cinfo.ConsumerInfo
|
|
if cinfo.Error != nil {
|
|
// Remove interest from previous subscribe since it
|
|
// may have an incorrect delivery subject.
|
|
sub.Drain()
|
|
|
|
// Multiple subscribers could compete in creating the first consumer
|
|
// that will be shared using the same durable name. If this happens, then
|
|
// do a lookup of the consumer info and resubscribe using the latest info.
|
|
if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
|
|
info, err = js.ConsumerInfo(stream, consumer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ccfg = &info.Config
|
|
|
|
// Validate that the original subject does still match.
|
|
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
|
|
return nil, ErrSubjectMismatch
|
|
}
|
|
|
|
// Use the deliver subject from latest consumer config to attach.
|
|
if ccfg.DeliverSubject != _EMPTY_ {
|
|
// We can't reuse the channel, so if one was passed, we need to create a new one.
|
|
if ch != nil {
|
|
ch = make(chan *Msg, cap(ch))
|
|
}
|
|
sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
|
|
&jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
attached = true
|
|
} else {
|
|
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
|
|
}
|
|
}
|
|
stream = info.Stream
|
|
consumer = info.Name
|
|
deliver = info.Config.DeliverSubject
|
|
}
|
|
sub.mu.Lock()
|
|
sub.jsi.stream = stream
|
|
sub.jsi.consumer = consumer
|
|
sub.jsi.durable = isDurable
|
|
sub.jsi.attached = attached
|
|
sub.jsi.deliver = deliver
|
|
sub.mu.Unlock()
|
|
|
|
return sub, nil
|
|
}
|
|
|
|
// ErrConsumerSequenceMismatch represents an error from a consumer
|
|
// that received a Heartbeat including sequence different to the
|
|
// one expected from the view of the client.
|
|
type ErrConsumerSequenceMismatch struct {
|
|
// StreamResumeSequence is the stream sequence from where the consumer
|
|
// should resume consuming from the stream.
|
|
StreamResumeSequence uint64
|
|
|
|
// ConsumerSequence is the sequence of the consumer that is behind.
|
|
ConsumerSequence uint64
|
|
|
|
// LastConsumerSequence is the sequence of the consumer when the heartbeat
|
|
// was received.
|
|
LastConsumerSequence uint64
|
|
}
|
|
|
|
func (ecs *ErrConsumerSequenceMismatch) Error() string {
|
|
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
|
|
ecs.ConsumerSequence,
|
|
ecs.LastConsumerSequence-ecs.ConsumerSequence,
|
|
ecs.StreamResumeSequence,
|
|
)
|
|
}
|
|
|
|
// isControlMessage will return true if this is an empty control status message.
|
|
func isControlMessage(msg *Msg) bool {
|
|
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
|
|
}
|
|
|
|
func (jsi *jsSub) trackSequences(reply string) {
|
|
jsi.mu.Lock()
|
|
jsi.cmeta = reply
|
|
jsi.mu.Unlock()
|
|
}
|
|
|
|
// checkForFlowControlResponse will check to see if we should send a flow control response
|
|
// based on the delivered index.
|
|
// Lock should be held.
|
|
func (sub *Subscription) checkForFlowControlResponse(delivered uint64) {
|
|
jsi, nc := sub.jsi, sub.conn
|
|
if jsi == nil {
|
|
return
|
|
}
|
|
|
|
jsi.mu.Lock()
|
|
defer jsi.mu.Unlock()
|
|
|
|
if len(jsi.fcs) == 0 {
|
|
return
|
|
}
|
|
|
|
if reply := jsi.fcs[delivered]; reply != _EMPTY_ {
|
|
delete(jsi.fcs, delivered)
|
|
nc.Publish(reply, nil)
|
|
}
|
|
}
|
|
|
|
// Record an inbound flow control message.
|
|
func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) {
|
|
jsi.mu.Lock()
|
|
if jsi.fcs == nil {
|
|
jsi.fcs = make(map[uint64]string)
|
|
}
|
|
jsi.fcs[dfuture] = reply
|
|
jsi.mu.Unlock()
|
|
}
|
|
|
|
// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
|
|
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
|
|
nc.mu.Lock()
|
|
errCB := nc.Opts.AsyncErrorCB
|
|
if errCB != nil {
|
|
nc.ach.push(func() { errCB(nc, sub, err) })
|
|
}
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// processControlFlow will automatically respond to control messages sent by the server.
|
|
func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
|
|
// Process heartbeat received, get latest control metadata if present.
|
|
jsi.mu.RLock()
|
|
ctrl := jsi.cmeta
|
|
jsi.mu.RUnlock()
|
|
|
|
if ctrl == _EMPTY_ {
|
|
return
|
|
}
|
|
|
|
tokens, err := getMetadataFields(ctrl)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Consumer sequence.
|
|
var ldseq string
|
|
dseq := tokens[6]
|
|
hdr := msg.Header[lastConsumerSeqHdr]
|
|
if len(hdr) == 1 {
|
|
ldseq = hdr[0]
|
|
}
|
|
|
|
// Detect consumer sequence mismatch and whether
|
|
// should restart the consumer.
|
|
if ldseq != dseq {
|
|
// Dispatch async error including details such as
|
|
// from where the consumer could be restarted.
|
|
sseq := parseNum(tokens[5])
|
|
ecs := &ErrConsumerSequenceMismatch{
|
|
StreamResumeSequence: uint64(sseq),
|
|
ConsumerSequence: uint64(parseNum(dseq)),
|
|
LastConsumerSequence: uint64(parseNum(ldseq)),
|
|
}
|
|
nc.handleConsumerSequenceMismatch(s, ecs)
|
|
}
|
|
}
|
|
|
|
type streamRequest struct {
|
|
Subject string `json:"subject,omitempty"`
|
|
}
|
|
|
|
type streamNamesResponse struct {
|
|
apiResponse
|
|
apiPaged
|
|
Streams []string `json:"streams"`
|
|
}
|
|
|
|
func (js *js) lookupStreamBySubject(subj string) (string, error) {
|
|
var slr streamNamesResponse
|
|
req := &streamRequest{subj}
|
|
j, err := json.Marshal(req)
|
|
if err != nil {
|
|
return _EMPTY_, err
|
|
}
|
|
resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait)
|
|
if err != nil {
|
|
if err == ErrNoResponders {
|
|
err = ErrJetStreamNotEnabled
|
|
}
|
|
return _EMPTY_, err
|
|
}
|
|
if err := json.Unmarshal(resp.Data, &slr); err != nil {
|
|
return _EMPTY_, err
|
|
}
|
|
if slr.Error != nil || len(slr.Streams) != 1 {
|
|
return _EMPTY_, ErrNoMatchingStream
|
|
}
|
|
return slr.Streams[0], nil
|
|
}
|
|
|
|
type subOpts struct {
|
|
// For attaching.
|
|
stream, consumer string
|
|
// For manual ack
|
|
mack bool
|
|
// For creating or updating.
|
|
cfg *ConsumerConfig
|
|
}
|
|
|
|
// ManualAck disables auto ack functionality for async subscriptions.
|
|
func ManualAck() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.mack = true
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Durable defines the consumer name for JetStream durable subscribers.
|
|
func Durable(name string) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
if opts.cfg.Durable != "" {
|
|
return fmt.Errorf("nats: option Durable set more than once")
|
|
}
|
|
if strings.Contains(name, ".") {
|
|
return ErrInvalidDurableName
|
|
}
|
|
|
|
opts.cfg.Durable = name
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// DeliverAll will configure a Consumer to receive all the
|
|
// messages from a Stream.
|
|
func DeliverAll() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.DeliverPolicy = DeliverAllPolicy
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// DeliverLast configures a Consumer to receive messages
|
|
// starting with the latest one.
|
|
func DeliverLast() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.DeliverPolicy = DeliverLastPolicy
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// DeliverNew configures a Consumer to receive messages
|
|
// published after the subscription.
|
|
func DeliverNew() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.DeliverPolicy = DeliverNewPolicy
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// StartSequence configures a Consumer to receive
|
|
// messages from a start sequence.
|
|
func StartSequence(seq uint64) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
|
|
opts.cfg.OptStartSeq = seq
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// StartTime configures a Consumer to receive
|
|
// messages from a start time.
|
|
func StartTime(startTime time.Time) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
|
|
opts.cfg.OptStartTime = &startTime
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// AckNone requires no acks for delivered messages.
|
|
func AckNone() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.AckPolicy = AckNonePolicy
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// AckAll when acking a sequence number, this implicitly acks all sequences
|
|
// below this one as well.
|
|
func AckAll() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.AckPolicy = AckAllPolicy
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// AckExplicit requires ack or nack for all messages.
|
|
func AckExplicit() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.AckPolicy = AckExplicitPolicy
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// MaxDeliver sets the number of redeliveries for a message.
|
|
func MaxDeliver(n int) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.MaxDeliver = n
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// MaxAckPending sets the number of outstanding acks that are allowed before
|
|
// message delivery is halted.
|
|
func MaxAckPending(n int) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.MaxAckPending = n
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ReplayOriginal replays the messages at the original speed.
|
|
func ReplayOriginal() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.ReplayPolicy = ReplayOriginalPolicy
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// RateLimit is the Bits per sec rate limit applied to a push consumer.
|
|
func RateLimit(n uint64) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.RateLimit = n
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// BindStream binds a consumer to a stream explicitly based on a name.
|
|
func BindStream(name string) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.stream = name
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// EnableFlowControl enables flow control for a push based consumer.
|
|
func EnableFlowControl() SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.FlowControl = true
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
|
|
func IdleHeartbeat(duration time.Duration) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.Heartbeat = duration
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
|
|
sub.mu.Lock()
|
|
// TODO(dlc) - Better way to mark especially if we attach.
|
|
if sub.jsi.consumer == _EMPTY_ {
|
|
sub.mu.Unlock()
|
|
return nil, ErrTypeSubscription
|
|
}
|
|
|
|
// Consumer info lookup should fail if in direct mode.
|
|
js := sub.jsi.js
|
|
stream, consumer := sub.jsi.stream, sub.jsi.consumer
|
|
sub.mu.Unlock()
|
|
|
|
return js.getConsumerInfo(stream, consumer)
|
|
}
|
|
|
|
type pullOpts struct {
|
|
ttl time.Duration
|
|
ctx context.Context
|
|
}
|
|
|
|
// PullOpt are the options that can be passed when pulling a batch of messages.
|
|
type PullOpt interface {
|
|
configurePull(opts *pullOpts) error
|
|
}
|
|
|
|
// PullMaxWaiting defines the max inflight pull requests.
|
|
func PullMaxWaiting(n int) SubOpt {
|
|
return subOptFn(func(opts *subOpts) error {
|
|
opts.cfg.MaxWaiting = n
|
|
return nil
|
|
})
|
|
}
|
|
|
|
var errNoMessages = errors.New("nats: no messages")
|
|
|
|
// Fetch pulls a batch of messages from a stream for a pull consumer.
|
|
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
|
|
if sub == nil {
|
|
return nil, ErrBadSubscription
|
|
}
|
|
|
|
var o pullOpts
|
|
for _, opt := range opts {
|
|
if err := opt.configurePull(&o); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if o.ctx != nil && o.ttl != 0 {
|
|
return nil, ErrContextAndTimeout
|
|
}
|
|
|
|
sub.mu.Lock()
|
|
jsi := sub.jsi
|
|
if jsi == nil || sub.typ != PullSubscription {
|
|
sub.mu.Unlock()
|
|
return nil, ErrTypeSubscription
|
|
}
|
|
|
|
nc, _ := sub.conn, sub.Subject
|
|
stream, consumer := sub.jsi.stream, sub.jsi.consumer
|
|
js := sub.jsi.js
|
|
|
|
ttl := o.ttl
|
|
if ttl == 0 {
|
|
ttl = js.opts.wait
|
|
}
|
|
sub.mu.Unlock()
|
|
|
|
// Use the given context or setup a default one for the span
|
|
// of the pull batch request.
|
|
var (
|
|
ctx = o.ctx
|
|
err error
|
|
cancel context.CancelFunc
|
|
)
|
|
if o.ctx == nil {
|
|
ctx, cancel = context.WithTimeout(context.Background(), ttl)
|
|
defer cancel()
|
|
}
|
|
|
|
// Check if context not done already before making the request.
|
|
select {
|
|
case <-ctx.Done():
|
|
if ctx.Err() == context.Canceled {
|
|
err = ctx.Err()
|
|
} else {
|
|
err = ErrTimeout
|
|
}
|
|
default:
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check for empty payload message and process synchronously
|
|
// any status messages.
|
|
checkMsg := func(msg *Msg) error {
|
|
if len(msg.Data) == 0 {
|
|
switch msg.Header.Get(statusHdr) {
|
|
case noResponders:
|
|
return ErrNoResponders
|
|
case noMessages:
|
|
return errNoMessages
|
|
case "400", "408", "409":
|
|
return fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
checkCtxErr := func(err error) error {
|
|
if o.ctx == nil && err == context.DeadlineExceeded {
|
|
return ErrTimeout
|
|
}
|
|
return err
|
|
}
|
|
|
|
var (
|
|
gotNoMessages bool
|
|
nr = &nextRequest{Batch: batch, NoWait: true}
|
|
req, _ = json.Marshal(nr)
|
|
reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer))
|
|
expires = ttl - 10*time.Millisecond
|
|
msgs = make([]*Msg, 0)
|
|
)
|
|
|
|
if batch == 1 {
|
|
// To optimize single message no wait fetch, we use a shared wildcard
|
|
// subscription per pull subscriber to wait for the response.
|
|
resp, err := jsi.fetchNoWait(ctx, reqNext, req)
|
|
if err != nil {
|
|
return nil, checkCtxErr(err)
|
|
}
|
|
|
|
// In case of a no messages instant error, then fallback
|
|
// into longer version of pull batch request.
|
|
err = checkMsg(resp)
|
|
if err != nil {
|
|
if err == errNoMessages {
|
|
// Use old request style for the retry of the pull request
|
|
// in order to use auto UNSUB 1 to prevent the server
|
|
// from delivering a message when there is no more interest.
|
|
nr.NoWait = false
|
|
nr.Expires = expires
|
|
req, _ = json.Marshal(nr)
|
|
resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req)
|
|
if err != nil {
|
|
return nil, checkCtxErr(err)
|
|
}
|
|
|
|
// This next message, could also be an error
|
|
// (e.g. 408 due to request timeout).
|
|
err = checkMsg(resp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return []*Msg{resp}, nil
|
|
} else {
|
|
// Hard error
|
|
return nil, checkCtxErr(err)
|
|
}
|
|
}
|
|
return []*Msg{resp}, nil
|
|
}
|
|
|
|
// Setup a request where we will wait for the first response
|
|
// in case of errors, then dispatch the rest of the replies
|
|
// to the channel.
|
|
inbox := NewInbox()
|
|
|
|
mch := make(chan *Msg, batch)
|
|
s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Remove interest in the subscription at the end so that the
|
|
// this inbox does not get delivered the results intended
|
|
// for another request.
|
|
defer s.Unsubscribe()
|
|
|
|
// Make a publish request to get results of the pull.
|
|
err = nc.publish(reqNext, inbox, nil, req)
|
|
if err != nil {
|
|
s.Unsubscribe()
|
|
return nil, err
|
|
}
|
|
|
|
// Try to get the first message or error with NoWait.
|
|
var (
|
|
firstMsg *Msg
|
|
ok bool
|
|
)
|
|
select {
|
|
case firstMsg, ok = <-mch:
|
|
if !ok {
|
|
err = s.getNextMsgErr()
|
|
} else {
|
|
err = s.processNextMsgDelivered(firstMsg)
|
|
if err == nil {
|
|
err = checkMsg(firstMsg)
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
err = checkCtxErr(ctx.Err())
|
|
}
|
|
|
|
// If the first error is 'no more messages', then switch into
|
|
// longer form version of the request that waits for messages.
|
|
if err == errNoMessages {
|
|
gotNoMessages = true
|
|
} else if err != nil {
|
|
// We should be getting the response from the server
|
|
// in case we got a poll error, so stop and cleanup.
|
|
s.Unsubscribe()
|
|
return nil, err
|
|
}
|
|
|
|
if gotNoMessages {
|
|
// We started with a 404 response right away, so fallback into
|
|
// second request that waits longer for messages to delivered.
|
|
nr.NoWait = false
|
|
nr.Expires = expires
|
|
req, _ = json.Marshal(nr)
|
|
|
|
// Since first message was an error we UNSUB (batch+1)
|
|
// since we are counting it as the first message.
|
|
err = s.AutoUnsubscribe(batch + 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Make another request and wait for the messages...
|
|
err = nc.publish(reqNext, inbox, nil, req)
|
|
if err != nil {
|
|
s.Unsubscribe()
|
|
return nil, err
|
|
}
|
|
|
|
// Try to get the first result again or return the error.
|
|
select {
|
|
case firstMsg, ok = <-mch:
|
|
if !ok {
|
|
err = s.getNextMsgErr()
|
|
} else {
|
|
err = s.processNextMsgDelivered(firstMsg)
|
|
if err == nil {
|
|
err = checkMsg(firstMsg)
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
err = checkCtxErr(ctx.Err())
|
|
}
|
|
if err != nil {
|
|
s.Unsubscribe()
|
|
return nil, err
|
|
}
|
|
// Check again if the delivered next message is a status error.
|
|
err = checkMsg(firstMsg)
|
|
if err != nil {
|
|
s.Unsubscribe()
|
|
return nil, err
|
|
}
|
|
} else {
|
|
// We are receiving messages at this point. Send UNSUB to let
|
|
// the server clear interest once enough replies are delivered.
|
|
err = s.AutoUnsubscribe(batch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
msgs = append(msgs, firstMsg)
|
|
for {
|
|
var (
|
|
msg *Msg
|
|
ok bool
|
|
)
|
|
select {
|
|
case msg, ok = <-mch:
|
|
if !ok {
|
|
err = s.getNextMsgErr()
|
|
} else {
|
|
err = s.processNextMsgDelivered(msg)
|
|
if err == nil {
|
|
err = checkMsg(msg)
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
return msgs, checkCtxErr(err)
|
|
}
|
|
if err != nil {
|
|
// Discard the error which may have been a timeout
|
|
// or 408 request timeout status from the server,
|
|
// and just the return delivered messages.
|
|
break
|
|
}
|
|
if msg != nil {
|
|
msgs = append(msgs, msg)
|
|
}
|
|
|
|
if len(msgs) == batch {
|
|
// Done!
|
|
break
|
|
}
|
|
}
|
|
|
|
return msgs, nil
|
|
}
|
|
|
|
func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
|
|
defer cancel()
|
|
return js.getConsumerInfoContext(ctx, stream, consumer)
|
|
}
|
|
|
|
func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
|
|
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
|
|
resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
|
|
if err != nil {
|
|
if err == ErrNoResponders {
|
|
err = ErrJetStreamNotEnabled
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
var info consumerResponse
|
|
if err := json.Unmarshal(resp.Data, &info); err != nil {
|
|
return nil, err
|
|
}
|
|
if info.Error != nil {
|
|
return nil, fmt.Errorf("nats: %s", info.Error.Description)
|
|
}
|
|
return info.ConsumerInfo, nil
|
|
}
|
|
|
|
func (m *Msg) checkReply() (*js, *jsSub, error) {
|
|
if m == nil || m.Sub == nil {
|
|
return nil, nil, ErrMsgNotBound
|
|
}
|
|
if m.Reply == "" {
|
|
return nil, nil, ErrMsgNoReply
|
|
}
|
|
sub := m.Sub
|
|
sub.mu.Lock()
|
|
if sub.jsi == nil {
|
|
sub.mu.Unlock()
|
|
|
|
// Not using a JS context.
|
|
return nil, nil, nil
|
|
}
|
|
js := sub.jsi.js
|
|
jsi := sub.jsi
|
|
sub.mu.Unlock()
|
|
|
|
return js, jsi, nil
|
|
}
|
|
|
|
// ackReply handles all acks. Will do the right thing for pull and sync mode.
|
|
// It ensures that an ack is only sent a single time, regardless of
|
|
// how many times it is being called to avoid duplicated acks.
|
|
func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
|
|
var o ackOpts
|
|
for _, opt := range opts {
|
|
if err := opt.configureAck(&o); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
js, _, err := m.checkReply()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Skip if already acked.
|
|
if atomic.LoadUint32(&m.ackd) == 1 {
|
|
return ErrInvalidJSAck
|
|
}
|
|
|
|
m.Sub.mu.Lock()
|
|
nc := m.Sub.conn
|
|
m.Sub.mu.Unlock()
|
|
|
|
usesCtx := o.ctx != nil
|
|
usesWait := o.ttl > 0
|
|
sync = sync || usesCtx || usesWait
|
|
ctx := o.ctx
|
|
wait := defaultRequestWait
|
|
if usesWait {
|
|
wait = o.ttl
|
|
} else if js != nil {
|
|
wait = js.opts.wait
|
|
}
|
|
|
|
if sync {
|
|
if usesCtx {
|
|
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
|
|
} else {
|
|
_, err = nc.Request(m.Reply, ackType, wait)
|
|
}
|
|
} else {
|
|
err = nc.Publish(m.Reply, ackType)
|
|
}
|
|
|
|
// Mark that the message has been acked unless it is AckProgress
|
|
// which can be sent many times.
|
|
if err == nil && !bytes.Equal(ackType, ackProgress) {
|
|
atomic.StoreUint32(&m.ackd, 1)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Ack acknowledges a message. This tells the server that the message was
|
|
// successfully processed and it can move on to the next message.
|
|
func (m *Msg) Ack(opts ...AckOpt) error {
|
|
return m.ackReply(ackAck, false, opts...)
|
|
}
|
|
|
|
// Ack is the synchronous version of Ack. This indicates successful message
|
|
// processing.
|
|
func (m *Msg) AckSync(opts ...AckOpt) error {
|
|
return m.ackReply(ackAck, true, opts...)
|
|
}
|
|
|
|
// Nak negatively acknowledges a message. This tells the server to redeliver
|
|
// the message. You can configure the number of redeliveries by passing
|
|
// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
|
|
func (m *Msg) Nak(opts ...AckOpt) error {
|
|
return m.ackReply(ackNak, false, opts...)
|
|
}
|
|
|
|
// Term tells the server to not redeliver this message, regardless of the value
|
|
// of nats.MaxDeliver.
|
|
func (m *Msg) Term(opts ...AckOpt) error {
|
|
return m.ackReply(ackTerm, false, opts...)
|
|
}
|
|
|
|
// InProgress tells the server that this message is being worked on. It resets
|
|
// the redelivery timer on the server.
|
|
func (m *Msg) InProgress(opts ...AckOpt) error {
|
|
return m.ackReply(ackProgress, false, opts...)
|
|
}
|
|
|
|
// MsgMetadata is the JetStream metadata associated with received messages.
|
|
type MsgMetadata struct {
|
|
Sequence SequencePair
|
|
NumDelivered uint64
|
|
NumPending uint64
|
|
Timestamp time.Time
|
|
Stream string
|
|
Consumer string
|
|
}
|
|
|
|
func getMetadataFields(subject string) ([]string, error) {
|
|
const expectedTokens = 9
|
|
const btsep = '.'
|
|
|
|
tsa := [expectedTokens]string{}
|
|
start, tokens := 0, tsa[:0]
|
|
for i := 0; i < len(subject); i++ {
|
|
if subject[i] == btsep {
|
|
tokens = append(tokens, subject[start:i])
|
|
start = i + 1
|
|
}
|
|
}
|
|
tokens = append(tokens, subject[start:])
|
|
if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
|
|
return nil, ErrNotJSMessage
|
|
}
|
|
return tokens, nil
|
|
}
|
|
|
|
// Metadata retrieves the metadata from a JetStream message. This method will
|
|
// return an error for non-JetStream Msgs.
|
|
func (m *Msg) Metadata() (*MsgMetadata, error) {
|
|
if _, _, err := m.checkReply(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tokens, err := getMetadataFields(m.Reply)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
meta := &MsgMetadata{
|
|
NumDelivered: uint64(parseNum(tokens[4])),
|
|
NumPending: uint64(parseNum(tokens[8])),
|
|
Timestamp: time.Unix(0, parseNum(tokens[7])),
|
|
Stream: tokens[2],
|
|
Consumer: tokens[3],
|
|
}
|
|
meta.Sequence.Stream = uint64(parseNum(tokens[5]))
|
|
meta.Sequence.Consumer = uint64(parseNum(tokens[6]))
|
|
return meta, nil
|
|
}
|
|
|
|
// Quick parser for positive numbers in ack reply encoding.
|
|
func parseNum(d string) (n int64) {
|
|
if len(d) == 0 {
|
|
return -1
|
|
}
|
|
|
|
// Ascii numbers 0-9
|
|
const (
|
|
asciiZero = 48
|
|
asciiNine = 57
|
|
)
|
|
|
|
for _, dec := range d {
|
|
if dec < asciiZero || dec > asciiNine {
|
|
return -1
|
|
}
|
|
n = n*10 + (int64(dec) - asciiZero)
|
|
}
|
|
return n
|
|
}
|
|
|
|
// AckPolicy determines how the consumer should acknowledge delivered messages.
|
|
type AckPolicy int
|
|
|
|
const (
|
|
// AckNonePolicy requires no acks for delivered messages.
|
|
AckNonePolicy AckPolicy = iota
|
|
|
|
// AckAllPolicy when acking a sequence number, this implicitly acks all
|
|
// sequences below this one as well.
|
|
AckAllPolicy
|
|
|
|
// AckExplicitPolicy requires ack or nack for all messages.
|
|
AckExplicitPolicy
|
|
|
|
// For setting
|
|
ackPolicyNotSet = 99
|
|
)
|
|
|
|
func jsonString(s string) string {
|
|
return "\"" + s + "\""
|
|
}
|
|
|
|
func (p *AckPolicy) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString("none"):
|
|
*p = AckNonePolicy
|
|
case jsonString("all"):
|
|
*p = AckAllPolicy
|
|
case jsonString("explicit"):
|
|
*p = AckExplicitPolicy
|
|
default:
|
|
return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p AckPolicy) MarshalJSON() ([]byte, error) {
|
|
switch p {
|
|
case AckNonePolicy:
|
|
return json.Marshal("none")
|
|
case AckAllPolicy:
|
|
return json.Marshal("all")
|
|
case AckExplicitPolicy:
|
|
return json.Marshal("explicit")
|
|
default:
|
|
return nil, fmt.Errorf("nats: unknown acknowlegement policy %v", p)
|
|
}
|
|
}
|
|
|
|
func (p AckPolicy) String() string {
|
|
switch p {
|
|
case AckNonePolicy:
|
|
return "AckNone"
|
|
case AckAllPolicy:
|
|
return "AckAll"
|
|
case AckExplicitPolicy:
|
|
return "AckExplicit"
|
|
case ackPolicyNotSet:
|
|
return "Not Initialized"
|
|
default:
|
|
return "Unknown AckPolicy"
|
|
}
|
|
}
|
|
|
|
// ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
|
|
type ReplayPolicy int
|
|
|
|
const (
|
|
// ReplayInstantPolicy will replay messages as fast as possible.
|
|
ReplayInstantPolicy ReplayPolicy = iota
|
|
|
|
// ReplayOriginalPolicy will maintain the same timing as the messages were received.
|
|
ReplayOriginalPolicy
|
|
)
|
|
|
|
func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString("instant"):
|
|
*p = ReplayInstantPolicy
|
|
case jsonString("original"):
|
|
*p = ReplayOriginalPolicy
|
|
default:
|
|
return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
|
|
switch p {
|
|
case ReplayOriginalPolicy:
|
|
return json.Marshal("original")
|
|
case ReplayInstantPolicy:
|
|
return json.Marshal("instant")
|
|
default:
|
|
return nil, fmt.Errorf("nats: unknown replay policy %v", p)
|
|
}
|
|
}
|
|
|
|
var (
|
|
ackAck = []byte("+ACK")
|
|
ackNak = []byte("-NAK")
|
|
ackProgress = []byte("+WPI")
|
|
ackTerm = []byte("+TERM")
|
|
)
|
|
|
|
// DeliverPolicy determines how the consumer should select the first message to deliver.
|
|
type DeliverPolicy int
|
|
|
|
const (
|
|
// DeliverAllPolicy starts delivering messages from the very beginning of a
|
|
// stream. This is the default.
|
|
DeliverAllPolicy DeliverPolicy = iota
|
|
|
|
// DeliverLastPolicy will start the consumer with the last sequence
|
|
// received.
|
|
DeliverLastPolicy
|
|
|
|
// DeliverNewPolicy will only deliver new messages that are sent after the
|
|
// consumer is created.
|
|
DeliverNewPolicy
|
|
|
|
// DeliverByStartTimePolicy will deliver messages starting from a given
|
|
// sequence.
|
|
DeliverByStartSequencePolicy
|
|
|
|
// DeliverByStartTimePolicy will deliver messages starting from a given
|
|
// time.
|
|
DeliverByStartTimePolicy
|
|
)
|
|
|
|
func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString("all"), jsonString("undefined"):
|
|
*p = DeliverAllPolicy
|
|
case jsonString("last"):
|
|
*p = DeliverLastPolicy
|
|
case jsonString("new"):
|
|
*p = DeliverNewPolicy
|
|
case jsonString("by_start_sequence"):
|
|
*p = DeliverByStartSequencePolicy
|
|
case jsonString("by_start_time"):
|
|
*p = DeliverByStartTimePolicy
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
|
|
switch p {
|
|
case DeliverAllPolicy:
|
|
return json.Marshal("all")
|
|
case DeliverLastPolicy:
|
|
return json.Marshal("last")
|
|
case DeliverNewPolicy:
|
|
return json.Marshal("new")
|
|
case DeliverByStartSequencePolicy:
|
|
return json.Marshal("by_start_sequence")
|
|
case DeliverByStartTimePolicy:
|
|
return json.Marshal("by_start_time")
|
|
default:
|
|
return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
|
|
}
|
|
}
|
|
|
|
// RetentionPolicy determines how messages in a set are retained.
|
|
type RetentionPolicy int
|
|
|
|
const (
|
|
// LimitsPolicy (default) means that messages are retained until any given limit is reached.
|
|
// This could be one of MaxMsgs, MaxBytes, or MaxAge.
|
|
LimitsPolicy RetentionPolicy = iota
|
|
// InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
|
|
InterestPolicy
|
|
// WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
|
|
WorkQueuePolicy
|
|
)
|
|
|
|
// DiscardPolicy determines how to proceed when limits of messages or bytes are
|
|
// reached.
|
|
type DiscardPolicy int
|
|
|
|
const (
|
|
// DiscardOld will remove older messages to return to the limits. This is
|
|
// the default.
|
|
DiscardOld DiscardPolicy = iota
|
|
//DiscardNew will fail to store new messages.
|
|
DiscardNew
|
|
)
|
|
|
|
const (
|
|
limitsPolicyString = "limits"
|
|
interestPolicyString = "interest"
|
|
workQueuePolicyString = "workqueue"
|
|
)
|
|
|
|
func (rp RetentionPolicy) String() string {
|
|
switch rp {
|
|
case LimitsPolicy:
|
|
return "Limits"
|
|
case InterestPolicy:
|
|
return "Interest"
|
|
case WorkQueuePolicy:
|
|
return "WorkQueue"
|
|
default:
|
|
return "Unknown Retention Policy"
|
|
}
|
|
}
|
|
|
|
func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
|
|
switch rp {
|
|
case LimitsPolicy:
|
|
return json.Marshal(limitsPolicyString)
|
|
case InterestPolicy:
|
|
return json.Marshal(interestPolicyString)
|
|
case WorkQueuePolicy:
|
|
return json.Marshal(workQueuePolicyString)
|
|
default:
|
|
return nil, fmt.Errorf("nats: can not marshal %v", rp)
|
|
}
|
|
}
|
|
|
|
func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString(limitsPolicyString):
|
|
*rp = LimitsPolicy
|
|
case jsonString(interestPolicyString):
|
|
*rp = InterestPolicy
|
|
case jsonString(workQueuePolicyString):
|
|
*rp = WorkQueuePolicy
|
|
default:
|
|
return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (dp DiscardPolicy) String() string {
|
|
switch dp {
|
|
case DiscardOld:
|
|
return "DiscardOld"
|
|
case DiscardNew:
|
|
return "DiscardNew"
|
|
default:
|
|
return "Unknown Discard Policy"
|
|
}
|
|
}
|
|
|
|
func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
|
|
switch dp {
|
|
case DiscardOld:
|
|
return json.Marshal("old")
|
|
case DiscardNew:
|
|
return json.Marshal("new")
|
|
default:
|
|
return nil, fmt.Errorf("nats: can not marshal %v", dp)
|
|
}
|
|
}
|
|
|
|
func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
|
|
switch strings.ToLower(string(data)) {
|
|
case jsonString("old"):
|
|
*dp = DiscardOld
|
|
case jsonString("new"):
|
|
*dp = DiscardNew
|
|
default:
|
|
return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StorageType determines how messages are stored for retention.
|
|
type StorageType int
|
|
|
|
const (
|
|
// FileStorage specifies on disk storage. It's the default.
|
|
FileStorage StorageType = iota
|
|
// MemoryStorage specifies in memory only.
|
|
MemoryStorage
|
|
)
|
|
|
|
const (
|
|
memoryStorageString = "memory"
|
|
fileStorageString = "file"
|
|
)
|
|
|
|
func (st StorageType) String() string {
|
|
switch st {
|
|
case MemoryStorage:
|
|
return strings.Title(memoryStorageString)
|
|
case FileStorage:
|
|
return strings.Title(fileStorageString)
|
|
default:
|
|
return "Unknown Storage Type"
|
|
}
|
|
}
|
|
|
|
func (st StorageType) MarshalJSON() ([]byte, error) {
|
|
switch st {
|
|
case MemoryStorage:
|
|
return json.Marshal(memoryStorageString)
|
|
case FileStorage:
|
|
return json.Marshal(fileStorageString)
|
|
default:
|
|
return nil, fmt.Errorf("nats: can not marshal %v", st)
|
|
}
|
|
}
|
|
|
|
func (st *StorageType) UnmarshalJSON(data []byte) error {
|
|
switch string(data) {
|
|
case jsonString(memoryStorageString):
|
|
*st = MemoryStorage
|
|
case jsonString(fileStorageString):
|
|
*st = FileStorage
|
|
default:
|
|
return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
}
|
|
return nil
|
|
}
|