1、实验设备
01科技的V831
2、总体概述
按下按键,蓝灯亮起,开始定时录音;录音结束后,蓝灯熄灭。接着,通过百度的语音识别API将语音转为文字,得到文字后通过百度的文心一言API(千帆大模型平台)获取回答,再通过百度的语音合成API将回答的文字合成为音频,最后播放该音频。
3、录音
#录音
def record(WAVE_OUTPUT_FILENAME, RECORD_SECONDS):
    """Record mono, 16-bit, 16 kHz audio from the input stream into a WAV file.

    :param WAVE_OUTPUT_FILENAME: path of the WAV file to write
    :param RECORD_SECONDS: recording duration in seconds
    :return: None
    """
    # WAV / stream format
    CHUNK = 512               # frames read from the stream per call
    FORMAT = pyaudio.paInt16  # 16-bit samples
    CHANNELS = 1
    RATE = 16000

    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(FORMAT)
        # Open the stream in input mode, i.e. recording.
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            chunk_count = int(RATE / CHUNK * RECORD_SECONDS)
            frames = [stream.read(CHUNK) for _ in range(chunk_count)]
        finally:
            # Always release the audio device, even if a read fails.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    # Save the captured frames; the context manager closes the file on error too.
    with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
4、语音识别
在百度智能云创建应用,获取相关key
# 语音转文字模块
def SpeechRecognition(path):
    """Convert an audio file to text with the Baidu speech recognition REST API.

    The raw JSON reply is also written to /root/SpeechRecognition_result.txt.

    :param path: audio file to recognise; its last three characters are sent
        as the ``format`` field (the original note lists pcm/wav/amr)
    :return: first entry of the ``result`` list in the API reply
    :raises DemoError: (class local to this function) when the token reply is
        unusable, the audio file is empty, or the reply carries no ``result``
    """
    import json
    import base64
    from urllib.error import HTTPError  # only HTTPError carries .code / .read()

    # Standard edition
    DEV_PID = 1537  # per the original note: Mandarin, input-method model
    ASR_URL = 'http://vop.baidu.com/server_api'
    # Scope the token must include; set to False to skip the check.
    SCOPE = 'audio_voice_assistant_get'
    AUDIO_FILE = f'{path}'
    FORMAT = AUDIO_FILE[-3:]
    CUID = '123456PYTHON'
    RATE = 16000  # sample rate reported to the API
    TOKEN_URL = 'http://aip.baidubce.com/oauth/2.0/token'
    TIMEOUT = 10  # seconds; keeps a dead network from blocking the caller forever

    class DemoError(Exception):
        """Raised for a failed token or recognition request."""

    def read_reply(req, label):
        """Send *req* and return the reply body as text.

        HTTP error replies are returned as well so the caller can inspect
        them; a URLError without a response (e.g. no network) propagates.
        """
        try:
            with urlopen(req, timeout=TIMEOUT) as f:
                raw = f.read()
        except HTTPError as err:
            print(label + ' http response http code : ' + str(err.code))
            raw = err.read()
        return raw.decode('utf-8')

    def fetch_token():
        """Exchange API_KEY / SECRET_KEY for an access token."""
        params = {'grant_type': 'client_credentials',
                  'client_id': API_KEY,
                  'client_secret': SECRET_KEY}
        req = Request(TOKEN_URL, urlencode(params).encode('utf-8'))
        result = json.loads(read_reply(req, 'token'))
        if 'access_token' in result and 'scope' in result:
            if SCOPE and SCOPE not in result['scope'].split(' '):
                raise DemoError('scope is not correct')
            return result['access_token']
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')

    token = fetch_token()

    with open(AUDIO_FILE, 'rb') as speech_file:
        speech_data = speech_file.read()
    length = len(speech_data)
    if length == 0:
        raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)

    params = {'dev_pid': DEV_PID,
              'format': FORMAT,
              'rate': RATE,
              'token': token,
              'cuid': CUID,
              'channel': 1,
              'speech': base64.b64encode(speech_data).decode('utf-8'),
              'len': length,
              }
    req = Request(ASR_URL, json.dumps(params, sort_keys=False).encode('utf-8'))
    req.add_header('Content-Type', 'application/json')
    result_str = read_reply(req, 'asr')
    result = json.loads(result_str)

    # Keep the raw reply on disk; explicit UTF-8 so non-ASCII text never
    # depends on the device locale.
    with open(r"/root/SpeechRecognition_result.txt", "w", encoding="utf-8") as of:
        of.write(result_str)

    candidates = result.get('result')
    if not candidates:
        # Surface the service's own error fields instead of a bare KeyError.
        raise DemoError('asr failed: err_no=%s err_msg=%s'
                        % (result.get('err_no'), result.get('err_msg')))
    return candidates[0]
5、智能回答
在千帆大模型平台创建应用,获取相关key
使用的是免费的第三方大语言模型Yi-34B-Chat,响应速度一般
def Wen_Xin_said(text_words=""):
    """Send a question to the Yi-34B-Chat endpoint on Baidu Qianfan and return the answer.

    The raw reply text is also written to /root/Wen_Xin_said_result.txt.

    :param text_words: the user's question
    :return: the ``result`` field of the chat reply
    :raises RuntimeError: when no access token is returned or the chat reply
        has no ``result`` field
    """
    API_Key = "********************"  # Baidu Wenxin Yiyan API Key
    Secret_Key = "********************"  # Baidu Wenxin Yiyan Secret Key
    TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token"
    CHAT_URL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/yi_34b_chat"

    def get_access_token():
        """Exchange the API Key / Secret Key for an access_token."""
        response = requests.post(
            TOKEN_URL,
            # Passed as params so requests URL-encodes the credentials.
            params={
                "grant_type": "client_credentials",
                "client_id": API_Key,
                "client_secret": Secret_Key,
            },
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            data=json.dumps(""),
            timeout=10,
        )
        token = response.json().get("access_token")
        if not token:
            # Without this check the caller fails later with an opaque TypeError.
            raise RuntimeError("no access_token in token response: " + response.text)
        return token

    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": "hi"
            },
            {
                "role": "assistant",
                "content": "Hello! How can I assist you today? If you have any questions or need information on a specific topic, feel free to ask."
            },
            {
                "role": "user",
                "content": f"{text_words}"
            }
        ]
    })
    response = requests.post(
        CHAT_URL,
        params={"access_token": get_access_token()},
        headers={'Content-Type': 'application/json'},
        data=payload,
        timeout=120,  # generous: the reply is returned in one piece, not streamed
    )
    response_result = response.text
    result = json.loads(response_result)

    # Keep the raw reply on disk; explicit UTF-8 so non-ASCII text never
    # depends on the device locale.
    with open(r"/root/Wen_Xin_said_result.txt", "w", encoding="utf-8") as of:
        of.write(response_result)

    if 'result' not in result:
        raise RuntimeError("chat request failed: " + response_result)
    return result['result']
6、语音合成
语音合成使用的API Key和Secret Key与前面的百度语音识别相同。
#文字转语音模块
def SpeechSynthesis(text_words=""):
    """Convert text to speech with the Baidu TTS REST API.

    On success the audio is written to /root/result.wav. If the reply is not
    audio, its body is written to error.txt (relative path) and printed.

    :param text_words: text to synthesise
    :return: None
    :raises DemoError: (class local to this function) when the token reply is
        unusable
    """
    from urllib.error import HTTPError  # only HTTPError carries .code / .read()

    TEXT = text_words
    # Voice. Basic voices: 0 Du Xiaomei, 1 Du Xiaoyu, 3 Du Xiaoyao, 4 Du Yaya.
    # Premium voices: 5 Du Xiaojiao, 103 Du Miduo, 106 Du Bowen,
    # 110 Du Xiaotong, 111 Du Xiaomeng. Default is Du Xiaomei.
    PER = 0
    # Speed, 0-15, default 5 (medium)
    SPD = 5
    # Pitch, 0-15, default 5 (medium)
    PIT = 1
    # Volume, 0-9, default 5 (medium)
    VOL = 3
    # Output format: 3 mp3 (default), 4 pcm-16k, 5 pcm-8k, 6 wav
    AUE = 6
    FORMATS = {3: "mp3", 4: "pcm", 5: "pcm", 6: "wav"}
    FORMAT = FORMATS[AUE]
    CUID = "123456PYTHON"
    TTS_URL = 'http://tsn.baidu.com/text2audio'
    TOKEN_URL = 'http://aip.baidubce.com/oauth/2.0/token'
    SCOPE = 'audio_tts_post'  # scope the token must include for TTS

    class DemoError(Exception):
        """Raised when the token request does not yield a usable token."""

    def fetch_token():
        """Exchange API_KEY / SECRET_KEY for an access token."""
        params = {'grant_type': 'client_credentials',
                  'client_id': API_KEY,
                  'client_secret': SECRET_KEY}
        req = Request(TOKEN_URL, urlencode(params).encode('utf-8'))
        try:
            with urlopen(req, timeout=5) as f:
                raw = f.read()
        except HTTPError as err:
            # A URLError without a response (e.g. no network) propagates.
            print('token http response http code : ' + str(err.code))
            raw = err.read()
        result = json.loads(raw.decode())
        if 'access_token' in result and 'scope' in result:
            if SCOPE not in result['scope'].split(' '):
                raise DemoError('scope is not correct')
            return result['access_token']
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')

    token = fetch_token()
    # Per the original note, TEXT must be URL-encoded twice: once here and
    # once more by urlencode() below.
    tex = quote_plus(TEXT)
    params = {'tok': token, 'tex': tex, 'per': PER, 'spd': SPD, 'pit': PIT,
              'vol': VOL, 'aue': AUE, 'cuid': CUID,
              'lan': 'zh', 'ctp': 1}  # lan and ctp are fixed parameters
    req = Request(TTS_URL, urlencode(params).encode('utf-8'))

    try:
        with urlopen(req, timeout=30) as f:
            result_str = f.read()
            content_type = f.headers.get('content-type', '')
        # Anything that is not audio is treated as an error reply.
        has_error = 'audio/' not in content_type
    except HTTPError as err:
        print('tts http response http code : ' + str(err.code))
        result_str = err.read()
        has_error = True

    save_file = "error.txt" if has_error else '/root/result.' + FORMAT
    with open(save_file, 'wb') as of:
        of.write(result_str)

    if has_error:
        # errors='replace' so an unexpected binary body cannot raise here.
        print("tts api error:" + result_str.decode('utf-8', errors='replace'))

    print("result saved as :" + save_file)
7、语音播放
将前面转换的回复语音进行播放
#播放
def audio(PLAY_FILENAME):
    """Play a WAV file through the audio output stream.

    Returns once every frame of the file has been written to the stream.

    :param PLAY_FILENAME: path of the WAV file to play
    :return: None
    """
    CHUNK = 512  # frames written to the output stream per iteration

    # The context manager closes the WAV file even if playback fails.
    with wave.open(PLAY_FILENAME, 'rb') as wf:
        p = pyaudio.PyAudio()
        try:
            # Open the stream in output mode, using the file's own format.
            stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                            channels=wf.getnchannels(),
                            rate=wf.getframerate(),
                            output=True)
            try:
                data = wf.readframes(CHUNK)
                while data:
                    stream.write(data)
                    data = wf.readframes(CHUNK)
            finally:
                # Always release the audio device.
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()
8、完整代码
# 导入相关模块
import time
# 导入相关模块
import pyaudio
import wave
import sys
import _thread # 导入线程模块
import sys
import json
import time
import requests
from urllib.request import urlopen
from urllib.request import Request
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.parse import quote_plus
from maix import gpio # 导入相关模块
# KEY is PH13, input mode
KEY = gpio.gpio(13, "H", 1, 2)
# LED is PH14, default output mode
LED = gpio.gpio(14, "H", 1)
# True when running under Python 3; read by the speech functions below.
IS_PY3 = sys.version_info.major == 3
API_KEY = '********************' # Baidu speech recognition API KEY
SECRET_KEY = '********************' # Baidu speech recognition SECRET KEY
# Start with the blue LED off (set_value(1) is annotated as "off" where it is used later in this file).
LED.set_value(1)
#录音
def record(WAVE_OUTPUT_FILENAME, RECORD_SECONDS):
    """Record mono, 16-bit, 16 kHz audio from the input stream into a WAV file.

    :param WAVE_OUTPUT_FILENAME: path of the WAV file to write
    :param RECORD_SECONDS: recording duration in seconds
    :return: None
    """
    # WAV / stream format
    CHUNK = 512               # frames read from the stream per call
    FORMAT = pyaudio.paInt16  # 16-bit samples
    CHANNELS = 1
    RATE = 16000

    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(FORMAT)
        # Open the stream in input mode, i.e. recording.
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        try:
            chunk_count = int(RATE / CHUNK * RECORD_SECONDS)
            frames = [stream.read(CHUNK) for _ in range(chunk_count)]
        finally:
            # Always release the audio device, even if a read fails.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    # Save the captured frames; the context manager closes the file on error too.
    with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
#播放
def audio(PLAY_FILENAME):
    """Play a WAV file through the audio output stream.

    Returns once every frame of the file has been written to the stream.

    :param PLAY_FILENAME: path of the WAV file to play
    :return: None
    """
    CHUNK = 512  # frames written to the output stream per iteration

    # The context manager closes the WAV file even if playback fails.
    with wave.open(PLAY_FILENAME, 'rb') as wf:
        p = pyaudio.PyAudio()
        try:
            # Open the stream in output mode, using the file's own format.
            stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                            channels=wf.getnchannels(),
                            rate=wf.getframerate(),
                            output=True)
            try:
                data = wf.readframes(CHUNK)
                while data:
                    stream.write(data)
                    data = wf.readframes(CHUNK)
            finally:
                # Always release the audio device.
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()
# 语音转文字模块
def SpeechRecognition(path):
    """Convert an audio file to text with the Baidu speech recognition REST API.

    The raw JSON reply is also written to /root/SpeechRecognition_result.txt.

    :param path: audio file to recognise; its last three characters are sent
        as the ``format`` field (the original note lists pcm/wav/amr)
    :return: first entry of the ``result`` list in the API reply
    :raises DemoError: (class local to this function) when the token reply is
        unusable, the audio file is empty, or the reply carries no ``result``
    """
    import json
    import base64
    from urllib.error import HTTPError  # only HTTPError carries .code / .read()

    # Standard edition
    DEV_PID = 1537  # per the original note: Mandarin, input-method model
    ASR_URL = 'http://vop.baidu.com/server_api'
    # Scope the token must include; set to False to skip the check.
    SCOPE = 'audio_voice_assistant_get'
    AUDIO_FILE = f'{path}'
    FORMAT = AUDIO_FILE[-3:]
    CUID = '123456PYTHON'
    RATE = 16000  # sample rate reported to the API
    TOKEN_URL = 'http://aip.baidubce.com/oauth/2.0/token'
    TIMEOUT = 10  # seconds; keeps a dead network from blocking the caller forever

    class DemoError(Exception):
        """Raised for a failed token or recognition request."""

    def read_reply(req, label):
        """Send *req* and return the reply body as text.

        HTTP error replies are returned as well so the caller can inspect
        them; a URLError without a response (e.g. no network) propagates.
        """
        try:
            with urlopen(req, timeout=TIMEOUT) as f:
                raw = f.read()
        except HTTPError as err:
            print(label + ' http response http code : ' + str(err.code))
            raw = err.read()
        return raw.decode('utf-8')

    def fetch_token():
        """Exchange API_KEY / SECRET_KEY for an access token."""
        params = {'grant_type': 'client_credentials',
                  'client_id': API_KEY,
                  'client_secret': SECRET_KEY}
        req = Request(TOKEN_URL, urlencode(params).encode('utf-8'))
        result = json.loads(read_reply(req, 'token'))
        if 'access_token' in result and 'scope' in result:
            if SCOPE and SCOPE not in result['scope'].split(' '):
                raise DemoError('scope is not correct')
            return result['access_token']
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')

    token = fetch_token()

    with open(AUDIO_FILE, 'rb') as speech_file:
        speech_data = speech_file.read()
    length = len(speech_data)
    if length == 0:
        raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)

    params = {'dev_pid': DEV_PID,
              'format': FORMAT,
              'rate': RATE,
              'token': token,
              'cuid': CUID,
              'channel': 1,
              'speech': base64.b64encode(speech_data).decode('utf-8'),
              'len': length,
              }
    req = Request(ASR_URL, json.dumps(params, sort_keys=False).encode('utf-8'))
    req.add_header('Content-Type', 'application/json')
    result_str = read_reply(req, 'asr')
    result = json.loads(result_str)

    # Keep the raw reply on disk; explicit UTF-8 so non-ASCII text never
    # depends on the device locale.
    with open(r"/root/SpeechRecognition_result.txt", "w", encoding="utf-8") as of:
        of.write(result_str)

    candidates = result.get('result')
    if not candidates:
        # Surface the service's own error fields instead of a bare KeyError.
        raise DemoError('asr failed: err_no=%s err_msg=%s'
                        % (result.get('err_no'), result.get('err_msg')))
    return candidates[0]
#文字转语音模块
def SpeechSynthesis(text_words=""):
    """Convert text to speech with the Baidu TTS REST API.

    On success the audio is written to /root/result.wav. If the reply is not
    audio, its body is written to error.txt (relative path) and printed.

    :param text_words: text to synthesise
    :return: None
    :raises DemoError: (class local to this function) when the token reply is
        unusable
    """
    from urllib.error import HTTPError  # only HTTPError carries .code / .read()

    TEXT = text_words
    # Voice. Basic voices: 0 Du Xiaomei, 1 Du Xiaoyu, 3 Du Xiaoyao, 4 Du Yaya.
    # Premium voices: 5 Du Xiaojiao, 103 Du Miduo, 106 Du Bowen,
    # 110 Du Xiaotong, 111 Du Xiaomeng. Default is Du Xiaomei.
    PER = 0
    # Speed, 0-15, default 5 (medium)
    SPD = 5
    # Pitch, 0-15, default 5 (medium)
    PIT = 1
    # Volume, 0-9, default 5 (medium)
    VOL = 3
    # Output format: 3 mp3 (default), 4 pcm-16k, 5 pcm-8k, 6 wav
    AUE = 6
    FORMATS = {3: "mp3", 4: "pcm", 5: "pcm", 6: "wav"}
    FORMAT = FORMATS[AUE]
    CUID = "123456PYTHON"
    TTS_URL = 'http://tsn.baidu.com/text2audio'
    TOKEN_URL = 'http://aip.baidubce.com/oauth/2.0/token'
    SCOPE = 'audio_tts_post'  # scope the token must include for TTS

    class DemoError(Exception):
        """Raised when the token request does not yield a usable token."""

    def fetch_token():
        """Exchange API_KEY / SECRET_KEY for an access token."""
        params = {'grant_type': 'client_credentials',
                  'client_id': API_KEY,
                  'client_secret': SECRET_KEY}
        req = Request(TOKEN_URL, urlencode(params).encode('utf-8'))
        try:
            with urlopen(req, timeout=5) as f:
                raw = f.read()
        except HTTPError as err:
            # A URLError without a response (e.g. no network) propagates.
            print('token http response http code : ' + str(err.code))
            raw = err.read()
        result = json.loads(raw.decode())
        if 'access_token' in result and 'scope' in result:
            if SCOPE not in result['scope'].split(' '):
                raise DemoError('scope is not correct')
            return result['access_token']
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')

    token = fetch_token()
    # Per the original note, TEXT must be URL-encoded twice: once here and
    # once more by urlencode() below.
    tex = quote_plus(TEXT)
    params = {'tok': token, 'tex': tex, 'per': PER, 'spd': SPD, 'pit': PIT,
              'vol': VOL, 'aue': AUE, 'cuid': CUID,
              'lan': 'zh', 'ctp': 1}  # lan and ctp are fixed parameters
    req = Request(TTS_URL, urlencode(params).encode('utf-8'))

    try:
        with urlopen(req, timeout=30) as f:
            result_str = f.read()
            content_type = f.headers.get('content-type', '')
        # Anything that is not audio is treated as an error reply.
        has_error = 'audio/' not in content_type
    except HTTPError as err:
        print('tts http response http code : ' + str(err.code))
        result_str = err.read()
        has_error = True

    save_file = "error.txt" if has_error else '/root/result.' + FORMAT
    with open(save_file, 'wb') as of:
        of.write(result_str)

    if has_error:
        # errors='replace' so an unexpected binary body cannot raise here.
        print("tts api error:" + result_str.decode('utf-8', errors='replace'))

    print("result saved as :" + save_file)
def Wen_Xin_said(text_words=""):
    """Send a question to the Yi-34B-Chat endpoint on Baidu Qianfan and return the answer.

    The raw reply text is also written to /root/Wen_Xin_said_result.txt.

    :param text_words: the user's question
    :return: the ``result`` field of the chat reply
    :raises RuntimeError: when no access token is returned or the chat reply
        has no ``result`` field
    """
    API_Key = "********************"  # Baidu Wenxin Yiyan API Key
    Secret_Key = "********************"  # Baidu Wenxin Yiyan Secret Key
    TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token"
    CHAT_URL = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/yi_34b_chat"

    def get_access_token():
        """Exchange the API Key / Secret Key for an access_token."""
        response = requests.post(
            TOKEN_URL,
            # Passed as params so requests URL-encodes the credentials.
            params={
                "grant_type": "client_credentials",
                "client_id": API_Key,
                "client_secret": Secret_Key,
            },
            headers={
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            },
            data=json.dumps(""),
            timeout=10,
        )
        token = response.json().get("access_token")
        if not token:
            # Without this check the caller fails later with an opaque TypeError.
            raise RuntimeError("no access_token in token response: " + response.text)
        return token

    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": "hi"
            },
            {
                "role": "assistant",
                "content": "Hello! How can I assist you today? If you have any questions or need information on a specific topic, feel free to ask."
            },
            {
                "role": "user",
                "content": f"{text_words}"
            }
        ]
    })
    response = requests.post(
        CHAT_URL,
        params={"access_token": get_access_token()},
        headers={'Content-Type': 'application/json'},
        data=payload,
        timeout=120,  # generous: the reply is returned in one piece, not streamed
    )
    response_result = response.text
    result = json.loads(response_result)

    # Keep the raw reply on disk; explicit UTF-8 so non-ASCII text never
    # depends on the device locale.
    with open(r"/root/Wen_Xin_said_result.txt", "w", encoding="utf-8") as of:
        of.write(response_result)

    if 'result' not in result:
        raise RuntimeError("chat request failed: " + response_result)
    return result['result']
# Flag shared by the key thread and the conversation thread:
# func_key sets it to 1 on a key press, func_conversation resets it to 0
# once the conversation round is finished.
# (A module-level `global` statement is a no-op, so none is needed here.)
key_state = 0
# 按键线程函数
def func_key(name):
    """Key-polling thread: on a debounced press, light the LED and request a conversation.

    :param name: thread label passed by the launcher; not used
    :return: never returns
    """
    global key_state
    while True:
        if KEY.get_value() == 0:  # key pressed
            time.sleep(0.05)  # debounce delay
            if KEY.get_value() == 0:
                # Ignore presses while a conversation is still running;
                # otherwise the LED is lit again and stays on after
                # func_conversation resets key_state.
                if not key_state:
                    LED.set_value(0)  # turn the blue LED on
                    key_state = 1
                # Wait for release, sleeping so the other thread can run.
                while KEY.get_value() == 0:
                    time.sleep(0.01)
        else:
            # Idle: poll at a modest rate instead of spinning.
            time.sleep(0.01)
# 对话线程函数
def func_conversation(name):
    """Conversation thread: when key_state is set, run one record -> recognise -> answer -> speak round.

    :param name: thread label passed by the launcher; not used
    :return: never returns
    """
    import os

    global key_state
    RECORD_WAV = r"/root/test.wav"
    RESULT_WAV = r"/root/result.wav"
    while True:
        if not key_state:
            # Idle: sleep instead of spinning so the key thread gets CPU time.
            time.sleep(0.05)
            continue
        try:
            record(RECORD_WAV, RECORD_SECONDS=10)  # record the question
            LED.set_value(1)  # turn the blue LED off
            result = SpeechRecognition(RECORD_WAV)  # speech to text
            result = Wen_Xin_said(result)  # get the model's answer
            # Drop the previous answer so a failed synthesis cannot make
            # audio() replay stale audio.
            if os.path.exists(RESULT_WAV):
                os.remove(RESULT_WAV)
            SpeechSynthesis(result)  # text to speech
            audio(RESULT_WAV)  # play the answer
        except Exception as err:
            # Thread boundary: report the failure and keep serving key presses
            # instead of letting the thread die.
            print("conversation failed: " + repr(err))
        finally:
            LED.set_value(1)  # make sure the LED is off after every round
            key_state = 0
_thread.start_new_thread(func_key, ("1",))  # start the key thread; args must be a tuple
_thread.start_new_thread(func_conversation, ("2",))  # start the conversation thread; args must be a tuple
# Keep the main thread alive so the worker threads keep running; sleep rather
# than spin so it does not compete with them for the interpreter.
while True:
    time.sleep(1)
9、实验效果
语音识别和文心一言的回答都会保存相应的.txt文档,如下所示: