WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/bthread/bthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "bthread/timer_thread.h"
#include "bthread/list_of_abafree_id.h"
#include "bthread/bthread.h"
#include "bthread/worker_idle.h"

namespace bthread {
extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
Expand Down Expand Up @@ -597,6 +598,17 @@ int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) {
return 0;
}

int bthread_register_worker_idle_function(int (*init_fn)(void),
bool (*idle_fn)(void),
uint64_t timeout_us,
int* handle) {
return bthread::register_worker_idle_function(init_fn, idle_fn, timeout_us, handle);
}

int bthread_unregister_worker_idle_function(int handle) {
return bthread::unregister_worker_idle_function(handle);
}

int bthread_set_create_span_func(void* (*func)()) {
if (func == NULL) {
return EINVAL;
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/parking_lot.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ class BAIDU_CACHELINE_ALIGNMENT ParkingLot {

// Wait for tasks.
// If the `expected_state' does not match, wait() may finish directly.
void wait(const State& expected_state) {
void wait(const State& expected_state, const timespec* timeout = NULL) {
if (get_state().val != expected_state.val) {
// Fast path, no need to futex_wait.
return;
}
if (_no_signal_when_no_waiter) {
_waiter_num.fetch_add(1, butil::memory_order_relaxed);
}
futex_wait_private(&_pending_signal, expected_state.val, NULL);
futex_wait_private(&_pending_signal, expected_state.val, timeout);
if (_no_signal_when_no_waiter) {
_waiter_num.fetch_sub(1, butil::memory_order_relaxed);
}
Expand Down
11 changes: 9 additions & 2 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "bthread/task_group.h"
#include "bthread/timer_thread.h"
#include "bthread/bthread.h"
#include "bthread/worker_idle.h"

#ifdef __x86_64__
#include <x86intrin.h>
Expand Down Expand Up @@ -167,7 +168,10 @@ bool TaskGroup::wait_task(bthread_t* tid) {
if (_last_pl_state.stopped()) {
return false;
}
_pl->wait(_last_pl_state);
run_worker_idle_functions();
const timespec timeout = get_worker_idle_timeout();
const bool empty_time = (timeout.tv_sec == 0 && timeout.tv_nsec == 0);
_pl->wait(_last_pl_state, empty_time ? NULL : &timeout);
if (steal_task(tid)) {
return true;
}
Expand All @@ -176,10 +180,13 @@ bool TaskGroup::wait_task(bthread_t* tid) {
if (st.stopped()) {
return false;
}
run_worker_idle_functions();
if (steal_task(tid)) {
return true;
}
_pl->wait(st);
const timespec timeout = get_worker_idle_timeout();
const bool empty_time = (timeout.tv_sec == 0 && timeout.tv_nsec == 0);
_pl->wait(st, empty_time ? NULL : &timeout);
#endif
} while (true);
}
Expand Down
40 changes: 40 additions & 0 deletions src/bthread/unstable.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,46 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
// Add a startup function with tag
extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));

// Registers a per-worker init function and an idle function.
//
// The init function is called at most once per worker thread, before the first
// invocation of idle_fn in that worker.
//
// The idle function is called when a worker has no task to run.
// The return value of idle_fn is ignored.
// If no idle function is registered, the worker waits indefinitely. Otherwise
// the worker waits for at most the minimal timeout among registered functions
// before trying again.
//
// This function is thread-safe.
//
// Args:
// init_fn: Optional. Called once per worker thread. Return 0 on success. A
// non-zero return value disables idle_fn for that worker thread.
// idle_fn: Required. Must not be NULL. Return true if any work is done.
// timeout_us: Required. Must be > 0. Maximum waiting time when worker is idle.
// handle: Optional output. On success, set to a positive handle for later
// unregistration.
//
// Returns:
// 0 on success, error code otherwise.
extern int bthread_register_worker_idle_function(int (*init_fn)(void),
bool (*idle_fn)(void),
uint64_t timeout_us,
int* handle);

// Unregisters an idle function by handle returned by
// bthread_register_worker_idle_function().
//
// This function is thread-safe.
//
// Args:
// handle: Handle returned by bthread_register_worker_idle_function().
//
// Returns:
// 0 on success, error code otherwise.
extern int bthread_unregister_worker_idle_function(int handle);

// Add a create span function
extern int bthread_set_create_span_func(void* (*func)());

Expand Down
188 changes: 188 additions & 0 deletions src/bthread/worker_idle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "bthread/worker_idle.h"

#include <errno.h>

#include <algorithm>
#include <new>
#include <vector>

#include "butil/atomicops.h"
#include "butil/containers/doubly_buffered_data.h"
#include "butil/time.h"
#include "butil/thread_local.h"

namespace bthread {
namespace {

enum InitState : uint8_t {
INIT_STATE_NOT_RUN = 0,
INIT_STATE_OK = 1,
INIT_STATE_FAILED = 2,
};

struct WorkerIdleEntry {
int id;
int (*init_fn)(void);
bool (*idle_fn)(void);
uint64_t timeout_us;
};

typedef std::vector<WorkerIdleEntry> WorkerIdleEntryList;

static butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true> g_entries;
static butil::atomic<int> g_next_id(1);

struct WorkerIdleTLS {
std::vector<uint8_t> init_states;
};

BAIDU_THREAD_LOCAL WorkerIdleTLS* tls_worker_idle = NULL;

static WorkerIdleTLS* get_or_create_tls() {
if (tls_worker_idle) {
return tls_worker_idle;
}
tls_worker_idle = new (std::nothrow) WorkerIdleTLS;
return tls_worker_idle;
}

} // namespace

int register_worker_idle_function(int (*init_fn)(void),
bool (*idle_fn)(void),
uint64_t timeout_us,
int* handle) {
if (idle_fn == NULL) {
return EINVAL;
}
if (timeout_us == 0) {
return EINVAL;
}
const int id = g_next_id.fetch_add(1, butil::memory_order_relaxed);
WorkerIdleEntry e;
e.id = id;
e.init_fn = init_fn;
e.idle_fn = idle_fn;
e.timeout_us = timeout_us;
g_entries.Modify([&](WorkerIdleEntryList& bg) {
bg.push_back(e);
return static_cast<size_t>(1);
});
if (handle) {
*handle = id;
}
return 0;
}

int unregister_worker_idle_function(int handle) {
if (handle <= 0) {
return EINVAL;
}
size_t removed = g_entries.Modify([&](WorkerIdleEntryList& bg) {
const size_t old_size = bg.size();
bg.erase(std::remove_if(bg.begin(), bg.end(),
[&](const WorkerIdleEntry& e) {
return e.id == handle;
}),
bg.end());
return old_size - bg.size();
});
return removed ? 0 : EINVAL;
}

bool has_worker_idle_functions() {
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
if (g_entries.Read(&p) != 0) {
return false;
}
return !p->empty();
}

void run_worker_idle_functions() {
if (!has_worker_idle_functions()) {
return;
}
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
if (g_entries.Read(&p) != 0) {
return;
}
if (p->empty()) {
return;
}

WorkerIdleTLS* tls = get_or_create_tls();
if (tls == NULL) {
return;
}

// Step 1: Ensure per-worker init is called at most once for each entry.
// Step 2: Run idle callbacks for initialized entries.
// Step 3: Ignore callback return values. The caller decides how to proceed.
for (const auto& e : *p) {
if (e.id <= 0 || e.idle_fn == NULL) {
continue;
}
if (tls->init_states.size() <= static_cast<size_t>(e.id)) {
tls->init_states.resize(static_cast<size_t>(e.id) + 1, INIT_STATE_NOT_RUN);
}
uint8_t& st = tls->init_states[static_cast<size_t>(e.id)];
if (st == INIT_STATE_NOT_RUN) {
// Run the init callback function once.
if (e.init_fn) {
const int rc = e.init_fn();
st = (rc == 0) ? INIT_STATE_OK : INIT_STATE_FAILED;
} else {
st = INIT_STATE_OK;
}
}
if (st != INIT_STATE_OK) {
continue;
}
// Run the idle callback function.
e.idle_fn();
}
}

timespec get_worker_idle_timeout() {
butil::DoublyBufferedData<WorkerIdleEntryList, butil::Void, true>::ScopedPtr p;
if (g_entries.Read(&p) != 0) {
return {0, 0};
}
if (p->empty()) {
return {0, 0};
}
uint64_t min_us = 0;
for (const auto& e : *p) {
if (e.timeout_us == 0) {
continue;
}
if (min_us == 0 || e.timeout_us < min_us) {
min_us = e.timeout_us;
}
}
if (min_us == 0) {
return {0, 0};
}
return butil::microseconds_to_timespec(min_us);
}

} // namespace bthread


67 changes: 67 additions & 0 deletions src/bthread/worker_idle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BTHREAD_WORKER_IDLE_H
#define BTHREAD_WORKER_IDLE_H

#include <stdint.h>
#include <time.h>

namespace bthread {

// Registers a per-worker init function and an idle function.
//
// The init function is called at most once per worker thread, before running
// the idle function in that worker thread.
//
// Args:
// init_fn: Optional. Can be NULL.
// idle_fn: Required. Must not be NULL.
// timeout_us: Required. Must be > 0.
// handle: Optional output handle for unregistering later.
//
// Returns:
// 0 on success, error code otherwise.
int register_worker_idle_function(int (*init_fn)(void),
bool (*idle_fn)(void),
uint64_t timeout_us,
int* handle);

// Unregisters a previously registered idle function by handle.
//
// Args:
// handle: Handle returned by register_worker_idle_function().
//
// Returns:
// 0 on success, error code otherwise.
int unregister_worker_idle_function(int handle);

// Returns true if any idle function is registered.
bool has_worker_idle_functions();

// Runs all registered idle functions for current worker thread.
void run_worker_idle_functions();

// Get the minimal timeout among all registered functions.
// Returns {0,0} if no idle function is registered.
timespec get_worker_idle_timeout();

} // namespace bthread

#endif // BTHREAD_WORKER_IDLE_H


Loading