Skip to content
Snippets Groups Projects
Unverified Commit 718e6477 authored by John Eikenberry's avatar John Eikenberry Committed by GitHub
Browse files

Merge pull request #266 from pkg/issue265

fix race w/ open packet and stat
parents fe93131e b0f20f99
No related branches found
Tags v1.8.2
No related merge requests found
......@@ -105,33 +105,32 @@ func (s *packetManager) close() {
func (s *packetManager) workerChan(runWorker func(chan orderedRequest),
) chan orderedRequest {
// multiple workers for faster read/writes
rwChan := make(chan orderedRequest, SftpServerWorkerCount)
for i := 0; i < SftpServerWorkerCount; i++ {
runWorker(rwChan)
}
// single worker to enforce sequential processing of everything else
cmdChan := make(chan orderedRequest)
runWorker(cmdChan)
pktChan := make(chan orderedRequest, SftpServerWorkerCount)
go func() {
// start with cmdChan
curChan := cmdChan
for pkt := range pktChan {
// on file open packet, switch to rwChan
switch pkt.requestPacket.(type) {
case *sshFxpOpenPacket:
curChan = rwChan
// on file close packet, switch back to cmdChan
// after waiting for any reads/writes to finish
case *sshFxpReadPacket, *sshFxpWritePacket:
s.incomingPacket(pkt)
rwChan <- pkt
continue
case *sshFxpClosePacket:
// wait for rwChan to finish
// wait for reads/writes to finish when file is closed
// incomingPacket() call must occur after this
s.working.Wait()
// stop using rwChan
curChan = cmdChan
}
s.incomingPacket(pkt)
curChan <- pkt
// all non-RW use sequential cmdChan
cmdChan <- pkt
}
close(rwChan)
close(cmdChan)
......
......@@ -3,6 +3,7 @@ package sftp
import (
"io"
"os"
"path"
"regexp"
"sync"
"syscall"
......@@ -278,3 +279,51 @@ func TestStatusFromError(t *testing.T) {
assert.Equal(t, tc.pkt, statusFromError(tc.pkt, tc.err))
}
}
// This was written to test a race b/w open immediately followed by a stat.
// Previous to this the Open would trigger the use of a worker pool, then the
// stat packet would come in an hit the pool and return faster than the open
// (returning a file-not-found error).
// The below by itself wouldn't trigger the race however, I needed to add a
// small sleep in the openpacket code to trigger the issue. I wanted to add a
// way to inject that in the code but right now there is no good place for it.
// I'm thinking after I convert the server into a request-server backend I
// might be able to do something with the runWorker method passed into the
// packet manager. But with the 2 implementations fo the server it just doesn't
// fit well right now.
func TestOpenStatRace(t *testing.T) {
client, server := clientServerPair(t)
defer client.Close()
defer server.Close()
// openpacket finishes to fast to trigger race in tests
// need to add a small sleep on server to openpackets somehow
tmppath := path.Join(os.TempDir(), "stat_race")
pflags := flags(os.O_RDWR | os.O_CREATE | os.O_TRUNC)
ch := make(chan result, 3)
id1 := client.nextID()
client.dispatchRequest(ch, sshFxpOpenPacket{
ID: id1,
Path: tmppath,
Pflags: pflags,
})
id2 := client.nextID()
client.dispatchRequest(ch, sshFxpLstatPacket{
ID: id2,
Path: tmppath,
})
testreply := func(id uint32, ch chan result) {
r := <-ch
switch r.typ {
case ssh_FXP_ATTRS, ssh_FXP_HANDLE: // ignore
case ssh_FXP_STATUS:
err := normaliseError(unmarshalStatus(id, r.data))
assert.NoError(t, err, "race hit, stat before open")
default:
assert.Fail(t, "Unexpected type")
}
}
testreply(id1, ch)
testreply(id2, ch)
os.Remove(tmppath)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment