Using TPL and PLINQ to raise the performance of a feed aggregator

In this post I will show you how to use the Task Parallel Library (TPL) and PLINQ to boost the performance of a simple RSS feed aggregator. I will use only the very basic .NET classes that almost every developer starts with when learning parallel programming. Of course, we will also measure how each optimization affects the aggregator's performance.

Feed aggregator

Our feed aggregator works as follows:

  1. Load the list of blogs
  2. Download each blog's RSS feed
  3. Parse the feed XML
  4. Add new posts to the database

Our feed aggregator is run by a task scheduler at a fixed interval, for example every 15 minutes.

We will start our journey with a serial implementation of the feed aggregator. The second step is to use task parallelism to parallelize feed downloading and parsing. The last step is to use data parallelism to parallelize the database operations.

We will use the Stopwatch class to measure how long it takes the aggregator to download and insert all posts from all registered blogs. After every run we empty the posts table in the database.
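The measurement itself can be sketched roughly like this. It is only a minimal illustration: the AggregatorTimer class and its Measure helper are names made up for this example, and Thread.Sleep stands in for the call to FeedClient.Execute().

```csharp
using System;
using System.Diagnostics;
using System.Threading;

internal static class AggregatorTimer
{
    // Runs the given action once and returns the elapsed wall-clock time.
    public static TimeSpan Measure(Action action)
    {
        var stopwatch = Stopwatch.StartNew();
        action();
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    private static void Main()
    {
        // Assumption: Thread.Sleep stands in for new FeedClient().Execute().
        var elapsed = Measure(() => Thread.Sleep(200));
        Console.WriteLine("Aggregation took {0:F2} seconds", elapsed.TotalSeconds);
    }
}
```

Wrapping the whole Execute() call means the measured time includes downloading, parsing and database inserts together.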

Serial aggregation

Before doing any parallel stuff, let’s take a look at the serial implementation of the feed aggregator. All tasks happen one after another.

internal class FeedClient
{
    private readonly INewsService _newsService;
    private const int FeedItemContentMaxLength = 255;
 
    public FeedClient()
    {
        ObjectFactory.Initialize(container =>
        {
            container.PullConfigurationFromAppConfig = true;
        });
 
        _newsService = ObjectFactory.GetInstance<INewsService>();
    }
 
    public void Execute()
    {
        var blogs = _newsService.ListPublishedBlogs();
 
        for (var index = 0; index < blogs.Count; index++)
        {
            ImportFeed(blogs[index]);
        }
    }
 
    private void ImportFeed(BlogDto blog)
    {
        if(blog == null)
            return;
        if (string.IsNullOrEmpty(blog.RssUrl))
            return;
 
        var uri = new Uri(blog.RssUrl);
        var feedFormat = SyndicationDiscoveryUtility.SyndicationContentFormatGet(uri);

        if (feedFormat == SyndicationContentFormat.Rss)
            ImportRssFeed(blog);
        if (feedFormat == SyndicationContentFormat.Atom)
            ImportAtomFeed(blog);
    }
 
    private void ImportRssFeed(BlogDto blog)
    {
        var uri = new Uri(blog.RssUrl);
        var feed = RssFeed.Create(uri);
 
        foreach (var item in feed.Channel.Items)
        {
            SaveRssFeedItem(item, blog.Id, blog.CreatedById);
        }
    }
 
    private void ImportAtomFeed(BlogDto blog)
    {
        var uri = new Uri(blog.RssUrl);
        var feed = AtomFeed.Create(uri);
 
        foreach (var item in feed.Entries)
        {
            SaveAtomFeedEntry(item, blog.Id, blog.CreatedById);
        }
    }
}

The serial implementation of the feed aggregator downloads and inserts all posts in 25.46 seconds.

Task parallelism

Task parallelism means that separate tasks are run in parallel. You can find out more about it on the MSDN page Task Parallelism (Task Parallel Library) and the Wikipedia page Task parallelism. Although finding parts of code that can safely run in parallel without synchronization issues is not an easy task, we are lucky this time: feed import and parsing is a perfect candidate for parallel tasks.

We can safely parallelize the feed import because the import tasks don’t share any resources and therefore don’t need any synchronization. After getting the list of blogs we iterate through the collection and start a new TPL task for each blog's feed.

internal class FeedClient
{
    private readonly INewsService _newsService;
    private const int FeedItemContentMaxLength = 255;
 
    public FeedClient()
    {
        ObjectFactory.Initialize(container =>
        {
            container.PullConfigurationFromAppConfig = true;
        });
 
        _newsService = ObjectFactory.GetInstance<INewsService>();
    }
 
    public void Execute()
    {
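        var blogs = _newsService.ListPublishedBlogs();
        var tasks = new Task[blogs.Count];

        // Start one task per blog so feeds are downloaded and parsed concurrently.
        for (var index = 0; index < blogs.Count; index++)
        {
            tasks[index] = new Task(ImportFeed, blogs[index]);
            tasks[index].Start();
        }

        // Block until every import task has finished.
        Task.WaitAll(tasks);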
    }
 
    private void ImportFeed(object blogObject)
    {
        if(blogObject == null)
            return;
        var blog = (BlogDto)blogObject;
        if (string.IsNullOrEmpty(blog.RssUrl))
            return;
 
        var uri = new Uri(blog.RssUrl);
        var feedFormat = SyndicationDiscoveryUtility.SyndicationContentFormatGet(uri);

        if (feedFormat == SyndicationContentFormat.Rss)
            ImportRssFeed(blog);
        if (feedFormat == SyndicationContentFormat.Atom)
            ImportAtomFeed(blog);
    }
 
    private void ImportRssFeed(BlogDto blog)
    {
        var uri = new Uri(blog.RssUrl);
        var feed = RssFeed.Create(uri);

        foreach (var item in feed.Channel.Items)
        {
            SaveRssFeedItem(item, blog.Id, blog.CreatedById);
        }
    }

    private void ImportAtomFeed(BlogDto blog)
    {
        var uri = new Uri(blog.RssUrl);
        var feed = AtomFeed.Create(uri);
 
        foreach (var item in feed.Entries)
        {
            SaveAtomFeedEntry(item, blog.Id, blog.CreatedById);
        }
    }
}

You should notice the first signs of the power of TPL: we made only minor changes to our code to parallelize the aggregation of blog feeds. On my machine this modification gives a noticeable performance boost: the time is now 17.57 seconds.
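One thing to keep in mind with this version is error handling. If an import throws inside a task, Task.WaitAll rethrows the failures wrapped in an AggregateException. The sketch below shows this under simplified assumptions: the TaskFailureDemo class, the example URLs and the ImportFeed stand-in that fails for one URL are all made up for illustration and are not part of the aggregator.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

internal static class TaskFailureDemo
{
    // Hypothetical stand-in for ImportFeed: fails for one "feed".
    private static void ImportFeed(object state)
    {
        var url = (string)state;
        if (url.Contains("broken"))
            throw new InvalidOperationException("Cannot import " + url);
    }

    // Starts one task per URL, waits for all of them and
    // returns the messages of the imports that failed.
    public static List<string> ImportAll(string[] urls)
    {
        var tasks = new Task[urls.Length];
        for (var index = 0; index < urls.Length; index++)
        {
            tasks[index] = new Task(ImportFeed, urls[index]);
            tasks[index].Start();
        }

        var errors = new List<string>();
        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException ex)
        {
            // One inner exception per faulted task.
            foreach (var inner in ex.InnerExceptions)
                errors.Add(inner.Message);
        }
        return errors;
    }

    private static void Main()
    {
        var urls = new[]
        {
            "http://example.com/a",
            "http://example.com/broken",
            "http://example.com/c"
        };

        foreach (var error in ImportAll(urls))
            Console.WriteLine(error);
    }
}
```

In this sketch the other imports still run to completion. Only after all tasks have finished does WaitAll report the failed one.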

Data parallelism

There is one more way to parallelize activities. The previous section introduced task-based (operation-based) parallelism; this section introduces data-based parallelism. According to the MSDN page Data Parallelism (Task Parallel Library), data parallelism refers to scenarios in which the same operation is performed concurrently on the elements of a source collection or array.

In our code we have independent collections that we can process in parallel: the imported feed entries. Because checking whether a feed entry exists and inserting it if it is missing from the database doesn’t affect other entries, the collection of imported feed entries is an ideal candidate for parallelization.

internal class FeedClient
{
    private readonly INewsService _newsService;
    private const int FeedItemContentMaxLength = 255;
 
    public FeedClient()
    {
        ObjectFactory.Initialize(container =>
        {
            container.PullConfigurationFromAppConfig = true;
        });
 
        _newsService = ObjectFactory.GetInstance<INewsService>();
    }
 
    public void Execute()
    {
        var blogs = _newsService.ListPublishedBlogs();
        var tasks = new Task[blogs.Count];

        for (var index = 0; index < blogs.Count; index++)
        {
            tasks[index] = new Task(ImportFeed, blogs[index]);
            tasks[index].Start();
        }
 
        Task.WaitAll(tasks);
    }
 
    private void ImportFeed(object blogObject)
    {
        if(blogObject == null)
            return;
        var blog = (BlogDto)blogObject;
        if (string.IsNullOrEmpty(blog.RssUrl))
            return;
 
        var uri = new Uri(blog.RssUrl);
        var feedFormat = SyndicationDiscoveryUtility.SyndicationContentFormatGet(uri);

        if (feedFormat == SyndicationContentFormat.Rss)
            ImportRssFeed(blog);
        if (feedFormat == SyndicationContentFormat.Atom)
            ImportAtomFeed(blog);
    }
 
    private void ImportRssFeed(BlogDto blog)
    {
        var uri = new Uri(blog.RssUrl);
        var feed = RssFeed.Create(uri);
 
        // PLINQ partitions the items and saves them on multiple threads.
        feed.Channel.Items.AsParallel().ForAll(a =>
        {
            SaveRssFeedItem(a, blog.Id, blog.CreatedById);
        });
    }

    private void ImportAtomFeed(BlogDto blog)
    {
        var uri = new Uri(blog.RssUrl);
        var feed = AtomFeed.Create(uri);

        feed.Entries.AsParallel().ForAll(a =>
        {
            SaveAtomFeedEntry(a, blog.Id, blog.CreatedById);
        });
    }
}

We made a small change again and, as a result, parallelized the checking and saving of feed items. This change was data-centric, as we applied the same operation to all elements in a collection. On my machine I got better performance again: the time is now 11.22 seconds.
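ForAll runs its delegate on several threads at once, so this only stays safe as long as the save routine itself can be called concurrently. Here is a minimal sketch of the "insert if missing" idea, assuming an in-memory ConcurrentDictionary in place of the posts table. The ForAllDemo class, the SaveAll helper and the sample links are hypothetical and not part of the aggregator.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

internal static class ForAllDemo
{
    // Stores each link at most once and returns the number of stored posts.
    public static int SaveAll(string[] links)
    {
        // Assumption: this dictionary stands in for the posts table.
        var posts = new ConcurrentDictionary<string, bool>();

        links.AsParallel().ForAll(link =>
        {
            // TryAdd checks for existence and inserts in one atomic step,
            // so two threads cannot both insert the same link.
            posts.TryAdd(link, true);
        });

        return posts.Count;
    }

    private static void Main()
    {
        var links = new[] { "/post-1", "/post-2", "/post-1", "/post-3" };
        Console.WriteLine(SaveAll(links)); // prints 3
    }
}
```

With a real database the same guarantee has to come from the data access layer, for example from a unique constraint on the post link.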

Results

Let’s summarize our measurement results (numbers are given in seconds).

  Serial aggregation: 25.46
  Task parallelism: 17.57
  Task and data parallelism: 11.22

As we can see, with task parallelism feed aggregation takes about 31% less time than in the original case (17.57 instead of 25.46 seconds). After adding data parallelism on top of task parallelism, aggregation is about 2.3 times faster than in the original case.
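These ratios can be recomputed directly from the three measured times. Here is a small sketch of the arithmetic; the SpeedupDemo class and its helper names are made up for this example.

```csharp
using System;
using System.Globalization;

internal static class SpeedupDemo
{
    // How many times faster the optimized run is than the baseline.
    public static double Speedup(double baselineSeconds, double optimizedSeconds)
    {
        return baselineSeconds / optimizedSeconds;
    }

    // Percentage of time saved relative to the baseline.
    public static double TimeSavedPercent(double baselineSeconds, double optimizedSeconds)
    {
        return (baselineSeconds - optimizedSeconds) / baselineSeconds * 100.0;
    }

    private static void Main()
    {
        // Times measured in this post, in seconds.
        const double serial = 25.46;
        const double tasks = 17.57;
        const double tasksAndPlinq = 11.22;
        var culture = CultureInfo.InvariantCulture;

        // prints "Tasks: 1.45x faster, 31% less time"
        Console.WriteLine(string.Format(culture, "Tasks: {0:F2}x faster, {1:F0}% less time",
            Speedup(serial, tasks), TimeSavedPercent(serial, tasks)));

        // prints "Tasks + PLINQ: 2.27x faster, 56% less time"
        Console.WriteLine(string.Format(culture, "Tasks + PLINQ: {0:F2}x faster, {1:F0}% less time",
            Speedup(serial, tasksAndPlinq), TimeSavedPercent(serial, tasksAndPlinq)));
    }
}
```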

More about TPL and PLINQ

Adding parallelism to your application can be a very challenging task. You have to carefully find the parts of your code where you can safely switch to parallel processing, and even then you have to measure the effects to find out whether the parallel code actually performs better. If you are not careful, the troubles you will face later are worse than the ones you have seen before (imagine an error that occurs on average only once per 10,000 runs).

Parallel programming is something that is hard to ignore. Effective programs are able to use multiple processor cores. With TPL you can also set the degree of parallelism so that your application doesn’t use all computing cores and leaves one or more of them free for the host system and other processes. And there are many more things in TPL that make it easier to get started and keep going with parallel programming.
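As a rough sketch of limiting the degree of parallelism: Parallel.ForEach accepts a ParallelOptions object with MaxDegreeOfParallelism, and PLINQ queries have WithDegreeOfParallelism. The DegreeOfParallelismDemo class, the RunCapped helper and the Thread.Sleep workload are assumptions made for this example. The aggregator code above does not set any limit.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

internal static class DegreeOfParallelismDemo
{
    // Processes itemCount items with at most maxDegree concurrent workers and
    // returns the highest number of iterations observed running at once.
    public static int RunCapped(int itemCount, int maxDegree)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };
        var gate = new object();
        var running = 0;
        var peak = 0;

        Parallel.ForEach(Enumerable.Range(0, itemCount), options, item =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }

            Thread.Sleep(20); // stands in for importing one feed

            lock (gate)
            {
                running--;
            }
        });

        return peak;
    }

    private static void Main()
    {
        // Leave one core free for the host system, but keep at least one worker.
        var degree = Math.Max(1, Environment.ProcessorCount - 1);
        Console.WriteLine("Peak concurrency: " + RunCapped(16, degree));

        // PLINQ counterpart: the query uses at most 'degree' concurrent workers.
        var sum = Enumerable.Range(1, 100)
            .AsParallel()
            .WithDegreeOfParallelism(degree)
            .Sum();
        Console.WriteLine(sum); // prints 5050
    }
}
```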

In the next major version all .NET languages will have built-in support for parallel programming. There will also be new language constructs that support parallel programming. Currently you can download Visual Studio Async to get some idea of what is coming.

Conclusion

Parallel programming is very challenging, but the good tools offered by Visual Studio and the .NET Framework make it much easier for us. In this post we started with a feed aggregator that imports feed items serially. In two steps we parallelized feed importing and entry inserting, gaining a 2.3-times increase in performance. Although this number is specific to my test environment, it clearly shows that parallel programming may raise the performance of your application significantly.

Categories: .NET C#

Comments

  • Did you need to use Tasks?

    What happens when you alter the code to:

    public void Execute()
    {
        var blogs = _newsService.ListPublishedBlogs();

        blogs.AsParallel().ForAll(blog => ImportFeed(blog));
    }

    // Ryan

  • Of course you could have used a Parallel For loop, but then this article wouldn't have been as instructive.

    Thanks for this, it really makes me think about ways to get going with TPL. I only use it right now to spawn off tasks in a "Fire & Forget" mode, i.e. to let it manage threading for me.

    This is another level of performance optimization!

    Good Work!

  • Thanks for feedback, Ryan! :)

    Of course, it can be written way shorter. My purpose with the longer code was to illustrate some of the classes and mechanisms in TPL. Your short version looks very elegant. Thanks!
