Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set the rule handle after flush #88

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (cc *Conn) AddChain(c *Chain) *Chain {
{Type: unix.NFTA_CHAIN_TYPE, Data: []byte(c.Type + "\x00")},
})...)
}
cc.messages = append(cc.messages, netlink.Message{
cc.PutMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWCHAIN),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
Expand All @@ -144,7 +144,7 @@ func (cc *Conn) DelChain(c *Chain) {
{Type: unix.NFTA_CHAIN_NAME, Data: []byte(c.Name + "\x00")},
})

cc.messages = append(cc.messages, netlink.Message{
cc.PutMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELCHAIN),
Flags: netlink.Request | netlink.Acknowledge,
Expand All @@ -162,7 +162,7 @@ func (cc *Conn) FlushChain(c *Chain) {
{Type: unix.NFTA_RULE_TABLE, Data: []byte(c.Table.Name + "\x00")},
{Type: unix.NFTA_RULE_CHAIN, Data: []byte(c.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.PutMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: netlink.Request | netlink.Acknowledge,
Expand Down
133 changes: 114 additions & 19 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ package nftables
import (
"fmt"
"sync"
"sync/atomic"

"github.com/google/nftables/expr"
"github.com/mdlayher/netlink"
"github.com/mdlayher/netlink/nltest"
"golang.org/x/sys/unix"
)

type Entity interface {
HandleResponse(netlink.Message)
}

// A Conn represents a netlink connection of the nftables family.
//
// All methods return their input, so that variables can be defined from string
Expand All @@ -34,7 +39,10 @@ type Conn struct {
TestDial nltest.Func // for testing only; passed to nltest.Dial
NetNS int // Network namespace netlink will interact with.
sync.Mutex
put sync.Mutex
alexispires marked this conversation as resolved.
Show resolved Hide resolved
messages []netlink.Message
entities map[int32]Entity
it int32
alexispires marked this conversation as resolved.
Show resolved Hide resolved
err error
}

Expand All @@ -43,6 +51,8 @@ func (cc *Conn) Flush() error {
cc.Lock()
defer func() {
cc.messages = nil
cc.entities = nil
cc.it = 0
cc.Unlock()
}()
if len(cc.messages) == 0 {
Expand All @@ -59,23 +69,109 @@ func (cc *Conn) Flush() error {

defer conn.Close()

if _, err := conn.SendMessages(batch(cc.messages)); err != nil {
cc.endBatch(cc.messages)

_, err = conn.SendMessages(cc.messages[:cc.it+1])
alexispires marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return fmt.Errorf("SendMessages: %w", err)
}

if _, err := conn.Receive(); err != nil {
return fmt.Errorf("Receive: %w", err)
// Retrieving of seq number associated to entities
entitiesBySeq := make(map[uint32]Entity)
for i, e := range cc.entities {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of the separate entites book-keeping, why not just iterate over cc.messages and check if the type implements HandleResponse?

for _, msg := range cc.messages {
  m, ok := msg.(interface { HandleResponse(netlink.Message) })
  if !ok {
    continue
  }
  m.HandleResponse(rmsg)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandleResponse is not a method that belong to netlink.Message. It's belong to entity.
For a given entity, it's perform some operations on it by using the netlink response. We have to connect an entity (like a Rule) and a netlink message in case of the slice of responses is unordered. I connect them with the netlink sequence.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I’m still not particularly happy about having to manage messages and entities, which are separate but connected.

Instead of the current approach, we could make messages take an interface type which has a NetlinkMessage method. putMessages would then wrap a netlink.Message in a type which implements NetlinkMessage, and can optionally implement HandleResponse as well.

Copy link
Contributor Author

@alexispires alexispires Jan 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stapelberg I'm not sure to understand what do you have in mind but the main limit I have is that the SendMessages method of netlink library take a slice of netlink.Message and not a slice of netlink.Message's pointer. So the only data struct that give me the netlink sequence number is the slice of netlink.Message returned by sendMessages. So the slice of netlink.Message have to be connected in one way or another to something else. In your solution I have to connect the type that wrap netlink.Message with the netlink.Message that contains the seq even if both are the same netlink message.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m a little confused now: you’re saying the sequence numbers are only set in the []netlink.Message that’s returned by (*netlink.Conn).SendMessages, but in your PR, you’re discarding that result:

if _, err = conn.SendMessages(cc.messages); err != nil {

Is that a bug in your PR, or am I misunderstanding?

(Independently, related to your question: the fact that the netlink package takes a []netlink.Message instead of []*netlink.Message doesn’t limit us in which data structures we want to use. If need be, we can do a copy from one to the other. A netlink.Message’s Header is just a few ints, and copying the Body byte slice won’t need to copy its contents.)

entitiesBySeq[cc.messages[i].Header.Sequence] = e
}

// Trigger entities callback
msg, err := cc.checkReceive(conn)
alexispires marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why call checkReceive instead of just calling conn.Receive? It’s not clear to me whether you’re blocking until the next message is available, or only reading as long as messages are available. If the latter, that doesn’t seem sound to me—instead, we should read blockingly until we can identify the end of the response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to read as long as message are available. I'm not sure how to detect the end of the response without select.

Here an example of batch with echo flag:

sendmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=00000000}, msg_namelen=12, msg_iov=[{iov_base=[{{len=20, type=NFNL_MSG_BATCH_BEGIN, flags=NLM_F_REQUEST, seq=3, pid=0}, "\x00\x00\x0a\x00"}, {{len=36, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWTABLE, flags=NLM_F_REQUEST|NLM_F_ECHO, seq=4, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x08\x00\x02\x00\x00\x00\x00\x00"}, {{len=84, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=5, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x0f\x00\x03\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x14\x00\x04\x80\x08\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x00\x00\x00\x00\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00"}, {{len=84, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=6, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x10\x00\x03\x00\x70\x6f\x73\x74\x72\x6f\x75\x74\x69\x6e\x67\x00\x14\x00\x04\x80\x08\x00\x01\x00\x00\x00\x00\x04\x08\x00\x02\x00\x00\x00\x00\x64\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00"}, {{len=96, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSET, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=7, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x0c\x00\x02\x00\x5f\x5f\x73\x65\x74\x25\x64\x00\x08\x00\x03\x00\x00\x00\x00\x03\x08\x00\x04\x00\x00\x00\x00\x07\x08\x00\x05\x00\x00\x00\x00\x04\x08\x00\x0a\x00\x00\x00\x00\x04\x0c\x00\x09\x80\x08\x00\x01\x00\x00\x00\x00\x04\x0a\x00\x0d\x00\x00\x04\x02\x00\x00\x00\x00\x00"}, {{len=116, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_CREATE, seq=7, pid=0}, "\x02\x00\x00\x00\x0c\x00\x02\x00\x5f\x5f\x73\x65\x74\x25\x64\x00\x08\x00\x04\x00\x00\x00\x00\x04\x08\x00\x01\x00\x6e\x61\x74\x00\x44\x00\x03\x80\x10\x00\x01\x80\x0c\x00\x01\x80\x08\x00\x01\x00\x54\x4d\x28\x84\x10\x00\x02\x80\x0c\x00\x01\x80\x08\x00\x01\x00\xb0\x1f\x35\x63\x10\x00\x03\x80\x0c\x00\x01\x80\x08\x00\x01\x00\x51\x13\x60\x94\x10\x00\x04\x80\x0c\x00\x01\x80\x08\x00\x01\x00\x8a\x64\x3e\x08"}, {{len=168, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWRULE, flags=NLM_F_REQUEST|NLM_F_ECHO|NLM_F_EXCL|NLM_F_CREATE|NLM_F_APPEND, seq=8, pid=0}, "\x02\x00\x00\x00\x08\x00\x01\x00\x6e\x61\x74\x00\x0f\x00\x02\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x7c\x00\x04\x80\x34\x00\x01\x80\x0c\x00\x01\x00\x70\x61\x79\x6c\x6f\x61\x64\x00\x24\x00\x02\x80\x08\x00\x01\x00\x00\x00\x00\x01\x08\x00\x02\x00\x00\x00\x00\x01\x08\x00\x03\x00\x00\x00\x00\x0c\x08\x00\x04\x00\x00\x00\x00\x04\x30\x00\x01\x80\x0b\x00\x01\x00\x6c\x6f\x6f\x6b\x75\x70\x00\x00\x20\x00\x02\x80\x08\x00\x02\x00\x00\x00\x00\x01\x0c\x00\x01\x00\x5f\x5f\x73\x65\x74\x25\x64\x00\x08\x00\x04\x00\x00\x00\x00\x04\x14\x00\x01\x80\x0c\x00\x01\x00\x63\x6f\x75\x6e\x74\x65\x72\x00\x04\x00\x02\x80"}, {{len=20, type=NFNL_MSG_BATCH_END, flags=NLM_F_REQUEST, seq=9, pid=0}, "\x00\x00\x0a\x00"}], iov_len=624}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 624
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=104, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=0, seq=5, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0c\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x01\x0f\x00\x03\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x14\x00\x04\x00\x08\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x00\x00\x00\x00\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00\x08\x00\x06\x00\x00\x00\x00\x04"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 104
fstat(1, {st_dev=makedev(0, 22), st_ino=3, st_mode=S_IFCHR|0620, st_nlink=1, st_uid=1000, st_gid=5, st_blksize=1024, st_blocks=0, st_rdev=makedev(136, 0), st_atime=1579772816 /* 2020-01-23T10:46:56.597292734+0100 */, st_atime_nsec=597292734, st_mtime=1579772816 /* 2020-01-23T10:46:56.597292734+0100 */, st_mtime_nsec=597292734, st_ctime=1579772494 /* 2020-01-23T10:41:34.597292734+0100 */, st_ctime_nsec=597292734}) = 0
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=104, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWCHAIN, flags=0, seq=6, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0c\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10\x00\x03\x00\x70\x6f\x73\x74\x72\x6f\x75\x74\x69\x6e\x67\x00\x14\x00\x04\x00\x08\x00\x01\x00\x00\x00\x00\x04\x08\x00\x02\x00\x00\x00\x00\x64\x08\x00\x05\x00\x00\x00\x00\x01\x0b\x00\x07\x00\x66\x69\x6c\x74\x65\x72\x00\x00\x08\x00\x06\x00\x00\x00\x00\x00"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 104
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=100, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSET, flags=0, seq=7, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x0c\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x09\x08\x00\x03\x00\x00\x00\x00\x03\x08\x00\x04\x00\x00\x00\x00\x07\x08\x00\x05\x00\x00\x00\x00\x04\x0a\x00\x0d\x00\x00\x04\x02\x00\x00\x00\x00\x00\x0c\x00\x09\x00\x08\x00\x01\x00\x00\x00\x00\x04"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 100
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\x54\x4d\x28\x84"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\xb0\x1f\x35\x63"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\x51\x13\x60\x94"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=60, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWSETELEM, flags=0, seq=0, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0b\x00\x02\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x14\x00\x03\x00\x10\x00\x01\x00\x0c\x00\x01\x00\x08\x00\x01\x00\x8a\x64\x3e\x08"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 60
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 1 (in [3], left {tv_sec=0, tv_usec=0})
recvmsg(3, {msg_name={sa_family=AF_NETLINK, nl_pid=0, nl_groups=0x000040}, msg_namelen=12, msg_iov=[{iov_base={{len=204, type=NFNL_SUBSYS_NFTABLES<<8|NFT_MSG_NEWRULE, flags=0, seq=8, pid=3583}, "\x02\x00\x00\x07\x08\x00\x01\x00\x6e\x61\x74\x00\x0f\x00\x02\x00\x70\x72\x65\x72\x6f\x75\x74\x69\x6e\x67\x00\x00\x0c\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x0a\x94\x00\x04\x00\x34\x00\x01\x00\x0c\x00\x01\x00\x70\x61\x79\x6c\x6f\x61\x64\x00\x24\x00\x02\x00\x08\x00\x01\x00\x00\x00\x00\x01\x08\x00\x02\x00\x00\x00\x00\x01\x08\x00\x03\x00\x00\x00\x00\x0c\x08\x00\x04\x00\x00\x00\x00\x04\x30\x00\x01\x00\x0b\x00\x01\x00\x6c\x6f\x6f\x6b\x75\x70\x00\x00\x20\x00\x02\x00\x0b\x00\x01\x00\x5f\x5f\x73\x65\x74\x33\x00\x00\x08\x00\x02\x00\x00\x00\x00\x01\x08\x00\x05\x00\x00\x00\x00\x00\x2c\x00\x01\x00\x0c\x00\x01\x00\x63\x6f\x75\x6e\x74\x65\x72\x00\x1c\x00\x02\x00\x0c\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00"}, iov_len=4096}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 204
select(4, [3], NULL, NULL, {tv_sec=0, tv_usec=0}) = 0 (Timeout)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about setting NLM_F_ACK or NLM_F_ECHO on the batch start and batch end messages? That way, you should be able to read until you encounter the sequence number for the batch end message.

for msg {
rmsg, err := conn.Receive()

alexispires marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("Receive: %w", err)
}

for _, msg := range rmsg {
if e, ok := entitiesBySeq[msg.Header.Sequence]; ok {
e.HandleResponse(msg)
}
}
msg, err = cc.checkReceive(conn)
}

return err
}

// PutMessage store netlink message to sent after
func (cc *Conn) PutMessage(msg netlink.Message) int32 {
alexispires marked this conversation as resolved.
Show resolved Hide resolved
cc.put.Lock()
defer cc.put.Unlock()

if cc.messages == nil {
cc.messages = make([]netlink.Message, 16)
cc.messages[0] = netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
}
}

i := atomic.AddInt32(&cc.it, 1)

if len(cc.messages) <= int(i) {
cc.messages = resize(cc.messages)
alexispires marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
cc.messages[i] = msg

return i
}

// PutEntity store entity to relate to netlink response
func (cc *Conn) PutEntity(i int32, e Entity) {
if cc.entities == nil {
cc.entities = make(map[int32]Entity)
}
cc.entities[i] = e
}

func (cc *Conn) checkReceive(c *netlink.Conn) (bool, error) {
if cc.TestDial != nil {
return false, nil
}

sc, err := c.SyscallConn()
alexispires marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
return false, fmt.Errorf("SyscallConn error: %w", err)
}

var n int

sc.Control(func(fd uintptr) {
alexispires marked this conversation as resolved.
Show resolved Hide resolved
var fdSet unix.FdSet
fdSet.Zero()
fdSet.Set(int(fd))

n, err = unix.Select(int(fd)+1, &fdSet, nil, nil, &unix.Timeval{})
})

if err == nil && n > 0 {
return true, nil
}

return false, err
}

// FlushRuleset flushes the entire ruleset. See also
// https://wiki.nftables.org/wiki-nftables/index.php/Operations_at_ruleset_level
func (cc *Conn) FlushRuleset() {
cc.Lock()
defer cc.Unlock()
cc.messages = append(cc.messages, netlink.Message{
cc.PutMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
Expand Down Expand Up @@ -116,26 +212,25 @@ func (cc *Conn) marshalExpr(e expr.Any) []byte {
return b
}

func batch(messages []netlink.Message) []netlink.Message {
batch := []netlink.Message{
{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
},
}
func (cc *Conn) endBatch(messages []netlink.Message) {

i := atomic.AddInt32(&cc.it, 1)

batch = append(batch, messages...)
if len(cc.messages) <= int(i) {
cc.messages = resize(cc.messages)
}

batch = append(batch, netlink.Message{
cc.messages[i] = netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_END),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
})
}
}

return batch
func resize(messages []netlink.Message) []netlink.Message {
new := make([]netlink.Message, cap(messages)*2)
copy(new, messages)
return new
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.12

require (
github.com/koneu/natend v0.0.0-20150829182554-ec0926ea948d
github.com/mdlayher/netlink v0.0.0-20191009155606-de872b0d824b
github.com/mdlayher/netlink v1.0.0
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc
golang.org/x/net v0.0.0-20191028085509-fe3aa8a45271 // indirect
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c
golang.org/x/sys v0.0.0-20200106114638-5f8ca72cd632
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/koneu/natend v0.0.0-20150829182554-ec0926ea948d/go.mod h1:QHb4k4cr1fQ
github.com/mdlayher/netlink v0.0.0-20190409211403-11939a169225/go.mod h1:eQB3mZE4aiYnlUsyGGCOpPETfdQq4Jhsgf1fk3cwQaA=
github.com/mdlayher/netlink v0.0.0-20191009155606-de872b0d824b h1:W3er9pI7mt2gOqOWzwvx20iJ8Akiqz1mUMTxU6wdvl8=
github.com/mdlayher/netlink v0.0.0-20191009155606-de872b0d824b/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M=
github.com/mdlayher/netlink v1.0.0 h1:vySPY5Oxnn/8lxAPn2cK6kAzcZzYJl3KriSLO46OT18=
github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M=
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc h1:R83G5ikgLMxrBvLh22JhdfI8K6YXEPHx5P03Uu3DRs4=
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -25,4 +27,6 @@ golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c h1:S/FtSvpNLtFBgjTqcKsRpsa6a
golang.org/x/sys v0.0.0-20191029155521-f43be2a4598c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea h1:Mz1TMnfJDRJLk8S8OPCoJYgrsp/Se/2TBre2+vwX128=
golang.org/x/sys v0.0.0-20191113150313-8ad342257130 h1:+sdNBpwFF05NvMnEyGynbOs/Gr2LQwORWEPKXuEXxzU=
golang.org/x/sys v0.0.0-20200106114638-5f8ca72cd632 h1:ateQkYCVYo8UwIBvoR3zj1Dh2K6Op/n3GxemXfB44/Y=
golang.org/x/sys v0.0.0-20200106114638-5f8ca72cd632/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
83 changes: 83 additions & 0 deletions nftables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"testing"

"github.com/google/nftables"
Expand Down Expand Up @@ -3980,3 +3981,85 @@ func TestStatelessNAT(t *testing.T) {
t.Fatal(err)
}
}

func TestIntegrationAddRule(t *testing.T) {

// Create a new network namespace to test these operations,
// and tear down the namespace at test completion.
c, newNS := openSystemNFTConn(t)
defer cleanupSystemNFTConn(t, newNS)
// Clear all rules at the beginning + end of the test.
c.FlushRuleset()
defer c.FlushRuleset()

filter := c.AddTable(&nftables.Table{
Family: nftables.TableFamilyIPv4,
Name: "filter",
})

chain := c.AddChain(&nftables.Chain{
Name: "chain",
Table: filter,
Type: nftables.ChainTypeFilter,
Hooknum: nftables.ChainHookPrerouting,
Priority: nftables.ChainPriorityFilter,
})

c.Flush()

execN := func(w int, n int) {

alexispires marked this conversation as resolved.
Show resolved Hide resolved
c := &nftables.Conn{NetNS: int(newNS)}

for i := 0; i < n; i++ {

r := c.AddRule(&nftables.Rule{
Table: filter,
Chain: chain,
UserData: []byte(fmt.Sprintf("%d-%d", w, i)),
Exprs: []expr.Any{
&expr.Verdict{
// [ immediate reg 0 drop ]
Kind: expr.VerdictDrop,
},
},
})

if r.Handle != 0 {
t.Fatalf("unexpected handle value at %d", i)
}

if err := c.Flush(); err != nil {
t.Fatal(err)
}

if r.Handle == 0 {
t.Fatalf("handle value is empty at %d", i)
}

rulesGetted, _ := c.GetRule(filter, chain)

for i, rg := range rulesGetted {
if bytes.Equal(rg.UserData, r.UserData) && rg.Handle != r.Handle {
t.Fatalf("mismatched handle at %d-%d, got: %d, want: %d", w, i, r.Handle, rg.Handle)
}
}
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this test running many workers concurrently? In other words: why is not sufficient to test with 1 worker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To test the behaviors with concurrency as exprimed here: #88 (comment)
IMO it's safer to keep it to identify regression on concurent access. But it's not specific on this part of lib, I think concurrency have to be tested on the whole lib.

const (
workers = 16
iterations = 256
)

var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(n int) {
defer wg.Done()
execN(n, iterations)
}(i)
}

wg.Wait()
}
2 changes: 1 addition & 1 deletion obj.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (cc *Conn) AddObj(o Obj) Obj {
return nil
}

cc.messages = append(cc.messages, netlink.Message{
cc.PutMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWOBJ),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
Expand Down
18 changes: 15 additions & 3 deletions rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,16 @@ func (cc *Conn) AddRule(r *Rule) *Rule {
flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO | unix.NLM_F_APPEND
}

cc.messages = append(cc.messages, netlink.Message{
m := netlink.Message{
Header: netlink.Header{
Type: ruleHeaderType,
Flags: flags,
},
Data: append(extraHeader(uint8(r.Table.Family), 0), msgData...),
})
}

i := cc.PutMessage(m)
cc.PutEntity(i, r)

return r
}
Expand All @@ -149,7 +152,7 @@ func (cc *Conn) DelRule(r *Rule) error {
})...)
flags := netlink.Request | netlink.Acknowledge

cc.messages = append(cc.messages, netlink.Message{
cc.PutMessage(netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: flags,
Expand All @@ -160,6 +163,15 @@ func (cc *Conn) DelRule(r *Rule) error {
return nil
}

// HandleResponse retrieves Handle in netlink response
func (r *Rule) HandleResponse(msg netlink.Message) {
rule, err := ruleFromMsg(msg)

alexispires marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
r.Handle = rule.Handle
alexispires marked this conversation as resolved.
Show resolved Hide resolved
}
}

func exprsFromMsg(b []byte) ([]expr.Any, error) {
ad, err := netlink.NewAttributeDecoder(b)
if err != nil {
Expand Down
Loading