GDPJSCallbacks

The GDP is no longer supported by the Accessors Framework. This page exists as a historical reference.

Problem Statement

We want to use callbacks so that the GDPLogSubscriber accessor is a spontaneous accessor. Under Cape Code, we now have a spontaneous accessor. Under Node, we were using polling, where the accessor had an input. Now, however, the GDPLogSubscriber generates no output under Node.

How to run what we have

To run the Cape Code version:

To run the Node Host Composite Accessor version:

The problem is that the TrainableTest connected to the GDPLogSubscriber does not run:

    nodeHost.js: invoking wrapup() for accessor: GDPLogSubscribeJS.GDPLogSubscribe2
    nodeHost.js: invoking wrapup() for accessor: GDPLogSubscribeJS.GDPLogAppend2
    nodeHost.js: while invoking wrapup() of all accessors, an exception was thrown: Error: GDPLogSubscribeJS.TrainableTest2The input handler of this accessor was never invoked. Usually, this is an error indicating that starvation is occurring.
    bash-3.2$

What we need

When we first call subscribe, we need to register a callback that, when invoked, sends data to the output port of GDPSubscriber. The callback will be invoked by the GDP C code when it gets an event indicating that a subscription is present. The callback eventually needs to invoke JavaScript code that sends the data. So, we need a way to tell the GDP C code to call back into a JavaScript callback.

Subscription Files

A review of files is in order:
Building the GDP

In Ptolemy II, the GDP source code is typically installed in

Or:

    cd $PTII/vendors
    mkdir gdp
    cd gdp
    git clone repoman@repo.eecs.berkeley.edu:projects/swarmlab/gdp.git
    cd gdp

Install MacPorts, then run:

    ./adm/gdp-setup
    make all

Subscription Callbacks

The GDP has C support for callbacks. The

(FIXME: There are a number of different files that are called in part because the code has evolved. Initially, the JavaScript that invokes the GDP C code was a line-for-line port of GDP C utilities from C to JavaScript.)
In the JavaScript do_multiread() (its log messages identify the file as gdpjs.js):

    datum = gdp_datum_new_js();
    ...
    var crec = 0; // counts records read - an index into recarray_out[]
    ...
    evtype_int = gdp_event_gettype_js(gev_Ptr);
    //C
    //C    // decode it
    switch (evtype_int) {
    case GDP_EVENT_DATA:
        ...
        if (gdp_event_cbfunc) {
            // TBD1 - cleanup here
            // do this stashing into recarray_out even if we
            // don't have a non-null gdp_event_cbfunc()
            // grab record contents for this newly read record
            var val = gdp_datum_buf_as_string(datum);
            // grab record number and timestamp for this newly read record
            var ts = gdp_datum_getts_as_string_js(datum, true /* format */ );
            // TBD: below check for 64-bit integer return type, gdp_recno_t
            var rn = gdp_datum_getrecno_js(datum);
            // TBD: check that recno and rn agree - which to use here?
            recarray_out[crec] = {
                recno: rn, // for now we use gdp's record number
                timestamp: ts,
                value: val
            };
            crec++;
            if (debug) {
                console.log(
                    'gdpjs.js: do_multiread(): GDP_EVENT_DATA: about to call gdp_event_cbfunc()'
                );
            }
            // TBD1 - cleanup here
            gdp_event_cbfunc(evtype_int, datum, recarray_out);
        }

From the above, the callback method should take the following arguments: evtype_int, datum, and recarray_out.
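The call gdp_event_cbfunc(evtype_int, datum, recarray_out) in the excerpt suggests a three-argument callback. A minimal sketch of that shape follows. Everything named here is an assumption made for illustration: the numeric value given to GDP_EVENT_DATA, the makeEventCallback helper, and the onRecord consumer are hypothetical and not part of gdpjs.js, and the datum is treated as opaque.

```javascript
// Hypothetical stand-in for the GDP event type constant; the real value
// comes from the GDP headers and is not assumed to be 1.
const GDP_EVENT_DATA = 1;

// Build a callback with the (evtype, datum, records) shape used by
// do_multiread(). onRecord is an illustrative consumer, not a gdpjs.js API.
function makeEventCallback(onRecord) {
    return function (evtype, datum, records) {
        if (evtype !== GDP_EVENT_DATA || records.length === 0) {
            return; // Ignore non-data events and empty record arrays.
        }
        // do_multiread() stores the newly read record before calling back,
        // so the newest record is the last element.
        onRecord(records[records.length - 1]);
    };
}

// Usage with made-up sample records, only to show the calling convention.
const received = [];
const callback = makeEventCallback(function (record) {
    received.push(record.value);
});
const records = [];
records.push({ recno: 1, timestamp: 't1', value: 'first' });
callback(GDP_EVENT_DATA, null, records);
records.push({ recno: 2, timestamp: 't2', value: 'second' });
callback(GDP_EVENT_DATA, null, records);
callback(GDP_EVENT_DATA + 1, null, records); // Not a data event: ignored.
console.log(received);
```

Forwarding only the newest record keeps the consumer from seeing earlier records again each time the accumulated array is passed in.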
Note that the C version of do_multiread() uses a callback that takes only the event:

    void
    multiread_cb(gdp_event_t *gev)
    {
        (void) multiread_print_event(gev, false);
        gdp_event_free(gev);
    }
    ...
    do_multiread(gdp_gcl_t *gcl,
            gdp_recno_t firstrec,
            const char *dtstr,
            int32_t numrecs,
            bool subscribe,
            bool use_callbacks)
    {
        ...
        if (use_callbacks)
            cbfunc = multiread_cb;
        ...
        if (subscribe)
        {
            ...
            estat = gdp_gcl_subscribe(gcl, firstrec, numrecs, NULL, cbfunc, NULL);

Node ffi Callbacks
The Stack Overflow page at http://stackoverflow.com/questions/36819679/node-fficall-c-lib-with-callback-function is very useful. What we want to do is call

In

Formerly, the code looked like:

    var gdp_event_cbfunc = null // No callback needed when just logging.

Instead, we define an event callback:

    // Define an event callback.
    // gdp.h defines:
    // typedef void (*gdp_event_cbfunc_t)(  // the callback function
    //         gdp_event_t *ev);            // the event triggering the call

    // FIXME: Copied from gdpjs.js
    var ffi = require('ffi');
    var ref = require('ref');
    var gdp_event_t = ref.types.void; // opaque for us up here in JS
    var gdp_event_tPtr = ref.refType(gdp_event_t);
    // End of copy

    // Pick one of the next two var statements.
    //var gdp_event_cbfunc = null // No callback needed when just logging.
    var gdp_event_cbfunc = ffi.Callback('void', [gdp_event_tPtr],
        function(gdp_event) {
            console.log("CallBack!!!!!!!!!!!!!!");
        });

Unfortunately, it seems like our callback is never called. Also, we probably need to change

Subscription Analysis

I have some very ugly example code created by someone else. The idea, which was not a bad idea, was to rewrite the GDP C code as JavaScript. We ended up with a read_gcl_records() JavaScript method that takes a bunch of args and tries to do too much. That method is used in gdp.js and defined in gdpjs.js.

Here's what happens in the Subscription Accessor at

Initialize registers a handler for the logname input:

    /** Add an input handler that will subscribe to a log. */
    exports.initialize = function() {
        var self = this;
        // Set an input handler to unsubscribe and then invoke this initialize()
        // function when a new logname is provided.
        this.addInputHandler('logname', function() {
            // If there is an open subscription, close it.
            self.exports.wrapup.call(self);
            // Open the new subscription.
            self.exports.subscribe.call(self);
        });
        self.exports.subscribe.call(self);
    }

In subscribe():

    /** If a non-empty logname is given, subscribe to the log. */
    exports.subscribe = function() {
        var self = this;
        var logname = this.get('logname');
        if (logname === '') {
            // Nothing more to do.
            return;
        }
        var logdname = this.get('logdname');

        // Create or connect to a log.
        // The second argument specifies to open the log "read only."
        log = new GDP.GDP(logname, 1, logdname);

        // Listen for data from the log.
        log.on('data', function(data) {
            console.log('****** received: ' + data);
            self.send('data', data);
            console.log('****** sent data: ' + data);
        });

        log.setDebugLevel(this.getParameter('debugLevel'));

        // Subscribe to the log so that 'data' events are emitted.
        log.subscribe(
            this.getParameter('startrec'),
            this.getParameter('numrec'),
            this.getParameter('timeout'));
    };

Focusing on this call:

    log.subscribe(
        this.getParameter('startrec'),
        this.getParameter('numrec'),
        this.getParameter('timeout'));

Earlier in GDPLogSubscribe.xml, we have

    var GDP = require('gdp');
    ...
    exports.subscribe = function() {
        ...
        log = new GDP.GDP(logname, 1, logdname);

For the Node Host, subscribe() in node_modules/gdp/gdp.js is:

    /** Subscribe.
     *  @param self The CapeCode subscribe() function passes "this" as its first argument,
     *  so we need to do the same.
     *  @param {int} startrec The starting record. The first record is record 1.
     *  @param {int} numrecs The number of records
     *  @param {int} timeout The timeout in milliseconds.
     */
    exports.GDP.prototype.subscribe = function (self, startrec, numrecs, timeout) {
        // console.log("node_modules/gdp/gdp.js:subscribe(" + startrec + ", " + numrecs + ", " + timeout + ")");
        var gdpd_addr = gdpd_addr;
        var gcl_name = this.name
        var gcl_firstrec = startrec;
        var gcl_numrecs = numrecs
        var gcl_subscribe = true;
        var gcl_multiread = false;
        var conout = true; // recdest = -1 so output to console.log()
        var wait_for_events = false; // don't wait indefinitely in read_gcl_records() for
        var gcl_get_next_event = false;

        // See {$ACCESSORS_HOME}/wiki/Main/GDPJSCallbacks
        // and http://stackoverflow.com/questions/36819679/node-fficall-c-lib-with-callback-function

        // Define an event callback.
        // gdp.h defines:
        // typedef void (*gdp_event_cbfunc_t)(  // the callback function
        //         gdp_event_t *ev);            // the event triggering the call

        // FIXME: Copied from gdpjs.js
        var ffi = require('ffi');
        var ref = require('ref');
        var gdp_event_t = ref.types.void; // opaque for us up here in JS
        var gdp_event_tPtr = ref.refType(gdp_event_t);
        // End of copy

        var gdp_event_cbfunc = null // FIXME: not sure about this: no callback needed when just logging
        //var gdp_event_cbfunc = ffi.Callback('void', [gdp_event_tPtr],
        //    function(gdp_event) {
        //        console.log("CallBack!!!!!!!!!!!!!!");
        //    });

        console.log("node_modules/gdp/gdp.js:subscribe(): about to call read_gcl_records()");
        var data = gdpjs.read_gcl_records(gdpd_addr, gcl_name,
            gcl_firstrec, gcl_numrecs,
            gcl_subscribe, gcl_multiread, /*recdest,*/
            conout, gdp_event_cbfunc,
            wait_for_events,
            gcl_get_next_event
        );
        console.log("node_modules/gdp/gdp.js:subscribe(): returned from read_gcl_records(). data:");
        console.log(data);
        //if (data.records.length > 0) {
        //    return data.records[0].value;
        //} else {
        //    return null;
        //}
    };

This code is pretty gross, but it calls read_gcl_records(). For subscription, read_gcl_records() in gdpjs.js does the following:

    if (gcl_subscribe || gcl_multiread) {
        // DEBUG TBD1
        // true for reader-test.js; false for gdpREST_server.js
        if (!gcl_get_next_event) {
            // gcl_get_next_event is false, so we have not yet subscribed to the log.
            console.log('gdpjs.js: read_gcl_records(): before do_multiread()' );
            estat = do_multiread(gcl_Ptr, gcl_firstrec, gcl_numrecs, gcl_subscribe,
                wait_for_events,
                recarray_out, conout, gdp_event_cbfunc
            );
        } else {
            // gcl_get_next_event is true, so we have already subscribed to the log.
            console.log('gdpjs.js: read_gcl_records(): before do_multiread_inner()' );
            estat = do_multiread_inner(gcl_Ptr, gcl_firstrec, gcl_numrecs, gcl_subscribe,
                wait_for_events,
                recarray_out, conout, gdp_event_cbfunc,
                /* Timeout in seconds */
                1
            );
        }
    } else {
        estat = do_simpleread(gcl_Ptr, gcl_firstrec, gcl_numrecs,
            recarray_out, conout
        );
    }
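The branching in read_gcl_records() above can be summarized as a small pure function. This is only a sketch for reasoning about which path subscribe() takes: selectReadRoutine is a hypothetical helper, not part of gdpjs.js, and it returns the name of the routine rather than calling it.

```javascript
// Hypothetical helper mirroring the branch structure of read_gcl_records().
// It returns the name of the routine that would run for the given flags.
function selectReadRoutine(flags) {
    if (flags.gcl_subscribe || flags.gcl_multiread) {
        // gcl_get_next_event false: not yet subscribed, so subscribe now.
        // gcl_get_next_event true: already subscribed, so fetch the next event.
        return flags.gcl_get_next_event ? 'do_multiread_inner' : 'do_multiread';
    }
    return 'do_simpleread';
}

// The flag values that GDP.prototype.subscribe() sets in gdp.js.
const subscribeFlags = {
    gcl_subscribe: true,
    gcl_multiread: false,
    gcl_get_next_event: false
};
console.log(selectReadRoutine(subscribeFlags));
```

With the flags that subscribe() sets, the selected routine is do_multiread(), so that is the function worth tracing when the callback does not fire.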
Looking at the C version, the call site and the function are:

    estat = do_multiread(gcl, firstrec, dtstr, numrecs, subscribe, use_callbacks);

    /*
    **  DO_MULTIREAD --- subscribe or multiread
    **
    **      This routine handles calls that return multiple values via the
    **      event interface.  They might include subscriptions.
    */

    EP_STAT
    do_multiread(gdp_gcl_t *gcl,
            gdp_recno_t firstrec,
            const char *dtstr,
            int32_t numrecs,
            bool subscribe,
            bool use_callbacks)
    {
        EP_STAT estat;
        void (*cbfunc)(gdp_event_t *) = NULL;
        EP_TIME_SPEC ts;

        if (use_callbacks)
            cbfunc = multiread_cb;

        // are we reading by record number or by timestamp?
        if (dtstr != NULL)
        {
            // timestamp
            estat = ep_time_parse(dtstr, &ts, EP_TIME_USE_LOCALTIME);
            if (!EP_STAT_ISOK(estat))
            {
                fprintf(stderr, "Cannot convert date/time string \"%s\"\n", dtstr);
                return estat;
            }
        }

        if (subscribe)
        {
            // start up a subscription
            if (dtstr == NULL)
                estat = gdp_gcl_subscribe(gcl, firstrec, numrecs, NULL, cbfunc, NULL);
            else
                estat = gdp_gcl_subscribe_ts(gcl, &ts, numrecs, NULL, cbfunc, NULL);
        }
        else
        {
            // make the flags more user-friendly
            if (firstrec == 0)
                firstrec = 1;

            // start up a multiread
            if (dtstr == NULL)
                estat = gdp_gcl_multiread(gcl, firstrec, numrecs, cbfunc, NULL);
            else
                estat = gdp_gcl_multiread_ts(gcl, &ts, numrecs, cbfunc, NULL);
        }

        // check to make sure the subscribe/multiread succeeded; if not, bail
        if (!EP_STAT_ISOK(estat))
        {
            char ebuf[200];

            ep_app_fatal("Cannot %s:\n\t%s",
                    subscribe ? "subscribe" : "multiread",
                    ep_stat_tostr(estat, ebuf, sizeof ebuf));
        }

        // this sleep will allow multiple results to appear before we start reading
        if (ep_dbg_test(Dbg, 100))
            ep_time_nanosleep(500000000);    //DEBUG: one half second

        // now start reading the events that will be generated
        if (!use_callbacks)
        {
            for (;;)
            {
                // get the next incoming event
                gdp_event_t *gev = gdp_event_next(NULL, 0);

                // print it
                estat = print_event(gev, subscribe);

                // don't forget to free the event!
                gdp_event_free(gev);

                EP_STAT_CHECK(estat, break);
            }
        }
        else
        {
            // hang for an hour waiting for events
            sleep(3600);
        }
        return estat;
    }

For subscription, this boils down to:

    void (*cbfunc)(gdp_event_t *) = NULL;
    ...
    cbfunc = multiread_cb;
    ...
    estat = gdp_gcl_subscribe(gcl, firstrec, numrecs, NULL, cbfunc, NULL);
    ...
    sleep(3600);

So, in JavaScript, we should follow this simple pattern and avoid the more convoluted JavaScript code paths. Now, the data is visible in the callback.
    exports.GDP.prototype.subscribe = function (self, startrec, numrecs, timeout) {
        ...
        var gdp_event_cbfunc = ffi.Callback('void', [gdp_event_tPtr],
            function(gdp_event) {
                console.log("gdp.js CallBack!!!!!!!!!!!!!!");
                console.log(gdp_event);
                if (typeof gdp_event !== 'undefined') {
                    console.log("gdp.js callback: gdp_event is defined.");
                    // Here, we want to first get the datum.
                    var datum = gdpjs.gdp_event_getdatum_js(gdp_event);
                    console.log("gdp.js callback: datum:")
                    console.log(datum);
                    console.log("gdp.js callback: about to try to print datum");
                    gdpjs.gdp_datum_print_stdout_js(datum);
                    var data = gdpjs.gdp_datum_buf_as_string(datum);
                    console.log("gdp.js: callback: data is: ");
                    console.log(data);

                    // Then call _notifyIncoming, which is defined in ptolemy/actor/lib/jjs/modules/gdp/GDP.js
                    //_notifyIncoming(data);

                    // gdp/gdp/doc/gdp-programmatic-api.html
                    // says that for gdp_gcl_subscribe(), "It is
                    // the responsibility of the callback function
                    // to call gdp_event_free(gev)."
                    gdpjs.gdp_event_free_js(gdp_event);
                }
            });

        console.log("node_modules/gdp/gdp.js:subscribe(): about to call read_gcl_records()");
        var data = gdpjs.read_gcl_records(gdpd_addr, gcl_name,
            gcl_firstrec, gcl_numrecs,
            gcl_subscribe, gcl_multiread, /*recdest,*/
            conout, gdp_event_cbfunc,
            wait_for_events,
            gcl_get_next_event
        );

Here, the problem is that we have the data in the callback method and we want to emit it. In CapeCode, GDPHelper.subscribe() is:

    public void subscribe(int startRecord, int numberOfRecords, int timeout) throws GDPException {
        EP_TIME_SPEC timeoutSpec = null;
        if (timeout != 0) {
            timeoutSpec = new EP_TIME_SPEC(timeout/1000,
                    0, /* nanoseconds */
                    0.001f /* accuracy in seconds */);
        }
        // FIXME: We need to cast to a long here because it seems
        // like passing longs from JavaScript does not work for us.
        _gcl.subscribe((long)startRecord, numberOfRecords, timeoutSpec);
        Runnable blocking = new Runnable() {
            public void run() {
                while (_subscribed) {
                    // Last argument is a timeout in ms. When it expires, if there
                    // is no data, then an empty HashMap is returned.
                    // FIXME: Any way to set the timeout to wait forever?
                    final HashMap<String, Object> gdpEvent = GDP_GCL.get_next_event(_gcl, 10000);
                    if (gdpEvent != null) {
                        if (gdpEvent.size() > 0) {
                            // Issue the response in the director thread.
                            _issueResponse(() -> {
                                HashMap<String,Object> result = (HashMap<String,Object>)gdpEvent.get("datum");
                                System.out.println("GDPHelper.subscribe(): about to call _notifyIncoming " + result.toString());
                                _currentObj.callMember("_notifyIncoming", _datumToData(result));
                            });
                        }
                        // FIXME: The code in GDPHelper should
                        // probably call gdp_event_free()

                        // vendors/gdp/gdp/doc/gdp-programmatic-api.html
                        // says that for gdp_gcl_subscribe(), "It is
                        // the responsibility of the callback function
                        // to call gdp_event_free(gev)."

                        // Also vendors/gdp/gdp/apps/gdp-reader.c defines the callback as:
                        // void
                        // multiread_cb(gdp_event_t *gev)
                        // {
                        //     (void) print_event(gev, false);
                        //     gdp_event_free(gev);
                        // }
                    } else {
                        _subscribed = false;
                    }
                }
            }
        };
        _subscribed = true;
        Thread thread = new Thread(blocking, "GDP subscriber thread: " + _logName);
        // Start this as a daemon thread so that it doesn't block exiting the process.
        thread.setDaemon(true);
        thread.start();
    }

The key part is:

    _currentObj.callMember("_notifyIncoming", _datumToData(result));
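On the Node side, one possible counterpart to callMember("_notifyIncoming", ...) is to have the log object emit a 'data' event from inside the callback, since the accessor already registers log.on('data', ...) and calls self.send() there. The sketch below assumes the log object can be a Node EventEmitter. FakeLog and simulateGdpCallback are hypothetical stand-ins invented for illustration, and no GDP or ffi code is involved.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for the GDP log object in node_modules/gdp/gdp.js.
class FakeLog extends EventEmitter {
    // Stands in for the body of the ffi callback after the datum has been
    // converted to a string: hand the data to whoever is listening.
    simulateGdpCallback(data) {
        this.emit('data', data);
    }
}

const log = new FakeLog();
const sent = [];

// Mirrors the accessor's handler: log.on('data', ...) calling self.send().
// Here send() is replaced by pushing onto an array.
log.on('data', function (data) {
    sent.push(data);
});

log.simulateGdpCallback('record 1');
log.simulateGdpCallback('record 2');
console.log(sent);
```

The appeal of this shape is that the accessor code would not need to change: only the module behind require('gdp') would have to emit the event when the C library calls back.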