]> git.rmz.io Git - dotfiles.git/commitdiff
mail: use XOAUTH2 for wio
authorSamir Benmendil <me@rmz.io>
Wed, 31 Aug 2022 21:34:56 +0000 (22:34 +0100)
committerSamir Benmendil <me@rmz.io>
Wed, 31 Aug 2022 21:42:12 +0000 (22:42 +0100)
mbsync/config
msmtprc
mutt/mutt_oauth2.py [new file with mode: 0755]

index 16baaf2c9dd3c12e295552c04228ec6de648ec42..62bffc565b10a50a06297e11dc1833f0db2475d0 100644 (file)
@@ -64,8 +64,8 @@ Channel mailfence/archive
 IMAPAccount     wio
 Host            outlook.office365.com
 User            sbenmendil@witekio.com
-PassCmd         "sed -En '/outlook.*sbenmendil@witekio.com/ s/.*password (.*)/\\1/p' ~/.netrc"
-AuthMechs       LOGIN
+PassCmd         "~/.config/mutt/mutt_oauth2.py ~/.cache/mutt_oauth2/sbenmendil@witekio.com.tokens"
+AuthMechs       XOAUTH2
 SSLType         IMAPS
 CertificateFile /etc/ssl/certs/ca-certificates.crt
 
diff --git a/msmtprc b/msmtprc
index 42f2d5b27ec8dc3d83b4b360ac565f505563ea39..0147825562cfede8216d307b5cea53fe48f95868 100644 (file)
--- a/msmtprc
+++ b/msmtprc
@@ -21,9 +21,9 @@ account wio
 host smtp.office365.com
 port 587
 protocol smtp
-auth on
+auth xoauth2
 user sbenmendil@witekio.com
-# passwordeval "pass gmail.com/msmtp"
+passwordeval "~/.config/mutt/mutt_oauth2.py ~/.cache/mutt_oauth2/sbenmendil@witekio.com.tokens"
 from sbenmendil@witekio.com
 
 #set default account
diff --git a/mutt/mutt_oauth2.py b/mutt/mutt_oauth2.py
new file mode 100755 (executable)
index 0000000..1c6ec37
--- /dev/null
@@ -0,0 +1,419 @@
+#!/usr/bin/env python3
+#
+# Mutt OAuth2 token management script, version 2020-08-07
+# Written against python 3.7.3, not tried with earlier python versions.
+#
+#   Copyright (C) 2020 Alexander Perlis
+#
+#   This program is free software; you can redistribute it and/or
+#   modify it under the terms of the GNU General Public License as
+#   published by the Free Software Foundation; either version 2 of the
+#   License, or (at your option) any later version.
+#
+#   This program is distributed in the hope that it will be useful,
+#   but WITHOUT ANY WARRANTY; without even the implied warranty of
+#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#   General Public License for more details.
+#
+#   You should have received a copy of the GNU General Public License
+#   along with this program; if not, write to the Free Software
+#   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+#   02110-1301, USA.
+'''Mutt OAuth2 token management'''
+
+import sys
+import json
+import argparse
+import urllib.parse
+import urllib.request
+import imaplib
+import poplib
+import smtplib
+import base64
+import secrets
+import hashlib
+import time
+from datetime import timedelta, datetime
+from pathlib import Path
+import socket
+import http.server
+import subprocess
+
+# The token file must be encrypted because it contains multi-use bearer tokens
+# whose usage does not require additional verification. Specify whichever
+# encryption and decryption pipes you prefer. They should read from standard
+# input and write to standard output. The example values here invoke GPG,
+# although they won't work until an appropriate recipient identity appears in
+# the first line.
+ENCRYPTION_PIPE = ['gpg', '--encrypt', '--recipient', 'me@rmz.io']
+DECRYPTION_PIPE = ['gpg', '--decrypt']
+
+# Per-provider OAuth2 settings. The key ('google', 'microsoft') is the
+# registration name the user enters on first run and that is stored in the
+# token file; each value holds the provider's endpoints, SASL method, scope
+# and the app credentials sent with token requests.
+registrations = {
+    'google': {
+        'authorize_endpoint': 'https://accounts.google.com/o/oauth2/auth',
+        'devicecode_endpoint': 'https://oauth2.googleapis.com/device/code',
+        'token_endpoint': 'https://accounts.google.com/o/oauth2/token',
+        'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
+        'imap_endpoint': 'imap.gmail.com',
+        'pop_endpoint': 'pop.gmail.com',
+        'smtp_endpoint': 'smtp.gmail.com',
+        'sasl_method': 'OAUTHBEARER',
+        'scope': 'https://mail.google.com/',
+        # NOTE(review): empty here -- presumably these must be filled in with
+        # your own app registration before the 'google' entry is usable.
+        'client_id': '',
+        'client_secret': '',
+    },
+    'microsoft': {
+        'authorize_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize',
+        'devicecode_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/devicecode',
+        'token_endpoint': 'https://login.microsoftonline.com/common/oauth2/v2.0/token',
+        'redirect_uri': 'https://login.microsoftonline.com/common/oauth2/nativeclient',
+        # Copied into the request parameters only when present; the 'google'
+        # entry has no such key.
+        'tenant': 'common',
+        'imap_endpoint': 'outlook.office365.com',
+        'pop_endpoint': 'outlook.office365.com',
+        'smtp_endpoint': 'smtp.office365.com',
+        'sasl_method': 'XOAUTH2',
+        'scope': ('offline_access https://outlook.office.com/IMAP.AccessAsUser.All '
+                  'https://outlook.office.com/POP.AccessAsUser.All '
+                  'https://outlook.office.com/SMTP.Send'),
+        # NOTE(review): app credentials are committed here in plain text.
+        'client_id': '08162f7c-0fd2-4200-a84a-f25a4db0b584',
+        'client_secret': 'TxRBilcHdC6WGBee]fs?QR:SJ8nI[g82',
+    },
+}
+
+# Command-line interface. Arguments are parsed at module level, before the
+# token file is read, so invalid arguments end the script without touching it.
+ap = argparse.ArgumentParser(epilog='''
+This script obtains and prints a valid OAuth2 access token.  State is maintained in an
+encrypted TOKENFILE.  Run with "--verbose --authorize" to get started or whenever all
+tokens have expired, optionally with "--authflow" to override the default authorization
+flow.  To truly start over from scratch, first delete TOKENFILE.  Use "--verbose --test"
+to test the IMAP/POP/SMTP endpoints.
+''')
+ap.add_argument('-v', '--verbose', action='store_true', help='increase verbosity')
+ap.add_argument('-d', '--debug', action='store_true', help='enable debug output')
+# The only positional (and therefore required) argument.
+ap.add_argument('tokenfile', help='persistent token storage')
+ap.add_argument('-a', '--authorize', action='store_true', help='manually authorize new tokens')
+# Not validated here; an unknown value is only rejected inside the --authorize flow.
+ap.add_argument('--authflow', help='authcode | localhostauthcode | devicecode')
+ap.add_argument('-t', '--test', action='store_true', help='test IMAP/POP/SMTP endpoints')
+args = ap.parse_args()
+
+# Load previously stored state. The token file holds the encrypted JSON
+# dictionary written by writetokenfile(); when the file does not exist, `token`
+# stays empty, which later triggers the first-run prompts.
+token = {}
+path = Path(args.tokenfile)
+if path.exists():
+    # Refuse any file whose permission bits are not exactly owner read/write.
+    if 0o777 & path.stat().st_mode != 0o600:
+        sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
+    try:
+        sub = subprocess.run(DECRYPTION_PIPE, check=True, input=path.read_bytes(),
+                             capture_output=True)
+    except subprocess.CalledProcessError:
+        sys.exit('Difficulty decrypting token file. Is your decryption agent primed for '
+                 'non-interactive usage, or an appropriate environment variable such as '
+                 'GPG_TTY set to allow interactive agent usage from inside a pipe?')
+    try:
+        token = json.loads(sub.stdout)
+    except ValueError:
+        # Decryption succeeded but the plaintext is not JSON (json.JSONDecodeError
+        # and UnicodeDecodeError are both ValueError subclasses).
+        sys.exit('Token file does not contain valid JSON. Suggest deleting and '
+                 'starting over.')
+
+
+def writetokenfile():
+    '''Encrypts the global token dictionary and writes it to the token file.
+
+    On first use the token file is created with mode 0600, together with any
+    missing parent directories. Exits the script if the file has any other
+    mode or if the encryption pipe fails; in both cases no token data is
+    written.
+    '''
+    if not path.exists():
+        # The directory holding the token file may not exist yet.
+        path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
+        path.touch(mode=0o600)
+    if 0o777 & path.stat().st_mode != 0o600:
+        sys.exit('Token file has unsafe mode. Suggest deleting and starting over.')
+    try:
+        sub2 = subprocess.run(ENCRYPTION_PIPE, check=True, input=json.dumps(token).encode(),
+                              capture_output=True)
+    except subprocess.CalledProcessError as err:
+        # Report the pipe's own diagnostics instead of a bare traceback.
+        sys.exit('Difficulty encrypting token file: '
+                 + err.stderr.decode(errors='replace').strip())
+    path.write_bytes(sub2.stdout)
+
+
+# NOTE: with --debug the full token dictionary, secrets included, is printed.
+if args.debug:
+    print('Obtained from token file:', json.dumps(token))
+# An empty dictionary means no token file existed: prompt for the account
+# details and persist them (with empty tokens) before doing anything else.
+if not token:
+    if not args.authorize:
+        sys.exit('You must run script with "--authorize" at least once.')
+    print('Available app and endpoint registrations:', *registrations)
+    token['registration'] = input('OAuth2 registration: ')
+    token['authflow'] = input('Preferred OAuth2 flow ("authcode" or "localhostauthcode" '
+                              'or "devicecode"): ')
+    token['email'] = input('Account e-mail address: ')
+    token['access_token'] = ''
+    token['access_token_expiration'] = ''
+    token['refresh_token'] = ''
+    writetokenfile()
+
+# The registration name is checked only here, i.e. after a first-run answer
+# has already been written to the token file.
+if token['registration'] not in registrations:
+    sys.exit(f'ERROR: Unknown registration "{token["registration"]}". Delete token file '
+             f'and start over.')
+registration = registrations[token['registration']]
+
+# The flow stored in the token file is the default; --authflow overrides it
+# for this run only (the stored value is not changed).
+authflow = token['authflow']
+if args.authflow:
+    authflow = args.authflow
+
+# Parameters common to every request built below.
+baseparams = {'client_id': registration['client_id']}
+# Microsoft uses 'tenant' but Google does not
+if 'tenant' in registration:
+    baseparams['tenant'] = registration['tenant']
+
+
+def access_token_valid():
+    '''Returns True when a stored access token expiration exists and lies in the future.
+
+    Always returns a real bool: an empty expiration string (no access token
+    obtained yet) yields False rather than the empty string itself.
+    '''
+    token_exp = token['access_token_expiration']
+    return bool(token_exp) and datetime.now() < datetime.fromisoformat(token_exp)
+
+
+def update_tokens(r):
+    '''Copies the tokens from a token-endpoint response into the token file.
+
+    r is the decoded response dictionary. It must contain 'access_token' and
+    'expires_in' (a lifetime in seconds); a 'refresh_token' entry, when
+    present, replaces the stored refresh token.
+    '''
+    token['access_token'] = r['access_token']
+    lifetime = timedelta(seconds=int(r['expires_in']))
+    expires_at = datetime.now() + lifetime
+    token['access_token_expiration'] = expires_at.isoformat()
+    if 'refresh_token' in r:
+        token['refresh_token'] = r['refresh_token']
+    writetokenfile()
+    if not args.verbose:
+        return
+    print(f'NOTICE: Obtained new access token, expires {token["access_token_expiration"]}.')
+
+
+# Interactive authorization: obtain a fresh access/refresh token pair using
+# the selected flow, then store it via update_tokens().
+if args.authorize:
+    p = baseparams.copy()
+    p['scope'] = registration['scope']
+
+    if authflow in ('authcode', 'localhostauthcode'):
+        # PKCE: the challenge is the base64url SHA-256 of the verifier. A 32-byte
+        # digest always encodes with exactly one '=' pad, which [:-1] strips.
+        verifier = secrets.token_urlsafe(90)
+        challenge = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest())[:-1]
+        redirect_uri = registration['redirect_uri']
+        listen_port = 0
+        if authflow == 'localhostauthcode':
+            # Find an available port to listen on. The probe socket is closed
+            # again right away; the HTTP server below binds the same port later.
+            with socket.socket() as s:
+                s.bind(('127.0.0.1', 0))
+                listen_port = s.getsockname()[1]
+            redirect_uri = 'http://localhost:'+str(listen_port)+'/'
+            # Probably should edit the port number into the actual redirect URL.
+
+        p.update({'login_hint': token['email'],
+                  'response_type': 'code',
+                  'redirect_uri': redirect_uri,
+                  'code_challenge': challenge,
+                  'code_challenge_method': 'S256'})
+        print(registration["authorize_endpoint"] + '?' +
+              urllib.parse.urlencode(p, quote_via=urllib.parse.quote))
+
+        authcode = ''
+        if authflow == 'authcode':
+            authcode = input('Visit displayed URL to retrieve authorization code. Enter '
+                             'code from server (might be in browser address bar): ')
+        else:
+            print('Visit displayed URL to authorize this application. Waiting...',
+                  end='', flush=True)
+
+            class MyHandler(http.server.BaseHTTPRequestHandler):
+                '''Handles the browser query resulting from redirect to redirect_uri.'''
+
+                # pylint: disable=C0103
+                def do_HEAD(self):
+                    '''Respond to a HEAD request.'''
+                    self.send_response(200)
+                    self.send_header('Content-type', 'text/html')
+                    self.end_headers()
+
+                def do_GET(self):
+                    '''For GET request, extract code parameter from URL.'''
+                    # pylint: disable=W0603
+                    # authcode is a module-level name because this whole flow
+                    # runs at the top level of the script.
+                    global authcode
+                    querystring = urllib.parse.urlparse(self.path).query
+                    querydict = urllib.parse.parse_qs(querystring)
+                    if 'code' in querydict:
+                        authcode = querydict['code'][0]
+                    self.do_HEAD()
+                    self.wfile.write(b'<html><head><title>Authorization result</title></head>')
+                    self.wfile.write(b'<body><p>Authorization redirect completed. You may '
+                                     b'close this window.</p></body></html>')
+            with http.server.HTTPServer(('127.0.0.1', listen_port), MyHandler) as httpd:
+                try:
+                    # Serve exactly one request: the browser redirect.
+                    httpd.handle_request()
+                except KeyboardInterrupt:
+                    pass
+
+        if not authcode:
+            sys.exit('Did not obtain an authcode.')
+
+        # Turn the authorization-request parameters into a token request.
+        for k in 'response_type', 'login_hint', 'code_challenge', 'code_challenge_method':
+            del p[k]
+        p.update({'grant_type': 'authorization_code',
+                  'code': authcode,
+                  'client_secret': registration['client_secret'],
+                  'code_verifier': verifier})
+        try:
+            response = urllib.request.urlopen(registration['token_endpoint'],
+                                              urllib.parse.urlencode(p).encode())
+        except urllib.error.HTTPError as err:
+            # The error object is readable like a response; its body is parsed
+            # below for the 'error' fields.
+            print(err.code, err.reason)
+            response = err
+        response = response.read()
+        if args.debug:
+            print(response)
+        response = json.loads(response)
+        if 'error' in response:
+            print(response['error'])
+            if 'error_description' in response:
+                print(response['error_description'])
+            sys.exit(1)
+
+    elif authflow == 'devicecode':
+        try:
+            response = urllib.request.urlopen(registration['devicecode_endpoint'],
+                                              urllib.parse.urlencode(p).encode())
+        except urllib.error.HTTPError as err:
+            print(err.code, err.reason)
+            response = err
+        response = response.read()
+        if args.debug:
+            print(response)
+        response = json.loads(response)
+        if 'error' in response:
+            print(response['error'])
+            if 'error_description' in response:
+                print(response['error_description'])
+            sys.exit(1)
+        # The server's message tells the user where to go and what code to enter.
+        print(response['message'])
+        del p['scope']
+        p.update({'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
+                  'client_secret': registration['client_secret'],
+                  'device_code': response['device_code']})
+        interval = int(response['interval'])
+        print('Polling...', end='', flush=True)
+        # Poll the token endpoint at the server-requested interval until the
+        # user has approved, declined, or the device code has expired.
+        while True:
+            time.sleep(interval)
+            print('.', end='', flush=True)
+            try:
+                response = urllib.request.urlopen(registration['token_endpoint'],
+                                                  urllib.parse.urlencode(p).encode())
+            except urllib.error.HTTPError as err:
+                # Not actually always an error, might just mean "keep trying..."
+                response = err
+            response = response.read()
+            if args.debug:
+                print(response)
+            response = json.loads(response)
+            if 'error' not in response:
+                break
+            if response['error'] == 'authorization_declined':
+                print(' user declined authorization.')
+                sys.exit(1)
+            if response['error'] == 'expired_token':
+                print(' too much time has elapsed.')
+                sys.exit(1)
+            if response['error'] != 'authorization_pending':
+                print(response['error'])
+                if 'error_description' in response:
+                    print(response['error_description'])
+                sys.exit(1)
+        print()
+
+    else:
+        # Report the flow actually in effect, which may come from --authflow
+        # rather than from the token file.
+        sys.exit(f'ERROR: Unknown OAuth2 flow "{authflow}". Delete token file and '
+                 f'start over.')
+
+    update_tokens(response)
+
+
+# Refresh step: when the stored access token is missing or expired, trade the
+# refresh token for a new access token at the token endpoint.
+if not access_token_valid():
+    if args.verbose:
+        print('NOTICE: Invalid or expired access token; using refresh token '
+              'to obtain new access token.')
+    if not token['refresh_token']:
+        sys.exit('ERROR: No refresh token. Run script with "--authorize".')
+    p = baseparams.copy()
+    p.update({'client_secret': registration['client_secret'],
+              'refresh_token': token['refresh_token'],
+              'grant_type': 'refresh_token'})
+    try:
+        response = urllib.request.urlopen(registration['token_endpoint'],
+                                          urllib.parse.urlencode(p).encode())
+    except urllib.error.HTTPError as err:
+        # The error object is read like a normal response so that its JSON
+        # body can be inspected for 'error' below.
+        print(err.code, err.reason)
+        response = err
+    response = response.read()
+    if args.debug:
+        print(response)
+    response = json.loads(response)
+    if 'error' in response:
+        print(response['error'])
+        if 'error_description' in response:
+            print(response['error_description'])
+        print('Perhaps refresh token invalid. Try running once with "--authorize"')
+        sys.exit(1)
+    update_tokens(response)
+
+
+# Sanity check after the refresh attempt.
+if not access_token_valid():
+    sys.exit('ERROR: No valid access token. This should not be able to happen.')
+
+
+# Without --verbose the access token is the only thing printed here, so the
+# output can be used directly by a password command.
+if args.verbose:
+    print('Access Token: ', end='')
+print(token['access_token'])
+
+
+def build_sasl_string(user, host, port, bearer_token):
+    '''Returns the SASL initial response for the registration's SASL method.
+
+    OAUTHBEARER uses user, host, port and bearer_token; XOAUTH2 uses only user
+    and bearer_token. Any other method ends the script.
+    '''
+    method = registration['sasl_method']
+    if method == 'XOAUTH2':
+        sasl = f'user={user}\1auth=Bearer {bearer_token}\1\1'
+    elif method == 'OAUTHBEARER':
+        sasl = f'n,a={user},\1host={host}\1port={port}\1auth=Bearer {bearer_token}\1\1'
+    else:
+        sys.exit(f'Unknown SASL method {registration["sasl_method"]}.')
+    return sasl
+
+
+# --test: try to authenticate against the IMAP, POP and SMTP endpoints of the
+# registration with the current access token. Exits with status 1 if any of
+# the three fails.
+# NOTE(review): none of the three connections is explicitly closed.
+if args.test:
+    errors = False
+
+    imap_conn = imaplib.IMAP4_SSL(registration['imap_endpoint'])
+    sasl_string = build_sasl_string(token['email'], registration['imap_endpoint'], 993,
+                                    token['access_token'])
+    if args.debug:
+        imap_conn.debug = 4
+    try:
+        imap_conn.authenticate(registration['sasl_method'], lambda _: sasl_string.encode())
+        # Microsoft has a bug wherein a mismatch between username and token can still report a
+        # successful login... (Try a consumer login with the token from a work/school account.)
+        # Fortunately subsequent commands fail with an error. Thus we follow AUTH with another
+        # IMAP command before reporting success.
+        imap_conn.list()
+        if args.verbose:
+            print('IMAP authentication succeeded')
+    except imaplib.IMAP4.error as e:
+        print('IMAP authentication FAILED (does your account allow IMAP?):', e)
+        errors = True
+
+    pop_conn = poplib.POP3_SSL(registration['pop_endpoint'])
+    sasl_string = build_sasl_string(token['email'], registration['pop_endpoint'], 995,
+                                    token['access_token'])
+    if args.debug:
+        pop_conn.set_debuglevel(2)
+    try:
+        # poplib doesn't have an auth command taking an authenticator object
+        # Microsoft requires a two-line SASL for POP
+        # pylint: disable=W0212
+        pop_conn._shortcmd('AUTH ' + registration['sasl_method'])
+        pop_conn._shortcmd(base64.standard_b64encode(sasl_string.encode()).decode())
+        if args.verbose:
+            print('POP authentication succeeded')
+    except poplib.error_proto as e:
+        # Assumes the first exception argument is bytes -- TODO confirm for
+        # every error poplib can raise here.
+        print('POP authentication FAILED (does your account allow POP?):', e.args[0].decode())
+        errors = True
+
+    # SMTP_SSL would be simpler but Microsoft does not answer on port 465.
+    smtp_conn = smtplib.SMTP(registration['smtp_endpoint'], 587)
+    sasl_string = build_sasl_string(token['email'], registration['smtp_endpoint'], 587,
+                                    token['access_token'])
+    # EHLO is sent again after STARTTLS, on the now-encrypted connection.
+    smtp_conn.ehlo('test')
+    smtp_conn.starttls()
+    smtp_conn.ehlo('test')
+    if args.debug:
+        smtp_conn.set_debuglevel(2)
+    try:
+        smtp_conn.auth(registration['sasl_method'], lambda _=None: sasl_string)
+        if args.verbose:
+            print('SMTP authentication succeeded')
+    except smtplib.SMTPAuthenticationError as e:
+        print('SMTP authentication FAILED:', e)
+        errors = True
+
+    if errors:
+        sys.exit(1)