From d1d029c781161e50b34b5b5ed34998acf8f18128 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 10:23:51 +0800 Subject: [PATCH 01/21] update trigger const. --- pkg/constant/constant.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/constant/constant.go b/pkg/constant/constant.go index 97e709be3..4f779234f 100644 --- a/pkg/constant/constant.go +++ b/pkg/constant/constant.go @@ -18,7 +18,7 @@ const ( CmdSyncData = "001" CmdSyncFlag = "002" CmdNotification = "003" - CmdDeleteConversation = "004" + CmdMsgSyncInReinstall = "004" CmdNewMsgCome = "005" CmdSuperGroupMsgCome = "006" CmdUpdateConversation = "007" From 8ecb1774b65dad9583d89c77b53525a29b79a171 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 15:19:21 +0800 Subject: [PATCH 02/21] feat: implement more accurate progress. --- internal/conversation_msg/conversation_msg.go | 138 +++++++++++++++++- .../conversation_notification.go | 30 ++-- 2 files changed, 154 insertions(+), 14 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 747927614..3fb59bf93 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "math" + "sync" "github.com/openimsdk/openim-sdk-core/v3/internal/business" "github.com/openimsdk/openim-sdk-core/v3/internal/cache" @@ -80,6 +81,10 @@ type Conversation struct { full *full.Full maxSeqRecorder MaxSeqRecorder IsExternalExtensions bool + offsetByReinstalled int + syncFinishCH chan struct{} + progress int + mu sync.Mutex startTime time.Time @@ -117,6 +122,9 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr, messageController: NewMessageController(db, ch), IsExternalExtensions: info.IsExternalExtensions(), maxSeqRecorder: NewMaxSeqRecorder(), + offsetByReinstalled: 0, + syncFinishCH: make(chan struct{}), + progress: 0, } n.typing = newTyping(n) n.initSyncer() @@ -441,12 +449,134 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) } +func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { + allMsg := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Msgs + ctx := c2v.Ctx + // msgLen := len(allMsg) + total := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Total + + var isTriggerUnReadCount bool + insertMsg := make(map[string][]*model_struct.LocalChatLog, 10) + // var exceptionMsg []*model_struct.LocalErrChatLog + conversationSet := make(map[string]*model_struct.LocalConversation) + + //var unreadMessages []*model_struct.LocalConversationUnreadMessage + log.ZDebug(ctx, "message come here conversation ch", "conversation length", len(allMsg)) + b := time.Now() + + for conversationID, msgs := range allMsg { + log.ZDebug(ctx, "parse message in one conversation", "conversationID", + conversationID, "message length", len(msgs.Msgs)) + var insertMessage []*model_struct.LocalChatLog + for _, v := range msgs.Msgs { + log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v) + msg := &sdk_struct.MsgStruct{} + // TODO + copier.Copy(msg, v) + msg.Content = string(v.Content) + var attachedInfo sdk_struct.AttachedInfoElem + _ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo) + msg.AttachedInfoElem = &attachedInfo + msg.Status = constant.MsgStatusSendSuccess + + err := c.msgHandleByContentType(msg) + if err != nil { + log.ZError(ctx, "Parsing data error:", err, "type: ", msg.ContentType, "msg", msg) + continue + } + + // //When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update. + // if msg.Status == constant.MsgStatusHasDeleted { + // insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg)) + // continue + // } + + // if msg.ClientMsgID == "" { + // exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg)) + // continue + // } + // if conversationID == "" { + // log.ZError(ctx, "conversationID is empty", errors.New("conversationID is empty"), "msg", msg) + // continue + // } + + insertMsg[conversationID] = append(insertMessage, c.msgStructToLocalChatLog(msg)) + + lc := model_struct.LocalConversation{ + ConversationType: v.SessionType, + LatestMsg: utils.StructToJsonString(msg), + LatestMsgSendTime: msg.SendTime, + ConversationID: conversationID, + } + switch v.SessionType { + case constant.SingleChatType: + lc.UserID = v.SendID + lc.ShowName = msg.SenderNickname + lc.FaceURL = msg.SenderFaceURL + case constant.GroupChatType, constant.SuperGroupChatType: + lc.GroupID = v.GroupID + case constant.NotificationChatType: + lc.UserID = v.SendID + } + + conversationSet[conversationID] = &lc + } + + // message storage + _ = c.messageController.BatchInsertMessageList(ctx, insertMsg) + + // conversation storage + if err := c.db.BatchInsertConversationList(ctx, mapConversationToList(conversationSet)); err != nil { + log.ZError(ctx, "insert new conversation err:", err) + } + log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) + + if isTriggerUnReadCount { + c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.TotalUnreadMessageChanged, Args: ""}}) + } + + for _, msgs := range allMsg { + for _, msg := range msgs.Msgs { + if msg.ContentType == constant.Typing { + c.typing.onNewMsg(ctx, msg) + } + } + } + log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) + + } + c.offsetByReinstalled += len(insertMsg) + c.AddProgress(c.offsetByReinstalled / total * 90) + c.ConversationListener().OnSyncServerProgress(c.getProgress()) + select { + case <-c.syncFinishCH: + return + default: + + } +} + +func (c *Conversation) AddProgress(progress int) { + c.mu.Lock() + defer c.mu.Unlock() + c.progress += progress + if c.progress > 100 { + c.progress = 100 + } +} + +func (c *Conversation) getProgress() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.progress +} + func listToMap(list []*model_struct.LocalConversation, m map[string]*model_struct.LocalConversation) { for _, v := range list { m[v.ConversationID] = v } - } + func (c *Conversation) diff(ctx context.Context, local, generated, cc, nc map[string]*model_struct.LocalConversation) { var newConversations []*model_struct.LocalConversation for _, v := range generated { @@ -474,6 +604,7 @@ func (c *Conversation) diff(ctx context.Context, local, generated, cc, nc map[st } } } + func (c *Conversation) genConversationGroupAtType(lc *model_struct.LocalConversation, s *sdk_struct.MsgStruct) { if s.ContentType == constant.AtText { tagMe := utils.IsContain(c.loginUserID, s.AtTextElem.AtUserList) @@ -870,6 +1001,7 @@ func (c *Conversation) msgHandleByContentType(msg *sdk_struct.MsgStruct) (err er return utils.Wrap(err, "") } + func (c *Conversation) updateConversation(lc *model_struct.LocalConversation, cs map[string]*model_struct.LocalConversation) { if oldC, ok := cs[lc.ConversationID]; !ok { cs[lc.ConversationID] = lc @@ -926,12 +1058,14 @@ func (c *Conversation) updateConversation(lc *model_struct.LocalConversation, cs //} } + func mapConversationToList(m map[string]*model_struct.LocalConversation) (cs []*model_struct.LocalConversation) { for _, v := range m { cs = append(cs, v) } return cs } + func (c *Conversation) addFaceURLAndName(ctx context.Context, lc *model_struct.LocalConversation) error { switch lc.ConversationType { case constant.SingleChatType, constant.NotificationChatType: @@ -1000,6 +1134,7 @@ func (c *Conversation) batchAddFaceURLAndName(ctx context.Context, conversations } return nil } + func (c *Conversation) batchGetUserNameAndFaceURL(ctx context.Context, userIDs ...string) (map[string]*user.BasicInfo, error) { m := make(map[string]*user.BasicInfo) @@ -1050,6 +1185,7 @@ func (c *Conversation) batchGetUserNameAndFaceURL(ctx context.Context, userIDs . } return m, nil } + func (c *Conversation) getUserNameAndFaceURL(ctx context.Context, userID string) (faceURL, name string, err error) { //find in cache if value, ok := c.user.UserBasicCache.Load(userID); ok { diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index 32a34c85e..601fc2779 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -57,6 +57,8 @@ func (c *Conversation) Work(c2v common.Cmd2Value) { c.syncData(c2v) case constant.CmdSyncFlag: c.syncFlag(c2v) + case constant.CmdMsgSyncInReinstall: + c.doMsgSyncByReinstalled(c2v) } } @@ -72,13 +74,13 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.group.SyncAllJoinedGroupsAndMembers, c.friend.IncrSyncFriends, } - runSyncFunctions(ctx, asyncWaitFunctions, asyncWait, c.ConversationListener().OnSyncServerProgress) + runSyncFunctions(ctx, asyncWaitFunctions, asyncWait, 4, c.ConversationListener().OnSyncServerProgress) // progress is add 4 percent syncWaitFunctions := []func(c context.Context) error{ c.IncrSyncConversations, c.SyncAllConversationHashReadSeqs, } - runSyncFunctions(ctx, syncWaitFunctions, syncWait, c.ConversationListener().OnSyncServerProgress) + runSyncFunctions(ctx, syncWaitFunctions, syncWait, 6, c.ConversationListener().OnSyncServerProgress) // progress is add 6 percent log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds()) asyncNoWaitFunctions := []func(c context.Context) error{ @@ -90,17 +92,18 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.group.SyncAllSelfGroupApplicationWithoutNotice, c.user.SyncAllCommandWithoutNotice, } - runSyncFunctions(ctx, asyncNoWaitFunctions, asyncNoWait, c.ConversationListener().OnSyncServerProgress) + runSyncFunctions(ctx, asyncNoWaitFunctions, 0, asyncNoWait, nil) // avoid current progress case constant.AppDataSyncFinish: log.ZDebug(ctx, "AppDataSyncFinish", "time", time.Since(c.startTime).Milliseconds()) c.ConversationListener().OnSyncServerFinish(true) + c.progress = 100 + c.syncFinishCH <- struct{}{} // send to `doMsgSyncByReinstalled` case constant.MsgSyncBegin: log.ZDebug(ctx, "MsgSyncBegin") c.ConversationListener().OnSyncServerStart(false) c.syncData(c2v) - case constant.MsgSyncFailed: c.ConversationListener().OnSyncServerFailed(false) case constant.MsgSyncEnd: @@ -376,7 +379,7 @@ func (c *Conversation) syncData(c2v common.Cmd2Value) { c.SyncAllConversationHashReadSeqs, } - runSyncFunctions(ctx, syncFuncs, syncWait, nil) + runSyncFunctions(ctx, syncFuncs, syncWait, 0, nil) // Asynchronous sync functions asyncFuncs := []func(c context.Context) error{ @@ -392,22 +395,22 @@ func (c *Conversation) syncData(c2v common.Cmd2Value) { c.IncrSyncConversations, } - runSyncFunctions(ctx, asyncFuncs, asyncNoWait, nil) + runSyncFunctions(ctx, asyncFuncs, asyncNoWait, 0, nil) } -func runSyncFunctions(ctx context.Context, funcs []func(c context.Context) error, mode int, progressCallback func(progress int)) { +func runSyncFunctions(ctx context.Context, funcs []func(c context.Context) error, mode int, currentProgress int, progressCallback func(progress int)) { totalFuncs := len(funcs) var wg sync.WaitGroup - for i, fn := range funcs { + for _, fn := range funcs { switch mode { case asyncWait: wg.Add(1) - go executeSyncFunction(ctx, fn, i, totalFuncs, progressCallback, &wg) + go executeSyncFunction(ctx, fn, currentProgress, totalFuncs, progressCallback, &wg) case asyncNoWait: - go executeSyncFunction(ctx, fn, i, totalFuncs, progressCallback, nil) + go executeSyncFunction(ctx, fn, currentProgress, totalFuncs, progressCallback, nil) case syncWait: - executeSyncFunction(ctx, fn, i, totalFuncs, progressCallback, nil) + executeSyncFunction(ctx, fn, currentProgress, totalFuncs, progressCallback, nil) } } @@ -416,7 +419,7 @@ func runSyncFunctions(ctx context.Context, funcs []func(c context.Context) error } } -func executeSyncFunction(ctx context.Context, fn func(c context.Context) error, index, total int, progressCallback func(progress int), wg *sync.WaitGroup) { +func executeSyncFunction(ctx context.Context, fn func(c context.Context) error, currentProgress int, total int, progressCallback func(progress int), wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -431,7 +434,8 @@ func executeSyncFunction(ctx context.Context, fn func(c context.Context) error, log.ZDebug(ctx, fmt.Sprintf("%s completed successfully", funcName), "duration", duration.Seconds()) } if progressCallback != nil { - progress := int(float64(index+1) / float64(total) * 100) + // progress := int(float64(currentProgress+1) / float64(total) * 100 / 10) + progress := int(currentProgress / 2) if progress == 0 { progress = 1 } From 0c4a9a457e116437c620107e945e57f8776629dc Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 18:04:46 +0800 Subject: [PATCH 03/21] optimize method implement. --- internal/conversation_msg/conversation_msg.go | 158 +++++++++--------- .../conversation_notification.go | 37 ++-- internal/conversation_msg/message_check.go | 1 + 3 files changed, 92 insertions(+), 104 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 3fb59bf93..88331aa1e 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -19,7 +19,6 @@ import ( "encoding/json" "errors" "math" - "sync" "github.com/openimsdk/openim-sdk-core/v3/internal/business" "github.com/openimsdk/openim-sdk-core/v3/internal/cache" @@ -81,10 +80,7 @@ type Conversation struct { full *full.Full maxSeqRecorder MaxSeqRecorder IsExternalExtensions bool - offsetByReinstalled int - syncFinishCH chan struct{} progress int - mu sync.Mutex startTime time.Time @@ -122,8 +118,6 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr, messageController: NewMessageController(db, ch), IsExternalExtensions: info.IsExternalExtensions(), maxSeqRecorder: NewMaxSeqRecorder(), - offsetByReinstalled: 0, - syncFinishCH: make(chan struct{}), progress: 0, } n.typing = newTyping(n) @@ -250,6 +244,11 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { _ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo) msg.AttachedInfoElem = &attachedInfo + //When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update. + if msg.Status == constant.MsgStatusHasDeleted { + insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg)) + continue + } msg.Status = constant.MsgStatusSendSuccess // msg.IsRead = false //De-analyze data @@ -258,11 +257,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { log.ZError(ctx, "Parsing data error:", err, "type: ", msg.ContentType, "msg", msg) continue } - //When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update. - if msg.Status == constant.MsgStatusHasDeleted { - insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg)) - continue - } + if !isNotPrivate { msg.AttachedInfoElem.IsPrivateChat = true } @@ -452,31 +447,34 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) { func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { allMsg := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Msgs ctx := c2v.Ctx - // msgLen := len(allMsg) + msgLen := len(allMsg) total := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Total - var isTriggerUnReadCount bool insertMsg := make(map[string][]*model_struct.LocalChatLog, 10) - // var exceptionMsg []*model_struct.LocalErrChatLog - conversationSet := make(map[string]*model_struct.LocalConversation) + conversationList := make([]*model_struct.LocalConversation, 0) - //var unreadMessages []*model_struct.LocalConversationUnreadMessage - log.ZDebug(ctx, "message come here conversation ch", "conversation length", len(allMsg)) + log.ZDebug(ctx, "message come here conversation ch", "conversation length", msgLen) b := time.Now() for conversationID, msgs := range allMsg { log.ZDebug(ctx, "parse message in one conversation", "conversationID", - conversationID, "message length", len(msgs.Msgs)) - var insertMessage []*model_struct.LocalChatLog + conversationID, "message length", msgLen) + var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog for _, v := range msgs.Msgs { log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v) msg := &sdk_struct.MsgStruct{} - // TODO + // TODO need replace when after. copier.Copy(msg, v) msg.Content = string(v.Content) var attachedInfo sdk_struct.AttachedInfoElem _ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo) msg.AttachedInfoElem = &attachedInfo + + //When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update. + if msg.Status == constant.MsgStatusHasDeleted { + insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg)) + continue + } msg.Status = constant.MsgStatusSendSuccess err := c.msgHandleByContentType(msg) @@ -485,80 +483,76 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { continue } - // //When the message has been marked and deleted by the cloud, it is directly inserted locally without any conversation and message update. - // if msg.Status == constant.MsgStatusHasDeleted { - // insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg)) - // continue - // } - - // if msg.ClientMsgID == "" { - // exceptionMsg = append(exceptionMsg, c.msgStructToLocalErrChatLog(msg)) - // continue - // } - // if conversationID == "" { - // log.ZError(ctx, "conversationID is empty", errors.New("conversationID is empty"), "msg", msg) - // continue - // } - - insertMsg[conversationID] = append(insertMessage, c.msgStructToLocalChatLog(msg)) - - lc := model_struct.LocalConversation{ - ConversationType: v.SessionType, - LatestMsg: utils.StructToJsonString(msg), - LatestMsgSendTime: msg.SendTime, - ConversationID: conversationID, - } - switch v.SessionType { - case constant.SingleChatType: - lc.UserID = v.SendID - lc.ShowName = msg.SenderNickname - lc.FaceURL = msg.SenderFaceURL - case constant.GroupChatType, constant.SuperGroupChatType: - lc.GroupID = v.GroupID - case constant.NotificationChatType: - lc.UserID = v.SendID + if conversationID == "" { + log.ZError(ctx, "conversationID is empty", errors.New("conversationID is empty"), "msg", msg) + continue } - conversationSet[conversationID] = &lc - } - - // message storage - _ = c.messageController.BatchInsertMessageList(ctx, insertMsg) - - // conversation storage - if err := c.db.BatchInsertConversationList(ctx, mapConversationToList(conversationSet)); err != nil { - log.ZError(ctx, "insert new conversation err:", err) - } - log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) - - if isTriggerUnReadCount { - c.doUpdateConversation(common.Cmd2Value{Value: common.UpdateConNode{Action: constant.TotalUnreadMessageChanged, Args: ""}}) - } + log.ZDebug(ctx, "decode message", "msg", msg) + if v.SendID == c.loginUserID { + // Messages sent by myself //if sent through this terminal + log.ZInfo(ctx, "sync message", "msg", msg) + lc := model_struct.LocalConversation{ + ConversationType: v.SessionType, + LatestMsg: utils.StructToJsonString(msg), + LatestMsgSendTime: msg.SendTime, + ConversationID: conversationID, + } + switch v.SessionType { + case constant.SingleChatType: + lc.UserID = v.RecvID + case constant.GroupChatType, constant.SuperGroupChatType: + lc.GroupID = v.GroupID + } + selfInsertMessage = append(selfInsertMessage, c.msgStructToLocalChatLog(msg)) + } else { //Sent by others + if _, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil { //Deduplication operation + lc := model_struct.LocalConversation{ + ConversationType: v.SessionType, + LatestMsg: utils.StructToJsonString(msg), + LatestMsgSendTime: msg.SendTime, + ConversationID: conversationID, + } + switch v.SessionType { + case constant.SingleChatType: + lc.UserID = v.SendID + lc.ShowName = msg.SenderNickname + lc.FaceURL = msg.SenderFaceURL + case constant.GroupChatType, constant.SuperGroupChatType: + lc.GroupID = v.GroupID + case constant.NotificationChatType: + lc.UserID = v.SendID + } + othersInsertMessage = append(othersInsertMessage, c.msgStructToLocalChatLog(msg)) + } - for _, msgs := range allMsg { - for _, msg := range msgs.Msgs { - if msg.ContentType == constant.Typing { - c.typing.onNewMsg(ctx, msg) + lc := model_struct.LocalConversation{ + ConversationType: v.SessionType, + LatestMsg: utils.StructToJsonString(msg), + LatestMsgSendTime: msg.SendTime, + ConversationID: conversationID, } + conversationList = append(conversationList, &lc) } } - log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) + insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...) + } + // message storage + _ = c.messageController.BatchUpdateMessageList(ctx, insertMsg) + + // conversation storage + if err := c.db.BatchInsertConversationList(ctx, conversationList); err != nil { + log.ZError(ctx, "insert new conversation err:", err) } - c.offsetByReinstalled += len(insertMsg) - c.AddProgress(c.offsetByReinstalled / total * 90) + log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) + + c.AddProgress(msgLen / total * 90) c.ConversationListener().OnSyncServerProgress(c.getProgress()) - select { - case <-c.syncFinishCH: - return - default: - } } func (c *Conversation) AddProgress(progress int) { - c.mu.Lock() - defer c.mu.Unlock() c.progress += progress if c.progress > 100 { c.progress = 100 @@ -566,8 +560,6 @@ func (c *Conversation) AddProgress(progress int) { } func (c *Conversation) getProgress() int { - c.mu.Lock() - defer c.mu.Unlock() return c.progress } diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index 0d1b04b9b..3c4e9575c 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -74,14 +74,18 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.group.SyncAllJoinedGroupsAndMembers, c.friend.IncrSyncFriends, } - runSyncFunctions(ctx, asyncWaitFunctions, asyncWait, 4, c.ConversationListener().OnSyncServerProgress) // progress is add 4 percent + runSyncFunctions(ctx, asyncWaitFunctions, asyncWait) + c.AddProgress(4) // add 4 percent in progress + c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress syncWaitFunctions := []func(c context.Context) error{ c.IncrSyncConversations, c.SyncAllConversationHashReadSeqs, } - runSyncFunctions(ctx, syncWaitFunctions, syncWait, 6, c.ConversationListener().OnSyncServerProgress) // progress is add 6 percent + runSyncFunctions(ctx, syncWaitFunctions, syncWait) log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds()) + c.AddProgress(6) // add 6 percent in progress + c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress asyncNoWaitFunctions := []func(c context.Context) error{ c.user.SyncLoginUserInfoWithoutNotice, @@ -92,13 +96,13 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.group.SyncAllSelfGroupApplicationWithoutNotice, c.user.SyncAllCommandWithoutNotice, } - runSyncFunctions(ctx, asyncNoWaitFunctions, 0, asyncNoWait, nil) // avoid current progress + runSyncFunctions(ctx, asyncNoWaitFunctions, asyncNoWait) case constant.AppDataSyncFinish: log.ZDebug(ctx, "AppDataSyncFinish", "time", time.Since(c.startTime).Milliseconds()) - c.ConversationListener().OnSyncServerFinish(true) c.progress = 100 - c.syncFinishCH <- struct{}{} // send to `doMsgSyncByReinstalled` + c.ConversationListener().OnSyncServerProgress(c.progress) + c.ConversationListener().OnSyncServerFinish(true) case constant.MsgSyncBegin: log.ZDebug(ctx, "MsgSyncBegin") c.ConversationListener().OnSyncServerStart(false) @@ -379,7 +383,7 @@ func (c *Conversation) syncData(c2v common.Cmd2Value) { c.SyncAllConversationHashReadSeqs, } - runSyncFunctions(ctx, syncFuncs, syncWait, 0, nil) + runSyncFunctions(ctx, syncFuncs, syncWait) // Asynchronous sync functions asyncFuncs := []func(c context.Context) error{ @@ -395,22 +399,21 @@ func (c *Conversation) syncData(c2v common.Cmd2Value) { c.IncrSyncConversations, } - runSyncFunctions(ctx, asyncFuncs, asyncNoWait, 0, nil) + runSyncFunctions(ctx, asyncFuncs, asyncNoWait) } -func runSyncFunctions(ctx context.Context, funcs []func(c context.Context) error, mode int, currentProgress int, progressCallback func(progress int)) { - totalFuncs := len(funcs) +func runSyncFunctions(ctx context.Context, funcs []func(c context.Context) error, mode int) { var wg sync.WaitGroup for _, fn := range funcs { switch mode { case asyncWait: wg.Add(1) - go executeSyncFunction(ctx, fn, currentProgress, totalFuncs, progressCallback, &wg) + go executeSyncFunction(ctx, fn, &wg) case asyncNoWait: - go executeSyncFunction(ctx, fn, currentProgress, totalFuncs, progressCallback, nil) + go executeSyncFunction(ctx, fn, nil) case syncWait: - executeSyncFunction(ctx, fn, currentProgress, totalFuncs, progressCallback, nil) + executeSyncFunction(ctx, fn, nil) } } @@ -419,7 +422,7 @@ func runSyncFunctions(ctx context.Context, funcs []func(c context.Context) error } } -func executeSyncFunction(ctx context.Context, fn func(c context.Context) error, currentProgress int, total int, progressCallback func(progress int), wg *sync.WaitGroup) { +func executeSyncFunction(ctx context.Context, fn func(c context.Context) error, wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -433,14 +436,6 @@ func executeSyncFunction(ctx context.Context, fn func(c context.Context) error, } else { log.ZDebug(ctx, fmt.Sprintf("%s completed successfully", funcName), "duration", duration.Seconds()) } - if progressCallback != nil { - // progress := int(float64(currentProgress+1) / float64(total) * 100 / 10) - progress := int(currentProgress / 2) - if progress == 0 { - progress = 1 - } - progressCallback(progress) - } } func (c *Conversation) doUpdateMessage(c2v common.Cmd2Value) { diff --git a/internal/conversation_msg/message_check.go b/internal/conversation_msg/message_check.go index fae918405..4747e48c8 100644 --- a/internal/conversation_msg/message_check.go +++ b/internal/conversation_msg/message_check.go @@ -309,6 +309,7 @@ func (c *Conversation) faceURLAndNicknameHandle(ctx context.Context, self, other } return append(self, others...) } + func (c *Conversation) singleHandle(ctx context.Context, self, others []*model_struct.LocalChatLog, lc *model_struct.LocalConversation) { userInfo, err := c.db.GetLoginUser(ctx, c.loginUserID) if err == nil { From ce6a0936a29bff12dbeced595e630e6d9cecbf95 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 18:19:37 +0800 Subject: [PATCH 04/21] update conversation List logic. --- internal/conversation_msg/conversation_msg.go | 33 ++++--------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 88331aa1e..72dd89c23 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -498,41 +498,22 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { LatestMsgSendTime: msg.SendTime, ConversationID: conversationID, } - switch v.SessionType { - case constant.SingleChatType: - lc.UserID = v.RecvID - case constant.GroupChatType, constant.SuperGroupChatType: - lc.GroupID = v.GroupID - } + + conversationList = append(conversationList, &lc) + selfInsertMessage = append(selfInsertMessage, c.msgStructToLocalChatLog(msg)) } else { //Sent by others if _, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil { //Deduplication operation + othersInsertMessage = append(othersInsertMessage, c.msgStructToLocalChatLog(msg)) + lc := model_struct.LocalConversation{ ConversationType: v.SessionType, LatestMsg: utils.StructToJsonString(msg), LatestMsgSendTime: msg.SendTime, ConversationID: conversationID, } - switch v.SessionType { - case constant.SingleChatType: - lc.UserID = v.SendID - lc.ShowName = msg.SenderNickname - lc.FaceURL = msg.SenderFaceURL - case constant.GroupChatType, constant.SuperGroupChatType: - lc.GroupID = v.GroupID - case constant.NotificationChatType: - lc.UserID = v.SendID - } - othersInsertMessage = append(othersInsertMessage, c.msgStructToLocalChatLog(msg)) - } - - lc := model_struct.LocalConversation{ - ConversationType: v.SessionType, - LatestMsg: utils.StructToJsonString(msg), - LatestMsgSendTime: msg.SendTime, - ConversationID: conversationID, + conversationList = append(conversationList, &lc) } - conversationList = append(conversationList, &lc) } } insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...) @@ -542,7 +523,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { _ = c.messageController.BatchUpdateMessageList(ctx, insertMsg) // conversation storage - if err := c.db.BatchInsertConversationList(ctx, conversationList); err != nil { + if err := c.db.BatchUpdateConversationList(ctx, conversationList); err != nil { log.ZError(ctx, "insert new conversation err:", err) } log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) From 5017cf64014b87ccdbcd23cb308b2f7a4ade31a7 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 18:35:22 +0800 Subject: [PATCH 05/21] update method logic. --- internal/conversation_msg/conversation_msg.go | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 72dd89c23..e2df4f94e 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -460,6 +460,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { log.ZDebug(ctx, "parse message in one conversation", "conversationID", conversationID, "message length", msgLen) var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog + var localConversation *model_struct.LocalConversation for _, v := range msgs.Msgs { log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v) msg := &sdk_struct.MsgStruct{} @@ -498,24 +499,24 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { LatestMsgSendTime: msg.SendTime, ConversationID: conversationID, } - - conversationList = append(conversationList, &lc) + localConversation = &lc selfInsertMessage = append(selfInsertMessage, c.msgStructToLocalChatLog(msg)) } else { //Sent by others - if _, err := c.db.GetMessage(ctx, conversationID, msg.ClientMsgID); err != nil { //Deduplication operation - othersInsertMessage = append(othersInsertMessage, c.msgStructToLocalChatLog(msg)) + othersInsertMessage = append(othersInsertMessage, c.msgStructToLocalChatLog(msg)) - lc := model_struct.LocalConversation{ - ConversationType: v.SessionType, - LatestMsg: utils.StructToJsonString(msg), - LatestMsgSendTime: msg.SendTime, - ConversationID: conversationID, - } - conversationList = append(conversationList, &lc) + lc := model_struct.LocalConversation{ + ConversationType: v.SessionType, + LatestMsg: utils.StructToJsonString(msg), + LatestMsgSendTime: msg.SendTime, + ConversationID: conversationID, } + localConversation = &lc + } } + conversationList = append(conversationList, localConversation) + insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...) } From b980b3a81d6d20eaf3053da79f86787e7904dccc Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 18:44:48 +0800 Subject: [PATCH 06/21] update conversation logic. --- internal/conversation_msg/conversation_msg.go | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index e2df4f94e..293b23dfe 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -460,7 +460,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { log.ZDebug(ctx, "parse message in one conversation", "conversationID", conversationID, "message length", msgLen) var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog - var localConversation *model_struct.LocalConversation + var latestMsg *sdk_struct.MsgStruct for _, v := range msgs.Msgs { log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v) msg := &sdk_struct.MsgStruct{} @@ -493,29 +493,21 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { if v.SendID == c.loginUserID { // Messages sent by myself //if sent through this terminal log.ZInfo(ctx, "sync message", "msg", msg) - lc := model_struct.LocalConversation{ - ConversationType: v.SessionType, - LatestMsg: utils.StructToJsonString(msg), - LatestMsgSendTime: msg.SendTime, - ConversationID: conversationID, - } - localConversation = &lc + + latestMsg = msg selfInsertMessage = append(selfInsertMessage, c.msgStructToLocalChatLog(msg)) } else { //Sent by others othersInsertMessage = append(othersInsertMessage, c.msgStructToLocalChatLog(msg)) - lc := model_struct.LocalConversation{ - ConversationType: v.SessionType, - LatestMsg: utils.StructToJsonString(msg), - LatestMsgSendTime: msg.SendTime, - ConversationID: conversationID, - } - localConversation = &lc - + latestMsg = msg } } - conversationList = append(conversationList, localConversation) + conversationList = append(conversationList, &model_struct.LocalConversation{ + LatestMsg: utils.StructToJsonString(latestMsg), + LatestMsgSendTime: latestMsg.SendTime, + ConversationID: conversationID, + }) insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...) } From 887922bf3c39cad17018d307073592974dba33ff Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 18:45:58 +0800 Subject: [PATCH 07/21] update BatchInsertMessageList. --- internal/conversation_msg/conversation_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 293b23dfe..5140bc363 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -513,7 +513,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { } // message storage - _ = c.messageController.BatchUpdateMessageList(ctx, insertMsg) + _ = c.messageController.BatchInsertMessageList(ctx, insertMsg) // conversation storage if err := c.db.BatchUpdateConversationList(ctx, conversationList); err != nil { From b711208ec0589b2a50b3ddf852ea2cf2bce4a3a2 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Mon, 22 Jul 2024 18:47:26 +0800 Subject: [PATCH 08/21] update addProgress to private. --- internal/conversation_msg/conversation_msg.go | 4 ++-- internal/conversation_msg/conversation_notification.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 5140bc363..605172b79 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -521,12 +521,12 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { } log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) - c.AddProgress(msgLen / total * 90) + c.addProgress(msgLen / total * 90) c.ConversationListener().OnSyncServerProgress(c.getProgress()) } -func (c *Conversation) AddProgress(progress int) { +func (c *Conversation) addProgress(progress int) { c.progress += progress if c.progress > 100 { c.progress = 100 diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index 3c4e9575c..f128c85c0 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -75,7 +75,7 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.friend.IncrSyncFriends, } runSyncFunctions(ctx, asyncWaitFunctions, asyncWait) - c.AddProgress(4) // add 4 percent in progress + c.addProgress(4) // add 4 percent in progress c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress syncWaitFunctions := []func(c context.Context) error{ @@ -84,7 +84,7 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { } runSyncFunctions(ctx, syncWaitFunctions, syncWait) log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds()) - c.AddProgress(6) // add 6 percent in progress + c.addProgress(6) // add 6 percent in progress c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress asyncNoWaitFunctions := []func(c context.Context) error{ From a220b3f818148068e5498c4a876d112de66a64ba Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 24 Jul 2024 15:37:25 +0800 Subject: [PATCH 09/21] fix update loss contents. --- internal/interaction/msg_sync.go | 7 +++++-- pkg/common/trigger_channel.go | 2 +- pkg/db/chat_log_model.go | 6 ++++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index 249fd3798..5355b99e2 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -16,10 +16,12 @@ package interaction import ( "context" - "golang.org/x/sync/errgroup" "strings" "sync" + "golang.org/x/sync/errgroup" + "gorm.io/gorm" + "github.com/openimsdk/openim-sdk-core/v3/pkg/common" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" "github.com/openimsdk/openim-sdk-core/v3/pkg/db/db_interface" @@ -207,7 +209,8 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma } err := m.db.BatchInsertNotificationSeq(ctx, notificationSeqs) - if err != nil { + + if err != nil && errs.Unwrap(err) != gorm.ErrEmptySlice { log.ZWarn(ctx, "BatchInsertNotificationSeq err", err) } for conversationID, maxSeq := range messagesSeqMap { diff --git a/pkg/common/trigger_channel.go b/pkg/common/trigger_channel.go index 880b54e4f..260c36ee4 100644 --- a/pkg/common/trigger_channel.go +++ b/pkg/common/trigger_channel.go @@ -44,7 +44,7 @@ func TriggerCmdMsgSyncInReinstall(ctx context.Context, msg sdk_struct.CmdMsgSync return errs.Wrap(ErrChanNil) } - c2v := Cmd2Value{Cmd: constant.CmdNewMsgCome, Value: msg, Ctx: ctx} + c2v := Cmd2Value{Cmd: constant.CmdMsgSyncInReinstall, Value: msg, Ctx: ctx} return sendCmd(conversationCh, c2v, 100) } diff --git a/pkg/db/chat_log_model.go b/pkg/db/chat_log_model.go index 9dbcb1ac1..28809d0bd 100644 --- a/pkg/db/chat_log_model.go +++ b/pkg/db/chat_log_model.go @@ -96,6 +96,12 @@ func (d *DataBase) UpdateMessageBySeq(ctx context.Context, conversationID string } func (d *DataBase) BatchInsertMessageList(ctx context.Context, conversationID string, MessageList []*model_struct.LocalChatLog) error { + err := d.initChatLog(ctx, conversationID) + if err != nil { + log.ZWarn(ctx, "initChatLog err", err) + return err + } + if MessageList == nil { return nil } From f8471d89483a9d62307d34f6eb71aab11f77728d Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 24 Jul 2024 15:42:19 +0800 Subject: [PATCH 10/21] update condition logic. --- internal/interaction/long_conn_mgr.go | 2 +- internal/interaction/msg_sync.go | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/interaction/long_conn_mgr.go b/internal/interaction/long_conn_mgr.go index d08f584b8..9f22c0eba 100644 --- a/internal/interaction/long_conn_mgr.go +++ b/internal/interaction/long_conn_mgr.go @@ -384,7 +384,7 @@ func (c *LongConnMgr) sendAndWaitResp(msg *GeneralWsReq) (*GeneralWsResp, error) select { case resp := <-tempChan: return resp, nil - case <-time.After(time.Second * 5): + case <-time.After(time.Second * 10): return nil, sdkerrs.ErrNetworkTimeOut } diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index 5355b99e2..86ef5d39c 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -20,7 +20,6 @@ import ( "sync" "golang.org/x/sync/errgroup" - "gorm.io/gorm" "github.com/openimsdk/openim-sdk-core/v3/pkg/common" "github.com/openimsdk/openim-sdk-core/v3/pkg/constant" @@ -208,11 +207,13 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma m.syncedMaxSeqs[conversationID] = seq } - err := m.db.BatchInsertNotificationSeq(ctx, notificationSeqs) - - if err != nil && errs.Unwrap(err) != gorm.ErrEmptySlice { - log.ZWarn(ctx, "BatchInsertNotificationSeq err", err) + if len(notificationSeqs) > 0 { + err := m.db.BatchInsertNotificationSeq(ctx, notificationSeqs) + if err != nil { + log.ZWarn(ctx, "BatchInsertNotificationSeq err", err) + } } + for conversationID, maxSeq := range messagesSeqMap { if syncedMaxSeq, ok := m.syncedMaxSeqs[conversationID]; ok { if maxSeq > syncedMaxSeq { From 065b48e2fedd52bd526dc487d58d04e588cf8a33 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 24 Jul 2024 17:41:34 +0800 Subject: [PATCH 11/21] fix: remove uncorrect use ctx. --- internal/conversation_msg/conversation_msg.go | 28 +++++++++++++++---- internal/interaction/msg_sync.go | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index f8d03ba2d..d7222a712 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -40,6 +40,7 @@ import ( "github.com/openimsdk/openim-sdk-core/v3/pkg/syncer" pbConversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" @@ -457,10 +458,15 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { for conversationID, msgs := range allMsg { log.ZDebug(ctx, "parse message in one conversation", "conversationID", - conversationID, "message length", msgLen) + conversationID, "message length", len(msgs.Msgs)) var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog var latestMsg *sdk_struct.MsgStruct + if len(msgs.Msgs) == 0 { + log.ZWarn(ctx, "msg.Msgs is empty", errs.New("msg.Msgs is empty"), "conversationID", conversationID) + continue + } for _, v := range msgs.Msgs { + log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v) msg := &sdk_struct.MsgStruct{} // TODO need replace when after. @@ -502,11 +508,21 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { latestMsg = msg } } - conversationList = append(conversationList, &model_struct.LocalConversation{ - LatestMsg: utils.StructToJsonString(latestMsg), - LatestMsgSendTime: latestMsg.SendTime, - ConversationID: conversationID, - }) + + if latestMsg != nil { + conversationList = append(conversationList, &model_struct.LocalConversation{ + LatestMsg: utils.StructToJsonString(latestMsg), + LatestMsgSendTime: latestMsg.SendTime, + ConversationID: conversationID, + }) + } else { + log.ZWarn(ctx, "latestMsg is nil", errs.New("latestMsg is nil"), "conversationID", conversationID) + conversationList = append(conversationList, &model_struct.LocalConversation{ + LatestMsg: utils.StructToJsonString(latestMsg), + LatestMsgSendTime: 0, + ConversationID: conversationID, + }) + } insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...) } diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index 86ef5d39c..a1f341aab 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -389,7 +389,7 @@ func (m *MsgSyncer) syncAndTriggerReinstallMsgs(ctx context.Context, seqMap map[ total = len(seqMap) gr *errgroup.Group ) - gr, ctx = errgroup.WithContext(ctx) + gr, _ = errgroup.WithContext(ctx) gr.SetLimit(pullMsgGoroutineLimit) for k, v := range seqMap { oneConversationSyncNum := min(v[1]-v[0]+1, syncMsgNum) From 21c9084ce4a0e61100b342130df896ddb42a666b Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 24 Jul 2024 17:44:24 +0800 Subject: [PATCH 12/21] remove unnecesary content. --- internal/conversation_msg/conversation_msg.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index d7222a712..7cf668812 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -517,11 +517,6 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { }) } else { log.ZWarn(ctx, "latestMsg is nil", errs.New("latestMsg is nil"), "conversationID", conversationID) - conversationList = append(conversationList, &model_struct.LocalConversation{ - LatestMsg: utils.StructToJsonString(latestMsg), - LatestMsgSendTime: 0, - ConversationID: conversationID, - }) } insertMsg[conversationID] = append(insertMessage, c.faceURLAndNicknameHandle(ctx, selfInsertMessage, othersInsertMessage, conversationID)...) From 5456963ed59a8637cc7fe06a967f4b358fdb01de Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 24 Jul 2024 17:55:48 +0800 Subject: [PATCH 13/21] fix:remove uncorrect log call. --- internal/conversation_msg/conversation_notification.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index f128c85c0..438d77c8c 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -83,7 +83,7 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.SyncAllConversationHashReadSeqs, } runSyncFunctions(ctx, syncWaitFunctions, syncWait) - log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds()) + log.ZDebug(ctx, "core data sync over", "cost time", time.Since(c.startTime).Seconds()) c.addProgress(6) // add 6 percent in progress c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress From 58f61780aa45bcf257e48d543476cd77d4813c4e Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 24 Jul 2024 18:24:46 +0800 Subject: [PATCH 14/21] fix uncorrect progress add. --- internal/conversation_msg/conversation_msg.go | 3 +-- internal/conversation_msg/conversation_notification.go | 5 +++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 7cf668812..19852760e 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -531,9 +531,8 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { } log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) - c.addProgress(msgLen / total * 90) + c.addProgress((msgLen * 90) / total) c.ConversationListener().OnSyncServerProgress(c.getProgress()) - } func (c *Conversation) addProgress(progress int) { diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index 438d77c8c..a333576eb 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -65,6 +65,7 @@ func (c *Conversation) Work(c2v common.Cmd2Value) { func (c *Conversation) syncFlag(c2v common.Cmd2Value) { ctx := c2v.Ctx syncFlag := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).SyncFlag + initSyncBaseProgress := 5 switch syncFlag { case constant.AppDataSyncStart: log.ZDebug(ctx, "AppDataSyncStart") @@ -75,7 +76,7 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.friend.IncrSyncFriends, } runSyncFunctions(ctx, asyncWaitFunctions, asyncWait) - c.addProgress(4) // add 4 percent in progress + c.addProgress(initSyncBaseProgress) c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress syncWaitFunctions := []func(c context.Context) error{ @@ -84,7 +85,7 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { } runSyncFunctions(ctx, syncWaitFunctions, syncWait) log.ZDebug(ctx, "core data sync over", "cost time", time.Since(c.startTime).Seconds()) - c.addProgress(6) // add 6 percent in progress + c.addProgress(initSyncBaseProgress) c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress asyncNoWaitFunctions := []func(c context.Context) error{ From f70620c4e0dfb2bb1a88481f13ef45c7acf52a95 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 24 Jul 2024 18:25:58 +0800 Subject: [PATCH 15/21] recovery. --- internal/conversation_msg/conversation_notification.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index a333576eb..339ddb773 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -84,7 +84,7 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.SyncAllConversationHashReadSeqs, } runSyncFunctions(ctx, syncWaitFunctions, syncWait) - log.ZDebug(ctx, "core data sync over", "cost time", time.Since(c.startTime).Seconds()) + log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds()) c.addProgress(initSyncBaseProgress) c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress From 2b7c2d6f70926d7083b8ffa47176704f0c13c565 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 25 Jul 2024 14:51:13 +0800 Subject: [PATCH 16/21] fix: uncorrect map init. --- internal/interaction/msg_sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index a1f341aab..c34d6abd3 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -398,7 +398,7 @@ func (m *MsgSyncer) syncAndTriggerReinstallMsgs(ctx context.Context, seqMap map[ msgNum += int(oneConversationSyncNum) } if msgNum >= SplitPullMsgNum { - tpSeqMap := tempSeqMap + tpSeqMap := make(map[string][2]int64, len(tempSeqMap)) gr.Go(func() error { resp, err := m.pullMsgBySeqRange(ctx, tpSeqMap, syncMsgNum) if err != nil { From 4795d3ceda5fdcef385cc1e75f86e9c920c114d3 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 25 Jul 2024 14:51:35 +0800 Subject: [PATCH 17/21] feat: update progress logic. --- internal/conversation_msg/conversation_msg.go | 23 +++++++++++-------- .../conversation_notification.go | 11 +++++---- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 19852760e..d9a87c6a7 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -54,7 +54,8 @@ import ( ) const ( - conversationSyncLimit int64 = math.MaxInt64 + conversationSyncLimit int64 = math.MaxInt64 + MsgSyncProgressPercents = 100 - InitSyncProgress ) var SearchContentType = []int{constant.Text, constant.AtText, constant.File} @@ -81,6 +82,7 @@ type Conversation struct { full *full.Full maxSeqRecorder MaxSeqRecorder IsExternalExtensions bool + msgOffset int progress int startTime time.Time @@ -119,6 +121,7 @@ func NewConversation(ctx context.Context, longConnMgr *interaction.LongConnMgr, messageController: NewMessageController(db, ch), IsExternalExtensions: info.IsExternalExtensions(), maxSeqRecorder: NewMaxSeqRecorder(), + msgOffset: 0, progress: 0, } n.typing = newTyping(n) @@ -448,12 +451,13 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { allMsg := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Msgs ctx := c2v.Ctx msgLen := len(allMsg) + c.msgOffset += msgLen total := c2v.Value.(sdk_struct.CmdMsgSyncInReinstall).Total insertMsg := make(map[string][]*model_struct.LocalChatLog, 10) conversationList := make([]*model_struct.LocalConversation, 0) - log.ZDebug(ctx, "message come here conversation ch", "conversation length", msgLen) + log.ZDebug(ctx, "message come here conversation ch in reinstalled", "conversation length", msgLen) b := time.Now() for conversationID, msgs := range allMsg { @@ -497,7 +501,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { log.ZDebug(ctx, "decode message", "msg", msg) if v.SendID == c.loginUserID { // Messages sent by myself //if sent through this terminal - log.ZInfo(ctx, "sync message", "msg", msg) + log.ZInfo(ctx, "sync message in reinstalled", "msg", msg) latestMsg = msg @@ -531,21 +535,20 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { } log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) - c.addProgress((msgLen * 90) / total) - c.ConversationListener().OnSyncServerProgress(c.getProgress()) + // c.addProgress((msgLen * 90) / total) + + log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total) + // c.ConversationListener().OnSyncServerProgress(c.getProgress()) + c.ConversationListener().OnSyncServerProgress((c.msgOffset*MsgSyncProgressPercents)/total + InitSyncProgress) } -func (c *Conversation) addProgress(progress int) { +func (c *Conversation) addInitProgress(progress int) { c.progress += progress if c.progress > 100 { c.progress = 100 } } -func (c *Conversation) getProgress() int { - return c.progress -} - func listToMap(list []*model_struct.LocalConversation, m map[string]*model_struct.LocalConversation) { for _, v := range list { m[v.ConversationID] = v diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index 339ddb773..d132eb192 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -40,6 +40,8 @@ const ( asyncWait ) +const InitSyncProgress = 10 + func (c *Conversation) Work(c2v common.Cmd2Value) { log.ZDebug(c2v.Ctx, "NotificationCmd start", "cmd", c2v.Cmd, "value", c2v.Value) defer log.ZDebug(c2v.Ctx, "NotificationCmd end", "cmd", c2v.Cmd, "value", c2v.Value) @@ -65,7 +67,6 @@ func (c *Conversation) Work(c2v common.Cmd2Value) { func (c *Conversation) syncFlag(c2v common.Cmd2Value) { ctx := c2v.Ctx syncFlag := c2v.Value.(sdk_struct.CmdNewMsgComeToConversation).SyncFlag - initSyncBaseProgress := 5 switch syncFlag { case constant.AppDataSyncStart: log.ZDebug(ctx, "AppDataSyncStart") @@ -76,8 +77,8 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { c.friend.IncrSyncFriends, } runSyncFunctions(ctx, asyncWaitFunctions, asyncWait) - c.addProgress(initSyncBaseProgress) - c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress + c.addInitProgress(InitSyncProgress * 4 / 10) // add 40% of InitSyncProgress as progress + c.ConversationListener().OnSyncServerProgress(c.progress) // notify server current Progress syncWaitFunctions := []func(c context.Context) error{ c.IncrSyncConversations, @@ -85,8 +86,8 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { } runSyncFunctions(ctx, syncWaitFunctions, syncWait) log.ZWarn(ctx, "core data sync over", nil, "cost time", time.Since(c.startTime).Seconds()) - c.addProgress(initSyncBaseProgress) - c.ConversationListener().OnSyncServerProgress(c.getProgress()) // notify server current Progress + c.addInitProgress(InitSyncProgress * 6 / 10) // add 60% of InitSyncProgress as progress + c.ConversationListener().OnSyncServerProgress(c.progress) // notify server current Progress asyncNoWaitFunctions := []func(c context.Context) error{ c.user.SyncLoginUserInfoWithoutNotice, From 071f502ee2c85602d1f375df1e01e8fa97b3a498 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 25 Jul 2024 15:05:16 +0800 Subject: [PATCH 18/21] fix: use map to correct. --- internal/interaction/msg_sync.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/interaction/msg_sync.go b/internal/interaction/msg_sync.go index c34d6abd3..1b65d0c4e 100644 --- a/internal/interaction/msg_sync.go +++ b/internal/interaction/msg_sync.go @@ -399,6 +399,10 @@ func (m *MsgSyncer) syncAndTriggerReinstallMsgs(ctx context.Context, seqMap map[ } if msgNum >= SplitPullMsgNum { tpSeqMap := make(map[string][2]int64, len(tempSeqMap)) + for k, v := range tempSeqMap { + tpSeqMap[k] = v + } + gr.Go(func() error { resp, err := m.pullMsgBySeqRange(ctx, tpSeqMap, syncMsgNum) if err != nil { From 28b9dc6eceba90a9f6048674ecb31a9beccf857e Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 25 Jul 2024 15:05:50 +0800 Subject: [PATCH 19/21] add debug log comment. --- internal/conversation_msg/conversation_msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 6c7930261..2b34950f5 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -535,7 +535,7 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { } log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) - // log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total) + // log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total, "now progress is", (c.msgOffset*MsgSyncProgressPercents)/total+InitSyncProgress) c.ConversationListener().OnSyncServerProgress((c.msgOffset*MsgSyncProgressPercents)/total + InitSyncProgress) } From b5320c10a1ef0df789e8aa3fe2356eb0b85ccce5 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 25 Jul 2024 15:16:11 +0800 Subject: [PATCH 20/21] update progress number implement. --- internal/conversation_msg/conversation_msg.go | 7 +++---- internal/conversation_msg/conversation_notification.go | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/conversation_msg/conversation_msg.go b/internal/conversation_msg/conversation_msg.go index 2b34950f5..68ff59278 100644 --- a/internal/conversation_msg/conversation_msg.go +++ b/internal/conversation_msg/conversation_msg.go @@ -54,8 +54,7 @@ import ( ) const ( - conversationSyncLimit int64 = math.MaxInt64 - MsgSyncProgressPercents = 100 - InitSyncProgress + conversationSyncLimit int64 = math.MaxInt64 ) var SearchContentType = []int{constant.Text, constant.AtText, constant.File} @@ -535,8 +534,8 @@ func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) { } log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg)) - // log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total, "now progress is", (c.msgOffset*MsgSyncProgressPercents)/total+InitSyncProgress) - c.ConversationListener().OnSyncServerProgress((c.msgOffset*MsgSyncProgressPercents)/total + InitSyncProgress) + // log.ZDebug(ctx, "progress is", "msgLen", msgLen, "msgOffset", c.msgOffset, "total", total, "now progress is", (c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress) + c.ConversationListener().OnSyncServerProgress((c.msgOffset*(100-InitSyncProgress))/total + InitSyncProgress) } func (c *Conversation) addInitProgress(progress int) { diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index d132eb192..72439a426 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -40,6 +40,7 @@ const ( asyncWait ) +// InitSyncProgress is initialize Sync when reinstall. const InitSyncProgress = 10 func (c *Conversation) Work(c2v common.Cmd2Value) { From 0016089dd7f53c15085db38ecf098a45d9637a57 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 25 Jul 2024 15:20:18 +0800 Subject: [PATCH 21/21] update SyncProgress. --- internal/conversation_msg/conversation_notification.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/conversation_msg/conversation_notification.go b/internal/conversation_msg/conversation_notification.go index 72439a426..4ee156137 100644 --- a/internal/conversation_msg/conversation_notification.go +++ b/internal/conversation_msg/conversation_notification.go @@ -73,6 +73,7 @@ func (c *Conversation) syncFlag(c2v common.Cmd2Value) { log.ZDebug(ctx, "AppDataSyncStart") c.startTime = time.Now() c.ConversationListener().OnSyncServerStart(true) + c.ConversationListener().OnSyncServerProgress(1) asyncWaitFunctions := []func(c context.Context) error{ c.group.SyncAllJoinedGroupsAndMembers, c.friend.IncrSyncFriends,