skip to Main Content

Im using redis package , and im having a lot of connection issues with connections giving ECONNREFUSED suddenly.

I suspect its because i have done a wrong connection management.

The issue with this project is that my app, sends parameters to the api (ip, and port) , and the api must create a connection given those values, fetch some data, and return it.
I have hundreds of servers, so I dont know how to manage all those connections.

So far im managing it in a single connection. And thats why I think its failing.

It currently looks like this..

let redisClient;

const killRedis = () => {
    redisClient.quit()
    console.log("redis client shut down")
}


const createRedisClient = async (port, url) => {
    redisClient = require('redis').createClient(port, url, {
        no_ready_check: true,
        db: 1
    })
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        killRedis();
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
    });

    return redisClient;
}
module.exports = { createRedisClient, }

It kind of works, but ends up failing with econnection refused from time to time.

I use it like the following in my routes.

const scanAll = async (port, url) => {
    const redisClient = await createRedisClient(port, url)
    if (!redisClient) return 500
    const scan = promisify(redisClient.scan).bind(redisClient);
    const found = [];
    let cursor = '0';
    do {
        const reply = await scan(cursor, 'MATCH', '*');
        cursor = reply[0];
        found.push(...reply[1]);
    } while (cursor !== '0');
    return found;
};

/* Return all the users id */
router.post('/user/list', async function (req, res, next) {
    const data = await scanAll(req.body.port, req.body.ip);
    console.log("data ", data)
    if (data === 500) {
        res.status(500).json({
            error: "Error, server connection refused"
        })
    }
    else if (data.length === 0) {
        res.status(204).json(data)
    } else {
        res.status(200).json(data);
    }

})

How can i do a proper connections management?

EDIT: my new attempt, but i think my connections are overflowing when making 2 simultaneos petitions

let connections = []


findConnection = (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)
    console.log("pre")
    console.log(connection)
    if (connection && connection.connection) {
        console.log("opcion1: ", connection.ip)
        console.log("connection already exists")
        return connection[0].connection
    } else {
        console.log("opcion2")
        console.log(connections)
        connections.push({
            ip: ip,
            port: port,
            connection: require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
        })
        return connections.filter(i => i.ip == ip && i.port == port)[0].connection
    }
}
const createRedisClient = async (port, url) => {
    let redisClient = findConnection(url, port)
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        redisClient.quit()
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
    });

    return redisClient;
}
module.exports = { createRedisClient, }

I have noticed that i get the following error

MaxListenersExceededWarning: Possible EventEmitter memory leak
detected. 11 error listeners added. Use emitter.setMaxListeners() to
increase limit

EDIT: Last implementation problem

My current implementation is the following

    let connections = []

const killRedis = (redisClient, ip, port) => {
    redisClient.quit()
    connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}

const subscribe = (redisClient, url, port) => {
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        killRedis(redisClient, url, port)
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
        return redisClient;
    });
}

findConnection = (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)

    if (connection && connection.length > 0) {
        subscribe(connection[0].connection)
        return connection[0].connection
    } else {
        connections.push({
            ip: ip,
            port: port,
            connection: require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
        })
        subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
        return connections.filter(i => i.ip == ip && i.port == port)[0].connection
    }
}
const createRedisClient = async (port, url) => {
    let redisClient = findConnection(url, port)
    return redisClient
}
module.exports = { createRedisClient }

It is almost working, the problem is that i dont know how to handle the rror of

And im not sure how to handle the error event listener. If it fails i should return an undefined, But seems that it isnt doing so,

4

Answers


  1. Chosen as BEST ANSWER

    My final implementation that works as expected

    let connections = []
    
    const killRedis = (redisClient, ip, port) => {
        redisClient.quit()
        connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
    }
    
    const subscribe = (redisClient, url, port) => new Promise((resolve, reject) => {
        redisClient.on('error', function (err) {
            killRedis(redisClient, url, port)
            reject(err)
        });
        redisClient.on('connect', function () {
            resolve(redisClient)
        });
    })
    
    findConnection = async (ip, port) => {
        let connection = connections.filter(i => i.ip == ip && i.port == port)
    
        if (connection && connection.length > 0) {
            return connection[0].connection
        } else {
            try {
                let newConnection = require('redis').createClient(port, ip, {
                    no_ready_check: true,
                    db: 1
                })
                const client = await subscribe(newConnection, ip, port)
                connections.push({
                    ip: ip,
                    port: port,
                    connection: newConnection
                })
                return client
            } catch (error) {
                return undefined
            }
        }
    }
    const createRedisClient = async (port, url) => {
        let redisClient = await findConnection(url, port)
        return redisClient
    }
    module.exports = { createRedisClient }
    

  2. you can use this npm package

    npm i connect-redis
    
    Login or Signup to reply.
  3. You can build a cache pool of existing connections and reuse the connections, rather than creating a new Redis connection for every new request. This way, you won’t cross the limit of event listeners per Redis Connection. You just need to create a connection, if it doesn’t already exist in the pool.

    PS – For simplicity, my current implementation of cache pool creates a single connection for every pair of host and port and stores them. You can implement an LRU cache in your Cache Pool to evict unused Redis connections if need be.

    This way you should be able to solve your connection management problem since you will be creating the connection once and reuse them.

    cache-pool.js

    const redis = require('redis');
    const Q = require('q');
    
    class CachePool {
        constructor() {
            this.cachedClients = {};
            this.pendingCachedClients = {};
        }
    
        getConnection(host, port) {
            const deferred = Q.defer();
            const connectionId = CachePool.getConnectionId(host, port);
            if (this.cachedClients[connectionId]) {
                deferred.resolve(this.cachedClients[connectionId]);
            } else {
                this.cachedClients[connectionId] = redis.createClient({
                    host,
                    port,
                });
                this.cachedClients[connectionId].on('connect', (connection) => {
                    deferred.resolve(this.cachedClients[connectionId]);
                });
                this.cachedClients[connectionId].on('error', (err) => {
                    deferred.reject(err);
                });
            }
            return deferred.promise;
        }
    
        static getConnectionId(host, port) {
            return `${host}:${port}`;
        }
    }
    
    module.exports = new CachePool();
    

    cache.js

    const { promisify } = require('util');
    
    const CachePool = require('./cache-pool');
    
    class Cache {
        constructor(host, port) {
            this.host = host;
            this.port = port;
        }
    
        connect() {
            return CachePool.getConnection(this.host, this.port);
        }
    
        async scanAll() {
            const redisConnection = await this.connect();
            const scan = promisify(redisConnection.scan).bind(redisConnection);
            const found = [];
            let cursor = '0';
            do {
                const reply = await scan(cursor, 'MATCH', '*');
                cursor = reply[0];
                found.push(...reply[1]);
            } while (cursor !== '0');
            return found;
        }
    }
    
    module.exports = Cache;
    

    test-sever.js

    const express = require('express');
    
    const Cache = require('./cache');
    
    const app = express();
    
    app.use(express.json({ type: '*/json' }));
    
    const port = 3000;
    
    /* Return all the users id */
    app.post('/user/list', async function (req, res, next) {
        const redisClient = new Cache(req.body.ip, req.body.port);
        const data = await redisClient.scanAll();
        if (data === 500) {
            res.status(500).json({
                error: "Error, server connection refused"
            })
        }
        else if (data.length === 0) {
            res.status(204).json(data)
        } else {
            res.status(200).json(data);
        }
    
    })
    
    app.listen(port, () => {
      console.log(`Example app listening at http://localhost:${port}`)
    });
    
    Login or Signup to reply.
  4. The single connection solution or cache pool solution are all available. You meet some problems with these two solutions.

    • Single Connection Solution: ECONNREFUSED

    I guess the error triggered when redis maxclient reached, because your single connection solution do not close the client when processing each request. So it kind of works when do not reach maxclient limit, but end with ECONNREFUSED.

    You can try to resolve it, just add killRedis before request return.

    const scanAll = async (port, url) => {
        const redisClient = await createRedisClient(port, url)
        if (!redisClient) return 500
        const scan = promisify(redisClient.scan).bind(redisClient);
        const found = [];
        let cursor = '0';
        do {
            const reply = await scan(cursor, 'MATCH', '*');
            cursor = reply[0];
            found.push(...reply[1]);
        } while (cursor !== '0');
        killRedis(); // add code here.
        return found;
    };
    
    • Cache Pool Solution: MaxListenersExceededWarning

    Refer to the answer MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 message lis teners added. Use emitter.setMaxListeners() to increase limit

    For each request, you run redisClient.on() twice in createRedisClient function, you do a subscribe twice, then two new listener added. The default limit is 10. It will end up with MaxListenersExceededWarning because lacking of unsubscribe. The solution is moving all redisClient.on code from createRedisClient function to findConnection function, just subscribe twice when connection created, and not related to user request.

    • added comment code
    findConnection = (ip, port) => {
        let connection = connections.filter(i => i.ip == ip && i.port == port)
    
        if (connection && connection.length > 0) {
            // subscribe(connection[0].connection) // should be deleted
            return connection[0].connection
        } else {
            connections.push({
                ip: ip,
                port: port,
                connection: require('redis').createClient(port, ip, {
                    no_ready_check: true,
                    db: 1
                })
            })
            subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
            return connections.filter(i => i.ip == ip && i.port == port)[0].connection
        }
    }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search