#!/opt/local/bin/python3

# AUTHOR : Clemens Schwaighofer
# DATE   : 2018/2/20
# LICENSE: GPLv3
# DESC   : Set the reverse Geo location (name) from Lat/Long data in XMP files in a lightroom catalogue
#          * tries to get pre-set geo location from LR catalog
#          * if not found tries to get data from Google
#          * all data is translated into English with long vowel system (aka ou or oo is ō)
# MUST HAVE: Python XMP Toolkit (http://python-xmp-toolkit.readthedocs.io/)

import argparse
import os
import re
import sqlite3
import sys
from shutil import copyfile

import requests
# Note XMPFiles does not work with sidecar files, need to read via XMPMeta
from libxmp import XMPMeta, XMPError, consts

##############################################################
### FUNCTIONS
##############################################################

### ARGPARSE HELPERS


# CLASS : writable_dir_folder
# DESC  : argparse action that collects writable folders OR files into a list
#         stored on the namespace under the option's own dest.
#         Works with nargs * and with the option being given more than once
#         (entries of a previous call are kept and extended).
# RAISES: argparse.ArgumentError for the first entry that is not writable;
#         argparse reports this as a normal usage error instead of a traceback
class writable_dir_folder(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        # start from whatever an earlier occurrence of the option has stored
        previous = getattr(namespace, self.dest, None)
        out = list(previous) if isinstance(previous, list) else []
        # we loop through list (this is because of nargs *)
        for prospective_dir in values:
            # must be valid and writable (dir or file)
            if not os.access(prospective_dir, os.W_OK):
                raise argparse.ArgumentError(self, "writable_dir_folder: {0} is not a writable dir".format(prospective_dir))
            out.append(prospective_dir)
        # always store a list, so an option given without values yields []
        setattr(namespace, self.dest, out)


# CLASS : readable_dir
# DESC  : argparse action that accepts a single existing, readable directory
#         and stores it on the namespace under the option's dest
# RAISES: argparse.ArgumentError if the value is no directory or not readable
class readable_dir(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        prospective_dir = values
        if not os.path.isdir(prospective_dir):
            raise argparse.ArgumentError(self, "readable_dir:{0} is not a valid path".format(prospective_dir))
        if not os.access(prospective_dir, os.R_OK):
            raise argparse.ArgumentError(self, "readable_dir:{0} is not a readable dir".format(prospective_dir))
        setattr(namespace, self.dest, prospective_dir)


### MAIN FUNCTIONS

# Google geocode lookup base settings
# SAMPLE: https://maps.googleapis.com/maps/api/geocode/json?latlng=35.6671355,139.7419185&sensor=false
# NOTE(review): Google's Geocoding documentation asks for HTTPS when an API key is sent
GOOGLE_GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"
# seconds to wait for Google before the lookup is given up
GOOGLE_TIMEOUT = 30
# address component types that may fill a field, most wanted first
# route is only the last resort for Location
GEO_TYPE_PRIORITY = (
    ('Country', ('country',)),
    ('State', ('administrative_area_level_1', 'administrative_area_level_2')),
    ('City', ('locality',)),
    ('Location', ('sublocality_level_1', 'sublocality_level_2', 'route')),
)
# XMP/EXIF GPS format: Deg,Min[.Sec][NSEW]
LATLONG_RE = re.compile(r'^(\d+),(\d+(?:\.\d+)?)([NESW])$')


# METHOD: findAddressComponent
# PARAMS: components: list of address component dicts, type_priority: wanted types in order
# RETURN: first component carrying the most wanted type available, or None
# DESC  : walks the wanted types in priority order, so the pick does not depend
#         on the order in which the components are listed
def findAddressComponent(components, type_priority):
    for wanted_type in type_priority:
        for component in components:
            if wanted_type in component.get('types', []):
                return component
    return None


# METHOD: extractGeolocation
# PARAMS: results: the 'results' list of a geocode response
# RETURN: dict with Location, City, State, Country, CountryCode
#         if not fillable, entry is an empty string
# DESC  : only result entries of type 'premise' are used; a field that an
#         earlier entry has filled is never overwritten by a later entry
def extractGeolocation(results):
    geolocation = {
        'CountryCode': '',
        'Country': '',
        'State': '',
        'City': '',
        'Location': ''
    }
    for entry in results:
        if 'premise' not in entry.get('types', []):
            continue
        components = entry.get('address_components', [])
        for field, type_priority in GEO_TYPE_PRIORITY:
            if geolocation[field]:
                continue
            component = findAddressComponent(components, type_priority)
            if component is None:
                continue
            geolocation[field] = component['long_name']
            # the country component carries the code in its short name
            if field == 'Country':
                geolocation['CountryCode'] = component['short_name']
    return geolocation


# METHOD: reverseGeolocate
# PARAMS: longitude, latitude (float format or XMP Deg,Min.Sec[NSEW] format)
# RETURN: dict with Location, City, State, Country, CountryCode
#         if not fillable, entry is an empty string
# DESC  : asks the Google geocode API for the names at the given position.
#         Reads the optional API key from the global args.
#         Network, JSON or API errors are printed and give an all empty dict
def reverseGeolocate(longitude, latitude):
    # clean up long/lat
    # they are stored with N/S/E/W if they come from an XMP
    # NOTE: lat is N/S, long is E/W
    lat_long = longLatReg(longitude=longitude, latitude=latitude)
    params = {
        'latlng': "{lat},{lon}".format(lat=lat_long['latitude'], lon=lat_long['longitude']),
        'sensor': 'false'
    }
    if args.google_api_key:
        params['key'] = args.google_api_key
    # request to google, the JSON body is parsed only once
    try:
        response = requests.get(GOOGLE_GEOCODE_URL, params=params, timeout=GOOGLE_TIMEOUT)
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        print("Error in request: {}".format(e))
        return extractGeolocation([])
    status = payload.get('status')
    # ZERO_RESULTS is no error, the caller reports the empty result itself
    if status not in ('OK', 'ZERO_RESULTS'):
        # error_message is not part of every error response
        print("Error in request: {}".format(payload.get('error_message', status)))
    # sift through the response to get the best matching entry
    return extractGeolocation(payload.get('results', []))


# METHOD: convertLatLongToDMS
# PARAMS: latLong in (-)N.N format, lat or long flag (else we can't set N/S)
# RETURN: Deg,Min.Sec(NESW) format
# DESC  : convert the LR format of N.N to the Exif GPS format
def convertLatLongToDMS(lat_long, is_latitude=False, is_longitude=False):
    absolute = abs(float(lat_long))
    # part before the . is the degree, the rest multiplied by 60 the minutes
    degree = int(absolute)
    minutes = round((absolute - degree) * 60, 10)
    # fixed point notation, so tiny values never end up as 1e-05 which
    # longLatReg could not read back; surplus zeros are cut off again
    minutes_text = "{:.10f}".format(minutes).rstrip('0')
    if minutes_text.endswith('.'):
        minutes_text += '0'
    # the sign has to come from the full value: int(-0.5) is 0
    negative = float(lat_long) < 0
    if is_latitude:
        direction = 'S' if negative else 'N'
    elif is_longitude:
        direction = 'W' if negative else 'E'
    else:
        direction = '(INVALID)'
    return "{},{}{}".format(degree, minutes_text, direction)


# wrapper functions for Long/Lat calls
def convertLatToDMS(lat_long):
    return convertLatLongToDMS(lat_long, is_latitude=True)


def convertLongToDMS(lat_long):
    return convertLatLongToDMS(lat_long, is_longitude=True)


# METHOD: longLatReg
# PARAMS: longitude, latitude
# RETURN: dict with converted lat/long (keys longitude, latitude)
# DESC  : converts the XMP/EXIF formatted GPS Long/Lat coordinates
#         from the Deg,Min.Sec[NSEW] format to the normal float
#         number used in google/lr internal.
#         Values that are no string or do not match the format are
#         returned unchanged
def longLatReg(longitude, latitude):
    lat_long = {
        'longitude': longitude,
        'latitude': latitude
    }
    for element, value in lat_long.items():
        # only strings can be in exif GPS format (None and floats pass through)
        if not isinstance(value, str):
            continue
        m = LATLONG_RE.match(value)
        if m is None:
            continue
        # convert from Degree, Min.Sec into float format
        converted = float(m.group(1)) + (float(m.group(2)) / 60)
        # if S or W => inverse to negative
        if m.group(3) in ('S', 'W'):
            converted *= -1
        lat_long[element] = converted
    return lat_long


# METHOD: checkOverwrite
# PARAMS: data: value field, key: XMP key, field_controls: array from args
# RETURN: true/false
# DESC  : checks with field control flags if given data for key should be written
#         1) data is not set and no field controls are given
#         2) data is set or not and field_control: overwrite only set
#         3) data for key is not set, and key matches field_control
#         4) data for key is set, key matches field_control and overwrite is set
#         Prints the decision if the global args.debug is set
def checkOverwrite(data, key, field_controls):
    # init field controls for empty
    if not field_controls:
        field_controls = []
    key_name = key.lower()
    has_overwrite = 'overwrite' in field_controls
    overwrite_only = has_overwrite and len(field_controls) == 1
    key_selected = key_name in field_controls
    if not data:
        status = len(field_controls) == 0 or overwrite_only or key_selected
    else:
        status = overwrite_only or (key_selected and has_overwrite)
    if args.debug:
        print("Data set: {}, Key: {}, Field Controls len: {}, Overwrite: {}, Key in Field Controls: {}, OVERWRITE: {}".format(
            'YES' if data else 'NO',
            key_name,
            len(field_controls),
            'OVERWRITE' if has_overwrite else 'NOT OVERWRITE',
            'KEY OK' if key_selected else 'KEY NOT MATCHING',
            status
        ))
    return status


##############################################################
### ARGUMENT PARSING
##############################################################

parser = argparse.ArgumentParser(
    description='Reverse Geoencoding based on set Latitude/Longitude data in XMP files',
    epilog='Sample: (todo)'
)

# xmp folder (or folders), or file (or files)
# note that the target directory or file needs to be writeable
parser.add_argument(
    '-x', '--xmp',
    required=True,
    nargs='*',
    action=writable_dir_folder,
    dest='xmp_sources',
    metavar='XMP SOURCE FOLDER',
    help='The source folder or folders with the XMP files that need reverse geo encoding to be set. Single XMP files can be given here'
)

# LR database (base folder)
# get .lrcat file in this folder
parser.add_argument(
    '-l', '--lightroom',
    action=readable_dir,
    dest='lightroom_folder',
    metavar='LIGHTROOM FOLDER',
    help='Lightroom catalogue base folder'
)

# set behaviour override
# FLAG: default: only set not filled
# other: overwrite all or overwrite if one is missing, overwrite specific field (as defined below)
# fields: Location, City, State, Country, CountryCode
parser.add_argument(
    '-f', '--field',
    action='append',
    type=str.lower,  # make it lowercase for check
    choices=['overwrite', 'location', 'city', 'state', 'country', 'countrycode'],
    dest='field_controls',
    metavar='FIELD CONTROLS',
    help='On default only set fields that are not set yet. Options are: Overwrite (write all new), Location, City, State, Country, CountryCode. Multiple can be given. If with overwrite the field will be overwritten if already set, else it will be always skipped'
)

# Google Maps API key to overcome restrictions
parser.add_argument(
    '-g', '--google',
    dest='google_api_key',
    metavar='GOOGLE_API_KEY',
    help='Set a Google API Maps key to overcome the default lookup limitations'
)

# verbose args for more detailed output
parser.add_argument(
    '-v', '--verbose',
    action='count',
    dest='verbose',
    help='Set verbose output level'
)

# debug flag
parser.add_argument('--debug', action='store_true', dest='debug', help='Set detailed debug output')
# test flag
parser.add_argument('--test', action='store_true', dest='test', help='Do not write data back to file')

# read in the arguments
args = parser.parse_args()

##############################################################
### MAIN CODE
##############################################################

if args.debug:
    print("### ARGUMENT VARS: X: {}, L: {}, F: {}, G: {}, V: {}, D: {}, T: {}".format(args.xmp_sources, args.lightroom_folder, args.field_controls, args.google_api_key, args.verbose, args.debug, args.test))

# The XMP fields const lookup values
# XML/XMP
# READ:
# exif:GPSLatitude
# exif:GPSLongitude
# READ for if filled
# Iptc4xmpCore:Location
# photoshop:City
# photoshop:State
# photoshop:Country
# Iptc4xmpCore:CountryCode
xmp_fields = {
    'GPSLatitude': consts.XMP_NS_EXIF,  # EXIF GPSLat/Long are stored in Degree,Min.Sec[NESW] format
    'GPSLongitude': consts.XMP_NS_EXIF,
    'Location': consts.XMP_NS_IPTCCore,
    'City': consts.XMP_NS_Photoshop,
    'State': consts.XMP_NS_Photoshop,
    'Country': consts.XMP_NS_Photoshop,
    'CountryCode': consts.XMP_NS_IPTCCore
}
# non lat/long fields (for loc loops)
data_set_loc = ('Location', 'City', 'State', 'Country', 'CountryCode')
# one xmp data set
data_set = {
    'GPSLatitude': '',
    'GPSLongitude': '',
    'Location': '',
    'City': '',
    'State': '',
    'Country': '',
    'CountryCode': ''
}
# original set for compare (is constant unchanged)
data_set_original = {}
# cache set to avoid double lookups for identical Lat/Long
data_cache = {}
# work files, all files + folders we need to work on
work_files = []
# same entries as work_files, for the quick "already listed" check
seen_files = set()
# error flag
error = False
# use lightroom
use_lightroom = False
# lightroom catalogue file, DB connection, cursor & query
# preset so the checks below work if no catalogue is found or wanted
lightroom_database = None
lrdb = None
query = ''
cur = None
# count variables
count = {
    'all': 0,
    'google': 0,
    'cache': 0,
    'lightroom': 0,
    'changed': 0,
    'failed': 0,
    'skipped': 0,
    'not_found': 0
}

# do lightroom stuff only if we have the lightroom folder
if args.lightroom_folder:
    # query string for lightroom DB check
    query = 'SELECT Adobe_images.id_local, AgLibraryFile.baseName, AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as realtivePath, AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, AgHarvestedIptcMetadata.locationDataOrigination, AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, AgInternedIptcIsoCountryCode.value as CountryCode '
    query += 'FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, AgLibraryRootFolder, Adobe_images '
    query += 'LEFT JOIN AgHarvestedIptcMetadata ON Adobe_images.id_local = AgHarvestedIptcMetadata.image '
    query += 'LEFT JOIN AgInternedIptcLocation ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local '
    query += 'LEFT JOIN AgInternedIptcCity ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local '
    query += 'LEFT JOIN AgInternedIptcState ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local '
    query += 'LEFT JOIN AgInternedIptcCountry ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local '
    query += 'LEFT JOIN AgInternedIptcIsoCountryCode ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local '
    query += 'WHERE Adobe_images.rootFile = AgLibraryFile.id_local AND Adobe_images.id_local = AgHarvestedExifMetadata.image AND AgLibraryFile.folder = AgLibraryFolder.id_local AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local '
    query += 'AND AgLibraryFile.baseName = ?'

    # connect to LR database for reading
    # open the folder and look for the first lrcat file in there
    # (sorted, so "first" is the same file on every run; only one connection is opened)
    for file_name in sorted(os.listdir(args.lightroom_folder)):
        if file_name.endswith('.lrcat'):
            lightroom_database = os.path.join(args.lightroom_folder, file_name)
            lrdb = sqlite3.connect(lightroom_database)
            break
    if not lightroom_database or not lrdb:
        print("(!) We could not find a lrcat file in the given lightroom folder or DB connection failed: {}".format(args.lightroom_folder))
        # flag for end
        error = True
    else:
        # set row so we can access each element by the name
        lrdb.row_factory = sqlite3.Row
        # set cursor
        cur = lrdb.cursor()
        # flag that we have Lightroom DB
        use_lightroom = True

# on error exit here
if error:
    sys.exit(1)

# loop through the xmp_sources (folder or files) and collect all XMP files to work on
for xmp_file_source in args.xmp_sources:
    if os.path.isdir(xmp_file_source):
        # open folder and look for any .xmp files
        # if there are folders in there, they are traversed recursively
        candidates = []
        for root, dirs, files in os.walk(xmp_file_source):
            for file_name in files:
                if file_name.endswith(".xmp"):
                    candidates.append(os.path.join(root, file_name))
    else:
        candidates = [xmp_file_source]
    # push them into holding array, each file only once
    for candidate in candidates:
        if candidate not in seen_files:
            seen_files.add(candidate)
            work_files.append(candidate)
            count['all'] += 1

if args.debug:
    print("### Work Files {}".format(work_files))
# now we just loop through each file and work on them
for xmp_file in work_files:
    print("---> {}".format(xmp_file))
    #### ACTION FLAGs
    write_file = False
    lightroom_data_ok = True
    lrdb_row = None
    #### LIGHTROOM DB READING
    # read in data from DB if we have lightroom folder
    if use_lightroom:
        # get the base file name, we need this for lightroom
        xmp_file_basename = os.path.splitext(os.path.split(xmp_file)[1])[0]
        # try to get this file name from the DB
        # NOTE: We should search here with folder too in case for double same name entries
        cur.execute(query, [xmp_file_basename])
        # get the row data
        lrdb_row = cur.fetchone()
        # Notify if we couldn't find one
        if not lrdb_row:
            print("(!) Could not get data from Lightroom DB")
            lightroom_data_ok = False
            count['not_found'] += 1
        elif args.debug:
            print("### LightroomDB: {} / {}".format(tuple(lrdb_row), lrdb_row.keys()))

    #### XMP FILE READING
    # fresh XML meta per file, so nothing can carry over from the previous file
    xmp = XMPMeta()
    # open file & read all into buffer
    with open(xmp_file, 'r', encoding='utf-8') as fptr:
        strbuffer = fptr.read()
    # read fields from the XMP file and store in hash
    xmp.parse_from_str(strbuffer)
    for xmp_field in xmp_fields:
        data_set[xmp_field] = xmp.get_property(xmp_fields[xmp_field], xmp_field)
        if args.debug:
            print("### => XMP: {}:{} => {}".format(xmp_fields[xmp_field], xmp_field, data_set[xmp_field]))
    # create a duplicate copy for later checking if something changed
    data_set_original = data_set.copy()

    # check if LR exists and use this to compare to XMP data
    # is LR GPS and no XMP GPS => use LR and set XMP
    # same for location names
    # if missing in XMP but in LR -> set in XMP
    # if missing in both do lookup in Google
    if use_lightroom and lightroom_data_ok:
        # check lat/long separate
        if lrdb_row['gpsLatitude'] and not data_set['GPSLatitude']:
            # we need to convert to the Degree,Min.sec[NSEW] format
            data_set['GPSLatitude'] = convertLatToDMS(lrdb_row['gpsLatitude'])
        if lrdb_row['gpsLongitude'] and not data_set['GPSLongitude']:
            data_set['GPSLongitude'] = convertLongToDMS(lrdb_row['gpsLongitude'])
        # now check Location, City, etc
        for loc in data_set_loc:
            # overwrite original set (read from XMP) with LR data if original data is missing
            if lrdb_row[loc] and not data_set[loc]:
                data_set[loc] = lrdb_row[loc]
                if args.debug:
                    print("### -> LR: {} => {}".format(loc, lrdb_row[loc]))
    # base set done, now check if there is anything unset in the data_set, if yes do a lookup in google
    # run this through the overwrite checker to get unset if we have a forced overwrite
    has_unset = False
    for loc in data_set_loc:
        if checkOverwrite(data_set[loc], loc, args.field_controls):
            has_unset = True
    if has_unset:
        google_location = None
        # without a position there is nothing we could look up
        if data_set['GPSLatitude'] and data_set['GPSLongitude']:
            # check if lat/long is in cache
            cache_key = '{}.#.{}'.format(data_set['GPSLatitude'], data_set['GPSLongitude'])
            if args.debug:
                print("### *** CACHE: {}: {}".format(cache_key, 'NO' if cache_key not in data_cache else 'YES'))
            if cache_key not in data_cache:
                # get location from google
                google_location = reverseGeolocate(latitude=data_set['GPSLatitude'], longitude=data_set['GPSLongitude'])
                # cache data with Lat/Long
                data_cache[cache_key] = google_location
            else:
                # load location from cache
                google_location = data_cache[cache_key]
                count['cache'] += 1
        # overwrite sets (note options check here)
        if args.debug:
            print("### Google Location: {}".format(google_location))
        if google_location and google_location['Country']:
            # NOTE(review): fields filled from Lightroom above that do not pass the
            # overwrite check are not written to the XMP in this branch - confirm if wanted
            for loc in data_set_loc:
                # only write to XMP if overwrite check passes
                if checkOverwrite(data_set[loc], loc, args.field_controls):
                    xmp.set_property(xmp_fields[loc], loc, google_location[loc])
                    # keep the set in sync, it is what the [TEST] output shows
                    data_set[loc] = google_location[loc]
                    write_file = True
            if write_file:
                count['google'] += 1
        else:
            print("(!) Could not geo loaction for: {}".format(xmp_file))
            count['failed'] += 1
    else:
        if args.debug:
            print("Lightroom data use: {}, Lightroom data ok: {}".format(use_lightroom, lightroom_data_ok))
        # check if the data_set differs from the original (LR db load)
        # if yes write, else skip
        if use_lightroom and lightroom_data_ok:
            for key in data_set:
                # if not the same (to original data) and passes overwrite check
                # the check has to run on the value the XMP file had: the current
                # value was just filled from Lightroom and would always count as set
                if data_set[key] != data_set_original[key] and checkOverwrite(data_set_original[key], key, args.field_controls):
                    xmp.set_property(xmp_fields[key], key, data_set[key])
                    write_file = True
            if write_file:
                count['lightroom'] += 1
    # if we have the write flag set, write data
    if write_file:
        if not args.test:
            # use copyfile to create a backup copy
            # splitext keeps the dot in the extension: name.xmp => name.BK.xmp
            backup_base, backup_extension = os.path.splitext(xmp_file)
            copyfile(xmp_file, "{}.BK{}".format(backup_base, backup_extension))
            # write back to original file
            with open(xmp_file, 'w', encoding='utf-8') as fptr:
                fptr.write(xmp.serialize_to_str(omit_packet_wrapper=True))
        else:
            print("[TEST] Would write {} to file {}".format(data_set, xmp_file))
        count['changed'] += 1
    else:
        print(". Data exists: SKIP")
        count['skipped'] += 1

# close DB connection (there is none if no lightroom folder was given)
if lrdb:
    lrdb.close()

# end stats
print("{}".format('=' * 30))
print("Found XMP Files : {:,}".format(count['all']))
print("Updated : {:,}".format(count['changed']))
print("Skipped : {:,}".format(count['skipped']))
print("New GeoLocation Google: {:,}".format(count['google']))
print("GeoLocation from Cache: {:,}".format(count['cache']))
print("Failed for Reverse Geo: {:,}".format(count['failed']))
if use_lightroom:
    print("Geo from Lightroom : {:,}".format(count['lightroom']))
    print("No Lightroom data : {:,}".format(count['not_found']))


# __END__