Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix typos and doc comments for JetStream simplified API #1339

Merged
merged 1 commit into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
var (
// API errors

// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account.
// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled.
ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}

// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
Expand All @@ -71,13 +71,13 @@ var (
// ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNotFound, Description: "stream not found", Code: 404}}

// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

// ErrMsgNotFound is returned when message with provided sequence number does npt exist.
// ErrMsgNotFound is returned when message with provided sequence number does not exist.
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

// ErrBadRequest is returned when invalid request is sent to JetStream API.
Expand Down Expand Up @@ -167,7 +167,7 @@ var (
// messages using Fetch (or FetchBytes).
ErrOrderConsumerUsedAsFetch = &jsError{message: "ordered consumer initialized as fetch"}

// ErrOrderConsumerUsedAsFetch is returned when ordered consumer was already used to process
// ErrOrderConsumerUsedAsConsume is returned when ordered consumer was already used to process
// messages using Consume or Messages.
ErrOrderConsumerUsedAsConsume = &jsError{message: "ordered consumer initialized as consume"}

Expand Down
10 changes: 5 additions & 5 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type (
// PublishMsg performs a synchronous publish to a stream and waits for ack from server
// It accepts subject name (which must be bound to a stream) and nats.Message
PublishMsg(ctx context.Context, msg *nats.Msg, opts ...PublishOpt) (*PubAck, error)
// PublishAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface
// PublishAsync performs an asynchronous publish to a stream and returns [PubAckFuture] interface
// It accepts subject name (which must be bound to a stream) and message data
PublishAsync(subject string, payload []byte, opts ...PublishOpt) (PubAckFuture, error)
// PublishMsgAsync performs a asynchronous publish to a stream and returns [PubAckFuture] interface
// PublishMsgAsync performs an asynchronous publish to a stream and returns [PubAckFuture] interface
// It accepts subject name (which must be bound to a stream) and nats.Message
PublishMsgAsync(msg *nats.Msg, opts ...PublishOpt) (PubAckFuture, error)
// PublishAsyncPending returns the number of async publishes outstanding for this context
Expand Down Expand Up @@ -197,11 +197,11 @@ type (

var subjectRegexp = regexp.MustCompile(`^[^ >]*[>]?$`)

// New returns a enw JetStream instance
// New returns a new JetStream instance.
//
// Available options:
// [WithClientTrace] - enables request/response tracing
// [WithPublishAsyncErrHandler] - sets error handler for async message publish
// [WithClientTrace] - enables request/response tracing.
// [WithPublishAsyncErrHandler] - sets error handler for async message publish.
// [WithPublishAsyncMaxPending] - sets the maximum outstanding async publishes that can be inflight at one time.
// [WithDirectGet] - specifies whether client should use direct get requests.
func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error) {
Expand Down
4 changes: 2 additions & 2 deletions jetstream/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ func (m *jetStreamMsg) Headers() nats.Header {
return m.msg.Header
}

// Subject reutrns a subject on which a message is published
// Subject returns a subject on which a message is published
func (m *jetStreamMsg) Subject() string {
return m.msg.Subject
}

// Reply reutrns a reply subject for a JetStream message
// Reply returns a reply subject for a JetStream message
func (m *jetStreamMsg) Reply() string {
return m.msg.Reply
}
Expand Down
11 changes: 5 additions & 6 deletions jetstream/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func WithPublishAsyncMaxPending(max int) JetStreamOpt {
}
}

// WithPurgeSubject sets a sprecific subject for which messages on a stream will be purged
// WithPurgeSubject sets a specific subject for which messages on a stream will be purged
func WithPurgeSubject(subject string) StreamPurgeOpt {
return func(req *StreamPurgeRequest) error {
req.Subject = subject
return nil
}
}

// WithPurgeSequence is used to set a sprecific sequence number up to which (but not including) messages will be purged from a stream
// WithPurgeSequence is used to set a specific sequence number up to which (but not including) messages will be purged from a stream
// Can be combined with [WithPurgeSubject] option, but not with [WithPurgeKeep]
func WithPurgeSequence(sequence uint64) StreamPurgeOpt {
return func(req *StreamPurgeRequest) error {
Expand Down Expand Up @@ -206,16 +206,15 @@ func ConsumeErrHandler(cb ConsumeErrHandlerFunc) PullConsumeOpt {
})
}

// ConsumeErrHandler sets custom error handler invoked when an error was encountered while consuming messages
// It will be invoked for both terminal (Consumer Deleted, invalid request body) and non-terminal (e.g. missing heartbeats) errors
// WithMessagesErrOnMissingHeartbeat sets whether a missing heartbeat error should be reported when calling Next (Default: true).
func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt {
return pullOptFunc(func(cfg *consumeOpts) error {
cfg.ReportMissingHeartbeats = hbErr
return nil
})
}

// FetchMaxWait sets custom timeout fir fetching predefined batch of messages
// FetchMaxWait sets custom timeout for fetching predefined batch of messages
func FetchMaxWait(timeout time.Duration) FetchOpt {
return func(req *pullRequest) error {
if timeout <= 0 {
Expand Down Expand Up @@ -274,7 +273,7 @@ func WithExpectLastSequencePerSubject(seq uint64) PublishOpt {
}
}

// ExpectLastMsgId sets the expected last msgId in the response from the publish.
// WithExpectLastMsgID sets the expected last msgId in the response from the publish.
func WithExpectLastMsgID(id string) PublishOpt {
return func(opts *pubOpts) error {
opts.lastMsgID = id
Expand Down
2 changes: 1 addition & 1 deletion jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type (
)

const (
// Default time wait between retries on Publish iff err is NoResponders.
// Default time wait between retries on Publish if err is NoResponders.
DefaultPubRetryWait = 250 * time.Millisecond

// Default number of retries
Expand Down
28 changes: 13 additions & 15 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
type (
// MessagesContext supports iterating over a messages on a stream.
MessagesContext interface {
// Next retreives nest message on a stream. It will block until the next message is available.
// Next retreives next message on a stream. It will block until the next message is available.
Next() (Msg, error)
// Stop closes the iterator and cancels subscription.
Stop()
Expand Down Expand Up @@ -136,13 +136,13 @@ const (
// Consume returns a ConsumeContext, allowing for processing incoming messages from a stream in a given callback function.
//
// Available options:
// [ConsumeMaxMessages] - sets maximum number of messages stored in a buffer, default is set to 100
// [ConsumeMaxBytes] - sets maximum number of bytes stored in a buffer
// [ConsumeExpiry] - sets a timeout for individual batch request, default is set to 30 seconds
// [ConsumeHeartbeat] - sets an idle heartbeat setting for a pull request, default is set to 5s
// [PullMaxMessages] - sets maximum number of messages stored in a buffer, default is set to 100
// [PullMaxBytes] - sets maximum number of bytes stored in a buffer
// [PullExpiry] - sets a timeout for individual batch request, default is set to 30 seconds
// [PullHeartbeat] - sets an idle heartbeat setting for a pull request, default is set to 5s
// [ConsumeErrHandler] - sets custom consume error callback handler
// [ConsumeThresholdMessages] - sets the byte count on which Consume will trigger new pull request to the server
// [ConsumeThresholdBytes] - sets the message count on which Consume will trigger new pull request to the server
// [PullThresholdMessages] - sets the message count on which Consume will trigger new pull request to the server
// [PullThresholdBytes] - sets the byte count on which Consume will trigger new pull request to the server
func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
if handler == nil {
return nil, ErrHandlerRequired
Expand Down Expand Up @@ -350,13 +350,11 @@ func (s *pullSubscription) checkPending() {
// Messages returns MessagesContext, allowing continuously iterating over messages on a stream.
//
// Available options:
// [ConsumeMaxMessages] - sets maximum number of messages stored in a buffer, default is set to 100
// [ConsumeMaxBytes] - sets maximum number of bytes stored in a buffer
// [ConsumeExpiry] - sets a timeout for individual batch request, default is set to 30 seconds
// [ConsumeHeartbeat] - sets an idle heartbeat setting for a pull request, default is set to 5s
// [ConsumeErrHandler] - sets custom consume error callback handler
// [ConsumeThresholdMessages] - sets the byte count on which Consume will trigger new pull request to the server
// [ConsumeThresholdBytes] - sets the message count on which Consume will trigger new pull request to the server
// [PullMaxMessages] - sets maximum number of messages stored in a buffer, default is set to 100
// [PullMaxBytes] - sets maximum number of bytes stored in a buffer
// [PullExpiry] - sets a timeout for individual batch request, default is set to 30 seconds
// [PullHeartbeat] - sets an idle heartbeat setting for a pull request, default is set to 5s
// [WithMessagesErrOnMissingHeartbeat] - sets whether a missing heartbeat error should be reported when calling Next
func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) {
consumeOpts, err := parseMessagesOpts(opts...)
if err != nil {
Expand Down Expand Up @@ -596,7 +594,7 @@ func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch,
return p.fetch(req)
}

// Fetch sends a single request to retrieve given number of messages.
// FetchNoWait sends a single request to retrieve given number of messages.
// If there are any messages available at the time of sending request,
// FetchNoWait will return immediately.
func (p *pullConsumer) FetchNoWait(batch int) (MessageBatch, error) {
Expand Down
8 changes: 4 additions & 4 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type (
// ListConsumers returns ConsumerInfoLister enabling iterating over a channel of consumer infos
ListConsumers(context.Context) ConsumerInfoLister

// ConsumerNames returns a ConsumerNameLister enabling iterating over a channel of consumer names
// ConsumerNames returns a ConsumerNameLister enabling iterating over a channel of consumer names
ConsumerNames(context.Context) ConsumerNameLister
}
RawStreamMsg struct {
Expand Down Expand Up @@ -276,8 +276,8 @@ func (s *stream) CachedInfo() *StreamInfo {
// Purge removes messages from a stream
//
// Available options:
// [WithPurgeSubject] - can be used set a sprecific subject for which messages on a stream will be purged
// [WithPurgeSequence] - can be used to set a sprecific sequence number up to which (but not including) messages will be purged from a stream
// [WithPurgeSubject] - can be used set a specific subject for which messages on a stream will be purged
// [WithPurgeSequence] - can be used to set a specific sequence number up to which (but not including) messages will be purged from a stream
// [WithPurgeKeep] - can be used to set the number of messages to be kept in the stream after purge.
func (s *stream) Purge(ctx context.Context, opts ...StreamPurgeOpt) error {
var purgeReq StreamPurgeRequest
Expand Down Expand Up @@ -506,7 +506,7 @@ func (s *consumerLister) Err() <-chan error {
return s.errs
}

// ConsumerNames returns a ConsumerNameLister enabling iterating over a channel of consumer names
// ConsumerNames returns a ConsumerNameLister enabling iterating over a channel of consumer names
func (s *stream) ConsumerNames(ctx context.Context) ConsumerNameLister {
l := &consumerLister{
js: s.jetStream,
Expand Down
4 changes: 2 additions & 2 deletions jetstream/stream_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type (
}

// RePublish is for republishing messages once committed to a stream. The original
// subject cis remapped from the subject pattern to the destination pattern.
// subject is remapped from the subject pattern to the destination pattern.
RePublish struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
Expand Down Expand Up @@ -165,7 +165,7 @@ const (
// DiscardOld will remove older messages to return to the limits. This is
// the default.
DiscardOld DiscardPolicy = iota
//DiscardNew will fail to store new messages.
// DiscardNew will fail to store new messages.
DiscardNew
)

Expand Down