analog-input-every-n-samples.py can cancel acquisition before reading all samples #402

Closed
astarche opened this issue Oct 21, 2021 · 2 comments · Fixed by #442

Comments


astarche commented Oct 21, 2021

If you turn the SampClk rate up high enough (15000-20000 for me) the read_data loop often gets cancelled before acquiring the last batch of samples.

I was hoping that gRPC queueing would preserve ordering:

  • Events fired in order
  • messages enqueued on server in order
  • messages sent in order
  • messages received and enqueued in order
  • messages not necessarily processed in order BUT cancel enqueued on same queue as above
  • data processed before cancel

But somewhere that's not working.
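A minimal sketch of how the race can happen, using two plain `asyncio.Queue` objects as stand-ins for the two gRPC streams. Nothing here touches the real NI-DAQmx client; `simulate_independent_streams`, the queue names, and the `asyncio.sleep(0)` that stands in for the read round trip are all assumptions made for illustration.

```python
import asyncio


async def simulate_independent_streams(num_batches: int = 5) -> int:
    """Return how many batches were read before the done handler cancelled the reader."""
    # Two separate queues model two separate streams (hypothetical stand-ins).
    data_events: asyncio.Queue = asyncio.Queue()
    done_events: asyncio.Queue = asyncio.Queue()

    # The "server" fires events in order: every batch first, then done.
    for batch in range(num_batches):
        data_events.put_nowait(batch)
    done_events.put_nowait("done")

    batches_read = 0

    async def read_data() -> None:
        nonlocal batches_read
        while True:
            await data_events.get()
            await asyncio.sleep(0)  # stands in for the read round trip
            batches_read += 1

    async def wait_for_done(reader: asyncio.Task) -> None:
        # The done event sits on its own queue, so it can be handled
        # while data events are still waiting to be processed.
        await done_events.get()
        reader.cancel()

    reader = asyncio.create_task(read_data())
    await wait_for_done(reader)
    try:
        await reader
    except asyncio.CancelledError:
        pass
    return batches_read


if __name__ == "__main__":
    read = asyncio.run(simulate_independent_streams(5))
    print(f"read {read} of 5 batches before cancel")
```

Even though every event was enqueued in order, the reader is cancelled with batches still unread, because no ordering exists between the two queues.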

AB#1754028


astarche commented Oct 22, 2021

My mistake: the two streams are independent, so nothing orders a message on one stream relative to a message on the other. A good way to think of it: if you never read the every-n-samples stream, the done event would still arrive eventually.

For the purposes of the example, I think we can change it so that the DoneEvent only handles teardown on error, and the every-n-samples loop stops itself (either by counting samples or by querying IsTaskDone).
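A rough sketch of the sample-counting variant, again simulated with `asyncio.Queue` objects rather than the real client. The function name `acquire`, the `done_status` parameter, and the convention that a negative status means an error are assumptions made for this illustration.

```python
import asyncio
from typing import Optional


async def acquire(total_samples: int, n_samples: int, done_status: int = 0) -> Optional[int]:
    """Return the number of samples read, or None if the done event reported an error."""
    # Hypothetical stand-ins for the two independent event streams.
    data_events: asyncio.Queue = asyncio.Queue()
    done_events: asyncio.Queue = asyncio.Queue()

    for _ in range(total_samples // n_samples):
        data_events.put_nowait(n_samples)
    done_events.put_nowait(done_status)

    async def read_data() -> int:
        samples_read = 0
        # The loop ends on its own once the expected sample count is reached.
        while samples_read < total_samples:
            samples_read += await data_events.get()
            await asyncio.sleep(0)  # stands in for the read round trip
        return samples_read

    async def handle_done(reader: asyncio.Task) -> None:
        status = await done_events.get()
        # Assumed convention: a negative status is an error. Only then tear down.
        if status < 0:
            reader.cancel()

    reader = asyncio.create_task(read_data())
    await handle_done(reader)
    try:
        return await reader
    except asyncio.CancelledError:
        return None


if __name__ == "__main__":
    print(asyncio.run(acquire(500, 100)))
```

With this split, an early done event no longer truncates a successful acquisition; it only matters when it carries an error.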

If we want to fix this so that the example works as written/intended (or, basically so that the feature of using those events together works as well as it does in the C API), I think we need to change the API so that they're both on the same stream.

That could be:

rpc CreateEventStream(stream_name) -> stream EventStreamResponse;
rpc RegisterEveryNSamplesEventStream(stream_name, ...) -> SimpleStatusResponse;
rpc RegisterDoneEventStream(stream_name, ...) -> SimpleStatusResponse;
[...]

message EventStreamResponse {
   oneof { EveryNSampleResponse n_samples_response; DoneResponse done_response; }
};

I'm not sure how much work that is. The registration/lifetime on callback streams has been pretty tricky. But, if the pieces fit it might not be too bad.
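To illustrate why a single stream would help, here is a small simulation in which both event kinds travel through one `asyncio.Queue`. The `StreamEvent` dataclass is a hypothetical stand-in for the `oneof` message, and `simulate_shared_stream` is not part of any real API.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamEvent:
    """Hypothetical stand-in for a oneof: exactly one field is set."""
    every_n_samples: Optional[int] = None
    done_status: Optional[int] = None


async def simulate_shared_stream(num_batches: int = 5) -> int:
    """Return how many batches were read before the done event was seen."""
    stream: asyncio.Queue = asyncio.Queue()

    # Both event kinds share one queue, so their relative order is preserved.
    for _ in range(num_batches):
        stream.put_nowait(StreamEvent(every_n_samples=100))
    stream.put_nowait(StreamEvent(done_status=0))

    batches_read = 0
    while True:
        event = await stream.get()
        if event.done_status is not None:
            break  # done is only seen after every earlier data event
        await asyncio.sleep(0)  # stands in for the read round trip
        batches_read += 1
    return batches_read


if __name__ == "__main__":
    print(asyncio.run(simulate_shared_stream(5)))
```

Because the consumer handles one event at a time from one queue, the done event cannot overtake a data event that was enqueued before it.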

Or we could defer that and leave callbacks alone until we add a "true data streaming" API.


astarche commented Nov 1, 2021

I have a prototype that seems to work with the code snippet below.

Basically there's a unary-unary API that allows creating a named stream (tied to the Task lifetime) and registering for events on that stream. Then you read both events from the same stream. This allows us to preserve ordering. And separating out a unary-unary API for create/register also gets rid of the wait_for_initial_metadata.

I'll work with @harsha-bhushan on whether we like this API and how we want to prioritize this. This is a fairly different implementation, so we'll likely have some of the same problems getting all of the synchronization ironed out.

Notice the slightly different method/message names (this is a new API on the existing service):

service NiDAQmx {
[...]
  rpc CreateEventStream(CreateEventStreamRequest) returns (CreateEventStreamResponse);
  rpc RegisterDoneEventStream(RegisterDoneEventStreamRequest) returns (RegisterDoneEventStreamResponse);
  rpc RegisterEveryNSamplesEventStream(RegisterEveryNSamplesEventStreamRequest) returns (RegisterEveryNSamplesEventStreamResponse);
  rpc ReadEventStream(ReadEventStreamRequest) returns (stream ReadEventStreamResponse);
[...]
}
message ReadEventStreamResponse {
  oneof event_data {
    DoneResponse done_response = 1;
    EveryNSamplesResponse every_n_samples_response = 2;
  }
}

Sample client code:

await raise_if_error_async(
    client.CreateEventStream(
        nidaqmx_types.CreateEventStreamRequest(
            task=task, stream_name="STREAM"
        )
    )
)

await raise_if_error_async(
    client.RegisterEveryNSamplesEventStream(
        nidaqmx_types.RegisterEveryNSamplesEventStreamRequest(
            task=task,
            stream_name="STREAM",
            n_samples=100,
            every_n_samples_event_type=nidaqmx_types.EVERY_N_SAMPLES_EVENT_TYPE_ACQUIRED_INTO_BUFFER,
        )
    )
)

await raise_if_error_async(
    client.RegisterDoneEventStream(
        nidaqmx_types.RegisterDoneEventStreamRequest(
            task=task, stream_name="STREAM"
        )
    )
)

await raise_if_error_async(
    client.StartTask(nidaqmx_types.StartTaskRequest(task=task))
)

event_stream = client.ReadEventStream(
    nidaqmx_types.ReadEventStreamRequest(task=task, stream_name="STREAM")
)

async def read_data():
    read_count = 0
    event_stream_response: nidaqmx_types.ReadEventStreamResponse
    async for event_stream_response in event_stream:
        which_oneof = event_stream_response.WhichOneof("event_data")
        if which_oneof == "every_n_samples_response":
            every_n_samples_response = (
                event_stream_response.every_n_samples_response
            )
            await raise_if_error(every_n_samples_response)
            read_response: nidaqmx_types.ReadAnalogF64Response = await raise_if_error_async(
                client.ReadAnalogF64(
                    nidaqmx_types.ReadAnalogF64Request(
                        task=task,
                        num_samps_per_chan=100,
                        fill_mode=nidaqmx_types.GroupBy.GROUP_BY_GROUP_BY_CHANNEL,
                        array_size_in_samps=number_of_channels * 100,
                    )
                )
            )
            print("Read Data:", read_response.read_array[:10])
            read_count = read_count + 1
            print(read_count)
        elif which_oneof == "done_response":
            done_response = event_stream_response.done_response
            event_stream.cancel()
            await raise_if_error(done_response)

await read_data()

astarche added a commit that referenced this issue Dec 2, 2021
Change the analog-input-every-n-samples event so that it doesn't assume that ordering is preserved between the every-n-samples event and the done event.

Fixes #402