Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mqtt-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>3.4.0-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-protocol-handler-mqtt</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithPulsarService;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.eclipse.jetty.servlet.ServletHolder;

/**
* MQTT additional servlet.
Expand All @@ -36,8 +35,8 @@ public String getBasePath() {
}

@Override
public ServletHolder getServletHolder() {
return new ServletHolder(new MQTTServiceServlet(pulsarService));
public Object getServletInstance() {
return new MQTTServiceServlet(pulsarService);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion mqtt-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>3.4.0-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-protocol-handler-mqtt-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import io.streamnative.pulsar.handlers.mqtt.common.utils.EventParserUtils;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;

Expand Down
2 changes: 1 addition & 1 deletion mqtt-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>3.4.0-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-protocol-handler-mqtt-proxy</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.streamnative.pulsar.handlers.mqtt.proxy.web;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException;
Expand All @@ -32,26 +31,25 @@
import org.apache.pulsar.broker.web.UnrecognizedPropertyExceptionMapper;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.jetty.metrics.JettyStatisticsCollector;
import org.eclipse.jetty.ee8.servlet.ServletContextHandler;
import org.eclipse.jetty.ee8.servlet.ServletHolder;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.ForwardedRequestCustomizer;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnectionLimit;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.QoSHandler;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.resource.ResourceFactory;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
Expand Down Expand Up @@ -99,7 +97,7 @@ public WebService(MQTTProxyService proxyService) {
config.getHttpServerThreadPoolQueueSize());
this.server = new Server(webServiceExecutor);
if (config.getMaxHttpServerConnections() > 0) {
server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server));
server.addBean(new NetworkConnectionLimit(config.getMaxHttpServerConnections(), server));
}
List<ServerConnector> connectors = new ArrayList<>();

Expand Down Expand Up @@ -176,14 +174,15 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
if (attributeMap != null) {
attributeMap.forEach(servletContextHandler::setAttribute);
}
handlers.add(servletContextHandler);
handlers.add(servletContextHandler.get());
}

public void addStaticResources(String basePath, String resourcePath) {
ContextHandler capHandler = new ContextHandler();
capHandler.setContextPath(basePath);
ResourceHandler resHandler = new ResourceHandler();
resHandler.setBaseResource(Resource.newClassPathResource(resourcePath));
ResourceFactory resourceFactory = ResourceFactory.root();
resHandler.setBaseResource(resourceFactory.newClassLoaderResource(resourcePath, true));
resHandler.setEtags(true);
resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL);
capHandler.setHandler(resHandler);
Expand All @@ -192,19 +191,15 @@ public void addStaticResources(String basePath, String resourcePath) {

public void start() throws MQTTProxyException {
try {
RequestLogHandler requestLogHandler = new RequestLogHandler();
RequestLog requestLogger = JettyRequestLogFactory.createRequestLogger(false, server);
requestLogHandler.setRequestLog(requestLogger);
handlers.add(0, new ContextHandlerCollection());
handlers.add(requestLogHandler);
server.setRequestLog(JettyRequestLogFactory.createRequestLogger(false, server));

ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
contexts.setHandlers(handlers);

Handler handlerForContexts = GzipHandlerUtil.wrapWithGzipHandler(contexts,
config.getHttpServerGzipCompressionExcludedPaths());
HandlerCollection handlerCollection = new HandlerCollection();
handlerCollection.setHandlers(new Handler[] {handlerForContexts, new DefaultHandler(), requestLogHandler});
Handler.Collection handlerCollection = new Handler.Sequence();
handlerCollection.setHandlers(handlerForContexts, new DefaultHandler());

// Metrics handler
StatisticsHandler stats = new StatisticsHandler();
Expand All @@ -216,7 +211,14 @@ public void start() throws MQTTProxyException {
// Already registered. Eg: in unit tests
}

server.setHandler(stats);
Handler serverHandler = stats;
if (config.getMaxConcurrentHttpRequests() > 0) {
QoSHandler qoSHandler = new QoSHandler(serverHandler);
qoSHandler.setMaxRequestCount(config.getMaxConcurrentHttpRequests());
serverHandler = qoSHandler;
}
server.setHandler(serverHandler);

server.start();

if (httpConnector != null) {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<version>3.4.0-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
<name>StreamNative :: Pulsar Protocol Handler :: MoP Parent</name>
<description>Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler.</description>

Expand Down
2 changes: 1 addition & 1 deletion tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>3.4.0-SNAPSHOT</version>
<version>4.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pulsar-protocol-handler-mqtt-tests</artifactId>
Expand Down
Loading