From d325e201cdc86cdb5050d73c756b181f50ff224b Mon Sep 17 00:00:00 2001 From: = <=> Date: Sat, 23 Nov 2024 12:08:07 -0500 Subject: [PATCH] video sorting added --- upload-sorter/requirements.txt | 1 + upload-sorter/sort.py | 162 +++++++++++++++++++++++---------- 2 files changed, 116 insertions(+), 47 deletions(-) diff --git a/upload-sorter/requirements.txt b/upload-sorter/requirements.txt index 64d521a..c7ed5fd 100644 --- a/upload-sorter/requirements.txt +++ b/upload-sorter/requirements.txt @@ -1,2 +1,3 @@ pillow +# ffmpeg ffmpeg-python \ No newline at end of file diff --git a/upload-sorter/sort.py b/upload-sorter/sort.py index 250f721..b7f92de 100755 --- a/upload-sorter/sort.py +++ b/upload-sorter/sort.py @@ -4,6 +4,7 @@ import time import signal import sys import json +import ffmpeg from os import getenv from pathlib import Path from PIL import Image, ExifTags, UnidentifiedImageError @@ -45,24 +46,28 @@ def main(): # Start the main loop while True: for directory in source_directories: - print(f"Starting sort of directory {directory}") + print(f"Starting sort of directory {directory}.") start_time = time.time() - sort_directory(directory = directory) + sort_count, ignore_count = sort_directory(directory = directory) end_time = time.time() - print(f"Finished sorting directory {directory}") - print(f"Time taken: {end_time - start_time} seconds") - print(f"Sleeping for {sleep_duration} seconds") + print(f"Finished sort of directory {directory}.") + print(f"Sorted {sort_count} files.") + print(f"Added {ignore_count} files to the ignore list.") + print(f"Processing took {end_time - start_time} seconds.") + print(f"Sleeping for {sleep_duration} seconds.") time.sleep(sleep_duration) -def sort_directory(directory: Path): +def sort_directory(directory: Path) -> tuple[int, int]: + sort_count: int = 0 + ignore_count: int = 0 # Load the ignored files list ignore_file = directory.joinpath("ignored_files.json") with open(file = ignore_file) as f: - ignore_list: list[str] = json.load(f) + ignore_list: list[dict] = json.load(f) # Get all files in the directory file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()] # Filter out the ignored files - sort_list = [file for file in file_list if file.name not in ignore_list] + sort_list = [file for file in file_list if file.name not in [item["name"] for item in ignore_list]] # Create a list of photo files photo_list = [file for file in sort_list if file.suffix in photo_extensions] # Create a list of video files @@ -76,15 +81,26 @@ def sort_directory(directory: Path): ''') # Sort the photos print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}") - ignore_list = sort_photos(photo_list, ignore_list) + ignore_list, sort_count, ignore_count = sort_photos( + photo_list = photo_list, + ignore_list = ignore_list, + sort_count = sort_count, + ignore_count = ignore_count + ) # Sort the videos print(f"Sorting {str(len(video_list))} videos from directory {directory.name}") - ignore_list = sort_videos(video_list, ignore_list) + ignore_list, sort_count, ignore_count = sort_videos( + video_list = video_list, + ignore_list = ignore_list, + sort_count = sort_count, + ignore_count = ignore_count + ) # Update the ignore_file with open(file = ignore_file, mode='w') as f: json.dump(ignore_list, f) + return (sort_count, ignore_count) -def sort_photos(photo_list: list[Path], ignore_list: list[str]) -> list[str]: +def sort_photos(photo_list: list[Path], ignore_list: list[dict], sort_count: int, ignore_count: int) -> tuple[list[dict], int, int]: # Init the previous_name for collision avoidance previous_name: str = "" for photo in photo_list: @@ -93,50 +109,102 @@ def sort_photos(photo_list: list[Path], ignore_list: list[str]) -> list[str]: with Image.open(photo.resolve()) as image: exif = image.getexif() except OSError: - print(f"File {photo.name} could not be opened (permissions?), ignoring.") + print(f"File {photo.name} could not be opened (permissions?), skipping.") continue except UnidentifiedImageError: print(f"File {photo.name} is not a valid image file. Ignoring.") - ignore_list.append(photo.name) + ignore_list.append({"name": photo.name, "reason": "Not a valid image file"}) + ignore_count += 1 continue + timestamp = None if ExifTags.Base.DateTimeOriginal in exif.keys(): - image_date_time_string = exif[ExifTags.Base.DateTimeOriginal] + timestamp = time.strptime(exif[ExifTags.Base.DateTimeOriginal], "%Y:%m:%d %H:%M:%S") elif ExifTags.Base.DateTime in exif.keys(): - image_date_time_string = exif[ExifTags.Base.DateTime] - else: - image_date_time_string = None - if image_date_time_string is None or image_date_time_string == '': - print(f"File {photo.name} does not have a valid exif timestamp, ignoring.") - ignore_list.append(photo.name) + timestamp = time.strptime(exif[ExifTags.Base.DateTime], "%Y:%m:%d %H:%M:%S") + if timestamp is None: + print(f"File {photo.name} does not have an exif timestamp, ignoring.") + ignore_list.append({"name": photo.name, "reason": "No exif timestamp"}) + ignore_count += 1 continue - image_timestamp = time.strptime(image_date_time_string, "%Y:%m:%d %H:%M:%S") - image_year: str = time.strftime("%Y", image_timestamp) - image_date_string: str = time.strftime("%Y-%m-%d", image_timestamp) - image_time_string: str = time.strftime("%H-%M-%S", image_timestamp) - # Set index_count for this iteration - index_count: int = 0 - # Format the new name - new_name: str = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg" - while new_name == previous_name: - index_count += 1 - new_name = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg" - new_path: Path = photos_directory.joinpath(f"{image_year}-Photos", new_name).resolve() - # Ensure the parent year folder exists - new_path.parent.mkdir(parents=True, exist_ok=True) - # Move the file - if not new_path.exists(): - print(f"Moving {photo.as_posix()} to {new_path.as_posix()}") - photo.rename(new_path) - else: - print(f"File {new_name} already exists, ignoring.") - ignore_list.append(photo.name) - # Update the previous_name for next iteration - previous_name = new_name - return ignore_list + previous_name, ignore_list, sort_count, ignore_count = rename_file( + file = photo, + timestamp = timestamp, + previous_name = previous_name, + destination_directory = photos_directory, + parent_suffix = "Photos", + ignore_list = ignore_list, + sort_count = sort_count, + ignore_count = ignore_count + ) + return (ignore_list, sort_count, ignore_count) -def sort_videos(video_list: list[Path], ignore_list: list[str]) -> list[str]: - print("Video sorting not implemened yet.") - return ignore_list +def sort_videos(video_list: list[Path], ignore_list: list[dict], sort_count: int, ignore_count: int) -> tuple[list[dict], int, int]: + previous_name: str = "" + for video in video_list: + try: + meta_dict: dict = ffmpeg.probe(video) + except OSError: + print(f"File {video.name} could not be opened (permissions?), skipping.") + continue + except ffmpeg.Error: + print(f"General error, file {video.name} could not be probed by ffmpeg, ignoring.") + ignore_list.append({"name": video.name, "reason": "Can not be probed by ffmpeg"}) + ignore_count += 1 + continue + timestamp = None + if "format" in meta_dict.keys(): + if "tags" in meta_dict["format"].keys(): + if "creation_time" in meta_dict["format"]["tags"].keys(): + timestamp = time.strptime(meta_dict["format"]["tags"]["creation_time"], "%Y-%m-%dT%H:%M:%S.%fZ") + if timestamp is None: + print(f"File {video.name} does not have creation_time metadata, ignoring.") + ignore_list.append({"name": video.name, "reason": "No creation_time metadata"}) + ignore_count += 1 + continue + previous_name, ignore_list, sort_count, ignore_count = rename_file( + file = video, + timestamp = timestamp, + previous_name = previous_name, + destination_directory = recordings_directory, + parent_suffix = "Recordings", + ignore_list = ignore_list, + sort_count = sort_count, + ignore_count = ignore_count + ) + return (ignore_list, sort_count, ignore_count) + +def rename_file(*, + file: Path, + timestamp: time.struct_time, + previous_name: str, + destination_directory: Path, + parent_suffix: str, + ignore_list: list[dict], + sort_count: int, + ignore_count: int + ) -> tuple[str, list[dict], int, int]: + creation_year = time.strftime("%Y", timestamp) + creation_date = time.strftime("%Y-%m-%d", timestamp) + creation_time = time.strftime("%H-%M-%S", timestamp) + name_index = 0 + new_file_name = f"{creation_date}_{creation_time}_{str(name_index).zfill(3)}{file.suffix}" + while new_file_name == previous_name: + name_index += 1 + new_file_name = f"{creation_date}_{creation_time}_{str(name_index).zfill(3)}{file.suffix}" + new_path: Path = destination_directory.joinpath(f"{creation_year}-{parent_suffix}", new_file_name).resolve() + # Ensure the parent year folder exists + new_path.parent.mkdir(parents=True, exist_ok=True) + # Move the file + if not new_path.exists(): + print(f"Moving {file.as_posix()} to {new_path.as_posix()}") + file.rename(new_path) + sort_count += 1 + else: + print(f"File {new_file_name} already exists, ignoring.") + ignore_list.append({"name": file.name, "reason": "File already exists"}) + ignore_count += 1 + # Update the previous_name for next iteration + return (new_file_name, ignore_list, sort_count, ignore_count) def signal_handler(sig, frame): print('Shutdown signal received. Exiting gracefully.')