1MONGOC_CHANGE_STREAM_T(3) libmongoc MONGOC_CHANGE_STREAM_T(3)
2
3
4
6 mongoc_change_stream_t - mongoc_change_stream_t
7
9 #include <mongoc/mongoc.h>
10
11 typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
12
13 mongoc_change_stream_t is a handle to a change stream. A collection
14 change stream can be obtained using mongoc_collection_watch().
15
16 It is recommended to use a mongoc_change_stream_t and its functions
17 instead of a raw aggregation with a $changeStream stage. For more
18 information, see the MongoDB Manual entry on Change Streams.
19
21 example-collection-watch.c
22
23 #include <mongoc/mongoc.h>
24
25 int
26 main ()
27 {
28 bson_t empty = BSON_INITIALIZER;
29 const bson_t *doc;
30 bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
31 const bson_t *err_doc;
32 bson_error_t error;
33 const char *uri_string;
34 mongoc_uri_t *uri;
35 mongoc_client_t *client;
36 mongoc_collection_t *coll;
37 mongoc_change_stream_t *stream;
38 mongoc_write_concern_t *wc = mongoc_write_concern_new ();
39 bson_t opts = BSON_INITIALIZER;
40 bool r;
41
42 mongoc_init ();
43
44 uri_string = "mongodb://"
45 "localhost:27017,localhost:27018,localhost:"
46 "27019/db?replicaSet=rs0";
47
48 uri = mongoc_uri_new_with_error (uri_string, &error);
49 if (!uri) {
50 fprintf (stderr,
51 "failed to parse URI: %s\n"
52 "error message: %s\n",
53 uri_string,
54 error.message);
55 return EXIT_FAILURE;
56 }
57
58 client = mongoc_client_new_from_uri (uri);
59 if (!client) {
60 return EXIT_FAILURE;
61 }
62
63 coll = mongoc_client_get_collection (client, "db", "coll");
64 stream = mongoc_collection_watch (coll, &empty, NULL);
65
66 mongoc_write_concern_set_wmajority (wc, 10000);
67 mongoc_write_concern_append (wc, &opts);
68 r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
69 if (!r) {
70 fprintf (stderr, "Error: %s\n", error.message);
71 return EXIT_FAILURE;
72 }
73
74 while (mongoc_change_stream_next (stream, &doc)) {
75 char *as_json = bson_as_relaxed_extended_json (doc, NULL);
76 fprintf (stderr, "Got document: %s\n", as_json);
77 bson_free (as_json);
78 }
79
80 if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
81 if (!bson_empty (err_doc)) {
82 fprintf (stderr,
83 "Server Error: %s\n",
84 bson_as_relaxed_extended_json (err_doc, NULL));
85 } else {
86 fprintf (stderr, "Client Error: %s\n", error.message);
87 }
88 return EXIT_FAILURE;
89 }
90
91 bson_destroy (to_insert);
92 mongoc_write_concern_destroy (wc);
93 bson_destroy (&opts);
94 mongoc_change_stream_destroy (stream);
95 mongoc_collection_destroy (coll);
96 mongoc_uri_destroy (uri);
97 mongoc_client_destroy (client);
98 mongoc_cleanup ();
99
100 return EXIT_SUCCESS;
101 }
102
103
104 Starting and Resuming
105 All watch functions accept several options to indicate where a change
106 stream should start returning changes from: resumeAfter, startAfter,
107 and startAtOperationTime.
108
109 All changes returned by mongoc_change_stream_next() include a resume
110 token in the _id field. MongoDB 4.2 also includes an additional resume
111 token in each "aggregate" and "getMore" command response, which points
112 to the end of that response's batch. The current token is automatically
113 cached by libmongoc. In the event of an error, libmongoc attempts to
114 recreate the change stream starting where it left off by passing the
115 cached resume token. libmongoc only attempts to resume once, but client
116 applications can access the cached resume token with
117 mongoc_change_stream_get_resume_token() and use it for their own resume
118 logic by passing it as either the resumeAfter or startAfter option.
119
120 Additionally, change streams can start returning changes at an
121 operation time by using the startAtOperationTime option. This can be the
122 timestamp returned in the operationTime field of a command reply.
123
124 resumeAfter, startAfter, and startAtOperationTime are mutually
125 exclusive options. Setting more than one will result in a server error.
126
127 The following example implements custom resuming logic, persisting the
128 resume token in a file.
129
130 example-resume.c
131
132 #include <mongoc/mongoc.h>
133
134 /* An example implementation of custom resume logic in a change stream.
135 * example-resume starts a client-wide change stream and persists the resume
136 * token in a file "resume-token.json". On restart, if "resume-token.json"
137 * exists, the change stream starts watching after the persisted resume token.
138 *
139 * This behavior allows a user to exit example-resume, and restart it later
140 * without missing any change events.
141 */
142 #include <unistd.h>
143
/* Path of the file in which the resume token is persisted between runs.
 * The pointer itself is const: the path is never reassigned. */
static const char *const RESUME_TOKEN_PATH = "resume-token.json";
145
146 static bool
147 _save_resume_token (const bson_t *doc)
148 {
149 FILE *file_stream;
150 bson_iter_t iter;
151 bson_t resume_token_doc;
152 char *as_json = NULL;
153 size_t as_json_len;
154 ssize_t r, n_written;
155 const bson_value_t *resume_token;
156
157 if (!bson_iter_init_find (&iter, doc, "_id")) {
158 fprintf (stderr, "reply does not contain operationTime.");
159 return false;
160 }
161 resume_token = bson_iter_value (&iter);
162 /* store the resume token in a document, { resumeAfter: <resume token> }
163 * which we can later append easily. */
164 file_stream = fopen (RESUME_TOKEN_PATH, "w+");
165 if (!file_stream) {
166 fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
167 return false;
168 }
169 bson_init (&resume_token_doc);
170 BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
171 as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
172 bson_destroy (&resume_token_doc);
173 n_written = 0;
174 while (n_written < as_json_len) {
175 r = fwrite ((void *) (as_json + n_written),
176 sizeof (char),
177 as_json_len - n_written,
178 file_stream);
179 if (r == -1) {
180 fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
181 bson_free (as_json);
182 fclose (file_stream);
183 return false;
184 }
185 n_written += r;
186 }
187
188 bson_free (as_json);
189 fclose (file_stream);
190 return true;
191 }
192
193 bool
194 _load_resume_token (bson_t *opts)
195 {
196 bson_error_t error;
197 bson_json_reader_t *reader;
198 bson_t doc;
199
200 /* if the file does not exist, skip. */
201 if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
202 return true;
203 }
204 reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
205 if (!reader) {
206 fprintf (stderr,
207 "failed to open %s for reading: %s\n",
208 RESUME_TOKEN_PATH,
209 error.message);
210 return false;
211 }
212
213 bson_init (&doc);
214 if (-1 == bson_json_reader_read (reader, &doc, &error)) {
215 fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
216 bson_destroy (&doc);
217 bson_json_reader_destroy (reader);
218 return false;
219 }
220
221 printf ("found cached resume token in %s, resuming change stream.\n",
222 RESUME_TOKEN_PATH);
223
224 bson_concat (opts, &doc);
225 bson_destroy (&doc);
226 bson_json_reader_destroy (reader);
227 return true;
228 }
229
230 int
231 main ()
232 {
233 int exit_code = EXIT_FAILURE;
234 const char *uri_string;
235 mongoc_uri_t *uri = NULL;
236 bson_error_t error;
237 mongoc_client_t *client = NULL;
238 bson_t pipeline = BSON_INITIALIZER;
239 bson_t opts = BSON_INITIALIZER;
240 mongoc_change_stream_t *stream = NULL;
241 const bson_t *doc;
242
243 const int max_time = 30; /* max amount of time, in seconds, that
244 mongoc_change_stream_next can block. */
245
246 mongoc_init ();
247 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
248 uri = mongoc_uri_new_with_error (uri_string, &error);
249 if (!uri) {
250 fprintf (stderr,
251 "failed to parse URI: %s\n"
252 "error message: %s\n",
253 uri_string,
254 error.message);
255 goto cleanup;
256 }
257
258 client = mongoc_client_new_from_uri (uri);
259 if (!client) {
260 goto cleanup;
261 }
262
263 if (!_load_resume_token (&opts)) {
264 goto cleanup;
265 }
266 BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
267
268 printf ("listening for changes on the client (max %d seconds).\n", max_time);
269 stream = mongoc_client_watch (client, &pipeline, &opts);
270
271 while (mongoc_change_stream_next (stream, &doc)) {
272 char *as_json;
273
274 as_json = bson_as_canonical_extended_json (doc, NULL);
275 printf ("change received: %s\n", as_json);
276 bson_free (as_json);
277 if (!_save_resume_token (doc)) {
278 goto cleanup;
279 }
280 }
281
282 exit_code = EXIT_SUCCESS;
283
284 cleanup:
285 mongoc_uri_destroy (uri);
286 bson_destroy (&pipeline);
287 bson_destroy (&opts);
288 mongoc_change_stream_destroy (stream);
289 mongoc_client_destroy (client);
290 mongoc_cleanup ();
291 return exit_code;
292 }
293
294
295 The following example shows using startAtOperationTime to synchronize a
296 change stream with another operation.
297
298 example-start-at-optime.c
299
300 /* An example of starting a change stream with startAtOperationTime. */
301 #include <mongoc/mongoc.h>
302
303 int
304 main ()
305 {
306 int exit_code = EXIT_FAILURE;
307 const char *uri_string;
308 mongoc_uri_t *uri = NULL;
309 bson_error_t error;
310 mongoc_client_t *client = NULL;
311 mongoc_collection_t *coll = NULL;
312 bson_t pipeline = BSON_INITIALIZER;
313 bson_t opts = BSON_INITIALIZER;
314 mongoc_change_stream_t *stream = NULL;
315 bson_iter_t iter;
316 const bson_t *doc;
317 bson_value_t cached_operation_time = {0};
318 int i;
319 bool r;
320
321 mongoc_init ();
322 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
323 uri = mongoc_uri_new_with_error (uri_string, &error);
324 if (!uri) {
325 fprintf (stderr,
326 "failed to parse URI: %s\n"
327 "error message: %s\n",
328 uri_string,
329 error.message);
330 goto cleanup;
331 }
332
333 client = mongoc_client_new_from_uri (uri);
334 if (!client) {
335 goto cleanup;
336 }
337
338 /* insert five documents. */
339 coll = mongoc_client_get_collection (client, "db", "coll");
340 for (i = 0; i < 5; i++) {
341 bson_t reply;
342 bson_t *insert_cmd = BCON_NEW ("insert",
343 "coll",
344 "documents",
345 "[",
346 "{",
347 "x",
348 BCON_INT64 (i),
349 "}",
350 "]");
351
352 r = mongoc_collection_write_command_with_opts (
353 coll, insert_cmd, NULL, &reply, &error);
354 bson_destroy (insert_cmd);
355 if (!r) {
356 bson_destroy (&reply);
357 fprintf (stderr, "failed to insert: %s\n", error.message);
358 goto cleanup;
359 }
360 if (i == 0) {
361 /* cache the operation time in the first reply. */
362 if (bson_iter_init_find (&iter, &reply, "operationTime")) {
363 bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
364 } else {
365 fprintf (stderr, "reply does not contain operationTime.");
366 bson_destroy (&reply);
367 goto cleanup;
368 }
369 }
370 bson_destroy (&reply);
371 }
372
373 /* start a change stream at the first returned operationTime. */
374 BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
375 stream = mongoc_collection_watch (coll, &pipeline, &opts);
376
377 /* since the change stream started at the operation time of the first
378 * insert, the five inserts are returned. */
379 printf ("listening for changes on db.coll:\n");
380 while (mongoc_change_stream_next (stream, &doc)) {
381 char *as_json;
382
383 as_json = bson_as_canonical_extended_json (doc, NULL);
384 printf ("change received: %s\n", as_json);
385 bson_free (as_json);
386 }
387
388 exit_code = EXIT_SUCCESS;
389
390 cleanup:
391 mongoc_uri_destroy (uri);
392 bson_destroy (&pipeline);
393 bson_destroy (&opts);
394 if (cached_operation_time.value_type) {
395 bson_value_destroy (&cached_operation_time);
396 }
397 mongoc_change_stream_destroy (stream);
398 mongoc_collection_destroy (coll);
399 mongoc_client_destroy (client);
400 mongoc_cleanup ();
401 return exit_code;
402 }
403
404
406 MongoDB, Inc
407
409 2017-present, MongoDB, Inc
410
411
412
413
4141.23.1 Oct 20, 2022 MONGOC_CHANGE_STREAM_T(3)