List: mesos-commits
Subject: [24/35] Renamed 'third_party' to '3rdparty'.
From: benh@apache.org
Date: 2013-05-29 17:41:02
Message-ID: 165713982672416d975ffde2b02d1c4f@git.apache.org
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/71a01bd9/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
new file mode 100644
index 0000000..86414e5
--- /dev/null
+++ b/3rdparty/libprocess/src/process.cpp
@@ -0,0 +1,3446 @@
+#include <errno.h>
+#include <ev.h>
+#include <limits.h>
+#include <libgen.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <arpa/inet.h>
+
+#include <glog/logging.h>
+
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+
+#include <algorithm>
+#include <deque>
+#include <fstream>
+#include <iomanip>
+#include <iostream>
+#include <list>
+#include <map>
+#include <queue>
+#include <set>
+#include <sstream>
+#include <stack>
+#include <stdexcept>
+#include <vector>
+
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
+
+#include <boost/shared_array.hpp>
+
+#include <process/clock.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/executor.hpp>
+#include <process/filter.hpp>
+#include <process/future.hpp>
+#include <process/gc.hpp>
+#include <process/id.hpp>
+#include <process/io.hpp>
+#include <process/logging.hpp>
+#include <process/mime.hpp>
+#include <process/process.hpp>
+#include <process/profiler.hpp>
+#include <process/socket.hpp>
+#include <process/statistics.hpp>
+#include <process/thread.hpp>
+#include <process/time.hpp>
+#include <process/timer.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/net.hpp>
+#include <stout/os.hpp>
+#include <stout/strings.hpp>
+
+#include "config.hpp"
+#include "decoder.hpp"
+#include "encoder.hpp"
+#include "gate.hpp"
+#include "synchronized.hpp"
+
+using process::wait; // Necessary on some OSes to disambiguate.
+
+using process::http::BadRequest;
+using process::http::InternalServerError;
+using process::http::NotFound;
+using process::http::OK;
+using process::http::Request;
+using process::http::Response;
+using process::http::ServiceUnavailable;
+
+using std::deque;
+using std::find;
+using std::list;
+using std::map;
+using std::ostream;
+using std::pair;
+using std::queue;
+using std::set;
+using std::stack;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+// Represents a remote "node" (encapsulates IP address and port).
+class Node
+{
+public:
+ Node(uint32_t _ip = 0, uint16_t _port = 0)
+ : ip(_ip), port(_port) {}
+
+ bool operator < (const Node& that) const
+ {
+ if (ip == that.ip) {
+ return port < that.port;
+ } else {
+ return ip < that.ip;
+ }
+ }
+
+  // Note: this must take the stream as the left-hand operand (i.e.,
+  // be a non-member), so that 'stream << node' works as expected.
+  friend ostream& operator << (ostream& stream, const Node& node)
+  {
+    stream << node.ip << ":" << node.port;
+    return stream;
+  }
+
+ uint32_t ip;
+ uint16_t port;
+};
+
+
+namespace process {
+
+namespace ID {
+
+string generate(const string& prefix)
+{
+ static map<string, int> prefixes;
+ stringstream out;
+ out << __sync_add_and_fetch(&prefixes[prefix], 1);
+ return prefix + "(" + out.str() + ")";
+}
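+
+// For example, successive calls to ID::generate("worker") return
+// "worker(1)", "worker(2)", and so on; a separate counter is kept for
+// each prefix.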
+
+} // namespace ID {
+
+
+namespace http {
+
+hashmap<uint16_t, string> statuses;
+
+} // namespace http {
+
+
+namespace mime {
+
+map<string, string> types;
+
+} // namespace mime {
+
+
+// Provides reference counting semantics for a process pointer.
+class ProcessReference
+{
+public:
+ ProcessReference() : process(NULL) {}
+
+ ~ProcessReference()
+ {
+ cleanup();
+ }
+
+ ProcessReference(const ProcessReference& that)
+ {
+ copy(that);
+ }
+
+ ProcessReference& operator = (const ProcessReference& that)
+ {
+ if (this != &that) {
+ cleanup();
+ copy(that);
+ }
+ return *this;
+ }
+
+ ProcessBase* operator -> ()
+ {
+ return process;
+ }
+
+ operator ProcessBase* ()
+ {
+ return process;
+ }
+
+ operator bool () const
+ {
+ return process != NULL;
+ }
+
+private:
+ friend class ProcessManager; // For ProcessManager::use.
+
+ ProcessReference(ProcessBase* _process)
+ : process(_process)
+ {
+ if (process != NULL) {
+ __sync_fetch_and_add(&(process->refs), 1);
+ }
+ }
+
+ void copy(const ProcessReference& that)
+ {
+ process = that.process;
+
+ if (process != NULL) {
+ // There should be at least one reference to the process, so
+ // we don't need to worry about checking if it's exiting or
+ // not, since we know we can always create another reference.
+ CHECK(process->refs > 0);
+ __sync_fetch_and_add(&(process->refs), 1);
+ }
+ }
+
+ void cleanup()
+ {
+ if (process != NULL) {
+ __sync_fetch_and_sub(&(process->refs), 1);
+ }
+ }
+
+ ProcessBase* process;
+};
+
+
+// Provides a process that manages sending HTTP responses so as to
+// satisfy HTTP/1.1 pipelining. Each request should either enqueue a
+// response, or ask the proxy to handle a future response. The process
+// is responsible for making sure the responses are sent in the same
+// order as the requests. Note that we use a 'Socket' in order to keep
+// the underlying file descriptor from getting closed while there
+// might still be outstanding responses even though the client might
+// have closed the connection (see more discussion in
+// SocketManager::close and SocketManager::proxy).
+class HttpProxy : public Process<HttpProxy>
+{
+public:
+ HttpProxy(const Socket& _socket);
+ virtual ~HttpProxy();
+
+ // Enqueues the response to be sent once all previously enqueued
+ // responses have been processed (e.g., waited for and sent).
+ void enqueue(const Response& response, const Request& request);
+
+ // Enqueues a future to a response that will get waited on (up to
+ // some timeout) and then sent once all previously enqueued
+ // responses have been processed (e.g., waited for and sent).
+ void handle(Future<Response>* future, const Request& request);
+
+private:
+ // Starts "waiting" on the next available future response.
+ void next();
+
+ // Invoked once a future response has been satisfied.
+ void waited(const Future<Response>& future);
+
+ // Demuxes and handles a response.
+ bool process(const Future<Response>& future, const Request& request);
+
+ // Handles stream (i.e., pipe) based responses.
+ void stream(const Future<short>& poll, const Request& request);
+
+ Socket socket; // Wrap the socket to keep it from getting closed.
+
+ // Describes a queue "item" that wraps the future to the response
+ // and the original request.
+ // The original request contains needed information such as what encodings
+ // are acceptable and whether to persist the connection.
+ struct Item
+ {
+ Item(const Request& _request, Future<Response>* _future)
+ : request(_request), future(_future) {}
+
+ ~Item()
+ {
+ delete future;
+ }
+
+ const Request request; // Make a copy.
+ Future<Response>* future;
+ };
+
+ queue<Item*> items;
+
+ Option<int> pipe; // Current pipe, if streaming.
+};
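+
+// For example, a handler holding the HttpProxy PID for a socket can
+// either enqueue a response that is already ready,
+//
+//   dispatch(proxy, &HttpProxy::enqueue, OK(), request);
+//
+// or hand off a future that the proxy will wait on before sending,
+//
+//   dispatch(proxy, &HttpProxy::handle, new Future<Response>(), request);
+//
+// and in both cases responses go out in request order.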
+
+
+class SocketManager
+{
+public:
+ SocketManager();
+ ~SocketManager();
+
+ Socket accepted(int s);
+
+ void link(ProcessBase* process, const UPID& to);
+
+ PID<HttpProxy> proxy(const Socket& socket);
+
+ void send(Encoder* encoder, bool persist);
+ void send(const Response& response,
+ const Request& request,
+ const Socket& socket);
+ void send(Message* message);
+
+ Encoder* next(int s);
+
+ void close(int s);
+
+ void exited(const Node& node);
+ void exited(ProcessBase* process);
+
+private:
+ // Map from UPID (local/remote) to process.
+ map<UPID, set<ProcessBase*> > links;
+
+  // Collection of all active sockets.
+ map<int, Socket> sockets;
+
+ // Collection of sockets that should be disposed when they are
+ // finished being used (e.g., when there is no more data to send on
+ // them).
+ set<int> dispose;
+
+ // Map from socket to node (ip, port).
+ map<int, Node> nodes;
+
+ // Maps from node (ip, port) to temporary sockets (i.e., they will
+ // get closed once there is no more data to send on them).
+ map<Node, int> temps;
+
+ // Maps from node (ip, port) to persistent sockets (i.e., they will
+ // remain open even if there is no more data to send on them). We
+ // distinguish these from the 'temps' collection so we can tell when
+  // a persistent socket has been lost (and thus generate
+ // ExitedEvents).
+ map<Node, int> persists;
+
+ // Map from socket to outgoing queue.
+ map<int, queue<Encoder*> > outgoing;
+
+ // HTTP proxies.
+ map<int, HttpProxy*> proxies;
+
+ // Protects instance variables.
+ synchronizable(this);
+};
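+
+// A socket's life cycle through the manager: accepted(), link(), or
+// send() registers it in 'sockets'; encoders queue up in 'outgoing';
+// next() pops that queue (shutting down disposable sockets once it
+// drains); and close() drops all bookkeeping, leaving the last Socket
+// reference to actually close the file descriptor.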
+
+
+class ProcessManager
+{
+public:
+ ProcessManager(const string& delegate);
+ ~ProcessManager();
+
+ ProcessReference use(const UPID& pid);
+
+ bool handle(
+ const Socket& socket,
+ Request* request);
+
+ bool deliver(
+ ProcessBase* receiver,
+ Event* event,
+ ProcessBase* sender = NULL);
+
+ bool deliver(
+ const UPID& to,
+ Event* event,
+ ProcessBase* sender = NULL);
+
+ UPID spawn(ProcessBase* process, bool manage);
+ void resume(ProcessBase* process);
+ void cleanup(ProcessBase* process);
+ void link(ProcessBase* process, const UPID& to);
+ void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
+ bool wait(const UPID& pid);
+
+ void enqueue(ProcessBase* process);
+ ProcessBase* dequeue();
+
+ void settle();
+
+private:
+ // Delegate process name to receive root HTTP requests.
+ const string delegate;
+
+ // Map of all local spawned and running processes.
+ map<string, ProcessBase*> processes;
+ synchronizable(processes);
+
+ // Gates for waiting threads (protected by synchronizable(processes)).
+ map<ProcessBase*, Gate*> gates;
+
+ // Queue of runnable processes (implemented using list).
+ list<ProcessBase*> runq;
+ synchronizable(runq);
+
+ // Number of running processes, to support Clock::settle operation.
+ int running;
+};
+
+
+// Unique id that can be assigned to each process.
+static uint32_t __id__ = 0;
+
+// Local server socket.
+static int __s__ = -1;
+
+// Local IP address.
+static uint32_t __ip__ = 0;
+
+// Local port.
+static uint16_t __port__ = 0;
+
+// Active SocketManager (eventually will probably be thread-local).
+static SocketManager* socket_manager = NULL;
+
+// Active ProcessManager (eventually will probably be thread-local).
+static ProcessManager* process_manager = NULL;
+
+// Event loop.
+static struct ev_loop* loop = NULL;
+
+// Asynchronous watcher for interrupting loop.
+static ev_async async_watcher;
+
+// Watcher for timeouts.
+static ev_timer timeouts_watcher;
+
+// Server watcher for accepting connections.
+static ev_io server_watcher;
+
+// Queue of I/O watchers.
+static queue<ev_io*>* watchers = new queue<ev_io*>();
+static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;
+
+// We store the timers in a map of lists indexed by the timeout of the
+// timer so that we can have two timers that have the same timeout. We
+// exploit that the map is SORTED!
+static map<Time, list<Timer> >* timeouts =
+ new map<Time, list<Timer> >();
+static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
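+
+// For example, two timers that expire at the same Time share a single
+// list entry, so timeouts->begin() is always the earliest deadline and
+// erasing up to timeouts->upper_bound(now) removes exactly the timers
+// that have expired (see handle_timeouts below).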
+
+// For supporting Clock::settle(), true if timers have been removed
+// from 'timeouts' but may not have been executed yet. Protected by
+// the timeouts lock. This is only used when the clock is paused.
+static bool pending_timers = false;
+
+// Flag to indicate whether or not to update the timer on async interrupt.
+static bool update_timer = false;
+
+// Scheduling gate that threads wait at when there is nothing to run.
+static Gate* gate = new Gate();
+
+// Filter. Synchronized support for using the filterer needs to be
+// recursive in case a filterer wants to do anything fancy (which is
+// possible and likely given that filters will get used for testing).
+static Filter* filterer = NULL;
+static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+
+// Global garbage collector.
+PID<GarbageCollector> gc;
+
+// Per thread process pointer.
+ThreadLocal<ProcessBase>* _process_ = new ThreadLocal<ProcessBase>();
+
+// Per thread executor pointer.
+ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();
+
+
+// We namespace the clock related variables to keep them well
+// named. In the future we'll probably want to associate a clock with
+// a specific ProcessManager/SocketManager instance pair, so this will
+// likely change.
+namespace clock {
+
+map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();
+
+Time initial = Time::EPOCH;
+Time current = Time::EPOCH;
+
+bool paused = false;
+
+} // namespace clock {
+
+
+Time Time::EPOCH = Time(Duration::zero());
+
+
+Time Time::MAX = Time(Duration::max());
+
+
+Time Clock::now()
+{
+ return now(__process__);
+}
+
+
+Time Clock::now(ProcessBase* process)
+{
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ if (process != NULL) {
+ if (clock::currents->count(process) != 0) {
+ return (*clock::currents)[process];
+ } else {
+ return (*clock::currents)[process] = clock::initial;
+ }
+ } else {
+ return clock::current;
+ }
+ }
+ }
+
+ // TODO(benh): Versus ev_now()?
+ double d = ev_time();
+ Try<Time> time = Time::create(d);
+
+ // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
+ // here.
+ if (time.isError()) {
+ LOG(FATAL) << "Failed to create a Time from " << d << ": "
+ << time.error();
+ }
+ return time.get();
+}
+
+
+void Clock::pause()
+{
+ process::initialize(); // To make sure the libev watchers are ready.
+
+ synchronized (timeouts) {
+ if (!clock::paused) {
+ clock::initial = clock::current = now();
+ clock::paused = true;
+ VLOG(2) << "Clock paused at " << clock::initial;
+ }
+ }
+
+ // Note that after pausing the clock an existing libev timer might
+ // still fire (invoking handle_timeout), but since paused == true no
+ // "time" will actually have passed, so no timer will actually fire.
+}
+
+
+bool Clock::paused()
+{
+ return clock::paused;
+}
+
+
+void Clock::resume()
+{
+ process::initialize(); // To make sure the libev watchers are ready.
+
+ synchronized (timeouts) {
+ if (clock::paused) {
+ VLOG(2) << "Clock resumed at " << clock::current;
+ clock::paused = false;
+ clock::currents->clear();
+ update_timer = true;
+ ev_async_send(loop, &async_watcher);
+ }
+ }
+}
+
+
+void Clock::advance(const Duration& duration)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ clock::current += duration;
+ VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
+ if (!update_timer) {
+ update_timer = true;
+ ev_async_send(loop, &async_watcher);
+ }
+ }
+ }
+}
+
+
+void Clock::advance(ProcessBase* process, const Duration& duration)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ Time current = now(process);
+ current += duration;
+ (*clock::currents)[process] = current;
+ VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
+ << ") to " << current;
+ }
+ }
+}
+
+
+void Clock::update(const Time& time)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ if (clock::current < time) {
+ clock::current = Time(time);
+ VLOG(2) << "Clock updated to " << clock::current;
+ if (!update_timer) {
+ update_timer = true;
+ ev_async_send(loop, &async_watcher);
+ }
+ }
+ }
+ }
+}
+
+
+void Clock::update(ProcessBase* process, const Time& time)
+{
+ synchronized (timeouts) {
+ if (clock::paused) {
+ if (now(process) < time) {
+ VLOG(2) << "Clock of " << process->self() << " updated to " << time;
+ (*clock::currents)[process] = Time(time);
+ }
+ }
+ }
+}
+
+
+void Clock::order(ProcessBase* from, ProcessBase* to)
+{
+ update(to, now(from));
+}
+
+
+void Clock::settle()
+{
+ CHECK(clock::paused); // TODO(benh): Consider returning a bool instead.
+ process_manager->settle();
+}
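+
+// A typical paused-clock sequence (e.g., from a test):
+//
+//   Clock::pause();
+//   Clock::advance(Seconds(5)); // Timers due within 5 seconds fire.
+//   Clock::settle();            // Wait for those timers to finish.
+//   Clock::resume();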
+
+
+static Message* encode(const UPID& from,
+ const UPID& to,
+ const string& name,
+ const string& data = "")
+{
+ Message* message = new Message();
+ message->from = from;
+ message->to = to;
+ message->name = name;
+ message->body = data;
+ return message;
+}
+
+
+static void transport(Message* message, ProcessBase* sender = NULL)
+{
+ if (message->to.ip == __ip__ && message->to.port == __port__) {
+ // Local message.
+ process_manager->deliver(message->to, new MessageEvent(message), sender);
+ } else {
+ // Remote message.
+ socket_manager->send(message);
+ }
+}
+
+
+static bool libprocess(Request* request)
+{
+ return request->method == "POST" &&
+ request->headers.count("User-Agent") > 0 &&
+ request->headers["User-Agent"].find("libprocess/") == 0;
+}
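+
+// For example, a message named 'ping' sent from the pid
+// 'master@10.0.0.1:5050' to the local process 'slave(1)' arrives as:
+//
+//   POST /slave(1)/ping HTTP/1.1
+//   User-Agent: libprocess/master@10.0.0.1:5050
+//
+// which parse() below turns back into a Message.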
+
+
+static Message* parse(Request* request)
+{
+ // TODO(benh): Do better error handling (to deal with a malformed
+ // libprocess message, malicious or otherwise).
+ const string& agent = request->headers["User-Agent"];
+ const string& identifier = "libprocess/";
+ size_t index = agent.find(identifier);
+ if (index != string::npos) {
+ // Okay, now determine 'from'.
+ const UPID from(agent.substr(index + identifier.size(), agent.size()));
+
+ // Now determine 'to'.
+ index = request->path.find('/', 1);
+ index = index != string::npos ? index - 1 : string::npos;
+ const UPID to(request->path.substr(1, index), __ip__, __port__);
+
+ // And now determine 'name'.
+    index = index != string::npos ? index + 2 : request->path.size();
+ const string& name = request->path.substr(index);
+
+ VLOG(2) << "Parsed message name '" << name
+ << "' for " << to << " from " << from;
+
+ Message* message = new Message();
+ message->name = name;
+ message->from = from;
+ message->to = to;
+ message->body = request->body;
+
+ return message;
+ }
+
+ return NULL;
+}
+
+
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
+{
+ synchronized (watchers) {
+ // Start all the new I/O watchers.
+ while (!watchers->empty()) {
+ ev_io* watcher = watchers->front();
+ watchers->pop();
+ ev_io_start(loop, watcher);
+ }
+ }
+
+ synchronized (timeouts) {
+ if (update_timer) {
+ if (!timeouts->empty()) {
+ // Determine when the next timer should fire.
+ timeouts_watcher.repeat = (timeouts->begin()->first - Clock::now()).secs();
+
+ if (timeouts_watcher.repeat <= 0) {
+ // Feed the event now!
+ timeouts_watcher.repeat = 0;
+ ev_timer_again(loop, &timeouts_watcher);
+ ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+ } else {
+ // Don't fire the timer if the clock is paused since we
+ // don't want time to advance (instead a call to
+ // clock::advance() will handle the timer).
+ if (Clock::paused() && timeouts_watcher.repeat > 0) {
+ timeouts_watcher.repeat = 0;
+ }
+
+ ev_timer_again(loop, &timeouts_watcher);
+ }
+ }
+
+ update_timer = false;
+ }
+ }
+}
+
+
+void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
+{
+ list<Timer> timedout;
+
+ synchronized (timeouts) {
+ Time now = Clock::now();
+
+ VLOG(3) << "Handling timeouts up to " << now;
+
+ foreachkey (const Time& timeout, *timeouts) {
+ if (timeout > now) {
+ break;
+ }
+
+ VLOG(3) << "Have timeout(s) at " << timeout;
+
+ // Record that we have pending timers to execute so the
+ // Clock::settle() operation can wait until we're done.
+ pending_timers = true;
+
+ foreach (const Timer& timer, (*timeouts)[timeout]) {
+ timedout.push_back(timer);
+ }
+ }
+
+ // Now erase the range of timeouts that timed out.
+ timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
+
+ // Okay, so the timeout for the next timer should not have fired.
+ CHECK(timeouts->empty() || (timeouts->begin()->first > now));
+
+ // Update the timer as necessary.
+ if (!timeouts->empty()) {
+ // Determine when the next timer should fire.
+ timeouts_watcher.repeat =
+ (timeouts->begin()->first - Clock::now()).secs();
+
+ if (timeouts_watcher.repeat <= 0) {
+ // Feed the event now!
+ timeouts_watcher.repeat = 0;
+ ev_timer_again(loop, &timeouts_watcher);
+ ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
+ } else {
+ // Don't fire the timer if the clock is paused since we don't
+ // want time to advance (instead a call to Clock::advance()
+ // will handle the timer).
+ if (Clock::paused() && timeouts_watcher.repeat > 0) {
+ timeouts_watcher.repeat = 0;
+ }
+
+ ev_timer_again(loop, &timeouts_watcher);
+ }
+ }
+
+ update_timer = false; // Since we might have a queued update_timer.
+ }
+
+ // Update current time of process (if it's present/valid). It might
+ // be necessary to actually add some more synchronization around
+ // this so that, for example, pausing and resuming the clock doesn't
+// cause some processes to get their current times updated and
+ // others not. Since ProcessManager::use acquires the 'processes'
+ // lock we had to move this out of the synchronized (timeouts) above
+// since there was a deadlock with acquiring 'processes' then
+ // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
+ // current time may be greater than the timeout if a local message
+ // was received (and happens-before kicks in).
+ if (Clock::paused()) {
+ foreach (const Timer& timer, timedout) {
+ if (ProcessReference process = process_manager->use(timer.creator())) {
+ Clock::update(process, timer.timeout().time());
+ }
+ }
+ }
+
+ // Invoke the timers that timed out (TODO(benh): Do this
+ // asynchronously so that we don't tie up the event thread!).
+ foreach (const Timer& timer, timedout) {
+ timer();
+ }
+
+ // Mark ourselves as done executing the timers since it's now safe
+ // for a call to Clock::settle() to check if there will be any
+ // future timeouts reached.
+ synchronized (timeouts) {
+ pending_timers = false;
+ }
+}
+
+
+void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ DataDecoder* decoder = (DataDecoder*) watcher->data;
+
+ int s = watcher->fd;
+
+ while (true) {
+ const ssize_t size = 80 * 1024;
+ ssize_t length = 0;
+
+ char data[size];
+
+ length = recv(s, data, size, 0);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ break;
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while receiving: " << error;
+ } else {
+ VLOG(1) << "Socket closed while receiving";
+ }
+ socket_manager->close(s);
+ delete decoder;
+ ev_io_stop(loop, watcher);
+ delete watcher;
+ break;
+ } else {
+ CHECK(length > 0);
+
+ // Decode as much of the data as possible into HTTP requests.
+ const deque<Request*>& requests = decoder->decode(data, length);
+
+ if (!requests.empty()) {
+ foreach (Request* request, requests) {
+ process_manager->handle(decoder->socket(), request);
+ }
+ } else if (requests.empty() && decoder->failed()) {
+ VLOG(1) << "Decoder error while receiving";
+ socket_manager->close(s);
+ delete decoder;
+ ev_io_stop(loop, watcher);
+ delete watcher;
+ break;
+ }
+ }
+ }
+}
+
+
+void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ DataEncoder* encoder = (DataEncoder*) watcher->data;
+
+ int s = watcher->fd;
+
+ while (true) {
+ const void* data;
+ size_t size;
+
+ data = encoder->next(&size);
+ CHECK(size > 0);
+
+ ssize_t length = send(s, data, size, MSG_NOSIGNAL);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ encoder->backup(size);
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ encoder->backup(size);
+ break;
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while sending: " << error;
+ } else {
+ VLOG(1) << "Socket closed while sending";
+ }
+ socket_manager->close(s);
+ delete encoder;
+ ev_io_stop(loop, watcher);
+ delete watcher;
+ break;
+ } else {
+ CHECK(length > 0);
+
+ // Update the encoder with the amount sent.
+ encoder->backup(size - length);
+
+ // See if there is any more of the message to send.
+ if (encoder->remaining() == 0) {
+ delete encoder;
+
+ // Stop this watcher for now.
+ ev_io_stop(loop, watcher);
+
+ // Check for more stuff to send on socket.
+ Encoder* next = socket_manager->next(s);
+ if (next != NULL) {
+ watcher->data = next;
+ ev_io_init(watcher, next->sender(), s, EV_WRITE);
+ ev_io_start(loop, watcher);
+ } else {
+ // Nothing more to send right now, clean up.
+ delete watcher;
+ }
+ break;
+ }
+ }
+ }
+}
+
+
+void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ FileEncoder* encoder = (FileEncoder*) watcher->data;
+
+ int s = watcher->fd;
+
+ while (true) {
+ int fd;
+ off_t offset;
+ size_t size;
+
+ fd = encoder->next(&offset, &size);
+ CHECK(size > 0);
+
+ ssize_t length = sendfile(s, fd, offset, size);
+
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ encoder->backup(size);
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ encoder->backup(size);
+ break;
+ } else if (length <= 0) {
+ // Socket error or closed.
+ if (length < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Socket error while sending: " << error;
+ } else {
+ VLOG(1) << "Socket closed while sending";
+ }
+ socket_manager->close(s);
+ delete encoder;
+ ev_io_stop(loop, watcher);
+ delete watcher;
+ break;
+ } else {
+ CHECK(length > 0);
+
+ // Update the encoder with the amount sent.
+ encoder->backup(size - length);
+
+ // See if there is any more of the message to send.
+ if (encoder->remaining() == 0) {
+ delete encoder;
+
+ // Stop this watcher for now.
+ ev_io_stop(loop, watcher);
+
+ // Check for more stuff to send on socket.
+ Encoder* next = socket_manager->next(s);
+ if (next != NULL) {
+ watcher->data = next;
+ ev_io_init(watcher, next->sender(), s, EV_WRITE);
+ ev_io_start(loop, watcher);
+ } else {
+ // Nothing more to send right now, clean up.
+ delete watcher;
+ }
+ break;
+ }
+ }
+ }
+}
+
+
+void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ int s = watcher->fd;
+
+ // Now check that a successful connection was made.
+ int opt;
+ socklen_t optlen = sizeof(opt);
+
+ if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+ // Connect failure.
+ VLOG(1) << "Socket error while connecting";
+ socket_manager->close(s);
+ MessageEncoder* encoder = (MessageEncoder*) watcher->data;
+ delete encoder;
+ ev_io_stop(loop, watcher);
+ delete watcher;
+ } else {
+ // We're connected! Now let's do some sending.
+ ev_io_stop(loop, watcher);
+ ev_io_init(watcher, send_data, s, EV_WRITE);
+ ev_io_start(loop, watcher);
+ }
+}
+
+
+void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ int s = watcher->fd;
+
+ // Now check that a successful connection was made.
+ int opt;
+ socklen_t optlen = sizeof(opt);
+
+ if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+ // Connect failure.
+ VLOG(1) << "Socket error while connecting";
+ socket_manager->close(s);
+ DataDecoder* decoder = (DataDecoder*) watcher->data;
+ delete decoder;
+ ev_io_stop(loop, watcher);
+ delete watcher;
+ } else {
+ // We're connected! Now let's do some receiving.
+ ev_io_stop(loop, watcher);
+ ev_io_init(watcher, recv_data, s, EV_READ);
+ ev_io_start(loop, watcher);
+ }
+}
+
+
+void accept(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ CHECK_EQ(__s__, watcher->fd);
+
+ sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);
+
+ if (s < 0) {
+ return;
+ }
+
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+ << nonblock.error();
+ os::close(s);
+ return;
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+ << cloexec.error();
+ os::close(s);
+ return;
+ }
+
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+ int on = 1;
+ if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ os::close(s);
+ } else {
+ // Inform the socket manager for proper bookkeeping.
+ const Socket& socket = socket_manager->accepted(s);
+
+ // Allocate and initialize the decoder and watcher.
+ DataDecoder* decoder = new DataDecoder(socket);
+
+ ev_io* watcher = new ev_io();
+ watcher->data = decoder;
+
+ ev_io_init(watcher, recv_data, s, EV_READ);
+ ev_io_start(loop, watcher);
+ }
+}
+
+
+void polled(struct ev_loop* loop, ev_io* watcher, int revents)
+{
+ Promise<short>* promise = (Promise<short>*) watcher->data;
+ promise->set(revents);
+ delete promise;
+
+ ev_io_stop(loop, watcher);
+ delete watcher;
+}
+
+
+void* serve(void* arg)
+{
+ ev_loop(((struct ev_loop*) arg), 0);
+
+ return NULL;
+}
+
+
+void* schedule(void* arg)
+{
+ do {
+ ProcessBase* process = process_manager->dequeue();
+ if (process == NULL) {
+ Gate::state_t old = gate->approach();
+ process = process_manager->dequeue();
+ if (process == NULL) {
+ gate->arrive(old); // Wait at gate if idle.
+ continue;
+ } else {
+ gate->leave();
+ }
+ }
+ process_manager->resume(process);
+ } while (true);
+}
+
+
+// We might find value in catching terminating signals at some point.
+// However, for now, adding signal handlers freely is not allowed
+// because they will clash with Java and Python virtual machines and
+// cause hard-to-debug crashes/segfaults.
+
+// void sigbad(int signal, struct sigcontext *ctx)
+// {
+// // Pass on the signal (so that a core file is produced).
+// struct sigaction sa;
+// sa.sa_handler = SIG_DFL;
+// sigemptyset(&sa.sa_mask);
+// sa.sa_flags = 0;
+// sigaction(signal, &sa, NULL);
+// raise(signal);
+// }
+
+
+void initialize(const string& delegate)
+{
+ // TODO(benh): Return an error if attempting to initialize again
+  // with a different delegate than originally specified.
+
+ // static pthread_once_t init = PTHREAD_ONCE_INIT;
+ // pthread_once(&init, ...);
+
+ static volatile bool initialized = false;
+ static volatile bool initializing = true;
+
+ // Try and do the initialization or wait for it to complete.
+ if (initialized && !initializing) {
+ return;
+ } else if (initialized && initializing) {
+ while (initializing);
+ return;
+ } else {
+ if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
+ while (initializing);
+ return;
+ }
+ }
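+
+  // At this point exactly one thread has won the compare-and-swap
+  // above and performs the initialization below; any other thread
+  // spins on 'initializing' and returns once it is cleared (which
+  // happens near the bottom of this function).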
+
+// // Install signal handler.
+// struct sigaction sa;
+
+// sa.sa_handler = (void (*) (int)) sigbad;
+// sigemptyset (&sa.sa_mask);
+// sa.sa_flags = SA_RESTART;
+
+// sigaction (SIGTERM, &sa, NULL);
+// sigaction (SIGINT, &sa, NULL);
+// sigaction (SIGQUIT, &sa, NULL);
+// sigaction (SIGSEGV, &sa, NULL);
+// sigaction (SIGILL, &sa, NULL);
+// #ifdef SIGBUS
+// sigaction (SIGBUS, &sa, NULL);
+// #endif
+// #ifdef SIGSTKFLT
+// sigaction (SIGSTKFLT, &sa, NULL);
+// #endif
+// sigaction (SIGABRT, &sa, NULL);
+
+// sigaction (SIGFPE, &sa, NULL);
+
+#ifdef __sun__
+ /* Need to ignore this since we can't do MSG_NOSIGNAL on Solaris. */
+ signal(SIGPIPE, SIG_IGN);
+#endif // __sun__
+
+ // Create a new ProcessManager and SocketManager.
+ process_manager = new ProcessManager(delegate);
+ socket_manager = new SocketManager();
+
+ // Setup processing threads.
+ long cpus = std::max(4L, sysconf(_SC_NPROCESSORS_ONLN));
+
+ for (int i = 0; i < cpus; i++) {
+ pthread_t thread; // For now, not saving handles on our threads.
+ if (pthread_create(&thread, NULL, schedule, NULL) != 0) {
+ LOG(FATAL) << "Failed to initialize, pthread_create";
+ }
+ }
+
+ __ip__ = 0;
+ __port__ = 0;
+
+ char* value;
+
+ // Check environment for ip.
+ value = getenv("LIBPROCESS_IP");
+ if (value != NULL) {
+ int result = inet_pton(AF_INET, value, &__ip__);
+ if (result == 0) {
+ LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
+ } else if (result < 0) {
+ PLOG(FATAL) << "Failed to initialize, inet_pton";
+ }
+ }
+
+ // Check environment for port.
+ value = getenv("LIBPROCESS_PORT");
+ if (value != NULL) {
+ int result = atoi(value);
+ if (result < 0 || result > USHRT_MAX) {
+ LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
+ }
+ __port__ = result;
+ }
+
+ // Create a "server" socket for communicating with other nodes.
+ if ((__s__ = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ PLOG(FATAL) << "Failed to initialize, socket";
+ }
+
+ // Make socket non-blocking.
+ Try<Nothing> nonblock = os::nonblock(__s__);
+ if (nonblock.isError()) {
+ LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error();
+ }
+
+ // Set FD_CLOEXEC flag.
+ Try<Nothing> cloexec = os::cloexec(__s__);
+ if (cloexec.isError()) {
+ LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error();
+ }
+
+ // Allow address reuse.
+ int on = 1;
+ if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+ PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
+ }
+
+ // Set up socket.
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_addr.s_addr = __ip__;
+ addr.sin_port = htons(__port__);
+
+ if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
+ PLOG(FATAL) << "Failed to initialize, bind";
+ }
+
+ // Lookup and store assigned ip and assigned port.
+ socklen_t addrlen = sizeof(addr);
+ if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
+ PLOG(FATAL) << "Failed to initialize, getsockname";
+ }
+
+ __ip__ = addr.sin_addr.s_addr;
+ __port__ = ntohs(addr.sin_port);
+
+ // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
+ // actually have a valid external ip address. Note that we need only
+ // one ip address, so that other processes can send and receive and
+ // don't get confused as to whom they are sending to.
+  if (__ip__ == 0 || __ip__ == htonl(INADDR_LOOPBACK)) {
+ char hostname[512];
+
+ if (gethostname(hostname, sizeof(hostname)) < 0) {
+ PLOG(FATAL) << "Ffailed to initialize, gethostname";
+ }
+
+ // Lookup IP address of local hostname.
+ hostent* he;
+
+ if ((he = gethostbyname2(hostname, AF_INET)) == NULL) {
+ PLOG(FATAL) << "Failed to initialize, gethostbyname2";
+ }
+
+ __ip__ = *((uint32_t *) he->h_addr_list[0]);
+ }
+
+ if (listen(__s__, 500000) < 0) {
+ PLOG(FATAL) << "Failed to initialize, listen";
+ }
+
+ // Setup event loop.
+#ifdef __sun__
+ loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
+#else
+ loop = ev_default_loop(EVFLAG_AUTO);
+#endif // __sun__
+
+ ev_async_init(&async_watcher, handle_async);
+ ev_async_start(loop, &async_watcher);
+
+ ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
+ ev_timer_again(loop, &timeouts_watcher);
+
+ ev_io_init(&server_watcher, accept, __s__, EV_READ);
+ ev_io_start(loop, &server_watcher);
+
+// ev_child_init(&child_watcher, child_exited, pid, 0);
+// ev_child_start(loop, &cw);
+
+// /* Install signal handler. */
+// struct sigaction sa;
+
+// sa.sa_handler = ev_sighandler;
+// sigfillset (&sa.sa_mask);
+// sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
+// sigaction (w->signum, &sa, 0);
+
+// sigemptyset (&sa.sa_mask);
+// sigaddset (&sa.sa_mask, w->signum);
+// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
+
+ pthread_t thread; // For now, not saving handles on our threads.
+ if (pthread_create(&thread, NULL, serve, loop) != 0) {
+ LOG(FATAL) << "Failed to initialize, pthread_create";
+ }
+
+  // Need to set initializing here so that we can actually invoke
+ // 'spawn' below for the garbage collector.
+ initializing = false;
+
+ // TODO(benh): Make sure creating the garbage collector, logging
+ // process, and profiler always succeeds and use supervisors to make
+ // sure that none terminate.
+
+ // Create global garbage collector process.
+ gc = spawn(new GarbageCollector());
+
+ // Create the global logging process.
+ spawn(new Logging(), true);
+
+ // Create the global profiler process.
+ spawn(new Profiler(), true);
+
+ // Create the global statistics.
+ // TODO(bmahler): Investigate memory implications of this window
+ // size. We may also want to provide a maximum memory size rather than
+ // time window. Or, offload older data to disk, etc.
+ process::statistics = new Statistics(Weeks(2));
+
+ // Initialize the mime types.
+ mime::initialize();
+
+ // Initialize the response statuses.
+ http::initialize();
+
+ char temp[INET_ADDRSTRLEN];
+ if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
+ PLOG(FATAL) << "Failed to initialize, inet_ntop";
+ }
+
+ VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__
+ << " for " << cpus << " cpus";
+}
+
+
+uint32_t ip()
+{
+ process::initialize();
+ return __ip__;
+}
+
+
+uint16_t port()
+{
+ process::initialize();
+ return __port__;
+}
+
+
+HttpProxy::HttpProxy(const Socket& _socket)
+ : ProcessBase(ID::generate("__http__")),
+ socket(_socket) {}
+
+
+HttpProxy::~HttpProxy()
+{
+ // Need to make sure response producers know not to continue to
+ // create a response (streaming or otherwise).
+ if (pipe.isSome()) {
+ os::close(pipe.get());
+ }
+ pipe = None();
+
+ while (!items.empty()) {
+ Item* item = items.front();
+
+ // Attempt to discard the future.
+ item->future->discard();
+
+ // But it might have already been ready ...
+ if (item->future->isReady()) {
+ const Response& response = item->future->get();
+ if (response.type == Response::PIPE) {
+ os::close(response.pipe);
+ }
+ }
+
+ items.pop();
+ delete item;
+ }
+}
+
+
+void HttpProxy::enqueue(const Response& response, const Request& request)
+{
+ handle(new Future<Response>(response), request);
+}
+
+
+void HttpProxy::handle(Future<Response>* future, const Request& request)
+{
+ items.push(new Item(request, future));
+
+ if (items.size() == 1) {
+ next();
+ }
+}
+
+
+void HttpProxy::next()
+{
+ if (items.size() > 0) {
+ // Wait for any transition of the future.
+ items.front()->future->onAny(
+ defer(self(), &HttpProxy::waited, lambda::_1));
+ }
+}
+
+
+void HttpProxy::waited(const Future<Response>& future)
+{
+ CHECK(items.size() > 0);
+ Item* item = items.front();
+
+ CHECK(future == *item->future);
+
+ // Process the item and determine if we're done or not (so we know
+ // whether to start waiting on the next responses).
+ bool processed = process(*item->future, item->request);
+
+ items.pop();
+ delete item;
+
+ if (processed) {
+ next();
+ }
+}
+
+
+bool HttpProxy::process(const Future<Response>& future, const Request& request)
+{
+ if (!future.isReady()) {
+ // TODO(benh): Consider handling other "states" of future
+ // (discarded, failed, etc) with different HTTP statuses.
+ socket_manager->send(ServiceUnavailable(), request, socket);
+ return true; // All done, can process next response.
+ }
+
+ Response response = future.get();
+
+ // If the response specifies a path, try and perform a sendfile.
+ if (response.type == Response::PATH) {
+    // Make sure no body is sent (this is really an error that should
+    // be reported and no response sent).
+ response.body.clear();
+
+ const string& path = response.path;
+ int fd = open(path.c_str(), O_RDONLY);
+ if (fd < 0) {
+ if (errno == ENOENT || errno == ENOTDIR) {
+ VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
+ socket_manager->send(NotFound(), request, socket);
+ } else {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to send file at '" << path << "': " << error;
+ socket_manager->send(InternalServerError(), request, socket);
+ }
+ } else {
+ struct stat s; // Need 'struct' because of function named 'stat'.
+ if (fstat(fd, &s) != 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to send file at '" << path << "': " << error;
+ socket_manager->send(InternalServerError(), request, socket);
+ } else if (S_ISDIR(s.st_mode)) {
+ VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
+ socket_manager->send(NotFound(), request, socket);
+ } else {
+ // While the user is expected to properly set a 'Content-Type'
+ // header, we fill in (or overwrite) 'Content-Length' header.
+ stringstream out;
+ out << s.st_size;
+ response.headers["Content-Length"] = out.str();
+
+ if (s.st_size == 0) {
+ socket_manager->send(response, request, socket);
+ return true; // All done, can process next request.
+ }
+
+ VLOG(1) << "Sending file at '" << path << "' with length " << s.st_size;
+
+ // TODO(benh): Consider a way to have the socket manager turn
+ // on TCP_CORK for both sends and then turn it off.
+ socket_manager->send(
+ new HttpResponseEncoder(socket, response, request),
+ true);
+
+ // Note the file descriptor gets closed by FileEncoder.
+ socket_manager->send(
+ new FileEncoder(socket, fd, s.st_size),
+ request.keepAlive);
+ }
+ }
+ } else if (response.type == Response::PIPE) {
+    // Make sure no body is sent (this is really an error that should
+    // be reported and no response sent).
+ response.body.clear();
+
+ // Make sure the pipe is nonblocking.
+ Try<Nothing> nonblock = os::nonblock(response.pipe);
+ if (nonblock.isError()) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed make pipe nonblocking: " << error;
+ socket_manager->send(InternalServerError(), request, socket);
+ return true; // All done, can process next response.
+ }
+
+ // While the user is expected to properly set a 'Content-Type'
+ // header, we fill in (or overwrite) 'Transfer-Encoding' header.
+ response.headers["Transfer-Encoding"] = "chunked";
+
+ VLOG(1) << "Starting \"chunked\" streaming";
+
+ socket_manager->send(
+ new HttpResponseEncoder(socket, response, request),
+ true);
+
+ pipe = response.pipe;
+
+ io::poll(pipe.get(), io::READ).onAny(
+ defer(self(), &Self::stream, lambda::_1, request));
+
+ return false; // Streaming, don't process next response (yet)!
+ } else {
+ socket_manager->send(response, request, socket);
+ }
+
+ return true; // All done, can process next response.
+}
+
+
+void HttpProxy::stream(const Future<short>& poll, const Request& request)
+{
+ // TODO(benh): Use 'splice' on Linux.
+
+ CHECK(pipe.isSome());
+
+ bool finished = false; // Whether we're done streaming.
+
+ if (poll.isReady()) {
+ // Read and write.
+ CHECK(poll.get() == io::READ);
+ const size_t size = 4 * 1024; // 4K.
+ char data[size];
+ while (!finished) {
+ ssize_t length = ::read(pipe.get(), data, size);
+ if (length < 0 && (errno == EINTR)) {
+ // Interrupted, try again now.
+ continue;
+ } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ // Might block, try again later.
+ io::poll(pipe.get(), io::READ).onAny(
+ defer(self(), &Self::stream, lambda::_1, request));
+ break;
+ } else {
+ std::ostringstream out;
+ if (length <= 0) {
+ // Error or closed, treat both as closed.
+ if (length < 0) {
+ // Error.
+ const char* error = strerror(errno);
+ VLOG(1) << "Read error while streaming: " << error;
+ }
+ out << "0\r\n" << "\r\n";
+ finished = true;
+ } else {
+ // Data!
+ out << std::hex << length << "\r\n";
+ out.write(data, length);
+ out << "\r\n";
+ }
+
+ // We always persist the connection when we're not finished
+ // streaming.
+ socket_manager->send(
+ new DataEncoder(socket, out.str()),
+ finished ? request.keepAlive : true);
+ }
+ }
+ } else if (poll.isFailed()) {
+ VLOG(1) << "Failed to poll: " << poll.failure();
+ socket_manager->send(InternalServerError(), request, socket);
+ finished = true;
+ } else {
+ VLOG(1) << "Unexpected discarded future while polling";
+ socket_manager->send(InternalServerError(), request, socket);
+ finished = true;
+ }
+
+ if (finished) {
+ os::close(pipe.get());
+ pipe = None();
+ next();
+ }
+}
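+
+// Note that the framing written in stream() above is standard HTTP/1.1
+// chunked encoding: each chunk is "<length in hex>\r\n<data>\r\n" and
+// the stream is terminated by the zero-length chunk "0\r\n\r\n".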
+
+
+SocketManager::SocketManager()
+{
+ synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+}
+
+
+SocketManager::~SocketManager() {}
+
+
+Socket SocketManager::accepted(int s)
+{
+ synchronized (this) {
+ return sockets[s] = Socket(s);
+ }
+}
+
+
+void SocketManager::link(ProcessBase* process, const UPID& to)
+{
+ // TODO(benh): The semantics we want to support for link are such
+ // that if there is nobody to link to (local or remote) then an
+ // ExitedEvent gets generated. This functionality has only been
+ // implemented when the link is local, not remote. Of course, if
+ // there is nobody listening on the remote side, then this should
+ // work remotely ... but if there is someone listening remotely just
+ // not at that id, then it will silently continue executing.
+
+ CHECK(process != NULL);
+
+ Node node(to.ip, to.port);
+
+ synchronized (this) {
+    // Check if node is remote and there isn't a persistent link.
+ if ((node.ip != __ip__ || node.port != __port__)
+ && persists.count(node) == 0) {
+      // Okay, no link, let's create a socket.
+ int s;
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ PLOG(FATAL) << "Failed to link, socket";
+ }
+
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error();
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
+ }
+
+ sockets[s] = Socket(s);
+ nodes[s] = node;
+
+ persists[node] = s;
+
+ // Allocate and initialize the decoder and watcher (we really
+ // only "receive" on this socket so that we can react when it
+ // gets closed and generate appropriate lost events).
+ DataDecoder* decoder = new DataDecoder(sockets[s]);
+
+ ev_io* watcher = new ev_io();
+ watcher->data = decoder;
+
+ // Try and connect to the node using this socket.
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_port = htons(to.port);
+ addr.sin_addr.s_addr = to.ip;
+
+ if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
+ if (errno != EINPROGRESS) {
+ PLOG(FATAL) << "Failed to link, connect";
+ }
+
+ // Wait for socket to be connected.
+ ev_io_init(watcher, receiving_connect, s, EV_WRITE);
+ } else {
+ ev_io_init(watcher, recv_data, s, EV_READ);
+ }
+
+ // Enqueue the watcher.
+ synchronized (watchers) {
+ watchers->push(watcher);
+ }
+
+ // Interrupt the loop.
+ ev_async_send(loop, &async_watcher);
+ }
+
+ links[to].insert(process);
+ }
+}
+
+
+PID<HttpProxy> SocketManager::proxy(const Socket& socket)
+{
+ HttpProxy* proxy = NULL;
+
+ synchronized (this) {
+ // This socket might have been asked to get closed (e.g., remote
+ // side hang up) while a process is attempting to handle an HTTP
+ // request. Thus, if there is no more socket, return an empty PID.
+ if (sockets.count(socket) > 0) {
+ if (proxies.count(socket) > 0) {
+ return proxies[socket]->self();
+ } else {
+ proxy = new HttpProxy(sockets[socket]);
+ proxies[socket] = proxy;
+ }
+ }
+ }
+
+ // Now check if we need to spawn a newly created proxy. Note that we
+ // need to do this outside of the synchronized block above to avoid
+ // a possible deadlock (because spawn eventually synchronizes on
+ // ProcessManager and ProcessManager::cleanup synchronizes on
+ // ProcessManager and then SocketManager, so a deadlock results if
+ // we do spawn within the synchronized block above).
+ if (proxy != NULL) {
+ return spawn(proxy, true);
+ }
+
+ return PID<HttpProxy>();
+}
+
+
+void SocketManager::send(Encoder* encoder, bool persist)
+{
+ CHECK(encoder != NULL);
+
+ synchronized (this) {
+ if (sockets.count(encoder->socket()) > 0) {
+ // Update whether or not this socket should get disposed after
+ // there is no more data to send.
+ if (!persist) {
+ dispose.insert(encoder->socket());
+ }
+
+ if (outgoing.count(encoder->socket()) > 0) {
+ outgoing[encoder->socket()].push(encoder);
+ } else {
+ // Initialize the outgoing queue.
+ outgoing[encoder->socket()];
+
+ // Allocate and initialize the watcher.
+ ev_io* watcher = new ev_io();
+ watcher->data = encoder;
+
+ ev_io_init(watcher, encoder->sender(), encoder->socket(), EV_WRITE);
+
+ synchronized (watchers) {
+ watchers->push(watcher);
+ }
+
+ ev_async_send(loop, &async_watcher);
+ }
+ } else {
+ VLOG(1) << "Attempting to send on a no longer valid socket!";
+ delete encoder;
+ }
+ }
+}
+
+
+void SocketManager::send(
+ const Response& response,
+ const Request& request,
+ const Socket& socket)
+{
+ bool persist = request.keepAlive;
+
+ // Don't persist the connection if the headers include
+ // 'Connection: close'.
+ if (response.headers.contains("Connection")) {
+ if (response.headers.get("Connection").get() == "close") {
+ persist = false;
+ }
+ }
+
+ send(new HttpResponseEncoder(socket, response, request), persist);
+}
+
+
+void SocketManager::send(Message* message)
+{
+ CHECK(message != NULL);
+
+ Node node(message->to.ip, message->to.port);
+
+ synchronized (this) {
+ // Check if there is already a socket.
+ bool persist = persists.count(node) > 0;
+ bool temp = temps.count(node) > 0;
+ if (persist || temp) {
+ int s = persist ? persists[node] : temps[node];
+ CHECK(sockets.count(s) > 0);
+ send(new MessageEncoder(sockets[s], message), persist);
+ } else {
+      // No persistent or temporary socket to the node currently
+ // exists, so we create a temporary one.
+ int s;
+
+ if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ PLOG(FATAL) << "Failed to send, socket";
+ }
+
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error();
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error();
+ }
+
+ sockets[s] = Socket(s);
+ nodes[s] = node;
+ temps[node] = s;
+
+ dispose.insert(s);
+
+ // Initialize the outgoing queue.
+ outgoing[s];
+
+ // Allocate and initialize the watcher.
+ ev_io* watcher = new ev_io();
+ watcher->data = new MessageEncoder(sockets[s], message);
+
+ // Try and connect to the node using this socket.
+ sockaddr_in addr;
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = PF_INET;
+ addr.sin_port = htons(message->to.port);
+ addr.sin_addr.s_addr = message->to.ip;
+
+ if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
+ if (errno != EINPROGRESS) {
+ PLOG(FATAL) << "Failed to send, connect";
+ }
+
+ // Initialize watcher for connecting.
+ ev_io_init(watcher, sending_connect, s, EV_WRITE);
+ } else {
+ // Initialize watcher for sending.
+ ev_io_init(watcher, send_data, s, EV_WRITE);
+ }
+
+ // Enqueue the watcher.
+ synchronized (watchers) {
+ watchers->push(watcher);
+ }
+
+ ev_async_send(loop, &async_watcher);
+ }
+ }
+}
+
+
+Encoder* SocketManager::next(int s)
+{
+ HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
+
+ synchronized (this) {
+ // We cannot assume 'sockets.count(s) > 0' here because it's
+    // possible that 's' has been removed with a call to
+ // SocketManager::close. For example, it could be the case that a
+ // socket has gone to CLOSE_WAIT and the call to 'recv' in
+ // recv_data returned 0 causing SocketManager::close to get
+ // invoked. Later a call to 'send' or 'sendfile' (e.g., in
+ // send_data or send_file) can "succeed" (because the socket is
+ // not "closed" yet because there are still some Socket
+ // references, namely the reference being used in send_data or
+    // send_file!). However, when SocketManager::next is actually
+    // invoked we find out that there is no more data and thus stop
+ // sending.
+ // TODO(benh): Should we actually finish sending the data!?
+ if (sockets.count(s) > 0) {
+ CHECK(outgoing.count(s) > 0);
+
+ if (!outgoing[s].empty()) {
+ // More messages!
+ Encoder* encoder = outgoing[s].front();
+ outgoing[s].pop();
+ return encoder;
+ } else {
+ // No more messages ... erase the outgoing queue.
+ outgoing.erase(s);
+
+ if (dispose.count(s) > 0) {
+ // This is either a temporary socket we created or it's a
+ // socket that we were receiving data from and possibly
+ // sending HTTP responses back on. Clean up either way.
+ if (nodes.count(s) > 0) {
+ const Node& node = nodes[s];
+ CHECK(temps.count(node) > 0 && temps[node] == s);
+ temps.erase(node);
+ nodes.erase(s);
+ }
+
+ if (proxies.count(s) > 0) {
+ proxy = proxies[s];
+ proxies.erase(s);
+ }
+
+ dispose.erase(s);
+ sockets.erase(s);
+
+ // We don't actually close the socket (we wait for the Socket
+ // abstraction to close it once there are no more references),
+ // but we do shutdown the receiving end so any DataDecoder
+ // will get cleaned up (which might have the last reference).
+ shutdown(s, SHUT_RD);
+ }
+ }
+ }
+ }
+
+ // We terminate the proxy outside the synchronized block to avoid
+ // possible deadlock between the ProcessManager and SocketManager
+ // (see comment in SocketManager::proxy for more information).
+ if (proxy != NULL) {
+ terminate(proxy);
+ }
+
+ return NULL;
+}
+
+
+void SocketManager::close(int s)
+{
+ HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
+
+ synchronized (this) {
+ // This socket might not be active if it was already asked to get
+ // closed (e.g., a write on the socket failed so we try and close
+ // it and then later the read side of the socket gets closed so we
+ // try and close it again). Thus, ignore the request if we don't
+ // know about the socket.
+ if (sockets.count(s) > 0) {
+ // Clean up any remaining encoders for this socket.
+ if (outgoing.count(s) > 0) {
+ while (!outgoing[s].empty()) {
+ Encoder* encoder = outgoing[s].front();
+ delete encoder;
+ outgoing[s].pop();
+ }
+
+ outgoing.erase(s);
+ }
+
+ // Clean up after sockets used for node communication.
+ if (nodes.count(s) > 0) {
+ const Node& node = nodes[s];
+
+      // Don't bother invoking exited unless the socket was persistent.
+ if (persists.count(node) > 0 && persists[node] == s) {
+ persists.erase(node);
+ exited(node); // Generate ExitedEvent(s)!
+ } else if (temps.count(node) > 0 && temps[node] == s) {
+ temps.erase(node);
+ }
+
+ nodes.erase(s);
+ }
+
+ // Clean up any proxy associated with this socket.
+ if (proxies.count(s) > 0) {
+ proxy = proxies[s];
+ proxies.erase(s);
+ }
+
+ dispose.erase(s);
+ sockets.erase(s);
+ }
+ }
+
+ // We terminate the proxy outside the synchronized block to avoid
+ // possible deadlock between the ProcessManager and SocketManager.
+ if (proxy != NULL) {
+ terminate(proxy);
+ }
+
+ // Note that we don't actually:
+ //
+ // close(s);
+ //
+ // Because, for example, there could be a race between an HttpProxy
+  // trying to send a response with SocketManager::send() and a
+  // process responding to another Request (e.g., trying to do a
+  // sendfile). Since these things may be happening asynchronously,
+  // we can't close the socket yet, because it might
+ // get reused before any of the above things have finished, and then
+ // we'll end up sending data on the wrong socket! Instead, we rely
+ // on the last reference of our Socket object to close the
+ // socket. Note, however, that since socket is no longer in
+ // 'sockets' any attempt to send with it will just get ignored.
+}
+
+
+void SocketManager::exited(const Node& node)
+{
+ // TODO(benh): It would be cleaner if this routine could call back
+ // into ProcessManager ... then we wouldn't have to convince
+ // ourselves that the accesses to each Process object will always be
+ // valid.
+ synchronized (this) {
+ list<UPID> removed;
+ // Look up all linked processes.
+ foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
+ if (linkee.ip == node.ip && linkee.port == node.port) {
+ foreach (ProcessBase* linker, processes) {
+ linker->enqueue(new ExitedEvent(linkee));
+ }
+ removed.push_back(linkee);
+ }
+ }
+
+ foreach (const UPID& pid, removed) {
+ links.erase(pid);
+ }
+ }
+}
+
+
+void SocketManager::exited(ProcessBase* process)
+{
+ // An exited event is enough to cause the process to get deleted
+ // (e.g., by the garbage collector), which means we can't
+ // dereference process (or even use the address) after we enqueue at
+ // least one exited event. Thus, we save the process pid.
+ const UPID pid = process->pid;
+
+ // Likewise, we need to save the current time of the process so we
+ // can update the clocks of linked processes as appropriate.
+ const Time time = Clock::now(process);
+
+ synchronized (this) {
+ // Iterate through the links, removing any links the process might
+ // have had and creating exited events for any linked processes.
+ foreachpair (const UPID& linkee, set<ProcessBase*>& processes, links) {
+ processes.erase(process);
+
+ if (linkee == pid) {
+ foreach (ProcessBase* linker, processes) {
+ CHECK(linker != process) << "Process linked with itself";
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ Clock::update(linker, time);
+ }
+ }
+ linker->enqueue(new ExitedEvent(linkee));
+ }
+ }
+ }
+
+ links.erase(pid);
+ }
+}
+
+
+ProcessManager::ProcessManager(const string& _delegate)
+ : delegate(_delegate)
+{
+ synchronizer(processes) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+ synchronizer(runq) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+ running = 0;
+ __sync_synchronize(); // Ensure write to 'running' visible in other threads.
+}
+
+
+ProcessManager::~ProcessManager() {}
+
+
+ProcessReference ProcessManager::use(const UPID& pid)
+{
+ if (pid.ip == __ip__ && pid.port == __port__) {
+ synchronized (processes) {
+ if (processes.count(pid.id) > 0) {
+ // Note that the ProcessReference constructor _must_ get
+ // called while holding the lock on processes so that waiting
+ // for references is atomic (i.e., race free).
+ return ProcessReference(processes[pid.id]);
+ }
+ }
+ }
+
+ return ProcessReference(NULL);
+}
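+
+// For example (mirroring how 'use' gets called below, e.g., in
+// ProcessManager::deliver and ProcessManager::terminate):
+//
+//   if (ProcessReference process = process_manager->use(pid)) {
+//     process->enqueue(event); // Process can't get cleaned up here.
+//   } // Reference released; cleanup (if any) may now proceed.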
+
+
+bool ProcessManager::handle(
+ const Socket& socket,
+ Request* request)
+{
+ CHECK(request != NULL);
+
+ // Check if this is a libprocess request (i.e., 'User-Agent:
+ // libprocess/id@ip:port') and if so, parse as a message.
+ if (libprocess(request)) {
+ Message* message = parse(request);
+ if (message != NULL) {
+ delete request;
+ // TODO(benh): Use the sender PID in order to capture
+ // happens-before timing relationships for testing.
+ return deliver(message->to, new MessageEvent(message));
+ }
+
+ VLOG(1) << "Failed to handle libprocess request: "
+ << request->method << " " << request->path
+ << " (User-Agent: " << request->headers["User-Agent"] << ")";
+
+ delete request;
+ return false;
+ }
+
+ // Treat this as an HTTP request. Start by checking that the path
+ // starts with a '/' (since the code below assumes as much).
+ if (request->path.find('/') != 0) {
+ VLOG(1) << "Returning '400 Bad Request' for '" << request->path << "'";
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(socket);
+
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue, BadRequest(), *request);
+
+ // Clean up the request.
+ delete request;
+ return false;
+ }
+
+ // Ignore requests with relative paths (i.e., paths containing "/..").
+ if (request->path.find("/..") != string::npos) {
+ VLOG(1) << "Returning '404 Not Found' for '" << request->path
+ << "' (ignoring requests with relative paths)";
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(socket);
+
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
+
+ // Clean up the request.
+ delete request;
+ return false;
+ }
+
+ // Split the path by '/'.
+ vector<string> tokens = strings::tokenize(request->path, "/");
+
+ // Try and determine a receiver, otherwise try and delegate.
+ ProcessReference receiver;
+
+ if (tokens.size() == 0 && delegate != "") {
+ request->path = "/" + delegate;
+ receiver = use(UPID(delegate, __ip__, __port__));
+ } else if (tokens.size() > 0) {
+ receiver = use(UPID(tokens[0], __ip__, __port__));
+ }
+
+ if (!receiver && delegate != "") {
+ // Try and delegate the request.
+ request->path = "/" + delegate + request->path;
+ receiver = use(UPID(delegate, __ip__, __port__));
+ }
+
+ if (receiver) {
+ // TODO(benh): Use the sender PID in order to capture
+ // happens-before timing relationships for testing.
+ return deliver(receiver, new HttpEvent(socket, request));
+ }
+
+ // This request has no receiver, so send an error response.
+ VLOG(1) << "Returning '404 Not Found' for '" << request->path << "'";
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(socket);
+
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
+
+ // Clean up the request.
+ delete request;
+ return false;
+}
+
+
+bool ProcessManager::deliver(
+ ProcessBase* receiver,
+ Event* event,
+ ProcessBase* sender)
+{
+ CHECK(event != NULL);
+
+ // If we are using a manual clock then update the current time of
+ // the receiver using the sender if necessary to preserve the
+ // happens-before relationship between the sender and receiver. Note
+ // that the assumption is that the sender remains valid for at least
+ // the duration of this routine (so that we can look up its current
+ // time).
+ if (Clock::paused()) {
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ if (sender != NULL) {
+ Clock::order(sender, receiver);
+ } else {
+ Clock::update(receiver, Clock::now());
+ }
+ }
+ }
+ }
+
+ receiver->enqueue(event);
+
+ return true;
+}
+
+bool ProcessManager::deliver(
+ const UPID& to,
+ Event* event,
+ ProcessBase* sender)
+{
+ CHECK(event != NULL);
+
+ if (ProcessReference receiver = use(to)) {
+ return deliver(receiver, event, sender);
+ }
+
+ delete event;
+ return false;
+}
+
+
+UPID ProcessManager::spawn(ProcessBase* process, bool manage)
+{
+ CHECK(process != NULL);
+
+ synchronized (processes) {
+ if (processes.count(process->pid.id) > 0) {
+ return UPID();
+ } else {
+ processes[process->pid.id] = process;
+ }
+ }
+
+ // Use the garbage collector if requested.
+ if (manage) {
+ dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
+ }
+
+ // We save the PID before enqueueing the process to avoid the race
+ // condition that occurs when a user has a very short process and
+ // the process gets run and cleaned up before we return from enqueue
+ // (e.g., when 'manage' is set to true).
+ UPID pid = process->self();
+
+ // Add process to the run queue (so 'initialize' will get invoked).
+ enqueue(process);
+
+ VLOG(2) << "Spawned process " << pid;
+
+ return pid;
+}
+
+
+void ProcessManager::resume(ProcessBase* process)
+{
+ __process__ = process;
+
+ VLOG(2) << "Resuming " << process->pid << " at " << Clock::now();
+
+ bool terminate = false;
+ bool blocked = false;
+
+ CHECK(process->state == ProcessBase::BOTTOM ||
+ process->state == ProcessBase::READY);
+
+ if (process->state == ProcessBase::BOTTOM) {
+ process->state = ProcessBase::RUNNING;
+ try { process->initialize(); }
+ catch (...) { terminate = true; }
+ }
+
+ while (!terminate && !blocked) {
+ Event* event = NULL;
+
+ process->lock();
+ {
+ if (process->events.size() > 0) {
+ event = process->events.front();
+ process->events.pop_front();
+ process->state = ProcessBase::RUNNING;
+ } else {
+ process->state = ProcessBase::BLOCKED;
+ blocked = true;
+ }
+ }
+ process->unlock();
+
+ if (!blocked) {
+ CHECK(event != NULL);
+
+ // Determine if we should filter this event.
+ synchronized (filterer) {
+ if (filterer != NULL) {
+ bool filter = false;
+ struct FilterVisitor : EventVisitor
+ {
+ FilterVisitor(bool* _filter) : filter(_filter) {}
+
+ virtual void visit(const MessageEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ virtual void visit(const DispatchEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ virtual void visit(const HttpEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ virtual void visit(const ExitedEvent& event)
+ {
+ *filter = filterer->filter(event);
+ }
+
+ bool* filter;
+ } visitor(&filter);
+
+ event->visit(&visitor);
+
+ if (filter) {
+ delete event;
+ continue; // Try and execute the next event.
+ }
+ }
+ }
+
+ // Determine if we should terminate.
+ terminate = event->is<TerminateEvent>();
+
+ // Now service the event.
+ try {
+ process->serve(*event);
+ } catch (const std::exception& e) {
+ std::cerr << "libprocess: " << process->pid
+ << " terminating due to "
+ << e.what() << std::endl;
+ terminate = true;
+ } catch (...) {
+ std::cerr << "libprocess: " << process->pid
+ << " terminating due to unknown exception" << std::endl;
+ terminate = true;
+ }
+
+ delete event;
+
+ if (terminate) {
+ cleanup(process);
+ }
+ }
+ }
+
+ __process__ = NULL;
+
+ CHECK_GE(running, 1);
+ __sync_fetch_and_sub(&running, 1);
+}
+
+
+void ProcessManager::cleanup(ProcessBase* process)
+{
+ VLOG(2) << "Cleaning up " << process->pid;
+
+ // First, set the terminating state so no more events will get
+ // enqueued, and delete all the pending events. We want to delete the
+ // events before we hold the processes lock because deleting an
+ // event could cause code outside libprocess to get executed which
+ // might cause a deadlock with the processes lock. Likewise,
+ // deleting the events now rather than later has the nice property
+ // of making sure that any events that might have gotten enqueued on
+ // the process we are cleaning up will get dropped (since it's
+ // terminating) and eliminates the potential of enqueueing them on
+ // another process that gets spawned with the same PID.
+ deque<Event*> events;
+
+ process->lock();
+ {
+ process->state = ProcessBase::TERMINATING;
+ events = process->events;
+ process->events.clear();
+ }
+ process->unlock();
+
+ // Delete pending events.
+ while (!events.empty()) {
+ Event* event = events.front();
+ events.pop_front();
+ delete event;
+ }
+
+ // Possible gate that non-libprocess threads are waiting at.
+ Gate* gate = NULL;
+
+ // Remove process.
+ synchronized (processes) {
+ // Wait for all process references to get cleaned up.
+ while (process->refs > 0) {
+ asm ("pause");
+ __sync_synchronize();
+ }
+
+ process->lock();
+ {
+ CHECK(process->events.empty());
+
+ processes.erase(process->pid.id);
+
+ // Lookup gate to wake up waiting threads.
+ map<ProcessBase*, Gate*>::iterator it = gates.find(process);
+ if (it != gates.end()) {
+ gate = it->second;
+ // N.B. The last thread that leaves the gate also frees it.
+ gates.erase(it);
+ }
+
+ CHECK(process->refs == 0);
+ process->state = ProcessBase::TERMINATED;
+ }
+ process->unlock();
+
+ // Note that we don't remove the process from the clock during
+ // cleanup, but rather the clock is reset for a process when it is
+ // created (see ProcessBase::ProcessBase). We do this so that
+ // SocketManager::exited can access the current time of the
+ // process to "order" exited events. TODO(benh): It might make
+ // sense to consider storing the time of the process as a field of
+ // the class instead.
+
+ // Now we tell the socket manager about this process exiting so
+ // that it can create exited events for linked processes. We
+ // _must_ do this while synchronized on processes because
+ // otherwise another process could attempt to link this process
+ // and SocketManager::link would see that the process doesn't
+ // exist when it attempts to get a ProcessReference (since we
+ // removed the process above), thus causing an exited event. That
+ // exited event could cause the process to get deleted (e.g., the
+ // garbage collector might link _after_ the process has already
+ // been removed from processes, thus getting an exited event), but
+ // we don't want that exited event to fire and actually delete the
+ // process until after we have used the process in
+ // SocketManager::exited.
+ socket_manager->exited(process);
+
+ // ***************************************************************
+ // At this point we can no longer dereference the process since it
+ // might already be deallocated (e.g., by the garbage collector).
+ // ***************************************************************
+
+ // Note that we need to open the gate while synchronized on
+ // processes because otherwise we might _open_ the gate before
+ // another thread _approaches_ the gate, causing that thread to
+ // wait on _arrival_ to the gate forever (see
+ // ProcessManager::wait).
+ if (gate != NULL) {
+ gate->open();
+ }
+ }
+}
+
+
+void ProcessManager::link(ProcessBase* process, const UPID& to)
+{
+ // Check if the pid is local.
+ if (!(to.ip == __ip__ && to.port == __port__)) {
+ socket_manager->link(process, to);
+ } else {
+ // Since the pid is local we want to get a reference to its
+ // underlying process so that while we are invoking the link
+ // manager we don't miss sending a possible ExitedEvent.
+ if (ProcessReference _ = use(to)) {
+ socket_manager->link(process, to);
+ } else {
+ // Since the pid isn't valid its process must have already died
+ // (or hasn't been spawned yet) so send a process exit message.
+ process->enqueue(new ExitedEvent(to));
+ }
+ }
+}
+
+
+void ProcessManager::terminate(
+ const UPID& pid,
+ bool inject,
+ ProcessBase* sender)
+{
+ if (ProcessReference process = use(pid)) {
+ if (Clock::paused()) {
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ if (sender != NULL) {
+ Clock::order(sender, process);
+ } else {
+ Clock::update(process, Clock::now());
+ }
+ }
+ }
+ }
+
+ if (sender != NULL) {
+ process->enqueue(new TerminateEvent(sender->self()), inject);
+ } else {
+ process->enqueue(new TerminateEvent(UPID()), inject);
+ }
+ }
+}
+
+
+bool ProcessManager::wait(const UPID& pid)
+{
+ // We use a gate for waiters. A gate is single use. That is, a new
+ // gate is created when the first thread shows up and wants to wait
+ // for a process that currently has no gate. Once that process
+ // exits, the last thread to leave the gate will also clean it
+ // up. Note that a gate will never get more threads waiting on it
+ // after it has been opened, since the process should no longer be
+ // valid and therefore will not have an entry in 'processes'.
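+ //
+ // The Gate protocol as used below (see gate.hpp):
+ //
+ //   Gate::state_t old = gate->approach(); // Register as a waiter.
+ //   gate->arrive(old);                    // Block until gate->open().
+ //   if (gate->empty()) delete gate;       // Last waiter cleans up.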
+
+ Gate* gate = NULL;
+ Gate::state_t old;
+
+ ProcessBase* process = NULL; // Set to non-null if we donate thread.
+
+ // Try and approach the gate if necessary.
+ synchronized (processes) {
+ if (processes.count(pid.id) > 0) {
+ process = processes[pid.id];
+ CHECK(process->state != ProcessBase::TERMINATED);
+
+ // Check and see if a gate already exists.
+ if (gates.find(process) == gates.end()) {
+ gates[process] = new Gate();
+ }
+
+ gate = gates[process];
+ old = gate->approach();
+
+ // Check if it is runnable in order to donate this thread.
+ if (process->state == ProcessBase::BOTTOM ||
+ process->state == ProcessBase::READY) {
+ synchronized (runq) {
+ list<ProcessBase*>::iterator it =
+ find(runq.begin(), runq.end(), process);
+ if (it != runq.end()) {
+ runq.erase(it);
+ } else {
+ // Another thread has resumed the process ...
+ process = NULL;
+ }
+ }
+ } else {
+ // Process is not runnable, so no need to donate ...
+ process = NULL;
+ }
+ }
+ }
+
+ if (process != NULL) {
+ VLOG(2) << "Donating thread to " << process->pid << " while waiting";
+ ProcessBase* donator = __process__;
+ __sync_fetch_and_add(&running, 1);
+ process_manager->resume(process);
+ __process__ = donator;
+ }
+
+ // TODO(benh): Donating only once may not be sufficient, so we might
+ // still deadlock here ... perhaps warn if that's the case?
+
+ // Now arrive at the gate and wait until it opens.
+ if (gate != NULL) {
+ gate->arrive(old);
+
+ if (gate->empty()) {
+ delete gate;
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+
+void ProcessManager::enqueue(ProcessBase* process)
+{
+ CHECK(process != NULL);
+
+ // TODO(benh): Check and see if this process has its own thread. If
+ // it does, push it on that thread's runq, and wake up that thread if
+ // it's not running. Otherwise, check and see which thread this
+ // process was last running on, and put it on that thread's runq.
+
+ synchronized (runq) {
+ CHECK(find(runq.begin(), runq.end(), process) == runq.end());
+ runq.push_back(process);
+ }
+
+ // Wake up the processing thread if necessary.
+ gate->open();
+}
+
+
+ProcessBase* ProcessManager::dequeue()
+{
+ // TODO(benh): Remove a process from this thread's runq. If there
+ // are no processes to run, and this is not a dedicated thread, then
+ // steal one from another thread's runq.
+
+ ProcessBase* process = NULL;
+
+ synchronized (runq) {
+ if (!runq.empty()) {
+ process = runq.front();
+ runq.pop_front();
+ // Increment the running count of processes in order to support
+ // the Clock::settle() operation (this must be done atomically
+ // with removing the process from the runq).
+ __sync_fetch_and_add(&running, 1);
+ }
+ }
+
+ return process;
+}
+
+
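+// Blocks until the runtime has quiesced: nothing on the run queue, no
+// processes running, no timeouts due at the current (paused) clock
+// time, and no timers pending. Note that this is only meaningful
+// while the clock is paused (hence the CHECK below).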
+void ProcessManager::settle()
+{
+ bool done = true;
+ do {
+ os::sleep(Milliseconds(10));
+ done = true;
+ // Hopefully this is the only place we acquire both these locks.
+ synchronized (runq) {
+ synchronized (timeouts) {
+ CHECK(Clock::paused()); // Since another thread could resume the clock!
+
+ if (!runq.empty()) {
+ done = false;
+ }
+
+ __sync_synchronize(); // Read barrier for 'running'.
+ if (running > 0) {
+ done = false;
+ }
+
+ if (timeouts->size() > 0 &&
+ timeouts->begin()->first <= clock::current) {
+ done = false;
+ }
+
+ if (pending_timers) {
+ done = false;
+ }
+ }
+ }
+ } while (!done);
+}
+
+
+Timer Timer::create(
+ const Duration& duration,
+ const lambda::function<void(void)>& thunk)
+{
+ static uint64_t id = 1; // Start at 1 since Timer() instances start with 0.
+
+ // Assumes Clock::now() does Clock::now(__process__).
+ Timeout timeout = Timeout::in(duration);
+
+ UPID pid = __process__ != NULL ? __process__->self() : UPID();
+
+ Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);
+
+ VLOG(3) << "Created a timer for " << timeout.time();
+
+ // Add the timer.
+ synchronized (timeouts) {
+ if (timeouts->size() == 0 ||
+ timer.timeout().time() < timeouts->begin()->first) {
+ // Need to interrupt the loop to update/set timer repeat.
+ (*timeouts)[timer.timeout().time()].push_back(timer);
+ update_timer = true;
+ ev_async_send(loop, &async_watcher);
+ } else {
+ // Timer repeat is adequate, just add the timeout.
+ CHECK(timeouts->size() >= 1);
+ (*timeouts)[timer.timeout().time()].push_back(timer);
+ }
+ }
+
+ return timer;
+}
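+
+// A usage sketch (assuming some function 'void tick()' is in scope;
+// cancellation is best effort, see the TODO in Timer::cancel below):
+//
+//   Timer timer = Timer::create(Seconds(10), &tick);
+//   ...
+//   Timer::cancel(timer);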
+
+
+bool Timer::cancel(const Timer& timer)
+{
+ bool canceled = false;
+ synchronized (timeouts) {
+ // Check if the timeout is still pending, and if so, erase it. In
+ // addition, erase an empty list if we just removed the last
+ // timeout.
+ // TODO(benh): If two timers are created with the same timeout,
+ // this will erase *both*. Fix this!
+ Time time = timer.timeout().time();
+ if (timeouts->count(time) > 0) {
+ canceled = true;
+ (*timeouts)[time].remove(timer);
+ if ((*timeouts)[time].empty()) {
+ timeouts->erase(time);
+ }
+ }
+ }
+
+ return canceled;
+}
+
+
+ProcessBase::ProcessBase(const string& id)
+{
+ process::initialize();
+
+ state = ProcessBase::BOTTOM;
+
+ pthread_mutexattr_t attr;
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+ pthread_mutex_init(&m, &attr);
+ pthread_mutexattr_destroy(&attr);
+
+ refs = 0;
+
+ pid.id = id != "" ? id : ID::generate();
+ pid.ip = __ip__;
+ pid.port = __port__;
+
+ // If using a manual clock, try and set the current time of the
+ // process using the happens-before relationship between creator
+ // and createe!
+ if (Clock::paused()) {
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ clock::currents->erase(this); // In case the address is reused!
+ if (__process__ != NULL) {
+ Clock::order(__process__, this);
+ } else {
+ Clock::update(this, Clock::now());
+ }
+ }
+ }
+ }
+}
+
+
+ProcessBase::~ProcessBase() {}
+
+
+void ProcessBase::enqueue(Event* event, bool inject)
+{
+ CHECK(event != NULL);
+
+ lock();
+ {
+ if (state != TERMINATING && state != TERMINATED) {
+ if (!inject) {
+ events.push_back(event);
+ } else {
+ events.push_front(event);
+ }
+
+ if (state == BLOCKED) {
+ state = READY;
+ process_manager->enqueue(this);
+ }
+
+ CHECK(state == BOTTOM ||
+ state == READY ||
+ state == RUNNING);
+ } else {
+ delete event;
+ }
+ }
+ unlock();
+}
+
+
+void ProcessBase::inject(const UPID& from, const string& name, const char* data, size_t length)
+{
+ if (!from) {
+ return;
+ }
+
+ Message* message = encode(from, pid, name, string(data, length));
+
+ enqueue(new MessageEvent(message), true);
+}
+
+
+void ProcessBase::send(const UPID& to, const string& name, const char* data, size_t length)
+{
+ if (!to) {
+ return;
+ }
+
+ // Encode and transport outgoing message.
+ transport(encode(pid, to, name, string(data, length)), this);
+}
+
+
+void ProcessBase::visit(const MessageEvent& event)
+{
+ if (handlers.message.count(event.message->name) > 0) {
+ handlers.message[event.message->name](
+ event.message->from,
+ event.message->body);
+ } else if (delegates.count(event.message->name) > 0) {
+ VLOG(1) << "Delegating message '" << event.message->name
+ << "' to " << delegates[event.message->name];
+ Message* message = new Message(*event.message);
+ message->to = delegates[event.message->name];
+ transport(message, this);
+ }
+}
+
+
+void ProcessBase::visit(const DispatchEvent& event)
+{
+ (*event.f)(this);
+}
+
+
+void ProcessBase::visit(const HttpEvent& event)
+{
+ VLOG(1) << "Handling HTTP event for process '" << pid.id << "'"
+ << " with path: '" << event.request->path << "'";
+
+ CHECK(event.request->path.find('/') == 0); // See ProcessManager::handle.
+
+ // Split the path by '/'.
+ vector<string> tokens = strings::tokenize(event.request->path, "/");
+ CHECK(tokens.size() >= 1);
+ CHECK(tokens[0] == pid.id);
+
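+ // For example (assuming a process with id 'logging' that installed
+ // an HTTP handler named 'toggle'), a request for '/logging/toggle'
+ // tokenizes to ["logging", "toggle"], so 'name' below is 'toggle'.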
+ const string& name = tokens.size() > 1 ? tokens[1] : "";
+
+ if (handlers.http.count(name) > 0) {
+ // Create the promise to link with whatever gets returned, as well
+ // as a future to wait for the response.
+ std::tr1::shared_ptr<Promise<Response> > promise(
+ new Promise<Response>());
+
+ Future<Response>* future = new Future<Response>(promise->future());
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
+
+ // Let the HttpProxy know about this request (via the future).
+ dispatch(proxy, &HttpProxy::handle, future, *event.request);
+
+ // Now call the handler and associate the response with the promise.
+ promise->associate(handlers.http[name](*event.request));
+ } else if (assets.count(name) > 0) {
+ OK response;
+ response.type = Response::PATH;
+ response.path = assets[name].path;
+
+ // Construct the final path by appending remaining tokens.
+ for (size_t i = 2; i < tokens.size(); i++) {
+ response.path += "/" + tokens[i];
+ }
+
+ // Try and determine the Content-Type from an extension.
+ Try<string> basename = os::basename(response.path);
+ if (!basename.isError()) {
+ size_t index = basename.get().find_last_of('.');
+ if (index != string::npos) {
+ string extension = basename.get().substr(index);
+ if (assets[name].types.count(extension) > 0) {
+ response.headers["Content-Type"] = assets[name].types[extension];
+ }
+ }
+ }
+
+ // TODO(benh): Use "text/plain" for assets that don't have an
+ // extension or that we don't have a mapping for? It might be better
+ // to just let the browser guess (or do its own default).
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
+
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue, response, *event.request);
+ } else {
+ VLOG(1) << "Returning '404 Not Found' for '" << event.request->path << "'";
+
+ // Get the HttpProxy pid for this socket.
+ PID<HttpProxy> proxy = socket_manager->proxy(event.socket);
+
+ // Enqueue the response with the HttpProxy so that it respects the
+ // order of requests to account for HTTP/1.1 pipelining.
+ dispatch(proxy, &HttpProxy::enqueue, NotFound(), *event.request);
+ }
+}
+
+
+void ProcessBase::visit(const ExitedEvent& event)
+{
+ exited(event.pid);
+}
+
+
+void ProcessBase::visit(const TerminateEvent& event)
+{
+ finalize();
+}
+
+
+UPID ProcessBase::link(const UPID& to)
+{
+ if (!to) {
+ return to;
+ }
+
+ process_manager->link(this, to);
+
+ return to;
+}
+
+
+UPID spawn(ProcessBase* process, bool manage)
+{
+ process::initialize();
+
+ if (process != NULL) {
+ // If using a manual clock, try and set the current time of the
+ // process using the happens-before relationship between spawner
+ // and spawnee!
+ if (Clock::paused()) {
+ synchronized (timeouts) {
+ if (Clock::paused()) {
+ if (__process__ != NULL) {
+ Clock::order(__process__, process);
+ } else {
+ Clock::update(process, Clock::now());
+ }
+ }
+ }
+ }
+
+ return process_manager->spawn(process, manage);
+ } else {
+ return UPID();
+ }
+}
+
+
+void terminate(const UPID& pid, bool inject)
+{
+ process_manager->terminate(pid, inject, __process__);
+}
+
+
+class WaitWaiter : public Process<WaitWaiter>
+{
+public:
+ WaitWaiter(const UPID& _pid, const Duration& _duration, bool* _waited)
+ : ProcessBase(ID::generate("__waiter__")),
+ pid(_pid),
+ duration(_duration),
+ waited(_waited) {}
+
+ virtual void initialize()
+ {
+ VLOG(3) << "Running waiter process for " << pid;
+ link(pid);
+ delay(duration, self(), &WaitWaiter::timeout);
+ }
+
+private:
+ virtual void exited(const UPID&)
+ {
+ VLOG(3) << "Waiter process waited for " << pid;
+ *waited = true;
+ terminate(self());
+ }
+
+ void timeout()
+ {
+ VLOG(3) << "Waiter process timed out waiting for " << pid;
+ *waited = false;
+ terminate(self());
+ }
+
+private:
+ const UPID pid;
+ const Duration duration;
+ bool* const waited;
+};
+
+
+bool wait(const UPID& pid, const Duration& duration)
+{
+ process::initialize();
+
+ if (!pid) {
+ return false;
+ }
+
+ // This could result in a deadlock if some code decides to wait on a
+ // process from within that very process!
+ if (__process__ != NULL && __process__->self() == pid) {
+ std::cerr << "\n**** DEADLOCK DETECTED! ****\nYou are waiting on process "
+ << pid << " from within that process itself." << std::endl;
+ }
+
+ if (duration == Seconds(-1)) {
+ return process_manager->wait(pid);
+ }
+
+ bool waited = false;
+
+ WaitWaiter waiter(pid, duration, &waited);
+ spawn(waiter);
+ wait(waiter);
+
+ return waited;
+}
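+
+// Taken together, spawn/terminate/wait give the typical lifecycle for
+// an unmanaged process (a sketch; 'MyProcess' is hypothetical):
+//
+//   MyProcess* process = new MyProcess();
+//   UPID pid = spawn(process, false); // 'initialize' runs asynchronously.
+//   terminate(pid);                   // Enqueues a TerminateEvent.
+//   wait(pid);                        // Blocks until the process exits.
+//   delete process;                   // Safe: not managed by the GC.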
+
+
+void filter(Filter *filter)
+{
+ process::initialize();
+
+ synchronized (filterer) {
+ filterer = filter;
+ }
<TRUNCATED>