-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender.go
140 lines (120 loc) · 5.13 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package amqpsender
import (
"fmt"
"time"
"github.com/pkg/errors"
"github.com/rb-pkg/amqp"
)
// NewSender returns new Sender instance
func NewSender() *Sender {
return &Sender{}
}
// Start is for initialize connection to AMQP and create new channel
func (sndr *Sender) Start(connectionString string, amqpConfig amqp.Config, noWait bool) error {
var err error
sndr.amqpConnection, err = amqp.DialConfig(connectionString, amqpConfig)
if err != nil {
return errors.Wrap(err, "failed to connect to RabbitMQ")
}
sndr.amqpChannel, err = sndr.amqpConnection.Channel()
if err != nil {
return fmt.Errorf("failed to open a channel: %v", err)
}
err = sndr.amqpChannel.Confirm(noWait)
if err != nil {
return fmt.Errorf("failed to set channel to noWait mode: %v", err)
}
return nil
}
// GetConnectionNotifyClose returns NotifyClose channel for AMQP Connection
func (sndr *Sender) GetConnectionNotifyClose() chan *amqp.Error {
if sndr.amqpConnectionNotifyClose == nil {
sndr.amqpConnectionNotifyClose = sndr.amqpConnection.NotifyClose(make(chan *amqp.Error))
}
return sndr.amqpConnectionNotifyClose
}
// GetConnectionNotifyBlocked returns NotifyBlocked channel for AMQP Connection
func (sndr *Sender) GetConnectionNotifyBlocked() chan amqp.Blocking {
if sndr.amqpConnectionNotifyBlocked == nil {
sndr.amqpConnectionNotifyBlocked = sndr.amqpConnection.NotifyBlocked(make(chan amqp.Blocking))
}
return sndr.amqpConnectionNotifyBlocked
}
// GetChannelNotifyCancel returns NotifyCancel channel for AMQP Channel
func (sndr *Sender) GetChannelNotifyCancel() chan string {
if sndr.amqpChannelNotifyCancel == nil {
sndr.amqpChannelNotifyCancel = sndr.amqpChannel.NotifyCancel(make(chan string))
}
return sndr.amqpChannelNotifyCancel
}
// GetChannelNotifyClose returns NotifyClose channel for AMQP Channel
func (sndr *Sender) GetChannelNotifyClose() chan *amqp.Error {
if sndr.amqpChannelNotifyClose == nil {
sndr.amqpChannelNotifyClose = sndr.amqpChannel.NotifyClose(make(chan *amqp.Error))
}
return sndr.amqpChannelNotifyClose
}
// GetChannelNotifyConfirm returns NotifyConfirm channels for Ack and Nack for AMQP Channel
func (sndr *Sender) GetChannelNotifyConfirm() (chan uint64, chan uint64) {
if sndr.amqpChannelNotifyConfirmAck == nil || sndr.amqpChannelNotifyConfirmNack == nil {
sndr.amqpChannelNotifyConfirmAck, sndr.amqpChannelNotifyConfirmNack = sndr.amqpChannel.NotifyConfirm(make(chan uint64), make(chan uint64))
}
return sndr.amqpChannelNotifyConfirmAck, sndr.amqpChannelNotifyConfirmNack
}
// GetChannelNotifyFlow returns NotifyFlow channel for AMQP Channel
func (sndr *Sender) GetChannelNotifyFlow() chan bool {
if sndr.amqpChannelNotifyFlow == nil {
sndr.amqpChannelNotifyFlow = sndr.amqpChannel.NotifyFlow(make(chan bool))
}
return sndr.amqpChannelNotifyFlow
}
// GetChannelNotifyPublish returns NotifyPublish channel for AMQP Channel
func (sndr *Sender) GetChannelNotifyPublish() chan amqp.Confirmation {
if sndr.amqpChannelNotifyPublish == nil {
sndr.amqpChannelNotifyPublish = sndr.amqpChannel.NotifyPublish(make(chan amqp.Confirmation))
}
return sndr.amqpChannelNotifyPublish
}
// GetChannelNotifyReturn returns NotifyReturn channel for AMQP Channel
func (sndr *Sender) GetChannelNotifyReturn() chan amqp.Return {
if sndr.amqpChannelNotifyReturn == nil {
sndr.amqpChannelNotifyReturn = sndr.amqpChannel.NotifyReturn(make(chan amqp.Return))
}
return sndr.amqpChannelNotifyReturn
}
// SendTask sends job task to AMQP
func (sndr *Sender) SendTask(exchangeName string, routingKey string, mandatory bool, immediate bool, amqpPub *amqp.Publishing) error {
return sndr.amqpChannel.Publish(exchangeName, routingKey, mandatory, immediate, amqpPub)
}
// SendTaskWithCurrentTimestamp sends job task to AMQP
func (sndr *Sender) SendTaskWithCurrentTimestamp(exchangeName string, routingKey string, mandatory bool, immediate bool, amqpPub *amqp.Publishing) error {
amqpPub.Timestamp = time.Now()
return sndr.amqpChannel.Publish(exchangeName, routingKey, mandatory, immediate, amqpPub)
}
// SendTaskSimple sends job task to AMQP
func (sndr *Sender) SendTaskSimple(exchangeName string, routingKey string, amqpPub *amqp.Publishing) error {
return sndr.amqpChannel.Publish(exchangeName, routingKey, false, false, amqpPub)
}
// SendTaskSimpleWithCurrentTimestamp sends job task to AMQP
func (sndr *Sender) SendTaskSimpleWithCurrentTimestamp(exchangeName string, routingKey string, amqpPub *amqp.Publishing) error {
amqpPub.Timestamp = time.Now()
return sndr.amqpChannel.Publish(exchangeName, routingKey, false, false, amqpPub)
}
// Close is for gracefull closing amqp connection, amqp channel and all notify channels
func (sndr *Sender) Close() error {
if err := sndr.amqpConnection.Close(); err != nil {
return errors.Wrap(err, "error with graceful close amqp connection")
}
_ = sndr.amqpChannel.Close()
// clean channels
close(sndr.amqpConnectionNotifyClose)
close(sndr.amqpConnectionNotifyBlocked)
close(sndr.amqpChannelNotifyCancel)
close(sndr.amqpChannelNotifyClose)
close(sndr.amqpChannelNotifyConfirmAck)
close(sndr.amqpChannelNotifyConfirmNack)
close(sndr.amqpChannelNotifyFlow)
close(sndr.amqpChannelNotifyPublish)
close(sndr.amqpChannelNotifyReturn)
return nil
}