-
Notifications
You must be signed in to change notification settings - Fork 7
/
kiteq_listener.go
135 lines (114 loc) · 3.08 KB
/
kiteq_listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package client
import (
"github.com/blackbeans/kiteq-common/protocol"
"github.com/blackbeans/kiteq-common/registry"
)
//
type IListener interface {
//接受投递消息的回调
OnMessage(msg *protocol.QMessage) bool
//接收事务回调
// 除非明确提交成功、其余都为不成功
// 有异常或者返回值为false均为不提交
OnMessageCheck(tx *protocol.TxResponse) error
//注册下handler,使用bingding的形式来
RegisteHandler(bind *registry.Binding) IListener
//增加消息处理的中间件
AddMiddleWares(wares ...MiddleWare) IListener
}
//消息处理上下文
type Context struct {
Msg *protocol.QMessage
//处理的error
Err error
//接收成功
Success bool
}
//处理链
type Chain func(ctx *Context) error
type MiddleWare func(next Chain) Chain
//处理器
type Handler func(body interface{}) (bool, error)
//kiteqListener
type KiteQListener struct {
IListener
wares []MiddleWare
handlers []*registry.Binding
}
//创建KiteQListener
func NewKiteQListener() *KiteQListener {
return &KiteQListener{handlers: make([]*registry.Binding, 0, 5)}
}
//增加中间件
func (self *KiteQListener) AddMiddleWares(wares ...MiddleWare) IListener {
self.wares = wares
return self
}
func (self *KiteQListener) RegisteHandler(bind *registry.Binding) IListener {
if nil != bind.Handler {
self.handlers = append(self.handlers, bind)
}
return self
}
func (self *KiteQListener) onFire(ctx *Context) error {
next := func(ctx *Context) error {
msg := ctx.Msg
topic := msg.GetHeader().GetTopic()
messageType := msg.GetHeader().GetMessageType()
for _, bind := range self.handlers {
if bind.Matches(topic, messageType) {
ctx.Success, ctx.Err = bind.Handler(msg.GetBody())
if nil != ctx.Err || !ctx.Success {
log.Errorf("OnMessage|Handle|FAIL|%s|%s|%s|%s|%v",
msg.GetHeader().GetMessageId(),
msg.GetHeader().GetTopic(),
msg.GetHeader().GetMessageType(),
msg.GetHeader().GetProperties(), ctx.Err)
return nil
}
return nil
}
}
//不存在这样的消息处理器
log.Warnf("OnMessage|Handle|FAIL|NO Handler|%s|%s|%s|%s",
msg.GetHeader().GetMessageId(),
msg.GetHeader().GetTopic(),
msg.GetHeader().GetMessageType(),
msg.GetHeader().GetProperties())
return nil
}
//开始进行处理链处理
if nil != self.wares {
for _, c := range self.wares {
next = c(next)
}
}
return next(ctx)
}
//接受投递消息的回调
func (self *KiteQListener) OnMessage(msg *protocol.QMessage) bool {
ctx := &Context{
Msg: msg,
}
err := self.onFire(ctx)
if nil != err {
return false
}
if nil != ctx.Err || !ctx.Success {
log.Errorf("OnMessage|Handle|FAIL|%s|%s|%s|%s|%v",
msg.GetHeader().GetMessageId(),
msg.GetHeader().GetTopic(),
msg.GetHeader().GetMessageType(),
msg.GetHeader().GetProperties(), err)
return false
}
return ctx.Success
}
//接收事务回调
// 除非明确提交成功、其余都为不成功
// 有异常或者返回值为false均为不提交
func (self *KiteQListener) OnMessageCheck(tx *protocol.TxResponse) error {
//TODO 暂时没有2PC的消息
tx.Commit()
return nil
}