[prev in list] [next in list] [prev in thread] [next in thread] 

List:       avro-commits
Subject:    [avro] 19/23: AVRO-1788: Remove Obsolete Python < 2.7 Syntax (#683)
From:       rskraba () apache ! org
Date:       2020-01-29 8:54:38
Message-ID: 20200129085420.90BC38B6A3 () gitbox ! apache ! org
[Download RAW message or body]

This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git

commit e9eee386a14b592a7cca55a1214c14c10d03a3b2
Author: Michael A. Smith <michael@smith-li.com>
AuthorDate: Tue Oct 22 08:49:44 2019 -0400

    AVRO-1788: Remove Obsolete Python < 2.7 Syntax (#683)
---
 lang/py/scripts/avro                          | 24 +++++----
 lang/py/setup.py                              |  7 ++-
 lang/py/src/avro/__init__.py                  | 10 ++--
 lang/py/src/avro/constants.py                 | 13 +++--
 lang/py/src/avro/datafile.py                  |  2 +-
 lang/py/src/avro/io.py                        | 63 ++++++++++++----------
 lang/py/src/avro/ipc.py                       | 27 ++++++----
 lang/py/src/avro/protocol.py                  | 32 +++++------
 lang/py/src/avro/schema.py                    | 31 +++++++----
 lang/py/src/avro/tether/__init__.py           | 15 +++---
 lang/py/src/avro/tether/tether_task.py        | 41 +++++++-------
 lang/py/src/avro/tether/tether_task_runner.py | 61 ++++++++++-----------
 lang/py/src/avro/tether/util.py               | 37 ++++++-------
 lang/py/src/avro/timezones.py                 |  9 +++-
 lang/py/src/avro/tool.py                      | 45 +++++++++-------
 lang/py/src/avro/txipc.py                     | 13 +++--
 lang/py/{src/avro => test}/__init__.py        | 10 ++--
 lang/py/test/av_bench.py                      |  7 ++-
 lang/py/test/gen_interop_data.py              | 40 +++++++++++---
 lang/py/test/mock_tether_parent.py            | 36 ++++++-------
 lang/py/test/sample_http_client.py            |  7 ++-
 lang/py/test/sample_http_server.py            |  3 ++
 lang/py/test/set_avro_test_path.py            |  7 +++
 lang/py/test/test_datafile.py                 | 77 +++++++++++++++------------
 lang/py/test/test_datafile_interop.py         | 43 +++++++++------
 lang/py/test/test_io.py                       | 59 ++++++++++----------
 lang/py/test/test_ipc.py                      |  7 +++
 lang/py/test/test_script.py                   | 11 ++--
 lang/py/test/test_tether_task.py              | 39 ++++++++------
 lang/py/test/test_tether_task_runner.py       | 67 ++++++++++++-----------
 lang/py/test/test_tether_word_count.py        | 15 +++---
 lang/py/test/txsample_http_client.py          |  5 ++
 lang/py/test/txsample_http_server.py          |  3 ++
 lang/py/test/word_count_task.py               | 49 +++++++++--------
 34 files changed, 533 insertions(+), 382 deletions(-)

diff --git a/lang/py/scripts/avro b/lang/py/scripts/avro
index b320a39..1aac028 100644
--- a/lang/py/scripts/avro
+++ b/lang/py/scripts/avro
@@ -18,16 +18,19 @@
 
 """Command line utility for reading and writing Avro files."""
 
-from avro.io import DatumReader, DatumWriter
-from avro.datafile import DataFileReader, DataFileWriter
-import avro.schema
+from __future__ import absolute_import, division, print_function
 
-import json
 import csv
-from sys import stdout, stdin
-from itertools import ifilter, imap
+import json
 from functools import partial
+from itertools import ifilter, imap
 from os.path import splitext
+from sys import stdin, stdout
+
+import avro.schema
+from avro.datafile import DataFileReader, DataFileWriter
+from avro.io import DatumReader, DatumWriter
+
 
 class AvroError(Exception):
     pass
@@ -52,9 +55,9 @@ def print_csv(row):
 
 def select_printer(format):
     return {
-        "json" : print_json,
-        "json-pretty" : print_json_pretty,
-        "csv" : print_csv
+        "json": print_json,
+        "json-pretty": print_json_pretty,
+        "csv": print_csv
     }[format]
 
 def record_match(expr, record):
@@ -102,7 +105,7 @@ def print_avro(avro, opts):
 def print_schema(avro):
     schema = avro.meta["avro.schema"]
     # Pretty print
-    print json.dumps(json.loads(schema), indent=4)
+    print(json.dumps(json.loads(schema), indent=4))
 
 def cat(opts, args):
     if not args:
@@ -257,4 +260,3 @@ def main(argv=None):
 
 if __name__ == "__main__":
     main()
-
diff --git a/lang/py/setup.py b/lang/py/setup.py
index c2e07c6..b978092 100755
--- a/lang/py/setup.py
+++ b/lang/py/setup.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -7,9 +8,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +18,8 @@
 # limitations under the License.
 
 
+from __future__ import absolute_import, division, print_function
+
 import distutils.errors
 import glob
 import os
diff --git a/lang/py/src/avro/__init__.py b/lang/py/src/avro/__init__.py
index 47d1295..9a859e9 100644
--- a/lang/py/src/avro/__init__.py
+++ b/lang/py/src/avro/__init__.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,15 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
+from __future__ import absolute_import, division, print_function
 
+__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
diff --git a/lang/py/src/avro/constants.py b/lang/py/src/avro/constants.py
index 66e31df..2197201 100644
--- a/lang/py/src/avro/constants.py
+++ b/lang/py/src/avro/constants.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,18 +8,18 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""
-Contains Constants for Python Avro
-"""
+"""Contains Constants for Python Avro"""
+
+from __future__ import absolute_import, division, print_function
 
 DATE = "date"
 DECIMAL = "decimal"
diff --git a/lang/py/src/avro/datafile.py b/lang/py/src/avro/datafile.py
index 75a8e4a..a9c9c22 100644
--- a/lang/py/src/avro/datafile.py
+++ b/lang/py/src/avro/datafile.py
@@ -401,4 +401,4 @@ def generate_sixteen_random_bytes():
   try:
     return os.urandom(16)
   except NotImplementedError:
-    return [chr(random.randrange(256)) for i in range(16)]
+    return bytes(random.randrange(256) for i in range(16))
diff --git a/lang/py/src/avro/io.py b/lang/py/src/avro/io.py
index c36c5b3..b18b148 100644
--- a/lang/py/src/avro/io.py
+++ b/lang/py/src/avro/io.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -38,6 +41,8 @@ uses the following mapping:
   * Schema booleans are implemented as bool.
 """
 
+from __future__ import absolute_import, division, print_function
+
 import datetime
 import json
 import struct
@@ -183,7 +188,7 @@ class BinaryDecoder(object):
 
   def read_boolean(self):
     """
-    a boolean is written as a single byte 
+    a boolean is written as a single byte
     whose value is either 0 (false) or 1 (true).
     """
     return ord(self.read(1)) == 1
@@ -261,7 +266,7 @@ class BinaryDecoder(object):
 
   def read_bytes(self):
     """
-    Bytes are encoded as a long followed by that many bytes of data. 
+    Bytes are encoded as a long followed by that many bytes of data.
     """
     return self.read(self.read_long())
 
@@ -297,7 +302,7 @@ class BinaryDecoder(object):
 
   def read_time_millis_from_int(self):
     """
-    int is decoded as python time object which represents 
+    int is decoded as python time object which represents
     the number of milliseconds after midnight, 00:00:00.000.
     """
     milliseconds = self.read_int()
@@ -305,7 +310,7 @@ class BinaryDecoder(object):
 
   def read_time_micros_from_long(self):
     """
-    long is decoded as python time object which represents 
+    long is decoded as python time object which represents
     the number of microseconds after midnight, 00:00:00.000000.
     """
     microseconds = self.read_long()
@@ -313,17 +318,17 @@ class BinaryDecoder(object):
 
   def read_timestamp_millis_from_long(self):
     """
-    long is decoded as python datetime object which represents 
+    long is decoded as python datetime object which represents
     the number of milliseconds from the unix epoch, 1 January 1970.
     """
     timestamp_millis = self.read_long()
     timedelta = datetime.timedelta(microseconds=timestamp_millis * 1000)
-    unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc)
+    unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc)
     return unix_epoch_datetime + timedelta
 
   def read_timestamp_micros_from_long(self):
     """
-    long is decoded as python datetime object which represents 
+    long is decoded as python datetime object which represents
     the number of microseconds from the unix epoch, 1 January 1970.
     """
     timestamp_micros = self.read_long()
@@ -386,10 +391,10 @@ class BinaryEncoder(object):
     null is written as zero bytes
     """
     pass
-  
+
   def write_boolean(self, datum):
     """
-    a boolean is written as a single byte 
+    a boolean is written as a single byte
     whose value is either 0 (false) or 1 (true).
     """
     if datum:
@@ -399,7 +404,7 @@ class BinaryEncoder(object):
 
   def write_int(self, datum):
     """
-    int and long values are written using variable-length, zig-zag coding.    
+    int and long values are written using variable-length, zig-zag coding.
     """
     self.write_long(datum);
 
@@ -491,7 +496,7 @@ class BinaryEncoder(object):
         bits_to_write = unscaled_datum >> (8 * index)
         self.write(chr(bits_to_write & 0xff))
     else:
-      for i in range(offset_bits/8):
+      for i in range(offset_bits // 8):
         self.write(chr(0))
       for index in range(bytes_req-1, -1, -1):
         bits_to_write = unscaled_datum >> (8 * index)
@@ -499,7 +504,7 @@ class BinaryEncoder(object):
 
   def write_bytes(self, datum):
     """
-    Bytes are encoded as a long followed by that many bytes of data. 
+    Bytes are encoded as a long followed by that many bytes of data.
     """
     self.write_long(len(datum))
     self.write(struct.pack('%ds' % len(datum), datum))
@@ -591,32 +596,32 @@ class DatumReader(object):
           and w_type == r_type):
       return True
     elif (w_type == r_type == 'record' and
-          DatumReader.check_props(writers_schema, readers_schema, 
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname'])):
       return True
     elif (w_type == r_type == 'error' and
-          DatumReader.check_props(writers_schema, readers_schema, 
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname'])):
       return True
     elif (w_type == r_type == 'request'):
       return True
-    elif (w_type == r_type == 'fixed' and 
-          DatumReader.check_props(writers_schema, readers_schema, 
+    elif (w_type == r_type == 'fixed' and
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname', 'size'])):
       return True
-    elif (w_type == r_type == 'enum' and 
-          DatumReader.check_props(writers_schema, readers_schema, 
+    elif (w_type == r_type == 'enum' and
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname'])):
       return True
-    elif (w_type == r_type == 'map' and 
+    elif (w_type == r_type == 'map' and
           DatumReader.check_props(writers_schema.values,
                                   readers_schema.values, ['type'])):
       return True
-    elif (w_type == r_type == 'array' and 
+    elif (w_type == r_type == 'array' and
           DatumReader.check_props(writers_schema.items,
                                   readers_schema.items, ['type'])):
       return True
-    
+
     # Handle schema promotion
     if w_type == 'int' and r_type in ['long', 'float', 'double']:
       return True
@@ -633,7 +638,7 @@ class DatumReader(object):
     reader the "reader's schema".
     """
     self._writers_schema = writers_schema
-    self._readers_schema = readers_schema 
+    self._readers_schema = readers_schema
 
   # read/write properties
   def set_writers_schema(self, writers_schema):
@@ -644,7 +649,7 @@ class DatumReader(object):
     self._readers_schema = readers_schema
   readers_schema = property(lambda self: self._readers_schema,
                             set_readers_schema)
-  
+
   def read(self, decoder):
     if self.readers_schema is None:
       self.readers_schema = self.writers_schema
@@ -682,13 +687,13 @@ class DatumReader(object):
       else:
         return decoder.read_int()
     elif writers_schema.type == 'long':
-      if (hasattr(writers_schema, 'logical_type') and 
+      if (hasattr(writers_schema, 'logical_type') and
           writers_schema.logical_type == constants.TIME_MICROS):
         return decoder.read_time_micros_from_long()
-      elif (hasattr(writers_schema, 'logical_type') and 
+      elif (hasattr(writers_schema, 'logical_type') and
             writers_schema.logical_type == constants.TIMESTAMP_MILLIS):
         return decoder.read_timestamp_millis_from_long()
-      elif (hasattr(writers_schema, 'logical_type') and 
+      elif (hasattr(writers_schema, 'logical_type') and
             writers_schema.logical_type == constants.TIMESTAMP_MICROS):
         return decoder.read_timestamp_micros_from_long()
       else:
@@ -886,7 +891,7 @@ class DatumReader(object):
                  % (index_of_schema, len(writers_schema.schemas))
       raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
     selected_writers_schema = writers_schema.schemas[index_of_schema]
-    
+
     # read data
     return self.read_data(selected_writers_schema, readers_schema, decoder)
 
@@ -914,7 +919,7 @@ class DatumReader(object):
      * if the reader's record schema has a field that contains a default value,
        and writer's schema does not have a field with the same name, then the
        reader should use the default value from its field.
-     * if the reader's record schema has a field with no default value, and 
+     * if the reader's record schema has a field with no default value, and
        writer's schema does not have a field with the same name, then the
        field's value is unset.
     """
@@ -933,7 +938,7 @@ class DatumReader(object):
     if len(readers_fields_dict) > len(read_record):
       writers_fields_dict = writers_schema.fields_dict
       for field_name, field in readers_fields_dict.items():
-        if not writers_fields_dict.has_key(field_name):
+        if field_name not in writers_fields_dict:
           if field.has_default:
             field_val = self._read_default_value(field.type, field.default)
             read_record[field.name] = field_val
diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py
index 8cbf07b..1bba4f7 100644
--- a/lang/py/src/avro/ipc.py
+++ b/lang/py/src/avro/ipc.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,17 +8,19 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-"""
-Support for inter-process calls.
-"""
+
+"""Support for inter-process calls."""
+
+from __future__ import absolute_import, division, print_function
+
 import httplib
 
 from avro import io, protocol, schema
@@ -189,7 +194,7 @@ class BaseRequestor(object):
       * a one-byte error flag boolean, followed by either:
         o if the error flag is false,
           the message response, serialized per the message's response schema.
-        o if the error flag is true, 
+        o if the error flag is true,
           the error, serialized per the message's error union schema.
     """
     # response metadata
@@ -267,11 +272,11 @@ class Responder(object):
     buffer_encoder = io.BinaryEncoder(buffer_writer)
     error = None
     response_metadata = {}
-    
+
     try:
       remote_protocol = self.process_handshake(buffer_decoder, buffer_encoder)
       # handshake failure
-      if remote_protocol is None:  
+      if remote_protocol is None:
         return buffer_writer.getvalue()
 
       # read request using remote protocol
@@ -296,9 +301,9 @@ class Responder(object):
       # perform server logic
       try:
         response = self.invoke(local_message, request)
-      except AvroRemoteException, e:
+      except AvroRemoteException as e:
         error = e
-      except Exception, e:
+      except Exception as e:
         error = AvroRemoteException(str(e))
 
       # write response using local protocol
@@ -310,7 +315,7 @@ class Responder(object):
       else:
         writers_schema = local_message.errors
         self.write_error(writers_schema, error, buffer_encoder)
-    except schema.AvroException, e:
+    except schema.AvroException as e:
       error = AvroRemoteException(str(e))
       buffer_encoder = io.BinaryEncoder(StringIO())
       META_WRITER.write(response_metadata, buffer_encoder)
diff --git a/lang/py/src/avro/protocol.py b/lang/py/src/avro/protocol.py
index 117401c..9b27a45 100644
--- a/lang/py/src/avro/protocol.py
+++ b/lang/py/src/avro/protocol.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-
-##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -21,10 +18,15 @@
 
 from __future__ import absolute_import, division, print_function
 
-import hashlib
 import json
 
-import avro.schema
+from avro import schema
+
+try:
+  from hashlib import md5
+except ImportError:
+  from md5 import md5
+
 
 #
 # Constants
@@ -37,7 +39,7 @@ VALID_TYPE_SCHEMA_TYPES = ('enum', 'record', 'error', 'fixed')
 # Exceptions
 #
 
-class ProtocolParseException(avro.schema.AvroException):
+class ProtocolParseException(schema.AvroException):
   pass
 
 #
@@ -49,7 +51,7 @@ class Protocol(object):
   def _parse_types(self, types, type_names):
     type_objects = []
     for type in types:
-      type_object = avro.schema.make_avsc_object(type, type_names)
+      type_object = schema.make_avsc_object(type, type_names)
       if type_object.type not in VALID_TYPE_SCHEMA_TYPES:
         fail_msg = 'Type %s not an enum, fixed, record, or error.' % type
         raise ProtocolParseException(fail_msg)
@@ -92,7 +94,7 @@ class Protocol(object):
 
     self._props = {}
     self.set_prop('name', name)
-    type_names = avro.schema.Names()
+    type_names = schema.Names()
     if namespace is not None:
       self.set_prop('namespace', namespace)
       type_names.default_namespace = namespace
@@ -100,13 +102,13 @@ class Protocol(object):
       self.set_prop('types', self._parse_types(types, type_names))
     if messages is not None:
       self.set_prop('messages', self._parse_messages(messages, type_names))
-    self._md5 = hashlib.md5(str(self)).digest()
+    self._md5 = md5(str(self)).digest()
 
   # read-only properties
   name = property(lambda self: self.get_prop('name'))
   namespace = property(lambda self: self.get_prop('namespace'))
   fullname = property(lambda self:
-                      avro.schema.Name(self.name, self.namespace).fullname)
+                      schema.Name(self.name, self.namespace).fullname)
   types = property(lambda self: self.get_prop('types'))
   types_dict = property(lambda self: dict([(type.name, type)
                                            for type in self.types]))
@@ -123,7 +125,7 @@ class Protocol(object):
   def to_json(self):
     to_dump = {}
     to_dump['protocol'] = self.name
-    names = avro.schema.Names(default_namespace=self.namespace)
+    names = schema.Names(default_namespace=self.namespace)
     if self.namespace:
       to_dump['namespace'] = self.namespace
     if self.types:
@@ -148,20 +150,20 @@ class Message(object):
     if not isinstance(request, list):
       fail_msg = 'Request property not a list: %s' % request
       raise ProtocolParseException(fail_msg)
-    return avro.schema.RecordSchema(None, None, request, names, 'request')
+    return schema.RecordSchema(None, None, request, names, 'request')
 
   def _parse_response(self, response, names):
     if isinstance(response, basestring) and names.has_name(response, None):
       return names.get_name(response, None)
     else:
-      return avro.schema.make_avsc_object(response, names)
+      return schema.make_avsc_object(response, names)
 
   def _parse_errors(self, errors, names):
     if not isinstance(errors, list):
       fail_msg = 'Errors property not a list: %s' % errors
       raise ProtocolParseException(fail_msg)
     errors_for_parsing = {'type': 'error_union', 'declared_errors': errors}
-    return avro.schema.make_avsc_object(errors_for_parsing, names)
+    return schema.make_avsc_object(errors_for_parsing, names)
 
   def __init__(self,  name, request, response, errors=None, names=None):
     self._name = name
@@ -190,7 +192,7 @@ class Message(object):
 
   def to_json(self, names=None):
     if names is None:
-      names = avro.schema.Names()
+      names = schema.Names()
     to_dump = {}
     to_dump['request'] = self.request.to_json(names)
     to_dump['response'] = self.response.to_json(names)
diff --git a/lang/py/src/avro/schema.py b/lang/py/src/avro/schema.py
index 822d76c..06c1aeb 100644
--- a/lang/py/src/avro/schema.py
+++ b/lang/py/src/avro/schema.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -35,7 +38,10 @@ A schema may be one of:
   Null.
 """
 
+from __future__ import absolute_import, division, print_function
+
 import json
+import sys
 from math import floor, log10
 
 from avro import constants
@@ -231,11 +237,11 @@ class Names(object):
 
   def has_name(self, name_attr, space_attr):
       test = Name(name_attr, space_attr, self.default_namespace).fullname
-      return self.names.has_key(test)
+      return test in self.names
 
   def get_name(self, name_attr, space_attr):
       test = Name(name_attr, space_attr, self.default_namespace).fullname
-      if not self.names.has_key(test):
+      if test not in self.names:
           return None
       return self.names[test]
 
@@ -270,7 +276,7 @@ class Names(object):
     if to_add.fullname in VALID_TYPES:
       fail_msg = '%s is a reserved type name.' % to_add.fullname
       raise SchemaParseException(fail_msg)
-    elif self.names.has_key(to_add.fullname):
+    elif to_add.fullname in self.names:
       fail_msg = 'The name "%s" is already in use.' % to_add.fullname
       raise SchemaParseException(fail_msg)
 
@@ -377,7 +383,7 @@ class Field(object):
     else:
       try:
         type_schema = make_avsc_object(type, names)
-      except Exception, e:
+      except Exception as e:
         fail_msg = 'Type property "%s" not a valid Avro schema: %s' % (type, e)
         raise SchemaParseException(fail_msg)
     self.set_prop('type', type_schema)
@@ -578,7 +584,7 @@ class ArraySchema(Schema):
     else:
       try:
         items_schema = make_avsc_object(items, names)
-      except SchemaParseException, e:
+      except SchemaParseException as e:
         fail_msg = 'Items schema (%s) not a valid Avro schema: %s (known names: %s)' % (items, e, names.names.keys())
         raise SchemaParseException(fail_msg)
 
@@ -652,7 +658,7 @@ class UnionSchema(Schema):
       else:
         try:
           new_schema = make_avsc_object(schema, names)
-        except Exception, e:
+        except Exception as e:
           raise SchemaParseException('Union item must be a valid Avro schema: %s' % str(e))
       # check the new schema
       if (new_schema.type in VALID_TYPES and new_schema.type not in NAMED_TYPES
@@ -708,7 +714,7 @@ class RecordSchema(NamedSchema):
         # null values can have a default value of None
         has_default = False
         default = None
-        if field.has_key('default'):
+        if 'default' in field:
           has_default = True
           default = field.get('default')
 
@@ -978,10 +984,13 @@ def parse(json_string):
   # parse the JSON
   try:
     json_data = json.loads(json_string)
-  except Exception, e:
-    import sys
-    raise SchemaParseException('Error parsing JSON: %s, error = %s'
-                               % (json_string, e)), None, sys.exc_info()[2]
+  except Exception as e:
+    msg = 'Error parsing JSON: {}, error = {}'.format(json_string, e)
+    new_exception = SchemaParseException(msg)
+    traceback = sys.exc_info()[2]
+    if not hasattr(new_exception, 'with_traceback'):
+      raise (new_exception, None, traceback)  # Python 2 syntax
+    raise new_exception.with_traceback(traceback)
 
   # Initialize the names object
   names = Names()
diff --git a/lang/py/src/avro/tether/__init__.py b/lang/py/src/avro/tether/__init__.py
index 0dbd3d8..c60edf9 100644
--- a/lang/py/src/avro/tether/__init__.py
+++ b/lang/py/src/avro/tether/__init__.py
@@ -1,4 +1,6 @@
-#
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,12 +17,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
 
-from .util import *
-from .tether_task import *
-from .tether_task_runner import *
+from __future__ import absolute_import, division, print_function
 
-__all__=util.__all__
-__all__+=tether_task.__all__
-__all__+=tether_task_runner.__all__
+from avro.tether.tether_task import HTTPRequestor, TaskType, TetherTask, inputProtocol, outputProtocol
+from avro.tether.tether_task_runner import TaskRunner
+from avro.tether.util import find_port
diff --git a/lang/py/src/avro/tether/tether_task.py b/lang/py/src/avro/tether/tether_task.py
index 23112a7..4e2004d 100644
--- a/lang/py/src/avro/tether/tether_task.py
+++ b/lang/py/src/avro/tether/tether_task.py
@@ -1,22 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["TetherTask","TaskType","inputProtocol","outputProtocol","HTTPRequestor"]
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import collections
 import io as pyio
@@ -30,6 +31,8 @@ from StringIO import StringIO
 from avro import io as avio
 from avro import ipc, protocol, schema
 
+__all__ = ["TetherTask", "TaskType", "inputProtocol", "outputProtocol", "HTTPRequestor"]
+
 # create protocol objects for the input and output protocols
 # The build process should copy InputProtocol.avpr and OutputProtocol.avpr
 # into the same directory as this module
diff --git a/lang/py/src/avro/tether/tether_task_runner.py b/lang/py/src/avro/tether/tether_task_runner.py
index b248ffd..64bee7b 100644
--- a/lang/py/src/avro/tether/tether_task_runner.py
+++ b/lang/py/src/avro/tether/tether_task_runner.py
@@ -1,30 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["TaskRunner"]
-
-if __name__ == "__main__":
-  # Relative imports don't work when being run directly
-  from avro import tether
-  from avro.tether import TetherTask, find_port, inputProtocol
-
-else:
-  from . import TetherTask, find_port, inputProtocol
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import logging
 import sys
@@ -33,12 +26,16 @@ import traceback
 import weakref
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
+import avro.tether.tether_task
+import avro.tether.util
 from avro import ipc
 
+__all__ = ["TaskRunner"]
+
 
 class TaskRunnerResponder(ipc.Responder):
   """
-  The responder for the thethered process
+  The responder for the tethered process
   """
   def __init__(self,runner):
     """
@@ -46,7 +43,7 @@ class TaskRunnerResponder(ipc.Responder):
     ----------------------------------------------------------
     runner - Instance of TaskRunner
     """
-    ipc.Responder.__init__(self, inputProtocol)
+    ipc.Responder.__init__(self, avro.tether.tether_task.inputProtocol)
 
     self.log=logging.getLogger("TaskRunnerResponder")
 
@@ -148,7 +145,7 @@ class TaskRunner(object):
 
     self.log=logging.getLogger("TaskRunner:")
 
-    if not(isinstance(task,TetherTask)):
+    if not(isinstance(task, avro.tether.tether_task.TetherTask)):
       raise ValueError("task must be an instance of tether task")
     self.task=task
 
@@ -172,7 +169,7 @@ class TaskRunner(object):
                 testing
     """
 
-    port=find_port()
+    port = avro.tether.util.find_port()
     address=("localhost",port)
 
 
@@ -212,7 +209,7 @@ if __name__ == '__main__':
   logging.basicConfig(level=logging.INFO)
 
   if (len(sys.argv)<=1):
-    print "Error: tether_task_runner.__main__: Usage: tether_task_runner \
task_package.task_module.TaskClass" +    print("Error: tether_task_runner.__main__: \
                Usage: tether_task_runner task_package.task_module.TaskClass")
     raise ValueError("Usage: tether_task_runner task_package.task_module.TaskClass")
 
   fullcls=sys.argv[1]
diff --git a/lang/py/src/avro/tether/util.py b/lang/py/src/avro/tether/util.py
index cbeeef0..3d8ad3a 100644
--- a/lang/py/src/avro/tether/util.py
+++ b/lang/py/src/avro/tether/util.py
@@ -1,22 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
+#!/usr/bin/env python
 
-__all__=["find_port"]
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import socket
 
diff --git a/lang/py/src/avro/timezones.py b/lang/py/src/avro/timezones.py
index a4985b4..a306f6d 100644
--- a/lang/py/src/avro/timezones.py
+++ b/lang/py/src/avro/timezones.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,15 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 from datetime import datetime, timedelta, tzinfo
 
 
diff --git a/lang/py/src/avro/tool.py b/lang/py/src/avro/tool.py
index 6a92fee..3c0c228 100644
--- a/lang/py/src/avro/tool.py
+++ b/lang/py/src/avro/tool.py
@@ -1,4 +1,6 @@
-#! /usr/bin/env python
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -6,20 +8,23 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
 Command-line tool
 
 NOTE: The API for the command-line tool is experimental.
 """
 
+from __future__ import absolute_import, division, print_function
+
 import sys
 import threading
 import urlparse
@@ -37,7 +42,7 @@ class GenericResponder(ipc.Responder):
 
   def invoke(self, message, request):
     if message.name == self.msg:
-      print >> sys.stderr, "Message: %s Datum: %s" % (message.name, self.datum)
+      print("Message: %s Datum: %s" % (message.name, self.datum), file=sys.stderr)
       # server will shut down after processing a single Avro request
       global server_should_shutdown
       server_should_shutdown = True
@@ -55,7 +60,7 @@ class GenericHandler(BaseHTTPRequestHandler):
     resp_writer = ipc.FramedWriter(self.wfile)
     resp_writer.write_framed_message(resp_body)
     if server_should_shutdown:
-      print >> sys.stderr, "Shutting down server."
+      print("Shutting down server.", file=sys.stderr)
       quitter = threading.Thread(target=self.server.shutdown)
       quitter.daemon = True
       quitter.start()
@@ -68,10 +73,10 @@ def run_server(uri, proto, msg, datum):
   server_should_shutdown = False
   responder = GenericResponder(proto, msg, datum)
   server = HTTPServer(server_addr, GenericHandler)
-  print "Port: %s" % server.server_port
+  print("Port: %s" % server.server_port)
   sys.stdout.flush()
   server.allow_reuse_address = True
-  print >> sys.stderr, "Starting server."
+  print("Starting server.", file=sys.stderr)
   server.serve_forever()
 
 def send_message(uri, proto, msg, datum):
@@ -79,7 +84,7 @@ def send_message(uri, proto, msg, datum):
   client = ipc.HTTPTransceiver(url_obj.hostname, url_obj.port)
   proto_json = file(proto, 'r').read()
   requestor = ipc.Requestor(protocol.parse(proto_json), client)
-  print requestor.request(msg, datum)
+  print(requestor.request(msg, datum))
 
 def file_or_stdin(f):
   if f == "-":
@@ -89,20 +94,20 @@ def file_or_stdin(f):
 
 def main(args=sys.argv):
   if len(args) == 1:
-    print "Usage: %s [dump|rpcreceive|rpcsend]" % args[0]
+    print("Usage: %s [dump|rpcreceive|rpcsend]" % args[0])
     return 1
 
   if args[1] == "dump":
     if len(args) != 3:
-      print "Usage: %s dump input_file" % args[0]
+      print("Usage: %s dump input_file" % args[0])
       return 1
     for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
-      print repr(d)
+      print(repr(d))
   elif args[1] == "rpcreceive":
     usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
     usage_str += "message_name (-data d | -file f)"
     if len(args) not in [5, 7]:
-      print usage_str
+      print(usage_str)
       return 1
     uri, proto, msg = args[2:5]
     datum = None
@@ -111,19 +116,19 @@ def main(args=sys.argv):
         reader = open(args[6], 'rb')
         datum_reader = io.DatumReader()
         dfr = datafile.DataFileReader(reader, datum_reader)
-        datum = dfr.next()
+        datum = next(dfr)
       elif args[5] == "-data":
-        print "JSON Decoder not yet implemented."
+        print("JSON Decoder not yet implemented.")
         return 1
       else:
-        print usage_str
+        print(usage_str)
         return 1
     run_server(uri, proto, msg, datum)
   elif args[1] == "rpcsend":
     usage_str = "Usage: %s rpcsend uri protocol_file " % args[0]
     usage_str += "message_name (-data d | -file f)"
     if len(args) not in [5, 7]:
-      print usage_str
+      print(usage_str)
       return 1
     uri, proto, msg = args[2:5]
     datum = None
@@ -132,15 +137,15 @@ def main(args=sys.argv):
         reader = open(args[6], 'rb')
         datum_reader = io.DatumReader()
         dfr = datafile.DataFileReader(reader, datum_reader)
-        datum = dfr.next()
+        datum = next(dfr)
       elif args[5] == "-data":
-        print "JSON Decoder not yet implemented."
+        print("JSON Decoder not yet implemented.")
         return 1
       else:
-        print usage_str
+        print(usage_str)
         return 1
     send_message(uri, proto, msg, datum)
   return 0
-  
+
 if __name__ == "__main__":
   sys.exit(main(sys.argv))
diff --git a/lang/py/src/avro/txipc.py b/lang/py/src/avro/txipc.py
index 72d63a6..66ca726 100644
--- a/lang/py/src/avro/txipc.py
+++ b/lang/py/src/avro/txipc.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,10 +16,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-try:
-  from cStringIO import StringIO
-except ImportError:
-  from StringIO import StringIO
+
+from __future__ import absolute_import, division, print_function
+
 from zope.interface import implements
 
 from avro import io, ipc
@@ -29,6 +29,11 @@ from twisted.web.client import Agent
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IBodyProducer
 
+try:
+  from cStringIO import StringIO
+except ImportError:
+  from StringIO import StringIO
+
 
 class TwistedRequestor(ipc.BaseRequestor):
   """A Twisted-compatible requestor. Returns a Deferred that will fire with the
diff --git a/lang/py/src/avro/__init__.py b/lang/py/test/__init__.py
similarity index 89%
copy from lang/py/src/avro/__init__.py
copy to lang/py/test/__init__.py
index 47d1295..a2a5bef 100644
--- a/lang/py/src/avro/__init__.py
+++ b/lang/py/test/__init__.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,11 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
-
diff --git a/lang/py/test/av_bench.py b/lang/py/test/av_bench.py
index e90c987..1e6a05d 100644
--- a/lang/py/test/av_bench.py
+++ b/lang/py/test/av_bench.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import sys
 import time
 from random import choice, randint, sample
@@ -72,5 +75,5 @@ def t(f, *args):
 
 if __name__ == "__main__":
     n = int(sys.argv[1])
-    print "Write %0.4f" % t(write, n)
-    print "Read %0.4f" % t(read)
+    print("Write %0.4f" % t(write, n))
+    print("Read %0.4f" % t(read))
diff --git a/lang/py/test/gen_interop_data.py b/lang/py/test/gen_interop_data.py
index 336434e..13bf86c 100644
--- a/lang/py/test/gen_interop_data.py
+++ b/lang/py/test/gen_interop_data.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,15 +14,33 @@
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import os
 import sys
 
 from avro import datafile, io, schema
 
+CODECS_TO_VALIDATE = ('null', 'deflate')
+
+try:
+  import snappy
+  CODECS_TO_VALIDATE += ('snappy',)
+except ImportError:
+  print('Snappy not present, will skip generating it.')
+try:
+  import zstandard
+  CODECS_TO_VALIDATE += ('zstandard',)
+except ImportError:
+  print('Zstandard not present, will skip generating it.')
+
 DATUM = {
   'intField': 12,
-  'longField': 15234324L,
+  'longField': 15234324,
   'stringField': unicode('hey'),
   'boolField': True,
   'floatField': 1234.0,
@@ -37,10 +56,15 @@ DATUM = {
 }
 
 if __name__ == "__main__":
-  interop_schema = schema.parse(open(sys.argv[1], 'r').read())
-  writer = open(sys.argv[2], 'wb')
-  datum_writer = io.DatumWriter()
-  # NB: not using compression
-  dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema)
-  dfw.append(DATUM)
-  dfw.close()
+  for codec in CODECS_TO_VALIDATE:
+    interop_schema = schema.parse(open(sys.argv[1], 'r').read())
+    filename = sys.argv[2]
+    if codec != 'null':
+      base, ext = os.path.splitext(filename)
+      filename = base + "_" + codec + ext
+    writer = open(filename, 'wb')
+    datum_writer = io.DatumWriter()
+    # NB: not using compression
+    dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema, codec=codec)
+    dfw.append(DATUM)
+    dfw.close()
diff --git a/lang/py/test/mock_tether_parent.py b/lang/py/test/mock_tether_parent.py
index c82e249..88d84dd 100644
--- a/lang/py/test/mock_tether_parent.py
+++ b/lang/py/test/mock_tether_parent.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,45 +17,36 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import socket
 import sys
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
+import avro.tether.tether_task
+import avro.tether.util
 import set_avro_test_path
-from avro import ipc, protocol, tether
-
-
-def find_port():
-  """
-  Return an unbound port
-  """
-  s=socket.socket()
-  s.bind(("127.0.0.1",0))
-
-  port=s.getsockname()[1]
-  s.close()
-
-  return port
+from avro import ipc, protocol
 
-SERVER_ADDRESS = ('localhost', find_port())
+SERVER_ADDRESS = ('localhost', avro.tether.util.find_port())
 
 class MockParentResponder(ipc.Responder):
   """
   The responder for the mocked parent
   """
   def __init__(self):
-    ipc.Responder.__init__(self, tether.outputProtocol)
+    ipc.Responder.__init__(self, avro.tether.tether_task.outputProtocol)
 
   def invoke(self, message, request):
     if message.name=='configure':
-      print "MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"])
+      print("MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"]))
 
     elif message.name=='status':
-      print "MockParentResponder: Recieved 'status': message={0}".format(request["message"])
+      print("MockParentResponder: Recieved 'status': message={0}".format(request["message"]))
     elif message.name=='fail':
-      print "MockParentResponder: Recieved 'fail': message={0}".format(request["message"])
+      print("MockParentResponder: Recieved 'fail': message={0}".format(request["message"]))
     else:
-      print "MockParentResponder: Recieved {0}".format(message.name)
+      print("MockParentResponder: Recieved {0}".format(message.name))
 
     # flush the output so it shows up in the parent process
     sys.stdout.flush()
@@ -85,7 +79,7 @@ if __name__ == '__main__':
       raise ValueError("Usage: mock_tether_parent start_server port")
 
     SERVER_ADDRESS=(SERVER_ADDRESS[0],port)
-    print "mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1])
+    print("mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1]))
 
     # flush the output so it shows up in the parent process
     sys.stdout.flush()
diff --git a/lang/py/test/sample_http_client.py b/lang/py/test/sample_http_client.py
index 62c91fd..02b8421 100644
--- a/lang/py/test/sample_http_client.py
+++ b/lang/py/test/sample_http_client.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,8 +14,12 @@
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import sys
 
 from avro import ipc, protocol
@@ -78,7 +83,7 @@ if __name__ == '__main__':
   # build the parameters for the request
   params = {}
   params['message'] = message
-   
+
   # send the requests and print the result
   for msg_count in range(num_messages):
     requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
diff --git a/lang/py/test/sample_http_server.py b/lang/py/test/sample_http_server.py
index e412ab5..c680afb 100644
--- a/lang/py/test/sample_http_server.py
+++ b/lang/py/test/sample_http_server.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
 from avro import ipc, protocol
diff --git a/lang/py/test/set_avro_test_path.py b/lang/py/test/set_avro_test_path.py
index 8e47faf..fd395da 100644
--- a/lang/py/test/set_avro_test_path.py
+++ b/lang/py/test/set_avro_test_path.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,6 +16,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
 Module adjusts the path PYTHONPATH so the unittests
 will work even if an egg for AVRO is already installed.
@@ -28,6 +32,9 @@ being built. To work around this the unittests import this module before
 importing AVRO. This module in turn adjusts the python path so that the test
 build of AVRO is higher on the path then any installed eggs.
 """
+
+from __future__ import absolute_import, division, print_function
+
 import os
 import sys
 
diff --git a/lang/py/test/test_datafile.py b/lang/py/test/test_datafile.py
index 2b7061c..bceb071 100644
--- a/lang/py/test/test_datafile.py
+++ b/lang/py/test/test_datafile.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import os
 import unittest
 
@@ -56,25 +62,30 @@ try:
   import snappy
   CODECS_TO_VALIDATE += ('snappy',)
 except ImportError:
-  print 'Snappy not present, will skip testing it.'
+  print('Snappy not present, will skip testing it.')
+try:
+  import zstandard
+  CODECS_TO_VALIDATE += ('zstandard',)
+except ImportError:
+  print('Zstandard not present, will skip testing it.')
 
 # TODO(hammer): clean up written files with ant, not os.remove
 class TestDataFile(unittest.TestCase):
   def test_round_trip(self):
-    print ''
-    print 'TEST ROUND TRIP'
-    print '==============='
-    print ''
+    print('')
+    print('TEST ROUND TRIP')
+    print('===============')
+    print('')
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
       for codec in CODECS_TO_VALIDATE:
-        print ''
-        print 'SCHEMA NUMBER %d' % (i + 1)
-        print '================'
-        print ''
-        print 'Schema: %s' % example_schema
-        print 'Datum: %s' % datum
-        print 'Codec: %s' % codec
+        print('')
+        print('SCHEMA NUMBER %d' % (i + 1))
+        print('================')
+        print('')
+        print('Schema: %s' % example_schema)
+        print('Datum: %s' % datum)
+        print('Codec: %s' % codec)
 
         # write data in binary to file 10 times
         writer = open(FILENAME, 'wb')
@@ -93,30 +104,30 @@ class TestDataFile(unittest.TestCase):
         for datum in dfr:
           round_trip_data.append(datum)
 
-        print 'Round Trip Data: %s' % round_trip_data
-        print 'Round Trip Data Length: %d' % len(round_trip_data)
+        print('Round Trip Data: %s' % round_trip_data)
+        print('Round Trip Data Length: %d' % len(round_trip_data))
         is_correct = [datum] * 10 == round_trip_data
         if is_correct: correct += 1
-        print 'Correct Round Trip: %s' % is_correct
-        print ''
+        print('Correct Round Trip: %s' % is_correct)
+        print('')
     os.remove(FILENAME)
     self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
 
   def test_append(self):
-    print ''
-    print 'TEST APPEND'
-    print '==========='
-    print ''
+    print('')
+    print('TEST APPEND')
+    print('===========')
+    print('')
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
       for codec in CODECS_TO_VALIDATE:
-        print ''
-        print 'SCHEMA NUMBER %d' % (i + 1)
-        print '================'
-        print ''
-        print 'Schema: %s' % example_schema
-        print 'Datum: %s' % datum
-        print 'Codec: %s' % codec
+        print('')
+        print('SCHEMA NUMBER %d' % (i + 1))
+        print('================')
+        print('')
+        print('Schema: %s' % example_schema)
+        print('Datum: %s' % datum)
+        print('Codec: %s' % codec)
 
         # write data in binary to file once
         writer = open(FILENAME, 'wb')
@@ -141,12 +152,12 @@ class TestDataFile(unittest.TestCase):
         for datum in dfr:
           appended_data.append(datum)
 
-        print 'Appended Data: %s' % appended_data
-        print 'Appended Data Length: %d' % len(appended_data)
+        print('Appended Data: %s' % appended_data)
+        print('Appended Data Length: %d' % len(appended_data))
         is_correct = [datum] * 10 == appended_data
         if is_correct: correct += 1
-        print 'Correct Appended: %s' % is_correct
-        print ''
+        print('Correct Appended: %s' % is_correct)
+        print('')
     os.remove(FILENAME)
     self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
 
diff --git a/lang/py/test/test_datafile_interop.py b/lang/py/test/test_datafile_interop.py
index ee02f99..329b9a1 100644
--- a/lang/py/test/test_datafile_interop.py
+++ b/lang/py/test/test_datafile_interop.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import os
 import unittest
 
@@ -22,20 +28,27 @@ from avro import datafile, io
 
 class TestDataFileInterop(unittest.TestCase):
   def test_interop(self):
-    print ''
-    print 'TEST INTEROP'
-    print '============'
-    print ''
+    print()
+    print('TEST INTEROP')
+    print('============')
+    print()
     for f in os.listdir('@INTEROP_DATA_DIR@'):
-      print 'READING %s' % f
-      print ''
-
-      # read data in binary from file
-      reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
-      datum_reader = io.DatumReader()
-      dfr = datafile.DataFileReader(reader, datum_reader)
-      for datum in dfr:
-        assert datum is not None
+      base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1)
+      if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+        print('READING %s' % f)
+        print('')
+
+        # read data in binary from file
+        reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        i = 0
+        for i, datum in enumerate(dfr, 1):
+          assert datum is not None
+        assert i > 0
+      else:
+        print('SKIPPING %s due to an unsupported codec' % f)
+        print('')
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/lang/py/test/test_io.py b/lang/py/test/test_io.py
index 533aa40..2d734a5 100644
--- a/lang/py/test/test_io.py
+++ b/lang/py/test/test_io.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import datetime
 import unittest
 from binascii import hexlify
@@ -27,7 +33,6 @@ except ImportError:
   from StringIO import StringIO
 
 
-
 SCHEMAS_TO_VALIDATE = (
   ('"null"', None),
   ('"boolean"', True),
@@ -112,7 +117,7 @@ DEFAULT_VALUE_EXAMPLES = (
   ('"string"', '"foo"', u'foo'),
   ('"bytes"', '"\u00FF\u00FF"', u'\xff\xff'),
   ('"int"', '5', 5),
-  ('"long"', '5', 5L),
+  ('"long"', '5', 5),
   ('"float"', '1.1', 1.1),
   ('"double"', '1.1', 1.1),
   ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', u'\xff\xff'),
@@ -148,10 +153,10 @@ def avro_hexlify(reader):
   return ' '.join(bytes)
 
 def print_test_name(test_name):
-  print ''
-  print test_name
-  print '=' * len(test_name)
-  print ''
+  print('')
+  print(test_name)
+  print('=' * len(test_name))
+  print('')
 
 def write_datum(datum, writers_schema):
   writer = StringIO()
@@ -170,17 +175,17 @@ def check_binary_encoding(number_type):
   print_test_name('TEST BINARY %s ENCODING' % number_type.upper())
   correct = 0
   for datum, hex_encoding in BINARY_ENCODINGS:
-    print 'Datum: %d' % datum
-    print 'Correct Encoding: %s' % hex_encoding
+    print('Datum: %d' % datum)
+    print('Correct Encoding: %s' % hex_encoding)
 
     writers_schema = schema.parse('"%s"' % number_type.lower())
     writer, encoder, datum_writer = write_datum(datum, writers_schema)
     writer.seek(0)
     hex_val = avro_hexlify(writer)
 
-    print 'Read Encoding: %s' % hex_val
+    print('Read Encoding: %s' % hex_val)
     if hex_encoding == hex_val: correct += 1
-    print ''
+    print('')
   return correct
 
 def check_skip_number(number_type):
@@ -188,7 +193,7 @@ def check_skip_number(number_type):
   correct = 0
   for value_to_skip, hex_encoding in BINARY_ENCODINGS:
     VALUE_TO_READ = 6253
-    print 'Value to Skip: %d' % value_to_skip
+    print('Value to Skip: %d' % value_to_skip)
 
     # write the value to skip and a known value
     writers_schema = schema.parse('"%s"' % number_type.lower())
@@ -204,11 +209,11 @@ def check_skip_number(number_type):
     datum_reader = io.DatumReader(writers_schema)
     read_value = datum_reader.read(decoder)
 
-    print 'Read Value: %d' % read_value
+    print('Read Value: %d' % read_value)
     if read_value == VALUE_TO_READ: correct += 1
-    print ''
+    print('')
   return correct
-    
+
 class TestIO(unittest.TestCase):
   #
   # BASIC FUNCTIONALITY
@@ -218,10 +223,10 @@ class TestIO(unittest.TestCase):
     print_test_name('TEST VALIDATE')
     passed = 0
     for example_schema, datum in SCHEMAS_TO_VALIDATE:
-      print 'Schema: %s' % example_schema
-      print 'Datum: %s' % datum
+      print('Schema: %s' % example_schema)
+      print('Datum: %s' % datum)
       validated = io.validate(schema.parse(example_schema), datum)
-      print 'Valid: %s' % validated
+      print('Valid: %s' % validated)
       if validated: passed += 1
     self.assertEquals(passed, len(SCHEMAS_TO_VALIDATE))
 
@@ -229,14 +234,14 @@ class TestIO(unittest.TestCase):
     print_test_name('TEST ROUND TRIP')
     correct = 0
     for example_schema, datum in SCHEMAS_TO_VALIDATE:
-      print 'Schema: %s' % example_schema
-      print 'Datum: %s' % datum
+      print('Schema: %s' % example_schema)
+      print('Datum: %s' % datum)
 
       writers_schema = schema.parse(example_schema)
       writer, encoder, datum_writer = write_datum(datum, writers_schema)
       round_trip_datum = read_datum(writer, writers_schema)
 
-      print 'Round Trip Datum: %s' % round_trip_datum
+      print('Round Trip Datum: %s' % round_trip_datum)
       if isinstance(round_trip_datum, Decimal):
         round_trip_datum = round_trip_datum.to_eng_string()
         datum = str(datum)
@@ -283,8 +288,8 @@ class TestIO(unittest.TestCase):
         readers_schema = schema.parse(rs)
         writer, enc, dw = write_datum(datum_to_write, writers_schema)
         datum_read = read_datum(writer, writers_schema, readers_schema)
-        print 'Writer: %s Reader: %s' % (writers_schema, readers_schema)
-        print 'Datum Read: %s' % datum_read
+        print('Writer: %s Reader: %s' % (writers_schema, readers_schema))
+        print('Datum Read: %s' % datum_read)
         if datum_read != datum_to_write: incorrect += 1
     self.assertEquals(incorrect, 0)
 
@@ -320,7 +325,7 @@ class TestIO(unittest.TestCase):
 
       writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
       datum_read = read_datum(writer, writers_schema, readers_schema)
-      print 'Datum Read: %s' % datum_read
+      print('Datum Read: %s' % datum_read)
       if datum_to_read == datum_read: correct += 1
     self.assertEquals(correct, len(DEFAULT_VALUE_EXAMPLES))
 
@@ -352,7 +357,7 @@ class TestIO(unittest.TestCase):
 
     writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
     datum_read = read_datum(writer, writers_schema, readers_schema)
-    print 'Datum Read: %s' % datum_read
+    print('Datum Read: %s' % datum_read)
     self.assertEquals(datum_to_read, datum_read)
 
   def test_field_order(self):
@@ -368,7 +373,7 @@ class TestIO(unittest.TestCase):
 
     writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
     datum_read = read_datum(writer, writers_schema, readers_schema)
-    print 'Datum Read: %s' % datum_read
+    print('Datum Read: %s' % datum_read)
     self.assertEquals(datum_to_read, datum_read)
 
   def test_type_exception(self):
diff --git a/lang/py/test/test_ipc.py b/lang/py/test/test_ipc.py
index 575a0c9..bc9bd21 100644
--- a/lang/py/test/test_ipc.py
+++ b/lang/py/test/test_ipc.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,10 +16,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
 There are currently no IPC tests within python, in part because there are no
 servers yet available.
 """
+
+from __future__ import absolute_import, division, print_function
+
 import unittest
 
 import set_avro_test_path
diff --git a/lang/py/test/test_script.py b/lang/py/test/test_script.py
index 214fc15..bd0cb4d 100644
--- a/lang/py/test/test_script.py
+++ b/lang/py/test/test_script.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,15 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import csv
 import json
 import unittest
@@ -125,7 +130,7 @@ class TestCat(unittest.TestCase):
 
     def test_json_pretty(self):
         out = self._run("--format", "json-pretty", "-n", "1", raw=1)
-        assert out.strip() == _JSON_PRETTY.strip()
+        self.assertEqual(out.strip(), _JSON_PRETTY.strip())
 
     def test_version(self):
         check_output([SCRIPT, "cat", "--version"])
diff --git a/lang/py/test/test_tether_task.py b/lang/py/test/test_tether_task.py
index 9933070..85ed9cb 100644
--- a/lang/py/test/test_tether_task.py
+++ b/lang/py/test/test_tether_task.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,15 +17,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+from __future__ import absolute_import, division, print_function
 
 import os
+import StringIO
 import subprocess
 import sys
 import time
 import unittest
 
+import avro.tether.tether_task
+import avro.tether.util
+import mock_tether_parent
 import set_avro_test_path
+from avro import io as avio
+from avro import schema, tether
+from word_count_task import WordCountTask
 
 
 class TestTetherTask(unittest.TestCase):
@@ -34,15 +44,6 @@ class TestTetherTask(unittest.TestCase):
     Test that the thether_task is working. We run the mock_tether_parent in a separate
     subprocess
     """
-    from avro import tether
-    from avro import io as avio
-    from avro import schema
-    from avro.tether import HTTPRequestor,inputProtocol, find_port
-
-    import StringIO
-    import mock_tether_parent
-    from word_count_task import WordCountTask
-
     task=WordCountTask()
 
     proc=None
@@ -51,13 +52,13 @@ class TestTetherTask(unittest.TestCase):
       # env["AVRO_TETHER_OUTPUT_PORT"]=output_port
       env=dict()
       env["PYTHONPATH"]=':'.join(sys.path)
-      server_port=find_port()
+      server_port = avro.tether.util.find_port()
 
       pyfile=mock_tether_parent.__file__
       proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(server_port)])
-      input_port=find_port()
+      input_port = avro.tether.util.find_port()
 
-      print "Mock server started process pid={0}".format(proc.pid)
+      print("Mock server started process pid={0}".format(proc.pid))
       # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
       # so we give the subprocess time to start up
       time.sleep(1)
@@ -68,7 +69,11 @@ class TestTetherTask(unittest.TestCase):
 
       #***************************************************************
       # Test the mapper
-      task.configure(tether.TaskType.MAP,str(task.inschema),str(task.midschema))
+      task.configure(
+        avro.tether.tether_task.TaskType.MAP,
+        str(task.inschema),
+        str(task.midschema)
+      )
 
       # Serialize some data so we can send it to the input function
       datum="This is a line of text"
@@ -84,7 +89,11 @@ class TestTetherTask(unittest.TestCase):
       task.input(data,1)
 
       # Test the reducer
-      task.configure(tether.TaskType.REDUCE,str(task.midschema),str(task.outschema))
+      task.configure(
+        avro.tether.tether_task.TaskType.REDUCE,
+        str(task.midschema),
+        str(task.outschema)
+      )
 
       # Serialize some data so we can send it to the input function
       datum={"key":"word","value":2}
diff --git a/lang/py/test/test_tether_task_runner.py b/lang/py/test/test_tether_task_runner.py
index 3832dbe..985eb3c 100644
--- a/lang/py/test/test_tether_task_runner.py
+++ b/lang/py/test/test_tether_task_runner.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,28 +17,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
+import logging
 import os
+import StringIO
 import subprocess
 import sys
 import time
 import unittest
 
+import avro.tether.tether_task
+import avro.tether.tether_task_runner
+import avro.tether.util
+import mock_tether_parent
 import set_avro_test_path
+from avro import io as avio
+from word_count_task import WordCountTask
 
 
 class TestTetherTaskRunner(unittest.TestCase):
-  """ unit test for a tethered task runner.
-  """
+  """unit test for a tethered task runner."""
 
   def test1(self):
-    from word_count_task import WordCountTask
-    from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
-    from avro import io as avio
-    import mock_tether_parent
-    import subprocess
-    import StringIO
-    import logging
-
     # set the logging level to debug so that debug messages are printed
     logging.basicConfig(level=logging.DEBUG)
 
@@ -44,30 +48,34 @@ class TestTetherTaskRunner(unittest.TestCase):
       # launch the server in a separate process
       env=dict()
       env["PYTHONPATH"]=':'.join(sys.path)
-      parent_port=find_port()
+      parent_port = avro.tether.util.find_port()
 
       pyfile=mock_tether_parent.__file__
       proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
-      input_port=find_port()
+      input_port = avro.tether.util.find_port()
 
-      print "Mock server started process pid={0}".format(proc.pid)
+      print("Mock server started process pid={0}".format(proc.pid))
       # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
       # so we give the subprocess time to start up
       time.sleep(1)
 
-      runner=TaskRunner(WordCountTask())
+      runner = avro.tether.tether_task_runner.TaskRunner(WordCountTask())
 
       runner.start(outputport=parent_port,join=False)
 
-      # Test sending various messages to the server and ensuring they are
-      # processed correctly
-      requestor=HTTPRequestor("localhost",runner.server.server_address[1],inputProtocol)
+      # Test sending various messages to the server and ensuring they are processed correctly
+      requestor = avro.tether.tether_task.HTTPRequestor(
+          "localhost", runner.server.server_address[1], avro.tether.tether_task.inputProtocol)
 
       # TODO: We should validate that open worked by grabbing the STDOUT of the subproces
       # and ensuring that it outputted the correct message.
 
       # Test the mapper
-      requestor.request("configure",{"taskType":TaskType.MAP,"inSchema":str(runner.task.inschema),"outSchema":str(runner.task.midschema)})
+      requestor.request("configure", {
+        "taskType": avro.tether.tether_task.TaskType.MAP,
+        "inSchema": str(runner.task.inschema),
+        "outSchema": str(runner.task.midschema)
+      })
 
       # Serialize some data so we can send it to the input function
       datum="This is a line of text"
@@ -83,8 +91,12 @@ class TestTetherTaskRunner(unittest.TestCase):
       # Call input to simulate calling map
       requestor.request("input",{"data":data,"count":1})
 
-      #Test the reducer
-      requestor.request("configure",{"taskType":TaskType.REDUCE,"inSchema":str(runner.task.midschema),"outSchema":str(runner.task.outschema)})
+      # Test the reducer
+      requestor.request("configure", {
+        "taskType": avro.tether.tether_task.TaskType.REDUCE,
+        "inSchema": str(runner.task.midschema),
+        "outSchema": str(runner.task.outschema)}
+      )
 
       #Serialize some data so we can send it to the input function
       datum={"key":"word","value":2}
@@ -133,15 +145,6 @@ class TestTetherTaskRunner(unittest.TestCase):
     as our main script everything works as expected. We do this by using subprocess to run it
     in a separate thread.
     """
-    from word_count_task import WordCountTask
-    from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
-    from avro.tether import tether_task_runner
-    from avro import io as avio
-    import mock_tether_parent
-    import subprocess
-    import StringIO
-
-
     proc=None
 
     runnerproc=None
@@ -149,7 +152,7 @@ class TestTetherTaskRunner(unittest.TestCase):
       #launch the server in a separate process
       env=dict()
       env["PYTHONPATH"]=':'.join(sys.path)
-      parent_port=find_port()
+      parent_port = avro.tether.util.find_port()
 
       pyfile=mock_tether_parent.__file__
       proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
@@ -164,14 +167,14 @@ class TestTetherTaskRunner(unittest.TestCase):
       env={"AVRO_TETHER_OUTPUT_PORT":"{0}".format(parent_port)}
       env["PYTHONPATH"]=':'.join(sys.path)
 
-      runnerproc=subprocess.Popen(["python",tether_task_runner.__file__,"word_count_task.WordCountTask"],env=env)
+      runnerproc = subprocess.Popen(["python", avro.tether.tether_task_runner.__file__, "word_count_task.WordCountTask"],env=env)
 
       #possible race condition wait for the process to start
       time.sleep(1)
 
 
 
-      print "Mock server started process pid={0}".format(proc.pid)
+      print("Mock server started process pid={0}".format(proc.pid))
       #Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
       #so we give the subprocess time to start up
       time.sleep(1)
diff --git a/lang/py/test/test_tether_word_count.py b/lang/py/test/test_tether_word_count.py
index d2f1858..8c3fb08 100644
--- a/lang/py/test/test_tether_word_count.py
+++ b/lang/py/test/test_tether_word_count.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import inspect
 import os
 import subprocess
@@ -25,8 +30,7 @@ import set_avro_test_path
 
 
 class TestTetherWordCount(unittest.TestCase):
-  """ unittest for a python tethered map-reduce job.
-  """
+  """unittest for a python tethered map-reduce job."""
 
   def _write_lines(self,lines,fname):
     """
@@ -72,7 +76,7 @@ class TestTetherWordCount(unittest.TestCase):
       words=line.split()
 
       for w in words:
-        if not(counts.has_key(w.strip())):
+        if not(w.strip() in counts):
           counts[w.strip()]=0
 
         counts[w.strip()]=counts[w.strip()]+1
@@ -92,7 +96,6 @@ class TestTetherWordCount(unittest.TestCase):
     import avro
 
     import subprocess
-    import StringIO
     import shutil
     import tempfile
     import inspect
@@ -182,11 +185,11 @@ python -m avro.tether.tether_task_runner word_count_task.WordCountTask
       exhf.close()
 
       # make it world executable
-      os.chmod(exfile,0755)
+      os.chmod(exfile,0o755)
 
       args.extend(["--program",exfile])
 
-      print "Command:\n\t{0}".format(" ".join(args))
+      print("Command:\n\t{0}".format(" ".join(args)))
       proc=subprocess.Popen(args)
 
 
diff --git a/lang/py/test/txsample_http_client.py b/lang/py/test/txsample_http_client.py
index dba4ade..28c2c28 100644
--- a/lang/py/test/txsample_http_client.py
+++ b/lang/py/test/txsample_http_client.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,8 +14,12 @@
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import sys
 
 from avro import protocol, txipc
diff --git a/lang/py/test/txsample_http_server.py b/lang/py/test/txsample_http_server.py
index 604ef54..fafaecd 100644
--- a/lang/py/test/txsample_http_server.py
+++ b/lang/py/test/txsample_http_server.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 from avro import ipc, protocol, txipc
 from twisted.internet import reactor
 from twisted.web import server
diff --git a/lang/py/test/word_count_task.py b/lang/py/test/word_count_task.py
index 24f2da2..8181340 100644
--- a/lang/py/test/word_count_task.py
+++ b/lang/py/test/word_count_task.py
@@ -1,33 +1,36 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["WordCountTask"]
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import logging
 
-from avro.tether import TetherTask
+import avro.tether.tether_task
+
+__all__ = ["WordCountTask"]
 
 
 #TODO::Make the logging level a parameter we can set
 #logging.basicConfig(level=logging.INFO)
-class WordCountTask(TetherTask):
+class WordCountTask(avro.tether.tether_task.TetherTask):
   """
-  Implements the mappper and reducer for the word count example
+  Implements the mapper and reducer for the word count example
   """
 
   def __init__(self):
@@ -40,7 +43,7 @@ class WordCountTask(TetherTask):
               {"name":"value","type":"long","order":"ignore"}]
               }"""
     outschema=midschema
-    TetherTask.__init__(self,inschema,midschema,outschema)
+    avro.tether.tether_task.TetherTask.__init__(self, inschema, midschema, outschema)
 
 
     #keep track of the partial sums of the counts


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic