当前位置:AIGC资讯 > AIGC > 正文

AIGC - Stable Diffusion 搭建【从零到一】

AIGC - Stable Diffusion 搭建

文章目录

AIGC - Stable Diffusion 搭建 1. Huggingface Token 2. Cloud GPU 3. autocover.txt 4. userinput_write_pyfile.py: 5. SSH API 6. ffmpeg 7. Piano Audio To Midi 8. Spleeter

1. Huggingface Token

url = ''
1. 注册获取账号
2. 生成获取用户项目token

2. Cloud GPU

url = '' # 可租用矩池云
1. 注册获取账号
2. 选择GPU档次及预装型号(建议huggingface,Pytorch)
3. huggingface-cli login  # Cloud服务器通过token验证 | 下载模型 
4. JupyterLab 
   pip3 install diffusers 
   pip3 install accelerate
   pip3 install spacy ftfy==4.4.3
   

3. autocover.txt

import diffusers, torch, os
from time import strftime, localtime

# NOTE: keep the three lines above exactly as they are. `prompt` is not defined
# in this template; userinput_write_pyfile() inserts a `prompt = ...`
# assignment immediately before line 3 when it generates autocover.py
# (presumably from this file -- confirm against the caller's upload paths).
model_id = "CompVis/stable-diffusion-v1-4"
device = torch.device("cuda")
# use_auth_token=True: presumably picks up the token stored by
# `huggingface-cli login` -- confirm against the installed diffusers version.
pipe = diffusers.StableDiffusionPipeline.from_pretrained(model_id, use_auth_token=True)
pipe = pipe.to(device)
image = pipe(prompt).images[0]
timeprint = strftime("%Y%m%d%H%M%S", localtime())

fldpath = os.path.join('/mnt/', 'xxx/img')
# makedirs also creates the missing parent ('/mnt/xxx'), which a plain
# os.mkdir would fail on, and does not race with a concurrent creation.
os.makedirs(fldpath, exist_ok=True)
img_filenm = f'{timeprint}.png'
image.save(os.path.join(fldpath, img_filenm))
# The file name is printed on stdout; SSH_connection's run_pyfile() takes the
# first stdout line as the name of the file to download.
print(img_filenm)

4. userinput_write_pyfile.py:

def userinput_write_pyfile(user_input,local_put_path):
    """Generate ``autocover.py`` from a template, injecting the user's prompt.

    The template at ``local_put_path`` is copied line by line and a
    ``prompt = <literal>`` assignment is inserted just before its third line.
    A template with fewer than three lines is copied unchanged (no prompt is
    inserted).

    :param user_input: text to expose to the generated script as ``prompt``
    :param local_put_path: path of the local template file
    :return: path of the generated file,
        ``~/Desktop/SSH/upload/autocover.py``
    """
    import os

    # repr() produces a valid Python string literal whatever the text contains
    # (quotes, backslashes, newlines), so user input cannot break the script.
    # The trailing newline keeps the assignment on a line of its own even when
    # the template's third line is not blank.
    prompt_line = f'prompt = {str(user_input)!r}\n'

    lines = []
    with open(local_put_path, 'r', encoding='utf-8') as template:
        for index, line in enumerate(template, start=1):
            if index == 3:
                lines.append(prompt_line)
            lines.append(line)

    usr_path = os.path.join(os.path.expanduser('~'), 'Desktop/SSH/upload')
    os.makedirs(usr_path, exist_ok=True)  # first run: the folder may not exist yet
    pyfilepath = os.path.join(usr_path, 'autocover.py')
    # UTF-8 explicitly: the prompt may contain non-ASCII text and the script is
    # executed by python3 on another machine.
    with open(pyfilepath, 'w', encoding='utf-8') as pf:
        pf.writelines(lines)
    return pyfilepath

5. SSH API

# -*- coding:utf-8 -*-
def get_SSH_account():
    """Return the SSH login details as ``[host, port, password, user]``.

    All four values are blank placeholders in this listing.
    """
    login = {
        "host": "",        # IP address of the remote server
        "port": 0,         # SSH port of the remote server
        "password": "",    # password on the remote server
        "user": "",        # user name on the remote server
    }
    # The order matters: SSH_connection unpacks it as (host, port, password, user).
    return [login[field] for field in ("host", "port", "password", "user")]
    
def SSH_connection(user_input,accounts,paths,write_py, specail_fname,is_DownloadALL,filter_fname):
    """Upload files over SSH, run a remote Python script and download its output.

    Steps, in order: connect, upload every entry of ``local_put_paths`` (the
    first entry is generated by ``write_py``), run ``pyfilepath`` on the remote
    host, download from ``remote_get_path``, close the connection. The
    connection is closed even when one of the steps raises.

    :param user_input: text forwarded to ``write_py`` for the first upload
    :param accounts: ``(host, port, password, user)``
    :param paths: ``(remote_get_path, remote_put_paths, local_get_path,
        local_put_paths, pyfilepath, python_path)``; ``remote_put_paths`` is
        indexed in parallel with ``local_put_paths``
    :param write_py: callable ``(user_input, local_put_path)`` returning the
        local path of the script to upload
    :param specail_fname: remote file name given to every upload after the first
    :param is_DownloadALL: when true, download every file in ``remote_get_path``
    :param filter_fname: when non-empty (and not downloading everything),
        download the first remote file whose name contains it
    :return: ``os.path.join(local_get_path, <file name from the download step>)``
    """
    import paramiko,os,time
    import posixpath  # remote paths are POSIX paths regardless of the local OS
    (host, port, password, user) = accounts
    (remote_get_path, remote_put_paths, local_get_path, local_put_paths, pyfilepath, python_path) = paths

    class Pysftp(object):
        """
        Thin paramiko wrapper: upload files, run a script, download results.

        The path settings (remote_get_path, local_get_path, python_path, ...)
        are read from the enclosing SSH_connection call.
        """

        def __init__(self, host, port, user, password):
            self.host = host
            self.port = port
            self.user = user
            self.password = password
            self.ssh = None
            self._sftp_session = None

        def connect(self):
            """
            Open the SSH connection.
            """
            self.ssh = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification; prefer known_hosts checking outside trusted networks.
            self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh.connect(self.host, self.port, self.user, self.password)
            print("[SSH] -> 连接成功")

        def _sftp(self):
            """
            Return the shared SFTP session, opening it on first use.

            One session is reused for every transfer and stat() call instead
            of opening (and never closing) a new channel each time.
            """
            if self._sftp_session is None:
                self._sftp_session = paramiko.SFTPClient.from_transport(self.ssh.get_transport())
            return self._sftp_session

        def run_pyfile(self,pyfilepath):
            '''
            Run a Python file on the remote host.

            :param pyfilepath: path of the script on the remote host
            :return: first line the script printed on stdout, without the newline
            :raises IndexError: when the script printed nothing on stdout
            '''
            cmd = f"export PATH={python_path}:$PATH;python3 -u {pyfilepath}"
            stdin, stdout, stderr = self.ssh.exec_command(cmd)
            # stderr is drained first, so anything the script reports there is
            # echoed before its stdout is read.
            for line in stderr :
                print(line.rstrip())
            out_lines = stdout.readlines()
            if not out_lines:
                # Same exception type the bare [0] lookup raised, with a reason.
                raise IndexError("remote script printed nothing on stdout; see the stderr output above")
            flnm = out_lines[0].replace('\n', '')
            print("[SSH] ->生成成功")
            return flnm

        def put(self,user_input,local_put_path,remote_put_path,path_i):
            """
            Upload one file.

            :param user_input: text passed to write_py when path_i == 0
            :param local_put_path: local file (template for write_py when path_i == 0)
            :param remote_put_path: remote directory to upload into
            :param path_i: position in the upload list; 0 marks the generated script
            """
            if path_i == 0:
                # First entry: generate the script locally and keep its own name.
                local_full_name = write_py(user_input,local_put_path)
                fname = os.path.basename(local_full_name)
            else:
                local_full_name = local_put_path
                fname = specail_fname
            self._sftp().put(local_full_name, posixpath.join(remote_put_path, fname))
            print(f'[{fname}] -> 上传成功')

        def _download(self, fname, stamped_suffix=None):
            """
            Download remote_get_path/fname into local_get_path.

            :param fname: file name inside remote_get_path
            :param stamped_suffix: when given, the local file is named
                <YYYYmmddHHMMSS><stamped_suffix> instead of fname; the
                timestamp is taken after the remote file stopped growing
            :return: the local file name used
            """
            print(f'[{fname}] -> 正在下载')
            remote_full_name = posixpath.join(remote_get_path, fname)
            self.remote_done_transffer(remote_full_name)
            local_name = fname
            if stamped_suffix is not None:
                timeprint = time.strftime("%Y%m%d%H%M%S", time.localtime())
                local_name = f'{timeprint}{stamped_suffix}'
            self._sftp().get(remote_full_name, os.path.join(local_get_path, local_name))
            print(f'[{fname}] -> 下载成功')
            return local_name

        def get(self,flnm,is_DownloadALL,filter_fname):
            """
            Download files from remote_get_path.

            Three modes: everything (is_DownloadALL), the first file whose name
            contains filter_fname, or the first file whose name contains flnm.

            :return: flnm, replaced by the timestamped local name when a
                filter_fname match was downloaded
            """
            remote_names = self._sftp().listdir(remote_get_path)  # listed once
            total = len(remote_names)
            # flidx is 1-based, so the last entry is flidx == total (comparing
            # against total - 1 reported "not found" one entry too early).
            for flidx, fname in enumerate(remote_names, start=1):
                try:
                    if is_DownloadALL:
                        self._download(fname)
                    elif len(filter_fname) > 0:
                        if filter_fname in fname:
                            flnm = self._download(fname, stamped_suffix=filter_fname)
                            break
                        print(f'[{fname}] -> [{filter_fname}]')
                        if flidx == total:
                            print(f'[{fname}] -> [{filter_fname}] 无法找寻A')
                    elif flnm in fname:
                        self._download(fname)
                        break
                    elif flidx == total:
                        print(f'[{fname}] -> [{flnm}] 无法找寻B')
                except Exception as e:
                    # Best effort: report the failure and try the next entry.
                    print(e)
            return flnm

        def stat(self, fpath):
            """
            Return the stat result of a file on the remote server.
            :param fpath: absolute path of the remote file
            """
            return self._sftp().stat(fpath)

        def remote_done_transffer(self, fpath):
            """
            Block until the remote file has stopped growing.
            :param fpath: absolute path of the remote file about to be downloaded
            """
            while True:
                old_size = self.stat(fpath).st_size
                time.sleep(3)
                new_size = self.stat(fpath).st_size
                if new_size <= old_size: # no growth in 3 s: treated as complete
                    return

        def close(self):
            """
            Close the SFTP session (if one was opened) and the SSH connection.
            """
            if self._sftp_session is not None:
                self._sftp_session.close()
                self._sftp_session = None
            if self.ssh is not None:
                self.ssh.close()
            print("[SSH] -> 连接关闭")

        def local_done_write(self, fpath,old_size):
            """
            Report whether the size of a local path grew past old_size.
            :param fpath: local path to check
            :param old_size: st_size recorded before the transfer
            """
            # NOTE(review): the caller passes the download directory, whose
            # st_size need not change when a file is added -- the message is
            # informational only.
            new_size = os.stat(fpath).st_size
            if new_size > old_size:
                print('[SSH] -> ? 获取成功')
                return
            else:
                print('[SSH] -> 没有新的写入')

    def transffer(local_get_path):
        """
        Run the whole upload / execute / download sequence.

        :return: file name reported by the download step
        """
        old_size = os.stat(local_get_path).st_size
        obj = Pysftp(host, port, user, password)
        try:
            obj.connect()                                   # open the SSH connection
            for path_i, local_put_path in enumerate(local_put_paths):  # script first, then attachments
                remote_put_path = remote_put_paths[path_i]
                obj.put(user_input,local_put_path,remote_put_path,path_i)
            flnm = obj.run_pyfile(pyfilepath)               # run the uploaded script
            flnm = obj.get(flnm,is_DownloadALL,filter_fname)# download the result
            obj.local_done_write(local_get_path,old_size)   # report on the download
        finally:
            obj.close()                                     # always release the connection
        return flnm
    flnm = transffer(local_get_path)
    flpath = os.path.join(local_get_path,flnm)
    return flpath

6. ffmpeg

解决在 GPU 服务器上处理音频时由 ffmpeg 引起的问题(audioread 找不到 ffmpeg 可执行文件)

# conda terminal install
conda config --add channels conda-forge
conda install ffmpeg
pip3 install ffmpy

# 找到ffmpeg安装的位置
which ffmpeg

# 找到 ffdec.py 文件(目录:/root/miniconda3/envs/myconda/lib/python3.8/site-packages/audioread/),修改第 33 行:把 COMMANDS 中的 'ffmpeg' 换成上一步 which 得到的绝对路径(ori 为修改前,new 为修改后)
ori_COMMANDS = ('ffmpeg', 'avconv')
new_COMMANDS = ('/root/miniconda3/envs/myconda/bin/ffmpeg', 'avconv')

7. Piano Audio To Midi

# pip3 install 
pip3 install piano_transcription_inference

# 将训练集放置在指定位置

8. Spleeter

write_py

# pip3 install 
# pip3 install Spleeter
# pip3 install pydub
# 将训练集放置在指定位置

def auto_spleeter_pywrite(userinput,local_put_path):
    """Generate ``audio_spleeter.py`` from a template, prepending the path settings.

    Two assignments (``song_convert_path`` and ``bounce_path``) are written
    ahead of the template's content, and the local ``~/Desktop/SSH``,
    ``upload`` and ``download`` folders are created when missing.

    :param userinput: unused; kept so the signature matches the other
        ``write_py`` callbacks, which are called as
        ``write_py(user_input, local_put_path)``
    :param local_put_path: path of the local template file
    :return: path of the generated file,
        ``~/Desktop/SSH/upload/audio_spleeter.py``
    """
    import os

    header = [
        'song_convert_path = "/mnt/aigc_music/spleeter/source/song/"\n',
        'bounce_path = "/mnt/aigc_music/spleeter/bounce"\n',
    ]
    with open(local_put_path, 'r', encoding='utf-8') as template:
        body = template.readlines()

    mainpath = os.path.join(os.path.expanduser('~'), 'Desktop/SSH')
    usr_path = os.path.join(os.path.expanduser('~'), 'Desktop/SSH/upload')
    downloadpath = os.path.join(os.path.expanduser('~'), 'Desktop/SSH/download')
    for path in (mainpath, usr_path, downloadpath):
        # makedirs: also works when ~/Desktop itself does not exist yet.
        os.makedirs(path, exist_ok=True)

    pyfilepath = os.path.join(usr_path, 'audio_spleeter.py')
    with open(pyfilepath, 'w', encoding='utf-8') as pf:
        pf.writelines(header + body)
    return pyfilepath

audio_spleeter.txt

import os,subprocess,spleeter
# song_convert_path and bounce_path are not defined in this template; they are
# prepended as the first two lines by auto_spleeter_pywrite().
audio_path  = os.path.dirname(song_convert_path)  + '/'
result_path = bounce_path

# Run spleeter once per .mp3 in audio_path. Arguments are passed as a list
# (no shell), so file names containing spaces or shell metacharacters are
# handled correctly. Hidden files are skipped, as a shell `*.mp3` glob would.
for mp3_name in sorted(os.listdir(audio_path)):
    if mp3_name.startswith('.') or not mp3_name.endswith('.mp3'):
        continue
    subprocess.call([
        'spleeter', 'separate',
        '-o', result_path,
        '-p', 'spleeter:5stems-16kHz',
        audio_path + mp3_name,
        '-B', 'tensorflow',
    ])

# 压缩音频文件
def wav_to_mp3():
    """Convert every ``.wav`` file in ``<bounce_path>/temp_split`` to a 320k MP3.

    Each MP3 is written next to its source file, with only the extension
    changed. ``bounce_path`` is a module-level name defined outside this
    function.
    """
    from pydub import AudioSegment
    # The directory that is listed is also the one the names are joined onto;
    # listing one folder and joining onto another produced paths that do not
    # exist whenever the two differed.
    # NOTE(review): 'temp_split' is presumably the base name of the uploaded
    # song (spleeter writes one sub-folder per input file) -- confirm.
    stem_dir = os.path.join(bounce_path, 'temp_split')
    for name in os.listdir(stem_dir):
        if not name.endswith('.wav'):
            continue
        wav_path = os.path.join(stem_dir, name)
        sound = AudioSegment.from_wav(wav_path)
        # Swap only the extension; str.replace would also rewrite a '.wav'
        # occurring elsewhere in the path.
        mp3_flnm = os.path.splitext(wav_path)[0] + '.mp3'
        sound.export(mp3_flnm, format='mp3', bitrate='320k')
wav_to_mp3()

# zip打包分轨
def get_zip_filepath():
    """Pack every ``.mp3`` in ``<bounce_path>/temp_split`` into ``zip_temp.zip``.

    The archive is written into that same folder and stores the files under
    their bare names (no directory components). ``bounce_path`` is a
    module-level name defined outside this function.

    :return: path of the archive that was written
    """
    import zipfile

    # One folder, one archive path: the value returned is the file actually
    # written.
    stem_dir = os.path.join(bounce_path, 'temp_split')
    zip_path = os.path.join(stem_dir, 'zip_temp.zip')
    mp3_files = [os.path.join(stem_dir, i) for i in os.listdir(stem_dir) if i.endswith('.mp3')]

    with zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
        for file in mp3_files:
            zf.write(file, arcname=os.path.basename(file))
    return zip_path
zip_path = get_zip_filepath()
# Printed on stdout -- presumably so the SSH caller (run_pyfile) can pick up
# the archive path; confirm that nothing else prints to stdout before this.
print(zip_path)

更新时间 2023-11-22