#!/usr/bin/env python3
"""
AUTHOR : Clemens Schwaighofer
DATE : 2018/2/20
LICENSE: GPLv3
DESC :
Set the reverse Geo location (name) from Lat/Long data in XMP files
in a lightroom catalogue
* tries to get pre-set geo location from LR catalog
* if not found tries to get data from Google
* all data is translated into English using the long vowel system (e.g. ou or oo becomes ō)
MUST HAVE: Python XMP Toolkit (http://python-xmp-toolkit.readthedocs.io/)
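
Example usage (illustrative; paths are placeholders):
    ./reverse_geolocate.py -i ~/Pictures/XMP/ -l ~/LightroomCatalog/ -d 50m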
"""
import configparser
# import textwrap
import glob
import os
import sys
import re
import argparse
import sqlite3
from shutil import copyfile, get_terminal_size
from math import ceil
# Note XMPFiles does not work with sidecar files, need to read via XMPMeta
from libxmp import XMPMeta, consts
# user modules below
from utils.long_lat import (
convert_dms_to_lat,
convert_dms_to_long,
convert_lat_to_dms,
convert_long_to_dms,
get_distance,
)
from utils.reverse_geolocate import reverse_geolocate
from utils.string_helpers import string_len_cjk, shorten_string, format_len
##############################################################
# FUNCTIONS
##############################################################
# cache for looking up if a string contains non-Latin letters
# used by isLatin and onlyLatinChars
cache_latin_letters = {}
# ARGPARSE HELPERS
class WritableDirFolder(argparse.Action):
"""
checks if this is a writeable folder OR file
AND it works on nargs *
Args:
argparse (_type_): _description_
"""
def __call__(self, parser, namespace, values, option_string=None):
        if isinstance(values, str) or values is None:
            raise argparse.ArgumentTypeError(f"writable_dir_folder: invalid argument: {values}")
else:
# we loop through list (this is because of nargs *)
for prospective_dir in iter(values):
# if valid and writeable (dir or file)
if os.access(prospective_dir, os.W_OK):
# init new output array
out = []
# if we have a previous list in the namespace extend current list
if isinstance(getattr(namespace, self.dest), list):
out.extend(getattr(namespace, self.dest))
# add the new dir to it
out.append(prospective_dir)
# and write that list back to the self.dest in the namespace
setattr(namespace, self.dest, out)
else:
raise argparse.ArgumentTypeError(
f"writable_dir_folder: {prospective_dir} is not a writable dir"
)
class ReadableDir(argparse.Action):
"""
custom define to check if it is a valid directory
Args:
argparse (_type_): _description_
"""
def __call__(self, parser, namespace, values, option_string=None):
prospective_dir = values
if not isinstance(prospective_dir, str):
raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a readable dir")
else:
if not os.path.isdir(prospective_dir):
raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a valid path")
if os.access(prospective_dir, os.R_OK):
setattr(namespace, self.dest, prospective_dir)
else:
raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a readable dir")
class DistanceValues(argparse.Action):
"""
check distance values are valid
Args:
argparse (_type_): _description_
"""
def __call__(self, parser, namespace, values, option_string=None):
if not isinstance(values, str):
raise argparse.ArgumentTypeError(f"distance_values:{values} is not a valid argument")
else:
_distance = re.match(r"^(\d+)\s?(m|km)$", values)
if _distance:
# convert to int in meters
values = int(_distance.group(1))
if _distance.group(2) == "km":
values *= 1000
setattr(namespace, self.dest, values)
else:
raise argparse.ArgumentTypeError(f"distance_values:{values} is not a valid argument")
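# Illustrative examples of what DistanceValues accepts (results in meters):
#   "10m"  -> 10
#   "5km"  -> 5000
#   "5 km" -> 5000 (one optional space between number and unit)
#   "10"   -> ArgumentTypeError (unit m or km is required)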
# MAIN FUNCTIONS
def check_overwrite(data, key, field_controls, args):
"""
checks with field control flags if given data for key should be written
1) data is not set
2) data is set or not and field_control: overwrite only set
3) data for key is not set, but only for key matches field_control
4) data for key is set or not, but only for key matches field_control and overwrite is set
Args:
data(str): value field
key(str): xmpt key
field_controls (array): array from args
args (_type_): _description_
Returns:
bool: true/false
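    Examples (illustrative; assume args.debug is False):
        check_overwrite("",      "City", [],                    args) -> True   (1)
        check_overwrite("Tokyo", "City", ["overwrite"],         args) -> True   (2)
        check_overwrite("",      "City", ["city"],              args) -> True   (3)
        check_overwrite("Tokyo", "City", ["city", "overwrite"], args) -> True   (4)
        check_overwrite("Tokyo", "City", ["state"],             args) -> False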
"""
status = False
# init field controls for empty
if not field_controls:
field_controls = []
if not data and (
len(field_controls) == 0 or ("overwrite" in field_controls and len(field_controls) == 1)
):
status = True
elif not data and key.lower() in field_controls:
status = True
elif data and "overwrite" in field_controls and len(field_controls) == 1:
status = True
elif data and key.lower() in field_controls and "overwrite" in field_controls:
status = True
if args.debug:
print(
f"Data set: {'YES' if data else 'NO'}, "
f"Key: {key.lower()}, "
f"Field Controls len: {len(field_controls)}, "
f"Overwrite: {'OVERWRITE' if 'overwrite' in field_controls else 'NOT OVERWRITE'}, "
"Key in Field Controls: "
f"{'KEY OK' if key.lower() in field_controls else 'KEY NOT MATCHING'}, "
f"OVERWRITE: {status}"
)
return status
def shorten_path(path, length=30, file_only=False, path_only=False):
"""
shortes a path from the left so it fits into lenght
if file only is set to true, it will split the file, if path only is set, only the path
Args:
path(str): path
length (int, optional): maximum length to shorten to. Defaults to 30.
file_only (bool, optional): only file. Defaults to False.
path_only (bool, optional): only path. Defaults to False.
Returns:
string: shortend path with ... in front
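    Example (illustrative; assumes string_len_cjk counts ASCII characters as width 1):
        shorten_path("/photos/2018/tokyo.xmp", length=15) -> '.. 18/tokyo.xmp'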
"""
length = length - 3
# I assume the XMP file name has no CJK characters inside, so I strip out the path
# The reason is that if there are CJK characters inside it will screw up the formatting
if file_only:
path = os.path.split(path)[1]
if path_only:
path = os.path.split(path)[0]
if string_len_cjk(path) > length:
path = f".. {path[string_len_cjk(path) - length:]}"
return path
class ReadOnlyOutput:
"""
for read only listing
"""
page_no = 1
page_all = 1
lines = 0
header_print = 0
header_template = ""
def __init__(self, header_template, max_pages, header_print_line):
self.page_all = max_pages
self.header_template = header_template
self.header_print = header_print_line
def print_header(self):
"""
prints header line and header seperator line
Args:
header (str): header string
lines (int, optional): line counter. Defaults to 0.
header_line (int, optional): print header counter grigger. Defaults to 0.
Returns:
int: line counter +1
"""
if self.lines == self.header_print:
# add one to the pages shown and reset the lines to start new page
self.page_no += 1
self.lines = 0
# print header
print(self.header_template.format(page_no=self.page_no, page_all=self.page_all))
self.lines += 1
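# Illustrative use of ReadOnlyOutput: with header_print_line=50 the header
# (with a bumped page counter) is reprinted every 50 printed rows:
#   output = ReadOnlyOutput("> Page {page_no}/{page_all}", 3, 50)
#   output.print_header()  # prints "> Page 1/3"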
def file_sort_number(file):
"""
gets the BK number for sorting in the file list
Args:
file (str): file name
Returns:
int: number found in the BK string or 0 for none
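    Example (illustrative):
        file_sort_number("IMG_0001.BK.3.xmp") -> 3
        file_sort_number("IMG_0001.xmp")      -> 0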
"""
match = re.match(r".*\.BK\.(\d+)\.xmp$", file)
return int(match.group(1)) if match is not None else 0
def output_list_width_adjust(args):
"""
adjusts the size for the format length for the list output
Args:
        args (Namespace): parsed arguments
Returns:
dictionary: format_length dictionary
"""
# various string lengths
format_length = {
"filename": 35,
"latitude": 18,
"longitude": 18,
"code": 4,
"country": 15,
"state": 18,
"city": 20,
"location": 25,
"path": 40,
}
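    # With the defaults above the full table needs
    # sum(widths) + (9 - 1) * 3 + 2 = 193 + 24 + 2 = 219 terminal columns
    # (3 chars per " | " separator, 2 for the outer padding);
    # the code below widens or narrows columns to fit the real terminal.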
if args.compact_view:
reduce_percent = 40
        # all formats are reduced to a minimum, we cut % off
for format_key in [
"filename",
"latitude",
"longitude",
"country",
"state",
"city",
"location",
"path",
]:
format_length[format_key] = ceil(
format_length[format_key] - ((format_length[format_key] / 100) * reduce_percent)
)
else:
# minimum resize size for a column
resize_width_min = 4
# the resize percent
# start with 10, then increase until we reach max
resize_percent_min = 10
resize_percent_max = 50
# abort flag so we can break out of the second loop too
abort = False
        # format key order, in which order the elements will be resized
format_key_order = []
# resize flag: 0 no, 1: make bigger, -1: make smaller
# change sizes for print based on terminal size
# NOTE: in screen or term this data might NOT be correct
        # the current width includes the " | " separators and the outer left/right spaces
current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
if current_columns < get_terminal_size().columns:
resize = 1
format_key_order = ["path", "location", "state", "city", "country", "filename"]
else:
resize = -1
format_key_order = [
"latitude",
"longitude",
"path",
"country",
"state",
"city",
"location",
"filename",
]
# if we have no auto adjust
if resize and args.no_autoadjust:
            # warning if screen is too small
if resize == -1:
print("[!!!] Screen layout might be skewed. Increase Terminal width")
resize = 0
else:
for resize_percent in range(resize_percent_min, resize_percent_max, 10):
for format_key in format_key_order:
resize_width = (format_length[format_key] / 100) * resize_percent
# if we down size, make it negative
if resize == -1:
resize_width *= -1
resize_width = ceil(format_length[format_key] + resize_width)
# in case too small, keep old one
format_length[format_key] = (
resize_width if resize_width > resize_width_min else format_length[format_key]
)
# calc new width for check if we can abort
current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
if (resize == 1 and current_columns >= get_terminal_size().columns) or (
resize == -1 and current_columns < get_terminal_size().columns
):
# check that we are not OVER but one under
width_up = get_terminal_size().columns - current_columns - 1
if (resize == 1 and width_up < 0) or (resize == -1 and width_up != 0):
if format_length["path"] + width_up >= resize_width_min:
format_length["path"] += width_up
abort = True
break
if abort:
break
if sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 > get_terminal_size().columns:
print("[!!!] Screen layout might be skewed. Increase Terminal width")
return format_length
def get_backup_file_counter(xmp_file, args):
"""
get backup file counter
Args:
xmp_file (str): file name
        args (Namespace): parsed arguments (used for the debug flag)
Returns:
int: next counter to be used for backup
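    Example (illustrative): with IMG_001.BK.1.xmp and IMG_001.BK.2.xmp already
    on disk next to IMG_001.xmp, this returns 3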
"""
# set to 1 for if we have no backups yet
bk_file_counter = 1
# get PATH from file and look for .BK. data in this folder matching,
# output is sorted per BK counter key
    for bk_file in sorted(
        glob.glob(
            os.path.join(
                os.path.split(xmp_file)[0], f"{os.path.splitext(os.path.split(xmp_file)[1])[0]}.BK.*.xmp"
            )
        ),
        # custom sort key to get the backup files sorted correctly
        key=file_sort_number,
        reverse=True,
    ):
# BK.1, etc -> get the number
bk_pos = file_sort_number(bk_file)
if bk_pos > 0:
if args.debug:
print(f"#### **** File: {bk_file}, Counter: {bk_pos} -> {bk_pos + 1}")
# check if found + 1 is bigger than set, if yes, set to new bk counter
if bk_pos + 1 > bk_file_counter:
bk_file_counter = bk_pos + 1
break
# return the next correct number for backup
return bk_file_counter
##############################################################
# ARGUMENT PARSING
##############################################################
def argument_parser():
"""
Parses the command line arguments
Returns:
Namespace: parsed arguments
"""
parser = argparse.ArgumentParser(
description="Reverse Geoencoding based on set Latitude/Longitude data in XMP files",
# formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="Sample: (todo)",
)
# xmp folder (or folders), or file (or files)
# note that the target directory or file needs to be writeable
parser.add_argument(
"-i",
"--include-source",
required=True,
nargs="*",
action=WritableDirFolder,
dest="xmp_sources",
metavar="XMP SOURCE FOLDER",
help=(
"The source folder or folders with the XMP files that need reverse geo encoding "
"to be set. Single XMP files can be given here"
),
)
# exclude folders
parser.add_argument(
"-x",
"--exclude-source",
nargs="*",
action=WritableDirFolder,
dest="exclude_sources",
metavar="EXCLUDE XMP SOURCE FOLDER",
help="Folders and files that will be excluded.",
)
# LR database (base folder)
# get .lrcat file in this folder
parser.add_argument(
"-l",
"--lightroom",
# required=True,
action=ReadableDir,
dest="lightroom_folder",
metavar="LIGHTROOM FOLDER",
help="Lightroom catalogue base folder",
)
# strict LR check with base path next to the file base name
parser.add_argument(
"-s",
"--strict",
dest="lightroom_strict",
action="store_true",
help="Do strict check for Lightroom files including Path in query",
)
# set behaviour override
# FLAG: default: only set not filled
# other: overwrite all or overwrite if one is missing,
    # overwrite specific field (as defined below)
# fields: Location, City, State, Country, CountryCode
parser.add_argument(
"-f",
"--field",
action="append",
type=str.lower, # make it lowercase for check
choices=["overwrite", "location", "city", "state", "country", "countrycode"],
dest="field_controls",
metavar="<overwrite, location, city, state, country, countrycode>",
help=(
"On default only set fields that are not set yet. Options are: "
"Overwrite (write all new), Location, City, State, Country, CountryCode. "
"Multiple can be given for combination overwrite certain fields only "
"or set only certain fields. "
"If with overwrite the field will be overwritten if already set, "
"else it will be always skipped."
),
)
parser.add_argument(
"-d",
"--fuzzy-cache",
type=str.lower,
action=DistanceValues,
nargs="?",
const="10m", # default is 10m
dest="fuzzy_distance",
metavar="FUZZY DISTANCE",
help=(
"Allow fuzzy distance cache lookup. Optional distance can be given, "
"if not set default of 10m is used. "
"Allowed argument is in the format of 12m or 12km"
),
)
# Google Maps API key to overcome restrictions
parser.add_argument(
"-g",
"--google",
dest="google_api_key",
metavar="GOOGLE API KEY",
help="Set a Google API Maps key to overcome the default lookup limitations",
)
# use open street maps
parser.add_argument(
"-o",
"--openstreetmap",
dest="use_openstreetmap",
action="store_true",
help="Use openstreetmap instead of Google",
)
# email of open street maps requests
parser.add_argument(
"-e", "--email", dest="email", metavar="EMIL ADDRESS", help="An email address for OpenStreetMap"
)
# write api/email settings to config file
parser.add_argument(
"-w",
"--write-settings",
dest="config_write",
action="store_true",
help="Write Google API or OpenStreetMap email to config file",
)
# only read data and print on screen, do not write anything
parser.add_argument(
"-r",
"--read-only",
dest="read_only",
action="store_true",
help=(
"Read current values from the XMP file only, "
"do not read from LR or lookup any data and write back"
),
)
# only list unset ones
parser.add_argument(
"-u", "--unset-only", dest="unset_only", action="store_true", help="Only list unset XMP files"
)
# only list unset GPS codes
parser.add_argument(
"-p",
"--unset-gps-only",
dest="unset_gps_only",
action="store_true",
help="Only list unset XMP files for GPS fields",
)
# don't try to do auto adjust in list view
parser.add_argument(
"-a",
"--no-autoadjust",
dest="no_autoadjust",
action="store_true",
help="Don't try to auto adjust columns",
)
# compact view, compresses columns down to a minimum
parser.add_argument(
"-c", "--compact", dest="compact_view", action="store_true", help="Very compact list view"
)
# Do not create backup files
parser.add_argument(
"-n",
"--nobackup",
dest="no_xmp_backup",
action="store_true",
help="Do not create a backup from the XMP file",
)
# verbose args for more detailed output
parser.add_argument("-v", "--verbose", action="count", dest="verbose", help="Set verbose output level")
# debug flag
parser.add_argument("--debug", action="store_true", dest="debug", help="Set detailed debug output")
# test flag
parser.add_argument("--test", action="store_true", dest="test", help="Do not write data back to file")
    # read in the arguments
return parser.parse_args()
##############################################################
# MAIN CODE
##############################################################
def main():
"""
Main code run
"""
args = argument_parser()
# init verbose to 0 if not set
if not args.verbose:
args.verbose = 0
# init exclude source to list if not set
if not args.exclude_sources:
args.exclude_sources = []
# init args unset (for list view) with 0 if unset
if not args.unset_only:
args.unset_only = 0
if args.debug:
print(
"### ARGUMENT VARS: "
f"I: {args.xmp_sources}, X: {args.exclude_sources}, L: {args.lightroom_folder}, "
f"F: {args.field_controls}, D: {args.fuzzy_distance}, M: {args.use_openstreetmap}, "
f"G: {args.google_api_key}, E: {args.email}, R: {args.read_only}, "
f"U: {args.unset_only}, A: {args.no_autoadjust}, C: {args.compact_view}, "
f"N: {args.no_xmp_backup}, W: {args.config_write}, V: {args.verbose}, "
f"D: {args.debug}, T: {args.test}"
)
# error flag
error = False
# set search map type
map_type = "google" if not args.use_openstreetmap else "openstreetmap"
# if -g and -o, error
if args.google_api_key and args.use_openstreetmap:
print("You cannot set a Google API key and use OpenStreetMap at the same time")
error = True
# or if -g and -e
if args.google_api_key and args.email:
print("You cannot set a Google API key and OpenStreetMap email at the same time")
error = True
# or -e and no -o
if args.email and not args.use_openstreetmap:
print("You cannot set an OpenStreetMap email and not use OpenStreetMap")
error = True
# if email and not basic valid email (@ .)
if args.email:
if not re.match(r"^.+@.+\.[A-Za-z]{1,}$", args.email):
print(f"Not a valid email for OpenStreetMap: {args.email}")
error = True
# on error exit here
if error:
sys.exit(1)
config = configparser.ConfigParser()
# try to find config file in following order
# $HOME/.config/
config_file = "reverse_geolocate.cfg"
config_folder = os.path.expanduser("~/.config/reverseGeolocate/")
    config_data = os.path.join(config_folder, config_file)
# if file exists read, if not skip unless we have write flag and
# google api or openstreetmaps email
if os.path.isfile(config_data):
config.read(config_data)
# check if api group & setting is there. also never overwrite argument given data
if "API" in config:
if "googleapikey" in config["API"]:
if not args.google_api_key:
args.google_api_key = config["API"]["googleapikey"]
if "openstreetmapemail" in config["API"]:
if not args.email:
args.email = config["API"]["openstreetmapemail"]
# write data if exists and changed
if args.config_write and (args.google_api_key or args.email):
config_change = False
# check if new value differs, if yes, change and write
if "API" not in config:
config["API"] = {}
if args.google_api_key and (
"googleapikey" not in config["API"] or config["API"]["googleapikey"] != args.google_api_key
):
config["API"]["googleapikey"] = args.google_api_key
config_change = True
if args.email and (
"openstreetmapemail" not in config["API"] or config["API"]["openstreetmapemail"] != args.email
):
config["API"]["openstreetmapemail"] = args.email
config_change = True
if config_change:
# if we do not have the base folder create that first
if not os.path.exists(config_folder):
os.makedirs(config_folder)
with open(config_data, "w", encoding="UTF-8") as fptr:
config.write(fptr)
if args.debug:
print(f"### OVERRIDE API: G: {args.google_api_key}, O: {args.email}")
# The XMP fields const lookup values
# XML/XMP
# READ:
# exif:GPSLatitude
# exif:GPSLongitude
# READ for if filled
# Iptc4xmpCore:Location
# photoshop:City
# photoshop:State
# photoshop:Country
# Iptc4xmpCore:CountryCode
xmp_fields = {
# EXIF GPSLat/Long are stored in Degree,Min.Sec[NESW] format
"GPSLatitude": consts.XMP_NS_EXIF,
"GPSLongitude": consts.XMP_NS_EXIF,
"Location": consts.XMP_NS_IPTCCore,
"City": consts.XMP_NS_Photoshop,
"State": consts.XMP_NS_Photoshop,
"Country": consts.XMP_NS_Photoshop,
"CountryCode": consts.XMP_NS_IPTCCore,
}
# non lat/long fields (for loc loops)
data_set_loc = ("Location", "City", "State", "Country", "CountryCode")
# one xmp data set
data_set = {
"GPSLatitude": "",
"GPSLongitude": "",
"Location": "",
"City": "",
"State": "",
"Country": "",
"CountryCode": "",
}
# original set for compare (is constant unchanged)
data_set_original = {}
# cache set to avoid double lookups for identical Lat/Ling
data_cache = {}
# work files, all files + folders we need to work on
work_files = []
# all failed files
failed_files = []
# use lightroom
use_lightroom = False
# path to lightroom database
lightroom_database = ""
# cursors & query
query = ""
cur = None
lrdb = None
# count variables
count = {
"all": 0,
"listed": 0,
"read": 0,
"map": 0,
"cache": 0,
"fuzzy_cache": 0,
"lightroom": 0,
"changed": 0,
"failed": 0,
"skipped": 0,
"not_found": 0,
"many_found": 0,
}
# do lightroom stuff only if we have the lightroom folder
if args.lightroom_folder:
# query string for lightroom DB check
query = (
"SELECT Adobe_images.id_local, AgLibraryFile.baseName, "
"AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as realtivePath, "
"AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, "
"AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, "
"AgHarvestedIptcMetadata.locationDataOrigination, "
"AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, "
"AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, "
"AgInternedIptcIsoCountryCode.value as CountryCode "
"FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, "
"AgLibraryRootFolder, Adobe_images "
"LEFT JOIN AgHarvestedIptcMetadata "
"ON Adobe_images.id_local = AgHarvestedIptcMetadata.image "
"LEFT JOIN AgInternedIptcLocation "
"ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local "
"LEFT JOIN AgInternedIptcCity "
"ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local "
"LEFT JOIN AgInternedIptcState "
"ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local "
"LEFT JOIN AgInternedIptcCountry "
"ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local "
"LEFT JOIN AgInternedIptcIsoCountryCode "
"ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local "
"WHERE Adobe_images.rootFile = AgLibraryFile.id_local "
"AND Adobe_images.id_local = AgHarvestedExifMetadata.image "
"AND AgLibraryFile.folder = AgLibraryFolder.id_local "
"AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local "
"AND AgLibraryFile.baseName = ?"
)
# absolutePath + pathFromRoot = path of XMP file - XMP file
if args.lightroom_strict:
            query += " AND AgLibraryRootFolder.absolutePath || AgLibraryFolder.pathFromRoot = ?"
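        # Illustrative (hypothetical values): absolutePath "/Volumes/Photos/"
        # concatenated with pathFromRoot "2018/Japan/" must equal the XMP file
        # folder "/Volumes/Photos/2018/Japan/" for the strict match to succeed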
# connect to LR database for reading
# open the folder and look for the first lrcat file in there
for file in os.listdir(args.lightroom_folder):
if file.endswith(".lrcat"):
lightroom_database = os.path.join(args.lightroom_folder, file)
lrdb = sqlite3.connect(lightroom_database)
if not lightroom_database or not lrdb:
print(
"(!) We could not find a lrcat file in the given lightroom folder or "
f"DB connection failed: {args.lightroom_folder}"
)
# flag for end
error = True
else:
# set row so we can access each element by the name
lrdb.row_factory = sqlite3.Row
# set cursor
cur = lrdb.cursor()
# flag that we have Lightroom DB
use_lightroom = True
if args.debug:
print(f"### USE Lightroom {use_lightroom}")
# on error exit here
if error:
sys.exit(1)
# init the XML meta for handling
xmp = XMPMeta()
# loop through the xmp_sources (folder or files)
# and read in the XMP data for LAT/LONG, other data
for xmp_file_source in args.xmp_sources:
# if folder, open and loop
# NOTE: we do check for folders in there, if there are we recourse traverse them
# also check that folder is not in exclude list
if os.path.isdir(xmp_file_source) and xmp_file_source.rstrip(os.sep) not in [
x.rstrip(os.sep) for x in args.exclude_sources
]:
# open folder and look for any .xmp files and push them into holding array
# if there are folders, dive into them
# or glob glob all .xmp files + directory
for root, _, files in os.walk(xmp_file_source):
for file in sorted(files):
# 1) but has no .BK. inside
# 2) file is not in exclude list
# 3) full folder is not in exclude list
                    file_path = os.path.join(root, file)
if (
file.endswith(".xmp")
and ".BK." not in file
and file_path not in args.exclude_sources
and root.rstrip(os.sep) not in [x.rstrip(os.sep) for x in args.exclude_sources]
):
if file_path not in work_files:
work_files.append(file_path)
count["all"] += 1
else:
# not already added to list and not in the exclude list either
if xmp_file_source not in work_files and xmp_file_source not in args.exclude_sources:
work_files.append(xmp_file_source)
count["all"] += 1
if args.debug:
print(f"### Work Files {work_files}")
format_line = ""
header_line = ""
format_length = {}
header_print = None
# if we have read only we print list format style
if args.read_only:
# adjust the output width for the list view
format_length = output_list_width_adjust(args)
# after how many lines do we reprint the header
header_repeat = 50
# how many pages will we have
page_all = ceil(len(work_files) / header_repeat)
# the formatted line for the output
# 4 {} => final replace: data (2 pre replaces)
# 1 {} => length replace here
format_line = (
" {{{{filename:<{{filenamelen}}}}}} | "
"{{{{latitude:>"
f"{format_length['latitude']}"
"}}}} | "
"{{{{longitude:>"
f"{format_length['longitude']}"
"}}}} | "
"{{{{code:<"
f"{format_length['code']}"
"}}}} | "
"{{{{country:<{{countrylen}}}}}} | "
"{{{{state:<{{statelen}}}}}} | "
"{{{{city:<{{citylen}}}}}} | "
"{{{{location:<{{locationlen}}}}}} | "
"{{{{path:<{{pathlen}}}}}}"
)
# header line format:
# blank line
# header title
# seperator line
# pre replace path length before we add the header titles
header_line_2 = format_line.format(
filenamelen=format_length["filename"],
countrylen=format_length["country"],
statelen=format_length["state"],
citylen=format_length["city"],
locationlen=format_length["location"],
pathlen=format_length["path"],
).format( # the header title line
filename="File"[: format_length["filename"]],
latitude="Latitude"[: format_length["latitude"]],
longitude="Longitude"[: format_length["longitude"]],
code="Code",
country="Country"[: format_length["country"]],
state="State"[: format_length["state"]],
city="City"[: format_length["city"]],
location="Location"[: format_length["location"]],
path="Path"[: format_length["path"]],
)
header_line_3 = (
f"{'-' * (format_length['filename'] + 2)}+"
f"{'-' * (format_length['latitude'] + 2)}+"
f"{'-' * (format_length['longitude'] + 2)}+"
f"{'-' * (format_length['code'] + 2)}+"
f"{'-' * (format_length['country'] + 2)}+"
f"{'-' * (format_length['state'] + 2)}+"
f"{'-' * (format_length['city'] + 2)}+"
f"{'-' * (format_length['location'] + 2)}+"
f"{'-' * (format_length['path'] + 2)}"
)
        header_line = (
            # can later be set to something else, eg page numbers
            "> Page {page_no:,}/{page_all:,}\n"
            # pre replace path length before we add the header titles
            f"{header_line_2}\n"
            f"{header_line_3}"
        )
# header print class
header_print = ReadOnlyOutput(header_line, page_all, header_repeat)
# print header
header_print.print_header()
# print no files found if we have no files
if not work_files:
print(f"{'[!!!] No files found':<60}")
# ### MAIN WORK LOOP
# now we just loop through each file and work on them
for xmp_file in work_files: # noqa: C901
if not args.read_only:
print(f"---> {xmp_file}: ", end="")
# ### ACTION FLAGs
write_file = False
# ### XMP FILE READING
# open file & read all into buffer
with open(xmp_file, "r", encoding="UTF-8") as fptr:
strbuffer = fptr.read()
# read fields from the XMP file and store in hash
xmp.parse_from_str(strbuffer)
for xmp_field_key, xmp_field_value in xmp_fields.items():
            # need to check if the property exists or the exempi routine will fail
if xmp.does_property_exist(xmp_field_value, xmp_field_key):
data_set[xmp_field_key] = xmp.get_property(xmp_field_value, xmp_field_key)
else:
data_set[xmp_field_key] = ""
if args.debug:
print(f"### => XMP: {xmp_field_value}:{xmp_field_key} => {data_set[xmp_field_key]}")
if args.read_only:
# view only if list all or if data is unset
if (
(not args.unset_only and not args.unset_gps_only)
or (args.unset_only and "" in data_set.values())
or (args.unset_gps_only and (not data_set["GPSLatitude"] or not data_set["GPSLongitude"]))
):
# for read only we print out the data formatted
# headline check, do we need to print that
if header_print is not None:
header_print.print_header()
# the data content
print(
format_line.format(
# for all possible non latin fields we do adjust
# if it has double byte characters inside
filenamelen=format_len(
shorten_path(xmp_file, format_length["filename"], file_only=True),
format_length["filename"],
),
countrylen=format_len(
shorten_string(data_set["Country"], width=format_length["country"]),
format_length["country"],
),
statelen=format_len(
shorten_string(data_set["State"], width=format_length["state"]),
format_length["state"],
),
citylen=format_len(
shorten_string(data_set["City"], width=format_length["city"]),
format_length["city"],
),
locationlen=format_len(
shorten_string(data_set["Location"], width=format_length["location"]),
format_length["location"],
),
pathlen=format_len(
shorten_path(xmp_file, format_length["path"], path_only=True),
format_length["path"],
),
).format(
# shorten from the left
filename=shorten_path(xmp_file, format_length["filename"], file_only=True),
# cut off from the right
latitude=(
str(convert_dms_to_lat(data_set["GPSLatitude"]))[: format_length["latitude"]]
),
longitude=(
str(convert_dms_to_long(data_set["GPSLongitude"]))[: format_length["longitude"]]
),
# is only 2 chars
code=data_set["CountryCode"][:2].center(4),
# shorten from the right
country=shorten_string(data_set["Country"], width=format_length["country"]),
state=shorten_string(data_set["State"], width=format_length["state"]),
city=shorten_string(data_set["City"], width=format_length["city"]),
location=shorten_string(data_set["Location"], width=format_length["location"]),
path=shorten_path(xmp_file, format_length["path"], path_only=True),
)
)
count["listed"] += 1
else:
# ### LR Action Flag (data ok)
lightroom_data_ok = True
lrdb_row = {}
# ### LIGHTROOM DB READING
            # read in data from DB if we have a lightroom folder
if use_lightroom and cur is not None:
# get the base file name, we need this for lightroom
xmp_file_basename = os.path.splitext(os.path.split(xmp_file)[1])[0]
# try to get this file name from the DB
lr_query_params = [xmp_file_basename]
# for strict check we need to get the full path
# and add / as the LR stores the last folder with /
if args.lightroom_strict:
                    xmp_file_path = f"{os.path.split(xmp_file)[0]}/"
lr_query_params.append(xmp_file_path)
cur.execute(query, lr_query_params)
# get the row data
lrdb_row = cur.fetchone()
# abort the read because we found more than one row
if cur.fetchone() is not None:
print("(!) Lightroom DB returned more than one more row")
lightroom_data_ok = False
count["many_found"] += 1
# Notify if we couldn't find one
elif not lrdb_row:
print("(!) Could not get data from Lightroom DB")
lightroom_data_ok = False
count["not_found"] += 1
if args.debug and lrdb_row:
print(f"### LightroomDB: {tuple(lrdb_row)} / {lrdb_row.keys()}")
# create a duplicate copy for later checking if something changed
data_set_original = data_set.copy()
# check if LR exists and use this to compare to XMP data
# is LR GPS and no XMP GPS => use LR and set XMP
# same for location names
# if missing in XMP but in LR -> set in XMP
# if missing in both do lookup in Maps
if use_lightroom and lightroom_data_ok:
# check lat/long separate
if lrdb_row["gpsLatitude"] and not data_set["GPSLatitude"]:
# we need to convert to the Degree,Min.sec[NSEW] format
data_set["GPSLatitude"] = convert_lat_to_dms(lrdb_row["gpsLatitude"])
if lrdb_row["gpsLongitude"] and not data_set["GPSLongitude"]:
data_set["GPSLongitude"] = convert_long_to_dms(lrdb_row["gpsLongitude"])
# now check Location, City, etc
for loc in data_set_loc:
# overwrite original set (read from XMP) with LR data
# if original data is missing
if lrdb_row[loc] and not data_set[loc]:
data_set[loc] = lrdb_row[loc]
if args.debug:
print(f"### -> LR: {loc} => {lrdb_row[loc]}")
# base set done, now check if there is anything unset in the data_set,
# if yes do a lookup in maps
# run this through the overwrite checker to get unset if we have a forced overwrite
has_unset = False
failed = False
from_cache = False
for loc in data_set_loc:
if check_overwrite(data_set[loc], loc, args.field_controls, args):
has_unset = True
if has_unset:
# check if lat/long is in cache
cache_key = f"{data_set['GPSLongitude']}#{data_set['GPSLatitude']}"
if args.debug:
print(f"### *** CACHE: {cache_key}: {'NO' if cache_key not in data_cache else 'YES'}")
                # main cache check = identical
# second cache level check is on distance:
# default distance is 10m, can be set via flag
# check distance to previous cache entries (reverse newest to oldest)
# and match before we do google lookup
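                # Illustrative example (assumed numbers): with --fuzzy-cache 10m a photo
                # taken ~7m from an already looked-up coordinate reuses that cached
                # location instead of triggering another maps request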
if cache_key not in data_cache:
has_fuzzy_cache = False
best_match_latlong = ""
if args.fuzzy_distance:
shortest_distance = args.fuzzy_distance
# check if we have fuzzy distance, if no valid found do maps lookup
for _cache_key in data_cache:
# split up cache key so we can use in the distance calc method
to_lat_long = _cache_key.split("#")
# get the distance based on current set + cached set
distance = get_distance(
from_longitude=data_set["GPSLongitude"],
from_latitude=data_set["GPSLatitude"],
to_longitude=to_lat_long[0],
to_latitude=to_lat_long[1],
)
if args.debug:
print(
f"### **= FUZZY CACHE: => distance: {distance} (m), "
f"shortest: {shortest_distance}"
)
if distance <= shortest_distance:
# set new distance and keep current best matching location
shortest_distance = distance
best_match_latlong = _cache_key
has_fuzzy_cache = True
if args.debug:
print(f"### ***= FUZZY CACHE: YES => Best match: {best_match_latlong}")
if not has_fuzzy_cache:
# get location from maps (google or openstreetmap)
maps_location = reverse_geolocate(
latitude=data_set["GPSLatitude"],
longitude=data_set["GPSLongitude"],
map_type=map_type,
args=args,
)
# cache data with Lat/Long
data_cache[cache_key] = maps_location
from_cache = False
else:
maps_location = data_cache[best_match_latlong]
# cache this one, because the next one will match this one too
# we don't need to loop search again for the same fuzzy location
data_cache[cache_key] = maps_location
count["cache"] += 1
count["fuzzy_cache"] += 1
from_cache = True
else:
# load location from cache
maps_location = data_cache[cache_key]
count["cache"] += 1
from_cache = True
# overwrite sets (note options check here)
if args.debug:
print(f"### Map Location ({map_type}): {maps_location}")
# must have at least the country set to write anything back
if maps_location["Country"]:
for loc in data_set_loc:
# only write to XMP if overwrite check passes
if check_overwrite(data_set_original[loc], loc, args.field_controls, args):
data_set[loc] = maps_location[loc]
xmp.set_property(xmp_fields[loc], loc, maps_location[loc])
write_file = True
if write_file:
count["map"] += 1
else:
print("(!) Could not geo loaction data ", end="")
failed = True
else:
if args.debug:
print(f"Lightroom data use: {use_lightroom}, Lightroom data ok: {lightroom_data_ok}")
# check if the data_set differs from the original (LR db load)
# if yes write, else skip
if use_lightroom and lightroom_data_ok:
for key, value in data_set.items():
# if not the same (to original data) and passes overwrite check
if value != data_set_original[key] and check_overwrite(
data_set_original[key], key, args.field_controls, args
):
xmp.set_property(xmp_fields[key], key, value)
write_file = True
if write_file:
count["lightroom"] += 1
# if we have the write flag set, write data
if write_file:
if not args.test:
# use copyfile to create a backup copy
if not args.no_xmp_backup:
# check if there is another file with .BK. already there,
# if yes, get the max number and +1 it, if not set to 1
bk_file_counter = get_backup_file_counter(xmp_file, args)
# copy to new backup file
copyfile(
xmp_file,
f"{os.path.splitext(xmp_file)[0]}.BK."
f"{bk_file_counter}{os.path.splitext(xmp_file)[1]}",
)
                # write back to original file
with open(xmp_file, "w", encoding="UTF-8") as fptr:
fptr.write(xmp.serialize_to_str(omit_packet_wrapper=True))
else:
print(f"[TEST] Would write {data_set} {xmp_file}", end="")
if from_cache:
print("[UPDATED FROM CACHE]")
else:
print("[UPDATED]")
count["changed"] += 1
elif failed:
print("[FAILED]")
count["failed"] += 1
# log data to array for post print
failed_files.append(xmp_file)
else:
print("[SKIP]")
count["skipped"] += 1
# close DB connection
if use_lightroom and lrdb is not None:
lrdb.close()
# end stats only if we write
print(f"{'=' * 44}")
print(f"XMP Files found : {count['all']:9,}")
if args.read_only:
print(f"XMP Files listed : {count['listed']:9,}")
if not args.read_only:
print(f"Updated : {count['changed']:9,}")
print(f"Skipped : {count['skipped']:9,}")
print(f"New GeoLocation from Map : {count['map']:9,}")
print(f"GeoLocation from Cache : {count['cache']:9,}")
print(f"GeoLocation from Fuzzy Cache : {count['fuzzy_cache']:9,}")
print(f"Failed reverse GeoLocate : {count['failed']:9,}")
if use_lightroom:
print(f"GeoLocaction from Lightroom : {count['lightroom']:9,}")
print(f"No Lightroom data found : {count['not_found']:9,}")
print(f"More than one found in Lightroom : {count['many_found']:9,}")
# if we have failed data
if len(failed_files) > 0:
print(f"{'-' * 44}")
print("Files that failed to update:")
print(f"{', '.join(failed_files)}")
##############################################################
# MAIN RUN
##############################################################
if __name__ == "__main__":
    main()
# __END__