Skip to content

Commit

Permalink
[ISSUE #293] Optimize Close of rmqClient (#303)
Browse files Browse the repository at this point in the history
* optimize shutdown rmqClient
  • Loading branch information
wenfengwang authored Nov 20, 2019
1 parent 0e9d5b1 commit 28715e5
Showing 1 changed file with 68 additions and 28 deletions.
96 changes: 68 additions & 28 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,18 @@ type rmqClient struct {
hbMutex sync.Mutex
close bool
namesrvs *namesrvs
done chan struct{}
shutdownOnce sync.Once
}

var clientMap sync.Map

func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) *rmqClient {
func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
client := &rmqClient{
option: option,
remoteClient: remote.NewRemotingClient(),
namesrvs: option.Namesrv,
done: make(chan struct{}),
}
actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
if !loaded {
Expand Down Expand Up @@ -228,63 +231,100 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) *
func (c *rmqClient) Start() {
//ctx, cancel := context.WithCancel(context.Background())
//c.cancel = cancel
c.close = false
c.once.Do(func() {
// TODO fetchNameServerAddr
if !c.option.Credentials.IsEmpty() {
c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
}
// TODO fetchNameServerAddr
go func() {}()

// schedule update route info
go func() {
// delay
ticker := time.NewTicker(_PullNameServerInterval)
defer ticker.Stop()
time.Sleep(50 * time.Millisecond)
for !c.close {
c.UpdateTopicRouteInfo()
time.Sleep(_PullNameServerInterval)
for {
select {
case <-ticker.C:
c.UpdateTopicRouteInfo()
case <-c.done:
rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
"clientID": c.ClientID(),
})
return
}
}
}()

// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
go func() {
for !c.close {
c.namesrvs.cleanOfflineBroker()
c.SendHeartbeatToAllBrokerWithLock()
time.Sleep(_HeartbeatBrokerInterval)
ticker := time.NewTicker(_HeartbeatBrokerInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.namesrvs.cleanOfflineBroker()
c.SendHeartbeatToAllBrokerWithLock()
case <-c.done:
rlog.Info("The RMQClient stopping clean off line broker and heart beat", map[string]interface{}{
"clientID": c.ClientID(),
})
return
}
}
}()

// schedule persist offset
go func() {
//time.Sleep(10 * time.Second)
for !c.close {
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
err := consumer.PersistConsumerOffset()
if err != nil {
rlog.Error("persist offset failed", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
}
return true
})
time.Sleep(_PersistOffset)
ticker := time.NewTicker(_PersistOffset)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.consumerMap.Range(func(key, value interface{}) bool {
consumer := value.(InnerConsumer)
err := consumer.PersistConsumerOffset()
if err != nil {
rlog.Error("persist offset failed", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
}
return true
})
case <-c.done:
rlog.Info("The RMQClient stopping persist offset", map[string]interface{}{
"clientID": c.ClientID(),
})
return
}
}
}()

go func() {
for !c.close {
c.RebalanceImmediately()
time.Sleep(_RebalanceInterval)
ticker := time.NewTicker(_RebalanceInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.RebalanceImmediately()
case <-c.done:
rlog.Info("The RMQClient stopping do rebalance", map[string]interface{}{
"clientID": c.ClientID(),
})
return
}
}
}()
})
}

func (c *rmqClient) Shutdown() {
c.remoteClient.ShutDown()
c.close = true
c.shutdownOnce.Do(func() {
close(c.done)
c.close = true
c.remoteClient.ShutDown()
})
}

func (c *rmqClient) ClientID() string {
Expand Down

0 comments on commit 28715e5

Please sign in to comment.