diff --git a/upload-sorter/sort.py b/upload-sorter/sort.py index 3cebe23..5cc9de7 100644 --- a/upload-sorter/sort.py +++ b/upload-sorter/sort.py @@ -1,33 +1,160 @@ -#!/usr/bin/python +#!/usr/bin/env python import time -import os +from time import struct_time +from os import getenv import signal import sys +import json +from pathlib import Path +from PIL import Image, ExifTags, UnidentifiedImageError # Set the directories -watch_directories: list[str] = os.getenv('WATCH_DIR', '/sync').split(':') -recordings_directory: str = os.getenv('RECORDINGS_DIRECTORY', '/recordings') -photos_directory: str = os.getenv('PHOTOS_DIRECTORY', '/photos') +source_directories: list[Path] = [Path(directory_string).resolve() for directory_string in getenv('WATCH_DIRS', '/sync').split(':')] +recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve() +photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve() +photo_extensions: list[str] = ['.jpg', '.jpeg'] +video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv'] sleep_duration: int = 30 def main(): + + # Check the source directories exist and create the ignore files if they don't exist + for directory in source_directories: + if not directory.exists() or not directory.is_dir(): + print(f"Directory {directory} does not exist or is not a directory. Exiting.") + sys.exit(1) + ignore_file: Path = directory.joinpath("ignored_files.json") + if not ignore_file.exists(): + with open('ignored_files.json', 'w') as f: + json.dump([], f) + + # Check the destination directories exist and are writable + for directory in [recordings_directory, photos_directory]: + if not directory.exists() or not directory.is_dir() : + print(f"Destination directory {directory} does not exist or is not a directory. Exiting.") + sys.exit(1) + write_test_file: Path = directory.joinpath("write_test") + try: + write_test_file.touch() + write_test_file.unlink() + except PermissionError: + print(f"Destination directory {directory} is not writable") + sys.exit(1) + + # Ensure folders for the current year exist + current_year: str = time.strftime("%Y") + for directory in [recordings_directory, photos_directory]: + year_directory: Path = directory.joinpath(current_year) + if not year_directory.exists(): + year_directory.mkdir() + + # Start the main loop while True: - for directory in watch_directories: - sort_directory(directory) + for directory in source_directories: + print(f"Starting sort of directory {directory}") + start_time = time.time() + sort_directory(directory = directory) + end_time = time.time() print(f"Finished sorting directory {directory}") + print(f"Time taken: {end_time - start_time} seconds") print(f"Sleeping for {sleep_duration} seconds") time.sleep(sleep_duration) -def sort_directory(directory): - - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) +def sort_directory(directory: Path): + # Load the ignored files list + ignore_file = directory.joinpath("ignored_files.json") + with open(file= ignore_file) as f: + ignore_list: list[str] = json.load(f) + # Get all files in the directory + file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()] + # Filter out the ignored files + sort_list = [file for file in file_list if file.name not in ignore_list] + # Create a list of photo files + photo_list = [file for file in sort_list if file.suffix in photo_extensions] + # Create a list of video files + video_list = [file for file in sort_list if file.suffix in video_extensions] + print(f''' + Found {str(len(file_list))} total files in directory {directory.name}. + {str(len(ignore_list))} to ignore. + {str(len(sort_list))} to sort. + {str(len(photo_list))} photos. + {str(len(video_list))} videos. + ''') + # Sort the photos + print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}") + ignored_files = sort_photos(photo_list, ignored_files) + # Sort the videos + print(f"Sorting {str(len(video_list))} videos from directory {directory.name}") + ignored_files = sort_videos(video_list, ignored_files) + # Update the ignore_file + with open(file= ignore_file, mode='w') as f: + json.dump(ignored_files, f) + +def sort_photos(photo_list: list[Path], ignored_files: list[str]) -> list[str]: + # Init the previous_name for collision avoidance + previous_name: str = "" + for photo in photo_list: + # Read the exif data from the photo + try: + with Image.open(photo.resolve()) as image: + exif = image.getexif() + except OSError: + print(f"File {photo.name} could not be opened (permissions?), ignoring.") + continue + except UnidentifiedImageError: + print(f"File {photo.name} is not a valid image file. Ignoring.") + ignored_files.append(photo.name) + continue + # Format the date + if exif[ExifTags.Base.DateTimeOriginal] is not None and exif[ExifTags.Base.DateTimeOriginal] != '': + image_timestamp: struct_time = time.strptime( + data_string = exif[ExifTags.Base.DateTimeOriginal], + format = "%Y:%m:%d %H:%M:%S" + ) + image_year: str = time.strftime( + format = "%Y", + time_tuple = image_timestamp + ) + image_date_string: str = time.strftime( + format = "%Y-%m-%d", + time_tuple = image_timestamp + ) + image_time_string: str = time.strftime( + format = "%H-%M-%S", + time_tuple = image_timestamp + ) + else: + print(f"File {photo.name} does not have a valid creation date, ignoring.") + ignored_files.append(photo.name) + continue + # Set index_count for this iteration + index_count: int = 0 + # Format the new name + new_name: str = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg" + while new_name == previous_name: + index_count += 1 + new_name = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg" + # Move the file + new_path: Path = photos_directory.joinpath(image_year, new_name).resolve() + if not new_path.exists(): + photo.rename(new_path) + else: + print(f"File {new_name} already exists, ignoring.") + ignored_files.append(photo.name) + # Update the previous_name for next iteration + previous_name = new_name + return ignored_files + +def sort_videos(video_list: list[Path], ignored_files: list[str]) -> list[str]: + print("Video sorting not implemened yet.") + return ignored_files def signal_handler(sig, frame): print('Shutdown signal received. Exiting gracefully.') sys.exit(0) +signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGTERM, signal_handler) if __name__ == "__main__": main() \ No newline at end of file