|
@@ -451,11 +451,10 @@ class CollaborativeOptimizer(DecentralizedOptimizerBase):
|
|
|
|
|
|
for state in valid_peer_states:
|
|
|
total_samples_per_second += state.samples_per_second
|
|
|
- delta_time = current_time - state.time
|
|
|
- if state.step >= global_optimizer_step - self.step_tolerance and delta_time > self.staleness_timeout:
|
|
|
+ if state.step >= global_optimizer_step and current_time - state.time < self.staleness_timeout:
|
|
|
total_samples_accumulated += state.samples_accumulated
|
|
|
estimated_current_samples += (
|
|
|
- state.samples_accumulated + max(0, delta_time) * state.samples_per_second
|
|
|
+ state.samples_accumulated + max(0, current_time - state.time) * state.samples_per_second
|
|
|
)
|
|
|
# note: we deliberately count only valid peers for samples_accumulated, but all peers for performance;
|
|
|
# the rationale behind this is that outdated peers will synchronize and begin contributing shortly.
|