import rospy from mavros_msgs.msg import State, PositionTarget from mavros_msgs.srv import CommandBool, SetMode from geometry_msgs.msg import PoseStamped, Pose, Twist from nav_msgs.msg import Odometry from gazebo_msgs.srv import GetModelState from std_msgs.msg import String from pyquaternion import Quaternion import math import sys class Communication: def __init__(self, vehicle_id): self.vehicle_type = 'rover' self.vehicle_id = vehicle_id self.local_pose = None self.target_motion = PositionTarget() self.arm_state = False self.motion_type = 0 self.flight_mode = None self.mission = None self.motion_type = 1 self.coordinate_frame = 1 ''' ros subscribers ''' self.local_pose_sub = rospy.Subscriber(self.vehicle_type+'_'+self.vehicle_id+"/mavros/local_position/pose", PoseStamped, self.local_pose_callback) self.mavros_sub = rospy.Subscriber(self.vehicle_type+'_'+self.vehicle_id+"/mavros/state", State, self.mavros_state_callback) self.cmd_pose_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_flu", Pose, self.cmd_pose_flu_callback) self.cmd_pose_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_pose_enu", Pose, self.cmd_pose_enu_callback) self.cmd_vel_flu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_flu", Twist, self.cmd_vel_flu_callback) self.cmd_vel_enu_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd_vel_enu", Twist, self.cmd_vel_enu_callback) self.cmd_sub = rospy.Subscriber("/xtdrone/"+self.vehicle_type+'_'+self.vehicle_id+"/cmd",String,self.cmd_callback) ''' ros publishers ''' self.target_motion_pub = rospy.Publisher(self.vehicle_type+'_'+self.vehicle_id+"/mavros/setpoint_raw/local", PositionTarget, queue_size=10) self.odom_groundtruth_pub = rospy.Publisher('/xtdrone/'+self.vehicle_type+'_'+self.vehicle_id+'/ground_truth/odom', Odometry, queue_size=10) ''' ros services ''' self.armService = rospy.ServiceProxy(self.vehicle_type+'_'+self.vehicle_id+"/mavros/cmd/arming", CommandBool) self.flightModeService = rospy.ServiceProxy(self.vehicle_type+'_'+self.vehicle_id+"/mavros/set_mode", SetMode) self.gazeboModelstate = rospy.ServiceProxy('gazebo/get_model_state', GetModelState) print(self.vehicle_type+'_'+self.vehicle_id+": "+"communication initialized") def start(self): rospy.init_node(self.vehicle_type+'_'+self.vehicle_id+"_communication") rate = rospy.Rate(100) ''' main ROS thread ''' while not rospy.is_shutdown(): self.target_motion_pub.publish(self.target_motion) try: response = self.gazeboModelstate (self.vehicle_type+'_'+self.vehicle_id,'ground_plane') except rospy.ServiceException, e: print "Gazebo model state service call failed: %s"%e odom = Odometry() odom.header = response.header odom.pose.pose = response.pose odom.twist.twist = response.twist self.odom_groundtruth_pub.publish(odom) if (self.flight_mode is "LAND") and (self.local_pose.pose.position.z < 0.15): if(self.disarm()): self.flight_mode = "DISARMED" rate.sleep() def local_pose_callback(self, msg): self.local_pose = msg def mavros_state_callback(self, msg): self.mavros_state = msg.mode def construct_target(self, x=0, y=0, z=0, vx=0, vy=0, vz=0, yaw=0): target_raw_pose = PositionTarget() target_raw_pose.coordinate_frame = self.coordinate_frame target_raw_pose.position.x = x target_raw_pose.position.y = y target_raw_pose.position.z = z target_raw_pose.velocity.x = vx target_raw_pose.velocity.y = vy target_raw_pose.velocity.z = vz target_raw_pose.yaw = yaw if(self.motion_type == 0): target_raw_pose.type_mask = PositionTarget.IGNORE_VX + PositionTarget.IGNORE_VY + PositionTarget.IGNORE_VZ \ + PositionTarget.IGNORE_AFX + PositionTarget.IGNORE_AFY + PositionTarget.IGNORE_AFZ + PositionTarget.IGNORE_YAW_RATE if(self.motion_type == 1): target_raw_pose.type_mask = PositionTarget.IGNORE_PX + PositionTarget.IGNORE_PY + PositionTarget.IGNORE_PZ \ + PositionTarget.IGNORE_AFX + PositionTarget.IGNORE_AFY + PositionTarget.IGNORE_AFZ \ + PositionTarget.IGNORE_YAW_RATE return target_raw_pose def cmd_pose_flu_callback(self, msg): self.coordinate_frame=9 self.motion_type=0 self.target_motion = self.construct_target(x=msg.position.x,y=msg.position.y,z=msg.position.z,yaw=0) def cmd_pose_enu_callback(self, msg): self.coordinate_frame=1 self.motion_type=0 self.target_motion = self.construct_target(x=msg.position.x,y=msg.position.y,z=msg.position.z,yaw=0) def cmd_vel_flu_callback(self, msg): self.coordinate_frame=8 self.motion_type=1 self.target_motion = self.construct_target(vx=msg.linear.x,vy=msg.angular.z,vz=msg.linear.z,yaw=0) def cmd_vel_enu_callback(self, msg): self.coordinate_frame=1 self.motion_type=1 self.target_motion = self.construct_target(vx=msg.linear.x,vy=msg.angular.z,vz=msg.linear.z,yaw=0) def cmd_callback(self, msg): if msg.data == '': return elif msg.data == 'ARM': self.arm_state =self.arm() print(self.vehicle_type+'_'+self.vehicle_id+": Armed "+str(self.arm_state)) elif msg.data == 'DISARM': self.arm_state =not self.disarm() print(self.vehicle_type+'_'+self.vehicle_id+": Armed "+str(self.arm_state)) elif msg.data[:-1] == "mission" and not msg.data == self.mission: self.mission = msg.data print(self.vehicle_type+'_'+self.vehicle_id+": "+msg.data) else: self.flight_mode = msg.data self.flight_mode_switch() def arm(self): if self.armService(True): return True else: print(self.vehicle_type+'_'+self.vehicle_id+": arming failed!") return False def disarm(self): if self.armService(False): return True else: print(self.vehicle_type+'_'+self.vehicle_id+": disarming failed!") return False def flight_mode_switch(self): if self.flightModeService(custom_mode=self.flight_mode): print(self.vehicle_type+'_'+self.vehicle_id+": "+self.flight_mode) return True else: print(self.vehicle_type+'_'+self.vehicle_id+": "+self.flight_mode+"failed") return False if __name__ == '__main__': communication = Communication(sys.argv[1]) communication.start()