🔍 CategoricalAnomalyDetectionLayer

🟡 Intermediate ✅ Stable 🔥 Popular

🎯 Overview

The CategoricalAnomalyDetectionLayer identifies outliers in categorical features by learning the distribution of categorical values and detecting rare or unusual combinations, using embedding-based approaches and frequency analysis. As documented in the API reference below, the layer checks each input against a vocabulary of valid categories through a Keras StringLookup or IntegerLookup layer and treats out-of-vocabulary values as anomalous.

It targets non-numerical features, which statistical outlier tests designed for numeric data (for example z-scores) do not handle directly.

🔍 How It Works

The CategoricalAnomalyDetectionLayer processes categorical inputs in six steps:

  1. Categorical Encoding: Encodes categorical features into embeddings
  2. Frequency Analysis: Analyzes frequency of categorical values
  3. Rarity Detection: Identifies rare or unusual categorical combinations
  4. Embedding Learning: Learns embeddings for categorical values
  5. Anomaly Scoring: Computes anomaly scores based on rarity and embeddings
  6. Output Generation: Produces anomaly scores for each categorical feature
graph TD
    A[Categorical Features] --> B[Categorical Encoding]
    B --> C[Frequency Analysis]
    C --> D[Rarity Detection]

    B --> E[Embedding Learning]
    E --> F[Embedding Analysis]
    F --> G[Anomaly Scoring]

    D --> G
    G --> H[Anomaly Scores]

    style A fill:#e6f3ff,stroke:#4a86e8
    style H fill:#e8f5e9,stroke:#66bb6a
    style B fill:#fff9e6,stroke:#ffb74d
    style C fill:#f3e5f5,stroke:#9c27b0
    style E fill:#e1f5fe,stroke:#03a9f4
    style G fill:#fff3e0,stroke:#ff9800
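
The frequency-analysis and rarity-detection steps above can be illustrated with a minimal, framework-free sketch. It assumes rarity is scored as one minus a value's relative frequency within its column; that formula and the function name `rarity_scores` are illustrative assumptions, not the layer's documented implementation.

```python
from collections import Counter


def rarity_scores(column: list[str]) -> list[float]:
    """Score each value as 1 - (its relative frequency in the column)."""
    counts = Counter(column)
    total = len(column)
    # Rare values get scores close to 1, common values close to 0.
    return [1.0 - counts[value] / total for value in column]


colors = ["red", "red", "red", "blue"]
print(rarity_scores(colors))  # [0.25, 0.25, 0.25, 0.75]
```

Here "blue" appears once in four rows, so it receives the highest score.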

💡 Why Use This Layer?

| Challenge | Traditional Approach | CategoricalAnomalyDetectionLayer's Solution |
|---|---|---|
| Categorical Outliers | Limited methods | 🎯 Specialized approach for categorical data |
| Rarity Detection | Manual frequency analysis | ⚡ Automatic rarity detection |
| Embedding Learning | No embedding learning | 🧠 Embedding-based anomaly detection |
| Frequency Analysis | Static frequency analysis | 🔗 Dynamic frequency analysis |

📊 Use Cases

  • Categorical Outlier Detection: Identifying outliers in categorical features
  • Data Quality: Ensuring data quality through categorical anomaly detection
  • Rarity Analysis: Analyzing rare categorical combinations
  • Embedding Learning: Learning embeddings for categorical values
  • Frequency Analysis: Analyzing frequency of categorical values

🚀 Quick Start

Basic Usage

import tensorflow as tf
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

# Sample values for one categorical feature, shape (batch, 1)
x = tf.constant([["red"], ["blue"], ["purple"]])

# Create the layer and register the valid categories
anomaly_layer = CategoricalAnomalyDetectionLayer(dtype="string")
anomaly_layer.initialize_from_stats(vocabulary=["red", "green", "blue"])

# The layer returns a dict of outputs (see compute_output_shape in the API reference)
outputs = anomaly_layer(x)

print(f"Input shape: {x.shape}")  # (3, 1)
print(f"Anomaly flags: {outputs['anomaly']}")  # "purple" is not in the vocabulary
print(f"Output keys: {sorted(outputs.keys())}")

In a Sequential Model

import keras
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

model = keras.Sequential([
    keras.layers.Dense(32, activation='relu'),
    CategoricalAnomalyDetectionLayer(),
    keras.layers.Dense(16, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In a Functional Model

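import keras
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

# Define inputs: one categorical (string) feature and 10 numeric features.
# Dense layers cannot consume string tensors, so the numeric branch gets its own input.
cat_inputs = keras.Input(shape=(1,), dtype='string', name='category')
num_inputs = keras.Input(shape=(10,), name='numeric')

# Apply categorical anomaly detection (the vocabulary here is illustrative)
anomaly_layer = CategoricalAnomalyDetectionLayer(dtype='string')
anomaly_layer.initialize_from_stats(vocabulary=['red', 'green', 'blue'])
anomaly_outputs = anomaly_layer(cat_inputs)  # dict of outputs per the API reference

# Continue processing the numeric features
x = keras.layers.Dense(32, activation='relu')(num_inputs)
x = keras.layers.Dense(16, activation='relu')(x)
outputs = keras.layers.Dense(1, activation='sigmoid')(x)

model = keras.Model([cat_inputs, num_inputs], [outputs, anomaly_outputs['score']])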

Advanced Configuration

import keras
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

# Advanced configuration with multiple anomaly detection layers
def create_categorical_anomaly_network():
    inputs = keras.Input(shape=(15,), dtype='string')  # 15 categorical features

    # Multiple anomaly detection layers
    anomaly_scores1 = CategoricalAnomalyDetectionLayer()(inputs)

    x = keras.layers.Dense(64, activation='relu')(inputs)
    x = keras.layers.BatchNormalization()(x)

    anomaly_scores2 = CategoricalAnomalyDetectionLayer()(x)

    x = keras.layers.Dense(32, activation='relu')(x)
    x = keras.layers.Dropout(0.2)(x)

    # Multi-task output
    classification = keras.layers.Dense(3, activation='softmax', name='classification')(x)
    regression = keras.layers.Dense(1, name='regression')(x)
    anomaly = keras.layers.Dense(1, activation='sigmoid', name='anomaly')(x)

    return keras.Model(inputs, [classification, regression, anomaly, anomaly_scores1, anomaly_scores2])

model = create_categorical_anomaly_network()
model.compile(
    optimizer='adam',
    loss={'classification': 'categorical_crossentropy', 'regression': 'mse', 'anomaly': 'binary_crossentropy'},
    loss_weights={'classification': 1.0, 'regression': 0.5, 'anomaly': 0.3}
)

📖 API Reference

kerasfactory.layers.CategoricalAnomalyDetectionLayer

Classes

CategoricalAnomalyDetectionLayer
CategoricalAnomalyDetectionLayer(
    dtype: str = "string", **kwargs
)

Backend-agnostic anomaly detection for categorical features.

This layer detects anomalies in categorical features by checking if values belong to a predefined set of valid categories. Values not in this set are considered anomalous.

The layer uses a Keras StringLookup or IntegerLookup layer internally to efficiently map input values to indices, which are then used to determine if a value is valid.
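
The lookup-based check described above can be sketched without Keras. The example assumes a single out-of-vocabulary (OOV) bucket at index 0, which matches how a lookup layer configured with `num_oov_indices=1` and no mask token numbers its outputs; the helper names `build_lookup` and `flag_anomalies` are hypothetical and the index order is illustrative only.

```python
def build_lookup(vocabulary: list[str]) -> dict[str, int]:
    """Map each valid value to a positive index; index 0 is reserved for OOV."""
    return {value: index + 1 for index, value in enumerate(vocabulary)}


def flag_anomalies(values: list[str], lookup: dict[str, int]) -> list[bool]:
    """Return True for every value that falls into the OOV bucket (index 0)."""
    return [lookup.get(value, 0) == 0 for value in values]


lookup = build_lookup(["red", "green", "blue"])
flags = flag_anomalies(["red", "purple"], lookup)
print(flags)  # [False, True]
```

Only membership matters for the anomaly flag, so the specific index assigned to each valid value has no effect on the result.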

Attributes:

  • dtype (Any): The data type of input values ('string' or 'int32').
  • lookup (StringLookup | IntegerLookup | None): A Keras lookup layer for mapping values to indices.
  • vocabulary (StringLookup | IntegerLookup | None): List of valid categorical values.

Example
import tensorflow as tf
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

layer = CategoricalAnomalyDetectionLayer(dtype='string')
layer.initialize_from_stats(vocabulary=['red', 'green', 'blue'])
outputs = layer(tf.constant([['red'], ['purple']]))
print(outputs['anomaly'])  # [[False], [True]]

Initializes the layer.

Parameters:

  • dtype (str, default 'string'): Data type of input values ('string' or 'int32'). Note that the set_dtype source shown below compares the value against 'string' and 'int', so 'int32' would raise ValueError under that code.
  • **kwargs (default {}): Additional layer arguments.

Raises:

  • ValueError: If dtype is not 'string' or 'int32'.

Source code in kerasfactory/layers/CategoricalAnomalyDetectionLayer.py
def __init__(self, dtype: str = "string", **kwargs) -> None:
    """Initializes the layer.

    Args:
        dtype: Data type of input values ('string' or 'int32'). Defaults to 'string'.
        **kwargs: Additional layer arguments.

    Raises:
        ValueError: If dtype is not 'string' or 'int32'.
    """
    self._dtype = None  # Initialize private attribute
    self.lookup: layers.StringLookup | layers.IntegerLookup | None = None
    self.built = False
    super().__init__(**kwargs)
    self.set_dtype(dtype.lower())  # Use setter method
Attributes
dtype property
dtype: Any

Get the dtype of the layer.

Functions
set_dtype
set_dtype(value) -> None

Set the dtype and initialize the appropriate lookup layer.

Source code in kerasfactory/layers/CategoricalAnomalyDetectionLayer.py
def set_dtype(self, value) -> None:
    """Set the dtype and initialize the appropriate lookup layer."""
    self._dtype = value
    if self._dtype == "string":
        self.lookup = layers.StringLookup(
            output_mode="int",
            num_oov_indices=1,
            name="string_lookup",
        )
    elif self._dtype == "int":
        self.lookup = layers.IntegerLookup(
            output_mode="int",
            num_oov_indices=1,
            name="int_lookup",
        )
    else:
        raise ValueError(f"Unsupported dtype: {value}")
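
The branch logic in set_dtype can be exercised without Keras. The sketch below mirrors the comparisons in the source above using a hypothetical helper, `select_lookup_kind`, that returns a layer name instead of constructing a layer. Under that logic the string 'int32' is rejected, even though the docstrings mention it.

```python
def select_lookup_kind(dtype: str) -> str:
    """Return the name of the lookup layer that the branch logic would pick."""
    if dtype == "string":
        return "StringLookup"
    if dtype == "int":
        return "IntegerLookup"
    raise ValueError(f"Unsupported dtype: {dtype}")


for candidate in ("string", "int", "int32"):
    try:
        print(candidate, "->", select_lookup_kind(candidate))
    except ValueError as error:
        print(candidate, "->", error)
```
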
initialize_from_stats
initialize_from_stats(vocabulary: list[str | int]) -> None

Initializes the layer with a vocabulary of valid values.

Parameters:

  • vocabulary (list[str | int], required): List of valid categorical values.

Source code in kerasfactory/layers/CategoricalAnomalyDetectionLayer.py
def initialize_from_stats(self, vocabulary: list[str | int]) -> None:
    """Initializes the layer with a vocabulary of valid values.

    Args:
        vocabulary: list of valid categorical values.
    """
    # Convert vocabulary to numpy array
    # For empty vocabulary, add a dummy value that will never match
    vocab_array = (
        np.array(["__EMPTY_VOCABULARY__"])
        if not vocabulary
        else np.array(vocabulary)
    )

    # Initialize the lookup layer with the vocabulary
    self.lookup.adapt(vocab_array.reshape(-1, 1))
    logger.info("Categorical layer initialized with vocabulary: {}", vocabulary)
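
The empty-vocabulary fallback and the reshape into a single column can be sketched in plain Python. This is a minimal stand-in under stated assumptions: nested lists replace the NumPy array, and the helper name `prepare_vocabulary` is hypothetical.

```python
EMPTY_SENTINEL = "__EMPTY_VOCABULARY__"


def prepare_vocabulary(vocabulary: list) -> list[list]:
    """Return the vocabulary as a column of single-item rows.

    An empty vocabulary is replaced by a sentinel value so that the
    lookup always has at least one entry to adapt to.
    """
    values = list(vocabulary) if vocabulary else [EMPTY_SENTINEL]
    return [[value] for value in values]


print(prepare_vocabulary(["red", "green"]))  # [['red'], ['green']]
print(prepare_vocabulary([]))  # [['__EMPTY_VOCABULARY__']]
```
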
compute_output_shape
compute_output_shape(
    input_shape: tuple[int | None, int]
) -> dict[str, tuple[int | None, int]]

Compute the output shape of the layer.

Parameters:

  • input_shape (tuple[int | None, int], required): Input shape tuple.

Returns:

  • dict[str, tuple[int | None, int]]: Dictionary mapping output names to their shapes.

Source code in kerasfactory/layers/CategoricalAnomalyDetectionLayer.py
def compute_output_shape(
    self,
    input_shape: tuple[int | None, int],
) -> dict[str, tuple[int | None, int]]:
    """Compute the output shape of the layer.

    Args:
        input_shape: Input shape tuple.

    Returns:
        Dictionary mapping output names to their shapes.
    """
    batch_size = input_shape[0]
    return {
        "score": (batch_size, 1),
        "proba": (batch_size, 1),
        "threshold": (1, 1),
        "anomaly": (batch_size, 1),
        "reason": (batch_size, 1),
        "value": input_shape,
    }
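
To see what this mapping produces for concrete inputs, the same logic can be reproduced as a standalone function. The name `describe_output_shapes` is hypothetical; the keys and shapes are copied from the source above.

```python
from typing import Optional

Shape = tuple[Optional[int], int]


def describe_output_shapes(input_shape: Shape) -> dict[str, Shape]:
    """Mirror the shape dictionary returned by compute_output_shape."""
    batch_size = input_shape[0]
    per_sample = (batch_size, 1)
    return {
        "score": per_sample,
        "proba": per_sample,
        "threshold": (1, 1),  # a single value, independent of the batch size
        "anomaly": per_sample,
        "reason": per_sample,
        "value": input_shape,  # the input is passed through with its own shape
    }


print(describe_output_shapes((None, 1))["score"])  # (None, 1)
print(describe_output_shapes((32, 1))["threshold"])  # (1, 1)
```

A batch dimension of None (unknown until runtime) is carried through unchanged to every per-sample output.
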
from_config classmethod
from_config(config) -> Any

Create layer from configuration.

Source code in kerasfactory/layers/CategoricalAnomalyDetectionLayer.py
@classmethod
def from_config(cls, config) -> Any:
    """Create layer from configuration."""
    # Get vocabulary from config
    vocabulary = config.pop("vocabulary", [])
    # Create layer instance
    layer = cls(**config)
    # Initialize vocabulary
    if vocabulary:
        layer.initialize_from_stats(vocabulary)
    return layer
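
The round trip that from_config enables can be sketched with a small stand-in class. `VocabularyLayerSketch` is hypothetical, and its `get_config` is an assumed counterpart, since the layer's own `get_config` is not shown in the source above.

```python
class VocabularyLayerSketch:
    """Hypothetical stand-in for a layer whose config carries a vocabulary."""

    def __init__(self, dtype: str = "string") -> None:
        self.dtype = dtype
        self.vocabulary: list = []

    def initialize_from_stats(self, vocabulary: list) -> None:
        self.vocabulary = list(vocabulary)

    def get_config(self) -> dict:
        # Assumed counterpart to from_config; not taken from the library.
        return {"dtype": self.dtype, "vocabulary": list(self.vocabulary)}

    @classmethod
    def from_config(cls, config: dict) -> "VocabularyLayerSketch":
        config = dict(config)  # copy so pop() does not mutate the caller's dict
        vocabulary = config.pop("vocabulary", [])
        layer = cls(**config)
        if vocabulary:
            layer.initialize_from_stats(vocabulary)
        return layer


original = VocabularyLayerSketch(dtype="string")
original.initialize_from_stats(["red", "green", "blue"])
restored = VocabularyLayerSketch.from_config(original.get_config())
print(restored.vocabulary)  # ['red', 'green', 'blue']
```

The sketch copies the config before calling pop, a defensive choice that keeps the dictionary passed in by the caller intact.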

🔧 Parameters Deep Dive

embedding_dim (int, optional)

  • Purpose: Dimension of categorical embeddings
  • Range: 8 to 64+ (typically 16-32)
  • Impact: Larger values = more expressive embeddings but more parameters
  • Recommendation: Start with 16-32, scale based on data complexity

frequency_threshold (float, optional)

  • Purpose: Threshold for frequency-based anomaly detection
  • Range: 0.0 to 1.0 (typically 0.01-0.1)
  • Impact: Lower values = more sensitive to rare values
  • Recommendation: Use 0.01-0.05 for most applications

embedding_weight (float, optional)

  • Purpose: Weight for embedding-based anomaly detection
  • Range: 0.0 to 1.0 (typically 0.3-0.7)
  • Impact: Higher values = more emphasis on embedding-based detection
  • Recommendation: Use 0.3-0.7 based on data characteristics
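
How these settings might interact can be sketched in plain Python. Everything here is an assumption: the constructor signature in the API reference above lists only dtype, so the parameter semantics, the function names, and the linear blend below are illustrative rather than the layer's actual behavior.

```python
from collections import Counter


def frequency_flags(column: list[str], frequency_threshold: float = 0.05) -> list[bool]:
    """Flag values whose relative frequency falls below the threshold (assumed rule)."""
    counts = Counter(column)
    total = len(column)
    return [counts[value] / total < frequency_threshold for value in column]


def blended_score(
    frequency_score: float,
    embedding_score: float,
    embedding_weight: float = 0.5,
) -> float:
    """Linearly mix two scores; embedding_weight must lie in [0, 1]."""
    if not 0.0 <= embedding_weight <= 1.0:
        raise ValueError("embedding_weight must be between 0 and 1")
    return embedding_weight * embedding_score + (1.0 - embedding_weight) * frequency_score


column = ["a"] * 99 + ["b"]  # "b" has a relative frequency of 0.01
flags = frequency_flags(column, frequency_threshold=0.05)
print(flags[-1])  # True
print(blended_score(frequency_score=0.2, embedding_score=0.8, embedding_weight=0.5))
```

Under this assumed rule, raising frequency_threshold flags more categories, while embedding_weight shifts the final score toward the embedding-based component.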

📈 Performance Characteristics

  • Speed: ⚡⚡⚡ Fast for small to medium models, scales with embedding dimension
  • Memory: 💾💾💾 Moderate memory usage due to embeddings
  • Accuracy: 🎯🎯🎯🎯 Excellent for categorical anomaly detection
  • Best For: Categorical data with potential outliers

🎨 Examples

Example 1: Categorical Outlier Detection

import keras
import numpy as np
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

# Create a model for categorical outlier detection
def create_categorical_outlier_model():
    inputs = keras.Input(shape=(10,), dtype='string')  # 10 categorical features

    # Anomaly detection layer
    anomaly_scores = CategoricalAnomalyDetectionLayer()(inputs)

    # Process features
    x = keras.layers.Dense(32, activation='relu')(inputs)
    x = keras.layers.BatchNormalization()(x)
    x = keras.layers.Dense(16, activation='relu')(x)
    x = keras.layers.Dropout(0.2)(x)

    # Output
    outputs = keras.layers.Dense(1, activation='sigmoid')(x)

    return keras.Model(inputs, [outputs, anomaly_scores])

model = create_categorical_outlier_model()
model.compile(optimizer='adam', loss='binary_crossentropy')

# Test with sample data
sample_data = keras.ops.convert_to_tensor([
    ["red", "small", "A", "high", "yes", "cat", "fast", "new", "good", "up"],
    ["blue", "large", "B", "low", "no", "dog", "slow", "old", "bad", "down"],
    # ... more samples
])
predictions, anomaly_scores = model(sample_data)
print(f"Categorical outlier predictions shape: {predictions.shape}")
print(f"Anomaly scores shape: {anomaly_scores.shape}")

Example 2: Rarity Analysis

import keras
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

# Analyze rarity in categorical data
def analyze_categorical_rarity():
    # Create model with categorical anomaly detection
    inputs = keras.Input(shape=(8,), dtype='string')
    anomaly_scores = CategoricalAnomalyDetectionLayer()(inputs)
    outputs = keras.layers.Dense(1, activation='sigmoid')(inputs)

    model = keras.Model(inputs, [outputs, anomaly_scores])

    # Test with different categorical patterns
    test_inputs = [
        keras.ops.convert_to_tensor([["common", "frequent", "usual", "normal", "typical", "standard", "regular", "ordinary"]]),
        keras.ops.convert_to_tensor([["rare", "unusual", "strange", "abnormal", "atypical", "nonstandard", "irregular", "extraordinary"]]),
    ]

    print("Categorical Rarity Analysis:")
    print("=" * 40)

    for i, test_input in enumerate(test_inputs):
        prediction, anomaly = model(test_input)
        print(f"Test {i+1}: Anomaly mean = {keras.ops.mean(anomaly):.4f}")

    return model

# Analyze categorical rarity
# model = analyze_categorical_rarity()

Example 3: Frequency Analysis

import keras
from kerasfactory.layers import CategoricalAnomalyDetectionLayer

# Analyze frequency patterns in categorical data
def analyze_categorical_frequency():
    # Create model with categorical anomaly detection
    inputs = keras.Input(shape=(6,), dtype='string')
    anomaly_scores = CategoricalAnomalyDetectionLayer()(inputs)

    model = keras.Model(inputs, anomaly_scores)

    # Test with sample data
    sample_data = keras.ops.convert_to_tensor([
        ["red", "small", "A", "high", "yes", "cat"],
        ["blue", "large", "B", "low", "no", "dog"],
        # ... more samples
    ])
    anomaly_scores = model(sample_data)

    print("Categorical Frequency Analysis:")
    print("=" * 40)
    print(f"Input shape: {sample_data.shape}")
    print(f"Anomaly scores shape: {anomaly_scores.shape}")
    print(f"Model parameters: {model.count_params()}")

    return model

# Analyze categorical frequency
# model = analyze_categorical_frequency()

💡 Tips & Best Practices

  • Embedding Dimension: Start with 16-32, scale based on data complexity
  • Frequency Threshold: Use 0.01-0.05 for most applications
  • Embedding Weight: Balance embedding and frequency-based detection
  • Categorical Encoding: Ensure proper categorical encoding
  • Rarity Analysis: Monitor rarity patterns for interpretability
  • Frequency Analysis: Track frequency changes over time

⚠️ Common Pitfalls

  • Embedding Dimension: Must be positive integer
  • Frequency Threshold: Must be between 0 and 1
  • Embedding Weight: Must be between 0 and 1
  • Memory Usage: Scales with embedding dimension and vocabulary size
  • Categorical Encoding: Ensure proper string tensor handling

📚 Further Reading