2025-06-29 16:38:58 -04:00

95 lines
2.2 KiB
Python

import io
from fastapi import FastAPI, Response
import uvicorn
from picamera2 import Picamera2, Preview
from threading import Condition
import logging
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
app = FastAPI()
@app.get("/image")
def get_image():
"""
Obtain a still image from the camera
:return:
"""
picam2 = Picamera2()
capture_config = picam2.create_still_configuration(main={"size": (1920,1080)})
picam2.configure(capture_config)
data = io.BytesIO()
picam2.start()
image = picam2.capture_image()
image.save(data, format="jpeg")
picam2.stop()
picam2.close()
return Response(content=data.getvalue(), media_type="image/jpeg")
class StreamingOutput(io.BufferedIOBase):
def __init__(self):
self.frame = None
self.condition = Condition()
def write(self, buffer):
with self.condition:
self.frame = buffer
self.condition.notify_all()
def read(self):
with self.condition:
self.condition.wait()
return self.frame
def generate_frames(output):
while True:
try:
frame = output.read()
yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")
except Exception as e:
logging.error(f"Error in generate_frames: {str(e)}")
break
print("done")
@app.get("/mjpeg")
async def mjpeg():
picam2 = Picamera2()
capture_config = picam2.create_still_configuration(main={"size": (1920,1080)})
picam2.configure(capture_config)
output = StreamingOutput()
encoder = MJPEGEncoder()
encoder.output = FileOutput(output)
picam2.start_recording(encoder, Quality.VERY_HIGH)
def stop():
logging.info("Stopping recording")
picam2.stop_recording()
picam2.close()
return StreamingResponse(
generate_frames(output),
media_type="multipart/x-mixed-replace; boundary=frame",
background=BackgroundTask(stop)
)
def main():
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8080
)
if __name__ == "__main__":
main()