ENH: merge_asof() has type specializations and can take multiple 'by' parameters (#13936)

closes #13936

Author: Christopher C. Aycock <christopher.aycock@twosigma.com>

Closes #14783 from chrisaycock/GH13936 and squashes the following commits:

ffcf0c2 [Christopher C. Aycock] Added test to reject float16; fixed typos
1f208a8 [Christopher C. Aycock] Use tuple representation instead of strings
77eb47b [Christopher C. Aycock] Merge master branch into GH13936
89256f0 [Christopher C. Aycock] Test 8-bit integers and raise error on 16-bit floats; add comments
0ad1687 [Christopher C. Aycock] Fixed whatsnew
2bce3cc [Christopher C. Aycock] Revert dict back to PyObjectHashTable in response to code review
fafbb02 [Christopher C. Aycock] Updated benchmarks to reflect new ASV setup
5eeb7d9 [Christopher C. Aycock] Merge master into GH13936
c33c4cb [Christopher C. Aycock] Merge branch 'master' into GH13936
46cc309 [Christopher C. Aycock] Update documentation
f01142c [Christopher C. Aycock] Merge master branch
75157fc [Christopher C. Aycock] merge_asof() has type specializations and can take multiple 'by' parameters (#13936)

(cherry picked from commit e7df7516ff)
This commit is contained in:
Christopher C. Aycock 2016-12-16 18:04:53 -05:00 committed by Joris Van den Bossche
parent a5091722a3
commit c520b25944
4 changed files with 251 additions and 43 deletions

View File

@ -30,6 +30,7 @@ Other Enhancements
~~~~~~~~~~~~~~~~~~
- ``pd.merge_asof()`` gained ``left_index``/``right_index`` and ``left_by``/``right_by`` arguments (:issue:`14253`)
- ``pd.merge_asof()`` can take multiple columns in ``by`` parameter and has specialized dtypes for better performance (:issue:`13936`)
.. _whatsnew_0192.bug_fixes:

View File

@ -1,3 +1,4 @@
# cython: boundscheck=False, wraparound=False
"""
Template for each `dtype` helper function for hashtable
@ -14,7 +15,9 @@ WARNING: DO NOT edit .pxi FILE directly, .pxi is generated from .pxi.in
by_dtypes = [('PyObjectHashTable', 'object'), ('Int64HashTable', 'int64_t')]
# on_dtype
on_dtypes = ['int64_t', 'double']
on_dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t',
'int8_t', 'int16_t', 'int32_t', 'int64_t',
'float', 'double']
}}
@ -98,7 +101,9 @@ def asof_join_{{on_dtype}}_by_{{by_dtype}}(ndarray[{{on_dtype}}] left_values,
{{py:
# on_dtype
dtypes = ['int64_t', 'double']
dtypes = ['uint8_t', 'uint16_t', 'uint32_t', 'uint64_t',
'int8_t', 'int16_t', 'int32_t', 'int64_t',
'float', 'double']
}}

View File

@ -5,6 +5,8 @@ SQL-style merge routines
import copy
import warnings
import string
import numpy as np
from pandas.compat import range, lrange, lzip, zip, map, filter
import pandas.compat as compat
@ -28,7 +30,8 @@ from pandas.types.common import (is_datetime64tz_dtype,
is_list_like,
_ensure_int64,
_ensure_float64,
_ensure_object)
_ensure_object,
_get_dtype)
from pandas.types.missing import na_value_for_dtype
from pandas.core.generic import NDFrame
@ -271,8 +274,8 @@ def merge_asof(left, right, on=None,
DataFrame whose 'on' key is less than or equal to the left's key. Both
DataFrames must be sorted by the key.
Optionally perform group-wise merge. This searches for the nearest match
on the 'on' key within the same group according to 'by'.
Optionally match on equivalent keys with 'by' before searching for nearest
match with 'on'.
.. versionadded:: 0.19.0
@ -299,16 +302,15 @@ def merge_asof(left, right, on=None,
.. versionadded:: 0.19.2
by : column name
Group both the left and right DataFrames by the group column; perform
the merge operation on these pieces and recombine.
by : column name or list of column names
Match on these columns before performing merge operation.
left_by : column name
Field name to group by in the left DataFrame.
Field names to match on in the left DataFrame.
.. versionadded:: 0.19.2
right_by : column name
Field name to group by in the right DataFrame.
Field names to match on in the right DataFrame.
.. versionadded:: 0.19.2
@ -997,17 +999,13 @@ class _OrderedMerge(_MergeOperation):
return result
_asof_functions = {
'int64_t': _join.asof_join_int64_t,
'double': _join.asof_join_double,
}
def _asof_function(on_type):
return getattr(_join, 'asof_join_%s' % on_type, None)
def _asof_by_function(on_type, by_type):
return getattr(_join, 'asof_join_%s_by_%s' % (on_type, by_type), None)
_asof_by_functions = {
('int64_t', 'int64_t'): _join.asof_join_int64_t_by_int64_t,
('double', 'int64_t'): _join.asof_join_double_by_int64_t,
('int64_t', 'object'): _join.asof_join_int64_t_by_object,
('double', 'object'): _join.asof_join_double_by_object,
}
_type_casters = {
'int64_t': _ensure_int64,
@ -1015,9 +1013,32 @@ _type_casters = {
'object': _ensure_object,
}
_cython_types = {
'uint8': 'uint8_t',
'uint32': 'uint32_t',
'uint16': 'uint16_t',
'uint64': 'uint64_t',
'int8': 'int8_t',
'int32': 'int32_t',
'int16': 'int16_t',
'int64': 'int64_t',
'float16': 'error',
'float32': 'float',
'float64': 'double',
}
def _get_cython_type(dtype):
""" Given a dtype, return 'int64_t', 'double', or 'object' """
""" Given a dtype, return a C name like 'int64_t' or 'double' """
type_name = _get_dtype(dtype).name
ctype = _cython_types.get(type_name, 'object')
if ctype == 'error':
raise MergeError('unsupported type: ' + type_name)
return ctype
def _get_cython_type_upcast(dtype):
""" Upcast a dtype to 'int64_t', 'double', or 'object' """
if is_integer_dtype(dtype):
return 'int64_t'
elif is_float_dtype(dtype):
@ -1084,11 +1105,6 @@ class _AsOfMerge(_OrderedMerge):
if not is_list_like(self.right_by):
self.right_by = [self.right_by]
if len(self.left_by) != 1:
raise MergeError("can only asof by a single key")
if len(self.right_by) != 1:
raise MergeError("can only asof by a single key")
self.left_on = self.left_by + list(self.left_on)
self.right_on = self.right_by + list(self.right_on)
@ -1142,6 +1158,13 @@ class _AsOfMerge(_OrderedMerge):
def _get_join_indexers(self):
""" return the join indexers """
def flip(xs):
""" unlike np.transpose, this returns an array of tuples """
labels = list(string.ascii_lowercase[:len(xs)])
dtypes = [x.dtype for x in xs]
labeled_dtypes = list(zip(labels, dtypes))
return np.array(lzip(*xs), labeled_dtypes)
# values to compare
left_values = (self.left.index.values if self.left_index else
self.left_join_keys[-1])
@ -1165,22 +1188,23 @@ class _AsOfMerge(_OrderedMerge):
# a "by" parameter requires special handling
if self.left_by is not None:
left_by_values = self.left_join_keys[0]
right_by_values = self.right_join_keys[0]
if len(self.left_join_keys) > 2:
# get tuple representation of values if more than one
left_by_values = flip(self.left_join_keys[0:-1])
right_by_values = flip(self.right_join_keys[0:-1])
else:
left_by_values = self.left_join_keys[0]
right_by_values = self.right_join_keys[0]
# choose appropriate function by type
on_type = _get_cython_type(left_values.dtype)
by_type = _get_cython_type(left_by_values.dtype)
on_type_caster = _type_casters[on_type]
# upcast 'by' parameter because HashTable is limited
by_type = _get_cython_type_upcast(left_by_values.dtype)
by_type_caster = _type_casters[by_type]
func = _asof_by_functions[(on_type, by_type)]
left_values = on_type_caster(left_values)
right_values = on_type_caster(right_values)
left_by_values = by_type_caster(left_by_values)
right_by_values = by_type_caster(right_by_values)
# choose appropriate function by type
on_type = _get_cython_type(left_values.dtype)
func = _asof_by_function(on_type, by_type)
return func(left_values,
right_values,
left_by_values,
@ -1190,12 +1214,7 @@ class _AsOfMerge(_OrderedMerge):
else:
# choose appropriate function by type
on_type = _get_cython_type(left_values.dtype)
type_caster = _type_casters[on_type]
func = _asof_functions[on_type]
left_values = type_caster(left_values)
right_values = type_caster(right_values)
func = _asof_function(on_type)
return func(left_values,
right_values,
self.allow_exact_matches,

View File

@ -221,6 +221,117 @@ class TestAsOfMerge(tm.TestCase):
expected.loc[expected.ticker == 'MSFT', ['bid', 'ask']] = np.nan
assert_frame_equal(result, expected)
def test_multiby(self):
# GH13936
trades = pd.DataFrame({
'time': pd.to_datetime(['20160525 13:30:00.023',
'20160525 13:30:00.023',
'20160525 13:30:00.046',
'20160525 13:30:00.048',
'20160525 13:30:00.050']),
'ticker': ['MSFT', 'MSFT',
'GOOG', 'GOOG', 'AAPL'],
'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
'price': [51.95, 51.95,
720.77, 720.92, 98.00],
'quantity': [75, 155,
100, 100, 100]},
columns=['time', 'ticker', 'exch',
'price', 'quantity'])
quotes = pd.DataFrame({
'time': pd.to_datetime(['20160525 13:30:00.023',
'20160525 13:30:00.023',
'20160525 13:30:00.030',
'20160525 13:30:00.041',
'20160525 13:30:00.045',
'20160525 13:30:00.049']),
'ticker': ['GOOG', 'MSFT', 'MSFT',
'MSFT', 'GOOG', 'AAPL'],
'exch': ['BATS', 'NSDQ', 'ARCA', 'ARCA',
'NSDQ', 'ARCA'],
'bid': [720.51, 51.95, 51.97, 51.99,
720.50, 97.99],
'ask': [720.92, 51.96, 51.98, 52.00,
720.93, 98.01]},
columns=['time', 'ticker', 'exch', 'bid', 'ask'])
expected = pd.DataFrame({
'time': pd.to_datetime(['20160525 13:30:00.023',
'20160525 13:30:00.023',
'20160525 13:30:00.046',
'20160525 13:30:00.048',
'20160525 13:30:00.050']),
'ticker': ['MSFT', 'MSFT',
'GOOG', 'GOOG', 'AAPL'],
'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
'price': [51.95, 51.95,
720.77, 720.92, 98.00],
'quantity': [75, 155,
100, 100, 100],
'bid': [np.nan, 51.95, 720.50, 720.51, np.nan],
'ask': [np.nan, 51.96, 720.93, 720.92, np.nan]},
columns=['time', 'ticker', 'exch',
'price', 'quantity', 'bid', 'ask'])
result = pd.merge_asof(trades, quotes, on='time',
by=['ticker', 'exch'])
assert_frame_equal(result, expected)
def test_multiby_heterogeneous_types(self):
# GH13936
trades = pd.DataFrame({
'time': pd.to_datetime(['20160525 13:30:00.023',
'20160525 13:30:00.023',
'20160525 13:30:00.046',
'20160525 13:30:00.048',
'20160525 13:30:00.050']),
'ticker': [0, 0, 1, 1, 2],
'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
'price': [51.95, 51.95,
720.77, 720.92, 98.00],
'quantity': [75, 155,
100, 100, 100]},
columns=['time', 'ticker', 'exch',
'price', 'quantity'])
quotes = pd.DataFrame({
'time': pd.to_datetime(['20160525 13:30:00.023',
'20160525 13:30:00.023',
'20160525 13:30:00.030',
'20160525 13:30:00.041',
'20160525 13:30:00.045',
'20160525 13:30:00.049']),
'ticker': [1, 0, 0, 0, 1, 2],
'exch': ['BATS', 'NSDQ', 'ARCA', 'ARCA',
'NSDQ', 'ARCA'],
'bid': [720.51, 51.95, 51.97, 51.99,
720.50, 97.99],
'ask': [720.92, 51.96, 51.98, 52.00,
720.93, 98.01]},
columns=['time', 'ticker', 'exch', 'bid', 'ask'])
expected = pd.DataFrame({
'time': pd.to_datetime(['20160525 13:30:00.023',
'20160525 13:30:00.023',
'20160525 13:30:00.046',
'20160525 13:30:00.048',
'20160525 13:30:00.050']),
'ticker': [0, 0, 1, 1, 2],
'exch': ['ARCA', 'NSDQ', 'NSDQ', 'BATS', 'NSDQ'],
'price': [51.95, 51.95,
720.77, 720.92, 98.00],
'quantity': [75, 155,
100, 100, 100],
'bid': [np.nan, 51.95, 720.50, 720.51, np.nan],
'ask': [np.nan, 51.96, 720.93, 720.92, np.nan]},
columns=['time', 'ticker', 'exch',
'price', 'quantity', 'bid', 'ask'])
result = pd.merge_asof(trades, quotes, on='time',
by=['ticker', 'exch'])
assert_frame_equal(result, expected)
def test_basic2(self):
expected = self.read_data('asof2.csv')
@ -542,6 +653,78 @@ class TestAsOfMerge(tm.TestCase):
assert_frame_equal(result, expected)
def test_on_specialized_type(self):
# GH13936
for dtype in [np.uint8, np.uint16, np.uint32, np.uint64,
np.int8, np.int16, np.int32, np.int64,
np.float16, np.float32, np.float64]:
df1 = pd.DataFrame({
'value': [5, 2, 25, 100, 78, 120, 79],
'symbol': list("ABCDEFG")},
columns=['symbol', 'value'])
df1.value = dtype(df1.value)
df2 = pd.DataFrame({
'value': [0, 80, 120, 125],
'result': list('xyzw')},
columns=['value', 'result'])
df2.value = dtype(df2.value)
df1 = df1.sort_values('value').reset_index(drop=True)
if dtype == np.float16:
with self.assertRaises(MergeError):
pd.merge_asof(df1, df2, on='value')
continue
result = pd.merge_asof(df1, df2, on='value')
expected = pd.DataFrame(
{'symbol': list("BACEGDF"),
'value': [2, 5, 25, 78, 79, 100, 120],
'result': list('xxxxxyz')
}, columns=['symbol', 'value', 'result'])
expected.value = dtype(expected.value)
assert_frame_equal(result, expected)
def test_on_specialized_type_by_int(self):
# GH13936
for dtype in [np.uint8, np.uint16, np.uint32, np.uint64,
np.int8, np.int16, np.int32, np.int64,
np.float16, np.float32, np.float64]:
df1 = pd.DataFrame({
'value': [5, 2, 25, 100, 78, 120, 79],
'key': [1, 2, 3, 2, 3, 1, 2],
'symbol': list("ABCDEFG")},
columns=['symbol', 'key', 'value'])
df1.value = dtype(df1.value)
df2 = pd.DataFrame({
'value': [0, 80, 120, 125],
'key': [1, 2, 2, 3],
'result': list('xyzw')},
columns=['value', 'key', 'result'])
df2.value = dtype(df2.value)
df1 = df1.sort_values('value').reset_index(drop=True)
if dtype == np.float16:
with self.assertRaises(MergeError):
pd.merge_asof(df1, df2, on='value', by='key')
else:
result = pd.merge_asof(df1, df2, on='value', by='key')
expected = pd.DataFrame({
'symbol': list("BACEGDF"),
'key': [2, 1, 3, 3, 2, 2, 1],
'value': [2, 5, 25, 78, 79, 100, 120],
'result': [np.nan, 'x', np.nan, np.nan, np.nan, 'y', 'x']},
columns=['symbol', 'key', 'value', 'result'])
expected.value = dtype(expected.value)
assert_frame_equal(result, expected)
def test_on_float_by_int(self):
# type specialize both "by" and "on" parameters
df1 = pd.DataFrame({