-
Notifications
You must be signed in to change notification settings - Fork 2
/
input_queue.c
154 lines (128 loc) · 5.76 KB
/
input_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#ifndef _STDLIB_H
#include <stdlib.h>
#endif // _STDLIB_H
#ifndef _STRING_H
#include <string.h>
#endif // _STRING_H
#include "input_queue.h"
/*queue_t * queue_create() {
queue_t * queue = calloc(1, sizeof(* queue));
return queue;
}*/
/** @brief Функция начальной инициализации очереди
* @param queue - указатель очередь
* @return В случае успеха возвращает 1.
*/
int queue_init(queue_t *queue) {
int i;
assert(queue);
// Заполнить начальными значениями
queue->read_pos = queue->write_pos = queue->size = 0;
for (i = 0; i < MAX_QUEUE_SIZE; i++)
memset(&queue->data[i], 0, sizeof(queue->data[0]));
return 1;
}
/** @brief Функция для добавления элемента в конец очереди
* @param queue - указатель на очередь
* @param src - указатель на элемент
* @return Возвращает 1 в случае успеха иначе 0.
*/
int queue_push(queue_t *queue, const void *src) {
message_t const *new_msg = (message_t *)src;
message_t *cur_msg = 0;
int i;
assert(queue);
/* есть место для записи */
// добавлена возможность пероезаписи элементов
/*if(queue->size >= queue->max_size)
return 0;*/
// Выполнить проверку на дублирование сообщений. В один момент
// времени в очереди не может существовать нескольких сообщений
// от одного источника к одному устройству.
// если такое случится, то необходимо обновить существующее сообщ. новым.
for (i = queue->read_pos; i != queue->write_pos;
i = (i + 1) % MAX_QUEUE_SIZE) {
cur_msg = &queue->data[i];
// Для замены сообщения источник данных и адрес устройства должны совпадать.
// Адрес устройства - 0 байт в массиве сообщения
if (cur_msg->src_connection == new_msg->src_connection &&
cur_msg->pdu.data[0] == new_msg->pdu.data[0]) {
// обновить сообщение
memcpy(cur_msg, new_msg, sizeof(message_t));
return 1;
}
}
// Если мы здесь, то добавляем сообщения в конец очереди
/* Перенести данные в буфер*/
memcpy(&queue->data[queue->write_pos], src, sizeof(message_t));
/* Сдвинуть указатель записи */
queue->write_pos = (queue->write_pos + 1) % MAX_QUEUE_SIZE;
if (queue->size < MAX_QUEUE_SIZE) queue->size = queue->size + 1;
return 1;
}
/** @brief Функция для извлечения элемента из головы очереди
* @param queue - указатель на очередь
* @param src - указатель на элемент
* @return Возвращает 1 в случае успеха иначе 0.
*/
int queue_pop(queue_t *queue, void *dst) {
assert(queue);
if (queue->size == 0) return 0;
/* Вычислить адрес памяти, куда будет добавлен объект
адрес = начало буфера + номер записи * размер элемента */
// uint8_t * ptr = queue->data + queue->read_pos * queue->element_size;
/* Перенести данные и уменшить размер очереди */
memcpy(dst, &queue->data[queue->read_pos], sizeof(message_t));
queue->size--;
queue->read_pos = (queue->read_pos + 1) % MAX_QUEUE_SIZE; // queue->max_size;
return 1;
}
/** @brief Функция для определения факта переполнения очережи
* @param queue - указатель на очередь
* @return Возвращает 1 в случае переполнения.
*/
int queue_overflow(queue_t *queue) {
assert(queue);
return queue->size >= MAX_QUEUE_SIZE;
}
/** @brief Функция для определения размера очереди
* @param queue - указатель на очередьмент
* @return Возвращает размер очереди
*/
int queue_get_size(queue_t *queue) {
assert(queue);
return queue->size;
}
/** @brief Функция для сброса очереди
* @param queue - указатель на очередь
*/
void queue_flush(queue_t *queue) {
assert(queue);
queue->read_pos = queue->write_pos = queue->size = 0;
}
/** @brief Функция для уничтоженя очереди
* @param queue - указатель на очередь
*/
void queue_free(queue_t *queue) {
assert(queue);
free(queue);
}
// Вывод сообщений, связанных с поключение из очереди
void queue_invalidate_recs(queue_t *queue, connection_t const *conn) {
int i;
assert(queue);
assert(conn);
// Цикл по всем сообщениям в очереди.
// Если указатель подключения в сообщении совпадает с указателм
// conn, сообщению будет установлена некорректная метка времени
// а так же обнулен указатель на подключение - источник,
// что исключит его из очереди обработки.
for (i = queue->read_pos; i != queue->write_pos;
i = (i + 1) % MAX_QUEUE_SIZE) {
// "Повредить" метку времени и указатель на коннект
if (queue->data[i].src_connection == conn) {
queue->data[i].timestamp = (struct timeval){0, 0};
queue->data[i].src_connection = 0;
}
}
}