skip to Main Content

I’m trying to write a unit test for a REDHAWK waveform. I would like to use stream sources to input data and stream/message sinks to store the output. I have written unit tests for components this way, but wanted to create a test for a waveform as well. I found a solution for connecting a StreamSource to a waveform’s port, but have not been able to determine how to connect a sink to a waveform port.

For a source and a component (where self.comp is the component), normally one can use the following to connect them:

src = StreamSource(streamId='strm1', format='short')

For a source and a waveform (where is the waveform), I was able to get the following to work:

src = StreamSource(streamId='strm1', format='short')

However, for a sink I would normally call connect on the component:

sink = StreamSink('short')
self.comp.connect(sink, usesPortName='dataShort_out')

I tried to use a similar approach as for the source case by getting the port from the waveform as below:

sink = StreamSink('short')'dataShort_out').connectPort(sink, 'outputConn')

However, this gives the error:

File "/usr/local/redhawk/core/lib/python/ossie/cf/", line 86, in connectPort
  return self._obj.invoke("connectPort", _0_CF.Port._d_connectPort, args)
BAD_PARAM: CORBA.BAD_PARAM(omniORB.BAD_PARAM_WrongPythonType, CORBA.COMPLETED_NO, ["Expecting object reference, got <class 'bulkio.sandbox.streamsink.StreamSink'>", "Operation 'connectPort' parameter 0"])

I am not sure how I can get a CORBA obj ref for the sink to use here. Or is there another approach I can use here to connect the port to the sink?

I am using REDHAWK 2.2.2 on Centos 7.



  1. Chosen as BEST ANSWER

    I think I have found a solution to my own question. I ended up creating a new class that manages port connections that works for both sinks and sources. I called it ConnectionManager (hopefully it won't be confused with the ossie.utils.model.connection.ConnectionManager class.

    class ConnectionManager:
        def __init__(self):
            self.connections = list()
        def clear(self):
            del self.connections[:]
        def connect(self, usesPort, providesPort, id):
            usesPort.connectPort(providesPort, id)
            self.connections.append( (usesPort, id))
        def disconnectAll(self):
            for port, id in self.connections:

    Here's an example using a StreamSource ( is a ConnectionManager):

    strm = sb.StreamSource(streamID='strm1', format='short')'shortOut'),

    And an example using a StreamSink:

    sink = sb.StreamSink('short')'dataShort_out'),

    My unit test setUp method has a call to and the tearDown method a call to to clean up the connections after each test.

    The only thing I don't understand is the names of the ports for the sink and source classes. Using the {format}{In|Out} names work, but I don't know why.

  2. The same process that you applied for connecting a component to a sink applies to an application, as long as the application is a sandbox object rather than a CORBA one:

    dom = redhawk.attach()
    app = dom.apps[0]
    sink = sb.StreamSink('short')

    The next code shows the names of the ports. In this case, there is just one of type short.

    from pprint import pprint

    The code below shows the syntax for using a CORBA reference instead of a sandbox object.

    sink_port = sink.getPort('shortIn')
    ref = app.ref
    ref.getPort('dataShort_out').connectPort(sink_port, 'outputConn')

    You can run a waveform in the sandbox. Note that the waveform’s components need to run on the local host.

    Use the nodeBooter shell command or kickDomain from the redhawk Python package to start a domain manager and a device manager.

    Sample code to run a waveform in the sandbox:

    import os
    from ossie.utils import redhawk, sb
    dom = redhawk.attach()
    SDRROOT = os.getenv('SDRROOT')
    waveform_dir = os.path.join(SDRROOT, 'dom', 'waveforms')
    waveform_name = os.listdir(waveform_dir)[0]
    app = dom.createApplication(waveform_name)
    sink = sb.StreamSink()
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top