import io from fastapi import FastAPI, Response import uvicorn from picamera2 import Picamera2, Preview from threading import Condition import logging from picamera2.encoders import MJPEGEncoder, Quality from picamera2.outputs import FileOutput from starlette.background import BackgroundTask from starlette.responses import StreamingResponse app = FastAPI() @app.get("/image") def get_image(): """ Obtain a still image from the camera :return: """ picam2 = Picamera2() capture_config = picam2.create_still_configuration(main={"size": (1920,1080)}) picam2.configure(capture_config) data = io.BytesIO() picam2.start() image = picam2.capture_image() image.save(data, format="jpeg") picam2.stop() picam2.close() return Response(content=data.getvalue(), media_type="image/jpeg") class StreamingOutput(io.BufferedIOBase): def __init(self): self.frame = None self.condition = Condition() def write(self, buffer): with self.condition: self.frame = buffer self.condition.notify_all() def read(self): with self.condition: self.condition.wait() return self.frame def generate_frames(output): while True: try: frame = output.read() yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n") except Exception as e: logging.error(f"Error in generate_frames: {str(e)}") break print("done") @app.get("/mjpeg") async def mjpeg(): picam2 = Picamera2() capture_config = picam2.create_still_configuration(main={"size": (1920,1080)}) picam2.configure(capture_config) output = StreamingOutput() picam2.start_recording(MJPEGEncoder(), FileOutput(output), Quality.VERY_HIGH) def stop(): logging.info("Stopping recording") picam2.stop_recording() picam2.close() return StreamingResponse( generate_frames(output), media_type="multipart/x-mixed-replace; boundry=frame", background=BackgroundTask(stop) ) def main(): uvicorn.run( "main:app", host="0.0.0.0", port=8080 ) if __name__ == "__main__": main()