API Usage Guide
Features Supported by the Inference API
- Web search inference
- Image text recognition inference
- Speech-to-text recognition
- Text-to-speech synthesis
API Overview
Simply call Qiniu Cloud's inference API (compatible with the openai library) to quickly integrate the AI capabilities of DeepSeek-R1/V3 into your business and applications.
Obtaining an API Key
Supported Endpoints
Endpoint | Description |
---|---|
/v1/models | List available models |
/v1/chat/completions | Chat-style inference endpoint; supports web search and image text recognition. Image formats: common formats such as JPG, JPEG, PNG, BMP, and PDF are supported; JPG is recommended |
/v1/voice/asr | Speech-to-text recognition |
/v1/voice/tts | Text-to-speech synthesis |
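Because the endpoints are OpenAI-compatible, /v1/models can be queried with the standard openai client. A minimal sketch, using the same base URL and key placeholder as the examples below:
from openai import OpenAI

client = OpenAI(
    base_url='https://api.qnaigc.com/v1/',
    api_key='your llm_api_key'
)

# List the models available to this key (e.g. deepseek-r1, deepseek-v3)
for model in client.models.list():
    print(model.id)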
HTTP Invocation Example
Use the LLM API key obtained in the previous step to call the chat endpoint. Supported models: deepseek-r1 and deepseek-v3. For example:
# Call the chat completions API
export LLM_API_KEY="<your LLM API key>"
curl https://api.qnaigc.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $LLM_API_KEY" \
  -d '{
    "messages": [{"role": "user", "content": "What scenarios are Qiniu Cloud GPU cloud products suitable for?"}],
    "model": "deepseek-v3",
    "stream": true
  }'
Python openai Library Code Examples
- Streaming call
from openai import OpenAI

url = 'https://api.qnaigc.com/v1/'
llm_api_key = 'your llm_api_key'

client = OpenAI(
    base_url=url,
    api_key=llm_api_key
)

# Send a request with streaming output
content = ""
messages = [
    {"role": "user", "content": "What scenarios are Qiniu Cloud GPU cloud products suitable for?"}
]
response = client.chat.completions.create(
    model="deepseek-v3",
    messages=messages,
    stream=True,  # enable streaming output
    max_tokens=4096
)

# Receive and accumulate the response chunk by chunk
for chunk in response:
    if chunk.choices[0].delta.content:
        content += chunk.choices[0].delta.content
print(content)

# Round 2
messages.append({"role": "assistant", "content": content})
messages.append({"role": "user", "content": "Continue"})
response = client.chat.completions.create(
    model="deepseek-v3",
    messages=messages,
    stream=True
)
for chunk in response:
    if chunk.choices[0].delta.content:
        content += chunk.choices[0].delta.content
print(content)
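If you issue many streaming requests, the receive loop above can be wrapped in a small helper. This is a sketch under the same client setup; the extra chunk.choices guard is defensive, since some OpenAI-compatible backends emit keep-alive chunks that carry no choices:
def stream_chat(client, messages, model="deepseek-v3", max_tokens=4096):
    """Send a streaming chat request and return the accumulated text."""
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True,
        max_tokens=max_tokens
    )
    content = ""
    for chunk in response:
        # Skip chunks without choices or without delta content
        if chunk.choices and chunk.choices[0].delta.content:
            content += chunk.choices[0].delta.content
    return content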
- Non-streaming call
from openai import OpenAI

url = 'https://api.qnaigc.com/v1/'
llm_api_key = 'your llm_api_key'

client = OpenAI(
    base_url=url,
    api_key=llm_api_key
)

# Send a non-streaming request
messages = [
    {"role": "user", "content": "What scenarios are Qiniu Cloud GPU cloud products suitable for?"}
]
response = client.chat.completions.create(
    model="deepseek-v3",
    messages=messages,
    stream=False,
    max_tokens=4096
)
content = response.choices[0].message.content
print(content)

# Round 2
messages.append({"role": "assistant", "content": content})
messages.append({"role": "user", "content": "Continue"})
response = client.chat.completions.create(
    model="deepseek-v3",
    messages=messages,
    stream=False
)
content = response.choices[0].message.content
print(content)
Web Search Inference
The API supports web search. To preserve OpenAI compatibility, the feature is enabled by appending "?search" to the model name. For example:
# Call the chat completions API with web search enabled
export LLM_API_KEY="<your LLM API key>"
curl https://api.qnaigc.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $LLM_API_KEY" \
  -d '{
    "messages": [{"role": "user", "content": "What scenarios are Qiniu Cloud GPU cloud products suitable for?"}],
    "model": "deepseek-v3?search"
  }'
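Because web search is switched on purely through the model name, the same flag works unchanged from the openai library, which passes the model string through as given. A minimal sketch:
from openai import OpenAI

client = OpenAI(
    base_url='https://api.qnaigc.com/v1/',
    api_key='your llm_api_key'
)

# Appending "?search" to the model name enables web search
response = client.chat.completions.create(
    model="deepseek-v3?search",
    messages=[{"role": "user", "content": "What scenarios are Qiniu Cloud GPU cloud products suitable for?"}]
)
print(response.choices[0].message.content)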
Image Text Recognition Inference
The API supports image text recognition inference while remaining OpenAI-compatible. Example:
# Call the chat completions API with an image input
export LLM_API_KEY="<your LLM API key>"
curl https://api.qnaigc.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $LLM_API_KEY" \
  -d '{
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "Please organize the content of this image into a report."
          },
          {
            "type": "image_url",
            "image_url": {
              "url": "https://www.qiniu.com/qiniu_ai_token_snapshot.png"
            }
          }
        ]
      }
    ],
    "model": "deepseek-v3"
  }'
- Note: the file referenced by image_url must not exceed 8 MB
- File formats: common formats such as JPG, JPEG, PNG, BMP, and PDF are supported; JPG is recommended
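The same multimodal request can be sent through the openai library, since the content array is passed through unchanged. A minimal sketch reusing the sample image above:
from openai import OpenAI

client = OpenAI(
    base_url='https://api.qnaigc.com/v1/',
    api_key='your llm_api_key'
)

response = client.chat.completions.create(
    model="deepseek-v3",
    messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "Please organize the content of this image into a report."},
            {"type": "image_url", "image_url": {"url": "https://www.qiniu.com/qiniu_ai_token_snapshot.png"}}
        ]
    }]
)
print(response.choices[0].message.content)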
How do I get an image URL?
We recommend storing files in Qiniu Object Storage and using the resulting public URL as the input the model recognizes and reasons over.
You can quickly test an upload with curl:
curl $your_region_upload_host \
  -F "file=@your_file_path" \
  -F "token=$your_upload_token" \
  -F "key=your_file_name"
Alternatively, implement the upload in Python:
import requests

# Target URL (the upload host for your bucket's region)
url = "your region upload host"
# File field
files = {
    "file": open("path/to/local/file", "rb")  # open the file in binary mode
}
# Other form fields
data = {
    "key": "saved file name",
    "token": "your upload token"
}
# Send the POST request
response = requests.post(url, files=files, data=data)
# Print the response
print("Status Code:", response.status_code)
print("Response Body:", response.text)
We also provide easy-to-use upload SDKs for many languages; see our SDK Center to learn more.
For more on Object Storage, see the Object Storage product documentation.
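As a sketch of what the SDK route looks like, the qiniu Python SDK (pip install qiniu) signs an upload token and uploads a file in a few lines; treat this as illustrative and check the SDK Center for the authoritative usage:
from qiniu import Auth, put_file

access_key = 'your access key'
secret_key = 'your secret key'
bucket_name = 'your bucket'
key = 'saved-file-name.jpg'

# Sign an upload token for the target bucket and key (valid for 3600 s)
q = Auth(access_key, secret_key)
token = q.upload_token(bucket_name, key, 3600)

# Upload a local file; on success, ret contains the stored key and hash
ret, info = put_file(token, key, 'path/to/local/file.jpg')
print(ret, info)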
Image Text Recognition (OCR)
- Supports image and PDF input with accurate text recognition;
- Ultra-low response latency, with no limit on the number of calls;
- The output of this endpoint can be used as the input text for the AI inference endpoints.
- API usage:
curl --location 'https://api.qnaigc.com/v1/images/ocr' \
  --header 'Content-Type: application/json' \
  --header 'Authorization: Bearer sk-xxxx' \
  --data '{
    "model": "ocr",
    "url": "https://idh.qnaigc.com/ocrtest.png"
  }'
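The same call expressed in Python with requests; the JSON body mirrors the curl example, and the response is printed as-is since its exact schema is not shown here:
import requests

resp = requests.post(
    'https://api.qnaigc.com/v1/images/ocr',
    headers={'Authorization': 'Bearer sk-xxxx'},
    json={
        'model': 'ocr',
        'url': 'https://idh.qnaigc.com/ocrtest.png'
    }
)
print(resp.status_code, resp.text)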
Speech-to-Text Recognition (ASR)
- Supports speech-to-text recognition in Chinese, English, and other languages, with over 95% accuracy in noisy environments;
- Supported audio container formats: raw / wav / mp3 / ogg;
- The ASR output can be used as the input text for AI inference.
- API usage:
curl --location 'https://api.qnaigc.com/v1/voice/asr' \
  --header 'Content-Type: application/json' \
  --header 'Authorization: Bearer sk-xxx' \
  --data '{
    "model": "asr",
    "audio": {
        "format": "mp3",
        "url": "http://idh.qnaigc.com/voicetest.mp3"
    }
  }'
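The HTTP variant can also be issued from Python with requests, mirroring the curl call above; for streaming recognition from a microphone over WebSocket, see the fuller Python example below:
import requests

resp = requests.post(
    'https://api.qnaigc.com/v1/voice/asr',
    headers={'Authorization': 'Bearer sk-xxx'},
    json={
        'model': 'asr',
        'audio': {
            'format': 'mp3',
            'url': 'http://idh.qnaigc.com/voicetest.mp3'
        }
    }
)
print(resp.status_code, resp.text)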
Python Example (streaming microphone ASR over WebSocket)
import asyncio
import gzip
import json
import time
import uuid
import websockets
import pyaudio

# -------------------- Protocol constants and helpers --------------------
PROTOCOL_VERSION = 0b0001

# Message types
FULL_CLIENT_REQUEST = 0b0001
AUDIO_ONLY_REQUEST = 0b0010
FULL_SERVER_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111

# Message-type-specific flags
NO_SEQUENCE = 0b0000
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_WITH_SEQUENCE = 0b0011

# Serialization and compression methods
NO_SERIALIZATION = 0b0000
JSON_SERIALIZATION = 0b0001
NO_COMPRESSION = 0b0000
GZIP_COMPRESSION = 0b0001

def generate_header(message_type=FULL_CLIENT_REQUEST,
                    message_type_specific_flags=NO_SEQUENCE,
                    serial_method=JSON_SERIALIZATION,
                    compression_type=GZIP_COMPRESSION,
                    reserved_data=0x00):
    header = bytearray()
    header_size = 1  # header length in units of 4 bytes
    header.append((PROTOCOL_VERSION << 4) | header_size)
    header.append((message_type << 4) | message_type_specific_flags)
    header.append((serial_method << 4) | compression_type)
    header.append(reserved_data)
    return header

def generate_before_payload(sequence: int):
    before_payload = bytearray()
    before_payload.extend(sequence.to_bytes(4, 'big', signed=True))
    return before_payload

def parse_response(res):
    """
    If res is bytes, parse it according to the protocol;
    if res is str, return the text directly to avoid bit-shift errors.
    """
    if not isinstance(res, bytes):
        return {'payload_msg': res}
    header_size = res[0] & 0x0f
    message_type = res[1] >> 4
    message_type_specific_flags = res[1] & 0x0f
    serialization_method = res[2] >> 4
    message_compression = res[2] & 0x0f
    payload = res[header_size * 4:]
    result = {}
    if message_type_specific_flags & 0x01:
        seq = int.from_bytes(payload[:4], "big", signed=True)
        result['payload_sequence'] = seq
        payload = payload[4:]
    result['is_last_package'] = bool(message_type_specific_flags & 0x02)
    if message_type == FULL_SERVER_RESPONSE:
        payload_size = int.from_bytes(payload[:4], "big", signed=True)
        payload_msg = payload[4:]
    elif message_type == SERVER_ACK:
        seq = int.from_bytes(payload[:4], "big", signed=True)
        result['seq'] = seq
        if len(payload) >= 8:
            payload_size = int.from_bytes(payload[4:8], "big", signed=False)
            payload_msg = payload[8:]
        else:
            payload_msg = b""
    elif message_type == SERVER_ERROR_RESPONSE:
        code = int.from_bytes(payload[:4], "big", signed=False)
        result['code'] = code
        payload_size = int.from_bytes(payload[4:8], "big", signed=False)
        payload_msg = payload[8:]
    else:
        payload_msg = payload
    if message_compression == GZIP_COMPRESSION:
        try:
            payload_msg = gzip.decompress(payload_msg)
        except Exception:
            pass
    if serialization_method == JSON_SERIALIZATION:
        try:
            payload_msg = json.loads(payload_msg.decode("utf-8"))
        except Exception:
            pass
    else:
        payload_msg = payload_msg.decode("utf-8", errors="ignore")
    result['payload_msg'] = payload_msg
    return result

# -------------------- ASR test client that streams PCM from the microphone --------------------
class AsrMicClient:
    def __init__(self, token, ws_url, seg_duration=100, sample_rate=16000, channels=1, bits=16, format="pcm", **kwargs):
        """
        :param token: auth token
        :param ws_url: ASR websocket endpoint
        :param seg_duration: segment duration in milliseconds
        :param sample_rate: sample rate in Hz
        :param channels: channel count (1 for mono)
        :param bits: bits per sample (16 for 16-bit audio)
        :param format: audio format, "pcm" here
        """
        self.token = token
        self.ws_url = ws_url
        self.seg_duration = seg_duration  # milliseconds
        self.sample_rate = sample_rate
        self.channels = channels
        self.bits = bits
        self.format = format
        self.uid = kwargs.get("uid", "test")
        self.codec = kwargs.get("codec", "raw")
        self.streaming = kwargs.get("streaming", True)

    def construct_request(self, reqid):
        req = {
            "user": {"uid": self.uid},
            "audio": {
                "format": self.format,
                "sample_rate": self.sample_rate,
                "bits": self.bits,
                "channel": self.channels,
                "codec": self.codec,
            },
            "request": {"model_name": "asr", "enable_punc": True}
        }
        return req

    async def stream_mic(self):
        """
        Asynchronously yield PCM segments captured from the microphone.
        stream.read is called with exception_on_overflow=False so that
        input overflows do not raise.
        """
        p = pyaudio.PyAudio()
        stream = p.open(
            format=pyaudio.paInt16,
            channels=self.channels,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=1024)
        bytes_per_frame = self.channels * (self.bits // 8)
        frames_needed = int(self.sample_rate * self.seg_duration / 1000)
        bytes_needed = frames_needed * bytes_per_frame
        frames = []
        while True:
            try:
                data = await asyncio.to_thread(stream.read, 1024, False)
            except Exception as e:
                print("Microphone read error:", e)
                continue
            frames.append(data)
            if sum(len(f) for f in frames) >= bytes_needed:
                segment = b"".join(frames)[:bytes_needed]
                yield segment
                frames = []

    async def execute(self):
        reqid = str(uuid.uuid4())
        seq = 1
        request_params = self.construct_request(reqid)
        payload_bytes = json.dumps(request_params).encode("utf-8")
        payload_bytes = gzip.compress(payload_bytes)
        # Build the initial configuration request
        full_client_request = bytearray(generate_header(message_type_specific_flags=POS_SEQUENCE))
        full_client_request.extend(generate_before_payload(sequence=seq))
        full_client_request.extend((len(payload_bytes)).to_bytes(4, "big"))
        full_client_request.extend(payload_bytes)
        headers = {"Authorization": "Bearer " + self.token}
        # Record the start time for latency logging
        begin_time = time.time()
        print(f"Start time: {begin_time}")
        try:
            # Note: websockets >= 14 renamed extra_headers to additional_headers
            async with websockets.connect(self.ws_url, extra_headers=headers, max_size=1000000000) as ws:
                await ws.send(full_client_request)
                try:
                    res = await asyncio.wait_for(ws.recv(), timeout=10.0)
                except asyncio.TimeoutError:
                    print(f"Timed out waiting for the configuration response after {time.time() - begin_time:.1f}s")
                    return
                result = parse_response(res)
                print(f"[{time.time() - begin_time:.1f}s] configuration response:", result)
                # Capture microphone audio and send it segment by segment
                async for chunk in self.stream_mic():
                    seq += 1
                    audio_only_request = bytearray(
                        generate_header(message_type=AUDIO_ONLY_REQUEST,
                                        message_type_specific_flags=POS_SEQUENCE))
                    audio_only_request.extend(generate_before_payload(sequence=seq))
                    compressed_chunk = gzip.compress(chunk)
                    audio_only_request.extend((len(compressed_chunk)).to_bytes(4, "big"))
                    audio_only_request.extend(compressed_chunk)
                    await ws.send(audio_only_request)
                    try:
                        res = await asyncio.wait_for(ws.recv(), timeout=5.0)
                        result = parse_response(res)
                        print(f"[{time.time() - begin_time:.1f}s] response:", result)
                    except asyncio.TimeoutError:
                        pass
                    await asyncio.sleep(self.seg_duration / 1000.0)
        except Exception as e:
            print("Exception:", e)

    def run(self):
        asyncio.run(self.execute())

# -------------------- Entry point --------------------
if __name__ == '__main__':
    # Replace token and ws_url with your own values; press Ctrl+C to stop
    token = "sk-xxx"
    ws_url = "wss://api.qnaigc.com/v1/voice/asr"
    seg_duration = 300  # segment duration in ms; increase on a poor network to avoid packet loss
    client = AsrMicClient(token=token, ws_url=ws_url, seg_duration=seg_duration, format="pcm")
    client.run()
"""
在 macOS 上,你可以通过 Homebrew 安装它:
brew install portaudio
安装完成后,再尝试安装 PyAudio:
pip install pyaudio
"""
Text-to-Speech Synthesis (TTS)
- A rich library of languages and voices, with adjustable emotion and speaking speed;
- Text produced by AI inference can be used as the TTS input.
- API usage:
curl --location 'https://api.qnaigc.com/v1/voice/tts' \
  --header 'Content-Type: application/json' \
  --header 'Authorization: Bearer sk-xxx' \
  --data '{
    "audio": {
        "voice_type": "zh_male_M392_conversation_wvae_bigtts",
        "encoding": "mp3",
        "speed_ratio": 1.0
    },
    "request": {
        "text": "你好,世界!"
    }
  }'
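A Python sketch of the same TTS call. The response handling is an assumption: the body is saved to disk when the server returns raw audio and printed when it returns JSON (for example base64-encoded audio), so adjust it to the actual response schema:
import requests

resp = requests.post(
    'https://api.qnaigc.com/v1/voice/tts',
    headers={'Authorization': 'Bearer sk-xxx'},
    json={
        'audio': {
            'voice_type': 'zh_male_M392_conversation_wvae_bigtts',
            'encoding': 'mp3',
            'speed_ratio': 1.0
        },
        'request': {'text': '你好,世界!'}
    }
)
if 'application/json' in resp.headers.get('Content-Type', ''):
    print(resp.json())  # assumption: JSON payload, e.g. base64-encoded audio
else:
    with open('output.mp3', 'wb') as f:  # assumption: raw audio bytes
        f.write(resp.content)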
Deploying a Private DeepSeek Instance on Qiniu Cloud GPU Hosts