From e08ac21c951d56ddf01eb3ef59ae35a0747ab5d7 Mon Sep 17 00:00:00 2001 From: Aditya Pratap Singh Date: Fri, 16 Aug 2024 15:00:25 +0530 Subject: [PATCH] merged dagNodeStateStore and failedDagNodeStateStore tables --- .../DagStateStoreWithDagNodes.java | 5 +- .../MySqlDagManagementStateStore.java | 7 +- .../MysqlDagStateStoreWithDagNodes.java | 78 +++++++++++-------- 3 files changed, 52 insertions(+), 38 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 03aaf41520c..f2469fb3182 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -40,7 +40,10 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one * Refer */ - int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode) throws IOException; + int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode, boolean isFailedDag) throws IOException; + + void markDagAsFailed(Dag dag) throws IOException; + /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c0984f835b2..6002362a59e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -134,10 +134,7 @@ public void addDag(Dag dag) throws IOException { @Override public void markDagFailed(DagManager.DagId dagId) throws IOException { Dag dag = this.dagStateStore.getDag(dagId); - this.failedDagStateStore.writeCheckpoint(dag); - this.dagStateStore.cleanUp(dagId); - // todo - updated failedDagStateStore iff cleanup returned 1 - // or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed` + this.failedDagStateStore.markDagAsFailed(dag); log.info("Marked dag failed {}", dagId); } @@ -161,7 +158,7 @@ public Optional> getFailedDag(DagManager.DagId dagId) thro @Override public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode); + this.dagStateStore.updateDagNode(dagId, dagNode, false); } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 2692e20697a..2db56c2a3d6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -77,18 +77,18 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected final GsonSerDe> serDe; private final JobExecutionPlanDagFactory jobExecPlanDagFactory; - // todo add a column that tells if it is a running dag or a failed dag - protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" - + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " - + "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " - + "dag_node JSON NOT NULL, " - + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " - + "PRIMARY KEY (dag_node_id), " - + "UNIQUE INDEX dag_node_index (dag_node_id), " - + "INDEX dag_index (parent_dag_id))"; - - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) " - + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; + protected static final String CREATE_TABLE_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" + + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " + + "is_failed_dag INT NOT NULL DEFAULT 0, " + + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + + "PRIMARY KEY (dag_node_id), " + "UNIQUE INDEX dag_node_index (dag_node_id), " + + "INDEX dag_index (parent_dag_id))"; + + protected static final String INSERT_STATEMENT = + "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node,is_failed_dag) " + + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; @@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); - PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + PreparedStatement createStatement = connection.prepareStatement( + String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); connection.commit(); } catch (SQLException e) { @@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo } @Override - public void writeCheckpoint(Dag dag) - throws IOException { + public void writeCheckpoint(Dag dag) throws IOException { DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); boolean newDag = false; for (Dag.DagNode dagNode : dag.getNodes()) { - if (updateDagNode(dagId, dagNode) == 1) { + if (updateDagNode(dagId, dagNode, false) == 1) { newDag = true; } } @@ -140,6 +140,14 @@ public void writeCheckpoint(Dag dag) } } + public void markDagAsFailed(Dag dag) throws IOException { + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + for (Dag.DagNode dagNode : dag.getNodes()) { + if (updateDagNode(dagId, dagNode, true) == 1) { + } + } + } + @Override public void cleanUp(Dag dag) throws IOException { cleanUp(generateDagId(dag)); @@ -153,7 +161,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException { return deleteStatement.executeUpdate() != 0; } catch (SQLException e) { throw new IOException(String.format("Failure deleting dag for %s", dagId), e); - }}, true); + } + }, true); this.totalDagCount.dec(); return true; } @@ -167,7 +176,8 @@ public void cleanUp(String dagId) throws IOException { @Override public List> getDags() throws IOException { throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " - + "the DagManager that is replaced by DagProcessingEngine"); } + + "the DagManager that is replaced by DagProcessingEngine"); + } @Override public Dag getDag(DagManager.DagId dagId) throws IOException { @@ -195,33 +205,37 @@ private Dag convertDagNodesIntoDag(Set dagNode) throws IOException { + public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode, boolean isFailedDag) + throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { try { insertStatement.setString(1, dagNodeId); insertStatement.setString(2, parentDagId.toString()); insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + insertStatement.setInt(4, isFailedDag ? 1 : 0); return insertStatement.executeUpdate(); } catch (SQLException e) { throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e); - }}, true); + } + }, true); } @Override public Set> getDagNodes(DagManager.DagId parentDagId) throws IOException { - return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { - getStatement.setString(1, parentDagId.toString()); - HashSet> dagNodes = new HashSet<>(); - try (ResultSet rs = getStatement.executeQuery()) { - while (rs.next()) { - dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); - } - return dagNodes; - } catch (SQLException e) { - throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); - } - }, true); + return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), + getStatement -> { + getStatement.setString(1, parentDagId.toString()); + HashSet> dagNodes = new HashSet<>(); + try (ResultSet rs = getStatement.executeQuery()) { + while (rs.next()) { + dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0))); + } + return dagNodes; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e); + } + }, true); } @Override