diff --git a/bin/reverse_geolocate.py b/bin/reverse_geolocate.py
index 4447c94..475ac94 100755
--- a/bin/reverse_geolocate.py
+++ b/bin/reverse_geolocate.py
@@ -1,15 +1,17 @@
 #!/usr/bin/env python3
-# AUTHOR : Clemens Schwaighofer
-# DATE : 2018/2/20
-# LICENSE: GPLv3
-# DESC :
-# Set the reverse Geo location (name) from Lat/Long data in XMP files
-# in a lightroom catalogue
-# * tries to get pre-set geo location from LR catalog
-# * if not found tries to get data from Google
-# * all data is translated into English with long vowl system (aka ou or oo is ō)
-# MUST HAVE: Python XMP Toolkit (http://python-xmp-toolkit.readthedocs.io/)
+"""
+AUTHOR : Clemens Schwaighofer
+DATE : 2018/2/20
+LICENSE: GPLv3
+DESC :
+Set the reverse Geo location (name) from Lat/Long data in XMP files
+in a lightroom catalogue
+ * tries to get pre-set geo location from LR catalog
+ * if not found tries to get data from Google
+ * all data is translated into English with the long vowel system (aka ou or oo is ō)
+MUST HAVE: Python XMP Toolkit (http://python-xmp-toolkit.readthedocs.io/)
+"""

 import configparser
 import unicodedata
@@ -45,68 +47,104 @@ class WritableDirFolder(argparse.Action):
         argparse (_type_): _description_
     """
     def __call__(self, parser, namespace, values, option_string=None):
-        # we loop through list (this is because of nargs *)
-        for prospective_dir in values:
-            # if valid and writeable (dir or file)
-            if os.access(prospective_dir, os.W_OK):
-                # init new output array
-                out = []
-                # if we have a previous list in the namespace extend current list
-                if type(getattr(namespace, self.dest)) is list:
-                    out.extend(getattr(namespace, self.dest))
-                # add the new dir to it
-                out.append(prospective_dir)
-                # and write that list back to the self.dest in the namespace
-                setattr(namespace, self.dest, out)
-            else:
-                raise argparse.ArgumentTypeError("writable_dir_folder: {0} is not a writable dir".format(prospective_dir))
+        if isinstance(values, str) or values is None:
+            raise argparse.ArgumentTypeError(
+                "writable_dir_folder: expected a list of paths (nargs='*')"
+            )
+        else:
+            # we loop through list (this is because of nargs *)
+            for prospective_dir in iter(values):
+                # if valid and writable (dir or file)
+                if os.access(prospective_dir, os.W_OK):
+                    # init new output array
+                    out = []
+                    # if we have a previous list in the namespace extend current list
+                    if isinstance(getattr(namespace, self.dest), list):
+                        out.extend(getattr(namespace, self.dest))
+                    # add the new dir to it
+                    out.append(prospective_dir)
+                    # and write that list back to the self.dest in the namespace
+                    setattr(namespace, self.dest, out)
+                else:
+                    raise argparse.ArgumentTypeError(
+                        f"writable_dir_folder: {prospective_dir} is not a writable dir"
+                    )

+class ReadableDir(argparse.Action):
+    """
+    custom action to check if the value is a valid directory
-# call: readable_dir
-# custom define to check if it is a valid directory
-class readable_dir(argparse.Action):
+
+    Args:
+        values (str): directory path to validate
+    """
     def __call__(self, parser, namespace, values, option_string=None):
         prospective_dir = values
-        if not os.path.isdir(prospective_dir):
-            raise argparse.ArgumentTypeError("readable_dir:{0} is not a valid path".format(prospective_dir))
-        if os.access(prospective_dir, os.R_OK):
-            setattr(namespace, self.dest, prospective_dir)
+        if not isinstance(prospective_dir, str):
+            raise argparse.ArgumentTypeError(
+                f"readable_dir:{prospective_dir} is not a readable dir"
+            )
         else:
-            raise argparse.ArgumentTypeError("readable_dir:{0} is not a readable dir".format(prospective_dir))
+            if not os.path.isdir(prospective_dir):
+                raise argparse.ArgumentTypeError(
f"readable_dir:{prospective_dir} is not a valid path" + ) + if os.access(prospective_dir, os.R_OK): + setattr(namespace, self.dest, prospective_dir) + else: + raise argparse.ArgumentTypeError( + f"readable_dir:{prospective_dir} is not a readable dir" + ) +class DistanceValues(argparse.Action): + """ + check distance values are valid -# check distance values are valid -class distance_values(argparse.Action): + Args: + argparse (_type_): _description_ + """ def __call__(self, parser, namespace, values, option_string=None): - m = re.match(r'^(\d+)\s?(m|km)$', values) - if m: - # convert to int in meters - values = int(m.group(1)) - if m.group(2) == 'km': - values *= 1000 - setattr(namespace, self.dest, values) + if not isinstance(values, str): + raise argparse.ArgumentTypeError( + f"distance_values:{values} is not a valid argument" + ) else: - raise argparse.ArgumentTypeError("distance_values:{0} is not a valid argument".format(values)) + _distance = re.match(r'^(\d+)\s?(m|km)$', values) + if _distance: + # convert to int in meters + values = int(_distance.group(1)) + if _distance.group(2) == 'km': + values *= 1000 + setattr(namespace, self.dest, values) + else: + raise argparse.ArgumentTypeError( + f"distance_values:{values} is not a valid argument" + ) # MAIN FUNCTIONS -# METHOD: reverseGeolocate -# PARAMS: latitude, longitude, map search target (google or openstreetmap) -# RETURN: dict with all data (see below) -# DESC : wrapper to call to either the google or openstreetmap -def reverseGeolocate(longitude, latitude, map_type): +def reverse_geolocate(longitude, latitude, map_type, args): + """ + wrapper to call to either the google or openstreetmap + + Args: + longitude (float): latitude + latitude (float): longitue + map_type(str): map search target (google or openstreetmap) + args (_type_): _description_ + + Returns: + _type_: dict with all data (see below) + """ # clean up long/lat # they are stored with N/S/E/W if they come from an XMP # format: Deg,Min.Sec[NSEW] # NOTE: lat is N/S, long is E/W # detect and convert - lat_long = longLatReg(longitude=longitude, latitude=latitude) + lat_long = long_lat_reg(longitude=longitude, latitude=latitude) # which service to use if map_type == 'google': - return reverseGeolocateGoogle(lat_long['longitude'], lat_long['latitude']) + return reverse_geolocate_google(lat_long['longitude'], lat_long['latitude'], args) elif map_type == 'openstreetmap': - return reverseGeolocateOpenStreetMap(lat_long['longitude'], lat_long['latitude']) + return reverse_geolocate_open_street_map(lat_long['longitude'], lat_long['latitude'], args) else: return { 'Country': '', @@ -114,13 +152,18 @@ def reverseGeolocate(longitude, latitude, map_type): 'error': 'Map type not valid' } +def reverse_geolocate_init(longitude, latitude): + """ + inits the dictionary for return, and checks the lat/long on valid + returns geolocation dict with status = 'ERROR' if an error occurded -# METHOD: reverseGeolocateInit -# PARAMS: longitude, latitude -# RETURN: empty geolocation dictionary, or error flag if lat/long is not valid -# DESC : inits the dictionary for return, and checks the lat/long on valid -# returns geolocation dict with status = 'ERROR' if an error occurded -def reverseGeolocateInit(longitude, latitude): + Args: + longitude (float): longitude + latitude (float): latitude + + Returns: + _type_: empty geolocation dictionary, or error flag if lat/long is not valid + """ # basic dict format geolocation = { 'CountryCode': '', @@ -136,19 +179,28 @@ def reverseGeolocateInit(longitude, 
 # MAIN FUNCTIONS
-# METHOD: reverseGeolocate
-# PARAMS: latitude, longitude, map search target (google or openstreetmap)
-# RETURN: dict with all data (see below)
-# DESC : wrapper to call to either the google or openstreetmap
-def reverseGeolocate(longitude, latitude, map_type):
+def reverse_geolocate(longitude, latitude, map_type, args):
+    """
+    wrapper to call to either the google or openstreetmap
+
+    Args:
+        longitude (float): longitude
+        latitude (float): latitude
+        map_type(str): map search target (google or openstreetmap)
+        args (argparse.Namespace): parsed command line arguments
+
+    Returns:
+        dict: dict with all data (see below)
+    """
     # clean up long/lat
     # they are stored with N/S/E/W if they come from an XMP
     # format: Deg,Min.Sec[NSEW]
     # NOTE: lat is N/S, long is E/W
     # detect and convert
-    lat_long = longLatReg(longitude=longitude, latitude=latitude)
+    lat_long = long_lat_reg(longitude=longitude, latitude=latitude)
     # which service to use
     if map_type == 'google':
-        return reverseGeolocateGoogle(lat_long['longitude'], lat_long['latitude'])
+        return reverse_geolocate_google(lat_long['longitude'], lat_long['latitude'], args)
     elif map_type == 'openstreetmap':
-        return reverseGeolocateOpenStreetMap(lat_long['longitude'], lat_long['latitude'])
+        return reverse_geolocate_open_street_map(lat_long['longitude'], lat_long['latitude'], args)
     else:
         return {
             'Country': '',
@@ -114,13 +152,18 @@ def reverseGeolocate(latitude, longitude, map_type):
             'error': 'Map type not valid'
         }

+def reverse_geolocate_init(longitude, latitude):
+    """
+    inits the dictionary for return, and checks the lat/long on valid
+    returns geolocation dict with status = 'ERROR' if an error occurred
-# METHOD: reverseGeolocateInit
-# PARAMS: longitude, latitude
-# RETURN: empty geolocation dictionary, or error flag if lat/long is not valid
-# DESC : inits the dictionary for return, and checks the lat/long on valid
-#        returns geolocation dict with status = 'ERROR' if an error occurded
+
+    Args:
+        longitude (float): longitude
+        latitude (float): latitude
+
+    Returns:
+        dict: empty geolocation dictionary, or error flag if lat/long is not valid
+    """
     # basic dict format
     geolocation = {
         'CountryCode': '',
@@ -136,19 +179,28 @@ def reverseGeolocateInit(longitude, latitude):
     latlong_re = re.compile(r'^\d+\.\d+$')
     if not latlong_re.match(str(longitude)) or not latlong_re.match(str(latitude)):
         geolocation['status'] = 'ERROR'
-        geolocation['error_message'] = 'Latitude {} or Longitude {} are not valid'.format(latitude, longitude)
+        geolocation['error_message'] = f"Latitude {latitude} or Longitude {longitude} are not valid"
     return geolocation

+def reverse_geolocate_open_street_map(longitude, latitude, args):
+    """
+    OpenStreetMap reverse location lookup
-# METHOD: reverseGeolocateOpenStreetMap
-# PARAMS: latitude, longitude
-# RETURN: OpenStreetMap reverse lookcation lookup
-#         dict with locaiton, city, state, country, country code
-#         if not fillable, entry is empty
-# SAMPLE: https://nominatim.openstreetmap.org/reverse.php?format=jsonv2&lat=&lon=&zoom=21&accept-languge=en-US,en&
-def reverseGeolocateOpenStreetMap(longitude, latitude):
+
+    sample:
+    https://nominatim.openstreetmap.org/reverse.php?format=jsonv2&
+    lat=&lon=&zoom=21&accept-language=en-US,en&
+
+    Args:
+        longitude (float): longitude
+        latitude (float): latitude
+        args (argparse.Namespace): parsed command line arguments
+
+    Returns:
+        dictionary: dict with location, city, state, country, country code
+        if not fillable, entry is empty
+    """
     # init
-    geolocation = reverseGeolocateInit(longitude, latitude)
+    geolocation = reverse_geolocate_init(longitude, latitude)
     if geolocation['status'] == 'ERROR':
         return geolocation
     # query format
@@ -167,13 +219,15 @@ def reverseGeolocateOpenStreetMap(longitude, latitude):
     # if we have an email, add it here
     if args.email:
         payload['email'] = args.email
-    url = "{base}".format(base=base)
-    response = requests.get(url, params=payload)
+    url = f"{base}"
+    # timeout in seconds
+    timeout = 60
+    response = requests.get(url, params=payload, timeout=timeout)
     # debug output
     if args.debug:
-        print("OpenStreetMap search for Lat: {}, Long: {}".format(latitude, longitude))
+        print(f"OpenStreetMap search for Lat: {latitude}, Long: {longitude}")
     if args.debug and args.verbose >= 1:
-        print("OpenStreetMap response: {} => JSON: {}".format(response, response.json()))
+        print(f"OpenStreetMap response: {response} => JSON: {response.json()}")
     # type map
     # Country to Location and for each in order of priority
     type_map = {
@@ -188,27 +242,40 @@ def reverseGeolocateOpenStreetMap(longitude, latitude):
         # get address block
         addr = response.json()['address']
         # loop for locations
-        for loc_index in type_map:
-            for index in type_map[loc_index]:
+        for loc_index, sub_index in type_map.items():
+            for index in sub_index:
                 if index in addr and not geolocation[loc_index]:
                     geolocation[loc_index] = addr[index]
     else:
         geolocation['status'] = 'ERROR'
         geolocation['error_message'] = response.json()['error']
-        print("Error in request: {}".format(geolocation['error']))
+        print(f"Error in request: {geolocation['error_message']}")
     # return
     return geolocation

+def reverse_geolocate_google(longitude, latitude, args):
+    """
+    Google Maps reverse location lookup
-# METHOD: reverseGeolocateGoogle
-# PARAMS: latitude, longitude
-# RETURN: Google Maps reverse location lookup
-#         dict with location, city, state, country, country code
-#         if not fillable, entry is empty
-# SAMPLE: http://maps.googleapis.com/maps/api/geocode/json?latlng=,&language=&sensor=false&key=
-def reverseGeolocateGoogle(longitude, latitude):  # noqa: C901
+
+    sample:
+    http://maps.googleapis.com/maps/api/geocode/json?latlng=,&language=
+    &sensor=false&key=
+
+    Args:
+        longitude (float): longitude
+        latitude (float): latitude
+        args (argparse.Namespace): parsed command line arguments
+
+    Returns:
+        dictionary: dict with location, city, state, country, country code
+        if not fillable, entry is empty
+    """
     # init
-    geolocation = reverseGeolocateInit(longitude, latitude)
+    geolocation = reverse_geolocate_init(longitude, latitude)
     temp_geolocation = geolocation.copy()
     if geolocation['status'] == 'ERROR':
         return geolocation
@@ -222,7 +289,7 @@ def reverseGeolocateGoogle(longitude, latitude):  # noqa: C901
     base = "maps.googleapis.com/maps/api/geocode/json?"
     # build the base params
     payload = {
-        'latlng': '{lat},{lon}'.format(lon=longitude, lat=latitude),
+        'latlng': f"{latitude},{longitude}",
         'language': language,
         'sensor': sensor
     }
@@ -230,13 +297,15 @@ def reverseGeolocateGoogle(longitude, latitude):  # noqa: C901
     if args.google_api_key:
         payload['key'] = args.google_api_key
     # build the full url and send it to google
-    url = "{protocol}{base}".format(protocol=protocol, base=base)
-    response = requests.get(url, params=payload)
+    url = f"{protocol}{base}"
+    # timeout in seconds
+    timeout = 60
+    response = requests.get(url, params=payload, timeout=timeout)
     # debug output
     if args.debug:
-        print("Google search for Lat: {}, Long: {} with {}".format(longitude, latitude, response.url))
+        print(f"Google search for Lat: {latitude}, Long: {longitude} with {response.url}")
     if args.debug and args.verbose >= 1:
-        print("Google response: {} => JSON: {}".format(response, response.json()))
+        print(f"Google response: {response} => JSON: {response.json()}")
     # type map
     # For automated return of correct data into set to return
     type_map = {
@@ -265,22 +334,26 @@ def reverseGeolocateGoogle(longitude, latitude):  # noqa: C901
     # -> locality,
     # -> sublocality (_level_1 or 2 first found, then route)
     # so we get the data in the correct order
-    for loc_index in type_map:
-        for index in type_map[loc_index]:
+    for loc_index, sub_index in type_map.items():
+        for index in sub_index:
             # this is an array, so we need to loop through each
             for addr in entry['address_components']:
-                # in types check that index is in there and the location is not yet set
+                # in types check that index is in there
+                # and the location is not yet set
                 # also check that entry is in LATIN based
                 # NOTE: fallback if all are non LATIN?
                 if index in addr['types'] and not geolocation[loc_index]:
-                    # for country code we need to use short name, else we use long name
+                    # for country code we need to use short name,
+                    # else we use long name
                     if loc_index == 'CountryCode':
-                        if onlyLatinChars(addr['short_name']):
+                        if only_latin_chars(addr['short_name']):
                             geolocation[loc_index] = addr['short_name']
                         elif not temp_geolocation[loc_index]:
                             temp_geolocation[loc_index] = addr['short_name']
                     else:
-                        if onlyLatinChars(addr['long_name']):
+                        if only_latin_chars(addr['long_name']):
                             geolocation[loc_index] = addr['long_name']
                         elif not temp_geolocation[loc_index]:
                             temp_geolocation[loc_index] = addr['long_name']
@@ -293,16 +366,22 @@ def reverseGeolocateGoogle(longitude, latitude):  # noqa: C901
     else:
         geolocation['error_message'] = response.json()['error_message']
         geolocation['status'] = response.json()['status']
-        print("Error in request: {} {}".format(geolocation['status'], geolocation['error_message']))
+        print(f"Error in request: {geolocation['status']} {geolocation['error_message']}")
     # return
     return geolocation

+def convert_lat_long_to_dms(lat_long, is_latitude=False, is_longitude=False):
+    """
+    convert the LR format of N.N to the Exif GPS format
-# METHOD: convertLatLongToDMS
-# PARAMS: latLong in (-)N.N format, lat or long flag (else we can't set N/S)
-# RETURN: Deg,Min.Sec(NESW) format
-# DESC : convert the LR format of N.N to the Exif GPS format
-def convertLatLongToDMS(lat_long, is_latitude=False, is_longitude=False):
+
+    Args:
+        lat_long(str): latLong in (-)N.N format
+        is_latitude (bool, optional): flag, else we can't set North/South. Defaults to False.
+        is_longitude (bool, optional): flag, else we can't set West/East. Defaults to False.
+
+    Returns:
+        string: Deg,Min.Sec(NESW) format
+    """
     # minus part before . and then multiply rest by 60
     degree = int(abs(lat_long))
     minutes = round((float(abs(lat_long)) - int(abs(lat_long))) * 60, 10)
@@ -312,26 +391,47 @@ def convertLatLongToDMS(lat_long, is_latitude=False, is_longitude=False):
         direction = 'W' if int(lat_long) < 0 else 'E'
     else:
         direction = '(INVALID)'
-    return "{},{}{}".format(degree, minutes, direction)
+    return f"{degree},{minutes}{direction}"

+def convert_lat_to_dms(lat_long):
+    """
+    wrapper functions for Long/Lat calls: latitude
-# wrapper functions for Long/Lat calls: latitude
-def convertLatToDMS(lat_long):
-    return convertLatLongToDMS(lat_long, is_latitude=True)
+
+    Args:
+        lat_long(str): latLong in (-)N.N format
+
+    Returns:
+        string: Deg,Min.Sec(NESW) format
+    """
+    return convert_lat_long_to_dms(lat_long, is_latitude=True)

 # wrapper for Long/Lat call: longitute
-def convertLongToDMS(lat_long):
-    return convertLatLongToDMS(lat_long, is_longitude=True)
+def convert_long_to_dms(lat_long):
+    """
+    wrapper for Long/Lat call: longitude
+
+    Args:
+        lat_long(str): latLong in (-)N.N format
-# METHOD: longLatReg
-# PARAMS: latitude in (n,n.nNSEW format), longitude
-# RETURN: dict with converted lat/long
-# DESC : converts the XMP/EXIF formatted GPS Long/Lat coordinates
-#        from the , to the normal float
-#        number used in google/lr internal
-def longLatReg(longitude, latitude):
+
+    Returns:
+        string: Deg,Min.Sec(NESW) format
+    """
+    return convert_lat_long_to_dms(lat_long, is_longitude=True)

+def long_lat_reg(longitude, latitude):
+    """
+    converts the XMP/EXIF formatted GPS Long/Lat coordinates
+    from the , to the normal float
+    number used in google/lr internal
+
+    Args:
+        longitude(str): n,n.nNSEW format
+        latitude(str): n,n.nNSEW format
+
+    Returns:
+        dictionary: dict with converted lat/long
+    """
     # regex
     latlong_re = re.compile(r'^(\d+),(\d+\.\d+)([NESW]{1})$')
     # dict for loop
@@ -339,63 +439,96 @@ def longLatReg(longitude, latitude):
         'longitude': longitude,
         'latitude': latitude
     }
-    for element in lat_long:
+    for index, element in lat_long.items():
         # match if it is exif GPS format
-        m = latlong_re.match(lat_long[element])
-        if m is not None:
+        _match = latlong_re.match(element)
+        if _match is not None:
             # convert from Degree, Min.Sec into float format
-            lat_long[element] = float(m.group(1)) + (float(m.group(2)) / 60)
+            lat_long[index] = float(_match.group(1)) + (float(_match.group(2)) / 60)
             # if S or W => inverse to negative
-            if m.group(3) == 'S' or m.group(3) == 'W':
-                lat_long[element] *= -1
+            if _match.group(3) == 'S' or _match.group(3) == 'W':
+                lat_long[index] *= -1
     return lat_long

+def convert_dms_to_lat(lat_long):
+    """
+    wrapper calls for DMS to Lat/Long: latitude
-# wrapper calls for DMS to Lat/Long: latitude
-def convertDMStoLat(lat_long):
-    return longLatReg('0,0.0N', lat_long)['latitude']
+
+    Args:
+        lat_long(str): n,n.nNSEW format
+
+    Returns:
+        float: converted latitude
+    """
+    return long_lat_reg('0,0.0N', lat_long)['latitude']

-# # wrapper calls for DMS to Lat/Long: longitude
-def convertDMStoLong(lat_long):
-    return longLatReg(lat_long, '0,0.0N')['longitude']
+def convert_dms_to_long(lat_long):
+    """
+    wrapper calls for DMS to Lat/Long: longitude
+
+    Args:
+        lat_long(str): n,n.nNSEW format
-# METHOD: getDistance
-# PARAMS: from long/lat, to long_lat
-# RETURN: distance in meters
-# DESC : calculates the difference between two coordinates
-def getDistance(from_longitude, from_latitude, to_longitude, to_latitude):
+
+    Returns:
+        float: converted longitude
+    """
+    return long_lat_reg(lat_long, '0,0.0N')['longitude']

+def get_distance(from_longitude, from_latitude, to_longitude, to_latitude):
+    """
+    calculates the distance between two coordinates
+
+    Args:
+        from_longitude(str): from longitude
+        from_latitude(str): from latitude
+        to_longitude(str): to longitude
+        to_latitude(str): to latitude
+
+    Returns:
+        float: distance in meters
+    """
     # earth radius in meters
     earth_radius = 6378137.0
     # convert all from radians with pre convert DMS to long and to float
-    from_longitude = radians(float(convertDMStoLong(from_longitude)))
-    from_latitude = radians(float(convertDMStoLat(from_latitude)))
-    to_longitude = radians(float(convertDMStoLong(to_longitude)))
-    to_latitude = radians(float(convertDMStoLat(to_latitude)))
+    from_longitude = radians(float(convert_dms_to_long(from_longitude)))
+    from_latitude = radians(float(convert_dms_to_lat(from_latitude)))
+    to_longitude = radians(float(convert_dms_to_long(to_longitude)))
+    to_latitude = radians(float(convert_dms_to_lat(to_latitude)))
     # distance from - to
     distance_longitude = from_longitude - to_longitude
     distance_latitude = from_latitude - to_latitude
     # main distance calculation
-    distance = sin(distance_latitude / 2)**2 + cos(from_latitude) * cos(to_latitude) * sin(distance_longitude / 2)**2
+    distance = sin(distance_latitude / 2)**2 + cos(from_latitude) * \
+        cos(to_latitude) * sin(distance_longitude / 2)**2
     distance = 2 * atan2(sqrt(distance), sqrt(1 - distance))
     return earth_radius * distance
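[Editor's note] The conversion and distance helpers above are pure functions, so they can be sanity-checked in isolation. A standalone sketch that re-states the Deg,Min.Sec[NSEW] parsing and the haversine formula from these hunks, with hypothetical coordinates (roughly Tokyo and Yokohama):

    import re
    from math import radians, sin, cos, atan2, sqrt

    def dms_to_float(value: str) -> float:
        # "35,41.22N" -> 35.687 (Degree,Min.Sec[NSEW] to signed float)
        match = re.match(r'^(\d+),(\d+\.\d+)([NESW]{1})$', value)
        result = float(match.group(1)) + float(match.group(2)) / 60
        return -result if match.group(3) in ('S', 'W') else result

    def haversine_m(lon1, lat1, lon2, lat2) -> float:
        # same earth radius and formula as get_distance above
        earth_radius = 6378137.0
        lon1, lat1, lon2, lat2 = map(radians, (lon1, lat1, lon2, lat2))
        d = sin((lat1 - lat2) / 2) ** 2 + \
            cos(lat1) * cos(lat2) * sin((lon1 - lon2) / 2) ** 2
        return earth_radius * 2 * atan2(sqrt(d), sqrt(1 - d))

    tokyo = (dms_to_float('139,41.50E'), dms_to_float('35,41.22N'))
    yokohama = (dms_to_float('139,38.33E'), dms_to_float('35,26.64N'))
    print(round(haversine_m(*tokyo, *yokohama)))  # roughly 27 km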
+def check_overwrite(data, key, field_controls, args):
+    """
+    checks with field control flags if given data for key should be written
+    1) data is not set
+    2) data is set or not and field_control: overwrite only set
+    3) data for key is not set, but only for key matches field_control
+    4) data for key is set or not, but only for key matches field_control and overwrite is set
-# METHOD: checkOverwrite
-# PARAMS: data: value field, key: XMP key, field_controls: array from args
-# RETURN: true/false
-# DESC : checks with field control flags if given data for key should be written
-# 1) data is not set
-# 2) data is set or not and field_control: overwrite only set
-# 3) data for key is not set, but only for key matches field_control
-# 4) data for key is set or not, but only for key matches field_control and overwrite is set
-def checkOverwrite(data, key, field_controls):
+
+    Args:
+        data(str): value field
+        key(str): xmp key
+        field_controls (array): array from args
+        args (argparse.Namespace): parsed command line arguments
+
+    Returns:
+        bool: true/false
+    """
     status = False
     # init field controls for empty
     if not field_controls:
         field_controls = []
-    if not data and (len(field_controls) == 0 or ('overwrite' in field_controls and len(field_controls) == 1)):
+    if (
+        not data and (len(field_controls) == 0 or
+        ('overwrite' in field_controls and len(field_controls) == 1))
+    ):
         status = True
     elif not data and key.lower() in field_controls:
         status = True
@@ -404,23 +537,31 @@ def checkOverwrite(data, key, field_controls):
     elif data and key.lower() in field_controls and 'overwrite' in field_controls:
         status = True
     if args.debug:
-        print("Data set: {data_set}, Key: {key_lower}, Field Controls len: {field_count}, Overwrite: {overwrite_flag}, Key in Field Controls: {key_ok}, OVERWRITE: {do_overwrite}".format(
-            data_set='YES' if data else 'NO',
-            key_lower=key.lower(),
-            field_count=len(field_controls),
-            overwrite_flag='OVERWRITE' if 'overwrite' in field_controls else 'NOT OVERWRITE',
-            key_ok='KEY OK' if key.lower() in field_controls else 'KEY NOT MATCHING',
-            do_overwrite=status
-        ))
+        print(
+            f"Data set: {'YES' if data else 'NO'}, "
+            f"Key: {key.lower()}, "
+            f"Field Controls len: {len(field_controls)}, "
+            f"Overwrite: {'OVERWRITE' if 'overwrite' in field_controls else 'NOT OVERWRITE'}, "
+            "Key in Field Controls: "
+            f"{'KEY OK' if key.lower() in field_controls else 'KEY NOT MATCHING'}, "
+            f"OVERWRITE: {status}"
+        )
     return status
+def shorten_path(path, length=30, file_only=False, path_only=False):
+    """
+    shortens a path from the left so it fits into length
+    if file_only is set, only the file name is used; if path_only is set, only the path
-# METHOD: shortenPath
-# PARAMS: path = string, length = int, file_only = true/false, path_only = true/false
-# RETURN: shortend path with ... in front
-# DESC : shortes a path from the left so it fits into lenght
-#        if file only is set to true, it will split the file, if path only is set, only the path
-def shortenPath(path, length=30, file_only=False, path_only=False):
+
+    Args:
+        path(str): path
+        length (int, optional): maximum length to shorten to. Defaults to 30.
+        file_only (bool, optional): only file. Defaults to False.
+        path_only (bool, optional): only path. Defaults to False.
+
+    Returns:
+        string: shortened path with ... in front
+    """
     length = length - 3
     # I assume the XMP file name has no CJK characters inside, so I strip out the path
     # The reason is that if there are CJK characters inside it will screw up the formatting
@@ -428,20 +569,26 @@ def shortenPath(path, length=30, file_only=False, path_only=False):
         path = os.path.split(path)[1]
     if path_only:
         path = os.path.split(path)[0]
-    if stringLenCJK(path) > length:
-        path = "{} {}".format("..", path[stringLenCJK(path) - length:])
+    if string_len_cjk(path) > length:
+        path = f".. {path[string_len_cjk(path) - length:]}"
     return path

+def shorten_string(string, width, placeholder='..'):
+    """
+    shortens a string to width and attached placeholder
-# METHOD: shortenString
-# PARAMS: string, shorten width, override shorten placeholder
-# RETURN: shortened string
-# DESC : shortens a string to width and attached placeholder
-def shortenString(string, width, placeholder='..'):
+
+    Args:
+        string(str): string to shorten
+        width (int): length to shorten to
+        placeholder (str, optional): placeholder for the removed part. Defaults to '..'.
+
+    Returns:
+        string: shortened string
+    """
     # get the length with double byte charactes
-    string_len_cjk = stringLenCJK(str(string))
+    string_length_cjk = string_len_cjk(str(string))
     # if double byte width is too big
-    if string_len_cjk > width:
+    if string_length_cjk > width:
         # set current length and output string
         cur_len = 0
         out_string = ''
@@ -453,28 +600,37 @@ def shortenString(string, width, placeholder='..'):
             if cur_len <= (width - len(placeholder)):
                 out_string += char
         # return string with new width and placeholder
-        return "{}{}".format(out_string, placeholder)
+        return f"{out_string}{placeholder}"
     else:
         return str(string)

+def string_len_cjk(string):
+    """
+    because len on string in python counts characters but we need the width
+    count for formatting, we count two for double byte characters
-# METHOD: stringLenCJK
-# PARAMS: string
-# RETURN: length including double count for double width characters
-# DESC : because len on string in python counts characters but we need
-#        the width count for formatting, we count two for a double byte
-#        characters
-def stringLenCJK(string):
-    """ return string len including double count for double width characters """
+
+    Args:
+        string (string): string to check length
+
+    Returns:
+        int: length including double count for double width characters
+    """
+    # return string len including double count for double width characters
     return sum(1 + (unicodedata.east_asian_width(c) in "WF") for c in string)
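[Editor's note] The width counting is the core of all the table formatting below: a wide (W) or fullwidth (F) character occupies two terminal columns, so len() undercounts. A quick standalone check of the same unicodedata test:

    import unicodedata

    def string_len_cjk(string: str) -> int:
        # wide (W) and fullwidth (F) characters count as two columns
        return sum(1 + (unicodedata.east_asian_width(c) in "WF") for c in string)

    print(len("東京 Tokyo"))             # 8 characters
    print(string_len_cjk("東京 Tokyo"))  # 10 display columns (2 extra for 東京)

The difference between the two results is exactly the surplus that the format_len helper further down subtracts from a column width so padded fields still line up.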
+def is_latin(uchr):
+    """
+    checks via the unicode class if a character is LATIN char based
-# FROM: https://stackoverflow.com/a/3308844/7811993
-# METHOD: isLatin
-# PARAMS: character
-# RETURN: flagged LATIN or not char
-# DESC : checks via the unciode class if a character is LATIN char based
-def isLatin(uchr):
+
+    from
+    https://stackoverflow.com/a/3308844/7811993
+
+    Args:
+        uchr (str): single character to check
+
+    Returns:
+        bool: flagged LATIN or not char
+    """
     try:
         # if we found in the dictionary return
         return cache_latin_letters[uchr]
@@ -482,58 +638,121 @@
     except KeyError:
         # find LATIN in uncide type returned and set in dictionary for this character
         return cache_latin_letters.setdefault(uchr, 'LATIN' in unicodedata.name(uchr))

+def only_latin_chars(unistr):
+    """
+    checks if a string is based on LATIN chars; False for any CJK, Cyrillic, Hebrew, etc
-# FROM: https://stackoverflow.com/a/3308844/7811993
-# METHOD: onlyLatinChars
-# PARAMS: string
-# RETURN: True/False for if string is LATIN char based
-# DESC : chekcs if a string is based on LATIN chars. No for any CJK, Cyrillic, Hebrew, etc
-def onlyLatinChars(unistr):
-    return all(isLatin(uchr) for uchr in unistr if uchr.isalpha())
+
+    from:
+    https://stackoverflow.com/a/3308844/7811993
+
+    Args:
+        unistr (str): string
-# METHOD: printHeader
-# PARAMS: header string, line counter, print header counter trigger
-# RETURN: line counter +1
-# DESC : prints header line and header seperator line
-def printHeader(header, lines=0, header_line=0):
-    global page_no
-    if lines == header_line:
-        # add one to the pages shown and reset the lines to start new page
-        page_no += 1
-        lines = 0
-    # print header
-    print("{}".format(header))
-    lines += 1
-    return lines
+
+    Returns:
+        bool: True/False for if string is LATIN char based
+    """
+    return all(is_latin(uchr) for uchr in unistr if uchr.isalpha())
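[Editor's note] These two helpers decide which Google name variant is kept (the LATIN long/short name, or the non-LATIN one only as a fallback). A quick standalone check of the same unicodedata test, with the cache omitted for brevity:

    import unicodedata

    def is_latin(uchr: str) -> bool:
        # 'LATIN' appears in the unicode name of latin-based letters
        return 'LATIN' in unicodedata.name(uchr)

    def only_latin_chars(unistr: str) -> bool:
        return all(is_latin(uchr) for uchr in unistr if uchr.isalpha())

    print(only_latin_chars('Tokyo'))  # True
    print(only_latin_chars('東京'))   # False (CJK UNIFIED IDEOGRAPH-...)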
+class ReadOnlyOutput:
+    """
+    for read only listing
+    """
+    page_no = 1
+    page_all = 1
+    lines = 0
+    header_print = 0
+    header_template = ''
+
+    def __init__(self, header_template, max_pages, header_print_line):
+        self.page_all = max_pages
+        self.header_template = header_template
+        self.header_print = header_print_line
+
+    def print_header(self):
+        """
+        prints header line and header separator line,
+        bumping the page number and resetting the line counter
+        when a new page starts
+        """
+        if self.lines == self.header_print:
+            # add one to the pages shown and reset the lines to start new page
+            self.page_no += 1
+            self.lines = 0
+        # print header
+        print(self.header_template.format(
+            page_no=self.page_no, page_all=self.page_all
+        ))
+        self.lines += 1

+def format_len(string, length):
+    """
+    in case of CJK characters we need to adjust the format length dynamically
+    calculate correct length based on string given
+
+    Args:
+        string (str): string
+        length (int): format length
+
+    Returns:
+        int: adjusted format length
+    """
     # returns length udpated for string with double byte characters
     # get string length normal, get string length including double byte characters
     # then subtract that from the original length
-    return length - (stringLenCJK(string) - len(string))
+    return length - (string_len_cjk(string) - len(string))
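[Editor's note] format_len is what keeps the list view aligned: it shrinks the format width by the surplus display columns of a CJK string before padding. A minimal standalone sketch of the same adjustment, with hypothetical city names:

    import unicodedata

    def string_len_cjk(string: str) -> int:
        return sum(1 + (unicodedata.east_asian_width(c) in "WF") for c in string)

    def format_len(string: str, length: int) -> int:
        # subtract the surplus display columns so the padded field lines up
        return length - (string_len_cjk(string) - len(string))

    for city in ("Berlin", "横浜市"):
        print(f"|{city:<{format_len(city, 10)}}|")
    # both cells render 10 columns wide despite the double width characters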
+def file_sort_number(file):
+    """
+    gets the BK number for sorting in the file list
+
+    Args:
+        file (str): file name
-# METHOD: fileSortNumber
-# PARAMS: file name
-# RETURN: number found in the BK string or 0 for none
-# DESC : gets the BK number for sorting in the file list
-def fileSortNumber(file):
-    m = re.match(r'.*\.BK\.(\d+)\.xmp$', file)
-    return int(m.group(1)) if m is not None else 0
+
+    Returns:
+        int: number found in the BK string or 0 for none
+    """
+    match = re.match(r'.*\.BK\.(\d+)\.xmp$', file)
+    return int(match.group(1)) if match is not None else 0

+def output_list_width_adjust(args):
+    """
+    adjusts the size for the format length for the list output
-# METHOD: outputListWidthAdjust
-# PARAMS: none
-# RETURN: format_length dictionary
-# DESC : adjusts the size for the format length for the list output
-def outputListWidthAdjust():
+
+    Args:
+        args (argparse.Namespace): arguments
+
+    Returns:
+        dictionary: format_length dictionary
+    """
     # various string lengths
     format_length = {
         'filename': 35,
@@ -549,8 +768,12 @@ def outputListWidthAdjust():
     if args.compact_view:
         reduce_percent = 40
         # all formats are reduced to a mininum, we cut % off
-        for format_key in ['filename', 'latitude', 'longitude', 'country', 'state', 'city', 'location', 'path']:
-            format_length[format_key] = ceil(format_length[format_key] - ((format_length[format_key] / 100) * reduce_percent))
+        for format_key in [
+            'filename', 'latitude', 'longitude', 'country', 'state', 'city', 'location', 'path'
+        ]:
+            format_length[format_key] = ceil(
+                format_length[format_key] - ((format_length[format_key] / 100) * reduce_percent)
+            )
     else:
         # minimum resize size for a column
         resize_width_min = 4
@@ -572,7 +795,9 @@ def outputListWidthAdjust():
             format_key_order = ['path', 'location', 'state', 'city', 'country', 'filename']
         else:
             resize = -1
-            format_key_order = ['latitude', 'longitude', 'path', 'country', 'state', 'city', 'location', 'filename']
+            format_key_order = [
+                'latitude', 'longitude', 'path', 'country', 'state', 'city', 'location', 'filename'
+            ]
         # if we have no auto adjust
         if resize and args.no_autoadjust:
             # warningn if screen is too small
@@ -588,10 +813,18 @@ def outputListWidthAdjust():
                     resize_width *= -1
                 resize_width = ceil(format_length[format_key] + resize_width)
                 # in case too small, keep old one
-                format_length[format_key] = resize_width if resize_width > resize_width_min else format_length[format_key]
+                format_length[format_key] = (
+                    resize_width
+                    if resize_width > resize_width_min else format_length[format_key]
+                )
                 # calc new width for check if we can abort
-                current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
-                if (resize == 1 and current_columns >= get_terminal_size().columns) or (resize == -1 and current_columns < get_terminal_size().columns):
+                current_columns = (
+                    sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
+                )
+                if (
+                    (resize == 1 and current_columns >= get_terminal_size().columns) or
+                    (resize == -1 and current_columns < get_terminal_size().columns)
+                ):
                     # check that we are not OVER but one under
                     width_up = get_terminal_size().columns - current_columns - 1
                     if (resize == 1 and width_up < 0) or (resize == -1 and width_up != 0):
@@ -601,34 +834,49 @@ def outputListWidthAdjust():
                     break
             if abort:
                 break
-    if sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 > get_terminal_size().columns:
+    if (
+        sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 >
+        get_terminal_size().columns
+    ):
        print("[!!!] Screen layout might be skewed. Increase Terminal width")
     return format_length

+def get_backup_file_counter(xmp_file, args):
+    """
+    get backup file counter
-# METHOD: getBackupFileCounter
-# PARAMS: file name
-# RETURN: next counter to be used for backup
-# DESC :
-def getBackupFileCounter(xmp_file):
+
+    Args:
+        xmp_file (str): file name
+        args (argparse.Namespace): arguments
+
+    Returns:
+        int: next counter to be used for backup
+    """
     # set to 1 for if we have no backups yet
     bk_file_counter = 1
-    # get PATH from file and look for .BK. data in this folder matching, output is sorted per BK counter key
+    # get PATH from file and look for .BK. data in this folder matching,
+    # output is sorted per BK counter key
     for bk_file in sorted(
-        glob.glob("{path}/{file}*.xmp".format(
-            path=os.path.split(xmp_file)[0],
-            file="{}.BK.".format(os.path.splitext(os.path.split(xmp_file)[1])[0])
+        glob.glob(
+            os.path.join(
+                f"{os.path.split(xmp_file)[0]}",
+                f"{os.path.splitext(os.path.split(xmp_file)[1])[0]}.BK.*.xmp"
             )
         ),
         # custom sort key to get the backup files sorted correctly
-        key=lambda pos: fileSortNumber(pos),
+        key=lambda pos: file_sort_number(pos),
         reverse=True
     ):
         # BK.1, etc -> get the number
-        bk_pos = fileSortNumber(bk_file)
+        bk_pos = file_sort_number(bk_file)
         if bk_pos > 0:
             if args.debug:
-                print("#### **** File: {}, Counter: {} -> {}".format(bk_file, bk_pos, bk_pos + 1))
+                print(f"#### **** File: {bk_file}, Counter: {bk_pos} -> {bk_pos + 1}")
             # check if found + 1 is bigger than set, if yes, set to new bk counter
             if bk_pos + 1 > bk_file_counter:
                 bk_file_counter = bk_pos + 1
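[Editor's note] get_backup_file_counter derives the next backup suffix by scanning the existing name.BK.N.xmp siblings and taking the highest N plus one. The same idea without touching the filesystem, using hypothetical file names:

    import re

    def file_sort_number(file: str) -> int:
        # pull N out of name.BK.N.xmp, 0 when there is no backup suffix
        match = re.match(r'.*\.BK\.(\d+)\.xmp$', file)
        return int(match.group(1)) if match is not None else 0

    backups = ['IMG_0001.BK.2.xmp', 'IMG_0001.BK.10.xmp', 'IMG_0001.xmp']
    # numeric key, so BK.10 correctly outranks BK.2 (a plain string sort would not)
    next_counter = max((file_sort_number(f) for f in backups), default=0) + 1
    print(next_counter)  # 11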
@@ -664,7 +912,10 @@ def argument_parser():
         action=WritableDirFolder,
         dest='xmp_sources',
         metavar='XMP SOURCE FOLDER',
-        help='The source folder or folders with the XMP files that need reverse geo encoding to be set. Single XMP files can be given here'
+        help=(
+            'The source folder or folders with the XMP files that need reverse geo encoding '
+            'to be set. Single XMP files can be given here'
+        )
     )
     # exclude folders
     parser.add_argument(
@@ -683,7 +934,7 @@ def argument_parser():
         '-l',
         '--lightroom',
         # required=True,
-        action=readable_dir,
+        action=ReadableDir,
         dest='lightroom_folder',
         metavar='LIGHTROOM FOLDER',
         help='Lightroom catalogue base folder'
@@ -725,7 +976,7 @@ def argument_parser():
         '-d',
         '--fuzzy-cache',
         type=str.lower,
-        action=distance_values,
+        action=DistanceValues,
         nargs='?',
         const='10m',  # default is 10m
         dest='fuzzy_distance',
@@ -876,9 +1127,10 @@ def main():
         "### ARGUMENT VARS: "
         f"I: {args.xmp_sources}, X: {args.exclude_sources}, L: {args.lightroom_folder}, "
         f"F: {args.field_controls}, D: {args.fuzzy_distance}, M: {args.use_openstreetmap}, "
-        f"G: {args.google_api_key}, E: {args.email}, R: {args.read_only}, U: {args.unset_only}, "
-        f"A: {args.no_autoadjust}, C: {args.compact_view}, N: {args.no_xmp_backup}, "
-        f"W: {args.config_write}, V: {args.verbose}, D: {args.debug}, T: {args.test}"
+        f"G: {args.google_api_key}, E: {args.email}, R: {args.read_only}, "
+        f"U: {args.unset_only}, A: {args.no_autoadjust}, C: {args.compact_view}, "
+        f"N: {args.no_xmp_backup}, W: {args.config_write}, V: {args.verbose}, "
+        f"D: {args.debug}, T: {args.test}"
     )

     # error flag
@@ -900,7 +1152,7 @@ def main():
     # if email and not basic valid email (@ .)
     if args.email:
         if not re.match(r'^.+@.+\.[A-Za-z]{1,}$', args.email):
-            print("Not a valid email for OpenStreetMap: {}".format(args.email))
+            print(f"Not a valid email for OpenStreetMap: {args.email}")
             error = True
     # on error exit here
     if error:
@@ -911,8 +1163,9 @@ def main():
     # $HOME/.config/
     config_file = 'reverse_geolocate.cfg'
     config_folder = os.path.expanduser('~/.config/reverseGeolocate/')
-    config_data = '{}{}'.format(config_folder, config_file)
-    # if file exists read, if not skip unless we have write flag and google api or openstreetmaps email
+    config_data = os.path.join(config_folder, config_file)
+    # if file exists read, if not skip unless we have write flag and
+    # google api or openstreetmaps email
     if os.path.isfile(config_data):
         config.read(config_data)
     # check if api group & setting is there. also never overwrite argument given data
@@ -929,20 +1182,26 @@ def main():
     # check if new value differs, if yes, change and write
     if 'API' not in config:
         config['API'] = {}
-    if args.google_api_key and ('googleapikey' not in config['API'] or config['API']['googleapikey'] != args.google_api_key):
+    if (
+        args.google_api_key and ('googleapikey' not in config['API'] or
+        config['API']['googleapikey'] != args.google_api_key)
+    ):
         config['API']['googleapikey'] = args.google_api_key
         config_change = True
-    if args.email and ('openstreetmapemail' not in config['API'] or config['API']['openstreetmapemail'] != args.email):
+    if (
+        args.email and ('openstreetmapemail' not in config['API'] or
+        config['API']['openstreetmapemail'] != args.email)
+    ):
         config['API']['openstreetmapemail'] = args.email
         config_change = True
     if config_change:
         # if we do not have the base folder create that first
         if not os.path.exists(config_folder):
             os.makedirs(config_folder)
-        with open(config_data, 'w') as fptr:
+        with open(config_data, 'w', encoding="UTF-8") as fptr:
             config.write(fptr)
     if args.debug:
-        print("### OVERRIDE API: G: {}, O: {}".format(args.google_api_key, args.email))
+        print(f"### OVERRIDE API: G: {args.google_api_key}, O: {args.email}")

     # The XMP fields const lookup values
     # XML/XMP
@@ -956,7 +1215,8 @@ def main():
     # photoshop:Country
     # Iptc4xmpCore:CountryCode
     xmp_fields = {
-        'GPSLatitude': consts.XMP_NS_EXIF,  # EXIF GPSLat/Long are stored in Degree,Min.Sec[NESW] format
+        # EXIF GPSLat/Long are stored in Degree,Min.Sec[NESW] format
+        'GPSLatitude': consts.XMP_NS_EXIF,
         'GPSLongitude': consts.XMP_NS_EXIF,
         'Location': consts.XMP_NS_IPTCCore,
         'City': consts.XMP_NS_Photoshop,
@@ -986,9 +1246,12 @@ def main():
     failed_files = []
     # use lightroom
     use_lightroom = False
+    # path to lightroom database
+    lightroom_database = ''
     # cursors & query
     query = ''
-    cur = ''
+    cur = None
+    lrdb = None
     # count variables
     count = {
         'all': 0,
@@ -1008,19 +1271,35 @@ def main():
     # do lightroom stuff only if we have the lightroom folder
     if args.lightroom_folder:
         # query string for lightroom DB check
-        query = 'SELECT Adobe_images.id_local, AgLibraryFile.baseName, AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as realtivePath, AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, '
-        query += 'AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, '
-        query += 'AgHarvestedIptcMetadata.locationDataOrigination, AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, '
-        query += 'AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, AgInternedIptcIsoCountryCode.value as CountryCode '
-        query += 'FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, AgLibraryRootFolder, Adobe_images '
-        query += 'LEFT JOIN AgHarvestedIptcMetadata ON Adobe_images.id_local = AgHarvestedIptcMetadata.image '
-        query += 'LEFT JOIN AgInternedIptcLocation ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local '
-        query += 'LEFT JOIN AgInternedIptcCity ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local '
-        query += 'LEFT JOIN AgInternedIptcState ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local '
-        query += 'LEFT JOIN AgInternedIptcCountry ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local '
-        query += 'LEFT JOIN AgInternedIptcIsoCountryCode ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local '
-        query += 'WHERE Adobe_images.rootFile = AgLibraryFile.id_local AND Adobe_images.id_local = AgHarvestedExifMetadata.image AND AgLibraryFile.folder = AgLibraryFolder.id_local AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local '
-        query += 'AND AgLibraryFile.baseName = ?'
+        query = (
+            'SELECT Adobe_images.id_local, AgLibraryFile.baseName, '
+            'AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as realtivePath, '
+            'AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, '
+            'AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, '
+            'AgHarvestedIptcMetadata.locationDataOrigination, '
+            'AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, '
+            'AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, '
+            'AgInternedIptcIsoCountryCode.value as CountryCode '
+            'FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, '
+            'AgLibraryRootFolder, Adobe_images '
+            'LEFT JOIN AgHarvestedIptcMetadata '
+            'ON Adobe_images.id_local = AgHarvestedIptcMetadata.image '
+            'LEFT JOIN AgInternedIptcLocation '
+            'ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local '
+            'LEFT JOIN AgInternedIptcCity '
+            'ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local '
+            'LEFT JOIN AgInternedIptcState '
+            'ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local '
+            'LEFT JOIN AgInternedIptcCountry '
+            'ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local '
+            'LEFT JOIN AgInternedIptcIsoCountryCode '
+            'ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local '
+            'WHERE Adobe_images.rootFile = AgLibraryFile.id_local '
+            'AND Adobe_images.id_local = AgHarvestedExifMetadata.image '
+            'AND AgLibraryFile.folder = AgLibraryFolder.id_local '
+            'AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local '
+            'AND AgLibraryFile.baseName = ?'
+        )
         # absolutePath + pathFromRoot = path of XMP file - XMP file
         if args.lightroom_strict:
             query += 'AND AgLibraryRootFolder.absolutePath || AgLibraryFolder.pathFromRoot = ?'
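[Editor's note] A sketch of how the assembled query is meant to be run against the catalog, mirroring the cur.execute/row usage later in this patch. The catalog path and base name are hypothetical, query is the SELECT string assembled above, and it assumes a real .lrcat SQLite file:

    import sqlite3

    lrdb = sqlite3.connect('/path/to/catalog.lrcat')  # hypothetical catalog path
    lrdb.row_factory = sqlite3.Row  # lets the main loop read columns by name
    cur = lrdb.cursor()
    cur.execute(query, ['IMG_0001'])  # one positional ? parameter: the base name
    row = cur.fetchone()
    if row is not None:
        print(row['Country'], row['City'], row['gpsLatitude'], row['gpsLongitude'])
    lrdb.close()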
@@ -1032,7 +1311,10 @@ def main():
             lightroom_database = os.path.join(args.lightroom_folder, file)
             lrdb = sqlite3.connect(lightroom_database)
     if not lightroom_database or not lrdb:
-        print("(!) We could not find a lrcat file in the given lightroom folder or DB connection failed: {}".format(args.lightroom_folder))
+        print(
+            "(!) We could not find a lrcat file in the given lightroom folder or "
+            f"DB connection failed: {args.lightroom_folder}"
+        )
         # flag for end
         error = True
     else:
@@ -1043,7 +1325,7 @@ def main():
         # flag that we have Lightroom DB
         use_lightroom = True
         if args.debug:
-            print("### USE Lightroom {}".format(use_lightroom))
+            print(f"### USE Lightroom {use_lightroom}")

     # on error exit here
     if error:
@@ -1052,25 +1334,34 @@ def main():
     # init the XML meta for handling
     xmp = XMPMeta()

-    # loop through the xmp_sources (folder or files) and read in the XMP data for LAT/LONG, other data
+    # loop through the xmp_sources (folder or files)
+    # and read in the XMP data for LAT/LONG, other data
     for xmp_file_source in args.xmp_sources:
         # if folder, open and loop
         # NOTE: we do check for folders in there, if there are we recourse traverse them
         # also check that folder is not in exclude list
-        if os.path.isdir(xmp_file_source) and xmp_file_source.rstrip('/') not in [x.rstrip('/') for x in args.exclude_sources]:
+        if (
+            os.path.isdir(xmp_file_source) and
+            xmp_file_source.rstrip(os.sep) not in [x.rstrip(os.sep)
+                                                   for x in args.exclude_sources]
+        ):
             # open folder and look for any .xmp files and push them into holding array
             # if there are folders, dive into them
             # or glob glob all .xmp files + directory
-            for root, dirs, files in os.walk(xmp_file_source):
+            for root, _, files in os.walk(xmp_file_source):
                 for file in sorted(files):
                     # 1) but has no .BK. inside
                     # 2) file is not in exclude list
                     # 3) full folder is not in exclude list
-                    if file.endswith(".xmp") and ".BK." not in file \
-                        and "{}/{}".format(root, file) not in args.exclude_sources \
-                        and root.rstrip('/') not in [x.rstrip('/') for x in args.exclude_sources]:
-                        if "{}/{}".format(root, file) not in work_files:
-                            work_files.append("{}/{}".format(root, file))
+                    file_path = os.path.join(root, file)
+                    if (
+                        file.endswith(".xmp") and ".BK." not in file
+                        and file_path not in args.exclude_sources
+                        and root.rstrip(os.sep) not in [x.rstrip(os.sep)
+                                                        for x in args.exclude_sources]
+                    ):
+                        if file_path not in work_files:
+                            work_files.append(file_path)
                             count['all'] += 1
         else:
             # not already added to list and not in the exclude list either
@@ -1078,23 +1369,31 @@ def main():
                 work_files.append(xmp_file_source)
                 count['all'] += 1
     if args.debug:
-        print("### Work Files {}".format(work_files))
+        print(f"### Work Files {work_files}")

+    format_line = ''
+    header_line = ''
+    format_length = {}
+    header_print = None
     # if we have read only we print list format style
     if args.read_only:
         # adjust the output width for the list view
-        format_length = outputListWidthAdjust()
+        format_length = output_list_width_adjust(args)
         # after how many lines do we reprint the header
         header_repeat = 50
         # how many pages will we have
         page_all = ceil(len(work_files) / header_repeat)
         # current page number
-        page_no = 1
         # the formatted line for the output
         # 4 {} => final replace: data (2 pre replaces)
         # 1 {} => length replace here
-        format_line = " {{{{filename:<{}}}}} | {{{{latitude:>{}}}}} | {{{{longitude:>{}}}}} | {{{{code:<{}}}}} | {{{{country:<{}}}}} | {{{{state:<{}}}}} | {{{{city:<{}}}}} | {{{{location:<{}}}}} | {{{{path:<{}}}}}".format(
+        format_line = (
+            " {{{{filename:<{}}}}} | {{{{latitude:>{}}}}} | {{{{longitude:>{}}}}} | "
+            "{{{{code:<{}}}}} | {{{{country:<{}}}}} | {{{{state:<{}}}}} | {{{{city:<{}}}}} | "
+            "{{{{location:<{}}}}} | {{{{path:<{}}}}}"
+        ).format(
            "{filenamelen}",
            format_length['latitude'],
            format_length['longitude'],
@@ -1109,10 +1408,13 @@ def main():
         # blank line
         # header title
         # seperator line
-        header_line = '''{}
-{}
-{}'''.format(
-            '> Page {page_no:,}/{page_all:,}',  # can later be set to something else, eg page numbers
+        header_line = (
+            "{}\n"
+            "{}\n"
+            "{}"
+        ).format(
+            # can later be set to something else, eg page numbers
+            '> Page {page_no:,}/{page_all:,}',
             # pre replace path length before we add the header titles
             format_line.format(
                 filenamelen=format_length['filename'],
@@ -1144,76 +1446,148 @@ def main():
                 '-' * (format_length['path'] + 2)
             )
         )
+        # header print class
+        header_print = ReadOnlyOutput(header_line, page_all, header_repeat)
         # print header
-        printHeader(header_line.format(page_no=page_no, page_all=page_all))
+        header_print.print_header()
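[Editor's note] The quadruple braces in format_line above are a two-stage str.format trick: they survive the first .format() as literal double braces, so per-row column widths (possibly CJK-adjusted via format_len) can be injected in a second pass before the data pass. A condensed two-column standalone sketch of the same mechanism, with hypothetical data:

    format_line = " {{{{filename:<{}}}}} | {{{{country:<{}}}}}".format(
        "{filenamelen}", "{countrylen}"
    )
    row = format_line.format(filenamelen=12, countrylen=7)   # bake in widths
    print(row.format(filename='IMG_0001.xmp', country='Japan'))  # fill in data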
         # print no files found if we have no files
         if not work_files:
-            print("{:<60}".format('[!!!] No files found'))
+            print(f"{'[!!!] No files found':<60}")

     # ### MAIN WORK LOOP
     # now we just loop through each file and work on them
     for xmp_file in work_files:  # noqa: C901
         if not args.read_only:
-            print("---> {}: ".format(xmp_file), end='')
+            print(f"---> {xmp_file}: ", end='')
         # ### ACTION FLAGs
         write_file = False
         # ### XMP FILE READING
         # open file & read all into buffer
-        with open(xmp_file, 'r') as fptr:
+        with open(xmp_file, 'r', encoding="UTF-8") as fptr:
             strbuffer = fptr.read()
         # read fields from the XMP file and store in hash
         xmp.parse_from_str(strbuffer)
-        for xmp_field in xmp_fields:
+        for xmp_field_key, xmp_field_value in xmp_fields.items():
             # need to check if propert exist or it will the exempi routine will fail
-            if xmp.does_property_exist(xmp_fields[xmp_field], xmp_field):
-                data_set[xmp_field] = xmp.get_property(xmp_fields[xmp_field], xmp_field)
+            if xmp.does_property_exist(xmp_field_value, xmp_field_key):
+                data_set[xmp_field_key] = xmp.get_property(xmp_field_value, xmp_field_key)
             else:
-                data_set[xmp_field] = ''
+                data_set[xmp_field_key] = ''
             if args.debug:
-                print("### => XMP: {}:{} => {}".format(xmp_fields[xmp_field], xmp_field, data_set[xmp_field]))
+                print(
+                    f"### => XMP: {xmp_field_value}:{xmp_field_key} => {data_set[xmp_field_key]}"
+                )
         if args.read_only:
             # view only if list all or if data is unset
-            if (not args.unset_only and not args.unset_gps_only) or (args.unset_only and '' in data_set.values()) or (args.unset_gps_only and (not data_set['GPSLatitude'] or not data_set['GPSLongitude'])):
+            if (
+                (not args.unset_only and not args.unset_gps_only) or
+                (args.unset_only and '' in data_set.values()) or
+                (args.unset_gps_only and (not data_set['GPSLatitude'] or
+                                          not data_set['GPSLongitude']))
+            ):
                 # for read only we print out the data formatted
                 # headline check, do we need to print that
-                count['read'] = printHeader(header_line.format(page_no=page_no, page_all=page_all), count['read'], header_repeat)
+                if header_print is not None:
+                    header_print.print_header()
                 # the data content
                 print(format_line.format(
                     # for all possible non latin fields we do adjust
                     # if it has double byte characters inside
                     filenamelen=format_len(
                         shorten_path(xmp_file, format_length['filename'], file_only=True),
                         format_length['filename']
                     ),
                     countrylen=format_len(
                         shorten_string(data_set['Country'], width=format_length['country']),
                         format_length['country']
                     ),
                     statelen=format_len(
                         shorten_string(data_set['State'], width=format_length['state']),
                         format_length['state']
                     ),
                     citylen=format_len(
                         shorten_string(data_set['City'], width=format_length['city']),
                         format_length['city']
                     ),
                     locationlen=format_len(
                         shorten_string(data_set['Location'], width=format_length['location']),
                         format_length['location']
                     ),
                     pathlen=format_len(
                         shorten_path(xmp_file, format_length['path'], path_only=True),
                         format_length['path']
                     )
                 ).format(
-                    filename=shortenPath(xmp_file, format_length['filename'], file_only=True),  # shorten from the left
-                    latitude=str(convertDMStoLat(data_set['GPSLatitude']))[:format_length['latitude']],  # cut off from the right
-                    longitude=str(convertDMStoLong(data_set['GPSLongitude']))[:format_length['longitude']],
-                    code=data_set['CountryCode'][:2].center(4),  # is only 2 chars
-                    country=shortenString(data_set['Country'], width=format_length['country']),  # shorten from the right
-                    state=shortenString(data_set['State'], width=format_length['state']),
-                    city=shortenString(data_set['City'], width=format_length['city']),
-                    location=shortenString(data_set['Location'], width=format_length['location']),
-                    path=shortenPath(xmp_file, format_length['path'], path_only=True)
+                    # shorten from the left
+                    filename=shorten_path(
+                        xmp_file, format_length['filename'],
+                        file_only=True
+                    ),
+                    # cut off from the right
+                    latitude=(
+                        str(convert_dms_to_lat(data_set['GPSLatitude']))
+                        [:format_length['latitude']]
+                    ),
+                    longitude=(
+                        str(convert_dms_to_long(data_set['GPSLongitude']))
+                        [:format_length['longitude']]
+                    ),
+                    # is only 2 chars
+                    code=data_set['CountryCode'][:2].center(4),
+                    # shorten from the right
+                    country=shorten_string(
+                        data_set['Country'], width=format_length['country']
+                    ),
+                    state=shorten_string(
+                        data_set['State'], width=format_length['state']
+                    ),
+                    city=shorten_string(
+                        data_set['City'], width=format_length['city']
+                    ),
+                    location=shorten_string(
+                        data_set['Location'],
+                        width=format_length['location']
+                    ),
+                    path=shorten_path(
+                        xmp_file,
+                        format_length['path'],
+                        path_only=True
+                    )
                 )
                 )
                 count['listed'] += 1
         else:
             # ### LR Action Flag (data ok)
             lightroom_data_ok = True
+            lrdb_row = {}
             # ### LIGHTROOM DB READING
             # read in data from DB if we uave lightroom folder
-            if use_lightroom:
+            if use_lightroom and cur is not None:
                 # get the base file name, we need this for lightroom
                 xmp_file_basename = os.path.splitext(os.path.split(xmp_file)[1])[0]
                 # try to get this file name from the DB
                 lr_query_params = [xmp_file_basename]
-                # for strict check we need to get the full path, and add / as the LR stores the last folder with /
+                # for strict check we need to get the full path
+                # and add / as the LR stores the last folder with /
                 if args.lightroom_strict:
-                    xmp_file_path = "{}/{}".format(os.path.split(xmp_file)[0], '/')
+                    xmp_file_path = f"{os.path.split(xmp_file)[0]}//"
                     lr_query_params.append(xmp_file_path)
                 cur.execute(query, lr_query_params)
                 # get the row data
@@ -1229,7 +1603,7 @@ def main():
                     lightroom_data_ok = False
                     count['not_found'] += 1
                 if args.debug and lrdb_row:
-                    print("### LightroomDB: {} / {}".format(tuple(lrdb_row), lrdb_row.keys()))
+                    print(f"### LightroomDB: {tuple(lrdb_row)} / {lrdb_row.keys()}")

             # create a duplicate copy for later checking if something changed
             data_set_original = data_set.copy()
@@ -1242,12 +1616,13 @@ def main():
                 # check lat/long separate
                 if lrdb_row['gpsLatitude'] and not data_set['GPSLatitude']:
                     # we need to convert to the Degree,Min.sec[NSEW] format
-                    data_set['GPSLatitude'] = convertLatToDMS(lrdb_row['gpsLatitude'])
+                    data_set['GPSLatitude'] = convert_lat_to_dms(lrdb_row['gpsLatitude'])
                 if lrdb_row['gpsLongitude'] and not data_set['GPSLongitude']:
-                    data_set['GPSLongitude'] = convertLongToDMS(lrdb_row['gpsLongitude'])
+                    data_set['GPSLongitude'] = convert_long_to_dms(lrdb_row['gpsLongitude'])
                 # now check Location, City, etc
                 for loc in data_set_loc:
-                    # overwrite original set (read from XMP) with LR data if original data is missing
+                    # overwrite original set (read from XMP) with LR data
+                    # if original data is missing
                     if lrdb_row[loc] and not data_set[loc]:
                         data_set[loc] = lrdb_row[loc]
             if args.debug:
@@ -1259,7 +1634,7 @@ def main():
             failed = False
             from_cache = False
             for loc in data_set_loc:
-                if checkOverwrite(data_set[loc], loc, args.field_controls):
+                if check_overwrite(data_set[loc], loc, args.field_controls, args):
                     has_unset = True
             if has_unset:
                 # check if lat/long is in cache
@@ -1276,9 +1651,9 @@ def main():
                 # and match before we do google lookup
                 if cache_key not in data_cache:
                     has_fuzzy_cache = False
+                    best_match_latlong = ''
                     if args.fuzzy_distance:
                         shortest_distance = args.fuzzy_distance
-                        best_match_latlong = ''
                         # check if we have fuzzy distance, if no valid found do maps lookup
                         for _cache_key in data_cache:
                             # split up cache key so we can use in the distance calc method
@@ -1289,7 +1664,7 @@ def main():
                             #     f"f-lat {data_set['GPSLatitude']} "
                             #     f"t-long {to_lat_long[0]} t-lat {to_lat_long[1]}"
                             # )
-                            distance = getDistance(
+                            distance = get_distance(
                                 from_longitude=data_set['GPSLongitude'],
                                 from_latitude=data_set['GPSLatitude'],
                                 to_longitude=to_lat_long[0],
@@ -1312,10 +1687,11 @@ def main():
                     )
                     if not has_fuzzy_cache:
                         # get location from maps (google or openstreetmap)
-                        maps_location = reverseGeolocate(
+                        maps_location = reverse_geolocate(
                             latitude=data_set['GPSLatitude'],
                             longitude=data_set['GPSLongitude'],
-                            map_type=map_type
+                            map_type=map_type,
+                            args=args
                         )
                         # cache data with Lat/Long
                         data_cache[cache_key] = maps_location
@@ -1340,7 +1716,7 @@ def main():
                 if maps_location['Country']:
                     for loc in data_set_loc:
                         # only write to XMP if overwrite check passes
-                        if checkOverwrite(data_set_original[loc], loc, args.field_controls):
+                        if check_overwrite(data_set_original[loc], loc, args.field_controls, args):
                             data_set[loc] = maps_location[loc]
                             xmp.set_property(xmp_fields[loc], loc, maps_location[loc])
                             write_file = True
@@ -1351,14 +1727,30 @@ def main():
                     failed = True
             else:
                 if args.debug:
-                    print(f"Lightroom data use: {use_lightroom}, Lightroom data ok: {lightroom_data_ok}")
+                    print(
+                        f"Lightroom data use: {use_lightroom}, "
+                        f"Lightroom data ok: {lightroom_data_ok}"
+                    )
                 # check if the data_set differs from the original (LR db load)
                 # if yes write, else skip
                 if use_lightroom and lightroom_data_ok:
-                    for key in data_set:
+                    for key, value in data_set.items():
                         # if not the same (to original data) and passes overwrite check
-                        if data_set[key] != data_set_original[key] and checkOverwrite(data_set_original[key], key, args.field_controls):
-                            xmp.set_property(xmp_fields[key], key, data_set[key])
+                        if (
+                            value != data_set_original[key] and
+                            check_overwrite(
+                                data_set_original[key], key, args.field_controls, args
+                            )
+                        ):
+                            xmp.set_property(xmp_fields[key], key, value)
                             write_file = True
                 if write_file:
                     count['lightroom'] += 1
@@ -1367,15 +1759,20 @@ def main():
             if not args.test:
                 # use copyfile to create a backup copy
                 if not args.no_xmp_backup:
-                    # check if there is another file with .BK. already there, if yes, get the max number and +1 it, if not set to 1
-                    bk_file_counter = getBackupFileCounter(xmp_file)
+                    # check if there is another file with .BK. already there,
+                    # if yes, get the max number and +1 it, if not set to 1
+                    bk_file_counter = get_backup_file_counter(xmp_file, args)
                     # copy to new backup file
-                    copyfile(xmp_file, "{}.BK.{}{}".format(os.path.splitext(xmp_file)[0], bk_file_counter, os.path.splitext(xmp_file)[1]))
+                    copyfile(
+                        xmp_file,
+                        f"{os.path.splitext(xmp_file)[0]}.BK."
+                        f"{bk_file_counter}{os.path.splitext(xmp_file)[1]}"
+                    )
                 # write back to riginal file
-                with open(xmp_file, 'w') as fptr:
+                with open(xmp_file, 'w', encoding="UTF-8") as fptr:
                     fptr.write(xmp.serialize_to_str(omit_packet_wrapper=True))
             else:
-                print("[TEST] Would write {} {}".format(data_set, xmp_file), end='')
+                print(f"[TEST] Would write {data_set} {xmp_file}", end='')
             if from_cache:
                 print("[UPDATED FROM CACHE]")
             else:
@@ -1391,7 +1788,7 @@ def main():
             count['skipped'] += 1

     # close DB connection
-    if use_lightroom:
+    if use_lightroom and lrdb is not None:
         lrdb.close()
     # end stats only if we write
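[Editor's note] A quick round-trip smoke test for the two pure converters touched by this patch, assuming the script is importable as a module named reverse_geolocate (e.g. copied or symlinked onto PYTHONPATH, with main() behind a __main__ guard):

    from reverse_geolocate import convert_lat_to_dms, convert_dms_to_lat

    dms = convert_lat_to_dms(35.687)  # '35,41.22N'
    assert abs(convert_dms_to_lat(dms) - 35.687) < 1e-9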