diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt
index 0926518cfe..2cc262dffe 100644
--- a/rclcpp/CMakeLists.txt
+++ b/rclcpp/CMakeLists.txt
@@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8.3)
 project(rclcpp)
 
 find_package(ament_cmake REQUIRED)
+find_package(rcl_interfaces REQUIRED)
 
 ament_export_dependencies(rmw)
 ament_export_dependencies(rcl_interfaces)
@@ -12,6 +13,18 @@ ament_export_include_directories(include)
 if(AMENT_ENABLE_TESTING)
   find_package(ament_lint_auto REQUIRED)
   ament_lint_auto_find_test_dependencies()
+
+  if(NOT WIN32)
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -Wextra")
+  endif()
+
+  include_directories(include)
+
+  ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
+  ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
+  if(TARGET test_intra_process_manager)
+    target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}")
+  endif()
 endif()
 
 ament_package(
diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp
index 3f698f1911..1bdddb880b 100644
--- a/rclcpp/include/rclcpp/any_executable.hpp
+++ b/rclcpp/include/rclcpp/any_executable.hpp
@@ -33,6 +33,7 @@ struct AnyExecutable
   {}
   // Only one of the following pointers will be set.
   rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
+  rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
   rclcpp::timer::TimerBase::SharedPtr timer;
   rclcpp::service::ServiceBase::SharedPtr service;
   rclcpp::client::ClientBase::SharedPtr client;
diff --git a/rclcpp/include/rclcpp/context.hpp b/rclcpp/include/rclcpp/context.hpp
index 55287926d8..1c56cbb11f 100644
--- a/rclcpp/include/rclcpp/context.hpp
+++ b/rclcpp/include/rclcpp/context.hpp
@@ -15,16 +15,24 @@
 #ifndef RCLCPP_RCLCPP_CONTEXT_HPP_
 #define RCLCPP_RCLCPP_CONTEXT_HPP_
 
+#include <rclcpp/macros.hpp>
+
+#include <iostream>
+
 #include <memory>
+#include <mutex>
+#include <typeinfo>
+#include <typeindex>
+#include <unordered_map>
 
-#include <rclcpp/macros.hpp>
+#include <rmw/rmw.h>
 
 namespace rclcpp
 {
+
 namespace context
 {
 
-/* ROS Context */
 class Context
 {
 public:
@@ -32,9 +40,38 @@ class Context
 
   Context() {}
 
+  template<typename SubContext, typename ... Args>
+  std::shared_ptr<SubContext>
+  get_sub_context(Args && ... args)
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    std::type_index type_i(typeid(SubContext));
+    std::shared_ptr<SubContext> sub_context;
+    auto it = sub_contexts_.find(type_i);
+    if (it == sub_contexts_.end()) {
+      // It doesn't exist yet, make it
+      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
+      sub_context = std::shared_ptr<SubContext>(
+        new SubContext(std::forward<Args>(args) ...),
+        [] (SubContext * sub_context_ptr) {
+          delete sub_context_ptr;
+        });
+      // *INDENT-ON*
+      sub_contexts_[type_i] = sub_context;
+    } else {
+      // It exists, get it out and cast it.
+      sub_context = std::static_pointer_cast<SubContext>(it->second);
+    }
+    return sub_context;
+  }
+
 private:
   RCLCPP_DISABLE_COPY(Context);
 
+  std::unordered_map<std::type_index, std::shared_ptr<void>> sub_contexts_;
+  std::mutex mutex_;
+
 };
 
 } /* namespace context */
diff --git a/rclcpp/include/rclcpp/contexts/default_context.hpp b/rclcpp/include/rclcpp/contexts/default_context.hpp
index dc78f78338..c10225e68c 100644
--- a/rclcpp/include/rclcpp/contexts/default_context.hpp
+++ b/rclcpp/include/rclcpp/contexts/default_context.hpp
@@ -33,6 +33,13 @@ class DefaultContext : public rclcpp::context::Context
 
 };
 
+DefaultContext::SharedPtr
+get_global_default_context()
+{
+  static DefaultContext::SharedPtr default_context = DefaultContext::make_shared();
+  return default_context;
+}
+
 } // namespace default_context
 } // namespace contexts
 } // namespace rclcpp
diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp
index f32daa6938..c58370cc84 100644
--- a/rclcpp/include/rclcpp/executor.hpp
+++ b/rclcpp/include/rclcpp/executor.hpp
@@ -15,15 +15,16 @@
 #ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_
 #define RCLCPP_RCLCPP_EXECUTOR_HPP_
 
-#include <iostream>
-
 #include <algorithm>
 #include <cassert>
 #include <cstdlib>
+#include <iostream>
 #include <list>
 #include <memory>
 #include <vector>
 
+#include <rcl_interfaces/msg/intra_process_message.hpp>
+
 #include <rclcpp/any_executable.hpp>
 #include <rclcpp/macros.hpp>
 #include <rclcpp/memory_strategy.hpp>
@@ -159,6 +160,9 @@ class Executor
     if (any_exec->subscription) {
       execute_subscription(any_exec->subscription);
     }
+    if (any_exec->subscription_intra_process) {
+      execute_intra_process_subscription(any_exec->subscription_intra_process);
+    }
     if (any_exec->service) {
       execute_service(any_exec->service);
     }
@@ -194,6 +198,24 @@ class Executor
     subscription->return_message(message);
   }
 
+  static void
+  execute_intra_process_subscription(
+    rclcpp::subscription::SubscriptionBase::SharedPtr & subscription)
+  {
+    rcl_interfaces::msg::IntraProcessMessage ipm;
+    bool taken = false;
+    rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
+    if (status == RMW_RET_OK) {
+      if (taken) {
+        subscription->handle_intra_process_message(ipm);
+      }
+    } else {
+      fprintf(stderr,
+        "[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n",
+        subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
+    }
+  }
+
   static void
   execute_timer(
     rclcpp::timer::TimerBase::SharedPtr & timer)
@@ -293,22 +315,24 @@ class Executor
           }));
     }
     // Use the number of subscriptions to allocate memory in the handles
-    size_t number_of_subscriptions = subs.size();
+    size_t max_number_of_subscriptions = subs.size() * 2;  // Times two for intra-process.
     rmw_subscriptions_t subscriber_handles;
-    subscriber_handles.subscriber_count = number_of_subscriptions;
+    subscriber_handles.subscriber_count = 0;
     // TODO(wjwwood): Avoid redundant malloc's
-    subscriber_handles.subscribers =
-      memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions);
+    subscriber_handles.subscribers = memory_strategy_->borrow_handles(
+      HandleType::subscription_handle, max_number_of_subscriptions);
     if (subscriber_handles.subscribers == NULL) {
       // TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
       throw std::runtime_error("Could not malloc for subscriber pointers.");
     }
     // Then fill the SubscriberHandles with ready subscriptions
-    size_t subscriber_handle_index = 0;
     for (auto & subscription : subs) {
-      subscriber_handles.subscribers[subscriber_handle_index] = \
+      subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
         subscription->subscription_handle_->data;
-      subscriber_handle_index += 1;
+      if (subscription->intra_process_subscription_handle_) {
+        subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
+          subscription->intra_process_subscription_handle_->data;
+      }
     }
 
     // Use the number of services to allocate memory in the handles
@@ -414,7 +438,7 @@ class Executor
     }
     // Add the new work to the class's list of things waiting to be executed
     // Starting with the subscribers
-    for (size_t i = 0; i < number_of_subscriptions; ++i) {
+    for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) {
       void * handle = subscriber_handles.subscribers[i];
       if (handle) {
         subscriber_handles_.push_back(handle);
@@ -463,13 +487,18 @@ class Executor
         }
         for (auto & weak_subscription : group->subscription_ptrs_) {
           auto subscription = weak_subscription.lock();
-          if (subscription && subscription->subscription_handle_->data == subscriber_handle) {
-            return subscription;
+          if (subscription) {
+            if (subscription->subscription_handle_->data == subscriber_handle) {
+              return subscription;
+            }
+            if (subscription->intra_process_subscription_handle_->data == subscriber_handle) {
+              return subscription;
+            }
           }
         }
       }
     }
-    return rclcpp::subscription::SubscriptionBase::SharedPtr();
+    return nullptr;
   }
 
   rclcpp::service::ServiceBase::SharedPtr
@@ -653,6 +682,11 @@ class Executor
     while (it != subscriber_handles_.end()) {
       auto subscription = get_subscription_by_handle(*it);
       if (subscription) {
+        // Figure out if this is for intra-process or not.
+        bool is_intra_process = false;
+        if (subscription->intra_process_subscription_handle_) {
+          is_intra_process = subscription->intra_process_subscription_handle_->data == *it;
+        }
         // Find the group for this handle and see if it can be serviced
         auto group = get_group_by_subscription(subscription);
         if (!group) {
@@ -668,7 +702,11 @@ class Executor
           continue;
         }
         // Otherwise it is safe to set and return the any_exec
-        any_exec->subscription = subscription;
+        if (is_intra_process) {
+          any_exec->subscription_intra_process = subscription;
+        } else {
+          any_exec->subscription = subscription;
+        }
         any_exec->callback_group = group;
         any_exec->node = get_node_by_group(group);
         subscriber_handles_.erase(it++);
@@ -804,7 +842,7 @@ class Executor
     }
     // Check the subscriptions to see if there are any that are ready
     get_next_subscription(any_exec);
-    if (any_exec->subscription) {
+    if (any_exec->subscription || any_exec->subscription_intra_process) {
       return any_exec;
     }
     // Check the services to see if there are any that are ready
diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp
new file mode 100644
index 0000000000..e68973141f
--- /dev/null
+++ b/rclcpp/include/rclcpp/intra_process_manager.hpp
@@ -0,0 +1,419 @@
+// Copyright 2015 Open Source Robotics Foundation, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_
+#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_
+
+#include <rclcpp/mapped_ring_buffer.hpp>
+#include <rclcpp/macros.hpp>
+#include <rclcpp/publisher.hpp>
+#include <rclcpp/subscription.hpp>
+
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <exception>
+#include <limits>
+#include <map>
+#include <unordered_map>
+#include <set>
+
+namespace rclcpp
+{
+namespace intra_process_manager
+{
+
+/// This class facilitates intra process communication between nodes.
+/* This class is used in the creation of publishers and subscriptions.
+ * A singleton instance of this class is owned by a rclcpp::Context and a
+ * rclcpp::Node can use an associated Context to get an instance of this class.
+ * Nodes which do not have a common Context will not exchange intra process
+ * messages because they will not share access to an instance of this class.
+ *
+ * When a Node creates a publisher or subscription, it will register them
+ * with this class.
+ * The node will also hook into the publisher's publish call
+ * in order to do intra process related work.
+ *
+ * When a publisher is created, it advertises on the topic the user provided,
+ * as well as a "shadowing" topic of type rcl_interfaces/IntraProcessMessage.
+ * For instance, if the user specified the topic '/namespace/chatter', then the
+ * corresponding intra process topic might be '/namespace/chatter__intra'.
+ * The publisher is also allocated an id which is unique among all publishers
+ * and subscriptions in this process.
+ * Additionally, when registered with this class a ring buffer is created and
+ * owned by this class as a temporary place to hold messages destined for intra
+ * process subscriptions.
+ *
+ * When a subscription is created, it subscribes to the topic provided by the
+ * user as well as to the corresponding intra process topic.
+ * It is also gets a unique id from the singleton instance of this class which
+ * is unique among publishers and subscriptions.
+ *
+ * When the user publishes a message, the message is stored by calling
+ * store_intra_process_message on this class.
+ * The instance of that message is uniquely identified by a publisher id and a
+ * message sequence number.
+ * The publisher id, message sequence pair is unique with in the process.
+ * At that point a list of the id's of intra process subscriptions which have
+ * been registered with the singleton instance of this class are stored with
+ * the message instance so that delivery is only made to those subscriptions.
+ * Then an instance of rcl_interfaces/IntraProcessMessage is published to the
+ * intra process topic which is specific to the topic specified by the user.
+ *
+ * When an instance of rcl_interfaces/IntraProcessMessage is received by a
+ * subscription, then it is handled by calling take_intra_process_message
+ * on a singleton of this class.
+ * The subscription passes a publisher id, message sequence pair which
+ * uniquely identifies the message instance it was suppose to receive as well
+ * as the subscriptions unique id.
+ * If the message is still being held by this class and the subscription's id
+ * is in the list of intended subscriptions then the message is returned.
+ * If either of those predicates are not satisfied then the message is not
+ * returned and the subscription does not call the users callback.
+ *
+ * Since the publisher builds a list of destined subscriptions on publish, and
+ * other requests are ignored, this class knows how many times a message
+ * instance should be requested.
+ * The final time a message is requested, the ownership is passed out of this
+ * class and passed to the final subscription, effectively freeing space in
+ * this class's internal storage.
+ *
+ * Since a topic is being used to ferry notifications about new intra process
+ * messages between publishers and subscriptions, it is possible for that
+ * notification to be lost.
+ * It is also possible that a subscription which was available when publish was
+ * called will no longer exist once the notification gets posted.
+ * In both cases this might result in a message instance getting requested
+ * fewer times than expected.
+ * This is why the internal storage of this class is a ring buffer.
+ * That way if a message is orphaned it will eventually be dropped from storage
+ * when a new message instance is stored and will not result in a memory leak.
+ *
+ * However, since the storage system is finite, this also means that a message
+ * instance might get displaced by an incoming message instance before all
+ * interested parties have called take_intra_process_message.
+ * Because of this the size of the internal storage should be carefully
+ * considered.
+ *
+ * /TODO(wjwwood): update to include information about handling latching.
+ * /TODO(wjwwood): consider thread safety of the class.
+ *
+ * This class is neither CopyConstructable nor CopyAssignable.
+ */
+class IntraProcessManager
+{
+private:
+  RCLCPP_DISABLE_COPY(IntraProcessManager);
+
+public:
+  RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager);
+
+  IntraProcessManager() = default;
+
+  /// Register a subscription with the manager, returns subscriptions unique id.
+  /* In addition to generating a unique intra process id for the subscription,
+   * this method also stores the topic name of the subscription.
+   *
+   * This method is normally called during the creation of a subscription,
+   * but after it creates the internal intra process rmw_subscription_t.
+   *
+   * This method will allocate memory.
+   *
+   * \param subscription the Subscription to register.
+   * \return an unsigned 64-bit integer which is the subscription's unique id.
+   */
+  uint64_t
+  add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
+  {
+    auto id = IntraProcessManager::get_next_unique_id();
+    subscriptions_[id] = subscription;
+    subscription_ids_by_topic_[subscription->get_topic_name()].insert(id);
+    return id;
+  }
+
+  /// Unregister a subscription using the subscription's unique id.
+  /* This method does not allocate memory.
+   *
+   * \param intra_process_subscription_id id of the subscription to remove.
+   */
+  void
+  remove_subscription(uint64_t intra_process_subscription_id)
+  {
+    subscriptions_.erase(intra_process_subscription_id);
+    for (auto & pair : subscription_ids_by_topic_) {
+      pair.second.erase(intra_process_subscription_id);
+    }
+    // Iterate over all publisher infos and all stored subscription id's and
+    // remove references to this subscription's id.
+    for (auto & publisher_pair : publishers_) {
+      for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
+        sub_pair.second.erase(intra_process_subscription_id);
+      }
+    }
+  }
+
+  /// Register a publisher with the manager, returns the publisher unique id.
+  /* In addition to generating and returning a unique id for the publisher,
+   * this method creates internal ring buffer storage for "in-flight" intra
+   * process messages which are stored when store_intra_process_message is
+   * called with this publisher's unique id.
+   *
+   * The buffer_size must be less than or equal to the max uint64_t value.
+   * If the buffer_size is 0 then a buffer size is calculated using the
+   * publisher's QoS settings.
+   * The default is to use the depth field of the publisher's QoS.
+   * TODO(wjwwood): Consider doing depth *= 1.2, round up, or similar.
+   * TODO(wjwwood): Consider what to do for keep all.
+   *
+   * This method is templated on the publisher's message type so that internal
+   * storage of the same type can be allocated.
+   *
+   * This method will allocate memory.
+   *
+   * \param publisher publisher to be registered with the manager.
+   * \param buffer_size if 0 (default) a size is calculated based on the QoS.
+   * \return an unsigned 64-bit integer which is the publisher's unique id.
+   */
+  template<typename MessageT>
+  uint64_t
+  add_publisher(publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0)
+  {
+    auto id = IntraProcessManager::get_next_unique_id();
+    publishers_[id].publisher = publisher;
+    size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
+    // As long as the size of the ring buffer is less than the max sequence number, we're safe.
+    if (size > std::numeric_limits<uint64_t>::max()) {
+      throw std::invalid_argument("the calculated buffer size is too large");
+    }
+    publishers_[id].sequence_number.store(0);
+    publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer<MessageT>::make_shared(size);
+    publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
+    return id;
+  }
+
+  /// Unregister a publisher using the publisher's unique id.
+  /* This method does not allocate memory.
+   *
+   * \param intra_process_publisher_id id of the publisher to remove.
+   */
+  void
+  remove_publisher(uint64_t intra_process_publisher_id)
+  {
+    publishers_.erase(intra_process_publisher_id);
+  }
+
+  /// Store a message in the manager, and return the message sequence number.
+  /* The given message is stored in internal storage using the given publisher
+   * id and the newly generated message sequence, which is also returned.
+   * The combination of publisher id and message sequence number can later
+   * be used with a subscription id to retrieve the message by calling
+   * take_intra_process_message.
+   * The number of times take_intra_process_message can be called with this
+   * unique pair of id's is determined by the number of subscriptions currently
+   * subscribed to the same topic and which share the same Context, i.e. once
+   * for each subscription which should receive the intra process message.
+   *
+   * The ownership of the incoming message is transfered to the internal
+   * storage in order to avoid copying the message data.
+   * Therefore, the message parameter will no longer contain the original
+   * message after calling this method.
+   * Instead it will either be a nullptr or it will contain the ownership of
+   * the message instance which was displaced.
+   * If the message parameter is not equal to nullptr after calling this method
+   * then a message was prematurely displaced, i.e. take_intra_process_message
+   * had not been called on it as many times as was expected.
+   *
+   * This method can throw an exception if the publisher id is not found or
+   * if the publisher shared_ptr given to add_publisher has gone out of scope.
+   *
+   * This method does allocate memory.
+   *
+   * \param intra_process_publisher_id the id of the publisher of this message.
+   * \param message the message that is being stored.
+   * \return the message sequence number.
+   */
+  template<typename MessageT>
+  uint64_t
+  store_intra_process_message(
+    uint64_t intra_process_publisher_id,
+    std::unique_ptr<MessageT> & message)
+  {
+    auto it = publishers_.find(intra_process_publisher_id);
+    if (it == publishers_.end()) {
+      throw std::runtime_error("store_intra_process_message called with invalid publisher id");
+    }
+    PublisherInfo & info = it->second;
+    // Calculate the next message sequence number.
+    uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed);
+    // Insert the message into the ring buffer using the message_seq to identify it.
+    typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
+    typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info.buffer);
+    bool did_replace = typed_buffer->push_and_replace(message_seq, message);
+    // TODO(wjwwood): do something when a message was displaced. log debug?
+    (void)did_replace;  // Avoid unused variable warning.
+    // Figure out what subscriptions should receive the message.
+    auto publisher = info.publisher.lock();
+    if (!publisher) {
+      throw std::runtime_error("publisher has unexpectedly gone out of scope");
+    }
+    auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
+    // Store the list for later comparison.
+    info.target_subscriptions_by_message_sequence[message_seq].clear();
+    std::copy(
+      destined_subscriptions.begin(), destined_subscriptions.end(),
+      // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
+      std::inserter(
+        info.target_subscriptions_by_message_sequence[message_seq],
+        // This ends up only being a hint to std::set, could also be .begin().
+        info.target_subscriptions_by_message_sequence[message_seq].end()
+      )
+    );
+    // Return the message sequence which is sent to the subscription.
+    return message_seq;
+  }
+
+  /// Take an intra process message.
+  /* The intra_process_publisher_id and message_sequence_number parameters
+   * uniquely identify a message instance, which should be taken.
+   *
+   * The requesting_subscriptions_intra_process_id parameter is used to make
+   * sure the requesting subscription was intended to receive this message
+   * instance.
+   * This check is made because it could happen that the requester
+   * comes up after the publish event, so it still receives the notification of
+   * a new intra process message, but it wasn't registered with the manager at
+   * the time of publishing, causing it to take when it wasn't intended.
+   * This should be avioded unless latching-like behavior is involved.
+   *
+   * The message parameter is used to store the taken message.
+   * On the last expected call to this method, the ownership is transfered out
+   * of internal storage and into the message parameter.
+   * On all previous calls a copy of the internally stored message is made and
+   * the ownership of the copy is transfered to the message parameter.
+   * TODO(wjwwood): update this documentation when latching is supported.
+   *
+   * The message parameter can be set to nullptr if:
+   *
+   * - The publisher id is not found.
+   * - The message sequence is not found for the given publisher id.
+   * - The requesting subscription's id is not in the list of intended takers.
+   * - The requesting subscription's id has been used before with this message.
+   *
+   * This method may allocate memory to copy the stored message.
+   *
+   * \param intra_process_publisher_id the id of the message's publisher.
+   * \param message_sequence_number the sequence number of the message.
+   * \param requesting_subscriptions_intra_process_id the subscription's id.
+   * \param message the message typed unique_ptr used to return the message.
+   */
+  template<typename MessageT>
+  void
+  take_intra_process_message(
+    uint64_t intra_process_publisher_id,
+    uint64_t message_sequence_number,
+    uint64_t requesting_subscriptions_intra_process_id,
+    std::unique_ptr<MessageT> & message)
+  {
+    message = nullptr;
+    PublisherInfo * info;
+    {
+      auto it = publishers_.find(intra_process_publisher_id);
+      if (it == publishers_.end()) {
+        // Publisher is either invalid or no longer exists.
+        return;
+      }
+      info = &it->second;
+    }
+    // Figure out how many subscriptions are left.
+    std::set<uint64_t> * target_subs;
+    {
+      auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
+      if (it == info->target_subscriptions_by_message_sequence.end()) {
+        // Message is no longer being stored by this publisher.
+        return;
+      }
+      target_subs = &it->second;
+    }
+    {
+      auto it = std::find(
+        target_subs->begin(), target_subs->end(),
+        requesting_subscriptions_intra_process_id);
+      if (it == target_subs->end()) {
+        // This publisher id/message seq pair was not intended for this subscription.
+        return;
+      }
+      target_subs->erase(it);
+    }
+    typedef typename mapped_ring_buffer::MappedRingBuffer<MessageT> TypedMRB;
+    typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(info->buffer);
+    // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
+    if (target_subs->size()) {
+      // There are more subscriptions to serve, return a copy.
+      typed_buffer->get_copy_at_key(message_sequence_number, message);
+    } else {
+      // This is the last one to be returned, transfer ownership.
+      typed_buffer->pop_at_key(message_sequence_number, message);
+    }
+  }
+
+private:
+  static uint64_t get_next_unique_id()
+  {
+    auto next_id = next_unique_id_.fetch_add(1, std::memory_order_relaxed);
+    // Check for rollover (we started at 1).
+    if (0 == next_id) {
+      // This puts a technical limit on the number of times you can add a publisher or subscriber.
+      // But even if you could add (and remove) them at 1 kHz (very optimistic rate)
+      // it would still be a very long time before you could exhaust the pool of id's:
+      //   2^64 / 1000 times per sec / 60 sec / 60 min / 24 hours / 365 days = 584,942,417 years
+      // So around 585 million years. Even at 1 GHz, it would take 585 years.
+      // I think it's safe to avoid trying to handle overflow.
+      // If we roll over then it's most likely a bug.
+      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
+      throw std::overflow_error(
+        "exhausted the unique id's for publishers and subscribers in this process "
+        "(congratulations your computer is either extremely fast or extremely old)");
+      // *INDENT-ON*
+    }
+    return next_id;
+  }
+
+  static std::atomic<uint64_t> next_unique_id_;
+
+  std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr> subscriptions_;
+  std::map<std::string, std::set<uint64_t>> subscription_ids_by_topic_;
+
+  struct PublisherInfo
+  {
+    RCLCPP_DISABLE_COPY(PublisherInfo);
+
+    PublisherInfo() = default;
+
+    publisher::Publisher::WeakPtr publisher;
+    std::atomic<uint64_t> sequence_number;
+    mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
+    std::unordered_map<uint64_t, std::set<uint64_t>> target_subscriptions_by_message_sequence;
+  };
+
+  std::unordered_map<uint64_t, PublisherInfo> publishers_;
+
+};
+
+std::atomic<uint64_t> IntraProcessManager::next_unique_id_ {1};
+
+} /* namespace intra_process_manager */
+} /* namespace rclcpp */
+
+#endif /* RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ */
diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp
new file mode 100644
index 0000000000..4b5bb474a6
--- /dev/null
+++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp
@@ -0,0 +1,214 @@
+// Copyright 2015 Open Source Robotics Foundation, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RCLCPP_RCLCPP_RING_BUFFER_HPP_
+#define RCLCPP_RCLCPP_RING_BUFFER_HPP_
+
+#include <rclcpp/macros.hpp>
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+namespace rclcpp
+{
+namespace mapped_ring_buffer
+{
+
+class MappedRingBufferBase
+{
+public:
+  RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase);
+};
+
+/// Ring buffer container of unique_ptr's of T, which can be accessed by a key.
+/* T must be a CopyConstructable and CopyAssignable.
+ * This class can be used in a container by using the base class MappedRingBufferBase.
+ * This class must have a positive, non-zero size.
+ * This class cannot be resized nor can it reserve additional space after construction.
+ * This class is not CopyConstructable nor CopyAssignable.
+ *
+ * The key's are not guaranteed to be unique because push_and_replace does not
+ * check for colliding keys.
+ * It is up to the user to only use unique keys.
+ * A side effect of this is that when get_copy_at_key or pop_at_key are called,
+ * they return the first encountered instance of the key.
+ * But iteration does not begin with the ring buffer's head, and therefore
+ * there is no guarantee on which value is returned if a key is used multiple
+ * times.
+ */
+template<typename T>
+class MappedRingBuffer : public MappedRingBufferBase
+{
+public:
+  RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer<T>);
+
+  /// Constructor.
+  /* The constructor will allocate memory while reserving space.
+   *
+   * \param size size of the ring buffer; must be positive and non-zero.
+   */
+  MappedRingBuffer(size_t size)
+  : elements_(size), head_(0)
+  {
+    if (size == 0) {
+      throw std::invalid_argument("size must be a positive, non-zero value");
+    }
+  }
+  virtual ~MappedRingBuffer() {}
+
+  /// Return a copy of the value stored in the ring buffer at the given key.
+  /* The key is matched if an element in the ring buffer has a matching key.
+   * This method will allocate in order to return a copy.
+   *
+   * The key is not guaranteed to be unique, see the class docs for more.
+   *
+   * The contents of value before the method is called are discarded.
+   *
+   * \param key the key associated with the stored value
+   * \param value if the key is found, the value is stored in this parameter
+   */
+  void
+  get_copy_at_key(uint64_t key, std::unique_ptr<T> & value)
+  {
+    auto it = get_iterator_of_key(key);
+    value = nullptr;
+    if (it != elements_.end() && it->in_use) {
+      value = std::unique_ptr<T>(new T(*it->value));
+    }
+  }
+
+  /// Return ownership of the value stored in the ring buffer, leaving a copy.
+  /* The key is matched if an element in the ring bufer has a matching key.
+   * This method will allocate in order to store a copy.
+   *
+   * The key is not guaranteed to be unique, see the class docs for more.
+   *
+   * The ownership of the currently stored object is returned, but a copy is
+   * made and stored in its place.
+   * This means that multiple calls to this function for a particular element
+   * will result in returning the copied and stored object not the original.
+   * This also means that later calls to pop_at_key will not return the
+   * originally stored object, since it was returned by the first call to this
+   * method.
+   *
+   * The contents of value before the method is called are discarded.
+   *
+   * \param key the key associated with the stored value
+   * \param value if the key is found, the value is stored in this parameter
+   */
+  void
+  get_ownership_at_key(uint64_t key, std::unique_ptr<T> & value)
+  {
+    auto it = get_iterator_of_key(key);
+    value = nullptr;
+    if (it != elements_.end() && it->in_use) {
+      // Make a copy.
+      auto copy = std::unique_ptr<T>(new T(*it->value));
+      // Return the original.
+      value.swap(it->value);
+      // Store the copy.
+      it->value.swap(copy);
+    }
+  }
+
+  /// Return ownership of the value stored in the ring buffer at the given key.
+  /* The key is matched if an element in the ring buffer has a matching key.
+   *
+   * The key is not guaranteed to be unique, see the class docs for more.
+   *
+   * The contents of value before the method is called are discarded.
+   *
+   * \param key the key associated with the stored value
+   * \param value if the key is found, the value is stored in this parameter
+   */
+  void
+  pop_at_key(uint64_t key, std::unique_ptr<T> & value)
+  {
+    auto it = get_iterator_of_key(key);
+    value = nullptr;
+    if (it != elements_.end() && it->in_use) {
+      value.swap(it->value);
+      it->in_use = false;
+    }
+  }
+
+  /// Insert a key-value pair, displacing an existing pair if necessary.
+  /* The key's uniqueness is not checked on insertion.
+   * It is up to the user to ensure the key is unique.
+   * This method should not allocate memory.
+   *
+   * After insertion, if a pair was replaced, then value will contain ownership
+   * of that displaced value. Otherwise it will be a nullptr.
+   *
+   * \param key the key associated with the value to be stored
+   * \param value the value to store, and optionally the value displaced
+   */
+  bool
+  push_and_replace(uint64_t key, std::unique_ptr<T> & value)
+  {
+    bool did_replace = elements_[head_].in_use;
+    elements_[head_].key = key;
+    elements_[head_].value.swap(value);
+    elements_[head_].in_use = true;
+    head_ = (head_ + 1) % elements_.size();
+    return did_replace;
+  }
+
+  bool
+  push_and_replace(uint64_t key, std::unique_ptr<T> && value)
+  {
+    std::unique_ptr<T> temp = std::move(value);
+    return push_and_replace(key, temp);
+  }
+
+  /// Return true if the key is found in the ring buffer, otherwise false.
+  bool
+  has_key(uint64_t key)
+  {
+    return elements_.end() != get_iterator_of_key(key);
+  }
+
+private:
+  RCLCPP_DISABLE_COPY(MappedRingBuffer<T>);
+
+  struct element
+  {
+    uint64_t key;
+    std::unique_ptr<T> value;
+    bool in_use;
+  };
+
+  typename std::vector<element>::iterator
+  get_iterator_of_key(uint64_t key)
+  {
+    // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
+    auto it = std::find_if(elements_.begin(), elements_.end(), [key](element & e) -> bool {
+      return e.key == key && e.in_use;
+    });
+    // *INDENT-ON*
+    return it;
+  }
+
+  std::vector<element> elements_;
+  size_t head_;
+
+};
+
+} /* namespace ring_buffer */
+} /* namespace rclcpp */
+
+#endif /* RCLCPP_RCLCPP_RING_BUFFER_HPP_ */
diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp
index 95d916dd09..03ff9de492 100644
--- a/rclcpp/include/rclcpp/node.hpp
+++ b/rclcpp/include/rclcpp/node.hpp
@@ -24,6 +24,7 @@
 #include <rcl_interfaces/msg/parameter_descriptor.hpp>
 #include <rcl_interfaces/msg/parameter_event.hpp>
 #include <rcl_interfaces/msg/set_parameters_result.hpp>
+#include <rosidl_generator_cpp/message_type_support.hpp>
 
 #include <rclcpp/callback_group.hpp>
 #include <rclcpp/client.hpp>
@@ -103,9 +104,11 @@ class Node
   RCLCPP_SMART_PTR_DEFINITIONS(Node);
 
   /* Create a node based on the node name. */
-  Node(const std::string & node_name);
+  Node(const std::string & node_name, bool use_intra_process_comms = false);
   /* Create a node based on the node name and a rclcpp::context::Context. */
-  Node(const std::string & node_name, rclcpp::context::Context::SharedPtr context);
+  Node(
+    const std::string & node_name, rclcpp::context::Context::SharedPtr context,
+    bool use_intra_process_comms = false);
 
   /* Get the name of the node. */
   const std::string &
@@ -197,6 +200,8 @@ class Node
 private:
   RCLCPP_DISABLE_COPY(Node);
 
+  static const rosidl_message_type_support_t * ipm_ts_;
+
   bool
   group_in_node(callback_group::CallbackGroup::SharedPtr & group);
 
@@ -214,6 +219,8 @@ class Node
   size_t number_of_services_;
   size_t number_of_clients_;
 
+  bool use_intra_process_comms_;
+
   mutable std::mutex mutex_;
 
   std::map<std::string, rclcpp::parameter::ParameterVariant> parameters_;
@@ -305,6 +312,9 @@ class Node
   }
 };
 
+const rosidl_message_type_support_t * Node::ipm_ts_ =
+  rosidl_generator_cpp::get_message_type_support_handle<rcl_interfaces::msg::IntraProcessMessage>();
+
 } /* namespace node */
 } /* namespace rclcpp */
 
diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp
index e67b8f3d23..23cdd27c9e 100644
--- a/rclcpp/include/rclcpp/node_impl.hpp
+++ b/rclcpp/include/rclcpp/node_impl.hpp
@@ -24,12 +24,14 @@
 #include <stdexcept>
 #include <string>
 
+#include <rcl_interfaces/msg/intra_process_message.hpp>
 #include <rmw/error_handling.h>
 #include <rmw/rmw.h>
 #include <rosidl_generator_cpp/message_type_support.hpp>
 #include <rosidl_generator_cpp/service_type_support.hpp>
 
 #include <rclcpp/contexts/default_context.hpp>
+#include <rclcpp/intra_process_manager.hpp>
 #include <rclcpp/parameter.hpp>
 
 #ifndef RCLCPP_RCLCPP_NODE_HPP_
@@ -39,15 +41,20 @@
 using namespace rclcpp;
 using namespace rclcpp::node;
 
-using rclcpp::contexts::default_context::DefaultContext;
-
-Node::Node(const std::string & node_name)
-: Node(node_name, DefaultContext::make_shared())
+Node::Node(const std::string & node_name, bool use_intra_process_comms)
+: Node(
+    node_name,
+    rclcpp::contexts::default_context::get_global_default_context(),
+    use_intra_process_comms)
 {}
 
-Node::Node(const std::string & node_name, context::Context::SharedPtr context)
+Node::Node(
+  const std::string & node_name,
+  context::Context::SharedPtr context,
+  bool use_intra_process_comms)
 : name_(node_name), context_(context),
-  number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0)
+  number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
+  use_intra_process_comms_(use_intra_process_comms)
 {
   size_t domain_id = 0;
   char * ros_domain_id = nullptr;
@@ -80,6 +87,7 @@ Node::Node(const std::string & node_name, context::Context::SharedPtr context)
     // *INDENT-ON*
   }
   // Initialize node handle shared_ptr with custom deleter.
+  // *INDENT-OFF*
   node_handle_.reset(node, [](rmw_node_t * node) {
     auto ret = rmw_destroy_node(node);
     if (ret != RMW_RET_OK) {
@@ -87,6 +95,7 @@ Node::Node(const std::string & node_name, context::Context::SharedPtr context)
         stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe());
     }
   });
+  // *INDENT-ON*
 
   using rclcpp::callback_group::CallbackGroupType;
   default_callback_group_ = create_callback_group(
@@ -123,7 +132,47 @@ Node::create_publisher(
     // *INDENT-ON*
   }
 
-  return publisher::Publisher::make_shared(node_handle_, publisher_handle);
+  auto publisher = publisher::Publisher::make_shared(
+    node_handle_, publisher_handle, topic_name, qos_profile.depth);
+
+  if (use_intra_process_comms_) {
+    rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
+      node_handle_.get(), ipm_ts_, (topic_name + "__intra").c_str(), qos_profile);
+    if (!intra_process_publisher_handle) {
+      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
+      throw std::runtime_error(
+        std::string("could not create intra process publisher: ") +
+        rmw_get_error_string_safe());
+      // *INDENT-ON*
+    }
+
+    auto intra_process_manager =
+      context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
+    uint64_t intra_process_publisher_id =
+      intra_process_manager->add_publisher<MessageT>(publisher);
+    rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
+    // *INDENT-OFF*
+    auto shared_publish_callback =
+      [weak_ipm](uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
+    {
+      auto ipm = weak_ipm.lock();
+      if (!ipm) {
+        // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
+        throw std::runtime_error(
+          "intra process publish called after destruction of intra process manager");
+      }
+      auto typed_msg = std::static_pointer_cast<const MessageT>(msg);
+      std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_msg));
+      uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
+      return message_seq;
+    };
+    // *INDENT-ON*
+    publisher->setup_intra_process(
+      intra_process_publisher_id,
+      shared_publish_callback,
+      intra_process_publisher_handle);
+  }
+  return publisher;
 }
 
 bool
@@ -163,8 +212,7 @@ Node::create_subscription(
   if (!subscriber_handle) {
     // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
     throw std::runtime_error(
-      std::string("could not create subscription: ") +
-      rmw_get_error_string_safe());
+      std::string("could not create subscription: ") + rmw_get_error_string_safe());
     // *INDENT-ON*
   }
 
@@ -178,10 +226,47 @@ Node::create_subscription(
     callback,
     msg_mem_strat);
   auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
+  // Setup intra process.
+  if (use_intra_process_comms_) {
+    rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription(
+      node_handle_.get(), ipm_ts_,
+      (topic_name + "__intra").c_str(), qos_profile, false);
+    if (!subscriber_handle) {
+      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
+      throw std::runtime_error(
+        std::string("could not create intra process subscription: ") + rmw_get_error_string_safe());
+      // *INDENT-ON*
+    }
+    auto intra_process_manager =
+      context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
+    rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
+    uint64_t intra_process_subscription_id =
+      intra_process_manager->add_subscription(sub_base_ptr);
+    // *INDENT-OFF*
+    sub->setup_intra_process(
+      intra_process_subscription_id,
+      intra_process_subscriber_handle,
+      [weak_ipm](
+        uint64_t publisher_id,
+        uint64_t message_sequence,
+        uint64_t subscription_id,
+        std::unique_ptr<MessageT> & message)
+      {
+        auto ipm = weak_ipm.lock();
+        if (!ipm) {
+          // TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
+          throw std::runtime_error(
+            "intra process take called after destruction of intra process manager");
+        }
+        ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
+    });
+    // *INDENT-ON*
+  }
+  // Assign to a group.
   if (group) {
     if (!group_in_node(group)) {
       // TODO: use custom exception
-      throw std::runtime_error("Cannot create timer, group not in node.");
+      throw std::runtime_error("Cannot create subscription, group not in node.");
     }
     group->add_subscription(sub_base_ptr);
   } else {
diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp
index 3a8cc386ea..6f0ae14fd7 100644
--- a/rclcpp/include/rclcpp/publisher.hpp
+++ b/rclcpp/include/rclcpp/publisher.hpp
@@ -15,15 +15,18 @@
 #ifndef RCLCPP_RCLCPP_PUBLISHER_HPP_
 #define RCLCPP_RCLCPP_PUBLISHER_HPP_
 
+#include <rclcpp/macros.hpp>
+
 #include <iostream>
 #include <memory>
 #include <sstream>
+#include <string>
 
+#include <rcl_interfaces/msg/intra_process_message.hpp>
+#include <rmw/impl/cpp/demangle.hpp>
 #include <rmw/error_handling.h>
 #include <rmw/rmw.h>
 
-#include <rclcpp/macros.hpp>
-
 namespace rclcpp
 {
 
@@ -38,23 +41,38 @@ namespace publisher
 
 class Publisher
 {
+  friend rclcpp::node::Node;
+
 public:
   RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
 
-  Publisher(std::shared_ptr<rmw_node_t> node_handle, rmw_publisher_t * publisher_handle)
-  : node_handle_(node_handle), publisher_handle_(publisher_handle)
+  Publisher(
+    std::shared_ptr<rmw_node_t> node_handle,
+    rmw_publisher_t * publisher_handle,
+    std::string topic,
+    size_t queue_size)
+  : node_handle_(node_handle), publisher_handle_(publisher_handle),
+    intra_process_publisher_handle_(nullptr),
+    topic_(topic), queue_size_(queue_size),
+    intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
   {}
 
   virtual ~Publisher()
   {
+    if (intra_process_publisher_handle_) {
+      if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) {
+        fprintf(
+          stderr,
+          "Error in destruction of intra process rmw publisher handle: %s\n",
+          rmw_get_error_string_safe());
+      }
+    }
     if (publisher_handle_) {
       if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) {
-        // *INDENT-OFF*
-        std::stringstream ss;
-        ss << "Error in destruction of rmw publisher handle: "
-           << rmw_get_error_string_safe() << '\n';
-        // *INDENT-ON*
-        (std::cerr << ss.str()).flush();
+        fprintf(
+          stderr,
+          "Error in destruction of rmw publisher handle: %s\n",
+          rmw_get_error_string_safe());
       }
     }
   }
@@ -63,19 +81,70 @@ class Publisher
   void
   publish(std::shared_ptr<MessageT> & msg)
   {
-    rmw_ret_t status = rmw_publish(publisher_handle_, msg.get());
-    if (status != RMW_RET_OK) {
-      // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
-      throw std::runtime_error(
-        std::string("failed to publish message: ") + rmw_get_error_string_safe());
-      // *INDENT-ON*
+    rmw_ret_t status;
+    if (!store_intra_process_message_) {
+      // TODO(wjwwood): for now, make intra process and inter process mutually exclusive.
+      // Later we'll have them together, when we have a way to filter more efficiently.
+      status = rmw_publish(publisher_handle_, msg.get());
+      if (status != RMW_RET_OK) {
+        // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
+        throw std::runtime_error(
+          std::string("failed to publish message: ") + rmw_get_error_string_safe());
+        // *INDENT-ON*
+      }
+    }
+    if (store_intra_process_message_) {
+      uint64_t message_seq = store_intra_process_message_(intra_process_publisher_id_, msg);
+      rcl_interfaces::msg::IntraProcessMessage ipm;
+      ipm.publisher_id = intra_process_publisher_id_;
+      ipm.message_sequence = message_seq;
+      status = rmw_publish(intra_process_publisher_handle_, &ipm);
+      if (status != RMW_RET_OK) {
+        // *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
+        throw std::runtime_error(
+          std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
+        // *INDENT-ON*
+      }
     }
   }
 
+  const std::string &
+  get_topic_name() const
+  {
+    return topic_;
+  }
+
+  size_t
+  get_queue_size() const
+  {
+    return queue_size_;
+  }
+
+  typedef std::function<uint64_t(uint64_t, std::shared_ptr<void>)> StoreSharedMessageCallbackT;
+
+protected:
+  void
+  setup_intra_process(
+    uint64_t intra_process_publisher_id,
+    StoreSharedMessageCallbackT callback,
+    rmw_publisher_t * intra_process_publisher_handle)
+  {
+    intra_process_publisher_id_ = intra_process_publisher_id;
+    store_intra_process_message_ = callback;
+    intra_process_publisher_handle_ = intra_process_publisher_handle;
+  }
+
 private:
   std::shared_ptr<rmw_node_t> node_handle_;
 
   rmw_publisher_t * publisher_handle_;
+  rmw_publisher_t * intra_process_publisher_handle_;
+
+  std::string topic_;
+  size_t queue_size_;
+
+  uint64_t intra_process_publisher_id_;
+  StoreSharedMessageCallbackT store_intra_process_message_;
 
 };
 
diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp
index 141747c373..33dc9ffefe 100644
--- a/rclcpp/include/rclcpp/subscription.hpp
+++ b/rclcpp/include/rclcpp/subscription.hpp
@@ -21,6 +21,7 @@
 #include <sstream>
 #include <string>
 
+#include <rcl_interfaces/msg/intra_process_message.hpp>
 #include <rmw/error_handling.h>
 #include <rmw/rmw.h>
 
@@ -36,6 +37,11 @@ namespace executor
 class Executor;
 } // namespace executor
 
+namespace node
+{
+class Node;
+} // namespace node
+
 namespace subscription
 {
 
@@ -51,7 +57,8 @@ class SubscriptionBase
     rmw_subscription_t * subscription_handle,
     const std::string & topic_name,
     bool ignore_local_publications)
-  : node_handle_(node_handle),
+  : intra_process_subscription_handle_(nullptr),
+    node_handle_(node_handle),
     subscription_handle_(subscription_handle),
     topic_name_(topic_name),
     ignore_local_publications_(ignore_local_publications)
@@ -70,6 +77,15 @@ class SubscriptionBase
         (std::cerr << ss.str()).flush();
       }
     }
+    if (intra_process_subscription_handle_) {
+      auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_);
+      if (ret != RMW_RET_OK) {
+        std::stringstream ss;
+        ss << "Error in destruction of rmw intra process subscription handle: " <<
+          rmw_get_error_string_safe() << '\n';
+        (std::cerr << ss.str()).flush();
+      }
+    }
   }
 
   const std::string & get_topic_name() const
@@ -80,6 +96,10 @@ class SubscriptionBase
   virtual std::shared_ptr<void> create_message() = 0;
   virtual void handle_message(std::shared_ptr<void> & message) = 0;
   virtual void return_message(std::shared_ptr<void> & message) = 0;
+  virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm) = 0;
+
+protected:
+  rmw_subscription_t * intra_process_subscription_handle_;
 
 private:
   RCLCPP_DISABLE_COPY(SubscriptionBase);
@@ -87,6 +107,7 @@ class SubscriptionBase
   std::shared_ptr<rmw_node_t> node_handle_;
 
   rmw_subscription_t * subscription_handle_;
+
   std::string topic_name_;
   bool ignore_local_publications_;
 
@@ -95,6 +116,8 @@ class SubscriptionBase
 template<typename MessageT>
 class Subscription : public SubscriptionBase
 {
+  friend class rclcpp::node::Node;
+
 public:
   using CallbackType = std::function<void(const std::shared_ptr<MessageT> &)>;
   RCLCPP_SMART_PTR_DEFINITIONS(Subscription);
@@ -135,13 +158,56 @@ class Subscription : public SubscriptionBase
     message_memory_strategy_->return_message(typed_message);
   }
 
+  void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage & ipm)
+  {
+    if (!get_intra_process_message_callback_) {
+      // throw std::runtime_error(
+      //   "handle_intra_process_message called before setup_intra_process");
+      // TODO(wjwwood): for now, this could mean that intra process was just not enabled.
+      // However, this can only really happen if this node has it disabled, but the other doesn't.
+      return;
+    }
+    std::unique_ptr<MessageT> msg;
+    get_intra_process_message_callback_(
+      ipm.publisher_id,
+      ipm.message_sequence,
+      intra_process_subscription_id_,
+      msg);
+    if (!msg) {
+      // This either occurred because the publisher no longer exists or the
+      // message requested is no longer being stored.
+      // TODO(wjwwood): should we notify someone of this? log error, log warning?
+      return;
+    }
+    typename MessageT::SharedPtr shared_msg = std::move(msg);
+    callback_(shared_msg);
+  }
+
 private:
+  typedef
+    std::function<
+      void (uint64_t, uint64_t, uint64_t, std::unique_ptr<MessageT> &)
+    > GetMessageCallbackType;
+
+  void setup_intra_process(
+    uint64_t intra_process_subscription_id,
+    rmw_subscription_t * intra_process_subscription,
+    GetMessageCallbackType callback)
+  {
+    intra_process_subscription_id_ = intra_process_subscription_id;
+    intra_process_subscription_handle_ = intra_process_subscription;
+    get_intra_process_message_callback_ = callback;
+  }
+
   RCLCPP_DISABLE_COPY(Subscription);
 
   CallbackType callback_;
   typename message_memory_strategy::MessageMemoryStrategy<MessageT>::SharedPtr
   message_memory_strategy_;
 
+  GetMessageCallbackType get_intra_process_message_callback_;
+  uint64_t intra_process_subscription_id_;
+
 };
 
 } /* namespace subscription */
diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp
index c0e067c3ea..9e77976d98 100644
--- a/rclcpp/include/rclcpp/utilities.hpp
+++ b/rclcpp/include/rclcpp/utilities.hpp
@@ -27,6 +27,7 @@
 #include <string.h>
 #include <thread>
 
+#include <rmw/error_handling.h>
 #include <rmw/macros.h>
 #include <rmw/rmw.h>
 
diff --git a/rclcpp/package.xml b/rclcpp/package.xml
index 6962b8c658..50fb4120a0 100644
--- a/rclcpp/package.xml
+++ b/rclcpp/package.xml
@@ -13,6 +13,7 @@
   <build_depend>rcl_interfaces</build_depend>
   <build_export_depend>rcl_interfaces</build_export_depend>
 
+  <test_depend>ament_cmake_gtest</test_depend>
   <test_depend>ament_lint_auto</test_depend>
   <test_depend>ament_lint_common</test_depend>
 
diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp
new file mode 100644
index 0000000000..8a57705768
--- /dev/null
+++ b/rclcpp/test/test_intra_process_manager.cpp
@@ -0,0 +1,788 @@
+// Copyright 2015 Open Source Robotics Foundation, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include <rclcpp/macros.hpp>
+
+// Mock up publisher and subscription base to avoid needing an rmw impl.
+namespace rclcpp
+{
+namespace publisher
+{
+namespace mock
+{
+
+class Publisher
+{
+public:
+  RCLCPP_SMART_PTR_DEFINITIONS(Publisher);
+
+  Publisher()
+  : mock_topic_name(""), mock_queue_size(0) {}
+
+  const std::string & get_topic_name() const
+  {
+    return mock_topic_name;
+  }
+  size_t get_queue_size() const
+  {
+    return mock_queue_size;
+  }
+
+  std::string mock_topic_name;
+  size_t mock_queue_size;
+};
+
+}
+}
+}
+
+namespace rclcpp
+{
+namespace subscription
+{
+namespace mock
+{
+
+class SubscriptionBase
+{
+public:
+  RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionBase);
+
+  SubscriptionBase()
+  : mock_topic_name(""), mock_queue_size(0) {}
+
+  const std::string & get_topic_name() const
+  {
+    return mock_topic_name;
+  }
+  size_t get_queue_size() const
+  {
+    return mock_queue_size;
+  }
+
+  std::string mock_topic_name;
+  size_t mock_queue_size;
+};
+
+}
+}
+}
+
+// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported.
+#define RCLCPP_RCLCPP_PUBLISHER_HPP_
+#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_
+// Force ipm to use our mock publisher class.
+#define Publisher mock::Publisher
+#define SubscriptionBase mock::SubscriptionBase
+#include <rclcpp/intra_process_manager.hpp>
+#undef SubscriptionBase
+#undef Publisher
+
+#include <rcl_interfaces/msg/intra_process_message.hpp>
+
+/*
+   This tests the "normal" usage of the class:
+   - Creates two publishers on different topics.
+   - Creates a subscription which matches one of them.
+   - Publishes on each publisher with different message content.
+   - Try's to take the message from the non-matching publish, should fail.
+   - Try's to take the message from the matching publish, should work.
+   - Asserts the message it got back was the one that went in (since there's only one subscription).
+   - Try's to take the message again, should fail.
+ */
+TEST(TestIntraProcessManager, nominal) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 2;
+
+  auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p2->mock_topic_name = "nominal2";
+  p2->mock_queue_size = 10;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
+  auto s1_id = ipm.add_subscription(s1);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto p1_m1_original_address = unique_msg.get();
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm_msg->message_sequence = 43;
+  ipm_msg->publisher_id = 43;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg);
+  EXPECT_EQ(nullptr, unique_msg);  // Should fail since p2 and s1 don't have the same topic.
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_EQ(p1_m1_original_address, unique_msg.get());
+  }
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_EQ(nullptr, unique_msg);  // Should fail, since the message was already taken.
+
+  ipm_msg->message_sequence = 44;
+  ipm_msg->publisher_id = 44;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm_msg->message_sequence = 45;
+  ipm_msg->publisher_id = 45;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm_msg->message_sequence = 46;
+  ipm_msg->publisher_id = 46;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(44ul, unique_msg->message_sequence);
+    EXPECT_EQ(44ul, unique_msg->publisher_id);
+  }
+}
+
+/*
+   Simulates the case where a publisher is removed between publishing and the matching take.
+   - Creates a publisher and subscription on the same topic.
+   - Publishes a message.
+   - Remove the publisher.
+   - Try's to take the message, should fail since the publisher (and its storage) is gone.
+ */
+TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 10;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  auto s1_id = ipm.add_subscription(s1);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm.remove_publisher(p1_id);
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_EQ(nullptr, unique_msg);  // Should fail, since the publisher is gone.
+}
+
+/*
+   Tests whether or not removed subscriptions affect take behavior.
+   - Creates a publisher and three subscriptions on the same topic.
+   - Publish a message, keep the original point for later comparison.
+   - Take with one subscription, should work.
+   - Remove a different subscription.
+   - Take with the final subscription, should work.
+   - Assert the previous take returned ownership of the original object published.
+ */
+TEST(TestIntraProcessManager, removed_subscription_affects_take) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 10;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto s2 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s2->mock_topic_name = "nominal1";
+  s2->mock_queue_size = 10;
+
+  auto s3 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s3->mock_topic_name = "nominal1";
+  s3->mock_queue_size = 10;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  auto s1_id = ipm.add_subscription(s1);
+  auto s2_id = ipm.add_subscription(s2);
+  auto s3_id = ipm.add_subscription(s3);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto original_message_pointer = unique_msg.get();
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.remove_subscription(s2_id);
+
+  // Take using s3, the remaining subscription.
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    // Should match the original pointer since s2 was removed first.
+    EXPECT_EQ(original_message_pointer, unique_msg.get());
+  }
+
+  // Take using s2, should fail since s2 was removed.
+  unique_msg.reset();
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg);
+  EXPECT_EQ(nullptr, unique_msg);
+}
+
+/*
+   This tests normal operation with multiple subscriptions and one publisher.
+   - Creates a publisher and three subscriptions on the same topic.
+   - Publish a message.
+   - Take with each subscription, checking that the last takes the original back.
+ */
+TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 10;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto s2 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s2->mock_topic_name = "nominal1";
+  s2->mock_queue_size = 10;
+
+  auto s3 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s3->mock_topic_name = "nominal1";
+  s3->mock_queue_size = 10;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  auto s1_id = ipm.add_subscription(s1);
+  auto s2_id = ipm.add_subscription(s2);
+  auto s3_id = ipm.add_subscription(s3);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto original_message_pointer = unique_msg.get();
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    // Should match the original pointer.
+    EXPECT_EQ(original_message_pointer, unique_msg.get());
+  }
+}
+
+/*
+   This tests normal operation with multiple publishers and one subscription.
+   - Creates a publisher and three subscriptions on the same topic.
+   - Publish a message.
+   - Take with each subscription, checking that the last takes the original back.
+ */
+TEST(TestIntraProcessManager, multiple_publishers_one_subscription) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 10;
+
+  auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p2->mock_topic_name = "nominal1";
+  p2->mock_queue_size = 10;
+
+  auto p3 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p3->mock_topic_name = "nominal1";
+  p3->mock_queue_size = 10;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
+  auto p3_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p3);
+  auto s1_id = ipm.add_subscription(s1);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  // First publish
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto original_message_pointer1 = unique_msg.get();
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  // Second publish
+  ipm_msg->message_sequence = 43;
+  ipm_msg->publisher_id = 43;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  auto original_message_pointer2 = unique_msg.get();
+  auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  // Third publish
+  ipm_msg->message_sequence = 44;
+  ipm_msg->publisher_id = 44;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  auto original_message_pointer3 = unique_msg.get();
+  auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  // First take
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer1, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  // Second take
+  ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(43ul, unique_msg->message_sequence);
+    EXPECT_EQ(43ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer2, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  // Third take
+  ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(44ul, unique_msg->message_sequence);
+    EXPECT_EQ(44ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer3, unique_msg.get());
+  }
+  unique_msg.reset();
+}
+
+/*
+   This tests normal operation with multiple publishers and subscriptions.
+   - Creates three publishers and three subscriptions on the same topic.
+   - Publish a message on each publisher.
+   - Take from each publisher with each subscription, checking the pointer.
+ */
+TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 10;
+
+  auto p2 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p2->mock_topic_name = "nominal1";
+  p2->mock_queue_size = 10;
+
+  auto p3 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p3->mock_topic_name = "nominal1";
+  p3->mock_queue_size = 10;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto s2 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s2->mock_topic_name = "nominal1";
+  s2->mock_queue_size = 10;
+
+  auto s3 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s3->mock_topic_name = "nominal1";
+  s3->mock_queue_size = 10;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  auto p2_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p2);
+  auto p3_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p3);
+  auto s1_id = ipm.add_subscription(s1);
+  auto s2_id = ipm.add_subscription(s2);
+  auto s3_id = ipm.add_subscription(s3);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  // First publish
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto original_message_pointer1 = unique_msg.get();
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  // Second publish
+  ipm_msg->message_sequence = 43;
+  ipm_msg->publisher_id = 43;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  auto original_message_pointer2 = unique_msg.get();
+  auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  // Third publish
+  ipm_msg->message_sequence = 44;
+  ipm_msg->publisher_id = 44;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  auto original_message_pointer3 = unique_msg.get();
+  auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  // First take
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer1, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s2_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer1, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s3_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer1, unique_msg.get());  // Final take.
+  }
+  unique_msg.reset();
+
+  // Second take
+  ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(43ul, unique_msg->message_sequence);
+    EXPECT_EQ(43ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer2, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p2_id, p2_m1_id, s2_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(43ul, unique_msg->message_sequence);
+    EXPECT_EQ(43ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer2, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p2_id, p2_m1_id, s3_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(43ul, unique_msg->message_sequence);
+    EXPECT_EQ(43ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer2, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  // Third take
+  ipm.take_intra_process_message(p3_id, p3_m1_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(44ul, unique_msg->message_sequence);
+    EXPECT_EQ(44ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer3, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p3_id, p3_m1_id, s2_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(44ul, unique_msg->message_sequence);
+    EXPECT_EQ(44ul, unique_msg->publisher_id);
+    EXPECT_NE(original_message_pointer3, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm.take_intra_process_message(p3_id, p3_m1_id, s3_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(44ul, unique_msg->message_sequence);
+    EXPECT_EQ(44ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer3, unique_msg.get());
+  }
+  unique_msg.reset();
+}
+
+/*
+   Tests displacing a message from the ring buffer before take is called.
+   - Creates a publisher (buffer_size = 2) and a subscription on the same topic.
+   - Publish a message on the publisher.
+   - Publish another message.
+   - Take the second message.
+   - Publish a message.
+   - Try to take the first message, should fail.
+ */
+TEST(TestIntraProcessManager, ring_buffer_displacement) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 2;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  auto s1_id = ipm.add_subscription(s1);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto original_message_pointer1 = unique_msg.get();
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm_msg->message_sequence = 43;
+  ipm_msg->publisher_id = 43;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  auto original_message_pointer2 = unique_msg.get();
+  auto p1_m2_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  ipm.take_intra_process_message(p1_id, p1_m2_id, s1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);
+  if (unique_msg) {
+    EXPECT_EQ(43ul, unique_msg->message_sequence);
+    EXPECT_EQ(43ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer2, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  ipm_msg->message_sequence = 44;
+  ipm_msg->publisher_id = 44;
+  unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg));
+
+  ipm.store_intra_process_message(p1_id, unique_msg);
+  EXPECT_NE(nullptr, unique_msg);  // Should return the thing in the ring buffer it displaced.
+  if (unique_msg) {
+    // This should have been the first published message.
+    EXPECT_EQ(42ul, unique_msg->message_sequence);
+    EXPECT_EQ(42ul, unique_msg->publisher_id);
+    EXPECT_EQ(original_message_pointer1, unique_msg.get());
+  }
+  unique_msg.reset();
+
+  // Since it just got displaced it should no longer be there to take.
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_EQ(nullptr, unique_msg);
+}
+
+/*
+   Simulates race condition where a subscription is created after publish.
+   - Creates a publisher.
+   - Publish a message on the publisher.
+   - Create a subscription on the same topic.
+   - Try to take the message with the newly created subscription, should fail.
+ */
+TEST(TestIntraProcessManager, subscription_creation_race_condition) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+  p1->mock_topic_name = "nominal1";
+  p1->mock_queue_size = 2;
+
+  auto p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+  ASSERT_EQ(nullptr, unique_msg);
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto s1_id = ipm.add_subscription(s1);
+
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_EQ(nullptr, unique_msg);
+}
+
+/*
+   Simulates race condition where a publisher goes out of scope before take.
+   - Create a subscription.
+   - Creates a publisher on the same topic in a scope.
+   - Publish a message on the publisher in a scope.
+   - Let the scope expire.
+   - Try to take the message with the subscription, should fail.
+ */
+TEST(TestIntraProcessManager, publisher_out_of_scope_take) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  auto s1 = std::make_shared<rclcpp::subscription::mock::SubscriptionBase>();
+  s1->mock_topic_name = "nominal1";
+  s1->mock_queue_size = 10;
+
+  auto s1_id = ipm.add_subscription(s1);
+
+  uint64_t p1_id;
+  uint64_t p1_m1_id;
+  {
+    auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+    p1->mock_topic_name = "nominal1";
+    p1->mock_queue_size = 2;
+
+    p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+
+    auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+    ipm_msg->message_sequence = 42;
+    ipm_msg->publisher_id = 42;
+    rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+      new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+    );
+
+    p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg);
+    ASSERT_EQ(nullptr, unique_msg);
+
+    // Explicitly remove publisher from ipm (emulate's publisher's destructor).
+    ipm.remove_publisher(p1_id);
+  }
+
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(nullptr);
+  // Should fail because the publisher is out of scope.
+  ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg);
+  EXPECT_EQ(nullptr, unique_msg);
+}
+
+/*
+   Simulates race condition where a publisher goes out of scope before store.
+   - Creates a publisher in a scope.
+   - Let the scope expire.
+   - Publish a message on the publisher in a scope, should throw.
+ */
+TEST(TestIntraProcessManager, publisher_out_of_scope_store) {
+  rclcpp::intra_process_manager::IntraProcessManager ipm;
+
+  uint64_t p1_id;
+  {
+    auto p1 = std::make_shared<rclcpp::publisher::mock::Publisher>();
+    p1->mock_topic_name = "nominal1";
+    p1->mock_queue_size = 2;
+
+    p1_id = ipm.add_publisher<rcl_interfaces::msg::IntraProcessMessage>(p1);
+  }
+
+  auto ipm_msg = std::make_shared<rcl_interfaces::msg::IntraProcessMessage>();
+  ipm_msg->message_sequence = 42;
+  ipm_msg->publisher_id = 42;
+  rcl_interfaces::msg::IntraProcessMessage::UniquePtr unique_msg(
+    new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)
+  );
+
+  EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error);
+  ASSERT_EQ(nullptr, unique_msg);
+}
diff --git a/rclcpp/test/test_mapped_ring_buffer.cpp b/rclcpp/test/test_mapped_ring_buffer.cpp
new file mode 100644
index 0000000000..3cb6128515
--- /dev/null
+++ b/rclcpp/test/test_mapped_ring_buffer.cpp
@@ -0,0 +1,172 @@
+// Copyright 2015 Open Source Robotics Foundation, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+
+#include <rclcpp/mapped_ring_buffer.hpp>
+
+/*
+   Tests get_copy and pop on an empty mrb.
+ */
+TEST(TestMappedRingBuffer, empty) {
+  // Cannot create a buffer of size zero.
+  EXPECT_THROW(rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(0), std::invalid_argument);
+  // Getting or popping an empty buffer should result in a nullptr.
+  rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(1);
+
+  std::unique_ptr<char> actual;
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_EQ(nullptr, actual);
+
+  mrb.pop_at_key(1, actual);
+  EXPECT_EQ(nullptr, actual);
+}
+
+/*
+   Tests push_and_replace with a temporary object.
+ */
+TEST(TestMappedRingBuffer, temporary_l_value) {
+  rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
+  // Pass in value with temporary object
+  mrb.push_and_replace(1, std::unique_ptr<char>(new char('a')));
+
+  std::unique_ptr<char> actual;
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_EQ('a', *actual);
+
+  mrb.pop_at_key(1, actual);
+  EXPECT_EQ('a', *actual);
+
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_EQ(nullptr, actual);
+}
+
+/*
+   Tests normal usage of the mrb.
+ */
+TEST(TestMappedRingBuffer, nominal) {
+  rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
+  std::unique_ptr<char> expected(new char('a'));
+  // Store expected value's address for later comparison.
+  char * expected_orig = expected.get();
+
+  EXPECT_FALSE(mrb.push_and_replace(1, expected));
+
+  std::unique_ptr<char> actual;
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('a', *actual);
+  }
+  EXPECT_NE(expected_orig, actual.get());
+
+  mrb.pop_at_key(1, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('a', *actual);
+  }
+  EXPECT_EQ(expected_orig, actual.get());
+
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_EQ(nullptr, actual);
+
+  expected.reset(new char('a'));
+  EXPECT_FALSE(mrb.push_and_replace(1, expected));
+
+  expected.reset(new char('b'));
+  EXPECT_FALSE(mrb.push_and_replace(2, expected));
+
+  expected.reset(new char('c'));
+  EXPECT_TRUE(mrb.push_and_replace(3, expected));
+
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_EQ(nullptr, actual);
+
+  mrb.get_copy_at_key(2, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('b', *actual);
+  }
+
+  mrb.get_copy_at_key(3, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('c', *actual);
+  }
+}
+
+/*
+   Tests get_ownership on a normal mrb.
+ */
+TEST(TestMappedRingBuffer, get_ownership) {
+  rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
+  std::unique_ptr<char> expected(new char('a'));
+  // Store expected value's address for later comparison.
+  char * expected_orig = expected.get();
+
+  EXPECT_FALSE(mrb.push_and_replace(1, expected));
+
+  std::unique_ptr<char> actual;
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('a', *actual);
+  }
+  EXPECT_NE(expected_orig, actual.get());
+
+  mrb.get_ownership_at_key(1, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('a', *actual);
+  }
+  EXPECT_EQ(expected_orig, actual.get());
+
+  mrb.pop_at_key(1, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('a', *actual);  // The value should be the same.
+  }
+  EXPECT_NE(expected_orig, actual.get());  // Even though we pop'ed, we didn't get the original.
+
+  mrb.get_copy_at_key(1, actual);
+  EXPECT_EQ(nullptr, actual);
+}
+
+/*
+   Tests the affect of reusing keys (non-unique keys) in a mrb.
+ */
+TEST(TestMappedRingBuffer, non_unique_keys) {
+  rclcpp::mapped_ring_buffer::MappedRingBuffer<char> mrb(2);
+
+  std::unique_ptr<char> input(new char('a'));
+  mrb.push_and_replace(1, input);
+  input.reset(new char('b'));
+
+  // Different value, same key.
+  mrb.push_and_replace(1, input);
+
+  std::unique_ptr<char> actual;
+  mrb.pop_at_key(1, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('a', *actual);
+  }
+
+  actual = nullptr;
+  mrb.pop_at_key(1, actual);
+  EXPECT_NE(nullptr, actual);
+  if (actual) {
+    EXPECT_EQ('b', *actual);
+  }
+}