24 tháng 2, 2019

Package context trong Go

Go Concurrency Patterns: Context

Introduction

Trong Go, mỗi request đến sẽ được handle trong một goroutine. Các request-handler thường bổ sung thêm các goroutine để truy cập đến backend như database và các RPC service.

Context

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

Method Done trả về một channel được sử dụng như một tín hiệu hủy cho các function đại diện cho Context: khi một channel close, các function nên hủy bỏ công việc đang thực thi và return. Method Err trả về một error chỉ ra lý do mà Context bị hủy.

Lý do mà Context không có method Cancel cũng tương tự như channel Done chỉ là một channel nhận kết quả (receive-only): hàm mà nhận tín hiệu cancel thông thường sẽ không phải là một trong các hàm gửi tín hiệu (signal) đi. Đặc biệt, khi một operation cha khởi chạy goroutine cho các sub-operation, những sub-operation này sẽ không thể hủy operation cha. Thay vào đó, hàm WithCancel có cung cấp cách để hủy một Context mới.

Context có thể được dùng đồng thời bởi nhiều goroutine. Chúng ta có thể truyền một Context cho goroutine bất kì và có thể hủy Context để báo hiệu cho tất cả các goroutine chứa Context này.

Method Deadline cho phép các function xác định xem liệu chúng có nên thực thi công việc hay không. Nếu thời gian còn lại quá ngắn thì nó có thể không đáng để thực thi. Ta cũng có thể sử dụng một thời hạn (deadline) để thực hiện thiết lập một timeout cho các giao tiếp I/O.

Method Value cho phép một Context mang theo dữ liệu thuộc phạm vi request (request-scope). Dữ liệu này bắt buộc phải là safe khi được sử dụng đồng thời bời nhiều goroutine.

Derived contexts

Package context cung cấp các hàm dẫn xuất một giá trị Context mới từ một Context đã tồn tại, chúng sẽ tạo thành dạng cây (Context-tree): khi một Context bị cancel, tất cả Context đã được dẫn xuất từ nó cũng sẽ bị cancel.

Background sẽ là root của các Context-tree, nó không bao giờ bị cancel:

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context

Các hàm WithCancelWithTimeout trả về giá trị Context dẫn xuất có thể bị cancel trước Context cha. Context liên kết với một request đến thường bị cancel khi handler của request này return. Hàm WithCancel cũng hữu ích để cancel các request dư thừa khi sử dụng nhiều replica. Hàm WithTimeout thì có thể giúp thiết lập một deadline cho các request đến các server backend:

// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithValue cung cấp cách liên kết các giá trị request-scope với một Context:

// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

Cách tốt nhất để nắm được cách sử dụng package context là tham khảo qua các ví dụ thực tế.

Ví dụ: Google Web Search

Ví dụ của chúng ta là một server HTTP handle các URL /search?q=golang&timeout=1s bằng việc forward từ khóa truy vấn "golang" đến Google Web Search API và hiển thị ra kết quả. Tham số timeout báo cho server biết khoảng thời gian để cancel request.

Code được chia thành 3 package:

  • server cung cấp hàm main và handler cho /search.
  • userip cung cấp các hàm để trích xuất địa chỉ IP của người dùng từ request và liên kết nó với một Context.
  • google cung cấp hàm Search để gửi truy vấn đến cho Google.

Server

Ở đây server sẽ xử lý các request có format giống như search?q=golang bằng cách cung cấp một vài kết quả đầu tiên khi tìm kiếm bằng Google. Ta sẽ sử dụng hàm handleSearch để handle endpoint /search. Handler này khởi tạo một Context gọi là ctx và bố trí cancel khi handler return. Nếu request đến bao gồm tham số timeout trong URL thì Context sẽ bị cancel tự động nếu như hết thời gian timeout:

func handleSearch(w http.ResponseWriter, req *http.Request) {
    // ctx is the Context for this handler. Calling cancel closes the
    // ctx.Done channel, which is the cancellation signal for requests
    // started by this handler.
    var (
        ctx    context.Context
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    if err == nil {
        // The request has a timeout, so create a context that is
        // canceled automatically when the timeout expires.
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    defer cancel() // Cancel ctx as soon as handleSearch returns.

Handler này trích xuất từ khóa truy vấn từ request và địa chỉ IP của client bằng cách sử dụng package userip. Backend request cần đến địa chỉ IP của client vì vậy hàm handleSearch đính kèm nó vào trong ctx:

    // Check the search query.
    query := req.FormValue("q")
    if query == "" {
        http.Error(w, "no query", http.StatusBadRequest)
        return
    }

    // Store the user IP in ctx for use by code in other packages.
    userIP, err := userip.FromRequest(req)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    ctx = userip.NewContext(ctx, userIP)

Handler gọi đến google.Search sử dụng ctxquery:

    // Run the Google search and print the results.
    start := time.Now()
    results, err := google.Search(ctx, query)
    elapsed := time.Since(start)

Nếu tìm kiếm thành công, handler này sẽ hiển thị ra kết quả:

    if err := resultsTemplate.Execute(w, struct {
        Results          google.Results
        Timeout, Elapsed time.Duration
    }{
        Results: results,
        Timeout: timeout,
        Elapsed: elapsed,
    }); err != nil {
        log.Print(err)
        return
    }

Package userip

Package userip cung cấp các function để trích xuất địa chỉ IP người dùng từ request gửi lên và liên kết nó với một Context. Một Context chứa một đối tượng map với keyvalue đều là kiểu dữ liệu interface{}. Kiểu dữ liệu của key phải hỗ trợ so sánh equal, và value phải an toàn khi được sử dụng đồng thời bời nhiều goroutine. Các package giống như userip ẩn đi chi tiết ánh xạ key-value và cung cấp khả năng truy cập vào một value cụ thể trong Context.

Để tránh trùng lặp kiểu dữ liệu key, userip định nghĩa ra một kiểu unexportedkey và sử dụng một value của kiểu dữ liệu này như một context-key:

// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int

// userIPkey is the context key for the user IP address.  Its value of zero is
// arbitrary.  If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0

FromRequest trích xuất một giá trị userIP từ một http.Request:

func FromRequest(req *http.Request) (net.IP, error) {
    ip, _, err := net.SplitHostPort(req.RemoteAddr)
    if err != nil {
        return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
    }
    ...
}

NewContext khởi tạo một đối tượng Context mới mang giá trị userIP đã được cấp:

func NewContext(ctx context.Context, userIP net.IP) context.Context {
    return context.WithValue(ctx, userIPKey, userIP)
}

FromContext trích xuất một userIP từ một Context:

func FromContext(ctx context.Context) (net.IP, bool) {
    // ctx.Value returns nil if ctx has no value for the key;
    // the net.IP type assertion returns ok=false for nil.
    userIP, ok := ctx.Value(userIPKey).(net.IP)
    return userIP, ok
}
// Package google provides a function to do Google searches using the Google Web
// Search API. See https://developers.google.com/web-search/docs/
//
// This package is an example to accompany https://blog.golang.org/context.
// It is not intended for use by others.
//
// Google has since disabled its search API,
// and so this package is no longer useful.
package google
import (
"context"
"encoding/json"
"net/http"
"golang.org/x/blog/content/context/userip"
)
// Results is an ordered list of search results.
type Results []Result
// A Result contains the title and URL of a search result.
type Result struct {
Title, URL string
}
// Search sends query to Google search and returns the results.
func Search(ctx context.Context, query string) (Results, error) {
// Prepare the Google Search API request.
req, err := http.NewRequest("GET", "https://ajax.googleapis.com/ajax/services/search/web?v=1.0", nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
q.Set("q", query)
// If ctx is carrying the user IP address, forward it to the server.
// Google APIs use the user IP to distinguish server-initiated requests
// from end-user requests.
if userIP, ok := userip.FromContext(ctx); ok {
q.Set("userip", userIP.String())
}
req.URL.RawQuery = q.Encode()
// Issue the HTTP request and handle the response. The httpDo function
// cancels the request if ctx.Done is closed.
var results Results
err = httpDo(ctx, req, func(resp *http.Response, err error) error {
if err != nil {
return err
}
defer resp.Body.Close()
// Parse the JSON search result.
// https://developers.google.com/web-search/docs/#fonje
var data struct {
ResponseData struct {
Results []struct {
TitleNoFormatting string
URL string
}
}
}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return err
}
for _, res := range data.ResponseData.Results {
results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
}
return nil
})
// httpDo waits for the closure we provided to return, so it's safe to
// read results here.
return results, err
}
// httpDo issues the HTTP request and calls f with the response. If ctx.Done is
// closed while the request or f is running, httpDo cancels the request, waits
// for f to exit, and returns ctx.Err. Otherwise, httpDo returns f's error.
func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
// Run the HTTP request in a goroutine and pass the response to f.
c := make(chan error, 1)
req = req.WithContext(ctx)
go func() { c <- f(http.DefaultClient.Do(req)) }()
select {
case <-ctx.Done():
<-c // Wait for f to return.
return ctx.Err()
case err := <-c:
return err
}
}
view raw google.go hosted with ❤ by GitHub
// The server program issues Google search requests and demonstrates the use of
// the go.net Context API. It serves on port 8080.
//
// The /search endpoint accepts these query params:
// q=the Google search query
// timeout=a timeout for the request, in time.Duration format
//
// For example, http://localhost:8080/search?q=golang&timeout=1s serves the
// first few Google search results for "golang" or a "deadline exceeded" error
// if the timeout expires.
package main
import (
"context"
"html/template"
"log"
"net/http"
"time"
"golang.org/x/blog/content/context/google"
"golang.org/x/blog/content/context/userip"
)
func main() {
http.HandleFunc("/search", handleSearch)
log.Fatal(http.ListenAndServe(":8080", nil))
}
// handleSearch handles URLs like /search?q=golang&timeout=1s by forwarding the
// query to google.Search. If the query param includes timeout, the search is
// canceled after that duration elapses.
func handleSearch(w http.ResponseWriter, req *http.Request) {
// ctx is the Context for this handler. Calling cancel closes the
// ctx.Done channel, which is the cancellation signal for requests
// started by this handler.
var (
ctx context.Context
cancel context.CancelFunc
)
timeout, err := time.ParseDuration(req.FormValue("timeout"))
if err == nil {
// The request has a timeout, so create a context that is
// canceled automatically when the timeout expires.
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel() // Cancel ctx as soon as handleSearch returns.
// Check the search query.
query := req.FormValue("q")
if query == "" {
http.Error(w, "no query", http.StatusBadRequest)
return
}
// Store the user IP in ctx for use by code in other packages.
userIP, err := userip.FromRequest(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
ctx = userip.NewContext(ctx, userIP)
// Run the Google search and print the results.
start := time.Now()
results, err := google.Search(ctx, query)
elapsed := time.Since(start)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := resultsTemplate.Execute(w, struct {
Results google.Results
Timeout, Elapsed time.Duration
}{
Results: results,
Timeout: timeout,
Elapsed: elapsed,
}); err != nil {
log.Print(err)
return
}
}
var resultsTemplate = template.Must(template.New("results").Parse(`
<html>
<head/>
<body>
<ol>
{{range .Results}}
<li>{{.Title}} - <a href="{{.URL}}">{{.URL}}</a></li>
{{end}}
</ol>
<p>{{len .Results}} results in {{.Elapsed}}; timeout {{.Timeout}}</p>
</body>
</html>
`))
view raw server.go hosted with ❤ by GitHub
// Package userip provides functions for extracting a user IP address from a
// request and associating it with a Context.
//
// This package is an example to accompany https://blog.golang.org/context.
// It is not intended for use by others.
package userip
import (
"context"
"fmt"
"net"
"net/http"
)
// FromRequest extracts the user IP address from req, if present.
func FromRequest(req *http.Request) (net.IP, error) {
ip, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
}
userIP := net.ParseIP(ip)
if userIP == nil {
return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
}
return userIP, nil
}
// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int
// userIPkey is the context key for the user IP address. Its value of zero is
// arbitrary. If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0
// NewContext returns a new Context carrying userIP.
func NewContext(ctx context.Context, userIP net.IP) context.Context {
return context.WithValue(ctx, userIPKey, userIP)
}
// FromContext extracts the user IP address from ctx, if present.
func FromContext(ctx context.Context) (net.IP, bool) {
// ctx.Value returns nil if ctx has no value for the key;
// the net.IP type assertion returns ok=false for nil.
userIP, ok := ctx.Value(userIPKey).(net.IP)
return userIP, ok
}
view raw userip.go hosted with ❤ by GitHub