help with task cancellation

Aug 14, 2014 at 9:38 PM
I'm using v2.1 on Windows 7/Server 2008. I need to start a number of REST tasks that can run in parallel, but the consumer of those tasks' output can only process the output of one task at a time. My first design attempt puts the REST tasks in a vector. The consumer periodically checks task.is_done() on the tasks. It consumes the output of any that have completed and removes them from the vector, repeating until the vector is empty.

This works fine unless a task throws an exception. When this happens, the exception exits the consuming function and is handled further up the call stack. This causes any remaining tasks to be destructed, which I think is where my problem lies. Currently, the exception is caught and handled, but something (probably _REPORT_PPLTASK_UNOBSERVED_EXCEPTION()) is ending the program.

I believe I need to cancel the remaining tasks before I allow them to be destructed. I can give them a cancellation_token, but there may be six or more still running. Triggering and handling all those task_canceled exceptions would be messy.

This suggests that I'm not doing this right. What is the proper way to wait for and sequentially consume the output of a number of tasks?
Coordinator
Aug 15, 2014 at 12:38 AM
Hi evanb,

The absolute easiest way to do this will be to add a big lock + continuation to each request, like this:
// Important detail: Don't let BKL get destroyed before all the tasks finish
std::mutex BKL;

request_task.then([&BKL](pplx::task<foo> t) {
    try {
        auto x = t.get();

        std::lock_guard<std::mutex> lock(BKL);
        // Use X under the critical section
    } catch (...) {
        // Handle error
    }
});
This removes the need for the vector of tasks, ensures that they're all waited on, and performs the (successful) continuations sequentially. If you need to do something after they're all done, you could stuff all of these continuations into a vector and use pplx::when_all(v.begin(), v.end()).then(...).
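The same shape can be tried without the SDK. What follows is only a minimal sketch using the standard library as a stand-in: std::async and std::future replace pplx tasks, and joining every future replaces pplx::when_all. fake_request() and run_all_serialized() are hypothetical names made up for the illustration.

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for a REST request: odd ids fail, even ids return id * 10.
int fake_request(int id)
{
    if (id % 2 != 0) throw std::runtime_error("request failed");
    return id * 10;
}

// Launches `count` requests in parallel, consumes each successful result under
// one lock, and returns only after every task has finished.
// Returns the sum of the consumed results.
int run_all_serialized(int count)
{
    assert(count >= 0);
    std::mutex big_lock;   // outlives every task because all futures are joined below
    int total = 0;         // shared state, only touched while holding big_lock
    std::vector<std::future<void>> pending;

    for (int id = 0; id < count; ++id)
    {
        pending.push_back(std::async(std::launch::async, [&big_lock, &total, id]
        {
            try
            {
                int value = fake_request(id);              // runs in parallel
                std::lock_guard<std::mutex> lock(big_lock);
                total += value;                            // runs one at a time
            }
            catch (std::exception const &)
            {
                // Handle the error here so it never escapes the task.
            }
        }));
    }

    // Stand-in for waiting on pplx::when_all: block until every task is done.
    for (auto &f : pending) f.get();
    return total;
}
```

Because each task catches its own exception, nothing is left unobserved when the futures are joined, and the lock cannot be destroyed while a task still holds a reference to it.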

However, if your "use X" section takes a while (either because it's a big computation or it does some blocking thing), you could be tying up a lot of threads. In this case, there are several (more complex!) alternatives.

Finally, let me link you some good documentation around this: http://msdn.microsoft.com/en-us/library/dd492427.aspx#continuations.

Let us know if this works for you!
roschuma
Marked as answer by roschuma on 8/21/2014 at 3:09 PM
Aug 15, 2014 at 1:27 AM
roschuma:

I appreciate your reply. I'm glad the idea of brute-force serialization (using the mutex) came from someone other than me. :)> I've considered it.

However, I think there is a better answer for me in the link you provided. The Composing Tasks section offers the observe_all_exceptions() function, which in as many words is exactly what I need to do. I think I will be able to cancel all pending tasks through a cancellation_token then use that function to clean up the ensuing exceptions before returning. It will also handle any other exceptions thrown by the remaining tasks as I'm setting up to exit with the first exception.

Thanks again for the link. It's the single most informative page I've seen in the PPL documentation so far.

I'll post again to wrap up once I've determined that I've solved my problem.

-evan
Aug 20, 2014 at 4:33 AM
The observe_all_exceptions() function does the job. In the catch handler that catches the "real" exception I call cancellation_token_source::cancel() to kill the remaining tasks, consume all their task_canceled exceptions with observe_all_exceptions(), then (in my case) rethrow the real exception.

-evan
Aug 20, 2014 at 9:05 AM
@evanb, can you post some code? Should be useful to others.
Aug 21, 2014 at 9:44 PM
Here is a condensation of what I'm doing.

observe_all_exceptions() is lifted directly from the MSDN page linked earlier in this thread. get_vector_of_tasks() is a stand-in for the actual, much more complex source of the tasks_t vector, so don't take it too seriously. The code following it, though, is pretty much verbatim from the application, minus extraneous stuff that doesn't pertain to the use of observe_all_exceptions(). Both that code and the real get_vector_of_tasks() are class member functions in the actual code.

The key requirement is that all tasks returned by get_vector_of_tasks() be created with cancel_token, and that they check and respond to it reasonably quickly during operation. Otherwise, observe_all_exceptions() can take a while.
typedef std::vector<pplx::task<web::json::value>> tasks_t;
//...
template<class T, class InIt>
void observe_all_exceptions(InIt first, InIt last)
{
    std::for_each(first, last, [](pplx::task<T> t)
    {
        t.then([](pplx::task<T> previousTask)
        {
            try { previousTask.get(); }
            catch (std::exception const &x) {
                log_debug("observe_all_exceptions: task ends with exception [%s]", x.what());
            }
            catch (...) {
                log_debug("observe_all_exceptions: task ends with exception [catchall exception]");
            }
        });
    });
}

tasks_t get_vector_of_tasks(concurrency::cancellation_token cancel_token)
{
    using namespace web::http;
    
    client::http_client client(U("http://some.web.site"));
    tasks_t rtn;
    for (int i = 0; i < 10; ++i)
    {
        http_request request(methods::GET);
        http_headers &headers = request.headers();
        headers.add(header_names::accept, U("application/json"));

        web::uri uri;
        //... make uri unique
        
        auto task = client.request(request, cancel_token).then(
        [uri](http_response response) -> pplx::task<web::json::value>
        {
            status_code status = response.status_code();
            if (status >= 400)  //one possible source of exceptions
            {
                utility::ostringstream_t msg;
                msg << U("URI: [") << uri.to_string() << U("] ")
                    << U("response: [") << response.status_code()
                    << U(' ') << response.reason_phrase()
                    << U(' ') << response.extract_string().get().substr(0, 128) << U(']');
                throw http_exception(msg.str());
            }   //if
            return response.extract_json();
        });
        rtn.push_back(task);
    }   //for
    return rtn;
}

concurrency::cancellation_token_source cts;
tasks_t tasks;
try {

tasks = get_vector_of_tasks(cts.get_token());   //returns tasks_t directly, so there is no get() here

while (!tasks.empty())
{
    auto sep = std::partition(std::begin(tasks), std::end(tasks),
        [](pplx::task<web::json::value> task){ return !task.is_done(); });
    if (0 == std::distance(sep, std::end(tasks)))
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        continue;
    }   //if
    for (auto task_it = sep; task_it != std::end(tasks); ++task_it)
    {
        auto const &task = *task_it;
        auto json = task.get();
        auto result_json = json[U("result")].as_array();
        for (auto result : result_json)
        {
            //... non-thread-safe consumer of result is called here
        }   //for
    }   //for
    tasks.erase(sep, tasks.end());
}   //while

}
catch (...)
{
    cts.cancel();   //tasks must timely pay attention to this
    observe_all_exceptions<web::json::value>(std::begin(tasks), std::end(tasks));
    throw;
}
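The "pay attention to the token" requirement can be seen in isolation with a rough standard-library sketch. This is not the application code: std::atomic<bool> stands in for the cancellation_token, std::future for pplx::task, and slow_worker(), observe_all() and cancel_and_drain() are hypothetical names used only here.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical worker: does its work in short slices and polls the flag between
// them. It ends with an exception once it sees the cancellation request.
int slow_worker(std::atomic<bool> const &cancelled, int slices)
{
    for (int i = 0; i < slices; ++i)
    {
        if (cancelled.load()) throw std::runtime_error("cancelled");
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
    return slices;
}

// Waits on every future and swallows whatever it throws.
// Returns how many of them ended with an exception.
int observe_all(std::vector<std::future<int>> &tasks)
{
    int failures = 0;
    for (auto &t : tasks)
    {
        try { t.get(); }
        catch (...) { ++failures; }
    }
    return failures;
}

// Starts `count` long-running workers, requests cancellation right away, then
// drains them. Returns the number of workers that ended with an exception.
int cancel_and_drain(int count)
{
    assert(count >= 0);
    std::atomic<bool> cancelled(false);
    std::vector<std::future<int>> tasks;
    for (int i = 0; i < count; ++i)
    {
        tasks.push_back(std::async(std::launch::async,
            [&cancelled] { return slow_worker(cancelled, 1000); }));
    }
    cancelled.store(true);      // the stand-in for cts.cancel()
    return observe_all(tasks);  // returns quickly only because workers poll the flag
}
```

If slow_worker() never checked the flag, the drain step would block for the full run time of every worker, which is the delay described above.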
-evan
Marked as answer by roschuma on 8/21/2014 at 3:09 PM
Aug 26, 2014 at 9:24 PM
I made a small improvement in the code from my previous post. I use pplx::when_any() to wait for completed tasks instead of the original's sleep/continue. The while() loop is now:
while (!tasks.empty())
{
    auto json = pplx::when_any(std::begin(tasks), std::end(tasks)).then(
    [&tasks](std::pair<web::json::value, size_t> result) -> web::json::value
    {
        tasks.erase(std::begin(tasks) + result.second);
        return result.first;
    }).get();
    auto result_json = json[U("result")].as_array();
    for (auto result : result_json)
    {
        //... non-thread-safe consumer of result is called here
    }   //for
}   //while
I chose to call get() on the task returned by when_any() instead of putting the consumer loop in a continuation task with then() for two reasons. One, the loop potentially takes a significant amount of time and I try to keep long-running tasks off thread pool threads. Two, in my case, this code is in a DLL, and the consumption of result calls back to the main executable. The main thread is the caller's thread, and making the callback on that thread reduces the opportunity for thread-related problems on the caller's end.
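For readers without the SDK at hand, the idea of blocking the caller's thread until any one result is ready, then consuming it there, can be sketched with the standard library. The standard library has no when_any, so this sketch swaps in a different mechanism, a queue guarded by a mutex and condition variable. It is an analogy under that assumption, not what the code above does internally, and consume_in_completion_order() is a hypothetical name.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Runs `count` workers in parallel; each pushes one result onto a shared queue.
// The calling thread pops results as they arrive and consumes them one at a
// time, so the consumer never runs on a worker thread.
// Returns the sum of the consumed results.
int consume_in_completion_order(int count)
{
    assert(count >= 0);
    std::mutex m;
    std::condition_variable ready;
    std::queue<int> done;
    std::vector<std::thread> workers;

    for (int id = 0; id < count; ++id)
    {
        workers.emplace_back([&m, &ready, &done, id]
        {
            int value = id * id;    // hypothetical stand-in for a request result
            {
                std::lock_guard<std::mutex> lock(m);
                done.push(value);
            }
            ready.notify_one();
        });
    }

    int sum = 0;
    for (int consumed = 0; consumed < count; ++consumed)
    {
        int value;
        {
            // Block the calling thread until some worker has finished.
            std::unique_lock<std::mutex> lock(m);
            ready.wait(lock, [&done] { return !done.empty(); });
            value = done.front();
            done.pop();
        }
        sum += value;   // the non-thread-safe consumer would be called here
    }

    for (auto &w : workers) w.join();
    return sum;
}
```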

-evan
Aug 27, 2014 at 8:33 AM
Thanks for the code. Cheers.