OpenAI API Format Specification and Implementation Guide

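Contents

  1. Overview
  2. API Specification
  3. Request Format
  4. Response Format
  5. Streaming Responses
  6. Error Handling
  7. FastAPI Implementation
  8. Authentication and Security
  9. Best Practices
  10. Example Code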

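Overview

The OpenAI API is a standardized RESTful interface for interacting with large language models. This document describes how to implement a service whose endpoints are compatible with the OpenAI API format.

Key Features

  • Standardized format: follows the official OpenAI API specification
  • Streaming responses: supports real-time streamed output
  • Multi-model support: can be adapted to different language models
  • Error handling: a consistent error response format
  • Authentication: Bearer Token authentication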

API Specification

Base URL Structure

GET  /v1/models            # List available models
POST /v1/chat/completions  # Chat completion endpoint
GET  /health               # Health check

Required HTTP Headers

Content-Type: application/json
Authorization: Bearer YOUR_API_KEY
Accept: application/json

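Status Code Conventions

Status  Meaning             When it is used
200     Success             Normal response
400     Bad request         Parameter validation failed
401     Unauthorized        Invalid API key
404     Not found           Route does not exist
422     Invalid parameters  Malformed request
500     Server error        Internal processing error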

Request Format

Chat Completion Request (POST /v1/chat/completions)

Basic Parameters

{
  "model": "gpt-3.5-turbo",
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": "Hello, how are you?"
    }
  ],
  "stream": false,
  "temperature": 0.7,
  "max_tokens": 1000,
  "top_p": 1.0,
  "frequency_penalty": 0.0,
  "presence_penalty": 0.0,
  "stop": null,
  "user": "user-123"
}

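Parameter Reference

Parameter          Type          Required  Default  Description
model              string        yes       -        Model name
messages           array         yes       -        List of conversation messages
stream             boolean       no        false    Whether to stream the response
temperature        number        no        1.0      Randomness control (0.0 to 2.0)
max_tokens         integer       no        null     Maximum number of tokens to generate
top_p              number        no        1.0      Nucleus sampling parameter (0.0 to 1.0)
frequency_penalty  number        no        0.0      Frequency penalty (-2.0 to 2.0)
presence_penalty   number        no        0.0      Presence penalty (-2.0 to 2.0)
stop               string/array  no        null     Stop sequence(s)
user               string        no        null     End-user identifier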
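
The ranges in the table can be checked before a request ever reaches the model. The following is a minimal, standard-library-only sketch of such a check; the function name validate_chat_request and the wording of the messages are assumptions made for illustration, not part of the OpenAI API.

```python
from typing import Any

# Allowed numeric ranges, copied from the parameter table above.
RANGES = {
    "temperature": (0.0, 2.0),
    "top_p": (0.0, 1.0),
    "frequency_penalty": (-2.0, 2.0),
    "presence_penalty": (-2.0, 2.0),
}


def validate_chat_request(payload: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the payload passed (hypothetical helper)."""
    problems = []
    model = payload.get("model")
    if not isinstance(model, str) or not model:
        problems.append("model is required and must be a non-empty string")
    messages = payload.get("messages")
    if not isinstance(messages, list) or not messages:
        problems.append("messages is required and must be a non-empty array")
    for name, (low, high) in RANGES.items():
        value = payload.get(name)
        if value is None:
            continue  # optional parameter omitted: the server default applies
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if not is_number or not low <= value <= high:
            problems.append(f"{name} must be a number between {low} and {high}")
    max_tokens = payload.get("max_tokens")
    if max_tokens is not None:
        is_int = isinstance(max_tokens, int) and not isinstance(max_tokens, bool)
        if not is_int or max_tokens < 1:
            problems.append("max_tokens must be a positive integer")
    return problems
```

A server could return the collected problems in a 400 response; the Pydantic models later in this guide perform the same checks declaratively.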

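Message Format

{
  "role": "user|assistant|system|function",
  "content": "message content",
  "name": "optional name",
  "reasoning": "optional reasoning content (for reasoning models)"
}

The reasoning field is an extension used in this guide for reasoning models; clients that do not recognize it can ignore it.

Roles:

  • system: system prompt that sets the AI's behavior
  • user: a message from the user
  • assistant: a reply from the AI assistant
  • function: the result of a function call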
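
As a small illustration of the message format, the sketch below builds message objects and rejects unknown roles. The helper name make_message is hypothetical; the role set is the one listed above.

```python
from typing import Optional

# Roles accepted by the message format described above.
VALID_ROLES = {"system", "user", "assistant", "function"}


def make_message(role: str, content: str, name: Optional[str] = None) -> dict:
    """Build one message object, raising ValueError for an unknown role (hypothetical helper)."""
    if role not in VALID_ROLES:
        raise ValueError(f"unknown role: {role!r}")
    message = {"role": role, "content": content}
    if name is not None:
        message["name"] = name  # optional field, only sent when provided
    return message


# A two-message conversation in the order the API expects: system prompt first.
conversation = [
    make_message("system", "You are a helpful assistant."),
    make_message("user", "Hello, how are you?"),
]
```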

Response Format

Non-Streaming Response

{
  "id": "chatcmpl-123456789",
  "object": "chat.completion",
  "created": 1677652288,
  "model": "gpt-3.5-turbo",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "Hello! I'm doing well, thank you for asking.",
        "reasoning": "The user greeted me, so I should reply politely"
      },
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 20,
    "completion_tokens": 15,
    "total_tokens": 35
  }
}

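Response Fields

Field    Type     Description
id       string   Unique identifier of the response
object   string   Object type
created  integer  Creation timestamp (Unix seconds)
model    string   Model used
choices  array    List of generated choices
usage    object   Token usage statistics

finish_reason Values

Value           Description
stop            Generation ended naturally
length          Maximum length reached
content_filter  Content was filtered
function_call   Function call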
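
On the client side, finish_reason is what tells a caller whether the reply is complete. The sketch below reads the first choice of a non-streaming response and flags truncated or filtered output; summarize_choice is a hypothetical helper name and the returned keys are chosen for illustration.

```python
def summarize_choice(response: dict) -> dict:
    """Pull the reply text and finish_reason out of a non-streaming response (hypothetical helper)."""
    choice = response["choices"][0]
    reason = choice.get("finish_reason")
    return {
        "text": choice["message"].get("content") or "",
        "finish_reason": reason,
        # "length" means the reply hit max_tokens and is probably cut off
        "truncated": reason == "length",
        "filtered": reason == "content_filter",
        "total_tokens": response.get("usage", {}).get("total_tokens"),
    }
```

A caller might retry with a larger max_tokens when truncated is true, which is one reasonable policy among several.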

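Streaming Responses

Server-Sent Events (SSE) Format

Streaming responses use the SSE format: each chunk is sent as a line that starts with data: and is followed by a blank line: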

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":" there!"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
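
A client can decode this wire format with a few lines of standard-library code. The sketch below is one way to do it, assuming the stream has already been split into text lines; iter_sse_chunks is a hypothetical name.

```python
import json
from typing import Iterable, Iterator


def iter_sse_chunks(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the parsed JSON chunk of each data: line until the [DONE] marker (hypothetical helper)."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skip the blank separator lines between events
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # end-of-stream marker, not JSON
        yield json.loads(payload)


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate the content deltas of the first choice."""
    return "".join(
        chunk["choices"][0]["delta"].get("content", "") for chunk in iter_sse_chunks(lines)
    )
```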

Streaming Chunk Structure

{
  "id": "chatcmpl-123456789",
  "object": "chat.completion.chunk",
  "created": 1677652288,
  "model": "gpt-3.5-turbo",
  "choices": [
    {
      "index": 0,
      "delta": {
        "role": "assistant",
        "content": "Hello",
        "reasoning": "reasoning content (optional)"
      },
      "finish_reason": null
    }
  ]
}
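
Because each chunk carries only a delta, a client has to reassemble the full message itself. The sketch below merges a sequence of parsed chunks into a single message; merge_deltas is a hypothetical name, and defaulting the role to assistant when no chunk sets it is an assumption.

```python
from typing import Iterable, Optional, Tuple


def merge_deltas(chunks: Iterable[dict]) -> Tuple[dict, Optional[str]]:
    """Fold streamed deltas of the first choice into one message plus the final finish_reason."""
    message = {"role": "assistant", "content": ""}  # assumed default role
    reasoning_parts = []
    finish_reason = None
    for chunk in chunks:
        choice = chunk["choices"][0]
        delta = choice.get("delta") or {}
        if delta.get("role"):
            message["role"] = delta["role"]
        if delta.get("content"):
            message["content"] += delta["content"]
        if delta.get("reasoning"):
            reasoning_parts.append(delta["reasoning"])
        if choice.get("finish_reason"):
            finish_reason = choice["finish_reason"]
    if reasoning_parts:
        message["reasoning"] = "".join(reasoning_parts)
    return message, finish_reason
```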

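Key Points for Streaming

  1. Content-Type: text/event-stream
  2. Cache control: Cache-Control: no-cache
  3. Keep-alive: Connection: keep-alive
  4. End marker: send data: [DONE] as the last event
  5. Error handling: on failure, send an error payload and then end the stream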
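
The server side of the same format can be sketched without any framework. The generator below turns text pieces into SSE events and ends with the [DONE] marker. The name sse_events and the default response_id are hypothetical, and sending the role in a first, content-free chunk is an assumption based on the delta structure shown earlier.

```python
import json
import time
from typing import Iterable, Iterator, Optional


def sse_events(pieces: Iterable[str], model: str, response_id: str = "chatcmpl-example") -> Iterator[str]:
    """Yield SSE-formatted chat.completion.chunk events followed by the [DONE] marker."""
    created = int(time.time())

    def event(delta: dict, finish_reason: Optional[str] = None) -> str:
        chunk = {
            "id": response_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
        }
        # Each event is one data: line terminated by a blank line
        return f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

    yield event({"role": "assistant"})      # assumed: announce the role first
    for piece in pieces:
        yield event({"content": piece})     # one event per text piece
    yield event({}, finish_reason="stop")   # empty delta carries the finish_reason
    yield "data: [DONE]\n\n"
```

A generator like this can be handed to any framework that streams an iterator of strings with the headers listed above.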

Error Handling

Error Response Format

{
  "error": {
    "message": "Invalid API key provided",
    "type": "invalid_request_error",
    "param": "api_key",
    "code": "invalid_api_key"
  }
}

Common Error Types

Error type             HTTP status  Description
invalid_request_error  400          Malformed request
invalid_api_key        401          Invalid API key
insufficient_quota     429          Quota exhausted
model_not_found        404          Model does not exist
server_error           500          Internal server error
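
The table and the error format can be tied together in one helper that picks the HTTP status and builds the body. In the sketch below, make_error is a hypothetical name, and looking up the status by code first and by type second is an assumption made so that the earlier example (type invalid_request_error, code invalid_api_key) maps to 401.

```python
from typing import Optional, Tuple

# HTTP status for each identifier in the table above.
ERROR_STATUS = {
    "invalid_request_error": 400,
    "invalid_api_key": 401,
    "model_not_found": 404,
    "insufficient_quota": 429,
    "server_error": 500,
}


def make_error(
    message: str,
    error_type: str,
    code: Optional[str] = None,
    param: Optional[str] = None,
) -> Tuple[int, dict]:
    """Return (http_status, body) in the error response format (hypothetical helper)."""
    # Prefer the more specific code, fall back to the type, then to 500
    status = ERROR_STATUS.get(code, ERROR_STATUS.get(error_type, 500))
    body = {"error": {"message": message, "type": error_type, "param": param, "code": code}}
    return status, body
```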

FastAPI Implementation

Project Structure

project/
├── main.py              # Application entry point
├── models.py            # Data models
├── api_server.py        # API routes
├── converter.py         # Format conversion
├── config.py            # Configuration
└── requirements.txt     # Dependencies

Data Model Definitions (models.py)

from enum import Enum
from typing import List, Optional, Union

from pydantic import BaseModel, Field


class Role(str, Enum):
    """Message role enumeration"""
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    FUNCTION = "function"


class Message(BaseModel):
    """Chat message model"""
    role: Role
    content: str
    name: Optional[str] = None
    reasoning: Optional[str] = None


class ChatCompletionRequest(BaseModel):
    """Chat completion request model"""
    model: str
    messages: List[Message]
    temperature: Optional[float] = Field(default=1.0, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=1.0, ge=0.0, le=1.0)
    n: Optional[int] = Field(default=1, ge=1)
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    max_tokens: Optional[int] = Field(default=None, ge=1)
    presence_penalty: Optional[float] = Field(default=0.0, ge=-2.0, le=2.0)
    frequency_penalty: Optional[float] = Field(default=0.0, ge=-2.0, le=2.0)
    user: Optional[str] = None


class Usage(BaseModel):
    """Token usage statistics"""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class Choice(BaseModel):
    """One generated choice"""
    index: int
    message: Message
    finish_reason: Optional[str] = None


class ChatCompletionResponse(BaseModel):
    """Chat completion response model"""
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[Choice]
    usage: Usage


class Delta(BaseModel):
    """Incremental content of a streaming chunk"""
    role: Optional[str] = None
    content: Optional[str] = None
    reasoning: Optional[str] = None


class StreamChoice(BaseModel):
    """One choice inside a streaming chunk"""
    index: int
    delta: Delta
    finish_reason: Optional[str] = None


class ChatCompletionStreamResponse(BaseModel):
    """Streaming chat completion chunk"""
    id: str
    object: str = "chat.completion.chunk"
    created: int
    model: str
    choices: List[StreamChoice]

API Route Implementation (api_server.py)

import asyncio
import json
import time
from typing import AsyncGenerator

from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

from models import (
    ChatCompletionRequest,
    ChatCompletionResponse,
    ChatCompletionStreamResponse,
    Choice,
    Delta,
    Message,
    StreamChoice,
    Usage,
)

app = FastAPI(
    title="OpenAI Compatible API",
    description="An API service compatible with the OpenAI format",
    version="1.0.0",
)

# CORS middleware. Wide-open CORS is convenient for local testing;
# restrict allow_origins to known origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
async def root():
    """Root path"""
    return {
        "message": "OpenAI Compatible API Server",
        "version": "1.0.0",
        "endpoints": {
            "chat": "/v1/chat/completions",
            "models": "/v1/models",
            "health": "/health",
        },
    }


@app.get("/health")
async def health_check():
    """Health check"""
    return {"status": "healthy", "timestamp": int(time.time())}


@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest, http_request: Request):
    """Create a chat completion"""
    # Read the API key from the Authorization header
    auth_header = http_request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        # Note: FastAPI serializes this body as {"detail": {"error": {...}}}.
        # Register an exception handler if clients need the bare "error" object.
        raise HTTPException(
            status_code=401,
            detail={
                "error": {
                    "message": "Invalid API key provided",
                    "type": "invalid_request_error",
                    "code": "invalid_api_key",
                }
            },
        )

    api_key = auth_header[7:]  # Strip the "Bearer " prefix

    try:
        if request.stream:
            # Streaming response
            return StreamingResponse(
                generate_stream_response(request, api_key),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )
        # Non-streaming response
        return await generate_completion(request, api_key)

    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail={
                "error": {
                    "message": str(e),
                    "type": "server_error",
                    "code": "internal_error",
                }
            },
        )


async def generate_completion(request: ChatCompletionRequest, api_key: str) -> ChatCompletionResponse:
    """Generate a non-streaming completion"""
    # Put your model call here; the code below is a placeholder
    response_id = f"chatcmpl-{int(time.time())}"

    # Simulated response
    return ChatCompletionResponse(
        id=response_id,
        created=int(time.time()),
        model=request.model,
        choices=[
            Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content="This is an example response",
                ),
                finish_reason="stop",
            )
        ],
        usage=Usage(
            prompt_tokens=10,
            completion_tokens=5,
            total_tokens=15,
        ),
    )


async def generate_stream_response(request: ChatCompletionRequest, api_key: str) -> AsyncGenerator[str, None]:
    """Generate a streaming response"""
    response_id = f"chatcmpl-{int(time.time())}"
    created = int(time.time())

    try:
        # Simulated streamed output
        content_chunks = ["This ", "is ", "a ", "streamed ", "example"]

        for chunk in content_chunks:
            stream_response = ChatCompletionStreamResponse(
                id=response_id,
                created=created,
                model=request.model,
                choices=[
                    StreamChoice(
                        index=0,
                        delta=Delta(content=chunk),
                        finish_reason=None,
                    )
                ],
            )
            yield f"data: {stream_response.model_dump_json()}\n\n"

            # Simulated latency
            await asyncio.sleep(0.1)

        # Final chunk: empty delta with finish_reason, then the end marker
        final_response = ChatCompletionStreamResponse(
            id=response_id,
            created=created,
            model=request.model,
            choices=[
                StreamChoice(
                    index=0,
                    delta=Delta(),
                    finish_reason="stop",
                )
            ],
        )
        yield f"data: {final_response.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"

    except Exception as e:
        # Error handling: send the error payload, then end the stream
        error_response = {
            "error": {
                "message": str(e),
                "type": "server_error",
                "code": "internal_error",
            }
        }
        yield f"data: {json.dumps(error_response)}\n\n"
        yield "data: [DONE]\n\n"


@app.get("/v1/models")
async def list_models():
    """List available models"""
    return {
        "object": "list",
        "data": [
            {
                "id": "gpt-3.5-turbo",
                "object": "model",
                "created": 1677610602,
                "owned_by": "openai",
            },
            {
                "id": "gpt-4",
                "object": "model",
                "created": 1687882411,
                "owned_by": "openai",
            },
        ],
    }

Configuration (config.py)

import os
from typing import List

from dotenv import load_dotenv

load_dotenv()


class Config:
    """Application configuration"""

    def __init__(self):
        # Server settings
        self.host: str = os.getenv("HOST", "0.0.0.0")
        self.port: int = int(os.getenv("PORT", "8000"))
        self.debug: bool = os.getenv("DEBUG", "false").lower() == "true"

        # API settings
        self.api_prefix: str = os.getenv("API_PREFIX", "/v1")
        self.cors_origins: List[str] = os.getenv("CORS_ORIGINS", "*").split(",")

        # Model settings
        self.default_model: str = os.getenv("DEFAULT_MODEL", "gpt-3.5-turbo")
        self.max_tokens: int = int(os.getenv("MAX_TOKENS", "2048"))

        # Security settings
        self.api_keys: List[str] = os.getenv("API_KEYS", "").split(",")

    def validate_api_key(self, api_key: str) -> bool:
        """Validate an API key"""
        # "".split(",") yields [""], so an unset API_KEYS lands in this branch
        if not self.api_keys or not self.api_keys[0]:
            return True  # No API keys configured: validation is skipped
        return api_key in self.api_keys


config = Config()
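
Splitting an environment variable on commas keeps stray whitespace and empty entries, so a value such as " key-1, key-2 ,," would produce keys that never match. The sketch below shows one way to normalize such lists; parse_csv_env is a hypothetical helper, not part of python-dotenv.

```python
import os
from typing import List


def parse_csv_env(name: str, default: str = "") -> List[str]:
    """Read a comma-separated environment variable, dropping blanks and surrounding spaces."""
    raw = os.getenv(name, default)
    return [item.strip() for item in raw.split(",") if item.strip()]
```

With this helper an unset variable yields an empty list, so the "no keys configured" check reduces to testing whether the list is empty.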

Application Entry Point (main.py)

import uvicorn

from config import config

if __name__ == "__main__":
    # The app is passed as an import string so that reload can re-import it
    uvicorn.run(
        "api_server:app",
        host=config.host,
        port=config.port,
        reload=config.debug,
        log_level="info" if not config.debug else "debug",
    )

Authentication and Security

Bearer Token Authentication

# Place this in api_server.py, where app is the FastAPI instance
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from config import config
from models import ChatCompletionRequest

security = HTTPBearer()


async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the API key"""
    if not config.validate_api_key(credentials.credentials):
        raise HTTPException(
            status_code=401,
            detail={
                "error": {
                    "message": "Invalid API key provided",
                    "type": "invalid_request_error",
                    "code": "invalid_api_key",
                }
            },
        )
    return credentials.credentials


# Use it in a route
@app.post("/v1/chat/completions")
async def create_chat_completion(
    request: ChatCompletionRequest,
    api_key: str = Depends(verify_api_key),
):
    # Handle the request
    pass
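
For readers who want to see what the Bearer check amounts to without FastAPI, here is a standard-library sketch. The helper names extract_bearer_token and is_allowed are hypothetical, and using hmac.compare_digest for a constant-time comparison is a hardening choice of this sketch, not something the code above does.

```python
import hmac
from typing import Iterable, Optional


def extract_bearer_token(header: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None if malformed."""
    if not header:
        return None
    scheme, _, token = header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


def is_allowed(token: str, allowed_keys: Iterable[str]) -> bool:
    """Check the token against every configured key with a constant-time comparison."""
    matched = False
    for key in allowed_keys:
        # compare_digest avoids leaking how many leading characters matched
        if hmac.compare_digest(token.encode("utf-8"), key.encode("utf-8")):
            matched = True
    return matched
```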

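Rate Limiting

# Place this in api_server.py, where app is the FastAPI instance
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

from models import ChatCompletionRequest

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@app.post("/v1/chat/completions")
@limiter.limit("10/minute")
async def create_chat_completion(request: Request, body: ChatCompletionRequest):
    # slowapi needs the Request argument to identify the caller;
    # the parsed JSON payload arrives in body
    pass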
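
To make concrete what a limit such as 10/minute means, the sketch below implements a small in-memory sliding-window limiter. It is an independent illustration and not how slowapi works internally; the class name SlidingWindowLimiter and the injectable clock are assumptions added to keep the example testable.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window_seconds` for each key (for example a client IP)."""

    def __init__(self, limit: int, window_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._hits = defaultdict(deque)  # key -> timestamps of accepted requests

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Forget requests that have left the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False  # over the limit: the caller would answer with HTTP 429
        hits.append(now)
        return True
```

State kept in process memory like this is lost on restart and is not shared between workers, which is one reason to prefer a maintained library in production.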

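Best Practices

1. Error Handling

import logging

from fastapi import Request
from fastapi.responses import JSONResponse


# Unified error-handling middleware
@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
    try:
        return await call_next(request)
    except Exception:
        # Log the traceback server-side; return only a generic message to the client
        logging.getLogger(__name__).exception("Unhandled error")
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "message": "Internal server error",
                    "type": "server_error",
                    "code": "internal_error",
                }
            },
        )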

2. Logging

import logging
from datetime import datetime

from fastapi import Request

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


# Request logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = datetime.now()

    # Log the request
    logger.info(f"Request: {request.method} {request.url}")

    response = await call_next(request)

    # Log the response
    process_time = (datetime.now() - start_time).total_seconds()
    logger.info(f"Response: {response.status_code} - {process_time:.3f}s")

    return response

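3. Parameter Validation

from typing import List, Optional

from pydantic import BaseModel, field_validator

from models import Message


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[Message]
    temperature: Optional[float] = 1.0

    # field_validator is the Pydantic 2 replacement for the deprecated validator
    @field_validator("temperature")
    @classmethod
    def validate_temperature(cls, v):
        # None is allowed for an Optional field, so only range-check real numbers
        if v is not None and not 0.0 <= v <= 2.0:
            raise ValueError("temperature must be between 0.0 and 2.0")
        return v

    @field_validator("messages")
    @classmethod
    def validate_messages(cls, v):
        if not v:
            raise ValueError("messages cannot be empty")
        return v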

4. Performance Optimization

# Asynchronous processing
import asyncio
from concurrent.futures import ThreadPoolExecutor

import httpx

executor = ThreadPoolExecutor(max_workers=4)


def sync_model_call(request_data):
    """Placeholder for a blocking model call; replace with your own logic"""
    raise NotImplementedError


async def process_model_request(request_data):
    """Run the blocking model call in a worker thread so the event loop stays free"""
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        sync_model_call,
        request_data,
    )
    return result


# Connection pool management
class ModelClient:
    def __init__(self):
        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
        )

    async def call_model(self, data):
        response = await self.client.post(
            "https://api.model-provider.com/v1/completions",
            json=data,
        )
        return response.json()
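
The effect of offloading blocking calls to a thread pool can be seen in a self-contained sketch. Here slow_model_call is a stand-in that just sleeps, and run_many is a hypothetical helper; neither is part of the service code above.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def slow_model_call(prompt: str) -> str:
    """Stand-in for a blocking model call (assumption: about 0.2 s of blocking work)."""
    time.sleep(0.2)
    return f"echo: {prompt}"


async def run_many(prompts: List[str]) -> List[str]:
    """Run the blocking calls in worker threads and gather the results in input order."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [loop.run_in_executor(pool, slow_model_call, p) for p in prompts]
        return list(await asyncio.gather(*tasks))


if __name__ == "__main__":
    started = time.perf_counter()
    print(asyncio.run(run_many(["a", "b", "c"])))
    # The three calls overlap in the pool instead of running back to back
    print(f"elapsed: {time.perf_counter() - started:.2f}s")
```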

Example Code

Python Client Example

import json

import requests


# Non-streaming request
def test_completion():
    url = "http://localhost:8000/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer your-api-key",
    }
    data = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {"role": "user", "content": "Hello, how are you?"}
        ],
        "temperature": 0.7,
        "max_tokens": 100,
    }

    response = requests.post(url, headers=headers, json=data)
    print(response.json())


# Streaming request
def test_stream():
    url = "http://localhost:8000/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer your-api-key",
    }
    data = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {"role": "user", "content": "Tell me a story"}
        ],
        "stream": True,
    }

    response = requests.post(url, headers=headers, json=data, stream=True)

    for line in response.iter_lines():
        if line:
            line = line.decode("utf-8")
            if line.startswith("data: "):
                data_content = line[6:]
                if data_content == "[DONE]":
                    break
                try:
                    chunk = json.loads(data_content)
                    # content may be missing or null in a chunk
                    content = chunk["choices"][0]["delta"].get("content", "")
                    if content:
                        print(content, end="", flush=True)
                except json.JSONDecodeError:
                    pass
    print()  # Final newline


if __name__ == "__main__":
    test_completion()
    test_stream()

JavaScript Client Example

// Non-streaming request
async function testCompletion() {
  const response = await fetch('http://localhost:8000/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer your-api-key'
    },
    body: JSON.stringify({
      model: 'gpt-3.5-turbo',
      messages: [
        { role: 'user', content: 'Hello, how are you?' }
      ],
      temperature: 0.7,
      max_tokens: 100
    })
  });

  const data = await response.json();
  console.log(data);
}

// Streaming request (process.stdout makes this a Node.js example)
async function testStream() {
  const response = await fetch('http://localhost:8000/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer your-api-key'
    },
    body: JSON.stringify({
      model: 'gpt-3.5-turbo',
      messages: [
        { role: 'user', content: 'Tell me a story' }
      ],
      stream: true
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters that span two reads intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    // The last element may be an incomplete line; keep it for the next read
    buffer = lines.pop();

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') {
          return;
        }

        try {
          const parsed = JSON.parse(data);
          const content = parsed.choices[0]?.delta?.content;
          if (content) {
            process.stdout.write(content);
          }
        } catch (e) {
          // Ignore parse errors
        }
      }
    }
  }
}

Deployment Configuration

Docker Deployment

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "api_server:app", "--host", "0.0.0.0", "--port", "8000"]

docker-compose.yml

version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DEBUG=false
      - API_KEYS=your-secret-key-1,your-secret-key-2
      - CORS_ORIGINS=https://yourdomain.com
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped

requirements.txt

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-dotenv==1.0.0
httpx==0.25.2
slowapi==0.1.9

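Summary

This document described the OpenAI API format and one way to implement it with FastAPI. By following these conventions and best practices, you can build a service that works with OpenAI-style clients and supports:

  • ✅ Standardized request/response formats
  • ✅ Streaming and non-streaming responses
  • ✅ Consistent error handling
  • ✅ Bearer Token authentication
  • ✅ Rate limiting and security controls
  • ✅ Asynchronous request processing
  • ✅ Straightforward deployment and scaling

A service built this way can plug into existing OpenAI tooling and give users a consistent API experience.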