mirror of
https://github.com/folbricht/routedns.git
synced 2026-02-12 03:58:55 -06:00
New request-dedup group (#181)
* New request-dedup group * Add logging * Update docs and add example
This commit is contained in:
19
cmd/routedns/example-config/request-dedup.toml
Normal file
19
cmd/routedns/example-config/request-dedup.toml
Normal file
@@ -0,0 +1,19 @@
|
||||
# This configration prevents duplicate queries from being sent upstream. All
|
||||
# duplicate requests get the same answer (that of the first).
|
||||
|
||||
[listeners.local-udp]
|
||||
address = "127.0.0.1:53"
|
||||
protocol = "udp"
|
||||
resolver = "cache"
|
||||
|
||||
[groups.cache]
|
||||
type = "cache"
|
||||
resolvers = ["dedup"]
|
||||
|
||||
[groups.dedup]
|
||||
type = "request-dedup"
|
||||
resolvers = ["cloudflare-udp"]
|
||||
|
||||
[resolvers.cloudflare-udp]
|
||||
address = "1.1.1.1:53"
|
||||
protocol = "udp"
|
||||
@@ -401,6 +401,11 @@ func instantiateGroup(id string, g group, resolvers map[string]rdns.Resolver) er
|
||||
}
|
||||
opt := rdns.TruncateRetryOptions{}
|
||||
resolvers[id] = rdns.NewTruncateRetry(id, gr[0], retryResolver, opt)
|
||||
case "request-dedup":
|
||||
if len(gr) != 1 {
|
||||
return fmt.Errorf("type request-dedup only supports one resolver in '%s'", id)
|
||||
}
|
||||
resolvers[id] = rdns.NewRequestDedup(id, gr[0])
|
||||
case "fastest-tcp":
|
||||
if len(gr) != 1 {
|
||||
return fmt.Errorf("type fastest-tcp only supports one resolver in '%s'", id)
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
- [Rate Limiter](#Rate-Limiter)
|
||||
- [Fastest TCP Probe](#Fastest-TCP-Probe)
|
||||
- [Retrying Truncated Responses](#Retrying-Truncated-Responses)
|
||||
- [Request Deduplication](#Request-Deduplication)
|
||||
- [Resolvers](#Resolvers)
|
||||
- [Plain DNS](#Plain-DNS-Resolver)
|
||||
- [DNS-over-TLS](#DNS-over-TLS-Resolver)
|
||||
@@ -1224,6 +1225,41 @@ resolver = "cache"
|
||||
|
||||
Example config files: [truncate-retry.toml](../cmd/routedns/example-config/truncate-retry.toml)
|
||||
|
||||
### Request Deduplication
|
||||
|
||||
The `request-dedup` element passed individual queries to its upstream resolver. While the first query is being processed, further queries for the same name will be blocked. Once the first query has been answered, all waiting queries are completed with the same answer. This element can be used to reduce load on upstream servers when queried by clients sending the same query multiple times.
|
||||
|
||||
#### Configuration
|
||||
|
||||
To deduplicate queries, add an element with `type = "request-dedup"` in the groups section of the configuration.
|
||||
|
||||
Options:
|
||||
|
||||
- `resolvers` - Array of upstream resolvers, only one is supported.
|
||||
|
||||
Examples:
|
||||
|
||||
```toml
|
||||
[listeners.local-udp]
|
||||
address = "127.0.0.1:53"
|
||||
protocol = "udp"
|
||||
resolver = "cache"
|
||||
|
||||
[groups.cache]
|
||||
type = "cache"
|
||||
resolvers = ["dedup"]
|
||||
|
||||
[groups.dedup]
|
||||
type = "request-dedup"
|
||||
resolvers = ["cloudflare-udp"]
|
||||
|
||||
[resolvers.cloudflare-udp]
|
||||
address = "1.1.1.1:53"
|
||||
protocol = "udp"
|
||||
```
|
||||
|
||||
Example config files: [request-dedup.toml](../cmd/routedns/example-config/request-dedup.toml)
|
||||
|
||||
## Resolvers
|
||||
|
||||
Resolvers forward queries to other DNS servers over the network and typically represent the end of one or many processing pipelines. Resolvers encode every query that is passed from listeners, modifiers, routers etc and send them to a DNS server without further processing. Like with other elements in the pipeline, resolvers requires a unique identifier to reference them from other elements. The following protocols are supported:
|
||||
|
||||
139
request-dedup.go
Normal file
139
request-dedup.go
Normal file
@@ -0,0 +1,139 @@
|
||||
package rdns
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"sync"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
)
|
||||
|
||||
type dedupKey struct {
|
||||
name string
|
||||
qtype uint16
|
||||
ecs_ipv4 uint32
|
||||
ecs_ipv6_hi uint64
|
||||
ecs_ipv6_lo uint64
|
||||
ecs_mask uint8
|
||||
}
|
||||
|
||||
type inflightRequest struct {
|
||||
answer *dns.Msg
|
||||
err error
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// requestDedup passes individual requests normally. Subsequent
|
||||
// queries for the same name are being held until the first query
|
||||
// returns. In that case, all waiting requests are answered with
|
||||
// the same response. This element is used to smooth out spikes
|
||||
// of queries for the same name.
|
||||
type requestDedup struct {
|
||||
id string
|
||||
resolver Resolver
|
||||
mu sync.Mutex
|
||||
inflight map[dedupKey]*inflightRequest
|
||||
}
|
||||
|
||||
var _ Resolver = &requestDedup{}
|
||||
|
||||
func NewRequestDedup(id string, resolver Resolver) *requestDedup {
|
||||
return &requestDedup{
|
||||
id: id,
|
||||
resolver: resolver,
|
||||
inflight: make(map[dedupKey]*inflightRequest),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *requestDedup) Resolve(q *dns.Msg, ci ClientInfo) (*dns.Msg, error) {
|
||||
var (
|
||||
ecsIPv4 uint32
|
||||
ecsIPv6Lo, ecsIPv6Hi uint64
|
||||
ecsMask uint8
|
||||
)
|
||||
|
||||
edns0 := q.IsEdns0()
|
||||
if edns0 != nil {
|
||||
// Find the ECS option
|
||||
for _, opt := range edns0.Option {
|
||||
ecs, ok := opt.(*dns.EDNS0_SUBNET)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
switch ecs.Family {
|
||||
case 1: // ip4
|
||||
ecsIPv4 = byteToUint32(ecs.Address.To4())
|
||||
ecsMask = ecs.SourceNetmask
|
||||
case 2: // ip6
|
||||
ecsIPv6Hi, ecsIPv6Lo = byteToUint128(ecs.Address.To16())
|
||||
ecsMask = ecs.SourceNetmask
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
k := dedupKey{
|
||||
name: q.Question[0].Name,
|
||||
qtype: q.Question[0].Qtype,
|
||||
ecs_ipv4: ecsIPv4,
|
||||
ecs_ipv6_hi: ecsIPv6Hi,
|
||||
ecs_ipv6_lo: ecsIPv6Lo,
|
||||
ecs_mask: ecsMask,
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
req, ok := r.inflight[k]
|
||||
if !ok {
|
||||
req = &inflightRequest{
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
r.inflight[k] = req
|
||||
}
|
||||
r.mu.Unlock()
|
||||
|
||||
log := logger(r.id, q, ci)
|
||||
// If the request is already in flight, wait for that to complete and
|
||||
// return the same answer.
|
||||
if ok {
|
||||
log.Debug("duplicated request, waiting for first answer")
|
||||
<-req.done
|
||||
a, err := req.answer, req.err
|
||||
// Return a copy of the answer as other elements might be modifying it
|
||||
if a != nil {
|
||||
a = a.Copy()
|
||||
}
|
||||
return a, err
|
||||
}
|
||||
log.WithField("resolver", r.resolver).Debug("forwarding query to resolver")
|
||||
|
||||
// Not already in flight, make the request
|
||||
a, err := r.resolver.Resolve(q, ci)
|
||||
req.answer = a
|
||||
req.err = err
|
||||
close(req.done) // release other goroutines waiting for the response
|
||||
|
||||
// No longer in flight
|
||||
r.mu.Lock()
|
||||
delete(r.inflight, k)
|
||||
r.mu.Unlock()
|
||||
|
||||
return a, err
|
||||
}
|
||||
|
||||
func (r *requestDedup) String() string {
|
||||
return r.id
|
||||
}
|
||||
|
||||
func byteToUint128(b []byte) (uint64, uint64) {
|
||||
if len(b) != 16 {
|
||||
return 0, 0
|
||||
}
|
||||
hi := binary.BigEndian.Uint64(b[0:8])
|
||||
lo := binary.BigEndian.Uint64(b[8:16])
|
||||
return hi, lo
|
||||
}
|
||||
|
||||
func byteToUint32(b []byte) uint32 {
|
||||
if len(b) != 4 {
|
||||
return 0
|
||||
}
|
||||
return binary.BigEndian.Uint32(b[0:4])
|
||||
}
|
||||
39
request-dedup_test.go
Normal file
39
request-dedup_test.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package rdns
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRequestDedup(t *testing.T) {
|
||||
var ci ClientInfo
|
||||
r := &TestResolver{
|
||||
ResolveFunc: func(*dns.Msg, ClientInfo) (*dns.Msg, error) {
|
||||
time.Sleep(time.Second) // need to slow down to guarantee duplicates
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
|
||||
g := NewRequestDedup("test-dedup", r)
|
||||
q := new(dns.Msg)
|
||||
q.SetQuestion("example.com.", dns.TypeA)
|
||||
|
||||
// Send a batch of queries
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := g.Resolve(q, ci)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Only one request should have hit the resolver
|
||||
require.Equal(t, 1, r.HitCount())
|
||||
}
|
||||
Reference in New Issue
Block a user