Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SST reader tries to use the writer's DataPlane or fail #3356

Merged
merged 1 commit into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 73 additions & 35 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,29 @@ static FMStructDescRec CP_DP_WriterArrayStructs[] = {
{"SstParams", NULL, sizeof(struct _SstParams), NULL},
{NULL, NULL, 0, NULL}};

static FMField CP_DPQueryList[] = {
{"writer_ID", "integer", sizeof(void *),
FMOffset(struct _DPQueryMsg *, WriterFile)},
{"writer_response_condition", "integer", sizeof(int),
FMOffset(struct _DPQueryMsg *, WriterResponseCondition)},
{NULL, NULL, 0, 0}};

static FMStructDescRec CP_DPQueryStructs[] = {
{"DPQuery", CP_DPQueryList, sizeof(struct _DPQueryMsg), NULL},
{NULL, NULL, 0, NULL}};

static FMField CP_DPQueryResponseList[] = {
{"writer_response_condition", "integer", sizeof(int),
FMOffset(struct _DPQueryResponseMsg *, WriterResponseCondition)},
{"writer_data_plane", "string", sizeof(char *),
FMOffset(struct _DPQueryResponseMsg *, OperativeDP)},
{NULL, NULL, 0, 0}};

static FMStructDescRec CP_DPQueryResponseStructs[] = {
{"DPQueryResponse", CP_DPQueryResponseList,
sizeof(struct _DPQueryResponseMsg), NULL},
{NULL, NULL, 0, NULL}};

static FMField CP_ReaderRegisterList[] = {
{"writer_ID", "integer", sizeof(void *),
FMOffset(struct _ReaderRegisterMsg *, WriterFile)},
Expand Down Expand Up @@ -954,41 +977,17 @@ static void FreeCustomStructs(CP_StructList *List)
free(List->CustomStructList);
}

static void doCMFormatRegistration(CP_GlobalCMInfo CPInfo,
CP_DP_Interface DPInfo)
static void doPrelimCMFormatRegistration(CP_GlobalCMInfo CPInfo)
{
FMStructDescList FullReaderRegisterStructs, FullWriterResponseStructs,
CombinedTimestepMetadataStructs;

FullReaderRegisterStructs =
combineCpDpFormats(CP_ReaderRegisterStructs, CP_ReaderInitStructs,
DPInfo->ReaderContactFormats);
CPInfo->ReaderRegisterFormat =
CMregister_format(CPInfo->cm, FullReaderRegisterStructs);
CMregister_handler(CPInfo->ReaderRegisterFormat, CP_ReaderRegisterHandler,
NULL);
AddCustomStruct(&CPInfo->CustomStructs, FullReaderRegisterStructs);

FullWriterResponseStructs =
combineCpDpFormats(CP_WriterResponseStructs, CP_WriterInitStructs,
DPInfo->WriterContactFormats);
CPInfo->WriterResponseFormat =
CMregister_format(CPInfo->cm, FullWriterResponseStructs);
CMregister_handler(CPInfo->WriterResponseFormat, CP_WriterResponseHandler,
NULL);
AddCustomStruct(&CPInfo->CustomStructs, FullWriterResponseStructs);

CombinedTimestepMetadataStructs = combineCpDpFormats(
TimestepMetadataStructs, NULL, DPInfo->TimestepInfoFormats);
CPInfo->DeliverTimestepMetadataFormat =
CMregister_format(CPInfo->cm, CombinedTimestepMetadataStructs);
CMregister_handler(CPInfo->DeliverTimestepMetadataFormat,
CP_TimestepMetadataHandler, NULL);
AddCustomStruct(&CPInfo->CustomStructs, CombinedTimestepMetadataStructs);

CPInfo->PeerSetupFormat = CMregister_format(CPInfo->cm, PeerSetupStructs);
CMregister_handler(CPInfo->PeerSetupFormat, CP_PeerSetupHandler, NULL);

CPInfo->DPQueryFormat = CMregister_format(CPInfo->cm, CP_DPQueryStructs);
CMregister_handler(CPInfo->DPQueryFormat, CP_DPQueryHandler, NULL);
CPInfo->DPQueryResponseFormat =
CMregister_format(CPInfo->cm, CP_DPQueryResponseStructs);
CMregister_handler(CPInfo->DPQueryResponseFormat, CP_DPQueryResponseHandler,
NULL);
CPInfo->ReaderActivateFormat =
CMregister_format(CPInfo->cm, ReaderActivateStructs);
CMregister_handler(CPInfo->ReaderActivateFormat, CP_ReaderActivateHandler,
Expand Down Expand Up @@ -1018,6 +1017,39 @@ static void doCMFormatRegistration(CP_GlobalCMInfo CPInfo,
CMregister_handler(CPInfo->ReaderCloseFormat, CP_ReaderCloseHandler, NULL);
}

static void doFinalCMFormatRegistration(CP_GlobalCMInfo CPInfo,
CP_DP_Interface DPInfo)
{
FMStructDescList FullReaderRegisterStructs, FullWriterResponseStructs,
CombinedTimestepMetadataStructs;

FullReaderRegisterStructs =
combineCpDpFormats(CP_ReaderRegisterStructs, CP_ReaderInitStructs,
DPInfo->ReaderContactFormats);
CPInfo->ReaderRegisterFormat =
CMregister_format(CPInfo->cm, FullReaderRegisterStructs);
CMregister_handler(CPInfo->ReaderRegisterFormat, CP_ReaderRegisterHandler,
NULL);
AddCustomStruct(&CPInfo->CustomStructs, FullReaderRegisterStructs);

FullWriterResponseStructs =
combineCpDpFormats(CP_WriterResponseStructs, CP_WriterInitStructs,
DPInfo->WriterContactFormats);
CPInfo->WriterResponseFormat =
CMregister_format(CPInfo->cm, FullWriterResponseStructs);
CMregister_handler(CPInfo->WriterResponseFormat, CP_WriterResponseHandler,
NULL);
AddCustomStruct(&CPInfo->CustomStructs, FullWriterResponseStructs);

CombinedTimestepMetadataStructs = combineCpDpFormats(
TimestepMetadataStructs, NULL, DPInfo->TimestepInfoFormats);
CPInfo->DeliverTimestepMetadataFormat =
CMregister_format(CPInfo->cm, CombinedTimestepMetadataStructs);
CMregister_handler(CPInfo->DeliverTimestepMetadataFormat,
CP_TimestepMetadataHandler, NULL);
AddCustomStruct(&CPInfo->CustomStructs, CombinedTimestepMetadataStructs);
}

static void doFFSFormatRegistration(CP_Info CPInfo, CP_DP_Interface DPInfo)
{
FMStructDescList PerRankReaderStructs, CombinedReaderStructs;
Expand Down Expand Up @@ -1422,7 +1454,15 @@ static void CP_versionError(CMConnection conn, char *formatName)
"with the same version of ADIOS2.\n");
}

extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule)
extern void FinalizeCPInfo(CP_Info StreamCP, CP_DP_Interface DPInfo)
{
pthread_mutex_lock(&StateMutex);
doFinalCMFormatRegistration(SharedCMInfo, DPInfo);
doFFSFormatRegistration(StreamCP, DPInfo);
pthread_mutex_unlock(&StateMutex);
}

extern CP_Info CP_getCPInfo(char *ControlModule)
{
CP_Info StreamCP;

Expand Down Expand Up @@ -1501,7 +1541,7 @@ extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule)
CP_WriterResponseStructs[i].field_list = CP_SstParamsList;
}
}
doCMFormatRegistration(SharedCMInfo, DPInfo);
doPrelimCMFormatRegistration(SharedCMInfo);
}
SharedCMInfoRefCount++;
pthread_mutex_unlock(&StateMutex);
Expand All @@ -1511,8 +1551,6 @@ extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule)
StreamCP->fm_c = create_local_FMcontext();
StreamCP->ffs_c = create_FFSContext_FM(StreamCP->fm_c);

doFFSFormatRegistration(StreamCP, DPInfo);

return StreamCP;
}

Expand Down
31 changes: 30 additions & 1 deletion source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ typedef struct _CP_GlobalCMInfo
{
/* exchange info */
CManager cm;
CMFormat DPQueryFormat;
CMFormat DPQueryResponseFormat;
CMFormat ReaderRegisterFormat;
CMFormat WriterResponseFormat;
CMFormat DeliverTimestepMetadataFormat;
Expand Down Expand Up @@ -296,6 +298,27 @@ struct _MetadataPlusDPInfo
void *DP_TimestepInfo;
};

/*
* Data Plane Query messages are sent from reader rank 0 to writer rank 0
* and represent the reader asking what DP the writer is using
*/
struct _DPQueryMsg
{
void *WriterFile;
int WriterResponseCondition;
};

/*
* Data Plane Query responses messages are sent from writer rank 0 to reader
* rank 0 and tell the reader what Data plane the writer is using (and which the
* reader should use);
*/
struct _DPQueryResponseMsg
{
int WriterResponseCondition;
char *OperativeDP;
};

/*
* Reader register messages are sent from reader rank 0 to writer rank 0
* They contain basic info, plus contact information for each reader rank
Expand Down Expand Up @@ -501,7 +524,8 @@ typedef struct _MetadataPlusDPInfo *MetadataPlusDPInfo;
extern atom_t CM_TRANSPORT_ATOM;

void CP_validateParams(SstStream stream, SstParams Params, int Writer);
extern CP_Info CP_getCPInfo(CP_DP_Interface DPInfo, char *ControlModule);
extern void FinalizeCPInfo(CP_Info Info, CP_DP_Interface DPInfo);
extern CP_Info CP_getCPInfo(char *ControlModule);
extern char *CP_GetContactString(SstStream s, attr_list DPAttrs);
extern SstStream CP_newStream();
extern void SstInternalProvideTimestep(
Expand All @@ -516,6 +540,11 @@ void **CP_consolidateDataToAll(SstStream stream, void *local_info,
FFSTypeHandle type, void **ret_data_block);
void *CP_distributeDataFromRankZero(SstStream stream, void *root_info,
FFSTypeHandle type, void **ret_data_block);
extern void CP_DPQueryHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_data, attr_list attrs);
extern void CP_DPQueryResponseHandler(CManager cm, CMConnection conn,
void *msg_v, void *client_data,
attr_list attrs);
extern void CP_ReaderRegisterHandler(CManager cm, CMConnection conn,
void *msg_v, void *client_data,
attr_list attrs);
Expand Down
109 changes: 104 additions & 5 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
char *Filename = strdup(Name);
CMConnection rank0_to_rank0_conn = NULL;
void *WriterFileID;
char NeededDataPlane[32] = {
0}; // Don't name a data plane longer than 31 chars

Stream = CP_newStream();
Stream->Role = ReaderRole;
Expand All @@ -475,11 +477,7 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
CP_validateParams(Stream, Params, 0 /* reader */);
Stream->ConfigParams = Params;

Stream->DP_Interface =
SelectDP(&Svcs, Stream, Stream->ConfigParams, Stream->Rank);

Stream->CPInfo =
CP_getCPInfo(Stream->DP_Interface, Stream->ConfigParams->ControlModule);
Stream->CPInfo = CP_getCPInfo(Stream->ConfigParams->ControlModule);

Stream->FinalTimestep = INT_MAX; /* set this on close */
Stream->LastDPNotifiedTimestep = -1;
Expand All @@ -497,6 +495,71 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, SMPI_Comm comm)
return NULL;
}

if (Stream->Rank == 0)
{
struct _DPQueryMsg DPQuery;
memset(&DPQuery, 0, sizeof(DPQuery));

DPQuery.WriterFile = WriterFileID;
DPQuery.WriterResponseCondition =
CMCondition_get(Stream->CPInfo->SharedCM->cm, rank0_to_rank0_conn);

CMCondition_set_client_data(Stream->CPInfo->SharedCM->cm,
DPQuery.WriterResponseCondition,
&NeededDataPlane[0]);

if (CMwrite(rank0_to_rank0_conn,
Stream->CPInfo->SharedCM->DPQueryFormat, &DPQuery) != 1)
{
CP_verbose(
Stream, CriticalVerbose,
"DPQuery message failed to send to writer in SstReaderOpen\n");
}

/* wait for "go" from writer */
CP_verbose(
Stream, PerRankVerbose,
"Waiting for writer DPResponse message in SstReadOpen(\"%s\")\n",
Filename, DPQuery.WriterResponseCondition);
CMCondition_wait(Stream->CPInfo->SharedCM->cm,
DPQuery.WriterResponseCondition);
CP_verbose(Stream, PerRankVerbose,
"finished wait writer DPresponse message in read_open, "
"WRITER is using \"%s\" DataPlane\n",
&NeededDataPlane[0]);

// NeededDP should now contain the name of the dataplane the writer is
// using
SMPI_Bcast(&NeededDataPlane[0], sizeof(NeededDataPlane), SMPI_CHAR, 0,
Stream->mpiComm);
}
else
{
SMPI_Bcast(&NeededDataPlane[0], sizeof(NeededDataPlane), SMPI_CHAR, 0,
Stream->mpiComm);
}
{
char *RequestedDP = Stream->ConfigParams->DataTransport;
Stream->ConfigParams->DataTransport = strdup(&NeededDataPlane[0]);
Stream->DP_Interface =
SelectDP(&Svcs, Stream, Stream->ConfigParams, Stream->Rank);
if (Stream->DP_Interface)
if (strcmp(Stream->DP_Interface->DPName, &NeededDataPlane[0]) != 0)
{
fprintf(stderr,
"The writer is using the %s DataPlane for SST data "
"transport, but the reader has failed to load this "
"transport. Communication cannot occur. See the SST "
"DataTransport engine parameter to force a match.",
NeededDataPlane);
return NULL;
}
if (RequestedDP)
free(RequestedDP);
}

FinalizeCPInfo(Stream->CPInfo, Stream->DP_Interface);

Stream->DP_Stream = Stream->DP_Interface->initReader(
&Svcs, Stream, &dpInfo, Stream->ConfigParams, WriterContactAttributes,
&Stream->Stats);
Expand Down Expand Up @@ -1026,6 +1089,42 @@ void CP_WriterResponseHandler(CManager cm, CMConnection conn, void *Msg_v,
PERFSTUBS_TIMER_STOP_FUNC(timer);
}

// CP_DPQueryResponseHandler is called by the network handler thread to
// handle DPQueryResponse messages. One of these will be sent to rank0
// reader from rank0 writer in response to the DPQuery message.
// It will find rank0 writer in CMCondition_wait(). It's only action
// is to associate the incoming response message to the CMcondition
// we're waiting on,m so no locking is necessary.
void CP_DPQueryResponseHandler(CManager cm, CMConnection conn, void *Msg_v,
void *client_data, attr_list attrs)
{
PERFSTUBS_REGISTER_THREAD();
PERFSTUBS_TIMER_START_FUNC(timer);
struct _DPQueryResponseMsg *Msg = (struct _DPQueryResponseMsg *)Msg_v;
char *NeededDP_ptr;

// fprintf(stderr, "Received a writer_response message for condition
// %d\n",
// Msg->WriterResponseCondition);
// fprintf(stderr, "The responding writer has cohort of size %d :\n",
// Msg->writer_CohortSize);
// for (int i = 0; i < Msg->writer_CohortSize; i++) {
// fprintf(stderr, " rank %d CP contact info: %s, %p\n", i,
// Msg->CP_WriterInfo[i]->ContactInfo,
// Msg->CP_WriterInfo[i]->WriterID);
// }

/* attach the message to the CMCondition so it an be retrieved by the main
* thread */
NeededDP_ptr =
CMCondition_get_client_data(cm, Msg->WriterResponseCondition);
strcpy(NeededDP_ptr, Msg->OperativeDP);

/* wake the main thread */
CMCondition_signal(cm, Msg->WriterResponseCondition);
PERFSTUBS_TIMER_STOP_FUNC(timer);
}

// CP_WriterCloseHandler is called by the network handler thread to
// handle WriterResponse messages. One of these will be sent to rank0
// reader from rank0 writer in response to the ReaderRegister message.
Expand Down
Loading