Meme-service/src/s3Client.py

195 lines
5.8 KiB
Python

import logging
import os
from minio.commonconfig import Tags
from minio import Minio
from minio.commonconfig import Tags
from minio.error import S3Error
from config import config
from datetime import datetime
from functools import lru_cache
S3_URL = ""
S3_UN = ""
S3_PW = ""
S3_TLS = True
S3_BUCKET = ""
CACHE_TTL = 10
gclient = None
def getClient():
global gclient
global S3_URL
global S3_UN
global S3_PW
global S3_TLS
global S3_BUCKET
if gclient != None:
return gclient
if "url" not in config["s3"]:
raise Exception("S3_URL is not set!")
S3_URL = config["s3.url"]
logging.info("Using S3_URL : " + S3_URL )
if "username" not in config["s3"]:
raise Exception("S3_UN is not set!")
S3_UN = config["s3.username"]
logging.info("Using S3_UN : " + S3_UN)
if "password" not in config["s3"]:
raise Exception("S3_PW is not set!")
S3_PW = config["s3.password"]
logging.info("Using S3_PW : " + S3_PW)
if "bucket" not in config["s3"]:
raise Exception("S3_BUCKET is not set!")
S3_BUCKET = config["s3.bucket"]
logging.info("Using S3_BUCKET : " + S3_BUCKET)
# override defaults
if "tls" in config["s3"]:
S3_TLS = config["s3.tls"]
logging.info("Using S3_TLS : " + str(S3_TLS))
client = Minio(S3_URL,
access_key=S3_UN,
secret_key=S3_PW,
secure=S3_TLS)
found = client.bucket_exists(S3_BUCKET)
if not found:
client.make_bucket(S3_BUCKET)
logging.info(f"Failed to find bucket " + S3_BUCKET + " so I made it instead")
else:
logging.info(f"Found bucket " + S3_BUCKET)
clientObject = Client(client)
gclient = clientObject
return gclient
class Client:
allMemes = set()
memesToMd5 = dict()
memesToTags = dict()
lastCheckedAllMemes = datetime.strptime("2000-01-01 01:01:01", "%Y-%m-%d %H:%M:%S")
client = None
def __init__(self, client):
global gclient
if gclient is not None:
raise Exception("Object already exists! use getClient")
logging.debug("Making a new client")
if client is not None:
self.client = client
else:
raise Exception("Improper object passed for client!")
self.getCurrentMemeList(force=True)
def getCurrentMemeList(self, force=False):
"""
Get a list of all memes in the bucket
"""
now = datetime.now()
if (now - self.lastCheckedAllMemes).seconds > 300 or force:
logging.info("Enough time has elapsed, refreshing meme cache...")
self.lastCheckedAllMemes = now
self.allMemes.clear()
self.memesToMd5.clear()
self.memesToTags.clear()
for obj in self.client.list_objects(S3_BUCKET):
if not obj.is_dir:
self.allMemes.add(obj.object_name)
self.memesToMd5[obj.etag] = obj.object_name
logging.info("Finished fetching " + str(len(self.allMemes)) + " memes")
return self.allMemes
@lru_cache(maxsize=32)
def getMeme(self, memeName: str):
"""
Return a meme with the exact given name, or raise an exception
"""
if not isinstance(memeName, str):
raise Exception("paramater memeName is of improper type, expected a str")
memeSet = self.getCurrentMemeList()
if memeName in memeSet:
reply = self.client.get_object(bucket_name=S3_BUCKET,
object_name=memeName)
return reply.read()
else:
raise Exception("Requested meme '" + memeName + "' not found")
return None
def addMeme(self, fileContents, name: str, tags: Tags = Tags.new_object_tags()):
result = self.client.put_object(bucket_name=S3_BUCKET,
object_name=name,
data=fileContents,
length=-1,
tags=tags,
part_size=10*1024*1024)
if result.etag in self.memesToMd5:
logging.info('Uploaded meme named ' + name + ' already exists')
self.client.remove_object(bucket_name=S3_BUCKET,
object_name=name)
return False
else:
self.allMemes.add(name)
self.memesToMd5[name] = result.etag
self.memesToTags[name] = tags
return True
@lru_cache(maxsize=32)
def getTagsOnMeme(self, memeName: str):
"""
Returns the S3 Tags object for a given meme
"""
if not isinstance(memeName, str):
raise Exception("paramater memeName is of improper type, expected a str")
if memeName in self.memesToTags:
return self.memesToTags[memeName]
memeSet = self.getCurrentMemeList()
if memeName in memeSet:
reply = self.client.get_object_tags(bucket_name=S3_BUCKET,
object_name=memeName)
self.memesToTags[memeName] = reply
return reply
else:
raise Exception("Requested meme '" + memeName + "' not found")
return None
def getShareForMeme(self, memeName: str) -> str:
"""
Returns the S3 bucket's share link for the meme
"""
if not isinstance(memeName, str):
raise Exception("paramater memeName is of improper type, expected a str")
if memeName not in self.getCurrentMemeList():
raise Exception("Requested meme '" + memeName + "' not found")
reply = self.client.get_presigned_url(
method="GET",
bucket_name=S3_BUCKET,
object_name=memeName)
return reply