跳转至

处理流式响应的实例

此处有两个坑:

1、流式响应的 header 设置(如 Content-Type、Cache-Control: no-cache)
2、JS 处理流式返回的方式:如果处理不当,页面要等到整个响应结束才会有反应

后端

import json
import urllib.parse
from ai.deep_speek import maxai
from collections import deque
# API key handed to maxai as its `auth` argument.
# NOTE(review): the credential is hard-coded in source; consider loading it
# from the environment or a config file instead.
apkey = 'application-24f27892e5f9da86d669d842c1c68254'
# apkey = 'application-80adf7e3384c1c9bf8b7a50e645dfc67'# test AI
# Module-level client created at import time; SSEHandler.do_GET calls c1.webchat().
c1 = maxai(new_chat=True, auth=apkey)

from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer,SimpleHTTPRequestHandler
import sqlite3

import sqlite3

def create_table(db_name):
    """Create the ``my_table`` table if it does not already exist.

    The table has an auto-incrementing integer ``id`` primary key and a
    non-null text ``value`` column.

    Args:
        db_name: Path of the SQLite database file passed to ``sqlite3.connect``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement fails.
    """
    conn = sqlite3.connect(db_name)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS my_table (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                value TEXT NOT NULL
            )
            ''')
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Close the connection even when execute/commit raises, so a failed
        # setup does not leak the database handle.
        conn.close()

def add_record(db_name, value):
    """Insert ``value`` as a new row of ``my_table``.

    SQLite errors are reported on stdout and swallowed; the function always
    returns ``None``.

    Args:
        db_name: Path of the SQLite database file.
        value: Text to store in the ``value`` column.
    """
    connection = sqlite3.connect(db_name)
    cur = connection.cursor()
    insert_sql = "INSERT INTO my_table (value) VALUES (?)"

    try:
        cur.execute(insert_sql, (value,))
        connection.commit()  # persist the insert
    except sqlite3.Error as exc:
        print(f"An error occurred while adding record: {exc}")
    else:
        print(f"Record added: {value}")
    finally:
        cur.close()
        connection.close()

def get_and_delete_min_id_value(db_name):
    """Fetch the row of ``my_table`` with the smallest ``id`` and delete it.

    Args:
        db_name: Path of the SQLite database file.

    Returns:
        The ``value`` of the removed row, or ``None`` when the table is empty
        or an SQLite error occurred (the error is printed to stdout).
    """
    connection = sqlite3.connect(db_name)
    cur = connection.cursor()

    try:
        cur.execute("SELECT id, value FROM my_table ORDER BY id ASC LIMIT 1")
        oldest = cur.fetchone()
        if not oldest:
            return None

        record_id, payload = oldest
        print(f"Retrieved value: {payload}")

        # Remove the row that was just read so it is handed out only once.
        cur.execute("DELETE FROM my_table WHERE id = ?", (record_id,))
        connection.commit()
        return payload
    except sqlite3.Error as exc:
        print(f"An error occurred: {exc}")
        return None
    finally:
        cur.close()
        connection.close()


class SSEHandler(BaseHTTPRequestHandler):
    """HTTP handler that queues prompts (POST) and streams AI replies (GET).

    POST stores the URL-decoded request body with ``add_record``; GET pops the
    oldest stored prompt, sends it to ``c1.webchat`` and relays the reply text
    to the client piece by piece as it arrives.
    """

    def log_message(self, format, *args):
        """Suppress the default per-request log output."""
        pass

    def do_POST(self):
        """Store the URL-decoded request body as a pending prompt.

        Replies with an empty 200 response on success, or with 400 when the
        ``Content-Length`` header is missing, non-numeric or negative.
        """
        print("Received POST request")
        try:
            content_length = int(self.headers.get('Content-Length'))
            if content_length < 0:
                # A negative length would make rfile.read() block until EOF.
                raise ValueError("negative Content-Length")
        except (TypeError, ValueError):
            self.send_error(400, "Missing or invalid Content-Length")
            return

        post_data = self.rfile.read(content_length)
        decoded_str = urllib.parse.unquote(post_data.decode('utf-8'))
        add_record(db_name, decoded_str)

        print("Received message:", decoded_str)

        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Cache-Control', 'no-cache')
        self.end_headers()

    def do_GET(self):
        """Stream the AI reply for the oldest queued prompt to the client.

        The 200 status and headers are sent first; if no prompt is queued or
        the upstream call does not return status 200, the body stays empty.
        """
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()

        value = get_and_delete_min_id_value(db_name)
        if not value:
            return

        response = c1.webchat(value)
        try:
            if response.status_code != 200:
                return
            # Decode the upstream stream as UTF-8.
            response.encoding = 'utf-8'
            self._relay_stream(response)
        except (BrokenPipeError, ConnectionResetError):
            # The browser went away mid-stream; stop relaying quietly.
            pass
        finally:
            # NOTE(review): the upstream object looks like a requests-style
            # response; release its connection if it offers close().
            close = getattr(response, 'close', None)
            if callable(close):
                close()

    def _relay_stream(self, response):
        """Forward the ``content`` of every non-final upstream event to the client."""
        for line in response.iter_lines(decode_unicode=True):
            # Only "data:" lines carry a JSON payload; skip blanks and others.
            if not line or not line.startswith('data:'):
                continue
            try:
                event_data = json.loads(line[5:])
            except json.JSONDecodeError:
                # Ignore payloads that are not JSON instead of aborting the stream.
                continue
            if not isinstance(event_data, dict):
                continue
            # Only events explicitly marked is_end == false carry text to relay.
            if event_data.get('is_end') is not False:
                continue
            text = event_data.get('content')
            if text:
                print(text, end='', flush=True)
                self.wfile.write(text.encode('utf-8'))
                # Push each piece out immediately so the client sees it as it arrives.
                self.wfile.flush()



def run(server_class=ThreadingHTTPServer, handler_class=SSEHandler, port=8000):
    """Start the HTTP server and serve requests until it is interrupted.

    Args:
        server_class: Server type to instantiate.
        handler_class: Request handler class passed to the server.
        port: TCP port to listen on; the server binds to all interfaces.
    """
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f'Starting httpd server on port {port}...')
    try:
        httpd.serve_forever()
    finally:
        # Release the listening socket even when serve_forever() exits via an
        # exception such as KeyboardInterrupt.
        httpd.server_close()

if __name__ == '__main__':
    # db_name becomes a module-level global here; SSEHandler.do_POST and
    # SSEHandler.do_GET read it by name.
    # NOTE(review): it is only defined when this file runs as a script, so
    # importing the module and calling run() would raise NameError in the handlers.
    db_name = 'ai'
    create_table(db_name)
    run()

前端

<!DOCTYPE html>
<html lang="zh">
<head>
    <meta charset="utf-8"/>
    <title>Simple example - Editor.md examples</title>
    <link rel="stylesheet" href="css/style.css"/>
    <link rel="stylesheet" href="../css/editormd.css"/>
    <link rel="shortcut icon" href="https://pandao.github.io/editor.md/favicon.ico" type="image/x-icon"/>
</head>
<body>
<div id="layout">
    <header>
        <h1>Simple example</h1>
    </header>
    <!-- Submission is intercepted by handleFormSubmit(), which calls preventDefault()
         and POSTs the text via XMLHttpRequest, so the action URL is not used for a
         regular form submit. -->
    <form action="http://127.0.0.1:8000" onsubmit="handleFormSubmit(event)">

        <input type="text" id="text" name="text">
        <button type="submit">Send</button>
    </form>
    <div id="test-editormd">
                <textarea style="display:none;" id="textareaabc">
</textarea>
    </div>
</div>
<script src="js/jquery.min.js"></script>
<script src="../editormd.min.js"></script>
<script type="text/javascript">
    // Editor.md instance; assigned in the DOM-ready callback below and used by
    // fetchData() to append streamed text.
    var testEditor;

    // jQuery DOM-ready callback: attach the editor to the #test-editormd container.
    $(function () {
        testEditor = editormd("test-editormd", {
            width: "95%",
            height: 800,
            syncScrolling: "both",
            // presumably the directory Editor.md loads its dependencies from (confirm)
            path: "../lib/",
            preview: true,

        });

    })
    ;

    /**
     * Request the reply stream from the backend and append it to the editor
     * chunk by chunk as it arrives, instead of waiting for the full response.
     *
     * Failures from the request and from any later read of the stream are
     * logged by the single catch handler at the end of the chain.
     */
    function fetchData() {
        fetch('http://127.0.0.1:8000/data')
            .then(response => {
                if (!response.ok) {
                    throw new Error("Unexpected HTTP status: " + response.status);
                }
                // Make sure the response body can be read as a stream
                if (!response.body) {
                    throw new Error("ReadableStream not yet supported in this browser.");
                }

                const reader = response.body.getReader();
                const decoder = new TextDecoder("utf-8");

                // Append text to the current Markdown content of the editor
                function appendToEditor(text) {
                    if (text) {
                        testEditor.setMarkdown(testEditor.getMarkdown() + text);
                    }
                }

                // Read one chunk, then schedule the next read
                function read() {
                    // Returning the promise keeps read errors inside the outer
                    // chain so the catch handler below sees them.
                    return reader.read().then(({done, value}) => {
                        if (done) {
                            // Flush bytes of a multi-byte character still buffered in the decoder
                            appendToEditor(decoder.decode());
                            console.log("Stream completed");
                            return;
                        }

                        // {stream: true} keeps a character split across chunks intact
                        const chunk = decoder.decode(value, {stream: true});
                        console.log("Received chunk:", chunk);
                        appendToEditor(chunk);

                        return read();
                    });
                }

                // Start reading
                return read();
            })
            .catch(error => {
                console.error("Error fetching stream:", error);
            });
    }

    // 每隔 1 秒请求一次数据
    //setInterval(fetchData, 1000);

    /**
     * Submit handler for the prompt form: POST the text to the backend and,
     * once it is accepted, start streaming the reply with fetchData().
     *
     * @param {Event} event - The form's submit event.
     */
    function handleFormSubmit(event) {
        // Prevent the browser's default form submission (page navigation)
        event.preventDefault();

        const input = document.getElementById('text');
        const inputValue = input.value;

        const xhr = new XMLHttpRequest();
        xhr.open('POST', 'http://127.0.0.1:8000/events123', true);
        xhr.setRequestHeader('Content-Type', 'application/x-www-form-urlencoded');

        // Register the handlers before sending so no outcome can be missed
        xhr.onload = function () {
            if (xhr.status === 200) {
                console.log('请求成功:', xhr.responseText);
                fetchData();
            } else {
                console.error('请求失败:', xhr.statusText);
            }
        };
        // Network-level failures (server down, connection refused) never reach
        // onload, so report them here instead of failing silently.
        xhr.onerror = function () {
            console.error('请求失败:', xhr.statusText);
        };

        // The backend URL-decodes the raw request body, so send the encoded text as-is
        xhr.send(encodeURIComponent(inputValue));
    }

</script>


</body>
</html>