Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fatal error: concurrent map writes #139

Closed
wolftankk opened this issue Aug 2, 2019 · 5 comments
Closed

fatal error: concurrent map writes #139

wolftankk opened this issue Aug 2, 2019 · 5 comments
Labels
bug Something isn't working

Comments

@wolftankk
Copy link
Contributor

In the newest sdk, get the errors:

fatal error: concurrent map writes

goroutine 55 [running]:
runtime.throw(0x5ddf1e, 0x15)
	/usr/lib/golang/src/runtime/panic.go:617 +0x72 fp=0xc0001076b0 sp=0xc000107680 pc=0x42ccc2
runtime.mapassign_fast64(0x5a01e0, 0xc0001e2660, 0x0, 0x75fd00)
	/usr/lib/golang/src/runtime/map_fast64.go:101 +0x35f fp=0xc0001076f0 sp=0xc0001076b0 pc=0x41220f
github.com/apache/rocketmq-client-go/consumer.(*localFileOffsetStore).update(0xc00009ec90, 0xc0001a2b70, 0x16, 0xc00012c000)
	/home/work/rocketmq-client-go/consumer/offset_store.go:137 +0x1e1 fp=0xc000107748 sp=0xc0001076f0 pc=0x5639e1
github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).pullMessage.func2(0xc00007c700, 0xc0001a2c90)
	/home/work/rocketmq-client-go/consumer/push_consumer.go:482 +0x7b fp=0xc0001077d0 sp=0xc000107748 pc=0x5719eb
runtime.goexit()
	/usr/lib/golang/src/runtime/asm_amd64.s:1337 +0x1 fp=0xc0001077d8 sp=0xc0001077d0 pc=0x458311
created by github.com/apache/rocketmq-client-go/consumer.(*pushConsumer).pullMessage
	/home/work/rocketmq-client-go/consumer/push_consumer.go:480 +0x138b

goroutine 1 [sleep]:
runtime.goparkunlock(...)
	/usr/lib/golang/src/runtime/proc.go:307
time.Sleep(0x34630b8a000)
	/usr/lib/golang/src/runtime/time.go:105 +0x159
main.main()
	/home/work/rocketmq-client-go/examples/consumer/simple/main.go:51 +0x258

goroutine 5 [sleep]:
runtime.goparkunlock(...)
	/usr/lib/golang/src/runtime/proc.go:307
time.Sleep(0x2540be400)
	/usr/lib/golang/src/runtime/time.go:105 +0x159
github.com/apache/rocketmq-client-go/consumer.(*statsItemSet).init.func1(0xc00007c5c0)
	/home/work/rocketmq-client-go/consumer/statistics.go:149 +0x3b
created by github.com/apache/rocketmq-client-go/consumer.(*statsItemSet).init
	/home/work/rocketmq-client-go/consumer/statistics.go:146 +0x43

goroutine 6 [sleep]:
runtime.goparkunlock(...)
	/usr/lib/golang/src/runtime/proc.go:307
time.Sleep(0x8bb2c97000)
	/usr/lib/golang/src/runtime/time.go:105 +0x159
github.com/apache/rocketmq-client-go/consumer.(*statsItemSet).init.func2(0xc00007c5c0)
	/home/work/rocketmq-client-go/consumer/statistics.go:156 +0x3b
created by github.com/apache/rocketmq-client-go/consumer.(*statsItemSet).init
	/home/work/rocketmq-client-go/consumer/statistics.go:153 +0x65

test code:

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go"
	"github.com/apache/rocketmq-client-go/consumer"
	"github.com/apache/rocketmq-client-go/primitive"
)

func main() {
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
	)
	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		fmt.Printf("subscribe callback: %v \n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
}
@wolftankk
Copy link
Contributor Author

wolftankk commented Aug 2, 2019

it must add locker for CAS, so in

func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
	rlog.Debugf("update offset: %s to %d", mq, offset)
	localOffset, exist := local.OffsetTable[mq.Topic]
	if !exist {
		localOffset = make(map[int]*queueOffset)
		local.OffsetTable[mq.Topic] = localOffset
	}
	q, exist := localOffset[mq.QueueId]
	if !exist {
		q = &queueOffset{
			QueueID: mq.QueueId,
			Broker:  mq.BrokerName,
		}
		localOffset[mq.QueueId] = q
	}
	if increaseOnly {
		if q.Offset < offset {
			q.Offset = offset
		}
	} else {
		q.Offset = offset
	}
}```

it will add 

```golang
func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
        local.mutex.Lock()
	defer local.mutex.Unlock()

@wenfengwang
Copy link
Member

Thank you @wolftankk! could you create a PR for this issue?

@ShannonDing ShannonDing added the bug Something isn't working label Aug 8, 2019
@ShannonDing
Copy link
Member

@wolftankk Could you please create a PR to fix this issue?

wolftankk added a commit to wolftankk/rocketmq-client-go that referenced this issue Aug 8, 2019
@wolftankk
Copy link
Contributor Author

@wolftankk Could you please create a PR to fix this issue?

Sure.

@wenfengwang
Copy link
Member

fixed at #146

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants