diff --git a/lib/constants.go b/lib/constants.go index aa5edf8..b317c23 100644 --- a/lib/constants.go +++ b/lib/constants.go @@ -1,12 +1,20 @@ package libunlynx import ( + "os" "sync" "time" "go.dedis.ch/onet/v3/simul/monitor" ) +func init() { + tmp, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) + if err != nil { + TIMEOUT = tmp + } +} + // Global Variables //______________________________________________________________________________________________________________________ diff --git a/protocols/collective_aggregation_protocol.go b/protocols/collective_aggregation_protocol.go index 0524511..f8a79ba 100644 --- a/protocols/collective_aggregation_protocol.go +++ b/protocols/collective_aggregation_protocol.go @@ -9,7 +9,6 @@ package protocolsunlynx import ( "errors" - "os" "sync" "time" @@ -197,11 +196,6 @@ func (p *CollectiveAggregationProtocol) Dispatch() error { // Announce forwarding down the tree. func (p *CollectiveAggregationProtocol) aggregationAnnouncementPhase() error { - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - select { case dataReferenceMessage := <-p.DataReferenceChannel: if !p.IsLeaf() { @@ -209,7 +203,7 @@ func (p *CollectiveAggregationProtocol) aggregationAnnouncementPhase() error { return errors.New("Error sending :" + err.Error()) } } - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } return nil diff --git a/protocols/deterministic_tagging_protocol.go b/protocols/deterministic_tagging_protocol.go index 854d40f..143d088 100644 --- a/protocols/deterministic_tagging_protocol.go +++ b/protocols/deterministic_tagging_protocol.go @@ -8,7 +8,6 @@ package protocolsunlynx import ( "errors" - "os" "sync" "time" @@ -155,21 +154,16 @@ func (p *DeterministicTaggingProtocol) Start() error { func (p *DeterministicTaggingProtocol) Dispatch() error { defer p.Done() - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - //************ ----- first round, add value derivated from ephemeral secret to message ---- ******************** var deterministicTaggingTargetBytesBef deterministicTaggingBytesStruct select { case deterministicTaggingTargetBytesBef = <-p.PreviousNodeInPathChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the (first round) on time.") } deterministicTaggingTargetBef := DeterministicTaggingMessage{Data: make([]libunlynx.CipherText, 0)} - err = deterministicTaggingTargetBef.FromBytes(deterministicTaggingTargetBytesBef.Data) + err := deterministicTaggingTargetBef.FromBytes(deterministicTaggingTargetBytesBef.Data) if err != nil { return err } @@ -217,7 +211,7 @@ func (p *DeterministicTaggingProtocol) Dispatch() error { var deterministicTaggingTargetBytes deterministicTaggingBytesStruct select { case deterministicTaggingTargetBytes = <-p.PreviousNodeInPathChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the (second round) on time.") } diff --git a/protocols/key_switching_protocol.go b/protocols/key_switching_protocol.go index 5755053..0b92137 100644 --- a/protocols/key_switching_protocol.go +++ b/protocols/key_switching_protocol.go @@ -8,7 +8,6 @@ package protocolsunlynx import ( "errors" - "os" "time" "github.com/ldsec/unlynx/lib" @@ -231,15 +230,10 @@ func (p *KeySwitchingProtocol) Dispatch() error { // Announce forwarding down the tree. func (p *KeySwitchingProtocol) announcementKSPhase() (kyber.Point, []kyber.Point, error) { - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - var dataReferenceMessage DownBytesStruct select { case dataReferenceMessage = <-p.DownChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return nil, nil, errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/shuffling+ddt_protocol.go b/protocols/shuffling+ddt_protocol.go index 1344788..827f364 100644 --- a/protocols/shuffling+ddt_protocol.go +++ b/protocols/shuffling+ddt_protocol.go @@ -154,22 +154,17 @@ func (p *ShufflingPlusDDTProtocol) Start() error { func (p *ShufflingPlusDDTProtocol) Dispatch() error { defer p.Done() - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - var shufflingPlusDDTBytesMessageLength shufflingPlusDDTBytesLengthStruct select { case shufflingPlusDDTBytesMessageLength = <-p.LengthNodeChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } var tmp shufflingPlusDDTBytesStruct select { case tmp = <-p.PreviousNodeInPathChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/shuffling_protocol.go b/protocols/shuffling_protocol.go index cf548f7..7432589 100644 --- a/protocols/shuffling_protocol.go +++ b/protocols/shuffling_protocol.go @@ -5,7 +5,6 @@ package protocolsunlynx import ( "errors" - "os" "time" "github.com/ldsec/unlynx/lib" @@ -188,22 +187,17 @@ func (p *ShufflingProtocol) Start() error { func (p *ShufflingProtocol) Dispatch() error { defer p.Done() - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - var shufflingBytesMessageLength shufflingBytesLengthStruct select { case shufflingBytesMessageLength = <-p.LengthNodeChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } var tmp shufflingBytesStruct select { case tmp = <-p.PreviousNodeInPathChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/addrm_server_protocol.go b/protocols/utils/addrm_server_protocol.go index a83c63a..d3cdbac 100644 --- a/protocols/utils/addrm_server_protocol.go +++ b/protocols/utils/addrm_server_protocol.go @@ -5,7 +5,6 @@ package protocolsunlynxutils import ( "errors" - "os" "sync" "time" @@ -94,15 +93,10 @@ func (p *AddRmServerProtocol) Start() error { func (p *AddRmServerProtocol) Dispatch() error { defer p.Done() - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - var finalResultMessage []libunlynx.CipherText select { case finalResultMessage = <-finalResultAddrm: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/local_aggregation_protocol.go b/protocols/utils/local_aggregation_protocol.go index 7de617a..dc86a1c 100644 --- a/protocols/utils/local_aggregation_protocol.go +++ b/protocols/utils/local_aggregation_protocol.go @@ -8,7 +8,6 @@ import ( "github.com/ldsec/unlynx/lib/aggregation" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" - "os" "time" ) @@ -85,15 +84,10 @@ func (p *LocalAggregationProtocol) Start() error { func (p *LocalAggregationProtocol) Dispatch() error { defer p.Done() - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - var finalResultMessage map[libunlynx.GroupingKey]libunlynx.FilteredResponse select { case finalResultMessage = <-finalResultAggr: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/local_clear_aggregation_protocol.go b/protocols/utils/local_clear_aggregation_protocol.go index 207b6e6..173b89c 100644 --- a/protocols/utils/local_clear_aggregation_protocol.go +++ b/protocols/utils/local_clear_aggregation_protocol.go @@ -8,7 +8,6 @@ import ( "github.com/ldsec/unlynx/lib/store" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" - "os" "time" ) @@ -59,15 +58,10 @@ func (p *LocalClearAggregationProtocol) Start() error { func (p *LocalClearAggregationProtocol) Dispatch() error { defer p.Done() - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - var finalResultMessage []libunlynx.DpClearResponse select { case finalResultMessage = <-finalResultClearAggr: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/protocols/utils/proofs_verification_protocol.go b/protocols/utils/proofs_verification_protocol.go index b416fc2..f0c7de5 100644 --- a/protocols/utils/proofs_verification_protocol.go +++ b/protocols/utils/proofs_verification_protocol.go @@ -13,7 +13,6 @@ import ( "github.com/ldsec/unlynx/lib/shuffle" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" - "os" "time" ) @@ -110,15 +109,10 @@ func (p *ProofsVerificationProtocol) Start() error { func (p *ProofsVerificationProtocol) Dispatch() error { defer p.Done() - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - var finalResultMessage []bool select { case finalResultMessage = <-finalResult: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(p.ServerIdentity().String() + "didn't get the on time.") } diff --git a/services/service.go b/services/service.go index fb1025d..c0b709c 100644 --- a/services/service.go +++ b/services/service.go @@ -2,7 +2,6 @@ package servicesunlynx import ( "errors" - "os" "strconv" "time" @@ -715,11 +714,6 @@ func (s *Service) StartService(targetSurvey SurveyID, root bool) error { // ShufflingPhase performs the shuffling of the ClientResponses func (s *Service) ShufflingPhase(targetSurvey SurveyID) error { - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - survey, err := s.getSurvey(targetSurvey) if err != nil { return err @@ -738,7 +732,7 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error { var tmpShufflingResult []libunlynx.CipherVector select { case tmpShufflingResult = <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -755,11 +749,6 @@ func (s *Service) ShufflingPhase(targetSurvey SurveyID) error { // TaggingPhase performs the private grouping on the currently collected data. func (s *Service) TaggingPhase(targetSurvey SurveyID) error { - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - survey, err := s.getSurvey(targetSurvey) if err != nil { return err @@ -778,7 +767,7 @@ func (s *Service) TaggingPhase(targetSurvey SurveyID) error { var tmpDeterministicTaggingResult []libunlynx.DeterministCipherText select { case tmpDeterministicTaggingResult = <-pi.(*protocolsunlynx.DeterministicTaggingProtocol).FeedbackChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -809,11 +798,6 @@ func (s *Service) TaggingPhase(targetSurvey SurveyID) error { // AggregationPhase performs the per-group aggregation on the currently grouped data. func (s *Service) AggregationPhase(targetSurvey SurveyID) error { - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - pi, err := s.StartProtocol(protocolsunlynx.CollectiveAggregationProtocolName, targetSurvey) if err != nil { return err @@ -822,7 +806,7 @@ func (s *Service) AggregationPhase(targetSurvey SurveyID) error { var cothorityAggregatedData protocolsunlynx.CothorityAggregatedData select { case cothorityAggregatedData = <-pi.(*protocolsunlynx.CollectiveAggregationProtocol).FeedbackChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -838,11 +822,6 @@ func (s *Service) AggregationPhase(targetSurvey SurveyID) error { // DROPhase shuffles the list of noise values. func (s *Service) DROPhase(targetSurvey SurveyID) error { - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - pi, err := s.StartProtocol(protocolsunlynx.DROProtocolName, targetSurvey) if err != nil { return err @@ -856,7 +835,7 @@ func (s *Service) DROPhase(targetSurvey SurveyID) error { var tmpShufflingResult []libunlynx.CipherVector select { case tmpShufflingResult = <-pi.(*protocolsunlynx.ShufflingProtocol).FeedbackChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } @@ -869,11 +848,6 @@ func (s *Service) DROPhase(targetSurvey SurveyID) error { // KeySwitchingPhase performs the switch to the querier's key on the currently aggregated data. func (s *Service) KeySwitchingPhase(targetSurvey SurveyID) error { - timeout, err := time.ParseDuration(os.Getenv("MEDCO_TIMEOUT")) - if err != nil { - timeout = libunlynx.TIMEOUT - } - pi, err := s.StartProtocol(protocolsunlynx.KeySwitchingProtocolName, targetSurvey) if err != nil { return err @@ -887,7 +861,7 @@ func (s *Service) KeySwitchingPhase(targetSurvey SurveyID) error { var tmpKeySwitchedAggregatedResponses libunlynx.CipherVector select { case tmpKeySwitchedAggregatedResponses = <-pi.(*protocolsunlynx.KeySwitchingProtocol).FeedbackChannel: - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New(s.ServerIdentity().String() + "didn't get the on time.") } diff --git a/simul/addrm_server_simul.go b/simul/addrm_server_simul.go index d70d174..78e96fa 100644 --- a/simul/addrm_server_simul.go +++ b/simul/addrm_server_simul.go @@ -53,8 +53,6 @@ func (sim *AddRmSimulation) Setup(dir string, hosts []string) (*onet.SimulationC // Run starts the simulation. func (sim *AddRmSimulation) Run(config *onet.SimulationConfig) error { - timeout := 10 * time.Minute - for round := 0; round < sim.Rounds; round++ { log.Lvl1("Starting round", round) @@ -92,7 +90,7 @@ func (sim *AddRmSimulation) Run(config *onet.SimulationConfig) error { case results := <-root.ProtocolInstance().(*protocolsunlynxutils.AddRmServerProtocol).FeedbackChannel: log.Lvl1("Number of aggregated lines: ", len(results)) libunlynx.EndTimer(round) - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New("simulation didn't finish in time") } } diff --git a/simul/local_aggregation_simul.go b/simul/local_aggregation_simul.go index 767e16d..ce2af5c 100644 --- a/simul/local_aggregation_simul.go +++ b/simul/local_aggregation_simul.go @@ -56,8 +56,6 @@ func (sim *LocalAggregationSimulation) Setup(dir string, hosts []string) (*onet. // Run starts the simulation. func (sim *LocalAggregationSimulation) Run(config *onet.SimulationConfig) error { - timeout := 10 * time.Minute - for round := 0; round < sim.Rounds; round++ { log.Lvl1("Starting round", round) rooti, err := config.Overlay.CreateProtocol("LocalAggregation", config.Tree, onet.NilServiceID) @@ -122,7 +120,7 @@ func (sim *LocalAggregationSimulation) Run(config *onet.SimulationConfig) error case results := <-root.ProtocolInstance().(*protocolsunlynxutils.LocalAggregationProtocol).FeedbackChannel: log.Lvl1("Number of aggregated lines: ", len(results)) libunlynx.EndTimer(round) - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New("simulation didn't finish in time") } } diff --git a/simul/local_clear_aggregation_simul.go b/simul/local_clear_aggregation_simul.go index cce609d..9c685e6 100644 --- a/simul/local_clear_aggregation_simul.go +++ b/simul/local_clear_aggregation_simul.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/BurntSushi/toml" "github.com/ldsec/unlynx/data" + libunlynx "github.com/ldsec/unlynx/lib" "github.com/ldsec/unlynx/protocols/utils" "go.dedis.ch/onet/v3" "go.dedis.ch/onet/v3/log" @@ -56,8 +57,6 @@ func (sim *LocalClearAggregationSimulation) Setup(dir string, hosts []string) (* // Run starts the simulation. func (sim *LocalClearAggregationSimulation) Run(config *onet.SimulationConfig) error { - timeout := 10 * time.Minute - for round := 0; round < sim.Rounds; round++ { log.Lvl1("Starting round", round) rooti, err := config.Overlay.CreateProtocol("LocalClearAggregation", config.Tree, onet.NilServiceID) @@ -98,7 +97,7 @@ func (sim *LocalClearAggregationSimulation) Run(config *onet.SimulationConfig) e log.Lvl1("Result is wrong! :(") } round.Record() - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New("simulation didn't finish in time") } } diff --git a/simul/proofs_verification_simul.go b/simul/proofs_verification_simul.go index d698ea7..f9baab9 100644 --- a/simul/proofs_verification_simul.go +++ b/simul/proofs_verification_simul.go @@ -56,8 +56,6 @@ func (sim *ProofsVerificationSimulation) Setup(dir string, hosts []string) (*one // Run starts the simulation. func (sim *ProofsVerificationSimulation) Run(config *onet.SimulationConfig) error { - timeout := 10 * time.Minute - for round := 0; round < sim.Rounds; round++ { log.Lvl1("Starting round", round) rooti, err := config.Overlay.CreateProtocol("ProofsVerification", config.Tree, onet.NilServiceID) @@ -252,7 +250,7 @@ func (sim *ProofsVerificationSimulation) Run(config *onet.SimulationConfig) erro } else if results[5] == false { return errors.New("collective aggregation proofs failed") } - case <-time.After(timeout): + case <-time.After(libunlynx.TIMEOUT): return errors.New("simulation didn't finish in time") } }