skip to Main Content

I’m trying to make queries to the database through goroutines, but I constantly run into limits on the number of queries and I can’t solve this problem in any way 🙁

But every time I get "read: connection reset by peer" or "Too many connections".

What am I doing wrong? Help me please. Thanks in advance. Here is my code.

// Page is the result that getData sends back: Stat holds the number of rows
// counted for one group.
type Page struct {
    Stat int
}

// main starts one getData goroutine per group id and then prints one line
// per received result.
func main() {
    cfg := mysql.Config{
        // some config
    }
    // 5000 ids
    groups := []int{}

    // trying set buffer limit
    // NOTE(review): this buffer only lets up to 8 results wait unread; it does
    // not limit how many goroutines (and database connections) run at once.
    pages := make(chan Page, 8)
    for _, id := range groups {
        // NOTE(review): every goroutine is started immediately, and each one
        // calls sql.Open inside getData, i.e. opens its own connection pool.
        go getData(id, cfg, pages)
    }
    for _, id := range groups {
        // NOTE(review): results arrive in completion order, so id here is not
        // necessarily the group this page was computed for.
        page := <-pages
        fmt.Println(id, page.Stat)
    }
}

// getData opens a database handle from cfg, counts the goods rows whose
// groupid equals i, and sends the count on channel.
//
// NOTE(review): sql.Open runs on every call, so each goroutine gets its own
// pool; SetMaxOpenConns(8) limits only that pool, not the total number of
// connections across all goroutines.
func getData(i int, cfg mysql.Config, channel chan Page) {
    db, err := sql.Open("mysql", cfg.FormatDSN())
    // NOTE(review): db is used below before err is checked.
    db.SetMaxOpenConns(8)
    db.SetMaxIdleConns(8)
    checkError(err)
    rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
    checkError(err)

    defer rows.Close()
    defer db.Close()

    count := 0;
    for rows.Next() {
        // NOTE(review): id1 is not declared in this snippet; if it is a
        // package-level variable, the concurrent goroutines race on it.
        err = rows.Scan(&id1)
        checkError(err)
        count++
    }
    // NOTE(review): rows.Err() is not checked after the loop.
    channel <- Page{Stat: count}
}

3

Answers


  1. Create the database handle once, outside getData. As written, every goroutine calls sql.Open and gets its own connection pool, so this code can open too many connections (~5000) in parallel.

    Login or Signup to reply.
  2. Per comments, the database instance should be handled outside the goroutines. But doing this alone can still cause errors because of your settings for open connections in the connection pool. For example, you set the maximum open connections to 8, then spawn all your goroutines. Some of the goroutines might time out waiting for an available connection from the connection pool.

    You can improve this by matching the number of goroutines to the connections in the pool to get the best error-free performance. The maximum number of goroutines active at any moment should equal the number of available open connections, to be safe and avoid timeout errors.

    // main opens one shared *sql.DB pool and runs one getData per group id,
    // with at most numberOfConns goroutines running at a time.
    func main() {
        cfg := mysql.Config{
            // some config
        }

        numberOfConns := 20
        db, err := sql.Open("mysql", cfg.FormatDSN())
        checkError(err)
        defer db.Close()

        // One pool shared by every goroutine, sized to match the concurrency
        // limit below.
        db.SetMaxOpenConns(numberOfConns)
        db.SetMaxIdleConns(numberOfConns)

        // 5000 ids
        groups := []int{}

        // pages can hold every result, so no worker ever blocks on its send
        // while holding a slot in limit. With a smaller buffer, workers that
        // send before releasing their slot can pin every slot once the buffer
        // fills. This loop would then block on limit forever, before it ever
        // starts reading pages (deadlock).
        pages := make(chan Page, len(groups))
        // limit is a counting semaphore: each running getData holds one slot.
        limit := make(chan bool, numberOfConns)
        for _, id := range groups {
            limit <- true // blocks while numberOfConns goroutines are running
            go getData(id, db, pages, limit)
        }

        // NOTE: results arrive in completion order, not in the order of
        // groups, so the id printed next to a Stat is not necessarily the
        // group it was computed for. Page would have to carry the id to pair
        // them reliably.
        for _, id := range groups {
            page := <-pages
            fmt.Println(id, page.Stat)
        }
    }
    
    // getData counts the goods rows whose groupid equals i using the shared
    // pool db, and sends the count on channel.
    //
    // limit is the caller's concurrency semaphore. getData releases its slot
    // as soon as the query is finished, before sending the result. A worker
    // blocked on a full channel therefore never holds a slot, which would
    // otherwise deadlock a caller that reads results only after spawning
    // every worker.
    func getData(i int, db *sql.DB, channel chan Page, limit chan bool) {
        count := countGoods(db, i)
        <-limit // release the slot so the next goroutine can be started
        channel <- Page{Stat: count}
    }

    // countGoods runs the per-group query and returns the number of rows;
    // the rows are closed before it returns.
    func countGoods(db *sql.DB, i int) int {
        rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
        checkError(err)
        defer rows.Close()

        count := 0
        for rows.Next() {
            // Scan into a local rather than a shared variable: many
            // goroutines run this loop concurrently.
            var id sql.RawBytes
            checkError(rows.Scan(&id))
            count++
        }
        // rows.Next returns false on an error as well as at the end of the
        // result set; rows.Err distinguishes the two.
        checkError(rows.Err())
        return count
    }
    
    Login or Signup to reply.
  3. Here is a corrected version of your code.

    // Page carries the result for one group: Stat is the number of rows
    // getData counted and Id is the group id it queried.
    type Page struct {
        Stat, Id int
    }
    
    // main queries every group id with at most 5 getData calls in flight
    // and collects one Page per group.
    func main() {
        cfg := mysql.Config{
            // some config
        }
        // 5000 ids
        groups := []int{1, 2, 3}

        // this buffered channel will block at the concurrency limit
        semaphoreChan := make(chan struct{}, 5)

        // for collecting result
        pages := make(chan *Page)

        // Neither channel is closed here: main is not the sender on pages,
        // and the process exits when main returns.
        go func() {
            for _, id := range groups {
                // Acquire a slot; blocks while 5 getData calls are running.
                semaphoreChan <- struct{}{}

                go func(i int) {
                    getData(i, cfg, pages)
                    // Release the slot so another goroutine can start.
                    <-semaphoreChan
                }(id)
            }
        }()

        // Loop on the count before receiving, so an empty groups slice
        // finishes immediately instead of blocking forever on pages.
        results := make([]Page, 0, len(groups))
        for len(results) < len(groups) {
            result := <-pages
            log.Println(result.Stat, result.Id)
            results = append(results, *result)
        }
    }
    
    // getData opens a database handle from cfg, counts the goods rows whose
    // groupid equals i, and sends the count together with i on channel.
    //
    // Each call still opens and closes its own *sql.DB. The caller's
    // semaphore is what bounds how many of these pools exist at once.
    // Sharing one *sql.DB across goroutines would avoid reconnecting for
    // every group.
    func getData(i int, cfg mysql.Config, channel chan *Page) {
        db, err := sql.Open("mysql", cfg.FormatDSN())
        // Check before touching db: sql.Open can return a nil *DB with an error.
        checkError(err)
        defer db.Close()
        db.SetMaxOpenConns(8)
        db.SetMaxIdleConns(8)

        rows, err := db.Query(`select g.id from goods as g where g.groupid = ?`, i)
        checkError(err)
        defer rows.Close()

        count := 0
        for rows.Next() {
            // Local scan target: this loop runs in several goroutines at once.
            var id sql.RawBytes
            checkError(rows.Scan(&id))
            count++
        }
        // rows.Next also stops on an error; surface it instead of reporting
        // a short count.
        checkError(rows.Err())
        channel <- &Page{Stat: count, Id: i}
    }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search