python 异步 fetch demo

最近更新时间: 2019-02-22 10:40:19

python2 版本

# -*- coding: utf-8 -*-
# flake8: noqa

import requests
import json
import hmac
from hashlib import sha1
from base64 import urlsafe_b64encode
from urlparse import urlparse

"""
1. 该demo 是基于 python2.7 的
2. 客户需要依赖 requests 库,需要 install
3. 客户需要填写如下参数
    a. 抓取到七牛空间的 URL
    b. 抓取后保存到七牛空间的 key
    c. 抓取的目标 bucket
    d. 回调的 URL(必填项可以写一个无效 404 的地址)
    e. 回调的 body(这个如果没有的话也可以写空串)
4. sohuvideo 是华东空间,zone 是 z0;sohumedia 是华北空间,zone 是 z1

"""

access_key = '...'
secret_key = '...'

def urlsafe_base64_encode(data):
    """urlsafe的base64编码:

    对提供的数据进行urlsafe的base64编码。规格参考:
    https://developer.qiniu.com/kodo/manual/1231/appendix#1

    Args:
        data: 待编码的数据,一般为字符串

    Returns:
        编码后的字符串
    """
    ret = urlsafe_b64encode(bytes(data))
    return bytes(ret)

def __token(data):
    data = bytes(data)
    hashed = hmac.new(bytes(secret_key), data, sha1)
    return urlsafe_base64_encode(hashed.digest())


def token_of_request(url, body=None, content_type=None):
    """带请求体的签名(本质上是管理凭证的签名)

    Args:
        url:          待签名请求的url
        body:         待签名请求的body
        content_type: 待签名请求的body的Content-Type

    Returns:
        管理凭证
    """
    parsed_url = urlparse(url)
    query = parsed_url.query
    path = parsed_url.path
    data = path
    if query != '':
        data = ''.join([data, '?', query])
    data = ''.join([data, "\n"])

    if body:
        mimes = [
            'application/json'
        ]
        if content_type in mimes:
            data += body

    return '{0}:{1}'.format(access_key, __token(data))


bucket_name = 'bucket_name'

fetch_url = 'http://xxx.abc.com/123.kpg'

key = '123.jpg'

callbackurl = 'https://xxx.xxx.cn/callback'

callbackbody = '$(bucket)'


# z0为华东存储区域,z1为华北存储区域,z2为华南存储区域,na0为北美存储区域
zone = 'z0'

token = token_of_request("http://api-{}.qiniu.com/sisyphus/fetch".format(zone))


url = 'http://api-{}.qiniu.com/sisyphus/fetch'.format(zone)
s = json.dumps({'bucket': bucket_name, 'url': fetch_url,
                'key': key, 'callbackurl': callbackurl,
                'callbackbody': callbackbody})
headers = {"Authorization": "QBox {}".format(token)}
r = requests.post(url, data=s, headers=headers)
print(r.headers)
print(r.status_code)
print(r.content)
print("finished")



python3版本

# -*- coding: utf-8 -*-

import requests
import json
import hmac
from hashlib import sha1
from base64 import urlsafe_b64encode
from urllib.parse import urlparse


access_key = '...'
secret_key = '...'

def urlsafe_base64_encode(data):
    """urlsafe的base64编码:

    对提供的数据进行urlsafe的base64编码。规格参考:
    https://developer.qiniu.com/kodo/manual/1231/appendix#1

    Args:
        data: 待编码的数据,一般为字符串

    Returns:
        编码后的字符串
    """
    ret = urlsafe_b64encode(b(data))
    return s(ret)

def s(data):
    if isinstance(data, bytes):
        data = data.decode('utf-8')
    return data

def b(data):
    if isinstance(data, str):
        return data.encode('utf-8')
    return data

def __token(data):
    data = b(data)
    hashed = hmac.new(b(secret_key), data, sha1)
    return urlsafe_base64_encode(hashed.digest())


def token_of_request(url, body=None, content_type=None):
    """带请求体的签名(本质上是管理凭证的签名)

    Args:
        url:          待签名请求的url
        body:         待签名请求的body
        content_type: 待签名请求的body的Content-Type

    Returns:
        管理凭证
    """
    parsed_url = urlparse(url)
    query = parsed_url.query
    path = parsed_url.path
    data = path
    if query != '':
        data = ''.join([data, '?', query])
    data = ''.join([data, "\n"])

    if body:
        mimes = [
            'application/x-www-form-urlencoded'
        ]
        if content_type in mimes:
            data += body

    return '{0}:{1}'.format(access_key, __token(data))






bucket_name = ''

fetch_url = 'http://xxx.xxx.com/123.jpg'

key = '123.jpg'

callbackurl = 'https://xxx.xxx.com/callback'

callbackbody = '$(bucket)'

callbackbodytype = None

file_type = None

md5 = None

# z0为华东存储区域,z1为华北存储区域,z2为华南存储区域,na0为北美存储区域
zone = 'z0'

token = token_of_request("http://api-{}.qiniu.com/sisyphus/fetch".format(zone))


url = 'http://api-{}.qiniu.com/sisyphus/fetch'.format(zone)
s = json.dumps({'bucket': bucket_name, 'url': fetch_url,
                'key': key, 'callbackurl': callbackurl,
                'callbackbody': callbackbody})
headers = {"Authorization": "QBox {}".format(token)}
r = requests.post(url, data=s, headers=headers)
print(r.headers)
print(r.content)
print(r.status_code)
print("finished")
以上内容是否对您有帮助?
  • Qvm free helper
    Close