[prev in list] [next in list] [prev in thread] [next in thread]
List: avro-commits
Subject: svn commit: r1557225 [2/3] - in /avro/trunk: ./ lang/py3/ lang/py3/avro/ lang/py3/avro/tests/ lang/p
From: cutting () apache ! org
Date: 2014-01-10 19:11:43
Message-ID: 20140110191144.8B8DF2388A5B () eris ! apache ! org
[Download RAW message or body]
Added: avro/trunk/lang/py3/avro/protocol.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/protocol.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/protocol.py (added)
+++ avro/trunk/lang/py3/avro/protocol.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,402 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Protocol implementation.
+"""
+
+
+import hashlib
+import json
+import logging
+
+from avro import schema
+
+ImmutableDict = schema.ImmutableDict
+
+# ------------------------------------------------------------------------------
+# Constants
+
+# Allowed top-level schemas in a protocol:
+VALID_TYPE_SCHEMA_TYPES = frozenset(['enum', 'record', 'error', 'fixed'])
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
class ProtocolParseException(schema.AvroException):
  """Error while parsing a JSON protocol descriptor.

  Raised when a protocol descriptor is structurally invalid.
  """
+
+
+# ------------------------------------------------------------------------------
+# Base Classes
+
+
class Protocol(object):
  """An application protocol: a named collection of types and messages."""

  @staticmethod
  def _ParseTypeDesc(type_desc, names):
    """Parses a type declared in a protocol descriptor.

    Args:
      type_desc: JSON descriptor of the type.
      names: Tracker of the named Avro schemas.
    Returns:
      The parsed type schema.
    Raises:
      ProtocolParseException: if the type is not allowed in a protocol.
    """
    type_schema = schema.SchemaFromJSONData(type_desc, names=names)
    if type_schema.type not in VALID_TYPE_SCHEMA_TYPES:
      # Bug fix: the original message referenced an undefined name
      # 'avro_name', so reporting this error raised a NameError instead of
      # the intended ProtocolParseException.
      raise ProtocolParseException(
          'Invalid type %r in protocol: '
          'protocols can only declare types %s.'
          % (type_schema, ','.join(VALID_TYPE_SCHEMA_TYPES)))
    return type_schema

  @staticmethod
  def _ParseMessageDesc(name, message_desc, names):
    """Parses a protocol message descriptor.

    Args:
      name: Name of the message.
      message_desc: Descriptor of the message.
      names: Tracker of the named Avro schema.
    Returns:
      The parsed protocol message.
    Raises:
      ProtocolParseException: if the descriptor is invalid.
    """
    request_desc = message_desc.get('request')
    if request_desc is None:
      raise ProtocolParseException(
          'Invalid message descriptor with no "request": %r.' % message_desc)
    request_schema = Message._ParseRequestFromJSONDesc(
        request_desc=request_desc,
        names=names,
    )

    response_desc = message_desc.get('response')
    if response_desc is None:
      raise ProtocolParseException(
          'Invalid message descriptor with no "response": %r.' % message_desc)
    response_schema = Message._ParseResponseFromJSONDesc(
        response_desc=response_desc,
        names=names,
    )

    # Errors are optional:
    errors_desc = message_desc.get('errors', tuple())
    error_union_schema = Message._ParseErrorsFromJSONDesc(
        errors_desc=errors_desc,
        names=names,
    )

    return Message(
        name=name,
        request=request_schema,
        response=response_schema,
        errors=error_union_schema,
    )

  @staticmethod
  def _ParseMessageDescMap(message_desc_map, names):
    """Parses a map of message descriptors, yielding Message objects.

    Args:
      message_desc_map: Map: message name -> message descriptor.
      names: Tracker of the named Avro schemas.
    Yields:
      The parsed protocol messages.
    """
    for name, message_desc in message_desc_map.items():
      yield Protocol._ParseMessageDesc(
          name=name,
          message_desc=message_desc,
          names=names,
      )

  def __init__(
      self,
      name,
      namespace=None,
      types=tuple(),
      messages=tuple(),
  ):
    """Initializes a new protocol object.

    Args:
      name: Protocol name (absolute or relative).
      namespace: Optional explicit namespace (if name is relative).
      types: Collection of types in the protocol.
      messages: Collection of messages in the protocol.
    Raises:
      ProtocolParseException: if two messages share the same name.
    """
    self._avro_name = schema.Name(name=name, namespace=namespace)
    self._fullname = self._avro_name.fullname
    self._name = self._avro_name.simple_name
    self._namespace = self._avro_name.namespace

    self._props = {}
    self._props['name'] = self._name
    if self._namespace:
      self._props['namespace'] = self._namespace

    self._names = schema.Names(default_namespace=self._namespace)

    self._types = tuple(types)
    # Map: type full name -> type schema
    self._type_map = (
        ImmutableDict((type.fullname, type) for type in self._types))
    # This assertion cannot fail unless we don't track named schemas properly:
    assert (len(self._types) == len(self._type_map)), (
        'Type list %r does not match type map: %r'
        % (self._types, self._type_map))
    # TODO: set props['types']

    self._messages = tuple(messages)

    # Map: message name -> Message
    # Note that message names are simple names unique within the protocol.
    self._message_map = ImmutableDict(
        items=((message.name, message) for message in self._messages))
    if len(self._messages) != len(self._message_map):
      raise ProtocolParseException(
          'Invalid protocol %s with duplicate message name: %r'
          % (self._avro_name, self._messages))
    # TODO: set props['messages']

    # MD5 digest of the protocol's JSON rendering, used for handshakes:
    self._md5 = hashlib.md5(str(self).encode('utf-8')).digest()

  @property
  def name(self):
    """Returns: the simple name of the protocol."""
    return self._name

  @property
  def namespace(self):
    """Returns: the namespace this protocol belongs to."""
    return self._namespace

  @property
  def fullname(self):
    """Returns: the fully qualified name of this protocol."""
    return self._fullname

  @property
  def types(self):
    """Returns: the collection of types declared in this protocol."""
    return self._types

  @property
  def type_map(self):
    """Returns: the map of types in this protocol, indexed by their full name."""
    return self._type_map

  @property
  def messages(self):
    """Returns: the collection of messages declared in this protocol."""
    return self._messages

  @property
  def message_map(self):
    """Returns: the map of messages in this protocol, indexed by their name."""
    return self._message_map

  @property
  def md5(self):
    """Returns: the MD5 digest of this protocol's JSON representation."""
    return self._md5

  @property
  def props(self):
    """Returns: the properties of this protocol."""
    return self._props

  def to_json(self):
    """Converts the protocol into its JSON descriptor (as a dict)."""
    to_dump = {}
    to_dump['protocol'] = self.name
    names = schema.Names(default_namespace=self.namespace)
    if self.namespace:
      to_dump['namespace'] = self.namespace
    if self.types:
      to_dump['types'] = [t.to_json(names) for t in self.types]
    if self.messages:
      messages_dict = {}
      for name, body in self.message_map.items():
        messages_dict[name] = body.to_json(names)
      to_dump['messages'] = messages_dict
    return to_dump

  def __str__(self):
    """Returns: the JSON representation of this protocol."""
    return json.dumps(self.to_json())

  def __eq__(self, that):
    # Compare the canonical JSON forms of the two protocols.
    to_cmp = json.loads(str(self))
    return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class Message(object):
  """A Protocol message: a named request/response (+ errors) declaration."""

  @staticmethod
  def _ParseRequestFromJSONDesc(request_desc, names):
    """Parses the request descriptor of a protocol message.

    Args:
      request_desc: Descriptor of the message request.
        This is a list of fields that defines an unnamed record.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed request schema, as an unnamed record.
    """
    fields = schema.RecordSchema._MakeFieldList(request_desc, names=names)
    return schema.RecordSchema(
        name=None,
        namespace=None,
        fields=fields,
        names=names,
        record_type=schema.REQUEST,
    )

  @staticmethod
  def _ParseResponseFromJSONDesc(response_desc, names):
    """Parses the response descriptor of a protocol message.

    Args:
      response_desc: Descriptor of the message response.
        This is an arbitrary Avro schema descriptor.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed response schema.
    """
    return schema.SchemaFromJSONData(response_desc, names=names)

  @staticmethod
  def _ParseErrorsFromJSONDesc(errors_desc, names):
    """Parses the errors descriptor of a protocol message.

    Args:
      errors_desc: Descriptor of the errors thrown by the protocol message.
        This is a list of error types understood as an implicit union.
        Each error type is an arbitrary Avro schema.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed ErrorUnionSchema.
    """
    error_union_desc = {
        'type': schema.ERROR_UNION,
        'declared_errors': errors_desc,
    }
    return schema.SchemaFromJSONData(error_union_desc, names=names)

  def __init__(self, name, request, response, errors=None):
    """Initializes a new message object.

    Args:
      name: Simple name of the message (unique within a protocol).
      request: Avro record schema of the message request.
      response: Avro schema of the message response.
      errors: Optional error union schema, or None.
    """
    self._name = name

    self._props = {}
    # TODO: set properties
    self._request = request
    self._response = response
    self._errors = errors

  @property
  def name(self):
    """Returns: the simple name of this message."""
    return self._name

  @property
  def request(self):
    """Returns: the request schema of this message."""
    return self._request

  @property
  def response(self):
    """Returns: the response schema of this message."""
    return self._response

  @property
  def errors(self):
    """Returns: the error union schema of this message, if any, or None."""
    return self._errors

  @property
  def props(self):
    """Returns: the properties of this message.

    Bug fix: this accessor used to be a plain method (unlike Protocol.props),
    so __eq__ compared bound methods — always unequal for distinct
    instances — instead of comparing the property dictionaries.
    """
    return self._props

  def __str__(self):
    """Returns: the JSON representation of this message."""
    return json.dumps(self.to_json())

  def to_json(self, names=None):
    """Converts this message into its JSON descriptor (as a dict)."""
    if names is None:
      names = schema.Names()
    to_dump = {}
    to_dump['request'] = self.request.to_json(names)
    to_dump['response'] = self.response.to_json(names)
    if self.errors:
      to_dump['errors'] = self.errors.to_json(names)
    return to_dump

  def __eq__(self, that):
    return self.name == that.name and self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+
+
def ProtocolFromJSONData(json_data):
  """Builds an Avro Protocol from its JSON descriptor.

  Args:
    json_data: JSON data representing the descriptor of the Avro protocol.
  Returns:
    The Avro Protocol parsed from the JSON descriptor.
  Raises:
    ProtocolParseException: if the descriptor is invalid.
  """
  if not isinstance(json_data, dict):
    raise ProtocolParseException(
        'Invalid JSON descriptor for an Avro protocol: %r' % json_data)

  name = json_data.get('protocol')
  if name is None:
    # Bug fix: the message used to claim the missing key was "name", but the
    # protocol name is carried by the "protocol" key.
    raise ProtocolParseException(
        'Invalid protocol descriptor with no "protocol": %r' % json_data)

  # Namespace is optional:
  namespace = json_data.get('namespace')

  avro_name = schema.Name(name=name, namespace=namespace)
  names = schema.Names(default_namespace=avro_name.namespace)

  type_desc_list = json_data.get('types', tuple())
  types = tuple(
      Protocol._ParseTypeDesc(desc, names=names) for desc in type_desc_list)

  message_desc_map = json_data.get('messages', dict())
  messages = tuple(Protocol._ParseMessageDescMap(message_desc_map, names=names))

  return Protocol(
      name=name,
      namespace=namespace,
      types=types,
      messages=messages,
  )
+
+
def Parse(json_string):
  """Constructs a Protocol from its JSON descriptor in text form.

  Args:
    json_string: String representation of the JSON descriptor of the protocol.
  Returns:
    The parsed protocol.
  Raises:
    ProtocolParseException: on JSON parsing error,
      or if the JSON descriptor is invalid.
  """
  try:
    json_data = json.loads(json_string)
  except Exception as exn:
    # Wrap any JSON decoding failure in the module's parse exception.
    raise ProtocolParseException(
        'Error parsing protocol from JSON: %r. '
        'Error message: %r.'
        % (json_string, exn))
  else:
    return ProtocolFromJSONData(json_data)
+
Propchange: avro/trunk/lang/py3/avro/protocol.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/schema.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/schema.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/schema.py (added)
+++ avro/trunk/lang/py3/avro/schema.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,1283 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Representation of Avro schemas.
+
+A schema may be one of:
+ - A record, mapping field names to field value data;
+ - An error, equivalent to a record;
+ - An enum, containing one of a small set of symbols;
+ - An array of values, all of the same schema;
+ - A map containing string/value pairs, each of a declared schema;
+ - A union of other schemas;
+ - A fixed sized binary object;
+ - A unicode string;
+ - A sequence of bytes;
+ - A 32-bit signed int;
+ - A 64-bit signed long;
+ - A 32-bit floating-point float;
+ - A 64-bit floating-point double;
+ - A boolean;
+ - Null.
+"""
+
+
+import abc
+import collections
+import json
+import logging
+import re
+
+
+# ------------------------------------------------------------------------------
+# Constants
+
+# Log level more verbose than DEBUG, INFO , etc.
+DEBUG_VERBOSE=5
+
+
+NULL = 'null'
+BOOLEAN = 'boolean'
+STRING = 'string'
+BYTES = 'bytes'
+INT = 'int'
+LONG = 'long'
+FLOAT = 'float'
+DOUBLE = 'double'
+FIXED = 'fixed'
+ENUM = 'enum'
+RECORD = 'record'
+ERROR = 'error'
+ARRAY = 'array'
+MAP = 'map'
+UNION = 'union'
+
+# Request and error unions are part of Avro protocols:
+REQUEST = 'request'
+ERROR_UNION = 'error_union'
+
+PRIMITIVE_TYPES = frozenset([
+ NULL,
+ BOOLEAN,
+ STRING,
+ BYTES,
+ INT,
+ LONG,
+ FLOAT,
+ DOUBLE,
+])
+
+NAMED_TYPES = frozenset([
+ FIXED,
+ ENUM,
+ RECORD,
+ ERROR,
+])
+
+VALID_TYPES = frozenset.union(
+ PRIMITIVE_TYPES,
+ NAMED_TYPES,
+ [
+ ARRAY,
+ MAP,
+ UNION,
+ REQUEST,
+ ERROR_UNION,
+ ],
+)
+
+SCHEMA_RESERVED_PROPS = frozenset([
+ 'type',
+ 'name',
+ 'namespace',
+ 'fields', # Record
+ 'items', # Array
+ 'size', # Fixed
+ 'symbols', # Enum
+ 'values', # Map
+ 'doc',
+])
+
+FIELD_RESERVED_PROPS = frozenset([
+ 'default',
+ 'name',
+ 'doc',
+ 'order',
+ 'type',
+])
+
+VALID_FIELD_SORT_ORDERS = frozenset([
+ 'ascending',
+ 'descending',
+ 'ignore',
+])
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
class Error(Exception):
  """Root of the exception hierarchy for this module."""
+
+
class AvroException(Error):
  """Generic Avro schema error."""
+
+
class SchemaParseException(AvroException):
  """Error while parsing a JSON schema descriptor."""
+
+
+# ------------------------------------------------------------------------------
+
+
class ImmutableDict(dict):
  """Dictionary guaranteed immutable.

  All mutations raise an exception.
  Behaves exactly as a dict otherwise.
  """

  def __init__(self, items=None, **kwargs):
    """Initializes the dict from either an items iterable/map or kwargs."""
    if items is not None:
      super(ImmutableDict, self).__init__(items)
      assert (len(kwargs) == 0)
    else:
      super(ImmutableDict, self).__init__(**kwargs)

  def __setitem__(self, key, value):
    raise Exception(
        'Attempting to map key %r to value %r in ImmutableDict %r'
        % (key, value, self))

  def __delitem__(self, key):
    raise Exception(
        'Attempting to remove mapping for key %r in ImmutableDict %r'
        % (key, self))

  def clear(self):
    raise Exception('Attempting to clear ImmutableDict %r' % self)

  def update(self, items=None, **kwargs):
    # Bug fix: the original formatted this message with an undefined name
    # 'args', so calling update() raised a NameError instead of the
    # intended immutability error.
    raise Exception(
        'Attempting to update ImmutableDict %r with items=%r, kwargs=%r'
        % (self, items, kwargs))

  def pop(self, key, default=None):
    raise Exception(
        'Attempting to pop key %r from ImmutableDict %r' % (key, self))

  def popitem(self):
    raise Exception('Attempting to pop item from ImmutableDict %r' % self)

  def setdefault(self, key, default=None):
    # dict.setdefault can insert a new mapping, so it must be blocked too.
    raise Exception(
        'Attempting to set default for key %r in ImmutableDict %r'
        % (key, self))
+
+
+# ------------------------------------------------------------------------------
+
+
# NOTE: the archived line read 'metaclass«c.ABCMeta' — character corruption
# of 'metaclass=abc.ABCMeta'; restored below.
class Schema(object, metaclass=abc.ABCMeta):
  """Abstract base class for all Schema classes."""

  def __init__(self, type, other_props=None):
    """Initializes a new schema object.

    Args:
      type: Type of the schema to initialize; must be in VALID_TYPES.
      other_props: Optional dictionary of additional properties.
    Raises:
      SchemaParseException: if the type is not a valid Avro type.
    """
    if type not in VALID_TYPES:
      raise SchemaParseException('%r is not a valid Avro type.' % type)

    # All properties of this schema, as a map: property name -> property value
    self._props = {}

    self._props['type'] = type
    self._type = type

    if other_props:
      self._props.update(other_props)

  @property
  def name(self):
    """Returns: the simple name of this schema."""
    return self._props['name']

  @property
  def fullname(self):
    """Returns: the fully qualified name of this schema."""
    # By default, the full name is the simple name.
    # Named schemas override this behavior to include the namespace.
    return self.name

  @property
  def namespace(self):
    """Returns: the namespace this schema belongs to, if any, or None."""
    return self._props.get('namespace', None)

  @property
  def type(self):
    """Returns: the type of this schema."""
    return self._type

  @property
  def doc(self):
    """Returns: the documentation associated to this schema, if any, or None."""
    return self._props.get('doc', None)

  @property
  def props(self):
    """Reports all the properties of this schema.

    Includes all properties, reserved and non reserved.
    JSON properties of this schema are directly generated from this dict.

    Returns:
      A read-only dictionary of properties associated to this schema.
    """
    return ImmutableDict(self._props)

  @property
  def other_props(self):
    """Returns: the dictionary of non-reserved properties."""
    return dict(FilterKeysOut(items=self._props, keys=SCHEMA_RESERVED_PROPS))

  def __str__(self):
    """Returns: the JSON representation of this schema."""
    return json.dumps(self.to_json())

  @abc.abstractmethod
  def to_json(self, names):
    """Converts the schema object into its AVRO specification representation.

    Schema types that have names (records, enums, and fixed) must
    be aware of not re-defining schemas that are already listed
    in the parameter names.
    """
    raise Exception('Cannot run abstract method.')
+
+
+# ------------------------------------------------------------------------------
+
+
_RE_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

_RE_FULL_NAME = re.compile(
    r'^'
    r'[.]?(?:[A-Za-z_][A-Za-z0-9_]*[.])*'   # optional namespace
    r'([A-Za-z_][A-Za-z0-9_]*)'             # name
    r'$'
)

class Name(object):
  """Representation of an Avro name (simple name plus namespace)."""

  def __init__(self, name, namespace=None):
    """Parses an Avro name.

    Args:
      name: Avro name to parse (relative or absolute).
      namespace: Optional explicit namespace if the name is relative.
    Raises:
      SchemaParseException: if the resulting full name is invalid.
    """
    # Normalize: namespace is always defined as a string, possibly empty.
    if namespace is None: namespace = ''

    if '.' in name:
      # name is absolute, namespace is ignored:
      self._fullname = name

      match = _RE_FULL_NAME.match(self._fullname)
      if match is None:
        raise SchemaParseException(
            'Invalid absolute schema name: %r.' % self._fullname)

      self._name = match.group(1)
      self._namespace = self._fullname[:-(len(self._name) + 1)]

    else:
      # name is relative, combine with explicit namespace:
      self._name = name
      self._namespace = namespace
      self._fullname = '%s.%s' % (self._namespace, self._name)

      # Validate the fullname:
      if _RE_FULL_NAME.match(self._fullname) is None:
        raise SchemaParseException(
            'Invalid schema name %r inferred from name %r and namespace %r.'
            % (self._fullname, self._name, self._namespace))

  def __eq__(self, other):
    if not isinstance(other, Name):
      return False
    return (self.fullname == other.fullname)

  def __hash__(self):
    # Bug fix: defining __eq__ without __hash__ makes the class unhashable
    # in Python 3; hash on the same key __eq__ compares.
    return hash(self._fullname)

  @property
  def simple_name(self):
    """Returns: the simple name part of this name."""
    return self._name

  @property
  def namespace(self):
    """Returns: this name's namespace, possibly the empty string."""
    return self._namespace

  @property
  def fullname(self):
    """Returns: the full name (always contains a period '.')."""
    return self._fullname
+
+
+# ------------------------------------------------------------------------------
+
+
class Names(object):
  """Tracks Avro named schemas and the default namespace during parsing."""

  def __init__(self, default_namespace=None, names=None):
    """Initializes a new name tracker.

    Args:
      default_namespace: Optional default namespace.
      names: Optional initial mapping of known named schemas.
    """
    self._names = {} if names is None else names
    self._default_namespace = default_namespace

  @property
  def names(self):
    """Returns: the mapping of known named schemas."""
    return self._names

  @property
  def default_namespace(self):
    """Returns: the default namespace, if any, or None."""
    return self._default_namespace

  def NewWithDefaultNamespace(self, namespace):
    """Creates a new tracker sharing this one's names, with a new default ns.

    Args:
      namespace: New default namespace to use.
    Returns:
      New name tracker with the specified default namespace.
    """
    return Names(names=self._names, default_namespace=namespace)

  def GetName(self, name, namespace=None):
    """Resolves the Avro name according to this name tracker's state.

    Args:
      name: Name to resolve (absolute or relative).
      namespace: Optional explicit namespace.
    Returns:
      The specified name, resolved according to this tracker.
    """
    ns = self._default_namespace if namespace is None else namespace
    return Name(name=name, namespace=ns)

  def has_name(self, name, namespace=None):
    """Reports whether a schema with the given name is already registered."""
    return self.GetName(name=name, namespace=namespace).fullname in self._names

  def get_name(self, name, namespace=None):
    """Returns: the schema registered with the given name, or None."""
    return self._names.get(self.GetName(name=name, namespace=namespace).fullname, None)

  def GetSchema(self, name, namespace=None):
    """Resolves an Avro schema by name.

    Args:
      name: Name (relative or absolute) of the Avro schema to look up.
      namespace: Optional explicit namespace.
    Returns:
      The schema with the specified name, if any, or None.
    """
    return self._names.get(self.GetName(name=name, namespace=namespace).fullname, None)

  def prune_namespace(self, properties):
    """Returns properties with the namespace removed when it is redundant.

    The namespace entry is dropped only when it matches this tracker's
    default namespace; the input mapping is never mutated.
    """
    if self.default_namespace is None:
      # No default namespace: nothing can be redundant.
      return properties
    if properties.get('namespace') != self.default_namespace:
      # Either no namespace entry, or a genuinely different one.
      return properties
    # The namespace matches the default and is redundant; drop it from a copy.
    pruned = dict(properties)
    del pruned['namespace']
    return pruned

  def Register(self, schema):
    """Registers a new named schema in this tracker.

    Args:
      schema: Named Avro schema to register in this tracker.
    Raises:
      SchemaParseException: on reserved or duplicate names.
    """
    fullname = schema.fullname
    if fullname in VALID_TYPES:
      raise SchemaParseException(
          '%s is a reserved type name.' % fullname)
    if fullname in self.names:
      raise SchemaParseException(
          'Avro name %r already exists.' % fullname)

    logging.log(DEBUG_VERBOSE, 'Register new name for %r', fullname)
    self._names[fullname] = schema
+
+
+# ------------------------------------------------------------------------------
+
+
class NamedSchema(Schema):
  """Abstract base class for named schemas (those listed in NAMED_TYPES).

  Handles name resolution and registration against a Names tracker.
  """

  def __init__(self, type, name, namespace=None, names=None, other_props=None):
    """Initializes a new named schema object.

    Args:
      type: Type of the named schema; must be one of NAMED_TYPES.
      name: Name (absolute or relative) of the schema.
      namespace: Optional explicit namespace if name is relative.
      names: Tracker to resolve and register Avro names.
      other_props: Optional map of additional properties of the schema.
    """
    assert (type in NAMED_TYPES), ('Invalid named type: %r' % type)
    # Resolve the name first, then initialize the base schema, and only
    # then register: registration must see a fully constructed schema.
    self._avro_name = names.GetName(name=name, namespace=namespace)

    super(NamedSchema, self).__init__(type, other_props)

    names.Register(self)

    self._props['name'] = self.name
    if self.namespace:
      self._props['namespace'] = self.namespace

  @property
  def avro_name(self):
    """Returns: the Name object describing this schema's name."""
    return self._avro_name

  @property
  def name(self):
    """Returns: the simple (unqualified) name of this schema."""
    return self._avro_name.simple_name

  @property
  def namespace(self):
    """Returns: the namespace of this schema."""
    return self._avro_name.namespace

  @property
  def fullname(self):
    """Returns: the fully qualified name of this schema."""
    return self._avro_name.fullname

  def name_ref(self, names):
    """Reports this schema name relative to the specified name tracker.

    Args:
      names: Avro name tracker to relativize this schema name against.
    Returns:
      The simple name when this schema's namespace equals the tracker's
      default namespace; the full name otherwise.
    """
    if self.namespace == names.default_namespace:
      return self.name
    return self.fullname
+
+
+# ------------------------------------------------------------------------------
+
+
# Private sentinel marking "no default value supplied" (None is a valid default):
_NO_DEFAULT = object()


class Field(object):
  """Representation of the schema of a field in a record."""

  def __init__(
      self,
      type,
      name,
      index,
      has_default,
      default=_NO_DEFAULT,
      order=None,
      names=None,
      doc=None,
      other_props=None
  ):
    """Initializes a new Field object.

    Args:
      type: Avro schema of the field.
      name: Name of the field; must be a non-empty string.
      index: 0-based position of the field.
      has_default: Whether the field declares a default value.
      default: Default value of the field, when has_default is true.
        None is a valid default, hence the _NO_DEFAULT sentinel.
      order: Optional sort order; one of VALID_FIELD_SORT_ORDERS.
      names: Tracker for named Avro schemas (currently unused here).
      doc: Optional documentation string for the field.
      other_props: Optional map of additional (non-reserved) properties.
    Raises:
      SchemaParseException: if the name or sort order is invalid.
    """
    if (not isinstance(name, str)) or (len(name) == 0):
      raise SchemaParseException('Invalid record field name: %r.' % name)
    if (order is not None) and (order not in VALID_FIELD_SORT_ORDERS):
      raise SchemaParseException('Invalid record field order: %r.' % order)

    # All properties of this record field:
    self._props = {}

    self._has_default = has_default
    if other_props:
      self._props.update(other_props)

    self._index = index
    self._type = self._props['type'] = type
    self._name = self._props['name'] = name

    # TODO: check to ensure default is valid
    if has_default:
      self._props['default'] = default

    if order is not None:
      self._props['order'] = order

    if doc is not None:
      self._props['doc'] = doc

  @property
  def type(self):
    """Returns: the schema of this field."""
    return self._type

  @property
  def name(self):
    """Returns: this field name."""
    return self._name

  @property
  def index(self):
    """Returns: the 0-based index of this field in the record."""
    return self._index

  @property
  def default(self):
    """Returns: the default value; raises KeyError if has_default is false."""
    return self._props['default']

  @property
  def has_default(self):
    """Returns: whether this field declares a default value."""
    return self._has_default

  @property
  def order(self):
    """Returns: the sort order of this field, if any, or None."""
    return self._props.get('order', None)

  @property
  def doc(self):
    """Returns: the documentation of this field, if any, or None."""
    return self._props.get('doc', None)

  @property
  def props(self):
    """Returns: the map of all properties of this field."""
    return self._props

  @property
  def other_props(self):
    """Returns: the map of non-reserved properties of this field."""
    return FilterKeysOut(items=self._props, keys=FIELD_RESERVED_PROPS)

  def __str__(self):
    """Returns: the JSON representation of this field."""
    return json.dumps(self.to_json())

  def to_json(self, names=None):
    """Converts this field into its JSON descriptor (as a dict)."""
    if names is None:
      names = Names()
    to_dump = self.props.copy()
    to_dump['type'] = self.type.to_json(names)
    return to_dump

  def __eq__(self, that):
    # Compare the canonical JSON forms of the two fields.
    to_cmp = json.loads(str(self))
    return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+# Primitive Types
+
+
class PrimitiveSchema(Schema):
  """Schema of a primitive Avro type.

  Valid primitive types are defined in PRIMITIVE_TYPES.
  """

  def __init__(self, type):
    """Initializes a new schema object for the specified primitive type.

    Args:
      type: Type of the schema to construct. Must be primitive.
    Raises:
      AvroException: if the type is not primitive.
    """
    if type not in PRIMITIVE_TYPES:
      raise AvroException('%r is not a valid primitive type.' % type)
    super(PrimitiveSchema, self).__init__(type)

  @property
  def name(self):
    """Returns: the simple name of this schema (the type name itself)."""
    return self.type

  def to_json(self, names=None):
    """Serializes compactly as the bare type name when there are no extras."""
    return self.fullname if len(self.props) == 1 else self.props

  def __eq__(self, that):
    return self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+# Complex Types (non-recursive)
+
+
class FixedSchema(NamedSchema):
  """Named schema describing fixed-size binary values.

  The byte size is carried in the 'size' property.
  """

  def __init__(self, name, namespace, size, names=None, other_props=None):
    """Initializes a new fixed schema.

    Args:
      name: Name (absolute or relative) of the schema.
      namespace: Optional explicit namespace if name is relative.
      size: Size of the fixed values, in bytes; must be an integer.
      names: Tracker to resolve and register Avro names.
      other_props: Optional map of additional properties.
    Raises:
      AvroException: if size is not an integer.
    """
    if not isinstance(size, int):
      raise AvroException(
          'Fixed Schema requires a valid integer for size property.')

    super(FixedSchema, self).__init__(
        type=FIXED,
        name=name,
        namespace=namespace,
        names=names,
        other_props=other_props,
    )
    self._props['size'] = size

  @property
  def size(self):
    """Returns: the size of this fixed schema, in bytes."""
    return self._props['size']

  def to_json(self, names=None):
    """Serializes this schema, by name reference if already emitted."""
    if names is None:
      names = Names()
    if self.fullname in names.names:
      return self.name_ref(names)
    names.names[self.fullname] = self
    return names.prune_namespace(self.props)

  def __eq__(self, that):
    return self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+
+
class EnumSchema(NamedSchema):
  """Named schema describing an enumeration of symbols."""

  def __init__(
      self,
      name,
      namespace,
      symbols,
      names=None,
      doc=None,
      other_props=None,
  ):
    """Initializes a new enumeration schema object.

    Args:
      name: Simple name of this enumeration.
      namespace: Optional namespace.
      symbols: Ordered list of symbols defined in this enumeration.
      names: Tracker for named Avro schemas.
      doc: Optional documentation string.
      other_props: Optional map of additional properties.
    Raises:
      AvroException: if symbols are not unique strings.
    """
    symbols = tuple(symbols)
    symbol_set = frozenset(symbols)
    if (len(symbol_set) != len(symbols)
        or not all(map(lambda symbol: isinstance(symbol, str), symbols))):
      raise AvroException(
          'Invalid symbols for enum schema: %r.' % (symbols,))

    super(EnumSchema, self).__init__(
        type=ENUM,
        name=name,
        namespace=namespace,
        names=names,
        other_props=other_props,
    )

    # Bug fix: the declaration order of the symbols must be preserved.
    # The original stored tuple(sorted(symbol_set)); Avro encodes enum
    # values by symbol index, so reordering the symbols corrupts data.
    self._props['symbols'] = symbols
    if doc is not None:
      self._props['doc'] = doc

  @property
  def symbols(self):
    """Returns: the symbols of this enum, in declaration order."""
    return self._props['symbols']

  def to_json(self, names=None):
    """Serializes this schema, by name reference if already emitted."""
    if names is None:
      names = Names()
    if self.fullname in names.names:
      return self.name_ref(names)
    else:
      names.names[self.fullname] = self
      return names.prune_namespace(self.props)

  def __eq__(self, that):
    return self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+# Complex Types (recursive)
+
+
class ArraySchema(Schema):
  """Schema describing an array whose items all share one schema."""

  def __init__(self, items, other_props=None):
    """Initializes a new array schema object.

    Args:
      items: Avro schema of the array items.
      other_props: Optional map of additional properties.
    """
    super(ArraySchema, self).__init__(
        type=ARRAY,
        other_props=other_props,
    )
    self._items_schema = items
    self._props['items'] = items

  @property
  def items(self):
    """Returns: the schema of the items in this array."""
    return self._items_schema

  def to_json(self, names=None):
    """Converts this schema into its JSON descriptor (as a dict)."""
    if names is None:
      names = Names()
    json_map = dict(self.props)
    json_map['items'] = self.items.to_json(names)
    return json_map

  def __eq__(self, that):
    # Compare the canonical JSON forms of the two schemas.
    return json.loads(str(self)) == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class MapSchema(Schema):
  """Schema of an Avro map."""

  def __init__(self, values, other_props=None):
    """Initializes a new map schema object.

    Args:
      values: Avro schema of the map values.
      other_props: Optional map of additional properties of the schema.
    """
    super(MapSchema, self).__init__(type=MAP, other_props=other_props)
    self._values_schema = values
    self._props['values'] = values

  @property
  def values(self):
    """Returns: the schema of the values in this map."""
    return self._values_schema

  def to_json(self, names=None):
    names = Names() if names is None else names
    json_repr = dict(self.props)
    json_repr['values'] = self._values_schema.to_json(names)
    return json_repr

  def __eq__(self, that):
    # Compare canonical JSON representations.
    return json.loads(str(self)) == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class UnionSchema(Schema):
  """Schema of a union."""

  def __init__(self, schemas):
    """Initializes a new union schema object.

    Args:
      schemas: Ordered collection of schema branches in the union.
    Raises:
      AvroException: on duplicate named branches, duplicate unnamed types,
          or directly nested unions.
    """
    super(UnionSchema, self).__init__(type=UNION)
    self._schemas = tuple(schemas)

    # Validate the schema branches:

    def ListBranches():
      # Human-readable dump of all branches, used in error messages.
      return ''.join(map(lambda schema: ('\n\t - %s' % schema), self._schemas))

    # All named schema names are unique:
    named_branches = [s for s in self._schemas if s.type in NAMED_TYPES]
    if len(frozenset(s.fullname for s in named_branches)) != len(named_branches):
      raise AvroException(
          'Invalid union branches with duplicate schema name:%s'
          % ListBranches())

    # Types are unique within unnamed schemas, and union is not allowed:
    unnamed_branches = [s for s in self._schemas if s.type not in NAMED_TYPES]
    unnamed_types = frozenset(s.type for s in unnamed_branches)
    if UNION in unnamed_types:
      raise AvroException(
          'Invalid union branches contain other unions:%s'
          % ListBranches())
    if len(unnamed_types) != len(unnamed_branches):
      raise AvroException(
          'Invalid union branches with duplicate type:%s'
          % ListBranches())

  @property
  def schemas(self):
    """Returns: the ordered list of schema branches in the union."""
    return self._schemas

  def to_json(self, names=None):
    names = Names() if names is None else names
    return [branch.to_json(names) for branch in self.schemas]

  def __eq__(self, that):
    # Compare canonical JSON representations.
    return json.loads(str(self)) == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class ErrorUnionSchema(UnionSchema):
  """Schema representing the declared errors of a protocol message."""

  def __init__(self, schemas):
    """Initializes an error-union schema.

    Args:
      schemas: Collection of error schemas.
    """
    # TODO: check that string isn't already listed explicitly as an error.
    # Prepend "string" to handle system errors:
    super(ErrorUnionSchema, self).__init__(
        schemas=[PrimitiveSchema(type=STRING)] + list(schemas))

  def to_json(self, names=None):
    names = Names() if names is None else names
    # The implicit system error branch (string) is not serialized:
    return [schema.to_json(names)
            for schema in self.schemas
            if schema.type != STRING]
+
+
+# ------------------------------------------------------------------------------
+
+
class RecordSchema(NamedSchema):
  """Schema of a record."""

  @staticmethod
  def _MakeField(index, field_desc, names):
    """Builds one field schema from its JSON descriptor.

    Args:
      index: 0-based index of the field in the record.
      field_desc: JSON descriptor of a record field.
      names: Avro schema tracker.
    Returns:
      The field schema.
    """
    field_schema = SchemaFromJSONData(
        json_data=field_desc['type'],
        names=names,
    )
    other_props = (
        dict(FilterKeysOut(items=field_desc, keys=FIELD_RESERVED_PROPS)))
    return Field(
        type=field_schema,
        name=field_desc['name'],
        index=index,
        has_default=('default' in field_desc),
        default=field_desc.get('default', _NO_DEFAULT),
        order=field_desc.get('order', None),
        names=names,
        doc=field_desc.get('doc', None),
        other_props=other_props,
    )

  @staticmethod
  def _MakeFieldList(field_desc_list, names):
    """Builds field schemas from a list of field JSON descriptors.

    Args:
      field_desc_list: collection of field JSON descriptors.
      names: Avro schema tracker.
    Yields:
      Field schemas.
    """
    for index, field_desc in enumerate(field_desc_list):
      yield RecordSchema._MakeField(index, field_desc, names)

  @staticmethod
  def _MakeFieldMap(fields):
    """Builds the field map.

    Guarantees field name unicity.

    Args:
      fields: iterable of field schema.
    Returns:
      A read-only map of field schemas, indexed by name.
    Raises:
      SchemaParseException: on duplicate field names.
    """
    field_map = {}
    for field in fields:
      if field.name in field_map:
        # Fixed: the original message referenced the undefined name
        # 'field_desc_list', turning every duplicate-field error into a
        # NameError instead of a SchemaParseException.
        raise SchemaParseException(
            'Duplicate field name %r in list %r.'
            % (field.name, sorted(field_map)))
      field_map[field.name] = field
    return ImmutableDict(field_map)

  def __init__(
      self,
      name,
      namespace,
      fields=None,
      make_fields=None,
      names=None,
      record_type=RECORD,
      doc=None,
      other_props=None
  ):
    """Initializes a new record schema object.

    Args:
      name: Name of the record (absolute or relative).
      namespace: Optional namespace the record belongs to, if name is relative.
      fields: collection of fields to add to this record.
          Exactly one of fields or make_fields must be specified.
      make_fields: function creating the fields that belong to the record.
          The function signature is: make_fields(names) -> ordered field list.
          Exactly one of fields or make_fields must be specified.
      names: Optional tracker of Avro named schemas.
      record_type: Type of the record: one of RECORD, ERROR or REQUEST.
          Protocol requests are not named.
      doc: Optional documentation string for the record.
      other_props: Optional map of additional properties of the schema.
    Raises:
      SchemaParseException: if the record type is invalid.
    """
    if record_type == REQUEST:
      # Protocol requests are not named: bypass NamedSchema.__init__ on
      # purpose (hence super(NamedSchema, ...) rather than
      # super(RecordSchema, ...)):
      super(NamedSchema, self).__init__(
          type=REQUEST,
          other_props=other_props,
      )
    elif record_type in [RECORD, ERROR]:
      # Register this record name in the tracker:
      super(RecordSchema, self).__init__(
          type=record_type,
          name=name,
          namespace=namespace,
          names=names,
          other_props=other_props,
      )
    else:
      raise SchemaParseException(
          'Invalid record type: %r.' % record_type)

    if record_type in [RECORD, ERROR]:
      avro_name = names.GetName(name=name, namespace=namespace)
      # Fields nested in this record resolve names relative to its namespace:
      nested_names = names.NewWithDefaultNamespace(namespace=avro_name.namespace)
    elif record_type == REQUEST:
      # Protocol request has no name: no need to change default namespace:
      nested_names = names

    if fields is None:
      fields = make_fields(names=nested_names)
    else:
      assert (make_fields is None)
    self._fields = tuple(fields)

    self._field_map = RecordSchema._MakeFieldMap(self._fields)

    # Fixed: store the materialized tuple. 'fields' may be a generator that
    # the tuple() call above has already exhausted, which would leave an
    # empty/stale 'fields' property.
    self._props['fields'] = self._fields
    if doc is not None:
      self._props['doc'] = doc

  @property
  def fields(self):
    """Returns: the field schemas, as an ordered tuple."""
    return self._fields

  @property
  def field_map(self):
    """Returns: a read-only map of the field schemas index by field names."""
    return self._field_map

  def to_json(self, names=None):
    if names is None:
      names = Names()
    # Request records don't have names
    if self.type == REQUEST:
      return [f.to_json(names) for f in self.fields]

    if self.fullname in names.names:
      return self.name_ref(names)
    else:
      names.names[self.fullname] = self

    to_dump = names.prune_namespace(self.props.copy())
    to_dump['fields'] = [f.to_json(names) for f in self.fields]
    return to_dump

  def __eq__(self, that):
    to_cmp = json.loads(str(self))
    return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+# Module functions
+
+
def FilterKeysOut(items, keys):
  """Filters a collection of (key, value) items.

  Exclude any item whose key belongs to keys.

  Args:
    items: Dictionary of items to filter the keys out of.
    keys: Keys to filter out.
  Yields:
    Filtered (key, value) items.
  """
  return (
      (key, value)
      for key, value in items.items()
      if key not in keys
  )
+
+
+# ------------------------------------------------------------------------------
+
+
def _SchemaFromJSONString(json_string, names):
  """Builds an Avro schema from a JSON string descriptor.

  Args:
    json_string: Name of a primitive type, or of a known named schema.
    names: Tracker for Avro named schemas.
  Returns:
    The schema the string refers to.
  Raises:
    SchemaParseException: if the name is neither primitive nor known.
  """
  if json_string in PRIMITIVE_TYPES:
    return PrimitiveSchema(type=json_string)
  # Not a primitive: must reference a previously defined named schema.
  schema = names.GetSchema(name=json_string)
  if schema is None:
    raise SchemaParseException(
        'Unknown named schema %r, known names: %r.'
        % (json_string, sorted(names.names)))
  return schema
+
+
def _SchemaFromJSONArray(json_array, names):
  """Builds an Avro union schema from a JSON array descriptor.

  Args:
    json_array: JSON array listing the descriptor of each union branch.
    names: Tracker for Avro named schemas.
  Returns:
    The union schema parsed from the branch descriptors.
  """
  def MakeSchema(desc):
    # Fixed: the original line was corrupted by quoted-printable decoding
    # ('json_dataÞsc' for 'json_data=desc'), a syntax error.
    return SchemaFromJSONData(json_data=desc, names=names)
  return UnionSchema(map(MakeSchema, json_array))
+
+
def _SchemaFromJSONObject(json_object, names):
  """Builds an Avro schema from a JSON object (dict) descriptor.

  Dispatches on the descriptor's "type" property: primitive, named
  (fixed/enum/record/error), or unnamed complex (array/map/error_union).

  Args:
    json_object: JSON dict describing the schema.
    names: Tracker for Avro named schemas.
  Returns:
    The parsed schema.
  Raises:
    SchemaParseException: if the descriptor is invalid or incomplete.
  """
  # NOTE(review): 'type' shadows the builtin here; kept as-is to leave the
  # code byte-identical.
  type = json_object.get('type')
  if type is None:
    raise SchemaParseException(
        'Avro schema JSON descriptor has no "type" property: %r' % json_object)

  # Properties not reserved by the Avro spec are carried through verbatim:
  other_props = dict(
      FilterKeysOut(items=json_object, keys=SCHEMA_RESERVED_PROPS))

  if type in PRIMITIVE_TYPES:
    # FIXME should not ignore other properties
    return PrimitiveSchema(type)

  elif type in NAMED_TYPES:
    name = json_object.get('name')
    # A relative name resolves against the tracker's current default namespace:
    namespace = json_object.get('namespace', names.default_namespace)
    if type == FIXED:
      size = json_object.get('size')
      return FixedSchema(name, namespace, size, names, other_props)
    elif type == ENUM:
      symbols = json_object.get('symbols')
      doc = json_object.get('doc')
      return EnumSchema(name, namespace, symbols, names, doc, other_props)

    elif type in [RECORD, ERROR]:
      field_desc_list = json_object.get('fields', ())

      # Deferred so the fields are parsed with the record's own nested
      # namespace (RecordSchema calls this back with the right tracker):
      def MakeFields(names):
        return tuple(RecordSchema._MakeFieldList(field_desc_list, names))

      return RecordSchema(
          name=name,
          namespace=namespace,
          make_fields=MakeFields,
          names=names,
          record_type=type,
          doc=json_object.get('doc'),
          other_props=other_props,
      )
    else:
      # Unreachable unless NAMED_TYPES grows without a branch being added:
      raise Exception('Internal error: unknown type %r.' % type)

  elif type in VALID_TYPES:
    # Unnamed, non-primitive Avro type:

    if type == ARRAY:
      items_desc = json_object.get('items')
      if items_desc is None:
        raise SchemaParseException(
            'Invalid array schema descriptor with no "items" : %r.'
            % json_object)
      return ArraySchema(
          items=SchemaFromJSONData(items_desc, names),
          other_props=other_props,
      )

    elif type == MAP:
      values_desc = json_object.get('values')
      if values_desc is None:
        raise SchemaParseException(
            'Invalid map schema descriptor with no "values" : %r.'
            % json_object)
      return MapSchema(
          values=SchemaFromJSONData(values_desc, names=names),
          other_props=other_props,
      )

    elif type == ERROR_UNION:
      # Internal descriptor produced by the protocol parser, not user JSON:
      error_desc_list = json_object.get('declared_errors')
      assert (error_desc_list is not None)
      error_schemas = map(
          lambda desc: SchemaFromJSONData(desc, names=names),
          error_desc_list)
      return ErrorUnionSchema(schemas=error_schemas)

    else:
      # Unreachable unless VALID_TYPES grows without a branch being added:
      raise Exception('Internal error: unknown type %r.' % type)

  # "type" is present but matches no known category:
  raise SchemaParseException(
      'Invalid JSON descriptor for an Avro schema: %r' % json_object)
+
+
# Parsers for the JSON data types:
# maps the Python type of a decoded JSON value to the parser able to build an
# Avro schema from it (name string, union array, or object descriptor).
_JSONDataParserTypeMap = {
  str: _SchemaFromJSONString,
  list: _SchemaFromJSONArray,
  dict: _SchemaFromJSONObject,
}
+
+
def SchemaFromJSONData(json_data, names=None):
  """Builds an Avro Schema from its JSON descriptor.

  Args:
    json_data: JSON data representing the descriptor of the Avro schema.
    names: Optional tracker for Avro named schemas.
  Returns:
    The Avro schema parsed from the JSON descriptor.
  Raises:
    SchemaParseException: if the descriptor is invalid.
  """
  names = Names() if names is None else names

  # Dispatch on the Python type of the decoded JSON value (str/list/dict):
  parser = _JSONDataParserTypeMap.get(type(json_data))
  if parser is not None:
    return parser(json_data, names=names)
  raise SchemaParseException(
      'Invalid JSON descriptor for an Avro schema: %r.' % json_data)
+
+
+# ------------------------------------------------------------------------------
+
+
def Parse(json_string):
  """Constructs a Schema from its JSON descriptor in text form.

  Args:
    json_string: String representation of the JSON descriptor of the schema.
  Returns:
    The parsed schema.
  Raises:
    SchemaParseException: on JSON parsing error,
        or if the JSON descriptor is invalid.
  """
  try:
    json_data = json.loads(json_string)
  except ValueError as exn:
    # json.loads() reports malformed JSON as ValueError (JSONDecodeError);
    # narrowed from 'except Exception' and chained so the original decode
    # error stays visible in tracebacks.
    raise SchemaParseException(
        'Error parsing schema from JSON: %r. '
        'Error message: %r.'
        % (json_string, exn)) from exn

  # Initialize the names object
  names = Names()

  # construct the Avro Schema object
  return SchemaFromJSONData(json_data, names)
Propchange: avro/trunk/lang/py3/avro/schema.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/av_bench.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/av_bench.py?rev=1557225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/av_bench.py (added)
+++ avro/trunk/lang/py3/avro/tests/av_bench.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import random
+import string
+import sys
+import time
+
+import avro.datafile
+import avro.io
+import avro.schema
+
+
+TYPES = ('A', 'CNAME',)
+FILENAME = 'datafile.avr'
+
+
def GenerateRandomName():
  """Returns: a random name of 15 distinct lowercase ASCII letters."""
  letters = random.sample(string.ascii_lowercase, 15)
  return ''.join(letters)
+
+
def GenerateRandomIP():
  """Returns: a random IPv4 address in dotted-quad notation."""
  # Four octets drawn in order, so seeded runs stay deterministic:
  return '.'.join(str(random.randint(0, 255)) for _ in range(4))
+
+
def Write(nrecords):
  """Writes a data file with the specified number of random records.

  Args:
    nrecords: Number of records to write.
  """
  schema_s = """
  {
    "type": "record",
    "name": "Query",
    "fields" : [
      {"name": "query", "type": "string"},
      {"name": "response", "type": "string"},
      {"name": "type", "type": "string", "default": "A"}
    ]
  }
  """
  schema = avro.schema.Parse(schema_s)
  writer = avro.io.DatumWriter(schema)

  with open(FILENAME, 'wb') as out:
    with avro.datafile.DataFileWriter(
        out, writer, schema,
        # codec='deflate'
    ) as data_writer:
      for _ in range(nrecords):
        response = GenerateRandomIP()
        query = GenerateRandomName()
        # Renamed from 'type', which shadowed the builtin:
        record_type = random.choice(TYPES)
        data_writer.append({
            'query': query,
            'response': response,
            'type': record_type,
        })
+
+
def Read(expect_nrecords):
  """Reads the data file generated by Write().

  Args:
    expect_nrecords: Exact number of records the file should contain.
  Raises:
    AssertionError: if the record count does not match.
  """
  with open(FILENAME, 'rb') as f:
    reader = avro.io.DatumReader()
    with avro.datafile.DataFileReader(f, reader) as file_reader:
      nrecords = 0
      for record in file_reader:
        nrecords += 1
  # Fixed NameError: the message referenced undefined 'expected_nrecords',
  # so a count mismatch crashed before the assertion message could render.
  assert (nrecords == expect_nrecords), (
      'Expecting %d records, got %d.' % (expect_nrecords, nrecords))
+
+
def Timing(f, *args):
  """Returns: the wall-clock time, in seconds, taken by calling f(*args)."""
  start = time.time()
  f(*args)
  return time.time() - start
+
+
def Main(args):
  """Benchmark entry point.

  Args:
    args: Command-line arguments; args[1] is the number of records.
  """
  nrecords = int(args[1])
  write_seconds = Timing(Write, nrecords)
  print('Write %0.4f' % write_seconds)
  read_seconds = Timing(Read, nrecords)
  print('Read %0.4f' % read_seconds)
+
+
if __name__ == '__main__':
  # Emit every DEBUG+ log record to the console before running the benchmark.
  log_formatter = logging.Formatter(
      '%(asctime)s %(levelname)s %(filename)s:%(lineno)s : %(message)s')
  logging.root.setLevel(logging.DEBUG)
  console_handler = logging.StreamHandler()
  console_handler.setFormatter(log_formatter)
  console_handler.setLevel(logging.DEBUG)
  logging.root.addHandler(console_handler)

  Main(sys.argv)
Propchange: avro/trunk/lang/py3/avro/tests/av_bench.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/gen_interop_data.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/gen_interop_data.py?rev57225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/gen_interop_data.py (added)
+++ avro/trunk/lang/py3/avro/tests/gen_interop_data.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from avro import datafile
+from avro import io
+from avro import schema
+
+
# Single datum exercising every field of the cross-language interop schema
# (one value per Avro type, including nested record/array/map/union).
DATUM = {
  'intField': 12,
  'longField': 15234324,
  'stringField': 'hey',
  'boolField': True,
  'floatField': 1234.0,
  'doubleField': -1234.0,
  'bytesField': '12312adf',
  'nullField': None,
  'arrayField': [5.0, 0.0, 12.0],
  'mapField': {'a': {'label': 'a'}, 'bee': {'label': 'cee'}},
  'unionField': 12.0,
  'enumField': 'C',
  'fixedField': b'1019181716151413',
  'recordField': {
    'label': 'blah',
    'children': [{'label': 'inner', 'children': []}],
  },
}
+
+
if __name__ == "__main__":
  # Usage: gen_interop_data.py <schema-file> <output-file>
  # Context managers added: the original leaked the schema file handle and
  # never explicitly closed the output file on error.
  with open(sys.argv[1], 'r') as schema_file:
    interop_schema = schema.Parse(schema_file.read())
  datum_writer = io.DatumWriter()
  # NB: not using compression
  with open(sys.argv[2], 'wb') as writer:
    dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema)
    dfw.append(DATUM)
    dfw.close()
Propchange: avro/trunk/lang/py3/avro/tests/gen_interop_data.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/run_tests.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/run_tests.py?rev57225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/run_tests.py (added)
+++ avro/trunk/lang/py3/avro/tests/run_tests.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Runs all tests.
+
+Usage:
+
+- Run tests from all modules:
+ ./run_tests.py discover [-v]
+
+- Run tests in a specific module:
+ ./run_tests.py test_schema [-v]
+
+- Run a specific test:
+ ./run_tests.py test_schema.TestSchema.testParse [-v]
+
+- Set logging level:
+ PYTHON_LOG_LEVEL=<log-level> ./run_tests.py ...
+ log-level 0 includes all logging.
+ log-level 10 includes debug logging.
+ log-level 20 includes info logging.
+
+- Command-line help:
+ ./run_tests.py -h
+ ./run_tests.py discover -h
+"""
+
+import logging
+import os
+import sys
+import unittest
+
+from avro.tests.test_datafile import *
+from avro.tests.test_datafile_interop import *
+from avro.tests.test_io import *
+from avro.tests.test_ipc import *
+from avro.tests.test_protocol import *
+from avro.tests.test_schema import *
+from avro.tests.test_script import *
+
+
def SetupLogging():
  """Configures root logging from the PYTHON_LOG_LEVEL environment variable.

  Installs a single console handler; any pre-existing handlers are dropped.
  """
  log_level = int(os.environ.get('PYTHON_LOG_LEVEL', logging.INFO))

  formatter = logging.Formatter(
      '%(asctime)s %(levelname)s %(filename)s:%(lineno)s : %(message)s')
  handler = logging.StreamHandler()
  handler.setFormatter(formatter)
  handler.setLevel(logging.DEBUG)

  # list.clear() only exists in python 3.3+, hence the rebind:
  logging.root.handlers = list()
  logging.root.setLevel(log_level)
  logging.root.addHandler(handler)
+
+
# Configure logging at import time, so it is in effect even when unittest
# discovery imports this module instead of running it as a script.
SetupLogging()


if __name__ == '__main__':
  unittest.main()
Propchange: avro/trunk/lang/py3/avro/tests/run_tests.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/sample_http_client.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/sample_http_client.py?rev57225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/sample_http_client.py (added)
+++ avro/trunk/lang/py3/avro/tests/sample_http_client.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+
+from avro import ipc
+from avro import protocol
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
+SERVER_HOST = 'localhost'
+SERVER_PORT = 9090
+
class UsageError(Exception):
  """Raised when the command-line arguments are invalid."""

  def __init__(self, value):
    # The offending value / usage message:
    self.value = value

  def __str__(self):
    return repr(self.value)
+
def make_requestor(server_host, server_port, protocol):
  """Creates an Avro requestor over a fresh HTTP transceiver.

  Args:
    server_host: Host name or address of the server.
    server_port: TCP port of the server.
    protocol: Avro protocol the requestor speaks.
  Returns:
    A new ipc.Requestor connected to the given server.
  """
  # Fixed: the original ignored its server_host/server_port parameters and
  # always used the SERVER_HOST/SERVER_PORT globals.
  client = ipc.HTTPTransceiver(server_host, server_port)
  return ipc.Requestor(protocol, client)
+
if __name__ == '__main__':
  if len(sys.argv) not in [4, 5]:
    raise UsageError("Usage: <to> <from> <body> [<count>]")

  # client code - attach to the server and send a message
  # fill in the Message record
  message = dict()
  message['to'] = sys.argv[1]
  message['from'] = sys.argv[2]
  message['body'] = sys.argv[3]

  # Message count defaults to 1 when <count> is absent or not an integer.
  try:
    num_messages = int(sys.argv[4])
  except (IndexError, ValueError):
    # Narrowed from a bare 'except:', which also swallowed
    # KeyboardInterrupt and SystemExit.
    num_messages = 1

  # build the parameters for the request
  params = {}
  params['message'] = message

  # send the requests and print the result
  for msg_count in range(num_messages):
    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
    result = requestor.request('send', params)
    print("Result: " + result)

  # try out a replay message
  requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
  result = requestor.request('replay', dict())
  print("Replay Result: " + result)
Propchange: avro/trunk/lang/py3/avro/tests/sample_http_client.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/sample_http_server.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/sample_http_server.py?rev57225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/sample_http_server.py (added)
+++ avro/trunk/lang/py3/avro/tests/sample_http_server.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from http.server import BaseHTTPRequestHandler, HTTPServer

from avro import ipc
from avro import protocol
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
+SERVER_ADDRESS = ('localhost', 9090)
+
class MailResponder(ipc.Responder):
  """Responder implementing the Mail protocol's messages."""

  def __init__(self):
    ipc.Responder.__init__(self, MAIL_PROTOCOL)

  def invoke(self, message, request):
    """Dispatches one protocol message; returns None for unknown names."""
    name = message.name
    if name == 'replay':
      return 'replay'
    if name == 'send':
      return ("Sent message to %(to)s from %(from)s with body %(body)s"
              % request['message'])
+
class MailHandler(BaseHTTPRequestHandler):
  """HTTP handler bridging POSTed Avro-framed requests to a MailResponder."""

  def do_POST(self):
    # A fresh responder is built per request; the protocol is module-level.
    self.responder = MailResponder()
    # The POST body is an Avro framed message on the socket input stream:
    call_request_reader = ipc.FramedReader(self.rfile)
    call_request = call_request_reader.read_framed_message()
    resp_body = self.responder.respond(call_request)
    self.send_response(200)
    self.send_header('Content-Type', 'avro/binary')
    self.end_headers()
    # The response is written back as an Avro framed message:
    resp_writer = ipc.FramedWriter(self.wfile)
    resp_writer.write_framed_message(resp_body)
+
if __name__ == '__main__':
  # allow_reuse_address is consulted by server_bind(), which runs inside the
  # HTTPServer constructor: it must be set on the class *before* the server
  # is instantiated. Setting it on the instance afterwards (as the original
  # did) had no effect.
  HTTPServer.allow_reuse_address = True
  mail_server = HTTPServer(SERVER_ADDRESS, MailHandler)
  mail_server.serve_forever()
Propchange: avro/trunk/lang/py3/avro/tests/sample_http_server.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/test_datafile.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_datafile.py?rev57225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/test_datafile.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_datafile.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+import unittest
+
+from avro import datafile
+from avro import io
+from avro import schema
+
+
+# ------------------------------------------------------------------------------
+
+
# (schema JSON, example datum) pairs: each datum is a valid instance of its
# schema and is round-tripped through the container file in the tests below.
SCHEMAS_TO_VALIDATE = (
  ('"null"', None),
  ('"boolean"', True),
  ('"string"', 'adsfasdf09809dsf-sf'),
  ('"bytes"', b'12345abcd'),
  ('"int"', 1234),
  ('"long"', 1234),
  ('"float"', 1234.0),
  ('"double"', 1234.0),
  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
  ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
  ('{"type": "array", "items": "long"}', [1, 3, 2]),
  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
  ('["string", "null", "long"]', None),

  ("""
  {
    "type": "record",
    "name": "Test",
    "fields": [{"name": "f", "type": "long"}]
  }
  """,
  {'f': 5}),

  # Recursive record schema (a record that refers to itself via a union).
  ("""
  {
    "type": "record",
    "name": "Lisp",
    "fields": [{
      "name": "value",
      "type": [
        "null",
        "string",
        {
          "type": "record",
          "name": "Cons",
          "fields": [{"name": "car", "type": "Lisp"},
                     {"name": "cdr", "type": "Lisp"}]
        }
      ]
    }]
  }
  """,
  {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
)
+
# Codecs that are always available; 'snappy' is appended below only when the
# optional python-snappy package can be imported.
CODECS_TO_VALIDATE = ('null', 'deflate')

try:
  import snappy
  CODECS_TO_VALIDATE += ('snappy',)
except ImportError:
  # Best-effort: snappy support is optional, so just skip it.
  logging.info('Snappy not present, will skip testing it.')
+
+
+# ------------------------------------------------------------------------------
+
+
class TestDataFile(unittest.TestCase):
  """Tests for the Avro object container file writer/reader.

  Fixes mail-transfer corruption in the original text: 'deleteúlse' is
  'delete=False' and 'datum_writerÚtum_writer' is 'datum_writer=datum_writer'
  (quoted-printable '=Fa'/'=da' sequences decoded to ú/Ú).
  """

  @classmethod
  def setUpClass(cls):
    # One shared scratch directory for all temp files created by the tests.
    cls._temp_dir = (
        tempfile.TemporaryDirectory(prefix=cls.__name__, suffix='.tmp'))
    logging.debug('Created temporary directory: %s', cls._temp_dir.name)

  @classmethod
  def tearDownClass(cls):
    logging.debug('Cleaning up temporary directory: %s', cls._temp_dir.name)
    cls._temp_dir.cleanup()

  def NewTempFile(self):
    """Creates a new temporary file.

    File is automatically cleaned up after test (with the directory).

    Returns:
      The path of the new temporary file.
    """
    temp_file = tempfile.NamedTemporaryFile(
        dir=self._temp_dir.name,
        prefix='test',
        suffix='.avro',
        delete=False,
    )
    return temp_file.name

  def testRoundTrip(self):
    """Writes then reads back every (schema, datum) example with every codec."""
    correct = 0
    for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
      for codec in CODECS_TO_VALIDATE:
        file_path = self.NewTempFile()

        # Write the datum this many times in the data file:
        nitems = 10

        logging.debug(
            'Performing round-trip with codec %r in file %s for example #%d\n'
            'Writing datum: %r using writer schema:\n%s',
            codec, file_path, iexample,
            datum, writer_schema)

        logging.debug('Creating data file %r', file_path)
        with open(file_path, 'wb') as writer:
          datum_writer = io.DatumWriter()
          schema_object = schema.Parse(writer_schema)
          with datafile.DataFileWriter(
              writer=writer,
              datum_writer=datum_writer,
              writer_schema=schema_object,
              codec=codec,
          ) as dfw:
            for _ in range(nitems):
              dfw.append(datum)

        logging.debug('Reading data from %r', file_path)
        with open(file_path, 'rb') as reader:
          datum_reader = io.DatumReader()
          with datafile.DataFileReader(reader, datum_reader) as dfr:
            round_trip_data = list(dfr)

        logging.debug(
            'Round-trip data has %d items: %r',
            len(round_trip_data), round_trip_data)

        if ([datum] * nitems) == round_trip_data:
          correct += 1
        else:
          logging.error(
              'Round-trip data does not match:\n'
              'Expect: %r\n'
              'Actual: %r',
              [datum] * nitems,
              round_trip_data)

    self.assertEqual(
        correct,
        len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))

  def testAppend(self):
    """Appends to an existing container file and reads all records back."""
    correct = 0
    for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
      for codec in CODECS_TO_VALIDATE:
        file_path = self.NewTempFile()

        logging.debug(
            'Performing append with codec %r in file %s for example #%d\n'
            'Writing datum: %r using writer schema:\n%s',
            codec, file_path, iexample,
            datum, writer_schema)

        logging.debug('Creating data file %r', file_path)
        with open(file_path, 'wb') as writer:
          datum_writer = io.DatumWriter()
          schema_object = schema.Parse(writer_schema)
          with datafile.DataFileWriter(
              writer=writer,
              datum_writer=datum_writer,
              writer_schema=schema_object,
              codec=codec,
          ) as dfw:
            dfw.append(datum)

        logging.debug('Appending data to %r', file_path)
        # Re-open in append mode 9 times; no schema is given, so the writer
        # must pick it up from the existing file header.
        for i in range(9):
          with open(file_path, 'ab+') as writer:
            with datafile.DataFileWriter(writer, io.DatumWriter()) as dfw:
              dfw.append(datum)

        logging.debug('Reading appended data from %r', file_path)
        with open(file_path, 'rb') as reader:
          datum_reader = io.DatumReader()
          with datafile.DataFileReader(reader, datum_reader) as dfr:
            appended_data = list(dfr)

        logging.debug(
            'Appended data has %d items: %r',
            len(appended_data), appended_data)

        if ([datum] * 10) == appended_data:
          correct += 1
        else:
          logging.error(
              'Appended data does not match:\n'
              'Expect: %r\n'
              'Actual: %r',
              [datum] * 10,
              appended_data)

    self.assertEqual(
        correct,
        len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))

  def testContextManager(self):
    """Writer and reader must close the underlying file on context exit."""
    file_path = self.NewTempFile()

    # Test the writer with a 'with' statement.
    with open(file_path, 'wb') as writer:
      datum_writer = io.DatumWriter()
      sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
      schema_object = schema.Parse(sample_schema)
      with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
        dfw.append(sample_datum)
      self.assertTrue(writer.closed)

    # Test the reader with a 'with' statement.
    datums = []
    with open(file_path, 'rb') as reader:
      datum_reader = io.DatumReader()
      with datafile.DataFileReader(reader, datum_reader) as dfr:
        for datum in dfr:
          datums.append(datum)
      self.assertTrue(reader.closed)

  def testMetadata(self):
    """User metadata set at write time must be readable (as bytes) on read."""
    file_path = self.NewTempFile()

    # Test the writer with a 'with' statement.
    with open(file_path, 'wb') as writer:
      datum_writer = io.DatumWriter()
      sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
      schema_object = schema.Parse(sample_schema)
      with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
        dfw.SetMeta('test.string', 'foo')
        dfw.SetMeta('test.number', '1')
        dfw.append(sample_datum)
      self.assertTrue(writer.closed)

    # Test the reader with a 'with' statement.
    datums = []
    with open(file_path, 'rb') as reader:
      datum_reader = io.DatumReader()
      with datafile.DataFileReader(reader, datum_reader) as dfr:
        # Metadata values come back as bytes.
        self.assertEqual(b'foo', dfr.GetMeta('test.string'))
        self.assertEqual(b'1', dfr.GetMeta('test.number'))
        for datum in dfr:
          datums.append(datum)
      self.assertTrue(reader.closed)
+
+
+# ------------------------------------------------------------------------------
+
+
if __name__ == '__main__':
  # Tests are driven by the shared harness, never run directly.
  raise Exception('Use run_tests.py')
Propchange: avro/trunk/lang/py3/avro/tests/test_datafile.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/test_datafile_interop.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_datafile_interop.py?rev=1557225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/test_datafile_interop.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_datafile_interop.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+import unittest
+
+from avro import datafile
+from avro import io
+from avro import schema
+
+
def GetInteropSchema():
  """Loads and parses the interop schema stored next to this test module."""
  schema_json_path = os.path.join(
      os.path.dirname(os.path.abspath(__file__)), 'interop.avsc')
  with open(schema_json_path, 'r') as f:
    return schema.Parse(f.read())
+
+
# Parsed interop schema plus a datum exercising every field it declares.
INTEROP_SCHEMA = GetInteropSchema()
INTEROP_DATUM = {
  'intField': 12,
  'longField': 15234324,
  'stringField': 'hey',
  'boolField': True,
  'floatField': 1234.0,
  'doubleField': -1234.0,
  'bytesField': b'12312adf',
  'nullField': None,
  'arrayField': [5.0, 0.0, 12.0],
  'mapField': {'a': {'label': 'a'}, 'bee': {'label': 'cee'}},
  'unionField': 12.0,
  'enumField': 'C',
  'fixedField': b'1019181716151413',
  'recordField': {
    'label': 'blah',
    'children': [{'label': 'inner', 'children': []}],
  },
}
+
+
def WriteDataFile(path, datum, schema):
  """Writes a single datum into a new, uncompressed Avro container file.

  Note: the 'schema' parameter shadows the avro.schema module inside this
  function body.
  """
  with open(path, 'wb') as out:
    # NB: not using compression
    with datafile.DataFileWriter(out, io.DatumWriter(), schema) as dfw:
      dfw.append(datum)
+
+
class TestDataFileInterop(unittest.TestCase):
  """Checks that a locally written interop container file reads back intact."""

  def testInterop(self):
    """Writes INTEROP_DATUM to a temp file and verifies each record read."""
    with tempfile.NamedTemporaryFile() as temp_path:
      WriteDataFile(temp_path.name, INTEROP_DATUM, INTEROP_SCHEMA)

      # Read data in binary from the file.  Use the reader as a context
      # manager so it is closed (the original leaked the DataFileReader).
      datum_reader = io.DatumReader()
      with open(temp_path.name, 'rb') as reader:
        with datafile.DataFileReader(reader, datum_reader) as dfr:
          for datum in dfr:
            self.assertEqual(INTEROP_DATUM, datum)
+
+
if __name__ == '__main__':
  # Tests are driven by the shared harness, never run directly.
  raise Exception('Use run_tests.py')
Propchange: avro/trunk/lang/py3/avro/tests/test_datafile_interop.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/py3/avro/tests/test_io.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_io.py?rev=1557225&view=auto
=============================================================================--- \
avro/trunk/lang/py3/avro/tests/test_io.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_io.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,351 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import binascii
+import io
+import logging
+import sys
+import unittest
+
+from avro import io as avro_io
+from avro import schema
+
+
# (schema JSON, example datum) pairs: each datum validates against its schema
# and is used for encode/decode round-trip tests.
SCHEMAS_TO_VALIDATE = (
  ('"null"', None),
  ('"boolean"', True),
  ('"string"', 'adsfasdf09809dsf-sf'),
  ('"bytes"', b'12345abcd'),
  ('"int"', 1234),
  ('"long"', 1234),
  ('"float"', 1234.0),
  ('"double"', 1234.0),
  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
  ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
  ('{"type": "array", "items": "long"}', [1, 3, 2]),
  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
  ('["string", "null", "long"]', None),
  ("""\
  {"type": "record",
   "name": "Test",
   "fields": [{"name": "f", "type": "long"}]}
  """, {'f': 5}),
  # Recursive record schema (a record referring to itself through a union).
  ("""
  {
    "type": "record",
    "name": "Lisp",
    "fields": [{
      "name": "value",
      "type": [
        "null",
        "string",
        {
          "type": "record",
          "name": "Cons",
          "fields": [{"name": "car", "type": "Lisp"},
                     {"name": "cdr", "type": "Lisp"}]
        }
      ]
    }]
  }
  """, {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
)
+
# (integer, expected hex byte string) pairs for Avro's variable-length
# zig-zag encoding of int/long (low bit is the sign: 0->00, -1->01, 1->02;
# the 0x80 bit of each byte flags a continuation byte).
BINARY_ENCODINGS = (
  (0, '00'),
  (-1, '01'),
  (1, '02'),
  (-2, '03'),
  (2, '04'),
  (-64, '7f'),
  (64, '80 01'),
  (8192, '80 80 01'),
  (-8193, '81 80 01'),
)
+
# (field schema, default as JSON text, expected Python default) triples used
# by the reader-schema default-value tests.
DEFAULT_VALUE_EXAMPLES = (
  ('"null"', 'null', None),
  ('"boolean"', 'true', True),
  ('"string"', '"foo"', 'foo'),
  ('"bytes"', '"\u00FF\u00FF"', '\xff\xff'),
  ('"int"', '5', 5),
  ('"long"', '5', 5),
  ('"float"', '1.1', 1.1),
  ('"double"', '1.1', 1.1),
  ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', '\xff\xff'),
  ('{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}', '"FOO"', 'FOO'),
  ('{"type": "array", "items": "int"}', '[1, 2, 3]', [1, 2, 3]),
  ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {'a': 1, 'b': 2}),
  ('["int", "null"]', '5', 5),
  ('{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}',
   '{"A": 5}', {'A': 5}),
)

# Writer-side record schema with seven int fields, shared by the schema
# resolution tests (projection, defaults, field order).
LONG_RECORD_SCHEMA = schema.Parse("""
{
  "type": "record",
  "name": "Test",
  "fields": [
    {"name": "A", "type": "int"},
    {"name": "B", "type": "int"},
    {"name": "C", "type": "int"},
    {"name": "D", "type": "int"},
    {"name": "E", "type": "int"},
    {"name": "F", "type": "int"},
    {"name": "G", "type": "int"}
  ]
}
""")

# A datum matching LONG_RECORD_SCHEMA.
LONG_RECORD_DATUM = {'A': 1, 'B': 2, 'C': 3, 'D': 4, 'E': 5, 'F': 6, 'G': 7}
+
+
def avro_hexlify(reader):
  """Return the hex value, as a string, of a binary-encoded int or long.

  Args:
    reader: File-like object positioned at the start of a variable-length
      encoded number; one byte is consumed per encoded byte.
  Returns:
    Space-separated lowercase hex pairs, e.g. '80 01'.
  """
  # Renamed the accumulator from 'bytes' to avoid shadowing the builtin.
  parts = []
  current_byte = reader.read(1)
  parts.append(binascii.hexlify(current_byte).decode())
  # The 0x80 continuation bit means another byte follows.
  while (ord(current_byte) & 0x80) != 0:
    current_byte = reader.read(1)
    parts.append(binascii.hexlify(current_byte).decode())
  return ' '.join(parts)
+
+
def write_datum(datum, writer_schema):
  """Encodes *datum* with *writer_schema*.

  Returns:
    Tuple of (BytesIO buffer, BinaryEncoder, DatumWriter) used for encoding.
  """
  buffer = io.BytesIO()
  encoder = avro_io.BinaryEncoder(buffer)
  writer = avro_io.DatumWriter(writer_schema)
  writer.write(datum, encoder)
  return buffer, encoder, writer
+
+
def read_datum(buffer, writer_schema, reader_schema=None):
  """Decodes one datum from *buffer*, resolving writer against reader schema."""
  decoder = avro_io.BinaryDecoder(io.BytesIO(buffer.getvalue()))
  return avro_io.DatumReader(writer_schema, reader_schema).read(decoder)
+
+
def check_binary_encoding(number_type):
  """Counts BINARY_ENCODINGS examples whose encoding matches the expected hex.

  Args:
    number_type: 'int' or 'long' (any case).
  Returns:
    Number of examples whose binary encoding matched.
  """
  logging.debug('Testing binary encoding for type %s', number_type)
  correct = 0
  for datum, expected_hex in BINARY_ENCODINGS:
    logging.debug('Datum: %d', datum)
    logging.debug('Correct Encoding: %s', expected_hex)

    writer_schema = schema.Parse('"%s"' % number_type.lower())
    buffer, _, _ = write_datum(datum, writer_schema)
    buffer.seek(0)
    actual_hex = avro_hexlify(buffer)

    logging.debug('Read Encoding: %s', actual_hex)
    if expected_hex == actual_hex:
      correct += 1
  return correct
+
+
def check_skip_number(number_type):
  """Checks that skip_long() positions the decoder past each written number.

  Args:
    number_type: 'int' or 'long' (any case).
  Returns:
    Number of examples where the sentinel after the skipped value read back.
  """
  logging.debug('Testing skip number for %s', number_type)
  correct = 0
  VALUE_TO_READ = 6253  # Sentinel written after the value to be skipped.
  for value_to_skip, _ in BINARY_ENCODINGS:
    logging.debug('Value to Skip: %d', value_to_skip)

    # Write the value to skip followed by the known sentinel.
    writer_schema = schema.Parse('"%s"' % number_type.lower())
    buffer, encoder, datum_writer = write_datum(value_to_skip, writer_schema)
    datum_writer.write(VALUE_TO_READ, encoder)

    # Skip the first value...
    decoder = avro_io.BinaryDecoder(io.BytesIO(buffer.getvalue()))
    decoder.skip_long()

    # ...then the sentinel must be the very next thing decoded.
    read_value = avro_io.DatumReader(writer_schema).read(decoder)

    logging.debug('Read Value: %d', read_value)
    if read_value == VALUE_TO_READ:
      correct += 1
  return correct
+
+
+# ------------------------------------------------------------------------------
+
+
class TestIO(unittest.TestCase):
  """Tests for avro.io: validation, binary encoding, and schema resolution."""

  #
  # BASIC FUNCTIONALITY
  #

  def testValidate(self):
    """Every example datum must validate against its own schema."""
    passed = 0
    for example_schema, datum in SCHEMAS_TO_VALIDATE:
      logging.debug('Schema: %r', example_schema)
      logging.debug('Datum: %r', datum)
      validated = avro_io.Validate(schema.Parse(example_schema), datum)
      logging.debug('Valid: %s', validated)
      if validated: passed += 1
    self.assertEqual(passed, len(SCHEMAS_TO_VALIDATE))

  def testRoundTrip(self):
    """Encoding then decoding each example datum must reproduce it."""
    correct = 0
    for example_schema, datum in SCHEMAS_TO_VALIDATE:
      logging.debug('Schema: %s', example_schema)
      logging.debug('Datum: %s', datum)

      writer_schema = schema.Parse(example_schema)
      writer, encoder, datum_writer = write_datum(datum, writer_schema)
      round_trip_datum = read_datum(writer, writer_schema)

      logging.debug('Round Trip Datum: %s', round_trip_datum)
      if datum == round_trip_datum: correct += 1
    self.assertEqual(correct, len(SCHEMAS_TO_VALIDATE))

  #
  # BINARY ENCODING OF INT AND LONG
  #

  def testBinaryIntEncoding(self):
    """Int varint encodings must match the reference bytes."""
    correct = check_binary_encoding('int')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  def testBinaryLongEncoding(self):
    """Long varint encodings must match the reference bytes."""
    correct = check_binary_encoding('long')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  def testSkipInt(self):
    """Skipping an int must leave the decoder at the following value."""
    correct = check_skip_number('int')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  def testSkipLong(self):
    """Skipping a long must leave the decoder at the following value."""
    correct = check_skip_number('long')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  #
  # SCHEMA RESOLUTION
  #

  def testSchemaPromotion(self):
    """A numeric writer type must be readable as any wider reader type."""
    # note that checking writer_schema.type in read_data
    # allows us to handle promotion correctly
    promotable_schemas = ['"int"', '"long"', '"float"', '"double"']
    incorrect = 0
    for i, ws in enumerate(promotable_schemas):
      writer_schema = schema.Parse(ws)
      datum_to_write = 219
      # Only try promotion to types that appear later in the chain.
      for rs in promotable_schemas[i + 1:]:
        reader_schema = schema.Parse(rs)
        writer, enc, dw = write_datum(datum_to_write, writer_schema)
        datum_read = read_datum(writer, writer_schema, reader_schema)
        logging.debug('Writer: %s Reader: %s', writer_schema, reader_schema)
        logging.debug('Datum Read: %s', datum_read)
        if datum_read != datum_to_write: incorrect += 1
    self.assertEqual(incorrect, 0)

  def testUnknownSymbol(self):
    """Reading an enum symbol absent from the reader schema must fail."""
    writer_schema = schema.Parse("""\
      {"type": "enum", "name": "Test",
       "symbols": ["FOO", "BAR"]}""")
    datum_to_write = 'FOO'

    reader_schema = schema.Parse("""\
      {"type": "enum", "name": "Test",
       "symbols": ["BAR", "BAZ"]}""")

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    reader = io.BytesIO(writer.getvalue())
    decoder = avro_io.BinaryDecoder(reader)
    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
    self.assertRaises(avro_io.SchemaResolutionException, datum_reader.read, decoder)

  def testDefaultValue(self):
    """A reader field missing from the writer must get its declared default."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    correct = 0
    for field_type, default_json, default_datum in DEFAULT_VALUE_EXAMPLES:
      reader_schema = schema.Parse("""\
        {"type": "record", "name": "Test",
         "fields": [{"name": "H", "type": %s, "default": %s}]}
        """ % (field_type, default_json))
      datum_to_read = {'H': default_datum}

      writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
      datum_read = read_datum(writer, writer_schema, reader_schema)
      logging.debug('Datum Read: %s', datum_read)
      if datum_to_read == datum_read: correct += 1
    self.assertEqual(correct, len(DEFAULT_VALUE_EXAMPLES))

  def testNoDefaultValue(self):
    """A reader field missing from the writer with no default must fail."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    reader_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "H", "type": "int"}]}""")

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    reader = io.BytesIO(writer.getvalue())
    decoder = avro_io.BinaryDecoder(reader)
    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
    self.assertRaises(avro_io.SchemaResolutionException, datum_reader.read, decoder)

  def testProjection(self):
    """A reader schema with fewer fields must see just those fields."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    reader_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "E", "type": "int"},
                  {"name": "F", "type": "int"}]}""")
    datum_to_read = {'E': 5, 'F': 6}

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    datum_read = read_datum(writer, writer_schema, reader_schema)
    logging.debug('Datum Read: %s', datum_read)
    self.assertEqual(datum_to_read, datum_read)

  def testFieldOrder(self):
    """Resolution must match fields by name, not by position."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    reader_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "F", "type": "int"},
                  {"name": "E", "type": "int"}]}""")
    datum_to_read = {'E': 5, 'F': 6}

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    datum_read = read_datum(writer, writer_schema, reader_schema)
    logging.debug('Datum Read: %s', datum_read)
    self.assertEqual(datum_to_read, datum_read)

  def testTypeException(self):
    """Writing a datum that does not match the schema must raise."""
    writer_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "F", "type": "int"},
                  {"name": "E", "type": "int"}]}""")
    datum_to_write = {'E': 5, 'F': 'Bad'}
    self.assertRaises(
        avro_io.AvroTypeException, write_datum, datum_to_write, writer_schema)
+
+
if __name__ == '__main__':
  # Tests are driven by the shared harness, never run directly.
  raise Exception('Use run_tests.py')
Propchange: avro/trunk/lang/py3/avro/tests/test_io.py
------------------------------------------------------------------------------
svn:eol-style = native
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic