How to make regularly-scheduled work cancelable

Dec 12, 2014 at 1:48 AM
I know how to do the basics: I can set up a task<void> to be executed after an arbitrary delay; I can set up a continuation of that task to do my scheduled work; and I can set up a continuation of that task to schedule another invocation of this chain at the next scheduled time point.

I'm struggling with how to make this cancelable in a thread-safe manner. Mostly, I'm struggling to reassure myself that I've properly dealt with all the races that occur in trying to do this safely.

#include <pplx/pplx.h>
#include <pplx/pplxtasks.h>
#include <pplx/threadpool.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <mutex>

class MyWork
{
public:
    MyWork(int delay);
    ~MyWork();

    void Start();
    void Cancel();

private:
    int _delay;
    std::mutex _mutex;
    pplx::cancellation_token_source _cts;
    pplx::task_completion_event<void> *_event;
    boost::asio::deadline_timer _timer;
    pplx::task<void> _worktask;

    void TimerHandler(const boost::system::error_code& error);
    void DoWork();
};

MyWork::MyWork(int msec) : _delay(msec), _event(nullptr), _timer(crossplat::threadpool::shared_instance().service()) {}

void MyWork::Start()
{
    std::lock_guard<std::mutex> lock(_mutex);
    delete _event;  // Destroy previous event (already signalled, in general)
    _event = new pplx::task_completion_event<void>();

    _worktask = pplx::task<void>(*_event).then([this](pplx::task<void> t){
        try {
            t.get();    // ?? Or should I just call .wait() ?
            if (pplx::is_task_cancellation_requested()) {
                std::lock_guard<std::mutex> lock(_mutex);
                delete _event;
                _event = nullptr;
                pplx::cancel_current_task();
            } else {
                Start();    // Schedule the next go'round
                DoWork();   // The regularly-scheduled stuff we care about
            }
        }
        catch (...) {
            // Don't call Start()
        }
    }, _cts.get_token());

    // Changing timer expiration cancels all previous async_waiters
    _timer.expires_from_now(boost::posix_time::milliseconds(_delay));
    _timer.async_wait(boost::bind(&MyWork::TimerHandler, this, boost::asio::placeholders::error));
}

void MyWork::Cancel()
{
    std::lock_guard<std::mutex> lock(_mutex);
    _cts.cancel();
    _timer.cancel();
    if (_event) {
        _event->set();
    }
}

void MyWork::TimerHandler(const boost::system::error_code& error)
{
    if (!error) {   // Timer popped; signal the first task in the chain
        _event->set();
    } else {
        // Handle errors
    }
}

Is there any point in calling t.get(), since the "previous task" was just waiting on the task_completion_event, is not itself cancelled, etc.?

Other than the exceptions my own DoWork() function might throw, are there exceptions I'd see in the catch(...) block that can actually be handled meaningfully, or should I just rethrow them?

Did I cover all the possible races (again, ignoring for the moment what DoWork() does) that can happen with cancellation-vs-timer?

Jason
Dec 12, 2014 at 6:04 PM
Hi Jason,

There are a lot of questions/comments here, so I'll try to address them all.
  1. Why bother allocating the task_completion_event on the heap? I think you should just reinitialize it to a new one instead. This reduces some of the complexity of your code and probably improves performance.
  2. In this example I agree with you that it probably doesn't matter whether you call get()/wait(); it only would if someone were setting an exception on the task_completion_event. To be consistent and get into the practice, I still recommend you call get()/wait() here. By the way, get() retrieves the value from the completed task, while wait() just waits until it is completed. Both will throw an exception if the task has one. Take a look here for more information on task parallelism.
  3. There is no need to call pplx::cancel_current_task(). In this code path the task is already canceled, you checked right above. Just simply return from the task.
  4. Do you care about making sure that multiple calls to DoWork() can't happen in parallel? Right now, depending on the delay and on how long DoWork() takes, I believe it is possible. Say the first task fires and calls Start(), which immediately schedules another timer and task. The first task then calls DoWork(); depending on the delay, it is entirely possible that the second task starts running as well, calling Start() and DoWork() in turn. Again, I'm not sure if you care about this; one option would be to call DoWork() first and then call Start().
From a high level, exactly what do you want to accomplish? If you just want to execute some work on a periodic basis, I think there is no need for all the task/task_completion_event code. You could just have DoWork() called when the deadline timer fires, and then schedule a new wait in the timer callback. Boost.Asio's documentation explains how to do this with a timer.
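
The timer-only shape described above can be sketched without any pplx tasks. What follows is a minimal, standard-library-only illustration rather than Boost.Asio code: a thread waiting on a condition variable stands in for deadline_timer::async_wait, and PeriodicWorker, Run and Stop are hypothetical names that do not belong to any library in this thread. For simplicity, the period is measured from the end of each run.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper (not part of pplx or Boost.Asio): runs `work` once per
// `period` on its own thread until Stop() is called.
class PeriodicWorker
{
public:
    PeriodicWorker(std::chrono::milliseconds period, std::function<void()> work)
        : _period(period), _work(std::move(work)), _stop(false),
          _thread([this] { Run(); }) {}

    ~PeriodicWorker() { Stop(); }

    // Wakes the waiting thread, then blocks until any in-flight work returns.
    // Not safe to call from several threads at once or from inside `work`.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stop = true;
        }
        _cv.notify_all();
        if (_thread.joinable()) _thread.join();
    }

private:
    void Run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        while (!_stop) {
            // wait_for returns true if _stop was set before the period elapsed.
            if (_cv.wait_for(lock, _period, [this] { return _stop; })) break;
            lock.unlock();
            _work();        // run the work without holding the lock
            lock.lock();
        }
    }

    std::chrono::milliseconds _period;
    std::function<void()> _work;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _stop;
    std::thread _thread;    // declared last so every other member exists first
};
```

Something like `PeriodicWorker w(std::chrono::seconds(60), [] { /* DoWork() */ });` starts the schedule, and destroying `w` cancels it after waiting for a running DoWork() to finish.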

Steve
Dec 12, 2014 at 7:44 PM
Regarding task_completion_event on the heap - the deliberate lack of a "reset" method left me stymied for a bit. Hadn't occurred to me to just re-initialize and let the assignment operator destroy the old one before copying a new one on top.
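
As a rough illustration of the "re-initialize by assignment" idea, here is a standard-library analogue. It is only an analogy: std::promise<void> stands in for pplx::task_completion_event<void> (it is move-only, so it is not an exact match), and OneShotSignal, Rearm and Set are hypothetical names. With the event held by value, the pplx form would presumably be a plain assignment of a default-constructed event to the member.

```cpp
#include <future>

// Hypothetical stand-in: std::promise<void> plays the role of
// pplx::task_completion_event<void>. Held by value, no new/delete needed.
class OneShotSignal
{
public:
    // Replaces the stored promise with a fresh one by move-assignment and
    // returns the future that the next Set() will complete.
    // If the old promise was never set, its future reports a broken promise.
    std::future<void> Rearm()
    {
        _promise = std::promise<void>();
        return _promise.get_future();
    }

    // Completes the future handed out by the most recent Rearm().
    // May be called at most once per Rearm().
    void Set() { _promise.set_value(); }

private:
    std::promise<void> _promise;
};
```

Like the original class, this needs external locking if Rearm() and Set() can be called from different threads.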

Regarding the call to cancel_current_task(), this page is pretty clear:
It is important to call cancel_current_task when you respond to cancellation because it transitions the task to the canceled state. If you return early instead of calling cancel_current_task, the operation transitions to the completed state and any value-based continuations are run.
I was trying to code a bit more generally rather than just address the immediate need; if more continuations were chained on, for example, then the doc indicates that I have to propagate cancellation down the chain.

I'm not worried about DoWork() overlapping, but I did think about it. I deliberately put DoWork() and Start() in the order I chose; it's a lazy man's way of trying to keep the periodicity of events as even as possible without going to the pain of recomputing a new delay each time. (There is at least one comprehensive discussion of this issue elsewhere in this forum.) In my case, the actual task period is on the order of 60-300 seconds, and the elapsed time of DoWork() is under 100 msec. Given the scale (number of simultaneous tasks being performed) and durability (runs for weeks or months) I'm looking at, I know I can't actually be lazy; if I want to run a job every 60 seconds for months on end, I'm really going to have to recompute the delay each time (or set an absolute time for the deadline_timer, which is what I'll actually do).
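
The "absolute time" idea amounts to deriving every deadline from the original start time instead of from "now plus delay". A small sketch of that arithmetic, using std::chrono::steady_clock as an assumed clock and a hypothetical helper name (NextDeadline); the result is the kind of value one would hand to a timer's absolute-expiry setter such as expires_at:

```cpp
#include <chrono>

using Clock = std::chrono::steady_clock;

// Hypothetical helper: returns the first tick of the schedule
//   start, start + period, start + 2*period, ...
// that lies strictly after `now`. Because every deadline is derived from
// `start`, handler latency does not accumulate from one period to the next.
// Precondition: period > 0.
inline Clock::time_point NextDeadline(Clock::time_point start,
                                      Clock::duration period,
                                      Clock::time_point now)
{
    if (now < start) return start;              // schedule has not begun yet
    const auto elapsed = now - start;
    const auto ticks = elapsed / period + 1;    // whole periods elapsed, plus one
    return start + ticks * period;
}
```

If a run takes longer than one period, the missed ticks are skipped rather than replayed, which may or may not be the behavior wanted.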

Now that I've gone through this exercise and have all of this stuff tamped into my brain, I understand your final point (don't screw with tasks at all; just use the asio::deadline_timer::async_wait method to cause my DoWork() to be run). There's a small complication due to the cancellation race; if I cancel the timer and get back the answer "Sorry, the timer's already tripped and the work function has been invoked someplace", I can't immediately destroy whatever DoWork() is working on. I think I know how to finesse that.

Some of this was exercise; some of this was an attempt to build a generalizable mechanism I can use elsewhere in the code, later on in development. I appreciate your insights.

Jason
Dec 12, 2014 at 9:21 PM
Hi Jason,

Yes, you are correct; I see it in the documentation :). I hadn't considered that the task would transition to completed instead of cancelled in that case.

I agree about the DoWork() overlapping; I was just pointing it out.

I do think your code would be significantly reduced in complexity if you just used the timer. It does mean that if you try to cancel the timer, there is a possibility that DoWork() has already started, and you will have to wait for it to complete before you can destroy your class.
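
One possible way to handle that wait, sketched with standard-library primitives only: the timer handler asks a small gate for permission before calling DoWork(), and the owner's cancel path blocks until nothing is in flight. WorkGate and its member names are hypothetical and are not part of Boost.Asio or pplx.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical guard between a timer handler and the object's owner.
class WorkGate
{
public:
    // Called by the handler before DoWork(). Returns false once cancellation
    // has been requested; the handler must then skip the work and not re-arm.
    bool TryEnter()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_cancelled) return false;
        ++_running;
        return true;
    }

    // Called by the handler after DoWork() returns (or throws).
    void Leave()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (--_running == 0) _idle.notify_all();
    }

    // Called by the owner, e.g. right after cancelling the timer: refuses new
    // entries, then blocks until every in-flight DoWork() has left.
    void CancelAndWait()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _cancelled = true;
        _idle.wait(lock, [this] { return _running == 0; });
    }

private:
    std::mutex _mutex;
    std::condition_variable _idle;
    bool _cancelled = false;
    int _running = 0;
};
```

After CancelAndWait() returns, no handler can be inside DoWork() and none can enter it, so the state DoWork() uses can be torn down. It must not be called from within DoWork() itself, or it will wait forever.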

Steve