/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.executor.streaming;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.streaming.Batch;
import org.opensearch.sql.executor.streaming.MetadataLog;
import org.opensearch.sql.executor.streaming.Offset;
import org.opensearch.sql.executor.streaming.StreamingSource;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;
import shaded.com.google.common.base.Preconditions;

/**
 * Executes a streaming query one micro-batch at a time.
 *
 * <p>Each call to {@link #execute()} compares the newest offset reported by the source with the
 * offset recorded for the last committed batch. When they differ, the offset is written to the
 * offset log, the batch plan is submitted to the {@link QueryService}, and on success the same
 * offset is written to the committed log.
 */
public class MicroBatchStreamingExecution {
    private static final Logger log = LogManager.getLogger(MicroBatchStreamingExecution.class);

    /** Batch id used when a metadata log has no entry yet. */
    static final long INITIAL_LATEST_BATCH_ID = -1L;

    private final StreamingSource source;
    private final LogicalPlan batchPlan;
    private final QueryService queryService;
    private final MetadataLog<Offset> offsetLog;
    private final MetadataLog<Offset> committedLog;

    /**
     * Creates a micro-batch execution.
     *
     * @param source streaming source that is asked for its latest offset and for batches
     * @param batchPlan logical plan submitted for every batch
     * @param queryService service the batch plan is submitted to
     * @param offsetLog log written before a batch is submitted
     * @param committedLog log written after a batch has completed successfully
     */
    public MicroBatchStreamingExecution(StreamingSource source, LogicalPlan batchPlan, QueryService queryService, MetadataLog<Offset> offsetLog, MetadataLog<Offset> committedLog) {
        this.source = source;
        this.batchPlan = batchPlan;
        this.queryService = queryService;
        this.offsetLog = offsetLog;
        this.committedLog = committedLog;
    }

    /**
     * Runs at most one micro-batch.
     *
     * <p>Does nothing when the source reports the same offset as the last committed batch.
     *
     * @throws IllegalArgumentException if the offset log is more than one batch ahead of the
     *     committed log, or if the source reports no offset while a committed offset exists
     */
    public void execute() {
        Long latestBatchId = offsetLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
        Long latestCommittedBatchId = committedLog.getLatest().map(Pair::getKey).orElse(INITIAL_LATEST_BATCH_ID);
        Optional<Offset> committedOffset = offsetLog.get(latestCommittedBatchId);

        final long currentBatchId;
        if (latestBatchId.equals(latestCommittedBatchId)) {
            // Every logged batch has been committed: start the next one.
            currentBatchId = latestCommittedBatchId + 1L;
        } else {
            // The only other accepted state is one logged-but-uncommitted batch; reuse its id.
            // Guava's Preconditions only substitutes %s placeholders.
            Preconditions.checkArgument(latestBatchId.equals(latestCommittedBatchId + 1L), "[BUG] Expected latestBatchId - latestCommittedBatchId = 0 or 1, but latestBatchId=%s, latestCommittedBatchId=%s", latestBatchId, latestCommittedBatchId);
            currentBatchId = latestBatchId;
        }

        Optional<Offset> availableOffsets = source.getLatestOffset();
        if (!hasNewData(availableOffsets, committedOffset)) {
            return;
        }

        // hasNewData guarantees the optional is present when it returns true.
        final Offset latestOffset = availableOffsets.get();
        Batch batch = source.getBatch(committedOffset, latestOffset);
        // Record the offset before submitting the plan.
        offsetLog.add(currentBatchId, latestOffset);
        queryService.executePlan(batchPlan, new PlanContext(batch.getSplit()), new ResponseListener<ExecutionEngine.QueryResponse>() {

            @Override
            public void onResponse(ExecutionEngine.QueryResponse response) {
                // The batch is committed only after the plan has completed successfully.
                committedLog.add(currentBatchId, latestOffset);
            }

            @Override
            public void onFailure(Exception e) {
                // Pass the exception as the trailing argument so its stack trace is logged.
                log.error("streaming processing failed. source = {}", source, e);
            }
        });
    }

    /**
     * Tells whether the source has moved past the last committed offset.
     *
     * @param availableOffsets latest offset reported by the source
     * @param committedOffset offset recorded for the last committed batch, if any
     * @return {@code false} when both optionals are equal, {@code true} otherwise
     * @throws IllegalArgumentException if the optionals differ and {@code availableOffsets} is
     *     empty
     */
    private boolean hasNewData(Optional<Offset> availableOffsets, Optional<Offset> committedOffset) {
        if (availableOffsets.equals(committedOffset)) {
            log.debug("source does not have new data, exit. source = {}", source);
            return false;
        }
        Preconditions.checkArgument(availableOffsets.isPresent(), "[BUG] available offsets must be no empty");
        log.debug("source has new data. source = {}, availableOffsets:{}, committedOffset:{}", source, availableOffsets, committedOffset);
        return true;
    }
}

