Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2137]merged dagNodeStateStore and failedDagNodeStateStore tables #4032

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

pratapaditya04
Copy link
Contributor

@pratapaditya04 pratapaditya04 commented Aug 16, 2024

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
    Right now we are maintaining two tables to maintain DagState and Failed Dag State, In this PR ,we have tried to merge FailedDagState tables into DagState by adding a column is_failed_dag in DagState

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@codecov-commenter
Copy link

codecov-commenter commented Aug 19, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 38.80%. Comparing base (e501b62) to head (7e3e556).
Report is 9 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (e501b62) and HEAD (7e3e556). Click for more details.

HEAD has 1 upload less than BASE
Flag BASE (e501b62) HEAD (7e3e556)
2 1
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4032      +/-   ##
============================================
- Coverage     45.86%   38.80%   -7.06%     
+ Complexity     3257     1599    -1658     
============================================
  Files           707      388     -319     
  Lines         27865    15995   -11870     
  Branches       2796     1585    -1211     
============================================
- Hits          12779     6207    -6572     
+ Misses        14008     9290    -4718     
+ Partials       1078      498     -580     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Aditya Pratap Singh added 4 commits August 20, 2024 12:04
…nto failed_dag_store_table_merger

# Conflicts:
#	gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
#	gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java
#	gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodesTest.java
@pratapaditya04 pratapaditya04 changed the title [WIP]merged dagNodeStateStore and failedDagNodeStateStore tables [GOBBLIN-2137]merged dagNodeStateStore and failedDagNodeStateStore tables Aug 20, 2024
@@ -48,6 +48,8 @@ public class Dag<T> {
// Map to maintain parent to children mapping.
private Map<DagNode, List<DagNode<T>>> parentChildMap;
private List<DagNode<T>> nodes;
@Setter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we do not persist dag level field in mysql, adding fields to Dag will not be much useful and may lead to bugs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to have this field here as we didn't want to add additional parameters in all the methods to pass on is_failed value.

@arjun4084346
Copy link
Contributor

Should
a) isFailedDag be ONLY within DagNode always, everywhere, with up-to-date value?
b) isFailedDag be within DagNode, but be also in mysql table as a cache; still be always in sync with the field inside DagNode? It may make sql queries more readable.

before completing the review, i would like to understand your thoughts on this.

@pratapaditya04
Copy link
Contributor Author

Should a) isFailedDag be ONLY within DagNode always, everywhere, with up-to-date value? b) isFailedDag be within DagNode, but be also in mysql table as a cache; still be always in sync with the field inside DagNode? It may make sql queries more readable.

before completing the review, i would like to understand your thoughts on this.

If we don't keep it in mysql, we may lose it in case of restarts/deployments, so we will have to store in mysql

Copy link
Contributor

@phet phet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be a nice simplification. looks close

@@ -77,22 +77,11 @@ public interface DagManagementStateStore {

/**
* This marks the dag as a failed one.
* Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried.
* Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it remain useful to both retrieve the DAG while also asserting that it's failed?

@@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, User
private synchronized void start() {
if (!dagStoresInitialized) {
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any ideas on handling migration when we roll this out (presuming the failed DagStateStore was not empty)?

@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
throws IOException {
this.dagStateStore.updateDagNode(dagId, dagNode);
this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space before starting a comment. also more brevity; e.g.:

// create all DagNodes as isFailedDag == false

Comment on lines +108 to +109
PreparedStatement createStatement = connection.prepareStatement(
String.format(CREATE_TABLE_STATEMENT, tableName))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given arjun just wrote this class a month or two back, please ensure your auto-formatting is what it's supposed to be. it is possible his was off, but let's check. sure we might fix spelling errors, but there should be little reason to reformat files we've only just created

@@ -167,7 +167,7 @@ public void cleanUp(String dagId) throws IOException {
@Override
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
+ "the DagManager that is replaced by DagProcessingEngine"); }
+ "the DagManager that is replaced by DagProcessingEngine");}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, doesn't this need a newline before }?

@@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) {
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
return dagManagementStateStore.getFailedDag(getDagId());
return dagManagementStateStore.getDag(getDagId());
Copy link
Contributor

@phet phet Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we verify the one returned is actually failed?

Comment on lines +186 to +189
private List<Boolean> fetchDagNodeStates(String dagId) throws IOException {
List<Boolean> states = new ArrayList<>();

dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is behind-the-scenes DB access the only way to validate behavior here? is there no way to access from the "official" DagStateStore, then mark failed and finally re-access from the DSS to verify all nodes have changed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants