[prev in list] [next in list] [prev in thread] [next in thread]
List: avro-commits
Subject: avro git commit: AVRO-1969: Add schema compatibility checker for Ruby
From: suraj () apache ! org
Date: 2017-04-04 22:21:08
Message-ID: 03dec43157fc4b5081f3f1dd33d3e576 () git ! apache ! org
[Download RAW message or body]
Repository: avro
Updated Branches:
refs/heads/master 0550d2cce -> 4b3677c32
AVRO-1969: Add schema compatibility checker for Ruby
This closes #170
Signed-off-by: Sean Busbey <busbey@apache.org>
Signed-off-by: sacharya <suraj@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/avro/repo
Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/4b3677c3
Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/4b3677c3
Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/4b3677c3
Branch: refs/heads/master
Commit: 4b3677c32b879e0e7f717eb95f9135ac654da760
Parents: 0550d2c
Author: Tim Perkins <tperkins@salsify.com>
Authored: Thu Dec 15 09:35:21 2016 -0500
Committer: sacharya <suraj@apache.org>
Committed: Tue Apr 4 17:20:10 2017 -0500
----------------------------------------------------------------------
lang/ruby/Manifest | 2 +
lang/ruby/lib/avro.rb | 1 +
lang/ruby/lib/avro/io.rb | 49 +--
lang/ruby/lib/avro/schema.rb | 28 +-
lang/ruby/lib/avro/schema_compatibility.rb | 168 ++++++++
lang/ruby/test/test_io.rb | 34 ++
lang/ruby/test/test_schema.rb | 34 ++
lang/ruby/test/test_schema_compatibility.rb | 463 +++++++++++++++++++++++
8 files changed, 728 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/Manifest
----------------------------------------------------------------------
diff --git a/lang/ruby/Manifest b/lang/ruby/Manifest
index 3edd7cf..87bfd98 100644
--- a/lang/ruby/Manifest
+++ b/lang/ruby/Manifest
@@ -11,6 +11,7 @@ lib/avro/io.rb
lib/avro/ipc.rb
lib/avro/protocol.rb
lib/avro/schema.rb
+lib/avro/schema_compatibility.rb
lib/avro/schema_normalization.rb
lib/avro/schema_validator.rb
test/case_finder.rb
@@ -25,6 +26,7 @@ test/test_help.rb
test/test_io.rb
test/test_protocol.rb
test/test_schema.rb
+test/test_schema_compatibility.rb
test/test_schema_normalization.rb
test/test_schema_validator.rb
test/test_socket_transport.rb
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro.rb
----------------------------------------------------------------------
diff --git a/lang/ruby/lib/avro.rb b/lang/ruby/lib/avro.rb
index 1293f0f..81afbda 100644
--- a/lang/ruby/lib/avro.rb
+++ b/lang/ruby/lib/avro.rb
@@ -41,3 +41,4 @@ require 'avro/protocol'
require 'avro/ipc'
require 'avro/schema_normalization'
require 'avro/schema_validator'
+require 'avro/schema_compatibility'
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro/io.rb
----------------------------------------------------------------------
diff --git a/lang/ruby/lib/avro/io.rb b/lang/ruby/lib/avro/io.rb
index 22beea2..b04a19a 100644
--- a/lang/ruby/lib/avro/io.rb
+++ b/lang/ruby/lib/avro/io.rb
@@ -221,46 +221,7 @@ module Avro
class DatumReader
def self.match_schemas(writers_schema, readers_schema)
- w_type = writers_schema.type_sym
- r_type = readers_schema.type_sym
-
- # This conditional is begging for some OO love.
- if w_type == :union || r_type == :union
- return true
- end
-
- if w_type == r_type
- return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)
-
- case r_type
- when :record
- return writers_schema.fullname == readers_schema.fullname
- when :error
- return writers_schema.fullname == readers_schema.fullname
- when :request
- return true
- when :fixed
- return writers_schema.fullname == readers_schema.fullname &&
- writers_schema.size == readers_schema.size
- when :enum
- return writers_schema.fullname == readers_schema.fullname
- when :map
- return writers_schema.values.type == readers_schema.values.type
- when :array
- return writers_schema.items.type == readers_schema.items.type
- end
- end
-
- # Handle schema promotion
- if w_type == :int && [:long, :float, :double].include?(r_type)
- return true
- elsif w_type == :long && [:float, :double].include?(r_type)
- return true
- elsif w_type == :float && r_type == :double
- return true
- end
-
- return false
+    # Delegate to Avro::SchemaCompatibility.match_schemas, which now holds this
+    # logic (it also recurses into map values / array items and accepts
+    # string <-> bytes).
+ Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema)
end
attr_accessor :writers_schema, :readers_schema
@@ -393,11 +354,11 @@ module Avro
writers_fields_hash = writers_schema.fields_hash
readers_fields_hash.each do |field_name, field|
unless writers_fields_hash.has_key? field_name
- if !field.default.nil?
+ if field.default?
field_val = read_default_value(field.type, field.default)
read_record[field.name] = field_val
else
- # FIXME(jmhodges) another 'unset' here
+ raise AvroError, "Missing data for #{field.type} with no default"
end
end
end
@@ -407,10 +368,6 @@ module Avro
end
def read_default_value(field_schema, default_value)
- if default_value == :no_default
- raise AvroError, "Missing data for #{field_schema} with no default"
- end
-
# Basically a JSON Decoder?
case field_schema.type_sym
when :null
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro/schema.rb
----------------------------------------------------------------------
diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb
index 5038311..024d562 100644
--- a/lang/ruby/lib/avro/schema.rb
+++ b/lang/ruby/lib/avro/schema.rb
@@ -122,6 +122,18 @@ module Avro
Digest::SHA256.hexdigest(parsing_form).to_i(16)
end
+ def read?(writers_schema)
+ SchemaCompatibility.can_read?(writers_schema, self)
+ end
+
+ def be_read?(other_schema)
+ other_schema.read?(self)
+ end
+
+ def mutual_read?(other_schema)
+ SchemaCompatibility.mutual_read?(other_schema, self)
+ end
+
def ==(other, seen=nil)
other.is_a?(Schema) && type_sym == other.type_sym
end
@@ -210,7 +222,11 @@ module Avro
else
super(schema_type, name, namespace, names, doc)
end
- @fields = RecordSchema.make_field_objects(fields, names, self.namespace)
+ @fields = if fields
+ RecordSchema.make_field_objects(fields, names, self.namespace)
+ else
+ {}
+ end
end
def fields_hash
@@ -261,8 +277,7 @@ module Avro
def initialize(schemas, names=nil, default_namespace=nil)
super(:union)
- schema_objects = []
- schemas.each_with_index do |schema, i|
+ @schemas = schemas.each_with_object([]) do |schema, schema_objects|
new_schema = subparse(schema, names, default_namespace)
ns_type = new_schema.type_sym
@@ -275,7 +290,6 @@ module Avro
else
schema_objects << new_schema
end
- @schemas = schema_objects
end
end
@@ -348,9 +362,13 @@ module Avro
@doc = doc
end
+ def default?
+ @default != :no_default
+ end
+
def to_avro(names=Set.new)
{'name' => name, 'type' => type.to_avro(names)}.tap do |avro|
- avro['default'] = default unless default == :no_default
+ avro['default'] = default if default?
avro['order'] = order if order
avro['doc'] = doc if doc
end
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/lib/avro/schema_compatibility.rb
----------------------------------------------------------------------
diff --git a/lang/ruby/lib/avro/schema_compatibility.rb b/lang/ruby/lib/avro/schema_compatibility.rb
new file mode 100644
index 0000000..1842b3e
--- /dev/null
+++ b/lang/ruby/lib/avro/schema_compatibility.rb
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+module Avro
+ module SchemaCompatibility
+ # Perform a full, recursive check that a datum written using the writers_schema
+ # can be read using the readers_schema.
+ def self.can_read?(writers_schema, readers_schema)
+ Checker.new.can_read?(writers_schema, readers_schema)
+ end
+
+ # Perform a full, recursive check that a datum written using either the
+ # writers_schema or the readers_schema can be read using the other schema.
+ def self.mutual_read?(writers_schema, readers_schema)
+ Checker.new.mutual_read?(writers_schema, readers_schema)
+ end
+
+ # Perform a basic check that a datum written with the writers_schema could
+ # be read using the readers_schema. This check only includes matching the types,
+ # including schema promotion, and matching the full name for named types.
+ # Aliases for named types are not supported here, and the ruby implementation
+ # of Avro in general does not include support for aliases.
+ def self.match_schemas(writers_schema, readers_schema)
+ w_type = writers_schema.type_sym
+ r_type = readers_schema.type_sym
+
+ # This conditional is begging for some OO love.
+ if w_type == :union || r_type == :union
+ return true
+ end
+
+ if w_type == r_type
+ return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)
+
+ case r_type
+ when :record
+ return writers_schema.fullname == readers_schema.fullname
+ when :error
+ return writers_schema.fullname == readers_schema.fullname
+ when :request
+ return true
+ when :fixed
+ return writers_schema.fullname == readers_schema.fullname &&
+ writers_schema.size == readers_schema.size
+ when :enum
+ return writers_schema.fullname == readers_schema.fullname
+ when :map
+ return match_schemas(writers_schema.values, readers_schema.values)
+ when :array
+ return match_schemas(writers_schema.items, readers_schema.items)
+ end
+ end
+
+ # Handle schema promotion
+ if w_type == :int && [:long, :float, :double].include?(r_type)
+ return true
+ elsif w_type == :long && [:float, :double].include?(r_type)
+ return true
+ elsif w_type == :float && r_type == :double
+ return true
+ elsif w_type == :string && r_type == :bytes
+ return true
+ elsif w_type == :bytes && r_type == :string
+ return true
+ end
+
+ return false
+ end
+
+ class Checker
+ SIMPLE_CHECKS = Schema::PRIMITIVE_TYPES_SYM.dup.add(:fixed).freeze
+
+ attr_reader :recursion_set
+ private :recursion_set
+
+ def initialize
+ @recursion_set = Set.new
+ end
+
+ def can_read?(writers_schema, readers_schema)
+ full_match_schemas(writers_schema, readers_schema)
+ end
+
+ def mutual_read?(writers_schema, readers_schema)
+ can_read?(writers_schema, readers_schema) && can_read?(readers_schema, \
writers_schema) + end
+
+ private
+
+ def full_match_schemas(writers_schema, readers_schema)
+ return true if recursion_in_progress?(writers_schema, readers_schema)
+
+ return false unless Avro::SchemaCompatibility.match_schemas(writers_schema, \
readers_schema) +
+ if writers_schema.type_sym != :union && \
SIMPLE_CHECKS.include?(readers_schema.type_sym) + return true
+ end
+
+ case readers_schema.type_sym
+ when :record
+ match_record_schemas(writers_schema, readers_schema)
+ when :map
+ full_match_schemas(writers_schema.values, readers_schema.values)
+ when :array
+ full_match_schemas(writers_schema.items, readers_schema.items)
+ when :union
+ match_union_schemas(writers_schema, readers_schema)
+ when :enum
+ # reader's symbols must contain all writer's symbols
+ (writers_schema.symbols - readers_schema.symbols).empty?
+ else
+ if writers_schema.type_sym == :union && writers_schema.schemas.size == 1
+ full_match_schemas(writers_schema.schemas.first, readers_schema)
+ else
+ false
+ end
+ end
+ end
+
+ def match_union_schemas(writers_schema, readers_schema)
+ raise 'readers_schema must be a union' unless readers_schema.type_sym == \
:union +
+ case writers_schema.type_sym
+ when :union
+ writers_schema.schemas.all? { |writer_type| \
full_match_schemas(writer_type, readers_schema) } + else
+ readers_schema.schemas.any? { |reader_type| \
full_match_schemas(writers_schema, reader_type) } + end
+ end
+
+ def match_record_schemas(writers_schema, readers_schema)
+ writer_fields_hash = writers_schema.fields_hash
+ readers_schema.fields.each do |field|
+ if writer_fields_hash.key?(field.name)
+ return false unless \
full_match_schemas(writer_fields_hash[field.name].type, field.type) + else
+ return false unless field.default?
+ end
+ end
+
+ return true
+ end
+
+ def recursion_in_progress?(writers_schema, readers_schema)
+ key = [writers_schema.object_id, readers_schema.object_id]
+
+ if recursion_set.include?(key)
+ true
+ else
+ recursion_set.add(key)
+ false
+ end
+ end
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/test/test_io.rb
----------------------------------------------------------------------
diff --git a/lang/ruby/test/test_io.rb b/lang/ruby/test/test_io.rb
index 09d725d..fc0088b 100644
--- a/lang/ruby/test/test_io.rb
+++ b/lang/ruby/test/test_io.rb
@@ -341,6 +341,40 @@ EOS
end
end
+ def test_interchangeable_schemas
+ interchangeable_schemas = ['"string"', '"bytes"']
+ incorrect = 0
+ interchangeable_schemas.each_with_index do |ws, i|
+ writers_schema = Avro::Schema.parse(ws)
+ datum_to_write = 'foo'
+ readers_schema = Avro::Schema.parse(interchangeable_schemas[i == 0 ? 1 : 0])
+ writer, * = write_datum(datum_to_write, writers_schema)
+ datum_read = read_datum(writer, writers_schema, readers_schema)
+ if datum_read != datum_to_write
+ incorrect += 1
+ end
+ end
+ assert_equal(incorrect, 0)
+ end
+
+ def test_array_schema_promotion
+ writers_schema = Avro::Schema.parse('{"type":"array", "items":"int"}')
+ readers_schema = Avro::Schema.parse('{"type":"array", "items":"long"}')
+ datum_to_write = [1, 2]
+ writer, * = write_datum(datum_to_write, writers_schema)
+ datum_read = read_datum(writer, writers_schema, readers_schema)
+ assert_equal(datum_read, datum_to_write)
+ end
+
+ def test_map_schema_promotion
+ writers_schema = Avro::Schema.parse('{"type":"map", "values":"int"}')
+ readers_schema = Avro::Schema.parse('{"type":"map", "values":"long"}')
+ datum_to_write = { 'foo' => 1, 'bar' => 2 }
+ writer, * = write_datum(datum_to_write, writers_schema)
+ datum_read = read_datum(writer, writers_schema, readers_schema)
+ assert_equal(datum_read, datum_to_write)
+ end
+
def test_snappy_backward_compat
# a snappy-compressed block payload without the checksum
# this has no back-references, just one literal so the last 9
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/test/test_schema.rb
----------------------------------------------------------------------
diff --git a/lang/ruby/test/test_schema.rb b/lang/ruby/test/test_schema.rb
index 417d511..48fe0a5 100644
--- a/lang/ruby/test/test_schema.rb
+++ b/lang/ruby/test/test_schema.rb
@@ -258,4 +258,38 @@ class TestSchema < Test::Unit::TestCase
}
assert_equal enum_schema_hash, enum_schema_json.to_avro
end
+
+def test_empty_record
+ schema = Avro::Schema.parse('{"type":"record", "name":"Empty"}')
+ assert_empty(schema.fields)
+ end
+
+ def test_empty_union
+ schema = Avro::Schema.parse('[]')
+ assert_equal(schema.to_s, '[]')
+ end
+
+ def test_read
+ schema = Avro::Schema.parse('"string"')
+ writer_schema = Avro::Schema.parse('"int"')
+ assert_false(schema.read?(writer_schema))
+ assert_true(schema.read?(schema))
+ end
+
+ def test_be_read
+ schema = Avro::Schema.parse('"string"')
+ writer_schema = Avro::Schema.parse('"int"')
+ assert_false(schema.be_read?(writer_schema))
+ assert_true(schema.be_read?(schema))
+ end
+
+ def test_mutual_read
+ schema = Avro::Schema.parse('"string"')
+ writer_schema = Avro::Schema.parse('"int"')
+ default1 = Avro::Schema.parse('{"type":"record", "name":"Default", \
"fields":[{"name":"i", "type":"int", "default": 1}]}') + default2 = \
Avro::Schema.parse('{"type":"record", "name":"Default", "fields":[{"name:":"s", \
"type":"string", "default": ""}]}') + \
assert_false(schema.mutual_read?(writer_schema)) + \
assert_true(schema.mutual_read?(schema)) + \
assert_true(default1.mutual_read?(default2)) + end
end
http://git-wip-us.apache.org/repos/asf/avro/blob/4b3677c3/lang/ruby/test/test_schema_compatibility.rb
----------------------------------------------------------------------
diff --git a/lang/ruby/test/test_schema_compatibility.rb b/lang/ruby/test/test_schema_compatibility.rb
new file mode 100644
index 0000000..138c895
--- /dev/null
+++ b/lang/ruby/test/test_schema_compatibility.rb
@@ -0,0 +1,463 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'test_help'
+
+class TestSchemaCompatibility < Test::Unit::TestCase
+
+ def test_primitive_schema_compatibility
+ Avro::Schema::PRIMITIVE_TYPES.each do |schema_type|
+ assert_true(can_read?(send("#{schema_type}_schema"), \
send("#{schema_type}_schema"))) + end
+ end
+
+ def test_compatible_reader_writer_pairs
+ [
+ long_schema, int_schema,
+ float_schema, int_schema,
+ float_schema, long_schema,
+ double_schema, long_schema,
+ double_schema, int_schema,
+ double_schema, float_schema,
+
+ int_array_schema, int_array_schema,
+ long_array_schema, int_array_schema,
+ int_map_schema, int_map_schema,
+ long_map_schema, int_map_schema,
+
+ enum1_ab_schema, enum1_ab_schema,
+ enum1_abc_schema, enum1_ab_schema,
+
+ string_schema, bytes_schema,
+ bytes_schema, string_schema,
+
+ empty_union_schema, empty_union_schema,
+ int_union_schema, int_union_schema,
+ int_string_union_schema, string_int_union_schema,
+ int_union_schema, empty_union_schema,
+ long_union_schema, int_union_schema,
+
+ int_union_schema, int_schema,
+ int_schema, int_union_schema,
+
+ empty_record1_schema, empty_record1_schema,
+ empty_record1_schema, a_int_record1_schema,
+
+ a_int_record1_schema, a_int_record1_schema,
+ a_dint_record1_schema, a_int_record1_schema,
+ a_dint_record1_schema, a_dint_record1_schema,
+ a_int_record1_schema, a_dint_record1_schema,
+
+ a_long_record1_schema, a_int_record1_schema,
+
+ a_int_record1_schema, a_int_b_int_record1_schema,
+ a_dint_record1_schema, a_int_b_int_record1_schema,
+
+ a_int_b_dint_record1_schema, a_int_record1_schema,
+ a_dint_b_dint_record1_schema, empty_record1_schema,
+ a_dint_b_dint_record1_schema, a_int_record1_schema,
+ a_int_b_int_record1_schema, a_dint_b_dint_record1_schema,
+
+ int_list_record_schema, int_list_record_schema,
+ long_list_record_schema, long_list_record_schema,
+ long_list_record_schema, int_list_record_schema,
+
+ null_schema, null_schema
+ ].each_slice(2) do |(reader, writer)|
+ assert_true(can_read?(writer, reader), "expecting #{reader} to read \
#{writer}") + end
+ end
+
+ def test_broken
+ assert_false(can_read?(int_string_union_schema, int_union_schema))
+ end
+
+ def test_incompatible_reader_writer_pairs
+ [
+ null_schema, int_schema,
+ null_schema, long_schema,
+
+ boolean_schema, int_schema,
+
+ int_schema, null_schema,
+ int_schema, boolean_schema,
+ int_schema, long_schema,
+ int_schema, float_schema,
+ int_schema, double_schema,
+
+ long_schema, float_schema,
+ long_schema, double_schema,
+
+ float_schema, double_schema,
+
+ string_schema, boolean_schema,
+ string_schema, int_schema,
+
+ bytes_schema, null_schema,
+ bytes_schema, int_schema,
+
+ int_array_schema, long_array_schema,
+ int_map_schema, int_array_schema,
+ int_array_schema, int_map_schema,
+ int_map_schema, long_map_schema,
+
+ enum1_ab_schema, enum1_abc_schema,
+ enum1_bc_schema, enum1_abc_schema,
+
+ enum1_ab_schema, enum2_ab_schema,
+ int_schema, enum2_ab_schema,
+ enum2_ab_schema, int_schema,
+
+ int_union_schema, int_string_union_schema,
+ string_union_schema, int_string_union_schema,
+
+ empty_record2_schema, empty_record1_schema,
+ a_int_record1_schema, empty_record1_schema,
+ a_int_b_dint_record1_schema, empty_record1_schema,
+
+ int_list_record_schema, long_list_record_schema,
+
+ null_schema, int_schema
+ ].each_slice(2) do |(reader, writer)|
+ assert_false(can_read?(writer, reader), "expecting #{reader} not to read \
#{writer}") + end
+ end
+
+ def writer_schema
+ Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"oldfield1", "type":"int"},
+ {"name":"oldfield2", "type":"string"}
+ ]}
+ SCHEMA
+ end
+
+ def test_missing_field
+ reader_schema = Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"oldfield1", "type":"int"}
+ ]}
+ SCHEMA
+ assert_true(can_read?(writer_schema, reader_schema))
+ assert_false(can_read?(reader_schema, writer_schema))
+ end
+
+ def test_missing_second_field
+ reader_schema = Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"oldfield2", "type":"string"}
+ ]}
+ SCHEMA
+ assert_true(can_read?(writer_schema, reader_schema))
+ assert_false(can_read?(reader_schema, writer_schema))
+ end
+
+ def test_all_fields
+ reader_schema = Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"oldfield1", "type":"int"},
+ {"name":"oldfield2", "type":"string"}
+ ]}
+ SCHEMA
+ assert_true(can_read?(writer_schema, reader_schema))
+ assert_true(can_read?(reader_schema, writer_schema))
+ end
+
+ def test_new_field_with_default
+ reader_schema = Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"oldfield1", "type":"int"},
+ {"name":"newfield1", "type":"int", "default":42}
+ ]}
+ SCHEMA
+ assert_true(can_read?(writer_schema, reader_schema))
+ assert_false(can_read?(reader_schema, writer_schema))
+ end
+
+ def test_new_field
+ reader_schema = Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"oldfield1", "type":"int"},
+ {"name":"newfield1", "type":"int"}
+ ]}
+ SCHEMA
+ assert_false(can_read?(writer_schema, reader_schema))
+ assert_false(can_read?(reader_schema, writer_schema))
+ end
+
+ def test_array_writer_schema
+ valid_reader = string_array_schema
+ invalid_reader = string_map_schema
+
+ assert_true(can_read?(string_array_schema, valid_reader))
+ assert_false(can_read?(string_array_schema, invalid_reader))
+ end
+
+ def test_primitive_writer_schema
+ valid_reader = string_schema
+ assert_true(can_read?(string_schema, valid_reader))
+ assert_false(can_read?(int_schema, string_schema))
+ end
+
+ def test_union_reader_writer_subset_incompatiblity
+ # reader union schema must contain all writer union branches
+ union_writer = union_schema(int_schema, string_schema)
+ union_reader = union_schema(string_schema)
+
+ assert_false(can_read?(union_writer, union_reader))
+ assert_true(can_read?(union_reader, union_writer))
+ end
+
+ def test_incompatible_record_field
+ string_schema = Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
+ {"name":"field1", "type":"string"}
+ ]}
+ SCHEMA
+ int_schema = Avro::Schema.parse <<-SCHEMA2
+ {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
+ {"name":"field1", "type":"int"}
+ ]}
+ SCHEMA2
+ assert_false(can_read?(string_schema, int_schema))
+ end
+
+ def test_enum_symbols
+ enum_schema1 = Avro::Schema.parse <<-SCHEMA
+ {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
+ SCHEMA
+ enum_schema2 = Avro::Schema.parse <<-SCHEMA
+ {"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}
+ SCHEMA
+ assert_false(can_read?(enum_schema2, enum_schema1))
+ assert_true(can_read?(enum_schema1, enum_schema2))
+ end
+
+ # Tests from lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator2.java
+
+ def point_2d_schema
+ Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Point2D", "fields":[
+ {"name":"x", "type":"double"},
+ {"name":"y", "type":"double"}
+ ]}
+ SCHEMA
+ end
+
+ def point_2d_fullname_schema
+ Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Point", "namespace":"written", "fields":[
+ {"name":"x", "type":"double"},
+ {"name":"y", "type":"double"}
+ ]}
+ SCHEMA
+ end
+
+ def point_3d_no_default_schema
+ Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Point", "fields":[
+ {"name":"x", "type":"double"},
+ {"name":"y", "type":"double"},
+ {"name":"z", "type":"double"}
+ ]}
+ SCHEMA
+ end
+
+ def point_3d_schema
+ Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Point3D", "fields":[
+ {"name":"x", "type":"double"},
+ {"name":"y", "type":"double"},
+ {"name":"z", "type":"double", "default": 0.0}
+ ]}
+ SCHEMA
+ end
+
+ def point_3d_match_name_schema
+ Avro::Schema.parse <<-SCHEMA
+ {"type":"record", "name":"Point", "fields":[
+ {"name":"x", "type":"double"},
+ {"name":"y", "type":"double"},
+ {"name":"z", "type":"double", "default": 0.0}
+ ]}
+ SCHEMA
+ end
+
+ def test_union_resolution_no_structure_match
+ # short name match, but no structure match
+ read_schema = union_schema(null_schema, point_3d_no_default_schema)
+ assert_false(can_read?(point_2d_fullname_schema, read_schema))
+ end
+
+ def test_union_resolution_first_structure_match_2d
+ # multiple structure matches with no name matches
+ read_schema = union_schema(null_schema, point_3d_no_default_schema, \
point_2d_schema, point_3d_schema) + \
assert_false(can_read?(point_2d_fullname_schema, read_schema)) + end
+
+ def test_union_resolution_first_structure_match_3d
+ # multiple structure matches with no name matches
+ read_schema = union_schema(null_schema, point_3d_no_default_schema, \
point_3d_schema, point_2d_schema) + \
assert_false(can_read?(point_2d_fullname_schema, read_schema)) + end
+
+ def test_union_resolution_named_structure_match
+ # multiple structure matches with a short name match
+ read_schema = union_schema(null_schema, point_2d_schema, \
point_3d_match_name_schema, point_3d_schema) + \
assert_false(can_read?(point_2d_fullname_schema, read_schema)) + end
+
+ def test_union_resolution_full_name_match
+ # there is a full name match that should be chosen
+ read_schema = union_schema(null_schema, point_2d_schema, \
point_3d_match_name_schema, point_3d_schema, point_2d_fullname_schema) + \
assert_true(can_read?(point_2d_fullname_schema, read_schema)) + end
+
+ def can_read?(writer, reader)
+ Avro::SchemaCompatibility.can_read?(writer, reader)
+ end
+
+ def union_schema(*schemas)
+ schemas ||= []
+ Avro::Schema.parse("[#{schemas.map(&:to_s).join(',')}]")
+ end
+
+ Avro::Schema::PRIMITIVE_TYPES.each do |schema_type|
+ define_method("#{schema_type}_schema") do
+ Avro::Schema.parse("\"#{schema_type}\"")
+ end
+ end
+
+ def int_array_schema
+ Avro::Schema.parse('{"type":"array", "items":"int"}')
+ end
+
+ def long_array_schema
+ Avro::Schema.parse('{"type":"array", "items":"long"}')
+ end
+
+ def string_array_schema
+ Avro::Schema.parse('{"type":"array", "items":"string"}')
+ end
+
+ def int_map_schema
+ Avro::Schema.parse('{"type":"map", "values":"int"}')
+ end
+
+ def long_map_schema
+ Avro::Schema.parse('{"type":"map", "values":"long"}')
+ end
+
+ def string_map_schema
+ Avro::Schema.parse('{"type":"map", "values":"string"}')
+ end
+
+ def enum1_ab_schema
+ Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["A","B"]}')
+ end
+
+ def enum1_abc_schema
+ Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}')
+ end
+
+ def enum1_bc_schema
+ Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["B","C"]}')
+ end
+
+ def enum2_ab_schema
+ Avro::Schema.parse('{"type":"enum", "name":"Enum2", "symbols":["A","B"]}')
+ end
+
+ def empty_record1_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record1"}')
+ end
+
+ def empty_record2_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record2"}')
+ end
+
+ def a_int_record1_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", \
"type":"int"}]}') + end
+
+ def a_long_record1_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", \
"type":"long"}]}') + end
+
+ def a_int_b_int_record1_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", \
"type":"int"}, {"name":"b", "type":"int"}]}') + end
+
+ def a_dint_record1_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", \
"type":"int", "default":0}]}') + end
+
+ def a_int_b_dint_record1_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", \
"type":"int"}, {"name":"b", "type":"int", "default":0}]}') + end
+
+ def a_dint_b_dint_record1_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record1", "fields":[{"name":"a", \
"type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}') + end
+
+ def int_list_record_schema
+ Avro::Schema.parse <<-SCHEMA
+ {
+ "type":"record", "name":"List", "fields": [
+ {"name": "head", "type": "int"},
+ {"name": "tail", "type": "List"}
+ ]}
+ SCHEMA
+ end
+
+ def long_list_record_schema
+ Avro::Schema.parse <<-SCHEMA
+ {
+ "type":"record", "name":"List", "fields": [
+ {"name": "head", "type": "long"},
+ {"name": "tail", "type": "List"}
+ ]}
+ SCHEMA
+ end
+
+ def empty_union_schema
+ union_schema
+ end
+
+ def null_union_schema
+ union_schema(null_schema)
+ end
+
+ def int_union_schema
+ union_schema(int_schema)
+ end
+
+ def long_union_schema
+ union_schema(long_schema)
+ end
+
+ def string_union_schema
+ union_schema(string_schema)
+ end
+
+ def int_string_union_schema
+ union_schema(int_schema, string_schema)
+ end
+
+ def string_int_union_schema
+ union_schema(string_schema, int_schema)
+ end
+end
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic