-
Notifications
You must be signed in to change notification settings - Fork 3
/
ddsping.py
executable file
·108 lines (85 loc) · 3.39 KB
/
ddsping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/python
##############################################################################
# Copyright (c) 2005-2017 Real-Time Innovations, Inc.
# All rights reserved. RTI grants Licensee a license to use, modify, compile,
# and create derivative works of the software for their internal use. The software
# is provided "as is", with no warranty of any type, including any warranty for
# fitness for any purpose. RTI is under no obligation to maintain or
# support the software. RTI shall not be liable for any incidental
# or consequential damages arising out of the use or inability to
# use the software.
##############################################################################
"""Application to check if there are issues with the DDS traffic."""
from __future__ import print_function
from time import sleep
from argparse import ArgumentParser
from signal import signal, SIGINT
from sys import exit as sysExit
from os import environ
import rticonnextdds_connector as rti
options = {}
def read_arguments():
"""Parse the command-line arguments."""
parser = ArgumentParser(description="Check if DDS traffic works")
parser.add_argument("-w", "--writer", action='store_true',
help="send ping messages")
parser.add_argument("-r", "--reader", action='store_true',
help="read ping messages")
parser.add_argument("-p", "--peer", action='append',
help="add a discovery peer. " +
"Default list: \"239.255.0.1, 127.0.0.1\"")
return parser
def writer():
"""A writer is going to start sending samples."""
print("Running writer...")
connector = rti.Connector("MyParticipantLibrary::Zero",
"/usr/local/bin/PingConfiguration.xml")
outputDDS = connector.getOutput("MyPublisher::PingWriter")
i = 0
while True:
print("Sending ping message number " + str(i))
outputDDS.instance.setNumber("number", i)
i += 1
outputDDS.write()
sleep(1)
def reader():
"""A reader is going to start receiving samples."""
print("Running reader...")
connector = rti.Connector("MyParticipantLibrary::Zero",
"/bin/PingConfiguration.xml")
inputDDS = connector.getInput("MySubscriber::PingReader")
timeout = 2000 # 2 seconds
while True:
ret = connector.wait(timeout)
if ret == 0:
inputDDS.take()
numOfSamples = inputDDS.samples.getLength()
for j in range(1, numOfSamples+1):
if inputDDS.infos.isValid(j):
pingID = int(inputDDS.samples.getNumber(j, "number"))
# Print the sample
print("Received ping message number " + str(pingID))
else:
print("No ping was received")
def stop(s, frame):
"""Handler for the Ctrl + C signal."""
print("Stopping...")
sysExit(0)
def main():
"""Main application entry."""
args = read_arguments()
parsedArgs = args.parse_args()
signal(SIGINT, stop)
if parsedArgs.peer:
options["peers"] = ",".join(parsedArgs.peer)
environ["NDDS_DISCOVERY_PEERS"] = options["peers"]
else:
options["peers"] = "239.255.0.1,127.0.0.1"
if parsedArgs.writer:
writer()
elif parsedArgs.reader:
reader()
else:
args.print_help()
if __name__ == "__main__":
main()