I’m essentially doing an ETL on a large SQL table. I need to pull rows, perform a calculation, then write them back to a different table on the same server. What I’m seeing is that `workerChannel` fills up and then everything stops. If I take out the SQL insert statement, it runs through all rows. I thought that opening one SQL connection in the reader goroutine and another in the main goroutine would keep them from blocking each other, but that does not seem to be the case.
```go
func startReader() chan RawData {
	readerChannel := make(chan RawData, 10)
	defer close(readerChannel)
	go func() {
		// Open a DB connection and select large row count.
		for query.Next() {
			// prep rawDataStruct
			readerChannel <- rawDataStruct
		}
	}()
	return readerChannel
}

func startWorkers(readerChannel chan RawData, workers int) chan CalcData {
	var wg sync.WaitGroup
	wg.Add(workers)
	workerChannel := make(chan CalcData, 10)
	for ; workers > 0; workers-- {
		go func() {
			rawData, isOpen := <- readerChannel
			if !isOpen {
				return
			}
			workerChannel <- doCalculations(rawData)
		}()
	}()
	go func() {
		wg.Wait()
		close(workerChannel)
	}()
	return workerChannel
}

func main() {
	readerChannel := startReader()
	workerChannel := startWorkers(readerChannel, 2)
	// open new sql connection
	for {
		calcData, isOpen := workerChannel
		if !isOpen {
			break
		}
		// issue insert query against database connection.
	}
}
```
Answers
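After fixing the syntax errors and replacing the SQL calls with some simple integers… nothing ever calls `wg.Done()`, so `wg.Wait()` will never return.

Your workers should also loop over `readerChannel` instead of receiving a single value and exiting. Use a `for ... range` loop over the channel, which automatically stops once the channel has been closed and drained.

Your main function can similarly be simplified by ranging over `workerChannel`.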
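The fixes above can be put together into a small runnable program. This is a minimal sketch, not necessarily the exact code the answer used. It assumes hypothetical `RawData` and `CalcData` types, a placeholder `doCalculations`, and a stub reader that sends integers instead of querying SQL. The stub closes its channel from inside its goroutine so the pipeline can finish. Each worker defers `wg.Done()` and ranges over `readerChannel`, and `main` ranges over `workerChannel` until it is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the question's row types.
type RawData struct{ Value int }
type CalcData struct{ Result int }

// doCalculations is a placeholder for the real per-row computation.
func doCalculations(r RawData) CalcData {
	return CalcData{Result: r.Value * 2}
}

// startReader is a stub that sends n integers instead of querying SQL.
// It closes the channel from inside the goroutine, after the last send.
func startReader(n int) chan RawData {
	readerChannel := make(chan RawData, 10)
	go func() {
		defer close(readerChannel)
		for i := 0; i < n; i++ {
			readerChannel <- RawData{Value: i}
		}
	}()
	return readerChannel
}

func startWorkers(readerChannel chan RawData, workers int) chan CalcData {
	var wg sync.WaitGroup
	wg.Add(workers)
	workerChannel := make(chan CalcData, 10)
	for ; workers > 0; workers-- {
		go func() {
			defer wg.Done() // lets wg.Wait() return once this worker exits
			// range keeps receiving until readerChannel is closed and drained.
			for rawData := range readerChannel {
				workerChannel <- doCalculations(rawData)
			}
		}()
	}
	go func() {
		wg.Wait() // every worker has finished sending
		close(workerChannel)
	}()
	return workerChannel
}

func main() {
	workerChannel := startWorkers(startReader(100), 2)
	// open new sql connection here
	count := 0
	// range exits once workerChannel has been closed and drained.
	for calcData := range workerChannel {
		_ = calcData // issue the insert query for calcData here
		count++
	}
	fmt.Println("processed", count, "rows")
}
```

Running it prints `processed 100 rows`. In the real program, the insert would replace the `_ = calcData` line.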
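You almost certainly do not intend to close `readerChannel` when the `startReader` function returns, but that is what your code (as shown) currently does: the deferred `close(readerChannel)` runs as soon as `startReader` returns, while the reader goroutine may still be sending rows (and a send on a closed channel panics). The `defer close(readerChannel)` should be inside the goroutine, so the channel is closed only once all data has been retrieved from the DB and sent to the channel.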
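A sketch of a reader with the close moved inside the goroutine. To keep it runnable without a database, it reads from a hypothetical `rowScanner` interface that lists only the `*sql.Rows` methods it uses (`Next`, `Scan`, `Err`, `Close`), so a real `*sql.Rows` could be passed in. `fakeRows` is a made-up stand-in for a result set, and the two-column scan into `RawData` is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// RawData is a hypothetical row type standing in for the question's struct.
type RawData struct {
	ID    int
	Value int
}

// rowScanner is a hypothetical interface listing the *sql.Rows methods used
// below, so a real *sql.Rows (or a fake) can be passed in.
type rowScanner interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
	Close() error
}

func startReader(rows rowScanner) chan RawData {
	readerChannel := make(chan RawData, 10)
	go func() {
		// Both defers run when this goroutine returns, i.e. after the last
		// row has been sent; rows.Close runs first, then the channel closes.
		defer close(readerChannel)
		defer rows.Close()
		for rows.Next() {
			var r RawData
			if err := rows.Scan(&r.ID, &r.Value); err != nil {
				log.Printf("scan failed: %v", err)
				return
			}
			readerChannel <- r
		}
		if err := rows.Err(); err != nil {
			log.Printf("row iteration failed: %v", err)
		}
	}()
	return readerChannel
}

// fakeRows is a hypothetical stand-in for a result set of n rows.
type fakeRows struct{ i, n int }

func (f *fakeRows) Next() bool {
	f.i++
	return f.i <= f.n
}

func (f *fakeRows) Scan(dest ...any) error {
	if len(dest) != 2 {
		return errors.New("fakeRows: expected 2 columns")
	}
	*dest[0].(*int) = f.i
	*dest[1].(*int) = f.i * 10
	return nil
}

func (f *fakeRows) Err() error   { return nil }
func (f *fakeRows) Close() error { return nil }

func main() {
	count := 0
	for range startReader(&fakeRows{n: 25}) {
		count++
	}
	fmt.Println("read", count, "rows")
}
```

Because both defers belong to the goroutine rather than to `startReader`, they run only after the loop finishes. Deferred calls run in reverse order, so the rows are closed before the channel is.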