[prev in list] [next in list] [prev in thread] [next in thread]
List: avro-commits
Subject: [avro] 19/23: AVRO-1788: Remove Obsolete Python < 2.7 Syntax (#683)
From: rskraba () apache ! org
Date: 2020-01-29 8:54:38
Message-ID: 20200129085420.90BC38B6A3 () gitbox ! apache ! org
[Download RAW message or body]
This is an automated email from the ASF dual-hosted git repository.
rskraba pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git
commit e9eee386a14b592a7cca55a1214c14c10d03a3b2
Author: Michael A. Smith <michael@smith-li.com>
AuthorDate: Tue Oct 22 08:49:44 2019 -0400
AVRO-1788: Remove Obsolete Python < 2.7 Syntax (#683)
---
lang/py/scripts/avro | 24 +++++----
lang/py/setup.py | 7 ++-
lang/py/src/avro/__init__.py | 10 ++--
lang/py/src/avro/constants.py | 13 +++--
lang/py/src/avro/datafile.py | 2 +-
lang/py/src/avro/io.py | 63 ++++++++++++----------
lang/py/src/avro/ipc.py | 27 ++++++----
lang/py/src/avro/protocol.py | 32 +++++------
lang/py/src/avro/schema.py | 31 +++++++----
lang/py/src/avro/tether/__init__.py | 15 +++---
lang/py/src/avro/tether/tether_task.py | 41 +++++++-------
lang/py/src/avro/tether/tether_task_runner.py | 61 ++++++++++-----------
lang/py/src/avro/tether/util.py | 37 ++++++-------
lang/py/src/avro/timezones.py | 9 +++-
lang/py/src/avro/tool.py | 45 +++++++++-------
lang/py/src/avro/txipc.py | 13 +++--
lang/py/{src/avro => test}/__init__.py | 10 ++--
lang/py/test/av_bench.py | 7 ++-
lang/py/test/gen_interop_data.py | 40 +++++++++++---
lang/py/test/mock_tether_parent.py | 36 ++++++-------
lang/py/test/sample_http_client.py | 7 ++-
lang/py/test/sample_http_server.py | 3 ++
lang/py/test/set_avro_test_path.py | 7 +++
lang/py/test/test_datafile.py | 77 +++++++++++++++------------
lang/py/test/test_datafile_interop.py | 43 +++++++++------
lang/py/test/test_io.py | 59 ++++++++++----------
lang/py/test/test_ipc.py | 7 +++
lang/py/test/test_script.py | 11 ++--
lang/py/test/test_tether_task.py | 39 ++++++++------
lang/py/test/test_tether_task_runner.py | 67 ++++++++++++-----------
lang/py/test/test_tether_word_count.py | 15 +++---
lang/py/test/txsample_http_client.py | 5 ++
lang/py/test/txsample_http_server.py | 3 ++
lang/py/test/word_count_task.py | 49 +++++++++--------
34 files changed, 533 insertions(+), 382 deletions(-)
diff --git a/lang/py/scripts/avro b/lang/py/scripts/avro
index b320a39..1aac028 100644
--- a/lang/py/scripts/avro
+++ b/lang/py/scripts/avro
@@ -18,16 +18,19 @@
"""Command line utility for reading and writing Avro files."""
-from avro.io import DatumReader, DatumWriter
-from avro.datafile import DataFileReader, DataFileWriter
-import avro.schema
+from __future__ import absolute_import, division, print_function
-import json
import csv
-from sys import stdout, stdin
-from itertools import ifilter, imap
+import json
from functools import partial
+from itertools import ifilter, imap
from os.path import splitext
+from sys import stdin, stdout
+
+import avro.schema
+from avro.datafile import DataFileReader, DataFileWriter
+from avro.io import DatumReader, DatumWriter
+
class AvroError(Exception):
pass
@@ -52,9 +55,9 @@ def print_csv(row):
def select_printer(format):
return {
- "json" : print_json,
- "json-pretty" : print_json_pretty,
- "csv" : print_csv
+ "json": print_json,
+ "json-pretty": print_json_pretty,
+ "csv": print_csv
}[format]
def record_match(expr, record):
@@ -102,7 +105,7 @@ def print_avro(avro, opts):
def print_schema(avro):
schema = avro.meta["avro.schema"]
# Pretty print
- print json.dumps(json.loads(schema), indent=4)
+ print(json.dumps(json.loads(schema), indent=4))
def cat(opts, args):
if not args:
@@ -257,4 +260,3 @@ def main(argv=None):
if __name__ == "__main__":
main()
-
diff --git a/lang/py/setup.py b/lang/py/setup.py
index c2e07c6..b978092 100755
--- a/lang/py/setup.py
+++ b/lang/py/setup.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -7,9 +8,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +18,8 @@
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
import distutils.errors
import glob
import os
diff --git a/lang/py/src/avro/__init__.py b/lang/py/src/avro/__init__.py
index 47d1295..9a859e9 100644
--- a/lang/py/src/avro/__init__.py
+++ b/lang/py/src/avro/__init__.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,14 +8,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
+from __future__ import absolute_import, division, print_function
+__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
diff --git a/lang/py/src/avro/constants.py b/lang/py/src/avro/constants.py
index 66e31df..2197201 100644
--- a/lang/py/src/avro/constants.py
+++ b/lang/py/src/avro/constants.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,18 +8,18 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""
-Contains Constants for Python Avro
-"""
+"""Contains Constants for Python Avro"""
+
+from __future__ import absolute_import, division, print_function
DATE = "date"
DECIMAL = "decimal"
diff --git a/lang/py/src/avro/datafile.py b/lang/py/src/avro/datafile.py
index 75a8e4a..a9c9c22 100644
--- a/lang/py/src/avro/datafile.py
+++ b/lang/py/src/avro/datafile.py
@@ -401,4 +401,4 @@ def generate_sixteen_random_bytes():
try:
return os.urandom(16)
except NotImplementedError:
- return [chr(random.randrange(256)) for i in range(16)]
+ return bytes(random.randrange(256) for i in range(16))
diff --git a/lang/py/src/avro/io.py b/lang/py/src/avro/io.py
index c36c5b3..b18b148 100644
--- a/lang/py/src/avro/io.py
+++ b/lang/py/src/avro/io.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -38,6 +41,8 @@ uses the following mapping:
* Schema booleans are implemented as bool.
"""
+from __future__ import absolute_import, division, print_function
+
import datetime
import json
import struct
@@ -183,7 +188,7 @@ class BinaryDecoder(object):
def read_boolean(self):
"""
- a boolean is written as a single byte
+ a boolean is written as a single byte
whose value is either 0 (false) or 1 (true).
"""
return ord(self.read(1)) == 1
@@ -261,7 +266,7 @@ class BinaryDecoder(object):
def read_bytes(self):
"""
- Bytes are encoded as a long followed by that many bytes of data.
+ Bytes are encoded as a long followed by that many bytes of data.
"""
return self.read(self.read_long())
@@ -297,7 +302,7 @@ class BinaryDecoder(object):
def read_time_millis_from_int(self):
"""
- int is decoded as python time object which represents
+ int is decoded as python time object which represents
the number of milliseconds after midnight, 00:00:00.000.
"""
milliseconds = self.read_int()
@@ -305,7 +310,7 @@ class BinaryDecoder(object):
def read_time_micros_from_long(self):
"""
- long is decoded as python time object which represents
+ long is decoded as python time object which represents
the number of microseconds after midnight, 00:00:00.000000.
"""
microseconds = self.read_long()
@@ -313,17 +318,17 @@ class BinaryDecoder(object):
def read_timestamp_millis_from_long(self):
"""
- long is decoded as python datetime object which represents
+ long is decoded as python datetime object which represents
the number of milliseconds from the unix epoch, 1 January 1970.
"""
timestamp_millis = self.read_long()
timedelta = datetime.timedelta(microseconds=timestamp_millis * 1000)
- unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc)
+ unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc)
return unix_epoch_datetime + timedelta
def read_timestamp_micros_from_long(self):
"""
- long is decoded as python datetime object which represents
+ long is decoded as python datetime object which represents
the number of microseconds from the unix epoch, 1 January 1970.
"""
timestamp_micros = self.read_long()
@@ -386,10 +391,10 @@ class BinaryEncoder(object):
null is written as zero bytes
"""
pass
-
+
def write_boolean(self, datum):
"""
- a boolean is written as a single byte
+ a boolean is written as a single byte
whose value is either 0 (false) or 1 (true).
"""
if datum:
@@ -399,7 +404,7 @@ class BinaryEncoder(object):
def write_int(self, datum):
"""
- int and long values are written using variable-length, zig-zag coding.
+ int and long values are written using variable-length, zig-zag coding.
"""
self.write_long(datum);
@@ -491,7 +496,7 @@ class BinaryEncoder(object):
bits_to_write = unscaled_datum >> (8 * index)
self.write(chr(bits_to_write & 0xff))
else:
- for i in range(offset_bits/8):
+ for i in range(offset_bits // 8):
self.write(chr(0))
for index in range(bytes_req-1, -1, -1):
bits_to_write = unscaled_datum >> (8 * index)
@@ -499,7 +504,7 @@ class BinaryEncoder(object):
def write_bytes(self, datum):
"""
- Bytes are encoded as a long followed by that many bytes of data.
+ Bytes are encoded as a long followed by that many bytes of data.
"""
self.write_long(len(datum))
self.write(struct.pack('%ds' % len(datum), datum))
@@ -591,32 +596,32 @@ class DatumReader(object):
and w_type == r_type):
return True
elif (w_type == r_type == 'record' and
- DatumReader.check_props(writers_schema, readers_schema,
+ DatumReader.check_props(writers_schema, readers_schema,
['fullname'])):
return True
elif (w_type == r_type == 'error' and
- DatumReader.check_props(writers_schema, readers_schema,
+ DatumReader.check_props(writers_schema, readers_schema,
['fullname'])):
return True
elif (w_type == r_type == 'request'):
return True
- elif (w_type == r_type == 'fixed' and
- DatumReader.check_props(writers_schema, readers_schema,
+ elif (w_type == r_type == 'fixed' and
+ DatumReader.check_props(writers_schema, readers_schema,
['fullname', 'size'])):
return True
- elif (w_type == r_type == 'enum' and
- DatumReader.check_props(writers_schema, readers_schema,
+ elif (w_type == r_type == 'enum' and
+ DatumReader.check_props(writers_schema, readers_schema,
['fullname'])):
return True
- elif (w_type == r_type == 'map' and
+ elif (w_type == r_type == 'map' and
DatumReader.check_props(writers_schema.values,
readers_schema.values, ['type'])):
return True
- elif (w_type == r_type == 'array' and
+ elif (w_type == r_type == 'array' and
DatumReader.check_props(writers_schema.items,
readers_schema.items, ['type'])):
return True
-
+
# Handle schema promotion
if w_type == 'int' and r_type in ['long', 'float', 'double']:
return True
@@ -633,7 +638,7 @@ class DatumReader(object):
reader the "reader's schema".
"""
self._writers_schema = writers_schema
- self._readers_schema = readers_schema
+ self._readers_schema = readers_schema
# read/write properties
def set_writers_schema(self, writers_schema):
@@ -644,7 +649,7 @@ class DatumReader(object):
self._readers_schema = readers_schema
readers_schema = property(lambda self: self._readers_schema,
set_readers_schema)
-
+
def read(self, decoder):
if self.readers_schema is None:
self.readers_schema = self.writers_schema
@@ -682,13 +687,13 @@ class DatumReader(object):
else:
return decoder.read_int()
elif writers_schema.type == 'long':
- if (hasattr(writers_schema, 'logical_type') and
+ if (hasattr(writers_schema, 'logical_type') and
writers_schema.logical_type == constants.TIME_MICROS):
return decoder.read_time_micros_from_long()
- elif (hasattr(writers_schema, 'logical_type') and
+ elif (hasattr(writers_schema, 'logical_type') and
writers_schema.logical_type == constants.TIMESTAMP_MILLIS):
return decoder.read_timestamp_millis_from_long()
- elif (hasattr(writers_schema, 'logical_type') and
+ elif (hasattr(writers_schema, 'logical_type') and
writers_schema.logical_type == constants.TIMESTAMP_MICROS):
return decoder.read_timestamp_micros_from_long()
else:
@@ -886,7 +891,7 @@ class DatumReader(object):
% (index_of_schema, len(writers_schema.schemas))
raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
selected_writers_schema = writers_schema.schemas[index_of_schema]
-
+
# read data
return self.read_data(selected_writers_schema, readers_schema, decoder)
@@ -914,7 +919,7 @@ class DatumReader(object):
* if the reader's record schema has a field that contains a default value,
and writer's schema does not have a field with the same name, then the
reader should use the default value from its field.
- * if the reader's record schema has a field with no default value, and
+ * if the reader's record schema has a field with no default value, and
writer's schema does not have a field with the same name, then the
field's value is unset.
"""
@@ -933,7 +938,7 @@ class DatumReader(object):
if len(readers_fields_dict) > len(read_record):
writers_fields_dict = writers_schema.fields_dict
for field_name, field in readers_fields_dict.items():
- if not writers_fields_dict.has_key(field_name):
+ if field_name not in writers_fields_dict:
if field.has_default:
field_val = self._read_default_value(field.type, field.default)
read_record[field.name] = field_val
diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py
index 8cbf07b..1bba4f7 100644
--- a/lang/py/src/avro/ipc.py
+++ b/lang/py/src/avro/ipc.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,17 +8,19 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""
-Support for inter-process calls.
-"""
+
+"""Support for inter-process calls."""
+
+from __future__ import absolute_import, division, print_function
+
import httplib
from avro import io, protocol, schema
@@ -189,7 +194,7 @@ class BaseRequestor(object):
* a one-byte error flag boolean, followed by either:
o if the error flag is false,
the message response, serialized per the message's response schema.
- o if the error flag is true,
+ o if the error flag is true,
the error, serialized per the message's error union schema.
"""
# response metadata
@@ -267,11 +272,11 @@ class Responder(object):
buffer_encoder = io.BinaryEncoder(buffer_writer)
error = None
response_metadata = {}
-
+
try:
remote_protocol = self.process_handshake(buffer_decoder, buffer_encoder)
# handshake failure
- if remote_protocol is None:
+ if remote_protocol is None:
return buffer_writer.getvalue()
# read request using remote protocol
@@ -296,9 +301,9 @@ class Responder(object):
# perform server logic
try:
response = self.invoke(local_message, request)
- except AvroRemoteException, e:
+ except AvroRemoteException as e:
error = e
- except Exception, e:
+ except Exception as e:
error = AvroRemoteException(str(e))
# write response using local protocol
@@ -310,7 +315,7 @@ class Responder(object):
else:
writers_schema = local_message.errors
self.write_error(writers_schema, error, buffer_encoder)
- except schema.AvroException, e:
+ except schema.AvroException as e:
error = AvroRemoteException(str(e))
buffer_encoder = io.BinaryEncoder(StringIO())
META_WRITER.write(response_metadata, buffer_encoder)
diff --git a/lang/py/src/avro/protocol.py b/lang/py/src/avro/protocol.py
index 117401c..9b27a45 100644
--- a/lang/py/src/avro/protocol.py
+++ b/lang/py/src/avro/protocol.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-
-##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -21,10 +18,15 @@
from __future__ import absolute_import, division, print_function
-import hashlib
import json
-import avro.schema
+from avro import schema
+
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import md5
+
#
# Constants
@@ -37,7 +39,7 @@ VALID_TYPE_SCHEMA_TYPES = ('enum', 'record', 'error', 'fixed')
# Exceptions
#
-class ProtocolParseException(avro.schema.AvroException):
+class ProtocolParseException(schema.AvroException):
pass
#
@@ -49,7 +51,7 @@ class Protocol(object):
def _parse_types(self, types, type_names):
type_objects = []
for type in types:
- type_object = avro.schema.make_avsc_object(type, type_names)
+ type_object = schema.make_avsc_object(type, type_names)
if type_object.type not in VALID_TYPE_SCHEMA_TYPES:
fail_msg = 'Type %s not an enum, fixed, record, or error.' % type
raise ProtocolParseException(fail_msg)
@@ -92,7 +94,7 @@ class Protocol(object):
self._props = {}
self.set_prop('name', name)
- type_names = avro.schema.Names()
+ type_names = schema.Names()
if namespace is not None:
self.set_prop('namespace', namespace)
type_names.default_namespace = namespace
@@ -100,13 +102,13 @@ class Protocol(object):
self.set_prop('types', self._parse_types(types, type_names))
if messages is not None:
self.set_prop('messages', self._parse_messages(messages, type_names))
- self._md5 = hashlib.md5(str(self)).digest()
+ self._md5 = md5(str(self)).digest()
# read-only properties
name = property(lambda self: self.get_prop('name'))
namespace = property(lambda self: self.get_prop('namespace'))
fullname = property(lambda self:
- avro.schema.Name(self.name, self.namespace).fullname)
+ schema.Name(self.name, self.namespace).fullname)
types = property(lambda self: self.get_prop('types'))
types_dict = property(lambda self: dict([(type.name, type)
for type in self.types]))
@@ -123,7 +125,7 @@ class Protocol(object):
def to_json(self):
to_dump = {}
to_dump['protocol'] = self.name
- names = avro.schema.Names(default_namespace=self.namespace)
+ names = schema.Names(default_namespace=self.namespace)
if self.namespace:
to_dump['namespace'] = self.namespace
if self.types:
@@ -148,20 +150,20 @@ class Message(object):
if not isinstance(request, list):
fail_msg = 'Request property not a list: %s' % request
raise ProtocolParseException(fail_msg)
- return avro.schema.RecordSchema(None, None, request, names, 'request')
+ return schema.RecordSchema(None, None, request, names, 'request')
def _parse_response(self, response, names):
if isinstance(response, basestring) and names.has_name(response, None):
return names.get_name(response, None)
else:
- return avro.schema.make_avsc_object(response, names)
+ return schema.make_avsc_object(response, names)
def _parse_errors(self, errors, names):
if not isinstance(errors, list):
fail_msg = 'Errors property not a list: %s' % errors
raise ProtocolParseException(fail_msg)
errors_for_parsing = {'type': 'error_union', 'declared_errors': errors}
- return avro.schema.make_avsc_object(errors_for_parsing, names)
+ return schema.make_avsc_object(errors_for_parsing, names)
def __init__(self, name, request, response, errors=None, names=None):
self._name = name
@@ -190,7 +192,7 @@ class Message(object):
def to_json(self, names=None):
if names is None:
- names = avro.schema.Names()
+ names = schema.Names()
to_dump = {}
to_dump['request'] = self.request.to_json(names)
to_dump['response'] = self.response.to_json(names)
diff --git a/lang/py/src/avro/schema.py b/lang/py/src/avro/schema.py
index 822d76c..06c1aeb 100644
--- a/lang/py/src/avro/schema.py
+++ b/lang/py/src/avro/schema.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -35,7 +38,10 @@ A schema may be one of:
Null.
"""
+from __future__ import absolute_import, division, print_function
+
import json
+import sys
from math import floor, log10
from avro import constants
@@ -231,11 +237,11 @@ class Names(object):
def has_name(self, name_attr, space_attr):
test = Name(name_attr, space_attr, self.default_namespace).fullname
- return self.names.has_key(test)
+ return test in self.names
def get_name(self, name_attr, space_attr):
test = Name(name_attr, space_attr, self.default_namespace).fullname
- if not self.names.has_key(test):
+ if test not in self.names:
return None
return self.names[test]
@@ -270,7 +276,7 @@ class Names(object):
if to_add.fullname in VALID_TYPES:
fail_msg = '%s is a reserved type name.' % to_add.fullname
raise SchemaParseException(fail_msg)
- elif self.names.has_key(to_add.fullname):
+ elif to_add.fullname in self.names:
fail_msg = 'The name "%s" is already in use.' % to_add.fullname
raise SchemaParseException(fail_msg)
@@ -377,7 +383,7 @@ class Field(object):
else:
try:
type_schema = make_avsc_object(type, names)
- except Exception, e:
+ except Exception as e:
fail_msg = 'Type property "%s" not a valid Avro schema: %s' % (type, e)
raise SchemaParseException(fail_msg)
self.set_prop('type', type_schema)
@@ -578,7 +584,7 @@ class ArraySchema(Schema):
else:
try:
items_schema = make_avsc_object(items, names)
- except SchemaParseException, e:
+ except SchemaParseException as e:
fail_msg = 'Items schema (%s) not a valid Avro schema: %s (known names: %s)' % (items, e, names.names.keys())
raise SchemaParseException(fail_msg)
@@ -652,7 +658,7 @@ class UnionSchema(Schema):
else:
try:
new_schema = make_avsc_object(schema, names)
- except Exception, e:
+ except Exception as e:
raise SchemaParseException('Union item must be a valid Avro schema: %s' % str(e))
# check the new schema
if (new_schema.type in VALID_TYPES and new_schema.type not in NAMED_TYPES
@@ -708,7 +714,7 @@ class RecordSchema(NamedSchema):
# null values can have a default value of None
has_default = False
default = None
- if field.has_key('default'):
+ if 'default' in field:
has_default = True
default = field.get('default')
@@ -978,10 +984,13 @@ def parse(json_string):
# parse the JSON
try:
json_data = json.loads(json_string)
- except Exception, e:
- import sys
- raise SchemaParseException('Error parsing JSON: %s, error = %s'
- % (json_string, e)), None, sys.exc_info()[2]
+ except Exception as e:
+ msg = 'Error parsing JSON: {}, error = {}'.format(json_string, e)
+ new_exception = SchemaParseException(msg)
+ traceback = sys.exc_info()[2]
+ if not hasattr(new_exception, 'with_traceback'):
+ raise (new_exception, None, traceback) # Python 2 syntax
+ raise new_exception.with_traceback(traceback)
# Initialize the names object
names = Names()
diff --git a/lang/py/src/avro/tether/__init__.py b/lang/py/src/avro/tether/__init__.py
index 0dbd3d8..c60edf9 100644
--- a/lang/py/src/avro/tether/__init__.py
+++ b/lang/py/src/avro/tether/__init__.py
@@ -1,4 +1,6 @@
-#
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,12 +17,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
-from .util import *
-from .tether_task import *
-from .tether_task_runner import *
+from __future__ import absolute_import, division, print_function
-__all__=util.__all__
-__all__+=tether_task.__all__
-__all__+=tether_task_runner.__all__
+from avro.tether.tether_task import HTTPRequestor, TaskType, TetherTask, inputProtocol, outputProtocol
+from avro.tether.tether_task_runner import TaskRunner
+from avro.tether.util import find_port
diff --git a/lang/py/src/avro/tether/tether_task.py b/lang/py/src/avro/tether/tether_task.py
index 23112a7..4e2004d 100644
--- a/lang/py/src/avro/tether/tether_task.py
+++ b/lang/py/src/avro/tether/tether_task.py
@@ -1,22 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["TetherTask","TaskType","inputProtocol","outputProtocol","HTTPRequestor"]
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
import collections
import io as pyio
@@ -30,6 +31,8 @@ from StringIO import StringIO
from avro import io as avio
from avro import ipc, protocol, schema
+__all__ = ["TetherTask", "TaskType", "inputProtocol", "outputProtocol", "HTTPRequestor"]
+
# create protocol objects for the input and output protocols
# The build process should copy InputProtocol.avpr and OutputProtocol.avpr
# into the same directory as this module
diff --git a/lang/py/src/avro/tether/tether_task_runner.py b/lang/py/src/avro/tether/tether_task_runner.py
index b248ffd..64bee7b 100644
--- a/lang/py/src/avro/tether/tether_task_runner.py
+++ b/lang/py/src/avro/tether/tether_task_runner.py
@@ -1,30 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["TaskRunner"]
-
-if __name__ == "__main__":
- # Relative imports don't work when being run directly
- from avro import tether
- from avro.tether import TetherTask, find_port, inputProtocol
-
-else:
- from . import TetherTask, find_port, inputProtocol
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
import logging
import sys
@@ -33,12 +26,16 @@ import traceback
import weakref
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import avro.tether.tether_task
+import avro.tether.util
from avro import ipc
+__all__ = ["TaskRunner"]
+
class TaskRunnerResponder(ipc.Responder):
"""
- The responder for the thethered process
+ The responder for the tethered process
"""
def __init__(self,runner):
"""
@@ -46,7 +43,7 @@ class TaskRunnerResponder(ipc.Responder):
----------------------------------------------------------
runner - Instance of TaskRunner
"""
- ipc.Responder.__init__(self, inputProtocol)
+ ipc.Responder.__init__(self, avro.tether.tether_task.inputProtocol)
self.log=logging.getLogger("TaskRunnerResponder")
@@ -148,7 +145,7 @@ class TaskRunner(object):
self.log=logging.getLogger("TaskRunner:")
- if not(isinstance(task,TetherTask)):
+ if not(isinstance(task, avro.tether.tether_task.TetherTask)):
raise ValueError("task must be an instance of tether task")
self.task=task
@@ -172,7 +169,7 @@ class TaskRunner(object):
testing
"""
- port=find_port()
+ port = avro.tether.util.find_port()
address=("localhost",port)
@@ -212,7 +209,7 @@ if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
if (len(sys.argv)<=1):
- print "Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass"
+ print("Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass")
raise ValueError("Usage: tether_task_runner task_package.task_module.TaskClass")
fullcls=sys.argv[1]
diff --git a/lang/py/src/avro/tether/util.py b/lang/py/src/avro/tether/util.py
index cbeeef0..3d8ad3a 100644
--- a/lang/py/src/avro/tether/util.py
+++ b/lang/py/src/avro/tether/util.py
@@ -1,22 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
+#!/usr/bin/env python
-__all__=["find_port"]
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
import socket
diff --git a/lang/py/src/avro/timezones.py b/lang/py/src/avro/timezones.py
index a4985b4..a306f6d 100644
--- a/lang/py/src/avro/timezones.py
+++ b/lang/py/src/avro/timezones.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,15 +8,17 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
from datetime import datetime, timedelta, tzinfo
diff --git a/lang/py/src/avro/tool.py b/lang/py/src/avro/tool.py
index 6a92fee..3c0c228 100644
--- a/lang/py/src/avro/tool.py
+++ b/lang/py/src/avro/tool.py
@@ -1,4 +1,6 @@
-#! /usr/bin/env python
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -6,20 +8,23 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
"""
Command-line tool
NOTE: The API for the command-line tool is experimental.
"""
+from __future__ import absolute_import, division, print_function
+
import sys
import threading
import urlparse
@@ -37,7 +42,7 @@ class GenericResponder(ipc.Responder):
def invoke(self, message, request):
if message.name == self.msg:
- print >> sys.stderr, "Message: %s Datum: %s" % (message.name, self.datum)
+ print("Message: %s Datum: %s" % (message.name, self.datum), file=sys.stderr)
# server will shut down after processing a single Avro request
global server_should_shutdown
server_should_shutdown = True
@@ -55,7 +60,7 @@ class GenericHandler(BaseHTTPRequestHandler):
resp_writer = ipc.FramedWriter(self.wfile)
resp_writer.write_framed_message(resp_body)
if server_should_shutdown:
- print >> sys.stderr, "Shutting down server."
+ print("Shutting down server.", file=sys.stderr)
quitter = threading.Thread(target=self.server.shutdown)
quitter.daemon = True
quitter.start()
@@ -68,10 +73,10 @@ def run_server(uri, proto, msg, datum):
server_should_shutdown = False
responder = GenericResponder(proto, msg, datum)
server = HTTPServer(server_addr, GenericHandler)
- print "Port: %s" % server.server_port
+ print("Port: %s" % server.server_port)
sys.stdout.flush()
server.allow_reuse_address = True
- print >> sys.stderr, "Starting server."
+ print("Starting server.", file=sys.stderr)
server.serve_forever()
def send_message(uri, proto, msg, datum):
@@ -79,7 +84,7 @@ def send_message(uri, proto, msg, datum):
client = ipc.HTTPTransceiver(url_obj.hostname, url_obj.port)
proto_json = file(proto, 'r').read()
requestor = ipc.Requestor(protocol.parse(proto_json), client)
- print requestor.request(msg, datum)
+ print(requestor.request(msg, datum))
def file_or_stdin(f):
if f == "-":
@@ -89,20 +94,20 @@ def file_or_stdin(f):
def main(args=sys.argv):
if len(args) == 1:
- print "Usage: %s [dump|rpcreceive|rpcsend]" % args[0]
+ print("Usage: %s [dump|rpcreceive|rpcsend]" % args[0])
return 1
if args[1] == "dump":
if len(args) != 3:
- print "Usage: %s dump input_file" % args[0]
+ print("Usage: %s dump input_file" % args[0])
return 1
for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
- print repr(d)
+ print(repr(d))
elif args[1] == "rpcreceive":
usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
usage_str += "message_name (-data d | -file f)"
if len(args) not in [5, 7]:
- print usage_str
+ print(usage_str)
return 1
uri, proto, msg = args[2:5]
datum = None
@@ -111,19 +116,19 @@ def main(args=sys.argv):
reader = open(args[6], 'rb')
datum_reader = io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
- datum = dfr.next()
+ datum = next(dfr)
elif args[5] == "-data":
- print "JSON Decoder not yet implemented."
+ print("JSON Decoder not yet implemented.")
return 1
else:
- print usage_str
+ print(usage_str)
return 1
run_server(uri, proto, msg, datum)
elif args[1] == "rpcsend":
usage_str = "Usage: %s rpcsend uri protocol_file " % args[0]
usage_str += "message_name (-data d | -file f)"
if len(args) not in [5, 7]:
- print usage_str
+ print(usage_str)
return 1
uri, proto, msg = args[2:5]
datum = None
@@ -132,15 +137,15 @@ def main(args=sys.argv):
reader = open(args[6], 'rb')
datum_reader = io.DatumReader()
dfr = datafile.DataFileReader(reader, datum_reader)
- datum = dfr.next()
+ datum = next(dfr)
elif args[5] == "-data":
- print "JSON Decoder not yet implemented."
+ print("JSON Decoder not yet implemented.")
return 1
else:
- print usage_str
+ print(usage_str)
return 1
send_message(uri, proto, msg, datum)
return 0
-
+
if __name__ == "__main__":
sys.exit(main(sys.argv))
diff --git a/lang/py/src/avro/txipc.py b/lang/py/src/avro/txipc.py
index 72d63a6..66ca726 100644
--- a/lang/py/src/avro/txipc.py
+++ b/lang/py/src/avro/txipc.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,10 +16,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
+
+from __future__ import absolute_import, division, print_function
+
from zope.interface import implements
from avro import io, ipc
@@ -29,6 +29,11 @@ from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
class TwistedRequestor(ipc.BaseRequestor):
"""A Twisted-compatible requestor. Returns a Deferred that will fire with the
diff --git a/lang/py/src/avro/__init__.py b/lang/py/test/__init__.py
similarity index 89%
copy from lang/py/src/avro/__init__.py
copy to lang/py/test/__init__.py
index 47d1295..a2a5bef 100644
--- a/lang/py/src/avro/__init__.py
+++ b/lang/py/test/__init__.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,14 +8,11 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
-
diff --git a/lang/py/test/av_bench.py b/lang/py/test/av_bench.py
index e90c987..1e6a05d 100644
--- a/lang/py/test/av_bench.py
+++ b/lang/py/test/av_bench.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,6 +17,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
import sys
import time
from random import choice, randint, sample
@@ -72,5 +75,5 @@ def t(f, *args):
if __name__ == "__main__":
n = int(sys.argv[1])
- print "Write %0.4f" % t(write, n)
- print "Read %0.4f" % t(read)
+ print("Write %0.4f" % t(write, n))
+ print("Read %0.4f" % t(read))
diff --git a/lang/py/test/gen_interop_data.py b/lang/py/test/gen_interop_data.py
index 336434e..13bf86c 100644
--- a/lang/py/test/gen_interop_data.py
+++ b/lang/py/test/gen_interop_data.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -13,15 +14,33 @@
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import os
import sys
from avro import datafile, io, schema
+CODECS_TO_VALIDATE = ('null', 'deflate')
+
+try:
+ import snappy
+ CODECS_TO_VALIDATE += ('snappy',)
+except ImportError:
+ print('Snappy not present, will skip generating it.')
+try:
+ import zstandard
+ CODECS_TO_VALIDATE += ('zstandard',)
+except ImportError:
+ print('Zstandard not present, will skip generating it.')
+
DATUM = {
'intField': 12,
- 'longField': 15234324L,
+ 'longField': 15234324,
'stringField': unicode('hey'),
'boolField': True,
'floatField': 1234.0,
@@ -37,10 +56,15 @@ DATUM = {
}
if __name__ == "__main__":
- interop_schema = schema.parse(open(sys.argv[1], 'r').read())
- writer = open(sys.argv[2], 'wb')
- datum_writer = io.DatumWriter()
- # NB: not using compression
- dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema)
- dfw.append(DATUM)
- dfw.close()
+ for codec in CODECS_TO_VALIDATE:
+ interop_schema = schema.parse(open(sys.argv[1], 'r').read())
+ filename = sys.argv[2]
+ if codec != 'null':
+ base, ext = os.path.splitext(filename)
+ filename = base + "_" + codec + ext
+ writer = open(filename, 'wb')
+ datum_writer = io.DatumWriter()
+ # NB: compression (if any) is determined by the current codec
+ dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema, codec=codec)
+ dfw.append(DATUM)
+ dfw.close()
diff --git a/lang/py/test/mock_tether_parent.py b/lang/py/test/mock_tether_parent.py
index c82e249..88d84dd 100644
--- a/lang/py/test/mock_tether_parent.py
+++ b/lang/py/test/mock_tether_parent.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,45 +17,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
import socket
import sys
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+import avro.tether.tether_task
+import avro.tether.util
import set_avro_test_path
-from avro import ipc, protocol, tether
-
-
-def find_port():
- """
- Return an unbound port
- """
- s=socket.socket()
- s.bind(("127.0.0.1",0))
-
- port=s.getsockname()[1]
- s.close()
-
- return port
+from avro import ipc, protocol
-SERVER_ADDRESS = ('localhost', find_port())
+SERVER_ADDRESS = ('localhost', avro.tether.util.find_port())
class MockParentResponder(ipc.Responder):
"""
The responder for the mocked parent
"""
def __init__(self):
- ipc.Responder.__init__(self, tether.outputProtocol)
+ ipc.Responder.__init__(self, avro.tether.tether_task.outputProtocol)
def invoke(self, message, request):
if message.name=='configure':
- print "MockParentResponder: Recieved 'configure': \
inputPort={0}".format(request["port"]) + print("MockParentResponder: Recieved \
'configure': inputPort={0}".format(request["port"]))
elif message.name=='status':
- print "MockParentResponder: Recieved 'status': \
message={0}".format(request["message"]) + print("MockParentResponder: Recieved \
'status': message={0}".format(request["message"])) elif message.name=='fail':
- print "MockParentResponder: Recieved 'fail': \
message={0}".format(request["message"]) + print("MockParentResponder: Recieved \
'fail': message={0}".format(request["message"])) else:
- print "MockParentResponder: Recieved {0}".format(message.name)
+ print("MockParentResponder: Recieved {0}".format(message.name))
# flush the output so it shows up in the parent process
sys.stdout.flush()
@@ -85,7 +79,7 @@ if __name__ == '__main__':
raise ValueError("Usage: mock_tether_parent start_server port")
SERVER_ADDRESS=(SERVER_ADDRESS[0],port)
- print "mock_tether_parent: Launching Server on Port: \
{0}".format(SERVER_ADDRESS[1]) + print("mock_tether_parent: Launching Server on \
Port: {0}".format(SERVER_ADDRESS[1]))
# flush the output so it shows up in the parent process
sys.stdout.flush()
diff --git a/lang/py/test/sample_http_client.py b/lang/py/test/sample_http_client.py
index 62c91fd..02b8421 100644
--- a/lang/py/test/sample_http_client.py
+++ b/lang/py/test/sample_http_client.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -13,8 +14,12 @@
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
import sys
from avro import ipc, protocol
@@ -78,7 +83,7 @@ if __name__ == '__main__':
# build the parameters for the request
params = {}
params['message'] = message
-
+
# send the requests and print the result
for msg_count in range(num_messages):
requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
diff --git a/lang/py/test/sample_http_server.py b/lang/py/test/sample_http_server.py
index e412ab5..c680afb 100644
--- a/lang/py/test/sample_http_server.py
+++ b/lang/py/test/sample_http_server.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,6 +17,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from avro import ipc, protocol
diff --git a/lang/py/test/set_avro_test_path.py b/lang/py/test/set_avro_test_path.py
index 8e47faf..fd395da 100644
--- a/lang/py/test/set_avro_test_path.py
+++ b/lang/py/test/set_avro_test_path.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -13,6 +16,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
"""
Module adjusts the path PYTHONPATH so the unittests
will work even if an egg for AVRO is already installed.
@@ -28,6 +32,9 @@ being built. To work around this the unittests import this module \
before importing AVRO. This module in turn adjusts the python path so that the test
build of AVRO is higher on the path then any installed eggs.
"""
+
+from __future__ import absolute_import, division, print_function
+
import os
import sys
diff --git a/lang/py/test/test_datafile.py b/lang/py/test/test_datafile.py
index 2b7061c..bceb071 100644
--- a/lang/py/test/test_datafile.py
+++ b/lang/py/test/test_datafile.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,14 +8,17 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
import os
import unittest
@@ -56,25 +62,30 @@ try:
import snappy
CODECS_TO_VALIDATE += ('snappy',)
except ImportError:
- print 'Snappy not present, will skip testing it.'
+ print('Snappy not present, will skip testing it.')
+try:
+ import zstandard
+ CODECS_TO_VALIDATE += ('zstandard',)
+except ImportError:
+ print('Zstandard not present, will skip testing it.')
# TODO(hammer): clean up written files with ant, not os.remove
class TestDataFile(unittest.TestCase):
def test_round_trip(self):
- print ''
- print 'TEST ROUND TRIP'
- print '==============='
- print ''
+ print('')
+ print('TEST ROUND TRIP')
+ print('===============')
+ print('')
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
for codec in CODECS_TO_VALIDATE:
- print ''
- print 'SCHEMA NUMBER %d' % (i + 1)
- print '================'
- print ''
- print 'Schema: %s' % example_schema
- print 'Datum: %s' % datum
- print 'Codec: %s' % codec
+ print('')
+ print('SCHEMA NUMBER %d' % (i + 1))
+ print('================')
+ print('')
+ print('Schema: %s' % example_schema)
+ print('Datum: %s' % datum)
+ print('Codec: %s' % codec)
# write data in binary to file 10 times
writer = open(FILENAME, 'wb')
@@ -93,30 +104,30 @@ class TestDataFile(unittest.TestCase):
for datum in dfr:
round_trip_data.append(datum)
- print 'Round Trip Data: %s' % round_trip_data
- print 'Round Trip Data Length: %d' % len(round_trip_data)
+ print('Round Trip Data: %s' % round_trip_data)
+ print('Round Trip Data Length: %d' % len(round_trip_data))
is_correct = [datum] * 10 == round_trip_data
if is_correct: correct += 1
- print 'Correct Round Trip: %s' % is_correct
- print ''
+ print('Correct Round Trip: %s' % is_correct)
+ print('')
os.remove(FILENAME)
self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
def test_append(self):
- print ''
- print 'TEST APPEND'
- print '==========='
- print ''
+ print('')
+ print('TEST APPEND')
+ print('===========')
+ print('')
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
for codec in CODECS_TO_VALIDATE:
- print ''
- print 'SCHEMA NUMBER %d' % (i + 1)
- print '================'
- print ''
- print 'Schema: %s' % example_schema
- print 'Datum: %s' % datum
- print 'Codec: %s' % codec
+ print('')
+ print('SCHEMA NUMBER %d' % (i + 1))
+ print('================')
+ print('')
+ print('Schema: %s' % example_schema)
+ print('Datum: %s' % datum)
+ print('Codec: %s' % codec)
# write data in binary to file once
writer = open(FILENAME, 'wb')
@@ -141,12 +152,12 @@ class TestDataFile(unittest.TestCase):
for datum in dfr:
appended_data.append(datum)
- print 'Appended Data: %s' % appended_data
- print 'Appended Data Length: %d' % len(appended_data)
+ print('Appended Data: %s' % appended_data)
+ print('Appended Data Length: %d' % len(appended_data))
is_correct = [datum] * 10 == appended_data
if is_correct: correct += 1
- print 'Correct Appended: %s' % is_correct
- print ''
+ print('Correct Appended: %s' % is_correct)
+ print('')
os.remove(FILENAME)
self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
diff --git a/lang/py/test/test_datafile_interop.py \
b/lang/py/test/test_datafile_interop.py index ee02f99..329b9a1 100644
--- a/lang/py/test/test_datafile_interop.py
+++ b/lang/py/test/test_datafile_interop.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,14 +8,17 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
import os
import unittest
@@ -22,20 +28,27 @@ from avro import datafile, io
class TestDataFileInterop(unittest.TestCase):
def test_interop(self):
- print ''
- print 'TEST INTEROP'
- print '============'
- print ''
+ print()
+ print('TEST INTEROP')
+ print('============')
+ print()
for f in os.listdir('@INTEROP_DATA_DIR@'):
- print 'READING %s' % f
- print ''
-
- # read data in binary from file
- reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
- datum_reader = io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
- for datum in dfr:
- assert datum is not None
+ base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1)
+ if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+ print('READING %s' % f)
+ print('')
+
+ # read data in binary from file
+ reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
+ datum_reader = io.DatumReader()
+ dfr = datafile.DataFileReader(reader, datum_reader)
+ i = 0
+ for i, datum in enumerate(dfr, 1):
+ assert datum is not None
+ assert i > 0
+ else:
+ print('SKIPPING %s due to an unsupported codec' % f)
+ print('')
if __name__ == '__main__':
unittest.main()
diff --git a/lang/py/test/test_io.py b/lang/py/test/test_io.py
index 533aa40..2d734a5 100644
--- a/lang/py/test/test_io.py
+++ b/lang/py/test/test_io.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,14 +8,17 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
import datetime
import unittest
from binascii import hexlify
@@ -27,7 +33,6 @@ except ImportError:
from StringIO import StringIO
-
SCHEMAS_TO_VALIDATE = (
('"null"', None),
('"boolean"', True),
@@ -112,7 +117,7 @@ DEFAULT_VALUE_EXAMPLES = (
('"string"', '"foo"', u'foo'),
('"bytes"', '"\u00FF\u00FF"', u'\xff\xff'),
('"int"', '5', 5),
- ('"long"', '5', 5L),
+ ('"long"', '5', 5),
('"float"', '1.1', 1.1),
('"double"', '1.1', 1.1),
('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', u'\xff\xff'),
@@ -148,10 +153,10 @@ def avro_hexlify(reader):
return ' '.join(bytes)
def print_test_name(test_name):
- print ''
- print test_name
- print '=' * len(test_name)
- print ''
+ print('')
+ print(test_name)
+ print('=' * len(test_name))
+ print('')
def write_datum(datum, writers_schema):
writer = StringIO()
@@ -170,17 +175,17 @@ def check_binary_encoding(number_type):
print_test_name('TEST BINARY %s ENCODING' % number_type.upper())
correct = 0
for datum, hex_encoding in BINARY_ENCODINGS:
- print 'Datum: %d' % datum
- print 'Correct Encoding: %s' % hex_encoding
+ print('Datum: %d' % datum)
+ print('Correct Encoding: %s' % hex_encoding)
writers_schema = schema.parse('"%s"' % number_type.lower())
writer, encoder, datum_writer = write_datum(datum, writers_schema)
writer.seek(0)
hex_val = avro_hexlify(writer)
- print 'Read Encoding: %s' % hex_val
+ print('Read Encoding: %s' % hex_val)
if hex_encoding == hex_val: correct += 1
- print ''
+ print('')
return correct
def check_skip_number(number_type):
@@ -188,7 +193,7 @@ def check_skip_number(number_type):
correct = 0
for value_to_skip, hex_encoding in BINARY_ENCODINGS:
VALUE_TO_READ = 6253
- print 'Value to Skip: %d' % value_to_skip
+ print('Value to Skip: %d' % value_to_skip)
# write the value to skip and a known value
writers_schema = schema.parse('"%s"' % number_type.lower())
@@ -204,11 +209,11 @@ def check_skip_number(number_type):
datum_reader = io.DatumReader(writers_schema)
read_value = datum_reader.read(decoder)
- print 'Read Value: %d' % read_value
+ print('Read Value: %d' % read_value)
if read_value == VALUE_TO_READ: correct += 1
- print ''
+ print('')
return correct
-
+
class TestIO(unittest.TestCase):
#
# BASIC FUNCTIONALITY
@@ -218,10 +223,10 @@ class TestIO(unittest.TestCase):
print_test_name('TEST VALIDATE')
passed = 0
for example_schema, datum in SCHEMAS_TO_VALIDATE:
- print 'Schema: %s' % example_schema
- print 'Datum: %s' % datum
+ print('Schema: %s' % example_schema)
+ print('Datum: %s' % datum)
validated = io.validate(schema.parse(example_schema), datum)
- print 'Valid: %s' % validated
+ print('Valid: %s' % validated)
if validated: passed += 1
self.assertEquals(passed, len(SCHEMAS_TO_VALIDATE))
@@ -229,14 +234,14 @@ class TestIO(unittest.TestCase):
print_test_name('TEST ROUND TRIP')
correct = 0
for example_schema, datum in SCHEMAS_TO_VALIDATE:
- print 'Schema: %s' % example_schema
- print 'Datum: %s' % datum
+ print('Schema: %s' % example_schema)
+ print('Datum: %s' % datum)
writers_schema = schema.parse(example_schema)
writer, encoder, datum_writer = write_datum(datum, writers_schema)
round_trip_datum = read_datum(writer, writers_schema)
- print 'Round Trip Datum: %s' % round_trip_datum
+ print('Round Trip Datum: %s' % round_trip_datum)
if isinstance(round_trip_datum, Decimal):
round_trip_datum = round_trip_datum.to_eng_string()
datum = str(datum)
@@ -283,8 +288,8 @@ class TestIO(unittest.TestCase):
readers_schema = schema.parse(rs)
writer, enc, dw = write_datum(datum_to_write, writers_schema)
datum_read = read_datum(writer, writers_schema, readers_schema)
- print 'Writer: %s Reader: %s' % (writers_schema, readers_schema)
- print 'Datum Read: %s' % datum_read
+ print('Writer: %s Reader: %s' % (writers_schema, readers_schema))
+ print('Datum Read: %s' % datum_read)
if datum_read != datum_to_write: incorrect += 1
self.assertEquals(incorrect, 0)
@@ -320,7 +325,7 @@ class TestIO(unittest.TestCase):
writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
datum_read = read_datum(writer, writers_schema, readers_schema)
- print 'Datum Read: %s' % datum_read
+ print('Datum Read: %s' % datum_read)
if datum_to_read == datum_read: correct += 1
self.assertEquals(correct, len(DEFAULT_VALUE_EXAMPLES))
@@ -352,7 +357,7 @@ class TestIO(unittest.TestCase):
writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
datum_read = read_datum(writer, writers_schema, readers_schema)
- print 'Datum Read: %s' % datum_read
+ print('Datum Read: %s' % datum_read)
self.assertEquals(datum_to_read, datum_read)
def test_field_order(self):
@@ -368,7 +373,7 @@ class TestIO(unittest.TestCase):
writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
datum_read = read_datum(writer, writers_schema, readers_schema)
- print 'Datum Read: %s' % datum_read
+ print('Datum Read: %s' % datum_read)
self.assertEquals(datum_to_read, datum_read)
def test_type_exception(self):
diff --git a/lang/py/test/test_ipc.py b/lang/py/test/test_ipc.py
index 575a0c9..bc9bd21 100644
--- a/lang/py/test/test_ipc.py
+++ b/lang/py/test/test_ipc.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -13,10 +16,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
"""
There are currently no IPC tests within python, in part because there are no
servers yet available.
"""
+
+from __future__ import absolute_import, division, print_function
+
import unittest
import set_avro_test_path
diff --git a/lang/py/test/test_script.py b/lang/py/test/test_script.py
index 214fc15..bd0cb4d 100644
--- a/lang/py/test/test_script.py
+++ b/lang/py/test/test_script.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -5,15 +8,17 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# https://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
import csv
import json
import unittest
@@ -125,7 +130,7 @@ class TestCat(unittest.TestCase):
def test_json_pretty(self):
out = self._run("--format", "json-pretty", "-n", "1", raw=1)
- assert out.strip() == _JSON_PRETTY.strip()
+ self.assertEqual(out.strip(), _JSON_PRETTY.strip())
def test_version(self):
check_output([SCRIPT, "cat", "--version"])
diff --git a/lang/py/test/test_tether_task.py b/lang/py/test/test_tether_task.py
index 9933070..85ed9cb 100644
--- a/lang/py/test/test_tether_task.py
+++ b/lang/py/test/test_tether_task.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,15 +17,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
+from __future__ import absolute_import, division, print_function
import os
+import StringIO
import subprocess
import sys
import time
import unittest
+import avro.tether.tether_task
+import avro.tether.util
+import mock_tether_parent
import set_avro_test_path
+from avro import io as avio
+from avro import schema, tether
+from word_count_task import WordCountTask
class TestTetherTask(unittest.TestCase):
@@ -34,15 +44,6 @@ class TestTetherTask(unittest.TestCase):
Test that the thether_task is working. We run the mock_tether_parent in a separate subprocess
"""
- from avro import tether
- from avro import io as avio
- from avro import schema
- from avro.tether import HTTPRequestor,inputProtocol, find_port
-
- import StringIO
- import mock_tether_parent
- from word_count_task import WordCountTask
-
task=WordCountTask()
proc=None
@@ -51,13 +52,13 @@ class TestTetherTask(unittest.TestCase):
# env["AVRO_TETHER_OUTPUT_PORT"]=output_port
env=dict()
env["PYTHONPATH"]=':'.join(sys.path)
- server_port=find_port()
+ server_port = avro.tether.util.find_port()
pyfile=mock_tether_parent.__file__
proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(server_port)])
- input_port=find_port()
+ input_port = avro.tether.util.find_port()
- print "Mock server started process pid={0}".format(proc.pid)
+ print("Mock server started process pid={0}".format(proc.pid))
# Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
# so we give the subprocess time to start up
time.sleep(1)
@@ -68,7 +69,11 @@ class TestTetherTask(unittest.TestCase):
#***************************************************************
# Test the mapper
- task.configure(tether.TaskType.MAP,str(task.inschema),str(task.midschema))
+ task.configure(
+ avro.tether.tether_task.TaskType.MAP,
+ str(task.inschema),
+ str(task.midschema)
+ )
# Serialize some data so we can send it to the input function
datum="This is a line of text"
@@ -84,7 +89,11 @@ class TestTetherTask(unittest.TestCase):
task.input(data,1)
# Test the reducer
- task.configure(tether.TaskType.REDUCE,str(task.midschema),str(task.outschema))
+ task.configure(
+ avro.tether.tether_task.TaskType.REDUCE,
+ str(task.midschema),
+ str(task.outschema)
+ )
# Serialize some data so we can send it to the input function
datum={"key":"word","value":2}
diff --git a/lang/py/test/test_tether_task_runner.py b/lang/py/test/test_tether_task_runner.py
index 3832dbe..985eb3c 100644
--- a/lang/py/test/test_tether_task_runner.py
+++ b/lang/py/test/test_tether_task_runner.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,28 +17,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
+import logging
import os
+import StringIO
import subprocess
import sys
import time
import unittest
+import avro.tether.tether_task
+import avro.tether.tether_task_runner
+import avro.tether.util
+import mock_tether_parent
import set_avro_test_path
+from avro import io as avio
+from word_count_task import WordCountTask
class TestTetherTaskRunner(unittest.TestCase):
- """ unit test for a tethered task runner.
- """
+ """unit test for a tethered task runner."""
def test1(self):
- from word_count_task import WordCountTask
- from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
- from avro import io as avio
- import mock_tether_parent
- import subprocess
- import StringIO
- import logging
-
# set the logging level to debug so that debug messages are printed
logging.basicConfig(level=logging.DEBUG)
@@ -44,30 +48,34 @@ class TestTetherTaskRunner(unittest.TestCase):
# launch the server in a separate process
env=dict()
env["PYTHONPATH"]=':'.join(sys.path)
- parent_port=find_port()
+ parent_port = avro.tether.util.find_port()
pyfile=mock_tether_parent.__file__
proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
- input_port=find_port()
+ input_port = avro.tether.util.find_port()
- print "Mock server started process pid={0}".format(proc.pid)
+ print("Mock server started process pid={0}".format(proc.pid))
# Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
# so we give the subprocess time to start up
time.sleep(1)
- runner=TaskRunner(WordCountTask())
+ runner = avro.tether.tether_task_runner.TaskRunner(WordCountTask())
runner.start(outputport=parent_port,join=False)
- # Test sending various messages to the server and ensuring they are
- # processed correctly
- requestor=HTTPRequestor("localhost",runner.server.server_address[1],inputProtocol)
+ # Test sending various messages to the server and ensuring they are processed correctly
+ requestor = avro.tether.tether_task.HTTPRequestor(
+ "localhost", runner.server.server_address[1], avro.tether.tether_task.inputProtocol)
# TODO: We should validate that open worked by grabbing the STDOUT of the subproces
# and ensuring that it outputted the correct message.
# Test the mapper
- requestor.request("configure",{"taskType":TaskType.MAP,"inSchema":str(runner.task.inschema),"outSchema":str(runner.task.midschema)})
+ requestor.request("configure", {
+ "taskType": avro.tether.tether_task.TaskType.MAP,
+ "inSchema": str(runner.task.inschema),
+ "outSchema": str(runner.task.midschema)
+ })
# Serialize some data so we can send it to the input function
datum="This is a line of text"
@@ -83,8 +91,12 @@ class TestTetherTaskRunner(unittest.TestCase):
# Call input to simulate calling map
requestor.request("input",{"data":data,"count":1})
- #Test the reducer
- requestor.request("configure",{"taskType":TaskType.REDUCE,"inSchema":str(runner.task.midschema),"outSchema":str(runner.task.outschema)})
+ # Test the reducer
+ requestor.request("configure", {
+ "taskType": avro.tether.tether_task.TaskType.REDUCE,
+ "inSchema": str(runner.task.midschema),
+ "outSchema": str(runner.task.outschema)}
+ )
#Serialize some data so we can send it to the input function
datum={"key":"word","value":2}
@@ -133,15 +145,6 @@ class TestTetherTaskRunner(unittest.TestCase):
as our main script everything works as expected. We do this by using subprocess to run it in a separate thread.
"""
- from word_count_task import WordCountTask
- from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
- from avro.tether import tether_task_runner
- from avro import io as avio
- import mock_tether_parent
- import subprocess
- import StringIO
-
-
proc=None
runnerproc=None
@@ -149,7 +152,7 @@ class TestTetherTaskRunner(unittest.TestCase):
#launch the server in a separate process
env=dict()
env["PYTHONPATH"]=':'.join(sys.path)
- parent_port=find_port()
+ parent_port = avro.tether.util.find_port()
pyfile=mock_tether_parent.__file__
proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
@@ -164,14 +167,14 @@ class TestTetherTaskRunner(unittest.TestCase):
env={"AVRO_TETHER_OUTPUT_PORT":"{0}".format(parent_port)}
env["PYTHONPATH"]=':'.join(sys.path)
- runnerproc=subprocess.Popen(["python",tether_task_runner.__file__,"word_count_task.WordCountTask"],env=env)
+ runnerproc = subprocess.Popen(["python", avro.tether.tether_task_runner.__file__, "word_count_task.WordCountTask"],env=env)
#possible race condition wait for the process to start
time.sleep(1)
- print "Mock server started process pid={0}".format(proc.pid)
+ print("Mock server started process pid={0}".format(proc.pid))
#Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
#so we give the subprocess time to start up
time.sleep(1)
diff --git a/lang/py/test/test_tether_word_count.py b/lang/py/test/test_tether_word_count.py
index d2f1858..8c3fb08 100644
--- a/lang/py/test/test_tether_word_count.py
+++ b/lang/py/test/test_tether_word_count.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,6 +17,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
import inspect
import os
import subprocess
@@ -25,8 +30,7 @@ import set_avro_test_path
class TestTetherWordCount(unittest.TestCase):
- """ unittest for a python tethered map-reduce job.
- """
+ """unittest for a python tethered map-reduce job."""
def _write_lines(self,lines,fname):
"""
@@ -72,7 +76,7 @@ class TestTetherWordCount(unittest.TestCase):
words=line.split()
for w in words:
- if not(counts.has_key(w.strip())):
+ if not(w.strip() in counts):
counts[w.strip()]=0
counts[w.strip()]=counts[w.strip()]+1
@@ -92,7 +96,6 @@ class TestTetherWordCount(unittest.TestCase):
import avro
import subprocess
- import StringIO
import shutil
import tempfile
import inspect
@@ -182,11 +185,11 @@ python -m avro.tether.tether_task_runner word_count_task.WordCountTask
exhf.close()
# make it world executable
- os.chmod(exfile,0755)
+ os.chmod(exfile,0o755)
args.extend(["--program",exfile])
- print "Command:\n\t{0}".format(" ".join(args))
+ print("Command:\n\t{0}".format(" ".join(args)))
proc=subprocess.Popen(args)
diff --git a/lang/py/test/txsample_http_client.py b/lang/py/test/txsample_http_client.py
index dba4ade..28c2c28 100644
--- a/lang/py/test/txsample_http_client.py
+++ b/lang/py/test/txsample_http_client.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -13,8 +14,12 @@
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
import sys
from avro import protocol, txipc
diff --git a/lang/py/test/txsample_http_server.py b/lang/py/test/txsample_http_server.py
index 604ef54..fafaecd 100644
--- a/lang/py/test/txsample_http_server.py
+++ b/lang/py/test/txsample_http_server.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,6 +17,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import absolute_import, division, print_function
+
from avro import ipc, protocol, txipc
from twisted.internet import reactor
from twisted.web import server
diff --git a/lang/py/test/word_count_task.py b/lang/py/test/word_count_task.py
index 24f2da2..8181340 100644
--- a/lang/py/test/word_count_task.py
+++ b/lang/py/test/word_count_task.py
@@ -1,33 +1,36 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["WordCountTask"]
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
import logging
-from avro.tether import TetherTask
+import avro.tether.tether_task
+
+__all__ = ["WordCountTask"]
#TODO::Make the logging level a parameter we can set
#logging.basicConfig(level=logging.INFO)
-class WordCountTask(TetherTask):
+class WordCountTask(avro.tether.tether_task.TetherTask):
"""
- Implements the mappper and reducer for the word count example
+ Implements the mapper and reducer for the word count example
"""
def __init__(self):
@@ -40,7 +43,7 @@ class WordCountTask(TetherTask):
{"name":"value","type":"long","order":"ignore"}]
}"""
outschema=midschema
- TetherTask.__init__(self,inschema,midschema,outschema)
+ avro.tether.tether_task.TetherTask.__init__(self, inschema, midschema, outschema)
#keep track of the partial sums of the counts
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic