Consumer / producer upload

Jun 25, 2015 at 5:59 PM
I can't find a better way to write that title, but I figure it's probably going to be pretty ambiguous, so let me explain this one.

Basically, I want to experiment with implementing a producer/consumer pattern where a producer produces some data which the consumer (in this case, the uploader) then uploads to a server. What makes this special is that I want to do the whole upload as a single, continuous transfer. So the uploading thread needs to wait until there's data, upload it, wait again, and so on, while the producer keeps producing data to upload.

Now, as for details: I'm still uploading to YouTube, and YouTube requires the full length of the uploaded data (which is not known in advance!) in order to resume. So a resumable upload where I transfer some content, resume, transfer some more, and so on until done is not possible, as far as I understand.

So that leaves the question: is it somehow possible to achieve this another way? Pseudocode of what I'm trying to do:

Producer:
auto Data = ProduceData();
Buffer.Put(Data);

Consumer:
auto Connection = EstablishConnection();
while (Buffer.DataIsAvailable()) // Loop until the producer has finished producing all data
{
    Buffer.Wait(); // Wait for data
    auto Data = Buffer.Get();
    Connection.Send(Data); // Send data, but do not tear down connection. Let connection remain idle while waiting for data.
}
Specifically, I'm trying to encode a video and upload finished chunks of the encode to YouTube. I don't know the final size of the encode in advance, and I don't want to buffer the entire encode in memory or write it to a file and upload it when finished. I want to upload data as it becomes available.

Does Casablanca support such an approach?
Jun 25, 2015 at 6:10 PM
Take a look at the producer_consumer_buffer in the library's streams; it's designed for exactly this kind of producer/consumer scenario. See its reference documentation page for the available operations.
Jun 25, 2015 at 10:42 PM
The producer consumer stream apparently does not derive from istream, which is what http_request::set_body takes as an input argument, so I'm not sure how this fits into the puzzle.
Jun 26, 2015 at 12:08 AM
Edited Jun 26, 2015 at 12:08 AM
It's somewhat convoluted, but the producer consumer stream has an API, create_istream(), which creates an istream object. This function is inherited from streambuf<_CharType>.

If you expand "Public Member Functions Inherited From..." on the above reference doc page, it will be listed.
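
For example (just a sketch; the names are made up):

pplx::streams::producer_consumer_buffer<uint8_t> Buf;
auto BodyStream = Buf.create_istream(); // an istream you can hand to set_body / the http_client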
Jun 28, 2015 at 8:10 PM
Alright, thanks.
I'm going to give it a shot once I've gotten to the point where I can work on the upload.
Jul 2, 2015 at 10:00 PM
So I'm starting trials on using this right now, but I can't quite figure out how to get it working. There doesn't appear to be any documentation on the approach, so to speak. This is what I've tried so far:
const size_t BufSize = 4 * 1024 * 1024;

void MyReadFile(pplx::streams::producer_consumer_buffer<uint8_t>& Buf)
{
    std::ifstream InFile(LR"(E:\Video\Recorded\Neptunia Sisters Generation\Test.mkv)", std::ios_base::binary);
    std::vector<uint8_t> FileBuf(BufSize);
    
    while ( InFile.read((char*)&FileBuf[0], FileBuf.size()) )
        Buf.putn_nocopy(&FileBuf[0], FileBuf.size()).wait();

    // Write the trailing partial block left over from the last (short) read.
    if (InFile.gcount() > 0)
        Buf.putn_nocopy(&FileBuf[0], (size_t)InFile.gcount()).wait();

    Buf.close(); // ???
}

int main()
{
    namespace ggl = Network::Google;
    ggl::XYoutubeManager Youtube(ggl::XYoutubeManager::CreateDefaultConnection());
    pplx::streams::producer_consumer_buffer<uint8_t> InBuf(BufSize);
    std::thread(&MyReadFile, std::ref(InBuf)).detach();
    Youtube.Upload(InBuf.create_istream(),
                   u8("Test"), u8("Test"), {}, [](double) {});
}
I'm getting an exception that the stream can't be read. I presume that's because the writing thread closes the stream, but unless I somehow close the stream, how does the reader know that the stream has reached EOF? This code is tested for file upload and works just fine, so that part shouldn't be a problem.

Is my approach right?
Jul 2, 2015 at 11:09 PM
Hi Essentia,

The problem is that you closed both the input and output (read and write) heads of the producer consumer buffer. If you close just the write head, then the http_client (or anyone else) will still be able to read from the stream. In your MyReadFile function, instead of calling Buf.close() you should call Buf.close(std::ios_base::out).
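
That is (the wait() is optional; it just blocks until the close has completed):

// Close only the write head; readers can still drain the buffered data and will then see EOF.
Buf.close(std::ios_base::out).wait();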

Also, I'm not sure if you removed some of your code from the post for brevity, but what you have right now is not going to be very efficient. Why read into a temporary std::vector? It would be more efficient to read from the file stream directly into the producer consumer buffer, or better still, to just pass a file stream directly to the http_client request.
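
For the plain-file case that would look something like this (a sketch; the path is just a placeholder):

// Asynchronously opens a file-backed istream that can be passed straight to the request body.
auto FileStream = pplx::streams::fstream::open_istream(U("Test.mkv")).get();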

Steve
Jul 2, 2015 at 11:13 PM
Still trying to get my head around the actual interface and how to use it, but even so, in the code where I expect to use this, I'm not sure I can write directly into the buffer due to external requirements. Is it possible for the class to use a buffer I provide (without trying to free it)?

I'm reading manually from the file just to test the consumer/producer part; the idea is that I take the file input, encode it, and upload it without writing a temporary file first.
Jul 2, 2015 at 11:16 PM
I see, yes: if you need to perform some sort of transformation on the data yourself then you will have to read it into memory... Let us know if you have any other questions.

Steve
Jul 2, 2015 at 11:28 PM
Alright, so this works! It's like magic.

Thanks for that. It would have been a little bit better if I could provide the buffer to use myself, though, as I don't have the option to use an already allocated buffer (unless I can provide an allocator?). Silly libraries tend to have silly requirements, barring me from using a buffer allocated through new. I found that out the hard way.
Jul 2, 2015 at 11:38 PM
Glad to hear you have it working.

I came up with another idea. The stream buffers in our library let you ask for a contiguous block of storage that you can write to directly. Once you are finished writing, you commit the memory back to the stream buffer. Basically, call alloc(...) with the maximum amount of memory you might want to write, write to the returned pointer, then call commit(...) indicating the actual number of characters written.
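
Applied to your earlier MyReadFile test it would look roughly like this (a sketch, assuming Buf is the same producer_consumer_buffer<uint8_t>):

for (;;)
{
    // Ask the stream buffer for a contiguous block that it owns...
    uint8_t* Block = Buf.alloc(BufSize);
    InFile.read((char*)Block, BufSize);
    const size_t Read = (size_t)InFile.gcount();
    // ...and commit only the number of bytes actually read.
    Buf.commit(Read);
    if (Read < BufSize)
        break; // EOF (or error) reached
}
Buf.close(std::ios_base::out).wait();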

Steve
Jul 2, 2015 at 11:44 PM
How is this going to improve things?
Jul 2, 2015 at 11:46 PM
I should have explained more: this is one possible way to avoid the copy being made into the vector in your code snippet.

Steve
Jul 2, 2015 at 11:56 PM
Edited Jul 2, 2015 at 11:57 PM
Here's the thing. The final step in the process before uploading is to mux the audio and video into a container. This container data is, of course, buffered in memory.

The library I'm using for this operation REQUIRES that the memory buffer is allocated using a special function. The library simply does not support writing into arbitrary memory, so I MUST allocate this buffer myself. I can't let Casablanca do it unless I can specify an allocator.

In simple terms, it's something like this:
class XAvlibBuffer
{
public:
    static const size_t BufferSize = 4 * 1024 * 1024; // illustrative; same size as BufSize above

    XAvlibBuffer(const u8& OutputFilename):
        // Buffer MUST be allocated with av_malloc.
        m_Buffer((uint8_t*)av_malloc(BufferSize))
    {
        m_Context = avio_alloc_context(m_Buffer, (int)BufferSize, AVIO_FLAG_WRITE,
                this, nullptr, &XAvlibBuffer::Write, &XAvlibBuffer::Seek);
    }

    ~XAvlibBuffer()
    {
        av_free(m_Buffer);
    }

    AVIOContext* GetContext() const { return m_Context; }

    static int Write(void* This_, uint8_t* SrcBuffer, int SrcBufSize)
    {
        // Here I can upload the data. SrcBuffer is actually This_->m_Buffer.
        return SrcBufSize;
    }

    // Seek callback with the signature avio_alloc_context expects.
    static int64_t Seek(void* This_, int64_t Offset, int Whence);

private:
    AVIOContext* m_Context = nullptr;
    uint8_t* m_Buffer = nullptr;
    ggl::XYoutubeManager m_Youtube{ ggl::XYoutubeManager::CreateDefaultConnection() };
};

// This library only supports writing to an AVIOContext which has an associated buffer allocated with av_malloc.
m_i->m_Context->pb = m_i->Buf.GetContext();

// Write data to the container buffer; the actual data is written to m_i->Buf.m_Buffer.
PrintError(av_write_frame(m_i->m_Context, &Packet));
I hope that makes sense as to why I can't avoid the temporary buffer unless I can specify an allocator or provide the buffer myself.
Jul 3, 2015 at 12:01 AM
Yes, I understand. What you could do then, instead of using a producer_consumer_buffer, is construct a stream buffer from the memory you've received from that other library by using rawptr_buffer. Basically you would be creating a read-only stream buffer from a raw pointer and a size.
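
For example (sketch only; Ptr and Size stand in for the av_malloc'd memory and the number of valid bytes in it):

// A read-only stream buffer over memory you already own. The memory has to stay
// valid and untouched until whatever reads from the stream is finished with it.
pplx::streams::rawptr_buffer<uint8_t> RawBuf(Ptr, Size, std::ios_base::in);
auto BodyStream = RawBuf.create_istream();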

Steve
Jul 3, 2015 at 12:11 AM
I see.
But that leaves questions. I did a quick try:

const size_t BufSize = 4 * 1024 * 1024;
std::vector<uint8_t> FileBuf(BufSize);

void MyReadFile(pplx::streams::rawptr_buffer<uint8_t>& Buf)
{
    Sleep(5000);
    std::cout << "Starting write operation...\n";
    std::ifstream InFile(LR"(E:\Video\Recorded\Neptunia Sisters Generation\Test.mkv)", std::ios_base::binary);
    
    while ( InFile.read((char*)&FileBuf[0], FileBuf.size()) )
    {
        Buf.putn_nocopy(&FileBuf[0], FileBuf.size()).wait();
        Sleep(1000);
    }

    Buf.close(std::ios_base::out); // ???
    std::cout << "Reading thread done!\n";
}

int main()
{
    //XVideoEncoder Encoder( stf2::VerifyAbsolutePath(u8{ R"(E:\Video\Recorded\Neptunia Sisters Generation\TestIn.avs)" }) );
    //Encoder.Start();

    namespace ggl = Network::Google;
    ggl::XYoutubeManager Youtube(ggl::XYoutubeManager::CreateDefaultConnection());
    pplx::streams::rawptr_buffer<uint8_t> InBuf(&FileBuf[0], FileBuf.size(), std::ios_base::in);
    std::thread(&MyReadFile, std::ref(InBuf)).detach();

    std::cout << "Uploading...\n";
    Youtube.Upload(InBuf.create_istream(),
                   u8("Test"), u8("Test"), {}, [](double) {});
    std::cout << "Done!\n";
}
But it finishes immediately, so my guess is that the consumer doesn't know when there's data available. So how do I signal it when there's data available and when there is no more data?

(I realize there's a race condition here, but this is test code.)
Jul 3, 2015 at 12:20 AM
I think there is some confusion here on either my part or yours; without seeing the rest of your code I'm not exactly sure. I just wanted to let you know that if you have memory that is all set and ready to be sent out in an HTTP request, you can construct a stream buffer on top of that raw memory.

Steve
Jul 3, 2015 at 12:28 AM
Oh, I see.
Yeah, no, the data is uploaded in chunks.

It goes like this:
Initialize decoders.
Initialize encoder.
Initialize muxer. Muxer sends an upload request to youtube.

Decode frame and audio samples.
Prepare frame for encoding.
Encode frame.
Mux frame and audio samples into container stored in memory buffer.
Upload memory buffer to youtube.
Wait for next frame, etc.

Again, the idea is to upload data as it becomes available; the data is not available in advance.
Hence it's a producer/consumer problem: the producer (encoder) produces frames, and the consumer (muxer) uploads them to YouTube.
This is a SINGLE video, so there is only ONE HTTP request being sent.

I could post the code, but eh, you know, trying to put yourself into code that others wrote is not always the easiest or most fun option.

The example code I posted above is trying to emulate this behavior.
To summarize:
Data becomes available in chunks.
Each chunk is part of the same video.
Only ONE HTTP request is sent.
Not all of the data is available before the HTTP request is sent.
Jul 3, 2015 at 12:30 AM

Understood; then sticking with the producer_consumer_buffer is probably what you want.
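
Roughly, the pieces could fit together like this (just an untested sketch; I'm assuming a hypothetical m_StreamBuf member on your XAvlibBuffer that holds a copy of the same buffer, and reusing the Upload signature from your snippets):

// Producer side: the avlib write callback pushes each muxed chunk into the
// shared producer_consumer_buffer instead of into a temporary file.
static int Write(void* This_, uint8_t* SrcBuffer, int SrcBufSize)
{
    auto Self = static_cast<XAvlibBuffer*>(This_);
    // putn_nocopy requires SrcBuffer to stay valid until the task completes,
    // hence the wait() before returning control to the muxer.
    Self->m_StreamBuf.putn_nocopy(SrcBuffer, (size_t)SrcBufSize).wait();
    return SrcBufSize;
}

// Consumer side: a single HTTP request fed by an istream over the same buffer
// (the stream buffer handles are reference counted, so copies share state).
pplx::streams::producer_consumer_buffer<uint8_t> StreamBuf;
std::thread UploadThread([&]
{
    Youtube.Upload(StreamBuf.create_istream(),
                   u8("Test"), u8("Test"), {}, [](double) {});
});

// ... decode, encode, and mux; every av_write_frame call ends up in Write() above ...

// When muxing is done, close only the write head so the reader drains what is
// still buffered and then sees EOF, which lets the request complete.
StreamBuf.close(std::ios_base::out).wait();
UploadThread.join();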

Steve

Jul 3, 2015 at 4:48 AM
Thanks for the help!

I have managed to successfully implement this, uploading directly to YouTube instead of writing a temporary file first!