How to implement a producer consumer pattern with multiple consumers and a common queue?

TLDR:
How do I set up a producer-consumer pattern with one producer and multiple consumers receiving items from a common queue? The consumers don't have to communicate with each other or send any information to the client (though returning some status information might be nice in the future).
Long version:
My function takes several minutes (up to hours) to complete. In its current implementation it doesn't return anything but writes the results to disk. I want to process many (~10k) inputs. So this takes a while, but I have a remote machine with 24 cores.
I've been after this for a long time, never reaching a final solution. The obvious way to go is to use a parfor-loop. But since it is blocking, I cannot submit new inputs while the loop is running. Also, I cannot cancel the execution, change the order of upcoming calls, etc.
At first, I was pleased with a solution using parfeval:
% Pool is initialized
pool = parpool("IdleTimeout", inf);
% Settings are received ...
n_settings = numel(settings);
futures(n_settings) = parallel.FevalFuture;
for i_set = 1:n_settings
    futures(i_set) = parfeval(...
        pool, @my_slow_function, 0, settings(i_set));
end
In the real code this is packed into a bunch of methods that have access to the ever-running pool. This would be a nice starting point. But those futures tend to occupy a lot of RAM, and they are all created at once. After a while I run out of memory, I guess because of finished futures that I no longer need (since the results were saved to disk).
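One way to keep memory bounded with parfeval alone is to cap how many futures exist at once and to drop the handles of finished ones. The following is a minimal sketch, not a tested solution: `maxInFlight`, the stand-in `settings`, and the stand-in `my_slow_function` are assumptions made for illustration, and whether discarding the handles frees enough memory in a given setup is also an assumption.

```matlab
% Sketch: submit work in a sliding window instead of all at once.
settings = 1:100;                         % stand-in inputs (hypothetical)
my_slow_function = @(s) pause(0.1);       % stand-in for the real function

pool = gcp();                             % reuse the running pool or start one
maxInFlight = 2 * pool.NumWorkers;        % assumed cap on live futures
active = parallel.FevalFuture.empty();    % futures we still hold handles to

for i_set = 1:numel(settings)
    % Block while the window is full; discard handles of completed futures.
    while numel(active) >= maxInFlight
        done = ismember({active.State}, {'finished', 'failed'});
        active(done) = [];
        if ~any(done)
            pause(1);                     % nothing finished yet, check again later
        end
    end
    active(end+1) = parfeval(pool, @(s) my_slow_function(s), 0, settings(i_set)); %#ok<SAGROW>
end

% Wait for the remaining work before the script ends.
if ~isempty(active)
    wait(active);
end
```

The loop still blocks the client while the window is full, so it does not by itself allow reordering or submitting new inputs from elsewhere; it only limits how many futures are alive at any time.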
I would very much like to use a producer consumer pattern. Like this:
function producer(queue_to_consumer, settings)
    for setting = settings
        queue_to_consumer.send(setting);
    end
end
function consumer(queue_to_consumer)
    while true
        setting = queue_to_consumer.poll()
        % Some checking for escaping the loop.
        my_slow_function(setting);
    end
end
queue = parallel.pool.PollableDataQueue;
n_consumer = 24
for i = 1:n_consumer
    consumers(i) = parfeval(@consumer, 0, queue);
end
producer(queue, settings)
Please don't point out syntax problems here; the code is just meant to get my idea across. It won't work, because the queue wasn't created on the worker. I can create a queue on each worker and send it back to the client, but then I end up with 24 queues, not one.
I read through the documentation several times and consulted with AI. Both without success. Did I miss something? Maybe a human has an idea.

Answers (1)

Thomas Falch on 5 Feb 2025
Edited: Thomas Falch on 15 May 2025
Starting from R2025a you can use the new "any-destination" PollableDataQueue to solve this problem. The any-destination PollableDataQueue makes it much easier to send data from the client to a worker, or from one worker to another.
With the new any-destination PollableDataQueue, your producer consumer pattern can be written like this:
function producer(queue_to_consumer, settings)
    % Iterates over the columns of settings, so settings should be a row
    for setting = settings
        queue_to_consumer.send(setting);
    end
    % Close queue to make all consumers stop
    queue_to_consumer.close();
end
function consumer(queue_to_consumer)
    while true
        % Keep polling until producer closes queue
        [setting, didReceive] = queue_to_consumer.poll(Inf);
        if ~didReceive
            break
        end
        my_slow_function(setting);
    end
end
pool = parpool();
n_consumer = pool.NumWorkers - 1;
queue = parallel.pool.PollableDataQueue(Destination="any");
for i = 1:n_consumer
    consumers(i) = parfeval(@consumer, 0, queue);
end
producer(queue, settings)
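The question also mentions that returning status information might be nice later. A possible extension, sketched here under assumptions, adds a second `parallel.pool.DataQueue` that workers use to report each finished setting to the client. The names `workQueue`, `statusQueue`, and `consumer_with_status`, the numeric stand-in `settings`, and the stand-in body of `my_slow_function` are hypothetical; the any-destination queue requires R2025a or later, as stated above.

```matlab
% Sketch only: requires R2025a or later for Destination="any".
settings = 1:8;                               % stand-in inputs (hypothetical)
pool = gcp();                                 % reuse the running pool or start one

workQueue = parallel.pool.PollableDataQueue(Destination="any");
statusQueue = parallel.pool.DataQueue;        % worker -> client messages
afterEach(statusQueue, @disp);                % runs on the client for each message

n_consumer = pool.NumWorkers;
consumers(n_consumer) = parallel.FevalFuture; % preallocate the future array
for i = 1:n_consumer
    consumers(i) = parfeval(pool, @consumer_with_status, 0, workQueue, statusQueue);
end

% Producer: send every setting, then close the queue so consumers stop.
for setting = settings
    send(workQueue, setting);
end
close(workQueue);
wait(consumers);

function consumer_with_status(workQueue, statusQueue)
    while true
        % Block until an item arrives or the producer closes the queue.
        [setting, didReceive] = poll(workQueue, Inf);
        if ~didReceive
            break
        end
        my_slow_function(setting);
        send(statusQueue, setting);           % report the finished setting
    end
end

function my_slow_function(~)
    pause(1);                                 % stand-in; replace with the real function
end
```

The status queue is kept separate from the work queue so that consumers never receive each other's status messages as work items.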
1 Comment
Matthias Hörr on 15 May 2025
Thanks Thomas,
I will try that as soon as I gain access to R2025a and mark this as solved if it works as expected.

Version

R2024a