Skip to content

Commit

Permalink
[ISSUE #299] offset store read missing fallthrough for _ReadMemoryThe…
Browse files Browse the repository at this point in the history
…nStore
  • Loading branch information
jalesys authored and wenfengwang committed Nov 19, 2019
1 parent 4364ebd commit 0e9d5b1
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
5 changes: 4 additions & 1 deletion consumer/offset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType)
if off >= 0 || (off == -1 && t == _ReadFromMemory) {
return off
}
fallthrough
case _ReadFromStore:
local.load()
return readFromMemory(local.OffsetTable, mq)
Expand Down Expand Up @@ -277,14 +278,16 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i
r.mutex.RLock()
switch t {
case _ReadFromMemory, _ReadMemoryThenStore:
defer r.mutex.RUnlock()
off, exist := r.OffsetTable[*mq]
if exist {
r.mutex.RUnlock()
return off
}
if t == _ReadFromMemory {
r.mutex.RUnlock()
return -1
}
fallthrough
case _ReadFromStore:
off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
if err != nil {
Expand Down
22 changes: 17 additions & 5 deletions consumer/offset_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package consumer

import (
"path/filepath"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -41,21 +42,21 @@ func TestNewLocalFileOffsetStore(t *testing.T) {
group: "testGroup",
expectedResult: &localFileOffsetStore{
group: "testGroup",
path: _LocalOffsetStorePath + "/testGroup/offset.json",
path: filepath.Join(_LocalOffsetStorePath, "/testGroup/offset.json"),
},
}, {
clientId: "192.168.24.1@default",
group: "",
expectedResult: &localFileOffsetStore{
group: "",
path: _LocalOffsetStorePath + "/192.168.24.1@default/offset.json",
path: filepath.Join(_LocalOffsetStorePath, "/192.168.24.1@default/offset.json"),
},
}, {
clientId: "192.168.24.1@default",
group: "testGroup",
expectedResult: &localFileOffsetStore{
group: "testGroup",
path: _LocalOffsetStorePath + "/192.168.24.1@default/testGroup/offset.json",
path: filepath.Join(_LocalOffsetStorePath, "/192.168.24.1@default/testGroup/offset.json"),
},
},
}
Expand Down Expand Up @@ -134,6 +135,10 @@ func TestLocalFileOffsetStore(t *testing.T) {
localStore.persist(queues)
offset = localStore.read(mq, _ReadFromStore)
So(offset, ShouldEqual, 1)

delete(localStore.(*localFileOffsetStore).OffsetTable, MessageQueueKey(*mq))
offset = localStore.read(mq, _ReadMemoryThenStore)
So(offset, ShouldEqual, 1)
})
})
}
Expand Down Expand Up @@ -204,19 +209,26 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
Convey("test persist", func() {
queues := []*primitive.MessageQueue{mq}

namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("192.168.24.1:10911")
namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("192.168.24.1:10911").MaxTimes(2)

ret := &remote.RemotingCommand{
Code: internal.ResSuccess,
ExtFields: map[string]string{
"offset": "1",
},
}
rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil).MaxTimes(2)

remoteStore.persist(queues)
offset := remoteStore.read(mq, _ReadFromStore)
So(offset, ShouldEqual, 1)

remoteStore.remove(mq)
offset = remoteStore.read(mq, _ReadFromMemory)
So(offset, ShouldEqual, -1)
offset = remoteStore.read(mq, _ReadMemoryThenStore)
So(offset, ShouldEqual, 1)

})

Convey("test remove", func() {
Expand Down

0 comments on commit 0e9d5b1

Please sign in to comment.