FASTAVRO(1)                          fastavro                          FASTAVRO(1)


NAME
       fastavro - fastavro Documentation

       The current Python avro package is dog slow.

       On a test case of about 10K records, it takes about 14sec to iterate
       over all of them. In comparison, the Java avro SDK does it in about
       1.9sec.

       fastavro is an alternative implementation that is much faster. It
       iterates over the same 10K records in 2.9sec, and if you use it with
       PyPy it'll do it in 1.5sec (to be fair, the Java benchmark is doing
       some extra JSON encoding/decoding).

       If the optional C extension (generated by Cython) is available, then
       fastavro will be even faster. For the same 10K records it'll run in
       about 1.7sec.

SUPPORTED FEATURES
       • File Writer

       • File Reader (iterating via records or blocks)

       • Schemaless Writer

       • Schemaless Reader

       • JSON Writer

       • JSON Reader

       • Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)

       • Schema resolution

       • Aliases

       • Logical Types

       • Parsing schemas into the canonical form

       • Schema fingerprinting

MISSING FEATURES
       • Anything involving Avro's RPC features

EXAMPLE USAGE
          from fastavro import writer, reader, parse_schema

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }
          parsed_schema = parse_schema(schema)

          # 'records' can be an iterable (including generator)
          records = [
              {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
              {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
              {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
              {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
          ]

          # Writing
          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

          # Reading
          with open('weather.avro', 'rb') as fo:
              for record in reader(fo):
                  print(record)

FASTAVRO.READ
   class reader(fo: IO | AvroJSONDecoder, reader_schema: str | List | Dict | None = None,
                return_record_name: bool = False, return_record_name_override: bool = False,
                handle_unicode_errors: str = 'strict', return_named_type: bool = False,
                return_named_type_override: bool = False)
       Iterator over records in an avro file.

       Parameters

          • fo -- File-like object to read from

          • reader_schema -- Reader schema

          • return_record_name -- If true, when reading a union of records,
            the result will be a tuple where the first value is the name of
            the record and the second value is the record itself

          • return_record_name_override -- If true, this will modify the
            behavior of return_record_name so that the record name is only
            returned for unions where there is more than one record. For
            unions that only have one record, this option will make it so
            that the record is returned by itself, not a tuple with the name.

          • return_named_type -- If true, when reading a union of named
            types, the result will be a tuple where the first value is the
            name of the type and the second value is the record itself.
            NOTE: Using this option will ignore return_record_name and
            return_record_name_override

          • return_named_type_override -- If true, this will modify the
            behavior of return_named_type so that the named type is only
            returned for unions where there is more than one named type. For
            unions that only have one named type, this option will make it
            so that the named type is returned by itself, not a tuple with
            the name

          • handle_unicode_errors -- Default strict. Should be set to a
            valid string that can be used in the errors argument of the
            string decode() function. Examples include replace and ignore

       Example:

          from fastavro import reader
          with open('some-file.avro', 'rb') as fo:
              avro_reader = reader(fo)
              for record in avro_reader:
                  process_record(record)

       The fo argument is a file-like object so another common example usage
       would use an io.BytesIO object like so:

          from io import BytesIO
          from fastavro import writer, reader

          fo = BytesIO()
          writer(fo, schema, records)
          fo.seek(0)
          for record in reader(fo):
              process_record(record)

       metadata
              Key-value pairs in the header metadata

       codec  The codec used when writing

       writer_schema
              The schema used when writing

       reader_schema
              The schema used when reading (if provided)

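       For instance, a minimal sketch of reading these attributes back,
       assuming the weather.avro file written in the earlier example:

          from fastavro import reader

          # Inspect the header information exposed by the reader object
          with open('weather.avro', 'rb') as fo:
              avro_reader = reader(fo)
              print(avro_reader.metadata)       # header key-value pairs
              print(avro_reader.codec)          # e.g. 'null' or 'deflate'
              print(avro_reader.writer_schema)  # schema the file was written with
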
   class block_reader(fo: IO, reader_schema: str | List | Dict | None = None,
                      return_record_name: bool = False, return_record_name_override: bool = False,
                      handle_unicode_errors: str = 'strict', return_named_type: bool = False,
                      return_named_type_override: bool = False)
       Iterator over blocks in an avro file.

       Parameters

          • fo -- Input stream

          • reader_schema -- Reader schema

          • return_record_name -- If true, when reading a union of records,
            the result will be a tuple where the first value is the name of
            the record and the second value is the record itself

          • return_record_name_override -- If true, this will modify the
            behavior of return_record_name so that the record name is only
            returned for unions where there is more than one record. For
            unions that only have one record, this option will make it so
            that the record is returned by itself, not a tuple with the name.

          • return_named_type -- If true, when reading a union of named
            types, the result will be a tuple where the first value is the
            name of the type and the second value is the record itself.
            NOTE: Using this option will ignore return_record_name and
            return_record_name_override

          • return_named_type_override -- If true, this will modify the
            behavior of return_named_type so that the named type is only
            returned for unions where there is more than one named type. For
            unions that only have one named type, this option will make it
            so that the named type is returned by itself, not a tuple with
            the name

          • handle_unicode_errors -- Default strict. Should be set to a
            valid string that can be used in the errors argument of the
            string decode() function. Examples include replace and ignore

       Example:

          from fastavro import block_reader
          with open('some-file.avro', 'rb') as fo:
              avro_reader = block_reader(fo)
              for block in avro_reader:
                  process_block(block)

       metadata
              Key-value pairs in the header metadata

       codec  The codec used when writing

       writer_schema
              The schema used when writing

       reader_schema
              The schema used when reading (if provided)

   class Block(bytes_, num_records, codec, reader_schema, writer_schema, named_schemas,
               offset, size, options)
       An avro block. Will yield records when iterated over

       num_records
              Number of records in the block

       writer_schema
              The schema used when writing

       reader_schema
              The schema used when reading (if provided)

       offset Offset of the block from the beginning of the avro file

       size   Size of the block in bytes

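       For instance, a minimal sketch that walks the file block by block and
       then iterates the records inside each block (again assuming the
       weather.avro file from the earlier example):

          from fastavro import block_reader

          with open('weather.avro', 'rb') as fo:
              for block in block_reader(fo):
                  # num_records, size and offset are the Block attributes above
                  print(f"{block.num_records} records, {block.size} bytes, offset {block.offset}")
                  for record in block:  # a Block yields records when iterated over
                      print(record)
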
   schemaless_reader(fo: IO, writer_schema: str | List | Dict,
                     reader_schema: str | List | Dict | None = None,
                     return_record_name: bool = False, return_record_name_override: bool = False,
                     handle_unicode_errors: str = 'strict', return_named_type: bool = False,
                     return_named_type_override: bool = False) -> None | str | float | int | Decimal | bool | bytes | List | Dict
       Reads a single record written using the schemaless_writer()

       Parameters

          • fo -- Input stream

          • writer_schema -- Schema used when calling schemaless_writer

          • reader_schema -- If the schema has changed since being written
            then the new schema can be given to allow for schema migration

          • return_record_name -- If true, when reading a union of records,
            the result will be a tuple where the first value is the name of
            the record and the second value is the record itself

          • return_record_name_override -- If true, this will modify the
            behavior of return_record_name so that the record name is only
            returned for unions where there is more than one record. For
            unions that only have one record, this option will make it so
            that the record is returned by itself, not a tuple with the name.

          • return_named_type -- If true, when reading a union of named
            types, the result will be a tuple where the first value is the
            name of the type and the second value is the record itself.
            NOTE: Using this option will ignore return_record_name and
            return_record_name_override

          • return_named_type_override -- If true, this will modify the
            behavior of return_named_type so that the named type is only
            returned for unions where there is more than one named type. For
            unions that only have one named type, this option will make it
            so that the named type is returned by itself, not a tuple with
            the name

          • handle_unicode_errors -- Default strict. Should be set to a
            valid string that can be used in the errors argument of the
            string decode() function. Examples include replace and ignore

       Example:

          parsed_schema = fastavro.parse_schema(schema)
          with open('file', 'rb') as fp:
              record = fastavro.schemaless_reader(fp, parsed_schema)

       Note: The schemaless_reader can only read a single record.

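       A minimal round-trip sketch pairing schemaless_writer() and
       schemaless_reader() over an in-memory buffer:

          import io
          from fastavro import parse_schema, schemaless_writer, schemaless_reader

          parsed_schema = parse_schema({
              "name": "Weather", "namespace": "test", "type": "record",
              "fields": [
                  {"name": "station", "type": "string"},
                  {"name": "time", "type": "long"},
                  {"name": "temp", "type": "int"},
              ],
          })
          record = {"station": "011990-99999", "time": 1433269388, "temp": 0}

          buf = io.BytesIO()
          schemaless_writer(buf, parsed_schema, record)

          buf.seek(0)
          assert schemaless_reader(buf, parsed_schema) == record
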
   is_avro(path_or_buffer: str | IO) -> bool
       Return True if path (or buffer) points to an Avro file. This will
       only work for avro files that contain the normal avro schema header
       like those created by writer(). This function is not intended to be
       used with binary data created from schemaless_writer() since that
       does not include the avro header.

       Parameters
              path_or_buffer -- Path to file

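       For instance, a minimal sketch, assuming weather.avro was written
       with writer() as in the earlier example:

          from fastavro import is_avro

          # 'weather.avro' is assumed to have been written with writer() earlier
          assert is_avro('weather.avro')
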
FASTAVRO.WRITE
   writer(fo: IO | AvroJSONEncoder, schema: str | List | Dict, records: Iterable[Any],
          codec: str = 'null', sync_interval: int = 16000,
          metadata: Dict[str, str] | None = None, validator: bool = False,
          sync_marker: bytes = b'', codec_compression_level: int | None = None, *,
          strict: bool = False, strict_allow_default: bool = False,
          disable_tuple_notation: bool = False)
       Write records to fo (stream) according to schema

       Parameters

          • fo -- Output stream

          • schema -- Writer schema

          • records -- Records to write. This is commonly a list of the
            dictionary representation of the records, but it can be any
            iterable

          • codec -- Compression codec, can be 'null', 'deflate' or 'snappy'
            (if installed)

          • sync_interval -- Size of sync interval

          • metadata -- Header metadata

          • validator -- If true, validation will be done on the records

          • sync_marker -- A byte string used as the avro sync marker. If
            not provided, a random byte string will be used.

          • codec_compression_level -- Compression level to use with the
            specified codec (if the codec supports it)

          • strict -- If set to True, an error will be raised if records do
            not contain exactly the same fields that the schema states

          • strict_allow_default -- If set to True, an error will be raised
            if records do not contain exactly the same fields that the
            schema states unless it is a missing field that has a default
            value in the schema

          • disable_tuple_notation -- If set to True, tuples will not be
            treated as a special case. Therefore, using a tuple to indicate
            the type of a record will not work

       Example:

          from fastavro import writer, parse_schema

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }
          parsed_schema = parse_schema(schema)

          records = [
              {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
              {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
              {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
              {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
          ]

          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

       The fo argument is a file-like object so another common example usage
       would use an io.BytesIO object like so:

          from io import BytesIO
          from fastavro import writer

          fo = BytesIO()
          writer(fo, schema, records)

       Given an existing avro file, it's possible to append to it by
       re-opening the file in a+b mode. If the file is only opened in ab
       mode, we aren't able to read some of the existing header information
       and an error will be raised. For example:

          # Write initial records
          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

          # Write some more records
          with open('weather.avro', 'a+b') as out:
              writer(out, None, more_records)

       Note: When appending, any schema provided will be ignored since the
       schema in the avro file will be re-used. Therefore it is convenient
       to just use None as the schema.

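       A sketch of writing with compression, assuming the parsed_schema and
       records from the example above; 'deflate' is always available, while
       codecs such as 'snappy' or 'zstandard' require their optional
       packages:

          from fastavro import writer

          # parsed_schema and records are assumed from the example above
          with open('weather_deflate.avro', 'wb') as out:
              writer(out, parsed_schema, records, codec='deflate',
                     codec_compression_level=7)
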
   schemaless_writer(fo: IO, schema: str | List | Dict, record: Any, *,
                     strict: bool = False, strict_allow_default: bool = False,
                     disable_tuple_notation: bool = False)
       Write a single record without the schema or header information

       Parameters

          • fo -- Output file

          • schema -- Schema

          • record -- Record to write

          • strict -- If set to True, an error will be raised if records do
            not contain exactly the same fields that the schema states

          • strict_allow_default -- If set to True, an error will be raised
            if records do not contain exactly the same fields that the
            schema states unless it is a missing field that has a default
            value in the schema

          • disable_tuple_notation -- If set to True, tuples will not be
            treated as a special case. Therefore, using a tuple to indicate
            the type of a record will not work

       Example:

          parsed_schema = fastavro.parse_schema(schema)
          with open('file', 'wb') as fp:
              fastavro.schemaless_writer(fp, parsed_schema, record)

       Note: The schemaless_writer can only write a single record.

   Using the tuple notation to specify which branch of a union to take
       Since this library uses plain dictionaries to represent a record, it
       is possible for that dictionary to fit the definition of two
       different records.

       For example, given a dictionary like this:

          {"name": "My Name"}

       It would be valid against both of these records:

          child_schema = {
              "name": "Child",
              "type": "record",
              "fields": [
                  {"name": "name", "type": "string"},
                  {"name": "favorite_color", "type": ["null", "string"]},
              ]
          }

          pet_schema = {
              "name": "Pet",
              "type": "record",
              "fields": [
                  {"name": "name", "type": "string"},
                  {"name": "favorite_toy", "type": ["null", "string"]},
              ]
          }

       This becomes a problem when a schema contains a union of these two
       similar records as it is not clear which record the dictionary
       represents. For example, if you used the previous dictionary with the
       following schema, it wouldn't be clear if the record should be
       serialized as a Child or a Pet:

          household_schema = {
              "name": "Household",
              "type": "record",
              "fields": [
                  {"name": "address", "type": "string"},
                  {
                      "name": "family_members",
                      "type": {
                          "type": "array", "items": [
                              {
                                  "name": "Child",
                                  "type": "record",
                                  "fields": [
                                      {"name": "name", "type": "string"},
                                      {"name": "favorite_color", "type": ["null", "string"]},
                                  ]
                              }, {
                                  "name": "Pet",
                                  "type": "record",
                                  "fields": [
                                      {"name": "name", "type": "string"},
                                      {"name": "favorite_toy", "type": ["null", "string"]},
                                  ]
                              }
                          ]
                      }
                  },
              ]
          }

       To resolve this, you can use a tuple notation where the first value
       of the tuple is the fully namespaced record name and the second value
       is the dictionary. For example:

          records = [
              {
                  "address": "123 Drive Street",
                  "family_members": [
                      ("Child", {"name": "Son"}),
                      ("Child", {"name": "Daughter"}),
                      ("Pet", {"name": "Dog"}),
                  ]
              }
          ]

   Using the record hint to specify which branch of a union to take
       In addition to the tuple notation for specifying the name of a
       record, you can also include a special -type attribute (note that
       this attribute is -type, not type) on a record to do the same thing.
       So the example above which looked like this:

          records = [
              {
                  "address": "123 Drive Street",
                  "family_members": [
                      ("Child", {"name": "Son"}),
                      ("Child", {"name": "Daughter"}),
                      ("Pet", {"name": "Dog"}),
                  ]
              }
          ]

       Would now look like this:

          records = [
              {
                  "address": "123 Drive Street",
                  "family_members": [
                      {"-type": "Child", "name": "Son"},
                      {"-type": "Child", "name": "Daughter"},
                      {"-type": "Pet", "name": "Dog"},
                  ]
              }
          ]

       Unlike the tuple notation which can be used with any avro type in a
       union, this -type hint can only be used with records. However, this
       can be useful if you want to make a single record dictionary that can
       be used both in and out of unions.

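       A minimal sketch tying this together: either form of records above
       (the tuple notation or the -type hint) can be serialized against the
       household schema defined earlier:

          from fastavro import writer, parse_schema

          # household_schema and records are assumed from the examples above
          parsed_household = parse_schema(household_schema)
          with open('households.avro', 'wb') as out:
              writer(out, parsed_household, records)
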
FASTAVRO.JSON_READ
   json_reader(fo: IO, schema: str | List | Dict,
               reader_schema: str | List | Dict | None = None, *,
               decoder=<class 'fastavro.io.json_decoder.AvroJSONDecoder'>) -> reader
       Iterator over records in an avro json file.

       Parameters

          • fo -- File-like object to read from

          • schema -- Original schema used when writing the JSON data

          • reader_schema -- If the schema has changed since being written
            then the new schema can be given to allow for schema migration

          • decoder -- By default the standard AvroJSONDecoder will be used,
            but a custom one could be passed here

       Example:

          from fastavro import json_reader

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ]
          }

          with open('some-file', 'r') as fo:
              avro_reader = json_reader(fo, schema)
              for record in avro_reader:
                  print(record)

FASTAVRO.JSON_WRITE
   json_writer(fo: IO, schema: str | List | Dict, records: Iterable[Any], *,
               write_union_type: bool = True, validator: bool = False,
               encoder=<class 'fastavro.io.json_encoder.AvroJSONEncoder'>,
               strict: bool = False, strict_allow_default: bool = False,
               disable_tuple_notation: bool = False) -> None
       Write records to fo (stream) according to schema

       Parameters

          • fo -- File-like object to write to

          • schema -- Writer schema

          • records -- Records to write. This is commonly a list of the
            dictionary representation of the records, but it can be any
            iterable

          • write_union_type -- Determine whether to write the union type in
            the json message. If this is set to False, the output will be
            plain json. It may, however, not be decodable back to an avro
            record by json_reader.

          • validator -- If true, validation will be done on the records

          • encoder -- By default the standard AvroJSONEncoder will be used,
            but a custom one could be passed here

          • strict -- If set to True, an error will be raised if records do
            not contain exactly the same fields that the schema states

          • strict_allow_default -- If set to True, an error will be raised
            if records do not contain exactly the same fields that the
            schema states unless it is a missing field that has a default
            value in the schema

          • disable_tuple_notation -- If set to True, tuples will not be
            treated as a special case. Therefore, using a tuple to indicate
            the type of a record will not work

       Example:

          from fastavro import json_writer, parse_schema

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }
          parsed_schema = parse_schema(schema)

          records = [
              {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
              {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
              {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
              {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
          ]

          with open('some-file', 'w') as out:
              json_writer(out, parsed_schema, records)

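       A minimal round-trip sketch using an in-memory text stream instead of
       a file, with the parsed_schema and records from the example above:

          from io import StringIO
          from fastavro import json_writer, json_reader

          # parsed_schema and records are assumed from the example above
          sio = StringIO()
          json_writer(sio, parsed_schema, records)

          sio.seek(0)
          for record in json_reader(sio, parsed_schema):
              print(record)
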
FASTAVRO.SCHEMA
   parse_schema(schema: str | List | Dict, named_schemas: Dict[str, Dict] | None = None, *,
                expand: bool = False, _write_hint: bool = True, _force: bool = False,
                _ignore_default_error: bool = False) -> str | List | Dict
       Returns a parsed avro schema

       It is not necessary to call parse_schema but doing so and saving the
       parsed schema for use later will make future operations faster as the
       schema will not need to be reparsed.

       Parameters

          • schema -- Input schema

          • named_schemas -- Dictionary of named schemas to their schema
            definition

          • expand -- If true, named schemas will be fully expanded to their
            true schemas rather than being represented as just the name.
            This format should be considered an output only and not passed
            in to other reader/writer functions as it does not conform to
            the avro specification and will likely cause an exception

          • _write_hint -- Internal API argument specifying whether or not
            the __fastavro_parsed marker should be added to the schema

          • _force -- Internal API argument. If True, the schema will always
            be parsed even if it has been parsed and has the
            __fastavro_parsed marker

          • _ignore_default_error -- Internal API argument. If True, when a
            union has the wrong default value, an error will not be raised.

       Example:

          from fastavro import parse_schema
          from fastavro import writer

          parsed_schema = parse_schema(original_schema)
          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

       Sometimes you might have two schemas where one schema references
       another. For the sake of example, let's assume you have a Parent
       schema that references a Child schema. If you were to try to parse
       the parent schema on its own, you would get an exception because the
       child schema isn't defined. To accommodate this, we can use the
       named_schemas argument to pass a shared dictionary when parsing both
       of the schemas. The dictionary will get populated with the necessary
       schema references to make parsing possible. For example:

          from fastavro import parse_schema

          named_schemas = {}
          parsed_child = parse_schema(child_schema, named_schemas)
          parsed_parent = parse_schema(parent_schema, named_schemas)

   fullname(schema: Dict) -> str
       Returns the fullname of a schema

       Parameters
              schema -- Input schema

       Example:

          from fastavro.schema import fullname

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          fname = fullname(schema)
          assert fname == "test.Weather"

   expand_schema(schema: str | List | Dict) -> str | List | Dict
       Returns a schema where all named types are expanded to their real
       schema

       NOTE: The output of this function produces a schema that can include
       multiple definitions of the same named type (as per design) which are
       not valid per the avro specification. Therefore, the output of this
       should not be passed to the normal writer/reader functions as it will
       likely result in an error.

       Parameters
              schema (dict) -- Input schema

       Example:

          from fastavro.schema import expand_schema

          original_schema = {
              "name": "MasterSchema",
              "namespace": "com.namespace.master",
              "type": "record",
              "fields": [{
                  "name": "field_1",
                  "type": {
                      "name": "Dependency",
                      "namespace": "com.namespace.dependencies",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }, {
                  "name": "field_2",
                  "type": "com.namespace.dependencies.Dependency"
              }]
          }

          expanded_schema = expand_schema(original_schema)

          assert expanded_schema == {
              "name": "com.namespace.master.MasterSchema",
              "type": "record",
              "fields": [{
                  "name": "field_1",
                  "type": {
                      "name": "com.namespace.dependencies.Dependency",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }, {
                  "name": "field_2",
                  "type": {
                      "name": "com.namespace.dependencies.Dependency",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }]
          }

   load_schema(schema_path: str, *, repo: AbstractSchemaRepository | None = None,
               named_schemas: Dict[str, Dict] | None = None, _write_hint: bool = True,
               _injected_schemas: Set[str] | None = None) -> str | List | Dict
       Returns a schema loaded from repository.

       Will recursively load referenced schemas, attempting to load them
       from the same repository, using schema_path as the schema name.

       If repo is not provided, FlatDictRepository is used.
       FlatDictRepository will try to load schemas from the same directory,
       assuming files are named with the convention <full_name>.avsc.

       Parameters

          • schema_path -- Full schema name, or path to schema file if the
            default repo is used.

          • repo -- Schema repository instance.

          • named_schemas -- Dictionary of named schemas to their schema
            definition

          • _write_hint -- Internal API argument specifying whether or not
            the __fastavro_parsed marker should be added to the schema

          • _injected_schemas -- Internal API argument. Set of names that
            have been injected

       Consider the following example with the default FlatDictRepository...

       namespace.Parent.avsc:

          {
              "type": "record",
              "name": "Parent",
              "namespace": "namespace",
              "fields": [
                  {
                      "name": "child",
                      "type": "Child"
                  }
              ]
          }

       namespace.Child.avsc:

          {
              "type": "record",
              "namespace": "namespace",
              "name": "Child",
              "fields": []
          }

       Code:

          from fastavro.schema import load_schema

          parsed_schema = load_schema("namespace.Parent.avsc")

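       A sketch of passing the repository explicitly; this assumes the two
       .avsc files above live in a 'schemas' directory, and simply spells
       out the lookup that the default FlatDictRepository performs:

          from fastavro.schema import load_schema
          from fastavro.repository.flat_dict import FlatDictRepository

          # The 'schemas' directory is assumed to contain namespace.Parent.avsc
          # and namespace.Child.avsc as shown above
          repo = FlatDictRepository("schemas")
          parsed_schema = load_schema("namespace.Parent", repo=repo)
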
   load_schema_ordered(ordered_schemas: List[str], *, _write_hint: bool = True) -> str | List | Dict
       Returns a schema loaded from a list of schemas.

       The list of schemas should be ordered such that any dependencies are
       listed before any other schemas that use those dependencies. For
       example, if schema A depends on schema B and schema B depends on
       schema C, then the list of schemas should be [C, B, A].

       Parameters

          • ordered_schemas -- List of paths to schemas

          • _write_hint -- Internal API argument specifying whether or not
            the __fastavro_parsed marker should be added to the schema

       Consider the following example...

       Parent.avsc:

          {
              "type": "record",
              "name": "Parent",
              "namespace": "namespace",
              "fields": [
                  {
                      "name": "child",
                      "type": "Child"
                  }
              ]
          }

       namespace.Child.avsc:

          {
              "type": "record",
              "namespace": "namespace",
              "name": "Child",
              "fields": []
          }

       Code:

          from fastavro.schema import load_schema_ordered

          parsed_schema = load_schema_ordered(
              ["path/to/namespace.Child.avsc", "path/to/Parent.avsc"]
          )

   to_parsing_canonical_form(schema: str | List | Dict) -> str
       Returns a string representing the parsing canonical form of the
       schema.

       For more details on the parsing canonical form, see here:
       https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas

       Parameters
              schema -- Schema to transform

   fingerprint(parsing_canonical_form: str, algorithm: str) -> str
       Returns a string representing a fingerprint/hash of the parsing
       canonical form of a schema.

       For more details on the fingerprint, see here:
       https://avro.apache.org/docs/current/spec.html#schema_fingerprints

       Parameters

          • parsing_canonical_form -- The parsing canonical form of a schema

          • algorithm -- The hashing algorithm

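       A minimal sketch combining the two, assuming the Weather schema from
       the earlier examples; 'sha256' is assumed to be one of the supported
       hashing algorithm names:

          from fastavro.schema import to_parsing_canonical_form, fingerprint

          # 'schema' is assumed to be the Weather schema from the earlier examples
          canonical_form = to_parsing_canonical_form(schema)
          print(fingerprint(canonical_form, "sha256"))
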
FASTAVRO.VALIDATION
   validate(datum: Any, schema: str | List | Dict, field: str = '',
            raise_errors: bool = True, strict: bool = False,
            disable_tuple_notation: bool = False) -> bool
       Determine if a python datum is an instance of a schema.

       Parameters

          • datum -- Data being validated

          • schema -- Schema

          • field -- Record field being validated

          • raise_errors -- If true, errors are raised for invalid data. If
            false, a simple True (valid) or False (invalid) result is
            returned

          • strict -- If true, fields without values will raise errors
            rather than implicitly defaulting to None

          • disable_tuple_notation -- If set to True, tuples will not be
            treated as a special case. Therefore, using a tuple to indicate
            the type of a record will not work

       Example:

          from fastavro.validation import validate
          schema = {...}
          record = {...}
          validate(record, schema)

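       A more concrete sketch, assuming the parsed Weather schema from the
       earlier examples and that ValidationError is importable from
       fastavro.validation:

          from fastavro.validation import validate, ValidationError

          # parsed_schema is assumed to be the parsed Weather schema from earlier
          good = {"station": "011990-99999", "time": 1433269388, "temp": 0}
          bad = {"station": "011990-99999", "time": "not-a-long", "temp": 0}

          assert validate(good, parsed_schema)
          try:
              validate(bad, parsed_schema)
          except ValidationError as error:
              print(error)
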
   validate_many(records: Iterable[Any], schema: str | List | Dict,
                 raise_errors: bool = True, strict: bool = False,
                 disable_tuple_notation: bool = False) -> bool
       Validate a list of data!

       Parameters

          • records -- List of records to validate

          • schema -- Schema

          • raise_errors -- If true, errors are raised for invalid data. If
            false, a simple True (valid) or False (invalid) result is
            returned

          • strict -- If true, fields without values will raise errors
            rather than implicitly defaulting to None

          • disable_tuple_notation -- If set to True, tuples will not be
            treated as a special case. Therefore, using a tuple to indicate
            the type of a record will not work

       Example:

          from fastavro.validation import validate_many
          schema = {...}
          records = [{...}, {...}, ...]
          validate_many(records, schema)

FASTAVRO.UTILS
   generate_one(schema: str | List | Dict) -> Any
       Returns a single instance of arbitrary data that conforms to the
       schema.

       Parameters
              schema -- Schema that data should conform to

       Example:

          from fastavro import schemaless_writer
          from fastavro.utils import generate_one

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          with open('weather.avro', 'wb') as out:
              schemaless_writer(out, schema, generate_one(schema))

   generate_many(schema: str | List | Dict, count: int) -> Iterator[Any]
       A generator that yields arbitrary data that conforms to the schema.
       It will yield a number of data structures equal to the given count.

       Parameters

          • schema -- Schema that data should conform to

          • count -- Number of objects to generate

       Example:

          from fastavro import writer
          from fastavro.utils import generate_many

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          with open('weather.avro', 'wb') as out:
              writer(out, schema, generate_many(schema, 5))

   anonymize_schema(schema: str | List | Dict) -> str | List | Dict
       Returns an anonymized schema

       Parameters
              schema -- Schema to anonymize

       Example:

          from fastavro.utils import anonymize_schema

          anonymized_schema = anonymize_schema(original_schema)

LOGICAL TYPES
       Fastavro supports the following official logical types:

       • decimal

       • uuid

       • date

       • time-millis

       • time-micros

       • timestamp-millis

       • timestamp-micros

       • local-timestamp-millis

       • local-timestamp-micros

       Fastavro is missing support for the following official logical types:

       • duration

   How to specify logical types in your schemas
       The docs say that when you want to make something a logical type, you
       just need to add a logicalType key. Unfortunately, a common point of
       confusion is that people will take a pre-existing schema like this:

          schema = {
              "type": "record",
              "name": "root",
              "fields": [
                  {
                      "name": "id",
                      "type": "string",
                  },
              ]
          }

       And then add the uuid logical type like this:

          schema = {
              "type": "record",
              "name": "root",
              "fields": [
                  {
                      "name": "id",
                      "type": "string",
                      "logicalType": "uuid",  # This is the wrong place to add this key
                  },
              ]
          }

       However, that adds the logicalType key to the field schema, which is
       not correct. Instead, we want to group it with the string like so:

          schema = {
              "type": "record",
              "name": "root",
              "fields": [
                  {
                      "name": "id",
                      "type": {
                          "type": "string",
                          "logicalType": "uuid",  # This is the correct place to add this key
                      },
                  },
              ]
          }

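       A minimal sketch showing the effect: with the logicalType placed on
       the type as above, uuid.UUID values round-trip for the id field:

          import io
          import uuid
          from fastavro import parse_schema, writer, reader

          # 'schema' is assumed to be the corrected schema defined just above
          parsed = parse_schema(schema)

          bio = io.BytesIO()
          writer(bio, parsed, [{"id": uuid.uuid4()}])

          bio.seek(0)
          print(next(reader(bio)))
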
   Custom Logical Types
       The Avro specification defines a handful of logical types that most
       implementations support. For example, one of the defined logical
       types is a microsecond precision timestamp. The specification states
       that this value will get encoded as an avro long type.

       For the sake of an example, let's say you want to create a new
       logical type for a microsecond precision timestamp that uses a string
       as the underlying avro type.

       To do this, there are a few functions that need to be defined. First,
       we need an encoder function that will encode a datetime object as a
       string. The encoder function is called with two arguments: the data
       and the schema. So we could define this like so:

          def encode_datetime_as_string(data, schema):
              return datetime.isoformat(data)

          # or

          def encode_datetime_as_string(data, *args):
              return datetime.isoformat(data)

       Then, we need a decoder function that will transform the string back
       into a datetime object. The decoder function is called with three
       arguments: the data, the writer schema, and the reader schema. So we
       could define this like so:

          def decode_string_as_datetime(data, writer_schema, reader_schema):
              return datetime.fromisoformat(data)

          # or

          def decode_string_as_datetime(data, *args):
              return datetime.fromisoformat(data)

       Finally, we need to tell fastavro to use these functions. The schema
       for this custom logical type will use the type string and can use
       whatever name you would like as the logicalType. In this example,
       let's suppose we call the logicalType datetime2. To have the library
       actually use the custom logical type, we use the name of
       <avro_type>-<logical_type>, so in this example that name would be
       string-datetime2, and then we add those functions like so:

          fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
          fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime

       And you are done. Now if the library comes across a schema with a
       logical type of datetime2 and an avro type of string, it will use the
       custom functions. For a complete example, see here:

          import io
          from datetime import datetime

          import fastavro
          from fastavro import writer, reader


          def encode_datetime_as_string(data, *args):
              return datetime.isoformat(data)

          def decode_string_as_datetime(data, *args):
              return datetime.fromisoformat(data)

          fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
          fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime


          writer_schema = fastavro.parse_schema({
              "type": "record",
              "name": "root",
              "fields": [
                  {
                      "name": "some_date",
                      "type": [
                          "null",
                          {
                              "type": "string",
                              "logicalType": "datetime2",
                          },
                      ],
                  },
              ]
          })

          records = [
              {"some_date": datetime.now()}
          ]

          bio = io.BytesIO()

          writer(bio, writer_schema, records)

          bio.seek(0)

          for record in reader(bio):
              print(record)

FASTAVRO COMMAND LINE SCRIPT
       A command line script is installed with the library that can be used
       to dump the contents of avro file(s) to the standard output.

       Usage:

          usage: fastavro [-h] [--schema] [--codecs] [--version] [-p] [file [file ...]]

          iter over avro file, emit records as JSON

          positional arguments:
            file           file(s) to parse

          optional arguments:
            -h, --help     show this help message and exit
            --schema       dump schema instead of records
            --codecs       print supported codecs
            --version      show program's version number and exit
            -p, --pretty   pretty print json

   Examples
       Read an avro file:

          $ fastavro weather.avro

          {"temp": 0, "station": "011990-99999", "time": -619524000000}
          {"temp": 22, "station": "011990-99999", "time": -619506000000}
          {"temp": -11, "station": "011990-99999", "time": -619484400000}
          {"temp": 111, "station": "012650-99999", "time": -655531200000}
          {"temp": 78, "station": "012650-99999", "time": -655509600000}

       Show the schema:

          $ fastavro --schema weather.avro

          {
              "type": "record",
              "namespace": "test",
              "doc": "A weather reading.",
              "fields": [
                  {
                      "type": "string",
                      "name": "station"
                  },
                  {
                      "type": "long",
                      "name": "time"
                  },
                  {
                      "type": "int",
                      "name": "temp"
                  }
              ],
              "name": "Weather"
          }

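       The -p/--pretty flag indents the JSON output of the records; for
       example, against the same file:

          $ fastavro --pretty weather.avro
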
FASTAVRO.IO
   fastavro.io.json_decoder
   class AvroJSONDecoder(fo: IO)
       Decoder for the avro JSON format.

       NOTE: All attributes and methods on this class should be considered
       private.

       Parameters
              fo -- File-like object to read from

   fastavro.io.json_encoder
   class AvroJSONEncoder(fo: IO, *, write_union_type: bool = True)
       Encoder for the avro JSON format.

       NOTE: All attributes and methods on this class should be considered
       private.

       Parameters

          • fo -- Input stream

          • write_union_type -- Determine whether to write the union type in
            the json message.

FASTAVRO.REPOSITORY
   fastavro.repository.base
   class AbstractSchemaRepository


AUTHOR
       Miki Tebeka

COPYRIGHT
       2023, Miki Tebeka

1.8.4                            Oct 06, 2023                          FASTAVRO(1)