1MONGOC_CHANGE_STREAM_T(3) libmongoc MONGOC_CHANGE_STREAM_T(3)
2
3
4
6 #include <mongoc/mongoc.h>
7
8 typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
9
10 mongoc_change_stream_t is a handle to a change stream. A collection
11 change stream can be obtained using mongoc_collection_watch().
12
13 It is recommended to use a mongoc_change_stream_t and its functions in‐
14 stead of a raw aggregation with a $changeStream stage. For more infor‐
15 mation see the MongoDB Manual Entry on Change Streams.
16
18 example-collection-watch.c
19
20 #include <mongoc/mongoc.h>
21
22 int
23 main (void)
24 {
25 bson_t empty = BSON_INITIALIZER;
26 const bson_t *doc;
27 bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
28 const bson_t *err_doc;
29 bson_error_t error;
30 const char *uri_string;
31 mongoc_uri_t *uri;
32 mongoc_client_t *client;
33 mongoc_collection_t *coll;
34 mongoc_change_stream_t *stream;
35 mongoc_write_concern_t *wc = mongoc_write_concern_new ();
36 bson_t opts = BSON_INITIALIZER;
37 bool r;
38
39 mongoc_init ();
40
41 uri_string = "mongodb://"
42 "localhost:27017,localhost:27018,localhost:"
43 "27019/db?replicaSet=rs0";
44
45 uri = mongoc_uri_new_with_error (uri_string, &error);
46 if (!uri) {
47 fprintf (stderr,
48 "failed to parse URI: %s\n"
49 "error message: %s\n",
50 uri_string,
51 error.message);
52 return EXIT_FAILURE;
53 }
54
55 client = mongoc_client_new_from_uri (uri);
56 if (!client) {
57 return EXIT_FAILURE;
58 }
59
60 coll = mongoc_client_get_collection (client, "db", "coll");
61 stream = mongoc_collection_watch (coll, &empty, NULL);
62
63 mongoc_write_concern_set_wmajority (wc, 10000);
64 mongoc_write_concern_append (wc, &opts);
65 r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
66 if (!r) {
67 fprintf (stderr, "Error: %s\n", error.message);
68 return EXIT_FAILURE;
69 }
70
71 while (mongoc_change_stream_next (stream, &doc)) {
72 char *as_json = bson_as_relaxed_extended_json (doc, NULL);
73 fprintf (stderr, "Got document: %s\n", as_json);
74 bson_free (as_json);
75 }
76
77 if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
78 if (!bson_empty (err_doc)) {
79 fprintf (stderr,
80 "Server Error: %s\n",
81 bson_as_relaxed_extended_json (err_doc, NULL));
82 } else {
83 fprintf (stderr, "Client Error: %s\n", error.message);
84 }
85 return EXIT_FAILURE;
86 }
87
88 bson_destroy (to_insert);
89 mongoc_write_concern_destroy (wc);
90 bson_destroy (&opts);
91 mongoc_change_stream_destroy (stream);
92 mongoc_collection_destroy (coll);
93 mongoc_uri_destroy (uri);
94 mongoc_client_destroy (client);
95 mongoc_cleanup ();
96
97 return EXIT_SUCCESS;
98 }
99
100
101 Starting and Resuming
102 All watch functions accept several options to indicate where a change
103 stream should start returning changes from: resumeAfter, startAfter,
104 and startAtOperationTime.
105
106 All changes returned by mongoc_change_stream_next() include a resume
107 token in the _id field. MongoDB 4.2 also includes an additional resume
108 token in each "aggregate" and "getMore" command response, which points
109 to the end of that response's batch. The current token is automatically
110 cached by libmongoc. In the event of an error, libmongoc attempts to
111 recreate the change stream starting where it left off by passing the
112 cached resume token. libmongoc only attempts to resume once, but client
113 applications can access the cached resume token with
114 mongoc_change_stream_get_resume_token() and use it for their own resume
115 logic by passing it as either the resumeAfter or startAfter option.
116
117 Additionally, change streams can start returning changes at an opera‐
118 tion time by using the startAtOperationTime field. This can be the
119 timestamp returned in the operationTime field of a command reply.
120
121 resumeAfter, startAfter, and startAtOperationTime are mutually exclu‐
122 sive options. Setting more than one will result in a server error.
123
124 The following example implements custom resuming logic, persisting the
125 resume token in a file.
126
127 example-resume.c
128
129 #include <mongoc/mongoc.h>
130
131 /* An example implementation of custom resume logic in a change stream.
132 * example-resume starts a client-wide change stream and persists the resume
133 * token in a file "resume-token.json". On restart, if "resume-token.json"
134 * exists, the change stream starts watching after the persisted resume token.
135 *
136 * This behavior allows a user to exit example-resume, and restart it later
137 * without missing any change events.
138 */
139 #include <unistd.h>
140
141 static const char *RESUME_TOKEN_PATH = "resume-token.json";
142
143 static bool
144 _save_resume_token (const bson_t *doc)
145 {
146 FILE *file_stream;
147 bson_iter_t iter;
148 bson_t resume_token_doc;
149 char *as_json = NULL;
150 size_t as_json_len;
151 ssize_t r, n_written;
152 const bson_value_t *resume_token;
153
154 if (!bson_iter_init_find (&iter, doc, "_id")) {
155 fprintf (stderr, "reply does not contain operationTime.");
156 return false;
157 }
158 resume_token = bson_iter_value (&iter);
159 /* store the resume token in a document, { resumeAfter: <resume token> }
160 * which we can later append easily. */
161 file_stream = fopen (RESUME_TOKEN_PATH, "w+");
162 if (!file_stream) {
163 fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
164 return false;
165 }
166 bson_init (&resume_token_doc);
167 BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
168 as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
169 bson_destroy (&resume_token_doc);
170 n_written = 0;
171 while (n_written < as_json_len) {
172 r = fwrite ((void *) (as_json + n_written),
173 sizeof (char),
174 as_json_len - n_written,
175 file_stream);
176 if (r == -1) {
177 fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
178 bson_free (as_json);
179 fclose (file_stream);
180 return false;
181 }
182 n_written += r;
183 }
184
185 bson_free (as_json);
186 fclose (file_stream);
187 return true;
188 }
189
190 bool
191 _load_resume_token (bson_t *opts)
192 {
193 bson_error_t error;
194 bson_json_reader_t *reader;
195 bson_t doc;
196
197 /* if the file does not exist, skip. */
198 if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
199 return true;
200 }
201 reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
202 if (!reader) {
203 fprintf (stderr,
204 "failed to open %s for reading: %s\n",
205 RESUME_TOKEN_PATH,
206 error.message);
207 return false;
208 }
209
210 bson_init (&doc);
211 if (-1 == bson_json_reader_read (reader, &doc, &error)) {
212 fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
213 bson_destroy (&doc);
214 bson_json_reader_destroy (reader);
215 return false;
216 }
217
218 printf ("found cached resume token in %s, resuming change stream.\n",
219 RESUME_TOKEN_PATH);
220
221 bson_concat (opts, &doc);
222 bson_destroy (&doc);
223 bson_json_reader_destroy (reader);
224 return true;
225 }
226
227 int
228 main (void)
229 {
230 int exit_code = EXIT_FAILURE;
231 const char *uri_string;
232 mongoc_uri_t *uri = NULL;
233 bson_error_t error;
234 mongoc_client_t *client = NULL;
235 bson_t pipeline = BSON_INITIALIZER;
236 bson_t opts = BSON_INITIALIZER;
237 mongoc_change_stream_t *stream = NULL;
238 const bson_t *doc;
239
240 const int max_time = 30; /* max amount of time, in seconds, that
241 mongoc_change_stream_next can block. */
242
243 mongoc_init ();
244 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
245 uri = mongoc_uri_new_with_error (uri_string, &error);
246 if (!uri) {
247 fprintf (stderr,
248 "failed to parse URI: %s\n"
249 "error message: %s\n",
250 uri_string,
251 error.message);
252 goto cleanup;
253 }
254
255 client = mongoc_client_new_from_uri (uri);
256 if (!client) {
257 goto cleanup;
258 }
259
260 if (!_load_resume_token (&opts)) {
261 goto cleanup;
262 }
263 BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
264
265 printf ("listening for changes on the client (max %d seconds).\n", max_time);
266 stream = mongoc_client_watch (client, &pipeline, &opts);
267
268 while (mongoc_change_stream_next (stream, &doc)) {
269 char *as_json;
270
271 as_json = bson_as_canonical_extended_json (doc, NULL);
272 printf ("change received: %s\n", as_json);
273 bson_free (as_json);
274 if (!_save_resume_token (doc)) {
275 goto cleanup;
276 }
277 }
278
279 exit_code = EXIT_SUCCESS;
280
281 cleanup:
282 mongoc_uri_destroy (uri);
283 bson_destroy (&pipeline);
284 bson_destroy (&opts);
285 mongoc_change_stream_destroy (stream);
286 mongoc_client_destroy (client);
287 mongoc_cleanup ();
288 return exit_code;
289 }
290
291
292 The following example shows using startAtOperationTime to synchronize a
293 change stream with another operation.
294
295 example-start-at-optime.c
296
297 /* An example of starting a change stream with startAtOperationTime. */
298 #include <mongoc/mongoc.h>
299
300 int
301 main (void)
302 {
303 int exit_code = EXIT_FAILURE;
304 const char *uri_string;
305 mongoc_uri_t *uri = NULL;
306 bson_error_t error;
307 mongoc_client_t *client = NULL;
308 mongoc_collection_t *coll = NULL;
309 bson_t pipeline = BSON_INITIALIZER;
310 bson_t opts = BSON_INITIALIZER;
311 mongoc_change_stream_t *stream = NULL;
312 bson_iter_t iter;
313 const bson_t *doc;
314 bson_value_t cached_operation_time = {0};
315 int i;
316 bool r;
317
318 mongoc_init ();
319 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
320 uri = mongoc_uri_new_with_error (uri_string, &error);
321 if (!uri) {
322 fprintf (stderr,
323 "failed to parse URI: %s\n"
324 "error message: %s\n",
325 uri_string,
326 error.message);
327 goto cleanup;
328 }
329
330 client = mongoc_client_new_from_uri (uri);
331 if (!client) {
332 goto cleanup;
333 }
334
335 /* insert five documents. */
336 coll = mongoc_client_get_collection (client, "db", "coll");
337 for (i = 0; i < 5; i++) {
338 bson_t reply;
339 bson_t *insert_cmd = BCON_NEW ("insert",
340 "coll",
341 "documents",
342 "[",
343 "{",
344 "x",
345 BCON_INT64 (i),
346 "}",
347 "]");
348
349 r = mongoc_collection_write_command_with_opts (
350 coll, insert_cmd, NULL, &reply, &error);
351 bson_destroy (insert_cmd);
352 if (!r) {
353 bson_destroy (&reply);
354 fprintf (stderr, "failed to insert: %s\n", error.message);
355 goto cleanup;
356 }
357 if (i == 0) {
358 /* cache the operation time in the first reply. */
359 if (bson_iter_init_find (&iter, &reply, "operationTime")) {
360 bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
361 } else {
362 fprintf (stderr, "reply does not contain operationTime.");
363 bson_destroy (&reply);
364 goto cleanup;
365 }
366 }
367 bson_destroy (&reply);
368 }
369
370 /* start a change stream at the first returned operationTime. */
371 BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
372 stream = mongoc_collection_watch (coll, &pipeline, &opts);
373
374 /* since the change stream started at the operation time of the first
375 * insert, the five inserts are returned. */
376 printf ("listening for changes on db.coll:\n");
377 while (mongoc_change_stream_next (stream, &doc)) {
378 char *as_json;
379
380 as_json = bson_as_canonical_extended_json (doc, NULL);
381 printf ("change received: %s\n", as_json);
382 bson_free (as_json);
383 }
384
385 exit_code = EXIT_SUCCESS;
386
387 cleanup:
388 mongoc_uri_destroy (uri);
389 bson_destroy (&pipeline);
390 bson_destroy (&opts);
391 if (cached_operation_time.value_type) {
392 bson_value_destroy (&cached_operation_time);
393 }
394 mongoc_change_stream_destroy (stream);
395 mongoc_collection_destroy (coll);
396 mongoc_client_destroy (client);
397 mongoc_cleanup ();
398 return exit_code;
399 }
400
401
403 MongoDB, Inc
404
406 2017-present, MongoDB, Inc
407
408
409
410
4111.25.1 Nov 08, 2023 MONGOC_CHANGE_STREAM_T(3)