Questions tagged [producer-consumer]
The Producer-Consumer Problem (also known as the bounded-buffer problem) is a classical example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue.
1,585
questions
175
votes
13
answers
116k
views
Is Zookeeper a must for Kafka? [closed]
In Kafka, I would like to use only a single broker, single topic and a single partition having one producer and multiple consumers (each consumer getting its own copy of data from the broker). Given ...
82
votes
5
answers
209k
views
How to solve Warning: React does not recognize the X prop on a DOM element
I'm using a thing called react-firebase-js to handle firebase auth, but my understanding of react and of the provider-consumer idea is limited.
I started with a built a very big JSX thing all at ...
73
votes
6
answers
62k
views
Blocking queue and multi-threaded consumer, how to know when to stop
I have a single thread producer which creates some task objects which are then added into an ArrayBlockingQueue (which is of fixed size).
I also start a multi-threaded consumer. This is build as a ...
68
votes
8
answers
131k
views
Producer/Consumer threads using a Queue
I'd like to create some sort of Producer/Consumer threading app. But I'm not sure what the best way to implement a queue between the two.
So I've some up with two ideas (both of which could be ...
46
votes
5
answers
66k
views
How to iterate Queue.Queue items in Python?
Does anyone know a pythonic way of iterating over the elements of a Queue.Queue without removing them from the Queue. I have a producer/consumer-type program where items to be processed are passed by ...
44
votes
7
answers
27k
views
Job queue as SQL table with multiple consumers (PostgreSQL)
I have a typical producer-consumer problem:
Multiple producer applications write job requests to a job-table on a PostgreSQL database.
The job requests have a state field that starts contains QUEUED ...
43
votes
2
answers
15k
views
How does consumer rebalancing work in Kafka?
When a new consumer/brorker is added or goes down, Kafka triggers a rebalance operation. Is Kafka Rebalancing a blocking operation. Are Kafka consumers blocked while a rebalancing operation is in ...
32
votes
7
answers
65k
views
C# producer/consumer
i've recently come across a producer/consumer pattern c# implementation. it's very simple and (for me at least) very elegant.
it seems to have been devised around 2006, so i was wondering if this ...
32
votes
3
answers
31k
views
Try Dequeue in ConcurrentQueue
The TryDequeue in ConcurrentQueue<T> will return false if no items in queue.
If the queue is empty I need that my queue will wait until new item to be added in queue and it dequeue that new one, ...
30
votes
3
answers
12k
views
Single producer, single consumer data structure with double buffer in C++
I have an application at $work where I have to move between two real-time threads that are scheduled at different frequencies. (The actual scheduling is beyond my control.) The application is hard ...
26
votes
3
answers
34k
views
Java how to avoid using Thread.sleep() in a loop
From my main I am starting two threads called producer and consumer. Both contains while(true) loop. Producer loop is UDP Server hence it does not require sleep. My problem is in the Consumer loop. ...
21
votes
5
answers
12k
views
Is adding tasks to BlockingQueue of ThreadPoolExecutor advisable?
The JavaDoc for ThreadPoolExecutor is unclear on whether it is acceptable to add tasks directly to the BlockingQueue backing the executor. The docs say calling executor.getQueue() is "intended ...
21
votes
5
answers
30k
views
RabbitMQ: fast producer and slow consumer
I have an application that uses RabbitMQ as the message queue to send/receive message between two components: sender and receiver. The sender sends message in a very fast way. The receiver receives ...
21
votes
5
answers
18k
views
Java BlockingQueue with batching?
I am interested in a data structure identical to the Java BlockingQueue, with the exception that it must be able to batch objects in the queue. In other words, I would like the producer to be able to ...
20
votes
4
answers
46k
views
can a kafka consumer filter messages before polling all of them from a topic?
It was said that consumers can only read the whole topic. No luck doing evaluations on brokers to filter messages.
It implies that we have to consume/receive all messages from a topic and filter them ...
18
votes
4
answers
5k
views
How to make worker threads quit after work is finished in a multithreaded producer-consumer pattern?
I am trying to implement a multithreaded producer-consumer pattern using
Queue.Queue in Python 2.7. I am trying to figure out how to make the
consumers, i.e. the worker threads, stop once all required ...
18
votes
4
answers
18k
views
Go: One producer many consumers
So I have seen a lot of ways of implementing one consumer and many producers in Go - the classic fanIn function from the Concurrency in Go talk.
What I want is a fanOut function. It takes as a ...
17
votes
2
answers
16k
views
Find out if I'm on the unity thread
How can I check if the thread I'm on is the Unity thread?
I tried capturing the threadId at constructor time, but somewhere along the lifetime of the program, the threadId moves.
In my project, some ...
17
votes
3
answers
5k
views
How to consume a BlockingCollection<T> in batches
I've come up with some code to consume all wating items from a queue. Rather than processing the items 1 by 1, it makes sense to process all waiting items as a set.
I've declared my queue like this.
...
16
votes
1
answer
5k
views
Apple doc's GCD Producer-Consumer solution wrong?
In the Migrating Away from Threads section of Apple's Concurrency Programming Guide, there is
Changing Producer-Consumer Implementations, which claims that the typical multistep pthread mutex + ...
16
votes
2
answers
4k
views
Difference between SynchronousQueue vs TransferQueue
What is the difference between these two implementations? In which cases should be used one over another?
15
votes
2
answers
17k
views
Python - simple reading lines from a pipe
I'm trying to read lines from a pipe and process them, but I'm doing something silly and I can't figure out what. The producer is going to keep producing lines indefinitely, like this:
producer.py
...
15
votes
3
answers
18k
views
Multiple producers, single consumer
I have to develop a multithreaded application, where there will be multiple threads, each thread generates custom event log which need to be saved in queue (not Microsoft MSMQ).
There will be another ...
14
votes
5
answers
27k
views
boost c++ lock-free queue vs shared queue
I'm quite new in multithreading programming, I just know the most common Producer-Consumer-Queue.
I'm using the boost c++ libraries and I don't know if is better use boost::lockfree::queue or a ...
14
votes
3
answers
17k
views
Error: Could not find or load main class config.zookeeper.properties
I am trying to execute a sample producer consumer application using Apache Kafka. I downloaded it from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka-0.10.0.0-src.tgz . Then I ...
13
votes
1
answer
6k
views
Calling Dispose on an BlockingCollection<T>
I've reused the example producer consumer queue from the C# in a Nutshell book of Albahari (http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT) and a colleague remarked:
"Why isn't the ...
13
votes
4
answers
2k
views
Producer-Consumer Queue in AngularJS
I know python and databases since several years ago.
But I want to improve my limited JavaScript knowledge. For my toy project I want to use an asynchronous queue in the web browser and use AngularJS ...
13
votes
1
answer
22k
views
Implementing the Producer/Consumer Pattern in C#
How can I implement the Producer/Consumer patterns in C# using Events and Delegates? What do I need to keep an eye out for when it comes to resources when using these design patterns? Are there any ...
12
votes
2
answers
3k
views
Is this a job for TPL Dataflow?
I run a pretty typical producer/consumer model on different tasks.
Task1: Reads batches of byte[] from binary files and kicks off a new task for each collection of byte arrays. (the operation is ...
12
votes
1
answer
367
views
Why would I place a synchronized block within a single-threaded method?
I stumbled upon this article on IBM - developerworks, and the code they posted had me raise some questions:
Why is the building of the local variable Map wrapped within a synchronized block? Note ...
12
votes
3
answers
2k
views
BlockingCollection that discards old data
I have a BlockingCollection. Producer tasks add items to it, and consumer tasks remove items.
Now I want to limit the number of items in the collection, automatically discarding old data if more ...
12
votes
3
answers
4k
views
C++ Producer consumer queue with (very) fast and reliable handover
Hi I am looking into having thread handover using a fast and reliable producer consumer queue. I am working on Windows with VC++.
I based my design on Anthony Williams queue, that is, basically a ...
11
votes
3
answers
20k
views
producer/consumer work queues
I'm wrestling with the best way to implement my processing pipeline.
My producers feed work to a BlockingQueue. On the consumer side, I poll the queue, wrap what I get in a Runnable task, and submit ...
11
votes
1
answer
2k
views
When should System.Threading.Channels be preferred to ConcurrentQueue?
I recently built a consumer/producer system using ConcurrentQueue<T> and SemaphoreSlim. Then made another alternative system utilizing the new System.Threading.Channel class.
After benchmarking ...
11
votes
3
answers
25k
views
producer-consumer problem with pthreads
I'm attempting to solve the producer-consumer problem using pthreads and semaphores, but it looks like the producer threads aren't producing, and the consumer threads aren't consuming. It appears that ...
11
votes
6
answers
9k
views
Using pthread condition variable with rwlock
I'm looking for a way to use pthread rwlock structure with conditions routines in C++.
I have two questions:
First: How is it possible and if we can't, why ?
Second: Why current POSIX pthread have ...
10
votes
5
answers
24k
views
What is the best way to communicate a kernel module with a user space program?
This question seems to be simple, but I want to send an event to notify my user space program that the module buffer is ready to be read.
For example, I have a buffer in my kernel module and its data ...
10
votes
2
answers
7k
views
Java thread wait and notify
I have two threads. Thread A is pulling some elements from queue and thread B is adding some elements to the queue.
I want thread A to go to sleep when the queue is empty.
When thread B adds some ...
10
votes
3
answers
11k
views
RabbitMQ C# API Event based Message Consumption
while (true)
{
BasicDeliverEventArgs e = (BasicDeliverEventArgs)Consumer.Queue.Dequeue();
IBasicProperties properties = e.BasicProperties;
byte[] body = e.Body;
Console.WriteLine("...
10
votes
1
answer
556
views
"The usage of semaphores is subtly wrong"
This past semester I was taking an OS practicum in C, in which the first project involved making a threads package, then writing a multiple producer-consumer program to demonstrate the functionality. ...
10
votes
3
answers
6k
views
C++11 non-blocking producer/consumer
I have a C++11 application with a high-priority thread that's producing data, and a low-priority thread that's consuming it (in my case, writing it to disk). I'd like to make sure the high-priority ...
9
votes
4
answers
10k
views
Using a named mutex to lock a file
I'm using a named mutex to lock access to a file (with path 'strFilePath') in a construction like this:
private void DoSomethingsWithAFile(string strFilePath)
{
Mutex mutex = new Mutex(false,...
9
votes
1
answer
2k
views
Looking for "consumer that returns value" abstraction in Java
Is there a built-in or robust third-party abstraction for consumer that returns value in Java 8+?
P.S. For deferred execution it may return Future as well.
Update. Function interface has a perfect ...
9
votes
2
answers
600
views
How to subscribe exactly once to an element from AsyncSubject (consumer pattern)
In rxjs5, I have an AsyncSubject and want to subscribe to it multiple times, but only ONE subscriber should ever receive the next() event. All others (if they are not yet unsubscribed) should ...
9
votes
1
answer
1k
views
How do I use a BlockingCollection in the Producer/Consumer pattern when the producers are also the consumers - How do I end?
I have a recursive problem where the consumers do some work at each level of a tree, then need to recurse down the tree and perform that same work at the next level.
I want to use ConcurrentBag/...
8
votes
2
answers
12k
views
Producer-Consumer waiting when queue is empty?
I have a list of work items that need to be processed in order. Sometimes the list will be empty, sometimes it will have a thousand items. Only one can be processed at a time and in order. Currently I ...
8
votes
3
answers
5k
views
does Monitor.Wait Needs synchronization?
I have developed a generic producer-consumer queue which pulses by Monitor in the following way:
the enqueue :
public void EnqueueTask(T task)
{
_workerQueue.Enqueue(task);
...
8
votes
1
answer
5k
views
LMAX Disruptor - what determines the batch size?
I have been recently learning about the LMAX Disruptor and been doing some experimentation. One thing that is puzzling me is the endOfBatch parameter of the onEvent handler method of the EventHandler. ...
8
votes
8
answers
7k
views
Multi-threading to speed up an email-sending application
I have built an application to send email mailers for a website through Amazon SES. It is coded in C#.
Each email takes .3 seconds to send via the Amazon SES API.
That means, using a single-threaded ...
8
votes
2
answers
13k
views
Looking for the right ring buffer implementation in C
I am looking for a ring buffer implementation (or pseudocode) in C with the following characteristics:
multiple producer single consumer pattern (MPSC)
consumer blocks on empty
producers block on ...