Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid sending repeated block related report data #6451

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ public class DaoStateMonitoringService implements DaoSetupService, DaoStateListe
DaoStateNetworkService.Listener<NewDaoStateHashMessage, GetDaoStateHashesRequest, DaoStateHash> {

public interface Listener {
void onDaoStateHashesChanged();
default void onDaoStateHashesChanged() {
}

default void onCheckpointFail() {
}

void onCheckpointFail();
default void onDaoStateBlockCreated() {
}
}

private final DaoStateService daoStateService;
Expand Down Expand Up @@ -347,7 +352,7 @@ private Optional<DaoStateBlock> createDaoStateBlock(Block block) {
// We only broadcast after parsing of blockchain is complete
if (parseBlockChainComplete) {
// We delay broadcast to give peers enough time to have received the block.
// Otherwise they would ignore our data if received block is in future to their local blockchain.
// Otherwise, they would ignore our data if received block is in future to their local blockchain.
int delayInSec = 5 + new Random().nextInt(10);
if (Config.baseCurrencyNetwork().isRegtest()) {
delayInSec = 1;
Expand All @@ -361,6 +366,7 @@ private Optional<DaoStateBlock> createDaoStateBlock(Block block) {
duration);
accumulatedDuration += duration;
numCalls++;
listeners.forEach(Listener::onDaoStateBlockCreated);
return Optional.of(daoStateBlock);
}

Expand All @@ -371,15 +377,14 @@ private void processPeersDaoStateHashes(List<DaoStateHash> stateHashes, Optional
// If we do not add own hashes during initial parsing we fill the missing hashes from the peer and create
// at the last block our own hash.
int height = peersHash.getHeight();
if (!useDaoMonitor &&
!findDaoStateBlock(height).isPresent()) {
if (!useDaoMonitor && findDaoStateBlock(height).isEmpty()) {
if (daoStateService.getChainHeight() == height) {
// At the most recent block we create our own hash
optionalDaoStateBlock = daoStateService.getLastBlock()
.map(this::createDaoStateBlock)
.orElse(findDaoStateBlock(height));
} else {
// Otherwise we create a block from the peers daoStateHash
// Otherwise, we create a block from the peers daoStateHash
DaoStateHash daoStateHash = new DaoStateHash(height, peersHash.getHash(), false);
DaoStateBlock daoStateBlock = new DaoStateBlock(daoStateHash);
daoStateBlockChain.add(daoStateBlock);
Expand Down
71 changes: 34 additions & 37 deletions seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ public class SeedNodeReportingService {
private final String seedNodeReportingServerUrl;
private final DaoStateListener daoStateListener;
private final HttpClient httpClient;

private Timer dataReportTimer;
private final Timer heartBeatTimer;
private final ExecutorService executor;
private final Timer heartBeatTimer;
private Timer dataReportTimer;

@Inject
public SeedNodeReportingService(P2PService p2PService,
Expand Down Expand Up @@ -128,28 +127,22 @@ public SeedNodeReportingService(P2PService p2PService,

heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC);

// We send each time when a new block is received and the DAO hash has been provided (which
// takes a bit after the block arrives).
daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() {
@Override
public void onDaoStateHashesChanged() {
sendBlockRelatedData();
}

@Override
public void onCheckpointFail() {
}
});

// Independent of the block
daoStateListener = new DaoStateListener() {
@Override
public void onParseBlockChainComplete() {
daoFacade.removeBsqStateListener(daoStateListener);
dataReportTimer = UserThread.runPeriodically(() -> sendDataReport(), REPORT_DELAY_SEC);
sendDataReport();

sendBlockRelatedData();

// We send each time when a new block is received and the DAO hash has been provided (which
// takes a bit after the block arrives).
daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() {
@Override
public void onDaoStateBlockCreated() {
sendBlockRelatedData();
}
});
}
};
daoFacade.addBsqStateListener(daoStateListener);
Expand Down Expand Up @@ -252,26 +245,30 @@ private void sendDataReport() {
}

private void sendReportingItems(ReportingItems reportingItems) {
CompletableFuture.runAsync(() -> {
log.info("Send report to monitor server: {}", reportingItems.toString());
// We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system.
byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes();
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(seedNodeReportingServerUrl))
.POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes))
.header("User-Agent", getMyAddress())
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
log.error("Response error message: {}", response);
try {
CompletableFuture.runAsync(() -> {
log.info("Send report to monitor server: {}", reportingItems.toString());
// We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system.
byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes();
try {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(seedNodeReportingServerUrl))
.POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes))
.header("User-Agent", getMyAddress())
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
log.error("Response error message: {}", response);
}
} catch (IOException e) {
log.warn("IOException at sending reporting. {}", e.getMessage());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} catch (IOException e) {
log.warn("IOException at sending reporting. {}", e.getMessage());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executor);
}, executor);
} catch (Throwable t) {
log.error("Did not send reportingItems {} because of exception {}", reportingItems, t.toString());
}
}

private String getMyAddress() {
Expand Down