Skip to content

Commit

Permalink
[ISSUE #354] feat: Support PanicHandler (#355)
Browse files Browse the repository at this point in the history
* feat: Support PanicHandler

Closes #354
  • Loading branch information
xujianhai666 authored and wenfengwang committed Jan 7, 2020
1 parent 33798bc commit 7308bc9
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 63 deletions.
23 changes: 11 additions & 12 deletions consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type pushConsumer struct {
subscribedTopic map[string]string
interceptor primitive.Interceptor
queueLock *QueueLock
lockTicker *time.Ticker
done chan struct{}
closeOnce sync.Once
}
Expand Down Expand Up @@ -107,7 +106,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
queueLock: newQueueLock(),
lockTicker: time.NewTicker(dc.option.RebalanceLockInterval),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
}
Expand Down Expand Up @@ -168,14 +166,16 @@ func (pc *pushConsumer) Start() error {
pc.Rebalance()
time.Sleep(1 * time.Second)

go func() {
go primitive.WithRecover(func() {
// initial lock.
time.Sleep(1000 * time.Millisecond)
pc.lockAll()

lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
defer lockTicker.Stop()
for {
select {
case <-pc.lockTicker.C:
case <-lockTicker.C:
pc.lockAll()
case <-pc.done:
rlog.Info("push consumer close tick.", map[string]interface{}{
Expand All @@ -184,7 +184,7 @@ func (pc *pushConsumer) Start() error {
return
}
}
}()
})
})

if err != nil {
Expand All @@ -209,7 +209,6 @@ func (pc *pushConsumer) Start() error {
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
pc.lockTicker.Stop()
close(pc.done)

pc.client.UnregisterConsumer(pc.consumerGroup)
Expand Down Expand Up @@ -438,7 +437,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
})
var sleepTime time.Duration
pq := request.pq
go func() {
go primitive.WithRecover(func() {
for {
select {
case <-pc.done:
Expand All @@ -450,7 +449,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
pc.submitToConsume(request.pq, request.mq)
}
}
}()
})

for {
NEXT:
Expand Down Expand Up @@ -683,13 +682,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
})
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
go func() {
go primitive.WithRecover(func() {
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.storage.remove(request.mq)
rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
}()
})
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError
Expand Down Expand Up @@ -866,7 +865,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
subMsgs = msgs[count:next]
count = next - 1
}
go func() {
go primitive.WithRecover(func() {
RETRY:
if pq.IsDroppd() {
rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
Expand Down Expand Up @@ -948,7 +947,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
"message": msgs,
})
}
}()
})
}
}

Expand Down
25 changes: 13 additions & 12 deletions consumer/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"

"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
)

Expand Down Expand Up @@ -148,7 +149,7 @@ func newStatsItemSet(statsName string) *statsItemSet {
}

func (sis *statsItemSet) init() {
go func() {
go primitive.WithRecover(func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
Expand All @@ -160,9 +161,9 @@ func (sis *statsItemSet) init() {

}
}
}()
})

go func() {
go primitive.WithRecover(func() {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
Expand All @@ -173,9 +174,9 @@ func (sis *statsItemSet) init() {
sis.samplingInMinutes()
}
}
}()
})

go func() {
go primitive.WithRecover(func() {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
Expand All @@ -186,9 +187,9 @@ func (sis *statsItemSet) init() {
sis.samplingInHour()
}
}
}()
})

go func() {
go primitive.WithRecover(func() {
time.Sleep(nextMinutesTime().Sub(time.Now()))
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
Expand All @@ -200,9 +201,9 @@ func (sis *statsItemSet) init() {
sis.printAtMinutes()
}
}
}()
})

go func() {
go primitive.WithRecover(func() {
time.Sleep(nextHourTime().Sub(time.Now()))
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
Expand All @@ -214,9 +215,9 @@ func (sis *statsItemSet) init() {
sis.printAtHour()
}
}
}()
})

go func() {
go primitive.WithRecover(func() {
time.Sleep(nextMonthTime().Sub(time.Now()))
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
Expand All @@ -228,7 +229,7 @@ func (sis *statsItemSet) init() {
sis.printAtDay()
}
}
}()
})
}

func (sis *statsItemSet) samplingInSeconds() {
Expand Down
1 change: 0 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
)

var (
// ErrRequestTimeout for request timeout error
ErrRequestTimeout = errors.New("request timeout")
ErrMQEmpty = errors.New("MessageQueue is nil")
ErrOffset = errors.New("offset < 0")
Expand Down
17 changes: 8 additions & 9 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (c *rmqClient) Start() {
}

// schedule update route info
go func() {
go primitive.WithRecover(func() {
// delay
ticker := time.NewTicker(_PullNameServerInterval)
defer ticker.Stop()
Expand All @@ -298,10 +298,9 @@ func (c *rmqClient) Start() {
return
}
}
}()
})

// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
go func() {
go primitive.WithRecover(func() {
ticker := time.NewTicker(_HeartbeatBrokerInterval)
defer ticker.Stop()
for {
Expand All @@ -316,10 +315,10 @@ func (c *rmqClient) Start() {
return
}
}
}()
})

// schedule persist offset
go func() {
go primitive.WithRecover(func() {
ticker := time.NewTicker(_PersistOffsetInterval)
defer ticker.Stop()
for {
Expand All @@ -342,9 +341,9 @@ func (c *rmqClient) Start() {
return
}
}
}()
})

go func() {
go primitive.WithRecover(func() {
ticker := time.NewTicker(_RebalanceInterval)
defer ticker.Stop()
for {
Expand All @@ -358,7 +357,7 @@ func (c *rmqClient) Start() {
return
}
}
}()
})
})
}

Expand Down
16 changes: 10 additions & 6 deletions internal/remote/remote_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *
if err != nil {
return err
}
go c.receiveAsync(resp)
go primitive.WithRecover(func() {
c.receiveAsync(resp)
})
return nil
}

Expand Down Expand Up @@ -127,7 +129,9 @@ func (c *remotingClient) connect(ctx context.Context, addr string) (*tcpConnWrap
return nil, err
}
c.connectionTable.Store(addr, tcpConn)
go c.receiveResponse(tcpConn)
go primitive.WithRecover(func() {
c.receiveResponse(tcpConn)
})
return tcpConn, nil
}

Expand Down Expand Up @@ -196,20 +200,20 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
if exist {
c.responseTable.Delete(cmd.Opaque)
responseFuture := resp.(*ResponseFuture)
go func() {
go primitive.WithRecover(func() {
responseFuture.ResponseCommand = cmd
responseFuture.executeInvokeCallback()
if responseFuture.Done != nil {
responseFuture.Done <- true
}
}()
})
}
} else {
f := c.processors[cmd.Code]
if f != nil {
// single goroutine will be deadlock
// TODO: optimize with goroutine pool, https:/apache/rocketmq-client-go/issues/307
go func() {
go primitive.WithRecover(func() {
res := f(cmd, r.RemoteAddr())
if res != nil {
res.Opaque = cmd.Opaque
Expand All @@ -222,7 +226,7 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
})
}
}
}()
})
} else {
rlog.Warning("receive broker's requests, but no func to handle", map[string]interface{}{
"responseCode": cmd.Code,
Expand Down
16 changes: 12 additions & 4 deletions internal/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ func (td *traceDispatcher) GetTraceTopicName() string {
func (td *traceDispatcher) Start() {
td.running = true
td.cli.Start()
go td.process()
go primitive.WithRecover(func() {
td.process()
})
}

func (td *traceDispatcher) Close() {
Expand Down Expand Up @@ -299,7 +301,9 @@ func (td *traceDispatcher) process() {
batch = append(batch, ctx)
if count == batchSize {
count = 0
go td.batchCommit(batch)
go primitive.WithRecover(func() {
td.batchCommit(batch)
})
batch = make([]TraceContext, 0)
}
case <-td.ticker.C:
Expand All @@ -308,12 +312,16 @@ func (td *traceDispatcher) process() {
count++
lastput = time.Now()
if len(batch) > 0 {
go td.batchCommit(batch)
go primitive.WithRecover(func() {
td.batchCommit(batch)
})
batch = make([]TraceContext, 0)
}
}
case <-td.ctx.Done():
go td.batchCommit(batch)
go primitive.WithRecover(func() {
td.batchCommit(batch)
})
batch = make([]TraceContext, 0)

now := time.Now().UnixNano() / int64(time.Millisecond)
Expand Down
18 changes: 0 additions & 18 deletions internal/utils/fun.go

This file was deleted.

15 changes: 15 additions & 0 deletions primitive/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,18 @@ func verifyIP(ip string) error {
}
return nil
}

var PanicHandler func(interface{})

func WithRecover(fn func()) {
defer func() {
handler := PanicHandler
if handler != nil {
if err := recover(); err != nil {
handler(err)
}
}
}()

fn()
}
4 changes: 3 additions & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ func NewTransactionProducer(listener primitive.TransactionListener, opts ...Opti
}

func (tp *transactionProducer) Start() error {
go tp.checkTransactionState()
go primitive.WithRecover(func() {
tp.checkTransactionState()
})
return tp.producer.Start()
}
func (tp *transactionProducer) Shutdown() error {
Expand Down

0 comments on commit 7308bc9

Please sign in to comment.