General-API/src/routers/images.py

119 lines
3.1 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Header, BackgroundTasks
from fastapi.responses import FileResponse, Response
from typing import List, Optional
from pydantic import BaseModel
import logging
from config import getConfig
from minio import Minio
import random
import tempfile
import traceback
import os
from datetime import date
router = APIRouter(
prefix="/images",
tags=["Images"],
responses={404: {"description": "Not found"}}
)
today = None
todays_image = None
def get_minio_client():
endpoint = getConfig()["images.endpoint"]
access_key = getConfig()["images.access_key"]
secret_key = getConfig()["images.secret_key"]
bucket = getConfig()["images.bucket"]
secure = getConfig()["images.secure"]
client = Minio(
endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure
)
return client
def get_random_object(client, bucket : str = getConfig()["images.bucket"]) -> str:
objects = client.list_objects(bucket)
object_names = [obj.object_name for obj in objects]
if not object_names:
raise HTTPException(status_code=500, detail="No data in bucket!")
random_object_name = random.choice(object_names)
return random_object_name
def fetch_object(client, object_name : str, bucket : str = getConfig()["images.bucket"]):
return client.get_object(bucket, object_name)
@router.get("/background/random")
async def background():
"""
Will obtain a random image from the configured S3 bucket.
"""
try:
client = get_minio_client()
random_object_name = get_random_object(client)
print(random_object_name)
file_response = None
try:
response = fetch_object(client, random_object_name)
file_response = Response(response.data, media_type="image/*")
finally:
response.close()
response.release_conn()
return file_response
except Exception as err:
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Unknown error. Check logs {err}")
print(f"Error occurued processing background {err}")
@router.get("/background/today")
@router.get("/background/daily")
async def background_of_the_day():
"""
Obtain a random image, and consistently return it for the rest of the day
"""
global today
global todays_image
try:
if todays_image is None or today != date.today():
today = date.today()
client = get_minio_client()
random_object_name = get_random_object(client)
print(random_object_name)
try:
response = fetch_object(client, random_object_name)
todays_image = response.data
finally:
response.close()
response.release_conn()
return Response(todays_image, media_type="image/*")
except Exception as err:
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Unknown error. Check logs {err}")
print(f"Error occurued processing background {err}")