Source code for pyparrot.DroneVisionGUI

"""
DroneVisionGUI is a new class that parallels DroneVision but with several important changes.

1) This module uses VLC instead of FFMPEG
2) This module opens a GUI window to show you the video in real-time (you could
watch it in real-time previously through the VisionServer)
3) Because GUI windows are different on different OS's (and in particular OS X behaves differently
than linux and windows) and because they want to run in the main program thread, the way your program runs
is different.  You first open the GUI and then you have the GUI spawn a thread to run your program.
4) This module can use a virtual disk in memory to save the images, thus shortening the time delay for the
camera for your programs.

Author: Amy McGovern, dramymcgovern@gmail.com
Some of the LIBVLC code comes from
Author: Valentin Benke, valentin.benke@aon.at
"""
import inspect
import sys
import tempfile
import time
from functools import partial
from os.path import join

import cv2
import pyparrot.utils.vlc as vlc
from pyparrot.Model import Model
from PyQt5.QtCore import Qt, QThread, QTimer
from PyQt5.QtGui import QColor, QPalette, QPixmap
from PyQt5.QtWidgets import (QAction, QApplication, QFileDialog, QFrame,
                             QHBoxLayout, QLabel, QMainWindow, QPushButton,
                             QSlider, QVBoxLayout, QWidget)


class Player(QMainWindow):
    """
    Modification of the simple Media Player using VLC and Qt
    to show the mambo stream

    The window part of this example was modified from the QT example cited below.
    VLC requires windows to create and show the video and this was a cross-platform solution.
    VLC will automatically create the windows in linux but not on the mac.
    Amy McGovern, dramymcgovern@gmail.com

    Qt example for VLC Python bindings
    https://github.com/devos50/vlc-pyqt5-example
    Copyright (C) 2009-2010 the VideoLAN team

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
    """
    def __init__(self, vlc_player, drone_gui):
        """
        Create a UI window for the VLC player
        :param vlc_player: the VLC player (created outside the function)
        """
        QMainWindow.__init__(self)
        self.setWindowTitle("VLC Drone Video Player")

        # save the media player
        self.mediaplayer = vlc_player

        # need a reference to the main drone vision class
        self.drone_vision = drone_gui

        # create the GUI
        self.createUI()

    def createUI(self):
        """
        Set up the window for the VLC viewer
        """
        self.widget = QWidget(self)
        self.setCentralWidget(self.widget)

        # In this widget, the video will be drawn
        if sys.platform == "darwin": # for MacOS
            from PyQt5.QtWidgets import QMacCocoaViewContainer
            self.videoframe = QMacCocoaViewContainer(0)
        else:
            self.videoframe = QFrame()
        self.palette = self.videoframe.palette()
        self.palette.setColor (QPalette.Window,
                               QColor(0,0,0))
        self.videoframe.setPalette(self.palette)
        self.videoframe.setAutoFillBackground(True)

        self.hbuttonbox = QHBoxLayout()
        self.playbutton = QPushButton("Run my program")
        self.hbuttonbox.addWidget(self.playbutton)
        self.playbutton.clicked.connect(partial(self.drone_vision.run_user_code, self.playbutton))

        self.landbutton = QPushButton("Land NOW")
        self.hbuttonbox.addWidget(self.landbutton)
        self.landbutton.clicked.connect(self.drone_vision.land)

        self.stopbutton = QPushButton("Quit")
        self.hbuttonbox.addWidget(self.stopbutton)
        self.stopbutton.clicked.connect(self.drone_vision.close_exit)

        self.vboxlayout = QVBoxLayout()
        self.vboxlayout.addWidget(self.videoframe)

        if (self.drone_vision.user_draw_window_fn is not None):
            self.userWindow = QLabel()
            fullPath = inspect.getfile(DroneVisionGUI)
            shortPathIndex = fullPath.rfind("/")
            if (shortPathIndex == -1):
                # handle Windows paths
                shortPathIndex = fullPath.rfind("\\")
            print(shortPathIndex)
            shortPath = fullPath[0:shortPathIndex]
            pixmap = QPixmap('%s/demo_user_image.png' % shortPath)
            print(pixmap)
            print(pixmap.isNull())
            self.userWindow.setPixmap(pixmap)
            self.vboxlayout.addWidget(self.userWindow)

        self.vboxlayout.addLayout(self.hbuttonbox)

        self.widget.setLayout(self.vboxlayout)

        # the media player has to be 'connected' to the QFrame
        # (otherwise a video would be displayed in it's own window)
        # this is platform specific!
        # you have to give the id of the QFrame (or similar object) to
        # vlc, different platforms have different functions for this
        if sys.platform.startswith('linux'): # for Linux using the X Server
            self.mediaplayer.set_xwindow(self.videoframe.winId())
        elif sys.platform == "win32": # for Windows
            self.mediaplayer.set_hwnd(self.videoframe.winId())
        elif sys.platform == "darwin": # for MacOS
            self.mediaplayer.set_nsobject(int(self.videoframe.winId()))


class UserVisionProcessingThread(QThread):

    def __init__(self, user_vision_function, user_args, drone_vision):
        """
        :param user_vision_function: user callback function to handle vision
        :param user_args: optional arguments to the user callback function
        """
        QThread.__init__(self)
        self.user_vision_function = user_vision_function
        self.user_args = user_args
        self.drone_vision = drone_vision

    def __del__(self):
        self.wait()

    def run(self):
        print("user callback being called")
        while (self.drone_vision.vision_running):
            self.user_vision_function(self.user_args)

            # put the thread back to sleep for fps
            # sleeping shorter to ensure we stay caught up on frames
            time.sleep(1.0 / (3.0 * self.drone_vision.fps))

        # exit when the vision thread ends
        print("exiting user vision thread")
        self.terminate()

class UserWindowDrawThread(QThread):

    def __init__(self, user_draw_function, drone_vision):
        """
        :param user_draw_function: user drawing function that should return an image
        """
        QThread.__init__(self)
        self.user_draw_function = user_draw_function
        self.drone_vision = drone_vision

    def __del__(self):
        self.wait()

    def run(self):
        #print("user window draw thread being called")
        while (self.drone_vision.vision_running):
            img = self.user_draw_function()
            if(img is not None):
                if (not img.isNull()):
                    self.drone_vision.vlc_gui.userWindow.setPixmap(QPixmap.fromImage(img))

            # put the thread back to sleep for fps
            # sleeping shorter to ensure we stay caught up on frames
            time.sleep(1.0 / (3.0 * self.drone_vision.fps))

        # exit when the vision thread ends
        print("exiting user window draw thread")
        self.terminate()

class UserCodeToRun(QThread):
    def __init__(self, user_function, user_args, drone_vision):
        """
        :param user_function: user code to run (presumably flies the drone)
        :param user_args: optional arguments to the user function
        """
        QThread.__init__(self)
        self.user_vision_function = user_function
        self.user_args = user_args
        self.drone_vision = drone_vision

    def __del__(self):
        self.wait()

    def run(self):
        self.user_vision_function(self.drone_vision, self.user_args)


[docs]class DroneVisionGUI: def __init__(self, drone_object, model, user_code_to_run, user_args, buffer_size=200, network_caching=200, fps=20, user_draw_window_fn=None): """ Setup your vision object and initialize your buffers. You won't start seeing pictures until you call open_video. :param drone_object reference to the drone (mambo or bebop) object :param is_bebop: True if it is a bebop and false if it is a mambo :param user_code_to_run: user code to run with the run button (remember this is needed due to the GUI taking the thread) :param user_args: arguments to the user code :param buffer_size: number of frames to buffer in memory. Defaults to 10. :param network_caching: buffering time in milli-seconds, 200 should be enough, 150 works on some devices (Mac OS X ignores this argument) :param fps: frame rate for the vision :param user_window: set to a function to be called to draw a QImage and None otherwise (default None) """ self.fps = fps self.vision_interval = int(1000 * 1.0 / self.fps) self.buffer_size = buffer_size self.drone_object = drone_object self.model = model # initialize a buffer (will contain the last buffer_size vision objects) self.buffer = [None] * buffer_size self.buffer_size = buffer_size self.buffer_index = 0 # vision threading is done from a QTimer instead of a separate thread self.new_frame = False self.vision_running = True # the vision thread starts opencv on these files. That will happen inside the other thread # so here we just sent the image index to 1 ( to start) self.image_index = 1 # save the caching parameters and choice of libvlc self.network_caching = network_caching # save the user function and args for calling from the run button self.user_code_to_run = user_code_to_run self.user_args = user_args self.user_thread = UserCodeToRun(user_code_to_run, user_args, self) # if we are drawing a special user window self.user_draw_window_fn = user_draw_window_fn if (self.user_draw_window_fn is not None): self.user_window_draw_thread = UserWindowDrawThread(self.user_draw_window_fn, self) else: self.user_window_draw_thread = None # in case we never setup a user callback function self.user_vision_thread = None # has the land button been clicked - saved in case the user needs it in their code self.land_button_clicked = False
[docs] def run_user_code(self, button): """ Start the thread to run the user code :return: """ button.setEnabled(False) self.user_thread.start()
[docs] def set_user_callback_function(self, user_callback_function=None, user_callback_args=None): """ Set the (optional) user callback function for handling the new vision frames. This is run in a separate thread that starts when you start the vision buffering :param user_callback_function: function :param user_callback_args: arguments to the function :return: """ self.user_vision_thread = UserVisionProcessingThread(user_callback_function, user_callback_args, self)
[docs] def open_video(self): """ Open the video stream using vlc. Note that this version is blocking meaning this function will NEVER return. If you want to run your own code and not just watch the video, be sure you set your user code in the constructor! Remember that this will only work if you have connected to the wifi for your mambo! :return never returns due to QT running in the main loop by requirement """ # start the stream on the bebop if self.model is Model.BEBOP: self.drone_object.start_video_stream() # we have bypassed the old opencv VideoCapture method because it was unreliable for rtsp # get the path for the config files fullPath = inspect.getfile(DroneVisionGUI) shortPathIndex = fullPath.rfind("/") if (shortPathIndex == -1): # handle Windows paths shortPathIndex = fullPath.rfind("\\") print(shortPathIndex) shortPath = fullPath[0:shortPathIndex] self.imagePath = join(shortPath, "images") self.utilPath = join(shortPath, "utils") print(self.imagePath) print(self.utilPath) if self.model is Model.BEBOP: # generate the streaming-address for the Bebop self.utilPath = join(shortPath, "utils") self.stream_addr = "%s/bebop.sdp" % self.utilPath elif self.model is Model.MAMBO: # generate the streaming-address for the Mambo self.stream_addr = "rtsp://192.168.99.1/media/stream2" elif self.model is Model.ANAFI: self.stream_addr = "rtsp://192.168.42.1/live" # initialise the vlc-player with the network-caching self.player = vlc.MediaPlayer(self.stream_addr, ":network-caching=" + str(self.network_caching)) # start the buffering success = self._start_video_buffering()
def _start_video_buffering(self): """ If the video capture was successfully opened, then start the thread to buffer the stream :return: if using libvlc this will return whether or not the player started """ # open/draw the GUI app = QApplication(sys.argv) self.vlc_gui = Player(vlc_player=self.player, drone_gui=self) self.vlc_gui.show() self.vlc_gui.resize(640, 480) # ensure that closing the window closes vision app.aboutToQuit.connect(self.land_close_exit) if (self.user_vision_thread is not None): print("Starting user vision thread") self.user_vision_thread.start() if (self.user_draw_window_fn is not None): print("Starting user drawing thread") self.user_window_draw_thread.start() # setup the timer for snapshots self.timer = QTimer(self.vlc_gui) self.timer.setInterval(self.vision_interval) self.timer.timeout.connect(self._buffer_vision) self.timer.start() # show the stream success = self.player.play() print("success from play call is %s " % success) # start the GUI loop app.exec() def _buffer_vision(self): """ Internal method to save valid video captures from the camera fps times a second :return: """ # start with no new data self.new_frame = False # run forever, trying to grab the latest image if (self.vision_running): # generate a temporary file, gets deleted after usage automatically #self.file = tempfile.NamedTemporaryFile(dir=self.imagePath) self.file = join(self.imagePath, "visionStream.jpg") #self.file = tempfile.SpooledTemporaryFile(max_size=32768) # save the current picture from the stream self.player.video_take_snapshot(0, self.file, 0, 0) # read the picture into opencv img = cv2.imread(self.file) # sometimes cv2 returns a None object so skip putting those in the array if (img is not None): # got a new image, save it to the buffer directly self.buffer_index += 1 self.buffer_index %= self.buffer_size #print video_frame self.buffer[self.buffer_index] = img self.new_frame = True
[docs] def get_latest_valid_picture(self): """ Return the latest valid image (from the buffer) :return: last valid image received from the Mambo """ return self.buffer[self.buffer_index]
[docs] def close_exit(self): """ Land, close the video, and exit the GUI :return: """ self.close_video() self.vlc_gui.close() self.vlc_gui.destroy() # kill the threads if (self.user_window_draw_thread is not None): self.user_window_draw_thread.quit() self.user_vision_thread.quit() self.user_thread.quit() # this is hanging on Mac OS X when it tries to exit and I'm not sure why. The threads are properly # exiting sys.exit()
[docs] def land_close_exit(self): """ Called if you Quit the GUI: lands the drone, stops vision, and exits the GUI :return: """ self.land() self.close_exit()
[docs] def land(self): """ Send the land command over the emergency channel when the user pushes the button :return: """ # tell the user that the land button was clicked self.land_button_clicked = True # land the drone if self.model is Model.BEBOP: if (not self.drone_object.is_landed()): self.drone_object.emergency_land() else: if (not self.drone_object.is_landed()): self.drone_object.safe_land(5)
[docs] def close_video(self): """ Stop the vision processing and all its helper threads """ # the helper threads look for this variable to be true self.vision_running = False self.player.stop() # send the command to kill the vision stream (bebop only) if self.model is Model.BEBOP: self.drone_object.stop_video_stream()