Adding And Removing Multi-Thread Producers and Consumers in VB.NET with BlockingCollection

Seekers,

This is what happens when you can’t find a solution anywhere on the Interwebs, and you just have to hack at it until you get it looking good [enough].

I have a situation where I need to parse objects coming in at high speed, but the parsing takes a relatively long time, far slower than they come in… I can tell how many cores are on a box pretty easily, and then I can carve up a team of worker threads (Tasks) to all divide & conquer! But the problem with ANY multi-threaded application is “How many threads?”

There is no hard & fast answer, so don’t bother looking. My approach is to build something flexible that my main thread can monitor to see whether throughput (aggregate work completed in X amount of time) is at its maximum for the machine it happens to be running on. Also, that same machine may vary in load, RAM available, etc. over time, so you can’t just set it & forget it…

So this is the idea:

[Diagram: Tasks]

So let’s see how I solved the issue of the blocking collection and then dishing out work to the Parser Tasks… I used a very generic and easily reusable approach: Producer and Consumer. In fact, in my example below, you can even adjust the number of Producers as well. You can adjust my code below to more closely approximate your work, and then adjust the # of threads/tasks/Consumers to see how well parallelism can work for you.

First, the usual suspects in Imports..

The Producer(s) are initially set up with the above. While itemsToProduce doesn’t change during the program, the number of Producers & Consumers will. This is a very rough draft, and it will no doubt be streamlined in your own code at some point, but it demonstrates how to solve this problem very well.

I used “IDs” to show in the output which thread was doing what. The only thing needed in production is the list of CTS instances:

The main machine here is that one little line:

Dim bc As New BlockingCollection(Of Integer)


Microsoft says:

BlockingCollection Overview
.NET Framework 4.5

BlockingCollection(Of T) is a thread-safe collection class that provides the following features:

  • An implementation of the Producer-Consumer pattern.
  • Concurrent adding and taking of items from multiple threads.
  • Optional maximum capacity.
  • Insertion and removal operations that block when collection is empty or full.
  • Insertion and removal “try” operations that do not block or that block up to a specified period of time.
  • Encapsulates any collection type that implements IProducerConsumerCollection(Of T)
  • Cancellation with cancellation tokens.
  • Two kinds of enumeration with foreach (For Each in Visual Basic):
  1. Read-only enumeration.
  2. Enumeration that removes items as they are enumerated.
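
To make the “try” operations and the removing enumeration from that list a bit more concrete, here is a minimal sketch. The module name and the DrainAll helper are mine (not part of my solution or of Microsoft's text), and the capacity of 2 and the 50 ms timeouts are arbitrary values picked for the demo.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module TryAndEnumerateSketch
    ' Hypothetical helper: drains a collection with the consuming enumerator
    ' and returns everything it saw.
    Function DrainAll(bc As BlockingCollection(Of Integer)) As List(Of Integer)
        Dim seen As New List(Of Integer)
        ' GetConsumingEnumerable removes each item as it yields it, and the loop
        ' ends once CompleteAdding() has been called and the collection is empty.
        For Each item As Integer In bc.GetConsumingEnumerable()
            seen.Add(item)
        Next
        Return seen
    End Function

    Sub Main()
        Dim bc As New BlockingCollection(Of Integer)(2)   ' bounded capacity of 2
        bc.Add(1)
        bc.Add(2)

        ' The collection is full, so TryAdd gives up after the timeout instead of blocking forever.
        Dim added As Boolean = bc.TryAdd(3, TimeSpan.FromMilliseconds(50))
        Console.WriteLine("TryAdd on a full collection: " & added)

        ' TryTake returns True and fills in the item if one arrives within the timeout.
        Dim first As Integer
        Dim took As Boolean = bc.TryTake(first, TimeSpan.FromMilliseconds(50))
        Console.WriteLine("TryTake succeeded: " & took & ", item: " & first)

        bc.CompleteAdding()
        Console.WriteLine("Left over: " & String.Join(", ", DrainAll(bc)))
    End Sub
End Module
```

The For Each form is handy for a consumer that should simply run until the producers are done, while the Try* calls suit a loop that also needs to check on other things between items.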

itemId is just a variable that holds the fake payload. Producers will increment it by one to simulate a different object instance, or unit of work. You just change the type the BlockingCollection holds…

Now, I didn’t pick an ordering explicitly in this demo (which I will in production), but you can ask for FIFO or LIFO (a.k.a. FILO), as per Microsoft:


 

When you create a BlockingCollection(Of T) object, you can specify not only the bounded capacity but also the type of collection to use. For example, you could specify a ConcurrentQueue(Of T) object for first in, first out (FIFO) behavior, or a ConcurrentStack(Of T) object for last in, first out (LIFO) behavior.


 

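Now THAT is useful! Here in this demo, I just used the parameterless constructor, which wraps a ConcurrentQueue(Of T) and so hands items out in FIFO order anyway (although with several consumers running, the order in which items finish processing is not guaranteed). Like I said, for my specific need, I need FIFO, as in the diagram at the top…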
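
Here is a small sketch of choosing the underlying collection. The module and the FirstOut helper are made up for illustration, and the capacity of 100 is an arbitrary number; the two constructors themselves are the standard BlockingCollection ones.

```vbnet
Imports System
Imports System.Collections.Concurrent

Module OrderingSketch
    ' Hypothetical helper: adds 1, 2, 3 and returns the first item taken back out.
    Function FirstOut(bc As BlockingCollection(Of Integer)) As Integer
        For i As Integer = 1 To 3
            bc.Add(i)
        Next
        Return bc.Take()
    End Function

    Sub Main()
        ' FIFO: wrap a ConcurrentQueue.
        Dim fifo As New BlockingCollection(Of Integer)(New ConcurrentQueue(Of Integer)())
        ' LIFO: wrap a ConcurrentStack, here with a bounded capacity of 100 items.
        Dim lifo As New BlockingCollection(Of Integer)(New ConcurrentStack(Of Integer)(), 100)

        Console.WriteLine("FIFO takes first: " & FirstOut(fifo))   ' prints 1
        Console.WriteLine("LIFO takes first: " & FirstOut(lifo))   ' prints 3
    End Sub
End Module
```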

Later, you will see the functions and subroutines, but the real magic here is in two collections, one for Producers and one for Consumers:

The Magic: As each Task(thread) is either created or closed, the corresponding CancellationTokenSource is either added or removed from the appropriate collection above.

Seriously, that is it! :)
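
If you want to see the add/remove bookkeeping in isolation, here is one way it could look. This is a sketch under my own assumptions, not the code from the solution: I use a ConcurrentDictionary keyed by task id so a specific entry can be removed, and Register/Unregister are hypothetical helper names.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

Module CtsRegistrySketch
    ' Assumed shape: task id -> the CancellationTokenSource controlling that task.
    Public ReadOnly ConsumerCTSs As New ConcurrentDictionary(Of Integer, CancellationTokenSource)

    ' Called when a task is created: make its CTS and remember it.
    Function Register(taskId As Integer) As CancellationTokenSource
        Dim cts As New CancellationTokenSource()
        ConsumerCTSs(taskId) = cts
        Return cts
    End Function

    ' Called when a task has finished or been cancelled: forget its CTS.
    Sub Unregister(taskId As Integer)
        Dim cts As CancellationTokenSource = Nothing
        If ConsumerCTSs.TryRemove(taskId, cts) Then cts.Dispose()
    End Sub

    Sub Main()
        Register(1)
        Register(2)
        Console.WriteLine("Live consumers: " & ConsumerCTSs.Count)

        ConsumerCTSs(1).Cancel()   ' ask consumer 1 to stop
        Unregister(1)              ' and drop it from the collection
        Console.WriteLine("Live consumers: " & ConsumerCTSs.Count)
    End Sub
End Module
```

The count of entries in each collection is then always the number of live Producers or Consumers, which is exactly what the monitoring thread needs to know.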

Next in the code, the initial producers and consumers are created:

Aside from a few Thread.Sleep() calls, all the next part does is add or remove producer and consumer Tasks(threads). You can vary the initial values at the top to put it through the paces.

To create a Task… – CreateTask(<integer id for display>, <“Producer” or “Consumer”>)

To remove a Task, you (in one line) both get a random CTS, and then .Cancel() it:

GetRandomCTS(ProducerCTSs).Cancel()

GetRandomCTS(ConsumerCTSs).Cancel()

GetRandomCTS() takes the collection of CTS instances, picks one at random, and returns it; the caller then calls Cancel() on it.

And that is it!

Now for the fun parts:

Line 7: This is what we’ll be returning, a CancellationTokenSource

Line 16: ctsBag.ElementAt() allows us to pull out a specific CTS instance by number.
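
For readers without the listing in front of them, a function along these lines would do the job. It is a minimal sketch, assuming the collection is a ConcurrentBag (the ctsBag name hints at that) and that it is never empty when called; the snapshot step and the module name are my own additions.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Threading

Module RandomCtsSketch
    ' Random is not thread-safe; this sketch assumes only the main thread calls GetRandomCTS.
    Private ReadOnly rng As New Random()

    ' Returns one CancellationTokenSource picked at random from the bag.
    ' Assumes the bag is not empty; the caller decides what to do with the result.
    Function GetRandomCTS(ctsBag As ConcurrentBag(Of CancellationTokenSource)) As CancellationTokenSource
        ' ToArray takes a snapshot, so the length and the lookup agree
        ' even if other threads add to the bag in the meantime.
        Dim snapshot As CancellationTokenSource() = ctsBag.ToArray()
        Dim index As Integer = rng.Next(snapshot.Length)   ' 0 <= index < Length
        Return snapshot.ElementAt(index)
    End Function

    Sub Main()
        Dim bag As New ConcurrentBag(Of CancellationTokenSource)
        For i As Integer = 1 To 3
            bag.Add(New CancellationTokenSource())
        Next

        GetRandomCTS(bag).Cancel()

        Dim cancelled As Integer = bag.Where(Function(c) c.IsCancellationRequested).Count()
        Console.WriteLine("Cancelled sources: " & cancelled)   ' prints 1
    End Sub
End Module
```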

Below, CreateTask takes an argument for the # you want it to display when running (just for the demo, to see which thread is doing what), and a string to tell it whether you want a new Producer or Consumer. Sure, I could have made it more complex, but this is just a rough draft. :)

Line 7 & 10: These call the Producer() or Consumer() routines below, passing them the cancellation token they need so they can be cancelled gracefully while running, without corrupting any data.

t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default)

Did you notice TaskCreationOptions.LongRunning? That is good in my case: it is a hint to the scheduler that the task will run for a long time, so it will typically get its own dedicated thread instead of tying up a thread-pool worker.

How cool is that?!
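
Here is a stripped-down sketch of starting one long-running Task with its own CancellationTokenSource and then cancelling it. It is not my actual CreateTask: the Worker routine, the ByRef parameter and the sleep times are stand-ins I made up so the example can run by itself.

```vbnet
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module CreateTaskSketch
    ' Stand-in worker: loops until its token is cancelled.
    Sub Worker(taskId As Integer, token As CancellationToken)
        While Not token.IsCancellationRequested
            Thread.Sleep(10)   ' pretend to work
        End While
        Console.WriteLine("Worker " & taskId & " saw the cancellation and stopped.")
    End Sub

    ' Starts a worker with its own CancellationTokenSource and hands the source back.
    ' The started Task is returned through the ByRef parameter.
    Function CreateTask(taskId As Integer, ByRef started As Task) As CancellationTokenSource
        Dim cts As New CancellationTokenSource()
        Dim token As CancellationToken = cts.Token
        ' Four-argument overload: action, cancellation token, options, scheduler.
        started = Task.Factory.StartNew(Sub() Worker(taskId, token), token,
                                        TaskCreationOptions.LongRunning, TaskScheduler.Default)
        Return cts
    End Function

    Sub Main()
        Dim t As Task = Nothing
        Dim cts As CancellationTokenSource = CreateTask(1, t)
        Thread.Sleep(100)   ' let the worker run for a moment
        cts.Cancel()
        t.Wait()            ' returns normally because Worker leaves its loop instead of throwing
    End Sub
End Module
```

Checking token.IsCancellationRequested at a point the worker chooses is what keeps a cancellation from landing in the middle of a unit of work.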

So what does the Producer() routine look like?

I know, I know… it looks complicated! But really it isn’t. I’m not that smart! Half the code is just to catch & handle the cancellation requests so that the processing doesn’t corrupt any data. That, and a cheesy Stopwatch to time things… And yes, there are artifacts of earlier versions still commented out. Like I said, “ROUGH”…

Line 17: Simply adds the itemId (our payload, could be anything) to the BlockingCollection (bc).

Line 20: If a cancellation is requested, we take care of it here, and not in some random part of the function, which would likely corrupt all kinds of things…

Line 31: I added this as a cheesy way to tell the Producers when to stop …producing. This variable (limit) is set at the top of the code.

Line 38: bc.CompleteAdding() – This is a signal to everyone using the bc (BlockingCollection) that there will be no more items added. This way, the consumers know when to stop …consuming!

“Why would they want to do that?”

Well, suppose you wanted a short-running task, or tasks, and needed to know they were done in order to continue… Yes, in my case, they are long-running, and in production I’ll be starting each task with “TaskCreationOptions.LongRunning”.
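
Boiled down to its essentials, a producer following the steps above might look like the sketch below. It is a simplified stand-in, not the listing from my solution: Produce is a hypothetical name, it returns a count only so the result is easy to check, and the IDs and timing code are left out.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

Module ProducerSketch
    ' Adds items 1..limit to bc, stopping early if the token is cancelled.
    ' Returns how many items were actually added.
    Function Produce(bc As BlockingCollection(Of Integer), limit As Integer,
                     token As CancellationToken) As Integer
        Dim added As Integer = 0
        Try
            For itemId As Integer = 1 To limit
                ' Add(item, token) blocks while a bounded collection is full and
                ' throws OperationCanceledException if the token is cancelled.
                bc.Add(itemId, token)
                added += 1
            Next
            bc.CompleteAdding()   ' tell the consumers nothing more is coming
        Catch ex As OperationCanceledException
            ' Cancellation lands here, between items, never halfway through one.
            Console.WriteLine("Producer cancelled after " & added & " items.")
        End Try
        Return added
    End Function

    Sub Main()
        Dim bc As New BlockingCollection(Of Integer)
        Dim cts As New CancellationTokenSource()
        Dim count As Integer = Produce(bc, 5, cts.Token)
        Console.WriteLine(count & " items added, adding completed: " & bc.IsAddingCompleted)
    End Sub
End Module
```

One thing to watch with several producers: once any of them calls CompleteAdding(), a later Add() from another producer throws InvalidOperationException, so only the last one standing should make that call.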

The Consumer() routine is almost identical, with just a few tiny differences:

Line 3: In both routines, we make sure to check right at the top to see if we’ve been cancelled. This way, we waste no time or resources if another Task/thread did the last piece of work just as we were being started.

Line 13: itemOut = bc.Take() – Here we grab the next item (which one depends on FIFO or FILO/LIFO, as configured in the above discussion). This BlockingCollection does it all!

When you sit back and look at it, all the other code in this routine is just to dress up Line 13!
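
And here is the matching consumer, again as a bare-bones sketch rather than my real listing. Consume is a hypothetical name, and it collects the items into a list purely so you can see what was taken.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Threading

Module ConsumerSketch
    ' Takes items until the collection is completed and empty, or the token is cancelled.
    Function Consume(bc As BlockingCollection(Of Integer), token As CancellationToken) As List(Of Integer)
        Dim taken As New List(Of Integer)
        ' Check right at the top: no point starting if we are already cancelled.
        If token.IsCancellationRequested Then Return taken
        Try
            While Not bc.IsCompleted
                ' Take(token) blocks until an item is available.
                Dim itemOut As Integer = bc.Take(token)
                taken.Add(itemOut)   ' the real parsing work would go here
            End While
        Catch ex As OperationCanceledException
            ' Cancelled while waiting for an item; nothing is half-processed.
        Catch ex As InvalidOperationException
            ' The collection was completed and emptied by someone else
            ' between the IsCompleted check and the Take call.
        End Try
        Return taken
    End Function

    Sub Main()
        Dim bc As New BlockingCollection(Of Integer)
        For i As Integer = 1 To 3
            bc.Add(i)
        Next
        bc.CompleteAdding()

        Dim cts As New CancellationTokenSource()
        Console.WriteLine("Consumed: " & String.Join(", ", Consume(bc, cts.Token)))   ' Consumed: 1, 2, 3
    End Sub
End Module
```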

So let’s fire this puppy up!

Was this the output you expected?

Grab the entire solution 7z’d up for you, at the link below…

Download Solution from HERE!

It took me a long time to figure out the whole CancellationToken concept, but now that I’m using it, and the bullet-proof-ness of the BlockingCollection, I’m confident my application can handle hundreds of objects per second without messing anything up.

My production application will read the number of cores on the host machine, and use that to set the initial number of consumers. I will then adjust up & down, monitoring the time to complete (in an aggregate fashion), thus making the best of the host machine’s resources, understanding that the host machine may be doing many other things at the same time as my application.
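
As a starting point for that, reading the core count is a one-liner. The policy in this sketch (one consumer per logical core, minus a few held back for everything else on the box) is only an assumption for illustration, and InitialConsumerCount is a made-up helper name.

```vbnet
Imports System

Module InitialConsumersSketch
    ' Hypothetical policy: one consumer per logical core, minus any reserved cores,
    ' but never fewer than one.
    Function InitialConsumerCount(reservedCores As Integer) As Integer
        Return Math.Max(1, Environment.ProcessorCount - reservedCores)
    End Function

    Sub Main()
        Console.WriteLine("Logical cores: " & Environment.ProcessorCount)
        Console.WriteLine("Initial consumers: " & InitialConsumerCount(1))
    End Sub
End Module
```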

Your thoughts, improvements, etc. appreciated!

pat
:)
