-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
112 lines (92 loc) · 2.65 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package monkey
import (
"context"
"errors"
"fmt"
"log"
"sync"
)
var (
// ErrMsgTypeAlreadRegisted ...
ErrMsgTypeAlreadRegisted = errors.New("msg type has alread registed")
)
// HandleFunc 包处理器
type HandleFunc func(context.Context, *JSONGatePacket, Transport)
// JSONPacketProcess 。。。
type JSONPacketProcess struct {
dispatchFunc map[uint16]HandleFunc
timeoutFunc map[uint16]HandleFunc
sync.RWMutex
Packeter GatePacketer
}
// RegisterFunc 注册 handle
func (jp *JSONPacketProcess) RegisterFunc(msgType uint16, handFunc, timeoutFunc HandleFunc) error {
return jp.registerFunc(msgType, handFunc, timeoutFunc)
}
func (jp *JSONPacketProcess) registerFunc(msgType uint16, handFunc, timeoutFunc HandleFunc) error {
jp.Lock()
defer jp.Unlock()
if jp.dispatchFunc == nil {
jp.dispatchFunc = make(map[uint16]HandleFunc)
jp.timeoutFunc = make(map[uint16]HandleFunc)
}
if _, ok := jp.dispatchFunc[msgType]; ok {
return ErrMsgTypeAlreadRegisted
}
jp.dispatchFunc[msgType] = handFunc
jp.timeoutFunc[msgType] = timeoutFunc
return nil
}
// GetHandler 获取handle
func (jp *JSONPacketProcess) GetHandler(packetID uint16) (process HandleFunc, timeout HandleFunc) {
jp.RLock()
defer jp.RUnlock()
if handleFunc, ok := jp.dispatchFunc[packetID]; ok {
process = handleFunc
}
if to, ok := jp.timeoutFunc[packetID]; ok {
timeout = to
}
return
}
// OnTransportMade implement for protocol
func (jp *JSONPacketProcess) OnTransportMade(trasnport Transport) {
// Do nothing but 可以做用户登陆的信息存储
}
// OnTransportLost implement for protocol
func (jp *JSONPacketProcess) OnTransportLost(trasnport Transport) {
// DO nothing
}
// OnTransportData implement for protocol
func (jp *JSONPacketProcess) OnTransportData(transport Transport, envelop *Envelope) {
if envelop == nil {
return
}
var base JSONGatePacket
err := jp.Packeter.Unpack(envelop.Msg, &base)
if err != nil {
log.Output(1, fmt.Sprintf("unpack failed %v", err))
return
}
p, _ := jp.GetHandler(uint16(base.PacketID)) // 超时的处理器暂不实现,主要是想针对服务端chan buffer 满的情况,提供给上层处理
if p == nil {
return
}
// error 不需要抛到这一层,因为这是业务逻辑代码,出错了这里也处理不了
defer func() {
if err := recover(); err != nil {
log.Output(1, fmt.Sprintf("process message encounter panic: %v", err))
}
}()
ctx := context.Background()
p(ctx, &base, transport)
}
// OnPing implement for protocol
func (jp *JSONPacketProcess) OnPing(m []byte) []byte {
log.Output(1, "on ping")
return m
}
// OnPong implement for protocol
func (jp *JSONPacketProcess) OnPong(_ []byte) error {
return nil
}