Создание таблицы PubsubToBQ перед вставкой

avatar
vamper1234
1 июля 2021 в 20:39
57
0
0

Я пытаюсь сделать жизнь созданием таблицы Bigquery перед самим процессом вставки. Вот код PTransform, который я использую -> Ссылка

Это преобразование я хотел бы применить к сообщениям Pubsub, которые позже будут вставлены в таблицу BQ.

  • Этап 1. Получение сообщений pubsub:

         PCollection<PubsubMessage> messages =
             pipeline.apply(
                     "ReadPubSubSubscription",
                     PubsubIO.readMessagesWithAttributes()
                             .fromSubscription(options.getInputSubscription()));
    
  • Этап 2. Преобразование всех сообщений pubsub в TableRow:

          PCollectionTuple convertedTableRows =
                  messages
                      .apply("ConvertMessageToTableRow", new PubsubMessageToTableRow(options));
    
  • Этап 3. Вот проблема, мне нужно проверить, существует ли таблица, и загрузить результат в BQ:

      ###here is the schema for our BQ table
      public static final Schema schema1 =
          Schema.of(
                  Field.of("name", StandardSQLTypeName.STRING),
                  Field.of("post_abbr", StandardSQLTypeName.STRING));
    
      ### here is the method that we using to extract table name from pubsub attributes
      static class PubSubAttributeExtractor implements SerializableFunction<ValueInSingleWindow<TableRow>  String> {
    
      private final String attribute;
    
      public PubSubAttributeExtractor(String attribute) {
          this.attribute = attribute;
      }
    
      @Override
      public String apply(ValueInSingleWindow<TableRow> input) {
          TableRow row = input.getValue();
          String tableName = (String) row.get("name");
          return "my-project:myDS.pubsub_" + tableName;
      }
      }
      ### here is the part that doesn't work 
      WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT)                
              .apply(new BigQueryAutoCreateTable(
                        new PubSubAttributeExtractor("event_name"),schema1));
              .apply(
                              "WriteSuccessfulRecords",
                              BigQueryIO.writeTableRows()
                                      .withoutValidation()
                                      .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                                      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                                      .withExtendedErrorInfo()
                                      .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                                      .to(new ProbPartitionDestinations(options.getOutputTableSpec())
                                      )
                      );
    

Журналы ошибок: не удается найти символ символ: метод apply(java.lang.String,org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write<com.google.api.services.bigquery.model.TableRow>) расположение: интерфейс org.apache.beam.sdk.values.POutput

Источник
guillaume blaquiere
1 июля 2021 в 21:11
1

Зачем вам нужно проверять, существует ли таблица? CREATE_IF_NEEDED не является ответом на ваш вариант использования? В противном случае объясните больше ваших требований

vamper1234
2 июля 2021 в 07:00
0

@guillaumeblaquiere, мне это нужно из-за этой проблемы BEAM-3772, которую они не могли исправить в прошлом году. Если бы я использовал CREATE_IF_NEEDED, он создаст таблицы для первых нескольких сообщений и после этого начнет отправлять сообщение об ошибке -> ТАБЛИЦА НЕ НАЙДЕНА

guillaume blaquiere
2 июля 2021 в 07:03
0

Понял, я не знал об этой проблеме!! Какая у вас версия луча? кое-что проверю и сообщу

vamper1234
2 июля 2021 в 07:39
0

Я использую луч 2.25

guillaume blaquiere
2 июля 2021 в 19:46
0

Вам нужна другая таблица в соответствии с атрибутом PubSub?

Ответы (0)