Skip to content

Commit

Permalink
feat(graphsyncimpl): fix open/close events
Browse files Browse the repository at this point in the history
Remove error on push acceptance, make both sides have open events
  • Loading branch information
hannahhoward committed May 5, 2020
1 parent 9a1d95f commit 24ff6f0
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 5 deletions.
20 changes: 20 additions & 0 deletions impl/graphsync/graphsync_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ func (impl *graphsyncImpl) OpenPushDataChannel(ctx context.Context, requestTo pe
if err != nil {
return chid, err
}
evt := datatransfer.Event{
Code: datatransfer.Open,
Message: "New Request Initiated",
Timestamp: time.Now(),
}
chst := impl.channels.GetByIDAndSender(chid, impl.peerID)
err = impl.pubSub.Publish(internalEvent{evt, chst})
if err != nil {
log.Warnf("err publishing DT event: %s", err.Error())
}
return chid, nil
}

Expand All @@ -183,6 +193,16 @@ func (impl *graphsyncImpl) OpenPullDataChannel(ctx context.Context, requestTo pe
if err != nil {
return chid, err
}
evt := datatransfer.Event{
Code: datatransfer.Open,
Message: "New Request Initiated",
Timestamp: time.Now(),
}
chst := impl.channels.GetByIDAndSender(chid, requestTo)
err = impl.pubSub.Publish(internalEvent{evt, chst})
if err != nil {
log.Warnf("err publishing DT event: %s", err.Error())
}
return chid, nil
}

Expand Down
36 changes: 33 additions & 3 deletions impl/graphsync/graphsync_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func TestDataTransferInitiatingPullGraphsyncRequests(t *testing.T) {

subscribeCalls := make(chan struct{}, 1)
subscribe := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.Error {
if event.Code == datatransfer.Progress {
subscribeCalls <- struct{}{}
}
}
Expand Down Expand Up @@ -935,10 +935,18 @@ func TestDataTransferPushRoundTrip(t *testing.T) {
dt2 := NewGraphSyncDataTransfer(host2, gs2, gsData.StoredCounter2)

finished := make(chan struct{}, 2)
errChan := make(chan struct{}, 2)
opened := make(chan struct{}, 2)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.Complete {
finished <- struct{}{}
}
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
if event.Code == datatransfer.Open {
opened <- struct{}{}
}
}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
Expand All @@ -949,11 +957,18 @@ func TestDataTransferPushRoundTrip(t *testing.T) {

chid, err := dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
require.NoError(t, err)
for i := 0; i < 2; i++ {
opens := 0
completes := 0
for opens < 2 || completes < 2 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
case <-finished:
completes++
case <-opened:
opens++
case <-errChan:
t.Fatal("received error on data transfer")
}
}
gsData.VerifyFileTransferred(t, root, true)
Expand All @@ -978,10 +993,18 @@ func TestDataTransferPullRoundTrip(t *testing.T) {
dt2 := NewGraphSyncDataTransfer(host2, gs2, gsData.StoredCounter2)

finished := make(chan struct{}, 2)
errChan := make(chan struct{}, 2)
opened := make(chan struct{}, 2)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.Complete {
finished <- struct{}{}
}
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
if event.Code == datatransfer.Open {
opened <- struct{}{}
}
}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
Expand All @@ -992,11 +1015,18 @@ func TestDataTransferPullRoundTrip(t *testing.T) {

_, err := dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
require.NoError(t, err)
for i := 0; i < 2; i++ {
opens := 0
completes := 0
for opens < 2 || completes < 2 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
case <-finished:
completes++
case <-opened:
opens++
case <-errChan:
t.Fatal("received error on data transfer")
}
}
gsData.VerifyFileTransferred(t, root, true)
Expand Down
14 changes: 12 additions & 2 deletions impl/graphsync/graphsync_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,22 @@ func (receiver *graphsyncReceiver) ReceiveRequest(
receiver.impl.sendGsRequest(ctx, initiator, incoming.TransferID(), incoming.IsPull(), dataSender, root, stor)
}

_, err = receiver.impl.channels.CreateNew(incoming.TransferID(), incoming.BaseCid(), stor, voucher, initiator, dataSender, dataReceiver)
chid, err := receiver.impl.channels.CreateNew(incoming.TransferID(), incoming.BaseCid(), stor, voucher, initiator, dataSender, dataReceiver)
if err != nil {
log.Error(err)
receiver.impl.sendResponse(ctx, false, initiator, incoming.TransferID())
return
}
evt := datatransfer.Event{
Code: datatransfer.Open,
Message: "Incoming request accepted",
Timestamp: time.Now(),
}
chst := receiver.impl.channels.GetByIDAndSender(chid, dataSender)
err = receiver.impl.pubSub.Publish(internalEvent{evt, chst})
if err != nil {
log.Warnf("err publishing DT event: %s", err.Error())
}
receiver.impl.sendResponse(ctx, true, initiator, incoming.TransferID())
}

Expand Down Expand Up @@ -109,14 +119,14 @@ func (receiver *graphsyncReceiver) ReceiveResponse(
// initiator is us. construct a channel id for a pull request that we initiated and see
// if there is one in our saved channel list. otherwise we should not respond.
chid := datatransfer.ChannelID{Initiator: receiver.impl.peerID, ID: incoming.TransferID()}
evt.Code = datatransfer.Progress

// if we are handling a response to a pull request then they are sending data and the
// initiator is us
if chst = receiver.impl.channels.GetByIDAndSender(chid, sender); chst != datatransfer.EmptyChannelState {
baseCid := chst.BaseCID()
root := cidlink.Link{Cid: baseCid}
receiver.impl.sendGsRequest(ctx, receiver.impl.peerID, incoming.TransferID(), true, sender, root, chst.Selector())
evt.Code = datatransfer.Progress
}
}
err := receiver.impl.pubSub.Publish(internalEvent{evt, chst})
Expand Down

0 comments on commit 24ff6f0

Please sign in to comment.