diff --git a/lib/constants.go b/lib/constants.go index e0ef1a8..aa5edf8 100644 --- a/lib/constants.go +++ b/lib/constants.go @@ -2,6 +2,7 @@ package libunlynx import ( "sync" + "time" "go.dedis.ch/onet/v3/simul/monitor" ) @@ -10,7 +11,7 @@ import ( //______________________________________________________________________________________________________________________ // TIME is true if we use protocols with time measurements of computations. -const TIME = false +var TIME = false // VPARALLELIZE allows to choose the level of parallelization in the vector computations const VPARALLELIZE = 100 @@ -18,6 +19,9 @@ const VPARALLELIZE = 100 // DIFFPRI enables the DRO protocol (Distributed Results Obfuscation) const DIFFPRI = false +// TIMEOUT ddefines the default channel timeout +var TIMEOUT = 10 * time.Minute + // StartTimer starts measurement of time func StartTimer(name string) *monitor.TimeMeasure { if TIME { diff --git a/protocols/collective_aggregation_protocol.go b/protocols/collective_aggregation_protocol.go index cf4cfd7..0524511 100644 --- a/protocols/collective_aggregation_protocol.go +++ b/protocols/collective_aggregation_protocol.go @@ -9,6 +9,7 @@ package protocolsunlynx import ( "errors" + "os" "sync" "time" @@ -107,8 +108,6 @@ type CollectiveAggregationProtocol struct { Proofs bool ProofFunc proofCollectiveAggregationFunction // proof function for when we want to do something different with the proofs (e.g. insert in the blockchain) MapPIs map[string]onet.ProtocolInstance // protocol instances to be able to call protocols inside protocols (e.g. proof_collection_protocol) - - Timeout time.Duration } // NewCollectiveAggregationProtocol initializes the protocol instance. @@ -132,9 +131,6 @@ func NewCollectiveAggregationProtocol(n *onet.TreeNodeInstance) (onet.ProtocolIn return nil, errors.New("couldn't register data reference channel: " + err.Error()) } - // default timeout - pap.Timeout = 10 * time.Minute - return pap, nil } @@ -201,6 +197,11 @@ func (p *CollectiveAggregationProtocol) Dispatch() error { // Announce forwarding down the tree. func (p *CollectiveAggregationProtocol) aggregationAnnouncementPhase() error { + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + select { case dataReferenceMessage := <-p.DataReferenceChannel: if !p.IsLeaf() { @@ -208,7 +209,7 @@ func (p *CollectiveAggregationProtocol) aggregationAnnouncementPhase() error { return errors.New("Error sending :" + err.Error()) } } - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } return nil diff --git a/protocols/deterministic_tagging_protocol.go b/protocols/deterministic_tagging_protocol.go index 11cac51..854d40f 100644 --- a/protocols/deterministic_tagging_protocol.go +++ b/protocols/deterministic_tagging_protocol.go @@ -8,6 +8,7 @@ package protocolsunlynx import ( "errors" + "os" "sync" "time" @@ -91,8 +92,6 @@ type DeterministicTaggingProtocol struct { Proofs bool ExecTime time.Duration - - Timeout time.Duration } // NewDeterministicTaggingProtocol constructs tagging switching protocol instances. @@ -118,10 +117,6 @@ func NewDeterministicTaggingProtocol(n *onet.TreeNodeInstance) (onet.ProtocolIns break } } - - // default timeout - dsp.Timeout = 10 * time.Minute - return dsp, nil } @@ -160,16 +155,21 @@ func (p *DeterministicTaggingProtocol) Start() error { func (p *DeterministicTaggingProtocol) Dispatch() error { defer p.Done() + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + //************ ----- first round, add value derivated from ephemeral secret to message ---- ******************** var deterministicTaggingTargetBytesBef deterministicTaggingBytesStruct select { case deterministicTaggingTargetBytesBef = <-p.PreviousNodeInPathChannel: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the (first round) on time.") } deterministicTaggingTargetBef := DeterministicTaggingMessage{Data: make([]libunlynx.CipherText, 0)} - err := deterministicTaggingTargetBef.FromBytes(deterministicTaggingTargetBytesBef.Data) + err = deterministicTaggingTargetBef.FromBytes(deterministicTaggingTargetBytesBef.Data) if err != nil { return err } @@ -217,7 +217,7 @@ func (p *DeterministicTaggingProtocol) Dispatch() error { var deterministicTaggingTargetBytes deterministicTaggingBytesStruct select { case deterministicTaggingTargetBytes = <-p.PreviousNodeInPathChannel: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the (second round) on time.") } diff --git a/protocols/key_switching_protocol.go b/protocols/key_switching_protocol.go index 76ac82f..5755053 100644 --- a/protocols/key_switching_protocol.go +++ b/protocols/key_switching_protocol.go @@ -8,6 +8,7 @@ package protocolsunlynx import ( "errors" + "os" "time" "github.com/ldsec/unlynx/lib" @@ -114,8 +115,6 @@ type KeySwitchingProtocol struct { // Test (only use in order to test the protocol) ExecTime time.Duration - - Timeout time.Duration } // NewKeySwitchingProtocol initializes the protocol instance. @@ -139,9 +138,6 @@ func NewKeySwitchingProtocol(n *onet.TreeNodeInstance) (onet.ProtocolInstance, e return nil, errors.New("couldn't register length channel: " + err.Error()) } - // default timeout - pap.Timeout = 10 * time.Minute - return pap, nil } @@ -235,10 +231,15 @@ func (p *KeySwitchingProtocol) Dispatch() error { // Announce forwarding down the tree. func (p *KeySwitchingProtocol) announcementKSPhase() (kyber.Point, []kyber.Point, error) { + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + var dataReferenceMessage DownBytesStruct select { case dataReferenceMessage = <-p.DownChannel: - case <-time.After(p.Timeout): + case <-time.After(timeout): return nil, nil, errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/shuffling+ddt_protocol.go b/protocols/shuffling+ddt_protocol.go index 0b3abe9..1344788 100644 --- a/protocols/shuffling+ddt_protocol.go +++ b/protocols/shuffling+ddt_protocol.go @@ -2,6 +2,7 @@ package protocolsunlynx import ( "errors" + "os" "sync" "time" @@ -82,8 +83,6 @@ type ShufflingPlusDDTProtocol struct { // Proofs Proofs bool - - Timeout time.Duration } // NewShufflingPlusDDTProtocol constructs neff shuffle + ddt protocol instance. @@ -109,10 +108,6 @@ func NewShufflingPlusDDTProtocol(n *onet.TreeNodeInstance) (onet.ProtocolInstanc break } } - - // default timeout - pi.Timeout = 10 * time.Minute - return pi, nil } @@ -159,23 +154,28 @@ func (p *ShufflingPlusDDTProtocol) Start() error { func (p *ShufflingPlusDDTProtocol) Dispatch() error { defer p.Done() + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + var shufflingPlusDDTBytesMessageLength shufflingPlusDDTBytesLengthStruct select { case shufflingPlusDDTBytesMessageLength = <-p.LengthNodeChannel: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } var tmp shufflingPlusDDTBytesStruct select { case tmp = <-p.PreviousNodeInPathChannel: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } readData := libunlynx.StartTimer(p.Name() + "_ShufflingPlusDDT(ReadData)") sm := ShufflingPlusDDTMessage{} - err := sm.FromBytes(tmp.Data, tmp.ShuffKey, shufflingPlusDDTBytesMessageLength.CVLengths) + err = sm.FromBytes(tmp.Data, tmp.ShuffKey, shufflingPlusDDTBytesMessageLength.CVLengths) if err != nil { return err } diff --git a/protocols/shuffling_protocol.go b/protocols/shuffling_protocol.go index 7973c7c..cf548f7 100644 --- a/protocols/shuffling_protocol.go +++ b/protocols/shuffling_protocol.go @@ -5,6 +5,7 @@ package protocolsunlynx import ( "errors" + "os" "time" "github.com/ldsec/unlynx/lib" @@ -91,8 +92,6 @@ type ShufflingProtocol struct { CollectiveKey kyber.Point ExecTimeStart time.Duration ExecTime time.Duration - - Timeout time.Duration } // NewShufflingProtocol constructs neff shuffle protocol instances. @@ -119,9 +118,6 @@ func NewShufflingProtocol(n *onet.TreeNodeInstance) (onet.ProtocolInstance, erro } } - // default timeout - dsp.Timeout = 10 * time.Minute - return dsp, nil } @@ -192,17 +188,22 @@ func (p *ShufflingProtocol) Start() error { func (p *ShufflingProtocol) Dispatch() error { defer p.Done() + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + var shufflingBytesMessageLength shufflingBytesLengthStruct select { case shufflingBytesMessageLength = <-p.LengthNodeChannel: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } var tmp shufflingBytesStruct select { case tmp = <-p.PreviousNodeInPathChannel: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/addrm_server_protocol.go b/protocols/utils/addrm_server_protocol.go index 90349b3..a83c63a 100644 --- a/protocols/utils/addrm_server_protocol.go +++ b/protocols/utils/addrm_server_protocol.go @@ -5,6 +5,7 @@ package protocolsunlynxutils import ( "errors" + "os" "sync" "time" @@ -39,8 +40,6 @@ type AddRmServerProtocol struct { KeyToRm kyber.Scalar Proofs bool Add bool - - Timeout time.Duration } // NewAddRmProtocol is constructor of add/rm protocol instances. @@ -50,9 +49,6 @@ func NewAddRmProtocol(n *onet.TreeNodeInstance) (onet.ProtocolInstance, error) { FeedbackChannel: make(chan []libunlynx.CipherText), } - // default timeout - pvp.Timeout = 10 * time.Minute - return pvp, nil } @@ -98,10 +94,15 @@ func (p *AddRmServerProtocol) Start() error { func (p *AddRmServerProtocol) Dispatch() error { defer p.Done() + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + var finalResultMessage []libunlynx.CipherText select { case finalResultMessage = <-finalResultAddrm: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/local_aggregation_protocol.go b/protocols/utils/local_aggregation_protocol.go index c548bae..7de617a 100644 --- a/protocols/utils/local_aggregation_protocol.go +++ b/protocols/utils/local_aggregation_protocol.go @@ -8,6 +8,7 @@ import ( "github.com/ldsec/unlynx/lib/aggregation" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" + "os" "time" ) @@ -35,8 +36,6 @@ type LocalAggregationProtocol struct { // Protocol state data TargetOfAggregation []libunlynx.FilteredResponseDet Proofs bool - - Timeout time.Duration } // NewLocalAggregationProtocol is constructor of Local Aggregation protocol instances. @@ -45,10 +44,6 @@ func NewLocalAggregationProtocol(n *onet.TreeNodeInstance) (onet.ProtocolInstanc TreeNodeInstance: n, FeedbackChannel: make(chan map[libunlynx.GroupingKey]libunlynx.FilteredResponse), } - - // default timeout - pvp.Timeout = 10 * time.Minute - return pvp, nil } @@ -90,10 +85,15 @@ func (p *LocalAggregationProtocol) Start() error { func (p *LocalAggregationProtocol) Dispatch() error { defer p.Done() + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + var finalResultMessage map[libunlynx.GroupingKey]libunlynx.FilteredResponse select { case finalResultMessage = <-finalResultAggr: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/local_clear_aggregation_protocol.go b/protocols/utils/local_clear_aggregation_protocol.go index 66f1fb7..207b6e6 100644 --- a/protocols/utils/local_clear_aggregation_protocol.go +++ b/protocols/utils/local_clear_aggregation_protocol.go @@ -8,6 +8,7 @@ import ( "github.com/ldsec/unlynx/lib/store" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" + "os" "time" ) @@ -31,8 +32,6 @@ type LocalClearAggregationProtocol struct { // Protocol state data TargetOfAggregation []libunlynx.DpClearResponse - - Timeout time.Duration } // NewLocalClearAggregationProtocol is constructor of Proofs Verification protocol instances. @@ -41,10 +40,6 @@ func NewLocalClearAggregationProtocol(n *onet.TreeNodeInstance) (onet.ProtocolIn TreeNodeInstance: n, FeedbackChannel: make(chan []libunlynx.DpClearResponse), } - - // default timeout - pvp.Timeout = 10 * time.Minute - return pvp, nil } @@ -64,10 +59,15 @@ func (p *LocalClearAggregationProtocol) Start() error { func (p *LocalClearAggregationProtocol) Dispatch() error { defer p.Done() + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + var finalResultMessage []libunlynx.DpClearResponse select { case finalResultMessage = <-finalResultClearAggr: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/proofs_verification_protocol.go b/protocols/utils/proofs_verification_protocol.go index e4185d1..b416fc2 100644 --- a/protocols/utils/proofs_verification_protocol.go +++ b/protocols/utils/proofs_verification_protocol.go @@ -13,6 +13,7 @@ import ( "github.com/ldsec/unlynx/lib/shuffle" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" + "os" "time" ) @@ -47,8 +48,6 @@ type ProofsVerificationProtocol struct { // Protocol state data TargetOfVerification ProofsToVerify - - Timeout time.Duration } // NewProofsVerificationProtocol is constructor of Proofs Verification protocol instances. @@ -58,9 +57,6 @@ func NewProofsVerificationProtocol(n *onet.TreeNodeInstance) (onet.ProtocolInsta FeedbackChannel: make(chan []bool), } - // default timeout - pvp.Timeout = 10 * time.Minute - return pvp, nil } @@ -114,10 +110,15 @@ func (p *ProofsVerificationProtocol) Start() error { func (p *ProofsVerificationProtocol) Dispatch() error { defer p.Done() + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + var finalResultMessage []bool select { case finalResultMessage = <-finalResult: - case <-time.After(p.Timeout): + case <-time.After(timeout): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/services/service.go b/services/service.go index b824635..fb1025d 100644 --- a/services/service.go +++ b/services/service.go @@ -2,6 +2,7 @@ package servicesunlynx import ( "errors" + "os" "strconv" "time" @@ -89,14 +90,8 @@ func init() { network.RegisterMessage(&SurveyResponseQuery{}) network.RegisterMessage(&ServiceState{}) network.RegisterMessage(&ServiceResult{}) - - // Default timeout just for all tests to work - TimeoutService = 20 * time.Minute } -// TimeoutService is the communication idle timeout -var TimeoutService time.Duration - // QueryBroadcastFinished is used to ensure that all servers have received the query/survey type QueryBroadcastFinished struct { SurveyID SurveyID @@ -133,9 +128,6 @@ type ServiceResult struct { // Service defines a service in unlynx with a survey. type Service struct { *onet.ServiceProcessor - - Timeout time.Duration - Survey *concurrent.ConcurrentMap } @@ -158,7 +150,6 @@ func (s *Service) putSurvey(sid SurveyID, surv Survey) error { // NewService constructor which registers the needed messages. func NewService(c *onet.Context) (onet.Service, error) { newUnLynxInstance := &Service{ - Timeout: TimeoutService, ServiceProcessor: onet.NewServiceProcessor(c), Survey: concurrent.NewConcurrentMap(), } @@ -724,6 +715,11 @@ func (s *Service) StartService(targetSurvey SurveyID, root bool) error { // ShufflingPhase performs the shuffling of the ClientResponses func (s *Service) ShufflingPhase(targetSurvey SurveyID) error { + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + survey, err := s.getSurvey(targetSurvey) if err != nil { return err @@ -742,7 +738,7 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error { var tmpShufflingResult []libunlynx.CipherVector select { case tmpShufflingResult = <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel: - case <-time.After(s.Timeout): + case <-time.After(timeout): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -759,6 +755,11 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error { // TaggingPhase performs the private grouping on the currently collected data. func (s *Service) TaggingPhase(targetSurvey SurveyID) error { + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + survey, err := s.getSurvey(targetSurvey) if err != nil { return err @@ -777,7 +778,7 @@ func (s *Service) TaggingPhase(targetSurvey SurveyID) error { var tmpDeterministicTaggingResult []libunlynx.DeterministCipherText select { case tmpDeterministicTaggingResult = <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel: - case <-time.After(s.Timeout): + case <-time.After(timeout): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -808,6 +809,11 @@ func (s *Service) TaggingPhase(targetSurvey SurveyID) error { // AggregationPhase performs the per-group aggregation on the currently grouped data. func (s *Service) AggregationPhase(targetSurvey SurveyID) error { + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + pi, err := s.StartProtocol(protocolsunlynx.CollectiveAggregationProtocolName, targetSurvey) if err != nil { return err @@ -816,7 +822,7 @@ func (s *Service) AggregationPhase(targetSurvey SurveyID) error { var cothorityAggregatedData protocolsunlynx.CothorityAggregatedData select { case cothorityAggregatedData = <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel: - case <-time.After(s.Timeout): + case <-time.After(timeout): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -832,6 +838,11 @@ func (s *Service) AggregationPhase(targetSurvey SurveyID) error { // DROPhase shuffles the list of noise values. func (s *Service) DROPhase(targetSurvey SurveyID) error { + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + pi, err := s.StartProtocol(protocolsunlynx.DROProtocolName, targetSurvey) if err != nil { return err @@ -845,7 +856,7 @@ func (s *Service) DROPhase(targetSurvey SurveyID) error { var tmpShufflingResult []libunlynx.CipherVector select { case tmpShufflingResult = <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel: - case <-time.After(s.Timeout): + case <-time.After(timeout): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -858,6 +869,11 @@ func (s *Service) DROPhase(targetSurvey SurveyID) error { // KeySwitchingPhase performs the switch to the querier's key on the currently aggregated data. func (s *Service) KeySwitchingPhase(targetSurvey SurveyID) error { + timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + timeout = libunlynx.TIMEOUT + } + pi, err := s.StartProtocol(protocolsunlynx.KeySwitchingProtocolName, targetSurvey) if err != nil { return err @@ -871,7 +887,7 @@ func (s *Service) KeySwitchingPhase(targetSurvey SurveyID) error { var tmpKeySwitchedAggregatedResponses libunlynx.CipherVector select { case tmpKeySwitchedAggregatedResponses = <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel: - case <-time.After(s.Timeout): + case <-time.After(timeout): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") }