commit
bd6080c52e
4
client/demo/dist/index.js
vendored
4
client/demo/dist/index.js
vendored
File diff suppressed because one or more lines are too long
@ -28,7 +28,7 @@ export class ServerConfigurator {
|
||||
}
|
||||
|
||||
updateSettings = async (key: ServerSettingKey, val: string) => {
|
||||
const url = this.serverUrl + "/update_setteings"
|
||||
const url = this.serverUrl + "/update_settings"
|
||||
const info = await new Promise<ServerInfo>(async (resolve) => {
|
||||
const formData = new FormData();
|
||||
formData.append("key", key);
|
||||
@ -132,4 +132,4 @@ export class ServerConfigurator {
|
||||
return await info
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,10 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Literal, TypeAlias
|
||||
|
||||
|
||||
ModelType: TypeAlias = Literal['MMVCv15', 'MMVCv13', 'so-vits-svc-40v2', 'so-vits-svc-40', 'so-vits-svc-40_c', 'DDSP-SVC', 'RVC']
|
||||
|
||||
ERROR_NO_ONNX_SESSION = "ERROR_NO_ONNX_SESSION"
|
||||
|
||||
@ -22,14 +26,14 @@ TMP_DIR = os.path.join(tmpdir.name, "tmp_dir") if hasattr(sys, "_MEIPASS") else
|
||||
os.makedirs(TMP_DIR, exist_ok=True)
|
||||
|
||||
|
||||
modelType = "MMVCv15"
|
||||
modelType: ModelType = "MMVCv15"
|
||||
|
||||
|
||||
def getModelType():
|
||||
def getModelType() -> ModelType:
|
||||
return modelType
|
||||
|
||||
|
||||
def setModelType(_modelType: str):
|
||||
def setModelType(_modelType: ModelType):
|
||||
global modelType
|
||||
modelType = _modelType
|
||||
|
||||
|
@ -21,7 +21,7 @@ class MMVC_Rest_Fileuploader:
|
||||
self.router.add_api_route("/info", self.get_info, methods=["GET"])
|
||||
self.router.add_api_route("/upload_file", self.post_upload_file, methods=["POST"])
|
||||
self.router.add_api_route("/concat_uploaded_file", self.post_concat_uploaded_file, methods=["POST"])
|
||||
self.router.add_api_route("/update_setteings", self.post_update_setteings, methods=["POST"])
|
||||
self.router.add_api_route("/update_settings", self.post_update_settings, methods=["POST"])
|
||||
self.router.add_api_route("/load_model", self.post_load_model, methods=["POST"])
|
||||
self.router.add_api_route("/load_model_for_train", self.post_load_model_for_train, methods=["POST"])
|
||||
self.router.add_api_route("/extract_voices", self.post_extract_voices, methods=["POST"])
|
||||
@ -43,9 +43,9 @@ class MMVC_Rest_Fileuploader:
|
||||
json_compatible_item_data = jsonable_encoder(info)
|
||||
return JSONResponse(content=json_compatible_item_data)
|
||||
|
||||
def post_update_setteings(self, key: str = Form(...), val: Union[int, str, float] = Form(...)):
|
||||
print("post_update_setteings", key, val)
|
||||
info = self.voiceChangerManager.update_setteings(key, val)
|
||||
def post_update_settings(self, key: str = Form(...), val: Union[int, str, float] = Form(...)):
|
||||
print("post_update_settings", key, val)
|
||||
info = self.voiceChangerManager.update_settings(key, val)
|
||||
json_compatible_item_data = jsonable_encoder(info)
|
||||
return JSONResponse(content=json_compatible_item_data)
|
||||
|
||||
|
@ -113,7 +113,7 @@ class DDSP_SVC:
|
||||
self.enhancer = Enhancer(self.args.enhancer.type, "./model_DDSP-SVC/enhancer/model", "cpu")
|
||||
return self.get_info()
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if key == "onnxExecutionProvider" and self.onnx_session != None:
|
||||
if val == "CUDAExecutionProvider":
|
||||
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
|
||||
|
@ -90,7 +90,7 @@ class DDSP_SVC:
|
||||
|
||||
return self.get_info()
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if key == "onnxExecutionProvider" and self.onnx_session != None:
|
||||
if val == "CUDAExecutionProvider":
|
||||
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
|
||||
|
@ -79,7 +79,7 @@ class MMVCv13:
|
||||
)
|
||||
return self.get_info()
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if key == "onnxExecutionProvider" and self.onnx_session != None:
|
||||
if val == "CUDAExecutionProvider":
|
||||
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
|
||||
|
@ -103,7 +103,7 @@ class MMVCv15:
|
||||
self.onxx_input_length = i.shape[2]
|
||||
return self.get_info()
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if key == "onnxExecutionProvider" and self.settings.onnxModelFile != "": # self.onnx_session != None:
|
||||
if val == "CUDAExecutionProvider":
|
||||
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
|
||||
|
@ -115,7 +115,7 @@ class RVC:
|
||||
self.onnx_session = ModelWrapper(onnx_model_file, is_half=self.is_half)
|
||||
return self.get_info()
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if key == "onnxExecutionProvider" and self.onnx_session != None:
|
||||
if val == "CUDAExecutionProvider":
|
||||
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
|
||||
|
@ -139,7 +139,7 @@ class SoVitsSvc40:
|
||||
# print("output", i)
|
||||
return self.get_info()
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if key == "onnxExecutionProvider" and self.onnx_session != None:
|
||||
if val == "CUDAExecutionProvider":
|
||||
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
|
||||
|
@ -123,7 +123,7 @@ class SoVitsSvc40v2:
|
||||
input_info = self.onnx_session.get_inputs()
|
||||
return self.get_info()
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if key == "onnxExecutionProvider" and self.onnx_session != None:
|
||||
if val == "CUDAExecutionProvider":
|
||||
if self.settings.gpu < 0 or self.settings.gpu >= self.gpu_num:
|
||||
|
@ -1,3 +1,4 @@
|
||||
from typing import Any, Callable, Optional, Protocol, TypeAlias, Union, cast
|
||||
from const import TMP_DIR, getModelType
|
||||
import torch
|
||||
import os
|
||||
@ -12,6 +13,11 @@ from voice_changer.IORecorder import IORecorder
|
||||
|
||||
|
||||
import time
|
||||
|
||||
|
||||
AudioInput: TypeAlias = np.ndarray[Any, np.dtype[np.int16]]
|
||||
|
||||
|
||||
providers = ['OpenVINOExecutionProvider', "CUDAExecutionProvider", "DmlExecutionProvider", "CPUExecutionProvider"]
|
||||
|
||||
STREAM_INPUT_FILE = os.path.join(TMP_DIR, "in.wav")
|
||||
@ -20,8 +26,17 @@ STREAM_ANALYZE_FILE_DIO = os.path.join(TMP_DIR, "analyze-dio.png")
|
||||
STREAM_ANALYZE_FILE_HARVEST = os.path.join(TMP_DIR, "analyze-harvest.png")
|
||||
|
||||
|
||||
class VoiceChangerModel(Protocol):
|
||||
loadModel: Callable[..., dict[str, Any]]
|
||||
def get_processing_sampling_rate(self) -> int: ...
|
||||
def get_info(self) -> dict[str, Any]: ...
|
||||
def inference(self, data: tuple[Any, ...]) -> Any: ...
|
||||
def generate_input(self, newData: AudioInput, inputSize: int, crossfadeSize: int) -> tuple[Any, ...]: ...
|
||||
def update_settings(self, key: str, val: Any) -> bool: ...
|
||||
|
||||
|
||||
@dataclass
|
||||
class VocieChangerSettings():
|
||||
class VoiceChangerSettings():
|
||||
inputSampleRate: int = 24000 # 48000 or 24000
|
||||
|
||||
crossFadeOffsetRate: float = 0.1
|
||||
@ -31,16 +46,18 @@ class VocieChangerSettings():
|
||||
recordIO: int = 0 # 0:off, 1:on
|
||||
|
||||
# ↓mutableな物だけ列挙
|
||||
intData = ["inputSampleRate", "crossFadeOverlapSize", "recordIO"]
|
||||
floatData = ["crossFadeOffsetRate", "crossFadeEndRate"]
|
||||
strData = []
|
||||
intData: list[str] = ["inputSampleRate", "crossFadeOverlapSize", "recordIO"]
|
||||
floatData: list[str] = ["crossFadeOffsetRate", "crossFadeEndRate"]
|
||||
strData: list[str] = []
|
||||
|
||||
|
||||
class VoiceChanger():
|
||||
settings: VoiceChangerSettings
|
||||
voiceChanger: VoiceChangerModel
|
||||
|
||||
def __init__(self, params):
|
||||
# 初期化
|
||||
self.settings = VocieChangerSettings()
|
||||
self.settings = VoiceChangerSettings()
|
||||
self.onnx_session = None
|
||||
self.currentCrossFadeOffsetRate = 0
|
||||
self.currentCrossFadeEndRate = 0
|
||||
@ -51,7 +68,7 @@ class VoiceChanger():
|
||||
print("[VoiceChanger] activate model type:", self.modelType)
|
||||
if self.modelType == "MMVCv15":
|
||||
from voice_changer.MMVCv15.MMVCv15 import MMVCv15
|
||||
self.voiceChanger = MMVCv15()
|
||||
self.voiceChanger = MMVCv15() # type: ignore
|
||||
elif self.modelType == "MMVCv13":
|
||||
from voice_changer.MMVCv13.MMVCv13 import MMVCv13
|
||||
self.voiceChanger = MMVCv13()
|
||||
@ -73,11 +90,20 @@ class VoiceChanger():
|
||||
|
||||
self.gpu_num = torch.cuda.device_count()
|
||||
self.prev_audio = np.zeros(4096)
|
||||
self.mps_enabled = getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available()
|
||||
self.mps_enabled: bool = getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available()
|
||||
|
||||
print(f"VoiceChanger Initialized (GPU_NUM:{self.gpu_num}, mps_enabled:{self.mps_enabled})")
|
||||
|
||||
def loadModel(self, config: str, pyTorch_model_file: str = None, onnx_model_file: str = None, clusterTorchModel: str = None, feature_file: str = None, index_file: str = None, is_half: bool = True):
|
||||
def loadModel(
|
||||
self,
|
||||
config: str,
|
||||
pyTorch_model_file: Optional[str] = None,
|
||||
onnx_model_file: Optional[str] = None,
|
||||
clusterTorchModel: Optional[str] = None,
|
||||
feature_file: Optional[str] = None,
|
||||
index_file: Optional[str] = None,
|
||||
is_half: bool = True,
|
||||
):
|
||||
if self.modelType == "MMVCv15" or self.modelType == "MMVCv13":
|
||||
return self.voiceChanger.loadModel(config, pyTorch_model_file, onnx_model_file)
|
||||
elif self.modelType == "so-vits-svc-40" or self.modelType == "so-vits-svc-40_c" or self.modelType == "so-vits-svc-40v2":
|
||||
@ -92,7 +118,7 @@ class VoiceChanger():
|
||||
data.update(self.voiceChanger.get_info())
|
||||
return data
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: Any):
|
||||
if key in self.settings.intData:
|
||||
setattr(self.settings, key, int(val))
|
||||
if key == "crossFadeOffsetRate" or key == "crossFadeEndRate":
|
||||
@ -122,7 +148,7 @@ class VoiceChanger():
|
||||
elif key in self.settings.strData:
|
||||
setattr(self.settings, key, str(val))
|
||||
else:
|
||||
ret = self.voiceChanger.update_setteings(key, val)
|
||||
ret = self.voiceChanger.update_settings(key, val)
|
||||
if ret == False:
|
||||
print(f"{key} is not mutable variable or unknown variable!")
|
||||
|
||||
@ -159,7 +185,7 @@ class VoiceChanger():
|
||||
delattr(self, "np_prev_audio1")
|
||||
|
||||
# receivedData: tuple of short
|
||||
def on_request(self, receivedData: any):
|
||||
def on_request(self, receivedData: AudioInput) -> tuple[AudioInput, list[Union[int, float]]]:
|
||||
processing_sampling_rate = self.voiceChanger.get_processing_sampling_rate()
|
||||
|
||||
print_convert_processing(f"------------ Convert processing.... ------------")
|
||||
@ -169,7 +195,7 @@ class VoiceChanger():
|
||||
with Timer("pre-process") as t1:
|
||||
|
||||
if self.settings.inputSampleRate != processing_sampling_rate:
|
||||
newData = resampy.resample(receivedData, self.settings.inputSampleRate, processing_sampling_rate)
|
||||
newData = cast(AudioInput, resampy.resample(receivedData, self.settings.inputSampleRate, processing_sampling_rate))
|
||||
else:
|
||||
newData = receivedData
|
||||
# print("t1::::", t1.secs)
|
||||
@ -236,7 +262,7 @@ class VoiceChanger():
|
||||
with Timer("post-process") as t:
|
||||
result = result.astype(np.int16)
|
||||
if self.settings.inputSampleRate != processing_sampling_rate:
|
||||
outputData = resampy.resample(result, processing_sampling_rate, self.settings.inputSampleRate).astype(np.int16)
|
||||
outputData = cast(AudioInput, resampy.resample(result, processing_sampling_rate, self.settings.inputSampleRate).astype(np.int16))
|
||||
else:
|
||||
outputData = result
|
||||
# outputData = result
|
||||
@ -261,7 +287,7 @@ class VoiceChanger():
|
||||
|
||||
|
||||
##############
|
||||
PRINT_CONVERT_PROCESSING = False
|
||||
PRINT_CONVERT_PROCESSING: bool = False
|
||||
# PRINT_CONVERT_PROCESSING = True
|
||||
|
||||
|
||||
@ -270,7 +296,7 @@ def print_convert_processing(mess: str):
|
||||
print(mess)
|
||||
|
||||
|
||||
def pad_array(arr, target_length):
|
||||
def pad_array(arr: AudioInput, target_length: int):
|
||||
current_length = arr.shape[0]
|
||||
if current_length >= target_length:
|
||||
return arr
|
||||
@ -290,7 +316,7 @@ class Timer(object):
|
||||
self.start = time.time()
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
def __exit__(self, *_):
|
||||
self.end = time.time()
|
||||
self.secs = self.end - self.start
|
||||
self.msecs = self.secs * 1000 # millisecs
|
||||
|
@ -23,9 +23,9 @@ class VoiceChangerManager():
|
||||
else:
|
||||
return {"status": "ERROR", "msg": "no model loaded"}
|
||||
|
||||
def update_setteings(self, key: str, val: any):
|
||||
def update_settings(self, key: str, val: any):
|
||||
if hasattr(self, 'voiceChanger'):
|
||||
info = self.voiceChanger.update_setteings(key, val)
|
||||
info = self.voiceChanger.update_settings(key, val)
|
||||
info["status"] = "OK"
|
||||
return info
|
||||
else:
|
||||
|
Loading…
x
Reference in New Issue
Block a user