I’m trying to integrate Asynq, a popular Go job scheduler, into my project, and I’m confused by the lack of documentation on concrete scenarios inside the tasks themselves. I can’t find a single mention of this on the Internet, which makes me think I might be approaching the problem the wrong way.
Typically, I’ll have a database connection set up in main.go and passed to everything via dependency injection:
clients := services.Clients{
	Db:    dbClient,
	Redis: redisClient,
	// and many more
}

// In my case I use GraphQL to handle requests
srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: &resolvers.Resolver{
	Services: &catalogue, // some services I need, depending on the request
	Clients:  &clients,   // here we pass the database, etc.
}}))
As you can see, I can pass the clients to my request handlers (resolvers) and live a happy life in there, querying the database.
When setting up Asynq, the client side looks like this:
asynqClient := asynq.NewClient(asynq.RedisClientOpt{
	Addr:     os.Getenv("REDIS_ENDPOINT"),
	DB:       0, // will use the default one
	Password: os.Getenv("REDIS_PASSWORD"),
})
defer asynqClient.Close()

tsk, err := tasks.NewPing("pong")
if err != nil {
	fmt.Printf("%s", err)
	panic(err)
}

_, err = asynqClient.Enqueue(tsk)
if err != nil {
	fmt.Printf("%s", err)
	panic(err)
}
I’ve already abstracted the ping code into tasks.go:
package tasks

import (
	"context"
	"encoding/json"
	"log"

	"github.com/hibiken/asynq"
)

// A list of task types.
const (
	TypePing = "misc:ping"
)

type pingTaskPayload struct {
	Test string
}

func NewPing(test string) (*asynq.Task, error) {
	payload, err := json.Marshal(pingTaskPayload{Test: test})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypePing, payload), nil
}

func HandlePing(ctx context.Context, t *asynq.Task) error {
	var p pingTaskPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return err
	}
	log.Printf(" [*] Ping received with arguments: %s", p.Test)
	return nil
}
You marshal the payload, it goes through Redis, and it is picked up on the other side:
asynqServer := asynq.NewServer(
	asynq.RedisClientOpt{
		Addr:     os.Getenv("REDIS_ENDPOINT"),
		DB:       0, // will use the default one
		Password: os.Getenv("REDIS_PASSWORD"),
	},
	asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypePing, tasks.HandlePing)

go asynqServer.Run(mux)
As you can see, there’s no place to inject anything, anywhere. Where are the dependencies? Why doesn’t the library offer a way to pass them to the tasks? How does everyone else use it? The getting-started guide never mentions dependencies.
Ideally, I’d like to use mux.HandleFunc and give a bunch of clients (e.g. the db connection) to tasks.HandlePing.
Right now, the only "viable" solution I see would be to set a global in main so that my server can be reached from anywhere in the system, which I don’t want to do. I want a clear pattern of dependency injection.
How do I pass my dependencies (clients, including the database) to Asynq in a clean way? Am I wrong to avoid setting a global here?
I’ve searched for hours. It’s as if no one has ever asked how to pass dependencies, and this library is quite popular, so I might be doing something wrong.
2 Answers
Well, that was much simpler than I thought: I simply pass the clients into a Tasks struct I created, then construct that struct in main.go and register its handlers on the mux. I could wrap that in a NewTasks constructor as well for good measure, but this works in my case.

Conceptually, tasks should be simple. You can inject dependencies into task processors dedicated to them, and the documentation covers such cases:
https://github.com/hibiken/asynq/tree/master?tab=readme-ov-file#quickstart
but you need to change how the handler is attached to the mux.
If you don’t want the overhead of creating new objects for dedicated task processors, you can use a closure: a function that takes the dependencies and returns the handler, which then has access to whatever was injected.