feathr-ai/feathr

[BUG] support COUNT_DISTINCT

Open

#914 opened on December 10, 2022

Labels: bug, good first issue

Description

Willingness to contribute

No. I cannot contribute a bug fix at this time.

Feathr version

0.9.0

System information

NA

Describe the problem

The aggregation function COUNT_DISTINCT does not appear to be supported yet.

When I try to use it, the feature's type seems to be inferred automatically as STRING, ignoring the type I explicitly declared (INT32), so Spark throws an error.

My code:

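```python
from feathr import Feature, INT32, WindowAggTransformation

# account_id is a TypedKey defined elsewhere in my feature definitions (not shown)

# total number of different currencies used in transactions in the past week
num_currency_type_in_week = Feature(
    name="num_currency_type_in_week",
    key=account_id,
    feature_type=INT32,
    transform=WindowAggTransformation(
        agg_expr="transactionCurrencyCode", agg_func="COUNT_DISTINCT", window="7d"
    ),
)
```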
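To make the expected behavior concrete, here is a plain-Python sketch (not Feathr code) of what this feature should compute: for each account, the number of distinct `transactionCurrencyCode` values in the trailing 7-day window. The function name `count_distinct_in_window`, the `(account_id, currency, timestamp)` tuple layout, and the half-open window `(as_of - 7d, as_of]` are assumptions for illustration; Feathr's exact window-boundary semantics may differ. The point is that the result is an integer count, which matches the declared `INT32` type.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def count_distinct_in_window(transactions, as_of, window=timedelta(days=7)):
    """Per account, count distinct currency codes with timestamps in (as_of - window, as_of].

    Hypothetical reference helper; the window boundary convention is an assumption.
    """
    seen = defaultdict(set)
    for account_id, currency, ts in transactions:
        if as_of - window < ts <= as_of:
            seen[account_id].add(currency)
    # len() of a set is a Python int, i.e. an integer-typed feature value
    return {acct: len(codes) for acct, codes in seen.items()}


# Hypothetical sample data: (account_id, transactionCurrencyCode, timestamp)
txns = [
    ("a1", "USD", datetime(2022, 12, 9)),
    ("a1", "EUR", datetime(2022, 12, 8)),
    ("a1", "USD", datetime(2022, 12, 7)),   # duplicate currency, counted once
    ("a1", "GBP", datetime(2022, 11, 20)),  # outside the 7-day window
    ("a2", "JPY", datetime(2022, 12, 5)),
]
result = count_distinct_in_window(txns, as_of=datetime(2022, 12, 10))
print(result)  # {'a1': 2, 'a2': 1}
```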

Spark logs:

```
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, num_currency_type_in_week), StringType, false), true, false, true) AS num_currency_type_in_week#819
```

And the resulting error:

```
Caused by: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of string
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_12$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_7$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
```
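The error says a `java.lang.Integer` value reached a column whose schema is `StringType`, which fits the symptom described above: the aggregation yields an integer, but the output column was typed as string. The simplified Python model below is only an illustration; `check_external_type` and the `EXTERNAL_TYPES` mapping are hypothetical and are not Spark's or Feathr's actual implementation. It shows that the same integer value passes when the column is typed as an integer and fails when it is typed as a string.

```python
# Hypothetical mapping from a schema type name to the runtime type it accepts.
# This only mirrors the shape of Spark's external-type check; it is not Spark code.
EXTERNAL_TYPES = {
    "StringType": str,
    "IntegerType": int,
}


def check_external_type(value, schema_type):
    """Return value unchanged if its runtime type matches schema_type, else raise."""
    expected = EXTERNAL_TYPES[schema_type]
    # bool is a subclass of int in Python, so reject it explicitly for integer columns
    if not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
        raise RuntimeError(
            f"{type(value).__name__} is not a valid external type for schema of {schema_type}"
        )
    return value


count = 3  # an integer COUNT_DISTINCT result
check_external_type(count, "IntegerType")  # passes: schema matches the declared INT32
try:
    check_external_type(count, "StringType")  # what the observed STRING schema implies
except RuntimeError as err:
    print(err)  # int is not a valid external type for schema of StringType
```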

I noticed that the test for count_distinct is commented out in https://github.com/feathr-ai/feathr/blob/main/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala

Tracking information

No response

Code to reproduce bug

No response

What component(s) does this bug affect?

  • Python Client: This is the client users use to interact with most of our API. Mostly written in Python.
  • Computation Engine: The computation engine that executes the actual feature join and generation work. Mostly in Scala and Spark.
  • Feature Registry API: The frontend API layer supports SQL and Purview (Atlas) as storage. The API layer is written in Python (FastAPI).
  • Feature Registry Web UI: The Web UI for the feature registry. Written in React.
