package com.sun.messaging.jmq.jmsserver.core;

import com.sun.messaging.jmq.io.DestMetricsCounters;
import com.sun.messaging.jmq.io.SysMessageID;
import com.sun.messaging.jmq.jmsserver.Globals;
import com.sun.messaging.jmq.jmsserver.service.ConnectionUID;
import com.sun.messaging.jmq.jmsserver.util.BrokerException;
import com.sun.messaging.jmq.jmsserver.util.FeatureUnavailableException;
import com.sun.messaging.jmq.jmsserver.util.lists.RemoveReason;
import com.sun.messaging.jmq.transport.httptunnel.HttpTunnelDefaults;
import com.sun.messaging.jmq.util.lists.EventType;
import com.sun.messaging.jmq.util.lists.Reason;
import com.sun.messaging.jmq.util.selector.Selector;
import com.sun.messaging.jmq.util.selector.SelectorFormatException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;

/* loaded from: input_file:119133-01/SUNWiqu/reloc/usr/share/lib/imq/imqbroker.jar:com/sun/messaging/jmq/jmsserver/core/Topic.class */
public class Topic extends Destination {
    static final long serialVersionUID = -5748515630523651753L;
    private transient Map selectorToInterest;
    private transient List selectors;
    private transient Map remoteConsumers;
    private boolean hasNoLocalConsumers;
    int maxSharedConsumers;
    int sharedPrefetch;
    public static final String MAX_SHARE_CONSUMERS = "max_shared_consumers";
    public static final String SHARED_PREFETCH = "sharedPrefetch";
    private static int TOPIC_DEFAULT_PREFETCH = Globals.getConfig().getIntProperty("imq.autocreate.topic.consumerFlowLimit", HttpTunnelDefaults.MIN_RETRANSMIT_PERIOD);
    public static int AUTO_MAX_SHARED_CONSUMER_LIMIT = Globals.getConfig().getIntProperty("imq.autocreate.topic.maxNumSharedConsumers", -1);
    public static int AUTO_MAX_SHARED_FLOW_LIMIT = Globals.getConfig().getIntProperty("imq.autocreate.topic.sharedConsumerFlowLimit", 5);
    public static int ADMIN_MAX_SHARED_CONSUMER_LIMIT = Globals.getConfig().getIntProperty("imq.admincreate.topic.maxNumSharedConsumers", -1);
    public static int ADMIN_MAX_SHARED_FLOW_LIMIT = Globals.getConfig().getIntProperty("imq.admincreate.topic.sharedConsumerFlowLimit", 5);

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public Hashtable getDebugState() {
        Hashtable debugState = super.getDebugState();
        Hashtable hashtable = new Hashtable();
        synchronized (this.selectorToInterest) {
            for (Selector selector : this.selectorToInterest.keySet()) {
                Set set = (Set) this.selectorToInterest.get(selector);
                Vector vector = new Vector();
                synchronized (set) {
                    Iterator it = set.iterator();
                    while (it.hasNext()) {
                        vector.add(String.valueOf(((Consumer) it.next()).getConsumerUID().longValue()));
                    }
                }
                hashtable.put(selector == null ? "no selector" : selector.toString(), vector);
            }
        }
        debugState.put("selectorInfo", hashtable);
        debugState.put(MAX_SHARE_CONSUMERS, new Integer(this.maxSharedConsumers));
        debugState.put(SHARED_PREFETCH, new Integer(this.sharedPrefetch));
        return debugState;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Topic(DestinationUID destinationUID) {
        super(destinationUID);
        this.selectorToInterest = null;
        this.selectors = null;
        this.remoteConsumers = null;
        this.hasNoLocalConsumers = false;
        this.maxSharedConsumers = 0;
        this.sharedPrefetch = 0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Topic(String str, int i, boolean z, ConnectionUID connectionUID, boolean z2) throws FeatureUnavailableException, BrokerException, IOException {
        super(str, i, z, connectionUID, z2);
        this.selectorToInterest = null;
        this.selectors = null;
        this.remoteConsumers = null;
        this.hasNoLocalConsumers = false;
        this.maxSharedConsumers = 0;
        this.sharedPrefetch = 0;
        this.maxPrefetch = TOPIC_DEFAULT_PREFETCH;
        if (z2) {
            this.maxSharedConsumers = AUTO_MAX_SHARED_CONSUMER_LIMIT;
            this.sharedPrefetch = AUTO_MAX_SHARED_FLOW_LIMIT;
        } else {
            this.maxSharedConsumers = ADMIN_MAX_SHARED_CONSUMER_LIMIT;
            this.sharedPrefetch = ADMIN_MAX_SHARED_FLOW_LIMIT;
        }
        this.selectorToInterest = new HashMap();
        this.selectors = new ArrayList();
        this.remoteConsumers = new HashMap();
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public int getUnackSize() {
        HashSet<PacketReference> hashSet;
        synchronized (this.destMessages) {
            hashSet = new HashSet(this.destMessages.values());
        }
        int i = 0;
        for (PacketReference packetReference : hashSet) {
            if (!packetReference.isInvalid() && !packetReference.isDestroyed() && packetReference.getDeliverCnt() - packetReference.getCompleteCnt() > 0) {
                i++;
            }
        }
        return i;
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public Set routeAndMoveMessage(PacketReference packetReference, PacketReference packetReference2) throws IOException, BrokerException {
        throw new RuntimeException("XXX not implemented");
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public boolean queueMessage(PacketReference packetReference, boolean z) throws BrokerException {
        if (z || this.consumers.size() != 0) {
            return super.queueMessage(packetReference, z);
        }
        return false;
    }

    private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
        objectInputStream.defaultReadObject();
        this.selectorToInterest = new HashMap();
        this.selectors = new ArrayList();
        this.remoteConsumers = new HashMap();
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination, com.sun.messaging.jmq.util.lists.EventListener
    public void eventOccured(EventType eventType, Reason reason, Object obj, Object obj2, Object obj3, Object obj4) {
        super.eventOccured(eventType, reason, obj, obj2, obj3, obj4);
    }

    public void routeNewMessage(SysMessageID sysMessageID) throws BrokerException, SelectorFormatException {
        PacketReference packetReference = (PacketReference) this.destMessages.get(sysMessageID);
        forwardMessage(routeNewMessage(packetReference), packetReference);
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public int getConsumerCount() {
        int consumerCount = super.getConsumerCount();
        synchronized (this.remoteConsumers) {
            Iterator it = this.remoteConsumers.values().iterator();
            while (it.hasNext()) {
                consumerCount += ((RemoteConsumer) it.next()).getConsumerCount();
            }
        }
        return consumerCount;
    }

    private Set matchRemoteConsumers(PacketReference packetReference, Set set) throws BrokerException, SelectorFormatException {
        synchronized (this.remoteConsumers) {
            if (this.remoteConsumers.isEmpty()) {
                return set;
            }
            HashSet hashSet = new HashSet(set);
            for (RemoteConsumer remoteConsumer : this.remoteConsumers.values()) {
                if (remoteConsumer.match(packetReference, hashSet)) {
                    set.add(remoteConsumer);
                }
            }
            return hashSet;
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    protected ConsumerUID[] routeLoadedTransactionMessage(PacketReference packetReference) throws BrokerException, SelectorFormatException {
        HashSet hashSet = new HashSet();
        Map map = null;
        HashMap hashMap = null;
        synchronized (this.selectorToInterest) {
            for (Selector selector : this.selectorToInterest.keySet()) {
                if (selector != null) {
                    if (map == null && selector.usesProperties()) {
                        try {
                            map = packetReference.getProperties();
                        } catch (ClassNotFoundException e) {
                            logger.logStack(32, "INTERNAL ERROR", e);
                            map = new HashMap();
                        }
                    }
                    if (hashMap == null && selector.usesFields()) {
                        hashMap = packetReference.getHeaders();
                    }
                }
                if (selector == null || selector.match(map, hashMap)) {
                    if (DEBUG) {
                        logger.log(8, new StringBuffer().append("Selector ").append(selector).append(" Matches ").append(packetReference.getSysMessageID()).toString());
                    }
                    Set set = (Set) this.selectorToInterest.get(selector);
                    if (set != null) {
                        synchronized (set) {
                            hashSet.addAll(set);
                        }
                    }
                }
            }
        }
        HashSet hashSet2 = new HashSet();
        Iterator it = hashSet.iterator();
        ConnectionUID producingConnectionUID = packetReference.getProducingConnectionUID();
        String clientID = packetReference.getClientID();
        while (it.hasNext()) {
            Consumer consumer = (Consumer) it.next();
            if (!consumer.getNoLocal()) {
                hashSet2.add(consumer.getConsumerUID());
            } else if ((consumer instanceof Subscription) && clientID != null && ((Subscription) consumer).getClientID() != null && ((Subscription) consumer).getClientID().equals(clientID)) {
                it.remove();
            } else if (consumer.getConsumerUID().getConnectionUID() == producingConnectionUID) {
                it.remove();
            } else {
                hashSet2.add(consumer.getConsumerUID());
            }
        }
        return (ConsumerUID[]) hashSet2.toArray(new ConsumerUID[0]);
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public Set routeNewMessage(PacketReference packetReference) throws BrokerException, SelectorFormatException {
        Set set;
        HashSet hashSet = new HashSet();
        Map map = null;
        HashMap hashMap = null;
        for (int i = 0; i < this.selectors.size(); i++) {
            try {
                Selector selector = (Selector) this.selectors.get(i);
                if (selector == null) {
                    Set set2 = (Set) this.selectorToInterest.get(selector);
                    if (set2 == null) {
                        continue;
                    } else {
                        synchronized (set2) {
                            hashSet.addAll(set2);
                        }
                    }
                } else {
                    if (map == null && selector.usesProperties()) {
                        try {
                            map = packetReference.getProperties();
                        } catch (ClassNotFoundException e) {
                            logger.logStack(32, "INTERNAL ERROR", e);
                            map = new HashMap();
                        }
                    }
                    if (hashMap == null && selector.usesFields()) {
                        hashMap = packetReference.getHeaders();
                    }
                    if (selector.match(map, hashMap) && (set = (Set) this.selectorToInterest.get(selector)) != null) {
                        synchronized (set) {
                            hashSet.addAll(set);
                        }
                    }
                }
            } catch (Exception e2) {
            }
        }
        if (this.hasNoLocalConsumers) {
            Iterator it = hashSet.iterator();
            ConnectionUID producingConnectionUID = packetReference.getProducingConnectionUID();
            String clientID = packetReference.getClientID();
            while (it.hasNext()) {
                Consumer consumer = (Consumer) it.next();
                if (consumer.getNoLocal()) {
                    if ((consumer instanceof Subscription) && clientID != null && ((Subscription) consumer).getClientID() != null && ((Subscription) consumer).getClientID().equals(clientID)) {
                        it.remove();
                    } else if (consumer.getConsumerUID().getConnectionUID() == producingConnectionUID) {
                        it.remove();
                    }
                }
            }
        }
        if (hashSet.isEmpty()) {
            removeMessage(packetReference.getSysMessageID(), RemoveReason.ACKNOWLEDGED);
            return null;
        }
        packetReference.store(hashSet);
        return hashSet;
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public void forwardOrphanMessage(PacketReference packetReference, ConsumerUID consumerUID) throws BrokerException {
        Consumer consumer = getConsumer(consumerUID);
        if (consumer == null) {
            logger.log(4, new StringBuffer().append("Dumping orphan message ").append(packetReference).toString());
            try {
                if (packetReference.acknowledged(consumerUID, consumerUID, false, false)) {
                    removeMessage(packetReference.getSysMessageID(), RemoveReason.ACKNOWLEDGED);
                }
            } catch (Exception e) {
                logger.logStack(4, "Error forwarding orphan", e);
            }
        }
        HashSet hashSet = new HashSet();
        hashSet.add(consumer);
        forwardMessage(hashSet, packetReference);
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public void forwardMessage(Set set, PacketReference packetReference) throws BrokerException {
        HashSet hashSet = null;
        if (set == null || set.isEmpty()) {
            removeMessage(packetReference.getSysMessageID(), RemoveReason.ACKNOWLEDGED);
        } else {
            Iterator it = set.iterator();
            while (it.hasNext()) {
                Consumer consumer = (Consumer) it.next();
                if (consumer.isFalconRemote()) {
                    if (hashSet == null) {
                        hashSet = new HashSet();
                    }
                    hashSet.add(consumer);
                } else if (!consumer.routeMessage(packetReference, false)) {
                    try {
                        ConsumerUID consumerUID = consumer.getConsumerUID();
                        if (packetReference.acknowledged(consumerUID, consumer.getStoredConsumerUID(), !consumerUID.isUnsafeAck(), true)) {
                            removeMessage(packetReference.getSysMessageID(), null);
                        }
                    } catch (IOException e) {
                    }
                }
            }
        }
        if (hashSet == null || hashSet.isEmpty()) {
            return;
        }
        Globals.getClusterBroadcast().forwardMessage(packetReference, hashSet);
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public Consumer addConsumer(Consumer consumer, boolean z) throws BrokerException, SelectorFormatException {
        Set set;
        if ((consumer instanceof Subscription) && this.consumers.get(consumer.getConsumerUID()) != null) {
            return null;
        }
        super.addConsumer(consumer, z);
        this.hasNoLocalConsumers |= consumer.getNoLocal();
        Selector selector = consumer.getSelector();
        synchronized (this.selectorToInterest) {
            set = (Set) this.selectorToInterest.get(selector);
            if (set == null) {
                set = new HashSet();
                this.selectorToInterest.put(selector, set);
                this.selectors.add(selector);
            }
        }
        synchronized (set) {
            set.add(consumer);
        }
        return null;
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public void removeConsumer(ConsumerUID consumerUID, boolean z) throws BrokerException {
        Consumer consumer = (Consumer) this.consumers.get(consumerUID);
        if (consumer == null) {
            return;
        }
        synchronized (this.selectorToInterest) {
            Set set = (Set) this.selectorToInterest.get(consumer.getSelector());
            synchronized (set) {
                set.remove(consumer);
                if (set.isEmpty()) {
                    this.selectorToInterest.remove(consumer.getSelector());
                    this.selectors.remove(consumer.getSelector());
                }
            }
        }
        super.removeConsumer(consumerUID, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public void getDestinationProps(Map map) {
        super.getDestinationProps(map);
        map.put(MAX_SHARE_CONSUMERS, new Integer(this.maxSharedConsumers));
        map.put(SHARED_PREFETCH, new Integer(this.sharedPrefetch));
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public void setDestinationProperties(Map map) throws BrokerException {
        super.setDestinationProperties(map);
        if (map.get(MAX_SHARE_CONSUMERS) != null) {
            try {
                setMaxSharedConsumers(((Integer) map.get(MAX_SHARE_CONSUMERS)).intValue());
            } catch (Exception e) {
                logger.log(8, "Internal Error ", (Throwable) e);
            }
        }
        if (map.get(SHARED_PREFETCH) != null) {
            try {
                setSharedFlowLimit(((Integer) map.get(SHARED_PREFETCH)).intValue());
            } catch (Exception e2) {
                logger.log(8, "Internal Error ", (Throwable) e2);
            }
        }
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public DestMetricsCounters getMetrics() {
        return super.getMetrics();
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public void setMaxSharedConsumers(int i) {
        this.maxSharedConsumers = i;
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public void setSharedFlowLimit(int i) {
        this.sharedPrefetch = i;
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public int getMaxNumSharedConsumers() {
        return this.maxSharedConsumers;
    }

    @Override // com.sun.messaging.jmq.jmsserver.core.Destination
    public int getSharedConsumerFlowLimit() {
        return this.sharedPrefetch;
    }
}
