Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up "Synchronizing DAO" by ~30% #5484

Merged
merged 7 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions core/src/main/java/bisq/core/dao/node/parser/BlockParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import javax.inject.Inject;

import java.util.LinkedList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -76,6 +76,7 @@ public BlockParser(TxParser txParser,
* @throws BlockHeightNotConnectingException If new block height is not current chain Height + 1
*/
public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
long startTs = System.currentTimeMillis();
int blockHeight = rawBlock.getHeight();
log.trace("Parse block at height={} ", blockHeight);

Expand All @@ -102,7 +103,6 @@ public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingExceptio
// There are some blocks with testing such dependency chains like block 130768 where at each iteration only
// one get resolved.
// Lately there is a patter with 24 iterations observed
long startTs = System.currentTimeMillis();

rawBlock.getRawTxs().forEach(rawTx ->
txParser.findTx(rawTx,
Expand All @@ -111,24 +111,22 @@ public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingExceptio
genesisTotalSupply)
.ifPresent(tx -> daoStateService.onNewTxForLastBlock(block, tx)));

daoStateService.onParseBlockComplete(block);
log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(),
blockHeight, System.currentTimeMillis() - startTs);

daoStateService.onParseBlockComplete(block);
return block;
}

private void validateIfBlockIsConnecting(RawBlock rawBlock) throws BlockHashNotConnectingException, BlockHeightNotConnectingException {
LinkedList<Block> blocks = daoStateService.getBlocks();
List<Block> blocks = daoStateService.getBlocks();

if (blocks.isEmpty())
return;

Block last = blocks.getLast();
if (last.getHeight() + 1 != rawBlock.getHeight())
if (daoStateService.getBlockHeightOfLastBlock() + 1 != rawBlock.getHeight())
throw new BlockHeightNotConnectingException(rawBlock);

if (!last.getHash().equals(rawBlock.getPreviousBlockHash()))
if (!daoStateService.getBlockHashOfLastBlock().equals(rawBlock.getPreviousBlockHash()))
throw new BlockHashNotConnectingException(rawBlock);
}

Expand Down
23 changes: 10 additions & 13 deletions core/src/main/java/bisq/core/dao/state/DaoStateService.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ public void applySnapshot(DaoState snapshot) {

daoState.setTxCache(snapshot.getTxCache());

daoState.getBlocks().clear();
daoState.getBlocks().addAll(snapshot.getBlocks());
daoState.clearAndSetBlocks(snapshot.getBlocks());

daoState.getCycles().clear();
daoState.getCycles().addAll(snapshot.getCycles());
Expand Down Expand Up @@ -221,7 +220,7 @@ public void onNewBlockWithEmptyTxs(Block block) {
"We ignore that block as the first block need to be the genesis block. " +
"That might happen in edge cases at reorgs. Received block={}", block);
} else {
daoState.getBlocks().add(block);
daoState.addBlock(block);

if (parseBlockChainComplete)
log.info("New Block added at blockHeight {}", block.getHeight());
Expand Down Expand Up @@ -283,7 +282,7 @@ public void onParseBlockChainComplete() {
}


public LinkedList<Block> getBlocks() {
public List<Block> getBlocks() {
return daoState.getBlocks();
}

Expand All @@ -295,12 +294,12 @@ public LinkedList<Block> getBlocks() {
* {@code false}.
*/
public boolean isBlockHashKnown(String blockHash) {
return getBlocks().stream().anyMatch(block -> block.getHash().equals(blockHash));
return daoState.getBlockHashes().contains(blockHash);
}

public Optional<Block> getLastBlock() {
if (!getBlocks().isEmpty())
return Optional.of(getBlocks().getLast());
return Optional.of(daoState.getLastBlock());
else
return Optional.empty();
}
Expand All @@ -309,20 +308,18 @@ public int getBlockHeightOfLastBlock() {
return getLastBlock().map(Block::getHeight).orElse(0);
}

public String getBlockHashOfLastBlock() {
return getLastBlock().map(Block::getHash).orElse("");
}

public Optional<Block> getBlockAtHeight(int height) {
return getBlocks().stream()
.filter(block -> block.getHeight() == height)
.findAny();
return Optional.ofNullable(daoState.getBlocksByHeight().get(height));
}

public boolean containsBlock(Block block) {
return getBlocks().contains(block);
}

public boolean containsBlockHash(String blockHash) {
return getBlocks().stream().anyMatch(block -> block.getHash().equals(blockHash));
}

public long getBlockTime(int height) {
return getBlockAtHeight(height).map(Block::getTime).orElse(0L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void maybeCreateSnapshot(Block block) {
daoStateSnapshotCandidate.getChainHeight() != chainHeight;
if (isSnapshotHeight(chainHeight) &&
!daoStateService.getBlocks().isEmpty() &&
isValidHeight(daoStateService.getBlocks().getLast().getHeight()) &&
isValidHeight(daoStateService.getBlockHeightOfLastBlock()) &&
noSnapshotCandidateOrDifferentHeight) {
// At trigger event we store the latest snapshotCandidate to disc
long ts = System.currentTimeMillis();
Expand All @@ -126,10 +126,9 @@ public void applySnapshot(boolean fromReorg) {
DaoState persistedBsqState = daoStateStorageService.getPersistedBsqState();
LinkedList<DaoStateHash> persistedDaoStateHashChain = daoStateStorageService.getPersistedDaoStateHashChain();
if (persistedBsqState != null) {
LinkedList<Block> blocks = persistedBsqState.getBlocks();
int chainHeightOfPersisted = persistedBsqState.getChainHeight();
if (!blocks.isEmpty()) {
int heightOfLastBlock = blocks.getLast().getHeight();
if (!persistedBsqState.getBlocks().isEmpty()) {
int heightOfLastBlock = persistedBsqState.getLastBlock().getHeight();
log.debug("applySnapshot from persistedBsqState daoState with height of last block {}", heightOfLastBlock);
if (isValidHeight(heightOfLastBlock)) {
if (chainHeightOfLastApplySnapshot != chainHeightOfPersisted) {
Expand Down
68 changes: 65 additions & 3 deletions core/src/main/java/bisq/core/dao/state/model/DaoState.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -75,7 +76,9 @@ public static DaoState getClone(DaoState daoState) {

@Getter
private int chainHeight; // Is set initially to genesis height
@Getter

// We override the getter so callers can't modify the list without also updating
// the block caches and indices below
private final LinkedList<Block> blocks;
@Getter
private final LinkedList<Cycle> cycles;
Expand Down Expand Up @@ -107,7 +110,10 @@ public static DaoState getClone(DaoState daoState) {
// Transient data used only as an index - must be kept in sync with the block list
@JsonExclude
private transient final Map<String, Tx> txCache; // key is txId

@JsonExclude
private transient final Map<Integer, Block> blocksByHeight; // Blocks indexed by height
@JsonExclude
private transient final Set<String> blockHashes; // Cache of known block hashes

///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
Expand Down Expand Up @@ -159,6 +165,13 @@ private DaoState(int chainHeight,
txCache = blocks.stream()
.flatMap(block -> block.getTxs().stream())
.collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> x, HashMap::new));

blockHashes = blocks.stream()
.map(Block::getHash)
.collect(Collectors.toSet());

blocksByHeight = blocks.stream()
.collect(Collectors.toMap(Block::getHeight, Function.identity(), (x, y) -> x, HashMap::new));
}

@Override
Expand Down Expand Up @@ -235,7 +248,7 @@ public byte[] getSerializedStateForHashChain() {
// Reorgs are handled by rebuilding the hash chain from last snapshot.
// Using the full blocks list becomes quite heavy. 7000 blocks are
// about 1.4 MB and creating the hash takes 30 sec. By using just the last block we reduce the time to 7 sec.
return getBsqStateBuilderExcludingBlocks().addBlocks(getBlocks().getLast().toProtoMessage()).build().toByteArray();
return getBsqStateBuilderExcludingBlocks().addBlocks(getLastBlock().toProtoMessage()).build().toByteArray();
}

public void addToTxCache(Tx tx) {
Expand All @@ -253,6 +266,55 @@ public Map<String, Tx> getTxCache() {
return Collections.unmodifiableMap(txCache);
}

public Set<String> getBlockHashes() {
return Collections.unmodifiableSet(blockHashes);
}

public Map<Integer, Block> getBlocksByHeight() {
return Collections.unmodifiableMap(blocksByHeight);
}

/**
* @return Unmodifiable view of the list of blocks. This prevents callers from
* directly modifying the list. We need to do this to make sure the block list is only
* modified together with the corresponding caches and indices.
*
* @see #addBlock(Block) to add a single block
* @see #addBlocks(List) to add a list of blocks
* @see #clearAndSetBlocks(List) to replace existing blocks with a new list
*/
public List<Block> getBlocks() {
return Collections.unmodifiableList(blocks);
}

// Wrapper that directly accesses the LinkedList, such that we don't have to expose
// the LinkedList
public Block getLastBlock() {
return blocks.getLast();
}

public void addBlock(Block block) {
blocks.add(block);
blockHashes.add(block.getHash());
blocksByHeight.put(block.getHeight(), block);
}

public void addBlocks(List<Block> newBlocks) {
newBlocks.forEach(b -> addBlock(b));
}

/**
* Clears the existing block list and caches, and repopulates them with the new list
* @param newBlocks
*/
public void clearAndSetBlocks(List<Block> newBlocks) {
blocks.clear();
blocksByHeight.clear();
blockHashes.clear();

addBlocks(newBlocks);
}

@Override
public String toString() {
return "DaoState{" +
Expand Down