Add an optional multithreading to stream server, and enable multithreading (one thread per client) to the RPC server.

This commit is contained in:
scriptjunkie 2011-11-17 08:05:20 -08:00
parent 8fe435c091
commit c4eb32d2ff
5 changed files with 57 additions and 13 deletions

View File

@ -63,7 +63,8 @@ class Service
self.options[:ssl],
self.options[:context],
self.options[:comm],
self.options[:cert]
self.options[:cert],
true
)
self.service.add_resource(self.uri, {

View File

@ -68,9 +68,13 @@ module StreamServer
self.listener_thread = Rex::ThreadFactory.spawn("StreamServerListener", false) {
monitor_listener
}
self.clients_thread = Rex::ThreadFactory.spawn("StreamServerClientMonitor", false) {
monitor_clients
}
if self.threaded
self.clients_thread = []
else
self.clients_thread = Rex::ThreadFactory.spawn("StreamServerClientMonitor", false) {
monitor_clients
}
end
end
#
@ -78,7 +82,7 @@ module StreamServer
#
def stop
self.listener_thread.kill
self.clients_thread.kill
self.clients_thread.kill unless self.threaded
self.clients.each { |cli|
close_client(cli)
@ -91,6 +95,7 @@ module StreamServer
#
def close_client(client)
if (client)
clients_thread.delete_at(clients.index(client)) if self.threaded
clients.delete(client)
begin
@ -130,7 +135,7 @@ module StreamServer
attr_accessor :on_client_close_proc
attr_accessor :clients # :nodoc:
attr_accessor :listener_thread, :clients_thread # :nodoc:
attr_accessor :listener_thread, :clients_thread, :threaded # :nodoc:
attr_accessor :client_waiter
protected
@ -156,6 +161,12 @@ protected
# Initialize the connection processing
on_client_connect(cli)
if self.threaded # Start thread
self.clients_thread << Rex::ThreadFactory.spawn("StreamServerClientMonitor#{cli.to_s}", false, cli) {
monitor_clients(cli)
}
end
# Notify the client monitor
self.client_waiter.push(cli)
@ -175,16 +186,20 @@ protected
# This method monitors client connections for data and calls the
# +on_client_data+ routine when new data arrives.
#
def monitor_clients
def monitor_clients(cli = nil)
begin
# Wait for a notify if our client list is empty
if (clients.length == 0)
if (not self.threaded and clients.length == 0)
self.client_waiter.pop
next
end
sd = Rex::ThreadSafe.select(clients, nil, nil, nil)
if self.threaded
sd = Rex::ThreadSafe.select([cli])
else
sd = Rex::ThreadSafe.select(clients, nil, nil, nil)
end
sd[0].each { |cfd|
begin
@ -203,6 +218,8 @@ protected
}
rescue ::Rex::StreamClosedError => e
# Remove thread from the list if threaded
clients_thread.delete_at(clients.index(client)) if self.threaded
# Remove the closed stream from the list
clients.delete(e.stream)
rescue ::Interrupt

View File

@ -99,13 +99,14 @@ class Server
# Initializes an HTTP server as listening on the provided port and
# hostname.
#
def initialize(port = 80, listen_host = '0.0.0.0', ssl = false, context = {}, comm = nil, ssl_cert = nil)
def initialize(port = 80, listen_host = '0.0.0.0', ssl = false, context = {}, comm = nil, ssl_cert = nil, threaded = false)
self.listen_host = listen_host
self.listen_port = port
self.ssl = ssl
self.context = context
self.comm = comm
self.ssl_cert = ssl_cert
self.threaded = threaded
self.listener = nil
self.resources = {}
@ -137,7 +138,8 @@ class Server
'Context' => self.context,
'SSL' => self.ssl,
'SSLCert' => self.ssl_cert,
'Comm' => self.comm
'Comm' => self.comm,
'Threaded' => self.threaded
)
# Register callbacks
@ -260,7 +262,7 @@ class Server
end
attr_accessor :listen_port, :listen_host, :server_name, :context, :ssl, :comm, :ssl_cert
attr_accessor :listener, :resources
attr_accessor :listener, :resources, :threaded
protected

View File

@ -673,6 +673,9 @@ module Socket
self.localport = params.localport
self.context = params.context || {}
self.ipv = params.v6 ? 6 : 4
if (self.respond_to?('threaded'))
self.threaded = params.threaded?
end
end
end

View File

@ -97,6 +97,11 @@ class Rex::Socket::Parameters
#
# The number of seconds before a connection should time out
#
# Threaded
#
# Whether the server is to spin off a new thread for each client.
#
def initialize(hash)
if (hash['PeerHost'])
@ -139,6 +144,12 @@ class Rex::Socket::Parameters
self.ssl = false
end
if (hash['Threaded'] and hash['Threaded'].to_s =~ /^(t|y|1)/i)
self.threaded = true
else
self.threaded = false
end
if (hash['SSLVersion'] and hash['SSLVersion'].to_s =~ /^(SSL2|SSL3|TLS1)$/i)
self.ssl_version = hash['SSLVersion']
end
@ -273,6 +284,12 @@ class Rex::Socket::Parameters
return v6
end
#
# Returns true if Multithreading has been enabled
#
def threaded?
return threaded
end
##
#
@ -344,6 +361,10 @@ class Rex::Socket::Parameters
# Whether we should use IPv6
#
attr_accessor :v6
#
# Whether we should be multithreaded
#
attr_accessor :threaded
attr_accessor :proxies