240 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			240 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python
 | 
						|
 | 
						|
import filecmp
import json
import shutil # For moving files
import signal
import sys
import tempfile
import time
from os import getenv
from pathlib import Path

import ffmpeg
from PIL import Image, ExifTags, UnidentifiedImageError
 | 
						|
 | 
						|
# Set the directories
# WATCH_DIRS is a colon-separated list of source directories (default: /sync).
# Empty segments (e.g. from a trailing or doubled colon) are skipped: Path('')
# resolves to the current working directory, which must never be sorted by
# accident.
source_directories: list[Path] = [
    Path(directory_string).resolve()
    for directory_string in getenv('WATCH_DIRS', '/sync').split(':')
    if directory_string
]
# Destination roots for sorted photos and videos.
photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve()
recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve()
# File suffixes (lowercase, including the leading dot) handled by the sorter.
photo_extensions: list[str] = ['.jpg', '.jpeg']
video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv']
 | 
						|
 | 
						|
def main():
    """Validate the configured directories, then sort each source directory.

    Exits with status 1 if a destination directory is missing or not
    writable, or if a source directory is missing. For every source
    directory an ignore-list JSON file (``<name>-ignore-file.json``, stored
    next to the directory) is created when absent and then passed to
    ``sort_directory``.
    """
    # Check the destination directories exist and are writable
    for directory in [photos_directory, recordings_directory]:
        # is_dir() is already False for a path that does not exist.
        if not directory.is_dir():
            print(f"Destination directory {directory} does not exist or is not a directory. Exiting.")
            sys.exit(1)
        try:
            # Probe with a uniquely named temporary file so an existing user
            # file is never touched or deleted by the check.
            with tempfile.TemporaryFile(dir=directory):
                pass
        except OSError:
            # Covers PermissionError as well as e.g. a read-only filesystem.
            print(f"Destination directory {directory} is not writable")
            sys.exit(1)

    # Check the source directories exist and create the ignore files if they don't exist
    for directory in source_directories:
        if not directory.is_dir():
            print(f"Source directory {directory} does not exist or is not a directory. Exiting.")
            sys.exit(1)
        # The ignore file lives beside the source directory, not inside it,
        # so it never shows up as a file to sort.
        ignore_file: Path = directory.parent.joinpath(f"{directory.name}-ignore-file.json")
        if not ignore_file.exists():
            print(f"Ignored files list {ignore_file} does not exist. Creating.")
            with open(ignore_file, 'w') as f:
                json.dump([], f)
        print(f"Starting sort of directory {directory}.")
        sort_count, ignore_count = sort_directory(directory=directory, ignore_file=ignore_file)
        print(f"Finished sort of directory {directory}.")
        print(f"Sorted {sort_count} files.")
        print(f"Added {ignore_count} files to the ignore list.")
    print(f"Finished sorting {len(source_directories)} directories, exiting.")
 | 
						|
 | 
						|
def sort_directory(directory: Path, ignore_file: Path) -> tuple[int, int]:
    """Sort the photos and videos found directly inside ``directory``.

    Files whose names appear in the ignore list are skipped. The remaining
    files are split into photos and videos by suffix (compared
    case-insensitively, so ``IMG.JPG`` is handled like ``img.jpg``) and
    passed to ``sort_photos`` / ``sort_videos``. The updated ignore list is
    written back to ``ignore_file`` afterwards.

    Args:
        directory: Source directory to scan (not recursive).
        ignore_file: JSON file holding a list of ``{"name", "reason"}`` entries.

    Returns:
        ``(sort_count, ignore_count)``: files copied and files newly ignored.
    """
    sort_count: int = 0
    ignore_count: int = 0
    # Load the ignored files list
    with open(file=ignore_file) as f:
        ignore_list: list[dict] = json.load(f)
    # Build the name lookup once instead of once per file.
    ignored_names: set[str] = {item["name"] for item in ignore_list}
    # Get all files in the directory
    file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()]
    # Filter out the ignored files
    sort_list = [file for file in file_list if file.name not in ignored_names]
    # Create a list of photo files
    photo_list = [file for file in sort_list if file.suffix.lower() in photo_extensions]
    # Create a list of video files
    video_list = [file for file in sort_list if file.suffix.lower() in video_extensions]
    print(f'''
        Found {len(file_list)} total files in directory {directory.name}.
            {len(ignore_list)} to ignore.
            {len(photo_list) + len(video_list)} to sort:
                {len(photo_list)} photos.
                {len(video_list)} videos.
    ''')
    try:
        # Sort the photos
        print(f"Sorting {len(photo_list)} photos from directory {directory.name}")
        ignore_list, sort_count, ignore_count = sort_photos(
            photo_list=photo_list,
            ignore_list=ignore_list,
            sort_count=sort_count,
            ignore_count=ignore_count
        )
        # Sort the videos
        print(f"Sorting {len(video_list)} videos from directory {directory.name}")
        ignore_list, sort_count, ignore_count = sort_videos(
            video_list=video_list,
            ignore_list=ignore_list,
            sort_count=sort_count,
            ignore_count=ignore_count
        )
    finally:
        # Update the ignore_file even when the run is interrupted (e.g. the
        # shutdown signal raising SystemExit), so entries recorded so far are
        # not lost. The sort helpers append to this same list object.
        with open(file=ignore_file, mode='w') as f:
            json.dump(ignore_list, f)
    return (sort_count, ignore_count)
 | 
						|
 | 
						|
def sort_photos(*,
        photo_list: list[Path],
        ignore_list: list[dict],
        sort_count: int,
        ignore_count: int
    ) -> tuple[list[dict], int, int]:
    """Copy each photo into the photos directory, named after its EXIF timestamp.

    The timestamp is taken from ``DateTimeOriginal`` when available (top-level
    EXIF data first, then the Exif sub-IFD) and from ``DateTime`` otherwise.
    Photos that cannot be opened, are not valid images, or have a missing or
    malformed timestamp are appended to ``ignore_list`` with a reason.

    Args:
        photo_list: Photo files to process.
        ignore_list: Ignore entries; extended in place.
        sort_count: Running count of copied files.
        ignore_count: Running count of newly ignored files.

    Returns:
        ``(ignore_list, sort_count, ignore_count)`` with updated values.
    """
    # Pointer tag of the Exif sub-IFD; Pillow exposes DateTimeOriginal there
    # rather than in the top-level (IFD0) mapping returned by getexif().
    exif_ifd_tag = 0x8769
    timestamp_format = "%Y:%m:%d %H:%M:%S"
    # Init the previous_name for collision avoidance
    previous_name: str = ""
    for photo in photo_list:
        # Read the exif data from the photo
        try:
            with Image.open(photo.resolve()) as image:
                exif = image.getexif()
                # Read the sub-IFD while the image is still open.
                exif_sub_ifd = exif.get_ifd(exif_ifd_tag)
        except UnidentifiedImageError:
            # Must precede OSError: UnidentifiedImageError is an OSError
            # subclass and would otherwise be reported as a permissions error.
            print(f"File {photo.name} is not a valid image file. Ignoring.")
            ignore_list.append({"name": photo.name, "reason": "Not a valid image file"})
            ignore_count += 1
            continue
        except OSError:
            print(f"File {photo.name} could not be opened (permissions?), skipping.")
            ignore_list.append({"name": photo.name, "reason": "Permissions error"})
            ignore_count += 1
            continue

        # Pick the first timestamp tag that is present, in order of preference.
        if ExifTags.Base.DateTimeOriginal in exif:
            raw_timestamp = exif[ExifTags.Base.DateTimeOriginal]
        elif ExifTags.Base.DateTimeOriginal in exif_sub_ifd:
            raw_timestamp = exif_sub_ifd[ExifTags.Base.DateTimeOriginal]
        elif ExifTags.Base.DateTime in exif:
            raw_timestamp = exif[ExifTags.Base.DateTime]
        else:
            print(f"File {photo.name} does not have an exif timestamp, ignoring.")
            ignore_list.append({"name": photo.name, "reason": "No exif timestamp"})
            ignore_count += 1
            continue

        try:
            timestamp = time.strptime(raw_timestamp, timestamp_format)
        except (ValueError, TypeError):
            # Only parsing failures are handled here; a bare except would also
            # swallow SystemExit/KeyboardInterrupt from a shutdown request.
            print(f"File {photo.name} has invalid timestamp format, ignoring.")
            ignore_list.append({"name": photo.name, "reason": "Invalid exif timestamp"})
            ignore_count += 1
            continue

        previous_name, ignore_list, sort_count, ignore_count = copy_file(
            file=photo,
            timestamp=timestamp,
            previous_name=previous_name,
            destination_directory=photos_directory,
            parent_suffix="Photos",
            ignore_list=ignore_list,
            sort_count=sort_count,
            ignore_count=ignore_count
        )
    return (ignore_list, sort_count, ignore_count)
 | 
						|
 | 
						|
def sort_videos(*,
        video_list: list[Path],
        ignore_list: list[dict],
        sort_count: int,
        ignore_count: int
    ) -> tuple[list[dict], int, int]:
    """Copy each video into the recordings directory, named after its creation time.

    The creation time is read from ``format.tags.creation_time`` in the
    ffmpeg probe output. Videos that cannot be probed, or whose creation time
    is missing or malformed, are appended to ``ignore_list`` with a reason.

    Args:
        video_list: Video files to process.
        ignore_list: Ignore entries; extended in place.
        sort_count: Running count of copied files.
        ignore_count: Running count of newly ignored files.

    Returns:
        ``(ignore_list, sort_count, ignore_count)`` with updated values.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
    # Init the previous_name for collision avoidance
    previous_name: str = ""
    for video in video_list:
        try:
            meta_dict: dict = ffmpeg.probe(video)
        except OSError:
            print(f"File {video.name} could not be opened (permissions?), skipping.")
            ignore_list.append({"name": video.name, "reason": "Permissions error"})
            ignore_count += 1
            continue
        except ffmpeg.Error:
            print(f"General error, file {video.name} could not be probed by ffmpeg, ignoring.")
            ignore_list.append({"name": video.name, "reason": "Can not be probed by ffmpeg"})
            ignore_count += 1
            continue

        # Missing "format" or "tags" sections are treated like a missing tag.
        tags = meta_dict.get("format", {}).get("tags", {})
        if "creation_time" not in tags:
            print(f"File {video.name} does not have creation_time metadata, ignoring.")
            ignore_list.append({"name": video.name, "reason": "No creation_time metadata"})
            ignore_count += 1
            continue

        try:
            timestamp = time.strptime(tags["creation_time"], timestamp_format)
        except (ValueError, TypeError):
            # Only parsing failures are handled here; a bare except would also
            # swallow SystemExit/KeyboardInterrupt from a shutdown request.
            print(f"{video.name} has invalid timestamp format, ignoring.")
            ignore_list.append({"name": video.name, "reason": "Invalid creation_time format"})
            ignore_count += 1
            continue

        previous_name, ignore_list, sort_count, ignore_count = copy_file(
            file=video,
            timestamp=timestamp,
            previous_name=previous_name,
            destination_directory=recordings_directory,
            parent_suffix="Recordings",
            ignore_list=ignore_list,
            sort_count=sort_count,
            ignore_count=ignore_count
        )
    return (ignore_list, sort_count, ignore_count)
 | 
						|
 | 
						|
def copy_file(*,
        file: Path,
        timestamp: time.struct_time,
        previous_name: str,
        destination_directory: Path,
        parent_suffix: str,
        ignore_list: list[dict],
        sort_count: int,
        ignore_count: int
    ) -> tuple[str, list[dict], int, int]:
    """Copy ``file`` into a per-year folder under a timestamp-based name.

    The target is
    ``<destination_directory>/<YYYY>-<parent_suffix>/<YYYY-MM-DD>_<HH-MM-SS>_<NNN><suffix>``.
    Starting at index 000, the first free name is used. If a candidate name is
    already taken by a file with identical content, the source is treated as
    a duplicate and recorded as ignored instead of being copied again. A
    different file sharing the same timestamp gets the next index, so bursts
    of photos taken within the same second are all kept.

    Args:
        file: Source file to copy.
        timestamp: Creation time used to build the folder and file name.
        previous_name: Name produced by the previous call. Kept for interface
            compatibility; collisions are detected against the destination
            folder itself.
        destination_directory: Root directory for sorted files.
        parent_suffix: Suffix of the per-year folder (e.g. ``"Photos"``).
        ignore_list: Ignore entries; extended in place.
        sort_count: Running count of copied files.
        ignore_count: Running count of newly ignored files.

    Returns:
        ``(new_file_name, ignore_list, sort_count, ignore_count)``.
    """
    creation_year = time.strftime("%Y", timestamp)
    name_stem = time.strftime("%Y-%m-%d_%H-%M-%S", timestamp)
    year_folder = f"{creation_year}-{parent_suffix}"
    # Ensure the parent year folder exists
    destination_directory.joinpath(year_folder).resolve().mkdir(parents=True, exist_ok=True)

    name_index = 0
    while True:
        new_file_name = f"{name_stem}_{name_index:03d}{file.suffix}"
        new_path: Path = destination_directory.joinpath(year_folder, new_file_name).resolve()
        if not new_path.exists():
            # Free slot: copy the file (copy2 also preserves file metadata).
            print(f"Copying {file.as_posix()} to {new_path.as_posix()}")
            shutil.copy2(src=file, dst=new_path)
            ignore_list.append({"name": file.name, "reason": "Already copied"})
            sort_count += 1
            break
        if filecmp.cmp(file, new_path, shallow=False):
            # Same content already stored under this name: a true duplicate.
            print(f"File {new_file_name} already exists, ignoring.")
            ignore_list.append({"name": file.name, "reason": "File already exists"})
            ignore_count += 1
            break
        # Name taken by a different file with the same timestamp; try the next index.
        name_index += 1
    # The caller keeps the returned name as previous_name for the next iteration
    return (new_file_name, ignore_list, sort_count, ignore_count)
 | 
						|
 | 
						|
def signal_handler(sig, frame):
    """Print a shutdown notice and terminate the process with exit status 0.

    Args:
        sig: Number of the received signal (unused).
        frame: Stack frame that was interrupted (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    print("Shutdown signal received. Exiting gracefully.")
    # Equivalent to sys.exit(0): unwinds the interpreter through SystemExit.
    raise SystemExit(0)
 | 
						|
 | 
						|
# Route both interrupt (SIGINT) and termination (SIGTERM) requests to the
# same shutdown handler. This runs at import time, as well as when executed
# as a script.
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, signal_handler)

# Only start sorting when the file is run directly.
if __name__ == "__main__":
    main()