gcx/scripts/aliyun_cloudauth_manager.py

237 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""
阿里云实人认证 (CloudAuth) 管理工具
功能: 认证场景管理、发起认证、查询结果、套餐余额查询
依赖: pip install alibabacloud-cloudauth20190307 alibabacloud-bssopenapi20171214
"""
import argparse
import json
import sys
import os
import uuid
def get_client():
    """Build a CloudAuth API client from environment configuration.

    Credentials are read from ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and
    ``ALIBABA_CLOUD_ACCESS_KEY_SECRET``; the endpoint comes from
    ``ALIYUN_KYC_ENDPOINT`` and defaults to ``cloudauth.aliyuncs.com``.

    Returns:
        The ``alibabacloud_cloudauth20190307`` ``Client`` built from that
        configuration.

    Raises:
        SystemExit: With status 1 when either credential is unset or blank.
    """
    # Strip surrounding whitespace so a value that is only blanks/newlines
    # (e.g. exported from a file) is treated as missing instead of being
    # handed to the SDK.
    ak_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '').strip()
    ak_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '').strip()
    if not ak_id or not ak_secret:
        print('ERROR: Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        sys.exit(1)

    # Import the SDK only after the configuration check so a missing
    # credential is reported before any third-party module is loaded.
    from alibabacloud_cloudauth20190307.client import Client
    from alibabacloud_tea_openapi.models import Config

    endpoint = os.environ.get('ALIYUN_KYC_ENDPOINT', 'cloudauth.aliyuncs.com')
    config = Config(
        access_key_id=ak_id,
        access_key_secret=ak_secret,
        endpoint=endpoint,
    )
    return Client(config)
# ──────────────── 认证场景 ────────────────
def describe_verify_setting(args):
    """Probe the CloudAuth service and print whether it responded.

    Sends an ``InitFaceVerify`` request filled with placeholder identity
    data. Any response that does not raise is reported as "service
    available"; a non-200 response code is printed as extra detail.

    Args:
        args: Parsed CLI namespace. ``args.scene_id`` is optional and falls
            back to the ``ALIYUN_KYC_SCENE_ID`` environment variable; when
            neither is set, scene ID 0 is sent.
    """
    scene_id = args.scene_id or os.environ.get('ALIYUN_KYC_SCENE_ID', '')
    # Parse the scene ID before creating a client so a malformed value is
    # reported without needing credentials or the SDK.
    try:
        scene_id_value = int(scene_id) if scene_id else 0
    except ValueError as e:
        print(f'ERROR: {e}')
        return

    client = get_client()
    from alibabacloud_cloudauth20190307.models import InitFaceVerifyRequest
    try:
        # Probe service via InitFaceVerify with dummy data
        req = InitFaceVerifyRequest(
            scene_id=scene_id_value,
            outer_order_no=str(uuid.uuid4()),
            product_code='ID_PRO',
            cert_type='IDENTITY_CARD',
            cert_name='probe',
            cert_no='000000000000000000',
        )
        resp = client.init_face_verify(req)
        body = resp.body
        print('\n📋 实人认证服务状态\n')
        print(f' 场景ID: {scene_id or "(未设置)"}')
        print(' 产品码: ID_PRO (人脸活体检测)')
        print(f' 请求ID: {body.request_id}')
        print(' 状态: ✅ 服务可用 (权限正常)')
        # Compare as text: the other commands in this file accept the code
        # as either '200' or 200, so an integer 200 must not be reported
        # as a failed probe here.
        if body.code and str(body.code) != '200':
            print(f' 探测响应: {body.code} - {body.message or ""}')
    except Exception as e:
        err = str(e)
        if 'NoPermission' in err or '403' in err:
            print('\n❌ 权限不足: RAM 子账号缺少 CloudAuth 权限')
            print(' 请添加 AliyunYundunCloudAuthFullAccess 策略')
        else:
            print(f'ERROR: {e}')
# ──────────────── 发起认证 ────────────────
def init_face_verify(args):
    """Start a face liveness verification and print its identifiers.

    Args:
        args: Parsed CLI namespace providing ``name`` and ``id_number``
            (sent as the certificate name and number), plus optional
            ``scene_id`` (falls back to ``ALIYUN_KYC_SCENE_ID``) and
            ``order_no`` (a random UUID is generated when omitted).

    Raises:
        SystemExit: With status 1 when no scene ID is configured or it is
            not an integer.
    """
    raw_scene_id = args.scene_id or os.environ.get('ALIYUN_KYC_SCENE_ID', '')
    # Validate up front: int('') would otherwise escape as a raw ValueError
    # traceback when neither --scene-id nor the environment variable is set.
    if not raw_scene_id:
        print('ERROR: Set --scene-id or ALIYUN_KYC_SCENE_ID')
        sys.exit(1)
    try:
        scene_id = int(raw_scene_id)
    except ValueError:
        print(f'ERROR: scene ID must be an integer, got {raw_scene_id!r}')
        sys.exit(1)

    client = get_client()
    from alibabacloud_cloudauth20190307.models import InitFaceVerifyRequest
    outer_order_no = args.order_no or str(uuid.uuid4())
    req = InitFaceVerifyRequest(
        scene_id=scene_id,
        outer_order_no=outer_order_no,
        product_code='ID_PRO',
        cert_type='IDENTITY_CARD',
        cert_name=args.name,
        cert_no=args.id_number,
    )
    try:
        resp = client.init_face_verify(req)
        body = resp.body
        # The response code is accepted as either text or integer.
        if body.code in ('200', 200):
            result = body.result_object
            print('✅ 认证已发起')
            print(f' CertifyId: {result.certify_id}')
            print(f' CertifyUrl: {result.certify_url}')
            print(f' OrderNo: {outer_order_no}')
        else:
            print(f'ERROR: {body.code} - {body.message}')
    except Exception as e:
        print(f'ERROR: {e}')
def describe_face_verify(args):
    """Look up and print the outcome of a face verification.

    Args:
        args: Parsed CLI namespace providing ``certify_id`` and an optional
            ``scene_id`` (falls back to ``ALIYUN_KYC_SCENE_ID``).

    Raises:
        SystemExit: With status 1 when no scene ID is configured or it is
            not an integer.
    """
    raw_scene_id = args.scene_id or os.environ.get('ALIYUN_KYC_SCENE_ID', '')
    # Validate up front: int('') would otherwise escape as a raw ValueError
    # traceback when neither --scene-id nor the environment variable is set.
    if not raw_scene_id:
        print('ERROR: Set --scene-id or ALIYUN_KYC_SCENE_ID')
        sys.exit(1)
    try:
        scene_id = int(raw_scene_id)
    except ValueError:
        print(f'ERROR: scene ID must be an integer, got {raw_scene_id!r}')
        sys.exit(1)

    client = get_client()
    from alibabacloud_cloudauth20190307.models import DescribeFaceVerifyRequest
    req = DescribeFaceVerifyRequest(
        scene_id=scene_id,
        certify_id=args.certify_id,
    )
    try:
        resp = client.describe_face_verify(req)
        body = resp.body
        # The response code is accepted as either text or integer.
        if body.code in ('200', 200):
            result = body.result_object
            # The result is treated as passed only when the flag is 'T'.
            passed = result.passed == 'T'
            print('\n📋 认证结果')
            print(f' CertifyId: {args.certify_id}')
            print(f' 通过: {"✅ 是" if passed else "❌ 否"}')
            print(f' SubCode: {result.sub_code}')
            if result.material_info:
                print(f' 材料信息: {result.material_info}')
        else:
            print(f'ERROR: {body.code} - {body.message}')
    except Exception as e:
        print(f'ERROR: {e}')
# ──────────────── 身份二要素核验 ────────────────
def verify_identity(args):
    """Run a two-factor identity check (name + ID number) and print the result.

    Args:
        args: Parsed CLI namespace providing ``name`` and ``id_number``.
    """
    client = get_client()
    from alibabacloud_cloudauth20190307.models import Id2MetaVerifyRequest
    req = Id2MetaVerifyRequest(
        param_type='normal',
        user_name=args.name,
        identity_card_number=args.id_number,
    )
    try:
        resp = client.id_2meta_verify(req)
        body = resp.body
        # The response code is accepted as either text or integer.
        if body.code in ('200', 200):
            result = body.result_object
            biz_code = result.biz_code if result else 'UNKNOWN'
            # Only biz code '1' is reported as a match.
            passed = biz_code == '1'

            # Show first 6 / last 4 characters only when they cannot overlap;
            # for inputs of 10 characters or fewer those two slices would
            # together print the entire number, so mask it completely.
            id_number = args.id_number
            if len(id_number) > 10:
                masked_id = f'{id_number[:6]}****{id_number[-4:]}'
            else:
                masked_id = '*' * len(id_number)

            print('\n📋 身份二要素核验')
            print(f' 姓名: {args.name}')
            print(f' 身份证: {masked_id}')
            print(f' 结果: {"✅ 一致" if passed else "❌ 不一致"}')
            print(f' BizCode: {biz_code}')
        else:
            print(f'ERROR: {body.code} - {body.message}')
    except Exception as e:
        print(f'ERROR: {e}')
# ──────────────── 套餐余额查询 ────────────────
def query_quota(args):
    """Print remaining quota of CloudAuth resource packages via the BSS API.

    Fetches the first page (up to 100 entries) of resource package
    instances, keeps those whose ``CommodityCode`` contains ``cloudauth``
    (case-insensitive), and prints one table row per package.

    Args:
        args: Parsed CLI namespace; not used by this command.

    Raises:
        SystemExit: With status 1 when either credential environment
            variable is unset or blank.
    """
    # Apply the same credential check as get_client() so a missing key is
    # reported clearly instead of surfacing as an SDK request failure.
    ak_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '').strip()
    ak_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '').strip()
    if not ak_id or not ak_secret:
        print('ERROR: Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET')
        sys.exit(1)

    from alibabacloud_bssopenapi20171214.client import Client as BssClient
    from alibabacloud_bssopenapi20171214.models import QueryResourcePackageInstancesRequest
    from alibabacloud_tea_openapi.models import Config

    config = Config(
        access_key_id=ak_id,
        access_key_secret=ak_secret,
        endpoint='business.aliyuncs.com',
    )
    client = BssClient(config)
    try:
        # NOTE(review): only the first page is requested; accounts with more
        # than 100 resource packages would need pagination.
        req = QueryResourcePackageInstancesRequest(page_num=1, page_size=100)
        resp = client.query_resource_package_instances(req)
        body = resp.body
        instances = body.data.instances.instance if body.data and body.data.instances else []
        # Convert each instance to a dict once; an absent list counts as empty.
        packages = [inst.to_map() for inst in (instances or [])]
        # Filter cloudauth packages
        auth_pkgs = [
            pkg for pkg in packages
            if 'cloudauth' in (pkg.get('CommodityCode', '') or '').lower()
        ]
        if not auth_pkgs:
            print('\n📋 未找到实人认证相关套餐')
            return
        print(f'\n📋 实人认证套餐余额 (共 {len(auth_pkgs)} 个)\n')
        print(f'{"套餐名称":<30} {"总量":<10} {"剩余":<10} {"到期日":<22} {"状态"}')
        print('-' * 90)
        for m in auth_pkgs:
            name = m.get('Remark', '') or m.get('PackageType', '')
            total = m.get('TotalAmount', '0')
            remain = m.get('RemainingAmount', '0')
            unit = m.get('TotalAmountUnit', '')
            # Keep only the first 10 characters of the expiry timestamp.
            expiry = (m.get('ExpiryTime', '') or '')[:10]
            status = m.get('Status', '')
            status_str = '✅ 可用' if status == 'Available' else f'{status}'
            print(f'{name:<30} {total}{unit:<8} {remain}{unit:<8} {expiry:<22} {status_str}')
    except Exception as e:
        err = str(e)
        if 'NotAuthorized' in err:
            print('\n❌ 权限不足: 需要 AliyunBSSReadOnlyAccess 策略')
        else:
            print(f'ERROR: {e}')
# ──────────────── CLI ────────────────
def main(argv=None):
    """Parse the command line and dispatch to the selected sub-command.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, so existing ``main()`` callers are
            unaffected; passing a list allows programmatic invocation.

    When no sub-command is given, the help text is printed and the function
    returns without doing anything else.
    """
    parser = argparse.ArgumentParser(description='阿里云实人认证管理工具')
    sub = parser.add_subparsers(dest='command')

    # Service / scene probe. The help text reflects the handler's actual
    # fallback to the environment variable (it does not list all scenes).
    p = sub.add_parser('list-scenes', help='查询认证场景配置')
    p.add_argument('--scene-id', help='场景ID (默认取环境变量)')

    # Start a face liveness verification
    p = sub.add_parser('init-face', help='发起人脸活体检测')
    p.add_argument('--name', required=True, help='真实姓名')
    p.add_argument('--id-number', required=True, help='身份证号')
    p.add_argument('--scene-id', help='场景ID (默认取环境变量)')
    p.add_argument('--order-no', help='业务订单号 (默认自动生成)')

    # Query a face verification result
    p = sub.add_parser('query-face', help='查询人脸认证结果')
    p.add_argument('--certify-id', required=True, help='认证ID')
    p.add_argument('--scene-id', help='场景ID')

    # Resource package quota (takes no options)
    sub.add_parser('quota', help='查询套餐余额')

    # Two-factor identity check
    p = sub.add_parser('verify-id', help='身份证二要素核验')
    p.add_argument('--name', required=True, help='真实姓名')
    p.add_argument('--id-number', required=True, help='身份证号')

    args = parser.parse_args(argv)
    if not args.command:
        parser.print_help()
        return

    # Built after the early return so the handlers are only looked up when a
    # command is actually going to run.
    cmds = {
        'list-scenes': describe_verify_setting,
        'init-face': init_face_verify,
        'query-face': describe_face_verify,
        'verify-id': verify_identity,
        'quota': query_quota,
    }
    cmds[args.command](args)


if __name__ == '__main__':
    main()