XTDrone/sensing/lane_detection/lane_detection_ugv.py

128 lines
3.9 KiB
Python

import numpy as np
import cv2
import math
import rospy
from sensor_msgs.msg import Image
from std_msgs.msg import Int16
import matplotlib.pyplot as plt
from cv_bridge import CvBridge
import sys
bridge = CvBridge()
def region_of_interest(img, vertices):
mask = np.zeros_like(img)
match_mask_color = 255
cv2.fillPoly(mask, vertices, match_mask_color)
masked_image = cv2.bitwise_and(img, mask)
return masked_image
def draw_lines(img, lines, color=[255, 0, 0], thickness=3):
line_img = np.zeros(
(
img.shape[0],
img.shape[1],
3
),
dtype=np.uint8
)
img = np.copy(img)
if lines is None:
return
for line in lines:
for x1, y1, x2, y2 in line:
cv2.line(line_img, (x1, y1), (x2, y2), color, thickness)
img = cv2.addWeighted(img, 0.8, line_img, 1.0, 0.0)
return img
def yellow_dectection(image):
# create hsv
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
lower = np.uint8([ 30, 150, 101])
upper = np.uint8([ 100, 255, 255])
yellow_mask = cv2.inRange(hsv, lower, upper)
result = cv2.bitwise_and(image, image,mask=yellow_mask)
return result
def pipeline(image):
image = yellow_dectection(image)
height = image.shape[0]
width = image.shape[1]
gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
cannyed_image = cv2.Canny(gray_image, 100, 200)
lines = cv2.HoughLinesP(
cannyed_image,
rho=6,
theta=np.pi / 60,
threshold=200,
lines=np.array([]),
minLineLength=50,
maxLineGap=50
)
if lines is None:
img_ros = bridge.cv2_to_imgmsg(image, "rgb8")
return img_ros, 0
left_line_x = []
left_line_y = []
right_line_x = []
right_line_y = []
for line in lines:
for x1, y1, x2, y2 in line:
slope = float(y2 - y1) / float(x2 - x1)
if slope <= 0:
left_line_x.extend([x1, x2])
left_line_y.extend([y1, y2])
else:
right_line_x.extend([x1, x2])
right_line_y.extend([y1, y2])
min_y = int(image.shape[0]* 1 / 2)
max_y = int(image.shape[0]* 3 / 5)
if not left_line_x == []:
poly_left = np.poly1d(np.polyfit(left_line_y,left_line_x,deg=1))
left_x_start = int(poly_left(max_y))
left_x_end = int(poly_left(min_y))
if not right_line_x == []:
poly_right = np.poly1d(np.polyfit(right_line_y,right_line_x,deg=1))
right_x_start = int(poly_right(max_y))
right_x_end = int(poly_right(min_y))
if not left_line_x == [] and not right_line_x == []:
mid_x_error = (left_x_end+right_x_end)/2.0-width/2
line_image = draw_lines(image,[[[left_x_start, max_y, left_x_end, min_y],[right_x_start, max_y, right_x_end,min_y],]],thickness=5,)
img_ros = bridge.cv2_to_imgmsg(line_image, "rgb8")
else:
mid_x_error = 1000
img_ros = bridge.cv2_to_imgmsg(image, "rgb8")
return img_ros, mid_x_error
def img_callback(msg):
global img_processed, lane_mid_error
cv_image = bridge.imgmsg_to_cv2(msg, "rgb8")
img_processed, lane_mid_error = pipeline(cv_image)
img_processed.header.stamp = rospy.Time.now()
img_processed_pub.publish(img_processed)
if __name__ == "__main__":
ugv_num = sys.argv[1]
rospy.init_node("lane_detection_"+ugv_num)
img_sub = rospy.Subscriber("/ugv_"+ugv_num+"/triclops/triclops/left/image", Image, callback=img_callback)
img_processed_pub = rospy.Publisher("/ugv_"+ugv_num+"/image_lane", Image,queue_size=2)
lane_mid_error_pub = rospy.Publisher("/ugv_"+ugv_num+"/lane_mid_error",Int16,queue_size=2)
img_processed = Image()
lane_mid_error = Int16()
rate = rospy.Rate(50)
while not rospy.is_shutdown():
lane_mid_error_pub.publish(lane_mid_error)
rate.sleep()