
I have a method where I use Parallel.ForEach to speed up processing. It handles small files quickly, but for big files it is very slow, even slower than a plain for loop. How can I improve the speed, or should I try a different approach?

Please suggest or guide me on how I can improve the speed.

public bool CheckQueueExist()
{
    try
    {
        if (QueueVariables.NewFolderPath == "")
        {
            LoadFilePath();
        }
        string path = QueueVariables.NewFolderPath;
        DirectoryInfo info = new DirectoryInfo(path);
        var files = info.GetFiles().OrderBy(p => p.CreationTime)
            .ThenBy(a => a.Name).Take(10000).ToArray();

        var batchOfFiles = files
            .Select((x, i) => new { Index = i, FullName = x.FullName, Name = x.Name })
            .GroupBy(x => x.Index / 10)
            .Select(x => x.Select(v => new { v.Name, v.FullName }).ToList())
            .ToList();

        bool commitstatus = false;

        Parallel.ForEach(Partitioner.Create(
            batchOfFiles, EnumerablePartitionerOptions.NoBuffering),
            new ParallelOptions() { MaxDegreeOfParallelism = 2 }, batch =>
        {
            StringBuilder strQueryBuild = new StringBuilder();
            List<string> filenamesToMove = new List<string>();

            foreach (var file in batch)
            {
                string fullfilepath = file.FullName;
                string Filename = file.Name;
                try
                {
                    string content = System.IO.File.ReadAllText(fullfilepath);
                    strQueryBuild.Append(content);
                    filenamesToMove.Add(fullfilepath);
                }
                catch (Exception ex)
                {
                    commitstatus = false;
                    AppHelper.ErrrorLog(ex, "File in USE: " + fullfilepath);
                }
            }

            int RowsAffected = 0;
            try
            {
                string connstr = System.Configuration.ConfigurationManager
                    .ConnectionStrings["XYZ"].ConnectionString;
                using (var conn = new NpgsqlConnection(connstr))
                {
                    conn.Open();
                    var tra = conn.BeginTransaction();
                    try
                    {
                        NpgsqlCommand cmd = new NpgsqlCommand();
                        cmd.Connection = conn;
                        cmd.CommandType = CommandType.Text;
                        cmd.CommandTimeout = 0;
                        cmd.CommandText = strQueryBuild.ToString();
                        RowsAffected = cmd.ExecuteNonQuery();
                        if (RowsAffected == 0)
                        {
                            RowsAffected = 1;
                        }
                        tra.Commit();
                        commitstatus = true;
                    }
                    catch (Exception ex)
                    {
                        AppHelper.ErrrorLog(ex, "UploadFileData-Error1");
                        RowsAffected = -1;
                        tra.Rollback();
                        commitstatus = false;
                    }
                }
            }
            catch (Exception ex)
            {
                commitstatus = false;
                AppHelper.ErrrorLog(ex, "ProcessQueue-Error2");
            }

            if (commitstatus)
            {
                Parallel.ForEach(filenamesToMove, filepath =>
                {
                    string Filename = Path.GetFileName(filepath);
                    MovetoSuccessFolder(filepath, Filename);
                });
            }
            else
            {
                Parallel.ForEach(filenamesToMove, filepath =>
                {
                    string Filename = Path.GetFileName(filepath);
                    MovetoFailureFolder(filepath, Filename);
                });
            }
        });

        return commitstatus;
    }
    catch (Exception ex)
    {
        AppHelper.ErrrorLog(ex, "CheckQueueExist");
        return false;
    }
    finally
    {

    }
}

3 Answers


  1. Some observations and advice:

    • You have nested Parallel.ForEach loops: one Parallel.ForEach inside another. This is not a good idea, because it makes it difficult to control the degree of parallelism (among other reasons). Parallel.ForEach loops should not be nested.
    • You have Parallel.ForEach loops with an unconfigured MaxDegreeOfParallelism. This is not a good idea, because it results in the saturation of the ThreadPool. Microsoft’s recommendation to leave the MaxDegreeOfParallelism unconfigured, which is bad advice IMHO, comes with a lot of exceptions and caveats. It is especially harmful when the Parallel.ForEach is used to parallelize blocking I/O, as in your case.
    • You are processing the files in batches of 10. Batching is a helpful technique when the workload is granular (lightweight), to minimize the synchronization overhead. In your case the workload is blocking I/O, which is most likely chunky. The synchronization overhead is most likely negligible compared to the workload, so the batching only serves to imbalance the parallelization. As a side note, you could use the LINQ operator Chunk instead of the less efficient Select/GroupBy/Select/ToList approach.
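    For example, the batching pipeline in the question could be rewritten with Chunk (available since .NET 6); the batch size of 10 simply mirrors the original code:

    ```csharp
    // Equivalent of the Select/GroupBy/Select/ToList pipeline, assuming .NET 6+.
    var batchOfFiles = files
        .Select(f => new { f.Name, f.FullName })
        .Chunk(10)   // yields arrays of up to 10 items each
        .ToList();
    ```

    Chunk produces the same groups of 10 without materializing an index for every element first.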
  2. Let's see what you are actually doing here:

    1. Read some type of database queries from disk
    2. Run these queries
    3. Move files

    All of these operations involve some amount of IO, especially if the queries involve any kind of writing, since writes will take some kind of lock and you risk all the queries blocking each other.

    IO operations are mostly serial operations. While SSDs handle concurrent requests much better than HDDs, they still work best when reading or writing sequential data, and suffer a massive penalty when doing any kind of random operation.

    Adding concurrency without understanding the actual performance problem is likely to do much more harm than good. The correct way to approach performance problems is:

    1. Measure/profile – Without this you are likely to spend a bunch of time improving things that do not matter. And this needs to be done after each "improvement"
    2. Improve algorithms – Algorithmic improvements often have the largest impact
    3. Reduce overhead – Try to find things done repeatedly, or unnecessarily, or where there is a faster way to do the same thing. This is especially true for databases, since there are often multiple ways to do the same thing, often with drastically different performance profiles, unless you are a database expert it is easy to make mistakes that can cause terrible performance.
    4. Consider parallelizing – but you need to make sure your program is actually suitable for parallelization, meaning you are compute-bound and your algorithm is not inherently sequential in nature. And even then, consider whether it is worth the effort of ensuring thread safety.
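    As a minimal sketch of step 1, time the sequential version against the parallel one before changing anything else. ProcessBatch and batches here are hypothetical stand-ins for the per-batch work and the batch list in the question:

    ```csharp
    using System.Diagnostics;

    // Hypothetical stand-in for the real per-batch work (read files, run SQL, move files).
    void ProcessBatch(List<(string Name, string FullName)> batch) { /* ... */ }

    var sw = Stopwatch.StartNew();
    foreach (var batch in batches)              // sequential baseline
        ProcessBatch(batch);
    sw.Stop();
    Console.WriteLine($"Sequential: {sw.Elapsed}");

    sw.Restart();
    Parallel.ForEach(batches, ProcessBatch);    // parallel candidate
    Console.WriteLine($"Parallel:   {sw.Elapsed}");
    ```

    If the parallel run is not clearly faster on realistic data, the workload is probably IO-bound and parallelism is not the right lever.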
  3. There could be several reasons why Parallel.ForEach is slower.

    1. Overhead of parallelization: Parallel.ForEach introduces overhead for managing and coordinating parallel execution. This overhead can become significant when dealing with small or lightweight tasks, which can make the parallel version slower than the sequential one.
    2. Limited parallelism: in the code MaxDegreeOfParallelism is set to 2, which limits the number of concurrent tasks to 2. Depending on your system’s capabilities, this may not fully use the available resources, especially for large files. You could experiment with increasing the degree of parallelism to see if it improves performance.
    3. Shared state: the StringBuilder (strQueryBuild) and list (filenamesToMove) are local to each batch, but the commitstatus flag is shared across all parallel iterations and is written without synchronization, so one batch can overwrite the result of another. Accessing shared state concurrently can introduce synchronization overhead and subtle bugs.

    To improve the speed of your code you can:

    1. Optimize I/O operations: reading file content with System.IO.File.ReadAllText can be a bottleneck (especially for large files). Asynchronous I/O can improve performance considerably.
    2. Batch size optimisation: experiment with different batch sizes to find a value that balances parallelism and resource usage. Larger batch sizes can reduce the parallelization overhead, while excessively large batches may cause resource contention. Measure the performance with various batch sizes to determine the most effective value.
    3. Profiling and performance measurement: use profiling tools to identify potential bottlenecks in your code.
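    A sketch of point 1, assuming .NET Core 3.0+ where File.ReadAllTextAsync is available, reading one batch's files concurrently:

    ```csharp
    // 'batch' is assumed to be the same per-batch list as in the question.
    var readTasks = batch.Select(f => File.ReadAllTextAsync(f.FullName));
    string[] contents = await Task.WhenAll(readTasks);

    var strQueryBuild = new StringBuilder();
    foreach (var content in contents)
        strQueryBuild.Append(content);
    ```

    Note that Parallel.ForEach does not support await in its body; you would need Parallel.ForEachAsync (.NET 6+) or a plain async loop for this to work.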

    Remember that parallelisation does not always guarantee improved performance and can introduce complexity (readability, control flow, …)
