Spaces:
Runtime error
Runtime error
| # from flask import Flask, jsonify | |
| # import cloudinary | |
| # import cloudinary.api | |
| # import cloudinary.uploader | |
| # import os | |
| # import glob | |
| # import requests | |
| # from dotenv import load_dotenv | |
| # from deepface.commons.logger import Logger | |
| # logger = Logger(module="modules/cloudservice.py") | |
| # load_dotenv() | |
| # # Configure Cloudinary | |
| # cloudinary.config( | |
| # cloud_name=os.getenv('CLOUDINARY_CLOUD_NAME'), | |
| # api_key=os.getenv('CLOUDINARY_API_KEY'), | |
| # api_secret=os.getenv('CLOUDINARY_API_SECRET') | |
| # ) | |
| # def fetch_cloudinary_images(folder_name): | |
| # resources = [] | |
| # res = cloudinary.api.resources(type='upload', resource_type='image', prefix=f'mafqoud/images/{folder_name}') | |
| # resources.extend(res.get('resources', [])) | |
| # return resources | |
| # def download_image(url, local_path): | |
| # response = requests.get(url, stream=True) | |
| # if response.status_code == 200: | |
| # with open(local_path, 'wb') as file: | |
| # for chunk in response: | |
| # file.write(chunk) | |
| # def sync_folder(folder_name, local_dir): | |
| # cloudinary_images = fetch_cloudinary_images(folder_name) | |
| # cloudinary_urls = {img['secure_url']: img['public_id'] for img in cloudinary_images} | |
| # # Download new images and track downloaded image paths | |
| # downloaded_paths = [] | |
| # for img in cloudinary_images: | |
| # url = img['secure_url'] | |
| # public_id = img['public_id'] | |
| # file_name = url.split('/')[-1] # Get the actual file name | |
| # local_path = os.path.join(local_dir, file_name) | |
| # print(local_path) | |
| # if not os.path.exists(local_path): | |
| # download_image(url, local_path) | |
| # downloaded_paths.append(local_path) | |
| # # Remove old images | |
| # local_images = [os.path.join(local_dir, f) for f in os.listdir(local_dir) if os.path.isfile(os.path.join(local_dir, f))] | |
| # for local_path in local_images: | |
| # if local_path not in downloaded_paths and (local_path.endswith('.jpg') or local_path.endswith('.jpeg') or local_path.endswith('.png')): | |
| # os.remove(local_path) | |
| # def delete_pkl_files(directory): | |
| # """Delete all .pkl files in the specified directory.""" | |
| # pkl_files = glob.glob(os.path.join(directory, '*.pkl')) | |
| # for pkl_file in pkl_files: | |
| # os.remove(pkl_file) | |
| from flask import Flask, jsonify | |
| import cloudinary | |
| import cloudinary.api | |
| import cloudinary.uploader | |
| import os | |
| import glob | |
| import requests | |
| from dotenv import load_dotenv | |
| from deepface.commons.logger import Logger | |
| from deepface import DeepFace # Assuming this is the correct import for training | |
| logger = Logger(module="modules/cloudservice.py") | |
| load_dotenv() | |
| # Configure Cloudinary | |
| cloudinary.config( | |
| cloud_name=os.getenv('CLOUDINARY_CLOUD_NAME'), | |
| api_key=os.getenv('CLOUDINARY_API_KEY'), | |
| api_secret=os.getenv('CLOUDINARY_API_SECRET') | |
| ) | |
| def fetch_cloudinary_images(folder_name): | |
| resources = [] | |
| next_cursor = None | |
| while True: | |
| if next_cursor: | |
| res = cloudinary.api.resources( | |
| type='upload', | |
| resource_type='image', | |
| prefix=f'mafqoud/images/{folder_name}', | |
| max_results=500, | |
| next_cursor=next_cursor | |
| ) | |
| else: | |
| res = cloudinary.api.resources( | |
| type='upload', | |
| resource_type='image', | |
| prefix=f'mafqoud/images/{folder_name}', | |
| max_results=500 | |
| ) | |
| resources.extend(res.get('resources', [])) | |
| next_cursor = res.get('next_cursor') | |
| if not next_cursor: | |
| break | |
| return resources | |
| def download_image(url, local_path): | |
| response = requests.get(url, stream=True) | |
| if response.status_code == 200: | |
| with open(local_path, 'wb') as file: | |
| for chunk in response: | |
| file.write(chunk) | |
| def sync_folder(folder_name, local_dir): | |
| cloudinary_images = fetch_cloudinary_images(folder_name) | |
| cloudinary_urls = {img['secure_url']: img['public_id'] for img in cloudinary_images} | |
| # Download new images and track downloaded image paths | |
| downloaded_paths = [] | |
| for img in cloudinary_images: | |
| url = img['secure_url'] | |
| public_id = img['public_id'] | |
| file_name = url.split('/')[-1] # Get the actual file name | |
| local_path = os.path.join(local_dir, file_name) | |
| print(local_path) | |
| if not os.path.exists(local_path): | |
| download_image(url, local_path) | |
| downloaded_paths.append(local_path) | |
| # Remove old images | |
| local_images = [os.path.join(local_dir, f) for f in os.listdir(local_dir) if os.path.isfile(os.path.join(local_dir, f))] | |
| for local_path in local_images: | |
| if local_path not in downloaded_paths and (local_path.endswith('.jpg') or local_path.endswith('.jpeg') or local_path.endswith('.png')): | |
| os.remove(local_path) | |
| def delete_pkl_files(directory): | |
| """Delete all .pkl files in the specified directory.""" | |
| pkl_files = glob.glob(os.path.join(directory, '*.pkl')) | |
| for pkl_file in pkl_files: | |
| os.remove(pkl_file) | |