mr-edd.co.uk :: horsing around with the C++ programming language

An asynchronous function call mechanism: futures

[12th July 2007]

In this post I'll be continuing to discuss an implementation of an asynchronous function call mechanism: a library that lets us kick off a function in the background and retrieve the result later, once it's ready. Here's an example:

std::size_t count_primes_upto(std::size_t n);

// Create an "invoker" object that will make the eventual call
// and catch any exceptions that escape.
async::layered_cage<std::exception, async::catch_all> cage;

// call count_primes_upto in another thread, don't block
async::future<std::size_t> n = async::call(cage, &count_primes_upto, 9876543210);

// do some other stuff in the mean time
// ...

// We're now ready for the result of count_primes_upto().
// The following call will block until the result has been calculated.
// It will return the result or propagate an exception.
std::size_t num_primes = n.value();

In my previous post I described how layered_cage is implemented and introduced the general concept of an invoker, which I recommend at least skim-reading to get up to speed. We'll now go on to look at the implementation of the future<> template class. You'll get the most out of this post if you're already familiar with boost.threads, boost.bind and boost.function.

EDIT 5th August 2007: async now has its own permanent home, here. If you're more interested in the usage of the library, as opposed to the implementation, you should go there instead.

What's in the future?

A future is an object that represents a value that may not yet have materialised. When we try to access the object that the future represents, we may have to wait until that object comes into existence. Of course, if the future's object already exists, we won't have to wait at all and we can get what we're after immediately.
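If futures are new to you, it might help to see the concept in isolation first. The same idea was later standardised as std::future; the sketch below uses only the standard library and has nothing to do with this library's interface:

```cpp
#include <future>

// A stand-in for some slow computation.
int compute_answer()
{
    return 42;
}

int demo()
{
    // Kick the computation off; f is a handle to the eventual result.
    std::future<int> f = std::async(std::launch::async, &compute_answer);

    // ... do other work in the meantime ...

    // Block until the value has materialised, then retrieve it.
    return f.get();
}
```

The shape is the same as the async::call() example at the top of this post: start, work, then block only when the result is actually needed.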

I should mention that I didn't invent the concept, but this particular implementation was created from scratch. The interface is simple:

namespace async
{
    template<typename ValueType>
    struct future_result
    {
        typedef const ValueType &type;
    };

    template<>
    struct future_result<void>
    {
        typedef void type;
    };

    template<typename ValueType>
    class future
    {
        public:
            typedef ValueType value_type;
            typedef typename future_result<ValueType>::type result_type;

            future(std::auto_ptr<future_worker<value_type> > &core);

            result_type value() const;

            void wait() const;

        private:
            boost::shared_ptr<future_worker<value_type> > core_;
    };

}

There are a couple of additional member functions that could be added, such as one to query whether an exception will be thrown when value() is called, but I'm going to keep it simple for the time being.

The value() member function will block until the future object has materialised and then either return it, or propagate an exception of some kind. The wait() function will block until the future value is ready but will not return or throw anything.

The future has two internal typedefs. value_type is essentially the type of object that the future holds. result_type is defined in terms of a future_result<> meta function, which is usually equivalent to const value_type & and is what the value() member function will return.

But we need to make special provisions when calling functions that have a return type of void, because there's no such thing as a const void &. So the future_result<> meta function returns void when its argument is void.
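The behaviour of the metafunction is easy to check at compile time. Here it is again, stand-alone, with assertions written using std::is_same from the later standard library:

```cpp
#include <type_traits>

template<typename ValueType>
struct future_result
{
    typedef const ValueType &type;
};

template<>
struct future_result<void>
{
    typedef void type;
};

// The general case yields a const reference...
static_assert(std::is_same<future_result<int>::type, const int &>::value,
              "general case");

// ...while the specialisation sidesteps the illegal 'const void &'.
static_assert(std::is_same<future_result<void>::type, void>::value,
              "void case");
```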

The next thing to notice is that the only constructor takes an auto_ptr<future_worker<value_type> >. I thought the future concept useful enough to make it usable outside of the async::call() implementation. So by deriving from the abstract future_worker strategy class, anyone can implement a future that waits for a value to emerge from any kind of asynchronous process. For instance, you could build a class that implements the future_worker interface to wait for some data being sent over a high-latency network.

So let's have a look at future_worker<>.

namespace async
{
    template<typename ValueType>
    class future_worker : boost::noncopyable
    {
        public:
            typedef ValueType value_type;
            typedef typename future_result<ValueType>::type result_type;

            future_worker() : started_(false) { }

            virtual ~future_worker() { }

            void start()
            {
                if (!started_)
                {
                    started_ = true;
                    start_impl();
                }
            }

            void wait() { wait_impl(); }

            result_type value()
            {
                wait_impl();
                return value_impl();
            }

        private:
            virtual void start_impl() = 0;
            virtual result_type value_impl() = 0;
            virtual void wait_impl() = 0;

        private:
            bool started_;
    };
}

We have wait() and value() functions that a future<> can call to do its dirty work. There's also a start() function that a future<> calls in its constructor in order to kick off the process that makes the value appear, if it hasn't been started already.

In order to implement a custom future_worker, we must derive from this class and implement the private pure virtual functions. Here, I've gone with the template method pattern as it provides a nice division of responsibility. The wait_impl() function is only concerned with waiting until the task has finished. value_impl() is only called once wait_impl() has returned and so it can simply return something and not have to worry about synchronisation issues. The implementer is also guaranteed that start_impl() is only called once so they don't have to worry about checking to see if the task is already under way.
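To make the pattern concrete, here's a toy worker built along these lines. It's a sketch only: the derived class name (squaring_worker) is hypothetical, and I've used the threading primitives that were later standardised (std::thread, std::mutex, std::condition_variable) in place of boost.threads so that the snippet stands alone:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// A stripped-down future_worker showing the template method pattern;
// the real class also handles void returns through future_result<>.
template<typename ValueType>
class future_worker
{
    public:
        future_worker() : started_(false) { }
        virtual ~future_worker() { }

        void start() { if (!started_) { started_ = true; start_impl(); } }
        void wait() { wait_impl(); }
        const ValueType &value() { wait_impl(); return value_impl(); }

    private:
        virtual void start_impl() = 0;
        virtual const ValueType &value_impl() = 0;
        virtual void wait_impl() = 0;

        bool started_;
};

// A hypothetical worker that computes a value on a background thread.
class squaring_worker : public future_worker<int>
{
    public:
        explicit squaring_worker(int n) : n_(n), result_(0), done_(false) { }
        ~squaring_worker() { if (thread_.joinable()) thread_.join(); }

    private:
        // Guaranteed to be called at most once by the base class.
        void start_impl()
        {
            thread_ = std::thread([this]
            {
                const int r = n_ * n_; // the "slow" computation
                std::lock_guard<std::mutex> lock(mutex_);
                result_ = r;
                done_ = true;
                finished_.notify_all();
            });
        }

        // Only called after wait_impl() has returned, so no locking needed.
        const int &value_impl() { return result_; }

        void wait_impl()
        {
            std::unique_lock<std::mutex> lock(mutex_);
            finished_.wait(lock, [this] { return done_; });
        }

        int n_;
        int result_;
        bool done_;
        std::mutex mutex_;
        std::condition_variable finished_;
        std::thread thread_;
};
```

Note how each virtual function has a single, narrow job: start_impl() launches, wait_impl() synchronises, value_impl() merely returns.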

Ensuring threads aren't hard-killed on exit

On some systems, if threads started by the program haven't finished by the time the program leaves main(), they're hard-killed. This means they're stopped dead in their tracks. No destructors are called for objects active in those threads and so none of the corresponding resources are released. This is pretty bad and we'd like a way to avoid the situation, if possible.

You'll notice that the future<> constructor takes an auto_ptr rather than a boost::shared_ptr, which I usually prefer. It's rare that a client of the library will have to invoke the constructor themselves, anyway. But I chose std::auto_ptr<> here for a number of reasons:

  1. it communicates that the future_worker given must be dynamically allocated
  2. it communicates that the ownership of the future_worker is being transferred to the future
  3. and most importantly it allows us to create the future's boost::shared_ptr<> member object with a custom deleter.

By default a boost::shared_ptr<> invokes delete on the pointer it holds when the final copy is destroyed. But one incredibly useful feature of boost::shared_ptr<> is that it allows us to call a custom functor to do the deletion or clean-up by passing such a functor to the shared_ptr constructor as the second argument.
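For example (sketched here with std::shared_ptr, the standardised descendant of boost::shared_ptr; the logging_deleter functor is purely illustrative):

```cpp
#include <memory>

// A deleter functor; anything callable with the stored pointer will do.
struct logging_deleter
{
    explicit logging_deleter(bool *flag) : flag_(flag) { }

    void operator() (int *p) const
    {
        *flag_ = true; // stand-in for "wait for the worker, then clean up"
        delete p;
    }

    bool *flag_;
};

bool demo()
{
    bool deleter_ran = false;
    {
        // The second constructor argument installs the custom deleter.
        std::shared_ptr<int> p(new int(5), logging_deleter(&deleter_ran));
    } // last copy destroyed here; the custom deleter runs instead of plain delete
    return deleter_ran;
}
```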

We can take advantage of this by creating a custom deleter that calls wait() on the future_worker before delete-ing it. This way, when the last copy of a future goes out of scope, the deleter is invoked and the worker thread must finish before the program can continue. However, by keeping a copy of the future<> available beyond the end of the scope, the worker thread is allowed to continue.

namespace detail
{
    template<typename ResultType>
    struct deleter
    {
        void operator() (future_worker<ResultType> *kill_me) const
        {
            if (kill_me)
            {
                try { kill_me->wait(); } // shouldn't throw, but...
                catch(...) { }           // ...swallow anyway, or we might let an exception escape from a destructor


                delete kill_me;
            }
        }
    };
}

// Now future's constructor looks like this:
template<typename ValueType>
class future
{
    public:
    //...
        future(std::auto_ptr<future_worker<value_type> > &core) :
            core_(core.release(), detail::deleter<value_type>()) // install custom deleter
        {
            assert(core_);
            core_->start();
        }

    // ...
};

At first, I thought this approach was somewhat controversial, but I think it's the right thing to do. Most importantly it solves the problem of hard-killing threads before their time. But what if you're in the situation where you want to kick off a function in the background and don't care about the return value?

Here you have two options. The first is just to use boost::thread and boost::bind, though you don't get async::call()'s exception handling. The second option is to keep a copy of the future in some globally accessible pool of background tasks. All the tasks are then wait()ed upon when main() is left. I hope to provide this kind of facility in the library once it's finished, perhaps something like:

async::future<int> v = async::call(cage, &calc_variance, dataset);

async::bg(v); // this would copy v into a global background pool

The thing that still disturbs me is that if an exception is thrown after a future is obtained from an async::call() invocation, the exception will have to wait until the future's task has finished before it can escape the current scope. For example:

async::future<int> v = async::call(cage, &calc_variance, dataset);

throw some_exception(); // a pause here while v's destructor waits for calc_variance to finish

Is this a bit weird? I don't know. Is it dangerous? I don't think so, unless your code is littered with dodgy race conditions anyway. Answers on a postcard!

The custom future_worker used by async::call

So we've seen that a future's behaviour is determined by the future_worker<> strategy object that is given to it. We're clearly going to need an appropriate future_worker implementation that can be created inside async::call().

Using boost::bind it's pretty easy to take a function and a bunch of arguments and turn it into a nullary functor[1] with the same return type as the original function.
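For instance (shown with std::function/std::bind, the standardised descendants of boost.function and boost.bind; the add function is a made-up example):

```cpp
#include <functional>

int add(int a, int b) { return a + b; }

// Bind both arguments now; the resulting functor takes no
// arguments, but keeps the original return type.
std::function<int ()> make_nullary()
{
    return std::bind(&add, 2, 3);
}
```

Calling make_nullary()() performs the deferred call and yields 5.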

For this reason, we'll assume that our custom future_worker implementation will be given a boost::function<R ()> on construction, where R is the return type of the function we'll be calling asynchronously. We'll give our custom future_worker the inspired moniker of call_future_worker, because it'll only be used to implement the async::call() overloads.

So, let's have a look at what call_future_worker will have to do:

  1. It will have to create a boost::function<void ()> from the boost::function<R ()> that will store the return value of the latter somewhere when called
  2. It will also have to call this boost::function<void ()> in a child thread through an invoker object, such as an async::layered_cage<>

Now, the type of invoker object that async::call() accepts is arbitrary. So we're going to need to parameterize call_future_worker on both the invoker type and on the return type of the function. We'll need a constructor that takes a function and an invoker object as these are the two key ingredients we'll need to make the call:

template<typename Invoker, typename ValueType>
class call_future_worker : public future_worker<ValueType>
{
    public:
        typedef typename future_worker<ValueType>::result_type result_type;
        typedef typename future_worker<ValueType>::value_type value_type;

        call_future_worker(const Invoker &invoker, const boost::function<ValueType ()> &f);

        // ...

    private:
        Invoker invoker_;
        result_store<value_type> store_;
        boost::function<void ()> func_;

Now we somehow need to turn the boost::function<ValueType ()> given to the constructor into a boost::function<void ()> that stashes the ValueType return value somewhere.

So we'll create a result_store class in which the return value can be placed once ready. This will need to have appropriate synchronisation mechanisms in place because the caller and callee threads will be potentially accessing this result store object at the same time; the worker thread will be writing to it and the parent thread reading from it through the future. Most of the call_future_worker's functionality will be simple calls to functions of an associated result_store.

Here's the result_store interface:

template<typename ReturnType>
class result_store : boost::noncopyable
{
    public:
        void store_value(const ReturnType &rv); // called by the worker thread when done

        void wait(); // block until a value has been stored

        const ReturnType &result(); // assumes wait() has been called

        exception_store &ex_store(); // result() will throw if this ends up with an exception inside

        boost::mutex &monitor(); // for mutual exclusion to the store

        boost::condition &call_finished(); // will be signalled when the function has finished

        static void void_return_adaptor(result_store &store, const boost::function<ReturnType ()> &f);
};

I also had to create a specialisation for when ReturnType is void. The interface is a little different, but the general idea is the same, so I won't bother showing it here.

The idea is that the body of the worker thread will take out a lock on the associated result_store's monitor() mutex until the function call is complete. At this point the call_finished() condition variable will signal to any thread watching that the function is done. wait() watches this condition variable from the parent thread and will only return once signalled as described.

At this point the parent thread can call result() to get the return value of the function, or have any exception propagated from the internal exception_store (I presented the exception_store class in the previous post).
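One robust way to realise that protocol is with a "ready" flag guarded by the mutex, so that a waiter arriving after the call has already finished doesn't block forever. Here's a sketch (using the later-standardised std::mutex and std::condition_variable, and omitting the exception_store machinery):

```cpp
#include <condition_variable>
#include <mutex>

// A sketch of how result_store's store_value() and wait() might
// interact; the real class also carries an exception_store.
template<typename ReturnType>
class result_store
{
    public:
        result_store() : ready_(false) { }

        // Called from the worker thread when the function has returned.
        void store_value(const ReturnType &rv)
        {
            std::lock_guard<std::mutex> lock(monitor_);
            result_ = rv;
            ready_ = true;
            call_finished_.notify_all(); // wake any thread blocked in wait()
        }

        // Called from the parent thread; blocks until a value is stored.
        void wait()
        {
            std::unique_lock<std::mutex> lock(monitor_);
            call_finished_.wait(lock, [this] { return ready_; });
        }

        // Only valid once wait() has returned.
        const ReturnType &result() { return result_; }

    private:
        std::mutex monitor_;
        std::condition_variable call_finished_;
        ReturnType result_;
        bool ready_;
};
```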

Now what's this void_return_adaptor() function about? This is what we'll use to turn our boost::function<ReturnType ()> into a boost::function<void ()>. Here's the body of the function:

static void void_return_adaptor(result_store &store, const boost::function<ReturnType ()> &f)
{
    store.store_value(f());
}

Now using boost::bind and boost::ref, we can do:

// assuming f1 is a boost::function<ReturnType ()>
using namespace boost;

function<void ()> f2 = bind(&result_store<ReturnType>::void_return_adaptor, ref(a_result_store), f1);

f2(); // calls f1 and stores the return value in a_result_store

So by giving our call_future_worker its own result_store we can employ code such as that above to create a void-returning nullary functor suitable for passing to an invoker object. Here's the call_future_worker constructor implementation:

call_future_worker(const Invoker &invoker, const boost::function<ValueType ()> &f) :
    invoker_(invoker),
    func_(boost::bind(&result_store<value_type>::void_return_adaptor, boost::ref(store_), f))
{
}

The worker thread body

Now that we have our boost::function<void ()> to call through our invoker object, we need to define the body of the worker thread. To do this, we add a thread_body member function to call_future_worker, so that it has access to the result_store and the all-important functor.

void thread_body()
{
    boost::mutex::scoped_lock l(store_.monitor());
    try { invoker_.call(func_, store_.ex_store()); }
    catch(...)
    {
        store_.ex_store().store_exception(unhandled_exception());
        store_.call_finished().notify_all();
        throw;
    }
    store_.call_finished().notify_all();
}

So, as I mentioned earlier, the result_store's mutex is locked until the function call has been completed. We call func_ via the invoker object. It's perfectly possible that an exception may still escape, so we catch it and store an async::unhandled_exception object in the exception store. I won't show the code for async::unhandled_exception here. It's a trivial sub-class of std::exception.

If an exception managed to escape the invoker object's clutches, we re-throw it and let the implementation deal with it. It was the invoker's job to catch specific exception types so if it failed to catch this one, we can only assume that the intention is to let it wander off[2].

Whatever the outcome of the function call, we signal that it has finished by calling store_.call_finished().notify_all();.

All that remains now is to override the pure virtual functions inherited from the future_worker base class. boost::bind() to the rescue again!

// ...

private:
    void start_impl()
    {
        boost::thread(
            boost::bind(&call_future_worker<Invoker, ValueType>::thread_body, boost::ref(*this))
        );
    }

    void wait_impl() { store_.wait(); }

    result_type value_impl() { return store_.result(); }

Next time

So now we can create our own futures that are implemented in such a way as to wait for an arbitrary function call. Seemingly, it's only a small step to implement the final async::call() overloads, again using boost::bind and also the boost preprocessor library, but I'm going to save that for another post because there are some subtleties involved in the copying of the arguments that I'd like to refine before presenting the final functions.

I'll try to post some complete example code next time, too!

EDIT 26/7/2007: The next article in this series can be read here.

Footnotes
  1. a functor that takes 0 arguments
  2. it is trivial to implement an invoker object that has a try/catch(...) block. Indeed, the layered_cage<> facility implemented in the previous post caters for this nicely
