video sorting added
This commit is contained in:
parent
80a239cb94
commit
d325e201cd
@ -1,2 +1,3 @@
|
|||||||
pillow
|
pillow
|
||||||
|
# ffmpeg
|
||||||
ffmpeg-python
|
ffmpeg-python
|
@ -4,6 +4,7 @@ import time
|
|||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
|
import ffmpeg
|
||||||
from os import getenv
|
from os import getenv
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from PIL import Image, ExifTags, UnidentifiedImageError
|
from PIL import Image, ExifTags, UnidentifiedImageError
|
||||||
@ -45,24 +46,28 @@ def main():
|
|||||||
# Start the main loop
|
# Start the main loop
|
||||||
while True:
|
while True:
|
||||||
for directory in source_directories:
|
for directory in source_directories:
|
||||||
print(f"Starting sort of directory {directory}")
|
print(f"Starting sort of directory {directory}.")
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
sort_directory(directory = directory)
|
sort_count, ignore_count = sort_directory(directory = directory)
|
||||||
end_time = time.time()
|
end_time = time.time()
|
||||||
print(f"Finished sorting directory {directory}")
|
print(f"Finished sort of directory {directory}.")
|
||||||
print(f"Time taken: {end_time - start_time} seconds")
|
print(f"Sorted {sort_count} files.")
|
||||||
print(f"Sleeping for {sleep_duration} seconds")
|
print(f"Added {ignore_count} files to the ignore list.")
|
||||||
|
print(f"Processing took {end_time - start_time} seconds.")
|
||||||
|
print(f"Sleeping for {sleep_duration} seconds.")
|
||||||
time.sleep(sleep_duration)
|
time.sleep(sleep_duration)
|
||||||
|
|
||||||
def sort_directory(directory: Path):
|
def sort_directory(directory: Path) -> tuple[int, int]:
|
||||||
|
sort_count: int = 0
|
||||||
|
ignore_count: int = 0
|
||||||
# Load the ignored files list
|
# Load the ignored files list
|
||||||
ignore_file = directory.joinpath("ignored_files.json")
|
ignore_file = directory.joinpath("ignored_files.json")
|
||||||
with open(file = ignore_file) as f:
|
with open(file = ignore_file) as f:
|
||||||
ignore_list: list[str] = json.load(f)
|
ignore_list: list[dict] = json.load(f)
|
||||||
# Get all files in the directory
|
# Get all files in the directory
|
||||||
file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()]
|
file_list: list[Path] = [file for file in directory.iterdir() if file.is_file()]
|
||||||
# Filter out the ignored files
|
# Filter out the ignored files
|
||||||
sort_list = [file for file in file_list if file.name not in ignore_list]
|
sort_list = [file for file in file_list if file.name not in [item["name"] for item in ignore_list]]
|
||||||
# Create a list of photo files
|
# Create a list of photo files
|
||||||
photo_list = [file for file in sort_list if file.suffix in photo_extensions]
|
photo_list = [file for file in sort_list if file.suffix in photo_extensions]
|
||||||
# Create a list of video files
|
# Create a list of video files
|
||||||
@ -76,15 +81,26 @@ def sort_directory(directory: Path):
|
|||||||
''')
|
''')
|
||||||
# Sort the photos
|
# Sort the photos
|
||||||
print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}")
|
print(f"Sorting {str(len(photo_list))} photos from directory {directory.name}")
|
||||||
ignore_list = sort_photos(photo_list, ignore_list)
|
ignore_list, sort_count, ignore_count = sort_photos(
|
||||||
|
photo_list = photo_list,
|
||||||
|
ignore_list = ignore_list,
|
||||||
|
sort_count = sort_count,
|
||||||
|
ignore_count = ignore_count
|
||||||
|
)
|
||||||
# Sort the videos
|
# Sort the videos
|
||||||
print(f"Sorting {str(len(video_list))} videos from directory {directory.name}")
|
print(f"Sorting {str(len(video_list))} videos from directory {directory.name}")
|
||||||
ignore_list = sort_videos(video_list, ignore_list)
|
ignore_list, sort_count, ignore_count = sort_videos(
|
||||||
|
video_list = video_list,
|
||||||
|
ignore_list = ignore_list,
|
||||||
|
sort_count = sort_count,
|
||||||
|
ignore_count = ignore_count
|
||||||
|
)
|
||||||
# Update the ignore_file
|
# Update the ignore_file
|
||||||
with open(file = ignore_file, mode='w') as f:
|
with open(file = ignore_file, mode='w') as f:
|
||||||
json.dump(ignore_list, f)
|
json.dump(ignore_list, f)
|
||||||
|
return (sort_count, ignore_count)
|
||||||
|
|
||||||
def sort_photos(photo_list: list[Path], ignore_list: list[str]) -> list[str]:
|
def sort_photos(photo_list: list[Path], ignore_list: list[dict], sort_count: int, ignore_count: int) -> tuple[list[dict], int, int]:
|
||||||
# Init the previous_name for collision avoidance
|
# Init the previous_name for collision avoidance
|
||||||
previous_name: str = ""
|
previous_name: str = ""
|
||||||
for photo in photo_list:
|
for photo in photo_list:
|
||||||
@ -93,50 +109,102 @@ def sort_photos(photo_list: list[Path], ignore_list: list[str]) -> list[str]:
|
|||||||
with Image.open(photo.resolve()) as image:
|
with Image.open(photo.resolve()) as image:
|
||||||
exif = image.getexif()
|
exif = image.getexif()
|
||||||
except OSError:
|
except OSError:
|
||||||
print(f"File {photo.name} could not be opened (permissions?), ignoring.")
|
print(f"File {photo.name} could not be opened (permissions?), skipping.")
|
||||||
continue
|
continue
|
||||||
except UnidentifiedImageError:
|
except UnidentifiedImageError:
|
||||||
print(f"File {photo.name} is not a valid image file. Ignoring.")
|
print(f"File {photo.name} is not a valid image file. Ignoring.")
|
||||||
ignore_list.append(photo.name)
|
ignore_list.append({"name": photo.name, "reason": "Not a valid image file"})
|
||||||
|
ignore_count += 1
|
||||||
continue
|
continue
|
||||||
|
timestamp = None
|
||||||
if ExifTags.Base.DateTimeOriginal in exif.keys():
|
if ExifTags.Base.DateTimeOriginal in exif.keys():
|
||||||
image_date_time_string = exif[ExifTags.Base.DateTimeOriginal]
|
timestamp = time.strptime(exif[ExifTags.Base.DateTimeOriginal], "%Y:%m:%d %H:%M:%S")
|
||||||
elif ExifTags.Base.DateTime in exif.keys():
|
elif ExifTags.Base.DateTime in exif.keys():
|
||||||
image_date_time_string = exif[ExifTags.Base.DateTime]
|
timestamp = time.strptime(exif[ExifTags.Base.DateTime], "%Y:%m:%d %H:%M:%S")
|
||||||
else:
|
if timestamp is None:
|
||||||
image_date_time_string = None
|
print(f"File {photo.name} does not have an exif timestamp, ignoring.")
|
||||||
if image_date_time_string is None or image_date_time_string == '':
|
ignore_list.append({"name": photo.name, "reason": "No exif timestamp"})
|
||||||
print(f"File {photo.name} does not have a valid exif timestamp, ignoring.")
|
ignore_count += 1
|
||||||
ignore_list.append(photo.name)
|
|
||||||
continue
|
continue
|
||||||
image_timestamp = time.strptime(image_date_time_string, "%Y:%m:%d %H:%M:%S")
|
previous_name, ignore_list, sort_count, ignore_count = rename_file(
|
||||||
image_year: str = time.strftime("%Y", image_timestamp)
|
file = photo,
|
||||||
image_date_string: str = time.strftime("%Y-%m-%d", image_timestamp)
|
timestamp = timestamp,
|
||||||
image_time_string: str = time.strftime("%H-%M-%S", image_timestamp)
|
previous_name = previous_name,
|
||||||
# Set index_count for this iteration
|
destination_directory = photos_directory,
|
||||||
index_count: int = 0
|
parent_suffix = "Photos",
|
||||||
# Format the new name
|
ignore_list = ignore_list,
|
||||||
new_name: str = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg"
|
sort_count = sort_count,
|
||||||
while new_name == previous_name:
|
ignore_count = ignore_count
|
||||||
index_count += 1
|
)
|
||||||
new_name = f"{image_date_string}_{image_time_string}_{str(index_count).zfill(3)}.jpg"
|
return (ignore_list, sort_count, ignore_count)
|
||||||
new_path: Path = photos_directory.joinpath(f"{image_year}-Photos", new_name).resolve()
|
|
||||||
# Ensure the parent year folder exists
|
|
||||||
new_path.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
# Move the file
|
|
||||||
if not new_path.exists():
|
|
||||||
print(f"Moving {photo.as_posix()} to {new_path.as_posix()}")
|
|
||||||
photo.rename(new_path)
|
|
||||||
else:
|
|
||||||
print(f"File {new_name} already exists, ignoring.")
|
|
||||||
ignore_list.append(photo.name)
|
|
||||||
# Update the previous_name for next iteration
|
|
||||||
previous_name = new_name
|
|
||||||
return ignore_list
|
|
||||||
|
|
||||||
def sort_videos(video_list: list[Path], ignore_list: list[str]) -> list[str]:
|
def sort_videos(video_list: list[Path], ignore_list: list[dict], sort_count: int, ignore_count: int) -> tuple[list[dict], int, int]:
|
||||||
print("Video sorting not implemened yet.")
|
previous_name: str = ""
|
||||||
return ignore_list
|
for video in video_list:
|
||||||
|
try:
|
||||||
|
meta_dict: dict = ffmpeg.probe(video)
|
||||||
|
except OSError:
|
||||||
|
print(f"File {video.name} could not be opened (permissions?), skipping.")
|
||||||
|
continue
|
||||||
|
except ffmpeg.Error:
|
||||||
|
print(f"General error, file {video.name} could not be probed by ffmpeg, ignoring.")
|
||||||
|
ignore_list.append({"name": video.name, "reason": "Can not be probed by ffmpeg"})
|
||||||
|
ignore_count += 1
|
||||||
|
continue
|
||||||
|
timestamp = None
|
||||||
|
if "format" in meta_dict.keys():
|
||||||
|
if "tags" in meta_dict["format"].keys():
|
||||||
|
if "creation_time" in meta_dict["format"]["tags"].keys():
|
||||||
|
timestamp = time.strptime(meta_dict["format"]["tags"]["creation_time"], "%Y-%m-%dT%H:%M:%S.%fZ")
|
||||||
|
if timestamp is None:
|
||||||
|
print(f"File {video.name} does not have creation_time metadata, ignoring.")
|
||||||
|
ignore_list.append({"name": video.name, "reason": "No creation_time metadata"})
|
||||||
|
ignore_count += 1
|
||||||
|
continue
|
||||||
|
previous_name, ignore_list, sort_count, ignore_count = rename_file(
|
||||||
|
file = video,
|
||||||
|
timestamp = timestamp,
|
||||||
|
previous_name = previous_name,
|
||||||
|
destination_directory = recordings_directory,
|
||||||
|
parent_suffix = "Recordings",
|
||||||
|
ignore_list = ignore_list,
|
||||||
|
sort_count = sort_count,
|
||||||
|
ignore_count = ignore_count
|
||||||
|
)
|
||||||
|
return (ignore_list, sort_count, ignore_count)
|
||||||
|
|
||||||
|
def rename_file(*,
|
||||||
|
file: Path,
|
||||||
|
timestamp: time.struct_time,
|
||||||
|
previous_name: str,
|
||||||
|
destination_directory: Path,
|
||||||
|
parent_suffix: str,
|
||||||
|
ignore_list: list[dict],
|
||||||
|
sort_count: int,
|
||||||
|
ignore_count: int
|
||||||
|
) -> tuple[str, list[dict], int, int]:
|
||||||
|
creation_year = time.strftime("%Y", timestamp)
|
||||||
|
creation_date = time.strftime("%Y-%m-%d", timestamp)
|
||||||
|
creation_time = time.strftime("%H-%M-%S", timestamp)
|
||||||
|
name_index = 0
|
||||||
|
new_file_name = f"{creation_date}_{creation_time}_{str(name_index).zfill(3)}{file.suffix}"
|
||||||
|
while new_file_name == previous_name:
|
||||||
|
name_index += 1
|
||||||
|
new_file_name = f"{creation_date}_{creation_time}_{str(name_index).zfill(3)}{file.suffix}"
|
||||||
|
new_path: Path = destination_directory.joinpath(f"{creation_year}-{parent_suffix}", new_file_name).resolve()
|
||||||
|
# Ensure the parent year folder exists
|
||||||
|
new_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
# Move the file
|
||||||
|
if not new_path.exists():
|
||||||
|
print(f"Moving {file.as_posix()} to {new_path.as_posix()}")
|
||||||
|
file.rename(new_path)
|
||||||
|
sort_count += 1
|
||||||
|
else:
|
||||||
|
print(f"File {new_file_name} already exists, ignoring.")
|
||||||
|
ignore_list.append({"name": file.name, "reason": "File already exists"})
|
||||||
|
ignore_count += 1
|
||||||
|
# Update the previous_name for next iteration
|
||||||
|
return (new_file_name, ignore_list, sort_count, ignore_count)
|
||||||
|
|
||||||
def signal_handler(sig, frame):
|
def signal_handler(sig, frame):
|
||||||
print('Shutdown signal received. Exiting gracefully.')
|
print('Shutdown signal received. Exiting gracefully.')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user