Gores: A Redis-based message queue system written in Go

2017-01-12 10:00:45 Author: Github


An asynchronous job execution system based on Redis


Get the package

$ go get github.com/wang502/gores/gores

Import the package

import "github.com/wang502/gores/gores"

Usage

Configuration

Add a config.json in your project folder

{
  "REDISURL": "",
  "REDIS_PW": "mypassword",
  "BLPOP_MAX_BLOCK_TIME": 1,
  "MAX_WORKERS": 2,
  "Queues": ["queue1", "queue2"],
  "DispatcherTimeout": 5,
  "WorkerTimeout": 5
}

REDISURL : Redis server address, for example the default host of a local Redis instance.
REDIS_PW : Redis password. If no password is set on the server, any string can be used.
BLPOP_MAX_BLOCK_TIME : Blocking time when calling the BLPOP command in Redis.
MAX_WORKERS : Maximum number of concurrent workers. Each worker is a separate goroutine that executes a specific task on the fetched item.
Queues : Array of queue names on the Redis message broker.
DispatcherTimeout : How long the dispatcher waits for a new job to dispatch before quitting.
WorkerTimeout : How long a worker waits for a new job to process before quitting.
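To make the shape of the configuration concrete, here is a minimal sketch of decoding such a file with Go's standard encoding/json package. The Config struct and the parseConfig helper are hypothetical names introduced for illustration; they are not the types or functions that Gores itself defines, and the sanity checks are an assumption about what a reasonable loader might enforce.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Config mirrors the keys of the config.json shown above.
// Hypothetical struct for illustration; not the type defined by Gores.
type Config struct {
	RedisURL          string   `json:"REDISURL"`
	RedisPassword     string   `json:"REDIS_PW"`
	BlpopMaxBlockTime int      `json:"BLPOP_MAX_BLOCK_TIME"`
	MaxWorkers        int      `json:"MAX_WORKERS"`
	Queues            []string `json:"Queues"`
	DispatcherTimeout int      `json:"DispatcherTimeout"`
	WorkerTimeout     int      `json:"WorkerTimeout"`
}

// parseConfig decodes a JSON document into a Config and applies
// two basic sanity checks (assumed, not taken from Gores).
func parseConfig(raw []byte) (*Config, error) {
	var c Config
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, err
	}
	if c.MaxWorkers < 1 {
		return nil, errors.New("MAX_WORKERS must be at least 1")
	}
	if len(c.Queues) == 0 {
		return nil, errors.New("Queues must name at least one queue")
	}
	return &c, nil
}

func main() {
	raw := []byte(`{
		"REDISURL": "",
		"REDIS_PW": "mypassword",
		"BLPOP_MAX_BLOCK_TIME": 1,
		"MAX_WORKERS": 2,
		"Queues": ["queue1", "queue2"],
		"DispatcherTimeout": 5,
		"WorkerTimeout": 5
	}`)
	c, err := parseConfig(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%d workers on queues %v\n", c.MaxWorkers, c.Queues)
}
```

In a real project the bytes would come from the file on disk (for example via os.ReadFile) rather than from a string literal.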

Initialize config

configPath := flag.String("c", "config.json", "path to configuration file")
flag.Parse()
config, err := gores.InitConfig(*configPath)

Enqueue item to Redis queue

An item is a Go map. It is required to have several keys:

Name : name of the item to enqueue. Items with different names are mapped to different tasks.
Queue : name of the queue you want to put the item in.
Args : the arguments the workers need in order to execute the task.
Enqueue_timestamp : the Unix timestamp of when the item is enqueued.

resq := gores.NewResQ(config)
item := map[string]interface{}{
	"Name":  "Rectangle",
	"Queue": "TestJob",
	"Args": map[string]interface{}{
		"Length": 10,
		"Width":  10,
	},
	"Enqueue_timestamp": time.Now().Unix(),
}
err = resq.Enqueue(item)
if err != nil {
	log.Fatalf("ERROR Enqueue item to ResQ")
}

$ go run main.go -c ./config.json -o produce

Define tasks

package tasks

import (
	"errors"
	"fmt"
)

// Task for items with 'Name' = 'Rectangle':
// calculates the area of a rectangle by multiplying Length by Width.
func CalculateArea(args map[string]interface{}) error {
	var err error
	length := args["Length"]
	width := args["Width"]
	if length == nil || width == nil {
		err = errors.New("Map has no required attributes")
		return err
	}
	// Numeric arguments are asserted as float64 before conversion to int.
	fmt.Printf("The rectangle area is %d\n", int(length.(float64))*int(width.(float64)))
	return err
}

Launch workers to consume items and execute tasks

tasks := map[string]interface{}{
	"Item":      tasks.PrintItem,
	"Rectangle": tasks.CalculateArea,
}
gores.Launch(config, &tasks)

$ go run main.go -c ./config.json -o consume
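The path from an enqueued item to an executed task can be pictured with a small stand-alone sketch that needs no Redis. It assumes items are serialized as JSON on their way through the queue, which the float64 type assertions in CalculateArea suggest but the text above does not state. The Task type and the dispatch helper are hypothetical names for illustration and do not reproduce Gores's actual worker code.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Task is the signature shared by the task functions in this document.
type Task func(args map[string]interface{}) error

// calculateArea mirrors the CalculateArea task defined above.
func calculateArea(args map[string]interface{}) error {
	// encoding/json decodes every JSON number into float64 when the
	// target is interface{}, so the ints enqueued below arrive as float64.
	length, okL := args["Length"].(float64)
	width, okW := args["Width"].(float64)
	if !okL || !okW {
		return errors.New("map has no required attributes")
	}
	fmt.Printf("The rectangle area is %d\n", int(length)*int(width))
	return nil
}

// dispatch decodes one serialized item and runs the task registered
// under its Name. Hypothetical stand-in for what a worker does.
func dispatch(payload []byte, tasks map[string]Task) error {
	var item map[string]interface{}
	if err := json.Unmarshal(payload, &item); err != nil {
		return err
	}
	name, _ := item["Name"].(string)
	task, ok := tasks[name]
	if !ok {
		return fmt.Errorf("no task registered for item %q", name)
	}
	args, _ := item["Args"].(map[string]interface{})
	return task(args)
}

func main() {
	item := map[string]interface{}{
		"Name":              "Rectangle",
		"Queue":             "TestJob",
		"Args":              map[string]interface{}{"Length": 10, "Width": 10},
		"Enqueue_timestamp": time.Now().Unix(),
	}
	// Producer side: the map is turned into bytes (assumed JSON).
	payload, err := json.Marshal(item)
	if err != nil {
		panic(err)
	}
	// Consumer side: the bytes are decoded and routed by Name.
	tasks := map[string]Task{"Rectangle": calculateArea}
	if err := dispatch(payload, tasks); err != nil {
		fmt.Println("task failed:", err)
	}
}
```

Run as-is, the sketch prints one line for the Rectangle item; the consume command above does the same once the item has been enqueued.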

The output will be:

The rectangle area is 100

Info about processed/failed job

resq := gores.NewResQ(config)
if resq == nil {
	log.Fatalf("resq is nil")
}
info := resq.Info()
for k, v := range info {
	switch v.(type) {
	case string:
		fmt.Printf("%s : %s\n", k, v)
	case int:
		fmt.Printf("%s : %d\n", k, v)
	case int64:
		fmt.Printf("%s : %d\n", k, v)
	}
}

The output will be:

Gores Info:
queues : 2
workers : 0
failed : 0
host : : 0
processed : 1

Contribution

Please feel free to suggest new features. Pull requests are also welcome!