[prev in list] [next in list] [prev in thread] [next in thread]
List: avro-commits
Subject: [avro] branch master updated: AVRO-2748: Refactor Schema Matching (#861)
From: kojiromike () apache ! org
Date: 2020-04-27 12:53:29
Message-ID: 158799200982.9041.576039272748114076 () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 771c16b AVRO-2748: Refactor Schema Matching (#861)
771c16b is described below
commit 771c16b521e34e721e31bb2caf9f86699ea10dff
Author: Michael A. Smith <michael@smith-li.com>
AuthorDate: Mon Apr 27 08:53:18 2020 -0400
AVRO-2748: Refactor Schema Matching (#861)
Move schema matching into schema objects because each type knows its matching \
logic best.
---
lang/py/avro/io.py | 73 +++++++--------------------------------------
lang/py/avro/schema.py | 80 ++++++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 89 insertions(+), 64 deletions(-)
diff --git a/lang/py/avro/io.py b/lang/py/avro/io.py
index 52b631a..ffbd0af 100644
--- a/lang/py/avro/io.py
+++ b/lang/py/avro/io.py
@@ -585,61 +585,8 @@ class BinaryEncoder(object):
#
# DatumReader/Writer
#
-
class DatumReader(object):
"""Deserialize Avro-encoded data into a Python data structure."""
- @staticmethod
- def check_props(schema_one, schema_two, prop_list):
- for prop in prop_list:
- if getattr(schema_one, prop) != getattr(schema_two, prop):
- return False
- return True
-
- @staticmethod
- def match_schemas(writers_schema, readers_schema):
- w_type = writers_schema.type
- r_type = readers_schema.type
- if 'union' in [w_type, r_type] or 'error_union' in [w_type, r_type]:
- return True
- elif (w_type in schema.PRIMITIVE_TYPES and r_type in schema.PRIMITIVE_TYPES
- and w_type == r_type):
- return True
- elif (w_type == r_type == 'record' and
- DatumReader.check_props(writers_schema, readers_schema,
- ['fullname'])):
- return True
- elif (w_type == r_type == 'error' and
- DatumReader.check_props(writers_schema, readers_schema,
- ['fullname'])):
- return True
- elif (w_type == r_type == 'request'):
- return True
- elif (w_type == r_type == 'fixed' and
- DatumReader.check_props(writers_schema, readers_schema,
- ['fullname', 'size'])):
- return True
- elif (w_type == r_type == 'enum' and
- DatumReader.check_props(writers_schema, readers_schema,
- ['fullname'])):
- return True
- elif (w_type == r_type == 'map' and
- DatumReader.check_props(writers_schema.values,
- readers_schema.values, ['type'])):
- return True
- elif (w_type == r_type == 'array' and
- DatumReader.check_props(writers_schema.items,
- readers_schema.items, ['type'])):
- return True
-
- # Handle schema promotion
- if w_type == 'int' and r_type in ['long', 'float', 'double']:
- return True
- elif w_type == 'long' and r_type in ['float', 'double']:
- return True
- elif w_type == 'float' and r_type == 'double':
- return True
- return False
-
def __init__(self, writers_schema=None, readers_schema=None):
"""
As defined in the Avro specification, we call the schema encoded
@@ -658,7 +605,6 @@ class DatumReader(object):
self._readers_schema = readers_schema
readers_schema = property(lambda self: self._readers_schema,
set_readers_schema)
-
def read(self, decoder):
if self.readers_schema is None:
self.readers_schema = self.writers_schema
@@ -666,21 +612,26 @@ class DatumReader(object):
def read_data(self, writers_schema, readers_schema, decoder):
# schema matching
- if not DatumReader.match_schemas(writers_schema, readers_schema):
+ if not readers_schema.match(writers_schema):
fail_msg = 'Schemas do not match.'
raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
logical_type = getattr(writers_schema, 'logical_type', None)
- # schema resolution: reader's schema is a union, writer's schema is not
- if (writers_schema.type not in ['union', 'error_union']
- and readers_schema.type in ['union', 'error_union']):
+
+ # function dispatch for reading data based on type of writer's schema
+ if writers_schema.type in ['union', 'error_union']:
+ return self.read_union(writers_schema, readers_schema, decoder)
+
+ if readers_schema.type in ['union', 'error_union']:
+ # schema resolution: reader's schema is a union, writer's schema is not
for s in readers_schema.schemas:
- if DatumReader.match_schemas(writers_schema, s):
+ if s.match(writers_schema):
return self.read_data(writers_schema, s, decoder)
+
+ # This shouldn't happen because of the match check at the start of this \
method. fail_msg = 'Schemas do not match.'
raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
- # function dispatch for reading data based on type of writer's schema
if writers_schema.type == 'null':
return decoder.read_null()
elif writers_schema.type == 'boolean':
@@ -728,8 +679,6 @@ class DatumReader(object):
return self.read_array(writers_schema, readers_schema, decoder)
elif writers_schema.type == 'map':
return self.read_map(writers_schema, readers_schema, decoder)
- elif writers_schema.type in ['union', 'error_union']:
- return self.read_union(writers_schema, readers_schema, decoder)
elif writers_schema.type in ['record', 'error', 'request']:
return self.read_record(writers_schema, readers_schema, decoder)
else:
diff --git a/lang/py/avro/schema.py b/lang/py/avro/schema.py
index f41852d..1248ec5 100644
--- a/lang/py/avro/schema.py
+++ b/lang/py/avro/schema.py
@@ -177,6 +177,23 @@ class Schema(object):
other_props = property(lambda self: get_other_props(self._props, \
SCHEMA_RESERVED_PROPS), doc="dictionary of non-reserved properties")
+ def check_props(self, other, props):
+ """Check that the given props are identical in two schemas.
+
+ @arg other: The other schema to check
+ @arg props: An iterable of properties to check
+ @return bool: True if all the properties match
+ """
+ return all(getattr(self, prop) == getattr(other, prop) for prop in props)
+
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the writer schema.
+
+ @arg writer: the writer schema to match against.
+ @return bool
+ """
+ raise NotImplemented("Must be implemented by subclasses")
+
# utility functions to manipulate properties dict
def get_prop(self, key):
return self._props.get(key)
@@ -460,6 +477,19 @@ class PrimitiveSchema(Schema):
self.fullname = type
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the writer schema.
+
+ @arg writer: the schema to match against
+ @return bool
+ """
+ return self.type == writer.type or {
+ 'float': self.type == 'double',
+ 'int': self.type in {'double', 'float', 'long'},
+ 'long': self.type in {'double', 'float',},
+ }.get(writer.type, False)
+
+
def to_json(self, names=None):
if len(self.props) == 1:
return self.fullname
@@ -472,7 +502,6 @@ class PrimitiveSchema(Schema):
#
# Decimal Bytes Type
#
-
class BytesDecimalSchema(PrimitiveSchema, DecimalLogicalSchema):
def __init__(self, precision, scale=0, other_props=None):
DecimalLogicalSchema.__init__(self, precision, scale, max_precision=((1 << 31) - \
1)) @@ -494,7 +523,6 @@ class BytesDecimalSchema(PrimitiveSchema, \
DecimalLogicalSchema): #
# Complex Types (non-recursive)
#
-
class FixedSchema(NamedSchema):
def __init__(self, name, namespace, size, names=None, other_props=None):
# Ensure valid ctor args
@@ -511,6 +539,14 @@ class FixedSchema(NamedSchema):
# read-only properties
size = property(lambda self: self.get_prop('size'))
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the writer schema.
+
+ @arg writer: the schema to match against
+ @return bool
+ """
+ return self.type == writer.type and self.check_props(writer, ['fullname', \
'size']) +
def to_json(self, names=None):
if names is None:
names = Names()
@@ -574,6 +610,14 @@ class EnumSchema(NamedSchema):
symbols = property(lambda self: self.get_prop('symbols'))
doc = property(lambda self: self.get_prop('doc'))
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the writer schema.
+
+ @arg writer: the schema to match against
+ @return bool
+ """
+ return self.type == writer.type and self.check_props(writer, ['fullname'])
+
def to_json(self, names=None):
if names is None:
names = Names()
@@ -610,6 +654,14 @@ class ArraySchema(Schema):
# read-only properties
items = property(lambda self: self.get_prop('items'))
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the writer schema.
+
+ @arg writer: the schema to match against
+ @return bool
+ """
+ return self.type == writer.type and self.items.check_props(writer.items, \
['type']) +
def to_json(self, names=None):
if names is None:
names = Names()
@@ -643,6 +695,14 @@ class MapSchema(Schema):
# read-only properties
values = property(lambda self: self.get_prop('values'))
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the writer schema.
+
+ @arg writer: the schema to match against
+ @return bool
+ """
+ return writer.type == self.type and self.values.check_props(writer.values, \
['type']) +
def to_json(self, names=None):
if names is None:
names = Names()
@@ -690,6 +750,14 @@ class UnionSchema(Schema):
# read-only properties
schemas = property(lambda self: self._schemas)
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the writer schema.
+
+ @arg writer: the schema to match against
+ @return bool
+ """
+ return writer.type in {'union', 'error_union'} or any(s.match(writer) for s in \
self.schemas) +
def to_json(self, names=None):
if names is None:
names = Names()
@@ -750,6 +818,14 @@ class RecordSchema(NamedSchema):
field_objects.append(new_field)
return field_objects
+ def match(self, writer):
+ """Return True if the current schema (as reader) matches the other schema.
+
+ @arg writer: the schema to match against
+ @return bool
+ """
+ return writer.type == self.type and (self.type == 'request' or \
self.check_props(writer, ['fullname'])) +
def __init__(self, name, namespace, fields, names=None, schema_type='record',
doc=None, other_props=None):
# Ensure valid ctor args
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic