upload-sorter python wip
This commit is contained in:
		| @@ -1,33 +1,160 @@ | ||||
| #!/usr/bin/python | ||||
| #!/usr/bin/env python | ||||
|  | ||||
| import time | ||||
| import os | ||||
| from time import struct_time | ||||
| from os import getenv | ||||
| import signal | ||||
| import sys | ||||
| import json | ||||
| from pathlib import Path | ||||
| from PIL import Image, ExifTags, UnidentifiedImageError | ||||
|  | ||||
| # Set the directories | ||||
| watch_directories: list[str] = os.getenv('WATCH_DIR', '/sync').split(':') | ||||
| recordings_directory: str = os.getenv('RECORDINGS_DIRECTORY', '/recordings') | ||||
| photos_directory: str = os.getenv('PHOTOS_DIRECTORY', '/photos') | ||||
| source_directories: list[Path] = [Path(directory_string).resolve() for directory_string in getenv('WATCH_DIRS', '/sync').split(':')] | ||||
| recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve() | ||||
| photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve() | ||||
| photo_extensions: list[str] = ['.jpg', '.jpeg'] | ||||
| video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv'] | ||||
| sleep_duration: int = 30 | ||||
|  | ||||
| def main(): | ||||
|      | ||||
|     # Check the source directories exist and create the ignore files if they don't exist | ||||
|     for directory in source_directories: | ||||
|         if not directory.exists() or not directory.is_dir(): | ||||
|             print(f"Directory {directory} does not exist or is not a directory. Exiting.") | ||||
|             sys.exit(1) | ||||
|         ignore_file: Path = directory.joinpath("ignored_files.json") | ||||
|         if not ignore_file.exists(): | ||||
|             with open('ignored_files.json', 'w') as f: | ||||
|                 json.dump([], f) | ||||
|  | ||||
|     # Check the destination directories exist and are writable | ||||
|     for directory in [recordings_directory, photos_directory]: | ||||
|         if not directory.exists() or not directory.is_dir() : | ||||
|             print(f"Destination directory {directory} does not exist or is not a directory. Exiting.") | ||||
|             sys.exit(1) | ||||
|         write_test_file: Path = directory.joinpath("write_test") | ||||
|         try: | ||||
|             write_test_file.touch() | ||||
|             write_test_file.unlink() | ||||
|         except PermissionError: | ||||
|             print(f"Destination directory {directory} is not writable") | ||||
|             sys.exit(1) | ||||
|  | ||||
|     # Ensure folders for the current year exist | ||||
|     current_year: str = time.strftime("%Y") | ||||
|     for directory in [recordings_directory, photos_directory]: | ||||
|         year_directory: Path = directory.joinpath(current_year) | ||||
|         if not year_directory.exists(): | ||||
|             year_directory.mkdir() | ||||
|  | ||||
|     # Start the main loop | ||||
|     while True: | ||||
|         for directory in watch_directories: | ||||
|             sort_directory(directory) | ||||
|         for directory in source_directories: | ||||
|             print(f"Starting sort of directory {directory}") | ||||
|             start_time = time.time() | ||||
|             sort_directory(directory = directory) | ||||
|             end_time = time.time() | ||||
|             print(f"Finished sorting directory {directory}") | ||||
|             print(f"Time taken: {end_time - start_time} seconds") | ||||
|             print(f"Sleeping for {sleep_duration} seconds") | ||||
|             time.sleep(sleep_duration) | ||||
|  | ||||
| def sort_directory(directory): | ||||
|      | ||||
|          | ||||
|         signal.signal(signal.SIGINT, signal_handler) | ||||
|         signal.signal(signal.SIGTERM, signal_handler) | ||||
| def sort_directory(directory: Path): | ||||
|     # Load the ignored files list | ||||
|     ignore_file = directory.joinpath("ignored_files.json") | ||||
|     with open(file= ignore_file) as f: | ||||
|         ignore_list: list[str] = json.load(f) | ||||
|     # Get all files in the directory | ||||
|     file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()] | ||||
|     # Filter out the ignored files | ||||
|     sort_list = [file for file in file_list if file.name not in ignore_list] | ||||
|     # Create a list of photo files | ||||
|     photo_list = [file for file in sort_list if file.suffix in photo_extensions] | ||||
|     # Create a list of video files | ||||
|     video_list = [file for file in sort_list if file.suffix in video_extensions] | ||||
|     print(f''' | ||||
|         Found {str(len(file_list))} total files in directory {directory.name}. | ||||
|             {str(len(ignore_list))} to ignore. | ||||
|             {str(len(sort_list))} to sort. | ||||
|             {str(len(photo_list))} photos. | ||||
|             {str(len(video_list))} videos. | ||||
|     ''') | ||||
|     # Sort the photos | ||||
|     print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}") | ||||
|     ignored_files = sort_photos(photo_list, ignored_files) | ||||
|     # Sort the videos  | ||||
|     print(f"Sorting {str(len(video_list))} videos from directory {directory.name}") | ||||
|     ignored_files = sort_videos(video_list, ignored_files) | ||||
|     # Update the ignore_file | ||||
|     with open(file= ignore_file, mode='w') as f: | ||||
|         json.dump(ignored_files, f) | ||||
|  | ||||
| def sort_photos(photo_list: list[Path], ignored_files: list[str]) -> list[str]: | ||||
|     # Init the previous_name for collision avoidance | ||||
|     previous_name: str = "" | ||||
|     for photo in photo_list: | ||||
|         # Read the exif data from the photo | ||||
|         try: | ||||
|             with Image.open(photo.resolve()) as image: | ||||
|                 exif = image.getexif() | ||||
|         except OSError: | ||||
|             print(f"File {photo.name} could not be opened (permissions?), ignoring.") | ||||
|             continue | ||||
|         except UnidentifiedImageError: | ||||
|             print(f"File {photo.name} is not a valid image file. Ignoring.") | ||||
|             ignored_files.append(photo.name) | ||||
|             continue | ||||
|         # Format the date | ||||
|         if exif[ExifTags.Base.DateTimeOriginal] is not None and exif[ExifTags.Base.DateTimeOriginal] != '': | ||||
|             image_timestamp: struct_time = time.strptime( | ||||
|                 data_string = exif[ExifTags.Base.DateTimeOriginal], | ||||
|                 format = "%Y:%m:%d %H:%M:%S" | ||||
|             ) | ||||
|             image_year: str = time.strftime( | ||||
|                 format = "%Y", | ||||
|                 time_tuple = image_timestamp | ||||
|             ) | ||||
|             image_date_string: str = time.strftime( | ||||
|                 format = "%Y-%m-%d", | ||||
|                 time_tuple = image_timestamp | ||||
|             ) | ||||
|             image_time_string: str = time.strftime( | ||||
|                 format = "%H-%M-%S", | ||||
|                 time_tuple = image_timestamp | ||||
|             ) | ||||
|         else: | ||||
|             print(f"File {photo.name} does not have a valid creation date, ignoring.") | ||||
|             ignored_files.append(photo.name) | ||||
|             continue | ||||
|         # Set index_count for this iteration | ||||
|         index_count: int = 0 | ||||
|         # Format the new name | ||||
|         new_name: str = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg" | ||||
|         while new_name == previous_name: | ||||
|             index_count += 1 | ||||
|             new_name = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg" | ||||
|         # Move the file | ||||
|         new_path: Path = photos_directory.joinpath(image_year, new_name).resolve() | ||||
|         if not new_path.exists(): | ||||
|             photo.rename(new_path) | ||||
|         else: | ||||
|             print(f"File {new_name} already exists, ignoring.") | ||||
|             ignored_files.append(photo.name) | ||||
|         # Update the previous_name for next iteration | ||||
|         previous_name = new_name | ||||
|     return ignored_files | ||||
|  | ||||
| def sort_videos(video_list: list[Path], ignored_files: list[str]) -> list[str]: | ||||
|     print("Video sorting not implemened yet.") | ||||
|     return ignored_files | ||||
|  | ||||
| def signal_handler(sig, frame): | ||||
|             print('Shutdown signal received. Exiting gracefully.') | ||||
|             sys.exit(0) | ||||
|  | ||||
| signal.signal(signal.SIGINT, signal_handler) | ||||
| signal.signal(signal.SIGTERM, signal_handler) | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
		Reference in New Issue
	
	Block a user