Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2137]merged dagNodeStateStore and failedDagNodeStateStore tables #4032

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class Dag<T> {
// Map to maintain parent to children mapping.
private Map<DagNode, List<DagNode<T>>> parentChildMap;
private List<DagNode<T>> nodes;
@Setter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we do not persist dag level field in mysql, adding fields to Dag will not be much useful and may lead to bugs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to have this field here as we didn't want to add additional parameters in all the methods to pass on is_failed value.

private boolean isFailedDag;

@Setter
@Deprecated // because this field is not persisted in mysql and contains information in very limited cases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,11 @@ public interface DagManagementStateStore {

/**
* This marks the dag as a failed one.
* Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried.
* Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it remain useful to both retrieve the DAG while also asserting that it's failed?

* @param dagId failing dag's dagId
*/
void markDagFailed(DagManager.DagId dagId) throws IOException;

/**
* Returns the failed dag.
* If the dag is not found because it was never marked as failed through
* {@link DagManagementStateStore#markDagFailed(org.apache.gobblin.service.modules.orchestration.DagManager.DagId)},
* it returns Optional.absent.
* @param dagId dag id of the failed dag
*/
Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException;

void deleteFailedDag(DagManager.DagId dagId) throws IOException;

/**
* Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
* Note that a DagNode is a part of a Dag and must already be present in the store through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore {
* Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one
* <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
*/
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException;
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved


/**
* Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,12 @@
@Slf4j
@Singleton
public class MySqlDagManagementStateStore implements DagManagementStateStore {
// todo - these two stores should merge
private DagStateStoreWithDagNodes dagStateStore;
private DagStateStoreWithDagNodes failedDagStateStore;
private final JobStatusRetriever jobStatusRetriever;
private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
Map<URI, TopologySpec> topologySpecMap;
private final Config config;
public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
FlowCatalog flowCatalog;
@Getter
Expand All @@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, User
private synchronized void start() {
if (!dagStoresInitialized) {
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any ideas on handling migration when we roll this out (presuming the failed DagStateStore was not empty)?

topologySpecMap);
// This implementation does not need to update quota usage when the service restarts or when its leadership status
// changes because quota usage are persisted in mysql table. For the same reason, there is no need to call getDags also.
// Also, calling getDags during startUp may fail, because the topologies that are required to deserialize dags may
Expand Down Expand Up @@ -134,10 +129,8 @@ public void addDag(Dag<JobExecutionPlan> dag) throws IOException {
@Override
public void markDagFailed(DagManager.DagId dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dagStateStore.getDag(dagId);
this.failedDagStateStore.writeCheckpoint(dag);
this.dagStateStore.cleanUp(dagId);
// todo - updated failedDagStateStore iff cleanup returned 1
// or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed`
dag.setFailedDag(true);
this.dagStateStore.writeCheckpoint(dag);
log.info("Marked dag failed {}", dagId);
}

Expand All @@ -147,21 +140,10 @@ public void deleteDag(DagManager.DagId dagId) throws IOException {
log.info("Deleted dag {}", dagId);
}

@Override
public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
this.failedDagStateStore.cleanUp(dagId);
log.info("Deleted failed dag {}", dagId);
}

@Override
public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException {
return Optional.of(this.failedDagStateStore.getDag(dagId));
}

@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
throws IOException {
this.dagStateStore.updateDagNode(dagId, dagNode);
this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space before starting a comment. also more brevity; e.g.:

// create all DagNodes as isFailedDag == false

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,19 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
protected final GsonSerDe<List<JobExecutionPlan>> serDe;
private final JobExecutionPlanDagFactory jobExecPlanDagFactory;

// todo add a column that tells if it is a running dag or a failed dag
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
+ "dag_node JSON NOT NULL, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "PRIMARY KEY (dag_node_id), "
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
+ "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
protected static final String CREATE_TABLE_STATEMENT =
"CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH
+ ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
+ ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "is_failed_dag TINYINT(1) DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved
+ "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT =
"INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
+ "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";
protected static final String GET_DAG_NODES_STATEMENT =
"SELECT dag_node FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
private final ContextAwareCounter totalDagCount;
Expand All @@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());

try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
PreparedStatement createStatement = connection.prepareStatement(
String.format(CREATE_TABLE_STATEMENT, tableName))) {
Comment on lines +108 to +109
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given arjun just wrote this class a month or two back, please ensure your auto-formatting is what it's supposed to be. it is possible his was off, but let's check. sure we might fix spelling errors, but there should be little reason to reformat files we've only just created

createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
Expand All @@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
}

@Override
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
boolean newDag = false;
for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (updateDagNode(dagId, dagNode) == 1) {
if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) {
newDag = true;
}
}
Expand Down Expand Up @@ -167,7 +167,7 @@ public void cleanUp(String dagId) throws IOException {
@Override
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
+ "the DagManager that is replaced by DagProcessingEngine"); }
+ "the DagManager that is replaced by DagProcessingEngine");}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, doesn't this need a newline before }?


@Override
public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
Expand Down Expand Up @@ -195,13 +195,14 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
}

@Override
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException {
String dagNodeId = dagNode.getValue().getId().toString();
return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
try {
insertStatement.setString(1, dagNodeId);
insertStatement.setString(2, parentDagId.toString());
insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
insertStatement.setInt(4, isFailedDag ? 1 : 0);
return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) {
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
return dagManagementStateStore.getFailedDag(getDagId());
return dagManagementStateStore.getDag(getDagId());
Copy link
Contributor

@phet phet Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we verify the one returned is actually failed?

}

@Override
Expand Down Expand Up @@ -92,7 +92,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
dagManagementStateStore.addDag(failedDag.get());

// if it fails here, it will check point the failed dag in the (running) dag store again, which is idempotent
dagManagementStateStore.deleteFailedDag(getDagId());
dagManagementStateStore.deleteDag(getDagId());

DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(), getDagId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public static MySqlDagManagementStateStore getDummyDMSS(ITestMetastoreDatabase t
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, MysqlDagStateStoreWithDagNodes.class.getName())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag" + 1)
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
.addPrimitive(MySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE + 2);
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
Config config = configBuilder.build();
JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
JobStatus dummyJobStatus = JobStatus.builder().flowName("fn").flowGroup("fg").jobGroup("fg").jobName("job0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@

package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;
import java.net.URI;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -36,17 +44,22 @@
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.util.DBStatementExecutor;

/**
* Mainly testing functionalities related to DagStateStore but not Mysql-related components.
*/
@Slf4j
public class MysqlDagStateStoreWithDagNodesTest {

private DagStateStore dagStateStore;

private static final String TEST_USER = "testUser";
private static ITestMetastoreDatabase testDb;
private DBStatementExecutor dbStatementExecutor;
private static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node, is_failed_dag FROM %s WHERE parent_dag_id = ?";
private static final String tableName = "dag_node_state_store";

@BeforeClass
public void setUp() throws Exception {
Expand All @@ -63,6 +76,8 @@ public void setUp() throws Exception {
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
this.dagStateStore = new MysqlDagStateStoreWithDagNodes(configBuilder.build(), topologySpecMap);
dbStatementExecutor = new DBStatementExecutor(
MysqlDataSourceFactory.get(configBuilder.build(), SharedResourcesBrokerFactory.getImplicitBroker()), log);
}

@AfterClass(alwaysRun = true)
Expand All @@ -74,7 +89,7 @@ public void tearDown() throws Exception {
}

@Test
public void testAddGetAndDeleteDag() throws Exception{
public void testAddGetAndDeleteDag() throws Exception {
Dag<JobExecutionPlan> originalDag1 = DagTestUtils.buildDag("random_1", 123L);
Dag<JobExecutionPlan> originalDag2 = DagTestUtils.buildDag("random_2", 456L);
DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1);
Expand Down Expand Up @@ -137,4 +152,56 @@ public void testAddGetAndDeleteDag() throws Exception{
Assert.assertNull(this.dagStateStore.getDag(dagId1));
Assert.assertNull(this.dagStateStore.getDag(dagId2));
}

@Test
public void testMarkDagAsFailed() throws Exception {
// Set up initial conditions
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test_dag", 789L);
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);

this.dagStateStore.writeCheckpoint(dag);

// Fetch all initial states into a list
List<Boolean> initialStates = fetchDagNodeStates(dagId.toString());

// Check Initial State
for (Boolean state : initialStates) {
Assert.assertFalse(state);
}
// Set the DAG as failed
dag.setFailedDag(true);
this.dagStateStore.writeCheckpoint(dag);

// Fetch all states after marking the DAG as failed
List<Boolean> failedStates = fetchDagNodeStates(dagId.toString());

// Check if all states are now true (indicating failure)
for (Boolean state : failedStates) {
Assert.assertTrue(state);
}
dagStateStore.cleanUp(dagId);
Assert.assertNull(this.dagStateStore.getDag(dagId));
}

private List<Boolean> fetchDagNodeStates(String dagId) throws IOException {
List<Boolean> states = new ArrayList<>();

dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
Comment on lines +186 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is behind-the-scenes DB access the only way to validate behavior here? is there no way to access from the "official" DagStateStore, then mark failed and finally re-access from the DSS to verify all nodes have changed?


getStatement.setString(1, dagId.toString());

HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();

try (ResultSet rs = getStatement.executeQuery()) {
while (rs.next()) {
states.add(rs.getBoolean(2));
}
return dagNodes;
} catch (SQLException e) {
throw new IOException(String.format("Failure get dag nodes for dag %s", dagId), e);
}
}, true);

return states;
}
}
Loading