3 # Mutt OAuth2 token management script, version 2020-08-07
4 # Written against python 3.7.3, not tried with earlier python versions.
6 # Copyright (C) 2020 Alexander Perlis
8 # This program is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU General Public License as
10 # published by the Free Software Foundation; either version 2 of the
11 # License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 '''Mutt OAuth2 token management'''
36 from datetime
import timedelta
, datetime
37 from pathlib
import Path
42 # The token file must be encrypted because it contains multi-use bearer tokens
43 # whose usage does not require additional verification. Specify whichever
44 # encryption and decryption pipes you prefer. They should read from standard
45 # input and write to standard output. The example values here invoke GPG,
46 # although they won't work until an appropriate identity appears in the first line.
47 ENCRYPTION_PIPE
= ['gpg', '--encrypt', '--recipient', 'me@rmz.io']
48 DECRYPTION_PIPE
= ['gpg', '--decrypt']
52 'authorize_endpoint': 'https://accounts.google.com/o/oauth2/auth',
53 'devicecode_endpoint': 'https://oauth2.googleapis.com/device/code',
54 'token_endpoint': 'https://accounts.google.com/o/oauth2/token',
55 'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
56 'imap_endpoint': 'imap.gmail.com',
57 'pop_endpoint': 'pop.gmail.com',
58 'smtp_endpoint': 'smtp.gmail.com',
59 'sasl_method': 'OAUTHBEARER',
60 'scope': 'https://mail.google.com/',
61 'client_id': '406964657835-aq8lmia8j95dhl1a2bvharmfk3t1hgqj.apps.googleusercontent.com',
62 'client_secret': 'kSmqreRr0qwBWJgbf5Y-PjSU',
65 'authorize_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize',
66 'devicecode_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/devicecode',
67 'token_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/token',
68 'redirect_uri': 'https://login.microsoftonline.com/common/oauth2/nativeclient',
70 'imap_endpoint': 'outlook.office365.com',
71 'pop_endpoint': 'outlook.office365.com',
72 'smtp_endpoint': 'smtp.office365.com',
73 'sasl_method': 'XOAUTH2',
74 'scope': ('offline_access https://outlook.office.com/IMAP.AccessAsUser.All '
75 'https://outlook.office.com/POP.AccessAsUser.All '
76 'https://outlook.office.com/SMTP.Send'),
77 'client_id': '08162f7c-0fd2-4200-a84a-f25a4db0b584',
78 'client_secret': 'TxRBilcHdC6WGBee]fs?QR:SJ8nI[g82',
82 ap
= argparse
.ArgumentParser(epilog
='''
83 This script obtains and prints a valid OAuth2 access token. State is maintained in an
84 encrypted TOKENFILE. Run with "--verbose --authorize" to get started or whenever all
85 tokens have expired, optionally with "--authflow" to override the default authorization
86 flow. To truly start over from scratch, first delete TOKENFILE. Use "--verbose --test"
87 to test the IMAP/POP/SMTP endpoints.
89 ap
.add_argument('-v', '--verbose', action
='store_true', help='increase verbosity')
90 ap
.add_argument('-d', '--debug', action
='store_true', help='enable debug output')
91 ap
.add_argument('tokenfile', help='persistent token storage')
92 ap
.add_argument('-a', '--authorize', action
='store_true', help='manually authorize new tokens')
93 ap
.add_argument('--authflow', help='authcode | localhostauthcode | devicecode')
94 ap
.add_argument('-t', '--test', action
='store_true', help='test IMAP/POP/SMTP endpoints')
95 args
= ap
.parse_args()
98 path
= Path(args
.tokenfile
)
100 if 0o777 & path
.stat().st_mode
!= 0o600:
101 sys
.exit('Token file has unsafe mode. Suggest deleting and starting over.')
103 sub
= subprocess
.run(DECRYPTION_PIPE
, check
=True, input=path
.read_bytes(),
105 token
= json
.loads(sub
.stdout
)
106 except subprocess
.CalledProcessError
:
107 sys
.exit('Difficulty decrypting token file. Is your decryption agent primed for '
108 'non-interactive usage, or an appropriate environment variable such as '
109 'GPG_TTY set to allow interactive agent usage from inside a pipe?')
# Persist the global ``token`` dictionary to ``path`` in encrypted form.
# Takes no arguments; reads the module-level names ``path``, ``token`` and
# ENCRYPTION_PIPE. Terminates the script via sys.exit() when the file's
# permission bits are not exactly 0o600.
112 def writetokenfile():
113 '''Writes global token dictionary into token file.'''
# Create the file up front with owner-only permissions when it is absent.
114 if not path
.exists():
115 path
.touch(mode
=0o600)
# Mask with 0o777 so only the permission bits are compared; anything other
# than rw for the owner alone is refused.
116 if 0o777 & path
.stat().st_mode
!= 0o600:
117 sys
.exit('Token file has unsafe mode. Suggest deleting and starting over.')
# The token is serialised to JSON, encoded to bytes and fed to the encryption
# command on stdin; check=True makes a non-zero exit raise CalledProcessError.
# NOTE(review): the closing part of this call (original line 119) is not
# visible here. Because sub2.stdout is consumed below, it presumably enables
# output capture -- confirm against the full file.
118 sub2
= subprocess
.run(ENCRYPTION_PIPE
, check
=True, input=json
.dumps(token
).encode(),
# Whatever the encryption command wrote to stdout becomes the file content.
120 path
.write_bytes(sub2
.stdout
)
124 print('Obtained from token file:', json
.dumps(token
))
126 if not args
.authorize
:
127 sys
.exit('You must run script with "--authorize" at least once.')
128 print('Available app and endpoint registrations:', *registrations
)
129 token
['registration'] = input('OAuth2 registration: ')
130 token
['authflow'] = input('Preferred OAuth2 flow ("authcode" or "localhostauthcode" '
131 'or "devicecode"): ')
132 token
['email'] = input('Account e-mail address: ')
133 token
['access_token'] = ''
134 token
['access_token_expiration'] = ''
135 token
['refresh_token'] = ''
138 if token
['registration'] not in registrations
:
139 sys
.exit(f
'ERROR: Unknown registration "{token["registration"]}". Delete token file '
141 registration
= registrations
[token
['registration']]
143 authflow
= token
['authflow']
145 authflow
= args
.authflow
147 baseparams
= {'client_id': registration['client_id']}
148 # Microsoft uses 'tenant' but Google does not
149 if 'tenant' in registration
:
150 baseparams
['tenant'] = registration
['tenant']
def access_token_valid():
    '''Returns True when stored access token exists and is still valid at this time.

    Reads the 'access_token_expiration' entry of the global token dictionary,
    which holds an ISO-8601 timestamp string (or an empty string while no
    access token has been stored), and compares it with the current local time.

    Always returns a real bool: a missing or empty expiration entry yields
    False rather than handing the empty string back to the caller or raising
    KeyError.
    '''
    token_exp = token.get('access_token_expiration')
    if not token_exp:
        return False
    # The stored stamp is a naive local timestamp, so it is compared against
    # a naive datetime.now() as well.
    return datetime.now() < datetime.fromisoformat(token_exp)
# Copy the token fields of a token-endpoint response into the global ``token``
# dictionary.
#   r: response dictionary; 'access_token' and 'expires_in' are read
#      unconditionally, 'refresh_token' only when present.
159 def update_tokens(r
):
160 '''Takes a response dictionary, extracts tokens out of it, and updates token file.'''
161 token
['access_token'] = r
['access_token']
# The expiration is stored as an ISO-8601 string: the current local time
# (naive datetime.now()) plus the 'expires_in' value interpreted as seconds.
162 token
['access_token_expiration'] = (datetime
.now() +
163 timedelta(seconds
=int(r
['expires_in']))).isoformat()
# A response without a refresh token leaves the previously stored one intact.
164 if 'refresh_token' in r
:
165 token
['refresh_token'] = r
['refresh_token']
# NOTE(review): original lines 166-167 are not visible in this view. The
# docstring says the token file is updated, so the write presumably happens
# here, and the notice below may be conditional -- confirm against the full file.
168 print(f
'NOTICE: Obtained new access token, expires {token["access_token_expiration"]}.')
172 p
= baseparams
.copy()
173 p
['scope'] = registration
['scope']
175 if authflow
in ('authcode', 'localhostauthcode'):
176 verifier
= secrets
.token_urlsafe(90)
177 challenge
= base64
.urlsafe_b64encode(hashlib
.sha256(verifier
.encode()).digest())[:-1]
178 redirect_uri
= registration
['redirect_uri']
180 if authflow
== 'localhostauthcode':
181 # Find an available port to listen on
183 s
.bind(('127.0.0.1', 0))
184 listen_port
= s
.getsockname()[1]
186 redirect_uri
= 'http://localhost:'+str(listen_port
)+'/'
187 # Probably should edit the port number into the actual redirect URL.
189 p
.update({'login_hint': token
['email'],
190 'response_type': 'code',
191 'redirect_uri': redirect_uri
,
192 'code_challenge': challenge
,
193 'code_challenge_method': 'S256'})
194 print(registration
["authorize_endpoint"] + '?' +
195 urllib
.parse
.urlencode(p
, quote_via
=urllib
.parse
.quote
))
198 if authflow
== 'authcode':
199 authcode
= input('Visit displayed URL to retrieve authorization code. Enter '
200 'code from server (might be in browser address bar): ')
202 print('Visit displayed URL to authorize this application. Waiting...',
205 class MyHandler(http
.server
.BaseHTTPRequestHandler
):
206 '''Handles the browser query resulting from redirect to redirect_uri.'''
208 # pylint: disable=C0103
210 '''Respond to a HEAD request.'''
211 self
.send_response(200)
212 self
.send_header('Content-type', 'text/html')
216 '''For GET request, extract code parameter from URL.'''
217 # pylint: disable=W0603
219 querystring
= urllib
.parse
.urlparse(self
.path
).query
220 querydict
= urllib
.parse
.parse_qs(querystring
)
221 if 'code' in querydict
:
222 authcode
= querydict
['code'][0]
224 self
.wfile
.write(b
'<html><head><title>Authorizaton result</title></head>')
225 self
.wfile
.write(b
'<body><p>Authorization redirect completed. You may '
226 b
'close this window.</p></body></html>')
227 with http
.server
.HTTPServer(('127.0.0.1', listen_port
), MyHandler
) as httpd
:
229 httpd
.handle_request()
230 except KeyboardInterrupt:
234 sys
.exit('Did not obtain an authcode.')
236 for k
in 'response_type', 'login_hint', 'code_challenge', 'code_challenge_method':
238 p
.update({'grant_type': 'authorization_code',
240 'client_secret': registration
['client_secret'],
241 'code_verifier': verifier
})
243 response
= urllib
.request
.urlopen(registration
['token_endpoint'],
244 urllib
.parse
.urlencode(p
).encode())
245 except urllib
.error
.HTTPError
as err
:
246 print(err
.code
, err
.reason
)
248 response
= response
.read()
251 response
= json
.loads(response
)
252 if 'error' in response
:
253 print(response
['error'])
254 if 'error_description' in response
:
255 print(response
['error_description'])
258 elif authflow
== 'devicecode':
260 response
= urllib
.request
.urlopen(registration
['devicecode_endpoint'],
261 urllib
.parse
.urlencode(p
).encode())
262 except urllib
.error
.HTTPError
as err
:
263 print(err
.code
, err
.reason
)
265 response
= response
.read()
268 response
= json
.loads(response
)
269 if 'error' in response
:
270 print(response
['error'])
271 if 'error_description' in response
:
272 print(response
['error_description'])
274 print(response
['message'])
276 p
.update({'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
277 'client_secret': registration
['client_secret'],
278 'device_code': response
['device_code']})
279 interval
= int(response
['interval'])
280 print('Polling...', end
='', flush
=True)
283 print('.', end
='', flush
=True)
285 response
= urllib
.request
.urlopen(registration
['token_endpoint'],
286 urllib
.parse
.urlencode(p
).encode())
287 except urllib
.error
.HTTPError
as err
:
288 # Not actually always an error, might just mean "keep trying..."
290 response
= response
.read()
293 response
= json
.loads(response
)
294 if 'error' not in response
:
296 if response
['error'] == 'authorization_declined':
297 print(' user declined authorization.')
299 if response
['error'] == 'expired_token':
300 print(' too much time has elapsed.')
302 if response
['error'] != 'authorization_pending':
303 print(response
['error'])
304 if 'error_description' in response
:
305 print(response
['error_description'])
310 sys
.exit(f
'ERROR: Unknown OAuth2 flow "{token["authflow"]}. Delete token file and '
313 update_tokens(response
)
316 if not access_token_valid():
318 print('NOTICE: Invalid or expired access token; using refresh token '
319 'to obtain new access token.')
320 if not token
['refresh_token']:
321 sys
.exit('ERROR: No refresh token. Run script with "--authorize".')
322 p
= baseparams
.copy()
323 p
.update({'client_secret': registration
['client_secret'],
324 'refresh_token': token
['refresh_token'],
325 'grant_type': 'refresh_token'})
327 response
= urllib
.request
.urlopen(registration
['token_endpoint'],
328 urllib
.parse
.urlencode(p
).encode())
329 except urllib
.error
.HTTPError
as err
:
330 print(err
.code
, err
.reason
)
332 response
= response
.read()
335 response
= json
.loads(response
)
336 if 'error' in response
:
337 print(response
['error'])
338 if 'error_description' in response
:
339 print(response
['error_description'])
340 print('Perhaps refresh token invalid. Try running once with "--authorize"')
342 update_tokens(response
)
345 if not access_token_valid():
346 sys
.exit('ERROR: No valid access token. This should not be able to happen.')
350 print('Access Token: ', end
='')
351 print(token
['access_token'])
def build_sasl_string(user, host, port, bearer_token):
    '''Return the SASL client response string for the active registration.

    The layout is chosen by registration['sasl_method']: OAUTHBEARER embeds
    user, host, port and the bearer token, while XOAUTH2 embeds only user and
    the bearer token. Fields are separated by the control character 0x01.
    Any other method name ends the script through sys.exit().
    '''
    method = registration['sasl_method']
    if method == 'XOAUTH2':
        sasl = f'user={user}\1auth=Bearer {bearer_token}\1\1'
    elif method == 'OAUTHBEARER':
        sasl = f'n,a={user},\1host={host}\1port={port}\1auth=Bearer {bearer_token}\1\1'
    else:
        sys.exit(f'Unknown SASL method {registration["sasl_method"]}.')
    return sasl
366 imap_conn
= imaplib
.IMAP4_SSL(registration
['imap_endpoint'])
367 sasl_string
= build_sasl_string(token
['email'], registration
['imap_endpoint'], 993,
368 token
['access_token'])
372 imap_conn
.authenticate(registration
['sasl_method'], lambda _
: sasl_string
.encode())
373 # Microsoft has a bug wherein a mismatch between username and token can still report a
374 # successful login... (Try a consumer login with the token from a work/school account.)
375 # Fortunately subsequent commands fail with an error. Thus we follow AUTH with another
376 # IMAP command before reporting success.
379 print('IMAP authentication succeeded')
380 except imaplib
.IMAP4
.error
as e
:
381 print('IMAP authentication FAILED (does your account allow IMAP?):', e
)
384 pop_conn
= poplib
.POP3_SSL(registration
['pop_endpoint'])
385 sasl_string
= build_sasl_string(token
['email'], registration
['pop_endpoint'], 995,
386 token
['access_token'])
388 pop_conn
.set_debuglevel(2)
390 # poplib doesn't have an auth command taking an authenticator object
391 # Microsoft requires a two-line SASL for POP
392 # pylint: disable=W0212
393 pop_conn
._shortcmd
('AUTH ' + registration
['sasl_method'])
394 pop_conn
._shortcmd
(base64
.standard_b64encode(sasl_string
.encode()).decode())
396 print('POP authentication succeeded')
397 except poplib
.error_proto
as e
:
398 print('POP authentication FAILED (does your account allow POP?):', e
.args
[0].decode())
401 # SMTP_SSL would be simpler but Microsoft does not answer on port 465.
402 smtp_conn
= smtplib
.SMTP(registration
['smtp_endpoint'], 587)
403 sasl_string
= build_sasl_string(token
['email'], registration
['smtp_endpoint'], 587,
404 token
['access_token'])
405 smtp_conn
.ehlo('test')
407 smtp_conn
.ehlo('test')
409 smtp_conn
.set_debuglevel(2)
411 smtp_conn
.auth(registration
['sasl_method'], lambda _
=None: sasl_string
)
413 print('SMTP authentication succeeded')
414 except smtplib
.SMTPAuthenticationError
as e
:
415 print('SMTP authentication FAILED:', e
)