JSON-RPC 调用Odoo

微信公众号、小程序、钉钉开发

通过 ajax 调用 JSON-RPC,参考代码:

/**
 * Issue a JSON-RPC call by POSTing a JSON-encoded payload to `url`.
 *
 * The envelope handling is delegated to `genericJsonRpc`; this function only
 * supplies the transport callback that performs the actual `$.ajax` request.
 *
 * @param {string} url       endpoint to POST to
 * @param {string} fct_name  forwarded unchanged to `genericJsonRpc`
 * @param {Object} params    forwarded unchanged to `genericJsonRpc`
 * @param {Object} [settings] extra `$.ajax` options; the transport keys set
 *                            below take precedence over same-named keys here
 * @returns whatever `genericJsonRpc` returns
 */
function jsonRpc(url, fct_name, params, settings) {
    var send = function (payload) {
        // These keys are merged last, so they override anything in `settings`.
        var transport = {
            url: url,
            dataType: 'json',
            type: 'POST',
            data: JSON.stringify(payload, time.date_to_utc),
            contentType: 'application/json'
        };
        var ajaxOptions = _.extend({}, settings, transport);
        return $.ajax(url, ajaxOptions);
    };
    return genericJsonRpc(fct_name, params, send);
}

后端登录验证:

@http.route('/dingtalk/authenticate', type='json', auth='none', methods=["GET", "POST"], csrf=False)
def authenticate(self, code, *args, **kw):
    """Open an Odoo session for the DingTalk user identified by ``code``.

    The DingTalk access token is read from the ``dingtalk.access_token``
    config parameter, ``code`` is exchanged for a DingTalk user id, the
    matching ``hr.employee`` record (by ``dingtalk_id``) is looked up, and
    the current session is authenticated with that record's ``login``.

    :param code: DingTalk authorization code sent by the client
    :returns: a JSON-encoded string. On success it holds ``status`` =
        ``'success'``, ``dingtalk_user_id``, ``uid`` and ``session_id``.
        On failure it holds ``status`` = ``'error'`` and ``errmsg``.
    """
    icp = request.env['ir.config_parameter']
    access_token = icp.get_param('dingtalk.access_token', default='')
    result = user.get_user_id(access_token, code)
    if not result[0]:
        # 'status' is the key used by every other response of this route;
        # 'result' is kept as well so existing clients reading it still work.
        return json.dumps({'status': 'error', 'result': 'error', 'errmsg': u'获取钉钉用户信息失败'})

    userid = result[1]['userid']
    res_users = request.env['hr.employee'].sudo().search([('dingtalk_id', '=', userid)], limit=1)
    if not res_users:
        return json.dumps({'status': 'error', 'errmsg': u'您没有绑定Odoo用户,请联系管理员!'})

    res_user = res_users[0]
    # NOTE(review): the DingTalk userid is passed as the third (credential)
    # argument of session.authenticate -- confirm this is the intended scheme.
    uid = request.session.authenticate(request.session.db, res_user.login, userid)
    request.session.dingtalk_user_id = userid
    if not uid:
        return json.dumps({'status': 'error', 'errmsg': u'您没有绑定Odoo用户,请联系管理员!'})

    values = {'status': 'success', 'dingtalk_user_id': userid, 'uid': uid}
    # Log before the session id is added: a session id is a bearer credential
    # and must not end up in log files.
    _logger.info("values:%s", values)
    values['session_id'] = request.session.sid
    return json.dumps(values)

传递 session,保持登录状态,重点是传递 X-Openerp-Session-Id:

/**
 * Authenticate against the Odoo backend with a DingTalk auth code.
 *
 * Fetches the stored session id and a DingTalk auth code, then POSTs the code
 * to `/dingtalk/authenticate`, sending the session id in the
 * `X-Openerp-Session-Id` header so the backend can reuse the session.
 *
 * @returns {Promise} resolves with the response data when it carries a truthy
 *   `result`; rejects with the response data when it does not, with the
 *   transport error / response when the request fails, or with whatever
 *   `getSessionId()` / `authCode()` reject with.
 */
export function authenticate() {
    return new Promise(function (resolve, reject) {
        getSessionId().then((sessionId) => {
            // NOTE(review): `body` is the shared `body_data` object, so setting
            // `params` below mutates it -- confirm that is intended.
            const Config = {
                method: 'POST',
                uri: APPHOST + "/dingtalk/authenticate",
                type: 'json',
                headers: {
                    'Content-Type': 'application/json',
                    'X-Openerp-Session-Id': sessionId || '',
                },
                body: body_data
            }
            return authCode().then(function (result) {
                Config.body.params = { code: result.code }
                request(Config, function (error, res) {
                    // Settle the promise on transport failures too; previously
                    // the caller would wait forever in this case.
                    if (error || !res || !res.ok) {
                        reject(error || res)
                        return
                    }
                    const data = res.data
                    if (data.result) {
                        resolve(data)
                    } else {
                        reject(data)
                    }
                })
            })
        }).catch(reject) // propagate getSessionId()/authCode() failures
    })
}

后端获取session,确保在登录状态:

def setup_session(self, httprequest):
    """Attach a session to ``httprequest``, recovering an existing one if possible.

    The session id is looked up, in order, in the ``session_id`` query
    argument, the ``X-Openerp-Session-Id`` header and the ``session_id``
    cookie. When no id is found at all a new session is created.

    :param httprequest: incoming request; its ``session`` attribute is set
    :returns: True when the session id came from the query argument or the
        header, False when it came from the cookie or was absent
    """
    session_gc(self.session_store)

    sid = httprequest.args.get('session_id')
    if not sid:
        sid = httprequest.headers.get("X-Openerp-Session-Id")
    # Only an id supplied via query argument or header counts as explicit.
    explicit_session = bool(sid)
    if not explicit_session:
        sid = httprequest.cookies.get('session_id')

    store = self.session_store
    httprequest.session = store.new() if sid is None else store.get(sid)
    return explicit_session

具有以上知识,你就可以愉快地调用 Odoo 里面的任何业务逻辑。

常用的调用方法:

@http.route('/web/dataset/search_read', type='json', auth="user")
def search_read(self, model, fields=False, offset=0, limit=False, domain=None, sort=None):
    """JSON route that forwards its arguments unchanged to ``do_search_read``.

    :returns: the value returned by ``do_search_read``
    """
    page = self.do_search_read(model, fields, offset, limit, domain, sort)
    return page

def do_search_read(self, model, fields=False, offset=0, limit=False, domain=None,
                   sort=None):
    """Run ``search_read`` on ``model`` and return one page plus a total.

    :param str model: name of the model to query
    :param fields: fields to return for each record
    :param int offset: index of the first record to return
    :param int limit: maximum number of records to return
    :param list domain: search domain
    :param sort: order specification, passed as ``order``
    :returns: dict with two keys: ``records`` (the page of records) and
        ``length``. ``length`` is 0 when nothing matched, the result of
        ``search_count(domain)`` when the page is full (``limit`` is set and
        reached), and ``len(records) + offset`` otherwise.
    :rtype: dict
    """
    Model = request.env[model]
    start = offset or 0

    records = Model.search_read(
        domain, fields, offset=start, limit=limit or False, order=sort or False)
    if not records:
        return {'length': 0, 'records': []}

    # A full page means more rows may exist, so ask for the real count;
    # a short page is the last one and the total can be derived locally.
    page_is_full = limit and len(records) == limit
    length = Model.search_count(domain) if page_is_full else len(records) + start
    return {'length': length, 'records': records}

@http.route('/web/dataset/load', type='json', auth="user")
def load(self, model, id, fields):
    """Read a single record of ``model`` by id.

    ``fields`` is accepted but not used: ``read()`` is called without a
    field list.

    :returns: ``{'value': <first row of read()>}``, or ``{'value': {}}``
        when ``read()`` returns nothing
    """
    rows = request.env[model].browse([id]).read()
    return {'value': rows[0] if rows else {}}

def call_common(self, model, method, args, domain_id=None, context_id=None):
    """Call ``method`` on ``model`` with positional ``args`` and no keyword arguments.

    ``domain_id`` and ``context_id`` are accepted but not used.
    """
    no_kwargs = {}
    return self._call_kw(model, method, args, no_kwargs)

def _call_kw(self, model, method, args, kwargs):
    """Check ``method`` with ``check_method_name``, then dispatch via ``call_kw``.

    :returns: the value returned by the module-level ``call_kw``
    """
    check_method_name(method)
    target = request.env[model]
    return call_kw(target, method, args, kwargs)

@http.route('/web/dataset/call', type='json', auth="user")
def call(self, model, method, args, domain_id=None, context_id=None):
    """JSON route: call ``method`` on ``model`` with positional ``args`` only.

    ``domain_id`` and ``context_id`` are accepted but not used.
    """
    no_kwargs = {}
    return self._call_kw(model, method, args, no_kwargs)

@http.route(['/web/dataset/call_kw', '/web/dataset/call_kw/<path:path>'], type='json', auth="user")
def call_kw(self, model, method, args, kwargs, path=None):
    """JSON route: call ``method`` on ``model`` with ``args`` and ``kwargs``.

    ``path`` is captured from the URL but not used.
    """
    outcome = self._call_kw(model, method, args, kwargs)
    return outcome

@http.route('/web/dataset/call_button', type='json', auth="user")
def call_button(self, model, method, args, domain_id=None, context_id=None):
    """JSON route: call ``method`` on ``model`` and return a resulting action, if any.

    ``domain_id`` and ``context_id`` are accepted but not used.

    :returns: ``clean_action(result)`` when the call returns a dict whose
        ``'type'`` is not the empty string, otherwise ``False``
    """
    outcome = self._call_kw(model, method, args, {})
    is_action = isinstance(outcome, dict) and outcome.get('type') != ''
    return clean_action(outcome) if is_action else False