Skip to content

Commit

Permalink
Implement active TCP candidate type (RFC6544)
Browse files Browse the repository at this point in the history
By default TCP candidate type priority is UDP one minus 27 (except
relay), so that UDP+srlfx priority > TCP+host priority. That priority
offset can be configured using AgentConfig.
IPv6 TCP candidates are also supported.
  • Loading branch information
ashellunts authored May 15, 2023
1 parent 898746c commit 1d502ca
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 66 deletions.
51 changes: 50 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -72,6 +73,8 @@ type Agent struct {
prflxAcceptanceMinWait time.Duration
relayAcceptanceMinWait time.Duration

tcpPriorityOffset uint16

portMin uint16
portMax uint16

Expand Down Expand Up @@ -585,6 +588,44 @@ func (a *Agent) getBestValidCandidatePair() *CandidatePair {
}

func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypeActive {
return nil
}

if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypePassive {
addressToConnect := net.JoinHostPort(remote.Address(), strconv.Itoa(remote.Port()))

conn, err := a.net.Dial("tcp", addressToConnect)
if err != nil {
a.log.Errorf("Failed to dial TCP address %s: %v", addressToConnect, err)
return nil
}

packetConn := newTCPPacketConn(tcpPacketParams{
ReadBuffer: tcpReadBufferSize,
LocalAddr: conn.LocalAddr(),
Logger: a.log,
})

if err = packetConn.AddConn(conn, nil); err != nil {
a.log.Errorf("Failed to add TCP connection: %v", err)
return nil
}

localAddress, ok := conn.LocalAddr().(*net.TCPAddr)
if !ok {
a.log.Errorf("Failed to cast local address to TCP address")
return nil
}

localCandidateHost, ok := local.(*CandidateHost)
if !ok {
a.log.Errorf("Failed to cast local candidate to CandidateHost")
return nil
}
localCandidateHost.port = localAddress.Port // this causes a data race with candidateBase.Port()
local.start(a, packetConn, a.startedCh)
}
p := newCandidatePair(local, remote, a.isControlling)
a.checklist = append(a.checklist, p)
return p
Expand Down Expand Up @@ -761,7 +802,9 @@ func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net
}
}

c.start(a, candidateConn, a.startedCh)
if c.TCPType() != TCPTypeActive {
c.start(a, candidateConn, a.startedCh)
}

set = append(set, c)
a.localCandidates[c.NetworkType()] = set
Expand Down Expand Up @@ -1029,13 +1072,19 @@ func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr)
return
}

remoteTCPType := TCPTypeUnspecified
if local.TCPType() == TCPTypePassive {
remoteTCPType = TCPTypeActive
}

prflxCandidateConfig := CandidatePeerReflexiveConfig{
Network: networkType.String(),
Address: ip.String(),
Port: port,
Component: local.Component(),
RelAddr: "",
RelPort: 0,
TCPType: remoteTCPType,
}

prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
Expand Down
162 changes: 162 additions & 0 deletions agent_active_tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

//go:build !js
// +build !js

package ice

import (
"net"
"testing"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v2/stdnet"
"github.com/pion/transport/v2/test"
"github.com/stretchr/testify/require"
)

func getLocalIPAddress(t *testing.T, networkType NetworkType) net.IP {
net, err := stdnet.NewNet()
require.NoError(t, err)
localIPs, err := localInterfaces(net, nil, nil, []NetworkType{networkType}, false)
require.NoError(t, err)
require.NotEmpty(t, localIPs)
return localIPs[0]
}

func ipv6Available(t *testing.T) bool {
net, err := stdnet.NewNet()
require.NoError(t, err)
localIPs, err := localInterfaces(net, nil, nil, []NetworkType{NetworkTypeTCP6}, false)
require.NoError(t, err)
return len(localIPs) > 0
}

func TestAgentActiveTCP(t *testing.T) {
report := test.CheckRoutines(t)
defer report()

lim := test.TimeOut(time.Second * 5)
defer lim.Stop()

const listenPort = 7686
type testCase struct {
name string
networkTypes []NetworkType
listenIPAddress net.IP
selectedPairNetworkType string
}
testCases := []testCase{
{
name: "TCP4 connection",
networkTypes: []NetworkType{NetworkTypeTCP4},
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4),
selectedPairNetworkType: tcp,
},
{
name: "UDP is preferred over TCP4", // fails some time
networkTypes: supportedNetworkTypes(),
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP4),
selectedPairNetworkType: udp,
},
}

if ipv6Available(t) {
tcpv6Cases := []testCase{
{
name: "TCP6 connection",
networkTypes: []NetworkType{NetworkTypeTCP6},
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6),
selectedPairNetworkType: tcp,
},
{
name: "UDP is preferred over TCP6", // fails some time
networkTypes: supportedNetworkTypes(),
listenIPAddress: getLocalIPAddress(t, NetworkTypeTCP6),
selectedPairNetworkType: udp,
},
}
testCases = append(testCases, tcpv6Cases...)
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
r := require.New(t)

listener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: testCase.listenIPAddress,
Port: listenPort,
})
r.NoError(err)
defer func() {
_ = listener.Close()
}()

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace)

tcpMux := NewTCPMuxDefault(TCPMuxParams{
Listener: listener,
Logger: loggerFactory.NewLogger("passive-ice-tcp-mux"),
ReadBufferSize: 20,
})

defer func() {
_ = tcpMux.Close()
}()

r.NotNil(tcpMux.LocalAddr(), "tcpMux.LocalAddr() is nil")

hostAcceptanceMinWait := 100 * time.Millisecond
passiveAgent, err := NewAgent(&AgentConfig{
TCPMux: tcpMux,
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: testCase.networkTypes,
LoggerFactory: loggerFactory,
IncludeLoopback: true,
HostAcceptanceMinWait: &hostAcceptanceMinWait,
})
r.NoError(err)
r.NotNil(passiveAgent)

activeAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: testCase.networkTypes,
LoggerFactory: loggerFactory,
HostAcceptanceMinWait: &hostAcceptanceMinWait,
})
r.NoError(err)
r.NotNil(activeAgent)

passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent)
r.NotNil(passiveAgentConn)
r.NotNil(activeAgenConn)

pair := passiveAgent.getSelectedPair()
r.NotNil(pair)
r.Equal(testCase.selectedPairNetworkType, pair.Local.NetworkType().NetworkShort())

foo := []byte("foo")
_, err = passiveAgentConn.Write(foo)
r.NoError(err)

buffer := make([]byte, 1024)
n, err := activeAgenConn.Read(buffer)
r.NoError(err)
r.Equal(foo, buffer[:n])

bar := []byte("bar")
_, err = activeAgenConn.Write(bar)
r.NoError(err)

n, err = passiveAgentConn.Read(buffer)
r.NoError(err)
r.Equal(bar, buffer[:n])

r.NoError(activeAgenConn.Close())
r.NoError(passiveAgentConn.Close())
})
}
}
19 changes: 19 additions & 0 deletions agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,18 @@ const (
// defaultMaxBindingRequests is the maximum number of binding requests before considering a pair failed
defaultMaxBindingRequests = 7

// TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference
// for host, srflx and prfx candidate types.
defaultTCPPriorityOffset = 27

// maxBufferSize is the number of bytes that can be buffered before we start to error
maxBufferSize = 1000 * 1000 // 1MB

// maxBindingRequestTimeout is the wait time before binding requests can be deleted
maxBindingRequestTimeout = 4000 * time.Millisecond

// tcpReadBufferSize is the size of the read buffer of tcpPacketConn used by active tcp candidate
tcpReadBufferSize = 8
)

func defaultCandidateTypes() []CandidateType {
Expand Down Expand Up @@ -174,6 +181,12 @@ type AgentConfig struct {

// Include loopback addresses in the candidate list.
IncludeLoopback bool

// TCPPriorityOffset is a number which is subtracted from the default (UDP) candidate type preference
// for host, srflx and prfx candidate types. It helps to configure relative preference of UDP candidates
// against TCP ones. Relay candidates for TCP and UDP are always 0 and not affected by this setting.
// When this is nil, defaultTCPPriorityOffset is used.
TCPPriorityOffset *uint16
}

// initWithDefaults populates an agent and falls back to defaults if fields are unset
Expand Down Expand Up @@ -208,6 +221,12 @@ func (config *AgentConfig) initWithDefaults(a *Agent) {
a.relayAcceptanceMinWait = *config.RelayAcceptanceMinWait
}

if config.TCPPriorityOffset == nil {
a.tcpPriorityOffset = defaultTCPPriorityOffset
} else {
a.tcpPriorityOffset = *config.TCPPriorityOffset
}

if config.DisconnectedTimeout == nil {
a.disconnectedTimeout = defaultDisconnectedTimeout
} else {
Expand Down
4 changes: 2 additions & 2 deletions agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ func TestAcceptAggressiveNomination(t *testing.T) {

KeepaliveInterval := time.Hour
cfg0 := &AgentConfig{
NetworkTypes: supportedNetworkTypes(),
NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
MulticastDNSMode: MulticastDNSModeDisabled,
Net: net0,

Expand All @@ -1652,7 +1652,7 @@ func TestAcceptAggressiveNomination(t *testing.T) {
require.NoError(t, aAgent.OnConnectionStateChange(aNotifier))

cfg1 := &AgentConfig{
NetworkTypes: supportedNetworkTypes(),
NetworkTypes: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
MulticastDNSMode: MulticastDNSModeDisabled,
Net: net1,
KeepaliveInterval: &KeepaliveInterval,
Expand Down
14 changes: 10 additions & 4 deletions candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) {
copy(m.Raw, buf)

if err := m.Decode(); err != nil {
a.log.Warnf("Failed to handle decode ICE from %s to %s: %v", c.addr(), srcAddr, err)
a.log.Warnf("Failed to handle decode ICE from %s to %s: %v", srcAddr, c.addr(), err)
return
}

Expand All @@ -279,7 +279,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) {
if !c.validateSTUNTrafficCache(srcAddr) {
remoteCandidate, valid := a.validateNonSTUNTraffic(c, srcAddr) //nolint:contextcheck
if !valid {
a.log.Warnf("Discarded message from %s, not a valid remote candidate", c.addr())
a.log.Warnf("Discarded message to %s, not a valid remote candidate", c.addr())
return
}
c.addRemoteCandidateCache(remoteCandidate, srcAddr)
Expand Down Expand Up @@ -355,7 +355,13 @@ func (c *candidateBase) Priority() uint32 {
// candidates for a particular component for a particular data stream
// that have the same type, the local preference MUST be unique for each
// one.
return (1<<24)*uint32(c.Type().Preference()) +

var tcpPriorityOffset uint16 = defaultTCPPriorityOffset
if c.agent() != nil {
tcpPriorityOffset = c.agent().tcpPriorityOffset
}

return (1<<24)*uint32(c.Type().Preference(c.networkType, tcpPriorityOffset)) +
(1<<8)*uint32(c.LocalPreference()) +
uint32(256-c.Component())
}
Expand Down Expand Up @@ -533,7 +539,7 @@ func UnmarshalCandidate(raw string) (Candidate, error) {
case "srflx":
return NewCandidateServerReflexive(&CandidateServerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort})
case "prflx":
return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort})
return NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, tcpType})
case "relay":
return NewCandidateRelay(&CandidateRelayConfig{"", protocol, address, port, component, priority, foundation, relatedAddress, relatedPort, "", nil})
default:
Expand Down
2 changes: 2 additions & 0 deletions candidate_peer_reflexive.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type CandidatePeerReflexiveConfig struct {
Foundation string
RelAddr string
RelPort int
TCPType TCPType
}

// NewCandidatePeerReflexive creates a new peer reflective candidate
Expand All @@ -49,6 +50,7 @@ func NewCandidatePeerReflexive(config *CandidatePeerReflexiveConfig) (*Candidate
id: candidateID,
networkType: networkType,
candidateType: CandidateTypePeerReflexive,
tcpType: config.TCPType,
address: config.Address,
port: config.Port,
resolvedAddr: createAddr(networkType, ip, config.Port),
Expand Down
Loading

0 comments on commit 1d502ca

Please sign in to comment.