
I am having trouble reading data from a Postgres database and inserting it into Elasticsearch using the NEST library in an ASP.NET Core console application.

My approach:

  1. Get the total row count from RedShift.
  2. Query ~100,000 rows from RedShift.
    • Split those 100,000 rows into 10 pages of 10,000 each (100,000 / 10,000).
    • Bulk insert each page of ~10,000 records into ES.
  3. Run the next query with the offset increased by 100,000.

This works for 9 or 10 iterations, but then sometimes fails in ES.

The process also takes a long time; I am referring only to the Elasticsearch insert time.

Errors:

The request was canceled due to the configured HttpClient.Timeout of 60 seconds elapsing

TimeoutException: The operation was canceled.

IOException: Unable to read data from the transport connection: The I/O operation has been aborted because of either a thread exit or an application request..

SocketException: The I/O operation has been aborted because of either a thread exit or an application request.

Here is the code snippet that I am using:

Program.cs file:

using dotenv.net;
using MemberSearchDataSync;
using MemberSearchDataSync.Data;
using Microsoft.Extensions.Configuration;
using Nest;
using System.Reflection;

var run_dir = Path.GetDirectoryName(new Uri(Assembly.GetExecutingAssembly().CodeBase).LocalPath);
DotEnv.Load();

var settings = new ConfigurationBuilder()
    .AddJsonFile(Path.Combine(run_dir, "appsettings.json"), true, true)
    .AddEnvironmentVariables()
    .Build();

var connection_settings = new ConnectionSettings(new Uri(Environment.GetEnvironmentVariable("ES_CLUSTER")))
    .ThrowExceptions();

connection_settings.RequestTimeout(TimeSpan.FromMinutes(5));
connection_settings.MaximumRetries(5);

var client = new ElasticClient(connection_settings);

var initialized = new MemberDataReader();
var rows = await initialized.TotalCount(); // query behind: "SELECT COUNT(*) FROM dim_stuent WHERE xyz"
var noOfRowPerRequest = Convert.ToInt32(Environment.GetEnvironmentVariable("ES_NO_OF_ITERATION"));
var elasticsearchLoader = new ElasticsearchLoader(client, settings);

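// Cast before dividing so integer division does not truncate the page count.
var iterations = Math.Ceiling((decimal)rows / noOfRowPerRequest);

try
{
    var iteration = 0;

    // Pages are zero-based, so stop before reaching the page count.
    while (iteration < iterations)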
    {
        var skip = Convert.ToInt32(iteration * noOfRowPerRequest);
        var records = await initialized.ReadAll(noOfRowPerRequest, skip); // query behind: "SELECT xyz FROM dim_stuent WHERE xyz LIMIT 100000 OFFSET {skip}"

        #region Inserting data into Elastic Search 
        if (records.Count > 0)
        {
            int numberOfObjectsPerPage = 10000; // ten thousand rows per bulk request
            // Cast before dividing so the last partial page is not dropped.
            var iterationsES = Math.Ceiling((decimal)records.Count / numberOfObjectsPerPage);
            var iterationES = 1;

            while (iterationES <= iterationsES)
            {
                var queryResultPage = records
                  .Skip(numberOfObjectsPerPage * (iterationES - 1))
                  .Take(numberOfObjectsPerPage)
                  .ToList();
                await elasticsearchLoader.BulkInsertAsync(queryResultPage);
                iterationES++;
            }
        }
        #endregion        

        iteration++;
    }
}
catch (Exception ex)
{
    throw;
}

ElasticsearchLoader.cs file:

using Microsoft.Extensions.Configuration;
using Nest;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace MemberSearchDataSync
{
    public class ElasticsearchLoader
    {
        private IElasticClient client;

        public ElasticsearchLoader(IElasticClient client, IConfigurationRoot configuration)
        {
            this.client = client;
        }

        public async Task<BulkResponse?> BulkInsertAsync(List<MemberSearchModel> item)
        {
            Console.WriteLine("ES Item Count to be insert:" + item.Count());
            if (item.Count == 0)
                return null;

            var idxName = Environment.GetEnvironmentVariable("ES_INDEX");
            var descriptor = new BulkDescriptor()
                .Index(idxName);

            descriptor.IndexMany(item, (d, doc) =>
            {
                var id = doc.id;
                return d.Id((string)id);
            });
            return await client.BulkAsync(descriptor);
        }
    }
}

Could anyone help me resolve this problem and explain how to insert 10+ million records into Elasticsearch successfully?

2 Answers


  1. I am not sure whether the issue lies elsewhere in your solution, but this is what works for me:

    var chunks = item.Items.Chunk(5000);
    
    foreach (var chunk in chunks)
    {
        await nestClient.BulkAsync(x => x.Index(item.Index).UpdateMany(chunk, (y, z) => y.IdFrom(z, true).Doc(z).Id(z.Id).RetriesOnConflict(2)));
    }
    

    You might also want to check the request timeout on your client. Mine:

     ConnectionSettings settings = new ConnectionSettings(pool)<other settings>.RequestTimeout(TimeSpan.FromMinutes(3))
    
  2. Lower the bulk size, as 10,000 may be too large; a typical size is 1,000. The right value depends on your hardware and the Elasticsearch cluster configuration, but if you are hitting timeouts, you should clearly reduce the batch size.
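
    A minimal sketch of sending smaller batches, assuming .NET 6 or later for Enumerable.Chunk. BatchSender, SendInBatchesAsync and the sendBatch delegate are hypothetical names used only for illustration; sendBatch stands in for whatever bulk call you make and returns true on success.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchSender
{
    // Splits items into batches of at most batchSize and sends them one at a time.
    // sendBatch is a placeholder for the real bulk call; it returns true on success.
    // Returns the number of batches that were sent.
    public static async Task<int> SendInBatchesAsync<T>(
        IEnumerable<T> items,
        int batchSize,
        Func<IReadOnlyList<T>, Task<bool>> sendBatch)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        var sent = 0;

        // Chunk yields arrays of batchSize items; the last one may be smaller.
        foreach (var batch in items.Chunk(batchSize))
        {
            if (!await sendBatch(batch))
                throw new InvalidOperationException($"Batch {sent + 1} failed.");

            sent++;
        }

        return sent;
    }
}
```

    With NEST, sendBatch could wrap client.BulkAsync and return !response.Errors, so a partially failed bulk request stops the load instead of passing silently. NEST also ships a BulkAll helper that does the batching and back-off retries for you, which may be worth a look before hand-rolling this.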

    Set the number of replicas to 0 and the refresh interval to -1 on your index while you are indexing. That way, Elasticsearch is fully dedicated to indexing your data and does not spend resources replicating it or refreshing the index to make new documents searchable. Restore both settings once the load has finished.
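
    A rough sketch of how those two settings might be toggled from NEST, assuming NEST 7.x and its Indices.UpdateSettingsAsync call. BulkLoadSettings and its method names are hypothetical, and the restore values (1 replica, 1s refresh) are Elasticsearch's defaults, which may not match what your index used before.

```csharp
using System;
using System.Threading.Tasks;
using Nest;

public static class BulkLoadSettings
{
    // Call before the load: no replicas, refresh disabled.
    public static Task PrepareAsync(IElasticClient client, string index) =>
        ApplyAsync(client, index, replicas: 0, refreshInterval: "-1");

    // Call after the load. Assumption: the index previously used the
    // Elasticsearch defaults; substitute your own values if it did not.
    public static Task RestoreAsync(IElasticClient client, string index) =>
        ApplyAsync(client, index, replicas: 1, refreshInterval: "1s");

    private static async Task ApplyAsync(
        IElasticClient client, string index, int replicas, string refreshInterval)
    {
        var response = await client.Indices.UpdateSettingsAsync(index, u => u
            .IndexSettings(s => s
                .NumberOfReplicas(replicas)
                .RefreshInterval(refreshInterval)));

        // Surface a rejected settings update instead of continuing silently.
        if (!response.IsValid)
            throw new InvalidOperationException(response.DebugInformation);
    }
}
```

    Wrapping the load in try/finally, with PrepareAsync before it and RestoreAsync in the finally block, keeps the index from being left without replicas or refresh if the load fails partway.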
