-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpxpowersh.py
66 lines (57 loc) · 2.39 KB
/
pxpowersh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
'''
originally from: https://github.com/MarkBaggett/pxpowershell/blob/main/pxpowershell/pxpowershell.py
code modified by me to output less errors
'''
import pexpect
import time
import re
import sys
import random
from pexpect.popen_spawn import PopenSpawn
class PxPowershell(object):
def __init__(self, *args, debug=False, **kwargs):
self.cmd = "powershell.exe"
self.unique_prompt = f"XYZPEXPECT{random.randint(1,99999999):0>X}XYZ"
self.orig_prompt = ""
self.process = ""
self.debug = debug
def start_process(self):
self.process = pexpect.popen_spawn.PopenSpawn(self.cmd)
if self.debug:
self.process.logfile = sys.stdout.buffer
time.sleep(1)
init_banner = ''
prompt = []
elapsed = time.time()
while not prompt:
if time.time()-elapsed > 60:
raise(Exception('Error: Unable to determine powershell prompt'))
init_banner = self.process.read_nonblocking(4096, 2)
prompt = re.findall(b'PS [A-Z]:', init_banner, re.MULTILINE)
prompt = prompt[0]
self.process.sendline("Get-Content function:\prompt")
self.process.expect(prompt)
#The first 32 characters will be the command we sent in
self.orig_prompt = self.process.before[32:]
if self.debug:
print(f"Original prompt: {self.orig_prompt} -> {self.unique_prompt}")
self.process.sendline('Function prompt{{"{0}"}}'.format(self.unique_prompt))
self.process.expect(self.unique_prompt)
self.clear_buffer()
def clear_buffer(self):
#repeat expecting the prompt until there are no more. One thing that nessesitates this is when extra CRLFs are sent to run().
while self.process.expect([self.unique_prompt, pexpect.EOF, pexpect.TIMEOUT], timeout=1) == 0:
pass
def restore_prompt(self):
self.process.sendline('Function prompt{{"{0}"}}'.format(self.orig_prompt))
self.process.expect(self.orig_prompt)
def run(self,pscommand, timeout=-1):
self.clear_buffer()
self.process.sendline(pscommand)
self.process.expect(self.unique_prompt, timeout=timeout)
#capture result but trim off the command and the CRLF
result = self.process.before[len(pscommand)+2:]
self.clear_buffer()
return result
def stop_process(self):
self.process.kill(9)