FASTAVRO(1)                        fastavro                       FASTAVRO(1)

fastavro - fastavro Documentation
7
The current Python avro package is dog slow.

On a test case of about 10K records, it takes about 14sec to iterate over
all of them. In comparison the JAVA avro SDK does it in about 1.9sec.

fastavro is an alternative implementation that is much faster. It iterates
over the same 10K records in 2.9sec, and if you use it with PyPy it'll do it
in 1.5sec (to be fair, the JAVA benchmark is doing some extra JSON
encoding/decoding).

If the optional C extension (generated by Cython) is available, then
fastavro will be even faster. For the same 10K records it'll run in about
1.7sec.
22
Supported Features
   • File Writer

   • File Reader (iterating via records or blocks)

   • Schemaless Writer

   • Schemaless Reader

   • JSON Writer

   • JSON Reader

   • Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)

   • Schema resolution

   • Aliases

   • Logical Types

   • Parsing schemas into the canonical form

   • Schema fingerprinting

Missing Features
   • Anything involving Avro's RPC features
50
Example Usage

   from fastavro import writer, reader, parse_schema

   schema = {
       'doc': 'A weather reading.',
       'name': 'Weather',
       'namespace': 'test',
       'type': 'record',
       'fields': [
           {'name': 'station', 'type': 'string'},
           {'name': 'time', 'type': 'long'},
           {'name': 'temp', 'type': 'int'},
       ],
   }
   parsed_schema = parse_schema(schema)

   # 'records' can be an iterable (including generator)
   records = [
       {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
       {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
       {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
       {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
   ]

   # Writing
   with open('weather.avro', 'wb') as out:
       writer(out, parsed_schema, records)

   # Reading
   with open('weather.avro', 'rb') as fo:
       for record in reader(fo):
           print(record)
83
fastavro.read

class reader(fo: Union[IO, AvroJSONDecoder],
             reader_schema: Optional[Union[str, List, Dict]] = None,
             return_record_name: bool = False,
             return_record_name_override: bool = False,
             handle_unicode_errors: str = 'strict',
             return_named_type: bool = False,
             return_named_type_override: bool = False)

   Iterator over records in an avro file.

   Parameters

      • fo -- File-like object to read from

      • reader_schema -- Reader schema

      • return_record_name -- If true, when reading a union of records,
        the result will be a tuple where the first value is the name of
        the record and the second value is the record itself

      • return_record_name_override -- If true, this will modify the
        behavior of return_record_name so that the record name is only
        returned for unions where there is more than one record. For
        unions that only have one record, the record is returned by
        itself, not a tuple with the name.

      • return_named_type -- If true, when reading a union of named
        types, the result will be a tuple where the first value is the
        name of the type and the second value is the record itself.
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override

      • return_named_type_override -- If true, this will modify the
        behavior of return_named_type so that the named type is only
        returned for unions where there is more than one named type. For
        unions that only have one named type, the named type is returned
        by itself, not a tuple with the name

      • handle_unicode_errors -- Default 'strict'. Should be set to a
        valid string that can be used in the errors argument of the
        string decode() function. Examples include 'replace' and 'ignore'

   Example:

      from fastavro import reader
      with open('some-file.avro', 'rb') as fo:
          avro_reader = reader(fo)
          for record in avro_reader:
              process_record(record)

   The fo argument is a file-like object, so another common example usage
   would use an io.BytesIO object like so:

      from io import BytesIO
      from fastavro import writer, reader

      fo = BytesIO()
      writer(fo, schema, records)
      fo.seek(0)
      for record in reader(fo):
          process_record(record)

   metadata
      Key-value pairs in the header metadata

   codec
      The codec used when writing

   writer_schema
      The schema used when writing

   reader_schema
      The schema used when reading (if provided)
159
class block_reader(fo: IO,
                   reader_schema: Optional[Union[str, List, Dict]] = None,
                   return_record_name: bool = False,
                   return_record_name_override: bool = False,
                   handle_unicode_errors: str = 'strict',
                   return_named_type: bool = False,
                   return_named_type_override: bool = False)

   Iterator over Blocks in an avro file.

   Parameters

      • fo -- Input stream

      • reader_schema -- Reader schema

      • return_record_name -- If true, when reading a union of records,
        the result will be a tuple where the first value is the name of
        the record and the second value is the record itself

      • return_record_name_override -- If true, this will modify the
        behavior of return_record_name so that the record name is only
        returned for unions where there is more than one record. For
        unions that only have one record, the record is returned by
        itself, not a tuple with the name.

      • return_named_type -- If true, when reading a union of named
        types, the result will be a tuple where the first value is the
        name of the type and the second value is the record itself.
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override

      • return_named_type_override -- If true, this will modify the
        behavior of return_named_type so that the named type is only
        returned for unions where there is more than one named type. For
        unions that only have one named type, the named type is returned
        by itself, not a tuple with the name

      • handle_unicode_errors -- Default 'strict'. Should be set to a
        valid string that can be used in the errors argument of the
        string decode() function. Examples include 'replace' and 'ignore'

   Example:

      from fastavro import block_reader
      with open('some-file.avro', 'rb') as fo:
          avro_reader = block_reader(fo)
          for block in avro_reader:
              process_block(block)

   metadata
      Key-value pairs in the header metadata

   codec
      The codec used when writing

   writer_schema
      The schema used when writing

   reader_schema
      The schema used when reading (if provided)

class Block(bytes_, num_records, codec, reader_schema, writer_schema,
            named_schemas, offset, size, options)

   An avro block. Will yield records when iterated over.

   num_records
      Number of records in the block

   writer_schema
      The schema used when writing

   reader_schema
      The schema used when reading (if provided)

   offset
      Offset of the block from the beginning of the avro file

   size
      Size of the block in bytes
238
schemaless_reader(fo: IO, writer_schema: Union[str, List, Dict],
                  reader_schema: Optional[Union[str, List, Dict]] = None,
                  return_record_name: bool = False,
                  return_record_name_override: bool = False,
                  handle_unicode_errors: str = 'strict',
                  return_named_type: bool = False,
                  return_named_type_override: bool = False)
                  -> Union[None, str, float, int, Decimal, bool, bytes, List, Dict]

   Reads a single record written using the schemaless_writer()

   Parameters

      • fo -- Input stream

      • writer_schema -- Schema used when calling schemaless_writer

      • reader_schema -- If the schema has changed since being written
        then the new schema can be given to allow for schema migration

      • return_record_name -- If true, when reading a union of records,
        the result will be a tuple where the first value is the name of
        the record and the second value is the record itself

      • return_record_name_override -- If true, this will modify the
        behavior of return_record_name so that the record name is only
        returned for unions where there is more than one record. For
        unions that only have one record, the record is returned by
        itself, not a tuple with the name.

      • return_named_type -- If true, when reading a union of named
        types, the result will be a tuple where the first value is the
        name of the type and the second value is the record itself.
        NOTE: Using this option will ignore return_record_name and
        return_record_name_override

      • return_named_type_override -- If true, this will modify the
        behavior of return_named_type so that the named type is only
        returned for unions where there is more than one named type. For
        unions that only have one named type, the named type is returned
        by itself, not a tuple with the name

      • handle_unicode_errors -- Default 'strict'. Should be set to a
        valid string that can be used in the errors argument of the
        string decode() function. Examples include 'replace' and 'ignore'

   Example:

      parsed_schema = fastavro.parse_schema(schema)
      with open('file', 'rb') as fp:
          record = fastavro.schemaless_reader(fp, parsed_schema)

   Note: The schemaless_reader can only read a single record.
295
is_avro(path_or_buffer: Union[str, IO]) -> bool

   Return True if path (or buffer) points to an Avro file. This will only
   work for avro files that contain the normal avro schema header like
   those created from writer(). This function is not intended to be used
   with binary data created from schemaless_writer() since that does not
   include the avro header.

   Parameters
      path_or_buffer -- Path to file (or file-like buffer)
305
fastavro.write

writer(fo: Union[IO, AvroJSONEncoder], schema: Union[str, List, Dict],
       records: Iterable[Any], codec: str = 'null',
       sync_interval: int = 16000,
       metadata: Optional[Dict[str, str]] = None,
       validator: bool = False, sync_marker: bytes = b'',
       codec_compression_level: Optional[int] = None, *,
       strict: bool = False, strict_allow_default: bool = False,
       disable_tuple_notation: bool = False)

   Write records to fo (stream) according to schema

   Parameters

      • fo -- Output stream

      • schema -- Writer schema

      • records -- Records to write. This is commonly a list of the
        dictionary representation of the records, but it can be any
        iterable

      • codec -- Compression codec, can be 'null', 'deflate' or 'snappy'
        (if installed)

      • sync_interval -- Size of sync interval

      • metadata -- Header metadata

      • validator -- If true, validation will be done on the records

      • sync_marker -- A byte string used as the avro sync marker. If
        not provided, a random byte string will be used.

      • codec_compression_level -- Compression level to use with the
        specified codec (if the codec supports it)

      • strict -- If set to True, an error will be raised if records do
        not contain exactly the same fields that the schema states

      • strict_allow_default -- If set to True, an error will be raised
        if records do not contain exactly the same fields that the
        schema states unless it is a missing field that has a default
        value in the schema

      • disable_tuple_notation -- If set to True, tuples will not be
        treated as a special case. Therefore, using a tuple to indicate
        the type of a record will not work

   Example:

      from fastavro import writer, parse_schema

      schema = {
          'doc': 'A weather reading.',
          'name': 'Weather',
          'namespace': 'test',
          'type': 'record',
          'fields': [
              {'name': 'station', 'type': 'string'},
              {'name': 'time', 'type': 'long'},
              {'name': 'temp', 'type': 'int'},
          ],
      }
      parsed_schema = parse_schema(schema)

      records = [
          {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
          {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
          {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
          {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
      ]

      with open('weather.avro', 'wb') as out:
          writer(out, parsed_schema, records)

   The fo argument is a file-like object, so another common example usage
   would use an io.BytesIO object like so:

      from io import BytesIO
      from fastavro import writer

      fo = BytesIO()
      writer(fo, schema, records)

   Given an existing avro file, it's possible to append to it by
   re-opening the file in a+b mode. If the file is only opened in ab
   mode, we aren't able to read some of the existing header information
   and an error will be raised. For example:

      # Write initial records
      with open('weather.avro', 'wb') as out:
          writer(out, parsed_schema, records)

      # Write some more records
      with open('weather.avro', 'a+b') as out:
          writer(out, None, more_records)

   Note: When appending, any schema provided will be ignored since the
   schema in the avro file will be re-used. Therefore it is convenient to
   just use None as the schema.
407
schemaless_writer(fo: IO, schema: Union[str, List, Dict], record: Any, *,
                  strict: bool = False, strict_allow_default: bool = False,
                  disable_tuple_notation: bool = False)

   Write a single record without the schema or header information

   Parameters

      • fo -- Output file

      • schema -- Schema

      • record -- Record to write

      • strict -- If set to True, an error will be raised if records do
        not contain exactly the same fields that the schema states

      • strict_allow_default -- If set to True, an error will be raised
        if records do not contain exactly the same fields that the
        schema states unless it is a missing field that has a default
        value in the schema

      • disable_tuple_notation -- If set to True, tuples will not be
        treated as a special case. Therefore, using a tuple to indicate
        the type of a record will not work

   Example:

      parsed_schema = fastavro.parse_schema(schema)
      with open('file', 'wb') as fp:
          fastavro.schemaless_writer(fp, parsed_schema, record)

   Note: The schemaless_writer can only write a single record.
441
Using the tuple notation to specify which branch of a union to take
   Since this library uses plain dictionaries to represent a record, it is
   possible for that dictionary to fit the definition of two different
   records.

   For example, given a dictionary like this:

      {"name": "My Name"}

   It would be valid against both of these records:

      child_schema = {
          "name": "Child",
          "type": "record",
          "fields": [
              {"name": "name", "type": "string"},
              {"name": "favorite_color", "type": ["null", "string"]},
          ]
      }

      pet_schema = {
          "name": "Pet",
          "type": "record",
          "fields": [
              {"name": "name", "type": "string"},
              {"name": "favorite_toy", "type": ["null", "string"]},
          ]
      }

   This becomes a problem when a schema contains a union of these two
   similar records as it is not clear which record the dictionary
   represents. For example, if you used the previous dictionary with the
   following schema, it wouldn't be clear if the record should be
   serialized as a Child or a Pet:

      household_schema = {
          "name": "Household",
          "type": "record",
          "fields": [
              {"name": "address", "type": "string"},
              {
                  "name": "family_members",
                  "type": {
                      "type": "array", "items": [
                          {
                              "name": "Child",
                              "type": "record",
                              "fields": [
                                  {"name": "name", "type": "string"},
                                  {"name": "favorite_color", "type": ["null", "string"]},
                              ]
                          }, {
                              "name": "Pet",
                              "type": "record",
                              "fields": [
                                  {"name": "name", "type": "string"},
                                  {"name": "favorite_toy", "type": ["null", "string"]},
                              ]
                          }
                      ]
                  }
              },
          ]
      }

   To resolve this, you can use a tuple notation where the first value of
   the tuple is the fully namespaced record name and the second value is
   the dictionary. For example:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  ("Child", {"name": "Son"}),
                  ("Child", {"name": "Daughter"}),
                  ("Pet", {"name": "Dog"}),
              ]
          }
      ]
521
Using the record hint to specify which branch of a union to take
   In addition to the tuple notation for specifying the name of a record,
   you can also include a special -type attribute (note that this
   attribute is -type, not type) on a record to do the same thing. So the
   example above which looked like this:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  ("Child", {"name": "Son"}),
                  ("Child", {"name": "Daughter"}),
                  ("Pet", {"name": "Dog"}),
              ]
          }
      ]

   Would now look like this:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  {"-type": "Child", "name": "Son"},
                  {"-type": "Child", "name": "Daughter"},
                  {"-type": "Pet", "name": "Dog"},
              ]
          }
      ]

   Unlike the tuple notation, which can be used with any avro type in a
   union, this -type hint can only be used with records. However, this can
   be useful if you want to make a single record dictionary that can be
   used both in and out of unions.
556
fastavro.json_read

json_reader(fo: IO, schema: Union[str, List, Dict],
            reader_schema: Optional[Union[str, List, Dict]] = None, *,
            decoder=AvroJSONDecoder) -> reader

   Iterator over records in an avro json file.

   Parameters

      • fo -- File-like object to read from

      • schema -- Original schema used when writing the JSON data

      • reader_schema -- If the schema has changed since being written
        then the new schema can be given to allow for schema migration

      • decoder -- By default the standard AvroJSONDecoder will be used,
        but a custom one could be passed here

   Example:

      from fastavro import json_reader

      schema = {
          'doc': 'A weather reading.',
          'name': 'Weather',
          'namespace': 'test',
          'type': 'record',
          'fields': [
              {'name': 'station', 'type': 'string'},
              {'name': 'time', 'type': 'long'},
              {'name': 'temp', 'type': 'int'},
          ]
      }

      with open('some-file', 'r') as fo:
          avro_reader = json_reader(fo, schema)
          for record in avro_reader:
              print(record)
598
fastavro.json_write

json_writer(fo: IO, schema: Union[str, List, Dict],
            records: Iterable[Any], *, write_union_type: bool = True,
            validator: bool = False, encoder=AvroJSONEncoder,
            strict: bool = False, strict_allow_default: bool = False,
            disable_tuple_notation: bool = False) -> None

   Write records to fo (stream) according to schema

   Parameters

      • fo -- File-like object to write to

      • schema -- Writer schema

      • records -- Records to write. This is commonly a list of the
        dictionary representation of the records, but it can be any
        iterable

      • write_union_type -- Determine whether to write the union type in
        the json message. If this is set to False, the output will be
        plain json. It may, however, not be decodable back to an avro
        record by json_reader.

      • validator -- If true, validation will be done on the records

      • encoder -- By default the standard AvroJSONEncoder will be used,
        but a custom one could be passed here

      • strict -- If set to True, an error will be raised if records do
        not contain exactly the same fields that the schema states

      • strict_allow_default -- If set to True, an error will be raised
        if records do not contain exactly the same fields that the
        schema states unless it is a missing field that has a default
        value in the schema

      • disable_tuple_notation -- If set to True, tuples will not be
        treated as a special case. Therefore, using a tuple to indicate
        the type of a record will not work

   Example:

      from fastavro import json_writer, parse_schema

      schema = {
          'doc': 'A weather reading.',
          'name': 'Weather',
          'namespace': 'test',
          'type': 'record',
          'fields': [
              {'name': 'station', 'type': 'string'},
              {'name': 'time', 'type': 'long'},
              {'name': 'temp', 'type': 'int'},
          ],
      }
      parsed_schema = parse_schema(schema)

      records = [
          {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
          {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
          {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
          {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
      ]

      with open('some-file', 'w') as out:
          json_writer(out, parsed_schema, records)
668
fastavro.schema

parse_schema(schema: Union[str, List, Dict],
             named_schemas: Optional[Dict[str, Dict]] = None, *,
             expand: bool = False, _write_hint: bool = True,
             _force: bool = False, _ignore_default_error: bool = False)
             -> Union[str, List, Dict]

   Returns a parsed avro schema

   It is not necessary to call parse_schema, but doing so and saving the
   parsed schema for later use will make future operations faster as the
   schema will not need to be reparsed.

   Parameters

      • schema -- Input schema

      • named_schemas -- Dictionary of named schemas to their schema
        definition

      • expand -- If true, named schemas will be fully expanded to their
        true schemas rather than being represented as just the name.
        This format should be considered output only and not passed in
        to other reader/writer functions as it does not conform to the
        avro specification and will likely cause an exception

      • _write_hint -- Internal API argument specifying whether or not
        the __fastavro_parsed marker should be added to the schema

      • _force -- Internal API argument. If True, the schema will always
        be parsed even if it has been parsed and has the
        __fastavro_parsed marker

      • _ignore_default_error -- Internal API argument. If True, when a
        union has the wrong default value, an error will not be raised.

   Example:

      from fastavro import parse_schema
      from fastavro import writer

      parsed_schema = parse_schema(original_schema)
      with open('weather.avro', 'wb') as out:
          writer(out, parsed_schema, records)

   Sometimes you might have two schemas where one schema references
   another. For the sake of example, let's assume you have a Parent
   schema that references a Child schema. If you were to try to parse the
   parent schema on its own, you would get an exception because the child
   schema isn't defined. To accommodate this, we can use the
   named_schemas argument to pass a shared dictionary when parsing both
   of the schemas. The dictionary will get populated with the necessary
   schema references to make parsing possible. For example:

      from fastavro import parse_schema

      named_schemas = {}
      parsed_child = parse_schema(child_schema, named_schemas)
      parsed_parent = parse_schema(parent_schema, named_schemas)
730
fullname(schema: Dict) -> str

   Returns the fullname of a schema

   Parameters
      schema -- Input schema

   Example:

      from fastavro.schema import fullname

      schema = {
          'doc': 'A weather reading.',
          'name': 'Weather',
          'namespace': 'test',
          'type': 'record',
          'fields': [
              {'name': 'station', 'type': 'string'},
              {'name': 'time', 'type': 'long'},
              {'name': 'temp', 'type': 'int'},
          ],
      }

      fname = fullname(schema)
      assert fname == "test.Weather"
755
expand_schema(schema: Union[str, List, Dict]) -> Union[str, List, Dict]

   Returns a schema where all named types are expanded to their real
   schema

   NOTE: The output of this function produces a schema that can include
   multiple definitions of the same named type (as per design) which are
   not valid per the avro specification. Therefore, the output of this
   should not be passed to the normal writer/reader functions as it will
   likely result in an error.

   Parameters
      schema (dict) -- Input schema

   Example:

      from fastavro.schema import expand_schema

      original_schema = {
          "name": "MasterSchema",
          "namespace": "com.namespace.master",
          "type": "record",
          "fields": [{
              "name": "field_1",
              "type": {
                  "name": "Dependency",
                  "namespace": "com.namespace.dependencies",
                  "type": "record",
                  "fields": [
                      {"name": "sub_field_1", "type": "string"}
                  ]
              }
          }, {
              "name": "field_2",
              "type": "com.namespace.dependencies.Dependency"
          }]
      }

      expanded_schema = expand_schema(original_schema)

      assert expanded_schema == {
          "name": "com.namespace.master.MasterSchema",
          "type": "record",
          "fields": [{
              "name": "field_1",
              "type": {
                  "name": "com.namespace.dependencies.Dependency",
                  "type": "record",
                  "fields": [
                      {"name": "sub_field_1", "type": "string"}
                  ]
              }
          }, {
              "name": "field_2",
              "type": {
                  "name": "com.namespace.dependencies.Dependency",
                  "type": "record",
                  "fields": [
                      {"name": "sub_field_1", "type": "string"}
                  ]
              }
          }]
      }
818
load_schema(schema_path: str, *,
            repo: Optional[AbstractSchemaRepository] = None,
            named_schemas: Optional[Dict[str, Dict]] = None,
            _write_hint: bool = True,
            _injected_schemas: Optional[Set[str]] = None)
            -> Union[str, List, Dict]

   Returns a schema loaded from repository.

   Will recursively load referenced schemas, attempting to load them
   from the same repository, using schema_path as the schema name.

   If repo is not provided, FlatDictRepository is used.
   FlatDictRepository will try to load schemas from the same directory,
   assuming files are named with the convention <full_name>.avsc.

   Parameters

      • schema_path -- Full schema name, or path to schema file if the
        default repo is used.

      • repo -- Schema repository instance.

      • named_schemas -- Dictionary of named schemas to their schema
        definition

      • _write_hint -- Internal API argument specifying whether or not
        the __fastavro_parsed marker should be added to the schema

      • _injected_schemas -- Internal API argument. Set of names that
        have been injected

   Consider the following example with the default FlatDictRepository...

   namespace.Parent.avsc:

      {
          "type": "record",
          "name": "Parent",
          "namespace": "namespace",
          "fields": [
              {
                  "name": "child",
                  "type": "Child"
              }
          ]
      }

   namespace.Child.avsc:

      {
          "type": "record",
          "namespace": "namespace",
          "name": "Child",
          "fields": []
      }

   Code:

      from fastavro.schema import load_schema

      parsed_schema = load_schema("namespace.Parent.avsc")
880
load_schema_ordered(ordered_schemas: List[str], *,
                    _write_hint: bool = True) -> Union[str, List, Dict]

   Returns a schema loaded from a list of schemas.

   The list of schemas should be ordered such that any dependencies are
   listed before any other schemas that use those dependencies. For
   example, if schema A depends on schema B and schema B depends on
   schema C, then the list of schemas should be [C, B, A].

   Parameters

      • ordered_schemas -- List of paths to schemas

      • _write_hint -- Internal API argument specifying whether or not
        the __fastavro_parsed marker should be added to the schema

   Consider the following example...

   Parent.avsc:

      {
          "type": "record",
          "name": "Parent",
          "namespace": "namespace",
          "fields": [
              {
                  "name": "child",
                  "type": "Child"
              }
          ]
      }

   namespace.Child.avsc:

      {
          "type": "record",
          "namespace": "namespace",
          "name": "Child",
          "fields": []
      }

   Code:

      from fastavro.schema import load_schema_ordered

      parsed_schema = load_schema_ordered(
          ["path/to/namespace.Child.avsc", "path/to/Parent.avsc"]
      )
930
to_parsing_canonical_form(schema: Union[str, List, Dict]) -> str

   Returns a string representing the parsing canonical form of the
   schema.

   For more details on the parsing canonical form, see here:
   https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas

   Parameters
      schema -- Schema to transform
940
fingerprint(parsing_canonical_form: str, algorithm: str) -> str

   Returns a string representing a fingerprint/hash of the parsing
   canonical form of a schema.

   For more details on the fingerprint, see here:
   https://avro.apache.org/docs/current/spec.html#schema_fingerprints

   Parameters

      • parsing_canonical_form -- The parsing canonical form of a schema

      • algorithm -- The hashing algorithm
954
fastavro.validation

validate(datum: Any, schema: Union[str, List, Dict], field: str = '',
         raise_errors: bool = True, strict: bool = False,
         disable_tuple_notation: bool = False) -> bool

   Determine if a python datum is an instance of a schema.

   Parameters

      • datum -- Data being validated

      • schema -- Schema

      • field -- Record field being validated

      • raise_errors -- If true, errors are raised for invalid data. If
        false, a simple True (valid) or False (invalid) result is
        returned

      • strict -- If true, fields without values will raise errors
        rather than implicitly defaulting to None

      • disable_tuple_notation -- If set to True, tuples will not be
        treated as a special case. Therefore, using a tuple to indicate
        the type of a record will not work

   Example:

      from fastavro.validation import validate
      schema = {...}
      record = {...}
      validate(record, schema)
986
validate_many(records: Iterable[Any], schema: Union[str, List, Dict],
              raise_errors: bool = True, strict: bool = False,
              disable_tuple_notation: bool = False) -> bool

   Validate a list of data!

   Parameters

      • records -- List of records to validate

      • schema -- Schema

      • raise_errors -- If true, errors are raised for invalid data. If
        false, a simple True (valid) or False (invalid) result is
        returned

      • strict -- If true, fields without values will raise errors
        rather than implicitly defaulting to None

      • disable_tuple_notation -- If set to True, tuples will not be
        treated as a special case. Therefore, using a tuple to indicate
        the type of a record will not work

   Example:

      from fastavro.validation import validate_many
      schema = {...}
      records = [{...}, {...}, ...]
      validate_many(records, schema)
1015
fastavro.utils

generate_one(schema: Union[str, List, Dict]) -> Any

   Returns a single instance of arbitrary data that conforms to the
   schema.

   Parameters
      schema -- Schema that data should conform to

   Example:

      from fastavro import schemaless_writer
      from fastavro.utils import generate_one

      schema = {
          'doc': 'A weather reading.',
          'name': 'Weather',
          'namespace': 'test',
          'type': 'record',
          'fields': [
              {'name': 'station', 'type': 'string'},
              {'name': 'time', 'type': 'long'},
              {'name': 'temp', 'type': 'int'},
          ],
      }

      with open('weather.avro', 'wb') as out:
          schemaless_writer(out, schema, generate_one(schema))
1043
1044 generate_many(schema: Union[str, List, Dict], count: int) -> Itera‐
1045 tor[Any]
1046               A generator that yields arbitrary data that conforms to the
1047               schema. It yields exactly count data structures.
1049
1050 Parameters
1051
1052 • schema -- Schema that data should conform to
1053
1054 • count -- Number of objects to generate
1055
1056 Example:
1057
1058 from fastavro import writer
1059 from fastavro.utils import generate_many
1060
1061 schema = {
1062 'doc': 'A weather reading.',
1063 'name': 'Weather',
1064 'namespace': 'test',
1065 'type': 'record',
1066 'fields': [
1067 {'name': 'station', 'type': 'string'},
1068 {'name': 'time', 'type': 'long'},
1069 {'name': 'temp', 'type': 'int'},
1070 ],
1071 }
1072
1073 with open('weather.avro', 'wb') as out:
1074 writer(out, schema, generate_many(schema, 5))
1075
1076 anonymize_schema(schema: Union[str, List, Dict]) -> Union[str, List,
1077 Dict]
1078 Returns an anonymized schema
1079
1080 Parameters
1081 schema -- Schema to anonymize
1082
1083 Example:
1084
1085 from fastavro.utils import anonymize_schema
1086
1087 anonymized_schema = anonymize_schema(original_schema)
1088
1090 Fastavro supports the following official logical types:
1091
1092 • decimal
1093
1094 • uuid
1095
1096 • date
1097
1098 • time-millis
1099
1100 • time-micros
1101
1102 • timestamp-millis
1103
1104 • timestamp-micros
1105
1106 • local-timestamp-millis
1107
1108 • local-timestamp-micros
1109
1110 Fastavro is missing support for the following official logical types:
1111
1112 • duration
1113
1114 How to specify logical types in your schemas
1115    The docs say that when you want to make something a logical type, you
1116    just need to add a logicalType key. Unfortunately, a common point of
1117    confusion is that people take a pre-existing schema like this:
1119
1120 schema = {
1121 "type": "record",
1122 "name": "root",
1123 "fields": [
1124 {
1125 "name": "id",
1126 "type": "string",
1127 },
1128 ]
1129 }
1130
1131 And then add the uuid logical type like this:
1132
1133 schema = {
1134 "type": "record",
1135 "name": "root",
1136 "fields": [
1137 {
1138 "name": "id",
1139 "type": "string",
1140 "logicalType": "uuid", # This is the wrong place to add this key
1141 },
1142 ]
1143 }
1144
1145    However, that adds the logicalType key to the field schema, which is
1146    not correct. Instead, we want to group it with the string type like so:
1147
1148 schema = {
1149 "type": "record",
1150 "name": "root",
1151 "fields": [
1152 {
1153 "name": "id",
1154 "type": {
1155 "type": "string",
1156 "logicalType": "uuid", # This is the correct place to add this key
1157 },
1158 },
1159 ]
1160 }
1161
1162 Custom Logical Types
1163 The Avro specification defines a handful of logical types that most im‐
1164 plementations support. For example, one of the defined logical types is
1165 a microsecond precision timestamp. The specification states that this
1166 value will get encoded as an avro long type.
1167
1168 For the sake of an example, let's say you want to create a new logical
1169 type for a microsecond precision timestamp that uses a string as the
1170 underlying avro type.
1171
1172 To do this, there are a few functions that need to be defined. First,
1173 we need an encoder function that will encode a datetime object as a
1174 string. The encoder function is called with two arguments: the data and
1175 the schema. So we could define this like so:
1176
1177 def encode_datetime_as_string(data, schema):
1178 return datetime.isoformat(data)
1179
1180 # or
1181
1182 def encode_datetime_as_string(data, *args):
1183 return datetime.isoformat(data)
1184
1185 Then, we need a decoder function that will transform the string back
1186 into a datetime object. The decoder function is called with three argu‐
1187 ments: the data, the writer schema, and the reader schema. So we could
1188 define this like so:
1189
1190 def decode_string_as_datetime(data, writer_schema, reader_schema):
1191 return datetime.fromisoformat(data)
1192
1193 # or
1194
1195 def decode_string_as_datetime(data, *args):
1196 return datetime.fromisoformat(data)
1197
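Before wiring these into fastavro, a stdlib-only sanity check confirms that the two helper functions invert each other (datetime.fromisoformat requires Python 3.7+):

```python
from datetime import datetime

# The encoder and decoder pair defined above
def encode_datetime_as_string(data, *args):
    return datetime.isoformat(data)

def decode_string_as_datetime(data, *args):
    return datetime.fromisoformat(data)

ts = datetime(2023, 9, 10, 12, 30, 45, 123456)
encoded = encode_datetime_as_string(ts)

# Encoding produces a string; decoding restores the original datetime
assert isinstance(encoded, str)
assert decode_string_as_datetime(encoded) == ts
```
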
1198 Finally, we need to tell fastavro to use these functions. The schema
1199 for this custom logical type will use the type string and can use what‐
1200 ever name you would like as the logicalType. In this example, let's
1201 suppose we call the logicalType datetime2. To have the library actually
1202 use the custom logical type, we use the name of <avro_type>-<logi‐
1203 cal_type>, so in this example that name would be string-datetime2 and
1204 then we add those functions like so:
1205
1206 fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
1207 fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime
1208
1209 And you are done. Now if the library comes across a schema with a logi‐
1210 cal type of datetime2 and an avro type of string, it will use the cus‐
1211 tom functions. For a complete example, see here:
1212
1213 import io
1214 from datetime import datetime
1215
1216 import fastavro
1217 from fastavro import writer, reader
1218
1219
1220 def encode_datetime_as_string(data, *args):
1221 return datetime.isoformat(data)
1222
1223 def decode_string_as_datetime(data, *args):
1224 return datetime.fromisoformat(data)
1225
1226 fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
1227 fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime
1228
1229
1230 writer_schema = fastavro.parse_schema({
1231 "type": "record",
1232 "name": "root",
1233 "fields": [
1234 {
1235 "name": "some_date",
1236 "type": [
1237 "null",
1238 {
1239 "type": "string",
1240 "logicalType": "datetime2",
1241 },
1242 ],
1243 },
1244 ]
1245 })
1246
1247 records = [
1248 {"some_date": datetime.now()}
1249 ]
1250
1251 bio = io.BytesIO()
1252
1253 writer(bio, writer_schema, records)
1254
1255 bio.seek(0)
1256
1257 for record in reader(bio):
1258 print(record)
1259
1261 A command line script is installed with the library that can be used to
1262 dump the contents of avro file(s) to the standard output.
1263
1264 Usage:
1265
1266 usage: fastavro [-h] [--schema] [--codecs] [--version] [-p] [file [file ...]]
1267
1268 iter over avro file, emit records as JSON
1269
1270 positional arguments:
1271 file file(s) to parse
1272
1273 optional arguments:
1274 -h, --help show this help message and exit
1275 --schema dump schema instead of records
1276 --codecs print supported codecs
1277 --version show program's version number and exit
1278 -p, --pretty pretty print json
1279
1280 Examples
1281 Read an avro file:
1282
1283 $ fastavro weather.avro
1284
1285 {"temp": 0, "station": "011990-99999", "time": -619524000000}
1286 {"temp": 22, "station": "011990-99999", "time": -619506000000}
1287 {"temp": -11, "station": "011990-99999", "time": -619484400000}
1288 {"temp": 111, "station": "012650-99999", "time": -655531200000}
1289 {"temp": 78, "station": "012650-99999", "time": -655509600000}
1290
1291 Show the schema:
1292
1293 $ fastavro --schema weather.avro
1294
1295 {
1296 "type": "record",
1297 "namespace": "test",
1298 "doc": "A weather reading.",
1299 "fields": [
1300 {
1301 "type": "string",
1302 "name": "station"
1303 },
1304 {
1305 "type": "long",
1306 "name": "time"
1307 },
1308 {
1309 "type": "int",
1310 "name": "temp"
1311 }
1312 ],
1313 "name": "Weather"
1314 }
1315
1317 fastavro.io.json_decoder
1318 class AvroJSONDecoder(fo: IO)
1319 Decoder for the avro JSON format.
1320
1321 NOTE: All attributes and methods on this class should be consid‐
1322 ered private.
1323
1324 Parameters
1325                  fo -- File-like object to read from
1326
1327 fastavro.io.json_encoder
1328 class AvroJSONEncoder(fo: IO, *, write_union_type: bool = True)
1329 Encoder for the avro JSON format.
1330
1331 NOTE: All attributes and methods on this class should be consid‐
1332 ered private.
1333
1334 Parameters
1335
1336                  • fo -- Output stream to write to
1337
1338 • write_union_type -- Determine whether to write the
1339 union type in the json message.
1340
1342 fastavro.repository.base
1343 class AbstractSchemaRepository
1344
1352 Miki Tebeka
1353
1355 2023, Miki Tebeka
1356
1357
1358
1359
1360 1.8.3                    Sep 10, 2023                       FASTAVRO(1)