
Streaming local LLM with FastAPI, Llama.cpp and Langchain

Background:

I have set up FastAPI with Llama.cpp and LangChain. Now I want to enable streaming in the FastAPI responses. Streaming works with Llama.cpp in my terminal, but I wasn't able to implement it in a FastAPI response.

Most tutorials focus on enabling streaming with an OpenAI model, but I am using a local LLM (quantized Mistral) with llama.cpp. I think I have to modify the CallbackHandler, but no tutorial worked. Here is my code:

import copy
from functools import lru_cache

from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from langchain_community.llms import LlamaCpp
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

model_path = "../modelle/mixtral-8x7b-instruct-v0.1.Q5_K_M.gguf"

prompt= """
<s> [INST] Im folgenden bekommst du eine Aufgabe. Erledige diese anhand des User Inputs.

### Hier die Aufgabe: ###
{typescript_string}

### Hier der User Input: ###
{input}

Antwort: [/INST]
"""

def model_response_prompt():
    return PromptTemplate(template=prompt, input_variables=['input', 'typescript_string'])

def build_llm(model_path, callback=None):
    # Streams tokens to stdout while the model generates.
    callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
    # callback_manager = CallbackManager(callback)

    n_gpu_layers = 1  # Metal: 1 is enough (tried with more).
    n_batch = 512  # Should be between 1 and n_ctx; consider the amount of RAM of your Apple Silicon chip.

    llm = LlamaCpp(
        max_tokens=1000,
        n_threads=6,
        model_path=model_path,
        temperature=0.8,
        f16_kv=True,
        n_ctx=28000,
        n_gpu_layers=n_gpu_layers,
        n_batch=n_batch,
        callback_manager=callback_manager,
        verbose=True,
        top_p=0.75,
        top_k=40,
        repeat_penalty=1.1,
        streaming=True,
        model_kwargs={
            'mirostat': 2,
        },
    )

    return llm

# caching the LLM instance
@lru_cache(maxsize=100)
def get_cached_llm():
    chat = build_llm(model_path)
    return chat

chat = get_cached_llm()

app = FastAPI(
    title="Inference API for Mistral and Mixtral",
    description="A simple API that use Mistral or Mixtral",
    version="1.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

def bullet_point_model():          
    llm = build_llm(model_path=model_path)
    llm_chain = LLMChain(
        llm=llm,
        prompt=model_response_prompt(),
        verbose=True,
    )
    return llm_chain

@app.get('/model_response')
async def model(question: str, prompt: str):
    model = bullet_point_model()
    res = model({"typescript_string": prompt, "input": question})
    result = copy.deepcopy(res)
    return result

In an example notebook, I am calling FastAPI like this:

import subprocess
import urllib.parse
import shlex

query = input("Insert your bullet points here: ")
task = input("Insert the task here: ")
# Safely URL-encode the strings
encodedquery = urllib.parse.quote(query)
encodedtask = urllib.parse.quote(task)
# Join the curl command text
command = f"curl -X 'GET' 'http://127.0.0.1:8000/model_response?question={encodedquery}&prompt={encodedtask}' -H 'accept: application/json'"
print(command)
args = shlex.split(command)
process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout)

So with this code, getting responses from the API works. But I only see streaming in my terminal (I think this is because of the StreamingStdOutCallbackHandler). Only after the streaming in the terminal is complete do I get my FastAPI response.

What do I have to change so that I can stream token by token with FastAPI and a local llama.cpp model?

Solution:

I was doing the same and hit a similar issue: FastAPI was not streaming the response even though I was using the StreamingResponse API. Eventually I got the following code to work. There are three important parts:

Make sure to use StreamingResponse to wrap an Iterator.

Make sure the Iterator sends a newline character \n in each streamed chunk.

Make sure to use streaming APIs to connect to your LLMs. For example, the _client.chat function in my example uses httpx to connect to the LLM's REST API. If you use the requests package, it won't work, as it doesn't support streaming.

# This is a method of an API class in the answerer's project; self._client
# talks to the LLM backend over httpx. Needed imports: json, typing.Iterator,
# fastapi.Request and fastapi.responses.StreamingResponse.
async def chat(self, request: Request):
    """
    Generate a chat response using the requested model.
    """

    # Passing request body JSON to parameters of function _chat
    # Request body follows ollama API's chat request format for now.
    params = await request.json()
    self.logger.debug("Request data: %s", params)

    chat_response = self._client.chat(**params)

    # Always return as streaming
    if isinstance(chat_response, Iterator):
        def generate_response():
            for response in chat_response:
                yield json.dumps(response) + "\n"
        return StreamingResponse(generate_response(), media_type="application/x-ndjson")
    elif chat_response is not None:
        return json.dumps(chat_response)
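
To tie the answer back to the question's setup, here is a minimal sketch of the same pattern applied directly to the local LlamaCpp model, reusing app, build_llm and model_response_prompt from the question's code. The /model_response_stream route name is made up for illustration, and the sketch relies on the generic stream() method that LangChain LLM wrappers expose; depending on the LangChain version the chunks are plain strings or GenerationChunk objects, so both cases are handled. Treat it as an untested starting point rather than a drop-in fix.

import json

from fastapi.responses import StreamingResponse

# Reuses the cached LlamaCpp instance from the question; streaming=True is
# already set there, so tokens are produced incrementally.
streaming_llm = get_cached_llm()

@app.get("/model_response_stream")
async def model_response_stream(question: str, prompt: str):
    # Render the same prompt template as the non-streaming endpoint.
    full_prompt = model_response_prompt().format(
        typescript_string=prompt, input=question
    )

    def token_generator():
        # llm.stream() yields the completion piece by piece instead of
        # returning it all at once.
        for chunk in streaming_llm.stream(full_prompt):
            text = chunk if isinstance(chunk, str) else chunk.text
            # Newline-delimited JSON, as recommended in the answer above.
            yield json.dumps({"token": text}) + "\n"

    return StreamingResponse(token_generator(), media_type="application/x-ndjson")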

Summary

**Article summary**
This post looks at streaming a local large language model (LLM) from FastAPI when the model is run with Llama.cpp and LangChain. The asker already had token streaming working in the terminal via Llama.cpp and LangChain, but struggled to surface it in the FastAPI response.
**Background**
The asker built a stack with FastAPI, Llama.cpp and LangChain and wanted the FastAPI responses to stream. Streaming via Llama.cpp worked at the terminal level, but not once the model was called from FastAPI. Most tutorials cover streaming with an OpenAI model; none applied directly to a local LLM served by llama.cpp.
**Exploring a solution**
In the original code, the route handler calls the model, waits for the complete prediction and returns it in one piece, which cannot produce a streamed response.
**Solution**
The problem was eventually solved as follows:
1. **Wrap an iterator in StreamingResponse**: for FastAPI to stream, the route must return a StreamingResponse around an iterator that yields the content piece by piece.
2. **Send a newline after every streamed chunk**: a trailing \n after each chunk lets the client split the stream into records (newline-delimited JSON).
3. **Connect to the LLM through a streaming client API**: the call to the LLM itself must stream. The answer uses the httpx library rather than the requests package, since the latter does not support the required streaming behaviour.
The code example shows how to implement a streaming route in FastAPI: an asynchronous chat function iterates over the model output and hands it to the client through StreamingResponse. With these changes, the streaming that previously only appeared in the terminal is delivered through the FastAPI application.
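
On the client side, the summary's third point applies as well: the response has to be read as a stream. Below is a small, hedged sketch of how the hypothetical /model_response_stream route from the sketch above could be consumed with httpx (not part of the original post).

import json
import httpx

params = {"question": "My bullet points ...", "prompt": "Summarise the input."}

# httpx.stream() keeps the connection open and exposes the body line by line,
# so tokens can be printed as they arrive instead of after the full response.
with httpx.stream(
    "GET",
    "http://127.0.0.1:8000/model_response_stream",
    params=params,
    timeout=None,
) as response:
    for line in response.iter_lines():
        if line:
            print(json.loads(line)["token"], end="", flush=True)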

Updated 2024-07-18