时间:2022-04-23 08:46:03 | 栏目:Python代码 | 点击:次
签名
# Build the policy, signature and callback payload that a client needs in
# order to upload a file directly to Alibaba Cloud OSS. The result is
# collected in the module-level ``var`` dict.
import base64
import hmac
import json
import time
from datetime import datetime, timezone
from hashlib import sha1

# Fill in your AccessKey ID.
access_key_id = ''
# Fill in your AccessKey Secret; it is the HMAC key used to sign the policy.
access_key_secret = ''
# Host in the form "bucketname.endpoint"; replace with your real value.
host = ''
# URL of the upload-callback server; replace with your own address and port.
callback_url = ""
# Key prefix the user uploads under.
# NOTE(review): the policy below hard-codes 'user/test/' and does not read
# this value — confirm which prefix is intended.
upload_dir = 'user-dir-prefix/'

# Lifetime of the signed policy in seconds, and the absolute expiry instant
# as a Unix timestamp.
expire_time = 1200
expire_syncpoint = int(time.time() + expire_time)

policy_dict = {
    # UTC expiry rendered as "YYYY-MM-DDTHH:MM:SSZ". A timezone-aware datetime
    # is used because datetime.utcfromtimestamp() is deprecated.
    'expiration': datetime.fromtimestamp(
        expire_syncpoint, tz=timezone.utc
    ).strftime('%Y-%m-%dT%H:%M:%SZ'),
    'conditions': [
        {"bucket": "test-paige"},
        ['starts-with', '$key', 'user/test/'],
    ],
}
policy = json.dumps(policy_dict).strip()
policy_encode = base64.b64encode(policy.encode())

# HMAC-SHA1 over the base64-encoded policy, itself base64-encoded.
# b64encode() is used instead of encodebytes() so that no newline is appended.
signature = base64.b64encode(
    hmac.new(access_key_secret.encode(), policy_encode, sha1).digest()
)

callback_dict = {
    'callbackUrl': callback_url,
    'callbackBody': 'filename=${object}&size=${size}&mimeType=${mimeType}'
                    '&height=${imageInfo.height}&width=${imageInfo.width}',
    # The body template above is a form-encoded query string, so the matching
    # content type is declared; with 'application/json' OSS substitutes the
    # variables as JSON values instead of URL-encoded ones.
    'callbackBodyType': 'application/x-www-form-urlencoded',
}
callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode()

# Everything the uploading client needs to build its form post.
var = {
    'accessid': access_key_id,
    'host': host,
    'policy': policy_encode.decode(),
    'signature': signature.decode().strip(),
    'expire': expire_syncpoint,
    'callback': callback,
}
回调验签
# aiohttp service that receives the OSS upload callback, verifies its RSA
# signature and records the uploaded object in MySQL.
import asyncio
import base64
import logging
import time
import uuid
from urllib import parse

import aiomysql
import rsa
from aiohttp import web, ClientSession

logger = logging.getLogger(__name__)

# The public-key URL arrives in a request header, i.e. it is attacker
# controlled. Only keys served from the OSS public-key host are accepted;
# otherwise a caller could point the handler at a key of their own and forge
# a valid callback (and make this server fetch arbitrary URLs).
_TRUSTED_PUB_KEY_PREFIXES = (
    'http://gosspublic.alicdn.com/',
    'https://gosspublic.alicdn.com/',
)


def success(msg='', data=None):
    """Return a JSON response with ``code`` 1.

    :param str msg: message placed in the ``msg`` field.
    :param data: payload for the ``data`` field; an empty dict when ``None``.
    :return: ``web.json_response`` carrying ``code``, ``msg`` and ``data``.
    """
    return web.json_response({
        'code': 1,
        'msg': msg,
        'data': {} if data is None else data,
    })


def failed(msg='', data=None):
    """Return a JSON response with ``code`` 0.

    :param str msg: message placed in the ``msg`` field.
    :param data: payload for the ``data`` field; an empty dict when ``None``.
    :return: ``web.json_response`` carrying ``code``, ``msg`` and ``data``.
    """
    return web.json_response({
        'code': 0,
        'msg': msg,
        'data': {} if data is None else data,
    })


async def handle(request):
    """Verify an OSS upload callback and store the uploaded object's record.

    The ``authorization`` and ``x-oss-pub-key-url`` headers are base64
    decoded, the public key is downloaded, and the signature is checked
    against ``path[?query] + '\\n' + body``. On success a row is inserted into
    the ``media`` table and its fields are returned.

    :param web.BaseRequest request: incoming callback request.
    :return: ``success(...)`` with the stored record, or ``failed(...)`` when
        the headers, signature or body fields are missing or invalid.
    """
    # Missing headers or malformed base64 are client errors, not server
    # faults. binascii.Error and UnicodeDecodeError are both ValueError.
    try:
        authorization = base64.b64decode(request.headers['authorization'])
        pub_key_url = base64.b64decode(
            request.headers['x-oss-pub-key-url']
        ).decode()
    except (KeyError, ValueError):
        return failed(msg='验证错误')

    if not pub_key_url.startswith(_TRUSTED_PUB_KEY_PREFIXES):
        return failed(msg='验证错误')

    async with ClientSession() as session:
        async with session.get(pub_key_url) as resp:
            pub_key_body = await resp.text()
    pubkey = rsa.PublicKey.load_pkcs1_openssl_pem(pub_key_body.encode())

    body = (await request.content.read()).decode()

    # Signed string: URL-decoded path, then the raw query string (if any),
    # then a newline and the request body.
    auth_str = parse.unquote(request.path)
    raw_query = request.rel_url.raw_query_string
    if raw_query:
        auth_str += '?' + raw_query
    auth_str += '\n' + body

    # Keep the try body to the one call that raises VerificationError.
    try:
        rsa.verify(auth_str.encode(), authorization, pubkey)
    except rsa.pkcs1.VerificationError:
        return failed(msg='验证错误')

    fields = parse.parse_qs(body)
    logger.debug('OSS callback fields: %s', fields)
    # parse_qs drops blank values, so an absent or empty field is a KeyError.
    try:
        url = fields['filename'][0]
        mime = fields['mimeType'][0]
    except KeyError:
        return failed(msg='缺少必要参数')

    media_id = str(uuid.uuid4())
    disk = 'oss'
    # %M is minutes; the previous %I repeated the hour on a 12-hour clock.
    time_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    sql = "INSERT INTO media(id,url,mime,disk,created_at,updated_at) VALUES(%s,%s,%s,%s,%s,%s)"

    pool = request.app['mysql_pool']
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, (media_id, url, mime, disk, time_at, time_at))
            await conn.commit()

    return success(data={
        'id': media_id,
        'url': url,
        'cdn_url': 'https://cdn.***.net' + '/' + url,
        'mime': mime,
        'disk': disk,
        'created_at': time_at,
        'updated_at': time_at,
    })


async def init(loop):
    """Create the MySQL pool and the aiohttp application.

    :param loop: event loop handed to ``aiomysql.create_pool``.
    :return: ``web.Application`` with the pool stored under ``'mysql_pool'``
        and the ``/`` (GET) and ``/oss`` (POST) routes bound to ``handle``.
    """
    # Create the connection pool.
    mysql_pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='', password='', db='', loop=loop
    )

    async def on_shutdown(application):
        """Close the connection pool on shutdown and wait until it is closed.

        :param web.Application application: application being shut down.
        :return: None
        """
        application['mysql_pool'].close()
        # Without this wait the program exits before the pool is really
        # closed, which surfaces as "RuntimeError: Event loop is closed".
        await application['mysql_pool'].wait_closed()

    application = web.Application()
    application.on_shutdown.append(on_shutdown)
    # Store the pool on the application instance so handlers can reach it.
    application['mysql_pool'] = mysql_pool
    application.add_routes([web.get('/', handle), web.post('/oss', handle)])
    return application


async def _create_app():
    """Build the application on the event loop that ``web.run_app`` runs."""
    return await init(asyncio.get_running_loop())


if __name__ == '__main__':
    # Passing a coroutine lets run_app build the app (and therefore the pool)
    # on the same loop that serves requests. run_app manages and closes that
    # loop itself, so no manual loop handling is needed here.
    web.run_app(_create_app(), host='127.0.0.1')