Thread safety of tasks?

Nov 13, 2014 at 11:30 PM
Edited Nov 13, 2014 at 11:31 PM
Consider the following pattern:

We have a timer that fires every N seconds. The purpose of this timer is to execute a task (A) that generates a new task (B). B's purpose is to calculate some new data, but it may take a long time to complete. A stores task B in some internal state of the class.

Task C is called by the user of the class and needs the result calculated by task B, so it waits on task B by attaching a continuation. We now have two tasks that can run in parallel with each other, and both can modify or access the task B variable stored in the class.

So the question is: is it safe to do this? What are the semantics? What if task A assigns a new task to the member variable at the same time that task C tries to wait on it? What if C is already waiting on it when that happens?

In pseudocode, it might look something like this:
void TaskA()
{
    // Runs every N seconds
    m_TaskB = pplx::create_task([this] { TaskB(); }); // TaskA assigns m_TaskB
}

void TaskB()
{
    // Do some stuff
    m_SomeData = /* some data */;
}

pplx::task<Data> TaskC() // Data: hypothetical name for the type of m_SomeData
{
    return m_TaskB.then([this](pplx::task<void> t) // TaskC waits on m_TaskB.
    {
        t.get(); // Rethrows if the previous task failed; every caller waiting on this continuation receives the exception.
        return m_SomeData;
    });
}
TaskB does not return any data because it mutates class variables.
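To separate the two questions (is touching m_TaskB from two threads safe, and what happens to a waiter when m_TaskB is replaced), here is a minimal sketch in standard C++. It uses std::shared_future<std::string> as a stand-in for pplx::task, assuming pplx::task behaves the same way in the respect that matters: copies are handles to one shared task. The member variable is guarded by a mutex, and a waiter copies the handle under the lock before waiting, so a later reassignment cannot affect it. Holder, SetTaskB and SnapshotTaskB are hypothetical names.

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical holder for the "task B" handle. std::shared_future stands in
// for pplx::task (assumption: both are copyable handles to shared state).
class Holder {
public:
    // Task A: publish a new task B. The write to the member happens under the lock.
    void SetTaskB(std::shared_future<std::string> next) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_TaskB = std::move(next);
    }

    // Task C: copy the handle under the lock, then wait on the copy outside it.
    // A later SetTaskB replaces the member but cannot change this copy.
    std::shared_future<std::string> SnapshotTaskB() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_TaskB;
    }

private:
    mutable std::mutex m_mutex;
    std::shared_future<std::string> m_TaskB;
};
```

Under that assumption, the pplx equivalent would be to copy m_TaskB under the same lock in TaskC and call .then on the copy. Without the lock, reading and writing the same task object from two threads is a data race, just as it would be for a std::shared_ptr.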
Coordinator
Nov 14, 2014 at 3:21 AM
Hi Essentia,

In general, you should expect that any tasks which are allowed to execute in parallel will do so.

Looking at the example you describe, I'm having trouble understanding how it could work, even without the task portions. Who is calling the functions TaskA and TaskC? If TaskC is called before TaskA, you will be accessing a task that hasn't been initialized yet, which will cause problems. Fundamentally, you can't hook up a continuation to a task before it is created.

Could you instead hook up the continuation to m_TaskB inside your TaskB() function?

Steve
Nov 14, 2014 at 10:17 AM
Let me try asking in other words, then.

Assume that there is a server to which we can send API requests. Of course, the server requires authentication first, and after authentication, it returns a token that must be sent in all subsequent calls.

Now let's assume we have a class that models this. It allows the client to do API calls to the server while hiding the complexity of network latency and authentication.

In the constructor of this class, we create a timer and then we start task B and assign it to m_TaskB, e.g.:
Class::Class()
{
    m_Timer.setup(/*...*/);
    m_TaskB = pplx::create_task([this] { return TaskB(); }); // TaskB returns pplx::task<void>, which create_task unwraps
}
Let's ignore why the timer is there for now. Task B's job is to authenticate with the server and retrieve the token. That's all.
Now, we can't return a task from the constructor, so the client must assume that the class is ready for use once the constructor returns. Let's say the client then calls DoX to make some API request.

In DoX, we first need to get our access token and then we can send our actual API request. Since the function shouldn't block (and because we may not actually have our access token yet), we will return a task that signals when the operation is finished, so it would look something like:
pplx::task<void> Class::DoX()
{
    return GetAccessToken().then([this](pplx::task<u8> AccessTokenTask)
    {
        auto AccessToken = AccessTokenTask.get();
        // Send network request here
    });
}
Here I'm assuming that u8 is a typedef for a UTF-8 encoded std::string. Great, now what about the implementation of GetAccessToken? It needs to return a task whose value is the access token. To get that, we first need to ensure that Task B has finished executing and has therefore acquired an access token, so it would look something like:
pplx::task<u8> Class::GetAccessToken()
{
    return m_TaskB.then([this](pplx::task<void> GetAccessTokenTask)
    {
        GetAccessTokenTask.get(); // Just to ensure any error propagates to the caller of the class (i.e. the one who called DoX).
        return m_AccessToken; // Task B will store the access token here.
    });
}
Let's assume that Task B looks something like:
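pplx::task<void> Class::TaskB()
{
    return m_Client.Send(/*...*/).then([](pplx::task<web::http::http_response> ResponseTask)
    {
        return ResponseTask.get().extract_utf8string(); // Assumes the response body is the raw token
    }).then([this](pplx::task<u8> TokenTask)
    {
        m_AccessToken = TokenTask.get();
    });
}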
Now there's just one thing left. We have to consider the fact that the access token won't last forever; it needs to be refreshed periodically. There are two ways to model this. The first is in GetAccessToken: when we detect that the access token has expired, we start Task B again to get a new access token:
pplx::task<u8> Class::GetAccessToken()
{
    if (HasAccessTokenExpired())
        m_TaskB = pplx::create_task([this] { return TaskB(); });
    return m_TaskB.then([this](pplx::task<void> GetAccessTokenTask)
    {
        GetAccessTokenTask.get(); // Just to ensure any error propagates to the caller of the class (i.e. the one who called DoX).
        return m_AccessToken; // Task B will store the access token here.
    });
}
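HasAccessTokenExpired is not defined in the post. One possible sketch, assuming the server reports a lifetime for each token: treat the token as expired a little before its real deadline, so that a request is not sent with a token that expires in flight. TokenInfo, the 30-second margin, and the explicit now parameter (which keeps the check testable) are illustrative choices, not part of the original design.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical token metadata: when the token was issued and how long the
// server says it stays valid.
struct TokenInfo {
    std::chrono::steady_clock::time_point issuedAt;
    std::chrono::seconds lifetime;
};

// Sketch of HasAccessTokenExpired. The token counts as expired `margin`
// before its real deadline, so callers refresh slightly early.
bool HasAccessTokenExpired(const TokenInfo& token,
                           std::chrono::steady_clock::time_point now,
                           std::chrono::seconds margin = std::chrono::seconds(30)) {
    return now + margin >= token.issuedAt + token.lifetime;
}
```

In the class itself, now would simply be std::chrono::steady_clock::now(); a steady clock is used so that wall-clock adjustments cannot make a token look fresher than it is.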
This is not ideal, because fetching a new token on demand adds latency. Ideally, we would refresh the token periodically in the background, so that when the client calls DoX we can guarantee (as far as possible; it won't be 100%) that we have a valid access token. This is where the timer in the constructor comes into play: every N seconds the timer fires, and in its handler we restart Task B:
void Class::TimerInterrupt()
{
    m_TaskB = pplx::create_task([this] { return TaskB(); });
}
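m_Timer.setup(/*...*/) is left unspecified above. If no timer facility is at hand, a periodic callback can be sketched with a background thread that waits on a condition variable with a timeout; waiting on the condition variable rather than sleeping lets Stop() wake the thread immediately. PeriodicTimer is a hypothetical helper, and onTick would be something like TimerInterrupt.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for m_Timer: calls onTick every `period` on its own
// thread until Stop() is called or the object is destroyed.
class PeriodicTimer {
public:
    PeriodicTimer(std::chrono::milliseconds period, std::function<void()> onTick)
        : m_period(period), m_onTick(std::move(onTick)), m_thread([this] { Run(); }) {}

    ~PeriodicTimer() { Stop(); }

    PeriodicTimer(const PeriodicTimer&) = delete;
    PeriodicTimer& operator=(const PeriodicTimer&) = delete;

    // Must not be called from inside onTick (the thread would join itself).
    void Stop() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cv.notify_all();
        if (m_thread.joinable()) m_thread.join();
    }

private:
    void Run() {
        std::unique_lock<std::mutex> lock(m_mutex);
        // wait_for returns true only when the predicate holds, i.e. after Stop().
        while (!m_cv.wait_for(lock, m_period, [this] { return m_stop; })) {
            lock.unlock();
            m_onTick();  // e.g. TimerInterrupt(): runs on the timer thread
            lock.lock();
        }
    }

    std::chrono::milliseconds m_period;
    std::function<void()> m_onTick;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_stop = false;
    std::thread m_thread;  // declared last so it starts after the other members exist
};
```

Note that onTick runs on the timer's own thread, concurrently with any client calls, which is exactly why the handler's write to m_TaskB needs the same synchronization as every read of it.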
But now we immediately see a problem. TimerInterrupt can run at any time, and GetAccessToken can also run at any time and wait on m_TaskB, so we have a race. How do we solve this? Is it safe to assign a new task to m_TaskB while GetAccessToken is waiting on it?

But wait--there's more. In fact, we want multiple threads (or tasks) to be able to use this class concurrently, so that multiple API calls can be in flight to the server at once. The problem is that, for some reason, the access token may no longer be valid, in which case the server returns an error for an API call. One or perhaps all of the in-flight API calls may fail because of the expired token, so they need to get a new one by calling GetAccessToken again and then redo the web request. Something like:
web::http::client::http_client Client(/*...*/);
web::http::http_request Request(/*...*/);
return Client.request(Request).then([this, Client, Request](pplx::task<web::http::http_response> ResponseTask) mutable
{
    auto Response = ResponseTask.get();
    if (IsNotValidAccessToken(Response))
        return GetAccessToken().then([Client, Request](pplx::task<u8> AccessTokenTask) mutable
        {
            auto AccessToken = AccessTokenTask.get();
            // Attach AccessToken to Request here before resending
            return Client.request(Request);
        }).then([](pplx::task<web::http::http_response> RetryTask)
        {
            RetryTask.get(); // Propagate any error from the retried request
        });
    return pplx::task_from_result(); // Request succeeded; return an already-completed task<void>
});
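The retry rule in the code above (send; if the token was rejected, get a token again and resend once) can be isolated in a synchronous sketch that runs without the C++ REST SDK. Response, the use of HTTP 401 to mean "token not valid", and the getToken/refreshToken callbacks are assumptions standing in for IsNotValidAccessToken and GetAccessToken; in the asynchronous version each step becomes a .then continuation.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical minimal response type. Assumption: 401 means the token was rejected.
struct Response {
    int status;
};

// Send with the current token; on 401, obtain a fresh token and resend exactly
// once. Limiting it to one retry means a server that keeps rejecting tokens
// cannot cause an endless loop.
Response SendWithRetry(const std::function<Response(const std::string& token)>& send,
                       const std::function<std::string()>& getToken,
                       const std::function<std::string()>& refreshToken) {
    Response first = send(getToken());
    if (first.status != 401) return first;
    return send(refreshToken());  // the new token is attached to the resent request
}
```

If many requests fail at once, each of them calls refreshToken, which is why the next step in the thread requires that the refresh itself happen only once.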
Then we have to take into account that multiple threads may call GetAccessToken simultaneously, and GetAccessToken must not spawn additional tasks to get a new access token. The refresh must be done once and only once, and only one such request may be in flight at a time. So these additional threads will most likely wait on m_TaskB to know when the access token is ready.

But now we have the scenario that multiple threads are waiting on m_TaskB and a timer interrupt which may assign a new task B to m_TaskB. So what are the consequences with this design? Does it work? Should it be done differently?
Coordinator
Nov 15, 2014 at 2:10 AM
Hi Essentia,

Thanks for the great detail in your response! It certainly helps me understand a lot more about what is going on.

OK, to summarize: many different threads could be running concurrently, and all of them may need to get or refresh the latest access token, for example the background refresh timer, multiple client requests, and failed requests needing a new token. All of these need to access one shared resource: the access token.

If you want to keep only one access token, I'd think about encapsulating the retrieving and refreshing entirely in an API that returns a task containing the access token. Let's call this function GetAccessToken. One option is to use a lock to safeguard concurrent access when retrieving and refreshing the access token. This could be done by storing a lock (m_lock), a bool indicating that the token is currently being refreshed (m_refreshing), a task that actually goes and refreshes the token (m_refresh_task), and the token itself (m_token). Consider something like the following pseudocode:
task<token> GetAccessToken()
    take lock m_lock, use RAII
    if token has expired
        if m_refreshing
            return m_refresh_task
        else
            set m_refreshing to true
            start new refreshing task, m_refresh_task, executing code in RefreshTask()
            return m_refresh_task
    else return currently stored token using pplx::task_from_result(token)

RefreshTask()
    do work to get a new token in local token variable
    take lock m_lock, use RAII
    set m_token to local token variable
    set m_refreshing to false
Then whenever anyone needs the access token, they just call GetAccessToken and hook up a continuation. Does that make sense? I think you then shouldn't have any concurrent access issues. I'm sure there is room for improvement, and there are other solutions, but I think this would at least get you started toward something that is functionally correct.
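The pseudocode above can be sketched in standard C++, with std::shared_future<std::string> standing in for pplx::task<token> and std::async standing in for starting the refresh task. The fetch callback (hypothetical) does the network round trip, and HasExpired is a placeholder for a real expiry check. Like the pseudocode, this version does not reset m_refreshing if fetch throws.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <utility>

class TokenCache {
public:
    explicit TokenCache(std::function<std::string()> fetch) : m_fetch(std::move(fetch)) {}

    std::shared_future<std::string> GetAccessToken() {
        std::lock_guard<std::mutex> lock(m_lock);  // "take lock m_lock, use RAII"
        if (!m_token.empty() && !HasExpired()) {
            std::promise<std::string> ready;       // plays the role of pplx::task_from_result(token)
            ready.set_value(m_token);
            return ready.get_future().share();
        }
        if (!m_refreshing) {
            m_refreshing = true;
            m_refresh_task = std::async(std::launch::async, [this] { return RefreshTask(); }).share();
        }
        return m_refresh_task;                     // concurrent callers share one refresh
    }

private:
    std::string RefreshTask() {
        std::string token = m_fetch();             // network work runs outside the lock
        std::lock_guard<std::mutex> lock(m_lock);
        assert(m_refreshing);
        m_token = token;
        m_refreshing = false;                      // skipped if m_fetch throws (same gap as the pseudocode)
        return token;
    }

    bool HasExpired() const { return false; }     // hypothetical placeholder

    std::function<std::string()> m_fetch;
    std::mutex m_lock;
    bool m_refreshing = false;
    std::string m_token;
    // Declared last so it is destroyed first: destroying the last reference to a
    // std::async result waits for the refresh, while the other members still exist.
    std::shared_future<std::string> m_refresh_task;
};
```

A second call made while the first refresh is still running gets the same shared future instead of starting another request, which is the "once and only once" requirement from the question.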

Steve
Nov 15, 2014 at 10:20 AM
Well, it's a step in the right direction, I guess, but it still violates two of my requirements. First, what if RefreshTask throws an exception or is cancelled? The flag would never be reset to false, leading to incorrect results. Second, this only refreshes the token when a request to get it is made, which adds latency. That's why I used a task in the first place.

I'll think about the problem a little more. Perhaps it would be better to use two tasks and swap them upon completion. That sounds like a waste of memory, but that's my inner self being greedy :P A task uses 200+ bytes of memory, but if you only create the object once, that's nothing!
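One way to read the "two tasks, swap upon completion" idea, sketched with std::shared_future as a stand-in for pplx::task: readers always get the last good token from m_current, while a refresh (for example one started by the timer) produces the next token separately and publishes it only if it succeeds. A failed refresh therefore never replaces a working token, and readers that already took a copy keep it. DoubleBufferedToken and its methods are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>

class DoubleBufferedToken {
public:
    explicit DoubleBufferedToken(std::string initial) : m_current(MakeReady(std::move(initial))) {}

    // Readers: copy the current handle under the lock.
    std::shared_future<std::string> Current() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_current;
    }

    // Refresher (e.g. the timer): build the next token off to the side and
    // swap it in only on success. Returns false if fetch threw.
    bool Refresh(const std::function<std::string()>& fetch) {
        std::string fresh;
        try {
            fresh = fetch();
        } catch (...) {
            return false;  // keep serving the old token; real code would log this
        }
        std::shared_future<std::string> next = MakeReady(std::move(fresh));
        std::lock_guard<std::mutex> lock(m_mutex);
        m_current = std::move(next);
        return true;
    }

private:
    static std::shared_future<std::string> MakeReady(std::string value) {
        std::promise<std::string> p;
        p.set_value(std::move(value));
        return p.get_future().share();
    }

    mutable std::mutex m_mutex;
    std::shared_future<std::string> m_current;
};
```

This sketch only covers the background path; a caller that finds the current token already rejected by the server would still need the single-refresh logic discussed earlier in the thread.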
Coordinator
Nov 17, 2014 at 5:29 PM
Hi Essentia,

Yes, I didn't write out the full code here, so there are other things to consider, such as exceptions. For the refreshing flag I'd use some sort of RAII pattern to make sure it is always reset, even if the task throws an exception.
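A sketch of such an RAII guard, assuming the flag is protected by the same mutex as in the pseudocode: the guard is created at the top of the refresh, and its destructor takes the lock and clears the flag whether the refresh returns normally or throws. ClearFlagOnExit and RunRefresh are hypothetical names, and fetch stands in for the network call.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>
#include <string>

// Clears `flag` under `lock` when the guard goes out of scope, including
// during exception unwinding.
class ClearFlagOnExit {
public:
    ClearFlagOnExit(std::mutex& lock, bool& flag) : m_lock(lock), m_flag(flag) {}
    ~ClearFlagOnExit() {
        std::lock_guard<std::mutex> guard(m_lock);
        m_flag = false;
    }
    ClearFlagOnExit(const ClearFlagOnExit&) = delete;
    ClearFlagOnExit& operator=(const ClearFlagOnExit&) = delete;

private:
    std::mutex& m_lock;
    bool& m_flag;
};

// Hypothetical RefreshTask body. The flag was set to true by GetAccessToken
// before this started; the guard guarantees it is cleared on every exit path.
template <typename Fetch>
std::string RunRefresh(std::mutex& lock, bool& refreshing, std::string& token, Fetch fetch) {
    ClearFlagOnExit clear(lock, refreshing);
    std::string fresh = fetch();  // may throw; `clear` still runs
    {
        std::lock_guard<std::mutex> guard(lock);  // released before `clear` relocks
        token = fresh;
    }
    return fresh;
}
```

Because the inner lock_guard is scoped, it is released before the guard's destructor takes the same (non-recursive) mutex, so the two never deadlock.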

As for refreshing the token with a timer, which I failed to mention: you could easily make GetAccessToken take a bool, or add another function, ForceRefresh, that ignores the value of m_refreshing and spawns a new task.

Steve