#!/usr/bin/env python3
"""Copy media files from import directories into a date-sorted library.

For every file found under each directory in ``IMPORT_DIRS`` the script
works out a date (from the filename, or failing that from embedded
metadata), builds an output path below ``OUTPUT_DIR`` and copies the file
there.  Successfully handled paths are appended to ``ingest_done.log`` so
that later runs skip them; failures are appended to ``ingest_errors.log``.
"""
import datetime
import json
import os
import re
import shutil
import subprocess
import traceback

import dateutil.parser
import exifread

import ingest_config

# Directories to import from.  Used as a mapping from source directory to a
# dict that get_out_path() reads the "device" and "owner" keys from.
IMPORT_DIRS = ingest_config.IMPORT_DIRS or {}

# Output directory
# NOTE(review): the `{}` fallback is not a usable path; os.path.join() will
# raise TypeError for every file if OUTPUT_DIR is unset -- confirm intent.
OUTPUT_DIR = ingest_config.OUTPUT_DIR or {}

# Patterns from file paths to ignore
IGNORE_PATTERNS = [
    re.compile(r".*/Thumbs.db$")
]

# How to format the dates in the photo library
DATE_DIRS = os.path.join("%Y", "%Y.%m", "%Y.%m.%d")

# Date formats that might appear in the filename itself
KNOWN_FILENAME_DATE_FORMATS = [
    (
        re.compile(r".*[^0-9]([0-9]{4}\-[0-9]{2}\-[0-9]{2}_[0-9]{2}\-[0-9]{2}\-[0-9]{2})[^0-9].*"),
        "%Y-%m-%d_%H-%M-%S"
    ),
    (
        re.compile(r".*[^0-9]([0-9]{8}_[0-9]{6})(?:$|[^0-9].*)"),
        "%Y%m%d_%H%M%S"
    ),
    (
        re.compile(r".*[^0-9]([0-9]{4}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2})(?:$|[^0-9].*)"),
        "%Y-%m-%d-%H-%M-%S"
    ),
    (
        re.compile(r".*VID([0-9]{14})(?:$|[^0-9].*)"),
        "%Y%m%d%H%M%S"
    )
]

# strptime templates tried before handing a timestamp to dateutil: the
# colon-separated "YYYY:MM:DD HH:MM:SS" layout, with and without a UTC offset.
EXIF_DATE_FORMATS = ("%Y:%m:%d %H:%M:%S %z", "%Y:%m:%d %H:%M:%S")

# Acceptable formats for extracting metadata via ffprobe
FFPROBE_FORMATS = ["mp4", "mkv", "mov"]

# Names of the log files (relative to the current working directory)
LOG_DONE_NAME = "ingest_done.log"
LOG_ERROR_NAME = "ingest_errors.log"

# Lazily opened append-mode handles for the two log files
LOG_DONE_FILE = None
LOG_ERROR_FILE = None

# Paths already recorded in the done log; loaded on first check_log_done()
_DONE_PATHS = None


def get_exif_meta(path):
    """Return the result of ``exifread.process_file`` for the file at *path*."""
    with open(path, "rb") as f:
        return exifread.process_file(f)


def get_magick_meta(path):
    """Return the "image" object of ``convert <path> json:`` output.

    Raises:
        RuntimeError: if ``convert`` exits with a non-zero status.
    """
    result = subprocess.run(
        ["convert", path, "json:"], capture_output=True, text=True
    )
    # != 0 rather than > 0: a process killed by a signal reports a negative code.
    if result.returncode != 0:
        raise RuntimeError(
            f"convert exited with status {result.returncode} for {path}"
        )
    return json.loads(result.stdout)[0].get("image", {})


def get_ffprobe_meta(path):
    """Return the parsed JSON that ``ffprobe`` prints for the file at *path*.

    Raises:
        RuntimeError: if ``ffprobe`` exits with a non-zero status.
    """
    result = subprocess.run(
        [
            "ffprobe", "-v", "quiet", "-print_format", "json",
            "-show_format", "-show_streams", path,
        ],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe exited with status {result.returncode} for {path}"
        )
    return json.loads(result.stdout)


def _read_meta(reader, path):
    """Call ``reader(path)``; return ``{}`` if the reader fails for any reason."""
    try:
        return reader(path)
    except Exception:
        # Metadata extraction is best-effort: a missing tool or an unreadable
        # file just means this source contributes nothing.
        return {}


def _parse_timestamp(value):
    """Parse *value* into a datetime, or return None if it cannot be parsed."""
    if value is None:
        return None
    text = str(value).strip()
    for template in EXIF_DATE_FORMATS:
        try:
            return datetime.datetime.strptime(text, template)
        except ValueError:
            continue
    try:
        return dateutil.parser.parse(text)
    except (ValueError, OverflowError):
        return None


def _date_from_filename(path):
    """Return the date encoded in the file's name, or None if there is none."""
    stem = os.path.splitext(os.path.basename(path))[0]
    for pattern, template in KNOWN_FILENAME_DATE_FORMATS:
        match = pattern.match(stem)
        if not match:
            continue
        try:
            return datetime.datetime.strptime(match.group(1), template)
        except ValueError:
            # Digits in the right layout that are not a real date (e.g. month
            # 99): try the remaining patterns, then fall back to metadata.
            continue
    return None


def _date_from_ffprobe(ffprobe_meta):
    """Return the ``format.tags.creation_time`` date, or None."""
    tags = (ffprobe_meta.get("format") or {}).get("tags") or {}
    return _parse_timestamp(tags.get("creation_time"))


def _date_from_exif(exif_meta):
    """Return the "EXIF DateTimeOriginal" date, or None.

    "EXIF OffsetTimeOriginal" is applied when present; a missing or
    unparseable offset no longer discards the timestamp itself.
    """
    timestamp = exif_meta.get("EXIF DateTimeOriginal")
    if timestamp is None:
        return None
    offset = exif_meta.get("EXIF OffsetTimeOriginal")
    if offset is not None:
        parsed = _parse_timestamp(f"{timestamp} {offset}")
        if parsed is not None:
            return parsed
    return _parse_timestamp(timestamp)


def _date_from_magick(magick_meta):
    """Return the "exif:DateTime" or else the "date:modify" property date."""
    properties = magick_meta.get("properties") or {}
    for key in ("exif:DateTime", "date:modify"):
        parsed = _parse_timestamp(properties.get(key))
        if parsed is not None:
            return parsed
    return None


def get_date(path):
    """Determine the date to file *path* under.

    Sources are tried in this order: a date embedded in the filename,
    ffprobe's creation time (only for extensions in ``FFPROBE_FORMATS``),
    the EXIF original timestamp, ImageMagick's properties, and finally the
    file's ``st_ctime``.

    Raises:
        ValueError: if none of the metadata readers returned anything.
    """
    # Use filename date
    filename_date = _date_from_filename(path)
    if filename_date is not None:
        return filename_date

    # Use metadata
    exif_meta = _read_meta(get_exif_meta, path)

    ext = os.path.splitext(path)[1].lstrip(os.extsep)
    if ext.lower() in FFPROBE_FORMATS:
        ffprobe_meta = _read_meta(get_ffprobe_meta, path)
    else:
        ffprobe_meta = {}

    if ffprobe_meta:
        parsed = _date_from_ffprobe(ffprobe_meta)
        if parsed is not None:
            return parsed

    if exif_meta:
        parsed = _date_from_exif(exif_meta)
        if parsed is not None:
            return parsed

    # ImageMagick is only consulted when the cheaper sources gave no date.
    magick_meta = _read_meta(get_magick_meta, path)
    if magick_meta:
        parsed = _date_from_magick(magick_meta)
        if parsed is not None:
            return parsed

    if not any((magick_meta, exif_meta, ffprobe_meta)):
        raise ValueError(f"No metadata extracted for {path}")

    # Some metadata exists but none of it carried a usable date.
    return datetime.datetime.fromtimestamp(os.stat(path).st_ctime)


def get_out_path(import_source, image_path):
    """Build the destination path for *image_path* found under *import_source*.

    The result is ``OUTPUT_DIR/<date dirs>/<owner>.<device>/<relative path>``,
    where owner and device come from the ``IMPORT_DIRS`` entry for
    *import_source* and the relative path is *image_path* relative to
    *import_source*.

    Raises:
        TypeError: if the owner or device is missing from ``IMPORT_DIRS``.
        ValueError: propagated from get_date() when no metadata is found.
    """
    import_meta = IMPORT_DIRS.get(import_source, {})
    dev = import_meta.get("device")
    owner = import_meta.get("owner")
    intermediate = ".".join([owner, dev])
    date_part = get_date(image_path).strftime(DATE_DIRS)
    # relpath instead of str.split(import_source): splitting truncates the
    # result when the source directory string recurs inside the path.
    relative_part = os.path.relpath(image_path, import_source)
    return os.path.join(OUTPUT_DIR, date_part, intermediate, relative_part)


def log_done(image_path):
    """Append *image_path* to the done log and remember it for this run."""
    global LOG_DONE_FILE
    if not LOG_DONE_FILE:
        LOG_DONE_FILE = open(LOG_DONE_NAME, "a")
    LOG_DONE_FILE.write(image_path + "\n")
    # Flush so an interrupted run does not lose already-copied entries.
    LOG_DONE_FILE.flush()
    if _DONE_PATHS is not None:
        _DONE_PATHS.add(image_path)


def check_log_done(image_path):
    """Return True if *image_path* is already recorded in the done log.

    The log is read once and cached in a set; the log file is created if it
    does not exist yet.
    """
    global _DONE_PATHS
    if _DONE_PATHS is None:
        try:
            with open(LOG_DONE_NAME, "r") as f:
                _DONE_PATHS = {line.strip() for line in f}
        except FileNotFoundError:
            with open(LOG_DONE_NAME, "a"):
                pass
            _DONE_PATHS = set()
    return image_path in _DONE_PATHS


def log_error(image_path):
    """Append *image_path* to the error log."""
    global LOG_ERROR_FILE
    if not LOG_ERROR_FILE:
        LOG_ERROR_FILE = open(LOG_ERROR_NAME, "a")
    LOG_ERROR_FILE.write(image_path + "\n")
    LOG_ERROR_FILE.flush()


def _close_logs():
    """Close whichever log handles were opened during this run."""
    global LOG_DONE_FILE, LOG_ERROR_FILE
    for handle in (LOG_DONE_FILE, LOG_ERROR_FILE):
        if handle:
            handle.close()
    LOG_DONE_FILE = None
    LOG_ERROR_FILE = None


def _ingest_file(import_source, image_path):
    """Copy one file into the library, logging the outcome."""
    # Path matches an ignore pattern: skip silently.
    if any(pattern.match(image_path) for pattern in IGNORE_PATTERNS):
        return
    # Path was already ingested by an earlier run: skip silently.
    if check_log_done(image_path):
        return

    print(image_path, end=" ")
    print("-->", end=" ")
    try:
        out_path = get_out_path(import_source, image_path)
    except Exception:
        # `Exception`, not a bare except, so Ctrl-C still stops the run.
        log_error(image_path)
        print()
        traceback.print_exc()
        return
    print(out_path)

    if os.path.exists(out_path):
        print("[ WARN ] Output already exists, skipping...")
        log_done(image_path)
        return

    try:
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        shutil.copy(image_path, out_path)
    except Exception:
        log_error(image_path)
        print()
        traceback.print_exc()
        return
    log_done(image_path)


def main():
    """Walk every import directory and ingest each file found."""
    try:
        for import_source in IMPORT_DIRS:
            print(f"Importing from {import_source}")
            for basepath, _, files in os.walk(import_source):
                for f in files:
                    _ingest_file(import_source, os.path.join(basepath, f))
    finally:
        _close_logs()


if __name__ == "__main__":
    main()