Source code for tables.tests.common

########################################################################
#
# License: BSD
# Created: 2005-05-24
# Author: Ivan Vilata i Balaguer - ivan@selidor.net
#
# $Id$
#
########################################################################

"""Utilities for PyTables' test suites."""

import os
import re
import sys
import time
import locale
import platform
import tempfile
import warnings
from distutils.version import LooseVersion

import unittest

import numpy
import numexpr

import tables
from tables.utils import detect_number_of_cores
from tables.req_versions import min_blosc_bitshuffle_version

hdf5_version = LooseVersion(tables.hdf5_version)
blosc_version = LooseVersion(tables.which_lib_version("blosc")[1])


verbose = False
"""Show detailed output of the testing process."""

heavy = False
"""Run all tests even when they take long to complete."""

show_memory = False
"""Show the progress of memory consumption."""


def parse_argv(argv):
    global verbose, heavy

    if 'verbose' in argv:
        verbose = True
        argv.remove('verbose')

    if 'silent' in argv:  # take care of old flag, just in case
        verbose = False
        argv.remove('silent')

    if '--heavy' in argv:
        heavy = True
        argv.remove('--heavy')

    return argv


zlib_avail = tables.which_lib_version("zlib") is not None
lzo_avail = tables.which_lib_version("lzo") is not None
bzip2_avail = tables.which_lib_version("bzip2") is not None
blosc_avail = tables.which_lib_version("blosc") is not None


def print_heavy(heavy):
    if heavy:
        print("""Performing the complete test suite!""")
    else:
        print("""\
Performing only a light (yet comprehensive) subset of the test suite.
If you want a more complete test, try passing the --heavy flag to this script
(or set the 'heavy' parameter in case you are using tables.test() call).
The whole suite will take more than 4 hours to complete on a relatively
modern CPU and around 512 MB of main memory.""")
    print('-=' * 38)





def test_filename(filename):
    from pkg_resources import resource_filename
    return resource_filename('tables.tests', filename)


def verbosePrint(string, nonl=False):
    """Print out the `string` if verbose output is enabled."""
    if not verbose:
        return
    if nonl:
        print(string, end=' ')
    else:
        print(string)


def allequal(a, b, flavor="numpy"):
    """Checks if two numerical objects are equal."""

    # print("a-->", repr(a))
    # print("b-->", repr(b))
    if not hasattr(b, "shape"):
        # Scalar case
        return a == b

    if ((not hasattr(a, "shape") or a.shape == ()) and
            (not hasattr(b, "shape") or b.shape == ())):
        return a == b

    if a.shape != b.shape:
        if verbose:
            print("Shape is not equal:", a.shape, "!=", b.shape)
        return 0

    # Way to check the type equality without byteorder considerations
    if hasattr(b, "dtype") and a.dtype.str[1:] != b.dtype.str[1:]:
        if verbose:
            print("dtype is not equal:", a.dtype, "!=", b.dtype)
        return 0

    # Rank-0 case
    if len(a.shape) == 0:
        if a[()] == b[()]:
            return 1
        else:
            if verbose:
                print("Shape is not equal:", a.shape, "!=", b.shape)
            return 0

    # null arrays
    if a.size == 0:  # len(a) is not correct for generic shapes
        if b.size == 0:
            return 1
        else:
            if verbose:
                print("length is not equal")
                print("len(a.data) ==>", len(a.data))
                print("len(b.data) ==>", len(b.data))
            return 0

    # Multidimensional case
    result = (a == b)
    result = numpy.all(result)
    if not result and verbose:
        print("Some of the elements in arrays are not equal")

    return result


def areArraysEqual(arr1, arr2):
    """Are both `arr1` and `arr2` equal arrays?

    Arguments can be regular NumPy arrays, chararray arrays or
    structured arrays (including structured record arrays). They are
    checked for type and value equality.

    """

    t1 = type(arr1)
    t2 = type(arr2)

    if not ((hasattr(arr1, 'dtype') and arr1.dtype == arr2.dtype) or
            issubclass(t1, t2) or issubclass(t2, t1)):
        return False

    return numpy.all(arr1 == arr2)


class PyTablesTestCase(unittest.TestCase):
    def tearDown(self):
        super().tearDown()
        for key in self.__dict__:
            if self.__dict__[key].__class__.__name__ != 'instancemethod':
                self.__dict__[key] = None

    def _getName(self):
        """Get the name of this test case."""
        return self.id().split('.')[-2]

    def _getMethodName(self):
        """Get the name of the method currently running in the test case."""
        return self.id().split('.')[-1]

    def _verboseHeader(self):
        """Print a nice header for the current test method if verbose."""

        if verbose:
            name = self._getName()
            methodName = self._getMethodName()

            title = "Running {}.{}".format(name, methodName)
            print('{}\n{}'.format(title, '-' * len(title)))

    # COMPATIBILITY: assertWarns is new in Python 3.2
    if not hasattr(unittest.TestCase, 'assertWarns'):
        def assertWarns(self, expected_warning, callable_obj=None,
                        *args, **kwargs):
            context = _AssertWarnsContext(expected_warning, self, callable_obj)
            return context.handle('assertWarns', callable_obj, args, kwargs)

    def _checkEqualityGroup(self, node1, node2, hardlink=False):
        if verbose:
            print("Group 1:", node1)
            print("Group 2:", node2)
        if hardlink:
            self.assertTrue(
                node1._v_pathname != node2._v_pathname,
                "node1 and node2 have the same pathnames.")
        else:
            self.assertTrue(
                node1._v_pathname == node2._v_pathname,
                "node1 and node2 does not have the same pathnames.")
        self.assertTrue(
            node1._v_children == node2._v_children,
            "node1 and node2 does not have the same children.")

    def _checkEqualityLeaf(self, node1, node2, hardlink=False):
        if verbose:
            print("Leaf 1:", node1)
            print("Leaf 2:", node2)
        if hardlink:
            self.assertTrue(
                node1._v_pathname != node2._v_pathname,
                "node1 and node2 have the same pathnames.")
        else:
            self.assertTrue(
                node1._v_pathname == node2._v_pathname,
                "node1 and node2 does not have the same pathnames.")
        self.assertTrue(
            areArraysEqual(node1[:], node2[:]),
            "node1 and node2 does not have the same values.")


class TestFileMixin:
    h5fname = None
    open_kwargs = {}

    def setUp(self):
        super().setUp()
        self.h5file = tables.open_file(
            self.h5fname, title=self._getName(), **self.open_kwargs)

    def tearDown(self):
        """Close ``h5file``."""

        self.h5file.close()
        super().tearDown()


class TempFileMixin:
    open_mode = 'w'
    open_kwargs = {}

    def _getTempFileName(self):
        return tempfile.mktemp(prefix=self._getName(), suffix='.h5')

    def setUp(self):
        """Set ``h5file`` and ``h5fname`` instance attributes.

        * ``h5fname``: the name of the temporary HDF5 file.
        * ``h5file``: the writable, empty, temporary HDF5 file.

        """

        super().setUp()
        self.h5fname = self._getTempFileName()
        self.h5file = tables.open_file(
            self.h5fname, self.open_mode, title=self._getName(),
            **self.open_kwargs)

    def tearDown(self):
        """Close ``h5file`` and remove ``h5fname``."""

        self.h5file.close()
        self.h5file = None
        os.remove(self.h5fname)   # comment this for debugging purposes only
        super().tearDown()

    def _reopen(self, mode='r', **kwargs):
        """Reopen ``h5file`` in the specified ``mode``.

        Returns a true or false value depending on whether the file was
        reopenend or not.  If not, nothing is changed.

        """

        self.h5file.close()
        self.h5file = tables.open_file(self.h5fname, mode, **kwargs)
        return True


class ShowMemTime(PyTablesTestCase):
    tref = time.time()
    """Test for showing memory and time consumption."""

    def test00(self):
        """Showing memory and time consumption."""

        # Obtain memory info (only for Linux 2.6.x)
        for line in open("/proc/self/status"):
            if line.startswith("VmSize:"):
                vmsize = int(line.split()[1])
            elif line.startswith("VmRSS:"):
                vmrss = int(line.split()[1])
            elif line.startswith("VmData:"):
                vmdata = int(line.split()[1])
            elif line.startswith("VmStk:"):
                vmstk = int(line.split()[1])
            elif line.startswith("VmExe:"):
                vmexe = int(line.split()[1])
            elif line.startswith("VmLib:"):
                vmlib = int(line.split()[1])
        print("\nWallClock time:", time.time() - self.tref)
        print("Memory usage: ******* %s *******" % self._getName())
        print("VmSize: {:>7} kB\tVmRSS: {:>7} kB".format(vmsize, vmrss))
        print("VmData: {:>7} kB\tVmStk: {:>7} kB".format(vmdata, vmstk))
        print("VmExe:  {:>7} kB\tVmLib: {:>7} kB".format(vmexe, vmlib))


## Local Variables:
## mode: python
## py-indent-offset: 4
## tab-width: 4
## fill-column: 72
## End: