/*
 * Decompiled with CFR 0.152.
 */
package org.ojalgo.data.batch;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import org.ojalgo.concurrent.Parallelism;
import org.ojalgo.concurrent.ProcessingService;
import org.ojalgo.function.special.PowerOf2;
import org.ojalgo.netio.DataInterpreter;
import org.ojalgo.netio.DataReader;
import org.ojalgo.netio.DataWriter;
import org.ojalgo.netio.FromFileReader;
import org.ojalgo.netio.ShardedFile;
import org.ojalgo.netio.ToFileWriter;
import org.ojalgo.type.function.AutoConsumer;
import org.ojalgo.type.function.AutoSupplier;
import org.ojalgo.type.function.TwoStepMapper;
import org.ojalgo.type.management.MBeanUtils;
import org.ojalgo.type.management.Throughput;

public final class BatchNode<T> {
    private static final Consumer<Boolean> DUMMY = b -> {};
    private final ToIntFunction<T> myDistributor;
    private final DataInterpreter<T> myInterpreter;
    private final IntSupplier myParallelism;
    private final ProcessingService myProcessor;
    private final int myQueueCapacity;
    private transient Function<File, AutoSupplier<T>> myReaderFactory = null;
    private final Throughput myReaderManager;
    private final ShardedFile myShards;
    private final Throughput myWriterManger;

    public static <T> Builder<T> newBuilder(File directory, DataInterpreter<T> interpreter) {
        return new Builder<T>(directory, interpreter);
    }

    public static <T> BatchNode<T> newInstance(File directory, DataInterpreter<T> interpreter) {
        return BatchNode.newBuilder(directory, interpreter).build();
    }

    BatchNode(Builder<T> builder) {
        this.myShards = builder.getShardedFile();
        this.myParallelism = builder.getParallelism();
        this.myInterpreter = builder.getInterpreter();
        this.myDistributor = builder.getDistributor();
        this.myProcessor = builder.getProcessor();
        this.myQueueCapacity = builder.getQueueCapacity();
        this.myWriterManger = new Throughput();
        this.myReaderManager = new Throughput();
        String name = builder.getName();
        MBeanUtils.register(this.myWriterManger, name + "-Writer");
        MBeanUtils.register(this.myReaderManager, name + "-Reader");
    }

    public void dispose() {
        this.myShards.delete();
    }

    public AutoConsumer<T> newWriter() {
        return ((ToFileWriter.Builder)((ToFileWriter.Builder)((ToFileWriter.Builder)ToFileWriter.newBuilder(this.myShards).queue(this.myQueueCapacity)).parallelism(this.myParallelism)).statistics(this.myWriterManger)).build(this.myDistributor, shard -> DataWriter.of(shard, this.myInterpreter));
    }

    public void processAll(Consumer<T> consumer) {
        this.myProcessor.process(this.myShards.files(), this.myParallelism, (W shard) -> this.process((File)shard, consumer));
    }

    public void processAll(Supplier<Consumer<T>> consumerFactory) {
        this.processMapped(() -> new TwoStepWrapper(consumerFactory), DUMMY);
    }

    public <H> void processMapped(Supplier<TwoStepMapper<T, H>> mapper, Consumer<H> consumer) {
        ThreadLocal threadLocal = ThreadLocal.withInitial(mapper);
        this.myProcessor.process(this.myShards.files(), this.myParallelism, (W shard) -> this.process((File)shard, threadLocal::get, consumer));
    }

    public <R> R reduceMapped(Supplier<TwoStepMapper<T, R>> mapper) {
        TwoStepMapper<T, R> totalResults = mapper.get();
        this.processMapped(mapper, totalResults::merge);
        return totalResults.getResults();
    }

    private Function<File, AutoSupplier<T>> getReaderFactory() {
        if (this.myReaderFactory == null) {
            Function<File, DataReader> baseReader = file -> DataReader.of(file, this.myInterpreter);
            this.myReaderFactory = file -> ((FromFileReader.Builder)((FromFileReader.Builder)((FromFileReader.Builder)FromFileReader.newBuilder(file).parallelism(1)).queue(this.myQueueCapacity / this.myParallelism.getAsInt())).statistics(this.myReaderManager)).build(baseReader);
        }
        return this.myReaderFactory;
    }

    private AutoSupplier<T> newReader(File file) {
        return this.getReaderFactory().apply(file);
    }

    private void process(File shard, Consumer<T> consumer) {
        try (AutoSupplier<T> reader = this.newReader(shard);){
            Object item = null;
            while (true) {
                T t = reader.read();
                item = t;
                if (t != null) {
                    consumer.accept(item);
                    continue;
                }
                break;
            }
        }
        catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    private <G> void process(File shard, Supplier<TwoStepMapper<T, G>> aggregatorSupplier, Consumer<G> consumer) {
        TwoStepMapper<T, G> aggregator = aggregatorSupplier.get();
        try (AutoSupplier<T> reader = this.newReader(shard);){
            Object item = null;
            while (true) {
                T t = reader.read();
                item = t;
                if (t == null) break;
                aggregator.consume(item);
            }
            consumer.accept(aggregator.getResults());
            aggregator.reset();
        }
        catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    private static final class TwoStepWrapper<T>
    implements TwoStepMapper<T, Boolean> {
        private final Consumer<T> myActualConsumer;

        TwoStepWrapper(Supplier<Consumer<T>> consumerFactory) {
            this.myActualConsumer = consumerFactory.get();
        }

        @Override
        public void consume(T item) {
            this.myActualConsumer.accept(item);
        }

        @Override
        public Boolean getResults() {
            return Boolean.TRUE;
        }

        @Override
        public void merge(Boolean aggregate) {
        }

        @Override
        public void reset() {
        }
    }

    public static final class Builder<T> {
        private final File myDirectory;
        private ToIntFunction<T> myDistributor = obj -> ThreadLocalRandom.current().nextInt();
        private ExecutorService myExecutor = null;
        private int myFragmentation = 64;
        private final DataInterpreter<T> myInterpreter;
        private int myParallelism = Parallelism.CORES.getAsInt();
        private int myQueueCapacity = 1024;

        Builder(File directory, DataInterpreter<T> interpreter) {
            this.myDirectory = directory;
            this.myInterpreter = interpreter;
        }

        public BatchNode<T> build() {
            return new BatchNode(this);
        }

        public Builder<T> distributor(ToIntFunction<T> distributor) {
            this.myDistributor = distributor;
            return this;
        }

        public Builder<T> executor(ExecutorService executor) {
            this.myExecutor = executor;
            return this;
        }

        public Builder<T> fragmentation(int fragmentation) {
            this.myFragmentation = fragmentation;
            return this;
        }

        public Builder<T> parallelism(int parallelism) {
            this.myParallelism = parallelism;
            return this;
        }

        public Builder<T> parallelism(IntSupplier parallelism) {
            return this.parallelism(parallelism.getAsInt());
        }

        public Builder<T> queue(int capacity) {
            this.myQueueCapacity = capacity;
            return this;
        }

        ToIntFunction<T> getDistributor() {
            return this.myDistributor;
        }

        int getFragmentation() {
            int parallelism = this.getParallelism().getAsInt();
            int factor = PowerOf2.adjustUp(Math.max(parallelism, this.myFragmentation) / parallelism);
            return factor * parallelism;
        }

        DataInterpreter<T> getInterpreter() {
            return this.myInterpreter;
        }

        String getName() {
            return "BatchNode-" + this.myDirectory.getName();
        }

        IntSupplier getParallelism() {
            return () -> PowerOf2.adjustDown(Math.min(this.myParallelism, this.myFragmentation));
        }

        ProcessingService getProcessor() {
            if (this.myExecutor != null) {
                return new ProcessingService(this.myExecutor);
            }
            return ProcessingService.newInstance(this.getName());
        }

        int getQueueCapacity() {
            return this.myQueueCapacity;
        }

        ShardedFile getShardedFile() {
            return ShardedFile.of(this.myDirectory, "Shard.data", this.getFragmentation());
        }
    }
}

