1MONGOC_CHANGE_STREAM_T(3) MongoDB C Driver MONGOC_CHANGE_STREAM_T(3)
2
3
4
6 mongoc_change_stream_t - mongoc_change_stream_t
7
9 #include <mongoc/mongoc.h>
10
11 typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
12
13 mongoc_change_stream_t is a handle to a change stream. A collection
14 change stream can be obtained using mongoc_collection_watch.
15
16 It is recommended to use a mongoc_change_stream_t and its functions
17 instead of a raw aggregation with a $changeStream stage. For more
18 information see the MongoDB Manual Entry on Change Streams.
19
21 example-collection-watch.c
22
23 #include <mongoc/mongoc.h>
24
25 int
26 main ()
27 {
28 bson_t empty = BSON_INITIALIZER;
29 const bson_t *doc;
30 bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
31 const bson_t *err_doc;
32 bson_error_t error;
33 const char *uri_string;
34 mongoc_uri_t *uri;
35 mongoc_client_t *client;
36 mongoc_collection_t *coll;
37 mongoc_change_stream_t *stream;
38 mongoc_write_concern_t *wc = mongoc_write_concern_new ();
39 bson_t opts = BSON_INITIALIZER;
40 bool r;
41
42 mongoc_init ();
43
44 uri_string = "mongodb://"
45 "localhost:27017,localhost:27018,localhost:"
46 "27019/db?replicaSet=rs0";
47
48 uri = mongoc_uri_new_with_error (uri_string, &error);
49 if (!uri) {
50 fprintf (stderr,
51 "failed to parse URI: %s\n"
52 "error message: %s\n",
53 uri_string,
54 error.message);
55 return EXIT_FAILURE;
56 }
57
58 client = mongoc_client_new_from_uri (uri);
59 if (!client) {
60 return EXIT_FAILURE;
61 }
62
63 coll = mongoc_client_get_collection (client, "db", "coll");
64 stream = mongoc_collection_watch (coll, &empty, NULL);
65
66 mongoc_write_concern_set_wmajority (wc, 10000);
67 mongoc_write_concern_append (wc, &opts);
68 r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
69 if (!r) {
70 fprintf (stderr, "Error: %s\n", error.message);
71 return EXIT_FAILURE;
72 }
73
74 while (mongoc_change_stream_next (stream, &doc)) {
75 char *as_json = bson_as_relaxed_extended_json (doc, NULL);
76 fprintf (stderr, "Got document: %s\n", as_json);
77 bson_free (as_json);
78 }
79
80 if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
81 if (!bson_empty (err_doc)) {
82 fprintf (stderr,
83 "Server Error: %s\n",
84 bson_as_relaxed_extended_json (err_doc, NULL));
85 } else {
86 fprintf (stderr, "Client Error: %s\n", error.message);
87 }
88 return EXIT_FAILURE;
89 }
90
91 bson_destroy (to_insert);
92 mongoc_write_concern_destroy (wc);
93 bson_destroy (&opts);
94 mongoc_change_stream_destroy (stream);
95 mongoc_collection_destroy (coll);
96 mongoc_uri_destroy (uri);
97 mongoc_client_destroy (client);
98 mongoc_cleanup ();
99
100 return EXIT_SUCCESS;
101 }
102
103
104 Starting and Resuming
105 All watch functions accept several options to indicate where a change
106 stream should start returning changes from: resumeAfter, startAfter,
107 and startAtOperationTime.
108
109 All changes returned by mongoc_change_stream_next include a resume
110 token in the _id field. MongoDB 4.2 also includes an additional resume
111 token in each "aggregate" and "getMore" command response, which points
112 to the end of that response's batch. The current token is automatically
113 cached by libmongoc. In the event of an error, libmongoc attempts to
114 recreate the change stream starting where it left off by passing the
115 cached resume token. libmongoc only attempts to resume once, but client
116 applications can access the cached resume token with
117 mongoc_change_stream_get_resume_token and use it for their own resume
118 logic by passing it as either the resumeAfter or startAfter option.
119
120 Additionally, change streams can start returning changes at an opera‐
121 tion time by using the startAtOperationTime field. This can be the
122 timestamp returned in the operationTime field of a command reply.
123
124 resumeAfter, startAfter, and startAtOperationTime are mutually exclu‐
125 sive options. Setting more than one will result in a server error.
126
127 The following example implements custom resuming logic, persisting the
128 resume token in a file. example-resume.c
129
130 #include <mongoc/mongoc.h>
131
132 /* An example implementation of custom resume logic in a change stream.
133 * example-resume starts a client-wide change stream and persists the resume
134 * token in a file "resume-token.json". On restart, if "resume-token.json"
135 * exists, the change stream starts watching after the persisted resume token.
136 *
137 * This behavior allows a user to exit example-resume, and restart it later
138 * without missing any change events.
139 */
140 #include <unistd.h>
141
142 static const char *RESUME_TOKEN_PATH = "resume-token.json";
143
144 static bool
145 _save_resume_token (const bson_t *doc)
146 {
147 FILE *file_stream;
148 bson_iter_t iter;
149 bson_t resume_token_doc;
150 char *as_json = NULL;
151 size_t as_json_len;
152 ssize_t r, n_written;
153 const bson_value_t *resume_token;
154
155 if (!bson_iter_init_find (&iter, doc, "_id")) {
156 fprintf (stderr, "reply does not contain operationTime.");
157 return false;
158 }
159 resume_token = bson_iter_value (&iter);
160 /* store the resume token in a document, { resumeAfter: <resume token> }
161 * which we can later append easily. */
162 file_stream = fopen (RESUME_TOKEN_PATH, "w+");
163 if (!file_stream) {
164 fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
165 return false;
166 }
167 bson_init (&resume_token_doc);
168 BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
169 as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
170 bson_destroy (&resume_token_doc);
171 n_written = 0;
172 while (n_written < as_json_len) {
173 r = fwrite ((void *) (as_json + n_written),
174 sizeof (char),
175 as_json_len - n_written,
176 file_stream);
177 if (r == -1) {
178 fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
179 bson_free (as_json);
180 fclose (file_stream);
181 return false;
182 }
183 n_written += r;
184 }
185
186 bson_free (as_json);
187 fclose (file_stream);
188 return true;
189 }
190
191 bool
192 _load_resume_token (bson_t *opts)
193 {
194 bson_error_t error;
195 bson_json_reader_t *reader;
196 bson_t doc;
197
198 /* if the file does not exist, skip. */
199 if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
200 return true;
201 }
202 reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
203 if (!reader) {
204 fprintf (stderr,
205 "failed to open %s for reading: %s\n",
206 RESUME_TOKEN_PATH,
207 error.message);
208 return false;
209 }
210
211 bson_init (&doc);
212 if (-1 == bson_json_reader_read (reader, &doc, &error)) {
213 fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
214 bson_destroy (&doc);
215 bson_json_reader_destroy (reader);
216 return false;
217 }
218
219 printf ("found cached resume token in %s, resuming change stream.\n",
220 RESUME_TOKEN_PATH);
221
222 bson_concat (opts, &doc);
223 bson_destroy (&doc);
224 bson_json_reader_destroy (reader);
225 return true;
226 }
227
228 int
229 main ()
230 {
231 int exit_code = EXIT_FAILURE;
232 const char *uri_string;
233 mongoc_uri_t *uri = NULL;
234 bson_error_t error;
235 mongoc_client_t *client = NULL;
236 bson_t pipeline = BSON_INITIALIZER;
237 bson_t opts = BSON_INITIALIZER;
238 mongoc_change_stream_t *stream = NULL;
239 const bson_t *doc;
240
241 const int max_time = 30; /* max amount of time, in seconds, that
242 mongoc_change_stream_next can block. */
243
244 mongoc_init ();
245 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
246 uri = mongoc_uri_new_with_error (uri_string, &error);
247 if (!uri) {
248 fprintf (stderr,
249 "failed to parse URI: %s\n"
250 "error message: %s\n",
251 uri_string,
252 error.message);
253 goto cleanup;
254 }
255
256 client = mongoc_client_new_from_uri (uri);
257 if (!client) {
258 goto cleanup;
259 }
260
261 if (!_load_resume_token (&opts)) {
262 goto cleanup;
263 }
264 BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
265
266 printf ("listening for changes on the client (max %d seconds).\n", max_time);
267 stream = mongoc_client_watch (client, &pipeline, &opts);
268
269 while (mongoc_change_stream_next (stream, &doc)) {
270 char *as_json;
271
272 as_json = bson_as_canonical_extended_json (doc, NULL);
273 printf ("change received: %s\n", as_json);
274 bson_free (as_json);
275 if (!_save_resume_token (doc)) {
276 goto cleanup;
277 }
278 }
279
280 exit_code = EXIT_SUCCESS;
281
282 cleanup:
283 mongoc_uri_destroy (uri);
284 bson_destroy (&pipeline);
285 bson_destroy (&opts);
286 mongoc_change_stream_destroy (stream);
287 mongoc_client_destroy (client);
288 mongoc_cleanup ();
289 return exit_code;
290 }
291
292
293 The following example shows using startAtOperationTime to synchronize a change
294 stream with another operation. example-start-at-optime.c
295
296 /* An example of starting a change stream with startAtOperationTime. */
297 #include <mongoc/mongoc.h>
298
299 int
300 main ()
301 {
302 int exit_code = EXIT_FAILURE;
303 const char *uri_string;
304 mongoc_uri_t *uri = NULL;
305 bson_error_t error;
306 mongoc_client_t *client = NULL;
307 mongoc_collection_t *coll = NULL;
308 bson_t pipeline = BSON_INITIALIZER;
309 bson_t opts = BSON_INITIALIZER;
310 mongoc_change_stream_t *stream = NULL;
311 bson_iter_t iter;
312 const bson_t *doc;
313 bson_value_t cached_operation_time = {0};
314 int i;
315 bool r;
316
317 mongoc_init ();
318 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
319 uri = mongoc_uri_new_with_error (uri_string, &error);
320 if (!uri) {
321 fprintf (stderr,
322 "failed to parse URI: %s\n"
323 "error message: %s\n",
324 uri_string,
325 error.message);
326 goto cleanup;
327 }
328
329 client = mongoc_client_new_from_uri (uri);
330 if (!client) {
331 goto cleanup;
332 }
333
334 /* insert five documents. */
335 coll = mongoc_client_get_collection (client, "db", "coll");
336 for (i = 0; i < 5; i++) {
337 bson_t reply;
338 bson_t *insert_cmd = BCON_NEW ("insert",
339 "coll",
340 "documents",
341 "[",
342 "{",
343 "x",
344 BCON_INT64 (i),
345 "}",
346 "]");
347
348 r = mongoc_collection_write_command_with_opts (
349 coll, insert_cmd, NULL, &reply, &error);
350 bson_destroy (insert_cmd);
351 if (!r) {
352 bson_destroy (&reply);
353 fprintf (stderr, "failed to insert: %s\n", error.message);
354 goto cleanup;
355 }
356 if (i == 0) {
357 /* cache the operation time in the first reply. */
358 if (bson_iter_init_find (&iter, &reply, "operationTime")) {
359 bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
360 } else {
361 fprintf (stderr, "reply does not contain operationTime.");
362 bson_destroy (&reply);
363 goto cleanup;
364 }
365 }
366 bson_destroy (&reply);
367 }
368
369 /* start a change stream at the first returned operationTime. */
370 BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
371 stream = mongoc_collection_watch (coll, &pipeline, &opts);
372
373 /* since the change stream started at the operation time of the first
374 * insert, the five inserts are returned. */
375 printf ("listening for changes on db.coll:\n");
376 while (mongoc_change_stream_next (stream, &doc)) {
377 char *as_json;
378
379 as_json = bson_as_canonical_extended_json (doc, NULL);
380 printf ("change received: %s\n", as_json);
381 bson_free (as_json);
382 }
383
384 exit_code = EXIT_SUCCESS;
385
386 cleanup:
387 mongoc_uri_destroy (uri);
388 bson_destroy (&pipeline);
389 bson_destroy (&opts);
390 if (cached_operation_time.value_type) {
391 bson_value_destroy (&cached_operation_time);
392 }
393 mongoc_change_stream_destroy (stream);
394 mongoc_collection_destroy (coll);
395 mongoc_client_destroy (client);
396 mongoc_cleanup ();
397 return exit_code;
398 }
399
401 MongoDB, Inc
402
404 2017-present, MongoDB, Inc
405
406
407
408
4091.15.2 Nov 06, 2019 MONGOC_CHANGE_STREAM_T(3)