]> git.rmz.io Git - dotfiles.git/blob - mutt/mutt_oauth2.py
nvim: add FPP copyright snippet
[dotfiles.git] / mutt / mutt_oauth2.py
1 #!/usr/bin/env python3
2 #
3 # Mutt OAuth2 token management script, version 2020-08-07
4 # Written against python 3.7.3, not tried with earlier python versions.
5 #
6 # Copyright (C) 2020 Alexander Perlis
7 #
8 # This program is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU General Public License as
10 # published by the Free Software Foundation; either version 2 of the
11 # License, or (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 # 02110-1301, USA.
22 '''Mutt OAuth2 token management'''
23
24 import sys
25 import json
26 import argparse
27 import urllib.parse
28 import urllib.request
29 import imaplib
30 import poplib
31 import smtplib
32 import base64
33 import secrets
34 import hashlib
35 import time
36 from datetime import timedelta, datetime
37 from pathlib import Path
38 import socket
39 import http.server
40 import subprocess
41
# Bearer tokens kept in the token file can be used repeatedly without any
# further verification, so the file is only ever stored encrypted. Each pipe
# below is a command that reads standard input and writes standard output;
# substitute whichever tools you prefer. These values invoke GPG, which only
# works while the recipient in the first line is an appropriate identity.
ENCRYPTION_PIPE = ['gpg', '--encrypt', '--recipient', 'me@rmz.io']
DECRYPTION_PIPE = ['gpg', '--decrypt']

# One entry per provider: OAuth2 endpoints, mail server hosts, the SASL
# mechanism used for mail logins, the requested scope, and the app credentials.
_GOOGLE_REGISTRATION = {
    'authorize_endpoint': 'https://accounts.google.com/o/oauth2/auth',
    'devicecode_endpoint': 'https://oauth2.googleapis.com/device/code',
    'token_endpoint': 'https://accounts.google.com/o/oauth2/token',
    'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
    'imap_endpoint': 'imap.gmail.com',
    'pop_endpoint': 'pop.gmail.com',
    'smtp_endpoint': 'smtp.gmail.com',
    'sasl_method': 'OAUTHBEARER',
    'scope': 'https://mail.google.com/',
    'client_id': '406964657835-aq8lmia8j95dhl1a2bvharmfk3t1hgqj.apps.googleusercontent.com',
    'client_secret': 'kSmqreRr0qwBWJgbf5Y-PjSU',
}

_MICROSOFT_REGISTRATION = {
    'authorize_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize',
    'devicecode_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/devicecode',
    'token_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/token',
    'redirect_uri': 'https://login.microsoftonline.com/common/oauth2/nativeclient',
    'tenant': 'common',
    'imap_endpoint': 'outlook.office365.com',
    'pop_endpoint': 'outlook.office365.com',
    'smtp_endpoint': 'smtp.office365.com',
    'sasl_method': 'XOAUTH2',
    # Space-separated list of scopes, one list item per scope.
    'scope': ' '.join([
        'offline_access',
        'https://outlook.office.com/IMAP.AccessAsUser.All',
        'https://outlook.office.com/POP.AccessAsUser.All',
        'https://outlook.office.com/SMTP.Send',
    ]),
    'client_id': '08162f7c-0fd2-4200-a84a-f25a4db0b584',
    'client_secret': 'TxRBilcHdC6WGBee]fs?QR:SJ8nI[g82',
}

registrations = {
    'google': _GOOGLE_REGISTRATION,
    'microsoft': _MICROSOFT_REGISTRATION,
}
81
# Text shown after the option list in "--help" output.
_USAGE_NOTES = '''
This script obtains and prints a valid OAuth2 access token. State is maintained in an
encrypted TOKENFILE. Run with "--verbose --authorize" to get started or whenever all
tokens have expired, optionally with "--authflow" to override the default authorization
flow. To truly start over from scratch, first delete TOKENFILE. Use "--verbose --test"
to test the IMAP/POP/SMTP endpoints.
'''

# Command-line arguments in the order they are registered with the parser.
_ARGUMENT_SPECS = (
    (('-v', '--verbose'), {'action': 'store_true', 'help': 'increase verbosity'}),
    (('-d', '--debug'), {'action': 'store_true', 'help': 'enable debug output'}),
    (('tokenfile',), {'help': 'persistent token storage'}),
    (('-a', '--authorize'), {'action': 'store_true', 'help': 'manually authorize new tokens'}),
    (('--authflow',), {'help': 'authcode | localhostauthcode | devicecode'}),
    (('-t', '--test'), {'action': 'store_true', 'help': 'test IMAP/POP/SMTP endpoints'}),
)

ap = argparse.ArgumentParser(epilog=_USAGE_NOTES)
for _names, _options in _ARGUMENT_SPECS:
    ap.add_argument(*_names, **_options)
args = ap.parse_args()
96
# Load previously saved state. The token dictionary stays empty when the token
# file does not exist yet.
token = {}
path = Path(args.tokenfile)
if path.exists():
    # Only the permission bits matter; anything other than 0600 is refused.
    if path.stat().st_mode & 0o777 != 0o600:
        sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
    try:
        decrypted = subprocess.run(DECRYPTION_PIPE, input=path.read_bytes(),
                                   capture_output=True, check=True)
    except subprocess.CalledProcessError:
        sys.exit('Difficulty decrypting token file. Is your decryption agent primed for '
                 'non-interactive usage, or an appropriate environment variable such as '
                 'GPG_TTY set to allow interactive agent usage from inside a pipe?')
    else:
        token = json.loads(decrypted.stdout)
110
111
def writetokenfile():
    '''Encrypts the global token dictionary and writes it into the token file.

    The JSON form of ``token`` is piped through ENCRYPTION_PIPE before the file
    is created or modified, so a failing encryption command cannot leave an
    empty token file behind. A missing file is created with mode 0600.

    Exits the script (SystemExit) when the encryption command fails or when the
    file's permission bits are anything other than 0600.
    '''
    try:
        sub2 = subprocess.run(ENCRYPTION_PIPE, check=True, input=json.dumps(token).encode(),
                              capture_output=True)
    except subprocess.CalledProcessError as err:
        # Report the failure the same way the decryption path does instead of
        # letting a traceback escape; include whatever the command said.
        detail = (err.stderr or b'').decode(errors='replace').strip()
        sys.exit('Difficulty encrypting token file: ' +
                 (detail or f'encryption command exited with status {err.returncode}'))
    if not path.exists():
        path.touch(mode=0o600)
    if 0o777 & path.stat().st_mode != 0o600:
        sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
    path.write_bytes(sub2.stdout)
121
122
if args.debug:
    print('Obtained from token file:', json.dumps(token))

# With no saved state, the only way forward is an interactive first-time setup.
if not token and not args.authorize:
    sys.exit('You must run script with "--authorize" at least once.')
if not token:
    print('Available app and endpoint registrations:', *registrations)
    # The prompts run top to bottom, in the order the entries are listed.
    token.update({
        'registration': input('OAuth2 registration: '),
        'authflow': input('Preferred OAuth2 flow ("authcode" or "localhostauthcode" '
                          'or "devicecode"): '),
        'email': input('Account e-mail address: '),
        'access_token': '',
        'access_token_expiration': '',
        'refresh_token': '',
    })
    writetokenfile()
137
# Resolve the provider settings named in the token file.
_registration_name = token['registration']
if _registration_name not in registrations:
    sys.exit(f'ERROR: Unknown registration "{_registration_name}". Delete token file '
             f'and start over.')
registration = registrations[_registration_name]

# A flow given on the command line takes precedence over the stored one.
_stored_authflow = token['authflow']
authflow = args.authflow or _stored_authflow

# Parameters sent with every OAuth2 request.
baseparams = dict(client_id=registration['client_id'])
try:
    # Microsoft uses 'tenant' but Google does not
    baseparams['tenant'] = registration['tenant']
except KeyError:
    pass
151
152
def access_token_valid():
    '''Returns True when stored access token exists and is still valid at this time.

    Reads 'access_token_expiration' from the global ``token`` dictionary. The
    value is an ISO 8601 timestamp without timezone and is compared against
    ``datetime.now()``. An empty value means no access token has been stored.

    Always returns a real bool (False rather than the empty string when no
    expiration is stored).
    '''
    token_exp = token['access_token_expiration']
    if not token_exp:
        return False
    return datetime.now() < datetime.fromisoformat(token_exp)
157
158
def update_tokens(r):
    '''Copies the tokens from a token-endpoint response into the token file.

    r is the decoded response dictionary. Its 'access_token' is stored together
    with an expiration computed as now plus 'expires_in' seconds. The stored
    refresh token is replaced only when the response carries a 'refresh_token'.
    The token file is then rewritten, and a notice is printed in verbose mode.
    '''
    token['access_token'] = r['access_token']
    expires_at = datetime.now() + timedelta(seconds=int(r['expires_in']))
    token['access_token_expiration'] = expires_at.isoformat()
    try:
        token['refresh_token'] = r['refresh_token']
    except KeyError:
        # No new refresh token was issued; keep the stored one.
        pass
    writetokenfile()
    if not args.verbose:
        return
    print(f'NOTICE: Obtained new access token, expires {token["access_token_expiration"]}.')
169
170
def _post_form(url, params, report_http_error=True):
    '''POSTs params form-encoded to url and returns the reply decoded from JSON.

    An HTTPError is read like an ordinary reply because its body is parsed for
    the 'error' and 'error_description' fields. The HTTP status and reason are
    printed unless report_http_error is false. The raw body is printed when
    --debug is set.
    '''
    try:
        reply = urllib.request.urlopen(url, urllib.parse.urlencode(params).encode())
    except urllib.error.HTTPError as err:
        if report_http_error:
            print(err.code, err.reason)
        reply = err
    body = reply.read()
    if args.debug:
        print(body)
    return json.loads(body)


def _exit_on_oauth_error(reply):
    '''Prints the error fields of reply and exits with status 1 if it reports an error.'''
    if 'error' in reply:
        print(reply['error'])
        if 'error_description' in reply:
            print(reply['error_description'])
        sys.exit(1)


# Interactive authorization: obtain a fresh access token (and possibly a refresh
# token) through the selected flow, then store the result with update_tokens().
if args.authorize:
    p = baseparams.copy()
    p['scope'] = registration['scope']

    if authflow in ('authcode', 'localhostauthcode'):
        # PKCE (code_challenge_method S256): the challenge is the URL-safe
        # base64 of the verifier's SHA-256 digest with the padding removed.
        verifier = secrets.token_urlsafe(90)
        challenge = base64.urlsafe_b64encode(
            hashlib.sha256(verifier.encode()).digest()).rstrip(b'=')
        redirect_uri = registration['redirect_uri']
        listen_port = 0
        if authflow == 'localhostauthcode':
            # Find an available port to listen on
            with socket.socket() as s:
                s.bind(('127.0.0.1', 0))
                listen_port = s.getsockname()[1]
            redirect_uri = 'http://localhost:'+str(listen_port)+'/'
            # Probably should edit the port number into the actual redirect URL.

        p.update({'login_hint': token['email'],
                  'response_type': 'code',
                  'redirect_uri': redirect_uri,
                  'code_challenge': challenge,
                  'code_challenge_method': 'S256'})
        print(registration["authorize_endpoint"] + '?' +
              urllib.parse.urlencode(p, quote_via=urllib.parse.quote))

        authcode = ''
        if authflow == 'authcode':
            authcode = input('Visit displayed URL to retrieve authorization code. Enter '
                             'code from server (might be in browser address bar): ')
        else:
            print('Visit displayed URL to authorize this application. Waiting...',
                  end='', flush=True)

            class MyHandler(http.server.BaseHTTPRequestHandler):
                '''Handles the browser query resulting from redirect to redirect_uri.'''

                # pylint: disable=C0103
                def do_HEAD(self):
                    '''Response to a HEAD requests.'''
                    self.send_response(200)
                    self.send_header('Content-type', 'text/html')
                    self.end_headers()

                def do_GET(self):
                    '''For GET request, extract code parameter from URL.'''
                    # pylint: disable=W0603
                    global authcode
                    querystring = urllib.parse.urlparse(self.path).query
                    querydict = urllib.parse.parse_qs(querystring)
                    if 'code' in querydict:
                        authcode = querydict['code'][0]
                    self.do_HEAD()
                    self.wfile.write(b'<html><head><title>Authorization result</title></head>')
                    self.wfile.write(b'<body><p>Authorization redirect completed. You may '
                                     b'close this window.</p></body></html>')

            # Serve exactly one request: the browser's redirect carrying the code.
            with http.server.HTTPServer(('127.0.0.1', listen_port), MyHandler) as httpd:
                try:
                    httpd.handle_request()
                except KeyboardInterrupt:
                    pass

        if not authcode:
            sys.exit('Did not obtain an authcode.')

        # Drop the parameters that only belong to the authorize request before
        # reusing the dictionary for the token request.
        for k in 'response_type', 'login_hint', 'code_challenge', 'code_challenge_method':
            del p[k]
        p.update({'grant_type': 'authorization_code',
                  'code': authcode,
                  'client_secret': registration['client_secret'],
                  'code_verifier': verifier})
        response = _post_form(registration['token_endpoint'], p)
        _exit_on_oauth_error(response)

    elif authflow == 'devicecode':
        response = _post_form(registration['devicecode_endpoint'], p)
        _exit_on_oauth_error(response)
        print(response['message'])
        del p['scope']
        p.update({'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
                  'client_secret': registration['client_secret'],
                  'device_code': response['device_code']})
        # RFC 8628: "interval" is optional and defaults to 5 seconds.
        interval = int(response.get('interval', 5))
        print('Polling...', end='', flush=True)
        while True:
            time.sleep(interval)
            print('.', end='', flush=True)
            # An HTTP error here is not always an error, it might just mean
            # "keep trying...", so the status line is not printed.
            response = _post_form(registration['token_endpoint'], p,
                                  report_http_error=False)
            if 'error' not in response:
                break
            if response['error'] == 'authorization_declined':
                print(' user declined authorization.')
                sys.exit(1)
            if response['error'] == 'expired_token':
                print(' too much time has elapsed.')
                sys.exit(1)
            if response['error'] == 'slow_down':
                # RFC 8628: keep polling, but 5 seconds slower from now on.
                interval += 5
                continue
            if response['error'] != 'authorization_pending':
                _exit_on_oauth_error(response)
        print()

    else:
        # Report the flow actually in effect, which may come from --authflow.
        sys.exit(f'ERROR: Unknown OAuth2 flow "{authflow}". Delete token file and '
                 f'start over.')

    update_tokens(response)
314
315
# No usable access token: trade the stored refresh token for a new one.
if not access_token_valid():
    if args.verbose:
        print('NOTICE: Invalid or expired access token; using refresh token '
              'to obtain new access token.')
    if not token['refresh_token']:
        sys.exit('ERROR: No refresh token. Run script with "--authorize".')
    refresh_params = dict(baseparams,
                          client_secret=registration['client_secret'],
                          refresh_token=token['refresh_token'],
                          grant_type='refresh_token')
    form_data = urllib.parse.urlencode(refresh_params).encode()
    try:
        reply = urllib.request.urlopen(registration['token_endpoint'], form_data)
    except urllib.error.HTTPError as http_err:
        # The error object is read like a reply; its body is parsed below.
        print(http_err.code, http_err.reason)
        reply = http_err
    raw_body = reply.read()
    if args.debug:
        print(raw_body)
    refreshed = json.loads(raw_body)
    if 'error' in refreshed:
        print(refreshed['error'])
        if 'error_description' in refreshed:
            print(refreshed['error_description'])
        print('Perhaps refresh token invalid. Try running once with "--authorize"')
        sys.exit(1)
    update_tokens(refreshed)
343
344
# Final safety check before the token is handed to the caller.
if not access_token_valid():
    sys.exit('ERROR: No valid access token. This should not be able to happen.')


# Print the token on one line; verbose mode puts a label in front of it.
print('Access Token: ' if args.verbose else '', token['access_token'], sep='')
352
353
def build_sasl_string(user, host, port, bearer_token):
    '''Returns the SASL client response for the registration's SASL method.

    The fields are separated by the control character 0x01 and the string ends
    with two of them. OAUTHBEARER carries user, host and port; XOAUTH2 carries
    only the user. Both carry the bearer token. Exits the script when the
    registration names any other SASL method.
    '''
    separator = '\x01'
    if registration['sasl_method'] == 'OAUTHBEARER':
        fields = [f'n,a={user},', f'host={host}', f'port={port}',
                  f'auth=Bearer {bearer_token}']
    elif registration['sasl_method'] == 'XOAUTH2':
        fields = [f'user={user}', f'auth=Bearer {bearer_token}']
    else:
        sys.exit(f'Unknown SASL method {registration["sasl_method"]}.')
    # The two empty trailing items produce the closing pair of separators.
    return separator.join(fields + ['', ''])
361
362
# "--test": log in to the IMAP, POP and SMTP endpoints with the access token.
# Each protocol is tried in turn; the script exits with status 1 if any failed.
if args.test:
    failed_protocols = []

    # --- IMAP over TLS ---
    imap_conn = imaplib.IMAP4_SSL(registration['imap_endpoint'])
    imap_sasl = build_sasl_string(token['email'], registration['imap_endpoint'], 993,
                                  token['access_token'])
    if args.debug:
        imap_conn.debug = 4
    try:
        imap_conn.authenticate(registration['sasl_method'], lambda _: imap_sasl.encode())
        # Microsoft has a bug wherein a mismatch between username and token can still
        # report a successful login... (Try a consumer login with the token from a
        # work/school account.) Fortunately subsequent commands fail with an error,
        # so AUTH is followed by another IMAP command before success is reported.
        imap_conn.list()
        if args.verbose:
            print('IMAP authentication succeeded')
    except imaplib.IMAP4.error as imap_err:
        print('IMAP authentication FAILED (does your account allow IMAP?):', imap_err)
        failed_protocols.append('IMAP')

    # --- POP over TLS ---
    pop_conn = poplib.POP3_SSL(registration['pop_endpoint'])
    pop_sasl = build_sasl_string(token['email'], registration['pop_endpoint'], 995,
                                 token['access_token'])
    if args.debug:
        pop_conn.set_debuglevel(2)
    try:
        # poplib doesn't have an auth command taking an authenticator object
        # Microsoft requires a two-line SASL for POP
        # pylint: disable=W0212
        pop_conn._shortcmd('AUTH ' + registration['sasl_method'])
        pop_conn._shortcmd(base64.standard_b64encode(pop_sasl.encode()).decode())
        if args.verbose:
            print('POP authentication succeeded')
    except poplib.error_proto as pop_err:
        print('POP authentication FAILED (does your account allow POP?):',
              pop_err.args[0].decode())
        failed_protocols.append('POP')

    # --- SMTP with STARTTLS ---
    # SMTP_SSL would be simpler but Microsoft does not answer on port 465.
    smtp_conn = smtplib.SMTP(registration['smtp_endpoint'], 587)
    smtp_sasl = build_sasl_string(token['email'], registration['smtp_endpoint'], 587,
                                  token['access_token'])
    smtp_conn.ehlo('test')
    smtp_conn.starttls()
    smtp_conn.ehlo('test')
    if args.debug:
        smtp_conn.set_debuglevel(2)
    try:
        smtp_conn.auth(registration['sasl_method'], lambda _=None: smtp_sasl)
        if args.verbose:
            print('SMTP authentication succeeded')
    except smtplib.SMTPAuthenticationError as smtp_err:
        print('SMTP authentication FAILED:', smtp_err)
        failed_protocols.append('SMTP')

    if failed_protocols:
        sys.exit(1)