Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split sync data and notification cmd and add some heartbeat log. #598

Merged
merged 17 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions internal/conversation_msg/conversation_notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@ func (c *Conversation) Work(c2v common.Cmd2Value) {
c.doUpdateMessage(c2v)
case constant.CmSyncReactionExtensions:
case constant.CmdNotification:
c.doNotificationNew(c2v)
c.doNotification(c2v)
case constant.CmdSyncData:
c.syncData(c2v)
case constant.CmdSyncFlag:
c.syncFlag(c2v)
}
}

func (c *Conversation) doNotificationNew(c2v common.Cmd2Value) {
func (c *Conversation) syncFlag(c2v common.Cmd2Value) {
ctx := c2v.Ctx
allMsg := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).Msgs
syncFlag := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).SyncFlag
switch syncFlag {
case constant.AppDataSyncStart:
Expand All @@ -70,15 +71,15 @@ func (c *Conversation) doNotificationNew(c2v common.Cmd2Value) {
asyncWaitFunctions := []func(c context.Context) error{
c.group.SyncAllJoinedGroupsAndMembers,
c.friend.IncrSyncFriends,
c.IncrSyncConversations,
}
runSyncFunctions(ctx, asyncWaitFunctions, asyncWait, c.ConversationListener().OnSyncServerProgress)

syncWaitFunctions := []func(c context.Context) error{
c.IncrSyncConversations,
c.SyncAllConversationHashReadSeqs,
}
runSyncFunctions(ctx, syncWaitFunctions, syncWait, c.ConversationListener().OnSyncServerProgress)
log.ZDebug(ctx, "core data sync over", "cost time", time.Since(c.startTime).Seconds())
log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds())

asyncNoWaitFunctions := []func(c context.Context) error{
c.user.SyncLoginUserInfoWithoutNotice,
Expand Down Expand Up @@ -106,6 +107,11 @@ func (c *Conversation) doNotificationNew(c2v common.Cmd2Value) {
log.ZDebug(ctx, "MsgSyncEnd", "time", time.Since(c.startTime).Milliseconds())
c.ConversationListener().OnSyncServerFinish(false)
}
}

func (c *Conversation) doNotification(c2v common.Cmd2Value) {
ctx := c2v.Ctx
allMsg := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).Msgs

for conversationID, msgs := range allMsg {
log.ZDebug(ctx, "notification handling", "conversationID", conversationID, "msgs", msgs)
Expand Down
10 changes: 7 additions & 3 deletions internal/interaction/long_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,14 @@ func (c *LongConnMgr) sendPingMessage(ctx context.Context) {
defer c.connWrite.Unlock()
log.ZDebug(ctx, "ping Message Started", "goroutine ID:", getGoroutineID())
if c.IsConnected() {
log.ZDebug(ctx, "ping Message Started isConnected", "goroutine ID:", getGoroutineID())
c.conn.SetWriteDeadline(writeWait)
if err := c.conn.WriteMessage(PingMessage, nil); err != nil {
if err := c.conn.WriteMessage(PingMessage, []byte(utils.OperationIDGenerator())); err != nil {
log.ZWarn(ctx, "ping Message failed", err, "goroutine ID:", getGoroutineID())
return
}
} else {
log.ZDebug(ctx, "ping Message failed, connection", "connStatus", c.GetConnectionStatus(), "goroutine ID:", getGoroutineID())
}

}
Expand Down Expand Up @@ -592,8 +596,8 @@ func (c *LongConnMgr) pingHandler(_ string) error {
return c.writePongMsg()
}

func (c *LongConnMgr) pongHandler(_ string) error {
log.ZDebug(c.ctx, "server Pong Message Received")
func (c *LongConnMgr) pongHandler(appData string) error {
log.ZDebug(c.ctx, "server Pong Message Received", "appData", appData)
if err := c.conn.SetReadDeadline(pongWait); err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions internal/interaction/msg_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,23 @@ func (m *MsgSyncer) pushTriggerAndSync(ctx context.Context, pullMsgs map[string]
func (m *MsgSyncer) doConnected(ctx context.Context) {
reinstalled := m.reinstalled
if reinstalled {
common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.AppDataSyncStart}, m.conversationCh)
common.TriggerCmdSyncFlag(m.ctx, constant.AppDataSyncStart, m.conversationCh)
} else {
common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncBegin}, m.conversationCh)
common.TriggerCmdSyncFlag(m.ctx, constant.MsgSyncBegin, m.conversationCh)
}
var resp sdkws.GetMaxSeqResp
if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
log.ZError(m.ctx, "get max seq error", err)
common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncFailed}, m.conversationCh)
common.TriggerCmdSyncFlag(m.ctx, constant.MsgSyncFailed, m.conversationCh)
return
} else {
log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs)
}
m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, connectPullNums)
if reinstalled {
common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.AppDataSyncFinish}, m.conversationCh)
common.TriggerCmdSyncFlag(m.ctx, constant.AppDataSyncFinish, m.conversationCh)
} else {
common.TriggerCmdNotification(m.ctx, sdk_struct.CmdNewMsgComeToConversation{SyncFlag: constant.MsgSyncEnd}, m.conversationCh)
common.TriggerCmdSyncFlag(m.ctx, constant.MsgSyncEnd, m.conversationCh)
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/common/trigger_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ func TriggerCmdNotification(ctx context.Context, msg sdk_struct.CmdNewMsgComeToC
}
}

func TriggerCmdSyncFlag(ctx context.Context, syncFlag int, conversationCh chan Cmd2Value) {
c2v := Cmd2Value{Cmd: constant.CmdSyncFlag, Value: sdk_struct.CmdNewMsgComeToConversation{SyncFlag: syncFlag}, Ctx: ctx}
err := sendCmd(conversationCh, c2v, 100)
if err != nil {
log.ZWarn(ctx, "TriggerCmdNotification error", err, "syncFlag", syncFlag)
}
}

func TriggerCmdWakeUp(ch chan Cmd2Value) error {
if ch == nil {
return errs.Wrap(ErrChanNil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package constant

const (
CmdSyncData = "001"
CmdBlackList = "002"
CmdSyncFlag = "002"
CmdNotification = "003"
CmdDeleteConversation = "004"
CmdNewMsgCome = "005"
Expand Down
33 changes: 0 additions & 33 deletions sdk_struct/sdk_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,36 +331,3 @@ type CmdPushMsgToMsgSync struct {
type CmdMaxSeqToMsgSync struct {
ConversationMaxSeqOnSvr map[string]int64
}

type CmdJoinedSuperGroup struct {
OperationID string
}

type OANotificationElem struct {
NotificationName string `mapstructure:"notificationName" validate:"required"`
NotificationFaceURL string `mapstructure:"notificationFaceURL" validate:"required"`
NotificationType int32 `mapstructure:"notificationType" validate:"required"`
Text string `mapstructure:"text" validate:"required"`
Url string `mapstructure:"url"`
MixType int32 `mapstructure:"mixType"`
Image struct {
SourceUrl string `mapstructure:"sourceURL"`
SnapshotUrl string `mapstructure:"snapshotURL"`
} `mapstructure:"image"`
Video struct {
SourceUrl string `mapstructure:"sourceURL"`
SnapshotUrl string `mapstructure:"snapshotURL"`
Duration int64 `mapstructure:"duration"`
} `mapstructure:"video"`
File struct {
SourceUrl string `mapstructure:"sourceURL"`
FileName string `mapstructure:"fileName"`
FileSize int64 `mapstructure:"fileSize"`
} `mapstructure:"file"`
Ex string `mapstructure:"ex"`
}
type MsgDeleteNotificationElem struct {
GroupID string `json:"groupID"`
IsAllDelete bool `json:"isAllDelete"`
SeqList []string `json:"seqList"`
}
Loading