I’m trying to integrate Asynq, a popular Go job scheduler, into my project, and I’m confused by the lack of documentation on concrete scenarios inside the tasks themselves. I can’t find a single mention of this on the Internet, which makes me think I may be approaching the problem the wrong way.

Typically, I set up a database connection in main.go and pass it to everything else via dependency injection.

clients := services.Clients{
  Db:           dbClient,
  Redis:        redisClient,
  // and many more
}

// In my case I use GraphQL to handle requests
srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: &resolvers.Resolver{
  Services: &catalogue, // some services I need depending on the request
  Clients:  &clients,   // this is where the database, etc. are passed in
}}))

As you can see, I can pass the clients to my request handlers (resolvers) and query the database from there.

When setting up Asynq, the client side looks like this:

asynqClient := asynq.NewClient(asynq.RedisClientOpt{
  Addr:     os.Getenv("REDIS_ENDPOINT"),
  DB:       0, // will use the default one
  Password: os.Getenv("REDIS_PASSWORD"),
})
defer asynqClient.Close()

tsk, err := tasks.NewPing("pong")
if err != nil {
    fmt.Printf("%s", err)
    panic(err)
}
_, err = asynqClient.Enqueue(tsk)
if err != nil {
    fmt.Printf("%s", err)
    panic(err)
}

I’ve already moved the ping code into tasks.go:

package tasks

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypePing = "misc:ping"
)

type pingTaskPayload struct {
    Test string
}

func NewPing(test string) (*asynq.Task, error) {
    payload, err := json.Marshal(pingTaskPayload{Test: test})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypePing, payload), nil
}

func HandlePing(ctx context.Context, t *asynq.Task) error {
    var p pingTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Ping received with arguments: %s", p.Test)
    return nil
}

You marshal the payload, it goes through Redis, and it is picked up on the other side by the server.

asynqServer := asynq.NewServer(
  asynq.RedisClientOpt{
    Addr:     os.Getenv("REDIS_ENDPOINT"),
    DB:       0, // will use the default one
    Password: os.Getenv("REDIS_PASSWORD"),
  },
  asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypePing, tasks.HandlePing)
go asynqServer.Run(mux)

As you can see, there’s no obvious place to inject anything. Where do the dependencies go? Why doesn’t the library offer a way to pass them to the tasks? How does everyone else use it? The getting-started guide never mentions dependencies at all.

Ideally, I’d like to use mux.HandleFunc and give a set of clients (e.g. the db connection) to tasks.HandlePing.

Right now, the only "viable" solution I see is to set a global in main that can be read from anywhere in the system, which I don’t want to do. I want a clear dependency injection pattern.

How do I pass my dependencies (clients, including the database) to Asynq in a clean way? Am I wrong to avoid setting a global here?

I’ve searched extensively for hours. It’s as if no one has ever asked how to pass dependencies, and since this library is quite popular, I suspect I’m doing something wrong.

Answers


  1. Chosen as BEST ANSWER

    Well, that was much simpler than I thought: I simply have to pass the clients into a Tasks struct I created and make the task functions methods on it.

    // in /tasks/
    package tasks
    
    import (
        "aquiestoy/pkg/mailer"
        "aquiestoy/pkg/tracking"
    
        "github.com/hibiken/asynq"
        "github.com/redis/go-redis/v9"
        "gorm.io/gorm"
    )
    
    type Clients struct {
        Db       *gorm.DB
        Redis    *redis.Client
        Tracking *tracking.Tracking
        Mailer   *mailer.Mailer
        Asynq    *asynq.Client
    }
    
    type Tasks struct {
        Clients *Clients
    }
    

    And then in main.go:

    tsks := tasks.Tasks{
      Clients: &tasks.Clients{
        Db:       dbClient,
        Redis:    redisClient,
        Tracking: tkClient,
        Mailer:   mailClient,
        Asynq:    asynqClient,
      },
    }
    
    tsk, err := tsks.NewPing("pong")
    if err != nil {
      fmt.Printf("%s", err)
      panic(err)
    }
    _, err = asynqClient.Enqueue(tsk)
    if err != nil {
      fmt.Printf("%s", err)
      panic(err)
    }
    
    // NOTE : this should eventually be separated from the client
    asynqServer := asynq.NewServer(
      asynq.RedisClientOpt{
        Addr:     os.Getenv("REDIS_ENDPOINT"),
        DB:       0, // will use the default one
        Password: os.Getenv("REDIS_PASSWORD"),
      },
      asynq.Config{Concurrency: 10},
    )
    
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypePing, tsks.HandlePing)
    go asynqServer.Run(mux)
    

    I could wrap that in a NewTasks constructor as well for good measure, but this works in my case.
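
    The snippets above do not show how NewPing and HandlePing become methods on Tasks. Here is a minimal sketch of that step. To keep it compilable with the standard library alone, Task, ServeMux, and Store are local stand-ins that I made up for illustration; they are not asynq's or gorm's types, and in the real project you would use *asynq.Task, asynq.ServeMux, and your actual clients. The point is that a method value such as tsks.HandlePing carries its receiver with it, so it has the plain func(context.Context, *Task) error shape that HandleFunc expects.

    ```go
    package main

    import (
        "context"
        "encoding/json"
        "errors"
    )

    // Task is a stand-in for asynq.Task (assumption: only type and payload matter here).
    type Task struct {
        Type    string
        Payload []byte
    }

    // ServeMux is a stand-in for asynq.ServeMux: it maps task types to handlers.
    type ServeMux struct {
        handlers map[string]func(context.Context, *Task) error
    }

    func NewServeMux() *ServeMux {
        return &ServeMux{handlers: make(map[string]func(context.Context, *Task) error)}
    }

    // HandleFunc registers a handler for the given task type.
    func (m *ServeMux) HandleFunc(typ string, h func(context.Context, *Task) error) {
        m.handlers[typ] = h
    }

    // ProcessTask dispatches a task to the handler registered for its type.
    func (m *ServeMux) ProcessTask(ctx context.Context, t *Task) error {
        h, ok := m.handlers[t.Type]
        if !ok {
            return errors.New("no handler registered for " + t.Type)
        }
        return h(ctx, t)
    }

    const TypePing = "misc:ping"

    // Store is a hypothetical dependency standing in for the database client.
    type Store struct {
        Pings []string
    }

    type Clients struct {
        Db *Store
    }

    type Tasks struct {
        Clients *Clients
    }

    type pingTaskPayload struct {
        Test string
    }

    // NewPing builds a ping task; it is a method so that it lives next to its handler.
    func (ts *Tasks) NewPing(test string) (*Task, error) {
        payload, err := json.Marshal(pingTaskPayload{Test: test})
        if err != nil {
            return nil, err
        }
        return &Task{Type: TypePing, Payload: payload}, nil
    }

    // HandlePing reaches the injected clients through its receiver.
    func (ts *Tasks) HandlePing(ctx context.Context, t *Task) error {
        var p pingTaskPayload
        if err := json.Unmarshal(t.Payload, &p); err != nil {
            return err
        }
        ts.Clients.Db.Pings = append(ts.Clients.Db.Pings, p.Test)
        return nil
    }
    ```

    Registration then looks the same as in main.go above: mux.HandleFunc(TypePing, tsks.HandlePing). No global is needed because the handler's dependencies travel with the tsks value.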


  2. Conceptually, tasks should be simple. You can inject dependencies into task processors dedicated to them, and the documentation covers this case:

    https://github.com/hibiken/asynq/tree/master?tab=readme-ov-file#quickstart

    // PingProcessor implements the asynq.Handler interface.
    type PingProcessor struct {
        db  *sql.DB
        rdb redis.UniversalClient
        // etc
    }
    
    // NewPingProcessor injects the dependencies the processor needs.
    func NewPingProcessor(db *sql.DB, rdb redis.UniversalClient) *PingProcessor {
        return &PingProcessor{db: db, rdb: rdb}
    }
    
    func (p *PingProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
        // call db using p.db
        // call redis using p.rdb
        return nil
    }
    
    

    You then need to change how the handler is attached to the mux, using Handle instead of HandleFunc:

    mux.Handle(tasks.TypePing, tasks.NewPingProcessor(db, rdb))
    

    If you don’t want the overhead of defining a new type for each dedicated task processor, you can use a closure instead: a function that returns a handler with access to the injected dependencies.

    // HandlePing returns a handler that closes over db and rdb.
    func HandlePing(db *sql.DB, rdb redis.UniversalClient) asynq.HandlerFunc {
        return func(ctx context.Context, t *asynq.Task) error {
            // db and rdb are available here
            return nil
        }
    }
    
    
    mux.HandleFunc(tasks.TypePing, tasks.HandlePing(db, rdb))
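
    The closure above returns nil without touching its dependencies, so here is a rough sketch of what a filled-in body might look like. Everything in it is an assumption made for illustration: Task and HandlerFunc are local stand-ins for asynq.Task and asynq.HandlerFunc so that the block compiles with the standard library alone, and PingRecorder and memoryRecorder are hypothetical names, not part of any library. Depending on a narrow interface rather than a concrete client is my own choice here, not something the answer prescribes; it just makes it easy to call the handler with a fake.

    ```go
    package main

    import (
        "context"
        "encoding/json"
        "fmt"
    )

    // Task and HandlerFunc are local stand-ins for asynq.Task and asynq.HandlerFunc.
    type Task struct {
        Type    string
        Payload []byte
    }

    type HandlerFunc func(context.Context, *Task) error

    // PingRecorder is a hypothetical dependency; in the real project this would
    // be the database client or a small interface over it.
    type PingRecorder interface {
        RecordPing(ctx context.Context, msg string) error
    }

    type pingTaskPayload struct {
        Test string
    }

    // HandlePing captures rec in a closure, so the returned handler can use it
    // without any package-level global.
    func HandlePing(rec PingRecorder) HandlerFunc {
        return func(ctx context.Context, t *Task) error {
            var p pingTaskPayload
            if err := json.Unmarshal(t.Payload, &p); err != nil {
                return fmt.Errorf("decode ping payload: %w", err)
            }
            return rec.RecordPing(ctx, p.Test)
        }
    }

    // memoryRecorder is an in-memory PingRecorder, useful as a fake in tests.
    type memoryRecorder struct {
        msgs []string
    }

    func (m *memoryRecorder) RecordPing(_ context.Context, msg string) error {
        m.msgs = append(m.msgs, msg)
        return nil
    }
    ```

    Because the handler is an ordinary function value, it can be exercised directly, for example h := HandlePing(&memoryRecorder{}) followed by h(ctx, task), without a running Redis or Asynq server.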
    