Computer Vision as Motion Sensor for SmartThings

Using a Raspberry Pi 3 and a PiCam, this computer-vision-powered sensor detects faces and sends presence data to SmartThings over the LAN using UPnP.

I will start by assuming that you have a Raspberry Pi 3 with a working camera and OpenCV installed on it. If you don't, I recommend this tutorial 🙂

Creating a Custom SmartThings Device Handler

In the SmartThings IDE, we create a new device handler for our Computer Vision Motion sensor.

Go to the “My Device Handlers” section and click “+ Create New Device Handler” in the top right corner.

https://youtu.be/q3E2AheLo7M

In this case we will create it from code. Click the second tab, “From Code”, paste the attached “Device Handler” code, and click “Create”.

On the next page, click “Publish” to make the device available to you.

Writing the SmartThings App

As with the Device Handler, go to the “My SmartApps” section and click “+ Create New SmartApps” in the top right corner.

We will also create it from code. Click the second tab, “From Code”, paste the attached “SmartApp” code, and click “Create”.

On the next page, click “Publish”.

Getting the Raspberry Pi Ready 

Now we need to add the Python script that grabs images from the camera, detects faces, and reports the result to SmartThings.
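To make the reporting step more concrete: in the attached script the sensor turns active on the first frame that contains a face, and turns inactive only after 60 consecutive frames without one. The sketch below reproduces that state handling without a camera. The class name PresenceDebouncer and its update method are hypothetical names introduced for this example, and the sketch reports only actual changes, which is a slight simplification of the script.

```python
class PresenceDebouncer(object):
    """Tracks active/inactive state from per-frame face counts.

    Hypothetical helper for illustration; the default of 60 empty
    frames mirrors the counter used in the attached script.
    """

    def __init__(self, empty_frames_needed=60):
        self.empty_frames_needed = empty_frames_needed
        self.state = 'inactive'
        self.empty_frames = 0

    def update(self, face_count):
        """Feed one frame's face count; return the new state if it changed, else None."""
        if face_count > 0:
            # A face resets the empty-frame counter and activates immediately.
            self.empty_frames = 0
            if self.state == 'inactive':
                self.state = 'active'
                return self.state
        else:
            # No face: only deactivate after enough consecutive empty frames.
            self.empty_frames += 1
            if self.state == 'active' and self.empty_frames >= self.empty_frames_needed:
                self.state = 'inactive'
                return self.state
        return None


if __name__ == '__main__':
    debouncer = PresenceDebouncer(empty_frames_needed=3)
    # One frame with a face, then four frames without.
    for faces in [1, 0, 0, 0, 0]:
        change = debouncer.update(faces)
        if change is not None:
            print('state changed to %s' % change)
```

Waiting for a run of empty frames keeps the sensor from flickering to inactive when the detector misses a face for a frame or two.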

First, download and install imutils and Twisted.

If you don’t already have the imutils package installed, you can grab it from GitHub or install it via:

pip install imutils

For twisted:

sudo apt-get install python-twisted-web
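Before moving on, it can save time to confirm that the Python interpreter can actually see everything the script imports. The following is a minimal sketch, assuming the module names match the import lines of the attached script; missing_modules is a hypothetical helper, not part of any of these libraries.

```python
import importlib


def missing_modules(names):
    """Return the subset of module names that cannot be imported."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            missing.append(name)
    return missing


if __name__ == '__main__':
    # Module names taken from the import lines of the attached script.
    required = ['cv2', 'imutils', 'picamera', 'twisted.web', 'zope.interface']
    missing = missing_modules(required)
    if missing:
        print('Missing modules: %s' % ', '.join(missing))
    else:
        print('All required modules are importable')
```

Run it with the same python command you will use for the camera script, since packages installed for one interpreter are not necessarily visible to another.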

Now that everything is ready, go to /home/pi and create a directory to store the script:

mkdir camera
cd camera

Create the script file:

nano ssdpcamera.py

Paste the attached “Camera Python Script” code and save by pressing “control + x”, then “y”, then Enter.

Test the script by typing python ssdpcamera.py. You should see log lines such as “Initializing camera” and “Initialization complete”.

Discovering and Pairing the Raspberry Pi

In the SmartThings mobile app, go to Marketplace in the bottom right corner, tap the “SmartApps” tab, and look under “+ My Apps” for “Computer Vision (Connect)”.

Make sure that the Raspberry Pi is on and the Python script is running.

The SmartApp will start the discovery process. Once the Raspberry Pi is found, tap the selection dialog, select the device, and tap “Done”.
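Discovery here relies on SSDP: an M-SEARCH request is multicast to 239.255.255.250 on port 1900, and the script answers when the ST header matches its device target. The sketch below mirrors the header parsing done in the attached script, applied to a hand-written sample request. The sample text and the function names parse_ssdp_request and is_search_for are assumptions made for illustration; the device target string is the one the script builds with its default device index.

```python
def parse_ssdp_request(data):
    """Split an SSDP datagram into its command parts and a dict of lower-cased headers."""
    header = data.split('\r\n\r\n')[0]
    lines = header.split('\r\n')
    cmd = lines.pop(0).split(' ')
    headers = {}
    for line in lines:
        if ':' not in line:
            continue
        name, value = line.split(':', 1)
        headers[name.strip().lower()] = value.strip()
    return cmd, headers


def is_search_for(data, device_target):
    """Return True if the datagram is an M-SEARCH for the given device target."""
    cmd, headers = parse_ssdp_request(data)
    return (len(cmd) >= 2 and cmd[0] == 'M-SEARCH' and cmd[1] == '*'
            and headers.get('st') == device_target)


# Device target used by the attached script with the default device index of 1.
DEVICE_TARGET = 'urn:schemas-upnp-org:device:RPi_Computer_Vision:1'

# Hand-written sample of the kind of request sent during discovery.
SAMPLE_REQUEST = (
    'M-SEARCH * HTTP/1.1\r\n'
    'HOST: 239.255.255.250:1900\r\n'
    'MAN: "ssdp:discover"\r\n'
    'MX: 4\r\n'
    'ST: ' + DEVICE_TARGET + '\r\n'
    '\r\n'
)

if __name__ == '__main__':
    print(is_search_for(SAMPLE_REQUEST, DEVICE_TARGET))
```

Note that the sketch compares the ST header for equality, which is stricter than the substring test in the script. If discovery never finds the Pi, running the script with --debug logs every SSDP command it receives, which helps tell a network problem from a target mismatch.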

This creates the device in your account, and it starts receiving updates.
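The updates themselves are small XML messages. The sketch below builds one using the same format string and UUID as the attached script, then reads it back with the standard library. The two function names are hypothetical, and the parsing side is shown only for illustration; how the SmartThings device handler actually reads the message is not covered here.

```python
import xml.etree.ElementTree as ET

# UUID constant from the attached script.
UUID = 'd1c58eb4-9220-11e4-96fa-123b93f75cba'


def build_status_message(state, device_target):
    """Build the XML body the script sends for a given state.

    As in the script, anything other than 'inactive' is reported as active.
    """
    cmd = 'status-inactive' if state == 'inactive' else 'status-active'
    return '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (cmd, UUID, device_target)


def parse_status_message(body):
    """Extract the command and USN from a status message (illustrative helper)."""
    root = ET.fromstring(body)
    return root.findtext('cmd'), root.findtext('usn')


if __name__ == '__main__':
    target = 'urn:schemas-upnp-org:device:RPi_Computer_Vision:1'
    message = build_status_message('active', target)
    print(message)
    print(parse_status_message(message))
```

The same message body is returned when the /status URL is polled, so opening http://<pi-address>:8080/status in a browser is a simple way to see the current state.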

Auto Start

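Finally, if you want to run the Python script automatically when the Raspberry Pi boots, edit /etc/rc.local and add the following line before the final exit 0:

(sleep 10; cd /home/pi/camera && python ssdpcamera.py)&

The parentheses group the commands into a subshell, and the trailing & runs that subshell in the background so the boot process is not held up. The cd matters because the script loads cascades/haarcascade_frontalface_default.xml through a relative path.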
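The script also accepts a few command-line options that can be appended to the python command. The sketch below recreates three of the option definitions from the attached script so you can see their defaults and how the device index ends up in the advertised device target. The function names build_parser and device_target are hypothetical; --pollingfreq is defined in the script as well, but nothing in the listing reads it, so it is left out here.

```python
import argparse


def build_parser():
    """Recreate the command-line options defined in the attached script."""
    parser = argparse.ArgumentParser(
        description='Provides camera active/inactive status to a SmartThings hub')
    parser.add_argument('--httpport', dest='http_port', default=8080, type=int,
                        help='HTTP port number')
    parser.add_argument('--deviceindex', dest='device_index', default=1, type=int,
                        help='Device index')
    parser.add_argument('--debug', dest='debug', default=False, action='store_true',
                        help='Enable debug messages')
    return parser


def device_target(options):
    """Build the UPnP device target string the script advertises."""
    return 'urn:schemas-upnp-org:device:RPi_Computer_Vision:%d' % options.device_index


if __name__ == '__main__':
    # Equivalent to: python ssdpcamera.py --httpport 9090 --debug
    opts = build_parser().parse_args(['--httpport', '9090', '--debug'])
    print('%d %s %s' % (opts.http_port, opts.debug, device_target(opts)))
```

For example, appending --httpport 9090 to the python command in /etc/rc.local moves the status server off the default port 8080.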

Code

#!/usr/bin/python2.7

""" Computer Vision Camera for SmartThings

Copyright 2016 Juan Pablo Risso <[email protected]>

Dependencies: python-twisted, cv2, imutils, picamera, pyimagesearch

Licensed under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of the
License at:

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import argparse
import logging
import cv2
import urllib2
import imutils
from time import time
from picamera.array import PiRGBArray
from picamera import PiCamera
from twisted.web import server, resource
from twisted.internet import reactor
from twisted.internet.defer import succeed
from twisted.internet.protocol import DatagramProtocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from twisted.web._newclient import ResponseFailed
from zope.interface import implements

SSDP_PORT = 1900
SSDP_ADDR = '239.255.255.250'
UUID = 'd1c58eb4-9220-11e4-96fa-123b93f75cba'
SEARCH_RESPONSE = 'HTTP/1.1 200 OK\r\nCACHE-CONTROL:max-age=30\r\nEXT:\r\nLOCATION:%s\r\nSERVER:Linux, UPnP/1.0, Pi_Garage/1.0\r\nST:%s\r\nUSN:uuid:%s::%s'

# initialize the camera and grab a reference to the raw camera
# capture

camera = PiCamera()
camera.resolution = (640, 480)
camera.framerate = 32
rawCapture = PiRGBArray(camera, size=(640, 480))
auxcount = 0

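# construct the face detector and allow the camera to warm up
# NOTE: FaceDetector is never imported in the original listing; the module
# path below is an assumption based on the pyimagesearch dependency above
from pyimagesearch.facedetector import FaceDetector
# only time() is imported at the top of the file, so import sleep here
from time import sleep
fd = FaceDetector("cascades/haarcascade_frontalface_default.xml")
sleep(0.1)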

def determine_ip_for_host(host):
    """Determine local IP address used to communicate with a particular host"""
    test_sock = DatagramProtocol()
    test_sock_listener = reactor.listenUDP(0, test_sock) # pylint: disable=no-member
    test_sock.transport.connect(host, 1900)
    my_ip = test_sock.transport.getHost().host
    test_sock_listener.stopListening()
    return my_ip

class StringProducer(object):
    """Writes an in-memory string to a Twisted request"""
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer): # pylint: disable=invalid-name
        """Start producing supplied string to the specified consumer"""
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self): # pylint: disable=invalid-name
        """Pause producing - no op"""
        pass

    def stopProducing(self): # pylint: disable=invalid-name
        """ Stop producing - no op"""
        pass

class SSDPServer(DatagramProtocol):
    """Receive and respond to M-SEARCH discovery requests from SmartThings hub"""

    def __init__(self, interface='', status_port=0, device_target=''):
        self.interface = interface
        self.device_target = device_target
        self.status_port = status_port
        self.port = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True) # pylint: disable=no-member
        self.port.joinGroup(SSDP_ADDR, interface=interface)
        reactor.addSystemEventTrigger('before', 'shutdown', self.stop) # pylint: disable=no-member

    def datagramReceived(self, data, (host, port)):
        try:
            header, _ = data.split(b'\r\n\r\n')[:2]
        except ValueError:
            return
        lines = header.split('\r\n')
        cmd = lines.pop(0).split(' ')
        lines = [x.replace(': ', ':', 1) for x in lines]
        lines = [x for x in lines if len(x) > 0]
        headers = [x.split(':', 1) for x in lines]
        headers = dict([(x[0].lower(), x[1]) for x in headers])

        logging.debug('SSDP command %s %s - from %s:%d with headers %s', cmd[0], cmd[1], host, port, headers)

        search_target = ''
        if 'st' in headers:
            search_target = headers['st']

        if cmd[0] == 'M-SEARCH' and cmd[1] == '*' and search_target in self.device_target:
            logging.info('Received %s %s for %s from %s:%d', cmd[0], cmd[1], search_target, host, port)
            url = 'http://%s:%d/status' % (determine_ip_for_host(host), self.status_port)
            response = SEARCH_RESPONSE % (url, search_target, UUID, self.device_target)
            self.port.write(response, (host, port))
        else:
            logging.debug('Ignored SSDP command %s %s', cmd[0], cmd[1])

    def stop(self):
        """Leave multicast group and stop listening"""
        self.port.leaveGroup(SSDP_ADDR, interface=self.interface)
        self.port.stopListening()

class StatusServer(resource.Resource):
    """HTTP server that serves the status of the camera to the
       SmartThings hub"""
    isLeaf = True
    def __init__(self, device_target, subscription_list, garage_door_status):
        self.device_target = device_target
        self.subscription_list = subscription_list
        self.garage_door_status = garage_door_status
        resource.Resource.__init__(self)

    def render_SUBSCRIBE(self, request): # pylint: disable=invalid-name
        """Handle subscribe requests from ST hub - hub wants to be notified of
           camera status updates"""
        headers = request.getAllHeaders()
        logging.debug("SUBSCRIBE: %s", headers)
        if 'callback' in headers:
            cb_url = headers['callback'][1:-1]

            if not cb_url in self.subscription_list:
                self.subscription_list[cb_url] = {}
                #reactor.stop()
                logging.info('Added subscription %s', cb_url)
            self.subscription_list[cb_url]['expiration'] = time() + 24 * 3600

        return ""

    def render_GET(self, request): # pylint: disable=invalid-name
        """Handle polling requests from ST hub"""
        if request.path == '/status':
            if self.garage_door_status['last_state'] == 'inactive':
                cmd = 'status-inactive'
            else:
                cmd = 'status-active'
            msg = '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (cmd, UUID, self.device_target)
            logging.info("Polling request from %s for %s - returned %s",
                         request.getClientIP(),
                         request.path,
                         cmd)
            return msg
        else:
            logging.info("Received bogus request from %s for %s",
                         request.getClientIP(),
                         request.path)
            return ""

class MonitorCamera(object):
    """Monitors camera status, generating notifications whenever its state changes"""
    def __init__(self, device_target, subscription_list, camera_status): # pylint: disable=too-many-arguments
        self.device_target = device_target
        self.subscription_list = subscription_list
        self.camera_status = camera_status
		
        current_state = 'inactive'
        reactor.callLater(0, self.check_garage_state, current_state, auxcount) # pylint: disable=no-member

    def check_garage_state(self, current_state, auxcount):
        """Capture one frame, detect faces and update the active/inactive state"""
        self.current_state = current_state
        self.auxcount = auxcount
        camera.capture(rawCapture, format="bgr", use_video_port=True)
        # grab the raw NumPy array representing the image
        frame = rawCapture.array
        # resize the frame and convert it to grayscale
        frame = imutils.resize(frame, width = 640)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # detect faces in the image and then clone the frame
        # so that we can draw on it
        faceRects = fd.detect(gray, scaleFactor = 1.1, minNeighbors = 10,
            minSize = (30, 30))
        frameClone = frame.copy()
        # len() works whether detect returns an empty tuple or a NumPy array
        if len(faceRects) > 0:
            auxcount = 0
            if current_state == 'inactive':
                current_state = 'active'
                logging.info('State changed from %s to %s', self.camera_status['last_state'], current_state)
                self.camera_status['last_state'] = current_state
                self.notify_hubs()
        else:
            # no face in this frame; report inactive after 60 empty frames in a row
            auxcount = auxcount + 1
            if auxcount == 60:
                current_state = 'inactive'
                logging.info('State changed from %s to %s', self.camera_status['last_state'], current_state)
                self.camera_status['last_state'] = current_state
                self.notify_hubs()

        # loop over the face bounding boxes and draw them
        for (fX, fY, fW, fH) in faceRects:
            cv2.rectangle(frameClone, (fX, fY), (fX + fW, fY + fH), (0, 255, 0), 2)

        # show the video feed on a new GUI window
        #cv2.imshow("Face", videorotate)

        # clear the stream so the next capture starts from an empty buffer
        rawCapture.truncate(0)

        # Schedule next check
        reactor.callLater(0, self.check_garage_state, current_state, auxcount) # pylint: disable=no-member

    def notify_hubs(self):
        """Notify the subscribed SmartThings hubs that a state change has occurred"""
        if self.camera_status['last_state'] == 'inactive':
            cmd = 'status-inactive'
        else:
            cmd = 'status-active'
        for subscription in self.subscription_list:
            if self.subscription_list[subscription]['expiration'] > time():
                logging.info("Notifying hub %s", subscription)
                msg = '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (cmd, UUID, self.device_target)
                body = StringProducer(msg)
                agent = Agent(reactor)
                req = agent.request(
                    'POST',
                    subscription,
                    Headers({'CONTENT-LENGTH': [str(len(msg))]}),  # header values must be strings
                    body)
                req.addCallback(self.handle_response)
                req.addErrback(self.handle_error)

    def handle_response(self, response): # pylint: disable=no-self-use
        """Handle the SmartThings hub returning a status code to the POST.
           This is actually unexpected - it typically closes the connection
           for POST/PUT without giving a response code."""
        if response.code == 202:
            logging.info("Status update accepted")
        else:
            logging.error("Unexpected response code: %s", response.code)

    def handle_error(self, response): # pylint: disable=no-self-use
        """Handle errors generated while performing the NOTIFY. There doesn't seem
           to be a way to avoid ResponseFailed - the SmartThings Hub
           doesn't generate a proper response code for POST or PUT, and if
           NOTIFY is used, it ignores the body."""
        if isinstance(response.value, ResponseFailed):
            logging.debug("Response failed (expected)")
        else:
            logging.error("Unexpected response: %s", response)

def main():
    """Main function to handle use from command line"""

    arg_proc = argparse.ArgumentParser(description='Provides camera active/inactive status to a SmartThings hub')
    arg_proc.add_argument('--httpport', dest='http_port', help='HTTP port number', default=8080, type=int)
    arg_proc.add_argument('--deviceindex', dest='device_index', help='Device index', default=1, type=int)
    arg_proc.add_argument('--pollingfreq', dest='polling_freq', help='Number of seconds between polling camera state', default=5, type=int)
    arg_proc.add_argument('--debug', dest='debug', help='Enable debug messages', default=False, action='store_true')
    options = arg_proc.parse_args()

    device_target = 'urn:schemas-upnp-org:device:RPi_Computer_Vision:%d' % (options.device_index)
    log_level = logging.INFO
    if options.debug:
        log_level = logging.DEBUG

    logging.basicConfig(format='%(asctime)-15s %(levelname)-8s %(message)s', level=log_level)

    subscription_list = {}
    camera_status = {'last_state': 'unknown'}

    logging.info('Initializing camera')
    
    # SSDP server to handle discovery
    SSDPServer(status_port=options.http_port, device_target=device_target)

    # HTTP site to handle subscriptions/polling
    status_site = server.Site(StatusServer(device_target, subscription_list, camera_status))
    reactor.listenTCP(options.http_port, status_site) # pylint: disable=no-member

    logging.info('Initialization complete')

    # Monitor camera state and send notifications on state change
    MonitorCamera(device_target=device_target,
                  subscription_list=subscription_list,
                  camera_status=camera_status) 

    reactor.run() # pylint: disable=no-member

if __name__ == "__main__":
    main()

Source: Computer Vision as Motion Sensor for SmartThings

About The Author

Muhammad Bilal

I am a highly skilled and motivated individual with a Master's degree in Computer Science. I have extensive experience in technical writing and a deep understanding of SEO practices.
