Skip to content

Commit

Permalink
Merge branch 'apache:develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
juhuan authored Sep 13, 2024
2 parents 1241eb5 + 00d5047 commit a9c0658
Show file tree
Hide file tree
Showing 85 changed files with 2,951 additions and 527 deletions.
1 change: 1 addition & 0 deletions .github/workflows/bazel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ jobs:
- name: Retry if failed
# if it failed , retry 2 times at most
if: failure() && fromJSON(github.run_attempt) < 3
continue-on-error: true
env:
GH_REPO: ${{ github.repository }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
45 changes: 45 additions & 0 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Run Integration Tests
on:
pull_request:
types: [opened, reopened, synchronize]
push:
branches: [master, develop]

jobs:
it-test:
name: "maven-compile (${{ matrix.os }}, JDK-${{ matrix.jdk }})"
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest]
jdk: [8]
steps:
- name: Cache Maven Repos
uses: actions/cache@v4
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Checkout
uses: actions/checkout@v2

- name: Set up JDK ${{ matrix.jdk }}
uses: actions/setup-java@v2
with:
java-version: ${{ matrix.jdk }}
distribution: "adopt"
cache: "maven"

- name: Run integration tests with Maven
run: mvn clean verify -Pit-test -Pskip-unit-tests

- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
if: always()
with:
report_paths: 'test/target/failsafe-reports/TEST-*.xml'
annotate_only: true
include_passed: true
detailed_summary: true
13 changes: 12 additions & 1 deletion .github/workflows/maven.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,27 @@ jobs:
cache: "maven"
- name: Build with Maven
run: mvn -B package --file pom.xml
- name: Upload JVM crash logs

- name: Upload Auth JVM crash logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: jvm-crash-logs
path: /Users/runner/work/rocketmq/rocketmq/auth/hs_err_pid*.log
retention-days: 1

- name: Upload broker JVM crash logs
if: failure()
uses: actions/upload-artifact@v4
with:
name: jvm-crash-logs
path: /Users/runner/work/rocketmq/rocketmq/broker/hs_err_pid*.log
retention-days: 1

- name: Retry if failed
# if it failed , retry 2 times at most
if: failure() && fromJSON(github.run_attempt) < 3
continue-on-error: true
env:
GH_REPO: ${{ github.repository }}
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public List<DefaultAuthorizationContext> build(ChannelHandlerContext context, Re
subject = User.of(fields.get(SessionCredentials.ACCESS_KEY));
}
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(context.channel());
String sourceIp = StringUtils.substringBefore(remoteAddr, CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(remoteAddr, CommonConstants.COLON);

Resource topic;
Resource group;
Expand Down Expand Up @@ -394,7 +394,7 @@ private List<DefaultAuthorizationContext> newContext(Metadata metadata, QueryRou
subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK));
}
Resource resource = Resource.ofTopic(topic.getName());
String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
DefaultAuthorizationContext context = DefaultAuthorizationContext.of(subject, resource, Arrays.asList(Action.PUB, Action.SUB), sourceIp);
return Collections.singletonList(context);
}
Expand Down Expand Up @@ -437,7 +437,7 @@ private static List<DefaultAuthorizationContext> newPubContext(Metadata metadata
subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK));
}
Resource resource = Resource.ofTopic(topic.getName());
String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
DefaultAuthorizationContext context = DefaultAuthorizationContext.of(subject, resource, Action.PUB, sourceIp);
return Collections.singletonList(context);
}
Expand Down Expand Up @@ -483,7 +483,7 @@ private static List<DefaultAuthorizationContext> newSubContexts(Metadata metadat
if (metadata.containsKey(GrpcConstants.AUTHORIZATION_AK)) {
subject = User.of(metadata.get(GrpcConstants.AUTHORIZATION_AK));
}
String sourceIp = StringUtils.substringBefore(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
String sourceIp = StringUtils.substringBeforeLast(metadata.get(GrpcConstants.REMOTE_ADDRESS), CommonConstants.COLON);
result.add(DefaultAuthorizationContext.of(subject, resource, Action.SUB, sourceIp));
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ public void run() {
LOG.error("Failed to update nameServer address list", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}, 1000 * 10, this.brokerConfig.getUpdateNameServerAddrPeriod(), TimeUnit.MILLISECONDS);
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.config;
package org.apache.rocketmq.broker;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;

import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class RocksDBConfigManager {
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected volatile boolean isStop = false;
protected ConfigRocksDBStorage configRocksDBStorage = null;
public volatile boolean isStop = false;
public ConfigRocksDBStorage configRocksDBStorage = null;
private FlushOptions flushOptions = null;
private volatile long lastFlushMemTableMicroSecond = 0;

private final String filePath;
private final long memTableFlushInterval;
private DataVersion kvDataVersion = new DataVersion();


public RocksDBConfigManager(long memTableFlushInterval) {
public RocksDBConfigManager(String filePath, long memTableFlushInterval) {
this.filePath = filePath;
this.memTableFlushInterval = memTableFlushInterval;
}

public boolean load(String configFilePath, BiConsumer<byte[], byte[]> biConsumer) {
public boolean init() {
this.isStop = false;
this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath);
if (!this.configRocksDBStorage.start()) {
return false;
}
RocksIterator iterator = this.configRocksDBStorage.iterator();
this.configRocksDBStorage = new ConfigRocksDBStorage(filePath);
return this.configRocksDBStorage.start();
}
public boolean loadDataVersion() {
String currDataVersionString = null;
try {
byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion();
if (dataVersion != null && dataVersion.length > 0) {
currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8);
}
kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ? JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion();
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public boolean loadData(BiConsumer<byte[], byte[]> biConsumer) {
try (RocksIterator iterator = this.configRocksDBStorage.iterator()) {
iterator.seekToFirst();
while (iterator.isValid()) {
biConsumer.accept(iterator.key(), iterator.value());
iterator.next();
}
} finally {
iterator.close();
}

this.flushOptions = new FlushOptions();
Expand Down Expand Up @@ -103,6 +123,20 @@ public void delete(final byte[] keyBytes) throws Exception {
this.configRocksDBStorage.delete(keyBytes);
}

public void updateKvDataVersion() throws Exception {
kvDataVersion.nextVersion();
this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
}

public DataVersion getKvDataVersion() {
return kvDataVersion;
}

public void updateForbidden(String key, String value) throws Exception {
this.configRocksDBStorage.updateForbidden(key.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8));
}


public void batchPutWithWal(final WriteBatch batch) throws Exception {
this.configRocksDBStorage.batchPutWithWal(batch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}

callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
if (!isBroadcastMode(info.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
}
return removed;
Expand Down Expand Up @@ -196,7 +197,7 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
}

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
Expand All @@ -219,7 +220,7 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
if (updateChannelRst && isNotifyConsumerIdsChangedEnable) {
if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
if (null != this.brokerStatsManager) {
Expand All @@ -244,7 +245,7 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
}
}
if (isNotifyConsumerIdsChangedEnable) {
if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
Expand Down Expand Up @@ -334,4 +335,8 @@ protected void callConsumerIdsChangeListener(ConsumerGroupEvent event, String gr
}
}
}

private boolean isBroadcastMode(final MessageModel messageModel) {
return MessageModel.BROADCASTING.equals(messageModel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.config.RocksDBConfigManager;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.utils.DataConverter;
import org.rocksdb.WriteBatch;

Expand All @@ -31,14 +31,19 @@

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
return this.rocksDBConfigManager.load(configFilePath(), this::decode0);
if (!rocksDBConfigManager.init()) {
return false;
}
return this.rocksDBConfigManager.loadData(this::decodeOffset);
}

@Override
Expand All @@ -56,8 +61,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
}
}

@Override
protected void decode0(final byte[] key, final byte[] body) {
protected void decodeOffset(final byte[] key, final byte[] body) {
String topicAtGroup = new String(key, DataConverter.CHARSET_UTF8);
RocksDBOffsetSerializeWrapper wrapper = JSON.parseObject(body, RocksDBOffsetSerializeWrapper.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupList;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
Expand Down Expand Up @@ -282,6 +283,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.unlockBatchMQ(ctx, request);
case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP:
return this.updateAndCreateSubscriptionGroup(ctx, request);
case RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_LIST:
return this.updateAndCreateSubscriptionGroupList(ctx, request);
case RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG:
return this.getAllSubscriptionGroup(ctx, request);
case RequestCode.DELETE_SUBSCRIPTIONGROUP:
Expand Down Expand Up @@ -1571,6 +1574,41 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
return response;
}

private RemotingCommand updateAndCreateSubscriptionGroupList(ChannelHandlerContext ctx, RemotingCommand request) {
final long startTime = System.nanoTime();

final SubscriptionGroupList subscriptionGroupList = SubscriptionGroupList.decode(request.getBody(), SubscriptionGroupList.class);
final List<SubscriptionGroupConfig> groupConfigList = subscriptionGroupList.getGroupConfigList();

final StringBuilder builder = new StringBuilder();
for (SubscriptionGroupConfig config : groupConfigList) {
builder.append(config.getGroupName()).append(";");
}
final String groupNames = builder.toString();
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroupList: groupNames: {}, called by {}",
groupNames,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

final RemotingCommand response = RemotingCommand.createResponseCommand(null);
try {
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfigList(groupConfigList);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} finally {
long executionTime = (System.nanoTime() - startTime) / 1000000L;
LOGGER.info("executionTime of create updateAndCreateSubscriptionGroupList: {} is {} ms", groupNames, executionTime);
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
}

return response;
}


private void initConsumerOffset(String clientHost, String groupName, int mode, TopicConfig topicConfig) {
String topic = topicConfig.getTopicName();
for (int queueId = 0; queueId < topicConfig.getReadQueueNums(); queueId++) {
Expand Down Expand Up @@ -2024,7 +2062,7 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId
Map<Integer, Long> queueOffsetMap = new HashMap<>();

// Reset offset for all queues belonging to the specified topic
TopicConfig topicConfig = brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("Topic " + topic + " does not exist");
Expand Down
Loading

0 comments on commit a9c0658

Please sign in to comment.