Skip to content

Commit

Permalink
Optimize the logic of switching to spin locks
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghuaiyuan committed Oct 9, 2024
1 parent 6b5a40b commit 1d7bac5
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 53 deletions.
10 changes: 3 additions & 7 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLock;
import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
Expand Down Expand Up @@ -97,8 +96,6 @@ public class CommitLog implements Swappable {

protected final PutMessageLock putMessageLock;

protected final AdaptiveBackOffSpinLock adaptiveBackOffSpinLock;

protected final TopicQueueLock topicQueueLock;

private volatile Set<String> fullStorePaths = Collections.emptySet();
Expand Down Expand Up @@ -134,9 +131,8 @@ protected PutMessageThreadLocal initialValue() {
return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
}
};
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();

this.adaptiveBackOffSpinLock = new AdaptiveBackOffSpinLockImpl(putMessageLock);
this.putMessageLock = new AdaptiveBackOffSpinLockImpl(messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock());

this.flushDiskWatcher = new FlushDiskWatcher();

Expand Down Expand Up @@ -954,7 +950,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}

this.adaptiveBackOffSpinLock.isOpen(this.defaultMessageStore.getMessageStoreConfig().getUseABSLock());
this.putMessageLock.isOpen(this.defaultMessageStore.getMessageStoreConfig().getUseABSLock());

int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
boolean needHandleHA = needHandleHA(msg);
Expand Down Expand Up @@ -1121,7 +1117,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
}

this.adaptiveBackOffSpinLock.isOpen(this.defaultMessageStore.getMessageStoreConfig().getUseABSLock());
this.putMessageLock.isOpen(this.defaultMessageStore.getMessageStoreConfig().getUseABSLock());

int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
boolean needHandleHA = needHandleHA(messageExtBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.store;

import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLock;
import org.apache.rocketmq.store.config.MessageStoreConfig;

/**
* Used when trying to put message
Expand All @@ -25,4 +25,13 @@ public interface PutMessageLock {
void lock();

void unlock();

default void update(MessageStoreConfig messageStoreConfig) {
}

default void swap() {
}

default void isOpen(boolean open) {
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class AdaptiveBackOffSpinLockImpl implements AdaptiveBackOffSpinLock {
private AdaptiveBackOffSpinLock adaptiveLock;
public class AdaptiveBackOffSpinLockImpl implements PutMessageLock {
private PutMessageLock adaptiveLock;
//state
private AtomicBoolean state = new AtomicBoolean(true);

Expand All @@ -40,7 +40,7 @@ public class AdaptiveBackOffSpinLockImpl implements AdaptiveBackOffSpinLock {

private final static int BASE_SWAP_LOCK_RATIO = 320;

private Map<String, AdaptiveBackOffSpinLock> locks;
private Map<String, PutMessageLock> locks;

private final List<AtomicInteger> tpsTable;

Expand Down Expand Up @@ -177,11 +177,11 @@ public void isOpen(boolean open) {
}
}

public List<AdaptiveBackOffSpinLock> getLocks() {
return (List<AdaptiveBackOffSpinLock>) this.locks.values();
public List<PutMessageLock> getLocks() {
return (List<PutMessageLock>) this.locks.values();
}

public void setLocks(Map<String, AdaptiveBackOffSpinLock> locks) {
public void setLocks(Map<String, PutMessageLock> locks) {
this.locks = locks;
}

Expand All @@ -193,7 +193,7 @@ public void setState(boolean state) {
this.state.set(state);
}

public AdaptiveBackOffSpinLock getAdaptiveLock() {
public PutMessageLock getAdaptiveLock() {
return adaptiveLock;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.store.lock;

import org.apache.rocketmq.store.PutMessageLock;

import java.util.concurrent.locks.ReentrantLock;

public class BackOffReentrantLock implements AdaptiveBackOffSpinLock {
public class BackOffReentrantLock implements PutMessageLock {
private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.lock;

import org.apache.rocketmq.store.PutMessageLock;
import org.apache.rocketmq.store.config.MessageStoreConfig;

import java.time.LocalTime;
Expand All @@ -24,7 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BackOffSpinLock implements AdaptiveBackOffSpinLock {
public class BackOffSpinLock implements PutMessageLock {

private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

Expand Down

0 comments on commit 1d7bac5

Please sign in to comment.