merged dagNodeStateStore and failedDagNodeStateStore tables
Aditya Pratap Singh committed Aug 16, 2024
1 parent bdbf43a commit e08ac21
Showing 3 changed files with 52 additions and 38 deletions.
@@ -40,7 +40,10 @@ public interface DagStateStoreWithDagNodes extends DagStateStore {
* Returns 1 if the dag node is inserted as a new one, 2 if it is updated, and 0 if the new dag node is the same as the existing one
* <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
*/
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException;

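/**
* Marks all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s of the given dag as failed
* in the state store by setting their is_failed_dag flag.
*/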
void markDagAsFailed(Dag<JobExecutionPlan> dag) throws IOException;


/**
* Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
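The 1/2/0 return contract documented in the interface above is MySQL's affected-row count for INSERT ... ON DUPLICATE KEY UPDATE (see the linked reference manual page). A minimal JDBC sketch of that contract follows; the table name dag_node_state, the class name, and the DataSource wiring are illustrative assumptions, not part of this commit:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

public class UpsertReturnValueSketch {

  // Mirrors the layout created by CREATE_TABLE_STATEMENT below; "dag_node_state" is a placeholder table name.
  private static final String UPSERT =
      "INSERT INTO dag_node_state (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
          + "VALUES (?, ?, ?, ?) AS new "
          + "ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";

  // Per the MySQL reference linked in the javadoc: 1 = new row inserted,
  // 2 = existing row changed, 0 = existing row already held these values.
  static int upsertDagNode(DataSource dataSource, String dagNodeId, String parentDagId,
      String dagNodeJson, boolean isFailedDag) throws SQLException {
    try (Connection connection = dataSource.getConnection();
        PreparedStatement statement = connection.prepareStatement(UPSERT)) {
      statement.setString(1, dagNodeId);
      statement.setString(2, parentDagId);
      statement.setString(3, dagNodeJson);
      statement.setInt(4, isFailedDag ? 1 : 0);
      return statement.executeUpdate();
    }
  }
}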
@@ -134,10 +134,7 @@ public void addDag(Dag<JobExecutionPlan> dag) throws IOException {
@Override
public void markDagFailed(DagManager.DagId dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dagStateStore.getDag(dagId);
this.failedDagStateStore.writeCheckpoint(dag);
this.dagStateStore.cleanUp(dagId);
// todo - updated failedDagStateStore iff cleanup returned 1
// or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed`
this.failedDagStateStore.markDagAsFailed(dag);
log.info("Marked dag failed {}", dagId);
}

@@ -161,7 +158,7 @@ public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) thro
@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
throws IOException {
this.dagStateStore.updateDagNode(dagId, dagNode);
this.dagStateStore.updateDagNode(dagId, dagNode, false);
}

@Override
@@ -77,18 +77,18 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
protected final GsonSerDe<List<JobExecutionPlan>> serDe;
private final JobExecutionPlanDagFactory jobExecPlanDagFactory;

// todo add a column that tells if it is a running dag or a failed dag
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
+ "dag_node JSON NOT NULL, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "PRIMARY KEY (dag_node_id), "
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
+ "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
+ "dag_node JSON NOT NULL, "
+ "is_failed_dag INT NOT NULL DEFAULT 0, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "PRIMARY KEY (dag_node_id), "
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT =
"INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
+ "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
@@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());

try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
PreparedStatement createStatement = connection.prepareStatement(
String.format(CREATE_TABLE_STATEMENT, tableName))) {
createStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
@@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
}

@Override
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
boolean newDag = false;
for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (updateDagNode(dagId, dagNode) == 1) {
if (updateDagNode(dagId, dagNode, false) == 1) {
newDag = true;
}
}
@@ -140,6 +140,14 @@ public void writeCheckpoint(Dag<JobExecutionPlan> dag)
}
}

@Override
public void markDagAsFailed(Dag<JobExecutionPlan> dag) throws IOException {
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
updateDagNode(dagId, dagNode, true);
}
}

@Override
public void cleanUp(Dag<JobExecutionPlan> dag) throws IOException {
cleanUp(generateDagId(dag));
@@ -153,7 +161,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException {
return deleteStatement.executeUpdate() != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
}}, true);
}
}, true);
this.totalDagCount.dec();
return true;
}
@@ -167,7 +176,8 @@ public void cleanUp(String dagId) throws IOException {
@Override
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
+ "the DagManager that is replaced by DagProcessingEngine"); }
+ "the DagManager that is replaced by DagProcessingEngine");
}

@Override
public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
@@ -195,33 +205,37 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
}

@Override
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag)
throws IOException {
String dagNodeId = dagNode.getValue().getId().toString();
return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
try {
insertStatement.setString(1, dagNodeId);
insertStatement.setString(2, parentDagId.toString());
insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
insertStatement.setInt(4, isFailedDag ? 1 : 0);
return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
}}, true);
}
}, true);
}

@Override
public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDagId) throws IOException {
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
getStatement.setString(1, parentDagId.toString());
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
try (ResultSet rs = getStatement.executeQuery()) {
while (rs.next()) {
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
}
return dagNodes;
} catch (SQLException e) {
throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
}
}, true);
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName),
getStatement -> {
getStatement.setString(1, parentDagId.toString());
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
try (ResultSet rs = getStatement.executeQuery()) {
while (rs.next()) {
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
}
return dagNodes;
} catch (SQLException e) {
throw new IOException(String.format("Failure getting dag nodes for dag %s", parentDagId), e);
}
}, true);
}

@Override

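Not shown in this diff, but it is the payoff the removed TODO asked for: once running and failed dag nodes live in one table, looking up a failed dag becomes a filter on is_failed_dag rather than a read from a separate failedDagStateStore table. A hedged JDBC sketch under that assumption (the class, method, and dag_node_state table name are illustrative, not from this commit):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

public class FailedDagLookupSketch {

  // Hypothetical query: select only the nodes of dags that have been marked failed.
  private static final String GET_FAILED_DAG_NODES =
      "SELECT dag_node FROM dag_node_state WHERE parent_dag_id = ? AND is_failed_dag = 1";

  // Returns the dag_node JSON for each failed node, as stored by updateDagNode().
  static List<String> getFailedDagNodeJson(DataSource dataSource, String parentDagId) throws SQLException {
    List<String> dagNodes = new ArrayList<>();
    try (Connection connection = dataSource.getConnection();
        PreparedStatement statement = connection.prepareStatement(GET_FAILED_DAG_NODES)) {
      statement.setString(1, parentDagId);
      try (ResultSet rs = statement.executeQuery()) {
        while (rs.next()) {
          dagNodes.add(rs.getString(1));
        }
      }
      return dagNodes;
    }
  }
}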