import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.List; import org.apache.commons.httpclient.Credentials; import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.NTCredentials; import org.apache.commons.httpclient.SimpleHttpConnectionManager; import org.apache.commons.httpclient.auth.AuthScope; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.params.HttpConnectionManagerParams; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.commons.logging.Log; import com.oaklandsw.http.AutomaticHttpRetryException; import com.oaklandsw.http.Callback; import com.oaklandsw.http.NtlmCredential; import com.oaklandsw.http.ntlm.Ntlm; import com.oaklandsw.util.LogUtils; import com.oaklandsw.util.Util; /** * HTTP client performance test example */ public class TestPerf { static final Log _log = LogUtils .makeLogger("com.oaklandsw.http.TestPerf"); byte[] _buffer; static final int RETRY_COUNT = 20; int _timesPerThread; int _totalTimes; int _threads = 1; int _repeatTestTimes = 1; int _repeatIndex = 0; int _expectedSize = -1; static final int REQ_NO_QUERY = 0; static final int REQ_DELAY = 1; static final int REQ_ID = 2; int _requestType; String _startTime; // Per thread - to synchronize _actualTimes between the blocking // thread and the writing thread Thread _threadObjects[]; int _actualTimes[]; int _totalActualTimes; int _failed; int _dumpAllInterval; boolean _warmUp = true; // Careful with these, the implementation in PerfComparison test // depends on the order of this array static final int IMP_OAKLAND = 0; static final int IMP_SUN = 1; static final int IMP_JAKARTA = 2; static final int IMP_OAKLAND_PIPE = 3; int _implementation; static final String[] _impNames = { "Oakland", "Sun", "Jakarta", "Oakland" }; boolean _doClose; boolean _quiet; int _pipeDepth; // Use pipelining round-robin connection allocation boolean _pipeRr = true; int _maxConn; int _reqsPerConn; int _actualPipeMaxDepth; int _actualForcedFlushes; int _actualBufferFlushes; int _actualAvoidFlushes; int _actualRetries; int _actualPipelineRetries; // Append these in order to each url int _suffixIndex; String _urlString; boolean _ipv4 = true; boolean _doLog; long _totalTime; float _transTime; HttpConnectionManager _jakartaConnMgr; HttpClient _jakartaClient; boolean _useAuth; String _user = "httptest"; String _password = "httptestpw"; String _host = "repoman"; String _domain = "oaklandsw"; public class PipelineCallback implements Callback { int _threadNum; public PipelineCallback(int threadNum) { _threadNum = threadNum; } public int _responses; public void writeRequest(com.oaklandsw.http.HttpURLConnection urlCon, OutputStream os) { // Used only for a post request, write the data here } public void readResponse(com.oaklandsw.http.HttpURLConnection urlCon, InputStream is) { try { processStream(urlCon.getInputStream()); incrementActualtimes(_threadNum); } catch (AutomaticHttpRetryException arex) { System.out.println("Pipeline automatic retry: " + urlCon); // The read will be redriven return; } catch (IOException e) { System.out.println("ERROR - IOException: " + urlCon); e.printStackTrace(); } } public void error(com.oaklandsw.http.HttpURLConnection urlCon, InputStream is, Exception ex) { try { System.err.println("pipeline error method called: " + urlCon + ": " + urlCon.getResponseCode()); } catch (IOException e) { System.err.println("Error getting response code: " + e); } ex.printStackTrace(); } } public TestPerf() { } public void run(String args[]) throws Exception { for (int i = 0; i < args.length; i++) { if (args[i].equalsIgnoreCase("-timesThread")) { _timesPerThread = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-times")) { _totalTimes = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-repeat")) { _repeatTestTimes = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-threads")) { _threads = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-dump")) { _dumpAllInterval = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-sun")) { _implementation = IMP_SUN; } else if (args[i].equalsIgnoreCase("-jakarta")) { _implementation = IMP_JAKARTA; } else if (args[i].equalsIgnoreCase("-ip4")) { _ipv4 = true; } else if (args[i].equalsIgnoreCase("-ip6")) { _ipv4 = false; } else if (args[i].equalsIgnoreCase("-url")) { _urlString = args[++i]; } else if (args[i].equalsIgnoreCase("-maxconn")) { _maxConn = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-pipe")) { _implementation = IMP_OAKLAND_PIPE; } else if (args[i].equalsIgnoreCase("-pipedepth")) { _pipeDepth = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-pipenorr")) { _pipeRr = false; } else if (args[i].equalsIgnoreCase("-reqsperconn")) { _reqsPerConn = Integer.parseInt(args[++i]); } else if (args[i].equalsIgnoreCase("-log")) { _doLog = true; } else if (args[i].equalsIgnoreCase("-nowarmup")) { _warmUp = false; } else if (args[i].equalsIgnoreCase("-close")) { _doClose = true; } else if (args[i].equalsIgnoreCase("-quiet")) { _quiet = true; } else if (args[i].equalsIgnoreCase("-auth")) { _useAuth = true; } else if (args[i].equalsIgnoreCase("-help")) { usage(); return; } else if (args[i].equalsIgnoreCase("")) { // Silently ignore empty args } else { System.out.println("Arg: " + args[i] + " ignored"); } } run(); // For profiling // Thread.sleep(1000000); } public void log(String str) { _log.info(str); System.out.println(str); } public void run() throws Exception { // LogUtils.logFile("/home/francis/log4jperf1.txt"); if (_expectedSize >= 0) { _buffer = new byte[_expectedSize]; } else { // 1MB _buffer = new byte[(int)Math.pow(2, 20)]; } for (int i = 0; i < _repeatTestTimes; i++) { runSingleTest(); _repeatIndex++; } } public void runSingleTest() throws Exception { if (_urlString == null) { System.out.println("Please specify a URL using -url"); return; } if (_timesPerThread != 0 && _totalTimes != 0 && _timesPerThread != _totalTimes / _threads) { System.out .println("Specify either -times or -timesThread, but not both"); return; } if (_ipv4) System.setProperty("java.net.preferIPv4Stack", "true"); else System.setProperty("java.net.preferIPv6Stack", "true"); if (_totalTimes != 0) _timesPerThread = _totalTimes / _threads; com.oaklandsw.http.HttpURLConnection.resetStatistics(); com.oaklandsw.http.HttpURLConnection.closeAllPooledConnections(); if (_doLog) LogUtils.logAll(); // LogUtils.logNone(); // Implementation specific setup switch (_implementation) { case IMP_OAKLAND: case IMP_OAKLAND_PIPE: // Make sure we don't get stuck for a long time if something // hangs com.oaklandsw.http.HttpURLConnection .setDefaultRequestTimeout(5000); com.oaklandsw.http.HttpURLConnection .setDefaultConnectionTimeout(5000); com.oaklandsw.http.HttpURLConnection .setDefaultMaxTries(RETRY_COUNT); if (_maxConn > 0) { com.oaklandsw.http.HttpURLConnection .setMaxConnectionsPerHost(_maxConn); } else { com.oaklandsw.http.HttpURLConnection .setMaxConnectionsPerHost(com.oaklandsw.http.HttpURLConnection.DEFAULT_MAX_CONNECTIONS); } if (_reqsPerConn > 0) { com.oaklandsw.http.HttpURLConnection .setDefaultConnectionRequestLimit(_reqsPerConn); } else { com.oaklandsw.http.HttpURLConnection .setDefaultConnectionRequestLimit(); } if (_implementation == IMP_OAKLAND_PIPE) { if (_pipeDepth != 0) { com.oaklandsw.http.HttpURLConnection .setDefaultPipeliningMaxDepth(_pipeDepth); } else { com.oaklandsw.http.HttpURLConnection .setDefaultPipeliningMaxDepth(0); } int options = com.oaklandsw.http.HttpURLConnection.PIPE_STANDARD_OPTIONS; if (!_pipeRr) options &= ~com.oaklandsw.http.HttpURLConnection.PIPE_MAX_CONNECTIONS; com.oaklandsw.http.HttpURLConnection .setDefaultPipeliningOptions(options); } else { com.oaklandsw.http.HttpURLConnection .setDefaultCallback(null); com.oaklandsw.http.HttpURLConnection .setDefaultPipelining(false); } if (_useAuth) { if (false) { // Jakarta config Ntlm._authMessageLmResponse = Ntlm.V1; Ntlm._authMessageNtResponse = Ntlm.NONE; Ntlm._authMessageFlags = 0x5206; } SampleUserAgent ua = new SampleUserAgent(); ua._normalCredential = new NtlmCredential(); ua._normalCredential.setUser(_user); ua._normalCredential.setPassword(_password); ua._normalCredential.setDomain(_domain); ua._normalCredential.setHost(_host); com.oaklandsw.http.HttpURLConnection .setDefaultUserAgent(ua); com.oaklandsw.http.HttpURLConnection .setNtlmPreferredEncoding(com.oaklandsw.http.HttpURLConnection.NTLM_ENCODING_OEM); } break; case IMP_SUN: // Note this is only respected on JRE 5 and 6 if (_maxConn > 0) { System.setProperty("http.maxConnections", Integer .toString(_maxConn)); } else { System.getProperties().remove("http.maxConnections"); } System.setProperty("http.keepAlive", "true"); break; case IMP_JAKARTA: HttpConnectionManagerParams params = new HttpConnectionManagerParams(); if (_maxConn > 0) params.setDefaultMaxConnectionsPerHost(_maxConn); // This makes it go faster, but does not match Oakland // implementation params.setStaleCheckingEnabled(false); // Set retry count, and turn on retrying a method that has // successfully been sent to match the Oakland implementation params .setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(RETRY_COUNT, true)); if (_threads == 1) _jakartaConnMgr = new SimpleHttpConnectionManager(); else _jakartaConnMgr = new MultiThreadedHttpConnectionManager(); _jakartaConnMgr.setParams(params); _jakartaClient = new HttpClient(_jakartaConnMgr); if (_useAuth) { Credentials defaultcreds = new NTCredentials(_user, _password, _host, _domain); _jakartaClient.getState().setCredentials(AuthScope.ANY, defaultcreds); } break; } if (!_quiet) { log("Start Time: " + _startTime); log("Using: " + _impNames[_implementation] + " implementation"); log("URL: " + _urlString); log("Times/thread: " + _timesPerThread); log("Threads: " + _threads); if (_implementation == IMP_OAKLAND_PIPE) log("Pipelining"); if (_maxConn > 0) log("Using max: " + _maxConn + " connections."); if (_pipeDepth > 0) log("Using pipeline depth: " + _pipeDepth); if (_reqsPerConn > 0) { log("Using max: " + _reqsPerConn + " requests/connection."); } // log("http.maxConnections: // " + // System.getProperty("http.maxConnections")); } _actualTimes = null; // Do one to initialize everything but don't count that // in the timing if (_warmUp) { runOneThread(1, 0); } // Start clean System.gc(); _totalActualTimes = 0; long startTime = System.currentTimeMillis(); // log("START time: " + startTime); runAllThreads(); long endTime = System.currentTimeMillis(); _totalTime = endTime - startTime; // log("END/Total time: " + endTime + "/" + _totalTime); for (int i = 0; i < _actualTimes.length; i++) { synchronized (_threadObjects[i]) { _totalActualTimes += _actualTimes[i]; } } if (_totalActualTimes != _timesPerThread * _threads) { log("!!!Expected " + (_timesPerThread * _threads) + " got: " + _totalActualTimes); // Make it a high number since the test results are invalid _transTime = 999999; } else { _transTime = (float)_totalTime / (float)_totalActualTimes; } _actualPipeMaxDepth = com.oaklandsw.http.HttpURLConnection .getConnectionManager() .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_PIPELINE_DEPTH_HIGH); _actualForcedFlushes = com.oaklandsw.http.HttpURLConnection .getConnectionManager() .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_FORCED_FLUSHES); _actualBufferFlushes = com.oaklandsw.http.HttpURLConnection .getConnectionManager() .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_BUFFER_FLUSHES); _actualAvoidFlushes = com.oaklandsw.http.HttpURLConnection .getConnectionManager() .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_AVOIDED_FLUSHES); _actualPipelineRetries = com.oaklandsw.http.HttpURLConnection .getConnectionManager() .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_TOTAL_PIPELINE_RETRY); _actualRetries = com.oaklandsw.http.HttpURLConnection .getConnectionManager() .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_TOTAL_RETRY); if (!_quiet) { if (_implementation == IMP_OAKLAND || _implementation == IMP_OAKLAND_PIPE) { log("connection pool before close"); log(com.oaklandsw.http.HttpURLConnection.dumpAll()); if (_doClose) { com.oaklandsw.http.HttpURLConnection .closeAllPooledConnections(); log("connection pool after close"); log(com.oaklandsw.http.HttpURLConnection.dumpAll()); } } log("Total Time (ms): " + _totalTime + " Time/trans (ms): " + _transTime); } } static void usage() { System.out.println("java TestPerf [options]"); System.out.println("options: "); System.out.println(" -url - url to send to"); System.out .println(" -sun - use JRE implementation (oakland is default)"); System.out .println(" -jakarta - use Jakarta implementation (oakland is default)"); System.out.println(" -pipe - test with pipelining (oakland only)"); System.out .println(" -pipenorr - don't use standard round-robin pipline connection alloc (oakland only)"); System.out.println(" -pipedepth - set max pipeline depth"); System.out .println(" -times - total number of times (all threads) (def 1)"); System.out.println(" -repeat - run test num times (def 1)"); System.out .println(" -timesThread - number of times per thread (def 1)"); System.out.println(" -threads - number of threads (def 1)"); System.out.println(" -repeat - run test num times (def 1)"); System.out.println(" -nowarm - include the initialization in timing"); System.out.println(" -ip4 - prefer the IPv4 stack (default)"); System.out.println(" -ip6 - prefer the IPv6 stack"); System.out.println(" -close - explicitly close all connections"); System.out.println(" -auth - use authentication"); System.out.println(" -quiet - don't print anything"); System.out .println(" -dump - dump the statistics at this interval"); } void runOne(PipelineCallback cb, int times, int threadNum) throws Exception { for (int i = 0; i < times; i++) { HttpURLConnection urlCon; String urlToUse = _urlString; URL url; char queryStart = '?'; if (_urlString.indexOf("?") >= 0) queryStart = '&'; switch (_requestType) { case REQ_NO_QUERY: urlToUse = _urlString; break; case REQ_ID: urlToUse = _urlString + queryStart + "rep=" + _repeatIndex + "&seq=" + i; break; case REQ_DELAY: urlToUse = _urlString + queryStart + "delay=5"; break; } url = new URL(urlToUse); int responseCode = 0; switch (_implementation) { case IMP_OAKLAND: urlCon = com.oaklandsw.http.HttpURLConnection .openConnection(url); responseCode = urlCon.getResponseCode(); processStream(urlCon.getInputStream()); incrementActualtimes(threadNum); break; case IMP_OAKLAND_PIPE: urlCon = com.oaklandsw.http.HttpURLConnection .openConnection(url); ((com.oaklandsw.http.HttpURLConnection)urlCon) .setCallback(cb); ((com.oaklandsw.http.HttpURLConnection)urlCon) .pipelineExecute(); break; case IMP_SUN: urlCon = (HttpURLConnection)url.openConnection(); responseCode = urlCon.getResponseCode(); processStream(urlCon.getInputStream()); incrementActualtimes(threadNum); break; case IMP_JAKARTA: GetMethod method = new GetMethod(urlToUse); _jakartaClient.executeMethod(method); responseCode = method.getStatusCode(); // Read the stream InputStream is = method.getResponseBodyAsStream(); processStream(is); method.releaseConnection(); incrementActualtimes(threadNum); break; default: Util.impossible("Invalid implementation: " + _implementation); // throws } if (_implementation != IMP_OAKLAND_PIPE && responseCode != 200) { throw new RuntimeException("Bad response code: " + responseCode); } } } // Runs a thread for the test void runOneThread(int times, int threadNum) { try { PipelineCallback cb = null; if (_implementation == IMP_OAKLAND_PIPE) { cb = new PipelineCallback(threadNum); } runOne(cb, times, threadNum); if (_implementation == IMP_OAKLAND_PIPE) com.oaklandsw.http.HttpURLConnection.pipelineBlock(); } catch (Exception ex) { System.out.println("Thread exception: " + ex); ex.printStackTrace(); _failed = 1; } } void incrementActualtimes(int threadNum) { if (_actualTimes == null) return; synchronized (_threadObjects[threadNum]) { _actualTimes[threadNum]++; if (_dumpAllInterval > 0 && (_actualTimes[threadNum] % _dumpAllInterval == 0)) { System.out.println(_actualTimes + " ---------------------------------------"); System.out.println(com.oaklandsw.http.HttpURLConnection .dumpAll()); } } } void runAllThreads() throws Exception { Thread t; _failed = 0; _actualTimes = new int[_threads]; _threadObjects = new Thread[_threads]; List threads = new ArrayList(); for (int i = 0; i < _threads; i++) { final int threadNum = i; t = new Thread() { public void run() { Thread.currentThread().setName("TestPerf" + _repeatIndex + "_" + threadNum); runOneThread(_timesPerThread, threadNum); } }; _threadObjects[threadNum] = t; t.start(); threads.add(t); } for (int i = 0; i < threads.size(); i++) { ((Thread)threads.get(i)).join(); } if (_failed > 0) System.out.println("One or more threads failed"); } public void processStream(InputStream inputStream) throws IOException { int totalBytes = 0; int nb = 0; if (_buffer.length > 0) { // Since the buffer is exactly the size of the response, we provide // the opportunity to read the entire thing with one read while (true) { nb = inputStream.read(_buffer); if (nb == -1) break; totalBytes += nb; } } if (_expectedSize != -1 && _expectedSize != totalBytes) { throw new RuntimeException("Expected: " + _expectedSize + " got: " + totalBytes); } inputStream.close(); } public static void main(String args[]) throws Exception { TestPerf tp = new TestPerf(); tp.run(args); } }