Я пытаюсь сделать жизнь созданием таблицы Bigquery перед самим процессом вставки. Вот код PTransform, который я использую -> Ссылка
Это преобразование я хотел бы применить к сообщениям Pubsub, которые позже будут вставлены в таблицу BQ.
-
Этап 1. Получение сообщений pubsub:
PCollection<PubsubMessage> messages = pipeline.apply( "ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes() .fromSubscription(options.getInputSubscription()));
-
Этап 2. Преобразование всех сообщений pubsub в TableRow:
PCollectionTuple convertedTableRows = messages .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
-
Этап 3. Вот проблема, мне нужно проверить, существует ли таблица, и загрузить результат в BQ:
###here is the schema for our BQ table public static final Schema schema1 = Schema.of( Field.of("name", StandardSQLTypeName.STRING), Field.of("post_abbr", StandardSQLTypeName.STRING)); ### here is the method that we using to extract table name from pubsub attributes static class PubSubAttributeExtractor implements SerializableFunction<ValueInSingleWindow<TableRow> String> { private final String attribute; public PubSubAttributeExtractor(String attribute) { this.attribute = attribute; } @Override public String apply(ValueInSingleWindow<TableRow> input) { TableRow row = input.getValue(); String tableName = (String) row.get("name"); return "my-project:myDS.pubsub_" + tableName; } } ### here is the part that doesn't work WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT) .apply(new BigQueryAutoCreateTable( new PubSubAttributeExtractor("event_name"),schema1)); .apply( "WriteSuccessfulRecords", BigQueryIO.writeTableRows() .withoutValidation() .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND) .withExtendedErrorInfo() .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) .to(new ProbPartitionDestinations(options.getOutputTableSpec()) ) );
Журналы ошибок: не удается найти символ символ: метод apply(java.lang.String,org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write<com.google.api.services.bigquery.model.TableRow>) расположение: интерфейс org.apache.beam.sdk.values.POutput
Зачем вам нужно проверять, существует ли таблица? CREATE_IF_NEEDED не является ответом на ваш вариант использования? В противном случае объясните больше ваших требований
@guillaumeblaquiere, мне это нужно из-за этой проблемы BEAM-3772, которую они не могли исправить в прошлом году. Если бы я использовал CREATE_IF_NEEDED, он создаст таблицы для первых нескольких сообщений и после этого начнет отправлять сообщение об ошибке -> ТАБЛИЦА НЕ НАЙДЕНА
Понял, я не знал об этой проблеме!! Какая у вас версия луча? кое-что проверю и сообщу
Я использую луч 2.25
Вам нужна другая таблица в соответствии с атрибутом PubSub?