package com.sun.messaging.jmq.jmsserver.multibroker;

import com.sun.messaging.jmq.io.SysMessageID;
import com.sun.messaging.jmq.jmsserver.Globals;
import com.sun.messaging.jmq.jmsserver.core.Consumer;
import com.sun.messaging.jmq.jmsserver.core.Destination;
import com.sun.messaging.jmq.jmsserver.core.DestinationUID;
import com.sun.messaging.jmq.jmsserver.core.PacketReference;
import com.sun.messaging.jmq.jmsserver.core.Subscription;
import com.sun.messaging.jmq.jmsserver.service.ConnectionUID;
import com.sun.messaging.jmq.jmsserver.util.BrokerException;
import com.sun.messaging.jmq.jmsserver.util.MQThread;
import com.sun.messaging.jmq.jmsserver.util.lists.RemoveReason;
import com.sun.messaging.jmq.transport.httptunnel.HttpTunnelDefaults;
import com.sun.messaging.jmq.util.lists.EventListener;
import com.sun.messaging.jmq.util.lists.EventType;
import com.sun.messaging.jmq.util.lists.Reason;
import com.sun.messaging.jmq.util.log.Logger;
import com.sun.messaging.jmq.util.selector.SelectorFormatException;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:119132-04/SUNWiqu/reloc/usr/share/lib/imq/imqbroker.jar:com/sun/messaging/jmq/jmsserver/multibroker/BrokerConsumers.class */
public class BrokerConsumers implements Runnable, ClusterConnection, EventListener {
    Protocol protocol;
    public static int BTOBFLOW;
    static Class class$com$sun$messaging$jmq$jmsserver$multibroker$BrokerConsumers;
    static final boolean $assertionsDisabled;
    Thread thr = null;
    Logger logger = Globals.getLogger();
    boolean valid = true;
    Set activeConsumers = Collections.synchronizedSet(new HashSet());
    Map consumers = Collections.synchronizedMap(new HashMap());
    Map listeners = Collections.synchronizedMap(new HashMap());
    Map deliveredMessages = new LinkedHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:119132-04/SUNWiqu/reloc/usr/share/lib/imq/imqbroker.jar:com/sun/messaging/jmq/jmsserver/multibroker/BrokerConsumers$ackEntry.class */
    public class ackEntry {
        com.sun.messaging.jmq.jmsserver.core.ConsumerUID uid;
        com.sun.messaging.jmq.jmsserver.core.ConsumerUID storedcid;
        WeakReference pref;
        SysMessageID id;
        com.sun.messaging.jmq.jmsserver.core.BrokerAddress address;
        static final boolean $assertionsDisabled;
        private final BrokerConsumers this$0;

        public ackEntry(BrokerConsumers brokerConsumers, SysMessageID sysMessageID, com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID, com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress) {
            this.this$0 = brokerConsumers;
            this.uid = null;
            this.storedcid = null;
            this.pref = null;
            this.id = null;
            this.address = null;
            if (!$assertionsDisabled && sysMessageID == null) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && consumerUID == null) {
                throw new AssertionError();
            }
            this.id = sysMessageID;
            this.uid = consumerUID;
            this.address = brokerAddress;
            this.pref = null;
        }

        public com.sun.messaging.jmq.jmsserver.core.BrokerAddress getBrokerAddress() {
            return this.address;
        }

        public com.sun.messaging.jmq.jmsserver.core.ConsumerUID getConsumerUID() {
            return this.uid;
        }

        public SysMessageID getSysMessageID() {
            return this.id;
        }

        public PacketReference getReference() {
            return (PacketReference) this.pref.get();
        }

        public ackEntry(BrokerConsumers brokerConsumers, PacketReference packetReference, com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID, com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID2) {
            this.this$0 = brokerConsumers;
            this.uid = null;
            this.storedcid = null;
            this.pref = null;
            this.id = null;
            this.address = null;
            this.pref = new WeakReference(packetReference);
            this.id = packetReference.getSysMessageID();
            this.storedcid = consumerUID2;
            this.uid = consumerUID;
        }

        public void acknowledged(boolean z) {
            if (!$assertionsDisabled && this.pref == null) {
                throw new AssertionError();
            }
            PacketReference packetReference = (PacketReference) this.pref.get();
            if (packetReference == null) {
                try {
                    packetReference = Destination.get(this.id);
                } catch (Exception e) {
                    this.this$0.logger.logStack(4, "Unable to process acknowledgement, Ignoring", e);
                    if (!$assertionsDisabled) {
                        throw new AssertionError(packetReference);
                    }
                    return;
                }
            }
            if (packetReference == null) {
                return;
            }
            if (packetReference.acknowledged(this.uid, this.storedcid, !this.uid.isDupsOK(), z)) {
                Destination.getDestination(packetReference.getDestinationUID()).removeMessage(packetReference.getSysMessageID(), RemoveReason.ACKNOWLEDGED);
            }
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof ackEntry)) {
                return false;
            }
            ackEntry ackentry = (ackEntry) obj;
            return this.uid.equals(ackentry.uid) && this.id.equals(ackentry.id);
        }

        public int hashCode() {
            return (this.id.hashCode() * 15) + this.uid.hashCode();
        }

        static {
            Class cls;
            if (BrokerConsumers.class$com$sun$messaging$jmq$jmsserver$multibroker$BrokerConsumers == null) {
                cls = BrokerConsumers.class$("com.sun.messaging.jmq.jmsserver.multibroker.BrokerConsumers");
                BrokerConsumers.class$com$sun$messaging$jmq$jmsserver$multibroker$BrokerConsumers = cls;
            } else {
                cls = BrokerConsumers.class$com$sun$messaging$jmq$jmsserver$multibroker$BrokerConsumers;
            }
            $assertionsDisabled = !cls.desiredAssertionStatus();
        }
    }

    public BrokerConsumers(Protocol protocol) {
        this.protocol = null;
        this.protocol = protocol;
        MQThread mQThread = new MQThread(this, "Broker Monitor");
        mQThread.setDaemon(true);
        mQThread.start();
    }

    public void destroy() {
        this.valid = false;
        synchronized (this.activeConsumers) {
            this.activeConsumers.notify();
        }
    }

    @Override // com.sun.messaging.jmq.util.lists.EventListener
    public void eventOccured(EventType eventType, Reason reason, Object obj, Object obj2, Object obj3, Object obj4) {
        if (eventType != EventType.BUSY_STATE_CHANGED && !$assertionsDisabled) {
            throw new AssertionError();
        }
        Consumer consumer = (Consumer) obj;
        synchronized (this.activeConsumers) {
            if (consumer.isBusy()) {
                this.activeConsumers.add(consumer);
            }
            this.activeConsumers.notify();
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.multibroker.ClusterConnection
    public void removeConsumers(com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress) throws BrokerException {
        HashSet hashSet = new HashSet();
        synchronized (this.consumers) {
            for (com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID : this.consumers.keySet()) {
                if (brokerAddress.equals(consumerUID.getBrokerAddress())) {
                    hashSet.add(consumerUID);
                }
            }
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            removeConsumer((com.sun.messaging.jmq.jmsserver.core.ConsumerUID) it.next());
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.multibroker.ClusterConnection
    public void sendMessagesToRemote(PacketReference packetReference, Collection collection) {
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            Consumer consumer = (Consumer) it.next();
            com.sun.messaging.jmq.jmsserver.core.ConsumerUID storedConsumerUID = consumer.getStoredConsumerUID();
            if (!consumer.getConsumerUID().isNoAck()) {
                ackEntry ackentry = new ackEntry(this, packetReference, consumer.getConsumerUID(), storedConsumerUID);
                synchronized (this.deliveredMessages) {
                    this.deliveredMessages.put(ackentry, ackentry);
                }
            }
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.multibroker.ClusterConnection
    public void removeConsumers(ConnectionUID connectionUID) throws BrokerException {
        HashSet hashSet = new HashSet();
        synchronized (this.consumers) {
            for (com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID : this.consumers.keySet()) {
                if (connectionUID.equals(consumerUID.getConnectionUID())) {
                    hashSet.add(consumerUID);
                }
            }
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            removeConsumer((com.sun.messaging.jmq.jmsserver.core.ConsumerUID) it.next());
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.multibroker.ClusterConnection
    public void removeConsumer(com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID) throws BrokerException {
        Consumer consumer = (Consumer) this.consumers.remove(consumerUID);
        if (consumer == null) {
            return;
        }
        consumer.pause("MultiBroker - removing consumer");
        Destination destination = Destination.getDestination(consumer.getDestinationUID());
        Object remove = this.listeners.remove(consumerUID);
        if (remove != null) {
            consumer.removeEventListener(remove);
        }
        this.activeConsumers.remove(consumer);
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        synchronized (this.deliveredMessages) {
            Iterator it = this.deliveredMessages.values().iterator();
            while (it.hasNext()) {
                ackEntry ackentry = (ackEntry) it.next();
                if (ackentry.getConsumerUID() == consumerUID) {
                    it.remove();
                    if (consumer.isFalconRemote()) {
                        ackentry.acknowledged(false);
                    } else {
                        linkedHashSet.add(ackentry.getReference());
                    }
                }
            }
        }
        consumer.destroyConsumer(linkedHashSet, false, false);
        if (destination != null) {
            destination.removeConsumer(consumerUID, false);
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.multibroker.ClusterConnection
    public void acknowledgeMessageFromRemote(SysMessageID sysMessageID, com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID) {
        ackEntry ackentry = new ackEntry(this, sysMessageID, consumerUID, (com.sun.messaging.jmq.jmsserver.core.BrokerAddress) null);
        synchronized (this.deliveredMessages) {
            ackEntry ackentry2 = (ackEntry) this.deliveredMessages.remove(ackentry);
            if (ackentry2 == null) {
                return;
            }
            ackentry2.acknowledged(false);
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.multibroker.ClusterConnection
    public void addConsumer(Consumer consumer) throws BrokerException {
        com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID = consumer.getConsumerUID();
        if (!(consumer instanceof Subscription)) {
            this.consumers.put(consumerUID, consumer);
            this.listeners.put(consumerUID, consumer.addEventListener(this, EventType.BUSY_STATE_CHANGED, null));
        }
        DestinationUID destinationUID = consumer.getDestinationUID();
        int i = destinationUID.isQueue() ? 1 : 2;
        Destination destination = null;
        for (int i2 = 0; i2 < 2 && destination == null; i2++) {
            try {
                destination = Destination.getDestination(destinationUID.getName(), i, true, true);
                if (destination != null) {
                    try {
                        destination.incrementRefCount();
                        break;
                    } catch (BrokerException e) {
                        destination = null;
                    }
                }
            } catch (IOException e2) {
                throw new BrokerException(new StringBuffer().append("Unable to autocreate destination ").append(destinationUID).toString(), e2);
            }
        }
        if (destination == null) {
            throw new BrokerException(new StringBuffer().append("Unable to attach to destination ").append(destinationUID).toString());
        }
        try {
            if (consumer.getDestinationUID().isQueue() || (consumer instanceof Subscription) || consumer.getSubscription() != null) {
                int i3 = 100;
                if (destination != null) {
                    i3 = destination.getMaxPrefetch();
                } else {
                    this.logger.log(8, new StringBuffer().append("Internal Error: Unknown destination for consumer ").append(consumer.getDestinationUID()).append(" can not determine prefetch ").toString());
                }
                if (i3 > BTOBFLOW) {
                    i3 = BTOBFLOW;
                }
                consumer.setPrefetch(i3);
            } else {
                consumer.setFalconRemote(true);
            }
            if (consumer.getSubscription() == null) {
                destination.addConsumer(consumer, false);
            }
            if (!(consumer instanceof Subscription) && consumer.isBusy()) {
                synchronized (this.activeConsumers) {
                    this.activeConsumers.add(consumer);
                    this.activeConsumers.notify();
                }
            }
        } catch (SelectorFormatException e3) {
            StringBuffer append = new StringBuffer().append("unable to add destination ");
            throw new BrokerException(append.append(append).toString(), e3);
        } finally {
            destination.decrementRefCount();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        PacketReference andFillNextPacket;
        while (this.valid) {
            Consumer consumer = null;
            synchronized (this.activeConsumers) {
                while (this.valid && this.activeConsumers.isEmpty()) {
                    try {
                        this.activeConsumers.wait();
                    } catch (InterruptedException e) {
                    }
                }
                if (this.valid) {
                    Iterator it = this.activeConsumers.iterator();
                    consumer = (Consumer) it.next();
                    it.remove();
                    if (consumer.isBusy()) {
                        this.activeConsumers.add(consumer);
                    }
                }
            }
            if (consumer != null && (andFillNextPacket = consumer.getAndFillNextPacket(null)) != null) {
                HashSet hashSet = new HashSet();
                hashSet.add(consumer);
                boolean z = andFillNextPacket.getMessageDeliveredAck(consumer.getConsumerUID()) || consumer.isPaused();
                if (!consumer.getConsumerUID().isNoAck()) {
                    ackEntry ackentry = new ackEntry(this, andFillNextPacket, consumer.getConsumerUID(), consumer.getStoredConsumerUID());
                    synchronized (this.deliveredMessages) {
                        this.deliveredMessages.put(ackentry, ackentry);
                    }
                }
                this.protocol.sendMessage(andFillNextPacket, hashSet, z);
            }
        }
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$com$sun$messaging$jmq$jmsserver$multibroker$BrokerConsumers == null) {
            cls = class$("com.sun.messaging.jmq.jmsserver.multibroker.BrokerConsumers");
            class$com$sun$messaging$jmq$jmsserver$multibroker$BrokerConsumers = cls;
        } else {
            cls = class$com$sun$messaging$jmq$jmsserver$multibroker$BrokerConsumers;
        }
        $assertionsDisabled = !cls.desiredAssertionStatus();
        BTOBFLOW = Globals.getConfig().getIntProperty("imq.cluster.consumerFlowLimit", HttpTunnelDefaults.MIN_RETRANSMIT_PERIOD);
    }
}
