#!/usr/bin/env python3
"""
AUTHOR : Clemens Schwaighofer
DATE   : 2018/2/20
LICENSE: GPLv3
DESC   : Set the reverse Geo location (name) from Lat/Long data in XMP files
         in a lightroom catalogue
         * tries to get pre-set geo location from LR catalog
         * if not found tries to get data from Google
         * all data is translated into English with long vowl system (aka ou or oo is ō)
MUST HAVE: Python XMP Toolkit (http://python-xmp-toolkit.readthedocs.io/)
"""

import configparser
# import textwrap
import glob
import os
import sys
import re
import argparse
import sqlite3
from shutil import copyfile, get_terminal_size
from math import ceil
# Note XMPFiles does not work with sidecar files, need to read via XMPMeta
from libxmp import XMPMeta, consts
# user modules below
from utils.long_lat import (
    convert_dms_to_lat,
    convert_dms_to_long,
    convert_lat_to_dms,
    convert_long_to_dms,
    get_distance,
)
from utils.reverse_geolocate import reverse_geolocate
from utils.string_helpers import string_len_cjk, shorten_string, format_len

##############################################################
# FUNCTIONS
##############################################################

# this is for looking up if string is non latin letters
# this is used by isLatin and onlyLatinChars
cache_latin_letters = {}


# ARGPARSE HELPERS
class WritableDirFolder(argparse.Action):
    """
    argparse action: accept only writable folders OR files.

    Designed for nargs="*": every given path is checked for write access
    and appended to any list already stored under self.dest, so the option
    can be repeated and values accumulate.

    Raises:
        argparse.ArgumentTypeError: if a path is not writable, or if the
            action was (mis)wired so that values is not a list.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        if isinstance(values, str) or values is None:
            # With nargs="*" argparse always delivers a list; a bare string
            # or None means the action is wired up incorrectly.
            # BUG FIX: this branch used to print("FAIL") and silently
            # continue; raise like every other error path instead.
            raise argparse.ArgumentTypeError(
                "writable_dir_folder: expected a list of paths"
            )
        # we loop through the list (this is because of nargs *)
        for prospective_dir in values:
            # if valid and writeable (dir or file)
            if os.access(prospective_dir, os.W_OK):
                # start a fresh output list
                out = []
                # if we have a previous list in the namespace extend the current list
                if isinstance(getattr(namespace, self.dest), list):
                    out.extend(getattr(namespace, self.dest))
                # add the new dir to it
                out.append(prospective_dir)
                # and write that list back to the self.dest in the namespace
                setattr(namespace, self.dest, out)
            else:
                raise argparse.ArgumentTypeError(
                    f"writable_dir_folder: {prospective_dir} is not a writable dir"
                )


class ReadableDir(argparse.Action):
    """
    argparse action: check that the given value is a readable directory.

    Stores the path in self.dest only when it is an existing, readable
    directory; otherwise raises argparse.ArgumentTypeError.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        prospective_dir = values
        if not isinstance(prospective_dir, str):
            raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a readable dir")
        if not os.path.isdir(prospective_dir):
            raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a valid path")
        if os.access(prospective_dir, os.R_OK):
            setattr(namespace, self.dest, prospective_dir)
        else:
            raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a readable dir")


class DistanceValues(argparse.Action):
    """
    argparse action: validate distance arguments like "10m" or "2km".

    The value is stored in self.dest converted to an int in meters
    ("2km" -> 2000). Anything not matching <digits>[ ](m|km) raises
    argparse.ArgumentTypeError.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        if not isinstance(values, str):
            raise argparse.ArgumentTypeError(f"distance_values:{values} is not a valid argument")
        _distance = re.match(r"^(\d+)\s?(m|km)$", values)
        if not _distance:
            raise argparse.ArgumentTypeError(f"distance_values:{values} is not a valid argument")
        # convert to int in meters
        values = int(_distance.group(1))
        if _distance.group(2) == "km":
            values *= 1000
        setattr(namespace, self.dest, values)


# MAIN FUNCTIONS
def check_overwrite(data, key, field_controls, args):
    """
    Decide, based on the field control flags, whether data for key should be written.

    True when:
    1) data is not set and no field controls are given
    2) data is set or not and field_control is only "overwrite"
    3) data for key is not set and the key matches a field_control entry
    4) data for key is set or not, the key matches a field_control entry
       and "overwrite" is set

    Args:
        data (str): current value of the field
        key (str): xmp key of the field
        field_controls (list): field control flags from the arguments
        args: parsed arguments (only args.debug is read here)

    Returns:
        bool: True if the field should be written
    """
    status = False
    # init field controls for empty
    if not field_controls:
        field_controls = []
    if not data and (
        len(field_controls) == 0
        or ("overwrite" in field_controls and len(field_controls) == 1)
    ):
        status = True
    elif not data and key.lower() in field_controls:
        status = True
    elif data and "overwrite" in field_controls and len(field_controls) == 1:
        status = True
    elif data and key.lower() in field_controls and "overwrite" in field_controls:
        status = True
    if args.debug:
        print(
            f"Data set: {'YES' if data else 'NO'}, "
            f"Key: {key.lower()}, "
            f"Field Controls len: {len(field_controls)}, "
            f"Overwrite: {'OVERWRITE' if 'overwrite' in field_controls else 'NOT OVERWRITE'}, "
            "Key in Field Controls: "
            f"{'KEY OK' if key.lower() in field_controls else 'KEY NOT MATCHING'}, "
            f"OVERWRITE: {status}"
        )
    return status


def shorten_path(path, length=30, file_only=False, path_only=False):
    """
    Shorten a path from the left so it fits into length characters.

    If file_only is set only the file part is used, if path_only is set
    only the directory part.

    Args:
        path (str): path
        length (int, optional): maximum length to shorten to. Defaults to 30.
        file_only (bool, optional): only file. Defaults to False.
        path_only (bool, optional): only path. Defaults to False.

    Returns:
        str: shortened path with ".. " in front
    """
    # reserve 3 characters for the ".. " prefix
    length = length - 3
    # I assume the XMP file name has no CJK characters inside, so I strip out the path
    # The reason is that if there are CJK characters inside it will screw up the formatting
    if file_only:
        path = os.path.split(path)[1]
    if path_only:
        path = os.path.split(path)[0]
    if string_len_cjk(path) > length:
        # NOTE(review): the slice index uses the CJK display width, not the
        # character count — only exact for pure-latin strings; confirm intent
        path = f".. {path[string_len_cjk(path) - length:]}"
    return path
class ReadOnlyOutput:
    """
    Paged header printer for the read-only listing view.

    Counts the data lines printed and re-emits the header line (with the
    current page number filled in) on the first call and then again every
    header_print lines.
    """

    # current page number shown in the header
    page_no = 1
    # total number of pages
    page_all = 1
    # data lines printed since the last header
    lines = 0
    # reprint the header after this many lines
    header_print = 0
    # header template with {page_no}/{page_all} placeholders
    header_template = ""

    def __init__(self, header_template, max_pages, header_print_line):
        """
        Args:
            header_template (str): header line with {page_no}/{page_all} placeholders
            max_pages (int): total number of pages
            header_print_line (int): reprint the header after this many lines
        """
        self.page_all = max_pages
        self.header_template = header_template
        self.header_print = header_print_line

    def print_header(self):
        """
        Print the header line if one is due and count the printed line.
        """
        # BUG FIX: the original only triggered on lines == header_print, so
        # the very first header (lines == 0) was never printed and the page
        # counter effectively started at 2
        if self.lines == 0 or self.lines >= self.header_print:
            if self.lines:
                # add one to the pages shown and reset the lines to start a new page
                self.page_no += 1
            self.lines = 0
            # print header with current page numbers
            print(self.header_template.format(page_no=self.page_no, page_all=self.page_all))
        self.lines += 1


def file_sort_number(file):
    """
    Get the BK number for sorting in the file list.

    Args:
        file (str): file name

    Returns:
        int: number found in the BK string or 0 for none
    """
    match = re.match(r".*\.BK\.(\d+)\.xmp$", file)
    return int(match.group(1)) if match is not None else 0


def output_list_width_adjust(args):
    """
    Adjust the column widths for the list output.

    Args:
        args: parsed arguments (compact_view, no_autoadjust are read)

    Returns:
        dict: format_length dictionary (column name -> width)
    """
    # various string lengths
    format_length = {
        "filename": 35,
        "latitude": 18,
        "longitude": 18,
        "code": 4,
        "country": 15,
        "state": 18,
        "city": 20,
        "location": 25,
        "path": 40,
    }
    if args.compact_view:
        reduce_percent = 40
        # all formats are reduced to a minimum, we cut % off
        for format_key in [
            "filename",
            "latitude",
            "longitude",
            "country",
            "state",
            "city",
            "location",
            "path",
        ]:
            format_length[format_key] = ceil(
                format_length[format_key] - ((format_length[format_key] / 100) * reduce_percent)
            )
    else:
        # minimum resize size for a column
        resize_width_min = 4
        # the resize percent
        # start with 10, then increase until we reach max
        resize_percent_min = 10
        resize_percent_max = 50
        # abort flag so we can break out of the second loop too
        abort = False
        # format key order, in which order the elements will be resized
        format_key_order = []
        # resize flag: 0 no, 1: make bigger, -1: make smaller
        # change sizes for print based on terminal size
        # NOTE: in screen or term this data might NOT be correct
        # Current size needs the in between and left/right space data
        current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
        if current_columns < get_terminal_size().columns:
            resize = 1
            format_key_order = ["path", "location", "state", "city", "country", "filename"]
        else:
            resize = -1
            format_key_order = [
                "latitude",
                "longitude",
                "path",
                "country",
                "state",
                "city",
                "location",
                "filename",
            ]
        # if we have no auto adjust
        if resize and args.no_autoadjust:
            # warning if screen is too small
            if resize == -1:
                print("[!!!] Screen layout might be skewed. Increase Terminal width")
            resize = 0
        else:
            for resize_percent in range(resize_percent_min, resize_percent_max, 10):
                for format_key in format_key_order:
                    resize_width = (format_length[format_key] / 100) * resize_percent
                    # if we down size, make it negative
                    if resize == -1:
                        resize_width *= -1
                    resize_width = ceil(format_length[format_key] + resize_width)
                    # in case too small, keep old one
                    format_length[format_key] = (
                        resize_width if resize_width > resize_width_min else format_length[format_key]
                    )
                    # calc new width for check if we can abort
                    current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
                    if (resize == 1 and current_columns >= get_terminal_size().columns) or (
                        resize == -1 and current_columns < get_terminal_size().columns
                    ):
                        # check that we are not OVER but one under width
                        width_up = get_terminal_size().columns - current_columns - 1
                        if (resize == 1 and width_up < 0) or (resize == -1 and width_up != 0):
                            if format_length["path"] + width_up >= resize_width_min:
                                format_length["path"] += width_up
                        abort = True
                        break
                if abort:
                    break
    if sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 > get_terminal_size().columns:
        print("[!!!] Screen layout might be skewed. Increase Terminal width")
    return format_length


def get_backup_file_counter(xmp_file, args):
    """
    Get the next backup file counter.

    Looks for existing <base>.BK.<n>.xmp files next to the given XMP file
    and returns the highest found counter plus one.

    Args:
        xmp_file (str): file name
        args: parsed arguments (only args.debug is read)

    Returns:
        int: next counter to be used for backup
    """
    # set to 1 for if we have no backups yet
    bk_file_counter = 1
    # get PATH from file and look for .BK. data in this folder matching,
    # output is sorted per BK counter key, newest (highest) first
    for bk_file in sorted(
        glob.glob(
            os.path.join(
                f"{os.path.split(xmp_file)[0]}",
                f"{os.path.splitext(os.path.split(xmp_file)[1])[0]}.BK.*.xmp"
            )
        ),
        # custom sort key to get the backup files sorted correctly
        key=file_sort_number,
        reverse=True,
    ):
        # BK.1, etc -> get the number
        bk_pos = file_sort_number(bk_file)
        if bk_pos > 0:
            if args.debug:
                print(f"#### **** File: {bk_file}, Counter: {bk_pos} -> {bk_pos + 1}")
            # check if found + 1 is bigger than set, if yes, set to new bk counter
            if bk_pos + 1 > bk_file_counter:
                bk_file_counter = bk_pos + 1
            break
    # return the next correct number for backup
    return bk_file_counter


##############################################################
# ARGUMENT PARSING
##############################################################


def argument_parser():
    """
    Parse the command line arguments.

    Returns:
        Namespace: parsed arguments
    """
    parser = argparse.ArgumentParser(
        description="Reverse Geoencoding based on set Latitude/Longitude data in XMP files",
        # formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="Sample: (todo)",
    )
    # xmp folder (or folders), or file (or files)
    # note that the target directory or file needs to be writeable
    parser.add_argument(
        "-i",
        "--include-source",
        required=True,
        nargs="*",
        action=WritableDirFolder,
        dest="xmp_sources",
        metavar="XMP SOURCE FOLDER",
        help=(
            "The source folder or folders with the XMP files that need reverse geo encoding "
            "to be set. Single XMP files can be given here"
        ),
    )
    # exclude folders
    parser.add_argument(
        "-x",
        "--exclude-source",
        nargs="*",
        action=WritableDirFolder,
        dest="exclude_sources",
        metavar="EXCLUDE XMP SOURCE FOLDER",
        help="Folders and files that will be excluded.",
    )
    # LR database (base folder)
    # get .lrcat file in this folder
    parser.add_argument(
        "-l",
        "--lightroom",
        # required=True,
        action=ReadableDir,
        dest="lightroom_folder",
        metavar="LIGHTROOM FOLDER",
        help="Lightroom catalogue base folder",
    )
    # strict LR check with base path next to the file base name
    parser.add_argument(
        "-s",
        "--strict",
        dest="lightroom_strict",
        action="store_true",
        help="Do strict check for Lightroom files including Path in query",
    )
    # set behaviour override
    # FLAG: default: only set not filled
    # other: overwrite all or overwrite if one is missing,
    # overwrite specific field (as defined below)
    # fields: Location, City, State, Country, CountryCode
    parser.add_argument(
        "-f",
        "--field",
        action="append",
        type=str.lower,  # make it lowercase for check
        choices=["overwrite", "location", "city", "state", "country", "countrycode"],
        dest="field_controls",
        metavar="",
        help=(
            "On default only set fields that are not set yet. Options are: "
            "Overwrite (write all new), Location, City, State, Country, CountryCode. "
            "Multiple can be given for combination overwrite certain fields only "
            "or set only certain fields. "
            "If with overwrite the field will be overwritten if already set, "
            "else it will be always skipped."
        ),
    )
    parser.add_argument(
        "-d",
        "--fuzzy-cache",
        type=str.lower,
        action=DistanceValues,
        nargs="?",
        const="10m",  # default is 10m
        dest="fuzzy_distance",
        metavar="FUZZY DISTANCE",
        help=(
            "Allow fuzzy distance cache lookup. Optional distance can be given, "
            "if not set default of 10m is used. "
            "Allowed argument is in the format of 12m or 12km"
        ),
    )
    # Google Maps API key to overcome restrictions
    parser.add_argument(
        "-g",
        "--google",
        dest="google_api_key",
        metavar="GOOGLE API KEY",
        help="Set a Google API Maps key to overcome the default lookup limitations",
    )
    # use open street maps
    parser.add_argument(
        "-o",
        "--openstreetmap",
        dest="use_openstreetmap",
        action="store_true",
        help="Use openstreetmap instead of Google",
    )
    # email of open street maps requests
    # FIX: metavar typo "EMIL ADDRESS" -> "EMAIL ADDRESS"
    parser.add_argument(
        "-e",
        "--email",
        dest="email",
        metavar="EMAIL ADDRESS",
        help="An email address for OpenStreetMap",
    )
    # write api/email settings to config file
    parser.add_argument(
        "-w",
        "--write-settings",
        dest="config_write",
        action="store_true",
        help="Write Google API or OpenStreetMap email to config file",
    )
    # only read data and print on screen, do not write anything
    parser.add_argument(
        "-r",
        "--read-only",
        dest="read_only",
        action="store_true",
        help=(
            "Read current values from the XMP file only, "
            "do not read from LR or lookup any data and write back"
        ),
    )
    # only list unset ones
    parser.add_argument(
        "-u",
        "--unset-only",
        dest="unset_only",
        action="store_true",
        help="Only list unset XMP files",
    )
    # only list unset GPS codes
    parser.add_argument(
        "-p",
        "--unset-gps-only",
        dest="unset_gps_only",
        action="store_true",
        help="Only list unset XMP files for GPS fields",
    )
    # don't try to do auto adjust in list view
    parser.add_argument(
        "-a",
        "--no-autoadjust",
        dest="no_autoadjust",
        action="store_true",
        help="Don't try to auto adjust columns",
    )
    # compact view, compresses columns down to a minimum
    parser.add_argument(
        "-c",
        "--compact",
        dest="compact_view",
        action="store_true",
        help="Very compact list view",
    )
    # Do not create backup files
    parser.add_argument(
        "-n",
        "--nobackup",
        dest="no_xmp_backup",
        action="store_true",
        help="Do not create a backup from the XMP file",
    )
    # verbose args for more detailed output
    parser.add_argument("-v", "--verbose", action="count", dest="verbose", help="Set verbose output level")
    # debug flag
    parser.add_argument("--debug", action="store_true", dest="debug", help="Set detailed debug output")
    # test flag
    parser.add_argument("--test", action="store_true", dest="test", help="Do not write data back to file")
    # read in the arguments
    return parser.parse_args()


##############################################################
# MAIN CODE
##############################################################


def main():
    """
    Main code run
    """
    args = argument_parser()
    # init verbose to 0 if not set
    if not args.verbose:
        args.verbose = 0
    # init exclude source to list if not set
    if not args.exclude_sources:
        args.exclude_sources = []
    # init args unset (for list view) with 0 if unset
    if not args.unset_only:
        args.unset_only = 0
    if args.debug:
        print(
            "### ARGUMENT VARS: "
            f"I: {args.xmp_sources}, X: {args.exclude_sources}, L: {args.lightroom_folder}, "
            f"F: {args.field_controls}, D: {args.fuzzy_distance}, M: {args.use_openstreetmap}, "
            f"G: {args.google_api_key}, E: {args.email}, R: {args.read_only}, "
            f"U: {args.unset_only}, A: {args.no_autoadjust}, C: {args.compact_view}, "
            f"N: {args.no_xmp_backup}, W: {args.config_write}, V: {args.verbose}, "
            f"D: {args.debug}, T: {args.test}"
        )
    # error flag
    error = False
    # set search map type
    map_type = "google" if not args.use_openstreetmap else "openstreetmap"
    # if -g and -o, error
    if args.google_api_key and args.use_openstreetmap:
        print("You cannot set a Google API key and use OpenStreetMap at the same time")
        error = True
    # or if -g and -e
    if args.google_api_key and args.email:
        print("You cannot set a Google API key and OpenStreetMap email at the same time")
        error = True
    # or -e and no -o
    if args.email and not args.use_openstreetmap:
        print("You cannot set an OpenStreetMap email and not use OpenStreetMap")
        error = True
    # if email and not basic valid email (@ .)
if args.email: if not re.match(r"^.+@.+\.[A-Za-z]{1,}$", args.email): print(f"Not a valid email for OpenStreetMap: {args.email}") error = True # on error exit here if error: sys.exit(1) config = configparser.ConfigParser() # try to find config file in following order # $HOME/.config/ config_file = "reverse_geolocate.cfg" config_folder = os.path.expanduser("~/.config/reverseGeolocate/") config_data = os.path.join(f"{config_folder}", f"{config_file}") # if file exists read, if not skip unless we have write flag and # google api or openstreetmaps email if os.path.isfile(config_data): config.read(config_data) # check if api group & setting is there. also never overwrite argument given data if "API" in config: if "googleapikey" in config["API"]: if not args.google_api_key: args.google_api_key = config["API"]["googleapikey"] if "openstreetmapemail" in config["API"]: if not args.email: args.email = config["API"]["openstreetmapemail"] # write data if exists and changed if args.config_write and (args.google_api_key or args.email): config_change = False # check if new value differs, if yes, change and write if "API" not in config: config["API"] = {} if args.google_api_key and ( "googleapikey" not in config["API"] or config["API"]["googleapikey"] != args.google_api_key ): config["API"]["googleapikey"] = args.google_api_key config_change = True if args.email and ( "openstreetmapemail" not in config["API"] or config["API"]["openstreetmapemail"] != args.email ): config["API"]["openstreetmapemail"] = args.email config_change = True if config_change: # if we do not have the base folder create that first if not os.path.exists(config_folder): os.makedirs(config_folder) with open(config_data, "w", encoding="UTF-8") as fptr: config.write(fptr) if args.debug: print(f"### OVERRIDE API: G: {args.google_api_key}, O: {args.email}") # The XMP fields const lookup values # XML/XMP # READ: # exif:GPSLatitude # exif:GPSLongitude # READ for if filled # Iptc4xmpCore:Location # photoshop:City # 
photoshop:State # photoshop:Country # Iptc4xmpCore:CountryCode xmp_fields = { # EXIF GPSLat/Long are stored in Degree,Min.Sec[NESW] format "GPSLatitude": consts.XMP_NS_EXIF, "GPSLongitude": consts.XMP_NS_EXIF, "Location": consts.XMP_NS_IPTCCore, "City": consts.XMP_NS_Photoshop, "State": consts.XMP_NS_Photoshop, "Country": consts.XMP_NS_Photoshop, "CountryCode": consts.XMP_NS_IPTCCore, } # non lat/long fields (for loc loops) data_set_loc = ("Location", "City", "State", "Country", "CountryCode") # one xmp data set data_set = { "GPSLatitude": "", "GPSLongitude": "", "Location": "", "City": "", "State": "", "Country": "", "CountryCode": "", } # original set for compare (is constant unchanged) data_set_original = {} # cache set to avoid double lookups for identical Lat/Ling data_cache = {} # work files, all files + folders we need to work on work_files = [] # all failed files failed_files = [] # use lightroom use_lightroom = False # path to lightroom database lightroom_database = "" # cursors & query query = "" cur = None lrdb = None # count variables count = { "all": 0, "listed": 0, "read": 0, "map": 0, "cache": 0, "fuzzy_cache": 0, "lightroom": 0, "changed": 0, "failed": 0, "skipped": 0, "not_found": 0, "many_found": 0, } # do lightroom stuff only if we have the lightroom folder if args.lightroom_folder: # query string for lightroom DB check query = ( "SELECT Adobe_images.id_local, AgLibraryFile.baseName, " "AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as realtivePath, " "AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, " "AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, " "AgHarvestedIptcMetadata.locationDataOrigination, " "AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, " "AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, " "AgInternedIptcIsoCountryCode.value as CountryCode " "FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, " "AgLibraryRootFolder, 
Adobe_images " "LEFT JOIN AgHarvestedIptcMetadata " "ON Adobe_images.id_local = AgHarvestedIptcMetadata.image " "LEFT JOIN AgInternedIptcLocation " "ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local " "LEFT JOIN AgInternedIptcCity " "ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local " "LEFT JOIN AgInternedIptcState " "ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local " "LEFT JOIN AgInternedIptcCountry " "ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local " "LEFT JOIN AgInternedIptcIsoCountryCode " "ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local " "WHERE Adobe_images.rootFile = AgLibraryFile.id_local " "AND Adobe_images.id_local = AgHarvestedExifMetadata.image " "AND AgLibraryFile.folder = AgLibraryFolder.id_local " "AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local " "AND AgLibraryFile.baseName = ?" ) # absolutePath + pathFromRoot = path of XMP file - XMP file if args.lightroom_strict: query += "AND AgLibraryRootFolder.absolutePath || AgLibraryFolder.pathFromRoot = ?" # connect to LR database for reading # open the folder and look for the first lrcat file in there for file in os.listdir(args.lightroom_folder): if file.endswith(".lrcat"): lightroom_database = os.path.join(args.lightroom_folder, file) lrdb = sqlite3.connect(lightroom_database) if not lightroom_database or not lrdb: print( "(!) 
We could not find a lrcat file in the given lightroom folder or " f"DB connection failed: {args.lightroom_folder}" ) # flag for end error = True else: # set row so we can access each element by the name lrdb.row_factory = sqlite3.Row # set cursor cur = lrdb.cursor() # flag that we have Lightroom DB use_lightroom = True if args.debug: print(f"### USE Lightroom {use_lightroom}") # on error exit here if error: sys.exit(1) # init the XML meta for handling xmp = XMPMeta() # loop through the xmp_sources (folder or files) # and read in the XMP data for LAT/LONG, other data for xmp_file_source in args.xmp_sources: # if folder, open and loop # NOTE: we do check for folders in there, if there are we recourse traverse them # also check that folder is not in exclude list if os.path.isdir(xmp_file_source) and xmp_file_source.rstrip(os.sep) not in [ x.rstrip(os.sep) for x in args.exclude_sources ]: # open folder and look for any .xmp files and push them into holding array # if there are folders, dive into them # or glob glob all .xmp files + directory for root, _, files in os.walk(xmp_file_source): for file in sorted(files): # 1) but has no .BK. inside # 2) file is not in exclude list # 3) full folder is not in exclude list file_path = os.path.join(f"{root}", f"{file}") if ( file.endswith(".xmp") and ".BK." 
not in file and file_path not in args.exclude_sources and root.rstrip(os.sep) not in [x.rstrip(os.sep) for x in args.exclude_sources] ): if file_path not in work_files: work_files.append(file_path) count["all"] += 1 else: # not already added to list and not in the exclude list either if xmp_file_source not in work_files and xmp_file_source not in args.exclude_sources: work_files.append(xmp_file_source) count["all"] += 1 if args.debug: print(f"### Work Files {work_files}") format_line = "" header_line = "" format_length = {} header_print = None # if we have read only we print list format style if args.read_only: # adjust the output width for the list view format_length = output_list_width_adjust(args) # after how many lines do we reprint the header header_repeat = 50 # how many pages will we have page_all = ceil(len(work_files) / header_repeat) # current page number # page_no = 1 # the formatted line for the output # 4 {} => final replace: data (2 pre replaces) # 1 {} => length replace here # format_line = ( # " {{{{filename:<{}}}}} | {{{{latitude:>{}}}}} | {{{{longitude:>{}}}}} | " # "{{{{code:<{}}}}} | {{{{country:<{}}}}} | {{{{state:<{}}}}} | {{{{city:<{}}}}} | " # "{{{{location:<{}}}}} | {{{{path:<{}}}}}" # ).format( # "{filenamelen}", # format_length['latitude'], # format_length['longitude'], # format_length['code'], # "{countrylen}", # "{statelen}", # "{citylen}", # "{locationlen}", # "{pathlen}" # set path len replacer variable # ) format_line = ( " {{{{filename:<{{filenamelen}}}}}} | " "{{{{latitude:>" f"{format_length['latitude']}" "}}}} | " "{{{{longitude:>" f"{format_length['longitude']}" "}}}} | " "{{{{code:<" f"{format_length['code']}" "}}}} | " "{{{{country:<{{countrylen}}}}}} | " "{{{{state:<{{statelen}}}}}} | " "{{{{city:<{{citylen}}}}}} | " "{{{{location:<{{locationlen}}}}}} | " "{{{{path:<{{pathlen}}}}}}" ) # header line format: # blank line # header title # seperator line # header_line = ( # # f"{'> Page {page_no:,}/{page_all:,}'}" # "{}" # "{}" # 
"{}" # ).format( # # can later be set to something else, eg page numbers # '> Page {page_no:,}/{page_all:,}', # # pre replace path length before we add the header titles # format_line.format( # filenamelen=format_length['filename'], # countrylen=format_length['country'], # statelen=format_length['state'], # citylen=format_length['city'], # locationlen=format_length['location'], # pathlen=format_length['path'] # ).format( # the header title line # filename='File'[:format_length['filename']], # latitude='Latitude'[:format_length['latitude']], # longitude='Longitude'[:format_length['longitude']], # code='Code', # country='Country'[:format_length['country']], # state='State'[:format_length['state']], # city='City'[:format_length['city']], # location='Location'[:format_length['location']], # path='Path'[:format_length['path']] # ), # ( # f"{'-' * (format_length['filename'] + 2)}+" # f"{'-' * (format_length['latitude'] + 2)}+" # f"{'-' * (format_length['longitude'] + 2)}+" # f"{'-' * (format_length['code'] + 2)}+" # f"{'-' * (format_length['country'] + 2)}+" # f"{'-' * (format_length['state'] + 2)}+" # f"{'-' * (format_length['city'] + 2)}+" # f"{'-' * (format_length['location'] + 2)}+" # f"{'-' * (format_length['path'] + 2)}" # ) # ) # pre replace path length before we add the header titles header_line_2 = format_line.format( filenamelen=format_length["filename"], countrylen=format_length["country"], statelen=format_length["state"], citylen=format_length["city"], locationlen=format_length["location"], pathlen=format_length["path"], ).format( # the header title line filename="File"[: format_length["filename"]], latitude="Latitude"[: format_length["latitude"]], longitude="Longitude"[: format_length["longitude"]], code="Code", country="Country"[: format_length["country"]], state="State"[: format_length["state"]], city="City"[: format_length["city"]], location="Location"[: format_length["location"]], path="Path"[: format_length["path"]], ) header_line_3 = ( f"{'-' * 
(format_length['filename'] + 2)}+" f"{'-' * (format_length['latitude'] + 2)}+" f"{'-' * (format_length['longitude'] + 2)}+" f"{'-' * (format_length['code'] + 2)}+" f"{'-' * (format_length['country'] + 2)}+" f"{'-' * (format_length['state'] + 2)}+" f"{'-' * (format_length['city'] + 2)}+" f"{'-' * (format_length['location'] + 2)}+" f"{'-' * (format_length['path'] + 2)}" ) header_line = ( # can later be set to something else, eg page numbers "{> Page {page_no:,}/{page_all:,}}" # pre replace path length before we add the header titles f"{header_line_2}" f"{header_line_3}" ) # header print class header_print = ReadOnlyOutput(header_line, page_all, header_repeat) # print header # print_header(header_line.format(page_no=page_no, page_all=page_all)) header_print.print_header() # print no files found if we have no files if not work_files: print(f"{'[!!!] No files found':<60}") # ### MAIN WORK LOOP # now we just loop through each file and work on them for xmp_file in work_files: # noqa: C901 if not args.read_only: print(f"---> {xmp_file}: ", end="") # ### ACTION FLAGs write_file = False # ### XMP FILE READING # open file & read all into buffer with open(xmp_file, "r", encoding="UTF-8") as fptr: strbuffer = fptr.read() # read fields from the XMP file and store in hash xmp.parse_from_str(strbuffer) # for xmp_field in xmp_fields: # # need to check if propert exist or it will the exempi routine will fail # if xmp.does_property_exist(xmp_fields[xmp_field], xmp_field): # data_set[xmp_field] = xmp.get_property(xmp_fields[xmp_field], xmp_field) # else: # data_set[xmp_field] = '' # if args.debug: # print(f"### => XMP: {xmp_fields[xmp_field]}:{xmp_field} => {data_set[xmp_field]}") for xmp_field_key, xmp_field_value in xmp_fields.items(): # need to check if propert exist or it will the exempi routine will fail if xmp.does_property_exist(xmp_field_value, xmp_field_key): data_set[xmp_field_key] = xmp.get_property(xmp_field_value, xmp_field_key) else: data_set[xmp_field_key] = "" if 
args.debug: print(f"### => XMP: {xmp_field_value}:{xmp_field_key} => {data_set[xmp_field_key]}") if args.read_only: # view only if list all or if data is unset if ( (not args.unset_only and not args.unset_gps_only) or (args.unset_only and "" in data_set.values()) or (args.unset_gps_only and (not data_set["GPSLatitude"] or not data_set["GPSLongitude"])) ): # for read only we print out the data formatted # headline check, do we need to print that # count['read'] = print_header( # header_line.format(page_no=page_no, page_all=page_all), # count['read'], # header_repeat # ) if header_print is not None: header_print.print_header() # the data content print( format_line.format( # for all possible non latin fields we do adjust # if it has double byte characters inside filenamelen=format_len( shorten_path(xmp_file, format_length["filename"], file_only=True), format_length["filename"], ), countrylen=format_len( shorten_string(data_set["Country"], width=format_length["country"]), format_length["country"], ), statelen=format_len( shorten_string(data_set["State"], width=format_length["state"]), format_length["state"], ), citylen=format_len( shorten_string(data_set["City"], width=format_length["city"]), format_length["city"], ), locationlen=format_len( shorten_string(data_set["Location"], width=format_length["location"]), format_length["location"], ), pathlen=format_len( shorten_path(xmp_file, format_length["path"], path_only=True), format_length["path"], ), ).format( # shorten from the left filename=shorten_path(xmp_file, format_length["filename"], file_only=True), # cut off from the right latitude=( str(convert_dms_to_lat(data_set["GPSLatitude"]))[: format_length["latitude"]] ), longitude=( str(convert_dms_to_long(data_set["GPSLongitude"]))[: format_length["longitude"]] ), # is only 2 chars code=data_set["CountryCode"][:2].center(4), # shorten from the right country=shorten_string(data_set["Country"], width=format_length["country"]), state=shorten_string(data_set["State"], 
width=format_length["state"]), city=shorten_string(data_set["City"], width=format_length["city"]), location=shorten_string(data_set["Location"], width=format_length["location"]), path=shorten_path(xmp_file, format_length["path"], path_only=True), ) ) count["listed"] += 1 else: # ### LR Action Flag (data ok) lightroom_data_ok = True lrdb_row = {} # ### LIGHTROOM DB READING # read in data from DB if we uave lightroom folder if use_lightroom and cur is not None: # get the base file name, we need this for lightroom xmp_file_basename = os.path.splitext(os.path.split(xmp_file)[1])[0] # try to get this file name from the DB lr_query_params = [xmp_file_basename] # for strict check we need to get the full path # and add / as the LR stores the last folder with / if args.lightroom_strict: # xmp_file_path = "{}/{}".format(os.path.split(xmp_file)[0], '/') xmp_file_path = f"{os.path.split(xmp_file)[0]}/{'/'}" lr_query_params.append(xmp_file_path) cur.execute(query, lr_query_params) # get the row data lrdb_row = cur.fetchone() # abort the read because we found more than one row if cur.fetchone() is not None: print("(!) Lightroom DB returned more than one more row") lightroom_data_ok = False count["many_found"] += 1 # Notify if we couldn't find one elif not lrdb_row: print("(!) 
Could not get data from Lightroom DB") lightroom_data_ok = False count["not_found"] += 1 if args.debug and lrdb_row: print(f"### LightroomDB: {tuple(lrdb_row)} / {lrdb_row.keys()}") # create a duplicate copy for later checking if something changed data_set_original = data_set.copy() # check if LR exists and use this to compare to XMP data # is LR GPS and no XMP GPS => use LR and set XMP # same for location names # if missing in XMP but in LR -> set in XMP # if missing in both do lookup in Maps if use_lightroom and lightroom_data_ok: # check lat/long separate if lrdb_row["gpsLatitude"] and not data_set["GPSLatitude"]: # we need to convert to the Degree,Min.sec[NSEW] format data_set["GPSLatitude"] = convert_lat_to_dms(lrdb_row["gpsLatitude"]) if lrdb_row["gpsLongitude"] and not data_set["GPSLongitude"]: data_set["GPSLongitude"] = convert_long_to_dms(lrdb_row["gpsLongitude"]) # now check Location, City, etc for loc in data_set_loc: # overwrite original set (read from XMP) with LR data # if original data is missing if lrdb_row[loc] and not data_set[loc]: data_set[loc] = lrdb_row[loc] if args.debug: print(f"### -> LR: {loc} => {lrdb_row[loc]}") # base set done, now check if there is anything unset in the data_set, # if yes do a lookup in maps # run this through the overwrite checker to get unset if we have a forced overwrite has_unset = False failed = False from_cache = False for loc in data_set_loc: if check_overwrite(data_set[loc], loc, args.field_controls, args): has_unset = True if has_unset: # check if lat/long is in cache cache_key = f"{data_set['GPSLongitude']}#{data_set['GPSLatitude']}" if args.debug: print(f"### *** CACHE: {cache_key}: {'NO' if cache_key not in data_cache else 'YES'}") # main chache check = identical # second cache level check is on distance: # default distance is 10m, can be set via flag # check distance to previous cache entries (reverse newest to oldest) # and match before we do google lookup if cache_key not in data_cache: has_fuzzy_cache = 
False best_match_latlong = "" if args.fuzzy_distance: shortest_distance = args.fuzzy_distance # check if we have fuzzy distance, if no valid found do maps lookup for _cache_key in data_cache: # split up cache key so we can use in the distance calc method to_lat_long = _cache_key.split("#") # get the distance based on current set + cached set # print( # f"Lookup f-long {data_set['GPSLongitude']} " # f"f-lat {data_set['GPSLatitude']} " # f"t-long {to_lat_long[0]} t-lat {to_lat_long[1]}" # ) distance = get_distance( from_longitude=data_set["GPSLongitude"], from_latitude=data_set["GPSLatitude"], to_longitude=to_lat_long[0], to_latitude=to_lat_long[1], ) if args.debug: print( f"### **= FUZZY CACHE: => distance: {distance} (m), " f"shortest: {shortest_distance}" ) if distance <= shortest_distance: # set new distance and keep current best matching location shortest_distance = distance best_match_latlong = _cache_key has_fuzzy_cache = True if args.debug: print(f"### ***= FUZZY CACHE: YES => Best match: {best_match_latlong}") if not has_fuzzy_cache: # get location from maps (google or openstreetmap) maps_location = reverse_geolocate( latitude=data_set["GPSLatitude"], longitude=data_set["GPSLongitude"], map_type=map_type, args=args, ) # cache data with Lat/Long data_cache[cache_key] = maps_location from_cache = False else: maps_location = data_cache[best_match_latlong] # cache this one, because the next one will match this one too # we don't need to loop search again for the same fuzzy location data_cache[cache_key] = maps_location count["cache"] += 1 count["fuzzy_cache"] += 1 from_cache = True else: # load location from cache maps_location = data_cache[cache_key] count["cache"] += 1 from_cache = True # overwrite sets (note options check here) if args.debug: print(f"### Map Location ({map_type}): {maps_location}") # must have at least the country set to write anything back if maps_location["Country"]: for loc in data_set_loc: # only write to XMP if overwrite check passes if 
check_overwrite(data_set_original[loc], loc, args.field_controls, args): data_set[loc] = maps_location[loc] xmp.set_property(xmp_fields[loc], loc, maps_location[loc]) write_file = True if write_file: count["map"] += 1 else: print("(!) Could not geo loaction data ", end="") failed = True else: if args.debug: print(f"Lightroom data use: {use_lightroom}, Lightroom data ok: {lightroom_data_ok}") # check if the data_set differs from the original (LR db load) # if yes write, else skip if use_lightroom and lightroom_data_ok: # for key in data_set: # # if not the same (to original data) and passes overwrite check # if ( # data_set[key] != data_set_original[key] and # check_overwrite(data_set_original[key], key, args.field_controls) # ): # xmp.set_property(xmp_fields[key], key, data_set[key]) # write_file = True for key, value in data_set.items(): # if not the same (to original data) and passes overwrite check if value != data_set_original[key] and check_overwrite( data_set_original[key], key, args.field_controls, args ): xmp.set_property(xmp_fields[key], key, value) write_file = True if write_file: count["lightroom"] += 1 # if we have the write flag set, write data if write_file: if not args.test: # use copyfile to create a backup copy if not args.no_xmp_backup: # check if there is another file with .BK. already there, # if yes, get the max number and +1 it, if not set to 1 bk_file_counter = get_backup_file_counter(xmp_file, args) # copy to new backup file copyfile( xmp_file, f"{os.path.splitext(xmp_file)[0]}.BK." 
f"{bk_file_counter}{os.path.splitext(xmp_file)[1]}", ) # write back to riginal file with open(xmp_file, "w", encoding="UTF-8") as fptr: fptr.write(xmp.serialize_to_str(omit_packet_wrapper=True)) else: print(f"[TEST] Would write {data_set} {xmp_file}", end="") if from_cache: print("[UPDATED FROM CACHE]") else: print("[UPDATED]") count["changed"] += 1 elif failed: print("[FAILED]") count["failed"] += 1 # log data to array for post print failed_files.append(xmp_file) else: print("[SKIP]") count["skipped"] += 1 # close DB connection if use_lightroom and lrdb is not None: lrdb.close() # end stats only if we write print(f"{'=' * 44}") print(f"XMP Files found : {count['all']:9,}") if args.read_only: print(f"XMP Files listed : {count['listed']:9,}") if not args.read_only: print(f"Updated : {count['changed']:9,}") print(f"Skipped : {count['skipped']:9,}") print(f"New GeoLocation from Map : {count['map']:9,}") print(f"GeoLocation from Cache : {count['cache']:9,}") print(f"GeoLocation from Fuzzy Cache : {count['fuzzy_cache']:9,}") print(f"Failed reverse GeoLocate : {count['failed']:9,}") if use_lightroom: print(f"GeoLocaction from Lightroom : {count['lightroom']:9,}") print(f"No Lightroom data found : {count['not_found']:9,}") print(f"More than one found in Lightroom : {count['many_found']:9,}") # if we have failed data if len(failed_files) > 0: print(f"{'-' * 44}") print("Files that failed to update:") print(f"{', '.join(failed_files)}") ############################################################## # MAIN RUN ############################################################## main() # __END__