# Custom Encoding Variants

Learn how to create custom encoding variants and extend uubed’s functionality for specialized use cases.

## Overview

uubed’s modular architecture allows you to create custom encoding variants for specific requirements:

- **Domain-specific encodings**: Optimized for particular data types
- **Hybrid methods**: Combining multiple encoding strategies
- **Experimental variants**: Testing new encoding approaches
- **Performance-tuned versions**: Optimized for specific hardware
## Creating Custom Encoders

### Basic Encoder Interface

```python
import numpy as np

from uubed.encoders.base import BaseEncoder


class CustomEncoder(BaseEncoder):
    def __init__(self, **kwargs):
        super().__init__()
        self.params = kwargs

    def encode(self, data: np.ndarray) -> str:
        """Encode a numpy array to a string."""
        processed = self.preprocess(data)
        encoded = self.apply_encoding(processed)
        return self.format_output(encoded)

    def decode(self, encoded: str) -> np.ndarray:
        """Decode a string back to a numpy array."""
        parsed = self.parse_input(encoded)
        decoded = self.apply_decoding(parsed)
        return self.postprocess(decoded)

    def preprocess(self, data: np.ndarray) -> np.ndarray:
        """Preprocess input data (normalization, scaling, etc.)."""
        return data

    def postprocess(self, data: np.ndarray) -> np.ndarray:
        """Undo any preprocessing after decoding."""
        return data

    def apply_encoding(self, data: np.ndarray) -> bytes:
        """Apply the core encoding algorithm."""
        raise NotImplementedError

    def apply_decoding(self, data: bytes) -> np.ndarray:
        """Invert the core encoding algorithm."""
        raise NotImplementedError

    def format_output(self, encoded_bytes: bytes) -> str:
        """Format encoded bytes as a position-safe string."""
        from uubed.encoders.q64 import q64_encode
        return q64_encode(encoded_bytes)

    def parse_input(self, encoded: str) -> bytes:
        """Parse an encoded string back into raw bytes."""
        # Assumes q64_decode, the counterpart of q64_encode
        from uubed.encoders.q64 import q64_decode
        return q64_decode(encoded)
```
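A minimal concrete subclass only needs to supply `apply_encoding` and `apply_decoding`. The sketch below is illustrative: `ByteClampEncoder` is not part of uubed, and it relies on the `q64_decode` assumption noted above.

```python
import numpy as np


class ByteClampEncoder(CustomEncoder):
    """Hypothetical encoder: clamp floats in [0, 1] to one byte each."""

    def apply_encoding(self, data: np.ndarray) -> bytes:
        return bytes((np.clip(data, 0.0, 1.0) * 255).astype(np.uint8))

    def apply_decoding(self, data: bytes) -> np.ndarray:
        return np.frombuffer(data, dtype=np.uint8).astype(np.float32) / 255.0


encoder = ByteClampEncoder()
vector = np.random.rand(64).astype(np.float32)
restored = encoder.decode(encoder.encode(vector))
assert np.allclose(vector, restored, atol=1 / 255)  # within quantization error
```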
### Example: Sparse Vector Encoder

```python
import struct

import numpy as np

from uubed.encoders.base import BaseEncoder


class SparseVectorEncoder(BaseEncoder):
    """Encoder optimized for sparse vectors."""

    def __init__(self, threshold=0.01, max_indices=32):
        super().__init__()
        self.threshold = threshold
        self.max_indices = max_indices

    def encode(self, data: np.ndarray) -> str:
        # Find significant indices
        significant_mask = np.abs(data) > self.threshold
        indices = np.where(significant_mask)[0]
        values = data[indices]

        # Limit to top-k by magnitude
        if len(indices) > self.max_indices:
            top_k = np.argsort(np.abs(values))[-self.max_indices:]
            indices = indices[top_k]
            values = values[top_k]

        # Pack indices and values, then apply position-safe encoding
        packed_data = self.pack_sparse_data(indices, values, len(data))
        from uubed.encoders.q64 import q64_encode
        return q64_encode(packed_data)

    def pack_sparse_data(self, indices, values, dim):
        """Pack sparse indices and values into bytes."""
        packed = bytearray()
        # Original dimension (2 bytes) so decode can rebuild a dense vector
        packed.extend(dim.to_bytes(2, "little"))
        # Number of elements (1 byte)
        packed.append(len(indices))
        # Indices (2 bytes each, supports up to 65535 dimensions)
        for idx in indices:
            packed.extend(int(idx).to_bytes(2, "little"))
        # Quantization range (2 x float32) so values can be restored
        vmin = float(values.min()) if len(values) else 0.0
        vmax = float(values.max()) if len(values) else 0.0
        packed.extend(struct.pack("<ff", vmin, vmax))
        # Values (quantized to uint8)
        packed.extend(self.quantize_values(values, vmin, vmax))
        return bytes(packed)

    def quantize_values(self, values, vmin, vmax):
        """Quantize float values to uint8 over [vmin, vmax]."""
        span = vmax - vmin
        if span == 0:  # constant or empty values: avoid division by zero
            return np.zeros(len(values), dtype=np.uint8)
        normalized = (values - vmin) / span
        return np.round(normalized * 255).astype(np.uint8)

    def decode(self, encoded: str) -> np.ndarray:
        """Rebuild a dense vector (lossy: only significant entries survive)."""
        # Assumes q64_decode, the counterpart of q64_encode
        from uubed.encoders.q64 import q64_decode
        raw = q64_decode(encoded)
        dim = int.from_bytes(raw[0:2], "little")
        n = raw[2]
        pos = 3
        indices = [int.from_bytes(raw[pos + 2 * i:pos + 2 * i + 2], "little")
                   for i in range(n)]
        pos += 2 * n
        vmin, vmax = struct.unpack("<ff", raw[pos:pos + 8])
        quantized = np.frombuffer(raw[pos + 8:pos + 8 + n], dtype=np.uint8)
        dense = np.zeros(dim, dtype=np.float32)
        dense[indices] = quantized.astype(np.float32) / 255 * (vmax - vmin) + vmin
        return dense
```
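A quick roundtrip on a genuinely sparse vector shows what survives: significant entries come back up to uint8 quantization error, while everything below the threshold is dropped.

```python
import numpy as np

enc = SparseVectorEncoder(threshold=0.05, max_indices=32)
vec = np.zeros(512, dtype=np.float32)
vec[[3, 70, 400]] = [0.9, -0.4, 0.25]

restored = enc.decode(enc.encode(vec))
assert np.allclose(vec[[3, 70, 400]], restored[[3, 70, 400]], atol=0.01)
assert restored[0] == 0.0  # sub-threshold entries are dropped
```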
### Example: Multi-Scale Encoder

```python
import numpy as np

from uubed.encoders import get_encoder
from uubed.encoders.base import BaseEncoder


class MultiScaleEncoder(BaseEncoder):
    """Encoder that processes data at multiple scales."""

    def __init__(self, scales=(1, 2, 4), base_method="shq64"):
        super().__init__()
        self.scales = scales
        self.base_method = base_method
        self.encoders = self.create_scale_encoders()

    def create_scale_encoders(self):
        """Create an encoder for each scale."""
        encoders = {}
        for scale in self.scales:
            # Adjust parameters based on scale
            params = self.get_scale_params(scale)
            encoders[scale] = get_encoder(self.base_method, **params)
        return encoders

    def get_scale_params(self, scale):
        """Hook for per-scale parameter tuning; override as needed."""
        return {}

    def encode(self, data: np.ndarray) -> str:
        scale_encodings = []
        for scale in self.scales:
            # Downsample data for this scale, then encode it
            scaled_data = self.downsample(data, scale)
            scale_encodings.append(self.encoders[scale].encode(scaled_data))
        # Combine the per-scale encodings into one string
        return self.combine_encodings(scale_encodings)

    def combine_encodings(self, encodings):
        """Join per-scale encodings (assumes ':' is not in the q64 alphabet)."""
        return ":".join(encodings)

    def downsample(self, data, scale):
        """Downsample data by average pooling."""
        if scale == 1:
            return data
        new_size = len(data) // scale
        downsampled = np.zeros(new_size)
        for i in range(new_size):
            start_idx = i * scale
            end_idx = min(start_idx + scale, len(data))
            downsampled[i] = np.mean(data[start_idx:end_idx])
        return downsampled
```
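Coarse scales give short, cheap signatures that can act as prefilters before comparing the full-resolution encoding. A usage sketch, assuming `shq64` is available via `get_encoder` as above:

```python
import numpy as np

encoder = MultiScaleEncoder(scales=(1, 2, 4), base_method="shq64")
vector = np.random.rand(256).astype(np.float32)

combined = encoder.encode(vector)
full_res, half_res, quarter_res = combined.split(":")
# Compare quarter_res first; only fall through to full_res on a match
```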
## Hybrid Encoding Strategies

### Adaptive Encoder

```python
import numpy as np

from uubed.encoders import get_encoder
from uubed.encoders.base import BaseEncoder


class AdaptiveEncoder(BaseEncoder):
    """Encoder that chooses a method based on data characteristics."""

    def __init__(self):
        super().__init__()
        # Predicates are checked in order; the first match wins
        self.method_selectors = {
            "sparse": lambda x: np.count_nonzero(x) / len(x) < 0.1,
            "dense": lambda x: np.std(x) > 0.5,
            "uniform": lambda x: np.std(x) < 0.1,
        }
        self.method_encoders = {
            "sparse": SparseVectorEncoder(),
            "dense": get_encoder("shq64"),
            "uniform": get_encoder("eq64"),
        }

    def encode(self, data: np.ndarray) -> str:
        # Analyze data characteristics and encode with the selected method
        selected_method = self.select_method(data)
        encoding = self.method_encoders[selected_method].encode(data)
        # Prepend a method identifier so decode knows which encoder to use
        method_id = list(self.method_encoders.keys()).index(selected_method)
        return f"{method_id}:{encoding}"

    def select_method(self, data):
        """Select an encoding method based on data characteristics."""
        for method, selector in self.method_selectors.items():
            if selector(data):
                return method
        # Default to dense if no specific pattern is detected
        return "dense"
```
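The prefix makes the encoding self-describing, so decoding is the same dispatch in reverse. A sketch of the matching method (add it to `AdaptiveEncoder`; it assumes every underlying encoder implements `decode`):

```python
def decode(self, encoded: str) -> np.ndarray:
    # Split off the method identifier written by encode()
    method_id, payload = encoded.split(":", 1)
    method = list(self.method_encoders.keys())[int(method_id)]
    return self.method_encoders[method].decode(payload)
```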
### Ensemble Encoder

```python
import numpy as np

from uubed.encoders import get_encoder
from uubed.encoders.base import BaseEncoder


class EnsembleEncoder(BaseEncoder):
    """Encoder that combines multiple encoding methods."""

    def __init__(self, methods=("shq64", "t8q64"), weights=None):
        super().__init__()
        self.methods = methods
        self.weights = weights or [1.0] * len(methods)
        self.encoders = [get_encoder(method) for method in methods]

    def encode(self, data: np.ndarray) -> str:
        # Encode with each method
        encodings = [encoder.encode(data) for encoder in self.encoders]
        # Combine encodings with weights
        return self.combine_weighted_encodings(encodings, self.weights)

    def combine_weighted_encodings(self, encodings, weights):
        """Combine multiple encodings with weights."""
        # Each part carries a weight byte and a length prefix so the
        # combined payload can be split apart again unambiguously
        combined_parts = []
        for encoding, weight in zip(encodings, weights):
            weight_byte = min(255, int(weight * 255)).to_bytes(1, "little")
            payload = encoding.encode("utf-8")
            length = len(payload).to_bytes(2, "little")
            combined_parts.append(weight_byte + length + payload)
        # Re-encode the combined bytes as a single position-safe string
        from uubed.encoders.q64 import q64_encode
        return q64_encode(b"".join(combined_parts))
```
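Note that the weights travel as metadata: two q64 strings cannot be numerically blended, so the weighting is applied later, when downstream code scores or ranks against each sub-encoding. Usage, assuming the built-in `shq64` and `t8q64` method names used above:

```python
import numpy as np

encoder = EnsembleEncoder(methods=("shq64", "t8q64"), weights=[0.7, 0.3])
vector = np.random.rand(384).astype(np.float32)
combined = encoder.encode(vector)
```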
## Registration System

### Register a Custom Encoder

```python
from uubed.encoders import register_encoder

# Register your custom encoders
register_encoder("sparse", SparseVectorEncoder)
register_encoder("multiscale", MultiScaleEncoder)
register_encoder("adaptive", AdaptiveEncoder)

# Now you can use them with the standard API
from uubed import encode

result = encode(data, method="sparse", threshold=0.05)
```
### Plugin System

```python
# Create a plugin file: uubed_custom_plugin.py
class CustomEncoderPlugin:
    def get_encoders(self):
        return {
            "my_custom": MyCustomEncoder,
            "experimental_v1": ExperimentalEncoder,
        }

    def get_config_schema(self):
        return {
            "my_custom": {
                "param1": {"type": "int", "default": 10},
                "param2": {"type": "float", "default": 0.5},
            }
        }


# Register the plugin
from uubed.plugins import register_plugin

register_plugin(CustomEncoderPlugin())
```
## Performance Optimization

### SIMD-Optimized Encoder

```python
import numpy as np

from uubed.encoders.base import BaseEncoder

try:
    import numba

    # The jitted kernel must be a free function: numba's nopython mode
    # cannot call back into Python methods such as self.*
    @numba.njit(parallel=True)
    def _simd_process(arr):
        result = np.empty_like(arr)
        for i in numba.prange(len(arr)):
            result[i] = arr[i]  # placeholder kernel; substitute your own
        return result

    NUMBA_AVAILABLE = True
except ImportError:
    NUMBA_AVAILABLE = False


class SIMDOptimizedEncoder(BaseEncoder):
    """Encoder with SIMD optimizations via numba."""

    def __init__(self):
        super().__init__()
        self.simd_available = NUMBA_AVAILABLE

    def encode(self, data: np.ndarray) -> str:
        if self.simd_available:
            return self.encode_simd(data)
        return self.encode_fallback(data)

    def encode_simd(self, data):
        """SIMD-optimized encoding path."""
        return self.finalize_encoding(_simd_process(data))

    def encode_fallback(self, data):
        """Pure-numpy fallback when numba is unavailable."""
        return self.finalize_encoding(data)

    def finalize_encoding(self, processed):
        """Quantize (assumes inputs roughly in [0, 1]) and emit q64."""
        from uubed.encoders.q64 import q64_encode
        quantized = np.clip(processed * 255, 0, 255).astype(np.uint8)
        return q64_encode(quantized.tobytes())
```
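Whether the jitted path actually wins depends on the kernel and the array size, so measure it. A quick timing sketch (the first call includes JIT compilation, so warm up before timing):

```python
import time

import numpy as np

encoder = SIMDOptimizedEncoder()
data = np.random.rand(1_000_000)

encoder.encode(data)  # warm-up: triggers JIT compilation

start = time.perf_counter()
for _ in range(10):
    encoder.encode(data)
print(f"{(time.perf_counter() - start) / 10 * 1e3:.2f} ms per encode")
```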
### GPU-Accelerated Encoder

```python
import numpy as np

from uubed.encoders.base import BaseEncoder


class GPUEncoder(BaseEncoder):
    """GPU-accelerated encoder using CuPy."""

    def __init__(self):
        super().__init__()
        self.gpu_available = self.check_gpu_support()

    def check_gpu_support(self):
        """True if CuPy is installed and a CUDA device is visible."""
        try:
            import cupy as cp
            return cp.cuda.runtime.getDeviceCount() > 0
        except Exception:
            return False

    def encode(self, data: np.ndarray) -> str:
        if not self.gpu_available:
            return self.encode_cpu(data)
        try:
            import cupy as cp
            gpu_data = cp.asarray(data)                # transfer to GPU
            processed = self.process_on_gpu(gpu_data)  # process on device
            result = cp.asnumpy(processed)             # transfer back to CPU
            return self.finalize_encoding(result)
        except Exception:
            return self.encode_cpu(data)               # fall back to CPU

    def process_on_gpu(self, gpu_data):
        """Placeholder device kernel: scale to unit max-norm."""
        return gpu_data / (abs(gpu_data).max() + 1e-12)

    def encode_cpu(self, data: np.ndarray) -> str:
        """CPU fallback mirroring the GPU path."""
        return self.finalize_encoding(data / (np.abs(data).max() + 1e-12))

    def finalize_encoding(self, processed):
        """Quantize [-1, 1] floats to uint8 and emit a q64 string."""
        from uubed.encoders.q64 import q64_encode
        quantized = np.clip((processed + 1) * 127.5, 0, 255).astype(np.uint8)
        return q64_encode(quantized.tobytes())
```
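Host-to-device transfer overhead usually dwarfs the kernel time for a single vector, so the GPU path only pays off in batches. A hedged usage sketch:

```python
import numpy as np

encoder = GPUEncoder()
batch = np.random.rand(10_000, 512).astype(np.float32)

# Encoding row by row amortizes badly; prefer moving the whole batch to
# the device once if you extend process_on_gpu to handle 2-D input.
encodings = [encoder.encode(row) for row in batch]
```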
## Testing Custom Encoders

### Unit Tests

```python
import unittest

import numpy as np


class TestCustomEncoder(unittest.TestCase):
    def setUp(self):
        self.encoder = SparseVectorEncoder(threshold=0.1, max_indices=32)
        # Genuinely sparse test data: 20 significant entries out of 100
        self.test_data = np.zeros(100, dtype=np.float32)
        hot = np.random.choice(100, size=20, replace=False)
        self.test_data[hot] = np.random.uniform(0.2, 1.0, size=20)

    def test_encode_decode_roundtrip(self):
        """Encode/decode preserves the significant entries."""
        encoded = self.encoder.encode(self.test_data)
        decoded = self.encoder.decode(encoded)
        significant_mask = np.abs(self.test_data) > 0.1
        np.testing.assert_allclose(
            self.test_data[significant_mask],
            decoded[significant_mask],
            rtol=0.1,
        )

    def test_compression_ratio(self):
        """Encoded form is substantially smaller than the raw bytes."""
        original_size = len(self.test_data.tobytes())
        encoded = self.encoder.encode(self.test_data)
        compressed_size = len(encoded.encode("utf-8"))
        compression_ratio = compressed_size / original_size
        self.assertLess(compression_ratio, 0.5)  # at least 50% compression


if __name__ == "__main__":
    unittest.main()
```
### Benchmark Tests

```python
import time

import numpy as np


def calculate_similarity(original, decoded):
    """Cosine similarity between the original and decoded vectors."""
    denom = np.linalg.norm(original) * np.linalg.norm(decoded)
    return float(np.dot(original, decoded) / denom) if denom else 0.0


def benchmark_custom_encoder():
    """Benchmark custom encoder performance."""
    encoder = SparseVectorEncoder()
    # Dense random data is a worst case for a sparse encoder; expect
    # high throughput but modest quality scores here
    test_data = np.random.rand(1000, 512)

    # Benchmark encoding throughput
    start_time = time.perf_counter()
    for data in test_data:
        encoder.encode(data)
    encoding_time = time.perf_counter() - start_time
    print(f"Encoding rate: {len(test_data) / encoding_time:.2f} vectors/sec")

    # Benchmark reconstruction quality on a sample
    quality_scores = []
    for data in test_data[:100]:
        decoded = encoder.decode(encoder.encode(data))
        quality_scores.append(calculate_similarity(data, decoded))
    print(f"Average quality: {np.mean(quality_scores):.3f}")
```
## Deployment and Distribution

### Packaging Custom Encoders

```python
# setup.py for a custom encoder package
from setuptools import setup, find_packages

setup(
    name="uubed-custom-encoders",
    version="1.0.0",
    packages=find_packages(),
    install_requires=["uubed>=1.0.0"],
    entry_points={
        "uubed.encoders": [
            "sparse = my_encoders:SparseVectorEncoder",
            "multiscale = my_encoders:MultiScaleEncoder",
        ]
    },
)
```
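Whether uubed itself scans this entry-point group depends on the version you target; if you need to implement the discovery side yourself, `importlib.metadata` makes it a few lines (the `group` argument requires Python 3.10+ or the `importlib_metadata` backport):

```python
from importlib.metadata import entry_points

# Collect every encoder class advertised under the "uubed.encoders" group
discovered = {ep.name: ep.load() for ep in entry_points(group="uubed.encoders")}
encoder_cls = discovered["sparse"]  # -> SparseVectorEncoder
```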
### Docker Deployment

```dockerfile
FROM python:3.9-slim

# Install uubed and the custom encoder package
RUN pip install uubed uubed-custom-encoders

# Copy configuration
COPY custom_config.toml /etc/uubed/config.toml

# Set up the entrypoint
ENTRYPOINT ["python", "-m", "uubed.server"]
```
## Best Practices

- **Follow the interface**: Implement the `BaseEncoder` interface correctly
- **Error handling**: Add robust input validation and error handling (see the sketch below)
- **Documentation**: Document parameters and behavior thoroughly
- **Testing**: Include comprehensive unit and integration tests
- **Performance**: Benchmark and optimize critical paths
- **Backward compatibility**: Maintain compatibility with already-encoded data
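A minimal sketch of the validation point, wrapping `encode` with checks that fail fast on input the sparse packing format cannot represent (the specific limits and error types are illustrative; match them to your encoder and to uubed's own exceptions if it defines any):

```python
import numpy as np


def validated_encode(encoder, data) -> str:
    """Fail fast on malformed input before encoding."""
    data = np.asarray(data)
    if data.ndim != 1:
        raise ValueError(f"expected a 1-D vector, got shape {data.shape}")
    if data.size == 0 or data.size > 65535:  # index must fit in 2 bytes
        raise ValueError(f"vector length {data.size} out of range")
    if not np.all(np.isfinite(data)):
        raise ValueError("vector contains NaN or infinity")
    return encoder.encode(data.astype(np.float32))
```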