I’m essentially doing an ETL on a large SQL table. I need to pull rows, perform a calculation, then write them back to the same server but to a different table. What I’m seeing is that workerChannel fills up and then everything stops. If I take out the SQL insert statement, it runs through all rows. I thought that by opening one SQL connection within the reader goroutine and another in the main goroutine they wouldn’t block each other, but that seems not to be the case.

func startReader() chan RawData {
    readerChannel := make(chan RawData, 10)
    defer close(readerChannel)

    go func() {
        // Open a DB connection and select large row count.
        for query.Next() {
            // prep rawDataStruct
            readerChannel <- rawDataStruct
        }
    }()
    return readerChannel
}

func startWorkers(readerChannel chan RawData, workers int) chan CalcData {
    var wg sync.WaitGroup
    workerChannel := make(chan CalcData, 10)

    for ; workers > 0; workers-- {
        wg.Add(1)
        go func() {
            defer wg.Done()
            rawData, isOpen := <-readerChannel
            if !isOpen {
                return
            }
            workerChannel <- doCalculations(rawData)
        }()
    }

    go func() {
        // Close the output once every worker has finished.
        wg.Wait()
        close(workerChannel)
    }()
    return workerChannel
}

func main() {
    readerChannel := startReader()
    workerChannel := startWorkers(readerChannel, 2)
    // open new sql connection
    for {
        calcData, isOpen := <-workerChannel
        if !isOpen {
            break
        }
        // issue insert query for calcData against database connection.
    }
}



  1. After fixing the syntax errors and replacing the SQL calls with some simple integers…

    Your workers should loop like so. Note the use of a for ... range loop, which keeps receiving until the channel is closed and drained, so no explicit isOpen check is needed.

        for ; workers > 0; workers-- {
            go func() {
                defer wg.Done() // pairs with a wg.Add made before the goroutine starts
                for rawData := range readerChannel {
                    // process rawData
                    workerChannel <- rawData
                }
            }()
        }

    Your main function can similarly be simplified.

    func main() {
        readerChannel := startReader()
        workerChannel := startWorkers(readerChannel, 2)
        for calcData := range workerChannel {
            // use calcData here, e.g. issue the insert query
        }
    }


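For reference, here is a minimal, self-contained sketch of the whole pipeline with the SQL calls replaced by plain integers, in the spirit of the answer above. The `runPipeline` helper, the `n` parameter, and the squaring step are stand-ins invented for illustration; they are not part of the original code, and a real version would do the SELECT in the reader and the INSERT in the final loop.

```go
package main

import (
	"fmt"
	"sync"
)

// startReader stands in for the SQL SELECT: it emits the integers 1..n.
func startReader(n int) <-chan int {
	out := make(chan int, 10)
	go func() {
		defer close(out) // the sender closes once every "row" has been sent
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// startWorkers fans the input out to the given number of goroutines.
// Squaring is a placeholder for doCalculations.
func startWorkers(in <-chan int, workers int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int, 10)
	for ; workers > 0; workers-- {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in { // ends when in is closed and drained
				out <- v * v
			}
		}()
	}
	go func() {
		wg.Wait() // all workers done, so no more sends on out
		close(out)
	}()
	return out
}

// runPipeline drains the worker channel (where the INSERT would go)
// and returns how many values arrived and their sum.
func runPipeline(n, workers int) (count, sum int) {
	for v := range startWorkers(startReader(n), workers) {
		count++
		sum += v
	}
	return count, sum
}

func main() {
	count, sum := runPipeline(100, 2)
	fmt.Println(count, sum) // prints: 100 338350
}
```

Every channel here is closed by the side that sends on it, which is what lets each `range` loop terminate without a manual open/closed check.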
  2. You almost certainly do not intend to close readerChannel when the startReader function exits, which is what your code (as shown) currently does:

    func startReader() chan RawData {
        readerChannel := make(chan RawData, 10)
        defer close(readerChannel)  // <- closes when startReader exits
        go func() {
            // Open a DB connection and select large row count.
            for query.Next() {
                // prep rawDataStruct
                readerChannel <- rawDataStruct
            }
        }()
        return readerChannel
    }

    The defer close(readerChannel) should be inside the goroutine, closing the channel only once all data has been retrieved from the DB and sent to the channel:

    func startReader() chan RawData {
        readerChannel := make(chan RawData, 10)
        go func() {
            defer close(readerChannel)  // <- close when all data read
            // Open a DB connection and select large row count.
            for query.Next() {
                // prep rawDataStruct
                readerChannel <- rawDataStruct
            }
        }()
        return readerChannel
    }
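To make the difference concrete, here is a small sketch of both placements. In Go, a send on a closed channel panics, so closing before the producer has finished is not harmless. The helpers `sendAfterClose` and `readAll` are hypothetical names for this illustration, and the send is done in the calling goroutine only so the panic can be recovered and reported; in the original code it would happen inside the reader goroutine.

```go
package main

import "fmt"

// sendAfterClose mimics the misplaced close: the channel is already
// closed by the time a value is sent. It reports whether the send panicked.
func sendAfterClose() (panicked bool) {
	ch := make(chan int, 10)
	close(ch) // same effect as `defer close(ch)` firing when startReader returns
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- 1 // send on closed channel
	return false
}

// readAll mirrors the corrected version: the producing goroutine
// closes the channel only after it has sent everything.
func readAll(n int) []int {
	ch := make(chan int, 10)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(sendAfterClose())  // prints: true
	fmt.Println(len(readAll(25))) // prints: 25
}
```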