Skip to content

Commit

Permalink
feat: Support FetchSurroundingMessages (#741)
Browse files Browse the repository at this point in the history
* feat: code adjustment

* feat: Cmd2Value carry caller

* feat: Cmd2Value carry caller

* feat: Cmd2Value carry caller

* feat: Cmd2Value carry caller

* fix: SearchLocalMessages no such table

* feat: FetchSurroundingMessages

* feat: FetchSurroundingMessages
  • Loading branch information
withchao authored Oct 16, 2024
1 parent 1dcc7a7 commit e9bad3d
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 18 deletions.
84 changes: 71 additions & 13 deletions internal/conversation_msg/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,76 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}
log.ZDebug(ctx, "pull message", "pull cost time", time.Since(t))
t = time.Now()
//var thisMinSeq int64
//for _, v := range list {
// if v.Seq != 0 && thisMinSeq == 0 {
// thisMinSeq = v.Seq
// }
// if v.Seq < thisMinSeq && v.Seq != 0 {
// thisMinSeq = v.Seq
// }
// if v.Status >= constant.MsgStatusHasDeleted {
// log.ZDebug(ctx, "this message has been deleted or exception message", "msg", v)
// continue
// }
// temp := sdk_struct.MsgStruct{}
// temp.ClientMsgID = v.ClientMsgID
// temp.ServerMsgID = v.ServerMsgID
// temp.CreateTime = v.CreateTime
// temp.SendTime = v.SendTime
// temp.SessionType = v.SessionType
// temp.SendID = v.SendID
// temp.RecvID = v.RecvID
// temp.MsgFrom = v.MsgFrom
// temp.ContentType = v.ContentType
// temp.SenderPlatformID = v.SenderPlatformID
// temp.SenderNickname = v.SenderNickname
// temp.SenderFaceURL = v.SenderFaceURL
// temp.Content = v.Content
// temp.Seq = v.Seq
// temp.IsRead = v.IsRead
// temp.Status = v.Status
// var attachedInfo sdk_struct.AttachedInfoElem
// _ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo)
// temp.AttachedInfoElem = &attachedInfo
// temp.Ex = v.Ex
// temp.LocalEx = v.LocalEx
// err := c.msgHandleByContentType(&temp)
// if err != nil {
// log.ZError(ctx, "Parsing data error", err, "temp", temp)
// continue
// }
// switch sessionType {
// case constant.WriteGroupChatType:
// fallthrough
// case constant.ReadGroupChatType:
// temp.GroupID = temp.RecvID
// temp.RecvID = c.loginUserID
// }
// if attachedInfo.IsPrivateChat && temp.SendTime+int64(attachedInfo.BurnDuration) < time.Now().Unix() {
// continue
// }
// messageList = append(messageList, &temp)
//}
var thisMinSeq int64
thisMinSeq, messageList = c.LocalChatLog2MsgStruct(ctx, list, sessionType)
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
t = time.Now()
if !isReverse {
sort.Sort(messageList)
}
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
messageListCallback.MessageList = messageList
if thisMinSeq == 0 {
thisMinSeq = req.LastMinSeq
}
messageListCallback.LastMinSeq = thisMinSeq
return &messageListCallback, nil

}

func (c *Conversation) LocalChatLog2MsgStruct(ctx context.Context, list []*model_struct.LocalChatLog, sessionType int) (int64, []*sdk_struct.MsgStruct) {
messageList := make([]*sdk_struct.MsgStruct, 0, len(list))
var thisMinSeq int64
for _, v := range list {
if v.Seq != 0 && thisMinSeq == 0 {
Expand Down Expand Up @@ -161,19 +231,7 @@ func (c *Conversation) getAdvancedHistoryMessageList(ctx context.Context, req sd
}
messageList = append(messageList, &temp)
}
log.ZDebug(ctx, "message convert and unmarshal", "unmarshal cost time", time.Since(t))
t = time.Now()
if !isReverse {
sort.Sort(messageList)
}
log.ZDebug(ctx, "sort", "sort cost time", time.Since(t))
messageListCallback.MessageList = messageList
if thisMinSeq == 0 {
thisMinSeq = req.LastMinSeq
}
messageListCallback.LastMinSeq = thisMinSeq
return &messageListCallback, nil

return thisMinSeq, messageList
}

func (c *Conversation) typingStatusUpdate(ctx context.Context, recvID, msgTip string) error {
Expand Down
51 changes: 51 additions & 0 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
sdk "github.com/openimsdk/openim-sdk-core/v3/pkg/sdk_params_callback"
"math"
"sync"

Expand Down Expand Up @@ -1013,3 +1014,53 @@ func (c *Conversation) GetInputStates(ctx context.Context, conversationID string
func (c *Conversation) ChangeInputStates(ctx context.Context, conversationID string, focus bool) error {
return c.typing.ChangeInputStates(ctx, conversationID, focus)
}

func (c *Conversation) FetchSurroundingMessages(ctx context.Context, conversationID string, seq int64, before int64, after int64) ([]*sdk_struct.MsgStruct, error) {
lc, err := c.db.GetConversation(ctx, conversationID)
if err != nil {
return nil, err
}
c.pullMessageAndReGetHistoryMessages(ctx, conversationID, []int64{seq}, false, false, 0, 0, &[]*model_struct.LocalChatLog{}, &sdk.GetAdvancedHistoryMessageListCallback{})
res, err := c.db.GetMessagesBySeqs(ctx, conversationID, []int64{seq})
if err != nil {
return nil, err
}
if len(res) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
_, msgList := c.LocalChatLog2MsgStruct(ctx, []*model_struct.LocalChatLog{res[0]}, int(lc.ConversationType))
if len(msgList) == 0 {
return []*sdk_struct.MsgStruct{}, nil
}
msg := msgList[0]
result := make([]*sdk_struct.MsgStruct, 0, before+after+1)
if before > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
LastMinSeq: msg.Seq,
ConversationID: conversationID,
Count: int(before),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, false)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
result = append(result, msg)
if after > 0 {
req := sdk.GetAdvancedHistoryMessageListParams{
LastMinSeq: msg.Seq,
ConversationID: conversationID,
Count: int(after),
StartClientMsgID: msg.ClientMsgID,
}
val, err := c.getAdvancedHistoryMessageList(ctx, req, true)
if err != nil {
return nil, err
}
result = append(result, val.MessageList...)
}
sort.Sort(sdk_struct.NewMsgList(result))
return result, nil
}
5 changes: 1 addition & 4 deletions internal/conversation_msg/entering.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,7 @@ func (e *typing) onNewMsg(ctx context.Context, msg *sdkws.MsgData) {
return
}
now := time.Now().UnixMilli()
expirationTimestamp := msg.SendTime + int64(inputStatesSendTime/time.Millisecond)
if msg.SendTime > now || expirationTimestamp <= now {
return
}
expirationTimestamp := now + int64(inputStatesSendTime/time.Millisecond)
var sourceID string
if msg.GroupID == "" {
sourceID = msg.SendID
Expand Down
4 changes: 4 additions & 0 deletions open_im_sdk/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,7 @@ func ChangeInputStates(callback open_im_sdk_callback.Base, operationID string, c
func GetInputStates(callback open_im_sdk_callback.Base, operationID string, conversationID string, userID string) {
call(callback, operationID, UserForSDK.Conversation().GetInputStates, conversationID, userID)
}

func FetchSurroundingMessages(callback open_im_sdk_callback.Base, operationID string, conversationID string, seq int64, before int64, after int64) {
call(callback, operationID, UserForSDK.Conversation().FetchSurroundingMessages, conversationID, seq, before, after)
}
13 changes: 13 additions & 0 deletions test/create_msg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,16 @@ func Test_CreateForwardMessage(t *testing.T) {
}
t.Log(message)
}

func Test_FetchSurroundingMessages(t *testing.T) {
msgs, err := open_im_sdk.UserForSDK.Conversation().FetchSurroundingMessages(ctx, "sg_3559850526", 15, 14, 8)
if err != nil {
t.Error(err)
return
}
t.Log(len(msgs))
for _, msg := range msgs {
t.Logf("[%d] %#v", msg.Seq, msg.TextElem)
}
t.Log(msgs)
}
6 changes: 5 additions & 1 deletion test/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ func init() {
if err := open_im_sdk.UserForSDK.Login(ctx, UserID, token); err != nil {
panic(err)
}
open_im_sdk.UserForSDK.SetConversationListener(&onConversationListener{ctx: ctx})
ch := make(chan error)
open_im_sdk.UserForSDK.SetConversationListener(&onConversationListener{ctx: ctx, ch: ch})
open_im_sdk.UserForSDK.SetGroupListener(&onGroupListener{ctx: ctx})
open_im_sdk.UserForSDK.SetAdvancedMsgListener(&onAdvancedMsgListener{ctx: ctx})
open_im_sdk.UserForSDK.SetFriendshipListener(&onFriendshipListener{ctx: ctx})
open_im_sdk.UserForSDK.SetUserListener(&onUserListener{ctx: ctx})
if err := <-ch; err != nil {
panic(err)
}
}

func getConf(APIADDR, WSADDR string) sdk_struct.IMConfig {
Expand Down
4 changes: 4 additions & 0 deletions test/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package test

import (
"context"
"fmt"
"github.com/openimsdk/tools/log"
)

Expand Down Expand Up @@ -45,6 +46,7 @@ func (c *OnConnListener) OnUserTokenExpired() {

type onConversationListener struct {
ctx context.Context
ch chan error
}

func (o *onConversationListener) OnSyncServerStart(reinstalled bool) {
Expand All @@ -53,10 +55,12 @@ func (o *onConversationListener) OnSyncServerStart(reinstalled bool) {

func (o *onConversationListener) OnSyncServerFinish(reinstalled bool) {
log.ZInfo(o.ctx, "OnSyncServerFinish")
o.ch <- nil
}

func (o *onConversationListener) OnSyncServerFailed(reinstalled bool) {
log.ZInfo(o.ctx, "OnSyncServerFailed")
o.ch <- fmt.Errorf("OnSyncServerFailed")
}

func (o *onConversationListener) OnSyncServerProgress(progress int) {
Expand Down
1 change: 1 addition & 0 deletions wasm/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func registerFunc() {

js.Global().Set("changeInputStates", js.FuncOf(wrapperConMsg.ChangeInputStates))
js.Global().Set("getInputStates", js.FuncOf(wrapperConMsg.GetInputStates))
js.Global().Set("fetchSurroundingMessages", js.FuncOf(wrapperConMsg.FetchSurroundingMessages))

//register group func
wrapperGroup := wasm_wrapper.NewWrapperGroup(globalFuc)
Expand Down
5 changes: 5 additions & 0 deletions wasm/wasm_wrapper/wasm_conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,8 @@ func (w *WrapperConMsg) GetInputStates(_ js.Value, args []js.Value) interface{}
callback := event_listener.NewBaseCallback(utils.FirstLower(utils.GetSelfFuncName()), w.commonFunc)
return event_listener.NewCaller(open_im_sdk.GetInputStates, callback, &args).AsyncCallWithCallback()
}

func (w *WrapperConMsg) FetchSurroundingMessages(_ js.Value, args []js.Value) interface{} {
callback := event_listener.NewBaseCallback(utils.FirstLower(utils.GetSelfFuncName()), w.commonFunc)
return event_listener.NewCaller(open_im_sdk.FetchSurroundingMessages, callback, &args).AsyncCallWithCallback()
}

0 comments on commit e9bad3d

Please sign in to comment.