diff --git a/.gitignore b/.gitignore index cc94e52..12fd3ef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ .venv/ -bin/utils/__pycache__/* +**/__pycache__/** diff --git a/bin/reverse_geolocate.py b/bin/reverse_geolocate.py index c8f27e4..08d6e01 100755 --- a/bin/reverse_geolocate.py +++ b/bin/reverse_geolocate.py @@ -14,6 +14,7 @@ MUST HAVE: Python XMP Toolkit (http://python-xmp-toolkit.readthedocs.io/) """ import configparser + # import textwrap import glob import os @@ -23,10 +24,18 @@ import argparse import sqlite3 from shutil import copyfile, get_terminal_size from math import ceil + # Note XMPFiles does not work with sidecar files, need to read via XMPMeta from libxmp import XMPMeta, consts + # user modules below -from utils.long_lat import convert_dms_to_lat, convert_dms_to_long, convert_lat_to_dms, convert_long_to_dms, get_distance +from utils.long_lat import ( + convert_dms_to_lat, + convert_dms_to_long, + convert_lat_to_dms, + convert_long_to_dms, + get_distance +) from utils.reverse_geolocate import reverse_geolocate from utils.string_helpers import string_len_cjk, shorten_string, format_len @@ -40,6 +49,7 @@ cache_latin_letters = {} # ARGPARSE HELPERS + class WritableDirFolder(argparse.Action): """ checks if this is a writeable folder OR file @@ -48,6 +58,7 @@ class WritableDirFolder(argparse.Action): Args: argparse (_type_): _description_ """ + def __call__(self, parser, namespace, values, option_string=None): if isinstance(values, str) or values is None: print("FAIL") @@ -66,9 +77,8 @@ class WritableDirFolder(argparse.Action): # and write that list back to the self.dest in the namespace setattr(namespace, self.dest, out) else: - raise argparse.ArgumentTypeError( - f"writable_dir_folder: {prospective_dir} is not a writable dir" - ) + raise argparse.ArgumentTypeError(f"writable_dir_folder: {prospective_dir} is not a writable dir") + class ReadableDir(argparse.Action): """ @@ -77,23 +87,19 @@ class ReadableDir(argparse.Action): Args: argparse (_type_): _description_ """ + def __call__(self, parser, namespace, values, option_string=None): prospective_dir = values if not isinstance(prospective_dir, str): - raise argparse.ArgumentTypeError( - f"readable_dir:{prospective_dir} is not a readable dir" - ) + raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a readable dir") else: if not os.path.isdir(prospective_dir): - raise argparse.ArgumentTypeError( - f"readable_dir:{prospective_dir} is not a valid path" - ) + raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a valid path") if os.access(prospective_dir, os.R_OK): setattr(namespace, self.dest, prospective_dir) else: - raise argparse.ArgumentTypeError( - f"readable_dir:{prospective_dir} is not a readable dir" - ) + raise argparse.ArgumentTypeError(f"readable_dir:{prospective_dir} is not a readable dir") + class DistanceValues(argparse.Action): """ @@ -102,27 +108,25 @@ class DistanceValues(argparse.Action): Args: argparse (_type_): _description_ """ + def __call__(self, parser, namespace, values, option_string=None): if not isinstance(values, str): - raise argparse.ArgumentTypeError( - f"distance_values:{values} is not a valid argument" - ) + raise argparse.ArgumentTypeError(f"distance_values:{values} is not a valid argument") else: - _distance = re.match(r'^(\d+)\s?(m|km)$', values) + _distance = re.match(r"^(\d+)\s?(m|km)$", values) if _distance: # convert to int in meters values = int(_distance.group(1)) - if _distance.group(2) == 'km': + if _distance.group(2) == "km": values *= 1000 setattr(namespace, self.dest, values) else: - raise argparse.ArgumentTypeError( - f"distance_values:{values} is not a valid argument" - ) + raise argparse.ArgumentTypeError(f"distance_values:{values} is not a valid argument") # MAIN FUNCTIONS + def check_overwrite(data, key, field_controls, args): """ checks with field control flags if given data for key should be written @@ -144,16 +148,13 @@ def check_overwrite(data, key, field_controls, args): # init field controls for empty if not field_controls: field_controls = [] - if ( - not data and (len(field_controls) == 0 or - ('overwrite' in field_controls and len(field_controls) == 1)) - ): + if not data and (len(field_controls) == 0 or ("overwrite" in field_controls and len(field_controls) == 1)): status = True elif not data and key.lower() in field_controls: status = True - elif data and 'overwrite' in field_controls and len(field_controls) == 1: + elif data and "overwrite" in field_controls and len(field_controls) == 1: status = True - elif data and key.lower() in field_controls and 'overwrite' in field_controls: + elif data and key.lower() in field_controls and "overwrite" in field_controls: status = True if args.debug: print( @@ -167,6 +168,7 @@ def check_overwrite(data, key, field_controls, args): ) return status + def shorten_path(path, length=30, file_only=False, path_only=False): """ shortes a path from the left so it fits into lenght @@ -192,6 +194,7 @@ def shorten_path(path, length=30, file_only=False, path_only=False): path = f".. {path[string_len_cjk(path) - length:]}" return path + # def print_header(header, lines=0, header_line=0): # """ # prints header line and header seperator line @@ -214,15 +217,17 @@ def shorten_path(path, length=30, file_only=False, path_only=False): # lines += 1 # return lines + class ReadOnlyOutput: """ for read only listing """ + page_no = 1 page_all = 1 lines = 0 header_print = 0 - header_template = '' + header_template = "" def __init__(self, header_template, max_pages, header_print_line): self.page_all = max_pages @@ -247,11 +252,10 @@ class ReadOnlyOutput: self.lines = 0 # print header # print(f"{header}") - print(self.header_template.format( - page_no=self.page_no, page_all=self.page_all - )) + print(self.header_template.format(page_no=self.page_no, page_all=self.page_all)) self.lines += 1 + def file_sort_number(file): """ gets the BK number for sorting in the file list @@ -262,9 +266,10 @@ def file_sort_number(file): Returns: int: number found in the BK string or 0 for none """ - match = re.match(r'.*\.BK\.(\d+)\.xmp$', file) + match = re.match(r".*\.BK\.(\d+)\.xmp$", file) return int(match.group(1)) if match is not None else 0 + def output_list_width_adjust(args): """ adjusts the size for the format length for the list output @@ -277,22 +282,20 @@ def output_list_width_adjust(args): """ # various string lengths format_length = { - 'filename': 35, - 'latitude': 18, - 'longitude': 18, - 'code': 4, - 'country': 15, - 'state': 18, - 'city': 20, - 'location': 25, - 'path': 40, + "filename": 35, + "latitude": 18, + "longitude": 18, + "code": 4, + "country": 15, + "state": 18, + "city": 20, + "location": 25, + "path": 40, } if args.compact_view: reduce_percent = 40 # all formats are reduced to a mininum, we cut % off - for format_key in [ - 'filename', 'latitude', 'longitude', 'country', 'state', 'city', 'location', 'path' - ]: + for format_key in ["filename", "latitude", "longitude", "country", "state", "city", "location", "path"]: format_length[format_key] = ceil( format_length[format_key] - ((format_length[format_key] / 100) * reduce_percent) ) @@ -314,12 +317,10 @@ def output_list_width_adjust(args): current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 if current_columns < get_terminal_size().columns: resize = 1 - format_key_order = ['path', 'location', 'state', 'city', 'country', 'filename'] + format_key_order = ["path", "location", "state", "city", "country", "filename"] else: resize = -1 - format_key_order = [ - 'latitude', 'longitude', 'path', 'country', 'state', 'city', 'location', 'filename' - ] + format_key_order = ["latitude", "longitude", "path", "country", "state", "city", "location", "filename"] # if we have no auto adjust if resize and args.no_autoadjust: # warningn if screen is too small @@ -336,33 +337,27 @@ def output_list_width_adjust(args): resize_width = ceil(format_length[format_key] + resize_width) # in case too small, keep old one format_length[format_key] = ( - resize_width - if resize_width > resize_width_min else format_length[format_key] + resize_width if resize_width > resize_width_min else format_length[format_key] ) # calc new width for check if we can abort - current_columns = ( - sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 - ) - if ( - (resize == 1 and current_columns >= get_terminal_size().columns) or - (resize == -1 and current_columns < get_terminal_size().columns) + current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 + if (resize == 1 and current_columns >= get_terminal_size().columns) or ( + resize == -1 and current_columns < get_terminal_size().columns ): # check that we are not OVER but one under width_up = get_terminal_size().columns - current_columns - 1 if (resize == 1 and width_up < 0) or (resize == -1 and width_up != 0): - if format_length['path'] + width_up >= resize_width_min: - format_length['path'] += width_up + if format_length["path"] + width_up >= resize_width_min: + format_length["path"] += width_up abort = True break if abort: break - if ( - sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 > - get_terminal_size().columns - ): + if sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 > get_terminal_size().columns: print("[!!!] Screen layout might be skewed. Increase Terminal width") return format_length + def get_backup_file_counter(xmp_file, args): """ get backup file counter @@ -384,15 +379,12 @@ def get_backup_file_counter(xmp_file, args): # path=os.path.split(xmp_file)[0], # file=f"{os.path.splitext(os.path.split(xmp_file)[1])[0]}.BK." # ) - os.path.join( - f"{os.path.split(xmp_file)[0]}", - f"{os.path.splitext(os.path.split(xmp_file)[1])[0]}.BK.*.xmp" - ) + os.path.join(f"{os.path.split(xmp_file)[0]}", f"{os.path.splitext(os.path.split(xmp_file)[1])[0]}.BK.*.xmp") ), # custom sort key to get the backup files sorted correctly key=lambda pos: file_sort_number(pos), # key=file_sort_number(), - reverse=True + reverse=True, ): # BK.1, etc -> get the number bk_pos = file_sort_number(bk_file) @@ -406,10 +398,12 @@ def get_backup_file_counter(xmp_file, args): # return the next correct number for backup return bk_file_counter + ############################################################## # ARGUMENT PARSING ############################################################## + def argument_parser(): """ Parses the command line arguments @@ -419,56 +413,56 @@ def argument_parser(): """ parser = argparse.ArgumentParser( - description='Reverse Geoencoding based on set Latitude/Longitude data in XMP files', + description="Reverse Geoencoding based on set Latitude/Longitude data in XMP files", # formatter_class=argparse.RawDescriptionHelpFormatter, - epilog='Sample: (todo)' + epilog="Sample: (todo)", ) # xmp folder (or folders), or file (or files) # note that the target directory or file needs to be writeable parser.add_argument( - '-i', - '--include-source', + "-i", + "--include-source", required=True, - nargs='*', + nargs="*", action=WritableDirFolder, - dest='xmp_sources', - metavar='XMP SOURCE FOLDER', + dest="xmp_sources", + metavar="XMP SOURCE FOLDER", help=( - 'The source folder or folders with the XMP files that need reverse geo encoding ' - 'to be set. Single XMP files can be given here' - ) + "The source folder or folders with the XMP files that need reverse geo encoding " + "to be set. Single XMP files can be given here" + ), ) # exclude folders parser.add_argument( - '-x', - '--exclude-source', - nargs='*', + "-x", + "--exclude-source", + nargs="*", action=WritableDirFolder, - dest='exclude_sources', - metavar='EXCLUDE XMP SOURCE FOLDER', - help='Folders and files that will be excluded.' + dest="exclude_sources", + metavar="EXCLUDE XMP SOURCE FOLDER", + help="Folders and files that will be excluded.", ) # LR database (base folder) # get .lrcat file in this folder parser.add_argument( - '-l', - '--lightroom', + "-l", + "--lightroom", # required=True, action=ReadableDir, - dest='lightroom_folder', - metavar='LIGHTROOM FOLDER', - help='Lightroom catalogue base folder' + dest="lightroom_folder", + metavar="LIGHTROOM FOLDER", + help="Lightroom catalogue base folder", ) # strict LR check with base path next to the file base name parser.add_argument( - '-s', - '--strict', - dest='lightroom_strict', - action='store_true', - help='Do strict check for Lightroom files including Path in query' + "-s", + "--strict", + dest="lightroom_strict", + action="store_true", + help="Do strict check for Lightroom files including Path in query", ) # set behaviour override @@ -477,157 +471,122 @@ def argument_parser(): # overwrite specifc field (as defined below) # fields: Location, City, State, Country, CountryCode parser.add_argument( - '-f', - '--field', - action='append', + "-f", + "--field", + action="append", type=str.lower, # make it lowercase for check - choices=['overwrite', 'location', 'city', 'state', 'country', 'countrycode'], - dest='field_controls', - metavar='', + choices=["overwrite", "location", "city", "state", "country", "countrycode"], + dest="field_controls", + metavar="", help=( - 'On default only set fields that are not set yet. Options are: ' - 'Overwrite (write all new), Location, City, State, Country, CountryCode. ' - 'Multiple can be given for combination overwrite certain fields only ' - 'or set only certain fields. ' - 'If with overwrite the field will be overwritten if already set, ' - 'else it will be always skipped.' - ) + "On default only set fields that are not set yet. Options are: " + "Overwrite (write all new), Location, City, State, Country, CountryCode. " + "Multiple can be given for combination overwrite certain fields only " + "or set only certain fields. " + "If with overwrite the field will be overwritten if already set, " + "else it will be always skipped." + ), ) parser.add_argument( - '-d', - '--fuzzy-cache', + "-d", + "--fuzzy-cache", type=str.lower, action=DistanceValues, - nargs='?', - const='10m', # default is 10m - dest='fuzzy_distance', - metavar='FUZZY DISTANCE', + nargs="?", + const="10m", # default is 10m + dest="fuzzy_distance", + metavar="FUZZY DISTANCE", help=( - 'Allow fuzzy distance cache lookup. Optional distance can be given, ' - 'if not set default of 10m is used. ' - 'Allowed argument is in the format of 12m or 12km' - ) + "Allow fuzzy distance cache lookup. Optional distance can be given, " + "if not set default of 10m is used. " + "Allowed argument is in the format of 12m or 12km" + ), ) # Google Maps API key to overcome restrictions parser.add_argument( - '-g', - '--google', - dest='google_api_key', - metavar='GOOGLE API KEY', - help='Set a Google API Maps key to overcome the default lookup limitations' + "-g", + "--google", + dest="google_api_key", + metavar="GOOGLE API KEY", + help="Set a Google API Maps key to overcome the default lookup limitations", ) # use open street maps parser.add_argument( - '-o', - '--openstreetmap', - dest='use_openstreetmap', - action='store_true', - help='Use openstreetmap instead of Google' + "-o", + "--openstreetmap", + dest="use_openstreetmap", + action="store_true", + help="Use openstreetmap instead of Google", ) # email of open street maps requests parser.add_argument( - '-e', - '--email', - dest='email', - metavar='EMIL ADDRESS', - help='An email address for OpenStreetMap' + "-e", "--email", dest="email", metavar="EMIL ADDRESS", help="An email address for OpenStreetMap" ) # write api/email settings to config file parser.add_argument( - '-w', - '--write-settings', - dest='config_write', - action='store_true', - help='Write Google API or OpenStreetMap email to config file' + "-w", + "--write-settings", + dest="config_write", + action="store_true", + help="Write Google API or OpenStreetMap email to config file", ) # only read data and print on screen, do not write anything parser.add_argument( - '-r', - '--read-only', - dest='read_only', - action='store_true', - help=( - 'Read current values from the XMP file only, ' - 'do not read from LR or lookup any data and write back' - ) + "-r", + "--read-only", + dest="read_only", + action="store_true", + help=("Read current values from the XMP file only, " "do not read from LR or lookup any data and write back"), ) # only list unset ones - parser.add_argument( - '-u', - '--unset-only', - dest='unset_only', - action='store_true', - help='Only list unset XMP files' - ) + parser.add_argument("-u", "--unset-only", dest="unset_only", action="store_true", help="Only list unset XMP files") # only list unset GPS codes parser.add_argument( - '-p', - '--unset-gps-only', - dest='unset_gps_only', - action='store_true', - help='Only list unset XMP files for GPS fields' + "-p", + "--unset-gps-only", + dest="unset_gps_only", + action="store_true", + help="Only list unset XMP files for GPS fields", ) # don't try to do auto adjust in list view parser.add_argument( - '-a', - '--no-autoadjust', - dest='no_autoadjust', - action='store_true', - help='Don\'t try to auto adjust columns' + "-a", "--no-autoadjust", dest="no_autoadjust", action="store_true", help="Don't try to auto adjust columns" ) # compact view, compresses columns down to a minimum - parser.add_argument( - '-c', - '--compact', - dest='compact_view', - action='store_true', - help='Very compact list view' - ) + parser.add_argument("-c", "--compact", dest="compact_view", action="store_true", help="Very compact list view") # Do not create backup files parser.add_argument( - '-n', - '--nobackup', - dest='no_xmp_backup', - action='store_true', - help='Do not create a backup from the XMP file' + "-n", "--nobackup", dest="no_xmp_backup", action="store_true", help="Do not create a backup from the XMP file" ) # verbose args for more detailed output - parser.add_argument( - '-v', - '--verbose', - action='count', - dest='verbose', - help='Set verbose output level' - ) + parser.add_argument("-v", "--verbose", action="count", dest="verbose", help="Set verbose output level") # debug flag - parser.add_argument( - '--debug', action='store_true', dest='debug', help='Set detailed debug output' - ) + parser.add_argument("--debug", action="store_true", dest="debug", help="Set detailed debug output") # test flag - parser.add_argument( - '--test', action='store_true', dest='test', help='Do not write data back to file' - ) + parser.add_argument("--test", action="store_true", dest="test", help="Do not write data back to file") # read in the argumens return parser.parse_args() + ############################################################## # MAIN CODE ############################################################## + def main(): """ Main code run @@ -658,7 +617,7 @@ def main(): # error flag error = False # set search map type - map_type = 'google' if not args.use_openstreetmap else 'openstreetmap' + map_type = "google" if not args.use_openstreetmap else "openstreetmap" # if -g and -o, error if args.google_api_key and args.use_openstreetmap: print("You cannot set a Google API key and use OpenStreetMap at the same time") @@ -673,7 +632,7 @@ def main(): error = True # if email and not basic valid email (@ .) if args.email: - if not re.match(r'^.+@.+\.[A-Za-z]{1,}$', args.email): + if not re.match(r"^.+@.+\.[A-Za-z]{1,}$", args.email): print(f"Not a valid email for OpenStreetMap: {args.email}") error = True # on error exit here @@ -683,44 +642,42 @@ def main(): config = configparser.ConfigParser() # try to find config file in following order # $HOME/.config/ - config_file = 'reverse_geolocate.cfg' - config_folder = os.path.expanduser('~/.config/reverseGeolocate/') + config_file = "reverse_geolocate.cfg" + config_folder = os.path.expanduser("~/.config/reverseGeolocate/") config_data = os.path.join(f"{config_folder}", f"{config_file}") # if file exists read, if not skip unless we have write flag and # google api or openstreetmaps email if os.path.isfile(config_data): config.read(config_data) # check if api group & setting is there. also never overwrite argument given data - if 'API' in config: - if 'googleapikey' in config['API']: + if "API" in config: + if "googleapikey" in config["API"]: if not args.google_api_key: - args.google_api_key = config['API']['googleapikey'] - if 'openstreetmapemail' in config['API']: + args.google_api_key = config["API"]["googleapikey"] + if "openstreetmapemail" in config["API"]: if not args.email: - args.email = config['API']['openstreetmapemail'] + args.email = config["API"]["openstreetmapemail"] # write data if exists and changed if args.config_write and (args.google_api_key or args.email): config_change = False # check if new value differs, if yes, change and write - if 'API' not in config: - config['API'] = {} - if ( - args.google_api_key and ('googleapikey' not in config['API'] or - config['API']['googleapikey'] != args.google_api_key) + if "API" not in config: + config["API"] = {} + if args.google_api_key and ( + "googleapikey" not in config["API"] or config["API"]["googleapikey"] != args.google_api_key ): - config['API']['googleapikey'] = args.google_api_key + config["API"]["googleapikey"] = args.google_api_key config_change = True - if ( - args.email and ('openstreetmapemail' not in config['API'] or - config['API']['openstreetmapemail'] != args.email) + if args.email and ( + "openstreetmapemail" not in config["API"] or config["API"]["openstreetmapemail"] != args.email ): - config['API']['openstreetmapemail'] = args.email + config["API"]["openstreetmapemail"] = args.email config_change = True if config_change: # if we do not have the base folder create that first if not os.path.exists(config_folder): os.makedirs(config_folder) - with open(config_data, 'w', encoding="UTF-8") as fptr: + with open(config_data, "w", encoding="UTF-8") as fptr: config.write(fptr) if args.debug: print(f"### OVERRIDE API: G: {args.google_api_key}, O: {args.email}") @@ -738,25 +695,25 @@ def main(): # Iptc4xmpCore:CountryCode xmp_fields = { # EXIF GPSLat/Long are stored in Degree,Min.Sec[NESW] format - 'GPSLatitude': consts.XMP_NS_EXIF, - 'GPSLongitude': consts.XMP_NS_EXIF, - 'Location': consts.XMP_NS_IPTCCore, - 'City': consts.XMP_NS_Photoshop, - 'State': consts.XMP_NS_Photoshop, - 'Country': consts.XMP_NS_Photoshop, - 'CountryCode': consts.XMP_NS_IPTCCore + "GPSLatitude": consts.XMP_NS_EXIF, + "GPSLongitude": consts.XMP_NS_EXIF, + "Location": consts.XMP_NS_IPTCCore, + "City": consts.XMP_NS_Photoshop, + "State": consts.XMP_NS_Photoshop, + "Country": consts.XMP_NS_Photoshop, + "CountryCode": consts.XMP_NS_IPTCCore, } # non lat/long fields (for loc loops) - data_set_loc = ('Location', 'City', 'State', 'Country', 'CountryCode') + data_set_loc = ("Location", "City", "State", "Country", "CountryCode") # one xmp data set data_set = { - 'GPSLatitude': '', - 'GPSLongitude': '', - 'Location': '', - 'City': '', - 'State': '', - 'Country': '', - 'CountryCode': '' + "GPSLatitude": "", + "GPSLongitude": "", + "Location": "", + "City": "", + "State": "", + "Country": "", + "CountryCode": "", } # original set for compare (is constant unchanged) data_set_original = {} @@ -769,67 +726,67 @@ def main(): # use lightroom use_lightroom = False # path to lightroom database - lightroom_database = '' + lightroom_database = "" # cursors & query - query = '' + query = "" cur = None lrdb = None # count variables count = { - 'all': 0, - 'listed': 0, - 'read': 0, - 'map': 0, - 'cache': 0, - 'fuzzy_cache': 0, - 'lightroom': 0, - 'changed': 0, - 'failed': 0, - 'skipped': 0, - 'not_found': 0, - 'many_found': 0, + "all": 0, + "listed": 0, + "read": 0, + "map": 0, + "cache": 0, + "fuzzy_cache": 0, + "lightroom": 0, + "changed": 0, + "failed": 0, + "skipped": 0, + "not_found": 0, + "many_found": 0, } # do lightroom stuff only if we have the lightroom folder if args.lightroom_folder: # query string for lightroom DB check query = ( - 'SELECT Adobe_images.id_local, AgLibraryFile.baseName, ' - 'AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as realtivePath, ' - 'AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, ' - 'AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, ' - 'AgHarvestedIptcMetadata.locationDataOrigination, ' - 'AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, ' - 'AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, ' - 'AgInternedIptcIsoCountryCode.value as CountryCode ' - 'FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, ' - 'AgLibraryRootFolder, Adobe_images ' - 'LEFT JOIN AgHarvestedIptcMetadata ' - 'ON Adobe_images.id_local = AgHarvestedIptcMetadata.image ' - 'LEFT JOIN AgInternedIptcLocation ' - 'ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local ' - 'LEFT JOIN AgInternedIptcCity ' - 'ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local ' - 'LEFT JOIN AgInternedIptcState ' - 'ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local ' - 'LEFT JOIN AgInternedIptcCountry ' - 'ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local ' - 'LEFT JOIN AgInternedIptcIsoCountryCode ' - 'ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local ' - 'WHERE Adobe_images.rootFile = AgLibraryFile.id_local ' - 'AND Adobe_images.id_local = AgHarvestedExifMetadata.image ' - 'AND AgLibraryFile.folder = AgLibraryFolder.id_local ' - 'AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local ' - 'AND AgLibraryFile.baseName = ?' + "SELECT Adobe_images.id_local, AgLibraryFile.baseName, " + "AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as realtivePath, " + "AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, " + "AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, " + "AgHarvestedIptcMetadata.locationDataOrigination, " + "AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, " + "AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, " + "AgInternedIptcIsoCountryCode.value as CountryCode " + "FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, " + "AgLibraryRootFolder, Adobe_images " + "LEFT JOIN AgHarvestedIptcMetadata " + "ON Adobe_images.id_local = AgHarvestedIptcMetadata.image " + "LEFT JOIN AgInternedIptcLocation " + "ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local " + "LEFT JOIN AgInternedIptcCity " + "ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local " + "LEFT JOIN AgInternedIptcState " + "ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local " + "LEFT JOIN AgInternedIptcCountry " + "ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local " + "LEFT JOIN AgInternedIptcIsoCountryCode " + "ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local " + "WHERE Adobe_images.rootFile = AgLibraryFile.id_local " + "AND Adobe_images.id_local = AgHarvestedExifMetadata.image " + "AND AgLibraryFile.folder = AgLibraryFolder.id_local " + "AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local " + "AND AgLibraryFile.baseName = ?" ) # absolutePath + pathFromRoot = path of XMP file - XMP file if args.lightroom_strict: - query += 'AND AgLibraryRootFolder.absolutePath || AgLibraryFolder.pathFromRoot = ?' + query += "AND AgLibraryRootFolder.absolutePath || AgLibraryFolder.pathFromRoot = ?" # connect to LR database for reading # open the folder and look for the first lrcat file in there for file in os.listdir(args.lightroom_folder): - if file.endswith('.lrcat'): + if file.endswith(".lrcat"): lightroom_database = os.path.join(args.lightroom_folder, file) lrdb = sqlite3.connect(lightroom_database) if not lightroom_database or not lrdb: @@ -862,11 +819,9 @@ def main(): # if folder, open and loop # NOTE: we do check for folders in there, if there are we recourse traverse them # also check that folder is not in exclude list - if ( - os.path.isdir(xmp_file_source) and - xmp_file_source.rstrip(os.sep) not in [x.rstrip(os.sep) - for x in args.exclude_sources] - ): + if os.path.isdir(xmp_file_source) and xmp_file_source.rstrip(os.sep) not in [ + x.rstrip(os.sep) for x in args.exclude_sources + ]: # open folder and look for any .xmp files and push them into holding array # if there are folders, dive into them # or glob glob all .xmp files + directory @@ -877,24 +832,24 @@ def main(): # 3) full folder is not in exclude list file_path = os.path.join(f"{root}", f"{file}") if ( - file.endswith(".xmp") and ".BK." not in file + file.endswith(".xmp") + and ".BK." not in file and file_path not in args.exclude_sources - and root.rstrip(os.sep) not in [x.rstrip(os.sep) - for x in args.exclude_sources] + and root.rstrip(os.sep) not in [x.rstrip(os.sep) for x in args.exclude_sources] ): if file_path not in work_files: work_files.append(file_path) - count['all'] += 1 + count["all"] += 1 else: # not already added to list and not in the exclude list either if xmp_file_source not in work_files and xmp_file_source not in args.exclude_sources: work_files.append(xmp_file_source) - count['all'] += 1 + count["all"] += 1 if args.debug: print(f"### Work Files {work_files}") - format_line = '' - header_line = '' + format_line = "" + header_line = "" format_length = {} header_print = None # if we have read only we print list format style @@ -988,22 +943,22 @@ def main(): # ) # pre replace path length before we add the header titles header_line_2 = format_line.format( - filenamelen=format_length['filename'], - countrylen=format_length['country'], - statelen=format_length['state'], - citylen=format_length['city'], - locationlen=format_length['location'], - pathlen=format_length['path'] + filenamelen=format_length["filename"], + countrylen=format_length["country"], + statelen=format_length["state"], + citylen=format_length["city"], + locationlen=format_length["location"], + pathlen=format_length["path"], ).format( # the header title line - filename='File'[:format_length['filename']], - latitude='Latitude'[:format_length['latitude']], - longitude='Longitude'[:format_length['longitude']], - code='Code', - country='Country'[:format_length['country']], - state='State'[:format_length['state']], - city='City'[:format_length['city']], - location='Location'[:format_length['location']], - path='Path'[:format_length['path']] + filename="File"[: format_length["filename"]], + latitude="Latitude"[: format_length["latitude"]], + longitude="Longitude"[: format_length["longitude"]], + code="Code", + country="Country"[: format_length["country"]], + state="State"[: format_length["state"]], + city="City"[: format_length["city"]], + location="Location"[: format_length["location"]], + path="Path"[: format_length["path"]], ) header_line_3 = ( f"{'-' * (format_length['filename'] + 2)}+" @@ -1024,11 +979,7 @@ def main(): f"{header_line_3}" ) # header print class - header_print = ReadOnlyOutput( - header_line, - page_all, - header_repeat - ) + header_print = ReadOnlyOutput(header_line, page_all, header_repeat) # print header # print_header(header_line.format(page_no=page_no, page_all=page_all)) header_print.print_header() @@ -1040,14 +991,14 @@ def main(): # now we just loop through each file and work on them for xmp_file in work_files: # noqa: C901 if not args.read_only: - print(f"---> {xmp_file}: ", end='') + print(f"---> {xmp_file}: ", end="") # ### ACTION FLAGs write_file = False # ### XMP FILE READING # open file & read all into buffer - with open(xmp_file, 'r', encoding="UTF-8") as fptr: + with open(xmp_file, "r", encoding="UTF-8") as fptr: strbuffer = fptr.read() # read fields from the XMP file and store in hash xmp.parse_from_str(strbuffer) @@ -1064,18 +1015,15 @@ def main(): if xmp.does_property_exist(xmp_field_value, xmp_field_key): data_set[xmp_field_key] = xmp.get_property(xmp_field_value, xmp_field_key) else: - data_set[xmp_field_key] = '' + data_set[xmp_field_key] = "" if args.debug: - print( - f"### => XMP: {xmp_field_value}:{xmp_field_key} => {data_set[xmp_field_key]}" - ) + print(f"### => XMP: {xmp_field_value}:{xmp_field_key} => {data_set[xmp_field_key]}") if args.read_only: # view only if list all or if data is unset if ( - (not args.unset_only and not args.unset_gps_only) or - (args.unset_only and '' in data_set.values()) or - (args.unset_gps_only and (not data_set['GPSLatitude'] or - not data_set['GPSLongitude'])) + (not args.unset_only and not args.unset_gps_only) + or (args.unset_only and "" in data_set.values()) + or (args.unset_gps_only and (not data_set["GPSLatitude"] or not data_set["GPSLongitude"])) ): # for read only we print out the data formatted # headline check, do we need to print that @@ -1087,72 +1035,47 @@ def main(): if header_print is not None: header_print.print_header() # the data content - print(format_line.format( + print( + format_line.format( # for all possible non latin fields we do adjust # if it has double byte characters inside filenamelen=format_len( - shorten_path(xmp_file, format_length['filename'], file_only=True), - format_length['filename'] + shorten_path(xmp_file, format_length["filename"], file_only=True), format_length["filename"] ), countrylen=format_len( - shorten_string(data_set['Country'], width=format_length['country']), - format_length['country'] + shorten_string(data_set["Country"], width=format_length["country"]), + format_length["country"], ), statelen=format_len( - shorten_string(data_set['State'], width=format_length['state']), - format_length['state'] + shorten_string(data_set["State"], width=format_length["state"]), format_length["state"] ), citylen=format_len( - shorten_string(data_set['City'], width=format_length['city']), - format_length['city'] + shorten_string(data_set["City"], width=format_length["city"]), format_length["city"] ), locationlen=format_len( - shorten_string(data_set['Location'], width=format_length['location']), - format_length['location'] + shorten_string(data_set["Location"], width=format_length["location"]), + format_length["location"], ), pathlen=format_len( - shorten_path(xmp_file, format_length['path'], path_only=True), - format_length['path'] - ) + shorten_path(xmp_file, format_length["path"], path_only=True), format_length["path"] + ), ).format( # shorten from the left - filename=shorten_path( - xmp_file, format_length['filename'], - file_only=True - ), + filename=shorten_path(xmp_file, format_length["filename"], file_only=True), # cut off from the right - latitude=( - str(convert_dms_to_lat(data_set['GPSLatitude'])) - [:format_length['latitude']] - ), - longitude=( - str(convert_dms_to_long(data_set['GPSLongitude'])) - [:format_length['longitude']] - ), + latitude=(str(convert_dms_to_lat(data_set["GPSLatitude"]))[: format_length["latitude"]]), + longitude=(str(convert_dms_to_long(data_set["GPSLongitude"]))[: format_length["longitude"]]), # is only 2 chars - code=data_set['CountryCode'][:2].center(4), + code=data_set["CountryCode"][:2].center(4), # shorten from the right - country=shorten_string( - data_set['Country'], width=format_length['country'] - ), - state=shorten_string( - data_set['State'], width=format_length['state'] - ), - city=shorten_string( - data_set['City'], width=format_length['city'] - ), - location=shorten_string( - data_set['Location'], - width=format_length['location'] - ), - path=shorten_path( - xmp_file, - format_length['path'], - path_only=True - ) + country=shorten_string(data_set["Country"], width=format_length["country"]), + state=shorten_string(data_set["State"], width=format_length["state"]), + city=shorten_string(data_set["City"], width=format_length["city"]), + location=shorten_string(data_set["Location"], width=format_length["location"]), + path=shorten_path(xmp_file, format_length["path"], path_only=True), ) ) - count['listed'] += 1 + count["listed"] += 1 else: # ### LR Action Flag (data ok) lightroom_data_ok = True @@ -1177,12 +1100,12 @@ def main(): if cur.fetchone() is not None: print("(!) Lightroom DB returned more than one more row") lightroom_data_ok = False - count['many_found'] += 1 + count["many_found"] += 1 # Notify if we couldn't find one elif not lrdb_row: print("(!) Could not get data from Lightroom DB") lightroom_data_ok = False - count['not_found'] += 1 + count["not_found"] += 1 if args.debug and lrdb_row: print(f"### LightroomDB: {tuple(lrdb_row)} / {lrdb_row.keys()}") @@ -1195,11 +1118,11 @@ def main(): # if missing in both do lookup in Maps if use_lightroom and lightroom_data_ok: # check lat/long separate - if lrdb_row['gpsLatitude'] and not data_set['GPSLatitude']: + if lrdb_row["gpsLatitude"] and not data_set["GPSLatitude"]: # we need to convert to the Degree,Min.sec[NSEW] format - data_set['GPSLatitude'] = convert_lat_to_dms(lrdb_row['gpsLatitude']) - if lrdb_row['gpsLongitude'] and not data_set['GPSLongitude']: - data_set['GPSLongitude'] = convert_long_to_dms(lrdb_row['gpsLongitude']) + data_set["GPSLatitude"] = convert_lat_to_dms(lrdb_row["gpsLatitude"]) + if lrdb_row["gpsLongitude"] and not data_set["GPSLongitude"]: + data_set["GPSLongitude"] = convert_long_to_dms(lrdb_row["gpsLongitude"]) # now check Location, City, etc for loc in data_set_loc: # overwrite original set (read from XMP) with LR data @@ -1221,10 +1144,7 @@ def main(): # check if lat/long is in cache cache_key = f"{data_set['GPSLongitude']}#{data_set['GPSLatitude']}" if args.debug: - print( - f"### *** CACHE: {cache_key}: " - f"{'NO' if cache_key not in data_cache else 'YES'}" - ) + print(f"### *** CACHE: {cache_key}: " f"{'NO' if cache_key not in data_cache else 'YES'}") # main chache check = identical # second cache level check is on distance: # default distance is 10m, can be set via flag @@ -1232,13 +1152,13 @@ def main(): # and match before we do google lookup if cache_key not in data_cache: has_fuzzy_cache = False - best_match_latlong = '' + best_match_latlong = "" if args.fuzzy_distance: shortest_distance = args.fuzzy_distance # check if we have fuzzy distance, if no valid found do maps lookup for _cache_key in data_cache: # split up cache key so we can use in the distance calc method - to_lat_long = _cache_key.split('#') + to_lat_long = _cache_key.split("#") # get the distance based on current set + cached set # print( # f"Lookup f-long {data_set['GPSLongitude']} " @@ -1246,10 +1166,10 @@ def main(): # f"t-long {to_lat_long[0]} t-lat {to_lat_long[1]}" # ) distance = get_distance( - from_longitude=data_set['GPSLongitude'], - from_latitude=data_set['GPSLatitude'], + from_longitude=data_set["GPSLongitude"], + from_latitude=data_set["GPSLatitude"], to_longitude=to_lat_long[0], - to_latitude=to_lat_long[1] + to_latitude=to_lat_long[1], ) if args.debug: print( @@ -1262,17 +1182,14 @@ def main(): best_match_latlong = _cache_key has_fuzzy_cache = True if args.debug: - print( - "### ***= FUZZY CACHE: YES => " - f"Best match: {best_match_latlong}" - ) + print("### ***= FUZZY CACHE: YES => " f"Best match: {best_match_latlong}") if not has_fuzzy_cache: # get location from maps (google or openstreetmap) maps_location = reverse_geolocate( - latitude=data_set['GPSLatitude'], - longitude=data_set['GPSLongitude'], + latitude=data_set["GPSLatitude"], + longitude=data_set["GPSLongitude"], map_type=map_type, - args=args + args=args, ) # cache data with Lat/Long data_cache[cache_key] = maps_location @@ -1282,19 +1199,19 @@ def main(): # cache this one, because the next one will match this one too # we don't need to loop search again for the same fuzzy location data_cache[cache_key] = maps_location - count['cache'] += 1 - count['fuzzy_cache'] += 1 + count["cache"] += 1 + count["fuzzy_cache"] += 1 from_cache = True else: # load location from cache maps_location = data_cache[cache_key] - count['cache'] += 1 + count["cache"] += 1 from_cache = True # overwrite sets (note options check here) if args.debug: print(f"### Map Location ({map_type}): {maps_location}") # must have at least the country set to write anything back - if maps_location['Country']: + if maps_location["Country"]: for loc in data_set_loc: # only write to XMP if overwrite check passes if check_overwrite(data_set_original[loc], loc, args.field_controls, args): @@ -1302,16 +1219,13 @@ def main(): xmp.set_property(xmp_fields[loc], loc, maps_location[loc]) write_file = True if write_file: - count['map'] += 1 + count["map"] += 1 else: - print("(!) Could not geo loaction data ", end='') + print("(!) Could not geo loaction data ", end="") failed = True else: if args.debug: - print( - f"Lightroom data use: {use_lightroom}, " - f"Lightroom data ok: {lightroom_data_ok}" - ) + print(f"Lightroom data use: {use_lightroom}, " f"Lightroom data ok: {lightroom_data_ok}") # check if the data_set differs from the original (LR db load) # if yes write, else skip if use_lightroom and lightroom_data_ok: @@ -1325,16 +1239,13 @@ def main(): # write_file = True for key, value in data_set.items(): # if not the same (to original data) and passes overwrite check - if ( - value != data_set_original[key] and - check_overwrite( - data_set_original[key], key, args.field_controls, args - ) + if value != data_set_original[key] and check_overwrite( + data_set_original[key], key, args.field_controls, args ): xmp.set_property(xmp_fields[key], key, value) write_file = True if write_file: - count['lightroom'] += 1 + count["lightroom"] += 1 # if we have the write flag set, write data if write_file: if not args.test: @@ -1346,27 +1257,26 @@ def main(): # copy to new backup file copyfile( xmp_file, - f"{os.path.splitext(xmp_file)[0]}.BK." - f"{bk_file_counter}{os.path.splitext(xmp_file)[1]}" + f"{os.path.splitext(xmp_file)[0]}.BK." f"{bk_file_counter}{os.path.splitext(xmp_file)[1]}", ) # write back to riginal file - with open(xmp_file, 'w', encoding="UTF-8") as fptr: + with open(xmp_file, "w", encoding="UTF-8") as fptr: fptr.write(xmp.serialize_to_str(omit_packet_wrapper=True)) else: - print(f"[TEST] Would write {data_set} {xmp_file}", end='') + print(f"[TEST] Would write {data_set} {xmp_file}", end="") if from_cache: print("[UPDATED FROM CACHE]") else: print("[UPDATED]") - count['changed'] += 1 + count["changed"] += 1 elif failed: print("[FAILED]") - count['failed'] += 1 + count["failed"] += 1 # log data to array for post print failed_files.append(xmp_file) else: print("[SKIP]") - count['skipped'] += 1 + count["skipped"] += 1 # close DB connection if use_lightroom and lrdb is not None: diff --git a/bin/utils/long_lat.py b/bin/utils/long_lat.py index 31791d6..e4282d5 100644 --- a/bin/utils/long_lat.py +++ b/bin/utils/long_lat.py @@ -5,6 +5,7 @@ latitude/longitude functions import re from math import radians, sin, cos, atan2, sqrt + def convert_lat_long_to_dms(lat_long, is_latitude=False, is_longitude=False): """ convert the LR format of N.N to the Exif GPS format @@ -21,13 +22,14 @@ def convert_lat_long_to_dms(lat_long, is_latitude=False, is_longitude=False): degree = int(abs(lat_long)) minutes = round((float(abs(lat_long)) - int(abs(lat_long))) * 60, 10) if is_latitude is True: - direction = 'S' if int(lat_long) < 0 else 'N' + direction = "S" if int(lat_long) < 0 else "N" elif is_longitude is True: - direction = 'W' if int(lat_long) < 0 else 'E' + direction = "W" if int(lat_long) < 0 else "E" else: - direction = '(INVALID)' + direction = "(INVALID)" return f"{degree},{minutes}{direction}" + def convert_lat_to_dms(lat_long): """ wrapper functions for Long/Lat calls: latitude @@ -54,6 +56,7 @@ def convert_long_to_dms(lat_long): """ return convert_lat_long_to_dms(lat_long, is_longitude=True) + def long_lat_reg(longitude, latitude): """ converts the XMP/EXIF formatted GPS Long/Lat coordinates @@ -68,12 +71,9 @@ def long_lat_reg(longitude, latitude): dictionary: dict with converted lat/long """ # regex - latlong_re = re.compile(r'^(\d+),(\d+\.\d+)([NESW]{1})$') + latlong_re = re.compile(r"^(\d+),(\d+\.\d+)([NESW]{1})$") # dict for loop - lat_long = { - 'longitude': longitude, - 'latitude': latitude - } + lat_long = {"longitude": longitude, "latitude": latitude} # for element in lat_long: for index, element in lat_long.items(): # match if it is exif GPS format @@ -82,10 +82,11 @@ def long_lat_reg(longitude, latitude): # convert from Degree, Min.Sec into float format lat_long[index] = float(_match.group(1)) + (float(_match.group(2)) / 60) # if S or W => inverse to negative - if _match.group(3) == 'S' or _match.group(3) == 'W': + if _match.group(3) == "S" or _match.group(3) == "W": lat_long[index] *= -1 return lat_long + def convert_dms_to_lat(lat_long): """ rapper calls for DMS to Lat/Long: latitude @@ -96,7 +97,8 @@ def convert_dms_to_lat(lat_long): Returns: dict: dict with converted lat/long """ - return long_lat_reg('0,0.0N', lat_long)['latitude'] + return long_lat_reg("0,0.0N", lat_long)["latitude"] + def convert_dms_to_long(lat_long): """ @@ -108,7 +110,8 @@ def convert_dms_to_long(lat_long): Returns: dict: dict with converted lat/long """ - return long_lat_reg(lat_long, '0,0.0N')['longitude'] + return long_lat_reg(lat_long, "0,0.0N")["longitude"] + def get_distance(from_longitude, from_latitude, to_longitude, to_latitude): """ @@ -134,7 +137,8 @@ def get_distance(from_longitude, from_latitude, to_longitude, to_latitude): distance_longitude = from_longitude - to_longitude distance_latitude = from_latitude - to_latitude # main distance calculation - distance = sin(distance_latitude / 2)**2 + cos(from_latitude) * \ - cos(to_latitude) * sin(distance_longitude / 2)**2 + distance = ( + sin(distance_latitude / 2) ** 2 + cos(from_latitude) * cos(to_latitude) * sin(distance_longitude / 2) ** 2 + ) distance = 2 * atan2(sqrt(distance), sqrt(1 - distance)) return earth_radius * distance diff --git a/bin/utils/reverse_geolocate.py b/bin/utils/reverse_geolocate.py index dd7c8f8..1f0e3cf 100644 --- a/bin/utils/reverse_geolocate.py +++ b/bin/utils/reverse_geolocate.py @@ -2,11 +2,12 @@ reverse geolacte functions """ -import requests import re +import requests from utils.long_lat import long_lat_reg from utils.string_helpers import only_latin_chars + def reverse_geolocate(longitude, latitude, map_type, args): """ wrapper to call to either the google or openstreetmap @@ -27,16 +28,13 @@ def reverse_geolocate(longitude, latitude, map_type, args): # detect and convert lat_long = long_lat_reg(longitude=longitude, latitude=latitude) # which service to use - if map_type == 'google': - return reverse_geolocate_google(lat_long['longitude'], lat_long['latitude'], args) - elif map_type == 'openstreetmap': - return reverse_geolocate_open_street_map(lat_long['longitude'], lat_long['latitude'], args) + if map_type == "google": + return reverse_geolocate_google(lat_long["longitude"], lat_long["latitude"], args) + elif map_type == "openstreetmap": + return reverse_geolocate_open_street_map(lat_long["longitude"], lat_long["latitude"], args) else: - return { - 'Country': '', - 'status': 'ERROR', - 'error': 'Map type not valid' - } + return {"Country": "", "status": "ERROR", "error": "Map type not valid"} + def reverse_geolocate_init(longitude, latitude): """ @@ -52,22 +50,23 @@ def reverse_geolocate_init(longitude, latitude): """ # basic dict format geolocation = { - 'CountryCode': '', - 'Country': '', - 'State': '', - 'City': '', - 'Location': '', + "CountryCode": "", + "Country": "", + "State": "", + "City": "", + "Location": "", # below for error reports - 'status': '', - 'error_message': '' + "status": "", + "error_message": "", } # error if long/lat is not valid - latlong_re = re.compile(r'^\d+\.\d+$') + latlong_re = re.compile(r"^\d+\.\d+$") if not latlong_re.match(str(longitude)) or not latlong_re.match(str(latitude)): - geolocation['status'] = 'ERROR' - geolocation['error_message'] = f"Latitude {latitude} or Longitude {longitude} are not valid" + geolocation["status"] = "ERROR" + geolocation["error_message"] = f"Latitude {latitude} or Longitude {longitude} are not valid" return geolocation + def reverse_geolocate_open_street_map(longitude, latitude, args): """ OpenStreetMap reverse lookcation lookup @@ -87,24 +86,19 @@ def reverse_geolocate_open_street_map(longitude, latitude, args): """ # init geolocation = reverse_geolocate_init(longitude, latitude) - if geolocation['status'] == 'ERROR': + if geolocation["status"] == "ERROR": return geolocation # query format - query_format = 'jsonv2' + query_format = "jsonv2" # language to return (english) - language = 'en-US,en' + language = "en-US,en" # build query - base = 'https://nominatim.openstreetmap.org/reverse.php?' + base = "https://nominatim.openstreetmap.org/reverse.php?" # parameters - payload = { - 'format': query_format, - 'lat': latitude, - 'lon': longitude, - 'accept-language': language - } + payload = {"format": query_format, "lat": latitude, "lon": longitude, "accept-language": language} # if we have an email, add it here if args.email: - payload['email'] = args.email + payload["email"] = args.email url = f"{base}" # timeout in seconds timeout = 60 @@ -117,16 +111,16 @@ def reverse_geolocate_open_street_map(longitude, latitude, args): # type map # Country to Location and for each in order of priority type_map = { - 'CountryCode': ['country_code'], - 'Country': ['country'], - 'State': ['state'], - 'City': ['city', 'city_district', 'state_district'], - 'Location': ['county', 'town', 'suburb', 'hamlet', 'neighbourhood', 'road'] + "CountryCode": ["country_code"], + "Country": ["country"], + "State": ["state"], + "City": ["city", "city_district", "state_district"], + "Location": ["county", "town", "suburb", "hamlet", "neighbourhood", "road"], } # if not error - if 'error' not in response.json(): + if "error" not in response.json(): # get address block - addr = response.json()['address'] + addr = response.json()["address"] # loop for locations for loc_index, sub_index in type_map.items(): for index in sub_index: @@ -137,12 +131,13 @@ def reverse_geolocate_open_street_map(longitude, latitude, args): # if index in addr and not geolocation[loc_index]: # geolocation[loc_index] = addr[index] else: - geolocation['status'] = 'ERROR' - geolocation['error_message'] = response.json()['error'] + geolocation["status"] = "ERROR" + geolocation["error_message"] = response.json()["error"] print(f"Error in request: {geolocation['error']}") # return return geolocation + def reverse_geolocate_google(longitude, latitude, args): """ Google Maps reverse location lookup @@ -163,25 +158,21 @@ def reverse_geolocate_google(longitude, latitude, args): # init geolocation = reverse_geolocate_init(longitude, latitude) temp_geolocation = geolocation.copy() - if geolocation['status'] == 'ERROR': + if geolocation["status"] == "ERROR": return geolocation # sensor (why?) - sensor = 'false' + sensor = "false" # language, so we get ascii en back - language = 'en' + language = "en" # request to google # if a google api key is used, the request has to be via https - protocol = 'https://' if args.google_api_key else 'http://' + protocol = "https://" if args.google_api_key else "http://" base = "maps.googleapis.com/maps/api/geocode/json?" # build the base params - payload = { - 'latlng': f"{latitude},{longitude}", - 'language': language, - 'sensor': sensor - } + payload = {"latlng": f"{latitude},{longitude}", "language": language, "sensor": sensor} # if we have a google api key, add it here if args.google_api_key: - payload['key'] = args.google_api_key + payload["key"] = args.google_api_key # build the full url and send it to google url = f"{protocol}{base}" # timeout in seconds @@ -195,22 +186,22 @@ def reverse_geolocate_google(longitude, latitude, args): # type map # For automated return of correct data into set to return type_map = { - 'CountryCode': ['country'], - 'Country': ['country'], - 'State': ['administrative_area_level_1', 'administrative_area_level_2'], - 'City': ['locality', 'administrative_area_level_3'], - 'Location': ['sublocality_level_1', 'sublocality_level_2', 'route'], + "CountryCode": ["country"], + "Country": ["country"], + "State": ["administrative_area_level_1", "administrative_area_level_2"], + "City": ["locality", "administrative_area_level_3"], + "Location": ["sublocality_level_1", "sublocality_level_2", "route"], } # print("Error: {}".format(response.json()['status'])) - if response.json()['status'] == 'OK': + if response.json()["status"] == "OK": # first entry for type = premise - for entry in response.json()['results']: + for entry in response.json()["results"]: for sub_entry in entry: - if sub_entry == 'types' and ( - 'premise' in entry[sub_entry] or - 'route' in entry[sub_entry] or - 'street_address' in entry[sub_entry] or - 'sublocality' in entry[sub_entry] + if sub_entry == "types" and ( + "premise" in entry[sub_entry] + or "route" in entry[sub_entry] + or "street_address" in entry[sub_entry] + or "sublocality" in entry[sub_entry] ): # print("Entry {}: {}".format(sub_entry, entry[sub_entry])) # print("Address {}".format(entry['address_components'])) @@ -225,33 +216,33 @@ def reverse_geolocate_google(longitude, latitude, args): for loc_index, sub_index in type_map.items(): for index in sub_index: # this is an array, so we need to loop through each - for addr in entry['address_components']: + for addr in entry["address_components"]: # in types check that index is in there # and the location is not yet set # also check that entry is in LATIN based # NOTE: fallback if all are non LATIN? - if index in addr['types'] and not geolocation[loc_index]: + if index in addr["types"] and not geolocation[loc_index]: # for country code we need to use short name, # else we use long name - if loc_index == 'CountryCode': - if only_latin_chars(addr['short_name']): - geolocation[loc_index] = addr['short_name'] + if loc_index == "CountryCode": + if only_latin_chars(addr["short_name"]): + geolocation[loc_index] = addr["short_name"] elif not temp_geolocation[loc_index]: - temp_geolocation[loc_index] = addr['short_name'] + temp_geolocation[loc_index] = addr["short_name"] else: - if only_latin_chars(addr['long_name']): - geolocation[loc_index] = addr['long_name'] + if only_latin_chars(addr["long_name"]): + geolocation[loc_index] = addr["long_name"] elif not temp_geolocation[loc_index]: - temp_geolocation[loc_index] = addr['long_name'] + temp_geolocation[loc_index] = addr["long_name"] # check that all in geoloaction are filled and if not fille from temp_geolocation dictionary for loc_index in type_map: if not geolocation[loc_index] and temp_geolocation[loc_index]: geolocation[loc_index] = temp_geolocation[loc_index] # write OK status - geolocation['status'] = response.json()['status'] + geolocation["status"] = response.json()["status"] else: - geolocation['error_message'] = response.json()['error_message'] - geolocation['status'] = response.json()['status'] + geolocation["error_message"] = response.json()["error_message"] + geolocation["status"] = response.json()["status"] print(f"Error in request: {geolocation['status']} {geolocation['error_message']}") # return return geolocation diff --git a/bin/utils/string_helpers.py b/bin/utils/string_helpers.py index 6982ef9..06daf6a 100644 --- a/bin/utils/string_helpers.py +++ b/bin/utils/string_helpers.py @@ -8,7 +8,8 @@ import unicodedata # this is used by isLatin and onlyLatinChars cache_latin_letters = {} -def shorten_string(string, width, placeholder='..'): + +def shorten_string(string, width, placeholder=".."): """ shortens a string to width and attached placeholder @@ -26,7 +27,7 @@ def shorten_string(string, width, placeholder='..'): if string_length_cjk > width: # set current length and output string cur_len = 0 - out_string = '' + out_string = "" # loop through each character for char in str(string): # set the current length if we add the character @@ -39,6 +40,7 @@ def shorten_string(string, width, placeholder='..'): else: return str(string) + def string_len_cjk(string): """ because len on string in python counts characters but we need the width @@ -53,6 +55,7 @@ def string_len_cjk(string): # return string len including double count for double width characters return sum(1 + (unicodedata.east_asian_width(c) in "WF") for c in string) + def is_latin(uchr): """ checks via the unciode class if a character is LATIN char based @@ -71,7 +74,8 @@ def is_latin(uchr): return cache_latin_letters[uchr] except KeyError: # find LATIN in uncide type returned and set in dictionary for this character - return cache_latin_letters.setdefault(uchr, 'LATIN' in unicodedata.name(uchr)) + return cache_latin_letters.setdefault(uchr, "LATIN" in unicodedata.name(uchr)) + def only_latin_chars(unistr): """ @@ -88,6 +92,7 @@ def only_latin_chars(unistr): """ return all(is_latin(uchr) for uchr in unistr if uchr.isalpha()) + def format_len(string, length): """ in case of CJK characters we need to adjust the format length dynamically