forked from xtdrone/XTDrone
100 lines
4.1 KiB
Python
100 lines
4.1 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: UTF-8 -*-
|
|
import rospy
|
|
from geometry_msgs.msg import Twist, Vector3, PoseStamped
|
|
from std_msgs.msg import String
|
|
from pyquaternion import Quaternion
|
|
import time
|
|
import math
|
|
import numpy
|
|
import sys
|
|
if sys.argv[2] == '6':
|
|
from formation_dict import formation_dict_6 as formation_dict
|
|
elif sys.argv[2] == '9':
|
|
from formation_dict import formation_dict_9 as formation_dict
|
|
elif sys.argv[2] == '18':
|
|
from formation_dict import formation_dict_18 as formation_dict
|
|
|
|
class Leader:
|
|
|
|
def __init__(self, uav_type, leader_id, uav_num):
|
|
self.hover = True
|
|
self.id = leader_id
|
|
self.local_pose = PoseStamped()
|
|
self.cmd_vel_enu = Twist()
|
|
self.follower_num = uav_num - 1
|
|
self.followers_info = ["Moving"]*self.follower_num
|
|
self.follower_arrived_num = 0
|
|
self.follower_all_arrived = True
|
|
self.avoid_vel = Vector3(0,0,0)
|
|
self.formation_config = 'waiting'
|
|
self.target_height_recorded = False
|
|
self.cmd = String()
|
|
self.f = 200
|
|
self.Kz = 0.5
|
|
self.local_pose_sub = rospy.Subscriber(uav_type+'_'+str(self.id)+"/mavros/local_position/pose", PoseStamped , self.local_pose_callback)
|
|
self.cmd_vel_sub = rospy.Subscriber("/xtdrone/leader/cmd_vel_flu", Twist, self.cmd_vel_callback)
|
|
self.avoid_vel_sub = rospy.Subscriber("/xtdrone/"+uav_type+'_'+str(self.id)+"/avoid_vel", Vector3, self.avoid_vel_callback)
|
|
self.leader_cmd_sub = rospy.Subscriber("/xtdrone/leader/cmd",String, self.cmd_callback)
|
|
|
|
for i in range(self.follower_num):
|
|
rospy.Subscriber('/xtdrone/'+uav_type+'_'+str(i+1)+'/info',String,self.followers_info_callback,i)
|
|
self.local_pose_pub = rospy.Publisher("/xtdrone/leader/pose", PoseStamped , queue_size=10)
|
|
self.formation_switch_pub = rospy.Publisher("/xtdrone/formation_switch",String, queue_size=10)
|
|
self.vel_enu_pub = rospy.Publisher('/xtdrone/'+uav_type+'_'+str(self.id)+'/cmd_vel_enu', Twist, queue_size=10)
|
|
self.cmd_pub = rospy.Publisher('/xtdrone/'+uav_type+'_'+str(self.id)+'/cmd', String, queue_size=10)
|
|
|
|
def local_pose_callback(self, msg):
|
|
self.local_pose = msg
|
|
|
|
def cmd_vel_callback(self, msg):
|
|
self.cmd_vel_enu = msg
|
|
if msg.linear.z == 0:
|
|
self.hover = True
|
|
else:
|
|
self.hover = False
|
|
|
|
def cmd_callback(self, msg):
|
|
if msg.data in formation_dict.keys():
|
|
self.formation_config = msg.data
|
|
else:
|
|
self.cmd = msg.data
|
|
|
|
def avoid_vel_callback(self, msg):
|
|
self.avoid_vel = msg
|
|
|
|
def followers_info_callback(self, msg, id):
|
|
self.followers_info[id] = msg.data
|
|
#print("follower"+str(id)+":"+ msg.data)
|
|
|
|
def loop(self):
|
|
rospy.init_node('leader')
|
|
rate = rospy.Rate(self.f)
|
|
while True:
|
|
#self.cmd_vel_enu = Twist()
|
|
for follower_info in self.followers_info:
|
|
if follower_info == "Arrived":
|
|
self.follower_arrived_num += 1
|
|
if self.follower_arrived_num > self.follower_num - 1:
|
|
self.follower_all_arrived = True
|
|
if self.follower_all_arrived:
|
|
self.formation_switch_pub.publish(self.formation_config)
|
|
if self.formation_config == 'pyramid':
|
|
if not self.target_height_recorded:
|
|
target_height = self.local_pose.pose.position.z + 2
|
|
self.target_height_recorded = True
|
|
self.cmd_vel_enu.linear.x = 0
|
|
self.cmd_vel_enu.linear.y = 0
|
|
self.cmd_vel_enu.linear.z = self.Kz * (target_height - self.local_pose.pose.position.z)
|
|
# self.cmd_vel_enu.linear.x = self.avoid_vel.x
|
|
# self.cmd_vel_enu.linear.y = self.avoid_vel.y
|
|
# self.cmd_vel_enu.linear.z = self.avoid_vel.z
|
|
self.vel_enu_pub.publish(self.cmd_vel_enu)
|
|
self.local_pose_pub.publish(self.local_pose)
|
|
self.cmd_pub.publish(self.cmd)
|
|
rate.sleep()
|
|
|
|
if __name__ == '__main__':
|
|
leader = Leader(sys.argv[1], 0, int(sys.argv[2]))
|
|
leader.loop()
|