1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
import select
import socket
import struct
import time
DRONE_IP = "192.168.1.1"
ARDRONE_NAVDATA_PORT = 5554
ARDRONE_COMMAND_PORT = 5556
'''
Small endless loop to test the robustness of the tcp ip connection (video streaming)
Warning: This test does not stop, it raises an exception when the connection is lost or
if something goes wrong (most likely the drone stops sending video data and
send empty packets on the command port...
'''
def at(command, seq, params):
"""
Parameters:
command -- the command
seq -- the sequence number
params -- a list of elements which can be either int, float or string
"""
param_str = ''
for p in params:
if type(p) == int:
param_str += ",%d" % p
elif type(p) == float:
param_str += ",%d" % f2i(p)
elif type(p) == str:
param_str += ',"' + p + '"'
msg = "AT*%s=%i%s\r" % (command, seq, param_str)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, ("192.168.1.1", ARDRONE_COMMAND_PORT))
def f2i(f):
"""Interpret IEEE-754 floating-point value as signed integer.
Arguments:
f -- floating point value
"""
return struct.unpack('i', struct.pack('f', f))[0]
if __name__ == '__main__':
nav_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
nav_socket.connect((DRONE_IP, ARDRONE_NAVDATA_PORT))
nav_socket.setblocking(0)
nav_socket.send("\x01\x00\x00\x00")
seq = 1
stopping = 1
while stopping < 100:
inputready, outputready, exceptready = select.select([nav_socket], [], [], 1)
seq += 1
at("COMWDG", seq, [])
if len(inputready) == 0:
print "Connection lost for the %d time !" % stopping
nav_socket.send("\x01\x00\x00\x00")
stopping += 1
for i in inputready:
while 1:
try:
data = nav_socket.recv(500)
except IOError:
break
raise Exception("Should not get here")
|