Files
Odoo14Kanjabung/test_business_category_access_runtime.py

494 lines
14 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Runtime test akses eksklusif Business Category (Odoo XML-RPC).
Tujuan:
- Memastikan user hanya bisa akses record pada business category yang diizinkan.
- Memastikan akses lintas category ditolak (read/write rule).
- Memastikan policy strict manager scope berjalan (kecuali sysadmin).
Cara pakai:
1) Sesuaikan konfigurasi di bagian CONFIG.
2) Pastikan di database sudah ada minimal 2 business category.
3) Pastikan ada 2 user test (A dan B) dengan category berbeda.
4) Jalankan: python test_business_category_access_runtime.py
"""
import traceback
import xmlrpc.client
# =========================
# CONFIG
# =========================
ODOO_URL = "http://localhost:8070"
ODOO_DB = "kanjabung_MRP"
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin"
# User uji dengan category berbeda
USER_A_USERNAME = "user.category.a"
USER_A_PASSWORD = "admin"
USER_B_USERNAME = "user.category.b"
USER_B_PASSWORD = "admin"
AUTO_BOOTSTRAP_TEST_USERS = True
# Jika None, script akan auto-detect dari effective_business_category_ids user
USER_A_CATEGORY_ID = None
USER_B_CATEGORY_ID = None
# Model + field category yang diuji
MODEL_CATEGORY_MAP = {
"crm.lead": "business_category_id",
"sale.order": "business_category_id",
"purchase.order": "business_category_id",
"hr.expense": "business_category_id",
"stock.picking": "business_category_id",
"account.move": "business_category_id", # Sales invoice
}
TEST_GROUP_XMLIDS = [
"base.group_user",
"crm.group_use_lead",
"sales_team.group_sale_salesman",
"sales_team.group_sale_manager",
"purchase.group_purchase_user",
"hr_expense.group_hr_expense_user",
"stock.group_stock_user",
"account.group_account_user",
]
def rpc_common(url):
return xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/common")
def rpc_object(url):
return xmlrpc.client.ServerProxy(f"{url}/xmlrpc/2/object")
def auth(common, db, username, password):
uid = common.authenticate(db, username, password, {})
if not uid:
raise RuntimeError(f"Gagal login: {username}")
return uid
def execute(models, db, uid, pwd, model, method, args=None, kwargs=None):
if args is None:
args = []
if kwargs is None:
kwargs = {}
return models.execute_kw(db, uid, pwd, model, method, args, kwargs)
def safe_call(label, fn):
try:
result = fn()
return True, result, None
except Exception as exc:
return False, None, f"{label}: {exc}"
def pick_user_category(models, db, admin_uid, admin_pwd, user_id, fallback=None):
if fallback:
return fallback
user_data = execute(
models,
db,
admin_uid,
admin_pwd,
"res.users",
"read",
[[user_id]],
{"fields": ["effective_business_category_ids"]},
)
category_ids = user_data[0].get("effective_business_category_ids", [])
if not category_ids:
raise RuntimeError(
f"User ID {user_id} tidak punya effective_business_category_ids."
)
return category_ids[0]
def get_xmlid_res_id(models, db, uid, pwd, xmlid):
module, name = xmlid.split(".", 1)
rec = execute(
models,
db,
uid,
pwd,
"ir.model.data",
"search_read",
[[("module", "=", module), ("name", "=", name)]],
{"fields": ["res_id"], "limit": 1},
)
return rec[0]["res_id"] if rec else None
def ensure_two_categories(models, db, admin_uid, admin_pwd):
categories = execute(
models,
db,
admin_uid,
admin_pwd,
"crm.business.category",
"search_read",
[[("active", "=", True)]],
{"fields": ["id", "name"], "order": "id asc", "limit": 10},
)
if len(categories) >= 2:
return categories[0]["id"], categories[1]["id"]
company = execute(
models,
db,
admin_uid,
admin_pwd,
"res.company",
"search_read",
[[]],
{"fields": ["id"], "limit": 1},
)
if not company:
raise RuntimeError("Tidak ada company di database.")
company_id = company[0]["id"]
while len(categories) < 2:
next_no = len(categories) + 1
new_id = execute(
models,
db,
admin_uid,
admin_pwd,
"crm.business.category",
"create",
[{
"name": f"AUTO Category {next_no}",
"code": f"AUTO{next_no}",
"company_id": company_id,
}],
)
categories.append({"id": new_id, "name": f"AUTO Category {next_no}"})
return categories[0]["id"], categories[1]["id"]
def ensure_user_with_category(models, db, admin_uid, admin_pwd, login, password, category_id):
user_rec = execute(
models,
db,
admin_uid,
admin_pwd,
"res.users",
"search_read",
[[("login", "=", login)]],
{"fields": ["id", "company_id", "company_ids"], "limit": 1},
)
company = execute(
models,
db,
admin_uid,
admin_pwd,
"crm.business.category",
"read",
[[category_id]],
{"fields": ["company_id"]},
)
if not company or not company[0].get("company_id"):
raise RuntimeError(f"Category {category_id} tidak punya company_id.")
company_id = company[0]["company_id"][0]
group_ids = []
for xmlid in TEST_GROUP_XMLIDS:
gid = get_xmlid_res_id(models, db, admin_uid, admin_pwd, xmlid)
if gid:
group_ids.append(gid)
vals = {
"name": login,
"login": login,
"password": password,
"company_id": company_id,
"company_ids": [(6, 0, [company_id])],
"allowed_business_category_ids": [(6, 0, [category_id])],
"default_business_category_id": category_id,
"active_business_category_id": category_id,
}
if group_ids:
vals["groups_id"] = [(6, 0, group_ids)]
if user_rec:
uid = user_rec[0]["id"]
execute(models, db, admin_uid, admin_pwd, "res.users", "write", [[uid], vals])
return uid
return execute(models, db, admin_uid, admin_pwd, "res.users", "create", [vals])
def ensure_crm_lead_sample(models, db, admin_uid, admin_pwd, category_id, suffix):
existing = execute(
models,
db,
admin_uid,
admin_pwd,
"crm.lead",
"search",
[[("business_category_id", "=", category_id)]],
{"limit": 1},
)
if existing:
return existing[0]
team = execute(
models,
db,
admin_uid,
admin_pwd,
"crm.team",
"search_read",
[[("business_category_id", "=", category_id)]],
{"fields": ["id"], "limit": 1},
)
vals = {
"name": f"AUTO Lead {suffix}",
"business_category_id": category_id,
}
if team:
vals["team_id"] = team[0]["id"]
ok, result, err = safe_call(
"crm.lead.create",
lambda: execute(
models,
db,
admin_uid,
admin_pwd,
"crm.lead",
"create",
[vals],
),
)
if ok:
return result
print(f"[WARN] Gagal create sample crm.lead untuk category {category_id}: {err}")
return None
def find_sample_record(models, db, admin_uid, admin_pwd, model, category_field, category_id):
recs = execute(
models,
db,
admin_uid,
admin_pwd,
model,
"search_read",
[[(category_field, "=", category_id)]],
{"fields": ["id"], "limit": 1},
)
return recs[0]["id"] if recs else None
def test_access_rule(models, db, uid, pwd, model, record_id, operation):
def _call():
return execute(
models,
db,
uid,
pwd,
model,
"check_access_rule",
[[record_id], operation],
)
return safe_call(f"{model}.check_access_rule({operation})", _call)
def test_read(models, db, uid, pwd, model, record_id):
def _call():
return execute(
models,
db,
uid,
pwd,
model,
"read",
[[record_id]],
{"fields": ["id"]},
)
return safe_call(f"{model}.read", _call)
def print_result(test_name, ok, detail=""):
status = "PASS" if ok else "FAIL"
print(f"[{status}] {test_name}")
if detail:
print(f" {detail}")
def main():
print("=" * 90)
print("TEST RUNTIME AKSES EKSKLUSIF BUSINESS CATEGORY")
print("=" * 90)
print(f"URL : {ODOO_URL}")
print(f"DB : {ODOO_DB}")
print("-" * 90)
common = rpc_common(ODOO_URL)
models = rpc_object(ODOO_URL)
admin_uid = auth(common, ODOO_DB, ADMIN_USERNAME, ADMIN_PASSWORD)
user_a_uid = common.authenticate(ODOO_DB, USER_A_USERNAME, USER_A_PASSWORD, {})
user_b_uid = common.authenticate(ODOO_DB, USER_B_USERNAME, USER_B_PASSWORD, {})
if AUTO_BOOTSTRAP_TEST_USERS and (not user_a_uid or not user_b_uid):
cat_a_bootstrap, cat_b_bootstrap = ensure_two_categories(
models, ODOO_DB, admin_uid, ADMIN_PASSWORD
)
ensure_user_with_category(
models,
ODOO_DB,
admin_uid,
ADMIN_PASSWORD,
USER_A_USERNAME,
USER_A_PASSWORD,
cat_a_bootstrap,
)
ensure_user_with_category(
models,
ODOO_DB,
admin_uid,
ADMIN_PASSWORD,
USER_B_USERNAME,
USER_B_PASSWORD,
cat_b_bootstrap,
)
ensure_crm_lead_sample(models, ODOO_DB, admin_uid, ADMIN_PASSWORD, cat_a_bootstrap, "A")
ensure_crm_lead_sample(models, ODOO_DB, admin_uid, ADMIN_PASSWORD, cat_b_bootstrap, "B")
user_a_uid = auth(common, ODOO_DB, USER_A_USERNAME, USER_A_PASSWORD)
user_b_uid = auth(common, ODOO_DB, USER_B_USERNAME, USER_B_PASSWORD)
else:
user_a_uid = auth(common, ODOO_DB, USER_A_USERNAME, USER_A_PASSWORD)
user_b_uid = auth(common, ODOO_DB, USER_B_USERNAME, USER_B_PASSWORD)
cat_a = pick_user_category(
models, ODOO_DB, admin_uid, ADMIN_PASSWORD, user_a_uid, fallback=USER_A_CATEGORY_ID
)
cat_b = pick_user_category(
models, ODOO_DB, admin_uid, ADMIN_PASSWORD, user_b_uid, fallback=USER_B_CATEGORY_ID
)
if cat_a == cat_b:
raise RuntimeError(
"User A dan User B berada di category yang sama. Gunakan 2 user dengan category berbeda."
)
print(f"User A UID={user_a_uid} category={cat_a}")
print(f"User B UID={user_b_uid} category={cat_b}")
print("-" * 90)
total = 0
passed = 0
for model, category_field in MODEL_CATEGORY_MAP.items():
print(f"\nModel: {model} (field: {category_field})")
sample_a = find_sample_record(
models, ODOO_DB, admin_uid, ADMIN_PASSWORD, model, category_field, cat_a
)
sample_b = find_sample_record(
models, ODOO_DB, admin_uid, ADMIN_PASSWORD, model, category_field, cat_b
)
if not sample_a or not sample_b:
print(
"[SKIP] Sample data tidak cukup untuk dua category. "
"Siapkan minimal 1 record per category."
)
continue
# 1) User A read own category => harus boleh
total += 1
ok_rule, _, err_rule = test_access_rule(
models, ODOO_DB, user_a_uid, USER_A_PASSWORD, model, sample_a, "read"
)
ok_read, _, err_read = test_read(
models, ODOO_DB, user_a_uid, USER_A_PASSWORD, model, sample_a
)
own_read_ok = ok_rule and ok_read
if own_read_ok:
passed += 1
print_result(
"User A read OWN category",
own_read_ok,
"" if own_read_ok else (err_rule or err_read or "ditolak"),
)
# 2) User A read other category => harus ditolak
total += 1
ok_rule_other, _, err_rule_other = test_access_rule(
models, ODOO_DB, user_a_uid, USER_A_PASSWORD, model, sample_b, "read"
)
ok_read_other, _, err_read_other = test_read(
models, ODOO_DB, user_a_uid, USER_A_PASSWORD, model, sample_b
)
other_read_denied = not (ok_rule_other and ok_read_other)
if other_read_denied:
passed += 1
print_result(
"User A read OTHER category (must deny)",
other_read_denied,
"denied as expected"
if other_read_denied
else "masih bisa baca lintas category",
)
# 3) User A write other category => harus ditolak
total += 1
ok_write_other, _, err_write_other = test_access_rule(
models, ODOO_DB, user_a_uid, USER_A_PASSWORD, model, sample_b, "write"
)
other_write_denied = not ok_write_other
if other_write_denied:
passed += 1
print_result(
"User A write OTHER category (must deny)",
other_write_denied,
"denied as expected"
if other_write_denied
else f"masih lolos check_access_rule(write): {err_write_other}",
)
print("\n" + "=" * 90)
print(f"SUMMARY: {passed}/{total} test PASS")
print("=" * 90)
if total == 0:
print("Tidak ada test yang dieksekusi karena sample data belum tersedia.")
return 2
if passed != total:
print("\nAda gap akses. Cek record rule di modul terkait (sales/purchase/expense/inventory/accounting).")
return 1
print("\nSemua test lolos. Akses eksklusif per business category terverifikasi.")
return 0
if __name__ == "__main__":
try:
exit_code = main()
except Exception:
print("\n[ERROR] Test gagal dijalankan:")
traceback.print_exc()
exit_code = 99
raise SystemExit(exit_code)