-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathapp.py
157 lines (122 loc) · 8.15 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from ast import If
import requests
import json
import os
import time
import sys
from dotenv import load_dotenv
load_dotenv()
ACCOUNT_ID = os.environ.get('PLUGIN_ACCOUNTID')
API_KEY = os.environ.get('PLUGIN_APIKEY')
APPLICATION_NAME = os.environ.get('PLUGIN_APPLICATION')
WORKFLOW_NAME = os.environ.get('PLUGIN_ENTITYNAME')
#BODY = os.environ.get('PLUGIN_BODY')
EXECUTION_TYPE = os.environ.get('PLUGIN_TYPE')
SERVICE_NAME = os.environ.get('PLUGIN_SERVICE_NAME')
BUILD_NUMBER = os.environ.get('PLUGIN_BUILD_NUMBER')
ARTIFACT_SOURCE_NAME = os.environ.get('PLUGIN_ARTIFACT_SOURCE_NAME')
DYNAMIC_VARIABLES_INPUT = os.environ.get('PLUGIN_DYNAMIC_VARIABLES_INPUT') or "false"
WAIT_FOR_EXECUTION = os.environ.get('PLUGIN_WAIT_FOR_EXECUTION') or "false"
WAIT_FOR_EXECUTION_TIMEOUT = int(os.environ.get('PLUGIN_WAIT_FOR_EXECUTION_TIMEOUT')) or 30
RETURN_ERROR_IF_EXECUTION_FAIL = os.environ.get('PLUGIN_RETURN_ERROR_IF_EXECUTION_FAIL') or "false"
EXECUTION_NOTES = os.environ.get('PLUGIN_EXECUTION_NOTES') or "Automated Execution"
global URL
URL = "https://app.harness.io/gateway/api/graphql?accountId=" + ACCOUNT_ID
print("Using Account ID: " + ACCOUNT_ID)
def getAppByName(appName):
pload = "{\"query\":\"{ \\n applicationByName(name: \\\""+appName+"\\\"){ \\n id \\n } \\n}\",\"variables\":{}}"
print("Getting Harness App ID")
response = requests.post(URL, headers={'x-api-key': API_KEY,'Content-Type': 'application/json'}, data=pload)
json_response = response.json()
#print(json_response)
appId = json_response['data']['applicationByName']['id']
print ("appID is: " + appId)
return appId
def getWfByName(AppID, WFName):
print ("Getting Harness Workflow ID by workflow name: " + WFName)
pload = "{\"query\":\"{\\n workflowByName( workflowName: \\\""+WFName+"\\\", applicationId: \\\""+AppID+"\\\") { \\n id \\n } \\n}\",\"variables\":{}}"
response = requests.post(URL, headers={'x-api-key': API_KEY,'Content-Type': 'application/json'}, data=pload)
json_response = response.json()
WFID = json_response["data"]["workflowByName"]["id"]
print("WFID is: " + WFID)
return WFID
def getPLByName(AppID, PLName):
pload = '{ pipelineByName( pipelineName: "' + PLName + '", applicationId: "' + AppID + '") { id } }'
print ("Getting Harness Pipeline ID")
print(pload)
response = requests.post(URL, headers={'x-api-key': API_KEY,'Content-Type': 'text/plain'}, data=pload)
print(response)
json_response = response.json()
PLID = json_response["data"]["pipelineByName"]["id"]
print("PLID is: " + PLID)
return PLID
def execute(appID, wfID):
print("Service Name: " + SERVICE_NAME)
print("Build Number: " + BUILD_NUMBER)
print("Artifact Source Name: " + ARTIFACT_SOURCE_NAME)
print("Execution Type: " + EXECUTION_TYPE)
body="\n variableInputs: [\n {\n name: \"Service\"\n variableValue: {\n type: NAME\n value: \""+SERVICE_NAME+"\"\n }\n },"
if DYNAMIC_VARIABLES_INPUT != "false":
variable_list = DYNAMIC_VARIABLES_INPUT.split(",")
for var in variable_list:
key,value = var.split(":")
body += "\n {\n name: \""+key+"\"\n variableValue: {\n type: NAME\n value: \""+value+"\"\n }\n },"
body += "\n ],\n serviceInputs: [\n {\n name: \""+SERVICE_NAME+"\",\n artifactValueInput: {\n valueType: BUILD_NUMBER,\n buildNumber: {\n buildNumber: \""+BUILD_NUMBER+"\",\n artifactSourceName: \""+ARTIFACT_SOURCE_NAME+"\"\n }\n }\n }\n ]\n"
pload = "mutation {\n startExecution(input: {\n notes: \""+EXECUTION_NOTES+"\",\n applicationId: \""+appID+"\"\n entityId: \""+wfID+"\"\n executionType: "+EXECUTION_TYPE+", "+body+" }\n )\n {\n clientMutationId\n execution{\n id\n status\n }\n }\n }"
print("Payload: " + pload)
retries = 0
while True:
response = requests.post(URL, headers={'x-api-key': API_KEY,'Content-Type': 'text/plain'}, data=pload)
json_response = response.json()
print(json_response)
if not "errors" in json_response:
return json_response
time.sleep(30)
retries += 1
print(f"Retrying ... {retries}")
def status(exec_id):
print("Execution Type: " + EXECUTION_TYPE)
print("Execution ID: " + exec_id)
body = "{\"query\":\"{\\n execution(executionId:\\\""+exec_id+"\\\"){\\n id\\n status\\n ... on PipelineExecution {\\n id\\n status\\n pipelineStageExecutions {\\n pipelineStageElementId\\n pipelineStageName\\n pipelineStepName\\n ... on ApprovalStageExecution {\\n approvalStepType\\n status\\n }\\n ... on WorkflowStageExecution {\\n runtimeInputVariables {\\n allowedValues\\n defaultValue\\n allowMultipleValues\\n fixed\\n name\\n required\\n type\\n }\\n status\\n workflowExecutionId\\n }\\n }\\n }\\n ... on WorkflowExecution {\\n id\\n outcomes{\\n nodes{\\n execution {\\n id\\n endedAt\\n startedAt\\n }\\n }\\n }\\n }\\n }\\n}\\n\",\"variables\":{}}"
#print("Payload: " + body)
response = requests.post(URL, headers={'x-api-key': API_KEY,'Content-Type': 'application/json'}, data=body)
json_response = response.json()
print(json_response)
return json_response
print("Search for Application Name: " + APPLICATION_NAME)
AppID = (getAppByName(APPLICATION_NAME))
if EXECUTION_TYPE == "WORKFLOW":
WfID = getWfByName(AppID, WORKFLOW_NAME)
execution_id = execute(AppID, WfID)
workflow_status = status(execution_id['data']['startExecution']['execution']['id'])['data']['execution']['status']
print("Status:" + workflow_status)
if WAIT_FOR_EXECUTION == "true":
timeout = time.time() + 60*WAIT_FOR_EXECUTION_TIMEOUT # 30 minutes from now
while workflow_status == "RUNNING" or workflow_status == "PAUSED" or workflow_status == "PAUSING" or workflow_status == "QUEUED" or workflow_status == "WAITING":
time.sleep(15)
workflow_status = status(execution_id['data']['startExecution']['execution']['id'])['data']['execution']['status']
if time.time() > timeout:
sys.exit(777)
if RETURN_ERROR_IF_EXECUTION_FAIL == "true" and (workflow_status == "FAILED" or workflow_status == "ABORTED" or workflow_status == "EXPIRED" or workflow_status == "REJECTED" or workflow_status == "ERROR"):
print("Exiting with error...")
sys.exit(-1)
print(workflow_status)
else:
PlID = getPLByName(AppID, WORKFLOW_NAME)
execution_id = execute(AppID, PlID)
pipeline_status = status(execution_id['data']['startExecution']['execution']['id'])['data']['execution']['status']
print("Status:" + pipeline_status)
if WAIT_FOR_EXECUTION == "true":
timeout = time.time() + 60*30 # 30 minutes from now
while pipeline_status == "RUNNING" or pipeline_status == "PAUSED" or pipeline_status == "PAUSING" or pipeline_status == "QUEUED" or pipeline_status == "WAITING":
time.sleep(15)
pipeline_status = status(execution_id['data']['startExecution']['execution']['id'])['data']['execution']['status']
if time.time() > timeout:
sys.exit(777)
if RETURN_ERROR_IF_EXECUTION_FAIL == "true" and (pipeline_status == "FAILED" or pipeline_status == "ABORTED" or pipeline_status == "EXPIRED" or pipeline_status == "REJECTED" or pipeline_status == "ERROR"):
print("Exiting with error...")
sys.exit(-1)
print(pipeline_status)
""" status("DsCaP1eJSJSePbsYK4td2Q")
status("Inrzd03aQ967s141m9XsWQ")
status("bO3xZz_iQcSYruJY8GJqqQ") """