2023-01-05 02:28:36 +09:00
import { io , Socket } from "socket.io-client" ;
import { DefaultEventsMap } from "@socket.io/component-emitter" ;
import { Duplex , DuplexOptions } from "readable-stream" ;
2023-02-19 14:20:37 +09:00
import { AudioStreamerSetting , DefaultAudioStreamerSetting , DownSamplingMode , VOICE_CHANGER_CLIENT_EXCEPTION } from "./const" ;
2023-02-18 20:53:15 +09:00
2023-01-05 02:28:36 +09:00
export type Callbacks = {
2023-02-19 14:20:37 +09:00
onVoiceReceived : ( data : ArrayBuffer ) = > void
2023-01-05 02:28:36 +09:00
}
export type AudioStreamerListeners = {
notifySendBufferingTime : ( time : number ) = > void
notifyResponseTime : ( time : number ) = > void
2023-01-05 11:45:42 +09:00
notifyException : ( code : VOICE_CHANGER_CLIENT_EXCEPTION , message : string ) = > void
2023-01-05 02:28:36 +09:00
}
2023-01-08 16:18:20 +09:00
2023-01-05 02:28:36 +09:00
export class AudioStreamer extends Duplex {
2023-02-19 14:20:37 +09:00
private setting : AudioStreamerSetting = DefaultAudioStreamerSetting
2023-01-05 02:28:36 +09:00
private callbacks : Callbacks
private audioStreamerListeners : AudioStreamerListeners
private socket : Socket < DefaultEventsMap , DefaultEventsMap > | null = null
private requestChunks : ArrayBuffer [ ] = [ ]
// performance monitor
private bufferStart = 0 ;
2023-01-05 11:45:42 +09:00
constructor ( callbacks : Callbacks , audioStreamerListeners : AudioStreamerListeners , options? : DuplexOptions ) {
2023-01-05 02:28:36 +09:00
super ( options ) ;
this . callbacks = callbacks
this . audioStreamerListeners = audioStreamerListeners
}
private createSocketIO = ( ) = > {
if ( this . socket ) {
this . socket . close ( )
}
2023-02-19 14:20:37 +09:00
if ( this . setting . protocol === "sio" ) {
this . socket = io ( this . setting . serverUrl + "/test" ) ;
2023-01-05 11:45:42 +09:00
this . socket . on ( 'connect_error' , ( err ) = > {
this . audioStreamerListeners . notifyException ( VOICE_CHANGER_CLIENT_EXCEPTION . ERR_SIO_CONNECT_FAILED , ` [SIO] rconnection failed ${ err } ` )
} )
2023-02-18 04:15:34 +09:00
this . socket . on ( 'connect' , ( ) = > {
2023-02-19 14:20:37 +09:00
console . log ( ` [SIO] sonnect to ${ this . setting . serverUrl } ` )
2023-02-18 04:15:34 +09:00
console . log ( ` [SIO] ${ this . socket ? . id } ` )
} ) ;
2023-01-05 02:28:36 +09:00
this . socket . on ( 'response' , ( response : any [ ] ) = > {
const cur = Date . now ( )
const responseTime = cur - response [ 0 ]
const result = response [ 1 ] as ArrayBuffer
if ( result . byteLength < 128 * 2 ) {
2023-01-05 11:45:42 +09:00
this . audioStreamerListeners . notifyException ( VOICE_CHANGER_CLIENT_EXCEPTION . ERR_SIO_INVALID_RESPONSE , ` [SIO] recevied data is too short ${ result . byteLength } ` )
2023-01-05 02:28:36 +09:00
} else {
2023-02-19 14:20:37 +09:00
this . callbacks . onVoiceReceived ( response [ 1 ] )
2023-01-05 02:28:36 +09:00
this . audioStreamerListeners . notifyResponseTime ( responseTime )
}
} ) ;
}
}
2023-02-19 14:20:37 +09:00
// Option Change
updateSetting = ( setting : AudioStreamerSetting ) = > {
console . log ( ` [AudioStreamer] Updating AudioStreamer Setting, ` , this . setting , setting )
let recreateSocketIoRequired = false
if ( this . setting . serverUrl != setting . serverUrl || this . setting . protocol != setting . protocol ) {
recreateSocketIoRequired = true
}
this . setting = setting
if ( recreateSocketIoRequired ) {
this . createSocketIO ( )
}
2023-02-18 20:53:15 +09:00
}
2023-02-14 22:32:25 +09:00
2023-02-19 14:20:37 +09:00
getSettings = ( ) : AudioStreamerSetting = > {
return this . setting
2023-01-08 16:18:20 +09:00
}
2023-02-18 04:15:34 +09:00
getSocketId = ( ) = > {
return this . socket ? . id
}
2023-01-05 02:28:36 +09:00
// Main Process
//// Pipe from mic stream
_write = ( chunk : AudioBuffer , _encoding : any , callback : any ) = > {
const buffer = chunk . getChannelData ( 0 ) ;
2023-02-19 14:20:37 +09:00
this . _write_realtime ( buffer )
2023-01-05 02:28:36 +09:00
callback ( ) ;
}
2023-02-14 22:32:25 +09:00
_averageDownsampleBuffer ( buffer : Float32Array , originalSampleRate : number , destinationSamplerate : number ) {
if ( originalSampleRate == destinationSamplerate ) {
return buffer ;
}
if ( destinationSamplerate > originalSampleRate ) {
throw "downsampling rate show be smaller than original sample rate" ;
}
const sampleRateRatio = originalSampleRate / destinationSamplerate ;
const newLength = Math . round ( buffer . length / sampleRateRatio ) ;
const result = new Float32Array ( newLength ) ;
let offsetResult = 0 ;
let offsetBuffer = 0 ;
while ( offsetResult < result . length ) {
var nextOffsetBuffer = Math . round ( ( offsetResult + 1 ) * sampleRateRatio ) ;
// Use average value of skipped samples
var accum = 0 , count = 0 ;
for ( var i = offsetBuffer ; i < nextOffsetBuffer && i < buffer . length ; i ++ ) {
accum += buffer [ i ] ;
count ++ ;
}
result [ offsetResult ] = accum / count ;
// Or you can simply get rid of the skipped samples:
// result[offsetResult] = buffer[nextOffsetBuffer];
offsetResult ++ ;
offsetBuffer = nextOffsetBuffer ;
}
return result ;
}
2023-01-05 02:28:36 +09:00
2023-02-14 22:32:25 +09:00
2023-02-18 20:53:15 +09:00
private _write_realtime = async ( buffer : Float32Array ) = > {
2023-02-14 22:32:25 +09:00
let downsampledBuffer : Float32Array | null = null
2023-02-19 14:20:37 +09:00
if ( this . setting . sendingSampleRate == 48000 ) {
2023-02-18 20:53:15 +09:00
downsampledBuffer = buffer
2023-02-19 14:20:37 +09:00
} else if ( this . setting . downSamplingMode == DownSamplingMode . decimate ) {
2023-02-14 22:32:25 +09:00
//////// (Kind 1) 間引き //////////
// bufferSize個のデータ( 48Khz) が入ってくる。
//// 48000Hz で入ってくるので間引いて24000Hzに変換する。
downsampledBuffer = new Float32Array ( buffer . length / 2 ) ;
for ( let i = 0 ; i < buffer . length ; i ++ ) {
if ( i % 2 == 0 ) {
downsampledBuffer [ i / 2 ] = buffer [ i ]
}
2023-01-05 02:28:36 +09:00
}
2023-02-14 22:32:25 +09:00
} else {
//////// (Kind 2) 平均 //////////
2023-02-18 20:53:15 +09:00
// downsampledBuffer = this._averageDownsampleBuffer(buffer, 48000, 24000)
2023-02-19 14:20:37 +09:00
downsampledBuffer = this . _averageDownsampleBuffer ( buffer , 48000 , this . setting . sendingSampleRate )
2023-01-05 02:28:36 +09:00
}
2023-02-14 22:32:25 +09:00
// Float to signed16
const arrayBuffer = new ArrayBuffer ( downsampledBuffer . length * 2 )
const dataView = new DataView ( arrayBuffer ) ;
for ( let i = 0 ; i < downsampledBuffer . length ; i ++ ) {
let s = Math . max ( - 1 , Math . min ( 1 , downsampledBuffer [ i ] ) ) ;
s = s < 0 ? s * 0x8000 : s * 0x7FFF
dataView . setInt16 ( i * 2 , s , true ) ;
}
2023-01-05 02:28:36 +09:00
// 256byte(最低バッファサイズ256から間引いた個数x2byte)をchunkとして管理
2023-02-18 20:53:15 +09:00
// const chunkByteSize = 256 // (const.ts ★1)
// const chunkByteSize = 256 * 2 // (const.ts ★1)
2023-02-19 14:20:37 +09:00
const chunkByteSize = ( 256 * 2 ) * ( this . setting . sendingSampleRate / 48000 ) // (const.ts ★1)
2023-01-05 02:28:36 +09:00
for ( let i = 0 ; i < arrayBuffer . byteLength / chunkByteSize ; i ++ ) {
const ab = arrayBuffer . slice ( i * chunkByteSize , ( i + 1 ) * chunkByteSize )
this . requestChunks . push ( ab )
}
2023-02-14 22:32:25 +09:00
2023-01-05 02:28:36 +09:00
//// リクエストバッファの中身が、リクエスト送信数と違う場合は処理終了。
2023-02-19 14:20:37 +09:00
if ( this . requestChunks . length < this . setting . inputChunkNum ) {
2023-01-05 02:28:36 +09:00
return
}
// リクエスト用の入れ物を作成
const windowByteLength = this . requestChunks . reduce ( ( prev , cur ) = > {
return prev + cur . byteLength
} , 0 )
const newBuffer = new Uint8Array ( windowByteLength ) ;
// リクエストのデータをセット
this . requestChunks . reduce ( ( prev , cur ) = > {
newBuffer . set ( new Uint8Array ( cur ) , prev )
return prev + cur . byteLength
} , 0 )
2023-01-05 11:45:42 +09:00
// console.log("send buff length", newBuffer.length)
2023-01-05 02:28:36 +09:00
this . sendBuffer ( newBuffer )
this . requestChunks = [ ]
this . audioStreamerListeners . notifySendBufferingTime ( Date . now ( ) - this . bufferStart )
this . bufferStart = Date . now ( )
}
private sendBuffer = async ( newBuffer : Uint8Array ) = > {
const timestamp = Date . now ( )
2023-02-19 14:20:37 +09:00
if ( this . setting . protocol === "sio" ) {
2023-01-05 02:28:36 +09:00
if ( ! this . socket ) {
console . warn ( ` sio is not initialized ` )
return
}
2023-01-05 11:45:42 +09:00
// console.log("emit!")
2023-01-05 02:28:36 +09:00
this . socket . emit ( 'request_message' , [
timestamp ,
newBuffer . buffer ] ) ;
} else {
const res = await postVoice (
2023-02-19 14:20:37 +09:00
this . setting . serverUrl + "/test" ,
2023-01-05 02:28:36 +09:00
timestamp ,
newBuffer . buffer )
if ( res . byteLength < 128 * 2 ) {
2023-01-05 11:45:42 +09:00
this . audioStreamerListeners . notifyException ( VOICE_CHANGER_CLIENT_EXCEPTION . ERR_REST_INVALID_RESPONSE , ` [REST] recevied data is too short ${ res . byteLength } ` )
2023-01-05 02:28:36 +09:00
} else {
2023-02-19 14:20:37 +09:00
this . callbacks . onVoiceReceived ( res )
2023-01-05 02:28:36 +09:00
this . audioStreamerListeners . notifyResponseTime ( Date . now ( ) - timestamp )
}
}
}
}
export const postVoice = async (
url : string ,
timestamp : number ,
buffer : ArrayBuffer ) = > {
const obj = {
timestamp ,
buffer : Buffer.from ( buffer ) . toString ( 'base64' )
} ;
const body = JSON . stringify ( obj ) ;
const res = await fetch ( ` ${ url } ` , {
method : "POST" ,
headers : {
'Accept' : 'application/json' ,
'Content-Type' : 'application/json'
} ,
body : body
} )
const receivedJson = await res . json ( )
const changedVoiceBase64 = receivedJson [ "changedVoiceBase64" ]
const buf = Buffer . from ( changedVoiceBase64 , "base64" )
const ab = new ArrayBuffer ( buf . length ) ;
const view = new Uint8Array ( ab ) ;
for ( let i = 0 ; i < buf . length ; ++ i ) {
view [ i ] = buf [ i ] ;
}
return ab
}