Photo_Library/ingest.py
2024-02-16 11:43:20 +00:00

239 lines
6.5 KiB
Python
Executable File

#!/usr/bin/env python3
import datetime
import dateutil.parser
import json
import os
import re
import shutil
import subprocess
import traceback
import exifread
import ingest_config
# Directories to import from. Iterated by main() and looked up by
# get_out_path(), which reads the "owner" and "device" keys of each value, so
# this is a mapping of source directory -> {"owner": ..., "device": ...}.
# Falls back to an empty mapping when the config value is falsy.
IMPORT_DIRS = ingest_config.IMPORT_DIRS or {}
# Output directory: the root that get_out_path() joins every destination path
# onto with os.path.join().
# NOTE(review): the `or {}` fallback yields a dict, which os.path.join()
# rejects with TypeError -- presumably copied from IMPORT_DIRS above; confirm
# what the intended default is when the config leaves this unset.
OUTPUT_DIR = ingest_config.OUTPUT_DIR or {}
# Patterns from file paths to ignore. Each pattern is applied with
# ``re.match`` to the full path of a candidate file.
IGNORE_PATTERNS = [
    # Thumbnail cache files. The dot is escaped so that only the literal
    # name "Thumbs.db" is ignored, not e.g. "Thumbsxdb".
    re.compile(r".*/Thumbs\.db$"),
]

# How to format the dates in the photo library: a strftime template that
# expands to nested year / year.month / year.month.day directories.
DATE_DIRS = os.path.join("%Y", "%Y.%m", "%Y.%m.%d")

# Date formats that might appear in the filename itself, as
# (regex, strptime template) pairs. Group 1 of the regex is the timestamp
# text that the template parses. Every pattern accepts the timestamp either
# at the very end of the name or followed by a non-digit, so a longer run of
# digits is never split in the middle.
KNOWN_FILENAME_DATE_FORMATS = [
    (
        # e.g. Screenshot_2024-02-16_11-43-20
        re.compile(r".*[^0-9]([0-9]{4}\-[0-9]{2}\-[0-9]{2}_[0-9]{2}\-[0-9]{2}\-[0-9]{2})(?:$|[^0-9].*)"),
        "%Y-%m-%d_%H-%M-%S",
    ),
    (
        # e.g. IMG_20240216_114320
        re.compile(r".*[^0-9]([0-9]{8}_[0-9]{6})(?:$|[^0-9].*)"),
        "%Y%m%d_%H%M%S",
    ),
    (
        # e.g. photo_2024-02-16-11-43-20
        re.compile(r".*[^0-9]([0-9]{4}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2})(?:$|[^0-9].*)"),
        "%Y-%m-%d-%H-%M-%S",
    ),
    (
        # e.g. VID20240216114320
        re.compile(r".*VID([0-9]{14})(?:$|[^0-9].*)"),
        "%Y%m%d%H%M%S",
    ),
]

# Acceptable formats (lower-case file extensions) for extracting metadata
# via ffprobe
FFPROBE_FORMATS = ["mp4", "mkv", "mov"]

# Lazily opened, append-mode handles for the "done" and "error" logs.
LOG_DONE_FILE = None
LOG_ERROR_FILE = None
def get_exif_meta(path):
    """Return the tags that ``exifread.process_file`` extracts from *path*.

    The file is opened in binary mode and closed again before the tags are
    returned.
    """
    with open(path, "rb") as image_file:
        tags = exifread.process_file(image_file)
    return tags
def get_magick_meta(path):
    """Return ImageMagick's metadata for the file at *path*.

    Runs ``convert <path> json:`` and returns the ``"image"`` entry of the
    first element of the JSON output, or an empty dict when that entry is
    absent.

    Raises:
        RuntimeError: if ``convert`` exits with a non-zero status. This
            includes negative statuses, which ``subprocess`` reports when
            the child was terminated by a signal.
    """
    result = subprocess.run(["convert", path, "json:"], capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"convert exited with status {result.returncode} for {path}: "
            f"{result.stderr.strip()}"
        )
    return json.loads(result.stdout)[0].get("image", {})
def get_ffprobe_meta(path):
    """Return ffprobe's format and stream metadata for the file at *path*.

    Runs ``ffprobe`` with JSON output (``-show_format -show_streams``) and
    returns the decoded JSON document.

    Raises:
        RuntimeError: if ``ffprobe`` exits with a non-zero status. This
            includes negative statuses, which ``subprocess`` reports when
            the child was terminated by a signal.
    """
    command = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        path,
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe exited with status {result.returncode} for {path}: "
            f"{result.stderr.strip()}"
        )
    return json.loads(result.stdout)
# Errors that mean "this candidate value is missing or is not a usable
# date"; the next date source is tried when one of them is raised.
_DATE_PARSE_ERRORS = (ValueError, OverflowError, TypeError, AttributeError)


def _parse_exif_timestamp(timestamp, offset=None):
    """Parse an EXIF-style timestamp, optionally combined with a UTC offset.

    EXIF writes dates as ``YYYY:MM:DD HH:MM:SS``. That layout is parsed
    explicitly (first with the offset, then without) because the
    colon-separated date part is not a layout ``dateutil`` is documented to
    handle; any other layout is handed to ``dateutil``.

    Raises:
        ValueError: if *timestamp* is missing, empty or unparseable.
    """
    if timestamp is None:
        raise ValueError("no timestamp")
    text = str(timestamp).strip()
    if not text:
        raise ValueError("empty timestamp")
    # A missing offset must not end up in the parsed text as "None".
    offset_text = "" if offset is None else str(offset).strip()

    if offset_text:
        try:
            return datetime.datetime.strptime(
                f"{text} {offset_text}", "%Y:%m:%d %H:%M:%S %z"
            )
        except ValueError:
            pass
    try:
        return datetime.datetime.strptime(text, "%Y:%m:%d %H:%M:%S")
    except ValueError:
        pass
    return dateutil.parser.parse(f"{text} {offset_text}".strip())


def get_date(path):
    """Return the best available creation date for the file at *path*.

    Sources are tried in this order and the first usable one wins:

    1. a timestamp embedded in the filename (``KNOWN_FILENAME_DATE_FORMATS``),
    2. ffprobe's ``format.tags.creation_time`` (only for ``FFPROBE_FORMATS``),
    3. EXIF ``DateTimeOriginal`` (with ``OffsetTimeOriginal`` when present),
    4. ImageMagick's ``exif:DateTime`` and then ``date:modify`` properties,
    5. the file's ``st_ctime``.

    Failures of an individual metadata tool are treated as "no metadata from
    that tool".

    Raises:
        ValueError: if none of the metadata tools produced anything for
            *path* and the filename carries no date.
    """
    # Use filename date
    filename = os.path.splitext(os.path.basename(path))[0]
    for filename_format, template in KNOWN_FILENAME_DATE_FORMATS:
        match = filename_format.match(filename)
        if not match:
            continue
        try:
            return datetime.datetime.strptime(match.group(1), template)
        except ValueError:
            # The digits had the right shape but are not a real date
            # (e.g. month 13); fall through to the other sources.
            continue

    # Use metadata
    try:
        exif_meta = get_exif_meta(path)
    except Exception:
        exif_meta = {}

    ffprobe_meta = {}
    ext = os.path.splitext(path)[1].lstrip(os.extsep).lower()
    if ext in FFPROBE_FORMATS:
        try:
            ffprobe_meta = get_ffprobe_meta(path)
        except Exception:
            ffprobe_meta = {}

    if ffprobe_meta:
        try:
            return dateutil.parser.parse(
                ffprobe_meta.get("format", {}).get("tags", {}).get("creation_time")
            )
        except _DATE_PARSE_ERRORS:
            pass

    if exif_meta:
        try:
            return _parse_exif_timestamp(
                exif_meta.get("EXIF DateTimeOriginal"),
                exif_meta.get("EXIF OffsetTimeOriginal"),
            )
        except _DATE_PARSE_ERRORS:
            pass

    try:
        magick_meta = get_magick_meta(path)
    except Exception:
        magick_meta = {}

    if magick_meta:
        try:
            return _parse_exif_timestamp(
                magick_meta.get("properties", {}).get("exif:DateTime")
            )
        except _DATE_PARSE_ERRORS:
            pass
        try:
            return dateutil.parser.parse(
                magick_meta.get("properties", {}).get("date:modify")
            )
        except _DATE_PARSE_ERRORS:
            pass

    if not (magick_meta or exif_meta or ffprobe_meta):
        raise ValueError(f"No metadata extracted for {path}")

    # Some metadata exists but none of it carried a usable date.
    return datetime.datetime.fromtimestamp(os.stat(path).st_ctime)
def get_out_path(import_source, image_path):
    """Return the destination path in the library for *image_path*.

    The result is
    ``OUTPUT_DIR/<date dirs>/<owner>.<device>/<path relative to import_source>``
    where the date directories come from ``get_date`` formatted with
    ``DATE_DIRS`` and owner/device come from the ``IMPORT_DIRS`` entry for
    *import_source*.

    Raises:
        TypeError: if the ``IMPORT_DIRS`` entry for *import_source* lacks an
            ``"owner"`` or ``"device"`` value.
        ValueError: propagated from ``get_date`` when no date can be found.
    """
    import_meta = IMPORT_DIRS.get(import_source, {})
    dev = import_meta.get("device")
    owner = import_meta.get("owner")
    intermediate = ".".join([owner, dev])
    date_part = get_date(image_path).strftime(DATE_DIRS)
    # relpath strips the source prefix exactly once; splitting the string on
    # import_source picks the wrong piece when that text recurs in the path.
    relative_part = os.path.relpath(image_path, import_source)
    return os.path.join(OUTPUT_DIR, date_part, intermediate, relative_part)
def log_done(image_path):
    """Append *image_path* to ``ingest_done.log``.

    The log is opened in append mode on first use and the handle is kept in
    the module-level ``LOG_DONE_FILE`` for later calls.
    """
    global LOG_DONE_FILE
    if not LOG_DONE_FILE:
        log_name = "ingest_done.log"
        LOG_DONE_FILE = open(log_name, "a")
    LOG_DONE_FILE.write(image_path + "\n")
    # Flush per entry: the log is re-read from disk through a separate
    # handle, and buffered entries would be lost if the run is killed.
    LOG_DONE_FILE.flush()
def check_log_done(image_path):
    """Return True if *image_path* is recorded in ``ingest_done.log``.

    Each log line is compared to *image_path* exactly, with only the line
    terminator removed, so paths that differ only in surrounding whitespace
    are not confused. If the log does not exist yet it is created empty and
    False is returned. Any other I/O error propagates.
    """
    log_name = "ingest_done.log"
    try:
        with open(log_name, "r") as f:
            # Stream the file instead of loading every line into memory.
            for line in f:
                if line.rstrip("\n") == image_path:
                    return True
    except FileNotFoundError:
        open(log_name, "a").close()
    return False
def log_error(image_path):
    """Append *image_path* to ``ingest_errors.log``.

    The log is opened in append mode on first use and the handle is kept in
    the module-level ``LOG_ERROR_FILE`` for later calls.
    """
    global LOG_ERROR_FILE
    if not LOG_ERROR_FILE:
        log_name = "ingest_errors.log"
        LOG_ERROR_FILE = open(log_name, "a")
    LOG_ERROR_FILE.write(image_path + "\n")
    # Flush per entry so failures are on disk even if the run is killed.
    LOG_ERROR_FILE.flush()
def _ingest_file(import_source, image_path):
    """Copy one source file into the library and log the outcome.

    Files matching ``IGNORE_PATTERNS`` or already listed in the done-log are
    skipped silently. Failures are written to the error log and reported
    with a traceback; they never propagate, so one bad file does not stop
    the run. ``KeyboardInterrupt`` and ``SystemExit`` are not caught.
    """
    if any(pattern.match(image_path) for pattern in IGNORE_PATTERNS):
        return
    if check_log_done(image_path):
        return

    print(image_path, end=" ")
    print("-->", end=" ")
    try:
        out_path = get_out_path(import_source, image_path)
    except Exception:
        log_error(image_path)
        # Terminate the "path -->" progress line before the traceback.
        print()
        traceback.print_exc()
        return
    print(out_path)

    if os.path.exists(out_path):
        print("[ WARN ] Output already exists, skipping...")
        log_done(image_path)
        return

    try:
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        shutil.copy(image_path, out_path)
    except Exception:
        log_error(image_path)
        print()
        traceback.print_exc()
        return
    log_done(image_path)


def main():
    """Walk every directory in ``IMPORT_DIRS`` and ingest each file found.

    The log handles opened during the run are closed when the run ends,
    whether it finishes normally or is interrupted.
    """
    global LOG_DONE_FILE, LOG_ERROR_FILE
    try:
        for import_source in IMPORT_DIRS:
            print(f"Importing from {import_source}")
            for basepath, _, files in os.walk(import_source):
                for f in files:
                    _ingest_file(import_source, os.path.join(basepath, f))
    finally:
        for handle in (LOG_DONE_FILE, LOG_ERROR_FILE):
            if handle:
                handle.close()
        # Reset so a later call reopens the logs instead of writing to a
        # closed handle.
        LOG_DONE_FILE = None
        LOG_ERROR_FILE = None


if __name__ == "__main__":
    main()