当前位置:AIGC资讯 > AIGC > 正文

如何利用OpenAI、NATS和Streamlight彻底改变实时警报

本文将介绍如何使用Streamlight、NATS和OpenAI这些非常酷的工具,在Python中构建一个全栈事件驱动的天气警报聊天应用程序。该应用程序可以实时收集天气信息,使用人工智能了解警报标准,并将这些警报发送到用户界面。

这段内容和代码示例对于那些希望了解现代实时警报系统如何与大型语言模型(LLM)协调工作以及如何实现的开发人员来说非常有帮助。

人们也可以采用GitHub上的源代码自己进行尝试。

幕后的力量

以下了解人工智能天气警报聊天应用程序是如何工作的,并将原始数据转换为可操作的警报,实时了解天气变化。应用程序的核心是一个用Python实现的响应式后端,由NATS提供支持,以确保实时数据处理和消息管理。集成OpenAI的GPT模型,使对话式人工智能能够理解警报的性质,并响应用户的查询。用户可以使用自然语言指定他们的警报标准, 然后GPT模型将对其进行解释。

图1实时警报应用架构

实时数据采集

从后端自各种来源连续异步收集天气数据开始。应用程序现在使用api.weatherapi.com服务,每10秒实时获取一次天气信息。这些数据包括全球各地温度、湿度、降水等参数。这段代码异步获取爱沙尼亚当前的天气数据,但应用程序可以改进为从用户输入动态设置位置:

async def fetch_weather_data():
    api_url = f"http://api.weatherapi.com/v1/current.json?key={weather_api_key}&q=estonia"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url) as response:
                if response.status == 200:
                    return await response.json()
                else:
                    logging.error(f"Error fetching weather data: HTTP {response.status}")
                    return None
    except Exception as e:
        logging.error(f"Error fetching weather data: {e}")
        return None

NATS在数据流中的作用

backend.py文件中main()函数中的代码段演示了NATS的集成,用于驱动消息传递、连续的天气监测和警报。使用NATS.py库将NATS集成到Python代码中。首先,在NATs://localhost:4222建立运行在Docker中的NATS服务器的连接。

nats_client = await nats.connect("nats://localhost:4222")

然后,定义一个异步message_handler函数。该函数订阅并处理聊天主题上从NATS服务器接收到的消息。如果消息以“Set Alert:”开头(将其附加在前端),它将提取并更新用户的警报标准。

async def message_handler(msg):
    nonlocal user_alert_criteria
    data = msg.data.decode()
    if data.startswith("Set Alert:"):
        user_alert_criteria = data[len("Set Alert:"):].strip()
        logging.info(f"User alert criteria updated: {user_alert_criteria}")
await nats_client.subscribe("chat", cb=message_handler)

后端服务集成了天气API和Open AI Chat Completion API等外部服务。如果同时存在天气数据和用户警报标准,该应用程序会为OpenAI的GPT模型构建一个提示,以确定天气是否符合用户的标准。该提示要求人工智能根据用户的标准分析当前天气,并以“是”或“否”和简短的天气摘要做出回应。

一旦人工智能确定传入的天气数据符合用户的警报标准,它就会制作个性化的警报消息,并向NATS服务器上的chat_response主题发布天气警报,以更新前端应用程序的最新变化。此消息包含用户友好的通知,旨在通知和建议用户。例如,它可能会提示,“小心!爱沙尼亚明天会下雨。别忘了带伞!”

while True:
        current_weather = await fetch_weather_data()
        if current_weather and user_alert_criteria:
            logging.info(f"Current weather data: {current_weather}")
            prompt = f"Use the current weather: {current_weather} information and user alert criteria: {user_alert_criteria}. Identify if the weather meets these criteria and return only YES or NO with a short weather temperature info without explaining why."
            response_text = await get_openai_response(prompt)
            if response_text and "YES" in response_text:
                logging.info("Weather conditions met user criteria.")
                ai_response = f"Weather alert! Your specified conditions have been met. {response_text}"
                await nats_client.publish("chat_response", payload=ai_response.encode())
            else:
                logging.info("Weather conditions did not meet user criteria.")
        else:
            logging.info("No current weather data or user alert criteria set.")await asyncio.sleep(10)

实时发送和接收警报

了解一下后端和前端之间的整体通信流程。

  • 通过使用Streamlit构建的简单聊天界面(请参阅frontend.py文件),用户可以使用自然语言输入天气警报标准并提交。
alert_criteria = st.text_input("Set your weather alert criteria", key="alert_criteria", disabled=st.session_state['alert_set'])
  • Streamlit前端代码通过NATS消息传递与后端服务交互。它将这些标准发布到聊天主题上的NATS服务器。
def send_message_to_nats_handler(message):
    with NATSClient() as client:
        client.connect()
        client.publish("chat", payload=message.encode())
        client.subscribe("chat_response", callback=read_message_from_nats_handler)
        client.wait()

if set_alert_btn:
    st.session_state['alert_set'] = True
    st.success('Alert criteria set')
    send_message_to_nats_handler(f"Set Alert: {alert_criteria}")

正如在前一节中看到的,后端服务监听聊天主题,接收标准,获取当前天气数据,并使用人工智能来确定是否应该触发警报。如果满足条件,后端服务将向chat_response主题发送警报消息。前端接收此消息并更新用户界面(UI)以通知用户。

def read_message_from_nats_handler(msg):
    message = msg.payload.decode()
    st.session_state['conversation'].append(("AI", message))
    st.markdown(f"<span style='color: red;'></span> AI: {message}", unsafe_allow_html=True)

进行尝试

要详细探索实时天气警报聊天应用程序并亲自尝试,可以访问前面链接的GitHub存储库。该存储库包含所有必要的代码、详细的设置说明和帮助入门的附加文档。在设置完成之后,就可以启动Streamlit前端和Python后端。设置天气警报标准,并查看系统如何处理实时天气数据以了解情况。

图2警报应用程序的Streamlight UI

建立流处理管道

实时天气警报聊天应用程序演示了NATS在分布式系统中用于实时消息传递的强大用例,允许在面向用户的前端和数据处理后端之间进行有效的通信。但是,应该考虑几个关键步骤,以确保呈现给用户的信息是相关的、准确的和可操作的。在应用程序中,只是获取实时的原始天气数据,并将其直接发送到OpenAI或前端。有时,需要在数据到达外部服务之前对其进行实时转换,以便对其进行过滤、丰富、聚合或规范化。需要开始考虑创建具有多个阶段的流处理管道。

例如,并非从API获取的所有数据都与每个用户相关,可以在初始阶段过滤掉不必要的信息。此外,数据可以采用各种格式,特别是如果需要从多个API获取信息以获得全面警报,这就需要对这些数据进行规范化。在下一阶段,使用额外的场景或原始数据的信息来丰富数据,使其更有用。这可能包括将当前天气状况与历史数据进行比较,以识别异常模式,或者使用另一个外部API添加基于位置的见解,例如针对特定地区天气状况的特定建议。在后期阶段,可能会汇总每小时的温度数据,以给出白天的平均温度或突出显示白天达到的峰值温度。

下一个步骤

当涉及到在生产环境中转换数据、部署、运行和扩展应用程序时,你可能希望使用Python中的专用框架(例如GlassFlow)来构建复杂的流处理管道。GlassFlow为流处理提供了一个完全托管的无服务器基础设施,不必考虑设置或维护,应用程序可以轻松处理大量数据和用户请求。它提供了高级状态管理功能,可以更轻松地跟踪用户警报标准和其他应用程序状态。而应用程序可以根据其用户群进行扩展,而不会影响性能。

原文标题:Revolutionizing Real-Time Alerts With AI, NATS, and Streamlit,作者:Bobur Umurzokov

链接:https://dzone.com/articles/revolutionizing-real-time-alerts-with-ai-nats-and。

更新时间 2024-04-07