I am stuck at an architectural decision. I have Node + Express app, It has an API to upload files. After the upload is done, the response is closed and uploaded file is processed by FFMPEG batch wise with the help of Bull Queue + Redis. This structure works fine but recently I started testing on Server side events to give updates about processing to the end user. But I am unable to pass the response object to Bull Queue to write regular updates from server to user.
1. Imports
import childProcess from 'child_process';
import Bull from 'bull'
const Queue = new Bull('background_job', {redis: {port: process.env.port, host: process.env.host, password: process.env.password}});
2. Upload function
const uploadVideo = async(req, res) => {
try{
const result = await authUser(req);
const result2 = await checkUploadFile(result);
const result3 = await insertPost(result2, res);
await Queue.add(result3.data, result3.opts)
} catch(err){
res.status(403).send(err);
}
}
3. Promises
const authUser = (req) => {
return new Promise((resolve, reject) => {
//do some work
})
}
const checkUploadFile = (result) => {
return new Promise((resolve, reject) => {
//do some more work
})
}
const insertPost= (result, res) => {
return new Promise((resolve, reject) => {
//do final work
...........
//preparing server side events
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*'
};
res.writeHead(200, headers);
res.write(JSON.stringify({status: true, id: 1})); //testing server side events for the first time
//Let's continue to Bull
const data = {res: res} <- error here: TypeError: Converting circular structure to JSON
const opts = {removeOnComplete: true, removeOnFail: true}
resolve({data: data, opts: opts});
})
}
4. Queue Process
Queue.process((job, done) => {
const res = job.data.res
childProcess.execFile('someScript.sh', [`some`, `arguments`], { stdio: ['pipe', 'pipe', 'ignore']}, (err, stderr, stdout) => {
if(err){
done(new Error("Failed: " + err))
res.write(JSON.stringify({status: true, id: 2})); //here using SSE
res.end()
} else {
done()
res.write(JSON.stringify({status: false})); //here using SSE
res.end()
}
})
})
5. Error logged by PM2
TypeError: Converting circular structure to JSON
--> starting at object with constructor 'Socket'
| property 'parser' -> object with constructor 'HTTPParser'
--- property 'socket' closes the circle
I tried to use JSON.stringify(res)
to pass the response object as JSON but that didn’t work either. Now I’m considering if this approach is right or should I go with Socket.io (which is an overkill for a simple server side events)
Thank you
2
Answers
Why do you even write this line:
You still have access to the response object in uploadVideo function where you call insertPost. so it can simply be:
For instance:
Remove this line:
Just use response
Edit:
I see what you mean. Had a look at the bull module. Why cant you do something like this.
Then in your process function, simply return the data which will be emitted. i.e
;
You can use
job.progress()
to communicate with the route that’s connected to the client via SSE. Update the progress withjob.progress(percent)
, passing in a number. The Express route scope can then spin on this and emit SSE events to the client as the job progresses.Here’s a basic runnable example as a proof of concept you can add your processing, error handling and
job.progress
and SSE logic onto.Sample run:
See also How to use server-sent-events in express.js which has a sample client that can read the response stream.