对象存储

  • Python SDK

    最近更新时间:2017-09-01 11:52:03

    从 v5.0.0 版本开始,我们对 SDK 的内容进行了精简。所有管理操作,比如:创建/删除 bucket、为 bucket 绑定域名、设置数据处理的样式分隔符、新增数据处理样式等都去除了,统一建议到七牛开发者平台来完成。另外,此前服务端还有自己独有的上传 API,现在也推荐统一成基于客户端上传的工作方式。

    此 Python SDK 适用于2.6、2.7、3.3、3.4版本。

    安装

    • 直接安装:
    pip install qiniu
    或
    easy_install qiniu
    
    • 源码安装:
    #从Python SDK 下载地址下载源码
    tar xvzf python-sdk-$VERSION.tar.gz
    cd python-sdk-$VERSION
    python setup.py install
    

    初始化

    在使用SDK 前,您需要一对有效的 AccessKey 和 SecretKey 签名授权。

    可以通过如下步骤获得:

    1. 开通七牛开发者帐号
    2. 登录七牛开发者平台,查看 Access Key 和 Secret Key

    获取Access Key 和 Secret Key 后,调用如下两行代码进行初始化对接:

    from qiniu import Auth
    
    q = Auth(access_key, secret_key)
    


    上传相关


    上传流程

    为了尽可能地改善终端用户的上传体验,七牛云存储首创了客户端直传功能。更多信息请参阅业务流程

    上传代码:

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth, put_file, etag, urlsafe_base64_encode
    import qiniu.config
    
    #需要填写你的 Access Key 和 Secret Key
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    
    #构建鉴权对象
    q = Auth(access_key, secret_key)
    
    #要上传的空间
    bucket_name = 'Bucket_Name'
    
    #上传到七牛后保存的文件名
    key = 'my-python-logo.png';
    
    #生成上传 Token,可以指定过期时间等
    token = q.upload_token(bucket_name, key, 3600)
    
    #要上传文件的本地路径
    localfile = './sync/bbb.jpg'
    
    ret, info = put_file(token, key, localfile)
    print(info)
    assert ret['key'] == key
    assert ret['hash'] == etag(localfile)
    


    上传&回调

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth, put_file, etag,
    import qiniu.config
    
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    
    q = Auth(access_key, secret_key)
    
    bucket_name = 'Bucket_Name'
    
    key = 'my-python-logo.png';
    
    #上传文件到七牛后, 七牛将文件名和文件大小回调给业务服务器。
    policy={
     'callbackUrl':'http://your.domain.com/callback.php',
     'callbackBody':'filename=$(fname)&filesize=$(fsize)'
     }
    
    token = q.upload_token(bucket_name, key, 3600, policy)
    
    localfile = './sync/bbb.jpg'
    
    ret, info = put_file(token, key, localfile)
    print(info)
    assert ret['key'] == key
    assert ret['hash'] == etag(localfile)
    


    上传&预转持续化

    以视频转码为例:

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth, put_file, etag, urlsafe_base64_encode
    import qiniu.config
    
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    
    q = Auth(access_key, secret_key)
    
    bucket_name = 'Bucket_Name'
    
    key = 'my-python-logo.png'
    
    #设置转码参数
    fops = 'avthumb/mp4/s/640x360/vb/1.25m'
    
    #转码是使用的队列名称
    pipeline = 'abc'
    
    #可以对转码后的文件进行使用saveas参数自定义命名,当然也可以不指定文件会默认命名并保存在当前空间
    saveas_key = urlsafe_base64_encode('目标Bucket_Name:自定义文件key')
    fops = fops+'|saveas/'+saveas_key
    
    #在上传策略中指定
    policy={
      'persistentOps':fops,
      'persistentPipeline':pipeline
     }
    
    token = q.upload_token(bucket_name, key, 3600, policy)
    
    localfile = './sync/bbb.jpg'
    
    ret, info = put_file(token, key, localfile)
    print(info)
    assert ret['key'] == key
    assert ret['hash'] == etag(localfile)
    

    队列pipeline 请参阅创建私有队列;转码操作 具体参数请参阅音视频转码;saveas 请参阅处理结果另存

    Tips:上面的Demo只是针对视频转码功能,如果您需要使用比如音视频切片、视频截图、视频拼接等功能只需要修改上面 fops 后面的参数即可,如:fops = vframe/jpg/offset/1/w/480/h/360/rotate/90 就表示视频截图了。

    可以看到上传成功后的行为主要是由上传凭证中的 上传策略 来指定。其中 上传策略 可以指定的行为不止这些,具体请参阅 上传策略


    生成上传token

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth
    
    #需要填写你的 Access Key 和 Secret Key
    access_key = ''
    secret_key = ''
    
    #构建鉴权对象
    q = Auth(access_key, secret_key)
    
    #要上传的空间
    bucket_name = ''
    
    #上传到七牛后保存的文件名
    key = ''
    
    #生成上传 Token,可以指定过期时间等
    
    # 上传策略示例
    # https://developer.qiniu.com/kodo/manual/1206/put-policy
    policy = {
     # 'callbackUrl':'https://requestb.in/1c7q2d31',
     # 'callbackBody':'filename=$(fname)&filesize=$(fsize)'
     # 'persistentOps':'imageView2/1/w/200/h/200'
     }
    
    #3600为token过期时间,秒为单位。3600等于一小时
    token = q.upload_token(bucket_name, key, 3600, policy)
    
    print(token)
    


    数据处理

    触发持久化操作

    以视频转码为例:

    # -*- coding: utf-8 -*-
    # flake8: noqa
    from qiniu import Auth, PersistentFop, build_op, op_save, urlsafe_base64_encode
    
    #对已经上传到七牛的视频发起异步转码操作 
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    q = Auth(access_key, secret_key)
    
    #要转码的文件所在的空间和文件名。
    bucket = 'Bucket_Name'
    key = '1.mp4'
    
    #转码是使用的队列名称。
    pipeline = 'mpsdemo'
    
    #要进行转码的转码操作。
    fops = 'avthumb/mp4/s/640x360/vb/1.25m'
    
    #可以对转码后的文件进行使用saveas参数自定义命名,当然也可以不指定文件会默认命名并保存在当前空间
    saveas_key = urlsafe_base64_encode('目标Bucket_Name:自定义文件key')
    fops = fops+'|saveas/'+saveas_key
    
    pfop = PersistentFop(q, bucket, pipeline)
    ops = []
    ops.append(fops)
    ret, info = pfop.execute(key, ops, 1)
    print(info)
    assert ret['persistentId'] is not None
    

    Tips:上面的Demo只是针对视频转码功能,如果您需要使用比如音视频切片、视频截图、视频拼接等功能只需要修改上面 fops 后面的参数即可,如:fops = vframe/jpg/offset/1/w/480/h/360/rotate/90 就表示视频截图了。

    但这个只是将转码这个耗时的操作提交到队列中,要想知道转码操作现在的状态, 需要根据返回的 persitentId 进行查询,查询接口。如果您不方便持续轮询每个异步处理的进度和状态,七牛可以异步处理完成后通知您们的业务服务器。这样就需要您在视频转码的例子中,初始化 PersistentFop 时添加上 notifyUrl , 来通知您们的业务服务器。


    下载相关

    生成时间戳防盗链

    # -*- coding: utf-8 -*-
    
    """
    获取一个配置时间戳防盗链的url
    """
    
    from qiniu.services.cdn.manager import create_timestamp_anti_leech_url
    import time
    
    host = 'http://a.example.com'
    
    # 配置时间戳时指定的key
    encrypt_key = ''
    
    # 资源路径
    file_name = 'a/b/c/example.jpeg'
    
    # 查询字符串,不需要加?
    query_string = ''
    
    # 截止日期的时间戳,秒为单位,3600为当前时间一小时之后过期
    deadline = int(time.time())+3600
    
    
    timestamp_url = create_timestamp_anti_leech_url(host, file_name, query_string, encrypt_key, deadline)
    
    print(timestamp_url)
    


    私有空间下载

    # -*- coding: utf-8 -*-
    # flake8: noqa
    import requests
    
    from qiniu import Auth
    
    access_key = 'AK'
    secret_key = 'SK'
    
    q = Auth(access_key, secret_key)
    
    #有两种方式构造base_url的形式
    base_url = 'http://%s/%s' % (bucket_domain, key)
    
    #或者直接输入url的方式下载
    base_url = 'http://domain/key'
    
    #可以设置token过期时间
    private_url = q.private_download_url(base_url, expires=3600)
    
    print(private_url)
    r = requests.get(private_url)
    assert r.status_code == 200
    


    资源管理

    资源管理包括的主要功能有:

    获取文件信息

    # -*- coding: utf-8 -*-
    # flake8: noqa
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    
    #初始化Auth状态
    q = Auth(access_key, secret_key)
    
    #初始化BucketManager
    bucket = BucketManager(q)
    
    #你要测试的空间, 并且这个key在你空间中存在
    bucket_name = 'Bucket_Name'
    key = 'python-logo.png'
    
    #获取文件的状态信息
    ret, info = bucket.stat(bucket_name, key)
    print(info)
    assert 'hash' in ret
    


    获取指定前缀文件列表

    # -*- coding: utf-8 -*-
    # flake8: noqa
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = '...'
    secret_key = '...'
    
    q = Auth(access_key, secret_key)
    bucket = BucketManager(q)
    
    bucket_name = 'Bucket_Name'
    # 前缀
    prefix = None
    # 列举条目
    limit = 10
    # 列举出除'/'的所有文件以及以'/'为分隔的所有前缀
    delimiter = None
    # 标记
    marker = None
    
    ret, eof, info = bucket.list(bucket_name, prefix, marker, limit, delimiter)
    
    print(info)
    
    assert len(ret.get('items')) is not None
    


    抓取网络资源到空间

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = '...'
    secret_key = '...'
    
    bucket_name = 'Bucket_Name'
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    url = 'http://7xr875.com1.z0.glb.clouddn.com/xxx.jpg'
    
    key = 'xxx.jpg'
    
    ret, info = bucket.fetch(url, bucket_name, key)
    print(info)
    assert ret['key'] == key
    


    抓取镜像源文件到空间

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = '...'
    secret_key = '...'
    
    
    bucket_name = 'Bucket_Name'
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    # 要拉取的文件名
    key = 'test.jpg'
    
    ret, info = bucket.prefetch(bucket_name, key)
    print(info)
    assert ret['key'] == key
    


    移动单个文件

    # -*- coding: utf-8 -*-
    # flake8: noqa
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    
    #初始化Auth状态
    q = Auth(access_key, secret_key)
    
    #初始化BucketManager
    bucket = BucketManager(q)
    
    #你要测试的空间, 并且这个key在你空间中存在
    bucket_name = 'Bucket_Name'
    key = 'python-logo.png'
    
    #将文件从文件key 移动到文件key2,可以实现文件的重命名 可以在不同bucket移动
    key2 = 'python-logo2.png'
    
    ret, info = bucket.move(bucket_name, key, bucket_name, key2)
    print(info)
    assert ret == {}
    


    复制单个文件

    # -*- coding: utf-8 -*-
    # flake8: noqa
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    
    #初始化Auth状态
    q = Auth(access_key, secret_key)
    
    #初始化BucketManager
    bucket = BucketManager(q)
    
    #你要测试的空间, 并且这个key在你空间中存在
    bucket_name = 'Bucket_Name'
    key = 'python-logo.png'
    
    #将文件从文件key 复制到文件key2。 可以在不同bucket复制
    key2 = 'python-logo2.png'
    
    ret, info = bucket.copy(bucket_name, key, bucket_name, key2)
    print(info)
    assert ret == {}
    


    删除单个文件

    # -*- coding: utf-8 -*-
    # flake8: noqa
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = 'Access_Key'
    secret_key = 'Secret_Key'
    
    #初始化Auth状态
    q = Auth(access_key, secret_key)
    
    #初始化BucketManager
    bucket = BucketManager(q)
    
    #你要测试的空间, 并且这个key在你空间中存在
    bucket_name = 'Bucket_Name'
    key = 'python-logo.png'
    
    #删除bucket_name 中的文件 key
    ret, info = bucket.delete(bucket_name, key)
    print(info)
    assert ret == {}
    


    设置或更新文件生存时间

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = '...'
    secret_key = '...'
    
    #初始化Auth状态
    q = Auth(access_key, secret_key)
    
    #初始化BucketManager
    bucket = BucketManager(q)
    
    #你要测试的空间, 并且这个key在你空间中存在
    bucket_name = 'Bucket_Name'
    key = 'python-test.png'
    
    #您要更新的生命周期
    days = '5'
    
    ret, info = bucket.delete_after_days(bucket_name, key, days)
    print(info)
    


    修改文件存储类型

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = '...'
    secret_key = '...'
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    bucket_name = 'Bucket_Name'
    
    key = '...'
    
    ret, info = bucket.change_type(bucket_name, key ,1)#1表示低频存储,0是标准存储
    
    print(info)
    


    修改资源元信息

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth
    from qiniu import BucketManager
    
    access_key = '...'
    secret_key = '...'
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    bucket_name = 'Bucket_Name'
    
    key = '...'
    
    # 将一个文件的元信息修改为jpg
    ret, info = bucket.change_mime(bucket_name, key, 'image/jpg')
    print(info)
    


    批量操作

    # -*- coding: utf-8 -*-
    # flake8: noqa
    
    from qiniu import Auth
    from qiniu import BucketManager,build_batch_rename
    from qiniu import build_batch_copy
    from qiniu import build_batch_move
    
    access_key = '...'
    secret_key = '...'
    
    # 初始化Auth状态
    q = Auth(access_key, secret_key)
    
    # 初始化BucketManager
    bucket = BucketManager(q)
    keys = {'123.jpg':'123.jpg'}
    
    # ops = build_batch_copy( 'teest', keys, 'teest',force='true')
    # ops = build_batch_move('teest', keys, 'teest', force='true')
    ops = build_batch_rename('teest', keys,force='true')
    
    ret, info = bucket.batch(ops)
    print(ret)
    print(info)
    assert ret == {}
    


    批量查询文件信息

    # -*- coding: utf-8 -*-
    """
    批量查询文件信息
    
    https://developer.qiniu.com/kodo/api/1250/batch
    """
    
    
    from qiniu import build_batch_stat, Auth, BucketManager
    
    access_key = ''
    
    secret_key = ''
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    bucket_name = ''
    
    # 需要查询的文件名
    keys = ['1.gif', '2.txt', '3.png', '4.html']
    
    ops = build_batch_stat(bucket_name, keys)
    ret, info = bucket.batch(ops)
    print(info)
    


    批量重命名文件

    # -*- coding: utf-8 -*-
    """
    批量重命名文件
    
    https://developer.qiniu.com/kodo/api/1250/batch
    """
    
    
    from qiniu import build_batch_rename, Auth, BucketManager
    
    access_key = ''
    
    secret_key = ''
    
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    bucket_name = ''
    
    
    # force为true时强制同名覆盖, 字典的键为原文件,值为目标文件
    ops = build_batch_rename(bucket_name, {'src_key1': 'target_key1', 'src_key2': 'target_key2'}, force='true')
    ret, info = bucket.batch(ops)
    print(info)
    


    批量移动文件

    # -*- coding: utf-8 -*-
    """
    批量移动文件
    
    https://developer.qiniu.com/kodo/api/1250/batch
    """
    
    
    from qiniu import build_batch_move, Auth, BucketManager
    
    access_key = ''
    
    secret_key = ''
    
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    src_bucket_name = ''
    
    target_bucket_name = ''
    
    # force为true时强制同名覆盖, 字典的键为原文件,值为目标文件
    ops = build_batch_move(src_bucket_name, {'src_key1': 'target_key1', 'src_key2': 'target_key2'}, target_bucket_name, force='true')
    ret, info = bucket.batch(ops)
    print(info)
    


    批量删除文件

    # -*- coding: utf-8 -*-
    """
    批量删除文件
    
    https://developer.qiniu.com/kodo/api/1250/batch
    """
    
    
    from qiniu import build_batch_delete, Auth, BucketManager
    
    access_key = ''
    
    secret_key = ''
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    bucket_name = ''
    
    keys = ['1.gif', '2.txt', '3.png', '4.html']
    
    ops = build_batch_delete(bucket_name, keys)
    ret, info = bucket.batch(ops)
    print(info)
    


    批量复制文件

    # -*- coding: utf-8 -*-
    """
    批量拷贝文件
    
    https://developer.qiniu.com/kodo/api/1250/batch
    """
    
    
    from qiniu import build_batch_copy, Auth, BucketManager
    
    access_key = ''
    
    secret_key = ''
    
    q = Auth(access_key, secret_key)
    
    bucket = BucketManager(q)
    
    src_bucket_name = ''
    
    target_bucket_name = ''
    
    # force为true时强制同名覆盖, 字典的键为原文件,值为目标文件
    ops = build_batch_copy(src_bucket_name, {'src_key1': 'target_key1', 'src_key2': 'target_key2'}, target_bucket_name, force='true')
    ret, info = bucket.batch(ops)
    print(info)
    


    CDN相关


    刷新节点资源

    # -*- coding: utf-8 -*-
    import qiniu
    from qiniu import CdnManager
    
    
    
    # 账户ak,sk
    access_key = '...'
    secret_key = '...'
    
    auth = qiniu.Auth(access_key=access_key, secret_key=secret_key)
    cdn_manager = CdnManager(auth)
    
    # 需要刷新的文件链接
    urls = [
        'http://aaa.example.com/a.gif',
        'http://bbb.example.com/b.jpg'
    ]
    
    # 刷新链接
    refresh_url_result = cdn_manager.refresh_urls(urls)
    print(refresh_url_result)
    


    刷新整个目录下的节点资源

    # -*- coding: utf-8 -*-
    import qiniu
    from qiniu import CdnManager
    
    
    
    # 账户ak,sk
    access_key = '...'
    secret_key = '...'
    
    auth = qiniu.Auth(access_key=access_key, secret_key=secret_key)
    cdn_manager = CdnManager(auth)
    
    # 需要刷新的目录链接
    dirs = [
        'http://aaa.example.com/doc/img/',
        'http://bbb.example.com/doc/video/'
    ]
    
    # 刷新链接
    refresh_dir_result = cdn_manager.refresh_dirs(dirs)
    


    预取资源到cdn结点

    # -*- coding: utf-8 -*-
    """
    预取资源到cdn节点
    
    https://developer.qiniu.com/fusion/api/1227/file-prefetching
    """
    
    import qiniu
    from qiniu import CdnManager
    
    
    # 账户ak,sk
    access_key = '...'
    secret_key = '...'
    
    auth = qiniu.Auth(access_key=access_key, secret_key=secret_key)
    cdn_manager = CdnManager(auth)
    
    # 需要刷新的文件链接
    urls = [
        'http://aaa.example.com/doc/img/',
        'http://bbb.example.com/doc/video/'
    ]
    
    # 刷新链接
    refresh_dir_result = cdn_manager.prefetch_urls(urls)
    


    获取指定域名指定时间内的日志链接

    # -*- coding: utf-8 -*-
    """
    获取指定域名指定时间内的日志链接
    """
    import qiniu
    from qiniu import CdnManager
    
    
    # 账户ak,sk
    access_key = ''
    secret_key = ''
    
    auth = qiniu.Auth(access_key=access_key, secret_key=secret_key)
    cdn_manager = CdnManager(auth)
    
    log_date = '2017-07-20'
    
    urls = [
        'a.example.com',
        'b.example.com'
    ]
    ret, info = cdn_manager.get_log_list_data(urls, log_date)
    
    print(ret)
    print(info)
    


    获取指定域名指定时间段内的流量

    # -*- coding: utf-8 -*-
    
    import qiniu
    from qiniu import CdnManager
    
    
    # 账户ak,sk
    access_key = ''
    secret_key = ''
    
    auth = qiniu.Auth(access_key=access_key, secret_key=secret_key)
    cdn_manager = CdnManager(auth)
    
    startDate = '2017-07-20'
    
    endDate = '2017-08-20'
    
    granularity = 'day'
    
    urls = [
        'a.example.com',
        'b.example.com'
    ]
    
    # 获得指定域名流量
    ret, info = cdn_manager.get_flux_data(urls, startDate, endDate, granularity)
    
    print(ret)
    print(info)
    


    获取指定域名指定时间段内的带宽

    # -*- coding: utf-8 -*-
    
    import qiniu
    from qiniu import CdnManager
    
    
    # 账户ak,sk
    access_key = ''
    secret_key = ''
    
    auth = qiniu.Auth(access_key=access_key, secret_key=secret_key)
    cdn_manager = CdnManager(auth)
    
    startDate = '2017-07-20'
    
    endDate = '2017-08-20'
    
    granularity = 'day'
    
    urls = [
        'a.example.com',
        'b.example.com'
    ]
    
    ret, info = cdn_manager.get_bandwidth_data(urls, startDate, endDate, granularity)
    
    print(ret)
    print(info)
    

    API参考手册

    常见问题

    • 第二个参数 info 保留了请求响应的信息,失败情况下 ret 为 none ,将 info 打印出来,提交给我们。
    • API 的使用 demo 可以参考 单元测试
    • 如果碰到ImportError: No module named requests.auth 请安装 requests

    相关资源

    如果您有任何关于我们文档或产品的建议和想法,欢迎到我们的技术论坛参与讨论。

    • 技术论坛 - 在这里您可以和其他开发者愉快的讨论如何更好的使用七牛云服务
    • 提交工单 - 如果您的问题不适合在论坛讨论或得不到回答,您也可以提交一个工单,技术支持人员会尽快回复您
    • 博客 - 这里会持续发布市场活动和技术分享文章
    • 微博
    • 常见问题

    贡献代码

    1. Fork

    2. 创建您的特性分支 git checkout -b my-new-feature

    3. 提交您的改动 git commit -am 'Added some feature'

    4. 将您的修改记录提交到远程 git 仓库 git push origin my-new-feature

    5. 然后到 github 网站的该 git 远程仓库的 my-new-feature 分支下发起 Pull Request

    许可证

    Copyright (c) 2014 qiniu.com

    基于 MIT 协议发布:

    以上内容是否对您有帮助?
  • 提交工单