94 lines
2.2 KiB
Python
94 lines
2.2 KiB
Python
import io
|
|
|
|
from fastapi import FastAPI, Response
|
|
import uvicorn
|
|
from picamera2 import Picamera2, Preview
|
|
from threading import Condition
|
|
import logging
|
|
|
|
from picamera2.encoders import MJPEGEncoder, Quality
|
|
from picamera2.outputs import FileOutput
|
|
from starlette.background import BackgroundTask
|
|
from starlette.responses import StreamingResponse
|
|
|
|
app = FastAPI()
|
|
|
|
@app.get("/image")
|
|
def get_image():
|
|
"""
|
|
Obtain a still image from the camera
|
|
:return:
|
|
"""
|
|
picam2 = Picamera2()
|
|
capture_config = picam2.create_still_configuration(main={"size": (1920,1080)})
|
|
picam2.configure(capture_config)
|
|
|
|
data = io.BytesIO()
|
|
picam2.start()
|
|
image = picam2.capture_image()
|
|
|
|
image.save(data, format="jpeg")
|
|
|
|
picam2.stop()
|
|
picam2.close()
|
|
|
|
return Response(content=data.getvalue(), media_type="image/jpeg")
|
|
|
|
class StreamingOutput(io.BufferedIOBase):
|
|
def __init__(self):
|
|
self.frame = None
|
|
self.condition = Condition()
|
|
|
|
def write(self, buffer):
|
|
with self.condition:
|
|
self.frame = buffer
|
|
self.condition.notify_all()
|
|
|
|
def read(self):
|
|
with self.condition:
|
|
self.condition.wait()
|
|
return self.frame
|
|
|
|
def generate_frames(output):
|
|
while True:
|
|
try:
|
|
frame = output.read()
|
|
yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")
|
|
except Exception as e:
|
|
logging.error(f"Error in generate_frames: {str(e)}")
|
|
break
|
|
print("done")
|
|
|
|
@app.get("/mjpeg")
|
|
async def mjpeg():
|
|
picam2 = Picamera2()
|
|
capture_config = picam2.create_still_configuration(main={"size": (1920,1080)})
|
|
picam2.configure(capture_config)
|
|
|
|
output = StreamingOutput()
|
|
encoder = MJPEGEncoder(output=FileOutput(output))
|
|
|
|
picam2.start_recording(encoder, Quality.VERY_HIGH)
|
|
|
|
def stop():
|
|
logging.info("Stopping recording")
|
|
picam2.stop_recording()
|
|
picam2.close()
|
|
|
|
return StreamingResponse(
|
|
generate_frames(output),
|
|
media_type="multipart/x-mixed-replace; boundary=frame",
|
|
background=BackgroundTask(stop)
|
|
)
|
|
|
|
|
|
def main():
|
|
uvicorn.run(
|
|
"main:app",
|
|
host="0.0.0.0",
|
|
port=8080
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |