114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
import base64
|
|
import hashlib
|
|
import secrets
|
|
|
|
from odoo import api, fields, models
|
|
from odoo.exceptions import UserError
|
|
from odoo.tools.misc import consteq
|
|
|
|
|
|
class ApiClient(models.Model):
    """API client credentials used to issue JWT tokens.

    Each record pairs a ``client_id`` with a secret that is persisted only as
    a salted PBKDF2-HMAC-SHA256 hash in ``client_secret_hash``. The plaintext
    secret travels through the non-stored ``client_secret`` field and is
    never written to a database column.
    """

    _name = 'api.client'
    _description = 'API Client for JWT Authentication'
    _rec_name = 'name'

    # Parameters applied when a secret is hashed. Every stored hash embeds
    # its own iteration count and salt, so raising the iteration count here
    # does not invalidate hashes created earlier.
    _SECRET_HASH_SCHEME = 'pbkdf2_sha256'
    _SECRET_HASH_ITERATIONS = 260000
    _SECRET_SALT_BYTES = 16

    name = fields.Char(string='Application Name', required=True)
    active = fields.Boolean(default=True)
    # Random URL-safe identifier generated as the field default.
    client_id = fields.Char(
        string='Client ID',
        required=True,
        default=lambda self: secrets.token_urlsafe(24),
        copy=False,
        readonly=True,
    )
    # Plaintext secret. Non-stored on purpose: only its hash is persisted.
    client_secret = fields.Char(
        string='New Client Secret',
        copy=False,
        store=False,
        help='Shown only before saving. Use Regenerate Secret to rotate it.',
    )
    # Format: "<scheme>$<iterations>$<b64 salt>$<b64 digest>" (see _hash_secret).
    client_secret_hash = fields.Char(string='Client Secret Hash', copy=False, readonly=True)
    user_id = fields.Many2one(
        'res.users',
        string='Odoo User',
        required=True,
        domain=[('active', '=', True)],
        help='Odoo user mapped to JWT tokens issued for this client.',
    )
    token_ttl = fields.Integer(string='Token Lifetime (seconds)', default=7200, required=True)
    last_used = fields.Datetime(readonly=True)

    _sql_constraints = [
        ('client_id_unique', 'unique(client_id)', 'Client ID must be unique.'),
        ('token_ttl_positive', 'check(token_ttl > 0)', 'Token lifetime must be positive.'),
    ]

    @api.model_create_multi
    def create(self, vals_list):
        """Create clients, each with a supplied or freshly generated secret.

        The caller's dictionaries are copied, never modified. The plaintext
        secret is removed from the values used for the insert and then
        applied through ``write()``, which is the single place where the
        secret is hashed.

        :param vals_list: list of value dictionaries; each may carry a
            plaintext ``client_secret``.
        :return: the newly created records.
        """
        insert_vals = []
        plain_secrets = []
        for vals in vals_list:
            vals = dict(vals)  # work on a copy: callers may reuse their dicts
            plain_secrets.append(vals.pop('client_secret', None) or self._generate_secret())
            insert_vals.append(vals)

        records = super().create(insert_vals)
        for record, plain_secret in zip(records, plain_secrets):
            # One PBKDF2 run per record; write() also hands the plaintext to
            # the ORM so it can be read back during the current transaction.
            record.write({'client_secret': plain_secret})
        return records

    def write(self, vals):
        """Update the records, hashing ``client_secret`` when one is given.

        A non-empty ``client_secret`` adds the matching ``client_secret_hash``
        to the written values. The plaintext is still forwarded to the ORM:
        the field is non-stored, so it is not written to a database column.
        An empty ``client_secret`` is dropped so that it can neither raise
        nor blank the existing hash. The caller's dictionary is not modified.

        :param vals: values to write.
        :return: the result of the parent ``write()``.
        """
        if 'client_secret' in vals:
            vals = dict(vals)  # work on a copy: callers may reuse their dict
            plain_secret = vals['client_secret']
            if plain_secret:
                vals['client_secret_hash'] = self._hash_secret(plain_secret)
            else:
                del vals['client_secret']
        return super().write(vals)

    def action_regenerate_secret(self):
        """Rotate the secret of every client in ``self``.

        Each client receives its own newly generated secret, hashed once by
        ``write()``.

        NOTE(review): the plaintext is not persisted anywhere; confirm that
        the form can still display it once this call has returned, as the
        notification below promises.

        :return: a ``display_notification`` client action.
        """
        for client in self:
            client.write({'client_secret': self._generate_secret()})
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': 'Client secret regenerated',
                'message': 'Copy the new secret from the form before leaving this page.',
                'type': 'warning',
                'sticky': True,
            },
        }

    @api.model
    def _generate_secret(self):
        """Return a new random URL-safe secret built from 48 random bytes."""
        return secrets.token_urlsafe(48)

    @api.model
    def _hash_secret(self, secret):
        """Return the salted PBKDF2-HMAC-SHA256 hash string for ``secret``.

        :param secret: plaintext secret (``str``).
        :return: ``"<scheme>$<iterations>$<salt>$<digest>"`` where salt and
            digest are URL-safe base64. A new random salt is drawn on every
            call, so hashing the same secret twice gives different strings.
        :raises UserError: if ``secret`` is empty.
        """
        if not secret:
            raise UserError('Client secret cannot be empty.')
        iterations = self._SECRET_HASH_ITERATIONS
        salt = secrets.token_bytes(self._SECRET_SALT_BYTES)
        digest = hashlib.pbkdf2_hmac('sha256', secret.encode(), salt, iterations)
        return '$'.join((
            self._SECRET_HASH_SCHEME,
            str(iterations),
            base64.urlsafe_b64encode(salt).decode(),
            base64.urlsafe_b64encode(digest).decode(),
        ))

    def _check_secret(self, secret):
        """Tell whether ``secret`` matches this client's stored hash.

        The check fails closed: a missing hash, an empty or non-string
        secret, an unknown scheme or a malformed stored hash all return
        ``False``. Unrelated errors are no longer swallowed.

        :param secret: plaintext secret presented by the caller.
        :return: ``True`` only when the recomputed digest equals the stored one.
        """
        self.ensure_one()
        stored_hash = self.client_secret_hash
        if not stored_hash or not secret or not isinstance(secret, str):
            return False
        try:
            scheme, iterations, salt, expected = stored_hash.split('$', 3)
            if scheme != self._SECRET_HASH_SCHEME:
                return False
            digest = hashlib.pbkdf2_hmac(
                'sha256',
                secret.encode(),
                base64.urlsafe_b64decode(salt.encode()),
                int(iterations),
            )
            # consteq is Odoo's timing-safe comparison helper.
            return consteq(base64.urlsafe_b64encode(digest).decode(), expected)
        except (ValueError, TypeError, OverflowError):
            # ValueError: wrong number of '$' parts, bad base64, non-numeric
            # or non-positive iteration count, unencodable secret.
            # OverflowError: iteration count too large for pbkdf2_hmac.
            # TypeError: values the comparison helper cannot compare.
            return False
|