#!/usr/bin/env python import time import signal import sys import json import ffmpeg from os import getenv from pathlib import Path from PIL import Image, ExifTags, UnidentifiedImageError # Set the directories source_directories: list[Path] = [Path(directory_string).resolve() for directory_string in getenv('WATCH_DIRS', '/sync').split(':')] photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve() recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve() photo_extensions: list[str] = ['.jpg', '.jpeg'] video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv'] sleep_duration: int = 30 def main(): # Check the source directories exist and create the ignore files if they don't exist for directory in source_directories: if not directory.exists() or not directory.is_dir(): print(f"Source directory {directory} does not exist or is not a directory. Exiting.") sys.exit(1) ignore_file: Path = directory.joinpath("ignored_files.json") if not ignore_file.exists(): print(f"Ignored files list {ignore_file} does not exist. Creating.") with open(ignore_file, 'w') as f: json.dump([], f) # Check the destination directories exist and are writable for directory in [photos_directory, recordings_directory]: if not directory.exists() or not directory.is_dir() : print(f"Destination directory {directory} does not exist or is not a directory. Exiting.") sys.exit(1) write_test_file: Path = directory.joinpath("write_test") try: write_test_file.touch() write_test_file.unlink() except PermissionError: print(f"Destination directory {directory} is not writable") sys.exit(1) # Start the main loop while True: for directory in source_directories: print(f"Starting sort of directory {directory}.") start_time = time.time() sort_count, ignore_count = sort_directory(directory = directory) end_time = time.time() print(f"Finished sort of directory {directory}.") print(f"Sorted {sort_count} files.") print(f"Added {ignore_count} files to the ignore list.") print(f"Processing took {end_time - start_time} seconds.") print(f"Sleeping for {sleep_duration} seconds.") time.sleep(sleep_duration) def sort_directory(directory: Path) -> tuple[int, int]: sort_count: int = 0 ignore_count: int = 0 # Load the ignored files list ignore_file = directory.joinpath("ignored_files.json") with open(file = ignore_file) as f: ignore_list: list[dict] = json.load(f) # Get all files in the directory file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()] # Filter out the ignored files sort_list = [file for file in file_list if file.name not in [item["name"] for item in ignore_list]] # Create a list of photo files photo_list = [file for file in sort_list if file.suffix in photo_extensions] # Create a list of video files video_list = [file for file in sort_list if file.suffix in video_extensions] print(f''' Found {str(len(file_list))} total files in directory {directory.name}. {str(len(ignore_list))} to ignore. {str(len(photo_list) + len(video_list))} to sort: {str(len(photo_list))} photos. {str(len(video_list))} videos. ''') # Sort the photos print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}") ignore_list, sort_count, ignore_count = sort_photos( photo_list = photo_list, ignore_list = ignore_list, sort_count = sort_count, ignore_count = ignore_count ) # Sort the videos print(f"Sorting {str(len(video_list))} videos from directory {directory.name}") ignore_list, sort_count, ignore_count = sort_videos( video_list = video_list, ignore_list = ignore_list, sort_count = sort_count, ignore_count = ignore_count ) # Update the ignore_file with open(file = ignore_file, mode='w') as f: json.dump(ignore_list, f) return (sort_count, ignore_count) def sort_photos(*, photo_list: list[Path], ignore_list: list[dict], sort_count: int, ignore_count: int ) -> tuple[list[dict], int, int]: # Init the previous_name for collision avoidance previous_name: str = "" for photo in photo_list: # Read the exif data from the photo try: with Image.open(photo.resolve()) as image: exif = image.getexif() except OSError: print(f"File {photo.name} could not be opened (permissions?), skipping.") continue except UnidentifiedImageError: print(f"File {photo.name} is not a valid image file. Ignoring.") ignore_list.append({"name": photo.name, "reason": "Not a valid image file"}) ignore_count += 1 continue timestamp = None if ExifTags.Base.DateTimeOriginal in exif.keys(): timestamp = time.strptime(exif[ExifTags.Base.DateTimeOriginal], "%Y:%m:%d %H:%M:%S") elif ExifTags.Base.DateTime in exif.keys(): timestamp = time.strptime(exif[ExifTags.Base.DateTime], "%Y:%m:%d %H:%M:%S") if timestamp is None: print(f"File {photo.name} does not have an exif timestamp, ignoring.") ignore_list.append({"name": photo.name, "reason": "No exif timestamp"}) ignore_count += 1 continue previous_name, ignore_list, sort_count, ignore_count = rename_file( file = photo, timestamp = timestamp, previous_name = previous_name, destination_directory = photos_directory, parent_suffix = "Photos", ignore_list = ignore_list, sort_count = sort_count, ignore_count = ignore_count ) return (ignore_list, sort_count, ignore_count) def sort_videos(*, video_list: list[Path], ignore_list: list[dict], sort_count: int, ignore_count: int ) -> tuple[list[dict], int, int]: previous_name: str = "" for video in video_list: try: meta_dict: dict = ffmpeg.probe(video) except OSError: print(f"File {video.name} could not be opened (permissions?), skipping.") continue except ffmpeg.Error: print(f"General error, file {video.name} could not be probed by ffmpeg, ignoring.") ignore_list.append({"name": video.name, "reason": "Can not be probed by ffmpeg"}) ignore_count += 1 continue timestamp = None if "format" in meta_dict.keys(): if "tags" in meta_dict["format"].keys(): if "creation_time" in meta_dict["format"]["tags"].keys(): timestamp = time.strptime(meta_dict["format"]["tags"]["creation_time"], "%Y-%m-%dT%H:%M:%S.%fZ") if timestamp is None: print(f"File {video.name} does not have creation_time metadata, ignoring.") ignore_list.append({"name": video.name, "reason": "No creation_time metadata"}) ignore_count += 1 continue previous_name, ignore_list, sort_count, ignore_count = rename_file( file = video, timestamp = timestamp, previous_name = previous_name, destination_directory = recordings_directory, parent_suffix = "Recordings", ignore_list = ignore_list, sort_count = sort_count, ignore_count = ignore_count ) return (ignore_list, sort_count, ignore_count) def rename_file(*, file: Path, timestamp: time.struct_time, previous_name: str, destination_directory: Path, parent_suffix: str, ignore_list: list[dict], sort_count: int, ignore_count: int ) -> tuple[str, list[dict], int, int]: creation_year = time.strftime("%Y", timestamp) creation_date = time.strftime("%Y-%m-%d", timestamp) creation_time = time.strftime("%H-%M-%S", timestamp) name_index = 0 new_file_name = f"{creation_date}_{creation_time}_{str(name_index).zfill(3)}{file.suffix}" while new_file_name == previous_name: name_index += 1 new_file_name = f"{creation_date}_{creation_time}_{str(name_index).zfill(3)}{file.suffix}" new_path: Path = destination_directory.joinpath(f"{creation_year}-{parent_suffix}", new_file_name).resolve() # Ensure the parent year folder exists new_path.parent.mkdir(parents=True, exist_ok=True) # Move the file if not new_path.exists(): print(f"Moving {file.as_posix()} to {new_path.as_posix()}") file.rename(new_path) sort_count += 1 else: print(f"File {new_file_name} already exists, ignoring.") ignore_list.append({"name": file.name, "reason": "File already exists"}) ignore_count += 1 # Update the previous_name for next iteration return (new_file_name, ignore_list, sort_count, ignore_count) def signal_handler(sig, frame): print('Shutdown signal received. Exiting gracefully.') sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) if __name__ == "__main__": main()