
I’m essentially running an ETL job on a large SQL table. I need to pull rows, perform a calculation on each, then write the results back to the same server but to a different table. What I’m seeing is that workerChannel fills up and then everything stops. If I take out the SQL insert statement, it runs through all rows. I thought that opening one SQL connection in the reader goroutine and another in the main goroutine would keep them from blocking each other, but that does not seem to be the case.

func startReader() chan RawData {  
    readerChannel := make(chan RawData, 10)  
    defer close(readerChannel)  

    go func() {
      // Open a DB connection and select large row count.
      for query.Next() {
          // prep rawDataStruct
          readerChannel <- rawDataStruct
      }
    }()
    return readerChannel 
}


func startWorkers(readerChannel chan RawData, workers int) chan CalcData {
    var wg sync.WaitGroup
    wg.Add(workers)
    workerChannel := make(chan CalcData, 10)

    for ; workers > 0; workers-- {
      go func() {
        rawData, isOpen := <- readerChannel
        if !isOpen {
          return
        }
        workerChannel <- doCalculations(rawData)
      }()
    }()

    go func() {
      wg.Wait()
      close(workerChannel)
    }()
    return workerChannel
}

func main() {
    readerChannel := startReader()
    workerChannel := startWorkers(readerChannel, 2)
    
    // open new sql connection
    for {
      calcData, isOpen := workerChannel
      if !isOpen {
        break
      }
      // issue insert query against database connection.
    }
}

2 Answers


  1. After fixing the syntax errors and replacing the SQL calls with some simple integers…

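    Your workers should loop like so. Note the use of a for ... range loop, which keeps receiving until the channel is closed and drained, so each worker handles many rows instead of just one. Note also the deferred wg.Done(): without it, wg.Wait() never returns and workerChannel is never closed.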

        for ; workers > 0; workers-- {
            go func() {
                defer wg.Done()
                for rawData := range readerChannel {
                    // process rawData
                    workerChannel <- rawData
                }
            }()
        }
    

    Your main function can similarly be simplified.

    func main() {
        readerChannel := startReader()
        workerChannel := startWorkers(readerChannel, 2)
    
        for calcData := range workerChannel {
            fmt.Println(calcData)
        }
    }
    
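For reference, here is a minimal runnable sketch of the whole pipeline with the SQL calls swapped for plain integers. The `RawData` and `CalcData` definitions, the doubling done in `doCalculations`, the `n` parameter on `startReader`, and the `runPipeline` helper are all assumptions made for illustration; they are not taken from the question's real code.

```go
package main

import (
	"fmt"
	"sync"
)

// RawData and CalcData stand in for the question's row types (assumed to be ints here).
type RawData int
type CalcData int

// doCalculations is a hypothetical placeholder for the real per-row work.
func doCalculations(r RawData) CalcData { return CalcData(r * 2) }

// startReader sends the integers 1..n in place of rows from a SQL query.
func startReader(n int) chan RawData {
	readerChannel := make(chan RawData, 10)
	go func() {
		defer close(readerChannel) // close only after every value has been sent
		for i := 1; i <= n; i++ {
			readerChannel <- RawData(i)
		}
	}()
	return readerChannel
}

// startWorkers starts the given number of workers, each looping until
// readerChannel is closed and drained.
func startWorkers(readerChannel chan RawData, workers int) chan CalcData {
	var wg sync.WaitGroup
	wg.Add(workers)
	workerChannel := make(chan CalcData, 10)

	for ; workers > 0; workers-- {
		go func() {
			defer wg.Done()
			for rawData := range readerChannel {
				workerChannel <- doCalculations(rawData)
			}
		}()
	}

	// Close the output channel once every worker has returned.
	go func() {
		wg.Wait()
		close(workerChannel)
	}()
	return workerChannel
}

// runPipeline drains the pipeline and reports how many results arrived and their sum.
func runPipeline(n, workers int) (count, sum int) {
	for calcData := range startWorkers(startReader(n), workers) {
		count++
		sum += int(calcData)
	}
	return count, sum
}

func main() {
	count, sum := runPipeline(100, 2)
	fmt.Println(count, sum) // prints "100 10100"
}
```

Results can arrive in any order because two workers run concurrently, which is why the sketch checks a count and a sum rather than a sequence.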


  2. You almost certainly do not intend to close readerChannel when the startReader function returns, which is what your code (as shown) currently does:

    func startReader() chan RawData {  
        readerChannel := make(chan RawData, 10)
        defer close(readerChannel)  // <- closes when startReader exits
    
        go func() {
          // Open a DB connection and select large row count.
          for query.Next() {
              // prep rawDataStruct
              readerChannel <- rawDataStruct
          }
        }()
        return readerChannel 
    }
    

    The defer close(readerChannel) should be inside the goroutine, closing the channel only once all data has been retrieved from the DB and sent to the channel:

    func startReader() chan RawData {  
        readerChannel := make(chan RawData, 10)
        go func() {
          defer close(readerChannel)  // <- close when all data read
        
          // Open a DB connection and select large row count.
          for query.Next() {
              // prep rawDataStruct
              readerChannel <- rawDataStruct
          }
        }()
        return readerChannel 
    }
    
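To see why the placement of the close matters, here is a small sketch of what happens when a send reaches a channel that has already been closed, which is the situation the goroutine ends up in when startReader closes the channel on return. The `trySend` helper is hypothetical; it only exists to turn the runtime panic into an error value so the program can keep running.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts one send and converts a
// runtime panic (such as sending on a closed channel) into an error.
func trySend(ch chan int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
	}()
	ch <- 1
	return nil
}

func main() {
	// Mimics the original startReader: the channel is already closed
	// by the time the reader goroutine tries to send.
	closedEarly := make(chan int, 10)
	close(closedEarly)
	fmt.Println(trySend(closedEarly)) // non-nil error: the send panicked

	// With the close deferred inside the goroutine, sends happen first.
	stillOpen := make(chan int, 10)
	fmt.Println(trySend(stillOpen)) // <nil>

	// Receivers on a closed, empty channel get the zero value and ok == false.
	v, ok := <-closedEarly
	fmt.Println(v, ok) // 0 false
}
```

In a real program nothing recovers that panic, so the process would crash instead of returning an error.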