-
Notifications
You must be signed in to change notification settings - Fork 13
/
SAClient.py
298 lines (277 loc) · 11 KB
/
SAClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# Wrapper for the IBM Application Security on Cloud SAClient Utility
# Use this to automate the creation of IRX files, Scans in ASoC, and downloading scan results
# Author: Cody Travis
# Email: [email protected]
# Date: 2018-05-06
import sys
import json
import subprocess
import io
import datetime
import os
import time
# Function: loginASoC
# Description:
# Wraps the appscan api_login command
# Authenticates to the ASoC Service using an API Key
# Requires:
# keyId - The API Key Id used for authentication
# keySecret - The API Key Secret used for authentication
# Optional:
# persist - Set to True if you want the client to attempt
# to reauthenticate when the token expires (adds the -persist flag)
# Returns:
# True if the client output contains "Authenticated successfully"
# False otherwise
# Exceptions:
# OSError - on non-Windows platforms, if the appscan executable cannot be started
def loginASoC(keyId, keySecret, persist=False):
    cmd = ["appscan", "api_login", "-u", keyId, "-P", keySecret]
    if persist:
        cmd.append("-persist")
    # An argument list combined with shell=True only works on Windows. On POSIX
    # the shell receives just "appscan" and every following argument is dropped,
    # so the shell is only requested where the original behaviour depended on it.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=(os.name == "nt"))
    # communicate() already waits for the process to exit.
    output, _ = p.communicate()
    # Tolerate non-ASCII bytes in the client output instead of raising.
    return "Authenticated successfully" in output.decode("ascii", "replace")
# Function: destroyToken
# Description:
# Removes the cli.token file which is used by SAClient to authenticate to the service
# Call this when the automation process is complete to invalidate the session
# The default location of this file is %HOME%/.appscan/cli.token
# If the HOME env variable is not set, you will need to specify the location manually
# Optional:
# tokenPath - The file path to the cli.token file. Leave blank to use the default location
# Returns:
# True if the file was deleted successfully
# False if the file was not deleted or does not exist
def destroyToken(tokenPath=None):
    if tokenPath is None:
        homeDir = os.getenv("HOME")
        if homeDir is None:
            # No default location can be derived without HOME.
            return False
        tokenPath = os.path.join(homeDir, ".appscan", "cli.token")
    # Try the removal directly rather than exists()+remove(): a token that has
    # already vanished, is not a regular file, or cannot be deleted is reported
    # as False (as documented) instead of raising.
    try:
        os.remove(tokenPath)
    except OSError:
        return False
    return True
# Function: generateIRX
# Description:
# Wraps the appscan prepare command
# Generates an IRX File configured by the supplied appscan-config.xml file
# The client's stdout is written to <scanName>_<timestamp>_stdout.txt and echoed
# to this process's stdout while the command runs
# Requires:
# scanName - This will be the base name of the irx file
# configFile - The name or path to a config file that will be used to generate the IRX
# Returns:
# ScanName - The name of the IRX file without extension (scan name + a timestamp)
# None - if no <ScanName>.irx file exists after the command finishes
# Exceptions:
# OSError - on non-Windows platforms, if the appscan executable cannot be started
def generateIRX(scanName, configFile):
    scanName = scanName + "_" + getTimeStamp()
    stdoutFile = scanName + '_stdout.txt'
    cmd = ["appscan", "prepare", "-c", configFile, "-n", scanName]
    # The client writes into the log file while a second handle tails it, so
    # progress is echoed live and the log file stays on disk afterwards.
    # (Line buffering is not valid for binary files, so the default is used.)
    with io.open(stdoutFile, 'wb') as writer, io.open(stdoutFile, 'rb') as reader:
        # A list plus shell=True drops the arguments on POSIX; only use the
        # shell on Windows, where the original invocation relied on it.
        process = subprocess.Popen(cmd, stdout=writer, shell=(os.name == "nt"))
        while process.poll() is None:
            # "replace" keeps a stray non-ASCII byte from aborting the echo.
            sys.stdout.write(reader.read().decode('ascii', 'replace'))
            time.sleep(0.5)
        # Pick up anything written between the last poll and process exit.
        sys.stdout.write(reader.read().decode('ascii', 'replace'))
    # Success is judged by the presence of the IRX file, not by parsing the log.
    if os.path.exists(scanName + ".irx"):
        return scanName
    return None
# Function: getReport
# Description:
# Wraps the appscan get_result command
# Downloads an html scan report or a zip file that contains all available reports (xml, raw xml, html)
# Requires:
# scanId - The Id of this scan as generated by "queue_analysis"
# filePath - The destination file path of the report file
# Optional:
# type - The type of file returned. Possible values are "html" (default) and "zip"
# Returns:
# True - If the client reported "Results retrieved successfully"
# False - Otherwise
# Exceptions:
# UnauthenticatedException - if not logged in
# ScanNotFoundException - if the scanId is not valid
# OSError - on non-Windows platforms, if the appscan executable cannot be started
def getReport(scanId, filePath, type="html"):
    cmd = ["appscan", "get_result", "-d", filePath, "-i", scanId, "-t", type]
    # A list plus shell=True drops the arguments on POSIX; only use the shell
    # on Windows, where the original invocation relied on it.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=(os.name == "nt"))
    # communicate() already waits for the process to exit.
    output, err = p.communicate()
    # Decode once and tolerate undecodable bytes rather than raising.
    errText = err.decode("utf-8", "replace")
    if not isLoggedIn(errText):
        raise UnauthenticatedException()
    if "Invalid UUID" in errText:
        raise ScanNotFoundException()
    return "Results retrieved successfully" in output.decode("utf-8", "replace")
# Function: getScanSummary
# Description:
# Wraps the appscan info command
# Gets the summary of a scan that is in progress or complete
# Requires:
# scanId - The Id of this scan as generated by "queue_analysis"
# Optional:
# fileName - The destination file to save the json response
# Returns:
# The json object that contains the scan summary information
# Exceptions:
# UnauthenticatedException - raised if not logged in
# ScanNotFoundException - raised if the scanId is not valid
# ValueError - raised by json.loads if the client output is not valid JSON
# OSError - on non-Windows platforms, if the appscan executable cannot be started
def getScanSummary(scanId, fileName=None):
    cmd = ["appscan", "info", "-i", scanId, "-json"]
    # A list plus shell=True drops the arguments on POSIX; only use the shell
    # on Windows, where the original invocation relied on it.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=(os.name == "nt"))
    # communicate() already waits for the process to exit.
    output, err = p.communicate()
    errText = err.decode("utf-8", "replace")
    if not isLoggedIn(errText):
        raise UnauthenticatedException()
    if "Invalid UUID" in errText:
        raise ScanNotFoundException()
    jsonObj = json.loads(output.decode("utf-8", "replace"))
    if fileName is not None:
        # The context manager guarantees the summary is flushed to disk and
        # the handle is released, even if the write fails.
        with open(fileName, 'w+') as f:
            f.write(json.dumps(jsonObj))
    return jsonObj
# Function: getScanStatus
# Description:
# Wraps the appscan status command
# Gets the status of a scan
# Requires:
# scanId - The Id of this scan as generated by "queue_analysis"
# Returns:
# The status of the scan - possible values are Ready, Running, Failed, or InQueue
# None - if the client printed anything else
# Exceptions:
# UnauthenticatedException - raised if not logged in
# ScanNotFoundException - raised if the scanId is not valid
# OSError - on non-Windows platforms, if the appscan executable cannot be started
def getScanStatus(scanId):
    cmd = ["appscan", "status", "-i", scanId]
    # A list plus shell=True drops the arguments on POSIX; only use the shell
    # on Windows, where the original invocation relied on it.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=(os.name == "nt"))
    # communicate() already waits for the process to exit.
    output, err = p.communicate()
    errText = err.decode("utf-8", "replace")
    if not isLoggedIn(errText):
        raise UnauthenticatedException()
    if "Invalid UUID" in errText:
        raise ScanNotFoundException()
    # Strip surrounding whitespace: a trailing newline on the client output
    # would otherwise make every comparison below fail.
    status = output.decode("utf-8", "replace").strip()
    if status in ("Running", "Ready", "Failed", "InQueue"):
        return status
    return None
# Function: getAppList
# Description:
# Wraps the appscan list_apps command
# Gets the application names and Id's associated with the ASoC Account
# Returns:
# A Dictionary object of the format appList[appId] = "appname"
# (keyed by the text found between " [ " and " ]" on each output line)
# Exceptions:
# UnauthenticatedException - if not logged in
# OSError - on non-Windows platforms, if the appscan executable cannot be started
def getAppList():
    # A list plus shell=True drops the arguments on POSIX; only use the shell
    # on Windows, where the original invocation relied on it.
    p = subprocess.Popen(["appscan", "list_apps"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=(os.name == "nt"))
    # communicate() already waits for the process to exit.
    output, err = p.communicate()
    if not isLoggedIn(err.decode("utf-8", "replace")):
        raise UnauthenticatedException()
    appList = {}
    for line in output.decode("utf-8", "replace").split('\n'):
        # Lines are presumably "<name> [ <id> ]" (inferred from this parsing;
        # confirm against the client output). Lines without " [ " are skipped.
        parts = line.split(' [ ')
        if len(parts) < 2:
            continue
        appList[parts[1].split(' ]')[0]] = parts[0]
    return appList
# Function: queueAnalysis
# Description:
# Wraps the appscan queue_analysis command
# Uploads an IRX file for analysis
# Requires:
# irxFile - The path to the IRX file to be scanned
# scanName - The name of the scan to be created
# appId - The application Id to associate the scan results with
# Returns:
# The last non-blank line of the client output (expected to be the scanId of the
# scan that was created), or None if the client printed nothing
# Exceptions:
# UnauthenticatedException - if not logged in
# ApplicationNotException - if the appId is not valid or user does not have permission to access it
# OSError - on non-Windows platforms, if the appscan executable cannot be started
def queueAnalysis(irxFile, scanName, appId):
    cmd = ["appscan", "queue_analysis", "-f", irxFile, "-n", scanName, "-a", appId]
    # A list plus shell=True drops the arguments on POSIX; only use the shell
    # on Windows, where the original invocation relied on it.
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=(os.name == "nt"))
    # communicate() already waits for the process to exit.
    output, err = p.communicate()
    errText = err.decode("utf-8", "replace")
    if not isLoggedIn(errText):
        raise UnauthenticatedException()
    if "Could not associate scan with application" in errText:
        raise ApplicationNotException()
    # Ignore blank lines and stray whitespace: output ending in a newline would
    # otherwise yield an empty string instead of the Id.
    lines = [ln.strip() for ln in output.decode("utf-8", "replace").split('\n') if ln.strip()]
    if not lines:
        return None
    return lines[-1]
# Function: waitForScan
# Description:
# Polls the scan summary once a minute until the scan reaches the target status,
# reports "Failed", or the timeout has been exceeded
# Requires:
# scanId - The scanId of the scan to monitor
# Optional:
# timeoutMins - The amount of minutes to wait for the scan to reach target status before giving up (default 360)
# targetStatus - The target Status waiting for the scan to reach (default "Ready")
# printStatusEveryMins - Print the status and progress every N minutes; 0 or less disables printing (default 5)
# Returns:
# last known scan status or "Timeout" if the timeout was reached
# Exceptions:
# UnauthenticatedException - if not logged in
# ScanNotFoundException - if the scanId is not valid or user does not have permission to access it
def waitForScan(scanId, timeoutMins=360, targetStatus="Ready", printStatusEveryMins=5):
    elapsedMins = 0
    while True:
        execution = getScanSummary(scanId)["LatestExecution"]
        state = execution["Status"]
        # Progress is only reported after at least one wait, never on the first poll.
        if elapsedMins and (printStatusEveryMins > 0) and (elapsedMins % printStatusEveryMins == 0):
            print("Current Status: " + state + " Progress: " + str(execution["Progress"]) + "%")
        # "Failed" is terminal, so stop waiting even if it is not the target.
        if state == targetStatus or state == "Failed":
            return state
        time.sleep(60)  # one polling interval = one minute
        elapsedMins += 1
        if elapsedMins >= timeoutMins:
            return "Timeout"
#Returns the current local time formatted as YYYY-MM-DD_HH-MM-SS
def getTimeStamp():
    return datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
#Returns False when the response text contains one of the client's
#"not authenticated" / "token expired" messages, True otherwise
def isLoggedIn(response):
    authFailureMarkers = ("You must be authenticated", "The token has expired")
    return not any(marker in response for marker in authFailureMarkers)
#Determine if a text file contains the target string
#Returns True as soon as any single line contains targetStr, False otherwise
#(the target is matched within a line, never across a line break)
def fileContains(fileName, targetStr):
    # The context manager closes the file even if reading raises part-way.
    with open(fileName, 'r') as f:
        return any(targetStr in line for line in f)
#Helper Function to Format a time delta obj
#fmt is a str.format template that may use {days}, {hours}, {minutes} and {seconds}
#hours/minutes/seconds are derived from tdelta.seconds (the part below one day)
def strfdelta(tdelta, fmt):
    hours, leftover = divmod(tdelta.seconds, 3600)
    minutes, seconds = divmod(leftover, 60)
    return fmt.format(days=tdelta.days, hours=hours, minutes=minutes, seconds=seconds)
#Raised when the client reports an authentication error (not logged in or token expired)
class UnauthenticatedException(Exception):
    def __init__(self):
        super(UnauthenticatedException, self).__init__("You are not logged into ASoC")
#Raised when an api call that requires a scan id is given an invalid scanId
class ScanNotFoundException(Exception):
    def __init__(self):
        super(ScanNotFoundException, self).__init__("Scan Id was not found")
#Raised when an api call that requires an app id is given an invalid appId
#(or one the user has no permission to access)
class ApplicationNotException(Exception):
    def __init__(self):
        super(ApplicationNotException, self).__init__("Application Id could not be found or you do not have permission to access it.")
#Copyright 2018 Cody Travis
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.