Почему StateStore моего Kafka Transformer недоступен?

avatar
Mark Lavin
7 апреля 2018 в 22:23
1626
1
3

Я пытаюсь реализовать класс Transformer

public class StreamSorterByTimeStampWithDelayTransformer < V > 
    implements Transformer< Long, V, KeyValue< Long, V > >

Конструктор класса создает StateStore для каждого экземпляра, таким образом:

this.state_store_name = "state_store_" + this.hashCode();

KeyValueBytesStoreSupplier key_value_store_supplier = Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< String, V > > key_value_store_builder = 
    Stores.keyValueStoreBuilder( key_value_store_supplier, Serdes.String(), v_instance.getSerde() );
stream_builder.addStateStore( key_value_store_builder );

Метод Transformer init ссылается на StateStore следующим образом:

public void init(ProcessorContext context) {
    this.context = context;
    this.key_value_store = (KeyValueStore< String, V >) context.getStateStore( state_store_name );
    // schedule a punctuate() method every "poll_ms" (wall clock time)
    this.context.schedule( this.poll_ms, PunctuationType.WALL_CLOCK_TIME, 
            (timestamp) -> pushOutOldEnoughEntries() );
}

Я думаю, что должен пропустить один шаг, потому что при вызове getStateStore он приводит к исключению:

Processor KSTREAM-TRANSFORM-0000000003 has no access to StateStore

Что я упускаю или делаю неправильно?

Источник

Ответы (1)

avatar
Matthias J. Sax
8 апреля 2018 в 05:34
9

Нужно прикрепить магазин к трансформатору:

stream.transform(...  this.state_store_name);