程序员的知识教程库

网站首页 > 教程分享 正文

聊聊flink的slot.request.timeout配置

henian88 2024-09-01 18:02:29 教程分享 2 ℃ 0 评论

本文主要研究一下flink的slot.request.timeout配置

JobManagerOptions

flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
 //......
?
 /**
 * The timeout in milliseconds for requesting a slot from Slot Pool.
 */
 public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
 key("slot.request.timeout")
 .defaultValue(5L * 60L * 1000L)
 .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");
?
 //......
}
  • slot.request.timeout默认为5分钟

SlotManagerConfiguration

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java

public class SlotManagerConfiguration {
?
 private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);
?
 private final Time taskManagerRequestTimeout;
 private final Time slotRequestTimeout;
 private final Time taskManagerTimeout;
?
 public SlotManagerConfiguration(
 Time taskManagerRequestTimeout,
 Time slotRequestTimeout,
 Time taskManagerTimeout) {
 this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
 this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
 this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
 }
?
 public Time getTaskManagerRequestTimeout() {
 return taskManagerRequestTimeout;
 }
?
 public Time getSlotRequestTimeout() {
 return slotRequestTimeout;
 }
?
 public Time getTaskManagerTimeout() {
 return taskManagerTimeout;
 }
?
 public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
 final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
 final Time rpcTimeout;
?
 try {
 rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
 } catch (NumberFormatException e) {
 throw new ConfigurationException("Could not parse the resource manager's timeout " +
 "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
 }
?
 final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
 final Time taskManagerTimeout = Time.milliseconds(
 configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));
?
 return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
 }
?
 private static Time getSlotRequestTimeout(final Configuration configuration) {
 final long slotRequestTimeoutMs;
 if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
 LOGGER.warn("Config key {} is deprecated; use {} instead.",
 ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
 JobManagerOptions.SLOT_REQUEST_TIMEOUT);
 slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
 } else {
 slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
 }
 return Time.milliseconds(slotRequestTimeoutMs);
 }
}
  • SlotManagerConfiguration的getSlotRequestTimeout方法会优先读取已废弃的ResourceManagerOptions.SLOT_REQUEST_TIMEOUT(如果配置了该key,会打印废弃警告),否则从配置中读取JobManagerOptions.SLOT_REQUEST_TIMEOUT

SlotManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java

public class SlotManager implements AutoCloseable {
 private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
?
 /** Scheduled executor for timeouts. */
 private final ScheduledExecutor scheduledExecutor;
?
 /** Timeout for slot requests to the task manager. */
 private final Time taskManagerRequestTimeout;
?
 /** Timeout after which an allocation is discarded. */
 private final Time slotRequestTimeout;
?
 /** Timeout after which an unused TaskManager is released. */
 private final Time taskManagerTimeout;
?
 /** Map for all registered slots. */
 private final HashMap<SlotID, TaskManagerSlot> slots;
?
 /** Index of all currently free slots. */
 private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
?
 /** All currently registered task managers. */
 private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
?
 /** Map of fulfilled and active allocations for request deduplication purposes. */
 private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
?
 /** Map of pending/unfulfilled slot allocation requests. */
 private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
?
 private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
?
 /** ResourceManager's id. */
 private ResourceManagerId resourceManagerId;
?
 /** Executor for future callbacks which have to be "synchronized". */
 private Executor mainThreadExecutor;
?
 /** Callbacks for resource (de-)allocations. */
 private ResourceActions resourceActions;
?
 private ScheduledFuture<?> taskManagerTimeoutCheck;
?
 private ScheduledFuture<?> slotRequestTimeoutCheck;
?
 /** True iff the component has been started. */
 private boolean started;
?
 public SlotManager(
 ScheduledExecutor scheduledExecutor,
 Time taskManagerRequestTimeout,
 Time slotRequestTimeout,
 Time taskManagerTimeout) {
 this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
 this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
 this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
 this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
?
 slots = new HashMap<>(16);
 freeSlots = new LinkedHashMap<>(16);
 taskManagerRegistrations = new HashMap<>(4);
 fulfilledSlotRequests = new HashMap<>(16);
 pendingSlotRequests = new HashMap<>(16);
 pendingSlots = new HashMap<>(16);
?
 resourceManagerId = null;
 resourceActions = null;
 mainThreadExecutor = null;
 taskManagerTimeoutCheck = null;
 slotRequestTimeoutCheck = null;
?
 started = false;
 }
?
 public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
 LOG.info("Starting the SlotManager.");
?
 this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
 mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
 resourceActions = Preconditions.checkNotNull(newResourceActions);
?
 started = true;
?
 taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
 () -> mainThreadExecutor.execute(
 () -> checkTaskManagerTimeouts()),
 0L,
 taskManagerTimeout.toMilliseconds(),
 TimeUnit.MILLISECONDS);
?
 slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
 () -> mainThreadExecutor.execute(
 () -> checkSlotRequestTimeouts()),
 0L,
 slotRequestTimeout.toMilliseconds(),
 TimeUnit.MILLISECONDS);
 }
?
 /**
 * Suspends the component. This clears the internal state of the slot manager.
 */
 public void suspend() {
 LOG.info("Suspending the SlotManager.");
?
 // stop the timeout checks for the TaskManagers and the SlotRequests
 if (taskManagerTimeoutCheck != null) {
 taskManagerTimeoutCheck.cancel(false);
 taskManagerTimeoutCheck = null;
 }
?
 if (slotRequestTimeoutCheck != null) {
 slotRequestTimeoutCheck.cancel(false);
 slotRequestTimeoutCheck = null;
 }
?
 for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
 cancelPendingSlotRequest(pendingSlotRequest);
 }
?
 pendingSlotRequests.clear();
?
 ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());
?
 for (InstanceID registeredTaskManager : registeredTaskManagers) {
 unregisterTaskManager(registeredTaskManager);
 }
?
 resourceManagerId = null;
 resourceActions = null;
 started = false;
 }
?
 public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
 checkInit();
?
 if (checkDuplicateRequest(slotRequest.getAllocationId())) {
 LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
?
 return false;
 } else {
 PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
?
 pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
?
 try {
 internalRequestSlot(pendingSlotRequest);
 } catch (ResourceManagerException e) {
 // requesting the slot failed --> remove pending slot request
 pendingSlotRequests.remove(slotRequest.getAllocationId());
?
 throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
 }
?
 return true;
 }
 }
?
 private void checkSlotRequestTimeouts() {
 if (!pendingSlotRequests.isEmpty()) {
 long currentTime = System.currentTimeMillis();
?
 Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
?
 while (slotRequestIterator.hasNext()) {
 PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();
?
 if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
 slotRequestIterator.remove();
?
 if (slotRequest.isAssigned()) {
 cancelPendingSlotRequest(slotRequest);
 }
?
 resourceActions.notifyAllocationFailure(
 slotRequest.getJobId(),
 slotRequest.getAllocationId(),
 new TimeoutException("The allocation could not be fulfilled in time."));
 }
 }
 }
 }
?
 //......
?
}
  • SlotManager的构造器接收slotRequestTimeout参数;它维护了pendingSlotRequests的map;start方法会注册slotRequestTimeoutCheck,每隔slotRequestTimeout的时间调度一次,执行的是checkSlotRequestTimeouts方法;suspend方法会cancel这些pendingSlotRequest,然后清空pendingSlotRequests的map
  • registerSlotRequest方法会先执行checkDuplicateRequest判断是否有重复,没有重复的话,则将该slotRequest维护到pendingSlotRequests,然后调用internalRequestSlot进行分配,如果出现异常则从pendingSlotRequests中移除该请求,然后抛出SlotManagerException
  • checkSlotRequestTimeouts则会遍历pendingSlotRequests,然后根据slotRequest.getCreationTimestamp()及当前时间判断时间差是否大于等于slotRequestTimeout,已经超时的话,则会从pendingSlotRequests中移除该slotRequest,如果该请求已被分配(isAssigned)则进行cancel,同时触发resourceActions.notifyAllocationFailure

小结

  • SlotManagerConfiguration的getSlotRequestTimeout方法会优先读取已废弃的ResourceManagerOptions.SLOT_REQUEST_TIMEOUT(如果配置了该key,会打印废弃警告),否则从配置中读取JobManagerOptions.SLOT_REQUEST_TIMEOUT;slot.request.timeout默认为5分钟
  • SlotManager的构造器接收slotRequestTimeout参数;它维护了pendingSlotRequests的map;start方法会注册slotRequestTimeoutCheck,每隔slotRequestTimeout的时间调度一次,执行的是checkSlotRequestTimeouts方法;suspend方法会cancel这些pendingSlotRequest,然后清空pendingSlotRequests的map
  • registerSlotRequest方法会先执行checkDuplicateRequest判断是否有重复,没有重复的话,则将该slotRequest维护到pendingSlotRequests,然后调用internalRequestSlot进行分配,如果出现异常则从pendingSlotRequests中移除该请求,然后抛出SlotManagerException;checkSlotRequestTimeouts则会遍历pendingSlotRequests,然后根据slotRequest.getCreationTimestamp()及当前时间判断时间差是否大于等于slotRequestTimeout,已经超时的话,则会从pendingSlotRequests中移除该slotRequest,如果该请求已被分配(isAssigned)则进行cancel,同时触发resourceActions.notifyAllocationFailure

doc

  • slot-request-timeout

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表