Use TBB to Generate Dynamic Dependency Graph For Computation

Table of Contents

1 Overview

TBB's flow graph provides a way to do flow-based programming. For problems which can be described as a dependency graph, a flow graph provides a clear interface to describe the problem, with good performance and scalability across multiple CPU cores.

Typical TBB flow graph code looks like below:

using namespace tbb::flow;

int main() {
   graph g;
   function_node<int,int>
       f1( g, unlimited, [](const int &i) { return 2*i; } );
   function_node<float,float>
       f2( g, unlimited, [](const float &f) { return f/2; } );

   join_node< flow::tuple<int,float> > j(g);

   function_node< flow::tuple<int,float> >
       f3( g, unlimited,
           []( const flow::tuple<int,float> &t ) {
               printf( "Result is %f\n",
                       std::get<0>(t) + std::get<1>(t));
           } );

   make_edge( f1, input_port<0>( j ) );
   make_edge( f2, input_port<1>( j ) ;
   make_edge( j, f3 );

   f1.try_put( 3 );
   f2.try_put( 3 );
   g.wait_for_all( );
   return 0;
}

Example taken from tbb::flow::join_node documentation. 1

2 Dynamic Graph

The example above is quite simple, using a predefined computation flow graph. However in reality, the original graph usually is dynamically created somewhere, especially when trying to adapt TBB to a legacy system, and we will need to create the flow graph at runtime:

using fnode = tbb::flow::function_node<msg_t, msg_t>;

void computeFlowGraph(const std::vector<Node*>& nodes, const msg_t& computationParam) {
  std::vector<fnode*> fnodes;
  std::unordered_map<Node*, fnode*> nfmap;

  tbb::flow::graph g;

  for (auto n : nodes) {
    // not capturing *n directly to avoid copying
    auto f = new fnode(g, n->getConcurrentCount(),
                       [=](const msg_t& p) { return (*n)(p); } ));
    fnodes.push_back(f);
    nfmap[n] = f;
  }

  for (auto n : nodes) {
    for (auto p : n->getParents()) {
      tbb::flow::make_edge(*nfmap[p], *nfmap[n]);
    }
  }

  // get all the initial function nodes and start computation
  std::vector<fnode> initials = getInitialNodes(nfmap);
  for (auto fn: initials) {
    fn->try_put(computationParam);
  }

  g.wait_for_all();

  deletePointerVector(fnodes);
}

Because the topology is not known until we create the graph, the TBB flow graph nodes can only be placed on the heap so they will remain valid till the end of the computation.

3 Message Aggregation

Everything in TBB is fine and dandy, except the function_node might not be the one you expect: You are expecting a node to spawn only one task after receiving one message from everyone of its parents, not spawn one task per message received from any parent. The node needs to wait for all its parents to finish processing their message first.

There is a join_node looks to be an interesting candidate to solve this, but on inspection it's still not the expected one: the message type is of a fixed empty class continue_msg, and there is no way to pass any extra information through the join_node.

We see that actually no node type can pass extra information while waiting for all its parents at the same time. How to resolve this problem?

4 Pipeline

TBB provides another pattern called pipeline. 2 It simulates an assembly line which contains several processing stages. Only after a stage finishes its work, can the next stage start. Each stage can process independent input concurrently.

One constraint of tbb::parallel_pipeline is that TBB doesn't support non-linear pipelines. 3 This may seem to reduce the parallelism, however it only affects the latency since all of the stages can always be processed in parallel.

To express a complex pipeline as a linear one, sort the stages.

However there is still a limitation of pipeline: Before a message can be passed to a child node, we need to aggregate all messages from its parents. Pipeline cannot deal with this situation.

5 Merge Node

To solve the "1 task per N messages" problem, all parents' messages should to be merged into one, then passed onto the child function_node. The general idea is like this:

join_node<tuple<msg_t...>> \rightArrow function_node<msg_t, msg_t>

From the interface we can tell join_node is lightweight that only gathers the input from its parents into a tuple then passes the tuple to its children. If we chain the join_node with a function_node, there should be little cost.

But here comes another problem: How to store these nodes of different types. The entire graph is created dynamically, and all nodes need to be kept alive before computation finishes. Take below topology for example:

dependency-graph-example.png

auto *afnode = new function_node<msg_t, msg_t>(...);
auto *bfnode = new function_node<msg_t, msg_t>(...);
auto *cjnode = new join_node<tuple<msg_t, msg_t>>(...);
auto *cfnode = new function_node<msg_t, msg_t>(...);
make_edge(*cjnode, cfnode);
auto *djnode = new new join_node<tuple<msg_t, msg_t, msg_t>>(...);
auto *dfnode = new function_node<msg_t, msg_t>(...);
make_edge(*djnode, dfnode);

As we can see from above code, join_node which join different number of nodes are of different types. We cannot put them into a std::vector<join_node<...>*> to keep them alive until the end of computation.

Imagine the graph can be rather complicated where a node can have at most 10 parents! Either there will be many node vectors corresponding to each node type, or we have to find another way to erase the type of each node and restore the type information later for making edges and destruction.

Suppose there is a merge_node which will wrap a join_node and connect it to a function_node. These nodes above will be: (Nodes E & F omitted)

auto *A = new function_node<msg_t, msg_t>(...);
auto *B = new function_node<msg_t, msg_t>(...);
auto *C = new merge_node(new join_node<tuple<msg_t, msg_t>>(...),
                         new function_node<msg_t, msg_t>(...));
auto *D = new merge_node(new join_node<tuple<msg_t, msg_t, msg_t>>(...),
                         new function_node<msg_t, msg_t>(...));

6 Erase/Restore Type Information

In above example, the type of a node is related to how many parents it has. If you remember the old C trick: every pointer can be converted to a void*, and convert back to a proper type. Using this trick, we can keep the pointer to the join_node as a void*. Only convert it to the correct type when needed (i.e. making edge and inside destructor) by doing a runtime dispatch based on how many parents a node has.

template <typename msg_t, typename merge_t>
class merge_node {
  const int n_parent = 0;
  void* jnode = nullptr;
  void* fnode = nullptr;
  merge_t merge;

public:
  template <typename Body>
  merge_node(tbb::flow::gaph& g, const Body& body, int n_parent);

  ~merge_node();
};

template <typename msg_t, typename merge_t>
template <typename Body>
merge_node<msg_t, merge_t>::merge_node(tbb::flow::graph& g, size_t concurrency,
                                       const Body& body, int n_parent)
  : n_parent(n_parent) {
  if (n_parent <= 1) {
    fnode = new function_node<msg_t, msg_t>(g, body);
  } else {
    switch (n_parent) {
    case 2: {
      auto* jn = new join_node<tuple<msg_t, msg_t>>(g);
      auto* fn = new function_node<tuple<msg_t, msg_t>, msg_t>(g, concurrency,
                    [=](const tuple<msg_t, msg_t>& msgs) {
                      return body(merge(msgs));
                    });
      make_edge(*jn, *fnode);
      jnode = jn;
      fnode = fn;
      break;
      }
    // case 3 and the rest is similar to case 2
    }
  }
}

template <typename msg_t, typename merge_t>
merge_node<msg_t, merge_t>::~merge_node() {
  assert(jnode);
  switch (n_parent) {
  case 0:
  case 1:
    assert(fnode && !jnode);
    delete reinterpret_cast<function_node<msg_t, msg_t>*>(fnode);
    return;

  case 2:
    assert(fnode && jnode);
    delete reinterpret_cast<function_node<tuple<msg_t, msg_t>, msg_t>*>(fnode);
    delete reinterpret_cast<join_node<tuple<msg_t, msg_t>>*>(jnode);
    return;
  // case 3 and the rest is similar to case 2
  }
}

Of course you can use macros to remove the many duplications here. For the complete implementation, please refer to mergenode sources.

7 Known Limitations

Because TBB doesn't support join_node with more than 10 parents, merge_node doesn't either. But you can always have a workaround by creating an intermediate node that merge 0~9 nodes and merge its output with the rest 10~N nodes.

However, usually this number of parents would indicates a bottle neck in the graph, and probably a poor graph design.

This code has been found with several bugs due to carelessness. So be careful and use it as a POC only.

8 Further Improvements

Sometimes your function node will not always pass a message to the child nodes. Then you probably want to use the tbb::flow::multifunction_node so whether pass a message to child nodes can be controlled. The implementation is pretty much the same with a few tweaks. 4

Footnotes:

HomeTop