对象存储

  • 对象存储 > 常见问题 > python 异步 fetch demo

    python 异步 fetch demo

    最近更新时间: 2019-02-22 10:40:19

    python2 版本

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    import requests
    import json
    import hmac
    from hashlib import sha1
    from base64 import urlsafe_b64encode
    from urlparse import urlparse
    
    """
    1. 该demo 是基于 python2.7 的
    2. 客户需要依赖 requests 库,需要 install
    3. 客户需要填写如下参数
        a. 抓取到七牛空间的 URL
        b. 抓取后保存到七牛空间的 key
        c. 抓取的目标 bucket
        d. 回调的 URL(必填项可以写一个无效 404 的地址)
        e. 回调的 body(这个如果没有的话也可以写空串)
    4. sohuvideo 是华东空间,zone 是 z0;sohumedia 是华北空间,zone 是 z1
    
    """
    
    access_key = '...'
    secret_key = '...'
    
    def urlsafe_base64_encode(data):
        """urlsafe的base64编码:
    
        对提供的数据进行urlsafe的base64编码。规格参考:
        https://developer.qiniu.com/kodo/manual/1231/appendix#1
    
        Args:
            data: 待编码的数据,一般为字符串
    
        Returns:
            编码后的字符串
        """
        ret = urlsafe_b64encode(bytes(data))
        return bytes(ret)
    
    def __token(data):
        data = bytes(data)
        hashed = hmac.new(bytes(secret_key), data, sha1)
        return urlsafe_base64_encode(hashed.digest())
    
    
    def token_of_request(url, body=None, content_type=None):
        """带请求体的签名(本质上是管理凭证的签名)
    
        Args:
            url:          待签名请求的url
            body:         待签名请求的body
            content_type: 待签名请求的body的Content-Type
    
        Returns:
            管理凭证
        """
        parsed_url = urlparse(url)
        query = parsed_url.query
        path = parsed_url.path
        data = path
        if query != '':
            data = ''.join([data, '?', query])
        data = ''.join([data, "\n"])
    
        if body:
            mimes = [
                'application/json'
            ]
            if content_type in mimes:
                data += body
    
        return '{0}:{1}'.format(access_key, __token(data))
    
    
    bucket_name = 'bucket_name'
    
    fetch_url = 'http://xxx.abc.com/123.kpg'
    
    key = '123.jpg'
    
    callbackurl = 'https://xxx.xxx.cn/callback'
    
    callbackbody = '$(bucket)'
    
    
    # z0为华东存储区域,z1为华北存储区域,z2为华南存储区域,na0为北美存储区域
    zone = 'z0'
    
    token = token_of_request("http://api-{}.qiniu.com/sisyphus/fetch".format(zone))
    
    
    url = 'http://api-{}.qiniu.com/sisyphus/fetch'.format(zone)
    s = json.dumps({'bucket': bucket_name, 'url': fetch_url,
                    'key': key, 'callbackurl': callbackurl,
                    'callbackbody': callbackbody})
    headers = {"Authorization": "QBox {}".format(token)}
    r = requests.post(url, data=s, headers=headers)
    print(r.headers)
    print(r.status_code)
    print(r.content)
    print("finished")
    
    



    python3版本

    # -*- coding: utf-8 -*-
    
    import requests
    import json
    import hmac
    from hashlib import sha1
    from base64 import urlsafe_b64encode
    from urllib.parse import urlparse
    
    
    access_key = '...'
    secret_key = '...'
    
    def urlsafe_base64_encode(data):
        """urlsafe的base64编码:
    
        对提供的数据进行urlsafe的base64编码。规格参考:
        https://developer.qiniu.com/kodo/manual/1231/appendix#1
    
        Args:
            data: 待编码的数据,一般为字符串
    
        Returns:
            编码后的字符串
        """
        ret = urlsafe_b64encode(b(data))
        return s(ret)
    
    def s(data):
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        return data
    
    def b(data):
        if isinstance(data, str):
            return data.encode('utf-8')
        return data
    
    def __token(data):
        data = b(data)
        hashed = hmac.new(b(secret_key), data, sha1)
        return urlsafe_base64_encode(hashed.digest())
    
    
    def token_of_request(url, body=None, content_type=None):
        """带请求体的签名(本质上是管理凭证的签名)
    
        Args:
            url:          待签名请求的url
            body:         待签名请求的body
            content_type: 待签名请求的body的Content-Type
    
        Returns:
            管理凭证
        """
        parsed_url = urlparse(url)
        query = parsed_url.query
        path = parsed_url.path
        data = path
        if query != '':
            data = ''.join([data, '?', query])
        data = ''.join([data, "\n"])
    
        if body:
            mimes = [
                'application/x-www-form-urlencoded'
            ]
            if content_type in mimes:
                data += body
    
        return '{0}:{1}'.format(access_key, __token(data))
    
    
    
    
    
    
    bucket_name = ''
    
    fetch_url = 'http://xxx.xxx.com/123.jpg'
    
    key = '123.jpg'
    
    callbackurl = 'https://xxx.xxx.com/callback'
    
    callbackbody = '$(bucket)'
    
    callbackbodytype = None
    
    file_type = None
    
    md5 = None
    
    # z0为华东存储区域,z1为华北存储区域,z2为华南存储区域,na0为北美存储区域
    zone = 'z0'
    
    token = token_of_request("http://api-{}.qiniu.com/sisyphus/fetch".format(zone))
    
    
    url = 'http://api-{}.qiniu.com/sisyphus/fetch'.format(zone)
    s = json.dumps({'bucket': bucket_name, 'url': fetch_url,
                    'key': key, 'callbackurl': callbackurl,
                    'callbackbody': callbackbody})
    headers = {"Authorization": "QBox {}".format(token)}
    r = requests.post(url, data=s, headers=headers)
    print(r.headers)
    print(r.content)
    print(r.status_code)
    print("finished")
    
    以上内容是否对您有帮助?
  • Qvm free helper
    Close