/*
 * Forward declaration of the file-local row encoder. It is defined further
 * down in this file and is called once per row by update_notify_message()
 * and create_set_message().
 */
3 static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
4 int ncolumn, mrp_pep_value_t *data);
/*
 * Build a REGISTER message describing the given pep.
 *
 * The message starts with the message type, a sequence number of 0, the
 * pep name, the number of owned and watched tables and the total number of
 * column definitions. It is followed by one group of fields per owned table
 * and one group per watched table, each with its per-column name/type pairs.
 *
 * The return statement and error handling are not visible in this excerpt.
 */
6 mrp_msg_t *create_register_message(mrp_pep_t *pep)
11 uint16_t ncolumn, type;
/*
 * Total column count = sum of ncolumn over all owned and watched tables.
 * NOTE(review): the initialization of ncolumn is not visible in this
 * excerpt - confirm it is zeroed before these accumulating loops.
 */
15 for (i = 0; i < pep->nowned; i++)
16 ncolumn += pep->owned[i].ncolumn;
17 for (i = 0; i < pep->nwatched; i++)
18 ncolumn += pep->watched[i].ncolumn;
20 msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_REGISTER),
21 MRP_PEPMSG_UINT32(MSGSEQ , 0),
22 MRP_PEPMSG_STRING(NAME , pep->name),
23 MRP_PEPMSG_UINT16(NTABLE , pep->nowned),
24 MRP_PEPMSG_UINT16(NWATCH , pep->nwatched),
25 MRP_PEPMSG_UINT16(NCOLDEF, ncolumn),
/* Owned tables: name, column count and index column, then the columns. */
28 for (i = 0, t = pep->owned; i < pep->nowned; i++, t++) {
29 mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->name));
30 mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOLUMN, t->ncolumn));
31 mrp_msg_append(msg, MRP_PEPMSG_SINT16(TBLIDX , t->idx_col));
32 for (j = 0, c = t->columns; j < t->ncolumn; j++, c++) {
/*
 * varchar columns are sent as mqi_blob + length in the 16-bit COLTYPE
 * field; decode_register_message() reverses this for types > mqi_blob.
 * NOTE(review): type is uint16_t, so a large c->length could wrap -
 * confirm that column lengths are bounded accordingly.
 */
33 if (c->type == mqi_varchar)
34 type = mqi_blob + c->length;
38 mrp_msg_append(msg, MRP_PEPMSG_STRING(COLNAME, c->name));
39 mrp_msg_append(msg, MRP_PEPMSG_UINT16(COLTYPE, type));
/* Watched tables: same layout as owned tables but without a TBLIDX field. */
43 for (i = 0, t = pep->watched; i < pep->nwatched; i++, t++) {
44 mrp_msg_append(msg, MRP_PEPMSG_STRING(TBLNAME, t->name));
45 mrp_msg_append(msg, MRP_PEPMSG_UINT16(NCOLUMN, t->ncolumn));
46 for (j = 0, c = t->columns; j < t->ncolumn; j++, c++) {
/* Same varchar encoding as used for owned tables. */
47 if (c->type == mqi_varchar)
48 type = mqi_blob + c->length;
52 mrp_msg_append(msg, MRP_PEPMSG_STRING(COLNAME, c->name));
53 mrp_msg_append(msg, MRP_PEPMSG_UINT16(COLTYPE, type));
/*
 * Decode a REGISTER message into caller-supplied arrays.
 *
 * msg      - message to decode
 * owned    - output array for owned table descriptions, capacity nowned
 * watched  - output array for watched table descriptions, capacity nwatched
 * columns  - output array for column definitions, capacity ncolumn
 *
 * Reads the NTABLE, NWATCH and NCOLDEF counts first, compares them against
 * the given capacities, then reads the per-table and per-column fields.
 * The return values are not visible in this excerpt.
 */
62 int decode_register_message(mrp_msg_t *msg, mrp_pep_table_t *owned, int nowned,
63 mrp_pep_table_t *watched, int nwatched,
64 mqi_column_def_t *columns, int ncolumn)
/*
 * NOTE(review): idx_col is declared uint16_t here but is read below (and
 * written by the encoder) as a SINT16 field - confirm the signedness.
 */
70 uint16_t ntbl, nwch, ncol, type, idx_col;
75 if (!mrp_msg_iterate_get(msg, &it,
76 MRP_PEPMSG_UINT16(NTABLE , &ntbl),
77 MRP_PEPMSG_UINT16(NWATCH , &nwch),
78 MRP_PEPMSG_UINT16(NCOLDEF, &ncol),
/*
 * Capacity check: the message declares more tables or columns than the
 * caller's arrays can hold (the action taken is not visible here;
 * presumably the message is rejected).
 */
82 if (ntbl > nowned || nwch > nwatched || ncol > ncolumn)
/*
 * NOTE(review): this loop is bounded by nowned (the caller's capacity),
 * not by the decoded ntbl - confirm this is intended.
 * ncol is reused below for the per-table column count.
 */
87 for (i = 0, t = owned; i < nowned; i++, t++) {
88 if (mrp_msg_iterate_get(msg, &it,
89 MRP_PEPMSG_STRING(TBLNAME, &name),
90 MRP_PEPMSG_UINT16(NCOLUMN, &ncol),
91 MRP_PEPMSG_SINT16(TBLIDX , &idx_col),
101 for (j = 0; j < t->ncolumn; j++, c++, n++) {
105 if (mrp_msg_iterate_get(msg, &it,
106 MRP_PEPMSG_STRING(COLNAME, &name),
107 MRP_PEPMSG_UINT16(COLTYPE, &type),
/* A type above mqi_blob denotes a varchar; the excess is its length. */
111 if (type > mqi_blob) {
112 c->type = mqi_varchar;
113 c->length = type - mqi_blob;
/*
 * NOTE(review): as above, bounded by nwatched rather than the decoded
 * nwch - confirm this is intended. Watched tables carry no TBLIDX field.
 */
121 for (i = 0, t = watched; i < nwatched; i++, t++) {
122 if (mrp_msg_iterate_get(msg, &it,
123 MRP_PEPMSG_STRING(TBLNAME, &name),
124 MRP_PEPMSG_UINT16(NCOLUMN, &ncol),
134 for (j = 0; j < t->ncolumn; j++, c++, n++) {
137 if (mrp_msg_iterate_get(msg, &it,
138 MRP_PEPMSG_STRING(COLNAME, &name),
139 MRP_PEPMSG_UINT16(COLTYPE, &type),
/* Same varchar decoding as used for owned tables. */
143 if (type > mqi_blob) {
144 c->type = mqi_varchar;
145 c->length = type - mqi_blob;
/*
 * Create an ACK message carrying the given sequence number.
 * Returns whatever mrp_msg_create() returns.
 */
161 mrp_msg_t *create_ack_message(uint32_t seq)
163 return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_ACK),
164 MRP_PEPMSG_UINT32(MSGSEQ , seq),
/*
 * Create a NAK message carrying the given sequence number, an error code
 * and an error message string.
 * Returns whatever mrp_msg_create() returns.
 */
169 mrp_msg_t *create_nak_message(uint32_t seq, int error, const char *errmsg)
171 return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NAK),
172 MRP_PEPMSG_UINT32(MSGSEQ , seq),
173 MRP_PEPMSG_SINT32(ERRCODE, error),
174 MRP_PEPMSG_STRING(ERRMSG , errmsg),
/*
 * Create an empty NOTIFY message: sequence number 0 and both the NCHANGE
 * and NTOTAL counters set to 0.
 * Returns whatever mrp_msg_create() returns.
 */
179 mrp_msg_t *create_notify_message(void)
181 return mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_NOTIFY),
182 MRP_PEPMSG_UINT32(MSGSEQ , 0),
183 MRP_PEPMSG_UINT16(NCHANGE, 0),
184 MRP_PEPMSG_UINT16(NTOTAL , 0),
/*
 * Append the data of one table to a NOTIFY message.
 *
 * msg     - message to extend
 * id      - table identifier (how tid is derived from it is not visible here)
 * columns - column definitions for the table, ncolumn entries
 * data    - row values, laid out as nrow consecutive groups of ncolumn values
 * nrow    - number of rows (how nr is derived from it is not visible here)
 *
 * Return values are not visible in this excerpt.
 */
189 int update_notify_message(mrp_msg_t *msg, int id, mqi_column_def_t *columns,
190 int ncolumn, mrp_pep_value_t *data, int nrow)
/* Table header: table id followed by row count. */
199 if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
200 !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nr )))
/* Encode row by row; v advances by one full row (ncolumn values) each time. */
203 for (i = 0, v = data; i < nrow; i++, v += ncolumn) {
204 if (!append_one_row(msg, MRP_PEPTAG_DATA, columns, ncolumn, v))
/*
 * Decode the row data of one table from a NOTIFY message.
 *
 * msg  - message being decoded
 * it   - message iterator state, passed through to mrp_msg_iterate_get()
 * data - describes the table; data->nrow x data->ncolumn values are read
 *
 * Each value is fetched as a DATA field into the str, s32, u32 or dbl
 * member of the current value. The selection between these variants is not
 * visible in this excerpt (presumably it is driven by the column type).
 */
212 int decode_notify_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data)
221 for (i = 0; i < data->nrow; i++) {
222 for (j = 0; j < data->ncolumn; j++) {
/* String value. */
225 if (!mrp_msg_iterate_get(msg, it,
226 MRP_PEPMSG_STRING(DATA, &v->str),
/* Signed 32-bit integer value. */
232 if (!mrp_msg_iterate_get(msg, it,
233 MRP_PEPMSG_SINT32(DATA, &v->s32),
/* Unsigned 32-bit integer value. */
239 if (!mrp_msg_iterate_get(msg, it,
240 MRP_PEPMSG_UINT32(DATA, &v->u32),
/* Floating point value. */
246 if (!mrp_msg_iterate_get(msg, it,
247 MRP_PEPMSG_DOUBLE(DATA, &v->dbl),
/*
 * Build a SET message from an array of table data.
 *
 * seq   - sequence number placed in the message
 * data  - array of ndata table data descriptors
 * ndata - number of entries in data
 *
 * The message is created with NTOTAL set to 0; after the tables have been
 * appended NTOTAL is overwritten with utotal via mrp_msg_set().
 * The return statement and error paths are not visible in this excerpt.
 */
264 mrp_msg_t *create_set_message(uint32_t seq, mrp_pep_data_t *data, int ndata)
267 mrp_pep_value_t *vals;
268 mqi_column_def_t *defs;
269 uint16_t utable, utotal, tid, nval, nrow;
/*
 * NOTE(review): the initialization of utable (and utotal) is not visible
 * in this excerpt - confirm they are set before use.
 */
275 msg = mrp_msg_create(MRP_PEPMSG_UINT16(MSGTYPE, MRP_PEPMSG_SET),
276 MRP_PEPMSG_UINT32(MSGSEQ , seq),
277 MRP_PEPMSG_UINT16(NCHANGE, utable),
278 MRP_PEPMSG_UINT16(NTOTAL , 0),
/* One group per table: table id, row count, then the rows themselves. */
282 for (i = 0; i < ndata; i++) {
284 vals = data[i].columns;
285 defs = data[i].coldefs;
286 nval = data[i].ncolumn;
289 if (!mrp_msg_append(msg, MRP_PEPMSG_UINT16(TBLID, tid)) ||
290 !mrp_msg_append(msg, MRP_PEPMSG_UINT16(NROW , nrow)))
/*
 * NOTE(review): vals is passed for every row; its advancement between
 * rows is not visible in this excerpt - confirm it moves by nval per row.
 */
293 for (j = 0; j < nrow; j++) {
294 if (!append_one_row(msg, MRP_PEPTAG_DATA, defs, nval, vals))
/* Patch the NTOTAL placeholder written at creation time. */
301 mrp_msg_set(msg, MRP_PEPMSG_UINT16(NTOTAL, utotal));
/*
 * Decode the row data of one table from a SET message.
 *
 * msg  - message being decoded
 * it   - message iterator state, passed through to mrp_msg_iterate_get()
 * data - describes the table; data->nrow x data->ncolumn values are read
 *
 * The visible structure mirrors decode_notify_message(): each value is
 * fetched as a DATA field into the str, s32, u32 or dbl member of the
 * current value. The selection between these variants is not visible in
 * this excerpt (presumably it is driven by the column type).
 */
312 int decode_set_message(mrp_msg_t *msg, void **it, mrp_pep_data_t *data)
321 for (i = 0; i < data->nrow; i++) {
322 for (j = 0; j < data->ncolumn; j++) {
/* String value. */
325 if (!mrp_msg_iterate_get(msg, it,
326 MRP_PEPMSG_STRING(DATA, &v->str),
/* Signed 32-bit integer value. */
332 if (!mrp_msg_iterate_get(msg, it,
333 MRP_PEPMSG_SINT32(DATA, &v->s32),
/* Unsigned 32-bit integer value. */
339 if (!mrp_msg_iterate_get(msg, it,
340 MRP_PEPMSG_UINT32(DATA, &v->u32),
/* Floating point value. */
346 if (!mrp_msg_iterate_get(msg, it,
347 MRP_PEPMSG_DOUBLE(DATA, &v->dbl),
/*
 * Append one row of column values to a message.
 *
 * msg     - message to extend
 * tag     - field tag used for every appended value
 * col     - column definitions, ncolumn entries, walked in step with data
 * ncolumn - number of columns in the row
 * data    - the row's values, one per column
 *
 * The HANDLE_TYPE helper macro appends data->member as a field of the given
 * message type; its remaining lines and the function's return values are
 * not visible in this excerpt.
 */
364 static int append_one_row(mrp_msg_t *msg, uint16_t tag, mqi_column_def_t *col,
365 int ncolumn, mrp_pep_value_t *data)
367 #define HANDLE_TYPE(dbtype, type, member) \
369 if (!mrp_msg_append(msg, MRP_MSG_TAG_##type(tag, data->member))) \
375 for (i = 0; i < ncolumn; i++, data++, col++) {
/* One handler per supported column type: integer, unsigned, floating, string. */
377 HANDLE_TYPE(integer , SINT32, s32);
378 HANDLE_TYPE(unsignd , UINT32, u32);
379 HANDLE_TYPE(floating, DOUBLE, dbl);
380 HANDLE_TYPE(string , STRING, str);