-
Notifications
You must be signed in to change notification settings - Fork 16
/
pipe_helper.py
73 lines (57 loc) · 2.23 KB
/
pipe_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from typing import Any
import win32file, win32pipe
import win32event, pywintypes
def create_pipe_server(pipe_name: str):
    """Create a server-side instance of the named pipe ``pipe_name``.

    The pipe is opened for duplex access with overlapped I/O enabled and
    operates in message mode for both writing and reading. The result of
    ``win32pipe.CreateNamedPipe`` is returned unchanged.
    """
    open_mode = win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED
    pipe_mode = win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE
    buffer_size = 65536  # same size is used for the out and in buffers

    server_handle = win32pipe.CreateNamedPipe(
        pipe_name,
        open_mode,
        pipe_mode,
        win32pipe.PIPE_UNLIMITED_INSTANCES,
        buffer_size,
        buffer_size,
        0,     # default time-out argument
        None,  # security attributes argument
    )
    return server_handle
def create_pipe_client(pipe_name: str):
    """Open the client end of the existing named pipe ``pipe_name``.

    The pipe is opened for reading and writing, with no sharing, and with
    overlapped I/O enabled. The result of ``win32file.CreateFile`` is
    returned unchanged.
    """
    desired_access = win32file.GENERIC_READ | win32file.GENERIC_WRITE
    share_mode = 0  # no sharing

    client_handle = win32file.CreateFile(
        pipe_name,
        desired_access,
        share_mode,
        None,  # security attributes argument
        win32file.OPEN_EXISTING,
        win32file.FILE_FLAG_OVERLAPPED,
        None,  # template file argument
    )
    return client_handle
def pipe_write_async_await(pipe, data):
    """Start an overlapped write of ``data`` to ``pipe`` and block until it completes.

    A fresh manual-reset event is created for the operation and is always
    closed before returning, including when the write or the wait raises.

    Args:
        pipe: Handle to a pipe opened for overlapped I/O.
        data: Payload passed straight to ``win32file.WriteFile``.
    """
    overlapped = pywintypes.OVERLAPPED()
    # Manual-reset event, initially non-signaled.
    overlapped.hEvent = win32event.CreateEvent(None, True, False, None)
    try:
        win32file.WriteFile(pipe, data, overlapped)
        win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
    finally:
        # Release the event even if the write or the wait failed.
        win32file.CloseHandle(overlapped.hEvent)
def pipe_read_async_await(pipe, read_buf) -> bytes:
    """Start an overlapped read from ``pipe`` into ``read_buf`` and block until done.

    A fresh manual-reset event is created for the operation and is always
    closed before returning, including when any of the calls raises.

    Args:
        pipe: Handle to a pipe opened for overlapped I/O.
        read_buf: Buffer passed to ``win32file.ReadFile``; it must support
            slicing, since the received prefix is copied out of it.

    Returns:
        A ``bytes`` copy of the first N items of ``read_buf``, where N is the
        value reported by ``GetOverlappedResult``.
    """
    overlapped = pywintypes.OVERLAPPED()
    # Manual-reset event, initially non-signaled.
    overlapped.hEvent = win32event.CreateEvent(None, True, False, None)
    try:
        win32file.ReadFile(pipe, read_buf, overlapped)
        win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
        read_buf_len = win32pipe.GetOverlappedResult(pipe, overlapped, True)
    finally:
        # Release the event even if the read, wait or result query failed.
        win32file.CloseHandle(overlapped.hEvent)
    return bytes(read_buf[:read_buf_len])
def pipe_wait_for_client_async_await(pipe):
    """Block until a client is connected to the server pipe ``pipe``.

    Issues an overlapped ``ConnectNamedPipe`` and waits for its event. If the
    client had already connected before the call, ``ConnectNamedPipe`` reports
    ``ERROR_PIPE_CONNECTED`` and no operation is pending, so the event would
    never be signaled; in that case the function returns immediately instead
    of waiting forever.

    The event created for the operation is always closed before returning,
    including when one of the calls raises.

    Args:
        pipe: Server pipe handle opened for overlapped I/O.
    """
    error_pipe_connected = 535  # value of winerror.ERROR_PIPE_CONNECTED

    overlapped = pywintypes.OVERLAPPED()
    # Manual-reset event, initially non-signaled.
    overlapped.hEvent = win32event.CreateEvent(None, True, False, None)
    try:
        status = win32pipe.ConnectNamedPipe(pipe, overlapped)
        if status != error_pipe_connected:
            # Connection still pending: wait for the completion event.
            win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
    finally:
        win32file.CloseHandle(overlapped.hEvent)
def pipe_wait_for_client_async_nowait(pipe) -> Any:
    """Start an overlapped ``ConnectNamedPipe`` on ``pipe`` without waiting.

    Returns the event attached to the operation; the caller is responsible
    for waiting on it and closing it. If the client had already connected
    before the call (``ERROR_PIPE_CONNECTED``), no operation is pending and
    the event would never be signaled, so it is signaled here to keep a later
    wait from blocking forever.

    If ``ConnectNamedPipe`` raises, the event is closed before the exception
    propagates.

    Args:
        pipe: Server pipe handle opened for overlapped I/O.

    Returns:
        The value of ``overlapped.hEvent`` for the connect operation.
    """
    error_pipe_connected = 535  # value of winerror.ERROR_PIPE_CONNECTED

    overlapped = pywintypes.OVERLAPPED()
    # Manual-reset event, initially non-signaled.
    overlapped.hEvent = win32event.CreateEvent(None, True, False, None)
    try:
        status = win32pipe.ConnectNamedPipe(pipe, overlapped)
    except Exception:
        # Nobody else will ever see this event, so release it here.
        win32file.CloseHandle(overlapped.hEvent)
        raise
    if status == error_pipe_connected:
        # Client is already connected; mark the event as completed ourselves.
        win32event.SetEvent(overlapped.hEvent)
    # NOTE(review): `overlapped` goes out of scope on return while the connect
    # may still be pending - confirm pywin32 keeps the structure alive for the
    # duration of the operation.
    return overlapped.hEvent
def wait_and_close_event(event):
    """Wait without a time limit for ``event`` to be signaled, then close its handle."""
    no_timeout = win32event.INFINITE
    win32event.WaitForSingleObject(event, no_timeout)
    win32file.CloseHandle(event)
def close_handle_ignore_error(handle):
    """Close ``handle`` on a best-effort basis, ignoring any failure.

    Errors raised by ``win32file.CloseHandle`` are deliberately suppressed.
    Only ``Exception`` subclasses are swallowed, so ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate.
    """
    try:
        win32file.CloseHandle(handle)
    except Exception:
        # Best-effort cleanup: a failed close is intentionally not reported.
        pass