Your Web News in One Place

Help Webnuz

Referal links:

Sign up for GreenGeeks web hosting
December 28, 2021 06:01 am GMT

Reverse HTTP proxy over WebSocket in Go (Part 4)

Series introduction

In part 3 I talked about how to relay TCP connection from "App" to the peer of WebSocket, especially implementation to receive requests to be proxied and to pool the WebSocket connection on the server for relaying in Go.

In this post, I will be starting to talk about how to relay TCP connection from "App" to the peer of WebSocket in Go.

  • Start a WebSocket server (Part 1)
  • Establish a WebSocket connection (Part 2)
  • Relay TCP connection from "App" to the peer of WebSocket (Part 3 | Part 4 | Part 5)
  • Relay TCP connection in WebSocket data to "internal API"
  • Keep a established connection

Reverse HTTP proxy over WebSocket

A reverse HTTP proxy over WebSocket is a type of proxies, which retrieves resources on behalf on a client from servers and uses the WebSocket protocol as a "tunnel" to pass TCP communication from server to client.

A network diagram for reverse proxy over WebSocket

I'll introduce Go sample project forked from root-gg/wsp (I forked it because maintenance has stopped and the Go language and libraries version needed to be updated).

GitHub logo hgsgtk / wsp

HTTP tunnel over Websocket

Relay TCP connection to the peer WebSocket

A reverse HTTP proxy over WebSocket relay TCP connection to the peer WebSocket over the WebSocket connection.

HTTP communication is relayed by the following route.

app -[1]-> wsp server -[2](WebSocket)-> wsp client -> internal API

And, these flow are divided into three parts to explain it.

  1. Receive requests to be proxied ([1] in the relay flow)
  2. Pool the WebSocket connection on the server for relaying
  3. Relay TCP connection to the peer WebSocket ([2] in the relay flow)

I explained the 1st and 2nd flow in part 3, so let's move on the 3rd flow.

Let's take a look at the continuation of the previous HTTP handler code, which waits the request from "app" to the endpoint /requests/.

func (s *Server) request(w http.ResponseWriter, r *http.Request) {    // (omit): [1]: Receive requests to be proxied    // [2]: Take a WebSocket connection available from pools for relaying received requests.    request := NewConnectionRequest(s.Config.GetTimeout())    // "Dispatcher" is running in a separate thread from the server by `go s.dispatchConnections()`.    // It waits to receive requests to dispatch connection from available pools to clients requests.    // https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L93    //    // Notify request from handler to dispatcher through Server.dispatcher channel.    s.dispatcher <- request    // Dispatcher tries to find an available connection pool,    // and it returns the connection through Server.connection channel.    // https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L189    //    // Here waiting for a result from dispatcher.    connection := <-request.connection    if connection == nil {        // It means that dispatcher has set `nil` which is a system error case that is        // not expected in the normal flow.        wsp.ProxyErrorf(w, "Unable to get a proxy connection")        return    }    // [3]: Send the request to the peer through the WebSocket connection.    if err := connection.proxyRequest(w, r); err != nil {        // An error occurred throw the connection away        log.Println(err)        connection.Close()        // Try to return an error to the client        // This might fail if response headers have already been sent        wsp.ProxyError(w, err)    }}

In brief, the following process is performed.

  • Take a WebSocket connection available from pools for relaying received requests
  • Send the request to the peer through the WebSocket connection

I'll explain the one by one.

Take a WebSocket connection available from pools for relaying received requests

As a pre-requisite, we will start with WebSocket connection already established with the WebSocket client (wsp_client) and held by the server as a pool (Chapter 2).

func (s *Server) Register(w http.ResponseWriter, r *http.Request) {    // - 1. Upgrade a received HTTP request to a WebSocket connection    // (omit)    // - 2. Wait a greeting message from the peer and parse it    // (omit)    // 3. Register the connection into server pools.    // (omit)    // Add the WebSocket connection to the pool    pool.Register(ws)}

There are several possible designs for retrieving the connection from the pools, and I'll explain the pattern of using multiple threads. Specifically, there are two threads running: "Server", which accepts http requests, and "Dispatcher", which dispatches connections from the pools to be used to relay. Here is the Go code to start "Server" and "Dispatcher" that will be called from the main function.

func (s *Server) Start() {    // (omit)...    // start the "Dispatcher"    go s.dispatchConnections()    // start the "Server"    s.server = &http.Server{        Addr:    s.Config.GetAddr(),        Handler: r,    }    go func() { log.Fatal(s.server.ListenAndServe()) }()}

The go statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same the address space.

A diagram describing two threads in wsp_server

Let's move on the implementation of dispatchConnections function that starts a "dispatcher" thread.

type Server struct {    // (omit)    // Through dispatcher channel it communicates between "server" thread and "dispatcher" thread.    // "server" thread sends the value to this channel when accepting requests in the endpoint /requests,     // and "dispatcher" thread reads this channel.    dispatcher chan *ConnectionRequest}func (s *Server) dispatchConnections() {    for {        //  The operator <- is "receive operator", which expression blocks until a value is available.        request, ok := <-s.dispatcher        if !ok {            // The value of `ok` is false if it is a zero value generated because the channel is closed an empty.            // In this case, that means server shutdowns.           break        }        for {            // (omit)...            // Verify that we can use this connection            if connection.Take() {                request.connection <- connection                break            }        }        // (omit)...    }}

The type of the field dispatcher in the Server structure is channel. Channel types provide a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type.

A diagram describing how dispatcher channel is used

for {    request, ok := <-s.dispatcher    if !ok {        break    }}

The operator x, ok := <-ch is receiver operator. The value of the receive operation <-s.dispatcher is the value received from the channel s.dispatcher. Also, the expression <- blocks until a value is available, so waits until a connection request is sent by "Server" thread.

The value of ok is false if it is a zero value generated because the channel is closed or empty. In this case, that means server shutdowns.

A diagram describing how dispatcher channel is used

On the other hand, the /request/ handler on the "Server" tread sends a value to this channel.

func (s *Server) request(w http.ResponseWriter, r *http.Request) {    // (omit): [1]: Receive requests to be proxied    // Here! Sends a value to dispatcher channel    request := NewConnectionRequest(s.Config.GetTimeout())    s.dispatcher <- request    connection := <-request.connection    if connection == nil {        wsp.ProxyErrorf(w, "Unable to get a proxy connection")        return    }}

The operator ch <- x is send statements, which sends a value on a channel. Here it sends the request to the dispatcher channel.

By the way, the type of variable request is ConnectionRequest.

type ConnectionRequest struct {    connection chan *Connection}

After sending the value to the s.dispatcher channel, it waits to be available to get the value in the request.connection channel.

A diagram describing how connection channel is used

func (s *Server) request(w http.ResponseWriter, r *http.Request) {    // (omit)    s.dispatcher <- request    // Here!    connection := <-request.connection    if connection == nil {        wsp.ProxyErrorf(w, "Unable to get a proxy connection")        return    }}

Next, let's look at the "Dispatcher" thread.

To summarize, "Server" thread sends the value to this channel when accepting requests in the endpoint /requests, and "dispatcher" thread reads this channel.

Let's move on the implementation of dispatchConnections function that starts a "dispatcher" thread.

type Server struct {    // (omit)    // Through dispatcher channel it communicates between "server" thread and "dispatcher" thread.    // "server" thread sends the value to this channel when accepting requests in the endpoint /requests,     // and "dispatcher" thread reads this channel.    dispatcher chan *ConnectionRequest}func (s *Server) dispatchConnections() {    for {        // Runs in an infinite loop and keeps receiving the value from the `server.dispatcher` channel        // The operator <- is "receive operator", which expression blocks until a value is available.        request, ok := <-s.dispatcher        if !ok {            // The value of `ok` is false if it is a zero value generated because the channel is closed an empty.            // In this case, that means server shutdowns.            break        }        // A timeout is set for each dispatch request.        ctx := context.Background()        ctx, cancel := context.WithTimeout(ctx, s.Config.GetTimeout())        defer cancel()    L:        for {            select {            case <-ctx.Done(): // The timeout elapses                break L            default: // Go through            }            s.lock.RLock()            if len(s.pools) == 0 {                // No connection pool available                s.lock.RUnlock()                break            }            // [1]: Select a pool which has an idle connection            // Build a select statement dynamically to handle an arbitrary number of pools.            cases := make([]reflect.SelectCase, len(s.pools)+1)            for i, ch := range s.pools {                cases[i] = reflect.SelectCase{                    Dir:  reflect.SelectRecv,                    Chan: reflect.ValueOf(ch.idle)}            }            cases[len(cases)-1] = reflect.SelectCase{                Dir: reflect.SelectDefault}            s.lock.RUnlock()            _, value, ok := reflect.Select(cases)            if !ok {                continue // a pool has been removed, try again            }            connection, _ := value.Interface().(*Connection)            // [2]: Verify that we can use this connection and take it.            if connection.Take() {                request.connection <- connection                break            }        }        close(request.connection)    }}

First, dispatchConnections runs in an infinite loop and keeps receiving the value from the server.dispatcher channel.

for {    request, ok := <-s.dispatcher    // ...}

The next step is to set the timeout, if no idle connection is obtained after a predetermined time, the channel will be closed.

// A timeout is set for each dispatch request.ctx := context.Background()ctx, cancel := context.WithTimeout(ctx, s.Config.GetTimeout())defer cancel()L:    for {        select {        case <-ctx.Done(): // The timeout elapses            break L        default: // Go through        }        // ...    }    close(request.connection)

When the channel is closed, a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received (See The Go Programming Language Specification#Receive operator more detail). In this case, the "Server" thread waits to receive the value, and will get nil from request.connection channel, so the "Server" will return the error response.

func (s *Server) request(w http.ResponseWriter, r *http.Request) {    // (omit)    s.dispatcher <- request    // Here!    connection := <-request.connection    if connection == nil {        wsp.ProxyErrorf(w, "Unable to get a proxy connection")        return    }}

Also, if you want to know more about timeout using the context package, please refer to the following post.

Then, Select a pool which has an idle connection by building a select statement dynamically to handle an arbitrary number of pools.

cases := make([]reflect.SelectCase, len(s.pools)+1)for i, ch := range s.pools {    cases[i] = reflect.SelectCase{        Dir:  reflect.SelectRecv,        Chan: reflect.ValueOf(ch.idle)}}cases[len(cases)-1] = reflect.SelectCase{    Dir: reflect.SelectDefault}s.lock.RUnlock()_, value, ok := reflect.Select(cases)if !ok {    continue // a pool has been removed, try again}connection, _ := value.Interface().(*Connection)

reflect.Select allows us to receive a variable number of channels. See the following post for more information.

Lastly, Verify that we can use this connection and take it.

if connection.Take() {    request.connection <- connection    break}

connection.Take verify the status of connection whether it's available one or not, then if it's available one, mark it busy.

A diagram describing how connection channel is used

That's it to take a WebSocket connection available from pools for relaying received requests.

Conclusion

Following part 3, I explained how to relay TCP connection from "App" to the peer of WebSocket. Especially, I focused on the way to take a WebSocket connection available from pools for relaying received requests.

In part 5, I'll explain how to send the request to the peer through the WebSocket connection.


Original Link: https://dev.to/hgsgtk/reverse-http-proxy-over-websocket-in-go-part-4-3g7c

Share this article:    Share on Facebook
View Full Article

Dev To

An online community for sharing and discovering great ideas, having debates, and making friends

More About this Source Visit Dev To