This repository has been archived by the owner on Jan 16, 2021. It is now read-only.

Tame rendezvous windowing

tomquisel edited this page Jul 3, 2012 · 2 revisions

Rendezvous Windowing (pipelining)

When programming Web applications, or other distributed systems, I often find myself "RPC Windowing." That is, I have 1,000 RPCs to make but want to be polite and not blast them all at once. So the obvious solution is to set a window (like 5) and commit to having at most that many RPCs outstanding at a given time.

Here is a sample program (ex4.T).

#include "tame.h"
#include "arpc.h"
#include "parseopt.h"
#include "ex_prot.h"

tamed static void 
dostuff (const str &h, int port, cbb cb)
{
  // declare all of your "stack" variables here
  tvars {
    int fd (99999);
    ptr<axprt_stream> x;
    ptr<aclnt> cli;
    vec<int> res;
    vec<clnt_stat> errs;
    int ntot (40), window_sz (5), id;
    int nsent (0), nrecv (0);
    bool err (false);
    rendezvous_t<int> rv;
    bool ret (true);
    int return_id;
  }

  // Call tcpconnect, and block until it returns; when it does return,
  // assign the local variable 'fd' to the result.
  twait { tcpconnect (h, port, mkevent(fd)); }

  if (fd < 0) {
    warn ("%s:%d: connection failed: %m\n", h.cstr(), port);
    err = true;
  } else {
    res.setsize (ntot);
    errs.setsize (ntot);
    x = axprt_stream::alloc (fd);
    cli = aclnt::alloc (x, ex_prog_1);

    // Now do the pipelined/windowed RPCs
    while (nrecv < ntot) {
      if (nsent < ntot && nsent - nrecv < window_sz) {
	// Launch new calls since there is room in the window!
	cli->call (EX_RANDOM, NULL, &res[nsent], 
		   mkevent (rv, nsent, errs[nsent])); 
	nsent++;
      } else {
	// Harvest
	twait (rv, id);
	if (errs[id]) {
	  err = true;
	  warn << "RPC error: " << errs[id] << "\n";
	} else {
	  warn << "Success " << id << ": " << res[id] << "\n";
	}
	nrecv++;
      }
    }
    warn << "All done...\n";
  }
  (*cb) (!err);
}

static void finish (bool rc)
{
  exit (rc ? 0 : -1);
}

int
main (int argc, char *argv[])
{
  int port;
  if (argc != 3 || !convertint (argv[2], &port))
    fatal << "usage: ex4 <hostname> <port>\n";
  
  dostuff (argv[1], port, wrap (finish));
  amain ();
}

The details of establishing an RPC-over-TCP connection are exactly as before. The difference lies in how the asynchronous calls are fired off and later joined.

cli->call (EX_RANDOM, NULL, &res[nsent], 
                    mkevent (rv, nsent, errs[nsent]));

Asynchronous calls launched outside of a twait {...} block must first be associated with a rendezvous. A rendezvous_t is a simple object that coordinates launches and joins for a given group of events (see libtame/tame_core.h for the class definition of rendezvous_t). Each rendezvous_t is also associated with the data types used to distinguish callbacks from each other when they are eventually joined; here that is a single int. In the given case, we launch ntot RPCs in total, at most window_sz at a time, labelled by the integers from 0 to ntot - 1. A distinct value (the value of nsent at launch time) is associated with each call, and consequently, with each callback.

As before, the callback argument is given with mkevent(..) syntax, but with extra arguments: first the rendezvous the new callback belongs to, then any bound values (here the int nsent). When the call eventually "returns" by triggering its callback, it stores the status of the RPC in the matching slot of the vector errs, that is, errs[nsent] as evaluated at launch time.
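The binding described above can be pictured with a plain C++ analogy. This is only a sketch and not how libtame implements it: ToyRendezvous, make_event and try_join are hypothetical names invented for illustration, and there is no blocking or scheduling here. It shows just the two effects of mkevent (rv, nsent, errs[nsent]): the status lands in the slot chosen at launch time, and the bound id is what the joiner gets back.

```cpp
#include <cassert>
#include <functional>
#include <queue>

// Hypothetical, simplified stand-in for rendezvous_t<int>. The real class
// is defined in libtame/tame_core.h and does considerably more.
class ToyRendezvous {
 public:
  // Rough analogue of mkevent (rv, id, slot): returns a callback that,
  // when triggered with a status, writes it to `slot` and queues `id`.
  // `slot` must outlive the returned callback.
  std::function<void(int)> make_event(int id, int &slot) {
    ++outstanding_;
    return [this, id, &slot](int status) {
      slot = status;    // result goes to the slot chosen at launch time
      ready_.push(id);  // the bound id is what the joiner will see
    };
  }

  // Rough analogue of twait (rv, id), minus the blocking: returns false
  // if no event has fired yet, otherwise hands back the oldest fired id.
  bool try_join(int &id) {
    if (ready_.empty()) return false;
    assert(outstanding_ > 0);  // every fired id came from make_event
    id = ready_.front();
    ready_.pop();
    --outstanding_;
    return true;
  }

  // Number of events created but not yet joined.
  int outstanding() const { return outstanding_; }

 private:
  std::queue<int> ready_;
  int outstanding_ = 0;
};
```

If two events are created with ids 0 and 2 and the second one fires first, the first successful try_join reports id 2, which mirrors how the Tame code can tell the returning RPCs apart.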

Once the window is full (or all ntot RPCs have been launched), control falls through to the twait statement in the else branch:

    while (nrecv < ntot) {
      if (nsent < ntot && nsent - nrecv < window_sz) {
        // Launch new calls since there is room in the window!
        cli->call (EX_RANDOM, NULL, &res[nsent],
                   mkevent (rv, nsent, errs[nsent]));
        nsent++;
      } else {
        // Harvest
        twait (rv, id);
        if (errs[id]) {
          err = true;
          warn << "RPC error: " << errs[id] << "\n";
        } else {
          warn << "Success " << id << ": " << res[id] << "\n";
        }
        nrecv++;
      }
    }

The condition in the while loop holds as long as there are calls that remain to be received. Thus, assuming that window_sz and ntot are greater than 0, control will reach the twait statement at least once. As the comments suggest, control will appear to block at the twait statement until one of the RPCs launched above completes. When control passes the twait statement, the closure variable id is set to the value bound to the joined asynchronous call at launch time. If the call was launched when nsent == 2, then id will have the value 2. In this way, code after the twait statement can respond to the specific RPC that returned.
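To see that this loop shape really keeps at most window_sz calls outstanding, here is a small plain C++ simulation of the same control flow. It is a sketch under simplifying assumptions: there is no network, run_window and WindowStats are hypothetical names, and calls "complete" in FIFO order only to keep the run deterministic (real RPCs may come back in any order).

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical bookkeeping for the simulation below.
struct WindowStats {
  int max_outstanding = 0;     // largest number of calls in flight at once
  std::vector<int> harvested;  // ids in the order they were joined
};

// Mirrors the launch/harvest loop: launching a call queues its id,
// harvesting pops the oldest queued id (stand-in for twait (rv, id)).
WindowStats run_window(int ntot, int window_sz) {
  assert(window_sz > 0);  // a zero-sized window could never launch anything
  WindowStats stats;
  std::deque<int> in_flight;
  int nsent = 0, nrecv = 0;

  while (nrecv < ntot) {
    if (nsent < ntot && nsent - nrecv < window_sz) {
      // Room in the window: launch call number nsent.
      in_flight.push_back(nsent);
      nsent++;
      stats.max_outstanding =
          std::max(stats.max_outstanding, static_cast<int>(in_flight.size()));
    } else {
      // Window full, or everything already sent: join one call.
      int id = in_flight.front();
      in_flight.pop_front();
      stats.harvested.push_back(id);
      nrecv++;
    }
  }
  return stats;
}
```

Calling run_window (40, 5) with the values from the example reports a maximum of 5 calls in flight and 40 harvested ids. With ntot smaller than window_sz the window never fills, and the loop simply launches everything before harvesting.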

In the above example, the declaration of the rendezvous_t and all references to it are in the same function, but keeping all references to a rendezvous_t within a single function is not required. New callbacks can be added to a rendezvous, and removed from it via twait, from anywhere the given rendezvous is in scope. Of course, rendezvous_t objects are regular C++ objects, so they can be passed to functions, declared extern, etc.

It is suggested that all coordination variables created with mkevent calls are later twaited on, and I have trouble thinking of a case in which one would not want to twait on all outstanding calls. However, programs should still work as expected if zombie calls are left over. That is, the rendezvous will only be deallocated once all of its callbacks have been called.
