I’m trying to write a unit test for a REDHAWK waveform. I would like to use stream sources to input data and stream/message sinks to store the output. I have written unit tests for components this way, but wanted to create a test for a waveform as well. I found a solution for connecting a StreamSource to a waveform’s port, but have not been able to determine how to connect a sink to a waveform port.
For a source and a component (where self.comp
is the component), normally one can use the following to connect them:
src = StreamSource(streamId='strm1', format='short')
src.connect(providesComponent=self.comp,
providesPortName='dataShort_in',
connectionId='testConn')
For a source and a waveform (where self.app
is the waveform), I was able to get the following to work:
src = StreamSource(streamId='strm1', format='short')
src.connect(providesComponent=CorbaObject(self.app.getPort('dataShort_in')),
connectionId='testConn')
However, for a sink I would normally call connect
on the component:
sink = StreamSink('short')
self.comp.connect(sink, usesPortName='dataShort_out')
I tried to use a similar approach as for the source case by getting the port from the waveform as below:
sink = StreamSink('short')
self.app.getPort('dataShort_out').connectPort(sink, 'outputConn')
However, this gives the error:
File "/usr/local/redhawk/core/lib/python/ossie/cf/Port_idl.py", line 86, in connectPort
return self._obj.invoke("connectPort", _0_CF.Port._d_connectPort, args)
BAD_PARAM: CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType, CORBA.COMPLETED_NO, ["Expecting object reference, got <class 'bulkio.sandbox.streamsink.StreamSink'>", "Operation 'connectPort' parameter 0"])
I am not sure how I can get a CORBA obj ref for the sink to use here. Or is there another approach I can use here to connect the port to the sink?
I am using REDHAWK 2.2.2 on Centos 7.
2
Answers
I think I have found a solution to my own question. I ended up creating a new class that manages port connections that works for both sinks and sources. I called it
ConnectionManager
(hopefully it won't be confused with theossie.utils.model.connection.ConnectionManager
class.Here's an example using a
StreamSource
(self.cm
is aConnectionManager
):And an example using a
StreamSink
:My unit test
setUp
method has a call toself.cm.clear()
and thetearDown
method a call toself.cm.disconnectAll()
to clean up the connections after each test.The only thing I don't understand is the names of the ports for the sink and source classes. Using the
{format}{In|Out}
names work, but I don't know why.The same process that you applied for connecting a component to a sink applies to an application, as long as the application is a sandbox object rather than a CORBA one:
The next code shows the names of the ports. In this case, there is just one of type short.
The code below shows the syntax for using a CORBA reference instead of a sandbox object.
You can run a waveform in the sandbox. Note that the waveform’s components need to run on the local host.
Use the
nodeBooter
shell command orkickDomain
from theredhawk
Python package to start a domain manager and a device manager.Sample code to run a waveform in the sandbox: