#!/usr/bin/env python3
#
# AUTHOR : Clemens Schwaighofer
# DATE   : 2018/2/20
# LICENSE: GPLv3
# DESC   : Set the reverse geo location (name) from Lat/Long data in XMP files in a Lightroom catalogue
#          * tries to get a pre-set geo location from the LR catalogue
#          * if not found, tries to get the data from Google
#          * all data is translated into English with the long vowel system (e.g. ou or oo becomes ō)
# MUST HAVE: Python XMP Toolkit (http://python-xmp-toolkit.readthedocs.io/)

import argparse
import sqlite3
import requests
import configparser
import unicodedata
# import textwrap
import glob
import os
import sys
import re
# NOTE: XMPFiles does not work with sidecar files, we need to read via XMPMeta
from libxmp import XMPMeta, consts
from shutil import copyfile, get_terminal_size
from math import ceil, radians, sin, cos, atan2, sqrt

##############################################################
# FUNCTIONS
##############################################################

# cache for looking up whether a character is a Latin letter
# used by isLatin and onlyLatinChars
cache_latin_letters = {}

# ARGPARSE HELPERS

# call: writable_dir_folder
# checks if this is a writable folder OR file
# AND it works with nargs *
class writable_dir_folder(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        # we loop through the list (because of nargs *)
        for prospective_dir in values:
            # if valid and writable (dir or file)
            if os.access(prospective_dir, os.W_OK):
                # init new output array
                out = []
                # if we have a previous list in the namespace, extend the current list
                if type(getattr(namespace, self.dest)) is list:
                    out.extend(getattr(namespace, self.dest))
                # add the new dir to it
                out.append(prospective_dir)
                # and write that list back to self.dest in the namespace
                setattr(namespace, self.dest, out)
            else:
                raise argparse.ArgumentTypeError("writable_dir_folder: {0} is not a writable dir".format(prospective_dir))


# call: readable_dir
# custom check that the value is a valid, readable directory
class readable_dir(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        prospective_dir = values
        if not os.path.isdir(prospective_dir):
            raise argparse.ArgumentTypeError("readable_dir: {0} is not a valid path".format(prospective_dir))
        if os.access(prospective_dir, os.R_OK):
            setattr(namespace, self.dest, prospective_dir)
        else:
            raise argparse.ArgumentTypeError("readable_dir: {0} is not a readable dir".format(prospective_dir))


# check that distance values are valid
class distance_values(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        m = re.match(r'^(\d+)\s?(m|km)$', values)
        if m:
            # convert to int in meters
            values = int(m.group(1))
            if m.group(2) == 'km':
                values *= 1000
            setattr(namespace, self.dest, values)
        else:
            raise argparse.ArgumentTypeError("distance_values: {0} is not a valid argument".format(values))


# MAIN FUNCTIONS

# METHOD: reverseGeolocate
# PARAMS: longitude, latitude, map search target (google or openstreetmap)
# RETURN: dict with all data (see below)
# DESC  : wrapper that calls either the Google or the OpenStreetMap lookup
def reverseGeolocate(longitude, latitude, map_type):
    # clean up long/lat
    # they are stored with N/S/E/W if they come from an XMP
    # format: Deg,Min.Sec[NSEW]
    # NOTE: lat is N/S, long is E/W
    # detect and convert
    lat_long = longLatReg(longitude=longitude, latitude=latitude)
    # which service to use
    if map_type == 'google':
        return reverseGeolocateGoogle(lat_long['longitude'], lat_long['latitude'])
    elif map_type == 'openstreetmap':
        return reverseGeolocateOpenStreetMap(lat_long['longitude'], lat_long['latitude'])
    else:
        return {
            'Country': '',
            'status': 'ERROR',
            'error': 'Map type not valid'
        }
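
# Illustrative call (not executed); coordinates can be floats or the
# Deg,Min.Sec[NSEW] strings read from an XMP file. The address values shown
# are placeholders, the real content depends on the lookup service response:
#   reverseGeolocate('139,41.5E', '35,39.48N', 'openstreetmap')
#   -> {'CountryCode': 'jp', 'Country': 'Japan', 'State': '...', 'City': '...',
#       'Location': '...', 'status': '', 'error_message': ''}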


# METHOD: reverseGeolocateInit
# PARAMS: longitude, latitude
# RETURN: empty geolocation dictionary, or error flag if lat/long is not valid
# DESC  : inits the dictionary for return and checks that the lat/long is valid
#         returns the geolocation dict with status = 'ERROR' if an error occurred
def reverseGeolocateInit(longitude, latitude):
    # basic dict format
    geolocation = {
        'CountryCode': '',
        'Country': '',
        'State': '',
        'City': '',
        'Location': '',
        # below for error reports
        'status': '',
        'error_message': ''
    }
    # error if long/lat is not valid
    # NOTE: allow an optional leading minus, southern/western coordinates are negative
    latlong_re = re.compile(r'^-?\d+(\.\d+)?$')
    if not latlong_re.match(str(longitude)) or not latlong_re.match(str(latitude)):
        geolocation['status'] = 'ERROR'
        geolocation['error_message'] = 'Latitude {} or Longitude {} are not valid'.format(latitude, longitude)
    return geolocation


# METHOD: reverseGeolocateOpenStreetMap
# PARAMS: longitude, latitude
# RETURN: OpenStreetMap reverse location lookup
#         dict with location, city, state, country, country code
#         if not fillable, the entry is empty
# SAMPLE: https://nominatim.openstreetmap.org/reverse.php?format=jsonv2&lat=<latitude>&lon=<longitude>&zoom=21&accept-language=en-US,en&
def reverseGeolocateOpenStreetMap(longitude, latitude):
    # init
    geolocation = reverseGeolocateInit(longitude, latitude)
    if geolocation['status'] == 'ERROR':
        return geolocation
    # query format
    query_format = 'jsonv2'
    # language to return (English)
    language = 'en-US,en'
    # build query
    base = 'https://nominatim.openstreetmap.org/reverse.php?'
    # parameters
    payload = {
        'format': query_format,
        'lat': latitude,
        'lon': longitude,
        'accept-language': language
    }
    # if we have an email, add it here
    if args.email:
        payload['email'] = args.email
    url = base
    response = requests.get(url, params=payload)
    # debug output
    if args.debug:
        print("OpenStreetMap search for Lat: {}, Long: {}".format(latitude, longitude))
    if args.debug and args.verbose >= 1:
        print("OpenStreetMap response: {} => JSON: {}".format(response, response.json()))
    # type map
    # Country to Location, and for each in order of priority
    type_map = {
        'CountryCode': ['country_code'],
        'Country': ['country'],
        'State': ['state'],
        'City': ['city', 'city_district', 'state_district'],
        'Location': ['county', 'town', 'suburb', 'hamlet', 'neighbourhood', 'road']
    }
    # if not error
    if 'error' not in response.json():
        # get address block
        addr = response.json()['address']
        # loop for locations
        for loc_index in type_map:
            for index in type_map[loc_index]:
                if index in addr and not geolocation[loc_index]:
                    geolocation[loc_index] = addr[index]
    else:
        geolocation['status'] = 'ERROR'
        geolocation['error_message'] = response.json()['error']
        print("Error in request: {}".format(geolocation['error_message']))
    # return
    return geolocation


# METHOD: reverseGeolocateGoogle
# PARAMS: longitude, latitude
# RETURN: Google Maps reverse location lookup
#         dict with location, city, state, country, country code
#         if not fillable, the entry is empty
# SAMPLE: http://maps.googleapis.com/maps/api/geocode/json?latlng=<latitude>,<longitude>&language=<lang>&sensor=false&key=<api key>
def reverseGeolocateGoogle(longitude, latitude):
    # init
    geolocation = reverseGeolocateInit(longitude, latitude)
    temp_geolocation = geolocation.copy()
    if geolocation['status'] == 'ERROR':
        return geolocation
    # sensor (why?)
    sensor = 'false'
    # language, so we get ASCII en back
    language = 'en'
    # request to google
    # if a google api key is used, the request has to go via https
    protocol = 'https://' if args.google_api_key else 'http://'
    base = "maps.googleapis.com/maps/api/geocode/json?"
    # build the base params
    payload = {
        'latlng': '{lat},{lon}'.format(lon=longitude, lat=latitude),
        'language': language,
        'sensor': sensor
    }
    # if we have a google api key, add it here
    if args.google_api_key:
        payload['key'] = args.google_api_key
    # build the full url and send it to google
    url = "{protocol}{base}".format(protocol=protocol, base=base)
    response = requests.get(url, params=payload)
    # debug output
    if args.debug:
        print("Google search for Lat: {}, Long: {} with {}".format(latitude, longitude, response.url))
    if args.debug and args.verbose >= 1:
        print("Google response: {} => JSON: {}".format(response, response.json()))
    # type map
    # for automated copying of the correct data into the set to return
    type_map = {
        'CountryCode': ['country'],
        'Country': ['country'],
        'State': ['administrative_area_level_1', 'administrative_area_level_2'],
        'City': ['locality', 'administrative_area_level_3'],
        'Location': ['sublocality_level_1', 'sublocality_level_2', 'route'],
    }
    # print("Error: {}".format(response.json()['status']))
    if response.json()['status'] == 'OK':
        # first entry for type = premise
        for entry in response.json()['results']:
            for sub_entry in entry:
                if sub_entry == 'types' and (
                    'premise' in entry[sub_entry] or
                    'route' in entry[sub_entry] or
                    'street_address' in entry[sub_entry] or
                    'sublocality' in entry[sub_entry]
                ):
                    # print("Entry {}: {}".format(sub_entry, entry[sub_entry]))
                    # print("Address {}".format(entry['address_components']))
                    # type
                    # -> country,
                    # -> administrative_area (1, 2),
                    # -> locality,
                    # -> sublocality (_level_1 or 2 first found, then route)
                    # so we get the data in the correct order
                    for loc_index in type_map:
                        for index in type_map[loc_index]:
                            # this is an array, so we need to loop through each
                            for addr in entry['address_components']:
                                # in types, check that the index is in there and the location is not yet set
                                # also check that the entry is LATIN based
                                # NOTE: fallback if all are non LATIN?
                                if index in addr['types'] and not geolocation[loc_index]:
                                    # for the country code we need the short name, else we use the long name
                                    if loc_index == 'CountryCode':
                                        if onlyLatinChars(addr['short_name']):
                                            geolocation[loc_index] = addr['short_name']
                                        elif not temp_geolocation[loc_index]:
                                            temp_geolocation[loc_index] = addr['short_name']
                                    else:
                                        if onlyLatinChars(addr['long_name']):
                                            geolocation[loc_index] = addr['long_name']
                                        elif not temp_geolocation[loc_index]:
                                            temp_geolocation[loc_index] = addr['long_name']
        # check that all entries in geolocation are filled; if not, fill from the temp_geolocation dictionary
        for loc_index in type_map:
            if not geolocation[loc_index] and temp_geolocation[loc_index]:
                geolocation[loc_index] = temp_geolocation[loc_index]
        # write OK status
        geolocation['status'] = response.json()['status']
    else:
        # NOTE: not all error statuses include an error_message, so use .get()
        geolocation['error_message'] = response.json().get('error_message', '')
        geolocation['status'] = response.json()['status']
        print("Error in request: {} {}".format(geolocation['status'], geolocation['error_message']))
    # return
    return geolocation


# METHOD: convertLatLongToDMS
# PARAMS: lat/long in (-)N.N format, is_latitude or is_longitude flag (else we can't set N/S/E/W)
# RETURN: Deg,Min.Sec(NESW) format
# DESC  : convert the LR format of N.N to the Exif GPS format
def convertLatLongToDMS(lat_long, is_latitude=False, is_longitude=False):
    # minus the part before the . and then multiply the rest by 60
    degree = int(abs(lat_long))
    minutes = round((float(abs(lat_long)) - int(abs(lat_long))) * 60, 10)
    # compare the float itself so values between -1 and 0 keep their sign
    if is_latitude is True:
        direction = 'S' if lat_long < 0 else 'N'
    elif is_longitude is True:
        direction = 'W' if lat_long < 0 else 'E'
    else:
        direction = '(INVALID)'
    return "{},{}{}".format(degree, minutes, direction)


# wrapper for Lat/Long calls: latitude
def convertLatToDMS(lat_long):
    return convertLatLongToDMS(lat_long, is_latitude=True)


# wrapper for Lat/Long calls: longitude
def convertLongToDMS(lat_long):
    return convertLatLongToDMS(lat_long, is_longitude=True)


# METHOD: longLatReg
# PARAMS: longitude, latitude (in n,n.n[NSEW] format)
# RETURN: dict with converted lat/long
# DESC  : converts the XMP/EXIF formatted GPS Long/Lat coordinates
#         from <Degree>,<Minute.Second><NSEW> to the normal float
#         number used by Google/LR internally
def longLatReg(longitude, latitude):
    # regex
    latlong_re = re.compile(r'^(\d+),(\d+\.\d+)([NESW]{1})$')
    # dict for loop
    lat_long = {
        'longitude': longitude,
        'latitude': latitude
    }
    for element in lat_long:
        # match if it is in exif GPS format; str() so float input is passed through unchanged
        m = latlong_re.match(str(lat_long[element]))
        if m is not None:
            # convert from Degree,Min.Sec into float format
            lat_long[element] = float(m.group(1)) + (float(m.group(2)) / 60)
            # if S or W => invert to negative
            if m.group(3) == 'S' or m.group(3) == 'W':
                lat_long[element] *= -1
    return lat_long
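
# Illustrative conversion (not executed):
#   longLatReg(longitude='139,41.5E', latitude='35,39.48N')
#   -> {'longitude': 139.69166666666666, 'latitude': 35.658}
# values already in float form are passed through unchanged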


# wrapper calls for DMS to Lat/Long: latitude
def convertDMStoLat(lat_long):
    return longLatReg('0,0.0N', lat_long)['latitude']


# wrapper calls for DMS to Lat/Long: longitude
def convertDMStoLong(lat_long):
    return longLatReg(lat_long, '0,0.0N')['longitude']


# METHOD: getDistance
# PARAMS: from long/lat, to long/lat
# RETURN: distance in meters
# DESC  : calculates the distance between two coordinates (haversine formula)
def getDistance(from_longitude, from_latitude, to_longitude, to_latitude):
    # earth radius in meters
    earth_radius = 6378137.0
    # convert all to radians, with a pre-convert from DMS to float
    from_longitude = radians(float(convertDMStoLong(from_longitude)))
    from_latitude = radians(float(convertDMStoLat(from_latitude)))
    to_longitude = radians(float(convertDMStoLong(to_longitude)))
    to_latitude = radians(float(convertDMStoLat(to_latitude)))
    # distance from - to
    distance_longitude = from_longitude - to_longitude
    distance_latitude = from_latitude - to_latitude
    # main distance calculation (haversine)
    distance = sin(distance_latitude / 2)**2 + cos(from_latitude) * cos(to_latitude) * sin(distance_longitude / 2)**2
    distance = 2 * atan2(sqrt(distance), sqrt(1 - distance))
    return earth_radius * distance
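
# Sanity check (not executed): one degree of latitude on a sphere with
# radius 6378137 m is 6378137 * pi / 180, roughly 111319.5 m, so
#   getDistance('0,0.0E', '0,0.0N', '0,0.0E', '1,0.0N') -> ~111319.49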


# METHOD: checkOverwrite
# PARAMS: data: value field, key: XMP key, field_controls: array from args
# RETURN: True/False
# DESC  : checks with the field control flags if the given data for a key should be written
#         1) data is not set
#         2) data is set or not, and field_control has only 'overwrite' set
#         3) data for the key is not set, and the key matches a field_control entry
#         4) data for the key is set or not, the key matches a field_control entry and 'overwrite' is set
def checkOverwrite(data, key, field_controls):
    status = False
    # init field controls if empty
    if not field_controls:
        field_controls = []
    if not data and (len(field_controls) == 0 or ('overwrite' in field_controls and len(field_controls) == 1)):
        status = True
    elif not data and key.lower() in field_controls:
        status = True
    elif data and 'overwrite' in field_controls and len(field_controls) == 1:
        status = True
    elif data and key.lower() in field_controls and 'overwrite' in field_controls:
        status = True
    if args.debug:
        print("Data set: {data_set}, Key: {key_lower}, Field Controls len: {field_count}, Overwrite: {overwrite_flag}, Key in Field Controls: {key_ok}, OVERWRITE: {do_overwrite}".format(
            data_set='YES' if data else 'NO',
            key_lower=key.lower(),
            field_count=len(field_controls),
            overwrite_flag='OVERWRITE' if 'overwrite' in field_controls else 'NOT OVERWRITE',
            key_ok='KEY OK' if key.lower() in field_controls else 'KEY NOT MATCHING',
            do_overwrite=status
        ))
    return status
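
# Illustrative outcomes (not executed), following the four rules above:
#   checkOverwrite('', 'City', None)                       -> True   (rule 1)
#   checkOverwrite('Tokyo', 'City', ['overwrite'])         -> True   (rule 2)
#   checkOverwrite('', 'City', ['city'])                   -> True   (rule 3)
#   checkOverwrite('Tokyo', 'City', ['city'])              -> False  (no overwrite)
#   checkOverwrite('Tokyo', 'City', ['city', 'overwrite']) -> True   (rule 4)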


# METHOD: shortenPath
# PARAMS: path = string, length = int, file_only = True/False, path_only = True/False
# RETURN: shortened path with .. in front
# DESC  : shortens a path from the left so it fits into length
#         if file_only is set, only the file part is used; if path_only is set, only the path part
def shortenPath(path, length=30, file_only=False, path_only=False):
    length = length - 3
    # we assume the XMP file name has no CJK characters inside, so we strip out the path
    # the reason is that CJK characters inside would screw up the formatting
    if file_only:
        path = os.path.split(path)[1]
    if path_only:
        path = os.path.split(path)[0]
    if stringLenCJK(path) > length:
        path = "{} {}".format("..", path[stringLenCJK(path) - length:])
    return path


# METHOD: shortenString
# PARAMS: string, shorten width, override shorten placeholder
# RETURN: shortened string
# DESC  : shortens a string to width and attaches the placeholder
def shortenString(string, width, placeholder='..'):
    # get the length including double width characters
    string_len_cjk = stringLenCJK(str(string))
    # if the double byte width is too big
    if string_len_cjk > width:
        # set current length and output string
        cur_len = 0
        out_string = ''
        # loop through each character
        for char in str(string):
            # set the current length if we add the character
            cur_len += 2 if unicodedata.east_asian_width(char) in "WF" else 1
            # if the new length fits into the width minus the placeholder, add the char
            if cur_len <= (width - len(placeholder)):
                out_string += char
        # return string with new width and placeholder attached
        return "{}{}".format(out_string, placeholder)
    else:
        return str(string)
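
# Illustrative example (not executed); each CJK character counts as width 2:
#   shortenString('東京都港区', 8) -> '東京都..'  (width 6 + 2 for the placeholder)
#   shortenString('Tokyo', 8)      -> 'Tokyo'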


# METHOD: stringLenCJK
# PARAMS: string
# RETURN: length including double count for double width characters
# DESC  : len() on a string in Python counts characters, but we need
#         the display width for formatting, so we count double width
#         characters twice
def stringLenCJK(string):
    """ return string len including double count for double width characters """
    return sum(1 + (unicodedata.east_asian_width(c) in "WF") for c in string)
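
# Illustrative widths (not executed):
#   stringLenCJK('Tokyo') -> 5
#   stringLenCJK('東京')  -> 4  (two characters, each of display width 2)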


# FROM: https://stackoverflow.com/a/3308844/7811993
# METHOD: isLatin
# PARAMS: character
# RETURN: True/False for whether the character is LATIN based
# DESC  : checks via the unicode name if a character is a LATIN char
def isLatin(uchr):
    try:
        # if we find it in the cache dictionary, return it
        return cache_latin_letters[uchr]
    except KeyError:
        # look for LATIN in the unicode name and cache the result for this character
        return cache_latin_letters.setdefault(uchr, 'LATIN' in unicodedata.name(uchr))


# FROM: https://stackoverflow.com/a/3308844/7811993
# METHOD: onlyLatinChars
# PARAMS: string
# RETURN: True/False for whether the string is LATIN char based
# DESC  : checks if a string is based on LATIN chars; False for any CJK, Cyrillic, Hebrew, etc.
def onlyLatinChars(unistr):
    return all(isLatin(uchr) for uchr in unistr if uchr.isalpha())
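
# Illustrative checks (not executed); only alphabetic characters are tested:
#   onlyLatinChars('Tokyo') -> True
#   onlyLatinChars('東京')  -> False
#   onlyLatinChars('123')   -> True  (no alphabetic characters at all)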


# METHOD: printHeader
# PARAMS: header string, line counter, print header counter trigger
# RETURN: line counter + 1
# DESC  : prints the header line and the header separator line
def printHeader(header, lines=0, header_line=0):
    global page_no
    if lines == header_line:
        # add one to the pages shown and reset the lines to start a new page
        page_no += 1
        lines = 0
        # print header
        print("{}".format(header))
    lines += 1
    return lines


# METHOD: formatLen
# PARAMS: string, format length
# RETURN: the adjusted format length
# DESC  : in case of CJK characters we need to adjust the format length dynamically,
#         so we calculate the correct length based on the given string
def formatLen(string, length):
    # returns the length updated for strings with double width characters:
    # get the normal string length and the length including double width
    # characters, then subtract the difference from the original length
    return length - (stringLenCJK(string) - len(string))
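
# Illustrative example (not executed): '東京' has len() 2 but display width 4,
# so a column width of 10 shrinks to a format length of 8:
#   formatLen('東京', 10) -> 8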


# METHOD: fileSortNumber
# PARAMS: file name
# RETURN: number found in the .BK. string, or 0 for none
# DESC  : gets the .BK. number for sorting the file list
def fileSortNumber(file):
    m = re.match(r'.*\.BK\.(\d+)\.xmp$', file)
    return int(m.group(1)) if m is not None else 0
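
# Illustrative example (not executed):
#   fileSortNumber('IMG_0001.BK.3.xmp') -> 3
#   fileSortNumber('IMG_0001.xmp')      -> 0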


# METHOD: outputListWidthAdjust
# PARAMS: none
# RETURN: format_length dictionary
# DESC  : adjusts the format lengths for the list output
def outputListWidthAdjust():
    # various string lengths
    format_length = {
        'filename': 35,
        'latitude': 18,
        'longitude': 18,
        'code': 4,
        'country': 15,
        'state': 18,
        'city': 20,
        'location': 25,
        'path': 40,
    }
    if args.compact_view:
        reduce_percent = 40
        # all formats are reduced to a minimum, we cut a percentage off
        for format_key in ['filename', 'latitude', 'longitude', 'country', 'state', 'city', 'location', 'path']:
            format_length[format_key] = ceil(format_length[format_key] - ((format_length[format_key] / 100) * reduce_percent))
    else:
        # minimum resize width for a column
        resize_width_min = 4
        # the resize percentage: start with 10, then increase until we reach the max
        resize_percent_min = 10
        resize_percent_max = 50
        # abort flag so we can break out of the second loop too
        abort = False
        # format key order, in which order the elements will be resized
        format_key_order = []
        # resize flag: 0: no, 1: make bigger, -1: make smaller
        # change sizes for print based on the terminal size
        # NOTE: in screen or term this data might NOT be correct
        # the current size needs the in-between and left/right space data
        current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
        if current_columns < get_terminal_size().columns:
            resize = 1
            format_key_order = ['path', 'location', 'state', 'city', 'country', 'filename']
        else:
            resize = -1
            format_key_order = ['latitude', 'longitude', 'path', 'country', 'state', 'city', 'location', 'filename']
        # if we have no auto adjust
        if resize and args.no_autoadjust:
            # warn if the screen is too small
            if resize == -1:
                print("[!!!] Screen layout might be skewed. Increase Terminal width")
            resize = 0
        else:
            for resize_percent in range(resize_percent_min, resize_percent_max, 10):
                for format_key in format_key_order:
                    resize_width = (format_length[format_key] / 100) * resize_percent
                    # if we downsize, make it negative
                    if resize == -1:
                        resize_width *= -1
                    resize_width = ceil(format_length[format_key] + resize_width)
                    # in case it got too small, keep the old one
                    format_length[format_key] = resize_width if resize_width > resize_width_min else format_length[format_key]
                    # calc the new width to check if we can abort
                    current_columns = sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2
                    if (resize == 1 and current_columns >= get_terminal_size().columns) or (resize == -1 and current_columns < get_terminal_size().columns):
                        # check that we are not OVER, but exactly one under
                        width_up = get_terminal_size().columns - current_columns - 1
                        if (resize == 1 and width_up < 0) or (resize == -1 and width_up != 0):
                            if format_length['path'] + width_up >= resize_width_min:
                                format_length['path'] += width_up
                        abort = True
                        break
                if abort:
                    break
    if sum(format_length.values()) + ((len(format_length) - 1) * 3) + 2 > get_terminal_size().columns:
        print("[!!!] Screen layout might be skewed. Increase Terminal width")
    return format_length


# METHOD: getBackupFileCounter
# PARAMS: file name
# RETURN: next counter to be used for a backup
# DESC  : looks for existing .BK.<n> backup files and returns the next free counter
def getBackupFileCounter(xmp_file):
    # set to 1 for if we have no backups yet
    bk_file_counter = 1
    # get the PATH from the file and look for matching .BK. files in this folder,
    # output is sorted by the BK counter key
    for bk_file in sorted(
        glob.glob("{path}/{file}*.xmp".format(
            path=os.path.split(xmp_file)[0],
            file="{}.BK.".format(os.path.splitext(os.path.split(xmp_file)[1])[0])
        )),
        # custom sort key to get the backup files sorted correctly
        key=lambda pos: fileSortNumber(pos),
        reverse=True
    ):
        # BK.1, etc -> get the number
        bk_pos = fileSortNumber(bk_file)
        if bk_pos > 0:
            if args.debug:
                print("#### **** File: {}, Counter: {} -> {}".format(bk_file, bk_pos, bk_pos + 1))
            # if found + 1 is bigger than the current counter, set the new bk counter
            if bk_pos + 1 > bk_file_counter:
                bk_file_counter = bk_pos + 1
            break
    # return the next correct number for the backup
    return bk_file_counter
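
# Illustrative example (not executed): with IMG_0001.BK.1.xmp and
# IMG_0001.BK.2.xmp next to IMG_0001.xmp,
#   getBackupFileCounter('/photos/IMG_0001.xmp') -> 3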


##############################################################
# ARGUMENT PARSING
##############################################################

parser = argparse.ArgumentParser(
    description='Reverse Geoencoding based on set Latitude/Longitude data in XMP files',
    # formatter_class=argparse.RawDescriptionHelpFormatter,
    epilog='Sample: (todo)'
)

# xmp folder (or folders), or file (or files)
# note that the target directory or file needs to be writable
parser.add_argument('-i', '--include-source',
    required=True,
    nargs='*',
    action=writable_dir_folder,
    dest='xmp_sources',
    metavar='XMP SOURCE FOLDER',
    help='The source folder or folders with the XMP files that need the reverse geo encoding to be set. Single XMP files can be given here'
)

# exclude folders
parser.add_argument('-x', '--exclude-source',
    nargs='*',
    action=writable_dir_folder,
    dest='exclude_sources',
    metavar='EXCLUDE XMP SOURCE FOLDER',
    help='Folders and files that will be excluded.'
)

# LR database (base folder)
# get the .lrcat file in this folder
parser.add_argument('-l', '--lightroom',
    # required=True,
    action=readable_dir,
    dest='lightroom_folder',
    metavar='LIGHTROOM FOLDER',
    help='Lightroom catalogue base folder'
)

# strict LR check with the base path next to the file base name
parser.add_argument('-s', '--strict',
    dest='lightroom_strict',
    action='store_true',
    help='Do a strict check for Lightroom files, including the path in the query'
)

# set behaviour override
# FLAG: default: only set fields that are not filled yet
# other: overwrite all, or overwrite specific fields (as defined below)
# fields: Location, City, State, Country, CountryCode
parser.add_argument('-f', '--field',
    action='append',
    type=str.lower,  # make it lowercase for the check
    choices=['overwrite', 'location', 'city', 'state', 'country', 'countrycode'],
    dest='field_controls',
    metavar='<overwrite, location, city, state, country, countrycode>',
    help='By default only fields that are not set yet are written. Options are: '
        'Overwrite (write all new), Location, City, State, Country, CountryCode. '
        'Multiple options can be combined to overwrite or set only certain fields. '
        'With overwrite a field will be overwritten even if already set, else it will always be skipped.'
)

parser.add_argument('-d', '--fuzzy-cache',
    type=str.lower,
    action=distance_values,
    nargs='?',
    const='10m',  # default is 10m
    dest='fuzzy_distance',
    metavar='FUZZY DISTANCE',
    help='Allow fuzzy distance cache lookups. An optional distance can be given, '
        'if not set a default of 10m is used. '
        'The allowed argument format is 12m or 12km'
)

# Google Maps API key to overcome restrictions
parser.add_argument('-g', '--google',
    dest='google_api_key',
    metavar='GOOGLE API KEY',
    help='Set a Google Maps API key to overcome the default lookup limitations'
)

# use OpenStreetMap
parser.add_argument('-o', '--openstreetmap',
    dest='use_openstreetmap',
    action='store_true',
    help='Use OpenStreetMap instead of Google'
)

# email for OpenStreetMap requests
parser.add_argument('-e', '--email',
    dest='email',
    metavar='EMAIL ADDRESS',
    help='An email address for OpenStreetMap requests'
)

# write api/email settings to the config file
parser.add_argument('-w', '--write-settings',
    dest='config_write',
    action='store_true',
    help='Write the Google API key or OpenStreetMap email to the config file'
)

# only read data and print it on screen, do not write anything
parser.add_argument('-r', '--read-only',
    dest='read_only',
    action='store_true',
    help='Only read the current values from the XMP files, do not read from LR, look up any data or write back'
)

# only list unset ones
parser.add_argument('-u', '--unset-only',
    dest='unset_only',
    action='store_true',
    help='Only list unset XMP files'
)

# only list unset GPS codes
parser.add_argument('-p', '--unset-gps-only',
    dest='unset_gps_only',
    action='store_true',
    help='Only list XMP files with unset GPS fields'
)

# don't try to auto adjust the list view
parser.add_argument('-a', '--no-autoadjust',
    dest='no_autoadjust',
    action='store_true',
    help='Don\'t try to auto adjust columns'
)

# compact view, compresses columns down to a minimum
parser.add_argument('-c', '--compact',
    dest='compact_view',
    action='store_true',
    help='Very compact list view'
)

# do not create backup files
parser.add_argument('-n', '--nobackup',
    dest='no_xmp_backup',
    action='store_true',
    help='Do not create a backup of the XMP file'
)

# verbose flag for more detailed output
parser.add_argument('-v', '--verbose',
    action='count',
    dest='verbose',
    help='Set the verbose output level'
)

# debug flag
parser.add_argument('--debug', action='store_true', dest='debug', help='Set detailed debug output')
# test flag
parser.add_argument('--test', action='store_true', dest='test', help='Do not write data back to the file')

# read in the arguments
args = parser.parse_args()


##############################################################
# MAIN CODE
##############################################################

# init verbose to 0 if not set
if not args.verbose:
    args.verbose = 0
# init exclude sources to an empty list if not set
if not args.exclude_sources:
    args.exclude_sources = []
# init args unset (for list view) with 0 if unset
if not args.unset_only:
    args.unset_only = 0

if args.debug:
    print("### ARGUMENT VARS: I: {incl}, X: {excl}, L: {lr}, F: {fc}, D: {fdist}, M: {osm}, G: {gp}, E: {em}, R: {read}, U: {us}, A: {adj}, C: {cmp}, N: {nbk}, W: {wrc}, V: {v}, D: {d}, T: {t}".format(
        incl=args.xmp_sources,
        excl=args.exclude_sources,
        lr=args.lightroom_folder,
        fc=args.field_controls,
        fdist=args.fuzzy_distance,
        osm=args.use_openstreetmap,
        gp=args.google_api_key,
        em=args.email,
        read=args.read_only,
        us=args.unset_only,
        adj=args.no_autoadjust,
        cmp=args.compact_view,
        nbk=args.no_xmp_backup,
        wrc=args.config_write,
        v=args.verbose,
        d=args.debug,
        t=args.test
    ))

# error flag
error = False
# set the search map type
map_type = 'google' if not args.use_openstreetmap else 'openstreetmap'
# if -g and -o, error
if args.google_api_key and args.use_openstreetmap:
    print("You cannot set a Google API key and use OpenStreetMap at the same time")
    error = True
# or if -g and -e
if args.google_api_key and args.email:
    print("You cannot set a Google API key and an OpenStreetMap email at the same time")
    error = True
# or -e and no -o
if args.email and not args.use_openstreetmap:
    print("You cannot set an OpenStreetMap email and not use OpenStreetMap")
    error = True
# if an email is given, check that it is at least basically valid (@ and .)
if args.email:
    if not re.match(r'^.+@.+\.[A-Za-z]{1,}$', args.email):
        print("Not a valid email for OpenStreetMap: {}".format(args.email))
        error = True
# on error exit here
if error:
    sys.exit(1)

config = configparser.ConfigParser()
# try to find the config file in the following order
# $HOME/.config/
config_file = 'reverse_geolocate.cfg'
config_folder = os.path.expanduser('~/.config/reverseGeolocate/')
config_data = '{}{}'.format(config_folder, config_file)
# if the file exists read it, if not skip, unless we have the write flag and a google api key or openstreetmap email
if os.path.isfile(config_data):
    config.read(config_data)
    # check if the api group & setting is there; also never overwrite argument given data
    if 'API' in config:
        if 'googleapikey' in config['API']:
            if not args.google_api_key:
                args.google_api_key = config['API']['googleapikey']
        if 'openstreetmapemail' in config['API']:
            if not args.email:
                args.email = config['API']['openstreetmapemail']
# write the data if it exists and changed
if args.config_write and (args.google_api_key or args.email):
    config_change = False
    # check if the new value differs, if yes, change and write
    if 'API' not in config:
        config['API'] = {}
    if args.google_api_key and ('googleapikey' not in config['API'] or config['API']['googleapikey'] != args.google_api_key):
        config['API']['googleapikey'] = args.google_api_key
        config_change = True
    if args.email and ('openstreetmapemail' not in config['API'] or config['API']['openstreetmapemail'] != args.email):
        config['API']['openstreetmapemail'] = args.email
        config_change = True
    if config_change:
        # if we do not have the base folder yet, create it first
        if not os.path.exists(config_folder):
            os.makedirs(config_folder)
        with open(config_data, 'w') as fptr:
            config.write(fptr)
if args.debug:
    print("### OVERRIDE API: G: {}, O: {}".format(args.google_api_key, args.email))

# The XMP field const lookup values
# XML/XMP
# READ:
#   exif:GPSLatitude
#   exif:GPSLongitude
# READ to check if filled:
#   Iptc4xmpCore:Location
#   photoshop:City
#   photoshop:State
#   photoshop:Country
#   Iptc4xmpCore:CountryCode
xmp_fields = {
    'GPSLatitude': consts.XMP_NS_EXIF,  # EXIF GPS Lat/Long are stored in Degree,Min.Sec[NESW] format
    'GPSLongitude': consts.XMP_NS_EXIF,
    'Location': consts.XMP_NS_IPTCCore,
    'City': consts.XMP_NS_Photoshop,
    'State': consts.XMP_NS_Photoshop,
    'Country': consts.XMP_NS_Photoshop,
    'CountryCode': consts.XMP_NS_IPTCCore
}
# non lat/long fields (for loc loops)
data_set_loc = ('Location', 'City', 'State', 'Country', 'CountryCode')
# one xmp data set
data_set = {
    'GPSLatitude': '',
    'GPSLongitude': '',
    'Location': '',
    'City': '',
    'State': '',
    'Country': '',
    'CountryCode': ''
}
# original set for comparison (kept unchanged)
data_set_original = {}
# cache set to avoid double lookups for identical Lat/Long
data_cache = {}
# work files: all files + folders we need to work on
work_files = []
# all failed files
failed_files = []
# use lightroom
use_lightroom = False
# cursor & query
query = ''
cur = ''
# count variables
count = {
    'all': 0,
    'listed': 0,
    'read': 0,
    'map': 0,
    'cache': 0,
    'fuzzy_cache': 0,
    'lightroom': 0,
    'changed': 0,
    'failed': 0,
    'skipped': 0,
    'not_found': 0,
    'many_found': 0,
}

# do the lightroom stuff only if we have the lightroom folder
if args.lightroom_folder:
    # query string for the lightroom DB check
    query = 'SELECT Adobe_images.id_local, AgLibraryFile.baseName, AgLibraryRootFolder.absolutePath, AgLibraryRootFolder.name as relativePath, AgLibraryFolder.pathFromRoot, AgLibraryFile.originalFilename, '
    query += 'AgHarvestedExifMetadata.gpsLatitude, AgHarvestedExifMetadata.gpsLongitude, '
    query += 'AgHarvestedIptcMetadata.locationDataOrigination, AgInternedIptcLocation.value as Location, AgInternedIptcCity.value as City, '
    query += 'AgInternedIptcState.value as State, AgInternedIptcCountry.value as Country, AgInternedIptcIsoCountryCode.value as CountryCode '
    query += 'FROM AgLibraryFile, AgHarvestedExifMetadata, AgLibraryFolder, AgLibraryRootFolder, Adobe_images '
    query += 'LEFT JOIN AgHarvestedIptcMetadata ON Adobe_images.id_local = AgHarvestedIptcMetadata.image '
    query += 'LEFT JOIN AgInternedIptcLocation ON AgHarvestedIptcMetadata.locationRef = AgInternedIptcLocation.id_local '
    query += 'LEFT JOIN AgInternedIptcCity ON AgHarvestedIptcMetadata.cityRef = AgInternedIptcCity.id_local '
    query += 'LEFT JOIN AgInternedIptcState ON AgHarvestedIptcMetadata.stateRef = AgInternedIptcState.id_local '
    query += 'LEFT JOIN AgInternedIptcCountry ON AgHarvestedIptcMetadata.countryRef = AgInternedIptcCountry.id_local '
    query += 'LEFT JOIN AgInternedIptcIsoCountryCode ON AgHarvestedIptcMetadata.isoCountryCodeRef = AgInternedIptcIsoCountryCode.id_local '
    query += 'WHERE Adobe_images.rootFile = AgLibraryFile.id_local AND Adobe_images.id_local = AgHarvestedExifMetadata.image AND AgLibraryFile.folder = AgLibraryFolder.id_local AND AgLibraryFolder.rootFolder = AgLibraryRootFolder.id_local '
    query += 'AND AgLibraryFile.baseName = ?'
    # absolutePath + pathFromRoot = path of the XMP file - XMP file
    if args.lightroom_strict:
        # NOTE: leading space so the condition does not run together with the baseName condition
        query += ' AND AgLibraryRootFolder.absolutePath || AgLibraryFolder.pathFromRoot = ?'

    # connect to the LR database for reading
    # open the folder and look for the first lrcat file in there
    lightroom_database = ''
    lrdb = None
    for file in os.listdir(args.lightroom_folder):
        if file.endswith('.lrcat'):
            lightroom_database = os.path.join(args.lightroom_folder, file)
            lrdb = sqlite3.connect(lightroom_database)
            # use the first catalogue found
            break
    if not lightroom_database or not lrdb:
        print("(!) We could not find a lrcat file in the given lightroom folder or the DB connection failed: {}".format(args.lightroom_folder))
        # flag for end
        error = True
    else:
        # set the row factory so we can access each element by name
        lrdb.row_factory = sqlite3.Row
        # set cursor
        cur = lrdb.cursor()
        # flag that we have a Lightroom DB
        use_lightroom = True

# on error exit here
if error:
    sys.exit(1)

# init the XMP meta for handling
xmp = XMPMeta()

# loop through the xmp_sources (folders or files) and read in the XMP data for Lat/Long and the other data
for xmp_file_source in args.xmp_sources:
    # if it is a folder, open it and loop
    # NOTE: we do check for folders in there; if there are any, we traverse them recursively
    # also check that the folder is not in the exclude list
    if os.path.isdir(xmp_file_source) and xmp_file_source.rstrip('/') not in [x.rstrip('/') for x in args.exclude_sources]:
        # open the folder, look for any .xmp files and push them into the holding array
        # if there are folders, dive into them
        # or glob.glob all .xmp files + directory
        for root, dirs, files in os.walk(xmp_file_source):
            for file in sorted(files):
                # 1) file ends in .xmp but has no .BK. inside
                # 2) file is not in the exclude list
                # 3) full folder is not in the exclude list
                if file.endswith(".xmp") and ".BK." not in file \
                    and "{}/{}".format(root, file) not in args.exclude_sources \
                    and root.rstrip('/') not in [x.rstrip('/') for x in args.exclude_sources]:
                    if "{}/{}".format(root, file) not in work_files:
                        work_files.append("{}/{}".format(root, file))
                        count['all'] += 1
    else:
        # not already added to the list and not in the exclude list either
        if xmp_file_source not in work_files and xmp_file_source not in args.exclude_sources:
            work_files.append(xmp_file_source)
            count['all'] += 1
if args.debug:
    print("### Work Files {}".format(work_files))

# if we have read only, we print in a list format style
if args.read_only:
    # adjust the output width for the list view
    format_length = outputListWidthAdjust()

    # after how many lines do we reprint the header
    header_repeat = 50
    # how many pages will we have
    page_all = ceil(len(work_files) / header_repeat)
    # current page number
    page_no = 1
    # the formatted line for the output
    # double braces: the first format pass fills in the column widths,
    # the later passes (at print time) fill in the data
    format_line = " {{{{filename:<{}}}}} | {{{{latitude:>{}}}}} | {{{{longitude:>{}}}}} | {{{{code:<{}}}}} | {{{{country:<{}}}}} | {{{{state:<{}}}}} | {{{{city:<{}}}}} | {{{{location:<{}}}}} | {{{{path:<{}}}}}".format(
        "{filenamelen}",
        format_length['latitude'],
        format_length['longitude'],
        format_length['code'],
        "{countrylen}",
        "{statelen}",
        "{citylen}",
        "{locationlen}",
        "{pathlen}"  # set the path len replacer variable
    )
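
    # The line layout above is built in stages (illustrative, not executed):
    #   pass 1 (here)           : ' {{filename:<{filenamelen}}} | {{latitude:>18}} | ...'
    #   pass 2 (widths, below)  : ' {filename:<35} | {latitude:>18} | ...'
    #   pass 3 (data, at print) : ' IMG_0001.xmp ...                |    35.658 | ...'
    # latitude, longitude and code get fixed widths in pass 1 because they never
    # contain CJK characters; the other columns keep width placeholders so they
    # can be adjusted per row for double width characters.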
    # header line format:
    #   page line
    #   header title
    #   separator line
    header_line = '''{}
{}
{}'''.format(
        '> Page {page_no:,}/{page_all:,}',  # can later be set to something else, e.g. page numbers
        # pre-replace the column lengths before we add the header titles
        format_line.format(
            filenamelen=format_length['filename'],
            countrylen=format_length['country'],
            statelen=format_length['state'],
            citylen=format_length['city'],
            locationlen=format_length['location'],
            pathlen=format_length['path']
        ).format(  # the header title line
            filename='File'[:format_length['filename']],
            latitude='Latitude'[:format_length['latitude']],
            longitude='Longitude'[:format_length['longitude']],
            code='Code',
            country='Country'[:format_length['country']],
            state='State'[:format_length['state']],
            city='City'[:format_length['city']],
            location='Location'[:format_length['location']],
            path='Path'[:format_length['path']]
        ),
        "{}+{}+{}+{}+{}+{}+{}+{}+{}".format(  # the header separator line
            '-' * (format_length['filename'] + 2),
            '-' * (format_length['latitude'] + 2),
            '-' * (format_length['longitude'] + 2),
            '-' * (format_length['code'] + 2),
            '-' * (format_length['country'] + 2),
            '-' * (format_length['state'] + 2),
            '-' * (format_length['city'] + 2),
            '-' * (format_length['location'] + 2),
            '-' * (format_length['path'] + 2)
        )
    )
    # print header
    printHeader(header_line.format(page_no=page_no, page_all=page_all))
    # print "no files found" if we have no files
    if not work_files:
        print("{:<60}".format('[!!!] No files found'))

# ### MAIN WORK LOOP
# now we just loop through each file and work on it
for xmp_file in work_files:
    if not args.read_only:
        print("---> {}: ".format(xmp_file), end='')

    # ### ACTION FLAGS
    write_file = False

    # ### XMP FILE READING
    # open the file & read everything into the buffer
    with open(xmp_file, 'r') as fptr:
        strbuffer = fptr.read()
    # read the fields from the XMP file and store them in the data set
    xmp.parse_from_str(strbuffer)
    for xmp_field in xmp_fields:
        # we need to check if the property exists, or the exempi routine will fail
        if xmp.does_property_exist(xmp_fields[xmp_field], xmp_field):
            data_set[xmp_field] = xmp.get_property(xmp_fields[xmp_field], xmp_field)
        else:
            data_set[xmp_field] = ''
        if args.debug:
            print("### => XMP: {}:{} => {}".format(xmp_fields[xmp_field], xmp_field, data_set[xmp_field]))
    if args.read_only:
        # view only if we list all, or if data is unset
        if (not args.unset_only and not args.unset_gps_only) or (args.unset_only and '' in data_set.values()) or (args.unset_gps_only and (not data_set['GPSLatitude'] or not data_set['GPSLongitude'])):
            # for read only we print out the data formatted
            # headline check: reprint the header if needed
            count['read'] = printHeader(header_line.format(page_no=page_no, page_all=page_all), count['read'], header_repeat)
            # the data content
            print(format_line.format(
                # for all possible non latin fields we adjust the width if double width characters are inside
                filenamelen=formatLen(shortenPath(xmp_file, format_length['filename'], file_only=True), format_length['filename']),
                countrylen=formatLen(shortenString(data_set['Country'], width=format_length['country']), format_length['country']),
                statelen=formatLen(shortenString(data_set['State'], width=format_length['state']), format_length['state']),
                citylen=formatLen(shortenString(data_set['City'], width=format_length['city']), format_length['city']),
                locationlen=formatLen(shortenString(data_set['Location'], width=format_length['location']), format_length['location']),
                pathlen=formatLen(shortenPath(xmp_file, format_length['path'], path_only=True), format_length['path'])
            ).format(
                filename=shortenPath(xmp_file, format_length['filename'], file_only=True),  # shorten from the left
                latitude=str(convertDMStoLat(data_set['GPSLatitude']))[:format_length['latitude']],  # cut off from the right
                longitude=str(convertDMStoLong(data_set['GPSLongitude']))[:format_length['longitude']],
                code=data_set['CountryCode'][:2].center(4),  # is only 2 chars
                country=shortenString(data_set['Country'], width=format_length['country']),  # shorten from the right
                state=shortenString(data_set['State'], width=format_length['state']),
                city=shortenString(data_set['City'], width=format_length['city']),
                location=shortenString(data_set['Location'], width=format_length['location']),
                path=shortenPath(xmp_file, format_length['path'], path_only=True)
            ))
            count['listed'] += 1
    else:
        # ### LR action flag (data ok)
        lightroom_data_ok = True
        # ### LIGHTROOM DB READING
        # read in data from the DB if we have a lightroom folder
        if use_lightroom:
            # get the base file name, we need this for lightroom
            xmp_file_basename = os.path.splitext(os.path.split(xmp_file)[1])[0]
            # try to get this file name from the DB
            lr_query_params = [xmp_file_basename]
            # for the strict check we need the full path with a trailing /, as LR stores the last folder with /
            if args.lightroom_strict:
                xmp_file_path = "{}/".format(os.path.split(xmp_file)[0])
                lr_query_params.append(xmp_file_path)
            cur.execute(query, lr_query_params)
            # get the row data
            lrdb_row = cur.fetchone()
            # abort the read because we found more than one row
            if cur.fetchone() is not None:
                print("(!) Lightroom DB returned more than one row")
                lightroom_data_ok = False
                count['many_found'] += 1
            # notify if we couldn't find one
            elif not lrdb_row:
                print("(!) Could not get data from the Lightroom DB")
                lightroom_data_ok = False
                count['not_found'] += 1
            if args.debug and lrdb_row:
                print("### LightroomDB: {} / {}".format(tuple(lrdb_row), lrdb_row.keys()))

        # create a duplicate copy for later checking if something changed
        data_set_original = data_set.copy()
        # check if LR data exists and use it to compare to the XMP data
        # if LR has GPS and XMP has no GPS => use LR and set XMP
        # same for location names
        # if missing in XMP but in LR -> set in XMP
        # if missing in both, do a lookup in Maps
        if use_lightroom and lightroom_data_ok:
            # check lat/long separately
            if lrdb_row['gpsLatitude'] and not data_set['GPSLatitude']:
                # we need to convert to the Degree,Min.Sec[NSEW] format
                data_set['GPSLatitude'] = convertLatToDMS(lrdb_row['gpsLatitude'])
            if lrdb_row['gpsLongitude'] and not data_set['GPSLongitude']:
                data_set['GPSLongitude'] = convertLongToDMS(lrdb_row['gpsLongitude'])
            # now check Location, City, etc
            for loc in data_set_loc:
                # overwrite the original set (read from XMP) with LR data if the original data is missing
                if lrdb_row[loc] and not data_set[loc]:
                    data_set[loc] = lrdb_row[loc]
                if args.debug:
                    print("### -> LR: {} => {}".format(loc, lrdb_row[loc]))
        # base set done; now check if there is anything unset in the data_set, if yes do a lookup in maps
        # run this through the overwrite checker to get unset fields if we have a forced overwrite
        has_unset = False
        failed = False
        from_cache = False
        for loc in data_set_loc:
            if checkOverwrite(data_set[loc], loc, args.field_controls):
                has_unset = True
        if has_unset:
            # check if lat/long is in the cache
            cache_key = '{}#{}'.format(data_set['GPSLongitude'], data_set['GPSLatitude'])
            if args.debug:
                print("### *** CACHE: {}: {}".format(cache_key, 'NO' if cache_key not in data_cache else 'YES'))
            # main cache check = identical key
            # the second cache level check is on distance:
            # the default distance is 10m, it can be set via flag
            # check the distance to previous cache entries (newest to oldest) and match before we do a google lookup
            if cache_key not in data_cache:
                has_fuzzy_cache = False
                if args.fuzzy_distance:
                    shortest_distance = args.fuzzy_distance
                    best_match_latlong = ''
                    # check if we have a fuzzy distance match; if no valid one is found do the maps lookup
                    for _cache_key in data_cache:
                        # split up the cache key so we can use it in the distance calc method
                        to_lat_long = _cache_key.split('#')
                        # get the distance based on the current set + the cached set
                        # print("Lookup f-long {} f-lat {} t-long {} t-lat {}".format(data_set['GPSLongitude'], data_set['GPSLatitude'], to_lat_long[0], to_lat_long[1]))
                        distance = getDistance(from_longitude=data_set['GPSLongitude'], from_latitude=data_set['GPSLatitude'], to_longitude=to_lat_long[0], to_latitude=to_lat_long[1])
                        if args.debug:
                            print("### **= FUZZY CACHE: => distance: {} (m), shortest: {}".format(distance, shortest_distance))
                        if distance <= shortest_distance:
                            # set the new distance and keep the current best matching location
                            shortest_distance = distance
                            best_match_latlong = _cache_key
                            has_fuzzy_cache = True
                            if args.debug:
                                print("### ***= FUZZY CACHE: YES => Best match: {}".format(best_match_latlong))
                if not has_fuzzy_cache:
                    # get the location from maps (google or openstreetmap)
                    maps_location = reverseGeolocate(latitude=data_set['GPSLatitude'], longitude=data_set['GPSLongitude'], map_type=map_type)
                    # cache the data with Lat/Long
                    data_cache[cache_key] = maps_location
                    from_cache = False
                else:
                    maps_location = data_cache[best_match_latlong]
                    # cache this one too, because the next one will match it as well,
                    # so we don't need to loop-search again for the same fuzzy location
                    data_cache[cache_key] = maps_location
                    count['cache'] += 1
                    count['fuzzy_cache'] += 1
                    from_cache = True
            else:
                # load the location from the cache
                maps_location = data_cache[cache_key]
                count['cache'] += 1
                from_cache = True
            # overwrite the sets (note: options check here)
            if args.debug:
                print("### Map Location ({}): {}".format(map_type, maps_location))
            # we must have at least the country set to write anything back
            if maps_location['Country']:
                for loc in data_set_loc:
                    # only write to XMP if the overwrite check passes
                    if checkOverwrite(data_set_original[loc], loc, args.field_controls):
                        data_set[loc] = maps_location[loc]
                        xmp.set_property(xmp_fields[loc], loc, maps_location[loc])
                        write_file = True
                if write_file:
                    count['map'] += 1
            else:
                print("(!) Could not get geo location data ", end='')
                failed = True
        else:
            if args.debug:
                print("Lightroom data use: {}, Lightroom data ok: {}".format(use_lightroom, lightroom_data_ok))
        # check if the data_set differs from the original (LR db load)
        # if yes write, else skip
        if use_lightroom and lightroom_data_ok:
            for key in data_set:
                # if not the same (compared to the original data) and it passes the overwrite check
                if data_set[key] != data_set_original[key] and checkOverwrite(data_set_original[key], key, args.field_controls):
                    xmp.set_property(xmp_fields[key], key, data_set[key])
                    write_file = True
            if write_file:
                count['lightroom'] += 1
        # if we have the write flag set, write the data
        if write_file:
            if not args.test:
                # use copyfile to create a backup copy
                if not args.no_xmp_backup:
                    # check if there is another file with .BK. already there; if yes get the max number and +1 it, if not set it to 1
                    bk_file_counter = getBackupFileCounter(xmp_file)
                    # copy to the new backup file
                    copyfile(xmp_file, "{}.BK.{}{}".format(os.path.splitext(xmp_file)[0], bk_file_counter, os.path.splitext(xmp_file)[1]))
                # write back to the original file
                with open(xmp_file, 'w') as fptr:
                    fptr.write(xmp.serialize_to_str(omit_packet_wrapper=True))
            else:
                print("[TEST] Would write {} to {} ".format(data_set, xmp_file), end='')
            if from_cache:
                print("[UPDATED FROM CACHE]")
            else:
                print("[UPDATED]")
            count['changed'] += 1
        elif failed:
            print("[FAILED]")
            count['failed'] += 1
            # log the file for the post-run summary
            failed_files.append(xmp_file)
        else:
            print("[SKIP]")
            count['skipped'] += 1

# close the DB connection
if use_lightroom:
    lrdb.close()

# end stats
print("{}".format('=' * 40))
print("XMP Files found              : {:9,}".format(count['all']))
if args.read_only:
    print("XMP Files listed             : {:9,}".format(count['listed']))
if not args.read_only:
    print("Updated                      : {:9,}".format(count['changed']))
    print("Skipped                      : {:9,}".format(count['skipped']))
    print("New GeoLocation from Map     : {:9,}".format(count['map']))
    print("GeoLocation from Cache       : {:9,}".format(count['cache']))
    print("GeoLocation from Fuzzy Cache : {:9,}".format(count['fuzzy_cache']))
    print("Failed reverse GeoLocate     : {:9,}".format(count['failed']))
if use_lightroom:
    print("GeoLocation from Lightroom   : {:9,}".format(count['lightroom']))
    print("No Lightroom data found      : {:9,}".format(count['not_found']))
    print("More than one found in LR    : {:9,}".format(count['many_found']))
# if we have failed files
if len(failed_files) > 0:
    print("{}".format('-' * 40))
    print("Files that failed to update:")
    print("{}".format(', '.join(failed_files)))

# __END__