SessionOptions options;
options.config = req->config();
+ std::vector<string> filtered_worker_list;
+ DeviceFinder::GetRemoteWorkers(req->config().device_filters(), env_,
+ worker_cache, &filtered_worker_list);
+
MasterSession* session = env_->master_session_factory(
options, env_, std::move(remote_devices), std::move(worker_cache_ptr),
- std::move(device_set));
+ std::move(device_set), std::move(filtered_worker_list));
GraphDef* gdef =
const_cast<CreateSessionRequest*>(req)->mutable_graph_def();
SessionOptions, MasterEnv*,
std::unique_ptr<std::vector<std::unique_ptr<Device>>>,
std::unique_ptr<WorkerCacheInterface>,
- std::unique_ptr<DeviceSet> device_set)>
+ std::unique_ptr<DeviceSet> device_set,
+ std::vector<string> filtered_worker_list)>
master_session_factory;
std::function<Status(const WorkerCacheFactoryOptions&,
if (!s.ok()) {
for (Part& part : partitions_) {
worker_cache_->ReleaseWorker(part.name, part.worker);
+ part.worker = nullptr;
}
return s;
}
std::unique_ptr<std::vector<std::unique_ptr<Device>>> remote_devs,
std::unique_ptr<WorkerCacheInterface> worker_cache,
std::unique_ptr<DeviceSet> device_set,
+ std::vector<string> filtered_worker_list,
StatsPublisherFactory stats_publisher_factory)
: session_opts_(opt),
env_(env),
remote_devs_(std::move(remote_devs)),
worker_cache_(std::move(worker_cache)),
devices_(std::move(device_set)),
+ filtered_worker_list_(std::move(filtered_worker_list)),
stats_publisher_factory_(std::move(stats_publisher_factory)),
graph_version_(0),
run_graphs_(5),
Status MasterSession::CreateWorkerSessions(
const WorkerCacheFactoryOptions& options) {
- std::vector<string> worker_names;
+ const std::vector<string> worker_names = filtered_worker_list_;
WorkerCacheInterface* worker_cache = get_worker_cache();
- worker_cache->ListWorkers(&worker_names);
struct WorkerGroup {
// The worker name. (Not owned.)
Status MasterSession::DeleteWorkerSessions() {
WorkerCacheInterface* worker_cache = get_worker_cache();
- std::vector<string> worker_names;
- worker_cache->ListWorkers(&worker_names);
+ const std::vector<string>& worker_names = filtered_worker_list_;
struct WorkerGroup {
// The worker name. (Not owned.)
std::unique_ptr<std::vector<std::unique_ptr<Device>>> remote_devs,
std::unique_ptr<WorkerCacheInterface> worker_cache,
std::unique_ptr<DeviceSet> device_set,
+ std::vector<string> filtered_worker_list,
StatsPublisherFactory stats_publisher_factory);
// Initialize the MasterSession for "def". Must be called before Extend(),
// The device set used by this session.
std::unique_ptr<DeviceSet> devices_;
+ // The (partial device) names of remote worker tasks that this
+ // session will contact.
+ const std::vector<string> filtered_worker_list_;
+
StatsPublisherFactory stats_publisher_factory_;
std::atomic_ulong last_access_time_usec_;
// workers.
Status CreateWorkerSessions(const WorkerCacheFactoryOptions& server_def);
- // TODO(b/36574172): Always use Create/DeleteWorkerSession.
bool should_delete_worker_sessions_ = false;
Status DeleteWorkerSessions();
SessionOptions options, const MasterEnv* env,
std::unique_ptr<std::vector<std::unique_ptr<Device>>> remote_devs,
std::unique_ptr<WorkerCacheInterface> worker_cache,
- std::unique_ptr<DeviceSet> device_set) {
+ std::unique_ptr<DeviceSet> device_set,
+ std::vector<string> filtered_worker_list) {
options.config.MergeFrom(config);
return new MasterSession(options, env, std::move(remote_devs),
std::move(worker_cache), std::move(device_set),
+ std::move(filtered_worker_list),
stats_factory);
};
master_env_.worker_cache_factory =