Socket

This page contains random notes about problems with the Node socket implementation.

TCPSocketServer is not receiving data events

This issue is solved; the notes are left here for future reference.

The port of the @accessors-modules/socket module to Node is not working. The TCPSocketServer in a composite accessor is not receiving data events.

Running this command:

(cd $PTII/org/terraswarm/accessor/accessors/web/; node hosts/node/nodeHostInvoke.js hosts/node/node_modules/@accessors-modules/socket/test/auto/TCPSocketUnsignedShort.js)

causes an error because the test actor connected to the TCPSocketServer is not getting input.

Details

Below are the files in question:
A model was updated to include only accessors. To run the model:

$PTII/bin/capecode $PTII/org/terraswarm/accessor/test/auto/TCPSocketUnsignedShort.xml

The model works fine in Cape Code.
The issue is that when we drag in a code generator and generate code, the generated code fails because TrainableTest is not seeing any data. To run the generated code:

(cd $PTII/org/terraswarm/accessor/accessors/web/cg; node ../node_modules/@accessors-hosts/node/nodeHostInvoke.js cg/TCPSocketUnsignedShort)

The generated model, with the stopTime increased slightly, is checked in as $PTII/org/terraswarm/accessor/accessors/web/hosts/node/node_modules/@accessors-modules/socket/test/auto/TCPSocketUnsignedShort.js

The checked-in version may be run with:

(cd $PTII/org/terraswarm/accessor/accessors/web/; node hosts/node/nodeHostInvoke.js hosts/node/node_modules/@accessors-modules/socket/test/auto/TCPSocketUnsignedShort.js)

TCPSocketServer.js and TCPSocketClient.js have had debugging statements temporarily added to them. Comparing the output of the Cape Code run and the Node run indicates that the server is not getting socket data.

Below is the output of the Node version of the model, which was created with:

(cd $PTII/org/terraswarm/accessor/accessors/web/; node hosts/node/nodeHostInvoke.js hosts/node/node_modules/@accessors-modules/socket/test/auto/TCPSocketUnsignedShort.js)

socket.js: SocketServer._socketCreated(): socket localPort: undefined, remotePort(): 52288
socket.js: Socket.remotePort()
TCPSocketServer.js: initialize() connection: localPort: undefined, remotePort: 52288
socket.js: Socket.remoteHost()
socket.js: Socket.remotePort()
TCPSocketClient.js: toSendInputHandler()
TCPSocketClient.js: send()
TCPSocketClient.js: isOpen()
socket.js: SocketClient.send(-1,0,1,65535,65536), typeof data: object, Array.isArray(data): true
socket.js: SocketClient.send(-1,0,1,65535,65536): converted from array
socket.js: SocketClient.send(-1,0,1,65535,65536): calling this.wrapper.send()
socket.js: SocketWrapper.send(): this.eventEmitter: SocketClient(52287, localhost, options)
socket.js: SocketWrapper.send(-1,0,1,65535,65536): about to invoke write() on socket [object Object] remoteAddress: 127.0.0.1:52287. Writing -1,0,1,65535,65536
socket.js SocketWrapper.send(-1,0,1,65535,65536): handle case where data is an array.
socket.js: SocketWrapper.send(65535,0,1,65535,65536): added element 65535 to buffer: 0xffff0000000000000000
socket.js: SocketWrapper.send(65535,0,1,65535,65536): added element 0 to buffer: 0xffff0000000000000000
socket.js: SocketWrapper.send(65535,0,1,65535,65536): added element 1 to buffer: 0xffff0000000100000000
socket.js: SocketWrapper.send(65535,0,1,65535,65536): added element 65535 to buffer: 0xffff00000001ffff0000
socket.js: SocketWrapper.send(65535,0,1,65535,0): added element 0 to buffer: 0xffff00000001ffff0000
socket.js: SocketWrapper.send(65535,0,1,65535,0), : about to invoke write() on socket [object Object]. local: 127.0.0.1:52288, remote: 127.0.0.1:52287. Writing buffer: <Buffer ff ff 00 00 00 01 ff ff 00 00>
socket.js: SocketWrapper.send(65535,0,1,65535,0): write returned true
socket.js: SocketWrapper.send(undefined): write callback: undefined
socket.js: SocketWrapper(): data event: this.eventEmitter: Socket() socket buffer: ��^@^@^@��^@^@
socket.js: SocketWrapper.processBuffer(): this.eventEmitter: Socket(), buffer: ��^@^@^@��^@^@
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 1
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
socket.js: Socket netSocket on data: ��^@^@^@��^@^@

and then the output hangs until the model times out and fails:

TCPSocketClient.js: wrapup()
socket.js: SocketClient.close()
socket.js: SocketWrapper.close()
TCPSocketClient: close Connection closed. Status: Connection closed in wrapup.
TCPSocketServer.js: wrapup()
socket.js: SocketServer.stop()
nodeHost.js: Error: Error: TCPSocketUnsignedShort.TrainableTest: The input handler of this accessor was never invoked. Usually, this is an error indicating that starvation is occurring.
    at Accessor.exports.wrapup (eval at Accessor (/Users/cxh/ptII/org/terraswarm/accessor/accessors/web/node_modules/@accessors-hosts/common/commonHost.js:611:19), <anonymous>:272:23)
    at Accessor.wrapup (/Users/cxh/ptII/org/terraswarm/accessor/accessors/web/node_modules/@accessors-hosts/common/commonHost.js:748:37)
    at Accessor.wrapup (/Users/cxh/ptII/org/terraswarm/accessor/accessors/web/node_modules/@accessors-hosts/common/commonHost.js:741:52)
    at Accessor.stop (/Users/cxh/ptII/org/terraswarm/accessor/accessors/web/node_modules/@accessors-hosts/common/commonHost.js:2240:15)
    at /Users/cxh/ptII/org/terraswarm/accessor/accessors/web/node_modules/@accessors-hosts/common/commonHost.js:2251:14
    at executeCallbacks (/Users/cxh/ptII/org/terraswarm/accessor/accessors/web/node_modules/@accessors-hosts/common/modules/deterministicTemporalSemantics.js:303:50)
    at Timeout.executeAndSetNextTick [as _onTimeout] (/Users/cxh/ptII/org/terraswarm/accessor/accessors/web/node_modules/@accessors-hosts/common/modules/deterministicTemporalSemantics.js:256:9)
    at ontimeout (timers.js:380:14)
    at tryOnTimeout (timers.js:244:5)
    at Timer.listOnTimeout (timers.js:214:5)
nodeHost.js: Error: Node will exit and return 1, which should be zero.
nodeHost.js: Error: Node will exit and return 1, which should be zero.
Below is the Cape Code output, created with:

$PTII/bin/capecode $PTII/org/terraswarm/accessor/test/auto/TCPSocketUnsignedShort.xml

which works fine:

socket.js: SocketServer._socketCreated(): socket localPort: undefined, remotePort(): 52307
socket.js: Socket.remotePort()
TCPSocketServer.js: initialize() connection: localPort: undefined, remotePort: 52307
socket.js: Socket.remoteHost()
socket.js: Socket.remotePort()
TCPSocketClient.js: toSendInputHandler()
TCPSocketClient.js: send()
TCPSocketClient.js: isOpen()
socket.js: SocketClient.send(-1,0,1,65535,65536)
socket.js: SocketClient.send([Ljava.lang.Object;@124b20e4): converted from array
socket.js: SocketClient.send([Ljava.lang.Object;@124b20e4): calling this.wrapper.send()
SocketHelper.SocketWrapper.send(): _eventEmitter: SocketClient(52306, localhost, options)
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): handle case where data is an array.
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): added element -1 to buffer: 0xffff
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): added element 0 to buffer: 0xffff0000
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): added element 1 to buffer: 0xffff00000001
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): added element 65535 to buffer: 0xffff00000001ffff
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): added element 65536 to buffer: 0xffff00000001ffff0000
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): about to invoke write() on socket io.vertx.core.net.impl.NetSocketImpl@6313f97b, local: 127.0.0.1:52307, remote: 127.0.0.1:52306. Writing buffer: ffff00000001ffff0000
SocketHelper.SocketWrapper.send([Ljava.lang.Object;@124b20e4): after write() on socket io.vertx.core.net.impl.NetSocketImpl@6313f97b.
SocketHelper.SocketWrapper(): _eventEmitter: Socket() _socket.handler()
SocketHelper.SocketWrapper._processBuffer() start: _eventEmitter: Socket()
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: initialize() data: 65535
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: initialize() data: 0
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 1
TCPSocketServer.js: initialize() data: 1
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: initialize() data: 65535
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: initialize() data: 0
TestDisplayServerReceived: 65535
TestDisplayServerReceived: 0
TestDisplayServerReceived: 1
TestDisplayServerReceived: 65535
TestDisplayServerReceived: 0
TCPSocketServer.js: toSendInputHandler() start.
TCPSocketServer.js: toSendInputHandler() socket[0].
TCPSocketServer.js: toSendInputHandler() socket[1].
TCPSocketServer.js: toSendInputHandler() socket[1]. sending: 65536
socket.js: Socket.send(): 65536
SocketHelper.SocketWrapper.send(): _eventEmitter: Socket()
SocketHelper.SocketWrapper.send(65536): is not an array.
TCPSocketServer.js: toSendInputHandler() socket[1]. done sending: 65536
TCPSocketServer.js: toSendInputHandler() end.
SocketHelper.SocketWrapper.send(65536): about to invoke write() on socket io.vertx.core.net.impl.NetSocketImpl@200af59c, local: 127.0.0.1:52306, remote: 127.0.0.1:52307. Writing buffer: 0000

The two outputs differ slightly because of platform dependencies. Below is a more detailed analysis of the above outputs.

The Node and Cape Code output match fairly closely until processBuffer().
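Both runs put the same ten bytes on the wire (ffff00000001ffff0000), which is why the inputs -1 and 65536 are read back as 65535 and 0. The following is a minimal sketch of that unsigned-short round trip using only Node's Buffer API. The helper names encodeUnsignedShorts and decodeUnsignedShorts are hypothetical and are not part of socket.js.

```javascript
// Minimal sketch: pack numbers as big-endian unsigned shorts, then read them back.
// encodeUnsignedShorts and decodeUnsignedShorts are hypothetical helper names.
function encodeUnsignedShorts(values) {
    var buffer = Buffer.alloc(values.length * 2);
    values.forEach(function (value, i) {
        // Keep only the low 16 bits, so -1 wraps to 65535 and 65536 wraps to 0.
        buffer.writeUInt16BE(value & 0xffff, i * 2);
    });
    return buffer;
}

function decodeUnsignedShorts(buffer) {
    var values = [];
    // Read one 2-byte element at a time, as processBuffer() does.
    for (var position = 0; position + 2 <= buffer.length; position += 2) {
        values.push(buffer.readUInt16BE(position));
    }
    return values;
}

var wire = encodeUnsignedShorts([-1, 0, 1, 65535, 65536]);
console.log(wire.toString('hex'));        // ffff00000001ffff0000
console.log(decodeUnsignedShorts(wire));  // [ 65535, 0, 1, 65535, 0 ]
```

The decoded values match the "Reading:" lines in both logs, so the byte-level encoding is not where the two hosts diverge.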
The Node output calls emit:

socket.js: SocketWrapper(): data event: this.eventEmitter: Socket() socket buffer: ��^@^@^@��^@^@
socket.js: SocketWrapper.processBuffer(): this.eventEmitter: Socket(), buffer: ��^@^@^@��^@^@
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 1
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
socket.js: Socket netSocket on data: ��^@^@^@��^@^@

The difference is that the Cape Code output below shows that TCPSocketServer.js is getting data events, whereas the Node output is not.

SocketHelper.SocketWrapper(): _eventEmitter: Socket() _socket.handler()
SocketHelper.SocketWrapper._processBuffer() start: _eventEmitter: Socket()
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: initialize() data: 65535
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: initialize() data: 0
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 1
TCPSocketServer.js: initialize() data: 1
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: initialize() data: 65535
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: initialize() data: 0

Looking at the source code is helpful.
In Cape Code, the SocketWrapper constructor registers a handler for received data:

    public SocketWrapper(
            ScriptObjectMirror eventEmitter,
            Object socket,
            ...
            boolean emitBatchDataAsAvailable) {
        ...
        _socket = (NetSocket)socket;
        ...
        // Handler for received data.
        _socket.handler(buffer -> {
            System.out.println("SocketHelper.SocketWrapper(): _eventEmitter: " + _eventEmitter.get("iama") + " _socket.handler()");
            _processBuffer(buffer);
        });

In Cape Code, the handler above calls _processBuffer(), which emits a data event. (To make it easier to track objects, the debugging output prints the emitter's iama property.)

    private void _processBuffer(Buffer buffer) {
        System.out.println("SocketHelper.SocketWrapper._processBuffer() start: _eventEmitter: " + _eventEmitter.get("iama"));
        ...
        // Defer this to the director thread.
        // This ensures that if the handler responds to a "data" event by
        // producing multiple outputs, then all those outputs appear simultaneously
        // (with the same time stamp) on the accessor output.
        _issueResponse(() -> {
            if (_receiveType == DATA_TYPE.STRING) {
                System.out.println("SocketHelper.SocketWrapper._processBuffer() issueResponse: String emit");
                _eventEmitter.callMember("emit", "data", finalBuffer.getString(0, finalBuffer.length()));
            } else if (_receiveType == DATA_TYPE.IMAGE) {
                ...
            } else {
                // Assume a numeric type.
                int size = _sizeOfType(_receiveType);
                int length = finalBuffer.length();
                int numberOfElements = length / size;
                if (numberOfElements == 1) {
                    System.out.println("SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric numberOfElements == 1 emit");
                    _eventEmitter.callMember("emit", "data", _extractFromBuffer(finalBuffer, _receiveType, 0));
                } else if (numberOfElements > 1) {
                    if (_rawBytes && !_emitBatchDataAsAvailable) {
                        int position = 0;
                        for (int i = 0; i < numberOfElements; i++) {
                            System.out.println("SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: " + _eventEmitter.get("iama") + " Reading: " + _extractFromBuffer(finalBuffer, _receiveType, position));
                            _eventEmitter.callMember("emit", "data", _extractFromBuffer(finalBuffer, _receiveType, position));
                            position += size;
                        }
                    } else {
                        // Using message framing, so we output a single array.
                        ...

In Cape Code, when we run the above, we generate the following output:

SocketHelper.SocketWrapper._processBuffer() start: _eventEmitter: Socket()
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
The data event is handled by the listener registered in TCPSocketServer.js:

    exports.initialize = function () {
        console.log('TCPSocketServer.js: initialize() start: port: ' + this.getParameter('port'));
        ...
        server.on('connection', function (serverSocket) {
            console.log('TCPSocketServer.js: initialize() connection: localPort: ' + serverSocket.localPort + ', remotePort: ' + serverSocket.remotePort());
            ...
            serverSocket.on('data', function (data) {
                console.log('TCPSocketServer.js: initialize() data: ' + data);
                self.send('received', data);
                self.send('receivedID', connectionCount);
            });

Which, in Cape Code, correctly produces:

TCPSocketServer.js: initialize() data: 65535

For the Node host, look at the socket module at $PTII/org/terraswarm/accessor/accessors/web/hosts/node/node_modules/@accessors-modules/socket/socket.js:

    exports.SocketWrapper = function(helper, eventEmitter, socket, sendType, receiveType, rawBytes, emitBatchDataAsAvailable) {
        this.eventEmitter = eventEmitter;
        // FIXME: I Am A is for debugging so that we can identify objects.
        console.log('socket.js: SocketWrapper(): this.eventEmitter: ' + this.eventEmitter.iama);
        this.socket = socket;
        this.sendType = sendType;
        this.receiveType = receiveType;
        this.rawBytes = rawBytes;
        this.emitBatchDataAsAvailable = emitBatchDataAsAvailable;

        // If we get a data event, then process the buffer.
        var self = this;
        this.socket.on('data', function(buffer) {
            console.log('socket.js: SocketWrapper(): data event: this.eventEmitter: ' + self.eventEmitter.iama + ' socket buffer: ' + buffer);
            self.processBuffer(buffer);
        });
    };
    util.inherits(exports.SocketWrapper, EventEmitter);
    ...
    /** Process new buffer data.
     *  @param buffer The buffer, or null to process previously received data.
     */
    exports.SocketWrapper.prototype.processBuffer = function (buffer) {
        // FIXME: not sure if we should export this function
        console.log('socket.js: SocketWrapper.processBuffer(): this.eventEmitter: ' + this.eventEmitter.iama + ', buffer: ' + buffer);
        // Assume a numeric type.
        // int size = _sizeOfType(this.receiveType);
        var size = 2;
        var length = buffer.length;
        var numberOfElements = length / size;
        if (numberOfElements == 1) {
            console.log('socket.js: SocketWrapper.processBuffer() issueResponse: Numeric numberOfElements == 1 emit');
            this.eventEmitter.emit('data', buffer);
        } else {
            var position = 0;
            for (var i = 0; i < numberOfElements; i++) {
                console.log('socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: ' + this.eventEmitter.iama + ' Reading: ' + buffer.readUInt16BE(position));
                this.eventEmitter.emit("emit", "data", buffer.readUInt16BE(position));
                position += size;
            }
        }
    }

In the Node host, the above generates:

socket.js: SocketWrapper(): data event: this.eventEmitter: Socket() socket buffer: ��^@^@^@��^@^@
socket.js: SocketWrapper.processBuffer(): this.eventEmitter: Socket(), buffer: ��^@^@^@��^@^@
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535

However, it seems that the data event that is emitted is never acted upon.

Solution to emit problem

In socket.js, change

    this.eventEmitter.emit("emit", "data", buffer.readUInt16BE(position));

to

    this.eventEmitter.emit('data', buffer.readUInt16BE(position));

The first argument to emit() is the event name, so the original call raised an event named "emit" instead of "data", and the 'data' listener in TCPSocketServer.js was never invoked.

Node: Downstream Actors Don't See Tokens

To replicate:

$PTII/bin/capecode $PTII/org/terraswarm/accessor/test/auto/TCPSocketUnsignedShortSimple.xml

This model works fine under Cape Code. Under Node, when we generate code, it fails because the TrainableTest actor sees a 2 instead of 65535. This is because under Node, the TCPSocketServer produces 65535,0,1,65535,0,2 and the TrainableTest sees only the last value, 2.
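As an aside on the emit fix described earlier on this page, the difference between the two calls can be reproduced in isolation with Node's standard events module. The sketch below is not the socket.js code; the emitter and the received array are stand-ins for illustration. The buggy form presumably came from carrying over the argument order of the Java callMember("emit", "data", ...) call, where "emit" is the method name.

```javascript
var EventEmitter = require('events').EventEmitter;

var emitter = new EventEmitter();
var received = [];
emitter.on('data', function (value) {
    received.push(value);
});

// Buggy form: the first argument is the event name, so this raises an
// event called "emit". No listener is registered for it, so nothing runs.
var handledBuggy = emitter.emit('emit', 'data', 65535);

// Fixed form: raises "data" with 65535 as the listener argument.
var handledFixed = emitter.emit('data', 65535);

console.log(handledBuggy, handledFixed, received); // false true [ 65535 ]
```

emit() returns false when the event has no listeners, so logging its return value is a cheap way to spot this kind of mistake while debugging.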
In the Cape Code output below, we can see that we are emitting the data and then the Test accessor is getting the inputs:

SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: serverSocket data listener: 65535
65535
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: serverSocket data listener: 0
0
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 1
TCPSocketServer.js: serverSocket data listener: 1
1
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: serverSocket data listener: 65535
65535
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: serverSocket data listener: 0
0
SocketHelper.SocketWrapper._processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 2
TCPSocketServer.js: serverSocket data listener: 2
2
Test: 0, input: 65535, referenceToken: 65535
Test: 1, input: 0, referenceToken: 0
Test: 2, input: 1, referenceToken: 1
Test: 3, input: 65535, referenceToken: 65535
Test: 4, input: 0, referenceToken: 0
Test: 5, input: 2, referenceToken: 2
TCPSocketClient.js: wrapup()

The Node example was created from the Cape Code model and run with:

(cd $PTII/org/terraswarm/accessor/accessors/web/cg; node ../hosts/node/nodeHostInvoke.js cg/TCPSocketUnsignedShortSimple)

In its output, we can see that the TCPSocketServer data listener is getting all six values, but the TrainableTest then fails on its first input:

socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: serverSocket data listener: 65535
65535
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: serverSocket data listener: 0
0
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 1
TCPSocketServer.js: serverSocket data listener: 1
1
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 65535
TCPSocketServer.js: serverSocket data listener: 65535
65535
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 0
TCPSocketServer.js: serverSocket data listener: 0
0
socket.js: SocketWrapper.processBuffer() issueResponse: Numeric A: numberOfElements > 1 emit: Socket() Reading: 2
TCPSocketServer.js: serverSocket data listener: 2
2
socket.js: Socket netSocket on data: ��^@^@^@��^@^@^@
nodeHost.js: Error: Error: commonHost.js, react(), invoking a specific handler for "input": Exception occurred in input handler, which has now has been removed. Exception was: Error: TCPSocketUnsignedShortSimple.TrainableTest: The input "2" is not within "1e-9" of the expected value "65535"
Stacktrace was: Error: TCPSocketUnsignedShortSimple.TrainableTest: The input "2" is not within "1e-9" of the expected value "65535"
    at Accessor.eval (eval at Accessor (/Users/cxh/src/ptII11.0.devel/org/terraswarm/accessor/accessors/web/hosts/common/commonHost.js:617:19), <anonymous>:179:27)
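The Node log above shows all six data listener calls completing before TrainableTest reacts, and its first input is then 2. One way to picture that symptom is the toy model below. It is only a sketch under the assumption that an output holds just its most recent unread value; ToyOutput is a hypothetical stand-in and does not reproduce how commonHost.js actually delivers tokens.

```javascript
// Toy model only. ToyOutput is a hypothetical stand-in, not accessor host code.
function ToyOutput() {
    this.latest = undefined;
}
// A later send replaces an earlier value that nobody has read yet.
ToyOutput.prototype.send = function (value) {
    this.latest = value;
};

var values = [65535, 0, 1, 65535, 0, 2];

// Case 1: all sends happen before the downstream actor reacts once.
var batched = new ToyOutput();
values.forEach(function (value) {
    batched.send(value);
});
var seenWhenBatched = [batched.latest];

// Case 2: the downstream actor reacts after every send.
var interleaved = new ToyOutput();
var seenWhenInterleaved = [];
values.forEach(function (value) {
    interleaved.send(value);
    seenWhenInterleaved.push(interleaved.latest);
});

console.log(seenWhenBatched);      // [ 2 ]
console.log(seenWhenInterleaved);  // [ 65535, 0, 1, 65535, 0, 2 ]
```

Case 1 matches the Node symptom (the test's first input is 2), while Case 2 matches the Cape Code log, where each value reaches the Test accessor in order.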