Concurrent client request batching.

May 1, 2016 at 2:17 AM
Hello,

I am working on a project that parses many thousands of REST endpoints. The services allow 20 concurrent requests (150 requests/s). I can fetch and parse the endpoints serially, but that is of course too slow.

I experimented with a vector of tasks, using pplx::when_all, but the requests do not complete. The tasks are launched while I am still filling the vector of tasks (I do not know if this is an issue).

After some research, I thought concurrency::task_group would fit the bill, but it isn't available in pplx.

My question is: with pplx, what would you recommend for launching batches of client request tasks to download tons of JSON data? Is there a mechanism to wait on a given number of concurrent tasks?

Here is some code. It is a prototype, so I am not really bothered with details yet (like checking the JSON validity). get_values(...) is the client request task.
std::vector<pplx::task<void>> tasks;

for (auto& x : *out_data) {
    tasks.emplace_back(
        pplx::create_task([this, &x] {
            get_values(x.url)
            .then([&x](pplx::task<json::value> rawData) {
                try {
                    json::object obj = rawData.get().as_object();

                    json::object sov = obj[U("sovereignty")].as_object();
                    x.alliance = sov[U("name")].as_string();
                    x.security_class = obj[U("securityClass")].as_string();
                    json::object pos = obj[U("position")].as_object();
                    x.y = pos[U("y")].as_double();
                    x.x = pos[U("x")].as_double();
                    x.z = pos[U("z")].as_double();

                    std::cout << x;
                } catch (const json::json_exception& e) {
                    std::cout << "Parsing JSON failed - "
                            << e.what() << std::endl;
                }
            });
        })
    );
}

// Wait for all tasks to finish.
pplx::when_all(begin(tasks), end(tasks))
.then([=]() {
    for (const auto& x : *out_data) {
        std::cout << x;
    }
}).wait();

// concurrency::task_group task_group;
// for (const auto& x : tasks) {
//  task_group.run(x);
// }
// task_group.wait();
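
One thing that may be worth checking in the code above: the lambda passed to pplx::create_task does not return the task produced by get_values(x.url).then(...), so each outer task can finish as soon as the request has been started, and when_all may then complete before the responses have been parsed.

As for waiting on a fixed number of concurrent tasks, below is a minimal sketch of the batching idea in standard C++ only, not pplx. It assumes a hypothetical fetch_stub function standing in for get_values, and std::async standing in for the client request task. run_in_batches is an illustrative name, not part of any library.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>
#include <string>
#include <vector>

// Hypothetical stand-in for get_values(url). A real version would issue the
// HTTP request; this one only returns the length of the URL.
std::size_t fetch_stub(const std::string& url) {
    return url.size();
}

// Calls `fetch` once per URL, with at most `batch_size` calls in flight at
// any time. Results are returned in the same order as `urls`.
template <typename Fetch>
std::vector<std::size_t> run_in_batches(const std::vector<std::string>& urls,
                                        std::size_t batch_size, Fetch fetch) {
    assert(batch_size > 0);
    std::vector<std::size_t> results;
    results.reserve(urls.size());

    for (std::size_t start = 0; start < urls.size(); start += batch_size) {
        const std::size_t end = std::min(start + batch_size, urls.size());

        // Launch one asynchronous call per URL in the current batch.
        std::vector<std::future<std::size_t>> batch;
        batch.reserve(end - start);
        for (std::size_t i = start; i < end; ++i) {
            batch.push_back(std::async(std::launch::async,
                                       [&fetch, &urls, i] { return fetch(urls[i]); }));
        }

        // Block until every call in this batch has finished before the next
        // batch is started.
        for (auto& f : batch) {
            results.push_back(f.get());
        }
    }
    return results;
}
```

With a limit of 20 concurrent requests, this would be called as run_in_batches(urls, 20, fetch_stub). Each batch waits for its slowest request, so fewer than 20 requests are in flight towards the end of a batch. The sketch also does nothing about the 150 requests/s limit.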