AIGC - Stable Diffusion 搭建
文章目录
AIGC - Stable Diffusion 搭建 1. Huggingface Token 2. Cloud GPU 3. autocover.txt 4. userinput_write_pyfile.py: 5. SSH API 6. ffmpeg 7. Piano Audio To Midi 8. Spleeter
1. Huggingface Token
url = ''
1. 注册获取账号
2. 生成获取用户项目token
2. Cloud GPU
url = '' # 可租用矩池云
1. 注册获取账号
2. 选择GPU档次及预装型号(建议huggingface,Pytorch)
3. huggingface-cli login # Cloud服务器通过token验证 | 下载模型
4. JupyterLab
pip3 install diffusers
pip3 install accelerate
pip3 install spacy ftfy==4.4.3
3. autocover.txt
import diffusers, torch, os
from time import strftime, localtime
# NOTE: `prompt` is not assigned in this template. userinput_write_pyfile inserts
# a `prompt = "..."` assignment before the third line of this file, so the two
# import lines above must stay on exactly two lines.

# Load the Stable Diffusion pipeline onto the GPU.
model_id = "CompVis/stable-diffusion-v1-4"
device = torch.device("cuda")
# presumably use_auth_token=True reuses the token stored by `huggingface-cli login` -- confirm
pipe = diffusers.StableDiffusionPipeline.from_pretrained(model_id, use_auth_token=True)
pipe = pipe.to(device)

# Generate a single image for the prompt.
image = pipe(prompt).images[0]

# Save the image under a timestamped file name.
timeprint = strftime("%Y%m%d%H%M%S", localtime())
fldpath = os.path.join('/mnt/', 'xxx/img')
# makedirs creates the intermediate 'xxx' directory as well, and does not fail
# when the folder already exists.
os.makedirs(fldpath, exist_ok=True)
img_filenm = f'{timeprint}.png'
image.save(os.path.join(fldpath, img_filenm))

# Print the file name of the image that was just saved.
print(img_filenm)
4. userinput_write_pyfile.py:
def userinput_write_pyfile(user_input,local_put_path):
    """Build the ``autocover.py`` script from a template and the user's text.

    The template at ``local_put_path`` is copied line by line and a
    ``prompt = "<user_input>"`` assignment is inserted immediately before the
    template's third line. The result is written to
    ``~/Desktop/SSH/upload/autocover.py``.

    :param user_input: text to embed as the value of ``prompt``
    :param local_put_path: path of the local template file
    :return: path of the generated ``autocover.py``
    """
    import json
    import os

    # json.dumps produces a double-quoted literal with quotes, backslashes and
    # control characters escaped, so arbitrary user text cannot break out of the
    # string in the generated script. The trailing newline keeps the assignment
    # on its own line instead of fusing it with the template's third line.
    prompt_line = f'prompt = {json.dumps(str(user_input), ensure_ascii=False)}\n'

    lines = []
    with open(local_put_path, 'r', encoding='utf-8') as template:
        for index, line in enumerate(template, start=1):
            if index == 3:
                lines.append(prompt_line)
            lines.append(line)

    usr_path = os.path.join(os.path.expanduser('~'), 'Desktop/SSH/upload')
    # Create the upload folder on first use.
    os.makedirs(usr_path, exist_ok=True)
    pyfilepath = os.path.join(usr_path, 'autocover.py')
    with open(pyfilepath, 'w', encoding='utf-8') as pf:
        pf.writelines(lines)
    return pyfilepath
5. SSH API
# -*- coding:utf-8 -*-
def get_SSH_account():
    """Return the SSH login details as ``[host, port, password, user]``.

    Every value here is an empty placeholder.
    """
    return [
        "",  # host: remote server IP
        0,   # port: remote server port
        "",  # password: remote server password
        "",  # user: remote server user name
    ]
def SSH_connection(user_input,accounts,paths,write_py, specail_fname,is_DownloadALL,filter_fname):
    """Upload files over SSH, run a Python script remotely and download its output.

    Steps, in order: connect, upload every entry of ``local_put_paths`` (the
    first one is generated by ``write_py``), run ``pyfilepath`` on the server,
    download from ``remote_get_path``, then close the connection.

    :param user_input: text handed to ``write_py`` when generating the script
    :param accounts: ``(host, port, password, user)``
    :param paths: ``(remote_get_path, remote_put_paths, local_get_path,
        local_put_paths, pyfilepath, python_path)``
    :param write_py: callable ``write_py(user_input, local_put_path)`` that
        returns the path of the generated script to upload
    :param specail_fname: remote file name used for every upload after the first
    :param is_DownloadALL: when true, download every file in ``remote_get_path``
    :param filter_fname: when non-empty (and ``is_DownloadALL`` is false),
        download the first remote file whose name contains this text and save
        it locally as ``<timestamp><filter_fname>``
    :return: ``os.path.join(local_get_path, <file name>)``
    :raises IndexError: if the remote script prints nothing to stdout
    """
    import os
    import posixpath
    import time

    import paramiko

    (host, port, password, user) = accounts
    (remote_get_path, remote_put_paths, local_get_path, local_put_paths, pyfilepath, python_path) = paths

    class Pysftp(object):
        """One SSH connection plus a single, lazily opened SFTP session."""

        def __init__(self, host, port, user, password):
            self.host = host
            self.port = port
            self.user = user
            self.password = password
            self.ssh = None
            self._sftp = None

        def connect(self):
            """Open the SSH connection."""
            self.ssh = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification; review before using on an untrusted network.
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh.connect(self.host, self.port, self.user, self.password)
            print("[SSH] -> 连接成功")

        def _sftp_session(self):
            """Return the shared SFTP session, opening it on first use."""
            if self._sftp is None:
                self._sftp = paramiko.SFTPClient.from_transport(self.ssh.get_transport())
            return self._sftp

        def run_pyfile(self, pyfilepath):
            """Run the Python file remotely and return its first stdout line."""
            cmd = f"export PATH={python_path}:$PATH;python3 -u {pyfilepath}"
            stdin, stdout, stderr = self.ssh.exec_command(cmd)
            # Echo the remote stderr locally.
            for line in stderr:
                print(line.rstrip())
            out_lines = stdout.readlines()
            if not out_lines:
                # Same exception type the bare [0] lookup raised, but with a
                # message that says what actually went wrong.
                raise IndexError(
                    f"remote script {pyfilepath} printed nothing to stdout"
                )
            flnm = out_lines[0].replace('\n', '')
            print("[SSH] ->生成成功")
            return flnm

        def put(self, user_input, local_put_path, remote_put_path, path_i):
            """Upload one file; the first upload (``path_i == 0``) is generated by ``write_py``."""
            sftp = self._sftp_session()
            if path_i == 0:
                local_full_name = write_py(user_input, local_put_path)  # ♻️
                fname = os.path.basename(local_full_name)
            else:
                local_full_name = local_put_path
                fname = specail_fname
            # Remote paths are joined with posixpath so the separator is '/'
            # regardless of the local operating system.
            sftp.put(local_full_name, posixpath.join(remote_put_path, fname))
            print(f'[{fname}] -> 上传成功')

        def _download(self, fname, local_name):
            """Wait until the remote file stops growing, then fetch it."""
            print(f'[{fname}] -> 正在下载')
            remote_full_name = posixpath.join(remote_get_path, fname)
            self.remote_done_transffer(remote_full_name)
            self._sftp_session().get(remote_full_name, os.path.join(local_get_path, local_name))
            print(f'[{fname}] -> 下载成功')

        def get(self, flnm, is_DownloadALL, filter_fname):
            """Download from ``remote_get_path`` and return the resulting file name.

            Returns ``flnm`` unchanged, except when a ``filter_fname`` match was
            downloaded, in which case the new local name is returned.
            """
            # List the remote directory once instead of once per file.
            remote_names = self._sftp_session().listdir(remote_get_path)
            found = False
            last_seen = ''
            for fname in remote_names:
                last_seen = fname
                try:
                    if is_DownloadALL:
                        self._download(fname, fname)
                    elif filter_fname:
                        if filter_fname in fname:
                            timeprint = time.strftime("%Y%m%d%H%M%S", time.localtime())
                            nfname = f'{timeprint}{filter_fname}'
                            self._download(fname, nfname)
                            flnm = nfname
                            found = True
                            break
                        print(f'[{fname}] -> [{filter_fname}]')
                    elif flnm in fname:
                        self._download(fname, fname)
                        found = True
                        break
                except Exception as e:
                    # Best effort: report the failure and move on to the next file.
                    print(e)
            # Report "not found" once, after every file has been examined.
            if not is_DownloadALL and not found:
                if filter_fname:
                    print(f'[{last_seen}] -> [{filter_fname}] 无法找寻A')
                else:
                    print(f'[{last_seen}] -> [{flnm}] 无法找寻B')
            return flnm

        def stat(self, fpath):
            """Return the SFTP ``stat`` result for a remote file.

            :param fpath: absolute path of the file on the remote server
            """
            return self._sftp_session().stat(fpath)

        def remote_done_transffer(self, fpath):
            """Block until the remote file's size stops growing.

            :param fpath: absolute path of the remote file about to be downloaded
            """
            while True:
                old_size = self.stat(fpath).st_size
                time.sleep(3)
                new_size = self.stat(fpath).st_size
                if new_size <= old_size:  # no growth over the interval
                    return

        def close(self):
            """Close the SFTP session (if one was opened) and the SSH connection."""
            if self._sftp is not None:
                self._sftp.close()
                self._sftp = None
            if self.ssh is not None:
                self.ssh.close()
            print("[SSH] -> 连接关闭")

        def local_done_write(self, fpath, old_size):
            """Report whether ``fpath`` is larger than ``old_size``.

            :param fpath: local path whose current size is checked
            :param old_size: size recorded before the transfer started
            """
            # NOTE(review): the caller passes the download directory here;
            # whether a directory's st_size grows when a file is added depends
            # on the file system -- confirm this check is meaningful.
            new_size = os.stat(fpath).st_size
            if new_size > old_size:
                print('[SSH] -> ? 获取成功')
                return
            else:
                print('[SSH] -> 没有新的写入')

    def transffer(local_get_path):
        """Run the whole upload / execute / download sequence once."""
        old_size = os.stat(local_get_path).st_size
        obj = Pysftp(host, port, user, password)
        obj.connect()  # open the SSH connection
        try:
            # Upload the generated script first, then any other attachments.
            for path_i, local_put_path in enumerate(local_put_paths):
                remote_put_path = remote_put_paths[path_i]
                obj.put(user_input, local_put_path, remote_put_path, path_i)
            flnm = obj.run_pyfile(pyfilepath)  # run the script remotely
            flnm = obj.get(flnm, is_DownloadALL, filter_fname)  # download
            obj.local_done_write(local_get_path, old_size)  # post-download check
        finally:
            # Always release the connection, even if a step above failed.
            obj.close()
        return flnm

    flnm = transffer(local_get_path)
    flpath = os.path.join(local_get_path, flnm)
    return flpath
6. ffmpeg
解决在 GPU 服务器上处理音频时由 ffmpeg 引起的问题
# conda terminal install
conda config --add channels conda-forge
conda install ffmpeg
pip3 install ffmpy
# 找到ffmpeg安装的位置
which ffmpeg
# 寻找ffdec.py文件并修改33行 【/root/miniconda3/envs/myconda/lib/python3.8/site-packages/audioread/】
ori_COMMANDS = ('ffmpeg', 'avconv')
new_COMMANDS = ('/root/miniconda3/envs/myconda/bin/ffmpeg', 'avconv')
7. Piano Audio To Midi
# pip3 install
pip3 install piano_transcription_inference
# 将训练集放置在指定位置
8. Spleeter
write_py
# pip3 install
# pip3 install Spleeter
# pip3 install pydub
# 将训练集放置在指定位置
def auto_spleeter_pywrite(userinput,local_put_path):
    """Build ``audio_spleeter.py`` from a template, prefixed with two path assignments.

    The ``song_convert_path`` and ``bounce_path`` assignments are written
    ahead of the template's first line. The folders ``~/Desktop/SSH``,
    ``~/Desktop/SSH/upload`` and ``~/Desktop/SSH/download`` are created if
    missing, and the script is written into the ``upload`` folder.

    :param userinput: not used by this function
    :param local_put_path: path of the local template file
    :return: path of the generated ``audio_spleeter.py``
    """
    import os

    with open(local_put_path, 'r', encoding='utf-8') as template:
        template_lines = template.readlines()

    lines = []
    if template_lines:
        # The header is only emitted when the template has at least one line.
        lines.append('song_convert_path = "/mnt/aigc_music/spleeter/source/song/"')
        lines.append('\n')
        lines.append('bounce_path = "/mnt/aigc_music/spleeter/bounce"')
        lines.append('\n')
        lines.extend(template_lines)

    home = os.path.expanduser('~')
    mainpath = os.path.join(home, 'Desktop/SSH')
    usr_path = os.path.join(home, 'Desktop/SSH/upload')
    downloadpath = os.path.join(home, 'Desktop/SSH/download')
    for path in (mainpath, usr_path, downloadpath):
        # makedirs also creates a missing ~/Desktop and tolerates existing folders.
        os.makedirs(path, exist_ok=True)

    pyfilepath = os.path.join(usr_path, 'audio_spleeter.py')
    with open(pyfilepath, 'w', encoding='utf-8') as pf:
        pf.writelines(lines)
    return pyfilepath
audio_spleeter.txt
import os,subprocess,spleeter
import glob

# `song_convert_path` and `bounce_path` are not assigned in this template; they
# must be defined above this point before the script runs.
audio_path = os.path.dirname(song_convert_path) + '/'
result_path = bounce_path

# Run spleeter once per MP3 in the source folder. Arguments are passed as a
# list (no shell), so file names containing spaces or shell metacharacters are
# handled correctly.
for mp3_path in sorted(glob.glob(audio_path + '*.mp3')):
    subprocess.call([
        'spleeter', 'separate',
        '-o', result_path,
        '-p', 'spleeter:5stems-16kHz',
        mp3_path,
        '-B', 'tensorflow',
    ])
# Convert the separated WAV files to MP3.
def wav_to_mp3():
    """Export every ``.wav`` file in ``<bounce_path>/temp_split`` as a 320k MP3.

    Each MP3 is written next to its source file, with the ``.wav`` suffix
    replaced by ``.mp3``.
    """
    from pydub import AudioSegment

    # The same folder is used both for listing and for building full paths, so
    # the result no longer depends on which entry os.listdir(bounce_path)
    # happens to return first.
    stem_dir = os.path.join(bounce_path, 'temp_split')
    for name in os.listdir(stem_dir):
        if not name.endswith('.wav'):
            continue
        wav_path = os.path.join(stem_dir, name)
        sound = AudioSegment.from_wav(wav_path)
        # Swap only the trailing extension, not every '.wav' in the path.
        mp3_flnm = wav_path[:-len('.wav')] + '.mp3'
        sound.export(mp3_flnm, format='mp3', bitrate='320k')


wav_to_mp3()
# Pack the MP3 files into a single zip archive.
def get_zip_filepath():
    """Zip every ``.mp3`` in ``<bounce_path>/temp_split`` and return the archive path.

    The archive is written to ``<bounce_path>/temp_split/zip_temp.zip``; each
    file is stored under its base name only (no directory components).

    :return: path of the zip archive that was written
    """
    import zipfile

    stem_dir = os.path.join(bounce_path, 'temp_split')
    # One path is used both for writing and for the return value, so the
    # caller always gets the location of the file that was actually written.
    zip_path = os.path.join(stem_dir, 'zip_temp.zip')
    mp3_names = [name for name in os.listdir(stem_dir) if name.endswith('.mp3')]

    with zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
        for name in mp3_names:
            zf.write(os.path.join(stem_dir, name), arcname=name)
    return zip_path


zip_path = get_zip_filepath()  # ♻️
print(zip_path)