Using a Raspberry Pi 3 and a PiCam, this computer-vision-powered sensor detects faces and sends presence data to SmartThings over the LAN via UPnP.
I will start by assuming that you have a Raspberry Pi 3 with a working camera and OpenCV installed on it. If you don’t, I recommend this tutorial 🙂
Creating a Custom SmartThings Device Handler
In the SmartThings IDE, we create a new device handler for our Computer Vision Motion sensor.
Go to the “My Device Handlers” section and click “+ Create New Device Handler” in the top right corner.
In this case we will create it from code. Click on the second tab, “From Code”, paste the attached code “Device Handler” and click “Create”.
On the next page, click “Publish” to make the device available to you.
Writing the SmartThings App
As with the Device Handler, go to the “My SmartApps” section and click “+ Create New SmartApps” in the top right corner.
We will also create it from code. Click on the second tab, “From Code”, paste the attached code “SmartApp” and click “Create”.
On the next page, click “Publish”.
Getting the Raspberry Pi Ready
Now we add the Python script that grabs images from the camera, detects faces, and reports the result to SmartThings.
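The presence logic in the script boils down to a small state machine: a frame with at least one face switches the sensor to active right away, and 60 consecutive frames without a face switch it back to inactive. Here is a minimal, camera-free sketch of that idea. The class name PresenceDebouncer and its method names are made up for illustration and do not appear in the attached script, and unlike the script this sketch only reports genuine state changes.

```python
class PresenceDebouncer(object):
    """Camera-free sketch of the active/inactive logic (hypothetical helper)."""

    def __init__(self, frames_until_inactive=60):
        # 60 is the number of face-free frames the attached script waits for
        self.frames_until_inactive = frames_until_inactive
        self.state = 'inactive'
        self.misses = 0

    def update(self, face_count):
        """Feed the number of faces found in one frame.

        Returns the new state if it changed, otherwise None.
        """
        if face_count > 0:
            # Any face resets the miss counter and activates immediately
            self.misses = 0
            if self.state == 'inactive':
                self.state = 'active'
                return self.state
            return None
        # No face in this frame: count it and deactivate after enough misses
        self.misses += 1
        if self.state == 'active' and self.misses >= self.frames_until_inactive:
            self.state = 'inactive'
            return self.state
        return None
```

In a real loop you would call update(len(faceRects)) once per captured frame and notify the hub only when it returns a value. Waiting for a run of empty frames keeps the sensor from flapping when the detector misses a face for a frame or two.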
First, download and install imutils and Twisted.
If you don’t already have the imutils package installed, you’ll want to grab it from GitHub or install it via:
pip install imutils
For twisted:
sudo apt-get install python-twisted-web
Now that everything is ready, go to /home/pi and create a directory to store the script:
mkdir camera
cd camera
Create the script file:
sudo nano ssdpcamera.py
Paste the attached code “Camera Python Script” and save by pressing Ctrl+X, then “y”, then Enter.
Test the script by running python ssdpcamera.py. You should see log lines such as “Initializing camera” and “Initialization complete”.
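If you want to confirm that the running script answers discovery before opening the mobile app, you can send it an SSDP M-SEARCH from another machine on the same LAN. The sketch below assumes the default device index of 1, so the search target is the urn:schemas-upnp-org:device:RPi_Computer_Vision:1 string built by the script. The function names build_msearch and probe are invented for this example.

```python
import socket

SSDP_ADDR = '239.255.255.250'
SSDP_PORT = 1900
# Search target the script registers when --deviceindex is left at 1
SEARCH_TARGET = 'urn:schemas-upnp-org:device:RPi_Computer_Vision:1'


def build_msearch(search_target=SEARCH_TARGET, mx=2):
    """Return an SSDP M-SEARCH request as bytes."""
    lines = [
        'M-SEARCH * HTTP/1.1',
        'HOST: %s:%d' % (SSDP_ADDR, SSDP_PORT),
        'MAN: "ssdp:discover"',
        'MX: %d' % mx,
        'ST: %s' % search_target,
        '', '',  # the request must end with a blank line
    ]
    return '\r\n'.join(lines).encode('ascii')


def probe(timeout=3.0):
    """Multicast one M-SEARCH; return (reply_text, sender) or None on timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.sendto(build_msearch(), (SSDP_ADDR, SSDP_PORT))
        data, sender = sock.recvfrom(4096)
        return data.decode('ascii', 'replace'), sender
    except socket.timeout:
        return None
    finally:
        sock.close()
```

Calling probe() while the script is running should return the HTTP/1.1 200 OK search response along with the address of the Raspberry Pi. A result of None means nothing answered within the timeout, which usually points at the script not running or multicast being blocked on the network.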
Discovering and Pairing the Raspberry Pi
In the SmartThings mobile app, go to Marketplace in the bottom right corner, tap the “SmartApps” tab and look in “+ My Apps” for “Computer Vision (Connect)”.
Make sure that the Raspberry Pi is on and the python script is running.
The SmartApp will start the discovery process. Once the Raspberry Pi is found, tap the selection dialog, select the device and tap “Done”.
This will create the device in your account, and it will start receiving updates.
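The updates themselves are small XML messages that the script sends to the hub whenever the state changes, and returns when the hub polls /status. The sketch below rebuilds that payload using the UUID and message template from the script and parses it back with the standard library. The function names are hypothetical, and how the Device Handler reads the message on the SmartThings side is not shown here.

```python
import xml.etree.ElementTree as ET

# Values taken from the attached script (device index 1 assumed)
UUID = 'd1c58eb4-9220-11e4-96fa-123b93f75cba'
DEVICE_TARGET = 'urn:schemas-upnp-org:device:RPi_Computer_Vision:1'


def build_status_message(state, uuid=UUID, device_target=DEVICE_TARGET):
    """Build the XML body the script uses for notifications and polling."""
    cmd = 'status-inactive' if state == 'inactive' else 'status-active'
    return '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (
        cmd, uuid, device_target)


def parse_status_message(xml_text):
    """Return (cmd, usn) extracted from a status message."""
    root = ET.fromstring(xml_text)
    return root.findtext('cmd'), root.findtext('usn')
```

For example, parse_status_message(build_status_message('active')) gives back the command status-active together with the device's USN, which is a handy check when you are debugging what the Raspberry Pi sends.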
Auto Start
Finally, if you want the Python script to run automatically when you turn on the Raspberry Pi, edit /etc/rc.local and add the following line before the final exit 0:
(sleep 10;python /home/pi/camera/ssdpcamera.py)&
The parentheses group both commands into a subshell, and the trailing & runs that subshell in the background so that booting is not held up.
Code
#!/usr/bin/python2.7
"""
Computer Vision Camera for SmartThings

Copyright 2016 Juan Pablo Risso <[email protected]>

Dependencies: python-twisted, cv2, pyimagesearch

Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy
of the License at:

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
"""

import argparse
import logging
import cv2
import urllib2
import imutils
# sleep is imported alongside time so the camera warm-up call below works
from time import time, sleep
from picamera.array import PiRGBArray
from picamera import PiCamera
# FaceDetector comes from the pyimagesearch dependency listed above;
# the module path used here is an assumption
from pyimagesearch.facedetector import FaceDetector
from twisted.web import server, resource
from twisted.internet import reactor
from twisted.internet.defer import succeed
from twisted.internet.protocol import DatagramProtocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from twisted.web._newclient import ResponseFailed
from zope.interface import implements

SSDP_PORT = 1900
SSDP_ADDR = '239.255.255.250'
UUID = 'd1c58eb4-9220-11e4-96fa-123b93f75cba'
SEARCH_RESPONSE = 'HTTP/1.1 200 OK\r\nCACHE-CONTROL:max-age=30\r\nEXT:\r\nLOCATION:%s\r\nSERVER:Linux, UPnP/1.0, Pi_Garage/1.0\r\nST:%s\r\nUSN:uuid:%s::%s'

# initialize the camera and grab a reference to the raw camera
# capture
camera = PiCamera()
camera.resolution = (640, 480)
camera.framerate = 32
rawCapture = PiRGBArray(camera, size=(640, 480))
auxcount = 0

# construct the face detector and allow the camera to warm up
fd = FaceDetector("cascades/haarcascade_frontalface_default.xml")
sleep(0.1)


def determine_ip_for_host(host):
    """Determine local IP address used to communicate with a particular host"""
    test_sock = DatagramProtocol()
    test_sock_listener = reactor.listenUDP(0, test_sock)  # pylint: disable=no-member
    test_sock.transport.connect(host, 1900)
    my_ip = test_sock.transport.getHost().host
    test_sock_listener.stopListening()
    return my_ip


class StringProducer(object):
    """Writes an in-memory string to a Twisted request"""
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):  # pylint: disable=invalid-name
        """Start producing supplied string to the specified consumer"""
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):  # pylint: disable=invalid-name
        """Pause producing - no op"""
        pass

    def stopProducing(self):  # pylint: disable=invalid-name
        """Stop producing - no op"""
        pass


class SSDPServer(DatagramProtocol):
    """Receive and respond to M-SEARCH discovery requests from SmartThings hub"""

    def __init__(self, interface='', status_port=0, device_target=''):
        self.interface = interface
        self.device_target = device_target
        self.status_port = status_port
        self.port = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True)  # pylint: disable=no-member
        self.port.joinGroup(SSDP_ADDR, interface=interface)
        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)  # pylint: disable=no-member

    def datagramReceived(self, data, (host, port)):
        try:
            header, _ = data.split(b'\r\n\r\n')[:2]
        except ValueError:
            return
        lines = header.split('\r\n')
        cmd = lines.pop(0).split(' ')
        lines = [x.replace(': ', ':', 1) for x in lines]
        lines = [x for x in lines if len(x) > 0]
        headers = [x.split(':', 1) for x in lines]
        headers = dict([(x[0].lower(), x[1]) for x in headers])

        logging.debug('SSDP command %s %s - from %s:%d with headers %s',
                      cmd[0], cmd[1], host, port, headers)

        search_target = ''
        if 'st' in headers:
            search_target = headers['st']

        if cmd[0] == 'M-SEARCH' and cmd[1] == '*' and search_target in self.device_target:
            logging.info('Received %s %s for %s from %s:%d',
                         cmd[0], cmd[1], search_target, host, port)
            url = 'http://%s:%d/status' % (determine_ip_for_host(host), self.status_port)
            response = SEARCH_RESPONSE % (url, search_target, UUID, self.device_target)
            self.port.write(response, (host, port))
        else:
            logging.debug('Ignored SSDP command %s %s', cmd[0], cmd[1])

    def stop(self):
        """Leave multicast group and stop listening"""
        self.port.leaveGroup(SSDP_ADDR, interface=self.interface)
        self.port.stopListening()


class StatusServer(resource.Resource):
    """HTTP server that serves the status of the camera to the SmartThings hub"""
    isLeaf = True

    def __init__(self, device_target, subscription_list, garage_door_status):
        self.device_target = device_target
        self.subscription_list = subscription_list
        self.garage_door_status = garage_door_status
        resource.Resource.__init__(self)

    def render_SUBSCRIBE(self, request):  # pylint: disable=invalid-name
        """Handle subscribe requests from ST hub - hub wants to be notified of
        garage door status updates"""
        headers = request.getAllHeaders()
        logging.debug("SUBSCRIBE: %s", headers)
        if 'callback' in headers:
            cb_url = headers['callback'][1:-1]
            if not cb_url in self.subscription_list:
                self.subscription_list[cb_url] = {}
                #reactor.stop()
                logging.info('Added subscription %s', cb_url)
            self.subscription_list[cb_url]['expiration'] = time() + 24 * 3600
        return ""

    def render_GET(self, request):  # pylint: disable=invalid-name
        """Handle polling requests from ST hub"""
        if request.path == '/status':
            if self.garage_door_status['last_state'] == 'inactive':
                cmd = 'status-inactive'
            else:
                cmd = 'status-active'
            msg = '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (cmd, UUID, self.device_target)
            logging.info("Polling request from %s for %s - returned %s",
                         request.getClientIP(), request.path, cmd)
            return msg
        else:
            logging.info("Received bogus request from %s for %s",
                         request.getClientIP(), request.path)
            return ""


class MonitorCamera(object):
    """Monitors camera status, generating notifications whenever its state changes"""

    def __init__(self, device_target, subscription_list, camera_status):  # pylint: disable=too-many-arguments
        self.device_target = device_target
        self.subscription_list = subscription_list
        self.camera_status = camera_status
        current_state = 'inactive'
        reactor.callLater(0, self.check_garage_state, current_state, auxcount)  # pylint: disable=no-member

    def check_garage_state(self, current_state, auxcount):
        self.current_state = current_state
        self.auxcount = auxcount
        camera.capture(rawCapture, format="bgr", use_video_port=True)
        # grab the raw NumPy array representing the image
        frame = rawCapture.array

        # resize the frame and convert it to grayscale
        frame = imutils.resize(frame, width=640)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # detect faces in the image and then clone the frame
        # so that we can draw on it
        faceRects = fd.detect(gray, scaleFactor=1.1, minNeighbors=10, minSize=(30, 30))
        frameClone = frame.copy()

        # len() works for both the empty tuple and the NumPy array of detections
        if len(faceRects) > 0:
            auxcount = 0
            if current_state == 'inactive':
                current_state = 'active'
                logging.info('State changed from %s to %s',
                             self.camera_status['last_state'], current_state)
                self.camera_status['last_state'] = current_state
                self.notify_hubs()
        else:
            auxcount = auxcount + 1
            if auxcount == 60:
                current_state = 'inactive'
                logging.info('State changed from %s to %s',
                             self.camera_status['last_state'], current_state)
                self.camera_status['last_state'] = current_state
                self.notify_hubs()

        # loop over the face bounding boxes and draw them
        for (fX, fY, fW, fH) in faceRects:
            cv2.rectangle(frameClone, (fX, fY), (fX + fW, fY + fH), (0, 255, 0), 2)

        # show the video feed on a new GUI window
        #cv2.imshow("Face", videorotate)

        rawCapture.truncate(0)

        # Schedule next check
        reactor.callLater(0, self.check_garage_state, current_state, auxcount)  # pylint: disable=no-member

    def notify_hubs(self):
        """Notify the subscribed SmartThings hubs that a state change has occurred"""
        if self.camera_status['last_state'] == 'inactive':
            cmd = 'status-inactive'
        else:
            cmd = 'status-active'
        for subscription in self.subscription_list:
            if self.subscription_list[subscription]['expiration'] > time():
                logging.info("Notifying hub %s", subscription)
                msg = '<msg><cmd>%s</cmd><usn>uuid:%s::%s</usn></msg>' % (cmd, UUID, self.device_target)
                body = StringProducer(msg)
                agent = Agent(reactor)
                # header values must be strings, so the length is converted
                req = agent.request(
                    'POST',
                    subscription,
                    Headers({'CONTENT-LENGTH': [str(len(msg))]}),
                    body)
                req.addCallback(self.handle_response)
                req.addErrback(self.handle_error)

    def handle_response(self, response):  # pylint: disable=no-self-use
        """Handle the SmartThings hub returning a status code to the POST.
        This is actually unexpected - it typically closes the connection
        for POST/PUT without giving a response code."""
        if response.code == 202:
            logging.info("Status update accepted")
        else:
            logging.error("Unexpected response code: %s", response.code)

    def handle_error(self, response):  # pylint: disable=no-self-use
        """Handle errors generated while performing the NOTIFY. There doesn't
        seem to be a way to avoid ResponseFailed - the SmartThings Hub
        doesn't generate a proper response code for POST or PUT, and if
        NOTIFY is used, it ignores the body."""
        if isinstance(response.value, ResponseFailed):
            logging.debug("Response failed (expected)")
        else:
            logging.error("Unexpected response: %s", response)


def main():
    """Main function to handle use from command line"""

    arg_proc = argparse.ArgumentParser(
        description='Provides camera active/inactive status to a SmartThings hub')
    arg_proc.add_argument('--httpport', dest='http_port', help='HTTP port number',
                          default=8080, type=int)
    arg_proc.add_argument('--deviceindex', dest='device_index', help='Device index',
                          default=1, type=int)
    arg_proc.add_argument('--pollingfreq', dest='polling_freq',
                          help='Number of seconds between polling camera state',
                          default=5, type=int)
    arg_proc.add_argument('--debug', dest='debug', help='Enable debug messages',
                          default=False, action='store_true')
    options = arg_proc.parse_args()

    device_target = 'urn:schemas-upnp-org:device:RPi_Computer_Vision:%d' % (options.device_index)
    log_level = logging.INFO
    if options.debug:
        log_level = logging.DEBUG

    logging.basicConfig(format='%(asctime)-15s %(levelname)-8s %(message)s', level=log_level)

    subscription_list = {}
    camera_status = {'last_state': 'unknown'}

    logging.info('Initializing camera')

    # SSDP server to handle discovery
    SSDPServer(status_port=options.http_port, device_target=device_target)

    # HTTP site to handle subscriptions/polling
    status_site = server.Site(StatusServer(device_target, subscription_list, camera_status))
    reactor.listenTCP(options.http_port, status_site)  # pylint: disable=no-member

    logging.info('Initialization complete')

    # Monitor camera state and send notifications on state change
    MonitorCamera(device_target=device_target,
                  subscription_list=subscription_list,
                  camera_status=camera_status)

    reactor.run()  # pylint: disable=no-member


if __name__ == "__main__":
    main()

Source: Computer Vision as Motion Sensor for SmartThings