Not so simple UDP proxy

golang · performance · monitoring

Context #

One of my projects was migrating the whole monitoring system from DataDog to Cortex (a distribution of Prometheus). Most of our services use a DogStatsD/StatsD client to send metrics to the DataDog agent on the host, so the process required a dual-write mechanism, for which we chose the plain and simple approach of the udp-proxy code by Akagi201. During the go-to-production period, we encountered several issues and had to extend the code.

So it was something like this:

                                  ┌───────────────┐        ┌─────────────┐
                                ┌─► DataDog agent ├────────► DataDog app │
                                │ └───────────────┘        └─────────────┘
┌─────────────┐   ┌─────────────┤
│ App service ├───►  udp-proxy  │
└─────────────┘   └─────────────┤ ┌───────────────┐        ┌────────┐
                                └─► OTel collector├────────► Cortex │
                                  └───────────────┘        └────────┘

Windows build #

We have some Windows hosts running our Windows-native services. Our issue with the original udp-proxy was that Windows controls services by setting up callbacks, which is very different from other operating systems. So we needed the service to provide an API that could answer Windows properly despite those substantial differences. I found this service wrapper: https://github.com/kardianos/service, which detects how a program is called (from an interactive terminal or from a service manager) and provides the necessary API mentioned above.

So I wrapped this around the original service:

package main

import (
	log "github.com/sirupsen/logrus"

	"github.com/kardianos/service"
)

var logger service.Logger

type program struct{}

func (p *program) Start(s service.Service) error {
	go p.run()
	return nil
}

func (p *program) run() {
	// udp-proxy code here
}

func (p *program) Stop(s service.Service) error {
	return nil
}

func main() {
	svcConfig := &service.Config{
		Name:        "UDPService",
		DisplayName: "UDP Service",
		Description: "UDP Proxy for dual-write metric",
	}

	prg := &program{}
	s, err := service.New(prg, svcConfig)
	if err != nil {
		log.Fatal(err)
	}

	// Attach the service logger so errors from Run are actually reported.
	logger, err = s.Logger(nil)
	if err != nil {
		log.Fatal(err)
	}

	err = s.Run()
	if err != nil {
		logger.Error(err)
	}
}

This fits perfectly into a single build pipeline producing binaries for both Linux and Windows:

go mod download
GOOS=windows GOARCH=386 go build -o build/udp-proxy-windows-$(TAG).exe .
GOOS=linux GOARCH=amd64 go build -o build/udp-proxy-amd64-linux-$(TAG) .

Alternative 0: we might have gone with a Windows container approach, but some of our Windows servers are really old, which is why we had to go with a more native Windows service approach.

Alternative 1: the Non-Sucking Service Manager - https://nssm.cc/ - is a great one. If the first option hadn't worked, we might have ported the Puppet module for nssm to configure the original build as a Windows service managed by nssm and called it a day.

Critical performance issue #

For the QA and Staging environments, we had a small load test for the udp-proxy and it passed, so we were confident enough to roll this out to production. After the rollout, we started seeing metrics being dropped: throughput fell from 14-15 million records down to 1-2 million per metric entry.

We gathered a few data points on one machine.

As the next step, I implemented a new load test with JMeter that could reproduce the state above. The load test is quite simple: just looping with no sleep time between iterations, with the help of this plugin: https://jmeter-plugins.org/wiki/UDPRequest/
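Purely for illustration (the actual test used JMeter with the plugin above), the same idea as a minimal Go sketch is just a tight send loop; the address, payload, and packet count here are made-up examples:

package main

import (
	"log"
	"net"
)

func main() {
	// Hypothetical target: the udp-proxy's source port on the same host.
	conn, err := net.Dial("udp", "127.0.0.1:2203")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Example StatsD counter payload; loop with no sleep between sends.
	payload := []byte("test.metric:1|c")
	for i := 0; i < 10_000_000; i++ {
		if _, err := conn.Write(payload); err != nil {
			log.Printf("write failed: %v", err)
		}
	}
}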

Boom, with the right amount of load we could reproduce the case and benchmark our resource usage, then pinpoint the bottleneck to the following lines:

for {
	b := make([]byte, opts.Buffer)
	n, addr, err := sourceConn.ReadFromUDP(b)

	if err != nil {
		log.WithError(err).Error("Could not receive a packet")
		continue
	}

	log.WithField("addr", addr.String()).WithField("bytes", n).WithField("content", string(b)).Info("Packet received")
	for _, v := range targetConn {
		if _, err := v.Write(b[0:n]); err != nil {
			log.WithError(err).Warn("Could not forward packet.")
		}
	}
}

It's a simple for loop: read a packet, then write it to the target connections. The v.Write call here blocks the next iteration, since everything runs in a single thread.

So ...

Go solution

Leveraging the best of Go, we started refactoring and improving the code a bit, adding a MetricWriter so it could handle each target connection separately:

type MetricPacket struct {
	buffer []byte
	n      int
}

type MetricWriter struct {
	num            int
	targetAddr     *net.UDPAddr
	packetsChannel chan MetricPacket
}

func (v *MetricWriter) start() {
	// Spawn more goroutines in a controlled way to parallelize work and increase the read throughput
}
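The body of start() isn't shown in the original, so here is a minimal sketch of what it might look like under the assumptions implied by the struct: v.num worker goroutines, each with its own UDP connection to targetAddr, all draining the shared packetsChannel (net and logrus imports as in the rest of the code):

func (v *MetricWriter) start() {
	// Sketch only, not the exact production code.
	// Spawn v.num worker goroutines; each drains the shared packet channel
	// and forwards the payload to this writer's target address.
	for i := 0; i < v.num; i++ {
		go func() {
			conn, err := net.DialUDP("udp", nil, v.targetAddr)
			if err != nil {
				log.WithError(err).Error("Could not dial target")
				return
			}
			defer conn.Close()

			for packet := range v.packetsChannel {
				if _, err := conn.Write(packet.buffer[:packet.n]); err != nil {
					log.WithError(err).Warn("Could not forward packet.")
				}
			}
		}()
	}
}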

So now the for loop for reading packets is simplified to [read packet -> put into packet channels -> next loop]:

for {
	b := make([]byte, opts.Buffer)

	n, _, err := sourceConn.ReadFromUDP(b)

	if err != nil {
		log.WithError(err).Error("Could not receive a packet")
		continue
	}

	for _, v := range packetsChannels {
		packet := MetricPacket{buffer: b, n: n}
		v <- packet
	}
}
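One side note of mine, not part of the original change: with bounded channels (the channel-size flag below), a plain send would block again whenever a writer falls behind. A non-blocking send that drops the packet is one way to keep the read loop from ever stalling:

for _, v := range packetsChannels {
	packet := MetricPacket{buffer: b, n: n}
	select {
	case v <- packet:
		// Queued for the writer.
	default:
		// Writer's channel is full; drop instead of blocking the read loop.
		log.Warn("Writer channel full, dropping packet")
	}
}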

Add a flag to the opts struct to limit the size of the queuing channels:

var opts struct {
	Source      string   `long:"source" default:":2203" description:"Source port to listen on"`
	Target      []string `long:"target" description:"Target address to forward to"`
	LogLevel    string   `long:"log-level" description:"Log Level to use: debug, info, warn, error, fatal"`
	Buffer      int      `long:"buffer" default:"5120" description:"max buffer size for the socket io"`
	ChannelSize int      `long:"channel-size" default:"100" description:"Set the total size of channels each writer has"`
}
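Wiring the flag to the writers might look roughly like this (a sketch, not the exact production code; the worker count and error handling are assumed):

// One bounded channel and one MetricWriter per --target entry.
packetsChannels := make([]chan MetricPacket, 0, len(opts.Target))
for _, target := range opts.Target {
	targetAddr, err := net.ResolveUDPAddr("udp", target)
	if err != nil {
		log.WithError(err).Fatal("Could not resolve target address")
	}

	ch := make(chan MetricPacket, opts.ChannelSize)
	packetsChannels = append(packetsChannels, ch)

	w := &MetricWriter{
		num:            4, // example worker count; could be another flag
		targetAddr:     targetAddr,
		packetsChannel: ch,
	}
	w.start()
}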

For the Writers, we updated the code to leverage goroutines, and with that we solved the main read-blocking issue caused by the writes. The new build passed the new load test and also resolved the bottleneck on production when we rolled it out.

Some other improvements #

Lesson learned: we need to be prepared for these kinds of situations in the near future, so I added a few improvements (yes, more complicated):

Why didn't you #