From 5772af4eed51e45de06b1329ed3be3158c4c2ccf Mon Sep 17 00:00:00 2001 From: Florian Jung Date: Wed, 7 Jan 2015 17:51:55 +0100 Subject: server2 --- libardrone/test_losing_connection.py | 67 ++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 libardrone/test_losing_connection.py (limited to 'libardrone/test_losing_connection.py') diff --git a/libardrone/test_losing_connection.py b/libardrone/test_losing_connection.py new file mode 100644 index 0000000..0765d76 --- /dev/null +++ b/libardrone/test_losing_connection.py @@ -0,0 +1,67 @@ +import select +import socket +import struct +import time + +DRONE_IP = "192.168.1.1" +ARDRONE_NAVDATA_PORT = 5554 +ARDRONE_COMMAND_PORT = 5556 + +''' +Small endless loop to test the robustness of the tcp ip connection (video streaming) +Warning: This test does not stop, it raises an exception when the connection is lost or +if something goes wrong (most likely the drone stops sending video data and +send empty packets on the command port... +''' + +def at(command, seq, params): + """ + Parameters: + command -- the command + seq -- the sequence number + params -- a list of elements which can be either int, float or string + """ + param_str = '' + for p in params: + if type(p) == int: + param_str += ",%d" % p + elif type(p) == float: + param_str += ",%d" % f2i(p) + elif type(p) == str: + param_str += ',"' + p + '"' + msg = "AT*%s=%i%s\r" % (command, seq, param_str) + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto(msg, ("192.168.1.1", ARDRONE_COMMAND_PORT)) + +def f2i(f): + """Interpret IEEE-754 floating-point value as signed integer. + Arguments: + f -- floating point value + """ + return struct.unpack('i', struct.pack('f', f))[0] + + +if __name__ == '__main__': + nav_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + nav_socket.connect((DRONE_IP, ARDRONE_NAVDATA_PORT)) + nav_socket.setblocking(0) + nav_socket.send("\x01\x00\x00\x00") + + seq = 1 + stopping = 1 + while stopping < 100: + inputready, outputready, exceptready = select.select([nav_socket], [], [], 1) + seq += 1 + at("COMWDG", seq, []) + if len(inputready) == 0: + print "Connection lost for the %d time !" % stopping + nav_socket.send("\x01\x00\x00\x00") + stopping += 1 + for i in inputready: + while 1: + try: + data = nav_socket.recv(500) + except IOError: + break + + raise Exception("Should not get here") -- cgit v1.2.3