import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.NTCredentials;
import org.apache.commons.httpclient.SimpleHttpConnectionManager;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.logging.Log;

import com.oaklandsw.http.AutomaticHttpRetryException;
import com.oaklandsw.http.Callback;
import com.oaklandsw.http.NtlmCredential;
import com.oaklandsw.http.ntlm.Ntlm;
import com.oaklandsw.util.LogUtils;
import com.oaklandsw.util.Util;

/**
 * HTTP client performance test example
 */
public class TestPerf
{
    static final Log      _log             = LogUtils
                                                   .makeLogger("com.oaklandsw.http.TestPerf");

    byte[]                _buffer;

    static final int      RETRY_COUNT      = 20;

    int                   _timesPerThread;
    int                   _totalTimes;
    int                   _threads         = 1;

    int                   _repeatTestTimes = 1;
    int                   _repeatIndex     = 0;

    int                   _expectedSize    = -1;

    static final int      REQ_NO_QUERY     = 0;
    static final int      REQ_DELAY        = 1;
    static final int      REQ_ID           = 2;

    int                   _requestType;

    String                _startTime;

    // Per thread - to synchronize _actualTimes between the blocking
    // thread and the writing thread
    Thread                _threadObjects[];
    int                   _actualTimes[];

    int                   _totalActualTimes;

    int                   _failed;

    int                   _dumpAllInterval;

    boolean               _warmUp          = true;

    // Careful with these, the implementation in PerfComparison test
    // depends on the order of this array
    static final int      IMP_OAKLAND      = 0;
    static final int      IMP_SUN          = 1;
    static final int      IMP_JAKARTA      = 2;
    static final int      IMP_OAKLAND_PIPE = 3;

    int                   _implementation;

    static final String[] _impNames        = { "Oakland", "Sun", "Jakarta",
        "Oakland"                         };

    boolean               _doClose;
    boolean               _quiet;

    int                   _pipeDepth;
    // Use pipelining round-robin connection allocation
    boolean               _pipeRr          = true;
    int                   _maxConn;
    int                   _reqsPerConn;

    int                   _actualPipeMaxDepth;
    int                   _actualForcedFlushes;
    int                   _actualBufferFlushes;
    int                   _actualAvoidFlushes;
    int                   _actualRetries;
    int                   _actualPipelineRetries;

    // Append these in order to each url
    int                   _suffixIndex;
    String                _urlString;

    boolean               _ipv4            = true;

    boolean               _doLog;

    long                  _totalTime;
    float                 _transTime;

    HttpConnectionManager _jakartaConnMgr;
    HttpClient            _jakartaClient;

    boolean               _useAuth;

    String                _user            = "httptest";
    String                _password        = "httptestpw";
    String                _host            = "repoman";
    String                _domain          = "oaklandsw";

    public class PipelineCallback implements Callback
    {
        int _threadNum;

        public PipelineCallback(int threadNum)
        {
            _threadNum = threadNum;
        }

        public int _responses;

        public void writeRequest(com.oaklandsw.http.HttpURLConnection urlCon,
                                 OutputStream os)
        {
            // Used only for a post request, write the data here
        }

        public void readResponse(com.oaklandsw.http.HttpURLConnection urlCon,
                                 InputStream is)
        {
            try
            {
                processStream(urlCon.getInputStream());

                incrementActualtimes(_threadNum);
            }
            catch (AutomaticHttpRetryException arex)
            {
                System.out.println("Pipeline automatic retry: " + urlCon);
                // The read will be redriven
                return;
            }
            catch (IOException e)
            {
                System.out.println("ERROR - IOException: " + urlCon);
                e.printStackTrace();
            }
        }

        public void error(com.oaklandsw.http.HttpURLConnection urlCon,
                          InputStream is,
                          Exception ex)
        {
            try
            {
                System.err.println("pipeline error method called: "
                    + urlCon
                    + ": "
                    + urlCon.getResponseCode());
            }
            catch (IOException e)
            {
                System.err.println("Error getting response code: " + e);
            }

            ex.printStackTrace();
        }
    }

    public TestPerf()
    {
    }

    public void run(String args[]) throws Exception
    {
        for (int i = 0; i < args.length; i++)
        {
            if (args[i].equalsIgnoreCase("-timesThread"))
            {
                _timesPerThread = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-times"))
            {
                _totalTimes = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-repeat"))
            {
                _repeatTestTimes = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-threads"))
            {
                _threads = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-dump"))
            {
                _dumpAllInterval = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-sun"))
            {
                _implementation = IMP_SUN;
            }
            else if (args[i].equalsIgnoreCase("-jakarta"))
            {
                _implementation = IMP_JAKARTA;
            }
            else if (args[i].equalsIgnoreCase("-ip4"))
            {
                _ipv4 = true;
            }
            else if (args[i].equalsIgnoreCase("-ip6"))
            {
                _ipv4 = false;
            }
            else if (args[i].equalsIgnoreCase("-url"))
            {
                _urlString = args[++i];
            }
            else if (args[i].equalsIgnoreCase("-maxconn"))
            {
                _maxConn = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-pipe"))
            {
                _implementation = IMP_OAKLAND_PIPE;
            }
            else if (args[i].equalsIgnoreCase("-pipedepth"))
            {
                _pipeDepth = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-pipenorr"))
            {
                _pipeRr = false;
            }
            else if (args[i].equalsIgnoreCase("-reqsperconn"))
            {
                _reqsPerConn = Integer.parseInt(args[++i]);
            }
            else if (args[i].equalsIgnoreCase("-log"))
            {
                _doLog = true;
            }
            else if (args[i].equalsIgnoreCase("-nowarmup"))
            {
                _warmUp = false;
            }
            else if (args[i].equalsIgnoreCase("-close"))
            {
                _doClose = true;
            }
            else if (args[i].equalsIgnoreCase("-quiet"))
            {
                _quiet = true;
            }
            else if (args[i].equalsIgnoreCase("-auth"))
            {
                _useAuth = true;
            }
            else if (args[i].equalsIgnoreCase("-help"))
            {
                usage();
                return;
            }
            else if (args[i].equalsIgnoreCase(""))
            {
                // Silently ignore empty args
            }
            else
            {
                System.out.println("Arg: " + args[i] + " ignored");
            }
        }

        run();
        // For profiling
        // Thread.sleep(1000000);
    }

    public void log(String str)
    {
        _log.info(str);
        System.out.println(str);
    }

    public void run() throws Exception
    {
        // LogUtils.logFile("/home/francis/log4jperf1.txt");

        if (_expectedSize >= 0)
        {
            _buffer = new byte[_expectedSize];
        }
        else
        {
            // 1MB
            _buffer = new byte[(int)Math.pow(2, 20)];
        }

        for (int i = 0; i < _repeatTestTimes; i++)
        {
            runSingleTest();
            _repeatIndex++;
        }
    }

    public void runSingleTest() throws Exception
    {
        if (_urlString == null)
        {
            System.out.println("Please specify a URL using -url");
            return;
        }

        if (_timesPerThread != 0
            && _totalTimes != 0
            && _timesPerThread != _totalTimes / _threads)
        {
            System.out
                    .println("Specify either -times or -timesThread, but not both");
            return;
        }

        if (_ipv4)
            System.setProperty("java.net.preferIPv4Stack", "true");
        else
            System.setProperty("java.net.preferIPv6Stack", "true");

        if (_totalTimes != 0)
            _timesPerThread = _totalTimes / _threads;

        com.oaklandsw.http.HttpURLConnection.resetStatistics();
        com.oaklandsw.http.HttpURLConnection.closeAllPooledConnections();

        if (_doLog)
            LogUtils.logAll();
        // LogUtils.logNone();

        // Implementation specific setup
        switch (_implementation)
        {
            case IMP_OAKLAND:
            case IMP_OAKLAND_PIPE:

                // Make sure we don't get stuck for a long time if something
                // hangs
                com.oaklandsw.http.HttpURLConnection
                        .setDefaultRequestTimeout(5000);
                com.oaklandsw.http.HttpURLConnection
                        .setDefaultConnectionTimeout(5000);
                com.oaklandsw.http.HttpURLConnection
                        .setDefaultMaxTries(RETRY_COUNT);

                if (_maxConn > 0)
                {
                    com.oaklandsw.http.HttpURLConnection
                            .setMaxConnectionsPerHost(_maxConn);
                }
                else
                {
                    com.oaklandsw.http.HttpURLConnection
                            .setMaxConnectionsPerHost(com.oaklandsw.http.HttpURLConnection.DEFAULT_MAX_CONNECTIONS);

                }

                if (_reqsPerConn > 0)
                {
                    com.oaklandsw.http.HttpURLConnection
                            .setDefaultConnectionRequestLimit(_reqsPerConn);
                }
                else
                {
                    com.oaklandsw.http.HttpURLConnection
                            .setDefaultConnectionRequestLimit();

                }

                if (_implementation == IMP_OAKLAND_PIPE)
                {
                    if (_pipeDepth != 0)
                    {
                        com.oaklandsw.http.HttpURLConnection
                                .setDefaultPipeliningMaxDepth(_pipeDepth);
                    }
                    else
                    {
                        com.oaklandsw.http.HttpURLConnection
                                .setDefaultPipeliningMaxDepth(0);
                    }

                    int options = com.oaklandsw.http.HttpURLConnection.PIPE_STANDARD_OPTIONS;
                    if (!_pipeRr)
                        options &= ~com.oaklandsw.http.HttpURLConnection.PIPE_MAX_CONNECTIONS;

                    com.oaklandsw.http.HttpURLConnection
                            .setDefaultPipeliningOptions(options);
                }
                else
                {
                    com.oaklandsw.http.HttpURLConnection
                            .setDefaultCallback(null);
                    com.oaklandsw.http.HttpURLConnection
                            .setDefaultPipelining(false);

                }

                if (_useAuth)
                {
                    if (false)
                    {
                        // Jakarta config
                        Ntlm._authMessageLmResponse = Ntlm.V1;
                        Ntlm._authMessageNtResponse = Ntlm.NONE;
                        Ntlm._authMessageFlags = 0x5206;
                    }
                    SampleUserAgent ua = new SampleUserAgent();
                    ua._normalCredential = new NtlmCredential();
                    ua._normalCredential.setUser(_user);
                    ua._normalCredential.setPassword(_password);
                    ua._normalCredential.setDomain(_domain);
                    ua._normalCredential.setHost(_host);
                    com.oaklandsw.http.HttpURLConnection
                            .setDefaultUserAgent(ua);
                    com.oaklandsw.http.HttpURLConnection
                            .setNtlmPreferredEncoding(com.oaklandsw.http.HttpURLConnection.NTLM_ENCODING_OEM);
                }
                break;

            case IMP_SUN:
                // Note this is only respected on JRE 5 and 6
                if (_maxConn > 0)
                {
                    System.setProperty("http.maxConnections", Integer
                            .toString(_maxConn));
                }
                else
                {
                    System.getProperties().remove("http.maxConnections");
                }
                System.setProperty("http.keepAlive", "true");
                break;

            case IMP_JAKARTA:
                HttpConnectionManagerParams params = new HttpConnectionManagerParams();
                if (_maxConn > 0)
                    params.setDefaultMaxConnectionsPerHost(_maxConn);
                // This makes it go faster, but does not match Oakland
                // implementation
                params.setStaleCheckingEnabled(false);
                // Set retry count, and turn on retrying a method that has
                // successfully been sent to match the Oakland implementation
                params
                        .setParameter(HttpMethodParams.RETRY_HANDLER,
                                      new DefaultHttpMethodRetryHandler(RETRY_COUNT,
                                                                        true));
                if (_threads == 1)
                    _jakartaConnMgr = new SimpleHttpConnectionManager();
                else
                    _jakartaConnMgr = new MultiThreadedHttpConnectionManager();
                _jakartaConnMgr.setParams(params);

                _jakartaClient = new HttpClient(_jakartaConnMgr);

                if (_useAuth)
                {
                    Credentials defaultcreds = new NTCredentials(_user,
                                                                 _password,
                                                                 _host,
                                                                 _domain);
                    _jakartaClient.getState().setCredentials(AuthScope.ANY,
                                                             defaultcreds);
                }
                break;
        }

        if (!_quiet)
        {
            log("Start Time: " + _startTime);
            log("Using: " + _impNames[_implementation] + " implementation");
            log("URL: " + _urlString);
            log("Times/thread: " + _timesPerThread);
            log("Threads: " + _threads);
            if (_implementation == IMP_OAKLAND_PIPE)
                log("Pipelining");
            if (_maxConn > 0)
                log("Using max: " + _maxConn + " connections.");
            if (_pipeDepth > 0)
                log("Using pipeline depth: " + _pipeDepth);
            if (_reqsPerConn > 0)
            {
                log("Using max: " + _reqsPerConn + " requests/connection.");
            }
            // log("http.maxConnections:
            // " +
            // System.getProperty("http.maxConnections"));
        }

        _actualTimes = null;

        // Do one to initialize everything but don't count that
        // in the timing
        if (_warmUp)
        {
            runOneThread(1, 0);
        }

        // Start clean
        System.gc();

        _totalActualTimes = 0;

        long startTime = System.currentTimeMillis();
        // log("START time: " + startTime);

        runAllThreads();

        long endTime = System.currentTimeMillis();
        _totalTime = endTime - startTime;
        // log("END/Total time: " + endTime + "/" + _totalTime);

        for (int i = 0; i < _actualTimes.length; i++)
        {
            synchronized (_threadObjects[i])
            {
                _totalActualTimes += _actualTimes[i];
            }
        }

        if (_totalActualTimes != _timesPerThread * _threads)
        {
            log("!!!Expected "
                + (_timesPerThread * _threads)
                + " got: "
                + _totalActualTimes);
            // Make it a high number since the test results are invalid
            _transTime = 999999;
        }
        else
        {
            _transTime = (float)_totalTime / (float)_totalActualTimes;
        }

        _actualPipeMaxDepth = com.oaklandsw.http.HttpURLConnection
                .getConnectionManager()
                .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_PIPELINE_DEPTH_HIGH);
        _actualForcedFlushes = com.oaklandsw.http.HttpURLConnection
                .getConnectionManager()
                .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_FORCED_FLUSHES);
        _actualBufferFlushes = com.oaklandsw.http.HttpURLConnection
                .getConnectionManager()
                .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_BUFFER_FLUSHES);
        _actualAvoidFlushes = com.oaklandsw.http.HttpURLConnection
                .getConnectionManager()
                .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_AVOIDED_FLUSHES);
        _actualPipelineRetries = com.oaklandsw.http.HttpURLConnection
                .getConnectionManager()
                .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_TOTAL_PIPELINE_RETRY);
        _actualRetries = com.oaklandsw.http.HttpURLConnection
                .getConnectionManager()
                .getCount(com.oaklandsw.http.HttpConnectionManager.COUNT_TOTAL_RETRY);

        if (!_quiet)
        {
            if (_implementation == IMP_OAKLAND
                || _implementation == IMP_OAKLAND_PIPE)
            {
                log("connection pool before close");
                log(com.oaklandsw.http.HttpURLConnection.dumpAll());
                if (_doClose)
                {
                    com.oaklandsw.http.HttpURLConnection
                            .closeAllPooledConnections();
                    log("connection pool after close");
                    log(com.oaklandsw.http.HttpURLConnection.dumpAll());
                }
            }

            log("Total Time (ms): "
                + _totalTime
                + " Time/trans (ms): "
                + _transTime);
        }
    }

    static void usage()
    {
        System.out.println("java TestPerf [options]");
        System.out.println("options: ");
        System.out.println("  -url <url> - url to send to");
        System.out
                .println("  -sun - use JRE implementation (oakland is default)");
        System.out
                .println("  -jakarta - use Jakarta implementation (oakland is default)");
        System.out.println("  -pipe - test with pipelining (oakland only)");
        System.out
                .println("  -pipenorr - don't use standard round-robin pipline connection alloc (oakland only)");
        System.out.println("  -pipedepth <depth> - set max pipeline depth");
        System.out
                .println("  -times <num> - total number of times (all threads) (def 1)");
        System.out.println("  -repeat <num> - run test num times (def 1)");
        System.out
                .println("  -timesThread <num> - number of times per thread (def 1)");
        System.out.println("  -threads <num> - number of threads (def 1)");
        System.out.println("  -repeat <num> - run test num times (def 1)");
        System.out.println("  -nowarm - include the initialization in timing");
        System.out.println("  -ip4 - prefer the IPv4 stack (default)");
        System.out.println("  -ip6 - prefer the IPv6 stack");
        System.out.println("  -close - explicitly close all connections");
        System.out.println("  -auth - use authentication");
        System.out.println("  -quiet - don't print anything");
        System.out
                .println("  -dump <num> - dump the statistics at this interval");
    }

    void runOne(PipelineCallback cb, int times, int threadNum) throws Exception
    {
        for (int i = 0; i < times; i++)
        {
            HttpURLConnection urlCon;

            String urlToUse = _urlString;

            URL url;

            char queryStart = '?';
            if (_urlString.indexOf("?") >= 0)
                queryStart = '&';

            switch (_requestType)
            {
                case REQ_NO_QUERY:
                    urlToUse = _urlString;
                    break;
                case REQ_ID:
                    urlToUse = _urlString
                        + queryStart
                        + "rep="
                        + _repeatIndex
                        + "&seq="
                        + i;
                    break;
                case REQ_DELAY:
                    urlToUse = _urlString + queryStart + "delay=5";
                    break;
            }

            url = new URL(urlToUse);

            int responseCode = 0;
            switch (_implementation)
            {
                case IMP_OAKLAND:
                    urlCon = com.oaklandsw.http.HttpURLConnection
                            .openConnection(url);
                    responseCode = urlCon.getResponseCode();
                    processStream(urlCon.getInputStream());
                    incrementActualtimes(threadNum);
                    break;

                case IMP_OAKLAND_PIPE:
                    urlCon = com.oaklandsw.http.HttpURLConnection
                            .openConnection(url);
                    ((com.oaklandsw.http.HttpURLConnection)urlCon)
                            .setCallback(cb);
                    ((com.oaklandsw.http.HttpURLConnection)urlCon)
                            .pipelineExecute();
                    break;

                case IMP_SUN:
                    urlCon = (HttpURLConnection)url.openConnection();
                    responseCode = urlCon.getResponseCode();
                    processStream(urlCon.getInputStream());
                    incrementActualtimes(threadNum);
                    break;

                case IMP_JAKARTA:
                    GetMethod method = new GetMethod(urlToUse);
                    _jakartaClient.executeMethod(method);
                    responseCode = method.getStatusCode();

                    // Read the stream
                    InputStream is = method.getResponseBodyAsStream();
                    processStream(is);
                    method.releaseConnection();
                    incrementActualtimes(threadNum);
                    break;

                default:
                    Util.impossible("Invalid implementation: "
                        + _implementation);
                    // throws
            }

            if (_implementation != IMP_OAKLAND_PIPE && responseCode != 200)
            {
                throw new RuntimeException("Bad response code: " + responseCode);
            }
        }
    }

    // Runs a thread for the test
    void runOneThread(int times, int threadNum)
    {
        try
        {
            PipelineCallback cb = null;
            if (_implementation == IMP_OAKLAND_PIPE)
            {
                cb = new PipelineCallback(threadNum);
            }

            runOne(cb, times, threadNum);

            if (_implementation == IMP_OAKLAND_PIPE)
                com.oaklandsw.http.HttpURLConnection.pipelineBlock();
        }
        catch (Exception ex)
        {
            System.out.println("Thread exception: " + ex);
            ex.printStackTrace();
            _failed = 1;
        }
    }

    void incrementActualtimes(int threadNum)
    {
        if (_actualTimes == null)
            return;
        synchronized (_threadObjects[threadNum])
        {
            _actualTimes[threadNum]++;
            if (_dumpAllInterval > 0
                && (_actualTimes[threadNum] % _dumpAllInterval == 0))
            {
                System.out.println(_actualTimes
                    + " ---------------------------------------");
                System.out.println(com.oaklandsw.http.HttpURLConnection
                        .dumpAll());
            }
        }

    }

    void runAllThreads() throws Exception
    {
        Thread t;

        _failed = 0;

        _actualTimes = new int[_threads];
        _threadObjects = new Thread[_threads];

        List threads = new ArrayList();
        for (int i = 0; i < _threads; i++)
        {
            final int threadNum = i;
            t = new Thread()
            {
                public void run()
                {
                    Thread.currentThread().setName("TestPerf"
                        + _repeatIndex
                        + "_"
                        + threadNum);
                    runOneThread(_timesPerThread, threadNum);
                }
            };

            _threadObjects[threadNum] = t;
            t.start();
            threads.add(t);
        }

        for (int i = 0; i < threads.size(); i++)
        {
            ((Thread)threads.get(i)).join();
        }

        if (_failed > 0)
            System.out.println("One or more threads failed");
    }

    public void processStream(InputStream inputStream) throws IOException
    {
        int totalBytes = 0;
        int nb = 0;
        if (_buffer.length > 0)
        {
            // Since the buffer is exactly the size of the response, we provide
            // the opportunity to read the entire thing with one read
            while (true)
            {
                nb = inputStream.read(_buffer);
                if (nb == -1)
                    break;
                totalBytes += nb;
            }
        }

        if (_expectedSize != -1 && _expectedSize != totalBytes)
        {
            throw new RuntimeException("Expected: "
                + _expectedSize
                + " got: "
                + totalBytes);
        }
        inputStream.close();
    }

    public static void main(String args[]) throws Exception
    {
        TestPerf tp = new TestPerf();
        tp.run(args);
    }

}
