photo sort working
This commit is contained in:
		
							
								
								
									
										73
									
								
								upload-sorter/sort.py
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							
							
						
						
									
										73
									
								
								upload-sorter/sort.py
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							@@ -10,8 +10,8 @@ from PIL import Image, ExifTags, UnidentifiedImageError
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
# Set the directories
 | 
					# Set the directories
 | 
				
			||||||
source_directories: list[Path] = [Path(directory_string).resolve() for directory_string in getenv('WATCH_DIRS', '/sync').split(':')]
 | 
					source_directories: list[Path] = [Path(directory_string).resolve() for directory_string in getenv('WATCH_DIRS', '/sync').split(':')]
 | 
				
			||||||
recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve()
 | 
					 | 
				
			||||||
photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve()
 | 
					photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve()
 | 
				
			||||||
 | 
					recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve()
 | 
				
			||||||
photo_extensions: list[str] = ['.jpg', '.jpeg']
 | 
					photo_extensions: list[str] = ['.jpg', '.jpeg']
 | 
				
			||||||
video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv']
 | 
					video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv']
 | 
				
			||||||
sleep_duration: int = 30
 | 
					sleep_duration: int = 30
 | 
				
			||||||
@@ -21,15 +21,16 @@ def main():
 | 
				
			|||||||
    # Check the source directories exist and create the ignore files if they don't exist
 | 
					    # Check the source directories exist and create the ignore files if they don't exist
 | 
				
			||||||
    for directory in source_directories:
 | 
					    for directory in source_directories:
 | 
				
			||||||
        if not directory.exists() or not directory.is_dir():
 | 
					        if not directory.exists() or not directory.is_dir():
 | 
				
			||||||
            print(f"Directory {directory} does not exist or is not a directory. Exiting.")
 | 
					            print(f"Source directory {directory} does not exist or is not a directory. Exiting.")
 | 
				
			||||||
            sys.exit(1)
 | 
					            sys.exit(1)
 | 
				
			||||||
        ignore_file: Path = directory.joinpath("ignored_files.json")
 | 
					        ignore_file: Path = directory.joinpath("ignored_files.json")
 | 
				
			||||||
        if not ignore_file.exists():
 | 
					        if not ignore_file.exists():
 | 
				
			||||||
            with open('ignored_files.json', 'w') as f:
 | 
					            print(f"Ignored files list {ignore_file} does not exist. Creating.")
 | 
				
			||||||
 | 
					            with open(ignore_file, 'w') as f:
 | 
				
			||||||
                json.dump([], f)
 | 
					                json.dump([], f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Check the destination directories exist and are writable
 | 
					    # Check the destination directories exist and are writable
 | 
				
			||||||
    for directory in [recordings_directory, photos_directory]:
 | 
					    for directory in [photos_directory, recordings_directory]:
 | 
				
			||||||
        if not directory.exists() or not directory.is_dir() :
 | 
					        if not directory.exists() or not directory.is_dir() :
 | 
				
			||||||
            print(f"Destination directory {directory} does not exist or is not a directory. Exiting.")
 | 
					            print(f"Destination directory {directory} does not exist or is not a directory. Exiting.")
 | 
				
			||||||
            sys.exit(1)
 | 
					            sys.exit(1)
 | 
				
			||||||
@@ -41,13 +42,6 @@ def main():
 | 
				
			|||||||
            print(f"Destination directory {directory} is not writable")
 | 
					            print(f"Destination directory {directory} is not writable")
 | 
				
			||||||
            sys.exit(1)
 | 
					            sys.exit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Ensure folders for the current year exist
 | 
					 | 
				
			||||||
    current_year: str = time.strftime("%Y")
 | 
					 | 
				
			||||||
    for directory in [recordings_directory, photos_directory]:
 | 
					 | 
				
			||||||
        year_directory: Path = directory.joinpath(current_year)
 | 
					 | 
				
			||||||
        if not year_directory.exists():
 | 
					 | 
				
			||||||
            year_directory.mkdir()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # Start the main loop
 | 
					    # Start the main loop
 | 
				
			||||||
    while True:
 | 
					    while True:
 | 
				
			||||||
        for directory in source_directories:
 | 
					        for directory in source_directories:
 | 
				
			||||||
@@ -76,21 +70,21 @@ def sort_directory(directory: Path):
 | 
				
			|||||||
    print(f'''
 | 
					    print(f'''
 | 
				
			||||||
        Found {str(len(file_list))} total files in directory {directory.name}.
 | 
					        Found {str(len(file_list))} total files in directory {directory.name}.
 | 
				
			||||||
            {str(len(ignore_list))} to ignore.
 | 
					            {str(len(ignore_list))} to ignore.
 | 
				
			||||||
            {str(len(sort_list))} to sort.
 | 
					            {str(len(photo_list) + len(video_list))} to sort:
 | 
				
			||||||
                {str(len(photo_list))} photos.
 | 
					                {str(len(photo_list))} photos.
 | 
				
			||||||
                {str(len(video_list))} videos.
 | 
					                {str(len(video_list))} videos.
 | 
				
			||||||
    ''')
 | 
					    ''')
 | 
				
			||||||
    # Sort the photos
 | 
					    # Sort the photos
 | 
				
			||||||
    print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}")
 | 
					    print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}")
 | 
				
			||||||
    ignored_files = sort_photos(photo_list, ignored_files)
 | 
					    ignore_list = sort_photos(photo_list, ignore_list)
 | 
				
			||||||
    # Sort the videos 
 | 
					    # Sort the videos 
 | 
				
			||||||
    print(f"Sorting {str(len(video_list))} videos from directory {directory.name}")
 | 
					    print(f"Sorting {str(len(video_list))} videos from directory {directory.name}")
 | 
				
			||||||
    ignored_files = sort_videos(video_list, ignored_files)
 | 
					    ignore_list = sort_videos(video_list, ignore_list)
 | 
				
			||||||
    # Update the ignore_file
 | 
					    # Update the ignore_file
 | 
				
			||||||
    with open(file = ignore_file, mode='w') as f:
 | 
					    with open(file = ignore_file, mode='w') as f:
 | 
				
			||||||
        json.dump(ignored_files, f)
 | 
					        json.dump(ignore_list, f)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def sort_photos(photo_list: list[Path], ignored_files: list[str]) -> list[str]:
 | 
					def sort_photos(photo_list: list[Path], ignore_list: list[str]) -> list[str]:
 | 
				
			||||||
    # Init the previous_name for collision avoidance
 | 
					    # Init the previous_name for collision avoidance
 | 
				
			||||||
    previous_name: str = ""
 | 
					    previous_name: str = ""
 | 
				
			||||||
    for photo in photo_list:
 | 
					    for photo in photo_list:
 | 
				
			||||||
@@ -103,30 +97,22 @@ def sort_photos(photo_list: list[Path], ignored_files: list[str]) -> list[str]:
 | 
				
			|||||||
            continue
 | 
					            continue
 | 
				
			||||||
        except UnidentifiedImageError:
 | 
					        except UnidentifiedImageError:
 | 
				
			||||||
            print(f"File {photo.name} is not a valid image file. Ignoring.")
 | 
					            print(f"File {photo.name} is not a valid image file. Ignoring.")
 | 
				
			||||||
            ignored_files.append(photo.name)
 | 
					            ignore_list.append(photo.name)
 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
        # Format the date
 | 
					        if ExifTags.Base.DateTimeOriginal in exif.keys():
 | 
				
			||||||
        if exif[ExifTags.Base.DateTimeOriginal] is not None and exif[ExifTags.Base.DateTimeOriginal] != '':
 | 
					            image_date_time_string = exif[ExifTags.Base.DateTimeOriginal]
 | 
				
			||||||
            image_timestamp = time.strptime(
 | 
					        elif ExifTags.Base.DateTime in exif.keys():
 | 
				
			||||||
                data_string = exif[ExifTags.Base.DateTimeOriginal],
 | 
					            image_date_time_string = exif[ExifTags.Base.DateTime]
 | 
				
			||||||
                format = "%Y:%m:%d %H:%M:%S"
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            image_year: str = time.strftime(
 | 
					 | 
				
			||||||
                format = "%Y",
 | 
					 | 
				
			||||||
                time_tuple = image_timestamp
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            image_date_string: str = time.strftime(
 | 
					 | 
				
			||||||
                format = "%Y-%m-%d",
 | 
					 | 
				
			||||||
                time_tuple = image_timestamp
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            image_time_string: str = time.strftime(
 | 
					 | 
				
			||||||
                format = "%H-%M-%S",
 | 
					 | 
				
			||||||
                time_tuple = image_timestamp
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            print(f"File {photo.name} does not have a valid creation date, ignoring.")
 | 
					           image_date_time_string = None
 | 
				
			||||||
            ignored_files.append(photo.name)
 | 
					        if image_date_time_string is None or image_date_time_string == '':
 | 
				
			||||||
 | 
					            print(f"File {photo.name} does not have a valid exif timestamp, ignoring.")
 | 
				
			||||||
 | 
					            ignore_list.append(photo.name)
 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
 | 
					        image_timestamp = time.strptime(image_date_time_string, "%Y:%m:%d %H:%M:%S")
 | 
				
			||||||
 | 
					        image_year: str = time.strftime("%Y", image_timestamp)
 | 
				
			||||||
 | 
					        image_date_string: str = time.strftime("%Y-%m-%d", image_timestamp)
 | 
				
			||||||
 | 
					        image_time_string: str = time.strftime("%H-%M-%S", image_timestamp)
 | 
				
			||||||
        # Set index_count for this iteration
 | 
					        # Set index_count for this iteration
 | 
				
			||||||
        index_count: int = 0
 | 
					        index_count: int = 0
 | 
				
			||||||
        # Format the new name
 | 
					        # Format the new name
 | 
				
			||||||
@@ -134,20 +120,23 @@ def sort_photos(photo_list: list[Path], ignored_files: list[str]) -> list[str]:
 | 
				
			|||||||
        while new_name == previous_name:
 | 
					        while new_name == previous_name:
 | 
				
			||||||
            index_count += 1
 | 
					            index_count += 1
 | 
				
			||||||
            new_name = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg"
 | 
					            new_name = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg"
 | 
				
			||||||
        # Move the file
 | 
					 | 
				
			||||||
        new_path: Path = photos_directory.joinpath(image_year, new_name).resolve()
 | 
					        new_path: Path = photos_directory.joinpath(image_year, new_name).resolve()
 | 
				
			||||||
 | 
					        # Ensure the parent year folder exists
 | 
				
			||||||
 | 
					        new_path.parent.mkdir(parents=True, exist_ok=True)
 | 
				
			||||||
 | 
					        # Move the file
 | 
				
			||||||
        if not new_path.exists():
 | 
					        if not new_path.exists():
 | 
				
			||||||
 | 
					            print(f"Moving {photo.as_posix()} to {new_path.as_posix()}")
 | 
				
			||||||
            photo.rename(new_path)
 | 
					            photo.rename(new_path)
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            print(f"File {new_name} already exists, ignoring.")
 | 
					            print(f"File {new_name} already exists, ignoring.")
 | 
				
			||||||
            ignored_files.append(photo.name)
 | 
					            ignore_list.append(photo.name)
 | 
				
			||||||
        # Update the previous_name for next iteration
 | 
					        # Update the previous_name for next iteration
 | 
				
			||||||
        previous_name = new_name
 | 
					        previous_name = new_name
 | 
				
			||||||
    return ignored_files
 | 
					    return ignore_list
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def sort_videos(video_list: list[Path], ignored_files: list[str]) -> list[str]:
 | 
					def sort_videos(video_list: list[Path], ignore_list: list[str]) -> list[str]:
 | 
				
			||||||
    print("Video sorting not implemened yet.")
 | 
					    print("Video sorting not implemened yet.")
 | 
				
			||||||
    return ignored_files
 | 
					    return ignore_list
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def signal_handler(sig, frame):
 | 
					def signal_handler(sig, frame):
 | 
				
			||||||
            print('Shutdown signal received. Exiting gracefully.')
 | 
					            print('Shutdown signal received. Exiting gracefully.')
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user