From ccbb9b5a0dbc8a80870c2a51aa215314ab856473 Mon Sep 17 00:00:00 2001 From: alex-hancock Date: Wed, 30 Aug 2017 15:09:44 -0700 Subject: [PATCH 1/4] Cleaned up comments, clarified error messages and added more stderr debugging to aid when failures occur. --- luigi-interface/monitor.py | 377 +++++++++++++++++++------------------ 1 file changed, 190 insertions(+), 187 deletions(-) diff --git a/luigi-interface/monitor.py b/luigi-interface/monitor.py index 2577e85..19cbb52 100644 --- a/luigi-interface/monitor.py +++ b/luigi-interface/monitor.py @@ -14,179 +14,184 @@ from datetime import datetime def get_touchfile(bucket_name, touchfile_name): - s3 = boto.connect_s3() - bucket = s3.get_bucket(bucket_name, validate=False) + s3 = boto.connect_s3() + bucket = s3.get_bucket(bucket_name, validate=False) - key = bucket.new_key(touchfile_name) - contents = key.get_contents_as_string() - return contents + key = bucket.new_key(touchfile_name) + contents = key.get_contents_as_string() + return contents # # Luigi Scraping below # def get_job_list(): - server = os.getenv("LUIGI_SERVER") + ":" + os.getenv("LUIGI_PORT", "8082") + "/api/" - - running_url = server + "task_list?data=%7B%22status%22%3A%22RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - batch_url = server + "task_list?data=%7B%22status%22%3A%22BATCH_RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - failed_url = server + "task_list?data=%7B%22status%22%3A%22FAILED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - upfail_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22UPSTREAM_FAILED%22%2C%22search%22%3A%22%22%7D" - disable_url = server + "task_list?data=%7B%22status%22%3A%22DISABLED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - updisable_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22UPSTREAM_DISABLED%22%2C%22search%22%3A%22%22%7D" - pending_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - done_url = server + "task_list?data=%7B%22status%22%3A%22DONE%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - - list_of_URLs = [running_url, batch_url, failed_url, upfail_url, - disable_url, updisable_url, pending_url, done_url] - - relevant_attributes = ["status", "name", "start_time", "params"] - required_parameters = ["project", "donor_id", "sample_id", "pipeline_name"] - - local_job_list = {} - for URL in list_of_URLs: - name = URL[62:] - suffix = "" - if "UPSTREAM" in name: - if "FAILED" in name: - name = "UPSTREAM_FAILED" - else: - name = "UPSTREAM_DISABLED" - else: - name = name.split("%")[0] + suffix - - # Retrieve api tool dump from URL and read it into json_tools - print "URL: ", URL - req = urllib2.Request(URL) - response = urllib2.urlopen(req) - text_tools = response.read() - print "TEXT TOOLS:", text_tools - json_tools = json.loads(text_tools) - - luigi_job_list = json_tools["response"] - - if not luigi_job_list: - # Just skip an empty response - continue - - for job in luigi_job_list: - local_job_list[job] = luigi_job_list[job] - - return local_job_list + server = os.getenv("LUIGI_SERVER") + ":" + os.getenv("LUIGI_PORT", "8082") + "/api/" + running_url = server + "task_list?data=%7B%22status%22%3A%22RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + batch_url = server + "task_list?data=%7B%22status%22%3A%22BATCH_RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + failed_url = server + "task_list?data=%7B%22status%22%3A%22FAILED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + upfail_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22UPSTREAM_FAILED%22%2C%22search%22%3A%22%22%7D" + disable_url = server + "task_list?data=%7B%22status%22%3A%22DISABLED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + updisable_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22UPSTREAM_DISABLED%22%2C%22search%22%3A%22%22%7D" + pending_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + done_url = server + "task_list?data=%7B%22status%22%3A%22DONE%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + + list_of_URLs = [running_url, batch_url, failed_url, upfail_url, + disable_url, updisable_url, pending_url, done_url] + + relevant_attributes = ["status", "name", "start_time", "params"] + required_parameters = ["project", "donor_id", "sample_id", "pipeline_name"] + + local_job_list = {} + for URL in list_of_URLs: + name = URL[62:] + suffix = "" + if "UPSTREAM" in name: + if "FAILED" in name: + name = "UPSTREAM_FAILED" + else: + name = "UPSTREAM_DISABLED" + else: + name = name.split("%")[0] + suffix + + # Retrieve api tool dump from URL and read it into json_tools + print "URL: ", URL + req = urllib2.Request(URL) + response = urllib2.urlopen(req) + text_tools = response.read() + print "TEXT TOOLS:", text_tools + json_tools = json.loads(text_tools) + + luigi_job_list = json_tools["response"] + + if not luigi_job_list: + # Just skip an empty response + continue + + for job in luigi_job_list: + local_job_list[job] = luigi_job_list[job] + + return local_job_list def query_to_list(result_proxy): - # Can use iterator to create list, - # but result from SQLAlchemy isn't - # great for other techniques used here - return [row for row in result_proxy] + # Can use iterator to create list, + # but result from SQLAlchemy isn't + # great for other techniques used here + return [row for row in result_proxy] def get_consonance_status(consonance_uuid): - cmd = ['consonance', 'status', '--job_uuid', str(consonance_uuid)] - status_text = subprocess.check_output(cmd) - return json.loads(status_text) + cmd = ['consonance', 'status', '--job_uuid', str(consonance_uuid)] + status_text = subprocess.check_output(cmd) + return json.loads(status_text) # This was exported to a method to avoid duplication # for both the creation and update timestamps def format_consonance_timestamp(consonance_timestamp): - datetime_obj = datetime.strptime(consonance_timestamp, '%Y-%m-%dT%H:%M:%S.%f+0000') - return datetime.strftime(datetime_obj, '%Y-%m-%d %H:%M') + datetime_obj = datetime.strptime(consonance_timestamp, '%Y-%m-%dT%H:%M:%S.%f+0000') + return datetime.strftime(datetime_obj, '%Y-%m-%d %H:%M') # # Database initialization, creation if table doesn't exist # # Change echo to True to show SQL code... unnecessary for production +# db = create_engine('postgresql://{}:{}@db/{}'.format(os.getenv("POSTGRES_USER"), os.getenv("POSTGRES_PASSWORD"), os.getenv("POSTGRES_DB")), echo=False) conn = db.connect() metadata = MetaData(db) luigi = Table('luigi', metadata, - Column("luigi_job", String(100), primary_key=True), - Column("status", String(20)), - - Column("submitter_specimen_id", String(100)), - Column("specimen_uuid", String(100)), - Column("workflow_name", String(100)), - Column("center_name", String(100)), - Column("submitter_donor_id", String(100)), - Column("consonance_job_uuid", String(100)), - Column("submitter_donor_primary_site", String(100)), - Column("project", String(100)), - Column("analysis_type", String(100)), - Column("program", String(100)), - Column("donor_uuid", String(100)), - Column("submitter_sample_id", String(100)), - Column("submitter_experimental_design", String(100)), - Column("submitter_specimen_type", String(100)), - Column("workflow_version", String(100)), - Column("sample_uuid", String(100)), - - Column("start_time", String(100)), - Column("last_updated", String(100)) -) + Column("luigi_job", String(100), primary_key=True), + Column("status", String(20)), + + Column("submitter_specimen_id", String(100)), + Column("specimen_uuid", String(100)), + Column("workflow_name", String(100)), + Column("center_name", String(100)), + Column("submitter_donor_id", String(100)), + Column("consonance_job_uuid", String(100)), + Column("submitter_donor_primary_site", String(100)), + Column("project", String(100)), + Column("analysis_type", String(100)), + Column("program", String(100)), + Column("donor_uuid", String(100)), + Column("submitter_sample_id", String(100)), + Column("submitter_experimental_design", String(100)), + Column("submitter_specimen_type", String(100)), + Column("workflow_version", String(100)), + Column("sample_uuid", String(100)), + + Column("start_time", String(100)), + Column("last_updated", String(100))) + if not db.dialect.has_table(db, luigi): - luigi.create() + luigi.create() jobList = get_job_list() -#print jobList for job in jobList: - job_dict = jobList[job] - - # - # S3 Scraping below - # - try: - s3string = job_dict['params']['touch_file_path'] - bucket_name, filepath = s3string.split('/', 1) - touchfile_name = filepath + '/' + \ + job_dict = jobList[job] + + # + # S3 scraping below + # + try: + s3string = job_dict['params']['touch_file_path'] + bucket_name, filepath = s3string.split('/', 1) + touchfile_name = filepath + '/' + \ job_dict['params']['metadata_json_file_name'] print "TOUCH FILE NAME:", touchfile_name - stringContents = get_touchfile(bucket_name, touchfile_name) - jsonMetadata = json.loads(stringContents) - except: - # Hardcoded jsonMetadata - print >>sys.stderr, "Problems with s3 retrieval" - print >>sys.stderr, job_dict - continue - - select_query = select([luigi]).where(luigi.c.luigi_job == job) - select_exist_result = query_to_list(conn.execute(select_query)) - - try: - status_json = get_consonance_status(jsonMetadata['consonance_job_uuid']) - except: - # Default to Luigi statusand timestamps - status_json = { - 'create_timestamp' : job_dict['start_time'], - 'update_timestamp' : job_dict['last_updated'], - 'state' : 'LUIGI:' + job_dict['status'] - } - - if len(select_exist_result) == 0: - try: - # insert into db - ins_query = luigi.insert().values(luigi_job=job, - submitter_specimen_id=jsonMetadata['submitter_specimen_id'], - specimen_uuid=jsonMetadata['specimen_uuid'], - workflow_name=jsonMetadata['workflow_name'], - center_name=jsonMetadata['center_name'], - submitter_donor_id=jsonMetadata['submitter_donor_id'], - consonance_job_uuid=jsonMetadata['consonance_job_uuid'], - submitter_donor_primary_site=jsonMetadata['submitter_donor_primary_site'], - project=jsonMetadata['project'], - analysis_type=jsonMetadata['analysis_type'], - program=jsonMetadata['program'], - donor_uuid=jsonMetadata['donor_uuid'], - submitter_sample_id=jsonMetadata['submitter_sample_id'], - submitter_experimental_design=jsonMetadata['submitter_experimental_design'], - submitter_specimen_type=jsonMetadata['submitter_specimen_type'], - workflow_version=jsonMetadata['workflow_version'], - sample_uuid=jsonMetadata['sample_uuid'] - ) - exec_result = conn.execute(ins_query) - except Exception as e: - print >>sys.stderr, e.message, e.args - print "Dumping jsonMetadata to aid debug:\n", jsonMetadata - continue + stringContents = get_touchfile(bucket_name, touchfile_name) + jsonMetadata = json.loads(stringContents) + except Exception as e: + # Hardcoded jsonMetadata + print >>sys.stderr, e.message, e.args + print >>sys.stderr, "Failure when connecting to S3, dumping job dictionary:", job_dict + continue + + # + # Consonance scraping below + # + try: + # Use uuid from S3 + status_json = get_consonance_status(jsonMetadata['consonance_job_uuid']) + except: + # Default to Luigi status and timestamps + status_json = { + 'create_timestamp' : job_dict['start_time'], + 'update_timestamp' : job_dict['last_updated'], + 'state' : 'LUIGI:' + job_dict['status'] + } + + # + # Find if current job is already listed in + # job database, insert if absent + # + select_query = select([luigi]).where(luigi.c.luigi_job == job) + select_result = query_to_list(conn.execute(select_query)) + if len(select_result) == 0: + try: + ins_query = luigi.insert().values(luigi_job=job, + submitter_specimen_id=jsonMetadata['submitter_specimen_id'], + specimen_uuid=jsonMetadata['specimen_uuid'], + workflow_name=jsonMetadata['workflow_name'], + center_name=jsonMetadata['center_name'], + submitter_donor_id=jsonMetadata['submitter_donor_id'], + consonance_job_uuid=jsonMetadata['consonance_job_uuid'], + submitter_donor_primary_site=jsonMetadata['submitter_donor_primary_site'], + project=jsonMetadata['project'], + analysis_type=jsonMetadata['analysis_type'], + program=jsonMetadata['program'], + donor_uuid=jsonMetadata['donor_uuid'], + submitter_sample_id=jsonMetadata['submitter_sample_id'], + submitter_experimental_design=jsonMetadata['submitter_experimental_design'], + submitter_specimen_type=jsonMetadata['submitter_specimen_type'], + workflow_version=jsonMetadata['workflow_version'], + sample_uuid=jsonMetadata['sample_uuid']) + exec_result = conn.execute(ins_query) + except Exception as e: + print >>sys.stderr, e.message, e.args + print "Dumping jsonMetadata to aid debug:\n", jsonMetadata + continue +# # Get Consonance status for each entry in our db # # Select all from the table, pipe results into a list @@ -199,48 +204,46 @@ def format_consonance_timestamp(consonance_timestamp): select_result = conn.execute(select_query) result_list = [dict(row) for row in select_result] for job in result_list: - try: - job_name = job['luigi_job'] - job_uuid = job['consonance_job_uuid'] - - if job_uuid == "no consonance id in test mode": - # Skip test mode Consonance ID's - # and force next job - print "\nTest ID, skipping" - - stmt = luigi.delete().\ - where(luigi.c.luigi_job == job_name) - exec_result = conn.execute(stmt) - else: - # DEBUG - # Consonace job id is real - print "\nJOB NAME:", job_uuid - - status_json = get_consonance_status(job_uuid) - state = status_json['state'] - created = format_consonance_timestamp(status_json['create_timestamp']) - updated = format_consonance_timestamp(status_json['update_timestamp']) - - # DEBUG to check if state, created, and updated are collected - print "STATE:", state - print "CREATED:", created - print "UPDATED:", updated - - stmt = luigi.update().\ - where(luigi.c.luigi_job == job_name).\ - values(status=status_json['state'], - start_time=created, - last_updated=updated) - exec_result = conn.execute(stmt) - - except Exception as e: - print >>sys.stderr, "ERROR:", str(e) - - state = 'JOB NOT FOUND' - - stmt = luigi.update().\ - where((and_(luigi.c.luigi_job == job_name, - luigi.c.status != 'SUCCESS', - luigi.c.status != 'FAILED'))).\ - values(status=state) - exec_result = conn.execute(stmt) + try: + job_name = job['luigi_job'] + job_uuid = job['consonance_job_uuid'] + + if job_uuid == "no consonance id in test mode": + # Skip test mode Consonance ID's + # and force next job + print "\nTest ID, skipping" + + stmt = luigi.delete().\ + where(luigi.c.luigi_job == job_name) + exec_result = conn.execute(stmt) + else: + # Consonace job id is real + print "\nJOB NAME:", job_uuid + + status_json = get_consonance_status(job_uuid) + state = status_json['state'] + created = format_consonance_timestamp(status_json['create_timestamp']) + updated = format_consonance_timestamp(status_json['update_timestamp']) + + # DEBUG to check if state, created, and updated are collected + print "STATE:", state + print "CREATED:", created + print "UPDATED:", updated + + stmt = luigi.update().\ + where(luigi.c.luigi_job == job_name).\ + values(status=status_json['state'], + start_time=created, + last_updated=updated) + exec_result = conn.execute(stmt) + + except Exception as e: + print >>sys.stderr, e.message, e.args + print >>sys.stderr, "Dumping job entry:", job + + stmt = luigi.update().\ + where((and_(luigi.c.luigi_job == job_name, + luigi.c.status != 'SUCCESS', + luigi.c.status != 'FAILED'))).\ + values(status='JOB NOT FOUND') + exec_result = conn.execute(stmt) From 5f719d2d3845b4399b66a31bbec6caa7c1950c0b Mon Sep 17 00:00:00 2001 From: alex-hancock Date: Wed, 30 Aug 2017 15:54:15 -0700 Subject: [PATCH 2/4] Changed formatting with respect to flake8 --- luigi-interface/monitor.py | 153 ++++++++++++++++++------------------- 1 file changed, 75 insertions(+), 78 deletions(-) diff --git a/luigi-interface/monitor.py b/luigi-interface/monitor.py index 19cbb52..b0e5b00 100644 --- a/luigi-interface/monitor.py +++ b/luigi-interface/monitor.py @@ -1,5 +1,5 @@ # Alex Hancock, UCSC CGL -# +# # Luigi Monitor import boto @@ -9,10 +9,10 @@ import sys import urllib2 -from boto.s3.key import Key -from sqlalchemy import * +from sqlalchemy import create_engine, MetaData, Table, Column, String, select, and_ from datetime import datetime + def get_touchfile(bucket_name, touchfile_name): s3 = boto.connect_s3() bucket = s3.get_bucket(bucket_name, validate=False) @@ -21,26 +21,22 @@ def get_touchfile(bucket_name, touchfile_name): contents = key.get_contents_as_string() return contents -# -# Luigi Scraping below -# + def get_job_list(): + # Scrape Luigi server = os.getenv("LUIGI_SERVER") + ":" + os.getenv("LUIGI_PORT", "8082") + "/api/" - running_url = server + "task_list?data=%7B%22status%22%3A%22RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - batch_url = server + "task_list?data=%7B%22status%22%3A%22BATCH_RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - failed_url = server + "task_list?data=%7B%22status%22%3A%22FAILED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - upfail_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22UPSTREAM_FAILED%22%2C%22search%22%3A%22%22%7D" - disable_url = server + "task_list?data=%7B%22status%22%3A%22DISABLED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + running_url = server + "task_list?data=%7B%22status%22%3A%22RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + batch_url = server + "task_list?data=%7B%22status%22%3A%22BATCH_RUNNING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + failed_url = server + "task_list?data=%7B%22status%22%3A%22FAILED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + upfail_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22UPSTREAM_FAILED%22%2C%22search%22%3A%22%22%7D" + disable_url = server + "task_list?data=%7B%22status%22%3A%22DISABLED%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" updisable_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22UPSTREAM_DISABLED%22%2C%22search%22%3A%22%22%7D" - pending_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - done_url = server + "task_list?data=%7B%22status%22%3A%22DONE%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + pending_url = server + "task_list?data=%7B%22status%22%3A%22PENDING%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" + done_url = server + "task_list?data=%7B%22status%22%3A%22DONE%22%2C%22upstream_status%22%3A%22%22%2C%22search%22%3A%22%22%7D" - list_of_URLs = [running_url, batch_url, failed_url, upfail_url, + list_of_URLs = [running_url, batch_url, failed_url, upfail_url, disable_url, updisable_url, pending_url, done_url] - relevant_attributes = ["status", "name", "start_time", "params"] - required_parameters = ["project", "donor_id", "sample_id", "pipeline_name"] - local_job_list = {} for URL in list_of_URLs: name = URL[62:] @@ -72,23 +68,27 @@ def get_job_list(): return local_job_list + def query_to_list(result_proxy): # Can use iterator to create list, # but result from SQLAlchemy isn't # great for other techniques used here return [row for row in result_proxy] + def get_consonance_status(consonance_uuid): cmd = ['consonance', 'status', '--job_uuid', str(consonance_uuid)] status_text = subprocess.check_output(cmd) return json.loads(status_text) + # This was exported to a method to avoid duplication # for both the creation and update timestamps def format_consonance_timestamp(consonance_timestamp): datetime_obj = datetime.strptime(consonance_timestamp, '%Y-%m-%dT%H:%M:%S.%f+0000') return datetime.strftime(datetime_obj, '%Y-%m-%d %H:%M') + # # Database initialization, creation if table doesn't exist # @@ -98,28 +98,26 @@ def format_consonance_timestamp(consonance_timestamp): conn = db.connect() metadata = MetaData(db) luigi = Table('luigi', metadata, - Column("luigi_job", String(100), primary_key=True), - Column("status", String(20)), - - Column("submitter_specimen_id", String(100)), - Column("specimen_uuid", String(100)), - Column("workflow_name", String(100)), - Column("center_name", String(100)), - Column("submitter_donor_id", String(100)), - Column("consonance_job_uuid", String(100)), - Column("submitter_donor_primary_site", String(100)), - Column("project", String(100)), - Column("analysis_type", String(100)), - Column("program", String(100)), - Column("donor_uuid", String(100)), - Column("submitter_sample_id", String(100)), - Column("submitter_experimental_design", String(100)), - Column("submitter_specimen_type", String(100)), - Column("workflow_version", String(100)), - Column("sample_uuid", String(100)), - - Column("start_time", String(100)), - Column("last_updated", String(100))) + Column("luigi_job", String(100), primary_key=True), + Column("status", String(20)), + Column("submitter_specimen_id", String(100)), + Column("specimen_uuid", String(100)), + Column("workflow_name", String(100)), + Column("center_name", String(100)), + Column("submitter_donor_id", String(100)), + Column("consonance_job_uuid", String(100)), + Column("submitter_donor_primary_site", String(100)), + Column("project", String(100)), + Column("analysis_type", String(100)), + Column("program", String(100)), + Column("donor_uuid", String(100)), + Column("submitter_sample_id", String(100)), + Column("submitter_experimental_design", String(100)), + Column("submitter_specimen_type", String(100)), + Column("workflow_version", String(100)), + Column("sample_uuid", String(100)), + Column("start_time", String(100)), + Column("last_updated", String(100))) if not db.dialect.has_table(db, luigi): luigi.create() @@ -127,17 +125,17 @@ def format_consonance_timestamp(consonance_timestamp): jobList = get_job_list() for job in jobList: - job_dict = jobList[job] - + job_dict = jobList[job] + # # S3 scraping below # try: s3string = job_dict['params']['touch_file_path'] bucket_name, filepath = s3string.split('/', 1) - touchfile_name = filepath + '/' + \ - job_dict['params']['metadata_json_file_name'] - print "TOUCH FILE NAME:", touchfile_name + touchfile_name = filepath + '/' + job_dict['params']['metadata_json_file_name'] + + print "TOUCH FILE NAME:", touchfile_name stringContents = get_touchfile(bucket_name, touchfile_name) jsonMetadata = json.loads(stringContents) except Exception as e: @@ -146,7 +144,7 @@ def format_consonance_timestamp(consonance_timestamp): print >>sys.stderr, "Failure when connecting to S3, dumping job dictionary:", job_dict continue - # + # # Consonance scraping below # try: @@ -155,13 +153,13 @@ def format_consonance_timestamp(consonance_timestamp): except: # Default to Luigi status and timestamps status_json = { - 'create_timestamp' : job_dict['start_time'], - 'update_timestamp' : job_dict['last_updated'], - 'state' : 'LUIGI:' + job_dict['status'] + 'create_timestamp': job_dict['start_time'], + 'update_timestamp': job_dict['last_updated'], + 'state': 'LUIGI:' + job_dict['status'] } # - # Find if current job is already listed in + # Find if current job is already listed in # job database, insert if absent # select_query = select([luigi]).where(luigi.c.luigi_job == job) @@ -169,23 +167,23 @@ def format_consonance_timestamp(consonance_timestamp): if len(select_result) == 0: try: ins_query = luigi.insert().values(luigi_job=job, - submitter_specimen_id=jsonMetadata['submitter_specimen_id'], - specimen_uuid=jsonMetadata['specimen_uuid'], - workflow_name=jsonMetadata['workflow_name'], - center_name=jsonMetadata['center_name'], - submitter_donor_id=jsonMetadata['submitter_donor_id'], - consonance_job_uuid=jsonMetadata['consonance_job_uuid'], - submitter_donor_primary_site=jsonMetadata['submitter_donor_primary_site'], - project=jsonMetadata['project'], - analysis_type=jsonMetadata['analysis_type'], - program=jsonMetadata['program'], - donor_uuid=jsonMetadata['donor_uuid'], - submitter_sample_id=jsonMetadata['submitter_sample_id'], - submitter_experimental_design=jsonMetadata['submitter_experimental_design'], - submitter_specimen_type=jsonMetadata['submitter_specimen_type'], - workflow_version=jsonMetadata['workflow_version'], - sample_uuid=jsonMetadata['sample_uuid']) - exec_result = conn.execute(ins_query) + submitter_specimen_id=jsonMetadata['submitter_specimen_id'], + specimen_uuid=jsonMetadata['specimen_uuid'], + workflow_name=jsonMetadata['workflow_name'], + center_name=jsonMetadata['center_name'], + submitter_donor_id=jsonMetadata['submitter_donor_id'], + consonance_job_uuid=jsonMetadata['consonance_job_uuid'], + submitter_donor_primary_site=jsonMetadata['submitter_donor_primary_site'], + project=jsonMetadata['project'], + analysis_type=jsonMetadata['analysis_type'], + program=jsonMetadata['program'], + donor_uuid=jsonMetadata['donor_uuid'], + submitter_sample_id=jsonMetadata['submitter_sample_id'], + submitter_experimental_design=jsonMetadata['submitter_experimental_design'], + submitter_specimen_type=jsonMetadata['submitter_specimen_type'], + workflow_version=jsonMetadata['workflow_version'], + sample_uuid=jsonMetadata['sample_uuid']) + exec_result = conn.execute(ins_query) except Exception as e: print >>sys.stderr, e.message, e.args print "Dumping jsonMetadata to aid debug:\n", jsonMetadata @@ -193,9 +191,9 @@ def format_consonance_timestamp(consonance_timestamp): # # Get Consonance status for each entry in our db -# +# # Select all from the table, pipe results into a list -# +# # for job in list # consonance status using job.consonance_uuid # update that job using the information from status return @@ -213,8 +211,7 @@ def format_consonance_timestamp(consonance_timestamp): # and force next job print "\nTest ID, skipping" - stmt = luigi.delete().\ - where(luigi.c.luigi_job == job_name) + stmt = luigi.delete().where(luigi.c.luigi_job == job_name) exec_result = conn.execute(stmt) else: # Consonace job id is real @@ -231,10 +228,10 @@ def format_consonance_timestamp(consonance_timestamp): print "UPDATED:", updated stmt = luigi.update().\ - where(luigi.c.luigi_job == job_name).\ - values(status=status_json['state'], - start_time=created, - last_updated=updated) + where(luigi.c.luigi_job == job_name).\ + values(status=status_json['state'], + start_time=created, + last_updated=updated) exec_result = conn.execute(stmt) except Exception as e: @@ -242,8 +239,8 @@ def format_consonance_timestamp(consonance_timestamp): print >>sys.stderr, "Dumping job entry:", job stmt = luigi.update().\ - where((and_(luigi.c.luigi_job == job_name, - luigi.c.status != 'SUCCESS', - luigi.c.status != 'FAILED'))).\ - values(status='JOB NOT FOUND') + where((and_(luigi.c.luigi_job == job_name, + luigi.c.status != 'SUCCESS', + luigi.c.status != 'FAILED'))).\ + values(status='JOB NOT FOUND') exec_result = conn.execute(stmt) From 21a14f4ceb995230de75d64b02068c8352738c61 Mon Sep 17 00:00:00 2001 From: alex-hancock Date: Thu, 31 Aug 2017 16:59:54 -0700 Subject: [PATCH 3/4] Changed mapi to pull distinct values from monitor database, fixes duplicates in monitor table. --- mapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mapi.py b/mapi.py index 694dc44..f02b9f0 100644 --- a/mapi.py +++ b/mapi.py @@ -1578,7 +1578,7 @@ def get_action_service(): Column("start_time", String(100)), Column("last_updated", String(100)) ) - select_query = select([luigi]).order_by("last_updated") + select_query = select([luigi], distinct=True).order_by("last_updated") select_result = conn.execute(select_query) result_list = [dict(row) for row in select_result] return jsonify(result_list) From 8f03e70667c2c0985f70762792fea5a049af6157 Mon Sep 17 00:00:00 2001 From: Walt Shands Date: Tue, 5 Sep 2017 17:32:46 +0000 Subject: [PATCH 4/4] select distinct rows in action service db --- mapi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mapi.py b/mapi.py index f02b9f0..16c446d 100644 --- a/mapi.py +++ b/mapi.py @@ -1578,7 +1578,7 @@ def get_action_service(): Column("start_time", String(100)), Column("last_updated", String(100)) ) - select_query = select([luigi], distinct=True).order_by("last_updated") + select_query = select([luigi]).distinct("last_updated").order_by("last_updated") select_result = conn.execute(select_query) result_list = [dict(row) for row in select_result] return jsonify(result_list)