利用orangepi +摄像头 使用python写一个延时摄影的程序。
一,思路
orangepi 外接摄像头,利用opencv,按照一定间隔取图,再利用opencv 将图片编码成视频。
利用mqtt进行通讯,可以写一个app进行控制。
二,代码
# -*- coding: utf-8 -*-
import cv2
import random
import paho.mqtt.client as mqtt
import threading
import time
import shutil
import os
import json
from PIL import Image
class Camera(object):
def __init__(self,_host,_port):
self.host=_host
self.port=_port
self.client_id='python-mqtt-{}'.format(random.randint(0,1000))
self.client=mqtt.Client(self.client_id)
self.camera_start='camera_start'
self.camera_stop='camera_stop'
self.cap = cv2.VideoCapture(1)
self.isRuning=False
#3秒1张 12小时 压缩成120fps = 两分钟视频
self.interval=3
#录制多少小时
self.duration=0
self.imagePath='/mnt/usb/image'
self.videoPath='/mnt/usb/video'
#开始录制
def start(self):
index=0
shutil.rmtree(self.imagePath)
os.makedirs(self.imagePath)
print('开始录制视频 ',self.duration)
while self.isRuning:
ret,frame = self.cap.read()
if ret:
imgName=self.imagePath+'/{}.jpg'.format(index)
cv2.imwrite(imgName,frame)
index+=1
# frame = cv2.imread('test.jpeg')
# imgName=self.imagePath+'/{}.jpg'.format(index)
# cv2.imwrite(imgName,frame)
# index+=1
time.sleep(self.interval)
#到时间自动停止
if self.duration !=0:
if (60*60*self.duration)/self.interval<=index:
self.stop()
#停止录制
def stop(self):
print('停止录制 开始生成视频')
self.isRuning = False
time.sleep(self.interval)
current_timestamp = time.time()
local_time = time.localtime(current_timestamp)
current_time = time.strftime('%Y-%m-%d_%H:%M:%S',local_time)
videoFile = self.videoPath+"/"+current_time+".mp4"
t = threading.Thread(target=self.image_to_video(self.imagePath,videoFile,120))
t.start()
#生成视频
def image_to_video(self,_image_path, _media_path, _fps):
image_names = os.listdir(_image_path)
image_names.sort(key=lambda n: int(n[:-4]))
fourcc = cv2.VideoWriter_fourcc('M','P','4','V')
fps = _fps
image = Image.open(os.path.join(_image_path, image_names[0]))
media_writer = cv2.VideoWriter(_media_path, fourcc, fps, image.size)
for image_name in image_names:
print(image_name)
im = cv2.imread(os.path.join(_image_path, image_name))
media_writer.write(im)
media_writer.release()
print('视频写入完成')
def __del__(self):
self.client.disconnect()
self.cap.release()
def on_message(self,client,userdata,message):
print('收到消息 ',message.topic,message.payload.decode('utf-8'))
if message.topic==self.camera_start:
if self.isRuning==False:
self.isRuning=True
payloadStr = message.payload.decode('utf-8')
payloadJ = json.loads(payloadStr)
self.duration = payloadJ["duration"]
t = threading.Thread(target=self.start)
t.start()
elif message.topic == self.camera_stop:
self.stop()
def run(self):
self.client.connect(self.host,self.port)
self.client.on_message=self.on_message
self.client.subscribe(self.camera_start)
self.client.subscribe(self.camera_stop)
self.client.loop_forever()
if __name__ =='__main__':
try:
camera = Camera("127.0.0.1",1883)
camera.run()
except KeyboardInterrupt:
del camera
三,粗糙的设备
四,拍摄了一个晚上的街景。 摄像头有点差 而且大雾 看的很模糊
2023-10-31_23_58_20