Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add param which like consumeThreadMin of java sdk to control consumption rate #883

Merged
merged 2 commits into from
Aug 7, 2022

Conversation

cserwen
Copy link
Member

@cserwen cserwen commented Aug 4, 2022

What is the purpose of the change

  • make some configs can be set
  • add ConsumeGoroutineNums to control the number of goroutines that are consuming.

Brief changelog

XX

Verifying this change

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when a cross-module dependency exists.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@ShannonDing ShannonDing merged commit e1ddb88 into apache:master Aug 7, 2022
@twz915
Copy link
Contributor

twz915 commented Apr 28, 2023

rocketmq-client-go-2.1.2-rc1

This ConsumeGoroutineNums option changes the default behavior, because it opens 20 goroutines by default, and there is no way to have more than 1000, which may affect the original consumption speed.
I suggest that the default value of 0 be the same as the original, that is, there is no limit to the number of goroutines, and more goroutines can be configured for consumption.

@cserwen
Copy link
Member Author

cserwen commented Apr 28, 2023

rocketmq-client-go-2.1.2-rc1

This ConsumeGoroutineNums option changes the default behavior, because it opens 20 goroutines by default, and there is no way to have more than 1000, which may affect the original consumption speed. I suggest that the default value of 0 be the same as the original, that is, there is no limit to the number of goroutines, and more goroutines can be configured for consumption.

Just for consistency with java.

@twz915
Copy link
Contributor

twz915 commented Apr 28, 2023

The current implementation is inconsistent with Java. Java is a thread pool, and threads are reused, but in the implementation of Go, goroutines are not reused.
The maximum number of goroutines is 1000, which is a bit small for the go language.

@cserwen
Copy link
Member Author

cserwen commented Apr 28, 2023

The current implementation is inconsistent with Java. Java is a thread pool, and threads are reused, but in the implementation of Go, goroutines are not reused. The maximum number of goroutines is 1000, which is a bit small for the go language.

The purpose of this is not to reuse threads, but to limit concurrency. The original design did not consider concurrency issues at all, which would bring down the downstream system.

@twz915
Copy link
Contributor

twz915 commented Apr 28, 2023

It can be controlled from the speed of pulling (WithPullInterval), which will be simpler. Control the speed of consumption by controlling the amount of messages fetched.

@cserwen
Copy link
Member Author

cserwen commented Apr 28, 2023

It can be controlled from the speed of pulling (WithPullInterval), which will be simpler. Control the speed of consumption by controlling the amount of messages fetched.

By default, there is a pull speed limit, but the pull contorl needs to be suppressed through consumption. I don’t think that the change of contorlling the pull speed will be smaller. You must know that for RocketMQ Push consumer, pull and consumption are Asynchronous.

@cserwen
Copy link
Member Author

cserwen commented Apr 28, 2023

@twz915 In addition, controlling the pull speed does not limit the concurrency of consumption. pullInterval is only for a single queue. If I have a lot of queues, there will still be a large number of messages flooding in at once.

IMO, if you think 1024 is too small, you can submit pr to change it. :-D

@twz915
Copy link
Contributor

twz915 commented Apr 28, 2023

I know that for RocketMQ Push consumer, pull and consumption are Asynchronous. but I also think controlling the speed of pulling and controlling the speed of consumption is a more effective way.
There is no problem in controlling the speed of consumption, but I think the maximum number of 1000 is relatively small. If the performance of the machine is better, it may affect the maximum capacity of consumption.

if pc.option.ConsumeGoroutineNums < 1 || pc.option.ConsumeGoroutineNums > 1000 {
if pc.option.ConsumeGoroutineNums == 0 {
pc.option.ConsumeGoroutineNums = 20
} else {
return errors.New("option.ConsumeGoroutineNums out of range [1, 1000]")
}
}

@twz915
Copy link
Contributor

twz915 commented Apr 28, 2023

@cserwen In most cases, I only need to control the TPS of consumption, so I just pull messages at this speed.
TPS = 1000(ms) / pullInterval(ms) * BatchSize * Queue Number, and the queue number could be queried.
However, after the number of goroutines consumed is limited, upgrading to a new version may affect the consumption speed of the production environment.

@cserwen
Copy link
Member Author

cserwen commented Apr 28, 2023

@cserwen In most cases, I only need to control the TPS of consumption, so I just pull messages at this speed. TPS = 1000(ms) / pullInterval(ms) * BatchSize * Queue Number, and the queue number could be queried. However, after the number of goroutines consumed is limited, upgrading to a new version may affect the consumption speed of the production environment.

In general, users should not modify the behavior of pulling messages, but dynamically adjust the pull rate according to the expected consumption rate. This is more intuitive, isn't it?

@twz915
Copy link
Contributor

twz915 commented Apr 28, 2023

I disagree with what you said that the behavior of pulling messages should not be changed. It is acceptable to control by fetching messages or when consuming messages.
It’s okay to provide the ability to control the speed of message consumption, but when I don’t want to limit it, or want to set it to a relatively large value, there should be a way to achieve it. @cserwen

@cserwen
Copy link
Member Author

cserwen commented Apr 28, 2023

I disagree with what you said that the behavior of pulling messages should not be changed. It is acceptable to control by fetching messages or when consuming messages. It’s okay to provide the ability to control the speed of message consumption, but when I don’t want to limit it, or want to set it to a relatively large value, there should be a way to achieve it. @cserwen

You can set it by WithConsumeGoroutineNums. if you think 1024 is too small, you can submit a pr to change it. :-D @twz915

twz915 pushed a commit to twz915/rocketmq-client-go that referenced this pull request May 9, 2023
ConsumeGoroutineNums should allow large values to be set, detail see apache#883 (comment)
ShannonDing pushed a commit that referenced this pull request May 9, 2023
ConsumeGoroutineNums should allow large values to be set, detail see #883 (comment)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants