159 lines
6.5 KiB
Python
159 lines
6.5 KiB
Python
#!/usr/bin/env python
|
|
|
|
import time
|
|
import signal
|
|
import sys
|
|
import json
|
|
from os import getenv
|
|
from pathlib import Path
|
|
from PIL import Image, ExifTags, UnidentifiedImageError
|
|
|
|
# Set the directories
# WATCH_DIRS is a colon-separated list of directories to watch. Empty segments
# (a trailing colon, "::", or an empty variable) are skipped, because
# Path('').resolve() would silently turn them into the current working directory.
source_directories: list[Path] = [
    Path(directory_string).resolve()
    for directory_string in getenv('WATCH_DIRS', '/sync').split(':')
    if directory_string
]
# Destination roots for sorted videos and photos, overridable via the environment
recordings_directory: Path = Path(getenv('RECORDINGS_DIRECTORY', '/recordings')).resolve()
photos_directory: Path = Path(getenv('PHOTOS_DIRECTORY', '/photos')).resolve()
# File suffixes (lowercase, with the leading dot) treated as photos and as videos
photo_extensions: list[str] = ['.jpg', '.jpeg']
video_extensions: list[str] = ['.mp4', '.mov', '.avi', '.mkv']
# Seconds to wait between two sorting passes
sleep_duration: int = 30
|
|
|
|
def main():
    """Validate the configured directories, then sort the watch directories forever.

    Exits the process with status 1 when a source or destination directory is
    missing, is not a directory, or (for destinations) cannot be written to.
    Otherwise loops indefinitely: every source directory is sorted, then the
    process sleeps for ``sleep_duration`` seconds before the next pass.
    """
    # Check the source directories exist and create the ignore files if they don't exist
    for directory in source_directories:
        # is_dir() is already False for a path that does not exist
        if not directory.is_dir():
            print(f"Directory {directory} does not exist or is not a directory. Exiting.")
            sys.exit(1)
        ignore_file: Path = directory.joinpath("ignored_files.json")
        if not ignore_file.exists():
            # Create the file inside the watched directory (not the current
            # working directory), which is where sort_directory reads it from.
            with open(ignore_file, 'w') as f:
                json.dump([], f)

    # Check the destination directories exist and are writable
    for directory in [recordings_directory, photos_directory]:
        if not directory.is_dir():
            print(f"Destination directory {directory} does not exist or is not a directory. Exiting.")
            sys.exit(1)
        write_test_file: Path = directory.joinpath("write_test")
        try:
            write_test_file.touch()
            write_test_file.unlink()
        except OSError:
            # PermissionError is a subclass of OSError; catching the base class
            # also covers e.g. a read-only filesystem.
            print(f"Destination directory {directory} is not writable")
            sys.exit(1)

    # Ensure folders for the current year exist
    current_year: str = time.strftime("%Y")
    for directory in [recordings_directory, photos_directory]:
        # exist_ok avoids a race between checking for and creating the folder
        directory.joinpath(current_year).mkdir(exist_ok=True)

    # Start the main loop
    while True:
        for directory in source_directories:
            print(f"Starting sort of directory {directory}")
            start_time = time.time()
            sort_directory(directory=directory)
            end_time = time.time()
            print(f"Finished sorting directory {directory}")
            print(f"Time taken: {end_time - start_time} seconds")
        print(f"Sleeping for {sleep_duration} seconds")
        time.sleep(sleep_duration)
|
|
|
|
def sort_directory(directory: Path):
    """Sort the photos and videos found directly inside ``directory``.

    Reads ``ignored_files.json`` from the directory, skips every file whose
    name is listed there, hands the remaining photos and videos to
    ``sort_photos`` and ``sort_videos``, and finally writes the (possibly
    extended) ignore list back to ``ignored_files.json``.

    Args:
        directory: The watch directory to process. Only regular files directly
            inside it are considered; subdirectories are not descended into.
    """
    # Load the ignored files list; a missing file means nothing is ignored yet
    ignore_file = directory.joinpath("ignored_files.json")
    try:
        with open(file=ignore_file) as f:
            ignore_list: list[str] = json.load(f)
    except FileNotFoundError:
        ignore_list = []
    # Set copy so the membership test below is O(1) per file
    ignored_names = set(ignore_list)

    # Get all files in the directory
    file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()]
    # Filter out the ignored files
    sort_list = [file for file in file_list if file.name not in ignored_names]
    # Split by extension; compare lowercased so ".JPG" / ".MOV" are matched too
    photo_list = [file for file in sort_list if file.suffix.lower() in photo_extensions]
    video_list = [file for file in sort_list if file.suffix.lower() in video_extensions]

    print(
        f"\nFound {len(file_list)} total files in directory {directory.name}.\n"
        f"{len(ignore_list)} to ignore.\n"
        f"{len(sort_list)} to sort.\n"
        f"{len(photo_list)} photos.\n"
        f"{len(video_list)} videos.\n"
    )

    # Sort the photos
    print(f"Sorting {len(photo_list)} photos from directory {directory.name}")
    ignore_list = sort_photos(photo_list, ignore_list)
    # Sort the videos
    print(f"Sorting {len(video_list)} videos from directory {directory.name}")
    ignore_list = sort_videos(video_list, ignore_list)

    # Update the ignore_file
    with open(file=ignore_file, mode='w') as f:
        json.dump(ignore_list, f)
|
|
|
|
def sort_photos(photo_list: list[Path], ignored_files: list[str]) -> list[str]:
    """Move photos into ``photos_directory/<year>/`` named after their EXIF capture time.

    Each photo is renamed to ``YYYY-MM-DD_HH-MM-SS_NNN.jpg`` where ``NNN`` is a
    counter that keeps photos sharing a capture second apart within one call.
    Photos that are not valid images, carry no parsable ``DateTimeOriginal``
    value, or whose target name already exists at the destination are left in
    place and their names are appended to ``ignored_files``. Photos that cannot
    be opened at all are skipped without being added to the ignore list.

    Args:
        photo_list: Paths of the photo files to process.
        ignored_files: Names of files to ignore; extended in place.

    Returns:
        The same ``ignored_files`` list, including any names added here.
    """
    # Tag id of the pointer to the Exif sub-IFD, where DateTimeOriginal is stored
    exif_ifd_pointer = 0x8769
    # Every name handed out during this call, for collision avoidance
    assigned_names: set[str] = set()

    for photo in photo_list:
        # Read the exif data from the photo
        try:
            with Image.open(photo.resolve()) as image:
                exif = image.getexif()
                # Look in the Exif sub-IFD first, then fall back to the top level;
                # .get() returns None instead of raising KeyError when absent
                raw_timestamp = (
                    exif.get_ifd(exif_ifd_pointer).get(ExifTags.Base.DateTimeOriginal)
                    or exif.get(ExifTags.Base.DateTimeOriginal)
                )
        except UnidentifiedImageError:
            # Must be handled before OSError: UnidentifiedImageError subclasses it
            print(f"File {photo.name} is not a valid image file. Ignoring.")
            ignored_files.append(photo.name)
            continue
        except OSError:
            print(f"File {photo.name} could not be opened (permissions?), ignoring.")
            continue

        # Format the date
        image_timestamp = None
        if raw_timestamp:
            try:
                # time.strptime / time.strftime only accept positional arguments
                image_timestamp = time.strptime(str(raw_timestamp).strip(), "%Y:%m:%d %H:%M:%S")
            except ValueError:
                # Malformed value (e.g. an all-zero date); treated as missing
                image_timestamp = None
        if image_timestamp is None:
            print(f"File {photo.name} does not have a valid creation date, ignoring.")
            ignored_files.append(photo.name)
            continue
        image_year: str = time.strftime("%Y", image_timestamp)
        image_date_string: str = time.strftime("%Y-%m-%d", image_timestamp)
        image_time_string: str = time.strftime("%H-%M-%S", image_timestamp)

        # Format the new name, bumping the index until it is unused in this call
        index_count: int = 0
        new_name: str = f"{image_date_string}_{image_time_string}_{index_count:03d}.jpg"
        while new_name in assigned_names:
            index_count += 1
            new_name = f"{image_date_string}_{image_time_string}_{index_count:03d}.jpg"
        assigned_names.add(new_name)

        # Move the file
        new_path: Path = photos_directory.joinpath(image_year, new_name).resolve()
        if new_path.exists():
            print(f"File {new_name} already exists, ignoring.")
            ignored_files.append(photo.name)
            continue
        # The folder for the photo's year may not exist yet
        new_path.parent.mkdir(parents=True, exist_ok=True)
        # NOTE(review): Path.rename can fail when source and destination are on
        # different filesystems; if the watch and photo directories are
        # separate mounts, shutil.move may be needed here - confirm deployment.
        photo.rename(new_path)

    return ignored_files
|
|
|
|
def sort_videos(video_list: list[Path], ignored_files: list[str]) -> list[str]:
    """Placeholder for video sorting: report that it is unimplemented.

    Args:
        video_list: Paths of the video files found; currently unused.
        ignored_files: Names of files to ignore.

    Returns:
        ``ignored_files``, unchanged.
    """
    placeholder_notice = "Video sorting not implemened yet."
    print(placeholder_notice)
    return ignored_files
|
|
|
|
def signal_handler(sig, frame):
    """Announce the shutdown and end the process with exit status 0.

    Args:
        sig: Number of the received signal; unused.
        frame: Stack frame that was interrupted; unused.

    Raises:
        SystemExit: Always, with code 0.
    """
    farewell = 'Shutdown signal received. Exiting gracefully.'
    print(farewell)
    # Same effect as sys.exit(0), which raises SystemExit(0)
    raise SystemExit(0)
|
|
|
|
# Route both Ctrl+C (SIGINT) and termination requests (SIGTERM) to the same handler
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, signal_handler)

# Run the sorter only when executed as a script
if __name__ == "__main__":
    main()