// Socket client: connects over a local pipe, TCP, or TLS and republishes incoming messages as events.
// Node core modules used to open plain and TLS sockets and to read certificate files.
const net = require('net');
const tls = require('tls');
const fs = require('fs');

// Project-local and third-party helpers: wire-format parser, message container,
// job queue used for sync mode, and the pub/sub base class.
const EventParser = require('../entities/EventParser.js');
const Message = require('js-message');
const Queue = require('@node-ipc/js-queue');
const Events = require('event-pubsub');

// Module-level parser shared by every function in this file. It starts with the
// parser's defaults and is reassigned whenever a Client is constructed.
let eventParser = new EventParser();
/**
 * Event-emitting socket client.
 *
 * Instances carry their configuration, a job queue, the current socket
 * (`false` until `connect` assigns one) and the retry bookkeeping used by
 * the connection logic.
 *
 * NOTE: constructing a client replaces the module-level `eventParser` with
 * one built from this client's config, so the most recently constructed
 * client decides which parser every client in this module uses.
 */
class Client extends Events{
    /**
     * @param {Object} config - client configuration; `maxRetries` is read here
     *                          to seed `retriesRemaining` (falsy values become 0).
     * @param {Function} log - logging callback stored as `this.log`.
     */
    constructor(config,log){
        super();

        Object.assign(
            this,
            {
                Client,
                config,
                queue : new Queue(),
                socket : false,
                connect,
                // Installed as an own property, so it takes precedence over any
                // `emit` inherited from the base class.
                emit,
                log,
                retriesRemaining : config.maxRetries||0,
                explicitlyDisconnected : false
            }
        );

        eventParser=new EventParser(this.config);
    }
}
/**
 * Sends an event to the connected peer. Must be called with a client as `this`.
 *
 * In raw-buffer mode (`config.rawBuffer`) the first argument itself is the
 * payload: it is converted to a Buffer using `config.encoding` and `data` is
 * not sent. Otherwise `type` and `data` are wrapped in a Message and
 * serialised by the module-level event parser.
 *
 * When `config.sync` is set the write is queued on `this.queue` rather than
 * performed immediately.
 *
 * @param {*} type - event name, or the raw payload in raw-buffer mode.
 * @param {*} data - event payload (unused in raw-buffer mode).
 */
function emit(type,data){
    this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data);

    let payload;
    if(this.config.rawBuffer){
        payload=Buffer.from(type,this.config.encoding);
    }else{
        const message=new Message();
        message.type=type;
        message.data=data;
        payload=eventParser.format(message);
    }

    if(this.config.sync){
        // Deferred: the queue decides when the write actually happens.
        this.queue.add(
            syncEmit.bind(this,payload)
        );
        return;
    }

    this.socket.write(payload);
}
/**
 * Queue job used in sync mode: logs and writes an already-serialised message
 * to the client's socket. Must be called with a client as `this`.
 *
 * @param {*} message - payload passed straight to `socket.write`.
 */
function syncEmit(message){
    this.log('dispatching event to ', this.id, this.path, ' : ', message);
    this.socket.write(message);
}
// Keys copied from `config.interface` onto the socket options when they are set.
const INTERFACE_OPTION_KEYS = ['localAddress', 'localPort', 'family', 'hints', 'lookup'];

/**
 * Turns a slash-separated socket path into a Windows named-pipe path:
 * the leading slash is dropped and remaining slashes become dashes.
 */
function toWindowsPipePath(socketPath){
    const flattened = socketPath.replace(/^\//, '').replace(/\//g, '-');
    return `\\\\.\\pipe\\${flattened}`;
}

/**
 * Copies the truthy interface settings onto `options`; a missing interface
 * section is treated as "nothing to copy".
 */
function applyInterfaceOptions(options, networkInterface){
    if(!networkInterface){
        return;
    }
    for(const key of INTERFACE_OPTION_KEYS){
        if(networkInterface[key]){
            options[key]=networkInterface[key];
        }
    }
}

/**
 * Builds the options object for `tls.connect` from a copy of the configured
 * TLS section, so the caller's config is left untouched. Files named by
 * `private`, `public` and `trustedConnections` are read synchronously into
 * `key`, `cert` and `ca`; the socket options are applied last and win.
 */
function buildTlsOptions(tlsConfig, socketOptions){
    const tlsOptions = Object.assign({}, tlsConfig);

    if(tlsOptions.private){
        tlsOptions.key=fs.readFileSync(tlsOptions.private);
    }
    if(tlsOptions.public){
        tlsOptions.cert=fs.readFileSync(tlsOptions.public);
    }
    if(tlsOptions.trustedConnections){
        // A single path may be given as a bare string.
        const trustedPaths = typeof tlsOptions.trustedConnections === 'string'
            ? [tlsOptions.trustedConnections]
            : tlsOptions.trustedConnections;
        tlsOptions.trustedConnections=trustedPaths;
        tlsOptions.ca=Array.from(trustedPaths, (caPath) => fs.readFileSync(caPath));
    }

    return Object.assign(tlsOptions, socketOptions);
}

/**
 * Opens the client's socket and wires its events. Must be called with a
 * client as `this`.
 *
 * Transport selection:
 *  - no `port` on the client: `path` is used as a local socket path
 *    (rewritten to a named pipe on win32 unless it already is one);
 *  - `port` set and `config.tls` falsy: TCP to host `path`;
 *  - `port` set and `config.tls` truthy: TLS to host `path`.
 *
 * Events published on the client: 'connect', 'error', 'disconnect',
 * 'destroy', 'data' (raw-buffer mode), and one event per parsed message
 * using the message's own type.
 *
 * Returns nothing; logs and returns early when the client has no `path`.
 */
function connect(){
    // Socket handlers run with the socket as `this`, so keep a handle on the client.
    const client=this;
    client.log('requested connection to ', client.id, client.path);
    if(!client.path){
        client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.');
        return;
    }

    const options={};

    if(!client.port){
        client.log('Connecting client on Unix Socket :', client.path);

        options.path=client.path;
        if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){
            options.path=toWindowsPipePath(options.path);
        }

        client.socket = net.connect(options);
    }else{
        options.host=client.path;
        options.port=client.port;
        applyInterfaceOptions(options, client.config.interface);

        if(!client.config.tls){
            client.log('Connecting client via TCP to', options);
            client.socket = net.connect(options);
        }else{
            client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls);
            client.socket = tls.connect(
                buildTlsOptions(client.config.tls, options)
            );
        }
    }

    client.socket.setEncoding(client.config.encoding);

    client.socket.on(
        'error',
        function(err){
            client.log('\n\n######\nerror: ', err);
            client.publish('error', err);
        }
    );

    client.socket.on(
        'connect',
        function connectionMade(){
            client.publish('connect');
            // Same normalisation as the constructor: a missing limit means 0,
            // never `undefined` (which would defeat the `< 1` check on close).
            client.retriesRemaining=client.config.maxRetries||0;
            client.log('retrying reset');
        }
    );

    client.socket.on(
        'close',
        function connectionClosed(){
            client.log('connection closed' ,client.id , client.path,
                client.retriesRemaining, 'tries remaining of', client.config.maxRetries
            );

            const shouldGiveUp = client.config.stopRetrying ||
                client.retriesRemaining<1 ||
                client.explicitlyDisconnected;

            if(shouldGiveUp){
                client.publish('disconnect');
                client.log(
                    (client.config.id),
                    'exceeded connection rety amount of',
                    ' or stopRetrying flag set.'
                );

                client.socket.destroy();
                client.publish('destroy');
                // The client reference is deliberately kept: handlers that fire
                // after this point must still be able to log and publish.
                return;
            }

            setTimeout(
                function retryTimeout(){
                    // A disconnect requested while waiting cancels the retry.
                    if (client.explicitlyDisconnected) {
                        return;
                    }
                    client.retriesRemaining--;
                    client.connect();
                },
                client.config.retry
            );

            client.publish('disconnect');
        }
    );

    client.socket.on(
        'data',
        function(data) {
            client.log('## received events ##');
            if(client.config.rawBuffer){
                client.publish(
                    'data',
                    Buffer.from(data,client.config.encoding)
                );
                if(!client.config.sync){
                    return;
                }

                client.queue.next();
                return;
            }

            // `this` is the socket: partial input is accumulated on it until
            // the buffered text ends with the parser's delimiter.
            if(!this.ipcBuffer){
                this.ipcBuffer='';
            }

            const buffered=(this.ipcBuffer+=data);

            // endsWith handles delimiters of any length, not just one character.
            if(!buffered.endsWith(eventParser.delimiter)){
                client.log('Messages are large, You may want to consider smaller messages.');
                return;
            }

            this.ipcBuffer='';

            for(const rawEvent of eventParser.parse(buffered)){
                const message=new Message();
                message.load(rawEvent);

                client.log('detected event', message.type, message.data);
                client.publish(
                    message.type,
                    message.data
                );
            }

            if(!client.config.sync){
                return;
            }

            client.queue.next();
        }
    );
}
// The Client class is this module's sole export.
module.exports=Client;