1
0
2024-12-30 01:19:50 -05:00

249 lines
10 KiB
Python
Executable File

#!/usr/bin/env python
import time
import signal
import sys
import json
import ffmpeg
import shutil # For moving files
from os import getenv
from pathlib import Path
from PIL import Image, ExifTags, UnidentifiedImageError
# Directory configuration, overridable through environment variables.
# WATCH_DIRS is a colon-separated list of directories to watch. Empty segments
# (e.g. from a trailing or doubled colon) are skipped: Path('') would resolve
# to the current working directory and silently add it to the watch list.
source_directories: list[Path] = [
    Path(directory_string).resolve()
    for directory_string in getenv('WATCH_DIRS', '/sync').split(':')
    if directory_string
]
# Destination roots for sorted photos and videos.
photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve()
recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve()
# File suffixes that are treated as photos / videos.
photo_extensions: list[str] = ['.jpg', '.jpeg']
video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv']
# Seconds to wait between sorting passes.
sleep_duration: int = 30
def main():
    """Validate the configured directories, then sort them in an endless loop.

    Startup checks:
      * every source directory must exist and be a directory; an empty
        ``ignored_files.json`` is created in each one that lacks it;
      * every destination directory must exist, be a directory, and accept
        the creation and removal of a probe file.

    Any failed check prints a message and exits with status 1. Afterwards the
    function never returns: it sorts each source directory, reports the
    results, sleeps ``sleep_duration`` seconds and repeats.
    """
    # Check the source directories exist and create the ignore files if they don't exist
    for directory in source_directories:
        if not directory.exists() or not directory.is_dir():
            print(f"Source directory {directory} does not exist or is not a directory. Exiting.")
            sys.exit(1)
        ignore_file: Path = directory.joinpath("ignored_files.json")
        if not ignore_file.exists():
            print(f"Ignored files list {ignore_file} does not exist. Creating.")
            with open(ignore_file, 'w') as f:
                json.dump([], f)

    # Check the destination directories exist and are writable
    for directory in [photos_directory, recordings_directory]:
        if not directory.exists() or not directory.is_dir():
            print(f"Destination directory {directory} does not exist or is not a directory. Exiting.")
            sys.exit(1)
        write_test_file: Path = directory.joinpath("write_test")
        try:
            write_test_file.touch()
            write_test_file.unlink()
        except OSError:
            # OSError rather than only PermissionError: a read-only or full
            # filesystem also means "not writable" but raises a different
            # OSError subclass.
            print(f"Destination directory {directory} is not writable")
            sys.exit(1)

    # Start the main loop
    while True:
        for directory in source_directories:
            print(f"Starting sort of directory {directory}.")
            # Monotonic clock so a system clock adjustment cannot skew the duration.
            start_time = time.monotonic()
            sort_count, ignore_count = sort_directory(directory=directory)
            end_time = time.monotonic()
            print(f"Finished sort of directory {directory}.")
            print(f"Sorted {sort_count} files.")
            print(f"Added {ignore_count} files to the ignore list.")
            print(f"Processing took {end_time - start_time} seconds.")
        print(f"Sleeping for {sleep_duration} seconds.")
        time.sleep(sleep_duration)
def sort_directory(directory: Path) -> tuple[int, int]:
    """Sort the photos and videos found directly inside *directory*.

    Files named in the directory's ``ignored_files.json`` are skipped. The
    remaining files are split into photos and videos by suffix (compared
    case-insensitively, so ``IMG_0001.JPG`` is treated like ``img_0001.jpg``)
    and handed to ``sort_photos`` / ``sort_videos``. The possibly extended
    ignore list is written back to ``ignored_files.json`` afterwards.

    Args:
        directory: Source directory to scan (not recursive).

    Returns:
        ``(sort_count, ignore_count)``: the number of files moved and the
        number of files newly added to the ignore list during this pass.
    """
    sort_count: int = 0
    ignore_count: int = 0

    # Load the ignored files list
    ignore_file = directory.joinpath("ignored_files.json")
    with open(file=ignore_file) as f:
        ignore_list: list[dict] = json.load(f)
    # Build the name lookup once instead of once per candidate file.
    ignored_names = {item["name"] for item in ignore_list}

    # Get all files in the directory
    file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()]
    # Filter out the ignored files
    sort_list = [file for file in file_list if file.name not in ignored_names]
    # Split by suffix; lower() so upper-case camera suffixes are not missed.
    photo_list = [file for file in sort_list if file.suffix.lower() in photo_extensions]
    video_list = [file for file in sort_list if file.suffix.lower() in video_extensions]

    print(f'''
Found {len(file_list)} total files in directory {directory.name}.
{len(ignore_list)} to ignore.
{len(photo_list) + len(video_list)} to sort:
{len(photo_list)} photos.
{len(video_list)} videos.
''')

    # Sort the photos
    print(f"Sorting {len(photo_list)} photos from directory {directory.name}")
    ignore_list, sort_count, ignore_count = sort_photos(
        photo_list=photo_list,
        ignore_list=ignore_list,
        sort_count=sort_count,
        ignore_count=ignore_count,
    )

    # Sort the videos
    print(f"Sorting {len(video_list)} videos from directory {directory.name}")
    ignore_list, sort_count, ignore_count = sort_videos(
        video_list=video_list,
        ignore_list=ignore_list,
        sort_count=sort_count,
        ignore_count=ignore_count,
    )

    # Update the ignore_file
    with open(file=ignore_file, mode='w') as f:
        json.dump(ignore_list, f)

    return (sort_count, ignore_count)
def sort_photos(
    *,
    photo_list: list[Path],
    ignore_list: list[dict],
    sort_count: int,
    ignore_count: int
) -> tuple[list[dict], int, int]:
    """Move each photo to the photo library, named after its EXIF timestamp.

    The timestamp is taken from ``DateTimeOriginal`` when present (looked up
    in the top-level EXIF mapping and in the Exif sub-IFD), otherwise from
    ``DateTime``. Photos that cannot be opened, are not valid images, or have
    a missing or malformed timestamp are appended to *ignore_list* with a
    reason and left where they are.

    Args:
        photo_list: Photo files to process.
        ignore_list: Ignore entries (``{"name", "reason"}`` dicts); extended
            in place.
        sort_count: Running count of files moved so far.
        ignore_count: Running count of files ignored so far.

    Returns:
        ``(ignore_list, sort_count, ignore_count)`` with updated values.
    """
    timestamp_format = "%Y:%m:%d %H:%M:%S"
    # Init the previous_name for collision avoidance
    previous_name: str = ""

    for photo in photo_list:
        # Read the exif data from the photo
        try:
            with Image.open(photo.resolve()) as image:
                exif = image.getexif()
                # DateTimeOriginal normally lives in the Exif sub-IFD, which
                # the top-level mapping does not expose directly.
                exif_ifd = exif.get_ifd(ExifTags.IFD.Exif)
        except UnidentifiedImageError:
            # Must be handled before OSError: UnidentifiedImageError is an
            # OSError subclass and would otherwise never reach this branch.
            print(f"File {photo.name} is not a valid image file. Ignoring.")
            ignore_list.append({"name": photo.name, "reason": "Not a valid image file"})
            ignore_count += 1
            continue
        except OSError:
            print(f"File {photo.name} could not be opened (permissions?), skipping.")
            ignore_list.append({"name": photo.name, "reason": "Permissions error"})
            ignore_count += 1
            continue

        # First matching tag wins; original capture time is preferred.
        candidates = (
            (exif, ExifTags.Base.DateTimeOriginal),
            (exif_ifd, ExifTags.Base.DateTimeOriginal),
            (exif, ExifTags.Base.DateTime),
        )
        raw_timestamp = next(
            (tags[tag] for tags, tag in candidates if tag in tags),
            None,
        )

        if raw_timestamp is None:
            print(f"File {photo.name} does not have an exif timestamp, ignoring.")
            ignore_list.append({"name": photo.name, "reason": "No exif timestamp"})
            ignore_count += 1
            continue

        try:
            timestamp = time.strptime(raw_timestamp, timestamp_format)
        except (ValueError, TypeError):
            # Only parse failures are handled here, so a SystemExit raised by
            # the shutdown signal handler is not swallowed.
            print(f"File {photo.name} has invalid timestamp format, ignoring.")
            ignore_list.append({"name": photo.name, "reason": "Invalid exif timestamp"})
            ignore_count += 1
            continue

        previous_name, ignore_list, sort_count, ignore_count = rename_file(
            file=photo,
            timestamp=timestamp,
            previous_name=previous_name,
            destination_directory=photos_directory,
            parent_suffix="Photos",
            ignore_list=ignore_list,
            sort_count=sort_count,
            ignore_count=ignore_count,
        )

    return (ignore_list, sort_count, ignore_count)
def sort_videos(
    *,
    video_list: list[Path],
    ignore_list: list[dict],
    sort_count: int,
    ignore_count: int
) -> tuple[list[dict], int, int]:
    """Move each video to the recordings library, named after its creation time.

    Each file is probed with ``ffmpeg.probe`` and the timestamp is read from
    ``format.tags.creation_time`` in the probe result. Videos that cannot be
    probed, or whose ``creation_time`` is missing or malformed, are appended
    to *ignore_list* with a reason and left where they are.

    Args:
        video_list: Video files to process.
        ignore_list: Ignore entries (``{"name", "reason"}`` dicts); extended
            in place.
        sort_count: Running count of files moved so far.
        ignore_count: Running count of files ignored so far.

    Returns:
        ``(ignore_list, sort_count, ignore_count)`` with updated values.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
    # Init the previous_name for collision avoidance
    previous_name: str = ""

    for video in video_list:
        try:
            meta_dict: dict = ffmpeg.probe(video)
        except OSError:
            print(f"File {video.name} could not be opened (permissions?), skipping.")
            ignore_list.append({"name": video.name, "reason": "Permissions error"})
            ignore_count += 1
            continue
        except ffmpeg.Error:
            print(f"General error, file {video.name} could not be probed by ffmpeg, ignoring.")
            ignore_list.append({"name": video.name, "reason": "Can not be probed by ffmpeg"})
            ignore_count += 1
            continue

        # Any missing level of format -> tags -> creation_time yields None.
        creation_time = meta_dict.get("format", {}).get("tags", {}).get("creation_time")

        if creation_time is None:
            print(f"File {video.name} does not have creation_time metadata, ignoring.")
            ignore_list.append({"name": video.name, "reason": "No creation_time metadata"})
            ignore_count += 1
            continue

        try:
            timestamp = time.strptime(creation_time, timestamp_format)
        except (ValueError, TypeError):
            # Only parse failures are handled here, so a SystemExit raised by
            # the shutdown signal handler is not swallowed.
            print(f"{video.name} has invalid timestamp format, ignoring.")
            ignore_list.append({"name": video.name, "reason": "Invalid creation_time format"})
            ignore_count += 1
            continue

        previous_name, ignore_list, sort_count, ignore_count = rename_file(
            file=video,
            timestamp=timestamp,
            previous_name=previous_name,
            destination_directory=recordings_directory,
            parent_suffix="Recordings",
            ignore_list=ignore_list,
            sort_count=sort_count,
            ignore_count=ignore_count,
        )

    return (ignore_list, sort_count, ignore_count)
def rename_file(
    *,
    file: Path,
    timestamp: time.struct_time,
    previous_name: str,
    destination_directory: Path,
    parent_suffix: str,
    ignore_list: list[dict],
    sort_count: int,
    ignore_count: int
) -> tuple[str, list[dict], int, int]:
    """Move *file* to ``<destination>/<YYYY>-<parent_suffix>/<timestamp>_<NNN><suffix>``.

    The three-digit index ``NNN`` starts at 000. When *previous_name* (the
    name produced by the preceding call) has the same timestamp and suffix,
    the index continues from the previous one, so any number of consecutive
    files taken within the same second receive distinct names. If the target
    path already exists, the file is not moved and is appended to
    *ignore_list* instead.

    Args:
        file: File to move.
        timestamp: Capture time used to build the new name and year folder.
        previous_name: Name generated by the previous call, or ``""``.
        destination_directory: Root directory of the library.
        parent_suffix: Suffix of the per-year folder (e.g. ``"Photos"``).
        ignore_list: Ignore entries; extended in place on a name clash.
        sort_count: Running count of files moved so far.
        ignore_count: Running count of files ignored so far.

    Returns:
        ``(new_file_name, ignore_list, sort_count, ignore_count)``; pass
        ``new_file_name`` as *previous_name* to the next call.
    """
    creation_year = time.strftime("%Y", timestamp)
    name_prefix = time.strftime("%Y-%m-%d_%H-%M-%S_", timestamp)

    # Continue numbering after the previous file when it shares this
    # timestamp and suffix. Comparing only for equality with the previous
    # name would restart at 000 for a third same-second file, which then
    # collides with the first one on disk.
    name_index = 0
    previous_path = Path(previous_name)
    if previous_name.startswith(name_prefix) and previous_path.suffix == file.suffix:
        previous_index = previous_path.stem[len(name_prefix):]
        if previous_index.isdecimal():
            name_index = int(previous_index) + 1

    new_file_name = f"{name_prefix}{name_index:03d}{file.suffix}"
    new_path: Path = destination_directory.joinpath(
        f"{creation_year}-{parent_suffix}", new_file_name
    ).resolve()

    # Ensure the parent year folder exists
    new_path.parent.mkdir(parents=True, exist_ok=True)

    # Move the file
    if not new_path.exists():
        print(f"Moving {file.as_posix()} to {new_path.as_posix()}")
        shutil.move(file, new_path)
        sort_count += 1
    else:
        print(f"File {new_file_name} already exists, ignoring.")
        ignore_list.append({"name": file.name, "reason": "File already exists"})
        ignore_count += 1

    # The generated name becomes previous_name for the next iteration
    return (new_file_name, ignore_list, sort_count, ignore_count)
def signal_handler(sig, frame):
    """Announce the shutdown and terminate the process with exit status 0.

    Both arguments are accepted to match the handler signature used by
    ``signal.signal`` and are not inspected.
    """
    print('Shutdown signal received. Exiting gracefully.')
    raise SystemExit(0)
# Route termination requests (SIGTERM) and Ctrl+C (SIGINT) to the same
# shutdown handler.
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

# Run the sorter only when executed as a script.
if __name__ == "__main__":
    main()