239 lines
6.5 KiB
Python
Executable File
239 lines
6.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import datetime
|
|
import dateutil.parser
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import traceback
|
|
import exifread
|
|
|
|
import ingest_config
|
|
|
|
# Source directories to import from, as configured in ingest_config.
# The rest of this script iterates it and looks entries up with .get(),
# so an unset/falsy setting is replaced by an empty mapping.
IMPORT_DIRS = ingest_config.IMPORT_DIRS if ingest_config.IMPORT_DIRS else {}

# Root directory of the photo library that files are copied into.
# NOTE(review): the fallback for an unset value is a dict, which is not a
# usable path for os.path.join -- confirm whether "" or a hard failure is the
# intended behaviour before changing it.
OUTPUT_DIR = ingest_config.OUTPUT_DIR if ingest_config.OUTPUT_DIR else {}
|
|
|
|
# Patterns from file paths to ignore.
# Each pattern is applied with .match() against the full path of a candidate
# file.  The dot is escaped so that only a literal "Thumbs.db" is skipped, and
# both "/" and "\" are accepted as the separator so the rule also works for
# paths built with os.path.join on Windows.
IGNORE_PATTERNS = [
    re.compile(r".*[\\/]Thumbs\.db$"),
]

# How to format the dates in the photo library: a strftime template that
# yields nested year / year.month / year.month.day directories.
DATE_DIRS = os.path.join("%Y", "%Y.%m", "%Y.%m.%d")
|
|
|
|
# Date formats that might appear in the filename itself.
# Each entry is (compiled pattern, strptime template).  The pattern is applied
# with .match() to the file name without its extension, and its first group is
# the text handed to strptime with the paired template.
# Every pattern accepts the timestamp either at the very end of the name or
# followed by a non-digit, so "Screenshot_2020-01-02_03-04-05" is recognised
# the same way as "IMG_2020-01-02_03-04-05_HDR".
# NOTE(review): the first three patterns still require a non-digit character
# before the timestamp, so a name that *starts* with the date does not match;
# confirm whether that is intentional.
KNOWN_FILENAME_DATE_FORMATS = [
    (
        re.compile(r".*[^0-9]([0-9]{4}\-[0-9]{2}\-[0-9]{2}_[0-9]{2}\-[0-9]{2}\-[0-9]{2})(?:$|[^0-9].*)"),
        "%Y-%m-%d_%H-%M-%S",
    ),
    (
        re.compile(r".*[^0-9]([0-9]{8}_[0-9]{6})(?:$|[^0-9].*)"),
        "%Y%m%d_%H%M%S",
    ),
    (
        re.compile(r".*[^0-9]([0-9]{4}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2}\-[0-9]{2})(?:$|[^0-9].*)"),
        "%Y-%m-%d-%H-%M-%S",
    ),
    (
        re.compile(r".*VID([0-9]{14})(?:$|[^0-9].*)"),
        "%Y%m%d%H%M%S",
    ),
]
|
|
|
|
# File extensions (compared in lower case) whose metadata is read via ffprobe
FFPROBE_FORMATS = [
    "mp4",
    "mkv",
    "mov",
]

# Handles for the "done" and "error" logs; both start out unset and are
# opened on first use by the logging helpers.
LOG_DONE_FILE = LOG_ERROR_FILE = None
|
|
|
|
|
|
def get_exif_meta(path):
    """Return whatever ``exifread.process_file`` extracts from *path*.

    The file is opened in binary mode and closed again before returning.
    """
    with open(path, "rb") as image_file:
        tags = exifread.process_file(image_file)
    return tags
|
|
|
|
|
|
def get_magick_meta(path):
    """Return the metadata ImageMagick reports for the file at *path*.

    Runs ``convert <path> json:`` and returns the ``"image"`` object of the
    first entry in its JSON output, or an empty dict when there is none.

    Raises:
        RuntimeError: if ``convert`` exits with any non-zero status.  This
            includes negative return codes, which ``subprocess`` uses when
            the child was terminated by a signal.
        OSError: if the ``convert`` executable cannot be started.
        ValueError: if the command's output is not valid JSON.
    """
    result = subprocess.run(["convert", path, "json:"], capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"convert exited with status {result.returncode} for {path}: "
            f"{(result.stderr or '').strip()}"
        )

    parsed = json.loads(result.stdout)
    # Accept both a top-level list of entries and a single top-level object,
    # so the lookup does not depend on which shape the installed tool emits.
    if isinstance(parsed, list):
        if not parsed:
            return {}
        parsed = parsed[0]
    return parsed.get("image", {})
|
|
|
|
|
|
def get_ffprobe_meta(path):
    """Return ffprobe's JSON description of the media file at *path*.

    ffprobe is asked for the container format and the streams
    (``-show_format -show_streams``) and its JSON output is returned parsed.

    Raises:
        RuntimeError: if ffprobe exits with any non-zero status.  This
            includes negative return codes, which ``subprocess`` uses when
            the child was terminated by a signal.
        OSError: if the ``ffprobe`` executable cannot be started.
        ValueError: if the command's output is not valid JSON.
    """
    command = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        path,
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # "-v quiet" suppresses ffprobe's own diagnostics, so the exit status
        # is the only detail available to report.
        raise RuntimeError(f"ffprobe exited with status {result.returncode} for {path}")
    return json.loads(result.stdout)
|
|
|
|
|
|
def _nested_get(meta, *keys):
    """Follow *keys* through nested dicts; return None if any level is missing."""
    current = meta
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


def _parse_timestamp(value):
    """Parse *value* with dateutil; return None when it is absent or unparseable."""
    if value is None:
        return None
    # EXIF timestamps are normally written "YYYY:MM:DD HH:MM:SS".  Rewrite a
    # leading colon-separated date to ISO form so the result does not depend
    # on how the generic parser treats that layout.
    text = re.sub(r"^(\d{4}):(\d{2}):(\d{2})", r"\1-\2-\3", str(value).strip())
    if not text:
        return None
    try:
        return dateutil.parser.parse(text)
    except (ValueError, OverflowError):
        return None


def _date_from_filename(path):
    """Return the timestamp embedded in the file name of *path*, or None."""
    stem = os.path.splitext(os.path.basename(path))[0]
    for pattern, template in KNOWN_FILENAME_DATE_FORMATS:
        match = pattern.match(stem)
        if not match:
            continue
        try:
            return datetime.datetime.strptime(match.group(1), template)
        except ValueError:
            # Right shape but not a real date (e.g. month 13): try the next
            # pattern and, failing that, fall back to metadata.
            continue
    return None


def _date_from_exif(exif_meta):
    """Return the EXIF original capture time from *exif_meta*, or None."""
    timestamp = exif_meta.get("EXIF DateTimeOriginal")
    if timestamp is None:
        return None
    offset = exif_meta.get("EXIF OffsetTimeOriginal")
    if offset is not None:
        with_offset = _parse_timestamp(f"{timestamp} {offset}")
        if with_offset is not None:
            return with_offset
    # No (usable) offset tag: the timestamp alone is still better than
    # falling through to file-system dates.
    return _parse_timestamp(timestamp)


def get_date(path):
    """Return the best available creation date for the media file at *path*.

    Sources are tried in this order and the first usable one wins:

    1. a timestamp in the file name (see ``KNOWN_FILENAME_DATE_FORMATS``);
    2. ffprobe's ``format.tags.creation_time`` (only for extensions listed in
       ``FFPROBE_FORMATS``);
    3. EXIF ``DateTimeOriginal``, with ``OffsetTimeOriginal`` when present;
    4. ImageMagick's ``exif:DateTime`` property, then its ``date:modify``;
    5. the file's ``st_ctime``.

    Failures of an individual metadata reader are treated as "no data from
    that reader" rather than as errors.

    Returns:
        A ``datetime.datetime``; it may be naive or timezone-aware depending
        on the source that supplied it.

    Raises:
        ValueError: if no date was found and none of the readers returned any
            metadata at all.
        OSError: if the final ``os.stat`` fallback fails.
    """
    from_name = _date_from_filename(path)
    if from_name is not None:
        return from_name

    # "except Exception" (not a bare except) so Ctrl-C still interrupts.
    try:
        exif_meta = get_exif_meta(path)
    except Exception:
        exif_meta = {}

    ffprobe_meta = {}
    ext = os.path.splitext(path)[1].lstrip(os.extsep).lower()
    if ext in FFPROBE_FORMATS:
        try:
            ffprobe_meta = get_ffprobe_meta(path)
        except Exception:
            ffprobe_meta = {}

    if ffprobe_meta:
        found = _parse_timestamp(_nested_get(ffprobe_meta, "format", "tags", "creation_time"))
        if found is not None:
            return found

    if exif_meta:
        found = _date_from_exif(exif_meta)
        if found is not None:
            return found

    try:
        magick_meta = get_magick_meta(path)
    except Exception:
        magick_meta = {}

    if magick_meta:
        for key in ("exif:DateTime", "date:modify"):
            found = _parse_timestamp(_nested_get(magick_meta, "properties", key))
            if found is not None:
                return found

    if not any([magick_meta, exif_meta, ffprobe_meta]):
        raise ValueError(f"No metadata extracted for {path}")

    return datetime.datetime.fromtimestamp(os.stat(path).st_ctime)
|
|
|
|
|
|
def get_out_path(import_source, image_path):
    """Return the destination path in the library for *image_path*.

    The result is
    ``OUTPUT_DIR/<date dirs>/<owner>.<device>/<path relative to import_source>``
    where the date directories come from ``get_date(image_path)`` formatted
    with ``DATE_DIRS`` and owner/device come from the ``IMPORT_DIRS`` entry
    for *import_source*.

    Args:
        import_source: The import root the file was found under; also the key
            into ``IMPORT_DIRS``.
        image_path: Path of the file, located below *import_source*.

    Raises:
        ValueError: if the import source has no ``owner`` or ``device``
            configured, or if *image_path* is not located under
            *import_source*.  Errors from ``get_date`` propagate unchanged.
    """
    import_meta = IMPORT_DIRS.get(import_source, {})
    owner = import_meta.get("owner")
    dev = import_meta.get("device")
    if owner is None or dev is None:
        raise ValueError(
            f"Import source {import_source!r} needs both 'owner' and 'device' configured"
        )
    intermediate = ".".join([owner, dev])

    date_part = get_date(image_path).strftime(DATE_DIRS)

    # relpath instead of str.split(import_source): splitting cuts the path
    # short when the source string happens to occur again further down.
    relative_part = os.path.relpath(image_path, import_source)
    if relative_part == os.pardir or relative_part.startswith(os.pardir + os.sep):
        raise ValueError(f"{image_path} is not located under {import_source}")

    return os.path.join(OUTPUT_DIR, date_part, intermediate, relative_part)
|
|
|
|
|
|
def log_done(image_path):
    """Append *image_path* to ``ingest_done.log`` in the current directory.

    The log file is opened in append mode on first use and the handle is kept
    in the module-level ``LOG_DONE_FILE`` for later calls.  Every entry is
    flushed immediately so it is on disk if the run is killed, and so code
    that re-reads the file sees it.
    """
    global LOG_DONE_FILE
    # Re-open when the handle was never created or has been closed since.
    if not LOG_DONE_FILE or LOG_DONE_FILE.closed:
        log_name = "ingest_done.log"
        LOG_DONE_FILE = open(log_name, "a")
    LOG_DONE_FILE.write(image_path + "\n")
    LOG_DONE_FILE.flush()
|
|
|
|
|
|
def check_log_done(image_path):
    """Return True if *image_path* is listed in ``ingest_done.log``.

    Each line of the log is compared, with surrounding whitespace stripped,
    against *image_path*.  If the log does not exist yet, an empty one is
    created in the current directory and False is returned.

    Raises:
        OSError: for any failure to read the log other than it not existing.
    """
    log_name = "ingest_done.log"
    try:
        with open(log_name, "r") as f:
            # Iterate the file lazily and stop at the first hit instead of
            # loading every line into memory on each call.
            return any(line.strip() == image_path for line in f)
    except FileNotFoundError:
        # First run: create the log so later reads succeed.
        with open(log_name, "a"):
            pass
        return False
|
|
|
|
|
|
def log_error(image_path):
    """Append *image_path* to ``ingest_errors.log`` in the current directory.

    The log file is opened in append mode on first use and the handle is kept
    in the module-level ``LOG_ERROR_FILE`` for later calls.  Every entry is
    flushed immediately so failures are recorded even if the run is killed.
    """
    global LOG_ERROR_FILE
    # Re-open when the handle was never created or has been closed since.
    if not LOG_ERROR_FILE or LOG_ERROR_FILE.closed:
        log_name = "ingest_errors.log"
        LOG_ERROR_FILE = open(log_name, "a")
    LOG_ERROR_FILE.write(image_path + "\n")
    LOG_ERROR_FILE.flush()
|
|
|
|
|
|
def _copy_into_place(src, dst):
    """Copy *src* to *dst* through a temporary sibling file.

    The existence check in ``main`` treats any file at the destination as
    already ingested, so the destination must never exist half-written.
    """
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    partial = dst + ".part"
    try:
        shutil.copy(src, partial)
        os.replace(partial, dst)
    except BaseException:
        # Remove the leftover (also on Ctrl-C), then let the error propagate.
        try:
            os.remove(partial)
        except OSError:
            pass
        raise


def main():
    """Copy every new file from the configured import directories into the library.

    For each file found under each key of ``IMPORT_DIRS``:

    * files matching ``IGNORE_PATTERNS`` or already listed in the done-log
      are skipped silently;
    * the destination is computed with ``get_out_path``; on failure the file
      is recorded in the error log, the traceback is printed, and processing
      continues with the next file;
    * if the destination already exists the file is only marked done;
    * otherwise it is copied and marked done, or recorded in the error log
      if the copy fails.

    Only ``Exception`` is caught per file, so Ctrl-C stops the run.  The log
    handles are closed when the run ends, however it ends.
    """
    global LOG_DONE_FILE, LOG_ERROR_FILE
    try:
        for import_source in IMPORT_DIRS:
            print(f"Importing from {import_source}")
            for basepath, _, files in os.walk(import_source):
                for f in files:
                    image_path = os.path.join(basepath, f)
                    if any(pattern.match(image_path) for pattern in IGNORE_PATTERNS):
                        continue
                    if check_log_done(image_path):
                        continue

                    print(image_path, end=" ")
                    print("-->", end=" ")
                    try:
                        out_path = get_out_path(import_source, image_path)
                    except Exception:
                        log_error(image_path)
                        # Finish the "path -->" line before the traceback.
                        print()
                        traceback.print_exc()
                        continue
                    print(out_path)

                    if os.path.exists(out_path):
                        print("[ WARN ] Output already exists, skipping...")
                        log_done(image_path)
                        continue

                    try:
                        _copy_into_place(image_path, out_path)
                    except Exception:
                        log_error(image_path)
                        print()
                        traceback.print_exc()
                        continue
                    log_done(image_path)
    finally:
        # Close whichever log handles were opened during the run and reset
        # them so a later call starts from a clean state.
        for handle in (LOG_DONE_FILE, LOG_ERROR_FILE):
            if handle:
                handle.close()
        LOG_DONE_FILE = LOG_ERROR_FILE = None
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|