parallel.pool.PollableDataQueue

Send and poll data between client and workers

    Description

    A PollableDataQueue object enables synchronous sending and polling for data or messages between the client and workers in a parallel pool during a computation. For example, from a worker, you can send intermediate values to the client or another worker and use the values in another computation. You can also:

    • Send data from the client or any worker.

    • Create a PollableDataQueue object that allows only the client or worker that created the queue to receive data.

    • Create a PollableDataQueue object that allows the client or any worker in the pool to receive data. (since R2025a)

    Unlike all other handle objects, PollableDataQueue and DataQueue objects remain connected when you transfer them.
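    As a minimal sketch of the workflow described above, the following code sends one intermediate value per parfor iteration to the client and then polls the queue for the values. It assumes that Parallel Computing Toolbox is available and that a pool can start automatically. The names numIterations and values and the 5-second timeout are illustrative choices, not part of the API.

```matlab
% Queue created on the client, so the client is the one that polls it.
q = parallel.pool.PollableDataQueue;
numIterations = 4;                 % illustrative loop size

parfor idx = 1:numIterations
    send(q,idx^2);                 % each iteration sends one intermediate value
end

% Poll once per message sent. Arrival order is not guaranteed.
values = zeros(1,numIterations);
for k = 1:numIterations
    [data,ok] = poll(q,5);         % wait up to 5 seconds for each message
    if ok
        values(k) = data;
    end
end
disp(sort(values))
```

    The queue handle q is used inside the parfor-loop even though it was created on the client, which relies on the queue remaining connected after it is transferred to the workers.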

    Creation

    Description

    q = parallel.pool.PollableDataQueue creates a PollableDataQueue object that you can use to send and poll for data between the client and workers. The resulting PollableDataQueue object can be polled only by the client or worker that creates it. Create the PollableDataQueue on the worker or client where you want to receive the data.
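    Because only the creator can poll a default queue, receiving data on a worker means creating the queue on that worker. The following is a rough sketch of one way to do this, assuming that a queue handle can itself be sent through another queue. The helper function workerTask, the queue names, and the 30-second timeouts are hypothetical.

```matlab
% Client-side queue used only to receive the worker-created queue.
handleQueue = parallel.pool.PollableDataQueue;

% Start a task in which the worker creates its own queue and waits for data.
f = parfeval(@workerTask,1,handleQueue);

workerQueue = poll(handleQueue,30);   % receive the worker-created queue
send(workerQueue,21);                 % client sends data to that worker
result = fetchOutputs(f);             % the worker returns twice the value

function out = workerTask(handleQueue)
    myQueue = parallel.pool.PollableDataQueue;   % created on the worker
    send(handleQueue,myQueue);                   % pass the queue to the client
    out = 2*poll(myQueue,30);                    % wait for data from the client
end
```

    In a script, place the local function at the end of the file.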

    q = parallel.pool.PollableDataQueue(Destination=destination) sets the destination behavior of the PollableDataQueue object. (since R2025a)

    If you want the client or any worker to be able to poll the PollableDataQueue object to receive data, set Destination="any".

    Input Arguments

    Destination (since R2025a)

    Destination behavior of the queue, specified as one of these values:

    • "creator" — Allows only the client or worker that creates the queue to poll the queue and receive data. Any data sent to the queue is immediately sent to the client or worker that creates the queue.

    • "any" — Allows the client or any worker in the parallel pool to poll the queue to receive data. The data waits in the queue and is sent to whichever client or worker polls the queue, making that client or worker the destination for the specific data.
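    To illustrate the "any" behavior, here is a small sketch in which the client sends data to the queue and a worker polls it. It assumes R2025a or later and a pool that can start automatically. The name workQueue and the 10-second timeout are illustrative.

```matlab
% With Destination="any", data waits in the queue until someone polls it.
workQueue = parallel.pool.PollableDataQueue(Destination="any");
send(workQueue,magic(3));

% A worker polls the queue and returns the sum of all elements it receives.
f = parfeval(@(q) sum(poll(q,10),"all"),1,workQueue);
total = fetchOutputs(f);
disp(total)
```

    With the default "creator" behavior, the same poll call on the worker would not receive the data, because only the client that created the queue could poll it.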

    Properties

    IsClosed (since R2025a)

    This property is read-only after you close the queue using the close object function.

    Queue closure state, represented as one of these values:

    • false — The queue is not closed and you can send data to the queue.

    • true — The queue is closed and you cannot send data to the queue. Any attempt to send data to the queue results in an error. You can continue to poll the queue for data. You cannot reopen a closed queue.

    Data Types: logical
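    The closure behavior described above can be sketched as follows, assuming R2025a or later and assuming that the client can send to a queue that it created. The variable names and the 1-second timeout are illustrative.

```matlab
q = parallel.pool.PollableDataQueue;
send(q,"last message");     % send one message before closing
close(q);                   % after this call, no more data can be sent

closedState = q.IsClosed;   % expected to be true after close

% Sending to a closed queue is documented to error, so guard the call.
sendFailed = false;
try
    send(q,"too late");
catch
    sendFailed = true;
end

% Data sent before the queue was closed can still be polled.
[msg,ok] = poll(q,1);
```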

    QueueLength

    This property is read-only.

    Number of data items currently held in the queue and available for the client or a worker to poll, represented as 0 or a positive integer.

    The destination behavior of the queue, set using the Destination name-value argument, determines the QueueLength property value:

    • If you create a PollableDataQueue object without setting the Destination argument, or if you set Destination to "creator", the QueueLength is 0 or a positive integer on the worker or client that creates the PollableDataQueue object.

      • If the client creates the PollableDataQueue object, the value is 0 on all workers.

      • If a worker creates the PollableDataQueue, the value is 0 on the client and all other workers.

    • If you set Destination to "any", the value is 0 or a positive integer on the client and all workers.

    Before R2025a: The QueueLength property value is 0 or a positive integer on the worker or client that creates the PollableDataQueue object. If the client creates the PollableDataQueue object, the value is 0 on all workers. If a worker creates the PollableDataQueue, the value is 0 on the client and all other workers.

    Object Functions

    close   Close pollable data queue
    poll    Retrieve data sent to pollable data queue
    send    Send data between clients and workers using a data queue

    Examples

    Create a PollableDataQueue object.

    p = parallel.pool.PollableDataQueue;
    

    Run a parfor-loop, and send a message, such as data with the value 1.

    parfor idx = 1
        send(p,idx); 
    end
    

    Poll for the result.

    poll(p)
    1
    

    For more details on polling for data using a PollableDataQueue object, see poll.
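    When the data might not have arrived yet, you can pass a timeout to poll and check its second output. The following sketch assumes a pool that can start automatically. The message text and the timeout values are arbitrary.

```matlab
q = parallel.pool.PollableDataQueue;

% Nothing has been sent yet, so this poll gives up after 1 second.
[data,ok] = poll(q,1);             % ok is false and data is empty

% Send a message from a worker and wait for the task to finish.
f = parfeval(@() send(q,"ready"),0);
wait(f);

% Now the poll succeeds within the 10-second timeout.
[data2,ok2] = poll(q,10);
```

    Checking the second output avoids treating an empty result from a timeout as real data.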

    Since R2025a

    Use a PollableDataQueue object with Destination set to "any" to send messages from the client to multiple workers in a parallel pool.

    Start a pool of four thread workers.

    numWorkers = 4;
    pool = parpool("Threads",numWorkers);
    Starting parallel pool (parpool) using the 'Threads' profile ...
    Connected to parallel pool with 4 workers.
    

    Create two PollableDataQueue objects: workerPdq, created with Destination set to "any", to send messages to the workers, and clientPdq, to receive messages back from the workers.

    workerPdq = parallel.pool.PollableDataQueue(Destination="any");
    clientPdq = parallel.pool.PollableDataQueue;

    Use parfevalOnAll to execute the analyzeMessage helper function on all workers. Pass the workerPdq and clientPdq queues as arguments to the function.

    parfevalOnAll(@analyzeMessage,0,workerPdq,clientPdq);

    Send a personalized message to each worker through the workerPdq queue.

    for idx = 1:numWorkers
        send(workerPdq,compose("Hello, Worker %d!",idx));
    end

    Poll the clientPdq queue to receive messages from the workers. Use inf to wait indefinitely for each message.

    for idx = 1:numWorkers
        poll(clientPdq,inf)
    end
    ans = 
    "Worker 1 received message!"
    
    ans = 
    "Worker 2 received message!"
    
    ans = 
    "Worker 3 received message!"
    
    ans = 
    "Worker 4 received message!"
    

    Define the helper function analyzeMessage that each worker executes. The function polls the inQueue queue for a message and extracts the worker number. The function then sends a confirmation message back to the outQueue queue.

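    function analyzeMessage(inQueue,outQueue)
        % Wait up to 2 seconds for a message from the shared queue.
        message = poll(inQueue,2);
        % Extract the worker number from the message text.
        workerNum = sscanf(message,"Hello, Worker %u");
        % Send a confirmation to the queue that the client polls.
        send(outQueue,compose("Worker %d received message!",workerNum));
        pause(2)
    end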

    When you send a message to a PollableDataQueue object, the message waits in the queue. Each message adds 1 to the queue length. When you use poll, one message is collected from the queue. In this example, you use the QueueLength property to find the length of a PollableDataQueue object and observe how the Destination argument affects it.

    First, create a parallel pool with one worker.

    parpool(1);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 1 workers.
    

    Create a PollableDataQueue object. By default, the parallel.pool.PollableDataQueue function creates a PollableDataQueue object with the Destination set to "creator". This type of PollableDataQueue object allows only the client or worker that creates the queue to poll the object for data.

    queue = parallel.pool.PollableDataQueue
    queue = 
     PollableDataQueue with properties: 
    
              QueueLength: 0
                 IsClosed: false
    
    

    Initially, the queue is empty. Check the queue length on the client and the worker. The QueueLength property value is 0 for both the client and the worker.

    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 0
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queue.QueueLength)
    end
    Queue length on the worker: 0
    

    Next, send a message to the queue from the worker. Then, use the QueueLength property to find the length of the queue. With Destination set to "creator", the QueueLength property value is 1 on the client (which created the queue) and 0 on the worker.

    parfor idx = 1
        send(queue,"A message");
    end
    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 1
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queue.QueueLength)
    end
    Queue length on the worker: 0
    

    Use poll to retrieve the message from the queue.

    msg = poll(queue)
    msg = 
    "A message"
    

    Check the length of the queue again. The QueueLength property value is now 0 because polling removed the only message from the queue.

    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 0
    

    Create a PollableDataQueue object with Destination set to "any". This command creates a PollableDataQueue object that the client or any worker in the pool can poll to receive data.

    queueAny = parallel.pool.PollableDataQueue(Destination="any")
    queueAny = 
     PollableDataQueue with properties: 
    
              QueueLength: 0
                 IsClosed: false
    
    

    Send a message to this queue.

    parfor idx = 1
        send(queueAny,"Another message");
    end

    Check the queue length. With Destination set to "any", both the client and the worker show a QueueLength property value of 1, which indicates that either the client or the worker can poll the queue to receive the data.

    fprintf("Queue length on the client: %i\n", queueAny.QueueLength);
    Queue length on the client: 1
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
    end
    Queue length on the worker: 1
    

    Finally, retrieve the message from the queue and check the queue length. The QueueLength property value is 0 on both the client and the worker because polling removed the only message from the queue.

    msg = poll(queueAny)
    msg = 
    "Another message"
    
    fprintf("Queue length on the client: %i\n",queueAny.QueueLength);
    Queue length on the client: 0
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
    end
    Queue length on the worker: 0
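    Building on this example, one possible use of the QueueLength property is to drain every message that is waiting in a queue. This is a sketch only. It assumes that the client created the queue and that all messages have arrived by the time the parfor-loop finishes, as in the example above. The names q and received are illustrative.

```matlab
q = parallel.pool.PollableDataQueue;

parfor idx = 1:3
    send(q,idx);                   % three messages in total
end

% Poll until the queue reports that no messages are waiting.
received = [];
while q.QueueLength > 0
    received(end+1) = poll(q);     %#ok<SAGROW> small illustrative loop
end
disp(sort(received))
```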
    

    Tips

    • A PollableDataQueue object does not process data automatically. You must call poll to retrieve each message. To process data automatically when it arrives on the client, send the data with a parallel.pool.DataQueue object instead.

    • Before R2025a: To send data from a parallel pool worker back to the client, first create a PollableDataQueue object at the client. Pass this PollableDataQueue object into a parfor-loop or other parallel language construct, such as parfeval. From the workers, call send to send data back to the client. At the client, use poll to retrieve the message or data sent from a worker.

    • Before R2025a: To send data from the client to the workers, create the queue on the workers and send it back to the client. For an example of this workflow, see Receive Communication on Workers.

    • Before R2023b: You cannot send data from one worker to another. To transfer data between workers, use spmd, spmdSend, or spmdReceive instead.
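    For comparison with the first tip, this short sketch shows the automatic alternative: a parallel.pool.DataQueue object with a callback registered through afterEach. The callback and message format are illustrative, and the order of the printed lines can vary.

```matlab
dq = parallel.pool.DataQueue;

% The callback runs on the client each time data arrives. No poll call is needed.
afterEach(dq,@(x) fprintf("Received %d\n",x));

parfor idx = 1:3
    send(dq,idx);
end
```

    Use a PollableDataQueue object when you want to decide when each message is collected, and a DataQueue object when every message should trigger the same processing.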

    Version History

    Introduced in R2017a
