
 
# Define the base classes
 
import abc
import pandas as pd
import numpy as np
import re
class Expression(abc.ABC):
    """Abstract base for expressions that can be loaded and combined.

    The arithmetic operators do not compute anything themselves: each one
    returns an operator node (``Add``, ``Sub``, ``Mul`` or ``Div``) wrapping
    the two operands. Concrete subclasses provide the data by implementing
    ``_load_internal``.
    """

    def __str__(self):
        # Default textual form is the concrete class name.
        return type(self).__name__

    def __repr__(self):
        return str(self)

    def __add__(self, other):
        return Add(self, other)

    def __radd__(self, other):
        return Add(other, self)

    def __sub__(self, other):
        return Sub(self, other)

    def __rsub__(self, other):
        return Sub(other, self)

    def __mul__(self, other):
        return Mul(self, other)

    def __rmul__(self, other):
        # Operand order intentionally left as (self, other), unlike the
        # other reflected operators; existing string forms depend on it.
        return Mul(self, other)

    def __truediv__(self, other):
        # Python 3 dispatches the ``/`` operator to __truediv__ and never
        # looks at __div__, so this hook is required for ``a / b`` to work.
        return Div(self, other)

    def __rtruediv__(self, other):
        return Div(other, self)

    # Python 2 spellings, kept so explicit calls to them keep working.
    __div__ = __truediv__
    __rdiv__ = __rtruediv__

    def load(self, instrument, start_index, end_index, *args):
        """Load this expression's data.

        Forwards every argument unchanged to ``_load_internal`` and returns
        its result as-is.
        """
        return self._load_internal(instrument, start_index, end_index, *args)

    @abc.abstractmethod
    def _load_internal(self, instrument, start_index, end_index, *args) -> pd.Series:
        """Produce the data for this expression; subclasses must override."""
        raise NotImplementedError("This function must be implemented in your newly defined feature")
class ExpressionOps(Expression):
    """Base class for operator expressions.

    Adds no behaviour of its own on top of ``Expression``.
    """
class PairOperator(ExpressionOps):
    """Operator node holding a left and a right operand."""

    def __init__(self, feature_left, feature_right):
        # Operands are stored as given; no conversion is applied here.
        self.feature_left, self.feature_right = feature_left, feature_right

    def __str__(self):
        # Rendered as ClassName(left,right), with no space after the comma.
        return f"{type(self).__name__}({self.feature_left},{self.feature_right})"
class NpPairOperator(PairOperator):
    """Pair operator evaluated with a numpy function looked up by name."""

    def __init__(self, feature_left, feature_right, func):
        super().__init__(feature_left, feature_right)
        # Name of the numpy attribute to call, e.g. "add".
        self.func = func

    def _load_internal(self, instrument, start_index, end_index, *args):
        """Load both operands (left first) and combine them with numpy.

        Operands that are ``Expression`` instances are loaded with the same
        arguments; anything else is passed to numpy unchanged.
        """
        operands = []
        for operand in (self.feature_left, self.feature_right):
            if isinstance(operand, Expression):
                operand = operand.load(instrument, start_index, end_index, *args)
            operands.append(operand)
        numpy_func = getattr(np, self.func)
        return numpy_func(*operands)
class Add(NpPairOperator):
    """Pair operator that combines its operands with ``numpy.add``."""

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "add")
class Sub(NpPairOperator):
    """Pair operator that combines its operands with ``numpy.subtract``."""

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "subtract")
class Mul(NpPairOperator):
    """Pair operator that combines its operands with ``numpy.multiply``."""

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "multiply")
class Div(NpPairOperator):
    """Pair operator that combines its operands with ``numpy.divide``."""

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "divide")
class Feature(Expression):
    """Static expression that reads one named column.

    The data comes from the ``instrument`` object handed to ``load``; the
    column read is the feature's name.
    """

    def __init__(self, name=None):
        # Any falsy name (None, "") falls back to the class name.
        self._name = name if name else type(self).__name__

    def __str__(self):
        return "$" + self._name

    def _load_internal(self, instrument, start_index, end_index, *args):
        """Return ``instrument.loc[start_index:end_index][name]``.

        ``instrument`` must support ``.loc`` slicing followed by column
        lookup (presumably a pandas DataFrame, as passed by the helpers in
        this file). ``.loc`` slices by label, not by position.

        ``*args`` is accepted and ignored so the signature matches
        ``Expression._load_internal``; ``Expression.load`` and the pair
        operators forward extra positional arguments, which previously
        raised ``TypeError`` here.
        """
        return instrument.loc[start_index:end_index][self._name]
    
 
# Parse expressions
 
def parse_field(field):
    """Rewrite ``$name`` references in *field* as ``Feature("name")`` calls.

    Non-string input is converted with ``str`` first. A name is one or more
    word characters following the ``$``. Returns the rewritten string.
    """
    text = field if isinstance(field, str) else str(field)
    return re.sub(r"\$([\w]+)", r'Feature("\1")', text)
def compute_feature(df, exp):
    """Evaluate the expression string *exp* against *df*.

    The string is rewritten by ``parse_field``, evaluated into an expression
    object, and that object is loaded with start index 0 and end index
    ``len(df)``.
    """
    # SECURITY NOTE(review): eval() runs arbitrary Python code. Only pass
    # expression strings from trusted sources.
    expression = eval(parse_field(exp))
    row_count = len(df)
    return expression.load(df, 0, row_count)
def compute_features(df, exps, labels):
    """Evaluate several expressions against *df* and collect them as columns.

    *labels* and *exps* are paired with ``zip``, so surplus items in the
    longer one are ignored, and a repeated label keeps only its last result.
    Returns a DataFrame with one column per label.
    """
    columns = {
        label: compute_feature(df, exp)
        for label, exp in zip(labels, exps)
    }
    # Zero or one result goes through the DataFrame constructor; two or more
    # are joined column-wise.
    if len(columns) <= 1:
        return pd.DataFrame(columns)
    return pd.concat(columns, axis=1)
 
# Example
 
# Sample input: two integer columns of five rows each.
data = {
    "a": [1, 2, 3, 4, 5],
    "b": [6, 7, 8, 9, 0],
}
df = pd.DataFrame(data)
# Compute both raw columns plus (a + b) * (a - b); the result is not bound
# to a name here.
compute_features(
    df,
    ["$a", "$b", "($a + $b) * ($a - $b)"],
    ["a", "b", "a^2 - b^2"],
)
 
