#!/usr/bin/env python
"""Abstract API over the Google Alerts service."""
import base64
import json
import logging
import os
import pickle
import re
import sys
import requests
import requests.utils
from bs4 import BeautifulSoup
# Package metadata. The copyright holder's name previously read "Brandion",
# which disagreed with every other field below.
__author__ = "Brandon Dixon"
__copyright__ = "Copyright, Brandon Dixon"
__credits__ = ["Brandon Dixon"]
__license__ = "MIT"
__maintainer__ = "Brandon Dixon"
__email__ = "brandon@9bplus.com"
__status__ = "BETA"
# True when running under a Python 2 interpreter. The flag selects the
# credential obfuscation scheme and is persisted in the config file so a
# change of interpreter can be detected.
PY2 = sys.version_info[0] < 3
class InvalidCredentials(Exception):
    """Raised when authentication fails because the email or password was rejected."""
class AccountCaptcha(Exception):
    """Raised when the authentication response contains a CAPTCHA challenge."""
class InvalidState(Exception):
    """Raised when an action is attempted before the application state was obtained."""
class StateParseFailure(Exception):
    """Raised when the application state cannot be parsed into the expected structure."""
class MonitorNotFound(Exception):
    """Raised when no configured monitor matches the requested ID or term."""
class InvalidConfig(Exception):
    """Raised when supplied options or stored configuration are unusable."""
class ActionError(Exception):
    """Raised when a create, modify or delete request does not return HTTP 200."""
def obfuscate(p, action):
    """Obfuscate the auth details to avoid easy snatching.

    It's best to use a throw away account for these alerts to avoid having
    your authentication put at risk by storing it locally. This is NOT
    encryption: on Python 2 each character is shifted by a fixed, embedded
    key before URL-safe base64 encoding; on Python 3 the value is only
    URL-safe base64 encoded and the key is not used. Values stored by one
    major Python version therefore cannot be read back by the other.

    :param p: The plain text when ``action`` is ``'store'``; otherwise a
        value previously produced with ``'store'``.
    :param action: ``'store'`` to obfuscate. Any other value reverses the
        obfuscation.
    :returns: The obfuscated or recovered string.
    """
    key = "ru7sll3uQrGtDPcIW3okutpFLo6YYtd5bWSpbZJIopYQ0Du0a1WlhvJOaZEH"
    if action == 'store':
        if PY2:
            shifted = [
                chr((ord(char) + ord(key[index % len(key)])) % 256)
                for index, char in enumerate(p)
            ]
            return base64.urlsafe_b64encode("".join(shifted))
        return base64.urlsafe_b64encode(p.encode()).decode()

    decoded = base64.urlsafe_b64decode(p)
    if PY2:
        restored = [
            chr((256 + ord(char) - ord(key[index % len(key)])) % 256)
            for index, char in enumerate(decoded)
        ]
        return "".join(restored)
    return decoded.decode()
# Per-user directory holding everything this module persists between runs.
CONFIG_PATH = os.path.expanduser('~/.config/google_alerts')
# Pickled cookie dictionary from the last successful authentication.
SESSION_FILE = os.path.join(CONFIG_PATH, 'session')
# JSON file caching the account email and the obfuscated password.
CONFIG_FILE = os.path.join(CONFIG_PATH, 'config.json')
# Written when no config file exists yet; `py2` records which interpreter
# family produced the obfuscated password.
CONFIG_DEFAULTS = dict(email='', password='', py2=PY2)
class GoogleAlerts(object):
    """Client for listing, creating, modifying and deleting Google Alerts.

    Credentials passed to the constructor are cached (obfuscated) in the
    config file; when omitted they are loaded from that file. Call
    :meth:`authenticate` before any other action so the application state
    required by the alert endpoints is available.
    """

    NAME = "GoogleAlerts"
    LOG_LEVEL = logging.DEBUG
    LOGIN_URL = 'https://accounts.google.com/signin'
    AUTH_URL = 'https://accounts.google.com/signin/challenge/sl/password'
    ALERTS_URL = 'https://www.google.com/alerts'
    TEST_URL = 'https://myaccount.google.com/?pli=1'
    # Text looked for in the TEST_URL response; its presence is treated as an
    # unauthenticated session.
    TEST_KEY = 'CREATE YOUR GOOGLE ACCOUNT'
    CAPTCHA_KEY = 'captcha-container'
    ALERTS_MODIFY_URL = 'https://www.google.com/alerts/modify?x={requestX}'
    ALERTS_CREATE_URL = 'https://www.google.com/alerts/create?x={requestX}'
    ALERTS_DELETE_URL = 'https://www.google.com/alerts/delete?x={requestX}'
    # Numeric codes used in the state/payload mapped to readable names.
    MONITOR_MATCH_TYPE = {
        2: 'ALL',
        3: 'BEST'
    }
    ALERT_FREQ = {
        1: 'AS_IT_HAPPENS',
        2: 'AT_MOST_ONCE_A_DAY',
        3: 'AT_MOST_ONCE_A_WEEK',
    }
    DELIVERY = {
        1: 'MAIL',
        2: 'RSS'
    }
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36'
    }

    def __init__(self, email=None, password=None):
        """Create the client and load or cache the credentials.

        :param email: Account email. When ``email`` and ``password`` are both
            given they are written to the config file; otherwise the cached
            values are loaded.
        :param password: Account password.
        """
        self._log = self._logger()
        self._email = email
        self._password = password
        self._is_authenticated = False
        self._state = None
        self._session = requests.session()
        self._config_bootstrap()

    @staticmethod
    def _write_config(config):
        """Write ``config`` to the config file as indented JSON."""
        with open(CONFIG_FILE, 'w') as handle:
            json.dump(config, handle, indent=4, separators=(',', ': '))

    def _config_bootstrap(self):
        """Go through and establish the defaults on the file system.

        The approach here was stolen from the CLI tool provided with the
        module. Idea being that the user should not always need to provide a
        username and password in order to run the script. If the configuration
        file is already present with valid data, then lets use it.

        :raises InvalidConfig: If the stored configuration was written by a
            different major Python version than the one running now.
        """
        if not os.path.exists(CONFIG_PATH):
            os.makedirs(CONFIG_PATH)
        if not os.path.exists(CONFIG_FILE):
            self._write_config(CONFIG_DEFAULTS)

        if self._email and self._password:
            # Work on a copy so the module-level defaults are never mutated.
            config = dict(CONFIG_DEFAULTS)
            config['email'] = self._email
            config['password'] = str(obfuscate(self._password, 'store'))
            self._log.debug("Caching authentication in config file")
            self._write_config(config)
            return

        # No credentials supplied: fall back to whatever was cached earlier.
        with open(CONFIG_FILE) as handle:
            config = json.load(handle)
        if config.get('py2', PY2) != PY2:
            raise InvalidConfig("Python versions have changed. Please run `setup` again to reconfigure the client.")
        if config.get('email') and config.get('password'):
            self._email = config['email']
            self._password = obfuscate(str(config['password']), 'fetch')
            self._log.debug("Loaded authentication from config file")

    def _session_check(self):
        """Attempt to authenticate the user through a session file.

        This process is done to avoid having to authenticate the user every
        single time. It uses a session file that is saved when a valid session
        is captured and then reused. Because sessions can expire, we need to
        test the session prior to calling the user authenticated. Right now
        that is done with a test string found in an unauthenticated session.
        This approach is not an ideal method, but it works.

        :returns: ``True`` when the cached session is still usable (the state
            is refreshed as a side effect), ``False`` otherwise.
        """
        if not os.path.exists(SESSION_FILE):
            self._log.debug("Session file does not exist")
            return False
        # NOTE: pickle must only ever be fed the file written by
        # `authenticate`; unpickling a file from an untrusted source can
        # execute arbitrary code.
        with open(SESSION_FILE, 'rb') as f:
            cookies = requests.utils.cookiejar_from_dict(pickle.load(f))
        self._session.cookies = cookies
        self._log.debug("Loaded cookies from session file")
        response = self._session.get(url=self.TEST_URL, headers=self.HEADERS)
        if self.TEST_KEY in str(response.content):
            self._log.debug("Session file appears invalid")
            return False
        self._is_authenticated = True
        self._process_state()
        return True

    def _logger(self):
        """Create a logger to be used between processes.

        The stream handler is attached only when the named logger has no
        handlers yet, so creating several clients does not duplicate every
        log line.

        :returns: Logging instance.
        """
        logger = logging.getLogger(self.NAME)
        logger.setLevel(self.LOG_LEVEL)
        if logger.handlers:
            return logger
        shandler = logging.StreamHandler(sys.stdout)
        fmt = '\033[1;32m%(levelname)-5s %(module)s:%(funcName)s():'
        fmt += '%(lineno)d %(asctime)s\033[0m| %(message)s'
        shandler.setFormatter(logging.Formatter(fmt))
        logger.addHandler(shandler)
        return logger

    def set_log_level(self, level):
        """Override the default log level of the class.

        :param level: ``'info'``, ``'debug'`` or ``'error'``; any other value
            is handed to the logger's ``setLevel`` unchanged.
        """
        named_levels = {
            'info': logging.INFO,
            'debug': logging.DEBUG,
            'error': logging.ERROR,
        }
        self._log.setLevel(named_levels.get(level, level))

    def _process_state(self):
        """Process the application state configuration.

        Google Alerts manages the account information and alert data through
        some custom state configuration. Not all values have been completely
        enumerated.

        :returns: The most recently parsed state, or the previous value when
            no inline script carried one.
        :raises StateParseFailure: If a matching script cannot be decoded.
        """
        self._log.debug("Capturing state from the request")
        response = self._session.get(url=self.ALERTS_URL, headers=self.HEADERS)
        soup = BeautifulSoup(response.content, "html.parser")
        pattern = re.compile(r'window.STATE=(.*);')
        for script in soup.find_all('script', {'src': False}):
            # `.string` is None for tags without a single text child; such
            # scripts cannot hold the state and must not reach the regex.
            text = script.string
            if not text:
                continue
            match = pattern.search(text)
            if not match:
                continue
            try:
                # Drop the 13-character "window.STATE=" prefix and the
                # trailing six characters of the match before decoding.
                state = json.loads(match.group(0)[13:-6])
            except ValueError:
                raise StateParseFailure(
                    'Google has changed their core protocol and a new parser must be built. ' +
                    'Please file a bug at https://github.com/9b/google-alerts/issues.'
                )
            if state != "":
                self._state = state
                self._log.debug("State value set: %s", self._state)
        return self._state

    def _build_payload(self, term, options):
        """Build the request payload for creating or modifying a monitor.

        The caller's ``options`` mapping is read but never modified.

        :param term: Search term to monitor; wrapped in double quotes when
            ``options['exact']`` is truthy.
        :param options: Mapping with a required ``delivery`` key (``'RSS'``
            selects a feed, anything else selects mail) and optional
            ``region``, ``language``, ``monitor_match``, ``alert_frequency``,
            ``exact``, ``action``, ``monitor_id`` and ``rss_id`` keys.
        :returns: Nested list ready to be JSON encoded as ``params``.
        :raises InvalidConfig: If ``delivery`` is missing.
        :raises KeyError: If ``monitor_match`` or ``alert_frequency`` is not a
            known name.
        """
        if 'delivery' not in options:
            raise InvalidConfig("`delivery` is required in options.")
        region = options.get('region', 'US')
        language = options.get('language', 'en')

        # Invert the code -> name tables to translate names back to codes.
        match_codes = {v: k for k, v in self.MONITOR_MATCH_TYPE.items()}
        freq_codes = {v: k for k, v in self.ALERT_FREQ.items()}
        monitor_match = match_codes[options.get('monitor_match', 'ALL')]
        freq_name = options.get('alert_frequency', 'AT_MOST_ONCE_A_DAY')
        freq_option = freq_codes[freq_name]

        if options.get('exact', False):
            term = "\"%s\"" % term

        if options['delivery'] == 'RSS':
            delivery = [None, 2, "", [], 1]
        else:
            # Each mail frequency carries its own schedule structure.
            schedules = {
                'AS_IT_HAPPENS': [],
                'AT_MOST_ONCE_A_DAY': [None, None, 3],
                'AT_MOST_ONCE_A_WEEK': [None, None, 0, 3],
            }
            delivery = [None, 1, self._email, schedules[freq_name],
                        freq_option]
        # Shared tail; index 11 of the finished entry holds the feed id.
        delivery += ["en-US", None, None, None, None, None, "0", None, None,
                     self._state[2]]

        term_spec = [None, term, "com", [None, language, region],
                     None, None, None, 0, 1]
        payload = [None, [None, None, None, term_spec, None, monitor_match,
                          [delivery]]]

        if options.get('action') == 'MODIFY':
            # Modify requests carry the monitor id ahead of the definition.
            payload.insert(1, options.get('monitor_id'))
            if 'rss_id' in options:
                payload[2][6][0][11] = options['rss_id'].split('/')[-1]
        return payload

    def authenticate(self):
        """Authenticate the user and setup our state.

        A cached session is tried first. Otherwise the login form is fetched
        and submitted with the stored credentials, and the resulting cookies
        are saved to the session file.

        :raises AccountCaptcha: If the response contains a CAPTCHA challenge.
        :raises InvalidCredentials: If the expected session cookie is absent.
        """
        valid = self._session_check()
        if self._is_authenticated and valid:
            self._log.debug("[!] User has already authenticated")
            return
        init = self._session.get(url=self.LOGIN_URL, headers=self.HEADERS)
        soup = BeautifulSoup(init.content, "html.parser")
        soup_login = soup.find('form').find_all('input')
        # Echo back every pre-filled form field alongside the credentials.
        post_data = {
            field['name']: field['value']
            for field in soup_login
            if field.has_attr('name') and field.has_attr('value')
        }
        post_data['Email'] = self._email
        post_data['Passwd'] = self._password
        response = self._session.post(url=self.AUTH_URL, data=post_data,
                                      headers=self.HEADERS)
        if self.CAPTCHA_KEY in str(response.content):
            raise AccountCaptcha('Google is forcing a CAPTCHA. To get around this issue, run the google-alerts with the seed option to open an interactive authentication session. Once authenticated, this module will cache your session and load that in the future')
        cookies = [x.name for x in response.cookies]
        if 'SIDCC' not in cookies:
            raise InvalidCredentials("Email or password was incorrect.")
        with open(SESSION_FILE, 'wb') as f:
            cookies = requests.utils.dict_from_cookiejar(self._session.cookies)
            pickle.dump(cookies, f, protocol=2)
        self._log.debug("Saved session to disk for future reference")
        self._log.debug("User successfully authenticated")
        self._is_authenticated = True
        self._process_state()
        return

    def _parse_monitor(self, monitor):
        """Convert one raw monitor entry from the state into a dictionary.

        :param monitor: Raw entry shaped like the examples documented on
            :meth:`list`.
        :returns: Dictionary with ``monitor_id``, ``user_id``, ``term``,
            ``language``, ``region``, ``delivery`` and ``match_type`` plus
            ``alert_frequency`` and ``email_address`` for mail delivery or
            ``rss_link`` for feed delivery.
        """
        details = monitor[1]
        delivery = details[5][0]
        obj = dict()
        obj['monitor_id'] = monitor[0]
        obj['user_id'] = monitor[-1]
        obj['term'] = details[2][0]
        obj['language'] = details[2][2][0]
        obj['region'] = details[2][2][1]
        obj['delivery'] = self.DELIVERY[delivery[0]]
        obj['match_type'] = self.MONITOR_MATCH_TYPE[details[4]]
        if obj['delivery'] == 'MAIL':
            obj['alert_frequency'] = self.ALERT_FREQ[delivery[3]]
            obj['email_address'] = delivery[1]
        else:
            rss_id = delivery[10]
            url = "https://google.com/alerts/feeds/{uid}/{fid}"
            obj['rss_link'] = url.format(uid=obj['user_id'], fid=rss_id)
        return obj

    def list(self, term=None):
        """List alerts configured for the account.

        The state is refreshed before parsing so the result reflects the
        current monitors.

        At the time of processing, here are several state examples:
        - ['062bc676ab9e9d9b:5a96b75728adb9d4:com:en:US', [None, None, ['email_aih_all', 'com', ['en', 'US'], None, None, None, False], None, 2, [[1, 'XXX@gmail.com', [], 1, 'en-US', 1, None, None, None, None, '7290377213681086747', None, None, 'AB2Xq4g1vxP5nJCT4SVMp8-8CeYubB7G0yQdZnM']]], '06449491676132715360']
        - ['062bc676ab9e9d9b:eb34fff1681232ae:com:en:US', [None, None, ['email_aih_best', 'com', ['en', 'US'], None, None, None, False], None, 3, [[1, 'XXX@gmail.com', [], 1, 'en-US', 1, None, None, None, None, '11048899972761343896', None, None, 'AB2Xq4ibeyRSs4e6CQEjGTYWRyQgHftJgjkGmdE']]], '06449491676132715360']
        - ['062bc676ab9e9d9b:029a12ab092e4d48:com:en:US', [None, None, ['email_d_all', 'com', ['en', 'US'], None, None, None, False], None, 2, [[1, 'XXX@gmail.com', [None, 18], 2, 'en-US', 1, None, None, None, None, '13677540305540568185', None, None, 'AB2Xq4iqyPDNCX_G_ZahmtXr3Ev1Xxk71J3A9o8']]], '06449491676132715360']
        - ['062bc676ab9e9d9b:be633f8e2d769ed1:com:en:US', [None, None, ['email_d_best', 'com', ['en', 'US'], None, None, None, False], None, 3, [[1, 'XXX@gmail.com', [None, 18], 2, 'en-US', 1, None, None, None, None, '3165773263851675895', None, None, 'AB2Xq4gAyl3SR-5AKh3NstCHFf3I5tOCH_8Te98']]], '06449491676132715360']
        - ['062bc676ab9e9d9b:4064fca73997bea1:com:en:US', [None, None, ['email_w_all', 'com', ['en', 'US'], None, None, None, False], None, 2, [[1, 'XXX@gmail.com', [None, 18, 0], 3, 'en-US', 1, None, None, None, None, '1277526588871069988', None, None, 'AB2Xq4jNqRCDJaqIvPfZTI6Sos2MMPb5q_6jS14']]], '06449491676132715360']
        - ['062bc676ab9e9d9b:ed3adf6fd0968cb0:com:en:US', [None, None, ['email_w_best', 'com', ['en', 'US'], None, None, None, False], None, 3, [[1, 'XXX@gmail.com', [None, 18, 0], 3, 'en-US', 1, None, None, None, None, '11943490843312281977', None, None, 'AB2Xq4gvnjg6s07wCxTs4Ag8_6uOC0u9-7Aiu8E']]], '06449491676132715360']
        - ['062bc676ab9e9d9b:a92eace4d0488209:com:en:US', [None, None, ['rss_aih_best', 'com', ['en', 'US'], None, None, None, False], None, 3, [[2, '', [], 1, 'en-US', 1, None, None, None, None, '10457927733922767031', None, None, 'AB2Xq4jZ1IPZLS44ZpaXYn8Fh46euu8_so_2k7k']]], '06449491676132715360']
        - ['062bc676ab9e9d9b:ac4752c338e8c363:com:en:US', [None, None, ['rss_all', 'com', ['en', 'US'], None, None, None, False], None, 2, [[2, '', [], 1, 'en-US', 1, None, None, None, None, '17387577876633356534', None, None, 'AB2Xq4h1wQcVxLfb0s835KmJWdw7bfUzzwpjUrg']]], '06449491676132715360']

        :param term: When given, only monitors whose term equals this value
            are returned.
        :returns: List of monitor dictionaries; empty when none exist.
        :raises InvalidState: If the state has not been obtained yet.
        :raises StateParseFailure: If an entry does not match the parser.
        """
        if not self._state:
            raise InvalidState("State was not properly obtained from the app")
        self._process_state()
        if not self._state[0]:
            self._log.info("No monitors have been created yet.")
            return []
        monitors = []
        try:
            for monitor in self._state[0][0]:
                if term and monitor[1][2][0] != term:
                    continue
                monitors.append(self._parse_monitor(monitor))
        except (IndexError, KeyError, TypeError):
            raise StateParseFailure("Observed state differs from parser. Please file a bug at https://github.com/9b/google-alerts/issues.")
        return monitors

    def create(self, term, options):
        """Create a monitor using passed configuration.

        :param term: Search term to monitor.
        :param options: Monitor options; see :meth:`_build_payload`. The
            mapping is not modified.
        :returns: The monitors matching the new term.
        :raises InvalidState: If the state has not been obtained yet.
        :raises ActionError: If the request does not return HTTP 200.
        """
        if not self._state:
            raise InvalidState("State was not properly obtained from the app")
        # Copy so the caller's mapping is left untouched.
        options = dict(options, action='CREATE')
        payload = self._build_payload(term, options)
        url = self.ALERTS_CREATE_URL.format(requestX=self._state[2])
        self._log.debug("Creating alert using: %s", url)
        params = json.dumps(payload, separators=(',', ':'))
        data = {'params': params}
        response = self._session.post(url, data=data, headers=self.HEADERS)
        if response.status_code != 200:
            raise ActionError("Failed to create monitor: %s"
                              % response.content)
        # Monitors created as exact matches are stored with quoted terms.
        if options.get('exact', False):
            term = "\"%s\"" % term
        return self.list(term)

    def modify(self, monitor_id, options):
        """Modify an existing monitor using passed configuration.

        The monitor's current settings serve as defaults and the supplied
        ``options`` take precedence over them, so only the keys that should
        change need to be passed. The mapping is not modified.

        :param monitor_id: ID of the monitor to change.
        :param options: Settings to apply; see :meth:`_build_payload`.
        :returns: The full list of monitors after the change.
        :raises InvalidState: If the state has not been obtained yet.
        :raises MonitorNotFound: If no monitor has the given ID.
        :raises ActionError: If the request does not return HTTP 200.
        """
        if not self._state:
            raise InvalidState("State was not properly obtained from the app")
        existing = None
        for monitor in self.list():  # Get the latest set of monitors
            if monitor['monitor_id'] == monitor_id:
                existing = monitor
        if existing is None:
            raise MonitorNotFound("No monitor was found with that ID.")
        merged = dict(existing)
        merged.update(options)
        merged['action'] = 'MODIFY'
        merged['monitor_id'] = existing['monitor_id']
        payload = self._build_payload(existing['term'], merged)
        url = self.ALERTS_MODIFY_URL.format(requestX=self._state[2])
        self._log.debug("Modifying alert using: %s", url)
        params = json.dumps(payload, separators=(',', ':'))
        data = {'params': params}
        response = self._session.post(url, data=data, headers=self.HEADERS)
        if response.status_code != 200:
            raise ActionError("Failed to modify monitor: %s"
                              % response.content)
        return self.list()

    def _delete_monitor(self, monitor_id, failure):
        """Send the delete request for ``monitor_id``.

        :param monitor_id: ID of the monitor to remove.
        :param failure: Message prefix used if the request fails.
        :returns: ``True`` on success.
        :raises ActionError: If the request does not return HTTP 200.
        """
        url = self.ALERTS_DELETE_URL.format(requestX=self._state[2])
        self._log.debug("Deleting alert using: %s", url)
        params = json.dumps([None, monitor_id], separators=(',', ':'))
        data = {'params': params}
        response = self._session.post(url, data=data, headers=self.HEADERS)
        if response.status_code != 200:
            raise ActionError("%s: %s" % (failure, response.content))
        return True

    def delete(self, monitor_id):
        """Delete a monitor by ID.

        :param monitor_id: ID of the monitor to remove.
        :returns: ``True`` on success.
        :raises InvalidState: If the state has not been obtained yet.
        :raises MonitorNotFound: If no monitor has the given ID.
        :raises ActionError: If the request does not return HTTP 200.
        """
        if not self._state:
            raise InvalidState("State was not properly obtained from the app")
        monitors = self.list()  # Get the latest set of monitors
        known = any(m['monitor_id'] == monitor_id for m in monitors)
        if not monitor_id or not known:
            raise MonitorNotFound("No monitor was found with that ID.")
        return self._delete_monitor(monitor_id, "Failed to delete by ID")

    def delete_by_term(self, term):
        """Delete an alert by term.

        When several monitors share the term, the last one listed is removed.

        :param term: Term of the monitor to remove.
        :returns: ``True`` on success.
        :raises InvalidState: If the state has not been obtained yet.
        :raises MonitorNotFound: If no monitor has the given term.
        :raises ActionError: If the request does not return HTTP 200.
        """
        if not self._state:
            raise InvalidState("State was not properly obtained from the app")
        monitor_id = None
        for monitor in self.list():  # Get the latest set of monitors
            if monitor['term'] == term:
                monitor_id = monitor['monitor_id']
        if not monitor_id:
            raise MonitorNotFound("No monitor was found with that term.")
        return self._delete_monitor(monitor_id, "Failed to delete by term")