Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic flyway for ledger sync (WIP) #200

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions application/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ dependencies {

implementation(libs.yaci.store.starter)
implementation(libs.yaci.store.governance.starter)
implementation(libs.yaci.store.starter.assets)
implementation(libs.yaci.store.starter.block)
implementation(libs.yaci.store.starter.epoch)
implementation(libs.yaci.store.starter.metadata)

implementation(libs.cardano.client.lib)
implementation(libs.flyway)


implementation(libs.snakeyaml)
implementation(libs.guava)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.cardanofoundation.ledgersync.configuration;

import lombok.RequiredArgsConstructor;
import org.flywaydb.core.Flyway;
import org.springframework.boot.autoconfigure.flyway.FlywayProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.List;


@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties({StoreProperties.class, FlywayConfigurationProperties.class})
public class DynamicFlywayConfig {
private final DataSource dataSource;
private final StoreProperties storeProperties;
private final FlywayConfigurationProperties flywayConfigurationProperties;

@Bean
public Flyway flywayConfig() throws SQLException {
FlywayProperties flywayProperties = flywayConfigurationProperties.getFlyway();
List<String> locations = flywayProperties.getLocations();

String vendor = dataSource.getConnection().getMetaData().getDatabaseProductName().toLowerCase();
locations.add("classpath:db/store/" + vendor);

if (!storeProperties.getAssets().isEnabled()) {
locations.add("classpath:db/migration/ledgersync/assets");
}

if (!storeProperties.getMetadata().isEnabled()) {
locations.add("classpath:db/migration/ledgersync/metadata");
}

if (!storeProperties.getBlocks().isEnabled()) {
locations.add("classpath:db/migration/ledgersync/blocks");
}

if (!storeProperties.getEpoch().isEnabled()) {
locations.add("classpath:db/migration/ledgersync/epoch");
}

Flyway flyway = Flyway.configure()
.dataSource(dataSource)
.sqlMigrationPrefix(flywayProperties.getSqlMigrationPrefix())
.sqlMigrationSeparator(flywayProperties.getSqlMigrationSeparator())
.sqlMigrationSuffixes(flywayProperties.getSqlMigrationSuffixes().toArray(new String[0]))
.group(flywayProperties.isGroup())
.validateOnMigrate(flywayProperties.isValidateOnMigrate())
.schemas(flywayProperties.getSchemas().toArray(new String[0]))
.defaultSchema(flywayProperties.getDefaultSchema())
.locations(locations.toArray(new String[0]))
.createSchemas(flywayProperties.isCreateSchemas())
.outOfOrder(flywayProperties.isOutOfOrder())
.load();
flyway.migrate();
return flyway;
}

}


Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.cardanofoundation.ledgersync.configuration;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.autoconfigure.flyway.FlywayProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "spring")
public class FlywayConfigurationProperties {
private FlywayProperties flyway;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.cardanofoundation.ledgersync.configuration;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Getter
@Setter
@ConfigurationProperties(prefix = "store")
public class StoreProperties {
private Assets assets = new Assets();
private Metadata metadata = new Metadata();
private Blocks blocks = new Blocks();
private Epoch epoch = new Epoch();

@Getter
@Setter
public static final class Assets {
private boolean enabled = true;
}

@Getter
@Setter
public static final class Blocks {
private boolean enabled = true;
}

@Getter
@Setter
public static final class Metadata {
private boolean enabled = true;
}

@Getter
@Setter
public static final class Epoch {
private boolean enabled = true;
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.cardanofoundation.ledgersync.aggregate.AggregatedBlock;
import org.cardanofoundation.ledgersync.healthcheck.service.HealthCheckCachingService;
import org.cardanofoundation.ledgersync.repository.BlockRepository;
import org.cardanofoundation.ledgersync.repository.BlockRepositoryLS;
import org.cardanofoundation.ledgersync.service.*;
import org.cardanofoundation.ledgersync.service.impl.block.BlockAggregatorServiceImpl;
import org.cardanofoundation.ledgersync.service.impl.block.ByronEbbAggregatorServiceImpl;
Expand All @@ -16,10 +15,6 @@
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -37,7 +32,7 @@ public class BlockEventListener {
private final BlockDataService blockDataService;
private final RollbackService rollbackService;

private final BlockRepository blockRepository;
private final BlockRepositoryLS blockRepositoryLS;
private final MetricCollectorService metricCollectorService;
private final AtomicInteger blockCount = new AtomicInteger(0);

Expand All @@ -52,7 +47,7 @@ public class BlockEventListener {

@PostConstruct
private void initBlockHeight() {
long blockNo = blockRepository.getBlockHeight().orElse(0L);
long blockNo = blockRepositoryLS.getBlockHeight().orElse(0L);
blockHeight = new AtomicLong(blockNo);
log.info("Block height {}", blockNo);
}
Expand Down Expand Up @@ -101,7 +96,7 @@ public void handleGenesisBlock(GenesisBlockEvent genesisBlockEvent) {
@Transactional
public void handleRollback(RollbackEvent rollbackEvent) {
long rollbackBlockNo = 0;
var rollBackBlock = blockRepository.findBySlotNo(rollbackEvent.getRollbackTo().getSlot());
var rollBackBlock = blockRepositoryLS.findBySlotNo(rollbackEvent.getRollbackTo().getSlot());

if (rollBackBlock.isPresent()) {
rollbackBlockNo = rollBackBlock.get().getBlockNo();
Expand All @@ -120,7 +115,7 @@ public void handleRollback(RollbackEvent rollbackEvent) {
}

private boolean checkIfBlockExists(EventMetadata metadata) {
var optional = blockRepository.findBlockByHash(metadata.getBlockHash());
var optional = blockRepositoryLS.findBlockByHash(metadata.getBlockHash());
if (optional.isPresent()) {
log.info("Block already exists. Skipping block no {}, hash {}", metadata.getEpochSlot(),
metadata.getBlockHash());
Expand All @@ -141,7 +136,7 @@ private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock a
}

if (eventMetadata.getBlock() == 0) {//EBB or genesis block
boolean isExists = blockRepository.existsBlockByHash(eventMetadata.getBlockHash());
boolean isExists = blockRepositoryLS.existsBlockByHash(eventMetadata.getBlockHash());
if (isExists) {
log.warn("Skip existed block : number {}, slot_no {}, hash {}",
eventMetadata.getBlock(),
Expand All @@ -158,7 +153,7 @@ private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock a
// return;
}

if (Boolean.TRUE.equals(blockRepository.existsBlockByHash(aggregatedBlock.getHash()))) {
if (Boolean.TRUE.equals(blockRepositoryLS.existsBlockByHash(aggregatedBlock.getHash()))) {
log.warn("Skip existed block : number {}, slot_no {}, hash {}",
eventMetadata.getBlock(),
eventMetadata.getSlot(), eventMetadata.getBlockHash());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.bloxbean.cardano.yaci.store.common.service.CursorService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.cardanofoundation.ledgersync.repository.BlockRepository;
import org.cardanofoundation.ledgersync.repository.BlockRepositoryLS;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
Expand All @@ -14,11 +14,11 @@
@Slf4j
public class LedgerSyncEventListener {
private final CursorService cursorService;
private final BlockRepository blockRepository;
private final BlockRepositoryLS blockRepositoryLS;

@EventListener
public void initialize(ApplicationReadyEvent applicationReadyEvent) {
long slotHeight = blockRepository.getSlotHeight().orElse(0L);
long slotHeight = blockRepositoryLS.getSlotHeight().orElse(0L);

if (slotHeight == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.Optional;

@Repository
public interface BlockRepository extends JpaRepository<Block, Long> {
public interface BlockRepositoryLS extends JpaRepository<Block, Long> {
Optional<Block> findBlockByHash(String hash);

Optional<Block> findBlockByBlockNo(long number);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.Set;

@Repository
public interface CostModelRepository extends JpaRepository<CostModel, Long> {
public interface CostModelRepositoryLS extends JpaRepository<CostModel, Long> {

@Query("SELECT MAX(c.id) FROM CostModel c")
Optional<Long> findCostModeMaxId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.Collection;
import java.util.Optional;

public interface EpochParamRepository extends JpaRepository<EpochParam, Long> {
public interface EpochParamRepositoryLS extends JpaRepository<EpochParam, Long> {

@Query(value = "SELECT ep from EpochParam ep"
+ " WHERE ep.epochNo = (SELECT MAX(e.epochNo) FROM EpochParam e)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.cardanofoundation.ledgersync.common.common.cost.mdl.PlutusV2Keys;
import org.cardanofoundation.ledgersync.common.util.JsonUtil;
import org.cardanofoundation.ledgersync.consumercommon.entity.CostModel;
import org.cardanofoundation.ledgersync.repository.CostModelRepository;
import org.cardanofoundation.ledgersync.repository.CostModelRepositoryLS;
import org.cardanofoundation.ledgersync.service.CostModelService;
import org.cardanofoundation.ledgersync.service.impl.plutus.PlutusKey;
import org.springframework.stereotype.Service;
Expand All @@ -36,7 +36,7 @@
@FieldDefaults(level = AccessLevel.PRIVATE)
public class CostModelServiceImpl implements CostModelService {

final CostModelRepository costModelRepository;
final CostModelRepositoryLS costModelRepositoryLS;

@Override
public void handleCostModel(AggregatedTx tx) {
Expand Down Expand Up @@ -83,16 +83,16 @@ public void handleCostModel(AggregatedTx tx) {
, (past, future) -> future));

if (!ObjectUtils.isEmpty(costModels)) {
costModelRepository.existHash(
costModelRepositoryLS.existHash(
costModels.keySet())
.forEach(costModels::remove);
costModelRepository.saveAll(costModels.values());
costModelRepositoryLS.saveAll(costModels.values());
}
}

@Override
public CostModel findCostModelByHash(String hash) {
var costModelOptional = costModelRepository.findByHash(hash);
var costModelOptional = costModelRepositoryLS.findByHash(hash);
return costModelOptional.orElse(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.cardanofoundation.ledgersync.consumercommon.enumeration.EraType;
import org.cardanofoundation.ledgersync.mapper.EpochParamMapper;
import org.cardanofoundation.ledgersync.repository.*;
import org.cardanofoundation.ledgersync.service.CostModelService;
import org.cardanofoundation.ledgersync.service.EpochParamService;
import org.cardanofoundation.ledgersync.service.GenesisDataService;
import org.cardanofoundation.ledgersync.service.impl.plutus.PlutusKey;
Expand All @@ -30,11 +29,11 @@
@Service
public class EpochParamServiceImpl implements EpochParamService {

final BlockRepository blockRepository;
final BlockRepositoryLS blockRepositoryLS;
final ParamProposalRepository paramProposalRepository;
final EpochParamRepository epochParamRepository;
final EpochParamRepositoryLS epochParamRepository;
final EpochRepository epochRepository;
final CostModelRepository costModelRepository;
final CostModelRepositoryLS costModelRepository;
// final CostModelService costModelService;
final GenesisDataService genesisDataService;
final EpochParamMapper epochParamMapper;
Expand All @@ -44,12 +43,12 @@ public class EpochParamServiceImpl implements EpochParamService {
EpochParam defBabbageEpochParam;
EpochParam defConwayEpochParam;

public EpochParamServiceImpl(BlockRepository blockRepository, ParamProposalRepository paramProposalRepository,
EpochParamRepository epochParamRepository, EpochRepository epochRepository,
CostModelRepository costModelRepository,
public EpochParamServiceImpl(BlockRepositoryLS blockRepositoryLS, ParamProposalRepository paramProposalRepository,
EpochParamRepositoryLS epochParamRepository, EpochRepository epochRepository,
CostModelRepositoryLS costModelRepository,
@Lazy GenesisDataService genesisDataService,
EpochParamMapper epochParamMapper, ObjectMapper objectMapper) {
this.blockRepository = blockRepository;
this.blockRepositoryLS = blockRepositoryLS;
this.paramProposalRepository = paramProposalRepository;
this.epochParamRepository = epochParamRepository;
this.epochRepository = epochRepository;
Expand Down Expand Up @@ -180,7 +179,7 @@ void handleEpochParam(int epochNo) {
epochParamMapper.updateByParamProposal(curEpochParam, paramProposalToUpdate);
}

Block block = blockRepository.findFirstByEpochNo(epochNo)
Block block = blockRepositoryLS.findFirstByEpochNo(epochNo)
.orElseThrow(
() -> new RuntimeException("Block not found for epoch: " + epochNo));
curEpochParam.setEpochNo(epochNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.cardanofoundation.ledgersync.aggregate.AggregatedTx;
import org.cardanofoundation.ledgersync.aggregate.AggregatedTxIn;
import org.cardanofoundation.ledgersync.aggregate.AggregatedTxOut;
import org.cardanofoundation.ledgersync.configuration.StoreProperties;
import org.cardanofoundation.ledgersync.consumercommon.entity.*;
import org.cardanofoundation.ledgersync.dto.EUTXOWrapper;
import org.cardanofoundation.ledgersync.factory.CertificateSyncServiceFactory;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class TransactionServiceImpl implements TransactionService {
TxWitnessService txWitnessService;
TxBootstrapWitnessService txBootstrapWitnessService;

StoreProperties storeProperties;

@Override
public void prepareAndHandleTxs(Map<String, Block> blockMap,
Collection<AggregatedBlock> aggregatedBlocks) {
Expand Down Expand Up @@ -154,7 +157,9 @@ private void handleTxContents(Collection<AggregatedTx> successTxs,

// MUST SET FIRST
// multi asset mint
multiAssetService.handleMultiAssetMint(successTxs, txMap);
if (!storeProperties.getAssets().isEnabled()) {
multiAssetService.handleMultiAssetMint(successTxs, txMap);
}

// Handle stake address and its first appeared tx
Map<String, StakeAddress> stakeAddressMap = stakeAddressService
Expand Down Expand Up @@ -200,10 +205,14 @@ private void handleTxContents(Collection<AggregatedTx> successTxs,


// auxiliary
txMetaDataService.handleAuxiliaryDataMaps(txMap);
if (!storeProperties.getMetadata().isEnabled()) {
txMetaDataService.handleAuxiliaryDataMaps(txMap);
}

//param proposal
paramProposalService.handleParamProposals(successTxs, txMap);
if (!storeProperties.getEpoch().isEnabled()) {
paramProposalService.handleParamProposals(successTxs, txMap);
}

// reference inputs
referenceInputService.handleReferenceInputs(
Expand Down
Loading
Loading