1FASTAVRO(1) fastavro FASTAVRO(1)
2
3
4
6 fastavro - fastavro Documentation
7
8 The current Python avro package is dog slow.
9
10 On a test case of about 10K records, it takes about 14sec to iterate
11 over all of them. In comparison the JAVA avro SDK does it in about
12 1.9sec.
13
14 fastavro is an alternative implementation that is much faster. It iter‐
15 ates over the same 10K records in 2.9sec, and if you use it with PyPy
16 it'll do it in 1.5sec (to be fair, the JAVA benchmark is doing some ex‐
17 tra JSON encoding/decoding).
18
19 If the optional C extension (generated by Cython) is available, then
20 fastavro will be even faster. For the same 10K records it'll run in
21 about 1.7sec.
22
24 • File Writer
25
26 • File Reader (iterating via records or blocks)
27
28 • Schemaless Writer
29
30 • Schemaless Reader
31
32 • JSON Writer
33
34 • JSON Reader
35
36 • Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)
37
38 • Schema resolution
39
40 • Aliases
41
42 • Logical Types
43
44 • Parsing schemas into the canonical form
45
46 • Schema fingerprinting
47
49 • Anything involving Avro's RPC features
50
52 from fastavro import writer, reader, parse_schema
53
54 schema = {
55 'doc': 'A weather reading.',
56 'name': 'Weather',
57 'namespace': 'test',
58 'type': 'record',
59 'fields': [
60 {'name': 'station', 'type': 'string'},
61 {'name': 'time', 'type': 'long'},
62 {'name': 'temp', 'type': 'int'},
63 ],
64 }
65 parsed_schema = parse_schema(schema)
66
67 # 'records' can be an iterable (including generator)
68 records = [
69 {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
70 {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
71 {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
72 {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
73 ]
74
75 # Writing
76 with open('weather.avro', 'wb') as out:
77 writer(out, parsed_schema, records)
78
79 # Reading
80 with open('weather.avro', 'rb') as fo:
81 for record in reader(fo):
82 print(record)
83
86 class reader(fo: Union[IO, AvroJSONDecoder], reader_schema: Op‐
87 tional[Union[str, List, Dict]] = None, return_record_name: bool =
88 False, return_record_name_override: bool = False)
89 Iterator over records in an avro file.
90
91 Parameters
92
93 • fo -- File-like object to read from
94
95 • reader_schema -- Reader schema
96
97 • return_record_name -- If true, when reading a union of
98 records, the result will be a tuple where the first
99 value is the name of the record and the second value is
100 the record itself
101
102 • return_record_name_override -- If true, this will mod‐
103 ify the behavior of return_record_name so that the
104 record name is only returned for unions where there is
105 more than one record. For unions that only have one
106 record, this option will make it so that the record is
107 returned by itself, not a tuple with the name.
108
109 Example:
110
111 from fastavro import reader
112 with open('some-file.avro', 'rb') as fo:
113 avro_reader = reader(fo)
114 for record in avro_reader:
115 process_record(record)
116
117 The fo argument is a file-like object so another common example
118 usage would use an io.BytesIO object like so:
119
120 from io import BytesIO
121 from fastavro import writer, reader
122
123 fo = BytesIO()
124 writer(fo, schema, records)
125 fo.seek(0)
126 for record in reader(fo):
127 process_record(record)
128
129 metadata
130 Key-value pairs in the header metadata
131
132 codec The codec used when writing
133
134 writer_schema
135 The schema used when writing
136
137 reader_schema
138 The schema used when reading (if provided)
139
140 class block_reader(fo: IO, reader_schema: Optional[Union[str, List,
141 Dict]] = None, return_record_name: bool = False, re‐
142 turn_record_name_override: bool = False)
143 Iterator over Block in an avro file.
144
145 Parameters
146
147 • fo -- Input stream
148
149 • reader_schema -- Reader schema
150
151 • return_record_name -- If true, when reading a union of
152 records, the result will be a tuple where the first
153 value is the name of the record and the second value is
154 the record itself
155
156 • return_record_name_override -- If true, this will mod‐
157 ify the behavior of return_record_name so that the
158 record name is only returned for unions where there is
159 more than one record. For unions that only have one
160 record, this option will make it so that the record is
161 returned by itself, not a tuple with the name.
162
163 Example:
164
165 from fastavro import block_reader
166 with open('some-file.avro', 'rb') as fo:
167 avro_reader = block_reader(fo)
168 for block in avro_reader:
169 process_block(block)
170
171 metadata
172 Key-value pairs in the header metadata
173
174 codec The codec used when writing
175
176 writer_schema
177 The schema used when writing
178
179 reader_schema
180 The schema used when reading (if provided)
181
182 class Block(bytes_, num_records, codec, reader_schema, writer_schema,
183 named_schemas, offset, size, return_record_name=False, re‐
184 turn_record_name_override=False)
185 An avro block. Will yield records when iterated over
186
187 num_records
188 Number of records in the block
189
190 writer_schema
191 The schema used when writing
192
193 reader_schema
194 The schema used when reading (if provided)
195
196 offset Offset of the block from the beginning of the avro file
197
198 size Size of the block in bytes
199
200 schemaless_reader(fo: IO, writer_schema: Union[str, List, Dict],
201 reader_schema: Optional[Union[str, List, Dict]] = None, re‐
202 turn_record_name: bool = False, return_record_name_override: bool =
203 False) -> Union[None, str, float, int, Decimal, bool, bytes, List,
204 Dict]
205 Reads a single record written using the schemaless_writer()
206
207 Parameters
208
209 • fo -- Input stream
210
211 • writer_schema -- Schema used when calling schema‐
212 less_writer
213
214 • reader_schema -- If the schema has changed since being
215 written then the new schema can be given to allow for
216 schema migration
217
218 • return_record_name -- If true, when reading a union of
219 records, the result will be a tuple where the first
220 value is the name of the record and the second value is
221 the record itself
222
223 • return_record_name_override -- If true, this will mod‐
224 ify the behavior of return_record_name so that the
225 record name is only returned for unions where there is
226 more than one record. For unions that only have one
227 record, this option will make it so that the record is
228 returned by itself, not a tuple with the name.
229
230 Example:
231
232 parsed_schema = fastavro.parse_schema(schema)
233 with open('file', 'rb') as fp:
234 record = fastavro.schemaless_reader(fp, parsed_schema)
235
236 Note: The schemaless_reader can only read a single record.
237
238 is_avro(path_or_buffer: Union[str, IO]) -> bool
239 Return True if path (or buffer) points to an Avro file. This
240 will only work for avro files that contain the normal avro
241	      schema header like those created from writer(). This function is
242 not intended to be used with binary data created from
243 schemaless_writer() since that does not include the avro header.
244
245 Parameters
246 path_or_buffer -- Path to file
247
249 writer(fo: Union[IO, AvroJSONEncoder], schema: Union[str, List, Dict],
250 records: Iterable[Any], codec: str = 'null', sync_interval: int =
251 16000, metadata: Optional[Dict[str, str]] = None, validator: bool =
252 False, sync_marker: bytes = b'', codec_compression_level: Optional[int]
253 = None, *, strict: bool = False, strict_allow_default: bool = False,
254 disable_tuple_notation: bool = False)
255 Write records to fo (stream) according to schema
256
257 Parameters
258
259 • fo -- Output stream
260
261 • schema -- Writer schema
262
263 • records -- Records to write. This is commonly a list of
264 the dictionary representation of the records, but it
265 can be any iterable
266
267 • codec -- Compression codec, can be 'null', 'deflate' or
268 'snappy' (if installed)
269
270 • sync_interval -- Size of sync interval
271
272 • metadata -- Header metadata
273
274 • validator -- If true, validation will be done on the
275 records
276
277 • sync_marker -- A byte string used as the avro sync
278 marker. If not provided, a random byte string will be
279 used.
280
281 • codec_compression_level -- Compression level to use
282 with the specified codec (if the codec supports it)
283
284 • strict -- If set to True, an error will be raised if
285 records do not contain exactly the same fields that the
286 schema states
287
288 • strict_allow_default -- If set to True, an error will
289 be raised if records do not contain exactly the same
290 fields that the schema states unless it is a missing
291 field that has a default value in the schema
292
293 • disable_tuple_notation -- If set to True, tuples will
294 not be treated as a special case. Therefore, using a
295 tuple to indicate the type of a record will not work
296
297 Example:
298
299 from fastavro import writer, parse_schema
300
301 schema = {
302 'doc': 'A weather reading.',
303 'name': 'Weather',
304 'namespace': 'test',
305 'type': 'record',
306 'fields': [
307 {'name': 'station', 'type': 'string'},
308 {'name': 'time', 'type': 'long'},
309 {'name': 'temp', 'type': 'int'},
310 ],
311 }
312 parsed_schema = parse_schema(schema)
313
314 records = [
315 {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
316 {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
317 {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
318 {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
319 ]
320
321 with open('weather.avro', 'wb') as out:
322 writer(out, parsed_schema, records)
323
324 The fo argument is a file-like object so another common example
325 usage would use an io.BytesIO object like so:
326
327 from io import BytesIO
328 from fastavro import writer
329
330 fo = BytesIO()
331 writer(fo, schema, records)
332
333 Given an existing avro file, it's possible to append to it by
334 re-opening the file in a+b mode. If the file is only opened in
335 ab mode, we aren't able to read some of the existing header in‐
336 formation and an error will be raised. For example:
337
338 # Write initial records
339 with open('weather.avro', 'wb') as out:
340 writer(out, parsed_schema, records)
341
342 # Write some more records
343 with open('weather.avro', 'a+b') as out:
344 writer(out, None, more_records)
345
346 Note: When appending, any schema provided will be ignored since
347 the schema in the avro file will be re-used. Therefore it is
348 convenient to just use None as the schema.
349
350 schemaless_writer(fo: IO, schema: Union[str, List, Dict], record: Any,
351 *, strict: bool = False, strict_allow_default: bool = False, dis‐
352 able_tuple_notation: bool = False)
353 Write a single record without the schema or header information
354
355 Parameters
356
357 • fo -- Output file
358
359 • schema -- Schema
360
361 • record -- Record to write
362
363 • strict -- If set to True, an error will be raised if
364 records do not contain exactly the same fields that the
365 schema states
366
367 • strict_allow_default -- If set to True, an error will
368 be raised if records do not contain exactly the same
369 fields that the schema states unless it is a missing
370 field that has a default value in the schema
371
372 • disable_tuple_notation -- If set to True, tuples will
373 not be treated as a special case. Therefore, using a
374 tuple to indicate the type of a record will not work
375
376 Example:
377
378 parsed_schema = fastavro.parse_schema(schema)
379 with open('file', 'wb') as fp:
380 fastavro.schemaless_writer(fp, parsed_schema, record)
381
382 Note: The schemaless_writer can only write a single record.
383
384 Using the tuple notation to specify which branch of a union to take
385 Since this library uses plain dictionaries to represent a record, it is
386 possible for that dictionary to fit the definition of two different
387 records.
388
389 For example, given a dictionary like this:
390
391 {"name": "My Name"}
392
393 It would be valid against both of these records:
394
395 child_schema = {
396 "name": "Child",
397 "type": "record",
398 "fields": [
399 {"name": "name", "type": "string"},
400 {"name": "favorite_color", "type": ["null", "string"]},
401 ]
402 }
403
404 pet_schema = {
405 "name": "Pet",
406 "type": "record",
407 "fields": [
408 {"name": "name", "type": "string"},
409 {"name": "favorite_toy", "type": ["null", "string"]},
410 ]
411 }
412
413 This becomes a problem when a schema contains a union of these two sim‐
414 ilar records as it is not clear which record the dictionary represents.
415 For example, if you used the previous dictionary with the following
416 schema, it wouldn't be clear if the record should be serialized as a
417 Child or a Pet:
418
419 household_schema = {
420 "name": "Household",
421 "type": "record",
422 "fields": [
423 {"name": "address", "type": "string"},
424 {
425 "name": "family_members",
426 "type": {
427 "type": "array", "items": [
428 {
429 "name": "Child",
430 "type": "record",
431 "fields": [
432 {"name": "name", "type": "string"},
433 {"name": "favorite_color", "type": ["null", "string"]},
434 ]
435 }, {
436 "name": "Pet",
437 "type": "record",
438 "fields": [
439 {"name": "name", "type": "string"},
440 {"name": "favorite_toy", "type": ["null", "string"]},
441 ]
442 }
443 ]
444 }
445 },
446 ]
447 }
448
449 To resolve this, you can use a tuple notation where the first value of
450 the tuple is the fully namespaced record name and the second value is
451 the dictionary. For example:
452
453 records = [
454 {
455 "address": "123 Drive Street",
456 "family_members": [
457 ("Child", {"name": "Son"}),
458 ("Child", {"name": "Daughter"}),
459 ("Pet", {"name": "Dog"}),
460 ]
461 }
462 ]
463
464 Using the record hint to specify which branch of a union to take
465 In addition to the tuple notation for specifying the name of a record,
466 you can also include a special -type attribute (note that this attri‐
467 bute is -type, not type) on a record to do the same thing. So the exam‐
468 ple above which looked like this:
469
470 records = [
471 {
472 "address": "123 Drive Street",
473 "family_members": [
474 ("Child", {"name": "Son"}),
475 ("Child", {"name": "Daughter"}),
476 ("Pet", {"name": "Dog"}),
477 ]
478 }
479 ]
480
481 Would now look like this:
482
483 records = [
484 {
485 "address": "123 Drive Street",
486 "family_members": [
487 {"-type": "Child", "name": "Son"},
488 {"-type": "Child", "name": "Daughter"},
489 {"-type": "Pet", "name": "Dog"},
490 ]
491 }
492 ]
493
494 Unlike the tuple notation which can be used with any avro type in a
495 union, this -type hint can only be used with records. However, this can
496 be useful if you want to make a single record dictionary that can be
497 used both in and out of unions.
498
500 json_reader(fo: ~typing.IO, schema: ~typing.Union[str, ~typing.List,
501 ~typing.Dict], *, decoder=<class 'fastavro.io.json_decoder.AvroJSONDe‐
502 coder'>) -> reader
503 Iterator over records in an avro json file.
504
505 Parameters
506
507 • fo -- File-like object to read from
508
509 • schema -- Reader schema
510
511 • decoder -- By default the standard AvroJSONDecoder will
512 be used, but a custom one could be passed here
513
514 Example:
515
516 from fastavro import json_reader
517
518 schema = {
519 'doc': 'A weather reading.',
520 'name': 'Weather',
521 'namespace': 'test',
522 'type': 'record',
523 'fields': [
524 {'name': 'station', 'type': 'string'},
525 {'name': 'time', 'type': 'long'},
526 {'name': 'temp', 'type': 'int'},
527 ]
528 }
529
530 with open('some-file', 'r') as fo:
531 avro_reader = json_reader(fo, schema)
532 for record in avro_reader:
533 print(record)
534
536 json_writer(fo: ~typing.IO, schema: ~typing.Union[str, ~typing.List,
537 ~typing.Dict], records: ~typing.Iterable[~typing.Any], *,
538 write_union_type: bool = True, validator: bool = False, encoder=<class
539 'fastavro.io.json_encoder.AvroJSONEncoder'>, strict: bool = False,
540 strict_allow_default: bool = False, disable_tuple_notation: bool =
541 False) -> None
542 Write records to fo (stream) according to schema
543
544 Parameters
545
546 • fo -- File-like object to write to
547
548 • schema -- Writer schema
549
550 • records -- Records to write. This is commonly a list of
551 the dictionary representation of the records, but it
552 can be any iterable
553
554 • write_union_type -- Determine whether to write the
555 union type in the json message. If this is set to
556 False the output will be clear json. It may however
557	                 not be decodable back to avro record by json_reader.
558
559 • validator -- If true, validation will be done on the
560 records
561
562 • encoder -- By default the standard AvroJSONEncoder will
563 be used, but a custom one could be passed here
564
565 • strict -- If set to True, an error will be raised if
566 records do not contain exactly the same fields that the
567 schema states
568
569 • strict_allow_default -- If set to True, an error will
570 be raised if records do not contain exactly the same
571 fields that the schema states unless it is a missing
572 field that has a default value in the schema
573
574 • disable_tuple_notation -- If set to True, tuples will
575 not be treated as a special case. Therefore, using a
576 tuple to indicate the type of a record will not work
577
578 Example:
579
580 from fastavro import json_writer, parse_schema
581
582 schema = {
583 'doc': 'A weather reading.',
584 'name': 'Weather',
585 'namespace': 'test',
586 'type': 'record',
587 'fields': [
588 {'name': 'station', 'type': 'string'},
589 {'name': 'time', 'type': 'long'},
590 {'name': 'temp', 'type': 'int'},
591 ],
592 }
593 parsed_schema = parse_schema(schema)
594
595 records = [
596 {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
597 {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
598 {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
599 {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
600 ]
601
602 with open('some-file', 'w') as out:
603 json_writer(out, parsed_schema, records)
604
606 parse_schema(schema: Union[str, List, Dict], named_schemas: Op‐
607 tional[Dict[str, Dict]] = None, *, expand: bool = False, _write_hint:
608 bool = True, _force: bool = False) -> Union[str, List, Dict]
609 Returns a parsed avro schema
610
611 It is not necessary to call parse_schema but doing so and saving
612 the parsed schema for use later will make future operations
613 faster as the schema will not need to be reparsed.
614
615 Parameters
616
617 • schema -- Input schema
618
619 • named_schemas -- Dictionary of named schemas to their
620 schema definition
621
622 • expand -- If true, named schemas will be fully expanded
623 to their true schemas rather than being represented as
624 just the name. This format should be considered an out‐
625 put only and not passed in to other reader/writer func‐
626 tions as it does not conform to the avro specification
627 and will likely cause an exception
628
629 • _write_hint -- Internal API argument specifying whether
630 or not the __fastavro_parsed marker should be added to
631 the schema
632
633 • _force -- Internal API argument. If True, the schema
634 will always be parsed even if it has been parsed and
635 has the __fastavro_parsed marker
636
637 Example:
638
639 from fastavro import parse_schema
640 from fastavro import writer
641
642 parsed_schema = parse_schema(original_schema)
643 with open('weather.avro', 'wb') as out:
644 writer(out, parsed_schema, records)
645
646 Sometimes you might have two schemas where one schema references
647 another. For the sake of example, let's assume you have a Par‐
648	          ent schema that references a Child schema. If you were to try
649 to parse the parent schema on its own, you would get an excep‐
650 tion because the child schema isn't defined. To accommodate
651 this, we can use the named_schemas argument to pass a shared
652 dictionary when parsing both of the schemas. The dictionary will
653 get populated with the necessary schema references to make pars‐
654 ing possible. For example:
655
656 from fastavro import parse_schema
657
658 named_schemas = {}
659 parsed_child = parse_schema(child_schema, named_schemas)
660 parsed_parent = parse_schema(parent_schema, named_schemas)
661
662 fullname(schema: Dict) -> str
663 Returns the fullname of a schema
664
665 Parameters
666 schema -- Input schema
667
668 Example:
669
670 from fastavro.schema import fullname
671
672 schema = {
673 'doc': 'A weather reading.',
674 'name': 'Weather',
675 'namespace': 'test',
676 'type': 'record',
677 'fields': [
678 {'name': 'station', 'type': 'string'},
679 {'name': 'time', 'type': 'long'},
680 {'name': 'temp', 'type': 'int'},
681 ],
682 }
683
684 fname = fullname(schema)
685 assert fname == "test.Weather"
686
687 expand_schema(schema: Union[str, List, Dict]) -> Union[str, List, Dict]
688 Returns a schema where all named types are expanded to their
689 real schema
690
691 NOTE: The output of this function produces a schema that can in‐
692 clude multiple definitions of the same named type (as per de‐
693 sign) which are not valid per the avro specification. Therefore,
694 the output of this should not be passed to the normal
695 writer/reader functions as it will likely result in an error.
696
697 Parameters
698 schema (dict) -- Input schema
699
700 Example:
701
702 from fastavro.schema import expand_schema
703
704 original_schema = {
705 "name": "MasterSchema",
706 "namespace": "com.namespace.master",
707 "type": "record",
708 "fields": [{
709 "name": "field_1",
710 "type": {
711 "name": "Dependency",
712 "namespace": "com.namespace.dependencies",
713 "type": "record",
714 "fields": [
715 {"name": "sub_field_1", "type": "string"}
716 ]
717 }
718 }, {
719 "name": "field_2",
720 "type": "com.namespace.dependencies.Dependency"
721 }]
722 }
723
724 expanded_schema = expand_schema(original_schema)
725
726 assert expanded_schema == {
727 "name": "com.namespace.master.MasterSchema",
728 "type": "record",
729 "fields": [{
730 "name": "field_1",
731 "type": {
732 "name": "com.namespace.dependencies.Dependency",
733 "type": "record",
734 "fields": [
735 {"name": "sub_field_1", "type": "string"}
736 ]
737 }
738 }, {
739 "name": "field_2",
740 "type": {
741 "name": "com.namespace.dependencies.Dependency",
742 "type": "record",
743 "fields": [
744 {"name": "sub_field_1", "type": "string"}
745 ]
746 }
747 }]
748 }
749
750 load_schema(schema_path: str, *, repo: Optional[‐
751 AbstractSchemaRepository] = None, named_schemas: Optional[Dict[str,
752 Dict]] = None, _write_hint: bool = True, _injected_schemas: Set[str] =
753 None) -> Union[str, List, Dict]
754 Returns a schema loaded from repository.
755
756 Will recursively load referenced schemas attempting to load them
757 from same repository, using schema_path as schema name.
758
759 If repo is not provided, FlatDictRepository is used. FlatDic‐
760 tRepository will try to load schemas from the same directory as‐
761 suming files are named with the convention <full_name>.avsc.
762
763 Parameters
764
765 • schema_path -- Full schema name, or path to schema file
766 if default repo is used.
767
768 • repo -- Schema repository instance.
769
770 • named_schemas -- Dictionary of named schemas to their
771 schema definition
772
773 • _write_hint -- Internal API argument specifying whether
774 or not the __fastavro_parsed marker should be added to
775 the schema
776
777 • _injected_schemas -- Internal API argument. Set of
778 names that have been injected
779
780 Consider the following example with default FlatDictReposi‐
781 tory...
782
783 namespace.Parent.avsc:
784
785 {
786 "type": "record",
787 "name": "Parent",
788 "namespace": "namespace",
789 "fields": [
790 {
791 "name": "child",
792 "type": "Child"
793 }
794 ]
795 }
796
797 namespace.Child.avsc:
798
799 {
800 "type": "record",
801 "namespace": "namespace",
802 "name": "Child",
803 "fields": []
804 }
805
806 Code:
807
808 from fastavro.schema import load_schema
809
810 parsed_schema = load_schema("namespace.Parent.avsc")
811
812 load_schema_ordered(ordered_schemas: List[str], *, _write_hint: bool =
813 True) -> Union[str, List, Dict]
814 Returns a schema loaded from a list of schemas.
815
816 The list of schemas should be ordered such that any dependencies
817 are listed before any other schemas that use those dependencies.
818 For example, if schema A depends on schema B and schema B de‐
819 pends on schema C, then the list of schemas should be [C, B, A].
820
821 Parameters
822
823 • ordered_schemas -- List of paths to schemas
824
825 • _write_hint -- Internal API argument specifying whether
826 or not the __fastavro_parsed marker should be added to
827 the schema
828
829 Consider the following example...
830
831 Parent.avsc:
832
833 {
834 "type": "record",
835 "name": "Parent",
836 "namespace": "namespace",
837 "fields": [
838 {
839 "name": "child",
840 "type": "Child"
841 }
842 ]
843 }
844
845 namespace.Child.avsc:
846
847 {
848 "type": "record",
849 "namespace": "namespace",
850 "name": "Child",
851 "fields": []
852 }
853
854 Code:
855
856 from fastavro.schema import load_schema_ordered
857
858 parsed_schema = load_schema_ordered(
859 ["path/to/namespace.Child.avsc", "path/to/Parent.avsc"]
860 )
861
862 to_parsing_canonical_form(schema: Union[str, List, Dict]) -> str
863	          Returns a string representing the parsing canonical form of the
864 schema.
865
866 For more details on the parsing canonical form, see here:
867 https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas
868
869 Parameters
870 schema -- Schema to transform
871
872 fingerprint(parsing_canonical_form: str, algorithm: str) -> str
873	          Returns a string representing a fingerprint/hash of the parsing
874 canonical form of a schema.
875
876 For more details on the fingerprint, see here:
877 https://avro.apache.org/docs/current/spec.html#schema_fingerprints
878
879 Parameters
880
881 • parsing_canonical_form -- The parsing canonical form of
882 a schema
883
884 • algorithm -- The hashing algorithm
885
887 validate(datum: Any, schema: Union[str, List, Dict], field: str = '',
888 raise_errors: bool = True, strict: bool = False, disable_tuple_nota‐
889 tion: bool = False) -> bool
890 Determine if a python datum is an instance of a schema.
891
892 Parameters
893
894 • datum -- Data being validated
895
896 • schema -- Schema
897
898 • field -- Record field being validated
899
900 • raise_errors -- If true, errors are raised for invalid
901 data. If false, a simple True (valid) or False (in‐
902 valid) result is returned
903
904 • strict -- If true, fields without values will raise er‐
905 rors rather than implicitly defaulting to None
906
907 • disable_tuple_notation -- If set to True, tuples will
908 not be treated as a special case. Therefore, using a
909 tuple to indicate the type of a record will not work
910
911 Example:
912
913 from fastavro.validation import validate
914 schema = {...}
915 record = {...}
916 validate(record, schema)
917
918 validate_many(records: Iterable[Any], schema: Union[str, List, Dict],
919 raise_errors: bool = True, strict: bool = False, disable_tuple_nota‐
920 tion: bool = False) -> bool
921 Validate a list of data!
922
923 Parameters
924
925 • records -- List of records to validate
926
927 • schema -- Schema
928
929 • raise_errors -- If true, errors are raised for invalid
930 data. If false, a simple True (valid) or False (in‐
931 valid) result is returned
932
933 • strict -- If true, fields without values will raise er‐
934 rors rather than implicitly defaulting to None
935
936 • disable_tuple_notation -- If set to True, tuples will
937 not be treated as a special case. Therefore, using a
938 tuple to indicate the type of a record will not work
939
940 Example:
941
942 from fastavro.validation import validate_many
943 schema = {...}
944 records = [{...}, {...}, ...]
945 validate_many(records, schema)
946
948 generate_one(schema: Union[str, List, Dict]) -> Any
949 Returns a single instance of arbitrary data that conforms to the
950 schema.
951
952 Parameters
953 schema -- Schema that data should conform to
954
955 Example:
956
957 from fastavro import schemaless_writer
958 from fastavro.utils import generate_one
959
960 schema = {
961 'doc': 'A weather reading.',
962 'name': 'Weather',
963 'namespace': 'test',
964 'type': 'record',
965 'fields': [
966 {'name': 'station', 'type': 'string'},
967 {'name': 'time', 'type': 'long'},
968 {'name': 'temp', 'type': 'int'},
969 ],
970 }
971
972 with open('weather.avro', 'wb') as out:
973 schemaless_writer(out, schema, generate_one(schema))
974
975 generate_many(schema: Union[str, List, Dict], count: int) -> Itera‐
976 tor[Any]
977 A generator that yields arbitrary data that conforms to the
978 schema. It will yield a number of data structures equal to what
979 is given in the count
980
981 Parameters
982
983 • schema -- Schema that data should conform to
984
985 • count -- Number of objects to generate
986
987 Example:
988
989 from fastavro import writer
990 from fastavro.utils import generate_many
991
992 schema = {
993 'doc': 'A weather reading.',
994 'name': 'Weather',
995 'namespace': 'test',
996 'type': 'record',
997 'fields': [
998 {'name': 'station', 'type': 'string'},
999 {'name': 'time', 'type': 'long'},
1000 {'name': 'temp', 'type': 'int'},
1001 ],
1002 }
1003
1004 with open('weather.avro', 'wb') as out:
1005 writer(out, schema, generate_many(schema, 5))
1006
1007 anonymize_schema(schema: Union[str, List, Dict]) -> Union[str, List,
1008 Dict]
1009 Returns an anonymized schema
1010
1011 Parameters
1012 schema -- Schema to anonymize
1013
1014 Example:
1015
1016 from fastavro.utils import anonymize_schema
1017
1018 anonymized_schema = anonymize_schema(original_schema)
1019
1021 Fastavro supports the following official logical types:
1022
1023 • decimal
1024
1025 • uuid
1026
1027 • date
1028
1029 • time-millis
1030
1031 • time-micros
1032
1033 • timestamp-millis
1034
1035 • timestamp-micros
1036
1037 • local-timestamp-millis
1038
1039 • local-timestamp-micros
1040
1041 Fastavro is missing support for the following official logical types:
1042
1043 • duration
1044
1045 How to specify logical types in your schemas
1046 The docs say that when you want to make something a logical type, you
1047 just need to add a logicalType key. Unfortunately, this means that a
1048 common confusion is that people will take a pre-existing schema like
1049 this:
1050
1051 schema = {
1052 "type": "record",
1053 "name": "root",
1054 "fields": [
1055 {
1056 "name": "id",
1057 "type": "string",
1058 },
1059 ]
1060 }
1061
1062 And then add the uuid logical type like this:
1063
1064 schema = {
1065 "type": "record",
1066 "name": "root",
1067 "fields": [
1068 {
1069 "name": "id",
1070 "type": "string",
1071 "logicalType": "uuid", # This is the wrong place to add this key
1072 },
1073 ]
1074 }
1075
1076 However, that adds the logicalType key to the field schema which is not
1077 correct. Instead, we want to group it with the string like so:
1078
1079 schema = {
1080 "type": "record",
1081 "name": "root",
1082 "fields": [
1083 {
1084 "name": "id",
1085 "type": {
1086 "type": "string",
1087 "logicalType": "uuid", # This is the correct place to add this key
1088 },
1089 },
1090 ]
1091 }
1092
1093 Custom Logical Types
1094 The Avro specification defines a handful of logical types that most im‐
1095 plementations support. For example, one of the defined logical types is
1096 a microsecond precision timestamp. The specification states that this
1097 value will get encoded as an avro long type.
1098
1099 For the sake of an example, let's say you want to create a new logical
1100 type for a microsecond precision timestamp that uses a string as the
1101 underlying avro type.
1102
1103 To do this, there are a few functions that need to be defined. First,
1104 we need an encoder function that will encode a datetime object as a
1105 string. The encoder function is called with two arguments: the data and
1106 the schema. So we could define this like so:
1107
1108 def encode_datetime_as_string(data, schema):
1109 return datetime.isoformat(data)
1110
1111 # or
1112
1113 def encode_datetime_as_string(data, *args):
1114 return datetime.isoformat(data)
1115
1116 Then, we need a decoder function that will transform the string back
1117 into a datetime object. The decoder function is called with three argu‐
1118 ments: the data, the writer schema, and the reader schema. So we could
1119 define this like so:
1120
1121 def decode_string_as_datetime(data, writer_schema, reader_schema):
1122 return datetime.fromisoformat(data)
1123
1124 # or
1125
1126 def decode_string_as_datetime(data, *args):
1127 return datetime.fromisoformat(data)
1128
1129 Finally, we need to tell fastavro to use these functions. The schema
1130 for this custom logical type will use the type string and can use what‐
1131 ever name you would like as the logicalType. In this example, let's
1132 suppose we call the logicalType datetime2. To have the library actually
1133 use the custom logical type, we use the name of <avro_type>-<logi‐
1134 cal_type>, so in this example that name would be string-datetime2 and
1135 then we add those functions like so:
1136
1137 fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
1138 fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime
1139
1140 And you are done. Now if the library comes across a schema with a logi‐
1141 cal type of datetime2 and an avro type of string, it will use the cus‐
1142 tom functions. For a complete example, see here:
1143
1144 import io
1145 from datetime import datetime
1146
1147 import fastavro
1148 from fastavro import writer, reader
1149
1150
1151 def encode_datetime_as_string(data, *args):
1152 return datetime.isoformat(data)
1153
1154 def decode_string_as_datetime(data, *args):
1155 return datetime.fromisoformat(data)
1156
1157 fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
1158 fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime
1159
1160
1161 writer_schema = fastavro.parse_schema({
1162 "type": "record",
1163 "name": "root",
1164 "fields": [
1165 {
1166 "name": "some_date",
1167 "type": [
1168 "null",
1169 {
1170 "type": "string",
1171 "logicalType": "datetime2",
1172 },
1173 ],
1174 },
1175 ]
1176 })
1177
1178 records = [
1179 {"some_date": datetime.now()}
1180 ]
1181
1182 bio = io.BytesIO()
1183
1184 writer(bio, writer_schema, records)
1185
1186 bio.seek(0)
1187
1188 for record in reader(bio):
1189 print(record)
1190
1192 A command line script is installed with the library that can be used to
1193 dump the contents of avro file(s) to the standard output.
1194
1195 Usage:
1196
1197 usage: fastavro [-h] [--schema] [--codecs] [--version] [-p] [file [file ...]]
1198
1199 iter over avro file, emit records as JSON
1200
1201 positional arguments:
1202 file file(s) to parse
1203
1204 optional arguments:
1205 -h, --help show this help message and exit
1206 --schema dump schema instead of records
1207 --codecs print supported codecs
1208 --version show program's version number and exit
1209 -p, --pretty pretty print json
1210
1211 Examples
1212 Read an avro file:
1213
1214 $ fastavro weather.avro
1215
1216 {"temp": 0, "station": "011990-99999", "time": -619524000000}
1217 {"temp": 22, "station": "011990-99999", "time": -619506000000}
1218 {"temp": -11, "station": "011990-99999", "time": -619484400000}
1219 {"temp": 111, "station": "012650-99999", "time": -655531200000}
1220 {"temp": 78, "station": "012650-99999", "time": -655509600000}
1221
1222 Show the schema:
1223
1224 $ fastavro --schema weather.avro
1225
1226 {
1227 "type": "record",
1228 "namespace": "test",
1229 "doc": "A weather reading.",
1230 "fields": [
1231 {
1232 "type": "string",
1233 "name": "station"
1234 },
1235 {
1236 "type": "long",
1237 "name": "time"
1238 },
1239 {
1240 "type": "int",
1241 "name": "temp"
1242 }
1243 ],
1244 "name": "Weather"
1245 }
1246
1248 fastavro.io.json_decoder
1249 class AvroJSONDecoder(fo: IO)
1250 Decoder for the avro JSON format.
1251
1252 NOTE: All attributes and methods on this class should be consid‐
1253 ered private.
1254
1255 Parameters
              fo -- File-like object to read from
1257
1258 fastavro.io.json_encoder
1259 class AvroJSONEncoder(fo: IO, *, write_union_type: bool = True)
1260 Encoder for the avro JSON format.
1261
1262 NOTE: All attributes and methods on this class should be consid‐
1263 ered private.
1264
1265 Parameters
1266
1267 • fo -- Input stream
1268
              • write_union_type -- Determines whether to write the
                union type in the JSON message.
1271
1273 fastavro.repository.base
1274 class AbstractSchemaRepository
1275
1276 • Index
1277
1278 • Module Index
1279
1280 • Search Page
1281
1283 Miki Tebeka
1284
1286 2022, Miki Tebeka
1287
1288
1289
1290
12911.7.0 Oct 30, 2022 FASTAVRO(1)