Skip to content

Commit

Permalink
Added init() for timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
JoaoAndreSa committed Mar 11, 2020
1 parent 3dcc5b4 commit abbe277
Show file tree
Hide file tree
Showing 15 changed files with 31 additions and 109 deletions.
8 changes: 8 additions & 0 deletions lib/constants.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package libunlynx

import (
"os"
"sync"
"time"

"go.dedis.ch/onet/v3/simul/monitor"
)

func init() {
tmp, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
TIMEOUT = tmp
}
}

// Global Variables
//______________________________________________________________________________________________________________________

Expand Down
8 changes: 1 addition & 7 deletions protocols/collective_aggregation_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package protocolsunlynx

import (
"errors"
"os"
"sync"
"time"

Expand Down Expand Up @@ -197,19 +196,14 @@ func (p *CollectiveAggregationProtocol) Dispatch() error {

// Announce forwarding down the tree.
func (p *CollectiveAggregationProtocol) aggregationAnnouncementPhase() error {
timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

select {
case dataReferenceMessage := <-p.DataReferenceChannel:
if !p.IsLeaf() {
if err := p.SendToChildren(&dataReferenceMessage.DataReferenceMessage); err != nil {
return errors.New("Error sending <DataReferenceMessage>:" + err.Error())
}
}
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <dataReferenceMessage> on time.")
}
return nil
Expand Down
12 changes: 3 additions & 9 deletions protocols/deterministic_tagging_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package protocolsunlynx

import (
"errors"
"os"
"sync"
"time"

Expand Down Expand Up @@ -155,21 +154,16 @@ func (p *DeterministicTaggingProtocol) Start() error {
func (p *DeterministicTaggingProtocol) Dispatch() error {
defer p.Done()

timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

//************ ----- first round, add value derivated from ephemeral secret to message ---- ********************
var deterministicTaggingTargetBytesBef deterministicTaggingBytesStruct
select {
case deterministicTaggingTargetBytesBef = <-p.PreviousNodeInPathChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <deterministicTaggingTargetBytesBef> (first round) on time.")
}

deterministicTaggingTargetBef := DeterministicTaggingMessage{Data: make([]libunlynx.CipherText, 0)}
err = deterministicTaggingTargetBef.FromBytes(deterministicTaggingTargetBytesBef.Data)
err := deterministicTaggingTargetBef.FromBytes(deterministicTaggingTargetBytesBef.Data)
if err != nil {
return err
}
Expand Down Expand Up @@ -217,7 +211,7 @@ func (p *DeterministicTaggingProtocol) Dispatch() error {
var deterministicTaggingTargetBytes deterministicTaggingBytesStruct
select {
case deterministicTaggingTargetBytes = <-p.PreviousNodeInPathChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <deterministicTaggingTargetBytes> (second round) on time.")
}

Expand Down
8 changes: 1 addition & 7 deletions protocols/key_switching_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package protocolsunlynx

import (
"errors"
"os"
"time"

"github.com/ldsec/unlynx/lib"
Expand Down Expand Up @@ -231,15 +230,10 @@ func (p *KeySwitchingProtocol) Dispatch() error {

// Announce forwarding down the tree.
func (p *KeySwitchingProtocol) announcementKSPhase() (kyber.Point, []kyber.Point, error) {
timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

var dataReferenceMessage DownBytesStruct
select {
case dataReferenceMessage = <-p.DownChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return nil, nil, errors.New(p.ServerIdentity().String() + "didn't get the <dataReferenceMessage> on time.")
}

Expand Down
9 changes: 2 additions & 7 deletions protocols/shuffling+ddt_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,17 @@ func (p *ShufflingPlusDDTProtocol) Start() error {
func (p *ShufflingPlusDDTProtocol) Dispatch() error {
defer p.Done()

timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

var shufflingPlusDDTBytesMessageLength shufflingPlusDDTBytesLengthStruct
select {
case shufflingPlusDDTBytesMessageLength = <-p.LengthNodeChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <shufflingPlusDDTBytesMessageLength> on time.")
}

var tmp shufflingPlusDDTBytesStruct
select {
case tmp = <-p.PreviousNodeInPathChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <tmp> on time.")
}

Expand Down
10 changes: 2 additions & 8 deletions protocols/shuffling_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package protocolsunlynx

import (
"errors"
"os"
"time"

"github.com/ldsec/unlynx/lib"
Expand Down Expand Up @@ -188,22 +187,17 @@ func (p *ShufflingProtocol) Start() error {
func (p *ShufflingProtocol) Dispatch() error {
defer p.Done()

timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

var shufflingBytesMessageLength shufflingBytesLengthStruct
select {
case shufflingBytesMessageLength = <-p.LengthNodeChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <shufflingBytesMessageLength> on time.")
}

var tmp shufflingBytesStruct
select {
case tmp = <-p.PreviousNodeInPathChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <tmp> on time.")
}

Expand Down
8 changes: 1 addition & 7 deletions protocols/utils/addrm_server_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package protocolsunlynxutils

import (
"errors"
"os"
"sync"
"time"

Expand Down Expand Up @@ -94,15 +93,10 @@ func (p *AddRmServerProtocol) Start() error {
func (p *AddRmServerProtocol) Dispatch() error {
defer p.Done()

timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

var finalResultMessage []libunlynx.CipherText
select {
case finalResultMessage = <-finalResultAddrm:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <finalResultMessage> on time.")
}

Expand Down
8 changes: 1 addition & 7 deletions protocols/utils/local_aggregation_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/ldsec/unlynx/lib/aggregation"
"go.dedis.ch/onet/v3"
"go.dedis.ch/onet/v3/log"
"os"
"time"
)

Expand Down Expand Up @@ -85,15 +84,10 @@ func (p *LocalAggregationProtocol) Start() error {
func (p *LocalAggregationProtocol) Dispatch() error {
defer p.Done()

timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

var finalResultMessage map[libunlynx.GroupingKey]libunlynx.FilteredResponse
select {
case finalResultMessage = <-finalResultAggr:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <finalResultMessage> on time.")
}

Expand Down
8 changes: 1 addition & 7 deletions protocols/utils/local_clear_aggregation_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/ldsec/unlynx/lib/store"
"go.dedis.ch/onet/v3"
"go.dedis.ch/onet/v3/log"
"os"
"time"
)

Expand Down Expand Up @@ -59,15 +58,10 @@ func (p *LocalClearAggregationProtocol) Start() error {
func (p *LocalClearAggregationProtocol) Dispatch() error {
defer p.Done()

timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

var finalResultMessage []libunlynx.DpClearResponse
select {
case finalResultMessage = <-finalResultClearAggr:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <finalResultMessage> on time.")
}

Expand Down
8 changes: 1 addition & 7 deletions protocols/utils/proofs_verification_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ldsec/unlynx/lib/shuffle"
"go.dedis.ch/onet/v3"
"go.dedis.ch/onet/v3/log"
"os"
"time"
)

Expand Down Expand Up @@ -110,15 +109,10 @@ func (p *ProofsVerificationProtocol) Start() error {
func (p *ProofsVerificationProtocol) Dispatch() error {
defer p.Done()

timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

var finalResultMessage []bool
select {
case finalResultMessage = <-finalResult:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(p.ServerIdentity().String() + "didn't get the <finalResultMessage> on time.")
}

Expand Down
36 changes: 5 additions & 31 deletions services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package servicesunlynx

import (
"errors"
"os"
"strconv"
"time"

Expand Down Expand Up @@ -715,11 +714,6 @@ func (s *Service) StartService(targetSurvey SurveyID, root bool) error {

// ShufflingPhase performs the shuffling of the ClientResponses
func (s *Service) ShufflingPhase(targetSurvey SurveyID) error {
timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

survey, err := s.getSurvey(targetSurvey)
if err != nil {
return err
Expand All @@ -738,7 +732,7 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error {
var tmpShufflingResult []libunlynx.CipherVector
select {
case tmpShufflingResult = <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(s.ServerIdentity().String() + "didn't get the <tmpShufflingResult> on time.")
}

Expand All @@ -755,11 +749,6 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error {

// TaggingPhase performs the private grouping on the currently collected data.
func (s *Service) TaggingPhase(targetSurvey SurveyID) error {
timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

survey, err := s.getSurvey(targetSurvey)
if err != nil {
return err
Expand All @@ -778,7 +767,7 @@ func (s *Service) TaggingPhase(targetSurvey SurveyID) error {
var tmpDeterministicTaggingResult []libunlynx.DeterministCipherText
select {
case tmpDeterministicTaggingResult = <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(s.ServerIdentity().String() + "didn't get the <tmpDeterministicTaggingResult> on time.")
}

Expand Down Expand Up @@ -809,11 +798,6 @@ func (s *Service) TaggingPhase(targetSurvey SurveyID) error {

// AggregationPhase performs the per-group aggregation on the currently grouped data.
func (s *Service) AggregationPhase(targetSurvey SurveyID) error {
timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

pi, err := s.StartProtocol(protocolsunlynx.CollectiveAggregationProtocolName, targetSurvey)
if err != nil {
return err
Expand All @@ -822,7 +806,7 @@ func (s *Service) AggregationPhase(targetSurvey SurveyID) error {
var cothorityAggregatedData protocolsunlynx.CothorityAggregatedData
select {
case cothorityAggregatedData = <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(s.ServerIdentity().String() + "didn't get the <cothorityAggregatedData> on time.")
}

Expand All @@ -838,11 +822,6 @@ func (s *Service) AggregationPhase(targetSurvey SurveyID) error {

// DROPhase shuffles the list of noise values.
func (s *Service) DROPhase(targetSurvey SurveyID) error {
timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

pi, err := s.StartProtocol(protocolsunlynx.DROProtocolName, targetSurvey)
if err != nil {
return err
Expand All @@ -856,7 +835,7 @@ func (s *Service) DROPhase(targetSurvey SurveyID) error {
var tmpShufflingResult []libunlynx.CipherVector
select {
case tmpShufflingResult = <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(s.ServerIdentity().String() + "didn't get the <tmpShufflingResult> on time.")
}

Expand All @@ -869,11 +848,6 @@ func (s *Service) DROPhase(targetSurvey SurveyID) error {

// KeySwitchingPhase performs the switch to the querier's key on the currently aggregated data.
func (s *Service) KeySwitchingPhase(targetSurvey SurveyID) error {
timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT"))
if err != nil {
timeout = libunlynx.TIMEOUT
}

pi, err := s.StartProtocol(protocolsunlynx.KeySwitchingProtocolName, targetSurvey)
if err != nil {
return err
Expand All @@ -887,7 +861,7 @@ func (s *Service) KeySwitchingPhase(targetSurvey SurveyID) error {
var tmpKeySwitchedAggregatedResponses libunlynx.CipherVector
select {
case tmpKeySwitchedAggregatedResponses = <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel:
case <-time.After(timeout):
case <-time.After(libunlynx.TIMEOUT):
return errors.New(s.ServerIdentity().String() + "didn't get the <tmpKeySwitchedAggregatedResponses> on time.")
}

Expand Down
Loading

0 comments on commit abbe277

Please sign in to comment.